この記事では、Kafkaのトピックを動的に作成し、パーティションを管理する方法について説明します。また、API内での実装例も紹介します。
using Confluent.Kafka.Admin;
using Confluent.Kafka;
using Microsoft.AspNetCore.Mvc;
namespace KafkaApi.Controllers
{
[Route("api/[controller]")]
[ApiController]
public class KafkaManagerController : ControllerBase
{
private readonly string _kafkaServerAddress;
public KafkaManagerController(string kafkaServerAddress)
{
_kafkaServerAddress = kafkaServerAddress;
}
/// <summary>
/// 既存のトピックに対して新しいパーティションを追加します。
/// </summary>
/// <param name="topicName">対象のトピック名。</param>
/// <param name="newPartitionCount">追加後のパーティション数。</param>
/// <returns>操作結果のメッセージ。</returns>
[HttpGet("AddPartitions")]
public async Task<string> AddPartitionsAsync(string topicName, int newPartitionCount)
{
var adminConfig = new AdminClientConfig { BootstrapServers = _kafkaServerAddress };
using (var adminClient = new AdminClientBuilder(adminConfig).Build())
{
await adminClient.CreatePartitionsAsync(new[] { new PartitionsSpecification { Topic = topicName, IncreaseTo = newPartitionCount } });
return "パーティションの追加に成功しました。";
}
}
/// <summary>
/// 新しいトピックを作成します。
/// </summary>
/// <param name="topicName">作成するトピック名。</param>
/// <returns>操作結果のメッセージ。</returns>
[HttpGet("CreateNewTopic")]
public async Task<string> CreateNewTopicAsync(string topicName)
{
var adminConfig = new AdminClientConfig { BootstrapServers = _kafkaServerAddress };
using (var adminClient = new AdminClientBuilder(adminConfig).Build())
{
await adminClient.CreateTopicsAsync(new[] { new TopicSpecification { Name = topicName } });
return "トピックの作成に成功しました。";
}
}
/// <summary>
/// パーティション付きで新しいトピックを作成します。
/// </summary>
/// <param name="topicName">作成するトピック名。</param>
/// <param name="partitionNumber">初期パーティション数。</param>
/// <returns>操作結果のメッセージ。</returns>
[HttpGet("CreateTopicWithPartitions")]
public async Task<string> CreateTopicWithPartitionsAsync(string topicName, int partitionNumber)
{
var adminConfig = new AdminClientConfig { BootstrapServers = _kafkaServerAddress };
using (var adminClient = new AdminClientBuilder(adminConfig).Build())
{
await adminClient.CreateTopicsAsync(new[] { new TopicSpecification { Name = topicName, NumPartitions = partitionNumber } });
return "トピックとパーティションの作成に成功しました。";
}
}
/// <summary>
/// Kafkaにメッセージを送信します。
/// </summary>
/// <param name="message">送信するメッセージ。</param>
/// <returns>空の文字列。</returns>
[HttpGet("SendMessage")]
public string SendMessage(string message)
{
// ここにメッセージ送信ロジックを実装します。
return "";
}
}
}
次に、Kafkaにおけるパーティションの役割について説明します。パーティションは主にスケーラビリティとパフォーマンス向上のために使用されます。各パーティションには独立したコンシューマーが割り当てられるため、並列処理が可能になります。
以下は、特定のパーティションに直接メッセージを消費する例です。
consumer.Assign(new TopicPartition("test-topic", new Partition(1)));
consumer.Assign(new TopicPartitionOffset("test-topic", new Partition(0), new Offset(1)));
さらに、メッセージがどのようにパーティション間で均等に分散されるかについても解説します。Kafkaではカスタムのロードバランシングアルゴリズムを設定できます。例えば、リクエスト数に基づいてモジュロ演算を行うことで、均等な分散を実現できます。
private static int requestCounter = 0;
public Partition CustomPartitioner(string topic, int partitionCount, ReadOnlySpan<byte> keyData, bool keyIsNull)
{
int selectedPartition = requestCounter % partitionCount;
requestCounter++;
return new Partition(selectedPartition);
}
// プロデューサー設定でカスタムパーティショナーを指定
var producerConfig = new ProducerConfig { BootstrapServers = "localhost:9092" };
producerConfig.SetDefaultPartitioner(CustomPartitioner);