mysql-binlog-connector-java は、MySQL のバイナリログ(binlog)をリアルタイムで読み取り・解析するための Java ライブラリです。これにより、INSERT・UPDATE・DELETE といったデータ変更イベントを即座に検知できます。
基本設定と接続
使用前に以下の条件を満たす必要があります:
- MySQL サーバーで binlog が有効であること(
SHOW VARIABLES LIKE 'log_bin';で確認) - REPLICATION SLAVE 権限を持つユーザーを使用
- Maven 依存関係の追加:
<dependency> <groupId>com.zendesk</groupId> <artifactId>mysql-binlog-connector-java</artifactId> <version>0.29.2</version> </dependency>
イベントリスナーの実装
BinaryLogClient を初期化し、イベントハンドラを登録します:
BinaryLogClient client = new BinaryLogClient("localhost", 3306, "replica_user", "secret");
client.registerEventListener(event -> {
EventData data = event.getData();
if (data instanceof WriteRowsEventData) {
System.out.println("Inserted rows: " + ((WriteRowsEventData) data).getRows());
} else if (data instanceof UpdateRowsEventData) {
System.out.println("Updated rows: " + ((UpdateRowsEventData) data).getRows());
} else if (data instanceof DeleteRowsEventData) {
System.out.println("Deleted rows: " + ((DeleteRowsEventData) data).getRows());
}
});
client.connect();
消費位置の永続化
アプリケーション再起動時に前回の処理位置から再開するため、binlog のファイル名とオフセット(または GTID)を永続化する必要があります。
ファイルベースの位置保存
private static final String POS_FILE = "checkpoint.pos";
static void loadCheckpoint(BinaryLogClient client) {
try (var reader = Files.newBufferedReader(Paths.get(POS_FILE))) {
String file = reader.readLine();
long pos = Long.parseLong(reader.readLine());
client.setBinlogFilename(file);
client.setBinlogPosition(pos);
} catch (IOException ignored) {}
}
static void saveCheckpoint(String filename, long position) {
try (var writer = Files.newBufferedWriter(Paths.get(POS_FILE))) {
writer.write(filename + "\n" + position);
} catch (IOException e) {
// エラーを記録しつつ継続
}
}
GTID モードでの位置管理(推奨)
MySQL が GTID モードで動作している場合、より堅牢な位置追跡が可能です:
client.setGtidMode(true);
String lastGtidSet = readLastGtidFromFile();
if (lastGtidSet != null) {
client.setGtidSet(lastGtidSet);
}
client.registerEventListener(event -> {
if (event.getHeader().getEventType() == EventType.GTID) {
GtidLogEvent gtidEv = (GtidLogEvent) event.getData();
persistGtid(gtidEv.getGTID().toString());
}
});
運用上のベストプラクティス
- 位置情報はイベントごとではなく、バッチ単位または定期的に保存
- 保存失敗時は例外を投げず、ロギングして継続
- GTID が利用可能なら、ファイル位置よりも優先して使用
- 複数スレッドでイベント処理を行う場合は、位置保存処理を同期化
- 起動時に指定位置の有効性を検証(例:binlog ファイルの存在確認)