ActiveMQの主要APIとメッセージング機能の活用

ActiveMQは、強力なメッセージング機能を提供するオープンソースのメッセージブローカーです。ここでは、JMS (Java Message Service) 標準に準拠したActiveMQの主要APIと、その上で利用できる高度なメッセージング機能について解説します。

JMSセッション管理とメッセージ処理

トランザクション管理

JMSでは、メッセージの送受信操作をアトミックに実行するためにトランザクションを利用できます。これにより、複数の操作がすべて成功するか、すべて失敗するかのいずれかになり、メッセージングの信頼性が向上します。


// トランザクションをコミットし、セッション内の未処理のメッセージ操作を確定
currentSession.commit();

// トランザクションをロールバックし、セッション内の未処理のメッセージ操作を取り消し
currentSession.rollback();

メッセージのクリア(Purge)

特定の宛先(キューやトピック)に蓄積されたメッセージを一括で削除する機能です。これは、テスト環境のリセットや、不要なメッセージが大量に滞留している場合の緊急対応などに利用されます。


// 例: 特定のキューから全てのメッセージをクリア(ActiveMQ管理APIの概念的な表現)
// ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
// Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Queue advisoryQueue = session.createQueue("ActiveMQ.Advisory.Queue"); // 管理目的のアドバイザリキュー
// MessageProducer producer = session.createProducer(advisoryQueue);
// Message purgeCommand = session.createMessage();
// purgeCommand.setStringProperty("command", "purge");
// purgeCommand.setStringProperty("destination", "myTargetQueue"); // クリアしたいキュー名
// producer.send(purgeCommand);
// session.close();
// connection.close();

注: ActiveMQの管理APIやJMX経由で実行することが一般的です。上記は概念的なコードであり、実際の運用ではより堅牢な方法が推奨されます。

メッセージの確認応答モード(Acknowledgement Modes)

メッセージがコンシューマによって正常に処理されたことをブローカーに通知する方法を定義します。JMSではいくつかの確認応答モードが提供されており、信頼性とパフォーマンスのバランスを考慮して選択します。

  • Session.AUTO_ACKNOWLEDGE: 最もシンプルなモードです。コンシューマがメッセージ処理ロジックから正常に復帰した際に、セッションが自動的に確認応答を行います。メッセージ処理中に予期せぬエラーが発生した場合でも、そのメッセージは再配信されない可能性があります。
  • Session.CLIENT_ACKNOWLEDGE: コンシューマがメッセージ(Message)のacknowledge()メソッドを明示的に呼び出すことで確認応答を行います。このモードでは、単一のacknowledge()呼び出しで、そのセッションで受け取ったすべての未確認メッセージが確認応答されます。これにより、開発者が確認応答のタイミングを制御できるため、より詳細なエラーハンドリングが可能です。
  • Session.DUPS_OK_ACKNOWLEDGE: メッセージの重複を許容する代わりに、確認応答のオーバーヘッドを削減するモードです。コンシューマがメッセージの重複を冪等に処理できる(複数回処理されても問題ない)場合に、パフォーマンスを向上させる目的で使用されます。

メッセージの特性設定

メッセージの永続性(Persistence)

ActiveMQは、デフォルトでメッセージを永続化します。これにより、ブローカーが予期せず停止した場合でも、再起動後にメッセージが失われることなく配信を継続できます。しかし、一部のアプリケーションでは、高いスループットを優先し、メッセージの永続化を無効にすることがあります。


import javax.jms.DeliveryMode; // JMS標準のDeliveryModeを使用

// メッセージを非永続として送信するようにプロデューサーを設定
someProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

メッセージの優先度(Priority)

メッセージには0から9までの優先度(9が最高優先度)を設定でき、ブローカーは優先度の高いメッセージから順にコンシューマに配信しようとします。ただし、コンシューマのロードバランスやネットワーク遅延、ブローカーの負荷状況により、必ずしも厳密な優先度順で配信されるとは限りません。


// 優先度7を設定してメッセージを送信する例
messageProducer.setPriority(7);

