Kafkaブローカーのログファイルの構造と動作原理

1. パーティションディレクトリ内のファイル構造

Kafkaのメッセージストレージは、パーティション(Partition)を中心に構築されています。各パーティションは物理的なディレクトリに対応し、そのディレクトリ内にはコアログファイルインデックスファイルトランザクション関連ファイルメタデータファイルの4種類のファイルが含まれます。これらのファイルが連携することで、メッセージの効率的な保存、高速な検索、および一貫性の保証を実現しています。

1.1 コアログファイル(.log): メッセージデータの保存

コアログファイルはKafkaメッセージストレージの基盤であり、.logという拡張子を持っています。各ファイルは一つのログセグメント(Log Segment)に対応します。

  • 命名規則: ファイル名は、そのログセグメントの最初のメッセージのオフセット(Offset)で命名されます(例: 00000000000000000000.log)。これにより、ログセグメントの開始位置を迅速に特定できます。
  • 保存内容: メッセージは順次追加方式で保存されます。各メッセージには、オフセット(Offset)タイムスタンプ(Timestamp)キー(Key)値(Value)CRC検証コードなどのメタデータが含まれます。
  • ファイルサイズ: log.segment.bytesパラメータ(デフォルト1GB)によって制御されます。ファイルがいっぱいになると、新しいログセグメントが自動的に作成され、単一ファイルのサイズが大きくなることによる読み書き性能への影響を防ぎます。
  • 役割: メッセージの永続化を保証し、メッセージの消失を防ぎます(acks=all設定と組み合わせることで)。Kafkaの高信頼性の基盤となります。

1.2 インデックスファイル: メッセージ検索の高速化

インデックスファイルは、Kafkaが高速なメッセージ検索を実現するための鍵です。これにはオフセットインデックス.index)とタイムスタンインデックス.timeindex)の2種類があり、対応する.logファイルと同じ名前を持ちます(例: 00000000000000000000.index00000000000000000000.timeindex)。

1.2.1 オフセットインデックスファイル(.index)

  • 機能: メッセージのオフセット(Offset).logファイル内の物理位置とのマッピングを確立し、オフセットによるメッセージの高速検索をサポートします(例: 消費者がoffset=1000を指定してメッセージを読み取る場合)。
  • 保存内容: 相対オフセット(Relative Offset)物理位置(Position)の対応関係を記録します。ここで、相対オフセットは、メッセージの絶対オフセットからログセグメントの開始オフセットを引いた値です(例: ログセグメントの開始オフセットが1000で、メッセージの絶対オフセットが1500の場合、相対オフセットは500になります)。これは4バイトしか占有しないため、インデックスのスペースを節約します。
  • スパースインデックス設計: Kafkaはすべてのメッセージに対してインデックスを構築するわけではありません。代わりに、log.index.interval.bytesパラメータ(デフォルト4KB)に基づいて定期的にインデックスエントリを追加します(例: 4KBのデータが書き込まれるたびに1つのインデックスを追加)。これにより、インデックスのサイズと検索効率のバランスを取ります。
  • 検索フロー: 消費者はまず、.indexファイル内で二分探索を使用して、ターゲットオフセットに対応する相対オフセットと物理位置を見つけます。その後、.logファイルの対応する位置から順次スキャンして、ターゲットメッセージを見つけます(スパースインデックスのため、少量の順次読み取りが必要です)。

1.2.2 タイムスタンインデックスファイル(.timeindex)

  • 機能: メッセージのタイムスタンプ(Timestamp)メッセージオフセットとのマッピングを確立し、時間範囲によるメッセージの検索をサポートします(例: 消費者がtimestamp=2025-10-01 00:00:00を指定してメッセージを読み取る場合)。
  • 保存内容: タイムスタンプ対応するメッセージのオフセット(このオフセットは.indexファイルを介して物理位置に変換されます)を記録します。
  • 検索フロー: 消費者は、.timeindexファイル内で二分探索を使用して、ターゲットタイムスタンプに対応するオフセットを見つけます。その後、.indexファイルを介して.logファイルの物理位置を特定し、時間次元での高速検索を実現します。
  • 適用シナリオ: 主にログクリーンアップ(例: 7日前のメッセージを削除する場合、.timeindexファイルを使用して最も古いメッセージのタイムスタンプを見つける)、ストリーミング処理における時間ウィンドウ計算などで使用されます。

