業務処理中にログを出力し、重要情報を記録することは一般的です。問題の追跡やトラブルシューティングを円滑に行うためには、関連するログを一貫したトレースIDで結びつけることが理想的です。この要件を満たすためには、2つの方法があります。
- Spring AOPを用いたインターセプト
- RocketMQの拡張ポイントを活用したカスタマイズ
Spring AOPの実装方法は比較的シンプルで、インターネット上にも多数の事例が公開されていますので、本稿ではRocketMQの拡張ポイントを活用した方法に焦点を当てます。
RocketMQの拡張ポイント仕組みを理解するためには、内部実装を確認することが重要です。RocketMQは、メッセージの送信、消費、フィルタリングなどの主要な処理段階で拡張性を考慮し、プラグイン可能なインターセプトポイントを提供しています。この仕組みは、SPI(Service Provider Interface)メカニズムとプラグインアーキテクチャを組み合わせています。
メッセージ送信時の実装例を以下に示します。
@Override
public SendResult send(Message message) {
this.checkONSProducerServiceState(this.defaultMQProducer.getDefaultMQProducerImpl());
DefaultInvocationContext invocationContext = new DefaultInvocationContext();
invocationContext.setMessages(Collections.singletonList(message));
final List<Runnable> postHandleStack = new ArrayList<>();
boolean proceed = preHandle(serviceLoader, invocationContext, postHandleStack);
try {
if (proceed) {
com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.Message msgRMQ = ONSUtil.msgConvert(message);
try {
com.aliyun.openservices.shade.com.alibaba.rocketmq.client.producer.SendResult sendResultRMQ = this.defaultMQProducer.send(msgRMQ);
message.setMsgID(sendResultRMQ.getMsgId());
SendResult sendResult = new SendResult();
sendResult.setTopic(sendResultRMQ.getMessageQueue().getTopic());
sendResult.setMessageId(sendResultRMQ.getMsgId());
invocationContext.setSendResult(sendResult);
return sendResult;
} catch (Exception e) {
LOGGER.error(String.format("メッセージ送信エラー: %s", message), e);
ONSClientException exception = checkProducerException(message.getTopic(), message.getMsgID(), e);
invocationContext.setException(exception);
throw exception;
}
}
} finally {
executePostHandle(postHandleStack);
}
throw new ONSClientException("ProducerInterceptorがメッセージ送信をキャンセルしました");
}
/**
* 拡張ポイントの実行を管理するメソッド
*/
protected boolean preHandle(ServiceLoader serviceLoader, final InvocationContext context,
final List<Runnable> postHandlers) {
if (serviceLoader != null) {
for (Object item : serviceLoader) {
if (item instanceof Interceptor) {
final Interceptor interceptor = (Interceptor) item;
Runnable postTask = () -> {
try {
interceptor.postHandle(context, ONSClientAbstract.this);
} catch (Exception e) {
LOGGER.error("拡張インターセプターの後処理でエラー発生", e);
}
};
postHandlers.add(postTask);
try {
if (!interceptor.preHandle(context, this)) {
return false;
}
} catch (Exception e) {
LOGGER.error("拡張インターセプターの前処理でエラー発生", e);
}
}
}
}
return true;
}
// ServiceLoaderの初期化
private ServiceLoader<MessageHandlerInterceptor> serviceLoader;
public ProducerImpl(final Properties props) {
...
serviceLoader = ServiceLoader.load(MessageHandlerInterceptor.class);
}
次に、具体的なインターセプター実装例を示します。
// メッセージ送信インターセプター
package com.example.rocketmq;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.spi.InvocationContext;
import com.aliyun.openservices.ons.api.spi.MessageHandlerInterceptor;
import com.example.util.TraceUtil;
import org.slf4j.MDC;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Properties;
import java.util.Optional;
@Slf4j
public class RequestIdProducerInterceptor implements MessageHandlerInterceptor {
@Override
public boolean preHandle(InvocationContext context, Object instance) throws Exception {
List<Message> messages = context.getMessages().get();
if (messages == null || messages.isEmpty()) {
return true;
}
Message message = messages.get(0);
Properties props = message.getUserProperties();
if (props == null) {
props = new Properties();
}
if (props.getProperty("requestId") != null) {
return true;
}
String requestId = Optional.ofNullable(MDC.get("requestId")).orElse(TraceUtil.generateRequestId());
props.setProperty("requestId", requestId);
message.setUserProperties(props);
log.info("トレースID: {}", requestId);
return true;
}
@Override
public void postHandle(InvocationContext context, Object instance) throws Exception {
// 後処理は必要に応じて実装
}
}
// メッセージ受信インターセプター
package com.example.rocketmq;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.spi.InvocationContext;
import com.aliyun.openservices.ons.api.spi.MessageHandlerInterceptor;
import com.example.util.TraceUtil;
import org.slf4j.MDC;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Properties;
import java.util.Optional;
@Slf4j
public class RequestIdConsumerInterceptor implements MessageHandlerInterceptor {
@Override
public boolean preHandle(InvocationContext context, Object instance) throws Exception {
List<Message> messages = context.getMessages().get();
if (messages == null || messages.isEmpty()) {
return true;
}
Message message = messages.get(0);
Properties props = message.getUserProperties();
if (props == null) {
return true;
}
String requestId = props.getProperty("requestId");
if (requestId != null) {
MDC.put("requestId", requestId);
log.info("トレースID: {}", requestId);
}
return true;
}
@Override
public void postHandle(InvocationContext context, Object instance) throws Exception {
// 後処理は必要に応じて実装
}
}
インターセプターを有効にするには、以下の設定ファイルを作成します。
com.example.rocketmq.RequestIdProducerInterceptor
com.example.rocketmq.RequestIdConsumerInterceptor
このファイルは、Javaのサービスプロバイダメカニズムが認識するパスであるclasspath:META-INF/servicesディレクトリ内に配置します。
最後に、実装を動作確認します。ログ出力にトレースIDが含まれているかを確認することで、インターセプターの動作を検証します。