ブローカー側で特定のキューに対して優先度処理を有効にするには、activemq.xml<policyEntry>要素に設定を追加します。


<policyEntry queue="myPriorityQueue" prioritizedMessages="true" />

メッセージの有効期限(Time-to-Live)とデッドレターキュー(DLQ)

メッセージに有効期限(TTL: Time-to-Live)を設定することで、指定された期間内にコンシューマによって消費されなかったメッセージは期限切れとなり、通常はデッドレターキュー (DLQ: Dead Letter Queue) に転送されます。


// メッセージの有効期限を30秒に設定
messageProducer.setTimeToLive(30_000); // ミリ秒単位

デッドレターキュー (DLQ)

ActiveMQでは、デフォルトで期限切れメッセージや処理に失敗したメッセージはActiveMQ.DLQという特別なキューに送られます。このキュー内のメッセージは自動的に削除されないため、放置するとメッセージが蓄積し、ストレージ容量を圧迫するリスクがあります。DLQのメッセージは通常、管理者が手動で調査・処理するか、別のプロセスで再送信を試みる必要があります。

DLQの動作設定

activemq.xml<deadLetterStrategy>要素を使用して、DLQの挙動をカスタマイズできます。

  • DLQの命名規則変更:

    individualDeadLetterStrategyを使用すると、各宛先ごとに異なるDLQをプレフィックス付きで作成できます。

    
    <policyEntry queue="myQueue.with.customDLQ" prioritizedMessages="true">
        <deadLetterStrategy>
            <individualDeadLetterStrategy queuePrefix="CUSTOM.DLQ." useQueueForQueueMessages="true" />
        </deadLetterStrategy>
    </policyEntry>
    
  • 非永続メッセージのDLQへの転送:

    デフォルトでは、非永続メッセージは期限切れになってもDLQに転送されません。これを有効にするにはprocessNonPersistent="true"を設定します。

    
    <individualDeadLetterStrategy queuePrefix="CUSTOM.DLQ." useQueueForQueueMessages="true" processNonPersistent="true" />
    
  • 期限切れメッセージのDLQへの転送停止:

    期限切れになったメッセージをDLQに転送せず、単に破棄したい場合は、processExpired="false"を設定します。

    
    <individualDeadLetterStrategy processExpired="false" />
    

排他コンシューマ(Exclusive Consumers)

特定のキューに対して、一度にメッセージを受信できるアクティブなコンシューマを一つに限定する機能です。これにより、メッセージの厳密な順序性を保証したり、特定の処理が重複して実行されるのを避けたりする場合に有用です。


import javax.jms.Queue;

// 排他コンシューマを設定するキューの作成
Queue exclusiveQueue = session.createQueue("uniqueProcessingQueue?consumer.exclusive=true");

// 排他コンシューマに優先度も設定(複数の排他コンシューマが存在する場合に競合を解決)
Queue prioritizedExclusiveQueue = session.createQueue("importantExclusiveQueue?consumer.exclusive=true&consumer.priority=7");

JMSメッセージタイプ

JMSは、異なる種類のデータを効率的に送信するために複数のメッセージタイプをサポートしています。

ObjectMessage

Javaのシリアライズ可能なオブジェクトをメッセージとして送信します。カスタムのJavaオブジェクトをブローカー経由でやり取りするのに適しています。

送信側


import javax.jms.*;
import java.io.Serializable; // オブジェクトはSerializableを実装する必要がある

// シリアライズ可能なデータクラスの例
class UserData implements Serializable {
    private static final long serialVersionUID = 1L; // シリアライズバージョンID
    private String userId;
    private String userName;
    private int age;

    public UserData(String userId, String userName, int age) {
        this.userId = userId;
        this.userName = userName;
        this.age = age;
    }
    // ゲッターメソッド
    public String getUserId() { return userId; }
    public String getUserName() { return userName; }
    public int getAge() { return age; }

    @Override
    public String toString() {
        return "UserData{userId='" + userId + "', userName='" + userName + "', age=" + age + '}';
    }
}