1.3 トランザクション関連ファイル: Exactly-Onceセマンティクスの保証

トランザクション機構(transactional.id設定)が有効になっている場合、パーティションディレクトリには未完了トランザクションインデックスファイル(.txnindex)が生成されます。これは、未完了のトランザクションメッセージの位置を記録し、消費者が一貫性のないトランザクションメッセージを読み取らないようにするために使用されます。

  • 機能: トランザクションの状態(例: ABORTED)をマークし、未コミットまたは中止されたトランザクションメッセージをフィルタリングします。これにより、read_committed分離レベルの消費者は、コミットされたメッセージのみを見ることができます。
  • 保存内容: トランザクションID(Transactional ID)プロデューサーID(PID)トランザクション開始オフセットなどの情報を記録します。
  • トリガー条件: トランザクションがコミット(commitTransaction)または中止(abortTransaction)されると、トランザクションコーディネーターが.txnindexファイルを更新し、トランザクションの状態をマークします。

1.4 メタデータファイル: パーティション状態とレプリカ同期の管理

メタデータファイルは、パーティションのレプリカ状態ログクリーンアップの進捗などを管理するために使用され、Kafkaクラスタの運用管理における重要なファイルです。主なものは以下の通りです。

1.4.1 Leader Epochチェックポイントファイル(leader-epoch-checkpoint)

  • 機能: パーティションのLeaderのEpoch情報(EpochはLeaderのバージョン番号で、各Leader選出時にインクリメントされます)を記録し、レプリカの同期(例: Followerレプリカが再起動した後、Epochを使用して全量データをLeaderから取得する必要があるかどうかを判断する)をサポートします。
  • 保存内容: Leader Epoch対応するLeaderのBroker ID開始オフセットなどの情報を含みます。
  • 役割: Leader切り替え時のデータ一貫性の問題を解決します(例: 旧Leaderが復旧した場合、新Leaderが書き込んだデータを上書きしないようにします)。

1.4.2 ログクリーンチェックポイントファイル(log-cleaner-checkpoint)

  • 機能: ログクリーンの進捗(例: 最近削除されたログセグメントのオフセット)を記録し、重複クリーンアップや漏れを防ぎます。
  • トリガー条件: ログクリーンタスク(例: 期限切れのメッセージの削除、ログの圧縮)が完了すると、このファイルが更新されます。

1.4.3 スナップショットファイル(.snapshot)

  • 機能: パーティションメタデータのスナップショット(例: パーティションのLeader、ISRリスト)を記録し、パーティション状態の迅速な復元(例: Brokerが再起動した後、スナップショットを使用してパーティションメタデータを迅速に再構築する)をサポートします。
  • 生成タイミング: パーティションメタデータが変更された場合(例: Leader選出、ISRの更新)に自動的に生成されます。

2. メッセージの保存形式と解析プロセス

Kafkaのメッセージは、固定長のヘッダーと可変長のボディから構成される固定フォーマットで保存されます。これにより、メッセージの境界を明確に区別し、offsetフィールドを正確に解析できます。

2.1 メッセージの保存形式

各メッセージは、以下の固定長ヘッダーと可変長ボディで構成されます。

| オフセット(8B) | キーサイズ(4B) | キー(...) | 値サイズ(4B) | 値(...) | タイムスタンプ(8B) | crc32(4B) |
  • オフセット(Offset): メッセージの一意の識別子。8バイト。
  • キーサイズ(Key Size): キーのバイト長。4バイト。
  • キー(Key): 可変長のキーデータ。
  • 値サイズ(Value Size): 値のバイト長。4バイト。
  • 値(Value): 可変長の値データ。
  • タイムスタンプ(Timestamp): メッセージのタイムスタンプ。8バイト。
  • CRC32: メッセージの整合性を検証するためのチェックサム。4バイト。

2.2 メッセージの解析プロセス

以下は、.logファイルからメッセージを読み取り、解析するための疑似コードの例です。この例では、JavaのNIO APIを使用してファイルをメモリにマッピングし、メッセージを解析します。

