Flumeを使用したMySQLデータの履歴同期処理

1. Flumeディレクトリ構成

1.1 インストールパス情報

[admin@test01 flume-mysql-sync]$ pwd
/opt/flume-mysql-sync

1.2 必要なJARファイルをlibディレクトリへ配置

sql-source-connector-2.1.jar
mysql-driver-8.0.28.jar

2. パッケージングと展開操作

  • アーカイブ作成
tar czvf flume-package.tar.gz flume-mysql-sync/

3. Kafkaトピックの作成と確認

# トピック生成
/kafka_2.13-2.8.0/bin/kafka-topics.sh --create --zookeeper 172.16.100.50:2181 --replication-factor 3 --partitions 12 --topic mysql-data-stream

# トピック確認
/kafka_2.13-2.8.0/bin/kafka-console-consumer.sh --bootstrap-server 172.16.100.48:9092 --topic mysql-data-stream --from-beginning

4. Flumeエージェント起動

[admin@test01 flume-mysql-sync]$ ./bin/flume-ng agent --conf ./config --conf-file ./config/mysql-historical.conf --name dataAgent -Dflume.root.logger=WARN,console

5. Flume設定ファイル詳細

dataAgent.sources=dbExtractor
dataAgent.channels=memBuffer
dataAgent.sinks=kafkaOutput

# ソース定義
dataAgent.sources.dbExtractor.type = org.keedio.flume.source.SQLSource
dataAgent.sources.dbExtractor.hibernate.connection.url = jdbc:mysql://localhost:3306/historical_db

# 接続パラメータ設定
dataAgent.sources.dbExtractor.hibernate.connection.username = sync_user
dataAgent.sources.dbExtractor.hibernate.connection.password = secure_password
dataAgent.sources.dbExtractor.hibernate.connection.autocommit = false
dataAgent.sources.dbExtractor.hibernate.dialect = org.hibernate.dialect.MySQL8Dialect
dataAgent.sources.dbExtractor.hibernate.connection.driver_class = com.mysql.cj.jdbc.Driver

# 同期対象テーブル指定
# dataAgent.sources.dbExtractor.table = historical_records

# 取得カラム指定(デフォルトは全カラム)
# dataAgent.sources.dbExtractor.columns.to.select = id,name,timestamp,data

# クエリ実行間隔(ミリ秒)
dataAgent.sources.dbExtractor.run.query.delay=300000

# ステータス管理ファイル設定
dataAgent.sources.dbExtractor.status.file.path = /opt/flume-status
dataAgent.sources.dbExtractor.status.file.name = db_extractor_status

# カスタムクエリ定義
dataAgent.sources.dbExtractor.start.from = 1
ORDER BY record_id ASC
# $@$プレースホルダにより重複排除を実現
dataAgent.sources.dbExtractor.custom.query = SELECT * FROM records_table WHERE record_id > $@$
# dataAgent.sources.dbExtractor.order.by = record_id

# バッチ処理パラメータ
dataAgent.sources.dbExtractor.batch.size = 2000
dataAgent.sources.dbExtractor.max.rows = 50000
dataAgent.sources.dbExtractor.delimiter.entry = <SEP>

# コネクションプール設定
dataAgent.sources.dbExtractor.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
dataAgent.sources.dbExtractor.hibernate.c3p0.min_size=2
dataAgent.sources.dbExtractor.hibernate.c3p0.max_size=8

# チャネル定義
dataAgent.channels.memBuffer.type=memory
dataAgent.channels.memBuffer.capacity=200000
dataAgent.channels.memBuffer.transactionCapacity=50000
dataAgent.channels.memBuffer.keep-alive=5

# シンク定義
dataAgent.sinks.kafkaOutput.type = org.apache.flume.sink.kafka.KafkaSink
dataAgent.sinks.kafkaOutput.kafka.topic = mysql-data-stream
dataAgent.sinks.kafkaOutput.kafka.bootstrap.servers = server1.domain:9092,server2.domain:9092,server3.domain:9092
dataAgent.sinks.kafkaOutput.kafka.producer.acks = all
dataAgent.sinks.kafkaOutput.kafka.producer.linger.ms = 5
dataAgent.sinks.kafkaOutput.kafka.flumeBatchSize = 500

# チャネル接続設定
dataAgent.sources.dbExtractor.channels = memBuffer
dataAgent.sinks.kafkaOutput.channel = memBuffer

タグ: flume MySQL Kafka data-sync JDBC

6月28日 19:48 投稿