Flink と Kafka を連携する際、オフセット管理は重要な課題です。
自動管理モードでは、以下のような問題が発生します:
- プロセス途中での停止によりデータが失われることがあります
- 再起動時、同じデータが再び処理される可能性があります
これらの問題を解決するためには、Kafka のオフセットを手動で管理し、Flinkのチェックポイントとオフセットを同期させる必要があります。
- オフセットとチェックポイントを同期させる方法
// Kafka データストリームの作成
val properties = new Properties()
properties.setProperty("bootstrap.servers", GlobalConfigUtils.getBootstrapServer)
properties.setProperty("zookeeper.connect", GlobalConfigUtils.getZookeeper)
properties.setProperty("group.id", GlobalConfigUtils.getConsumerGroup)
properties.setProperty("enable.auto.commit", "true")
properties.setProperty("auto.commit.interval.ms", "5000")
properties.setProperty("auto.offset.reset", "latest")
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
val kafkaConsumer = new FlinkKafkaConsumer09[String](
GlobalConfigUtils.getInputTopic,
new SimpleStringSchema(),
properties
)
// チェックポイント完了後にオフセットを保存します
kafkaConsumer.setCommitOffsetsOnCheckpoints(true)
kafkaConsumer.setStartFromLatest()
val dataStream: DataStream[String] = env.addSource(kafkaConsumer)
- 手動オフセット管理の実装方法
object KafkaOffsetManager {
private var consumer: KafkaConsumer[Array[Byte], Array[Byte]] = null
private var config: Properties = null
// 初期設定
private def initialize(): Properties = {
config = new Properties()
config.setProperty("bootstrap.servers", GlobalConfigUtils.getBootstrapServer)
config.setProperty("zookeeper.connect", GlobalConfigUtils.getZookeeper)
config.setProperty("group.id", GlobalConfigUtils.getConsumerGroup)
config.setProperty("enable.auto.commit", "true")
config.setProperty("auto.commit.interval.ms", "5000")
config.setProperty("auto.offset.reset", "latest")
config
}
// Zookeeper クライアントを作成
private def createZkClient(): ZkUtils = {
val client = new ZkClient("hadoop01:2181")
ZkUtils.apply(client, false)
}
// オフセットを取得
def getCommittedOffset(topic: String, partition: Int): Long = {
initialize()
createZkClient()
val tp = new TopicPartition(topic, partition)
val offset = consumer.committed(tp).offset()
println(s"トピック: $topic, パーティション: $partition, オフセット: $offset")
if (offset != null) {
offset
} else {
0L
}
}
// オフセットを設定
def setCommittedOffset(topic: String, partition: Int, offset: Long): Unit = {
initialize()
createZkClient()
val tp = new TopicPartition(topic, partition)
val metadata = new OffsetAndMetadata(offset)
val map = new util.HashMap[TopicPartition, OffsetAndMetadata]()
map.put(tp, metadata)
consumer.commitSync(map)
}
// リソース解放
def close(): Unit = {
if (consumer != null) {
consumer.close()
}
}
}