Netty 4 で長さプレフィクス方式のカスタムコーデックを実装する

TCPはストリーム指向のトランスポートプロトコルであり、データに自然な境界が存在しません。送信側が連続して複数のメッセージを書き込むと、ネットワークスタックやカーネルバッファの都合により、それらが結合(粘包)または分割(拆包)される可能性があります。このため、受信側ではメッセージの境界を正しく再構成する必要があります。

一般的な対処手法には以下のようなものがあります:

  • 固定長メッセージ:すべてのメッセージを同一長さに揃える
  • 区切り文字方式:メッセージ末尾に特殊文字(例:\n)を付与
  • 長さプレフィクス方式:メッセージ先頭にペイロード長を符号化したヘッダを付加(本稿で採用)

この方式では、各メッセージの先頭4バイト(32ビット整数)に後続ペイロードのバイト長を格納し、その後にシリアライズ済みのデータ本体を配置します。これにより、受信側はまずヘッダを読み取り、次にその長さ分のデータを待機・収集できます。

以下は、汎用的なJavaオブジェクトを扱うためのカスタムエンコーダおよびデコーダの実装例です。シリアライザとしてKryoを想定していますが、JacksonやProtobufなど他のライブラリへの置換も容易です。

カスタムエンコーダ(ObjectToLengthPrefixedByteBufEncoder)

public class ObjectToLengthPrefixedByteBufEncoder<T> extends MessageToByteEncoder<T> {
    private final KryoSerializer serializer;

    public ObjectToLengthPrefixedByteBufEncoder(KryoSerializer kryo) {
        this.serializer = kryo;
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, T message, ByteBuf out) throws Exception {
        byte[] payload = serializer.serialize(message);
        out.writeInt(payload.length); // ヘッダ:ペイロード長(int, 4バイト)
        out.writeBytes(payload);      // ボディ:シリアライズ済みデータ
    }
}

カスタムデコーダ(LengthPrefixedByteBufToObjectDecoder)

public class LengthPrefixedByteBufToObjectDecoder extends ByteToMessageDecoder {
    private static final int HEADER_SIZE = Integer.BYTES; // 4 bytes

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if (in.readableBytes() < HEADER_SIZE) {
            return; // ヘッダが不完全 → 待機
        }

        in.markReaderIndex();
        int payloadLength = in.readInt();

        if (payloadLength < 0 || payloadLength > 10 * 1024 * 1024) { // 10MB上限チェック
            ctx.close();
            return;
        }

        if (in.readableBytes() < payloadLength) {
            in.resetReaderIndex(); // ペイロードが未到着 → リセットして再試行
            return;
        }

        byte[] payload = new byte[payloadLength];
        in.readBytes(payload);
        Object deserialized = serializer.deserialize(payload);
        out.add(deserialized);
    }
}

この実装では、ByteBuf の読み取り位置(readerIndex)の管理に注意が必要です。readInt()readBytes() などの操作は内部ポインタを自動的に進めるため、条件分岐で読み取りをスキップする際は markReaderIndex()resetReaderIndex() を適切に使用しなければ、データの欠落や誤読が発生します。

また、Netty標準の LengthFieldBasedFrameDecoder を活用することも可能ですが、カスタムシリアライゼーションとの連携やエラーハンドリングの柔軟性を重視する場合は、上記のように明示的なコーデッククラスを実装するのが推奨されます。

タグ: netty4 kryo ByteBuf custom-encoder custom-decoder

6月23日 19:43 投稿