Apache Kafkaにおけるコンシューマの実装と管理

コンシューマ基礎知識

コンシューマとコンシューマグループ

  • 各コンシューマは特定のコンシューマグループに所属し、異なるグループ間は互いに影響しない
  • パーティションのメッセージは1つのグループ内で単一コンシューマによって消費されるが、再バランスにより新しいコンシューマに再割当される可能性がある
  • トピックの異なるパーティションはクライアントパラメータpartition.assignment.strategyに基づきコンシューマに分配される

Kafkaのメッセージ配信モード

メッセージブローカーにはP2PとPub/Subの2つの配信モードが存在し、Kafkaは両方をサポートする

  • 全コンシューマが同一グループの場合:メッセージは各コンシューマに均等に配信され(P2Pモード)、各メッセージは1つのコンシューマのみが処理
  • 全コンシューマが異なるグループの場合:メッセージは全コンシューマにブロードキャストされ(Pub/Subモード)、各メッセージは全コンシューマが処理

クライアント開発

コンシューマロジックの実装手順

  1. コンシューマパラメータ設定とインスタンス作成
  2. トピック購読
  3. メッセージ取得と消費
  4. オフセットコミット
  5. コンシューマインスタンス閉鎖
public class KafkaMessageProcessor {
  private static final String SERVER_ADDRESS = "localhost:9092";
  private static final String TOPIC_NAME = "TOPIC-X";
  private static final String GROUP_NAME = "GROUP-X";
  private static final AtomicBoolean ACTIVE = new AtomicBoolean(true);

  public static Properties setupConfig() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVER_ADDRESS);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafka-processor-client");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_NAME);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    return props;
  }

  public static void main(String[] args) {
    Properties config = setupConfig();
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config);
    consumer.subscribe(Collections.singletonList(TOPIC_NAME));

    try {
      while (ACTIVE.get()) {
        ConsumerRecords<String, String> messages = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> msg : messages) {
          System.out.println("Topic:" + msg.topic() + ", Partition:" + msg.partition() + ", Offset:" + msg.offset());
          System.out.println("Key:" + msg.key() + ", Value:" + msg.value());
        }
      }
    } catch (WakeupException ignored) {
    } finally {
      consumer.close();
    }
  }
}

注意: KafkaConsumerはスレッドセーフではないため、wakeup()メソッドのみが他のスレッドから安全に呼び出せる

subscribeメソッドのオーバーロード

public void subscribe(Collection<String> topics, ConsumerRebalanceListener handler)
public void subscribe(Collection<String> topics)
public void subscribe(Pattern pattern, ConsumerRebalanceListener handler)
public void subscribe(Pattern pattern)

assignによる特定パーティション購読

consumer.assign(Arrays.asList(new TopicPartition("topic-sample", 0)));

subscribeメソッドは自動再バランス機能を提供するが、assignメソッドではこの機能は利用不可

購読解除

consumer.unsubscribe();
consumer.subscribe(new ArrayList<>());
consumer.assign(new ArrayList<>());

メッセージ消費

コンシューマが取得するメッセージはConsumerRecord型で、ProducerRecordに対応

public class ConsumerRecord<K, V> {
  private final String topic;
  private final int partition;
  private final long offset;
  private final long timestamp;
  private final TimestampType timestampType;
  private final K key;
  private final V value;
  // その他のフィールドとメソッド
}

パーティション別メッセージ処理:

for (TopicPartition tp : records.partitions()) {
  for (ConsumerRecord<String, String> rec : records.records(tp)) {
    System.out.println(rec.partition() + ":" + rec.value());
  }
}

デシリアライゼーション

Kafkaが提供するデシリアライザ:

  • ByteBufferDeserializer, ByteArrayDeserializer, BytesDeserializer
  • DoubleDeserializer, FloatDeserializer, IntegerDeserializer, LongDeserializer, ShortDeserializer, StringDeserializer

カスタム実装よりAvro, JSON, Protobufなどの汎用シリアライゼーションツールの使用を推奨

オフセットコミット

オフセット管理

消費オフセットは__consumer_offsets内部トピックに永続化され、コミット操作により保存

オフセット関係

  • lastConsumedOffset: 最後に消費した位置
  • committed offset: コミット済みオフセット
  • position: 次回取得位置

position = committed offset = lastConsumedOffset + 1

メッセージ損失と重複処理

  • 早すぎるコミット: 処理失敗後のメッセージ損失
  • 遅すぎるコミット: 処理中断後の重複処理

自動コミット機構

enable.auto.commit=true(デフォルト)時、auto.commit.interval.ms(デフォルト5秒)間隔で最大オフセットをコミット

手動コミット

同期コミット:

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> rec : records) {
  consumer.commitSync();
}

非同期コミット:

consumer.commitAsync((offsets, exception) -> {
  if (exception != null) {
    // エラー処理
  }
});

安全な終了処理:

try {
  while (RUNNING.get()) {
    // メッセージ処理
    consumer.commitAsync();
  }
} finally {
  try {
    consumer.commitSync();
  } finally {
    consumer.close();
  }
}

消費一時停止と再開

consumer.pause(Arrays.asList(partition));
consumer.resume(Arrays.asList(partition));

特定オフセットからの消費

auto.offset.reset設定:

  • latest: パーティション末尾から
  • earliest: パーティション先頭から

seekメソッドによる位置指定:

Set<TopicPartition> assigned = new HashSet<>();
while (assigned.isEmpty()) {
  consumer.poll(Duration.ofMillis(1000));
  assigned = consumer.assignment();
}

for (TopicPartition tp : assigned) {
  consumer.seek(tp, 100);
}

タイムスタンプ指定:

Map<TopicPartition, Long> query = new HashMap<>();
query.put(tp, System.currentTimeMillis() - 3600000);
Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(query);

コンシューマ再バランス

再バランス中にコンシューマグループは一時的に利用不可となり、重複処理が発生する可能性

consumer.subscribe(Arrays.asList("topic"), new ConsumerRebalanceListener() {
  @Override
  public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    consumer.commitSync();
  }

  @Override
  public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    for (TopicPartition tp : partitions) {
      long storedOffset = fetchOffsetFromDB(tp);
      consumer.seek(tp, storedOffset);
    }
  }
});

コンシューマインターセプタ

public interface ConsumerInterceptor<K, V> {
  ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
  void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
  void close();
}

マルチスレッドモデル

  • 単一パーティション・単一スレッド: 順序保証が容易だが接続オーバーヘッド
  • 単一コンシューマ・マルチワーカー: スループット向上だが順序管理複雑

タグ: Kafka コンシューマ 消費グループ オフセット管理 再バランス

6月14日 16:07 投稿