1. Flumeディレクトリ構成
1.1 インストールパス情報
[admin@test01 flume-mysql-sync]$ pwd
/opt/flume-mysql-sync
1.2 必要なJARファイルをlibディレクトリへ配置
sql-source-connector-2.1.jar
mysql-driver-8.0.28.jar
2. パッケージングと展開操作
tar czvf flume-package.tar.gz flume-mysql-sync/
3. Kafkaトピックの作成と確認
# トピック生成
/kafka_2.13-2.8.0/bin/kafka-topics.sh --create --zookeeper 172.16.100.50:2181 --replication-factor 3 --partitions 12 --topic mysql-data-stream
# トピックのデータ確認(コンソールコンシューマで先頭からメッセージを受信)
/kafka_2.13-2.8.0/bin/kafka-console-consumer.sh --bootstrap-server 172.16.100.48:9092 --topic mysql-data-stream --from-beginning
4. Flumeエージェント起動
[admin@test01 flume-mysql-sync]$ ./bin/flume-ng agent --conf ./config --conf-file ./config/mysql-historical.conf --name dataAgent -Dflume.root.logger=WARN,console
5. Flume設定ファイル詳細
# Components that make up the dataAgent agent: one source, one channel, one sink
dataAgent.sources = dbExtractor
dataAgent.channels = memBuffer
dataAgent.sinks = kafkaOutput
# Source definition: SQL source that reads from MySQL through Hibernate/JDBC
dataAgent.sources.dbExtractor.type = org.keedio.flume.source.SQLSource
# Connection parameters
dataAgent.sources.dbExtractor.hibernate.connection.url = jdbc:mysql://localhost:3306/historical_db
dataAgent.sources.dbExtractor.hibernate.connection.driver_class = com.mysql.cj.jdbc.Driver
dataAgent.sources.dbExtractor.hibernate.dialect = org.hibernate.dialect.MySQL8Dialect
dataAgent.sources.dbExtractor.hibernate.connection.username = sync_user
# NOTE(review): the password is stored in plain text; restrict read access to this file
dataAgent.sources.dbExtractor.hibernate.connection.password = secure_password
dataAgent.sources.dbExtractor.hibernate.connection.autocommit = false
# Table to synchronise (left disabled in this configuration)
# dataAgent.sources.dbExtractor.table = historical_records
# Columns to select (defaults to all columns)
# dataAgent.sources.dbExtractor.columns.to.select = id,name,timestamp,data
# Delay between query executions, in milliseconds (300000 ms = 5 minutes)
dataAgent.sources.dbExtractor.run.query.delay = 300000
# Status file settings: directory and file name of the source's status file
dataAgent.sources.dbExtractor.status.file.path = /opt/flume-status
dataAgent.sources.dbExtractor.status.file.name = db_extractor_status
# Custom query definition
# Value substituted for $@$ when no previous position has been recorded.
# NOTE(review): with "record_id > $@$" and start.from = 1, a row whose record_id is 1
# would not be selected on the first run; confirm ids start above 1 or use 0.
dataAgent.sources.dbExtractor.start.from = 1
# The $@$ placeholder is replaced with the last-read position, which avoids re-sending rows.
# The ORDER BY clause must be part of the query value itself: as a standalone line a
# properties parser treats it as an unrelated key ("ORDER") and the SQL runs unordered.
# NOTE(review): presumably the incremental column (record_id) must be the first column
# returned by the query; verify against the SQL source documentation when using SELECT *.
dataAgent.sources.dbExtractor.custom.query = SELECT * FROM records_table WHERE record_id > $@$ ORDER BY record_id ASC
# dataAgent.sources.dbExtractor.order.by = record_id
# Batch processing parameters
# NOTE(review): presumably max.rows caps the rows fetched per query and batch.size the
# events handed to the channel per batch; confirm against the SQL source documentation.
dataAgent.sources.dbExtractor.max.rows = 50000
dataAgent.sources.dbExtractor.batch.size = 2000
dataAgent.sources.dbExtractor.delimiter.entry = <SEP>
# Connection pool settings (C3P0 provider, pool size between 2 and 8)
dataAgent.sources.dbExtractor.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
dataAgent.sources.dbExtractor.hibernate.c3p0.min_size = 2
dataAgent.sources.dbExtractor.hibernate.c3p0.max_size = 8
# Channel definition: in-memory buffer between the dbExtractor source and the kafkaOutput sink
dataAgent.channels.memBuffer.type = memory
# Maximum number of events held in the channel
dataAgent.channels.memBuffer.capacity = 200000
# Maximum number of events per transaction
dataAgent.channels.memBuffer.transactionCapacity = 50000
# Timeout, in seconds, for adding or removing an event
dataAgent.channels.memBuffer.keep-alive = 5
# Sink definition: publishes events to the mysql-data-stream Kafka topic
dataAgent.sinks.kafkaOutput.type = org.apache.flume.sink.kafka.KafkaSink
dataAgent.sinks.kafkaOutput.kafka.bootstrap.servers = server1.domain:9092,server2.domain:9092,server3.domain:9092
dataAgent.sinks.kafkaOutput.kafka.topic = mysql-data-stream
# Number of events the sink processes in one batch
dataAgent.sinks.kafkaOutput.kafka.flumeBatchSize = 500
# Kafka producer settings passed through to the producer client
dataAgent.sinks.kafkaOutput.kafka.producer.acks = all
dataAgent.sinks.kafkaOutput.kafka.producer.linger.ms = 5
# Channel wiring: both ends are attached to memBuffer.
# The sink side uses the singular key "channel"; the source side uses the plural "channels".
dataAgent.sinks.kafkaOutput.channel = memBuffer
dataAgent.sources.dbExtractor.channels = memBuffer