gRPC の 4 種ストリーミングタイプと .NET Core での実装例

gRPC におけるストリーミングの種類

gRPC は次の 4 種類のストリーミングをサポートしています:

  1. ユニタリ RPC(Unary RPC)
  2. サーバーストリーミング RPC(Server Streaming RPC)
  3. クライアントストリーミング RPC(Client Streaming RPC)
  4. 双方向ストリーミング RPC(Bidirectional Streaming RPC)

1. ユニタリ RPC

リクエストとレスポンスが 1 対 1 でやり取りされる基本的な形式です。

proto ファイル

syntax = "proto3";

option csharp_namespace = "GrpcStreamDemo";

package calculation;

service Calculator {
  rpc Sum (SumRequest) returns (SumResponse);
}

message SumRequest {
  int32 a = 1;
  int32 b = 2;
}

message SumResponse {
  int32 result = 1;
}

サーバー側ロジック

using Grpc.Core;

namespace GrpcStreamDemo.Services
{
    public class CalculatorService : Calculator.CalculatorBase
    {
        public override Task<SumResponse> Sum(SumRequest request, ServerCallContext context)
        {
            return Task.FromResult(new SumResponse
            {
                Result = request.a + request.b
            });
        }
    }
}

クライアント側呼び出し

private static async Task CallSumAsync()
{
    using var channel = GrpcChannel.ForAddress("https://localhost:5001");
    var client = new Calculator.CalculatorClient(channel);

    var response = await client.SumAsync(new SumRequest
    {
        a = 3,
        b = 5
    });

    Console.WriteLine($"Sum result: {response.Result}");
}

2. サーバーストリーミング RPC

クライアントの 1 つのリクエストに対して、サーバーが複数回に分けてレスポンスを送信します。

proto ファイル

syntax = "proto3";

option csharp_namespace = "GrpcStreamDemo";

package streamserver;

service StreamService {
  rpc CountUp (NumberRequest) returns (stream MessageResponse);
}

message NumberRequest {
  int32 start = 1;
  int32 end = 2;
}

message MessageResponse {
  string content = 1;
}

サーバー側ロジック

public override async Task CountUp(NumberRequest request, IServerStreamWriter<MessageResponse> responseStream, ServerCallContext context)
{
    for (int i = request.start; i <= request.end; i++)
    {
        await responseStream.WriteAsync(new MessageResponse { content = $"Count: {i}" });
        await Task.Delay(300);
    }
}

クライアント側呼び出し

private static async Task CallCountUpAsync()
{
    using var channel = GrpcChannel.ForAddress("https://localhost:5001");
    var client = new StreamService.StreamServiceClient(channel);

    var reply = client.CountUp(new NumberRequest { start = 1, end = 5 });

    await foreach (var message in reply.ResponseStream.ReadAllAsync())
    {
        Console.WriteLine(message.content);
    }
}

3. クライアントストリーミング RPC

クライアントが複数回に分けてリクエストを送信し、サーバーが最後に 1 つのレスポンスを返します。

proto ファイル

syntax = "proto3";

option csharp_namespace = "GrpcStreamDemo";

package streamclient;

service ClientStreamService {
  rpc Accumulate (stream InputNumber) returns (TotalResponse);
}

message InputNumber {
  int32 value = 1;
}

message TotalResponse {
  int32 total = 1;
}

サーバー側ロジック

public override async Task<TotalResponse> Accumulate(IAsyncStreamReader<InputNumber> requestStream, ServerCallContext context)
{
    int total = 0;
    while (await requestStream.MoveNext())
    {
        total += requestStream.Current.value;
    }
    return new TotalResponse { total = total };
}

クライアント側呼び出し

private static async Task CallAccumulateAsync()
{
    using var channel = GrpcChannel.ForAddress("https://localhost:5001");
    var client = new ClientStreamService.ClientStreamServiceClient(channel);

    var call = client.Accumulate();

    var numbers = new[] { 2, 4, 6, 8 };

    foreach (var num in numbers)
    {
        await call.RequestStream.WriteAsync(new InputNumber { value = num });
        await Task.Delay(200);
    }

    await call.RequestStream.CompleteAsync();
    var response = await call.ResponseAsync;
    Console.WriteLine($"Total: {response.total}");
}

4. 双方向ストリーミング RPC

クライアントとサーバーが双方向で複数回にわたってメッセージを送受信します。

proto ファイル

syntax = "proto3";

option csharp_namespace = "GrpcStreamDemo";

package bidirectional;

service BidirectionalService {
  rpc Chat (stream ChatMessage) returns (stream ChatMessage);
}

message ChatMessage {
  string text = 1;
}

サーバー側ロジック

public override async Task Chat(IAsyncStreamReader<ChatMessage> requestStream, IServerStreamWriter<ChatMessage> responseStream, ServerCallContext context)
{
    while (await requestStream.MoveNext())
    {
        var reply = new ChatMessage
        {
            text = $"Server received: {requestStream.Current.text}"
        };
        await responseStream.WriteAsync(reply);
    }
}

クライアント側呼び出し

private static async Task CallChatAsync()
{
    using var channel = GrpcChannel.ForAddress("https://localhost:5001");
    var client = new BidirectionalService.BidirectionalServiceClient(channel);

    var call = client.Chat();

    var readTask = Task.Run(async () =>
    {
        await foreach (var reply in call.ResponseStream.ReadAllAsync())
        {
            Console.WriteLine(reply.text);
        }
    });

    var messages = new[] { "Hello", "gRPC", "Streaming", "World" };
    foreach (var msg in messages)
    {
        await call.RequestStream.WriteAsync(new ChatMessage { text = msg });
        await Task.Delay(500);
    }

    await call.RequestStream.CompleteAsync();
    await readTask;
}

タグ: gRPC .NET Core protobuf streaming Server Streaming

6月5日 18:54 投稿