gRPC におけるストリーミングの種類
gRPC は次の 4 種類のストリーミングをサポートしています:
- ユニタリ RPC(Unary RPC)
- サーバーストリーミング RPC(Server Streaming RPC)
- クライアントストリーミング RPC(Client Streaming RPC)
- 双方向ストリーミング RPC(Bidirectional Streaming RPC)
1. ユニタリ RPC
リクエストとレスポンスが 1 対 1 でやり取りされる基本的な形式です。
proto ファイル
syntax = "proto3";
option csharp_namespace = "GrpcStreamDemo";
package calculation;
service Calculator {
rpc Sum (SumRequest) returns (SumResponse);
}
message SumRequest {
int32 a = 1;
int32 b = 2;
}
message SumResponse {
int32 result = 1;
}
サーバー側ロジック
using Grpc.Core;
namespace GrpcStreamDemo.Services
{
public class CalculatorService : Calculator.CalculatorBase
{
public override Task<SumResponse> Sum(SumRequest request, ServerCallContext context)
{
return Task.FromResult(new SumResponse
{
Result = request.a + request.b
});
}
}
}
クライアント側呼び出し
private static async Task CallSumAsync()
{
using var channel = GrpcChannel.ForAddress("https://localhost:5001");
var client = new Calculator.CalculatorClient(channel);
var response = await client.SumAsync(new SumRequest
{
a = 3,
b = 5
});
Console.WriteLine($"Sum result: {response.Result}");
}
2. サーバーストリーミング RPC
クライアントの 1 つのリクエストに対して、サーバーが複数回に分けてレスポンスを送信します。
proto ファイル
syntax = "proto3";
option csharp_namespace = "GrpcStreamDemo";
package streamserver;
service StreamService {
rpc CountUp (NumberRequest) returns (stream MessageResponse);
}
message NumberRequest {
int32 start = 1;
int32 end = 2;
}
message MessageResponse {
string content = 1;
}
サーバー側ロジック
public override async Task CountUp(NumberRequest request, IServerStreamWriter<MessageResponse> responseStream, ServerCallContext context)
{
for (int i = request.start; i <= request.end; i++)
{
await responseStream.WriteAsync(new MessageResponse { content = $"Count: {i}" });
await Task.Delay(300);
}
}
クライアント側呼び出し
private static async Task CallCountUpAsync()
{
using var channel = GrpcChannel.ForAddress("https://localhost:5001");
var client = new StreamService.StreamServiceClient(channel);
var reply = client.CountUp(new NumberRequest { start = 1, end = 5 });
await foreach (var message in reply.ResponseStream.ReadAllAsync())
{
Console.WriteLine(message.content);
}
}
3. クライアントストリーミング RPC
クライアントが複数回に分けてリクエストを送信し、サーバーが最後に 1 つのレスポンスを返します。
proto ファイル
syntax = "proto3";
option csharp_namespace = "GrpcStreamDemo";
package streamclient;
service ClientStreamService {
rpc Accumulate (stream InputNumber) returns (TotalResponse);
}
message InputNumber {
int32 value = 1;
}
message TotalResponse {
int32 total = 1;
}
サーバー側ロジック
public override async Task<TotalResponse> Accumulate(IAsyncStreamReader<InputNumber> requestStream, ServerCallContext context)
{
int total = 0;
while (await requestStream.MoveNext())
{
total += requestStream.Current.value;
}
return new TotalResponse { total = total };
}
クライアント側呼び出し
private static async Task CallAccumulateAsync()
{
using var channel = GrpcChannel.ForAddress("https://localhost:5001");
var client = new ClientStreamService.ClientStreamServiceClient(channel);
var call = client.Accumulate();
var numbers = new[] { 2, 4, 6, 8 };
foreach (var num in numbers)
{
await call.RequestStream.WriteAsync(new InputNumber { value = num });
await Task.Delay(200);
}
await call.RequestStream.CompleteAsync();
var response = await call.ResponseAsync;
Console.WriteLine($"Total: {response.total}");
}
4. 双方向ストリーミング RPC
クライアントとサーバーが双方向で複数回にわたってメッセージを送受信します。
proto ファイル
syntax = "proto3";
option csharp_namespace = "GrpcStreamDemo";
package bidirectional;
service BidirectionalService {
rpc Chat (stream ChatMessage) returns (stream ChatMessage);
}
message ChatMessage {
string text = 1;
}
サーバー側ロジック
public override async Task Chat(IAsyncStreamReader<ChatMessage> requestStream, IServerStreamWriter<ChatMessage> responseStream, ServerCallContext context)
{
while (await requestStream.MoveNext())
{
var reply = new ChatMessage
{
text = $"Server received: {requestStream.Current.text}"
};
await responseStream.WriteAsync(reply);
}
}
クライアント側呼び出し
private static async Task CallChatAsync()
{
using var channel = GrpcChannel.ForAddress("https://localhost:5001");
var client = new BidirectionalService.BidirectionalServiceClient(channel);
var call = client.Chat();
var readTask = Task.Run(async () =>
{
await foreach (var reply in call.ResponseStream.ReadAllAsync())
{
Console.WriteLine(reply.text);
}
});
var messages = new[] { "Hello", "gRPC", "Streaming", "World" };
foreach (var msg in messages)
{
await call.RequestStream.WriteAsync(new ChatMessage { text = msg });
await Task.Delay(500);
}
await call.RequestStream.CompleteAsync();
await readTask;
}