コンシューマ基礎知識
コンシューマとコンシューマグループ
- 各コンシューマは特定のコンシューマグループに所属し、異なるグループ間は互いに影響しない
- パーティションのメッセージは1つのグループ内で単一コンシューマによって消費されるが、再バランスにより新しいコンシューマに再割当される可能性がある
- トピックの異なるパーティションはクライアントパラメータpartition.assignment.strategyに基づきコンシューマに分配される
Kafkaのメッセージ配信モード
メッセージブローカーにはP2PとPub/Subの2つの配信モードが存在し、Kafkaは両方をサポートする
- 全コンシューマが同一グループの場合:メッセージは各コンシューマに均等に配信され(P2Pモード)、各メッセージは1つのコンシューマのみが処理
- 全コンシューマが異なるグループの場合:メッセージは全コンシューマにブロードキャストされ(Pub/Subモード)、各メッセージは全コンシューマが処理
クライアント開発
コンシューマロジックの実装手順
- コンシューマパラメータ設定とインスタンス作成
- トピック購読
- メッセージ取得と消費
- オフセットコミット
- コンシューマインスタンス閉鎖
public class KafkaMessageProcessor {
private static final String SERVER_ADDRESS = "localhost:9092";
private static final String TOPIC_NAME = "TOPIC-X";
private static final String GROUP_NAME = "GROUP-X";
private static final AtomicBoolean ACTIVE = new AtomicBoolean(true);
public static Properties setupConfig() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVER_ADDRESS);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafka-processor-client");
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_NAME);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
return props;
}
public static void main(String[] args) {
Properties config = setupConfig();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config);
consumer.subscribe(Collections.singletonList(TOPIC_NAME));
try {
while (ACTIVE.get()) {
ConsumerRecords<String, String> messages = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> msg : messages) {
System.out.println("Topic:" + msg.topic() + ", Partition:" + msg.partition() + ", Offset:" + msg.offset());
System.out.println("Key:" + msg.key() + ", Value:" + msg.value());
}
}
} catch (WakeupException ignored) {
} finally {
consumer.close();
}
}
}
注意: KafkaConsumerはスレッドセーフではないため、wakeup()メソッドのみが他のスレッドから安全に呼び出せる
subscribeメソッドのオーバーロード
public void subscribe(Collection<String> topics, ConsumerRebalanceListener handler)
public void subscribe(Collection<String> topics)
public void subscribe(Pattern pattern, ConsumerRebalanceListener handler)
public void subscribe(Pattern pattern)
assignによる特定パーティション購読
consumer.assign(Arrays.asList(new TopicPartition("topic-sample", 0)));
subscribeメソッドは自動再バランス機能を提供するが、assignメソッドではこの機能は利用不可
購読解除
consumer.unsubscribe();
consumer.subscribe(new ArrayList<>());
consumer.assign(new ArrayList<>());
メッセージ消費
コンシューマが取得するメッセージはConsumerRecord型で、ProducerRecordに対応
public class ConsumerRecord<K, V> {
private final String topic;
private final int partition;
private final long offset;
private final long timestamp;
private final TimestampType timestampType;
private final K key;
private final V value;
// その他のフィールドとメソッド
}
パーティション別メッセージ処理:
for (TopicPartition tp : records.partitions()) {
for (ConsumerRecord<String, String> rec : records.records(tp)) {
System.out.println(rec.partition() + ":" + rec.value());
}
}
デシリアライゼーション
Kafkaが提供するデシリアライザ:
- ByteBufferDeserializer, ByteArrayDeserializer, BytesDeserializer
- DoubleDeserializer, FloatDeserializer, IntegerDeserializer, LongDeserializer, ShortDeserializer, StringDeserializer
カスタム実装よりAvro, JSON, Protobufなどの汎用シリアライゼーションツールの使用を推奨
オフセットコミット
オフセット管理
消費オフセットは__consumer_offsets内部トピックに永続化され、コミット操作により保存
オフセット関係
- lastConsumedOffset: 最後に消費した位置
- committed offset: コミット済みオフセット
- position: 次回取得位置
position = committed offset = lastConsumedOffset + 1
メッセージ損失と重複処理
- 早すぎるコミット: 処理失敗後のメッセージ損失
- 遅すぎるコミット: 処理中断後の重複処理
自動コミット機構
enable.auto.commit=true(デフォルト)時、auto.commit.interval.ms(デフォルト5秒)間隔で最大オフセットをコミット
手動コミット
同期コミット:
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> rec : records) {
consumer.commitSync();
}
非同期コミット:
consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
// エラー処理
}
});
安全な終了処理:
try {
while (RUNNING.get()) {
// メッセージ処理
consumer.commitAsync();
}
} finally {
try {
consumer.commitSync();
} finally {
consumer.close();
}
}
消費一時停止と再開
consumer.pause(Arrays.asList(partition));
consumer.resume(Arrays.asList(partition));
特定オフセットからの消費
auto.offset.reset設定:
- latest: パーティション末尾から
- earliest: パーティション先頭から
seekメソッドによる位置指定:
Set<TopicPartition> assigned = new HashSet<>();
while (assigned.isEmpty()) {
consumer.poll(Duration.ofMillis(1000));
assigned = consumer.assignment();
}
for (TopicPartition tp : assigned) {
consumer.seek(tp, 100);
}
タイムスタンプ指定:
Map<TopicPartition, Long> query = new HashMap<>();
query.put(tp, System.currentTimeMillis() - 3600000);
Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(query);
コンシューマ再バランス
再バランス中にコンシューマグループは一時的に利用不可となり、重複処理が発生する可能性
consumer.subscribe(Arrays.asList("topic"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
consumer.commitSync();
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
for (TopicPartition tp : partitions) {
long storedOffset = fetchOffsetFromDB(tp);
consumer.seek(tp, storedOffset);
}
}
});
コンシューマインターセプタ
public interface ConsumerInterceptor<K, V> {
ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
void close();
}
マルチスレッドモデル
- 単一パーティション・単一スレッド: 順序保証が容易だが接続オーバーヘッド
- 単一コンシューマ・マルチワーカー: スループット向上だが順序管理複雑