// ... JMSセッションの作成後
UserData user = new UserData("U001", "田中", 30);
ObjectMessage objMessage = session.createObjectMessage(user);
producer.send(objMessage);

受信側


import javax.jms.*;
import org.apache.activemq.command.ActiveMQObjectMessage; // ActiveMQ固有のクラスを使用することが一般的

// ... メッセージ受信後
if (receivedMessage instanceof ActiveMQObjectMessage) {
    ActiveMQObjectMessage objMsg = (ActiveMQObjectMessage) receivedMessage;
    try {
        UserData receivedUser = (UserData) objMsg.getObject();
        System.out.println("受信したユーザーデータ: " + receivedUser);
        System.out.println("ユーザーID: " + receivedUser.getUserId());
    } catch (JMSException e) {
        System.err.println("ObjectMessageのデシリアライズに失敗しました: " + e.getMessage());
        e.printStackTrace();
    }
}

信頼されたパッケージの設定

ObjectMessageを使用する際に、ClassNotFoundException: Forbidden class ... This class is not trusted to be serializedのようなエラーが発生することがあります。これは、ActiveMQがセキュリティ上の理由から、未知のクラスのデシリアライズを制限しているためです。

この問題を解決するには、ActiveMQConnectionFactoryに信頼するパッケージのリストを設定する必要があります。


import org.apache.activemq.ActiveMQConnectionFactory;
import java.util.Arrays;
import java.util.ArrayList;

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");

// UserDataクラスが属するパッケージを信頼済みとして設定
connectionFactory.setTrustedPackages(
    new ArrayList<>(Arrays.asList(UserData.class.getPackage().getName()))
);

// または、複数のパッケージを信頼する場合
// connectionFactory.setTrustedPackages(new ArrayList<>(Arrays.asList(
//     "java.lang", "java.util", "com.yourcompany.app.data" // 例: 必要なパッケージを追加
// )));

BytesMessage

バイナリデータ(バイト配列)をメッセージとして送信します。ファイル転送や、カスタムプロトコル、あるいはメッセージとしてフォーマットされていない生のデータを扱う際に適しています。

送信側


import javax.jms.BytesMessage;
import java.nio.charset.StandardCharsets;

// ... セッション作成後
BytesMessage binaryMessage = session.createBytesMessage();
binaryMessage.writeBytes("これはテストのバイナリデータです。".getBytes(StandardCharsets.UTF_8));
binaryMessage.writeUTF("UTFデータも追加可能です。"); // UTF文字列も書き込める
producer.send(binaryMessage);

受信側


import javax.jms.BytesMessage;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// ... メッセージ受信後
if (receivedMessage instanceof BytesMessage) {
    BytesMessage bm = (BytesMessage) receivedMessage;
    
    // バイトデータを読み取る
    byte[] buffer = new byte[256]; // 読み込みバッファ
    int bytesRead;
    StringBuilder sb = new StringBuilder();
    try {
        while ((bytesRead = bm.readBytes(buffer)) != -1) {
            sb.append(new String(buffer, 0, bytesRead, StandardCharsets.UTF_8));
        }
        System.out.println("受信した最初のバイトデータ: " + sb.toString());
        
        // メッセージポインタをリセットし、次のデータを読み込む(書き込み順序と型に注意)
        bm.reset(); // 再度メッセージの先頭から読み込みたい場合
        // bm.readBytes(buffer); // 最初のバイトデータをスキップ
        System.out.println("受信したUTF文字列: " + bm.readUTF()); // 2番目に書き込まれたUTF文字列
        
    } catch (JMSException | IOException e) {
        System.err.println("BytesMessageの処理中にエラー: " + e.getMessage());
        e.printStackTrace();
    }
}

注: BytesMessageには、readBoolean()readInt()readDouble()など、様々なプリミティブ型を読み書きするメソッドが用意されていますが、書き込んだ順序と型で正確に読み込む必要があります。

ファイルへの書き込み

BytesMessageの内容を直接ファイルに保存する例です。


import javax.jms.BytesMessage;
import java.io.FileOutputStream;
import java.io.IOException;
import javax.jms.JMSException;