import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class LogFileParser {

    public void parseLogSegment(Path logFilePath) throws IOException {
        // ファイルをメモリにマッピング(ゼロコピー)
        try (FileChannel channel = FileChannel.open(logFilePath, StandardOpenOption.READ)) {
            MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size());

            while (buffer.hasRemaining()) {
                // 1. 固定ヘッダーを読み取る
                long messageOffset = buffer.getLong(); // オフセット(8バイト)
                int messageSize = buffer.getInt();     // メッセージサイズ(4バイト)

                // 2. CRC32を検証(省略)

                // 3. 可変長ボディを読み取る
                int keySize = buffer.getInt(); // キーサイズ(4バイト)
                byte[] keyBytes = new byte[keySize];
                buffer.get(keyBytes);

                int valueSize = buffer.getInt(); // 値サイズ(4バイト)
                byte[] valueBytes = new byte[valueSize];
                buffer.get(valueBytes);

                long timestamp = buffer.getLong(); // タイムスタンプ(8バイト)

                // 4. メッセージデータを処理
                String key = new String(keyBytes, java.nio.charset.StandardCharsets.UTF_8);
                String value = new String(valueBytes, java.nio.charset.StandardCharsets.UTF_8);

                System.out.printf("オフセット: %d, キー: %s, 値: %s, タイムスタンプ: %d%n",
                        messageOffset, key, value, timestamp);
            }
        }
    }
}

3. オフセット管理メカニズム

Kafkaは、パーティションレベルの順次書き込み独立したオフセット管理メカニズムを通じて、パーティション内のオフセットの単調増加を保証します。しかし、同じトピック内の異なるパーティション間では、オフセットは一意ではありません。

3.1 パーティション内のオフセットの単調増加

  • 物理ストレージ設計: 各パーティションは独立したログディレクトリに対応し、メッセージは現在アクティブなセグメントに順次追加されます。各セグメントファイルは、そのセグメントの最初のメッセージのオフセットで命名されます(例: 00000000000000000000.log)。これにより、物理ストレージの連続性が保証されます。
  • 追加書き込み: プロデューサーがパーティションに送信したメッセージは、ディスクに書き込まれる際に必ずoffsetが単調に増加する順序で書き込まれます。ランダムな挿入や変更はサポートされていません。例えば、現在のセグメントの最後のメッセージのoffsetが999の場合、新しいメッセージのoffsetは必ず1000になります。
  • メタデータ管理: 各パーティションのoffsetは0から開始し、Brokerによって自動的に管理されます。メッセージが書き込まれる際に、Brokerは現在のセグメントの最後のoffset値に1を加算して新しいoffsetを割り当てます。
  • 順序性の保証: 各パーティションは単一のスレッドによって書き込みリクエストが処理されるため、並行書き込みによるoffsetの乱れを防ぎます。

3.2 異なるパーティション間のオフセットの非一意性

  • パーティションの独立性: 同じトピックの異なるパーティションは、物理的に分離されたストレージユニットであり、それぞれが独立したoffsetシーケンスを維持します。
    - パーティション0: offset=0, 1, 2, ...
    - パーティション1: offset=0, 1, 2, ...
    
  • グローバルな調整なし: パーティション間のoffsetはカウンタを共有せず、Brokerはパーティション間のoffsetの一意性を保証しません。
  • パーティション戦略の影響: プロデューサーがメッセージのKeyを指定すると、同じKeyのメッセージは同じパーティションに割り当てられ、そのパーティション内で関連するメッセージのoffsetが連続します。Keyがない場合、メッセージはラウンドロビン戦略によって異なるパーティションに割り当てられ、異なるパーティションのoffsetが独立して増加します。

4. コアデザイン原則の要約

Kafkaのストレージアーキテクチャは、順次書き込みスパースインデックスページキャッシュなどのコアデザイン原則に基づいています。これらの原則は、ディスクI/Oとメモリキャッシュの利点を最大限に活用し、高いスループットと低遅延を実現します。

  • 順次書き込みとランダム読み取りの分離: プロデューサーはディスクの順次I/Oを最大限に活用して書き込みを行い、消費者はインデックスとページキャッシュを利用してランダムな読み取りを高速化します。
  • ログセグメンテーション: 大きなファイルを小さなセグメントに分割することで、メンテナンスと検索を効率化します。
  • スパースインデックス: インデックスファイルのサイズを小さく保ちながら、検索効率を維持します。
  • ページキャッシュの活用: オペレーティングシステムのページキャッシュを利用して、頻繁にアクセスされるデータをメモリに保持し、ディスクI/Oを削減します。

タグ: Kafka メッセージキュー 分散システム ログストレージ オフセット管理

6月26日 22:30 投稿