Flink と Kafka のオフセット管理方法

Flink と Kafka を連携する際、オフセット管理は重要な課題です。

自動管理モードでは、以下のような問題が発生します:

  1. プロセス途中での停止によりデータが失われることがあります
  2. 再起動時、同じデータが再び処理される可能性があります

これらの問題を解決するためには、Kafka のオフセットを手動で管理し、Flinkのチェックポイントとオフセットを同期させる必要があります。

  1. オフセットとチェックポイントを同期させる方法
// Kafka データストリームの作成
val properties = new Properties()
properties.setProperty("bootstrap.servers", GlobalConfigUtils.getBootstrapServer)
properties.setProperty("zookeeper.connect", GlobalConfigUtils.getZookeeper)
properties.setProperty("group.id", GlobalConfigUtils.getConsumerGroup)
properties.setProperty("enable.auto.commit", "true")
properties.setProperty("auto.commit.interval.ms", "5000")
properties.setProperty("auto.offset.reset", "latest")
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

val kafkaConsumer = new FlinkKafkaConsumer09[String](
  GlobalConfigUtils.getInputTopic,
  new SimpleStringSchema(),
  properties
)

// チェックポイント完了後にオフセットを保存します
kafkaConsumer.setCommitOffsetsOnCheckpoints(true)
kafkaConsumer.setStartFromLatest()

val dataStream: DataStream[String] = env.addSource(kafkaConsumer)
  1. 手動オフセット管理の実装方法
object KafkaOffsetManager {
  private var consumer: KafkaConsumer[Array[Byte], Array[Byte]] = null
  private var config: Properties = null

  // 初期設定
  private def initialize(): Properties = {
    config = new Properties()
    config.setProperty("bootstrap.servers", GlobalConfigUtils.getBootstrapServer)
    config.setProperty("zookeeper.connect", GlobalConfigUtils.getZookeeper)
    config.setProperty("group.id", GlobalConfigUtils.getConsumerGroup)
    config.setProperty("enable.auto.commit", "true")
    config.setProperty("auto.commit.interval.ms", "5000")
    config.setProperty("auto.offset.reset", "latest")
    config
  }

  // Zookeeper クライアントを作成
  private def createZkClient(): ZkUtils = {
    val client = new ZkClient("hadoop01:2181")
    ZkUtils.apply(client, false)
  }

  // オフセットを取得
  def getCommittedOffset(topic: String, partition: Int): Long = {
    initialize()
    createZkClient()
    
    val tp = new TopicPartition(topic, partition)
    val offset = consumer.committed(tp).offset()
    println(s"トピック: $topic, パーティション: $partition, オフセット: $offset")
    
    if (offset != null) {
      offset
    } else {
      0L
    }
  }

  // オフセットを設定
  def setCommittedOffset(topic: String, partition: Int, offset: Long): Unit = {
    initialize()
    createZkClient()
    
    val tp = new TopicPartition(topic, partition)
    val metadata = new OffsetAndMetadata(offset)
    val map = new util.HashMap[TopicPartition, OffsetAndMetadata]()
    map.put(tp, metadata)
    
    consumer.commitSync(map)
  }

  // リソース解放
  def close(): Unit = {
    if (consumer != null) {
      consumer.close()
    }
  }
}

タグ: Flink Kafka Checkpoint Offset Management

6月21日 22:25 投稿