// ... Received BytesMessage 'bm'
try (FileOutputStream fos = new FileOutputStream("received_binary_output.dat")) {
    byte[] fileWriteBuffer = new byte[4096]; // 読み書き用バッファ
    int bytesCount;
    bm.reset(); // 必要であればメッセージの読み込み位置をリセット
    while ((bytesCount = bm.readBytes(fileWriteBuffer)) != -1) {
        fos.write(fileWriteBuffer, 0, bytesCount);
    }
    System.out.println("メッセージ内容を 'received_binary_output.dat' に保存しました。");
} catch (IOException | JMSException e) {
    System.err.println("ファイルへの書き込み中にエラー: " + e.getMessage());
    e.printStackTrace();
}

MapMessage

名前と値のペアを格納する、マップのような構造のメッセージです。様々なデータ型の値をキー文字列に関連付けて送信できます。

送信側


import javax.jms.MapMessage;

// ... セッション作成後
MapMessage configMessage = session.createMapMessage();
configMessage.setString("systemName", "PaymentService");
configMessage.setBoolean("isDebugMode", false);
configMessage.setInt("threadPoolSize", 10);
configMessage.setDouble("version", 1.25);

producer.send(configMessage);

受信側


import javax.jms.MapMessage;

// ... メッセージ受信後
if (receivedMessage instanceof MapMessage) {
    MapMessage configMap = (MapMessage) receivedMessage;
    try {
        System.out.println("受信した設定情報:");
        System.out.println("システム名: " + configMap.getString("systemName"));
        System.out.println("デバッグモード: " + configMap.getBoolean("isDebugMode"));
        System.out.println("スレッドプールサイズ: " + configMap.getInt("threadPoolSize"));
        System.out.println("バージョン: " + configMap.getDouble("version"));
    } catch (JMSException e) {
        System.err.println("MapMessageの読み取り中にエラー: " + e.getMessage());
        e.printStackTrace();
    }
}

メッセージの高度な送信機能

同期送信と非同期送信

メッセージの送信方法は、プロデューサがブローカーからの確認応答を待つかどうかで同期と非同期に分けられます。

  • 同期送信: プロデューサはメッセージを送信した後、ブローカーからの確認応答(メッセージの受信や永続化の完了など)を待ちます。これにより信頼性が向上しますが、スループットは低下する傾向があります。
  • 非同期送信: プロデューサはメッセージを送信後、ブローカーからの確認応答を待たずに次のメッセージ送信に進みます。高いスループットを実現できますが、内部的なエラーハンドリングに注意が必要です(ただし、JMSプロバイダは通常、内部的なバッファリングや再送メカニズムで信頼性を維持しようとします)。

メッセージの永続性とトランザクションの有無によるデフォルトの送信モードは以下の通りです。

トランザクション有効トランザクション無効
永続メッセージ非同期同期
非永続メッセージ非同期非同期

ActiveMQで非同期送信を明示的に有効にするには、ActiveMQConnectionFactoryまたはActiveMQConnectionに設定します。


import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQConnection;

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
    "admin", "admin", "tcp://localhost:61616"
);

// 1. ConnectionFactoryに非同期送信を設定
connectionFactory.setUseAsyncSend(true);

ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
// 2. Connectionにも非同期送信を設定 (ConnectionFactoryの設定がConnectionレベルで上書きされる)
connection.setUseAsyncSend(true);

プロデューサーのフロー制御(Producer Window Size)

プロデューサーがブローカーにメッセージを高速で送信しすぎると、ブローカーのリソースを使い果たし、メッセージの溢れ(過剰な堆積)を引き起こす可能性があります。ActiveMQでは、Producer Window Sizeという設定を通じてプロデューサー側のフロー制御を行うことができます。

この設定は、プロデューサーがブローカーからの確認応答なしに送信できるメッセージの総バイト数を制限します。このサイズに達すると、ブローカーがメッセージを処理し、確認応答を返すまでプロデューサーはブロックされます。


