RabbitMQを用いた.NETアプリケーションでのメッセージング実装

RabbitMQ基本設定

サーバーにRabbitMQをインストール後、vhostの作成とexchange/queueのバインディングを設定します。

プロジェクト構成

NuGetからRabbitMQ.Clientパッケージをインストールし、以下のヘルパークラスを実装します。

RabbitMQ操作クラス

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Configuration;
using System.Text;

namespace MessagingService.Providers
{
    public class RabbitOperator
    {
        private static IConnection _connection;
        private static readonly object LockObj = new object();
        
        public static IConnection GetConnection()
        {
            if (_connection?.IsOpen ?? false) return _connection;
            
            lock (LockObj)
            {
                _connection = CreateFactory().CreateConnection();
            }
            return _connection;
        }

        private static ConnectionFactory CreateFactory()
        {
            var config = ConfigurationManager.AppSettings["mqConfig"];
            var parts = config.Split(';');
            
            return new ConnectionFactory
            {
                HostName = parts[0],
                Port = int.Parse(parts[1]),
                VirtualHost = parts[2],
                UserName = parts[3],
                Password = parts[4]
            };
        }

        public static void SendMessage(MessagePayload payload)
        {
            using var conn = CreateFactory().CreateConnection();
            using var channel = conn.CreateModel();
            
            var body = Encoding.UTF8.GetBytes(payload.Content);
            var props = channel.CreateBasicProperties();
            props.DeliveryMode = 2;
            
            channel.BasicPublish(
                exchange: payload.Exchange,
                routingKey: payload.RoutingKey,
                basicProperties: props,
                body: body
            );
        }

        public static void ReceiveMessages(string queueName, Action<string> processor)
        {
            using var connection = GetConnection();
            using var channel = connection.CreateModel();
            
            channel.BasicQos(0, 2, false);
            var consumer = new EventingBasicConsumer(channel);
            
            consumer.Received += (_, eventArgs) => 
            {
                var body = eventArgs.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);
                processor(message);
            };
            
            channel.BasicConsume(
                queue: queueName,
                autoAck: true,
                consumer: consumer
            );
            
            while (true) Thread.Sleep(1000);
        }
    }
}

メッセージモデル

namespace MessagingService.Models
{
    public class MessagePayload
    {
        public string Content { get; set; }
        public string Exchange { get; set; }
        public string RoutingKey { get; set; }
    }
}

設定ファイル例

<configuration>
  <appSettings>
    <add key="mqConfig" value="192.168.1.10;5672;my_vhost;admin;password123" />
    <add key="exchangeName" value="direct_exchange" />
    <add key="messageQueue" value="message_processing" />
  </appSettings>
</configuration>

メッセージ送信実装

var exchange = ConfigurationManager.AppSettings["exchangeName"];
var routeKey = ConfigurationManager.AppSettings["messageQueue"];

var messageData = new MessagePayload
{
    Exchange = exchange,
    RoutingKey = routeKey,
    Content = JsonConvert.SerializeObject(messageObj)
};

RabbitOperator.SendMessage(messageData);

メッセージ受信処理

var targetQueue = ConfigurationManager.AppSettings["messageQueue"];

Task.Run(() => 
{
    RabbitOperator.ReceiveMessages(targetQueue, ProcessMessage);
});

private void ProcessMessage(string jsonMessage)
{
    // メッセージ処理ロジック
}

メッセージ消費はWindowsサービスとして常時実行し、キューを監視します。

タグ: RabbitMQ .NET C# メッセージキュー 分散システム

5月18日 09:41 投稿