Kafka コンシューマの冪等性を実現する実装手法

Kafka を扱う際、プロデューサ側の冪等性に注目が集まりがちですが、コンシューマ側の冪等性もシステムの信頼性を左右する重要な要素です。特に、ネットワーク障害やプロセスの再起動などにより同一メッセージが複数回配信される可能性がある分散環境では、コンシューマが冪等に動作しないと、データの重複登録や重複引き落としといった深刻な不整合を引き起こします。本稿では、Kafka コンシューマにおいて冪等性を担保する具体的な実装パターンを解説します。

メッセージ識別子を用いた重複排除

最も一般的なアプローチは、メッセージに一意なIDを付与し、そのIDを外部ストレージに記録して重複を防ぐ方法です。

データベースによる重複排除テーブル

リレーショナルデータベースを使用して、既に処理済みのメッセージIDを管理します。以下のSQL例では、メッセージIDを主キーとしたテーブルを作成しています。

CREATE TABLE processed_messages (
    message_id VARCHAR(255) PRIMARY KEY,
    consumed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

Java での実装例を示します。消費前にIDの存在確認を行い、未処理の場合のみビジネスロジックを実行してからIDを登録します。

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class DbDeduplicationHandler {
    private static final String DB_URL = "jdbc:mysql://localhost:3306/mydb";
    private static final String DB_USER = "app_user";
    private static final String DB_PASSWORD = "secret";

    public boolean isAlreadyProcessed(String messageId) {
        String sql = "SELECT 1 FROM processed_messages WHERE message_id = ?";
        try (Connection conn = DriverManager.getConnection(DB_URL, DB_USER, DB_PASSWORD);
             PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setString(1, messageId);
            try (ResultSet rs = ps.executeQuery()) {
                return rs.next();
            }
        } catch (SQLException e) {
            // エラーハンドリング:ログ出力など
            return false; // 安全側に倒し、再処理を許容
        }
    }

    public void markAsProcessed(String messageId) {
        String sql = "INSERT INTO processed_messages (message_id) VALUES (?)";
        try (Connection conn = DriverManager.getConnection(DB_URL, DB_USER, DB_PASSWORD);
             PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setString(1, messageId);
            ps.executeUpdate();
        } catch (SQLException e) {
            // 重複キーエラーは無視しても良い
        }
    }
}

Redis を用いたキャッシュベースの重複排除

データベースへのアクセスがボトルネックとなる場合は、インメモリキャッシュである Redis を利用します。TTL を設定することで古いエントリを自動削除し、メモリ消費を抑制します。

import redis.clients.jedis.Jedis;

public class RedisDeduplicationHandler {
    private static final String REDIS_HOST = "localhost";
    private static final int REDIS_PORT = 6379;

    public boolean isAlreadyProcessed(String messageId) {
        try (Jedis jedis = new Jedis(REDIS_HOST, REDIS_PORT)) {
            return jedis.exists(messageId);
        }
    }

    public void markAsProcessed(String messageId) {
        try (Jedis jedis = new Jedis(REDIS_HOST, REDIS_PORT)) {
            // TTL は 3600 秒(1時間)に設定
            jedis.setex(messageId, 3600, "1");
        }
    }
}

Kafka トランザクションを利用したアトミックな処理

メッセージ処理とオフセットコミットをトランザクションでラップすることで、処理の完了と進行管理を不可分にします。これにより、処理後にクラッシュが発生しても、トランザクションがアボートされてメッセージは再配送され、結果的に冪等性が保たれます。

以下のコード例では、read_committed 分離レベルとトランザクション ID を設定し、beginTransaction() から commitTransaction() までの処理を原子的に実行しています。

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class TransactionalConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "tx-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        props.put(ConsumerConfig.TRANSACTIONAL_ID_CONFIG, "tx-consumer-1");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.initTransactions();
        consumer.subscribe(List.of("orders"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
            consumer.beginTransaction();
            try {
                for (ConsumerRecord<String, String> record : records) {
                    // ビジネスロジックを実行(冪等な設計が望ましい)
                    System.out.printf("Processing order: %s%n", record.value());
                }
                // オフセットをコミット
                consumer.commitSync();
                consumer.commitTransaction();
            } catch (Exception e) {
                consumer.abortTransaction();
                // エラーログ出力とリトライ戦略
            }
        }
    }
}

課題と対策

パフォーマンスへの影響

外部ストレージへの問い合わせやトランザクション処理はレイテンシを増加させます。対策として、データベースのインデックス最適化、バッチ書き込み、Redis の適切な TTL 設計、トランザクションタイムアウトの調整などが有効です。

分散環境でのデータ整合性

重複排除テーブルへの同時書き込みによる競合を防ぐには、データベースの楽観的ロックや、Redis の分散ロック(Redlock など)を導入します。また、トランザクション内で処理結果を副作用として保持し、再実行時に同一結果を得られるようビジネスロジック自体を冪等に設計することも重要です。

コンシューマの冪等性は、単一のテクニックだけで達成できるものではなく、メッセージの特性やシステム要件に応じて適切な戦略を組み合わせる必要があります。上記の手法を基盤に、各自のユースケースに最適な設計を検討してください。

タグ: Kafka 冪等性 メッセージ重複排除 redis データベース

5月30日 12:02 投稿