RocketMQの拡張ポイントを用いたトレースIDの実践

業務処理中にログを出力し、重要情報を記録することは一般的です。問題の追跡やトラブルシューティングを円滑に行うためには、関連するログを一貫したトレースIDで結びつけることが理想的です。この要件を満たすためには、2つの方法があります。

  • Spring AOPを用いたインターセプト
  • RocketMQの拡張ポイントを活用したカスタマイズ

Spring AOPの実装方法は比較的シンプルで、インターネット上にも多数の事例が公開されていますので、本稿ではRocketMQの拡張ポイントを活用した方法に焦点を当てます。

RocketMQの拡張ポイント仕組みを理解するためには、内部実装を確認することが重要です。RocketMQは、メッセージの送信、消費、フィルタリングなどの主要な処理段階で拡張性を考慮し、プラグイン可能なインターセプトポイントを提供しています。この仕組みは、SPI(Service Provider Interface)メカニズムとプラグインアーキテクチャを組み合わせています。

メッセージ送信時の実装例を以下に示します。


@Override
public SendResult send(Message message) {
    this.checkONSProducerServiceState(this.defaultMQProducer.getDefaultMQProducerImpl());
    DefaultInvocationContext invocationContext = new DefaultInvocationContext();
    invocationContext.setMessages(Collections.singletonList(message));
    final List<Runnable> postHandleStack = new ArrayList<>();
    
    boolean proceed = preHandle(serviceLoader, invocationContext, postHandleStack);
    try {
        if (proceed) {
            com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.Message msgRMQ = ONSUtil.msgConvert(message);
            try {
                com.aliyun.openservices.shade.com.alibaba.rocketmq.client.producer.SendResult sendResultRMQ = this.defaultMQProducer.send(msgRMQ);
                message.setMsgID(sendResultRMQ.getMsgId());
                SendResult sendResult = new SendResult();
                sendResult.setTopic(sendResultRMQ.getMessageQueue().getTopic());
                sendResult.setMessageId(sendResultRMQ.getMsgId());
                invocationContext.setSendResult(sendResult);
                return sendResult;
            } catch (Exception e) {
                LOGGER.error(String.format("メッセージ送信エラー: %s", message), e);
                ONSClientException exception = checkProducerException(message.getTopic(), message.getMsgID(), e);
                invocationContext.setException(exception);
                throw exception;
            }
        }
    } finally {
        executePostHandle(postHandleStack);
    }
    throw new ONSClientException("ProducerInterceptorがメッセージ送信をキャンセルしました");
}

/**
 * 拡張ポイントの実行を管理するメソッド
 */
protected boolean preHandle(ServiceLoader serviceLoader, final InvocationContext context,
        final List<Runnable> postHandlers) {
    if (serviceLoader != null) {
        for (Object item : serviceLoader) {
            if (item instanceof Interceptor) {
                final Interceptor interceptor = (Interceptor) item;
                Runnable postTask = () -> {
                    try {
                        interceptor.postHandle(context, ONSClientAbstract.this);
                    } catch (Exception e) {
                        LOGGER.error("拡張インターセプターの後処理でエラー発生", e);
                    }
                };
                postHandlers.add(postTask);

                try {
                    if (!interceptor.preHandle(context, this)) {
                        return false;
                    }
                } catch (Exception e) {
                    LOGGER.error("拡張インターセプターの前処理でエラー発生", e);
                }
            }
        }
    }
    return true;
}

// ServiceLoaderの初期化
private ServiceLoader<MessageHandlerInterceptor> serviceLoader;
public ProducerImpl(final Properties props) {
    ...
    serviceLoader = ServiceLoader.load(MessageHandlerInterceptor.class);
}

次に、具体的なインターセプター実装例を示します。


// メッセージ送信インターセプター
package com.example.rocketmq;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.spi.InvocationContext;
import com.aliyun.openservices.ons.api.spi.MessageHandlerInterceptor;
import com.example.util.TraceUtil;
import org.slf4j.MDC;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Properties;
import java.util.Optional;

@Slf4j
public class RequestIdProducerInterceptor implements MessageHandlerInterceptor {
    @Override
    public boolean preHandle(InvocationContext context, Object instance) throws Exception {
        List<Message> messages = context.getMessages().get();
        if (messages == null || messages.isEmpty()) {
            return true;
        }
        
        Message message = messages.get(0);
        Properties props = message.getUserProperties();
        if (props == null) {
            props = new Properties();
        }
        
        if (props.getProperty("requestId") != null) {
            return true;
        }
        
        String requestId = Optional.ofNullable(MDC.get("requestId")).orElse(TraceUtil.generateRequestId());
        props.setProperty("requestId", requestId);
        message.setUserProperties(props);
        
        log.info("トレースID: {}", requestId);
        return true;
    }

    @Override
    public void postHandle(InvocationContext context, Object instance) throws Exception {
        // 後処理は必要に応じて実装
    }
}

// メッセージ受信インターセプター
package com.example.rocketmq;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.spi.InvocationContext;
import com.aliyun.openservices.ons.api.spi.MessageHandlerInterceptor;
import com.example.util.TraceUtil;
import org.slf4j.MDC;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Properties;
import java.util.Optional;

@Slf4j
public class RequestIdConsumerInterceptor implements MessageHandlerInterceptor {
    @Override
    public boolean preHandle(InvocationContext context, Object instance) throws Exception {
        List<Message> messages = context.getMessages().get();
        if (messages == null || messages.isEmpty()) {
            return true;
        }
        
        Message message = messages.get(0);
        Properties props = message.getUserProperties();
        if (props == null) {
            return true;
        }
        
        String requestId = props.getProperty("requestId");
        if (requestId != null) {
            MDC.put("requestId", requestId);
            log.info("トレースID: {}", requestId);
        }
        return true;
    }

    @Override
    public void postHandle(InvocationContext context, Object instance) throws Exception {
        // 後処理は必要に応じて実装
    }
}

インターセプターを有効にするには、以下の設定ファイルを作成します。

com.example.rocketmq.RequestIdProducerInterceptor
com.example.rocketmq.RequestIdConsumerInterceptor

このファイルは、Javaのサービスプロバイダメカニズムが認識するパスであるclasspath:META-INF/servicesディレクトリ内に配置します。

最後に、実装を動作確認します。ログ出力にトレースIDが含まれているかを確認することで、インターセプターの動作を検証します。

タグ: RocketMQ Interceptor _SPIメカニズム ログ管理

6月22日 17:11 投稿