Kafkaを使用した.NET Coreでの動的トピック作成とパーティション管理

この記事では、Kafkaのトピックを動的に作成し、パーティションを管理する方法について説明します。また、API内での実装例も紹介します。

using Confluent.Kafka.Admin;
using Confluent.Kafka;
using Microsoft.AspNetCore.Mvc;

namespace KafkaApi.Controllers
{
    [Route("api/[controller]")]
    [ApiController]
    public class KafkaManagerController : ControllerBase
    {
        private readonly string _kafkaServerAddress;

        public KafkaManagerController(string kafkaServerAddress)
        {
            _kafkaServerAddress = kafkaServerAddress;
        }

        /// <summary>
        /// 既存のトピックに対して新しいパーティションを追加します。
        /// </summary>
        /// <param name="topicName">対象のトピック名。</param>
        /// <param name="newPartitionCount">追加後のパーティション数。</param>
        /// <returns>操作結果のメッセージ。</returns>
        [HttpGet("AddPartitions")]
        public async Task<string> AddPartitionsAsync(string topicName, int newPartitionCount)
        {
            var adminConfig = new AdminClientConfig { BootstrapServers = _kafkaServerAddress };
            using (var adminClient = new AdminClientBuilder(adminConfig).Build())
            {
                await adminClient.CreatePartitionsAsync(new[] { new PartitionsSpecification { Topic = topicName, IncreaseTo = newPartitionCount } });
                return "パーティションの追加に成功しました。";
            }
        }

        /// <summary>
        /// 新しいトピックを作成します。
        /// </summary>
        /// <param name="topicName">作成するトピック名。</param>
        /// <returns>操作結果のメッセージ。</returns>
        [HttpGet("CreateNewTopic")]
        public async Task<string> CreateNewTopicAsync(string topicName)
        {
            var adminConfig = new AdminClientConfig { BootstrapServers = _kafkaServerAddress };
            using (var adminClient = new AdminClientBuilder(adminConfig).Build())
            {
                await adminClient.CreateTopicsAsync(new[] { new TopicSpecification { Name = topicName } });
                return "トピックの作成に成功しました。";
            }
        }

        /// <summary>
        /// パーティション付きで新しいトピックを作成します。
        /// </summary>
        /// <param name="topicName">作成するトピック名。</param>
        /// <param name="partitionNumber">初期パーティション数。</param>
        /// <returns>操作結果のメッセージ。</returns>
        [HttpGet("CreateTopicWithPartitions")]
        public async Task<string> CreateTopicWithPartitionsAsync(string topicName, int partitionNumber)
        {
            var adminConfig = new AdminClientConfig { BootstrapServers = _kafkaServerAddress };
            using (var adminClient = new AdminClientBuilder(adminConfig).Build())
            {
                await adminClient.CreateTopicsAsync(new[] { new TopicSpecification { Name = topicName, NumPartitions = partitionNumber } });
                return "トピックとパーティションの作成に成功しました。";
            }
        }

        /// <summary>
        /// Kafkaにメッセージを送信します。
        /// </summary>
        /// <param name="message">送信するメッセージ。</param>
        /// <returns>空の文字列。</returns>
        [HttpGet("SendMessage")]
        public string SendMessage(string message)
        {
            // ここにメッセージ送信ロジックを実装します。
            return "";
        }
    }
}

次に、Kafkaにおけるパーティションの役割について説明します。パーティションは主にスケーラビリティとパフォーマンス向上のために使用されます。各パーティションには独立したコンシューマーが割り当てられるため、並列処理が可能になります。

以下は、特定のパーティションに直接メッセージを消費する例です。

consumer.Assign(new TopicPartition("test-topic", new Partition(1)));
consumer.Assign(new TopicPartitionOffset("test-topic", new Partition(0), new Offset(1)));

さらに、メッセージがどのようにパーティション間で均等に分散されるかについても解説します。Kafkaではカスタムのロードバランシングアルゴリズムを設定できます。例えば、リクエスト数に基づいてモジュロ演算を行うことで、均等な分散を実現できます。

private static int requestCounter = 0;

public Partition CustomPartitioner(string topic, int partitionCount, ReadOnlySpan<byte> keyData, bool keyIsNull)
{
    int selectedPartition = requestCounter % partitionCount;
    requestCounter++;
    return new Partition(selectedPartition);
}

// プロデューサー設定でカスタムパーティショナーを指定
var producerConfig = new ProducerConfig { BootstrapServers = "localhost:9092" };
producerConfig.SetDefaultPartitioner(CustomPartitioner);

タグ: .NET Core Kafka Confluent.Kafka

5月20日 09:30 投稿