// Connection URIで設定する例
// tcp://localhost:61616?jms.producerWindowSize=2097152 (2MB)

// Destination URIで設定する例
// myQueue?producer.windowSize=2097152 (2MB)

遅延・スケジュールメッセージ配信

ActiveMQは、メッセージの遅延配信や定期的なスケジュール配信をサポートしています。この機能を利用するには、ブローカーのactivemq.xmlでスケジューラ機能を有効にする必要があります。


<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true">
    
</broker>

指定時間後に遅延送信

メッセージを指定されたミリ秒数後に配信するように設定します。


import org.apache.activemq.ScheduledMessage; // ActiveMQ固有のScheduledMessageプロパティ
import javax.jms.Message;
import javax.jms.TextMessage; // TextMessageの例

TextMessage delayedMsg = session.createTextMessage("このメッセージは10秒後に配信されます。");
delayedMsg.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 10_000); // 10秒遅延

producer.send(delayedMsg);

間隔を伴う繰り返し送信

指定された遅延後、一定の間隔でメッセージを繰り返し送信するように設定できます。


import org.apache.activemq.ScheduledMessage;
import javax.jms.Message;
import javax.jms.TextMessage;

TextMessage recurringMsg = session.createTextMessage("繰り返し送信される情報");

long initialDelay = 8_000;  // 最初の配信までの遅延 (8秒)
long repeatPeriod = 5_000;  // 繰り返し間隔 (5秒)
int repeatCount = 3;        // 繰り返しの回数 (3回、初回配信後)

recurringMsg.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, initialDelay);
recurringMsg.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, repeatPeriod);
recurringMsg.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeatCount);

producer.send(recurringMsg);

Cron式による定期送信

より複雑なスケジュールは、標準的なCron式を使用して定義できます。Cron式は、秒、分、時、日(月)、月、日(週)、年(オプション)のフィールドで構成されます。

Cron式のフィールド:

  • 秒 (Seconds): 0-59
  • 分 (Minutes): 0-59
  • 時 (Hours): 0-23
  • 日 (Day of Month): 1-31
  • 月 (Month): 1-12 または JAN-DEC
  • 曜日 (Day of Week): 1-7 (1=日曜日) または SUN-SAT
  • 年 (Year): 1970-2099 (オプション)

主な特殊文字:

  • *: 全ての値を表す。例: 毎分
  • ?: 日(月)と曜日フィールドで、どちらか一方を使用しない場合に使う。
  • -: 範囲を表す。例: 9-17 (9時から17時まで)
  • /: 増分を表す。例: 0/15 (0分から15分ごとに)
  • ,: リストを表す。例: 10,20 (10日と20日)
  • L: 最後の日を表す。例: DayofMonthでは月の最終日、DayofWeekでは月の最終曜日。
  • W: 直近の稼働日を表す(DayofMonthのみ)。
  • #: 第N番目の曜日を指定(DayofWeekのみ)。例: 3#2 (月の第2水曜日)

Cron式の例:

  • 0 0 10 * * ?: 毎日午前10時にトリガー
  • 0 30 9-17 ? * MON-FRI: 月曜日から金曜日の午前9時30分から午後5時30分の間、30分ごとにトリガー
  • 0 0 12 ? * WED: 毎週水曜日の正午にトリガー
  • 0 15 10 L * ?: 毎月最終日の午前10時15分にトリガー

import org.apache.activemq.ScheduledMessage;
import javax.jms.Message;
import javax.jms.TextMessage;

TextMessage cronScheduledMsg = session.createTextMessage("Cron式でスケジュールされたメッセージ");

// 毎日午前9時30分に配信
String cronSchedule = "0 30 9 * * ?"; 
cronScheduledMsg.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, cronSchedule);

producer.send(cronScheduledMsg);

メッセージの受信とフィルタリング

メッセージリスナー(Message Listener)

非同期的にメッセージを受信するには、JMSのMessageListenerインターフェースを実装したクラスを使用します。コンシューマにリスナーを登録すると、メッセージが到着するたびにonMessageメソッドが呼び出され、イベント駆動型のメッセージ処理を実現します。


