MySQL Binlog イベントのサブスクリプションと消費位置管理

mysql-binlog-connector-java は、MySQL のバイナリログ(binlog)をリアルタイムで読み取り・解析するための Java ライブラリです。これにより、INSERT・UPDATE・DELETE といったデータ変更イベントを即座に検知できます。

基本設定と接続

使用前に以下の条件を満たす必要があります:

  • MySQL サーバーで binlog が有効であること(SHOW VARIABLES LIKE 'log_bin'; で確認)
  • REPLICATION SLAVE 権限を持つユーザーを使用
  • Maven 依存関係の追加:
    <dependency>
        <groupId>com.zendesk</groupId>
        <artifactId>mysql-binlog-connector-java</artifactId>
        <version>0.29.2</version>
    </dependency>

イベントリスナーの実装

BinaryLogClient を初期化し、イベントハンドラを登録します:

BinaryLogClient client = new BinaryLogClient("localhost", 3306, "replica_user", "secret");

client.registerEventListener(event -> {
    EventData data = event.getData();
    if (data instanceof WriteRowsEventData) {
        System.out.println("Inserted rows: " + ((WriteRowsEventData) data).getRows());
    } else if (data instanceof UpdateRowsEventData) {
        System.out.println("Updated rows: " + ((UpdateRowsEventData) data).getRows());
    } else if (data instanceof DeleteRowsEventData) {
        System.out.println("Deleted rows: " + ((DeleteRowsEventData) data).getRows());
    }
});

client.connect();

消費位置の永続化

アプリケーション再起動時に前回の処理位置から再開するため、binlog のファイル名とオフセット(または GTID)を永続化する必要があります。

ファイルベースの位置保存

private static final String POS_FILE = "checkpoint.pos";

static void loadCheckpoint(BinaryLogClient client) {
    try (var reader = Files.newBufferedReader(Paths.get(POS_FILE))) {
        String file = reader.readLine();
        long pos = Long.parseLong(reader.readLine());
        client.setBinlogFilename(file);
        client.setBinlogPosition(pos);
    } catch (IOException ignored) {}
}

static void saveCheckpoint(String filename, long position) {
    try (var writer = Files.newBufferedWriter(Paths.get(POS_FILE))) {
        writer.write(filename + "\n" + position);
    } catch (IOException e) {
        // エラーを記録しつつ継続
    }
}

GTID モードでの位置管理(推奨)

MySQL が GTID モードで動作している場合、より堅牢な位置追跡が可能です:

client.setGtidMode(true);
String lastGtidSet = readLastGtidFromFile();
if (lastGtidSet != null) {
    client.setGtidSet(lastGtidSet);
}

client.registerEventListener(event -> {
    if (event.getHeader().getEventType() == EventType.GTID) {
        GtidLogEvent gtidEv = (GtidLogEvent) event.getData();
        persistGtid(gtidEv.getGTID().toString());
    }
});

運用上のベストプラクティス

  • 位置情報はイベントごとではなく、バッチ単位または定期的に保存
  • 保存失敗時は例外を投げず、ロギングして継続
  • GTID が利用可能なら、ファイル位置よりも優先して使用
  • 複数スレッドでイベント処理を行う場合は、位置保存処理を同期化
  • 起動時に指定位置の有効性を検証(例:binlog ファイルの存在確認)

タグ: MySQL binlog Java CDC GTID

5月19日 02:11 投稿