import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

// メッセージリスナーの実装例
public class AsyncMessageProcessor implements MessageListener {
    @Override
    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            try {
                TextMessage textMsg = (TextMessage) message;
                System.out.println("リスナーがメッセージを受信: " + textMsg.getText());
                // ここでビジネスロジックを実行
            } catch (JMSException e) {
                System.err.println("メッセージ処理中にJMSエラーが発生: " + e.getMessage());
                e.printStackTrace();
            }
        } else {
            System.out.println("予期しないメッセージタイプを受信しました: " + message.getClass().getName());
        }
    }
}

// ... JMSコンシューマの作成後
MessageConsumer consumer = session.createConsumer(someQueue);
consumer.setMessageListener(new AsyncMessageProcessor());
System.out.println("メッセージリスナーがメッセージ受信待機中です...");
// アプリケーションは終了せず、メッセージを受信し続けます

メッセージセレクター(Message Selector)

JMSは、SQLのような構文でメッセージのプロパティに基づいてメッセージをフィルタリングするセレクター機能を提供します。これにより、特定の条件に合致するメッセージのみをコンシューマが受信し、不必要なメッセージを処理することなく効率的なルーティングが可能になります。

メッセージの送信(プロパティ設定)

セレクターで利用するために、メッセージにカスタムプロパティを設定します。これらのプロパティはメッセージヘッダーの一部として扱われます。


import javax.jms.MapMessage;
import javax.jms.MessageProducer;

// ... セッション作成後
// メッセージ1: priority=high, department=sales
MapMessage orderMsg1 = session.createMapMessage();
orderMsg1.setString("orderId", "ORD-001");
orderMsg1.setDouble("amount", 150.75);
orderMsg1.setStringProperty("priority", "high");
orderMsg1.setStringProperty("department", "sales");
producer.send(orderMsg1);

// メッセージ2: priority=normal, department=marketing
MapMessage orderMsg2 = session.createMapMessage();
orderMsg2.setString("orderId", "ORD-002");
orderMsg2.setDouble("amount", 50.00);
orderMsg2.setStringProperty("priority", "normal");
orderMsg2.setStringProperty("department", "marketing");
producer.send(orderMsg2);

// メッセージ3: priority=high, department=sales
MapMessage orderMsg3 = session.createMapMessage();
orderMsg3.setString("orderId", "ORD-003");
orderMsg3.setDouble("amount", 300.20);
orderMsg3.setStringProperty("priority", "high");
orderMsg3.setStringProperty("department", "sales");
producer.send(orderMsg3);

メッセージの受信(セレクター使用)

コンシューマ作成時にセレクター文字列を指定します。このセレクターに合致するメッセージのみがこのコンシューマに配信されます。


import javax.jms.MessageConsumer;
import javax.jms.Queue;

// ... セッション作成後
Queue orderProcessingQueue = session.createQueue("orderQueue");

// セレクター1: priorityが'high'のメッセージのみを受信するコンシューマ
String selectorHighPriority = "priority = 'high'";
MessageConsumer highPriorityConsumer = session.createConsumer(orderProcessingQueue, selectorHighPriority);
// このコンシューマはORD-001とORD-003のメッセージを受信する

// セレクター2: departmentが'sales'かつamountが100より大きいメッセージのみを受信するコンシューマ
String selectorSalesLargeOrders = "department = 'sales' AND amount > 100.0";
MessageConsumer salesLargeOrderConsumer = session.createConsumer(orderProcessingQueue, selectorSalesLargeOrders);
// このコンシューマはORD-001とORD-003のメッセージを受信する

// セレクター3: departmentが'marketing'のメッセージのみを受信するコンシューマ
String selectorMarketing = "department = 'marketing'";
MessageConsumer marketingConsumer = session.createConsumer(orderProcessingQueue, selectorMarketing);
// このコンシューマはORD-002のメッセージを受信する

タグ: ActiveMQ JMS メッセージキュー トランザクション メッセージ永続化

6月1日 01:06 投稿