RabbitMQ基本設定
サーバーにRabbitMQをインストール後、vhostの作成とexchange/queueのバインディングを設定します。
プロジェクト構成
NuGetからRabbitMQ.Clientパッケージをインストールし、以下のヘルパークラスを実装します。
RabbitMQ操作クラス
using System;
using System.Configuration;
using System.Globalization;
using System.Text;
using System.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace MessagingService.Providers
{
    /// <summary>
    /// Static helpers for publishing to and consuming from RabbitMQ.
    /// All operations share one lazily created connection; each operation opens its own channel.
    /// </summary>
    public class RabbitOperator
    {
        /// <summary>Name of the app setting that holds the connection settings.</summary>
        private const string ConfigKey = "mqConfig";

        /// <summary>Number of ';'-separated fields required in the setting.</summary>
        private const int RequiredConfigParts = 5;

        private static readonly object LockObj = new object();
        private static IConnection _connection;

        /// <summary>
        /// Returns the shared connection, creating a new one when none exists or the
        /// current one is no longer open.
        /// </summary>
        /// <returns>An open connection shared by all callers; callers must not dispose it.</returns>
        /// <exception cref="ConfigurationErrorsException">The connection setting is missing or malformed.</exception>
        public static IConnection GetConnection()
        {
            var current = _connection;
            if (current?.IsOpen ?? false)
            {
                return current;
            }

            lock (LockObj)
            {
                // Re-check under the lock: another thread may have reconnected while this
                // one was waiting, and opening a second connection would leak the first.
                if (!(_connection?.IsOpen ?? false))
                {
                    _connection = CreateFactory().CreateConnection();
                }

                return _connection;
            }
        }

        /// <summary>
        /// Builds a connection factory from the "mqConfig" app setting, whose value has the
        /// form <c>host;port;virtualHost;userName;password</c>.
        /// </summary>
        /// <exception cref="ConfigurationErrorsException">The setting is missing, empty, or has fewer than five fields.</exception>
        /// <exception cref="FormatException">The port field is not a valid integer.</exception>
        private static ConnectionFactory CreateFactory()
        {
            var config = ConfigurationManager.AppSettings[ConfigKey];
            if (string.IsNullOrWhiteSpace(config))
            {
                throw new ConfigurationErrorsException(
                    $"App setting '{ConfigKey}' is missing or empty.");
            }

            var parts = config.Split(';');
            if (parts.Length < RequiredConfigParts)
            {
                throw new ConfigurationErrorsException(
                    $"App setting '{ConfigKey}' must have the form 'host;port;virtualHost;userName;password'.");
            }

            return new ConnectionFactory
            {
                HostName = parts[0],
                // The port is machine-readable data, so parse it independently of the current culture.
                Port = int.Parse(parts[1], CultureInfo.InvariantCulture),
                VirtualHost = parts[2],
                UserName = parts[3],
                Password = parts[4]
            };
        }

        /// <summary>
        /// Publishes the payload's content, UTF-8 encoded and marked persistent, to the
        /// payload's exchange with the payload's routing key.
        /// </summary>
        /// <param name="payload">Message text plus its routing information.</param>
        /// <exception cref="ArgumentNullException"><paramref name="payload"/> is null.</exception>
        public static void SendMessage(MessagePayload payload)
        {
            if (payload == null)
            {
                throw new ArgumentNullException(nameof(payload));
            }

            // Reuse the shared connection: opening a connection per message costs a full
            // TCP + AMQP handshake. Only the short-lived channel is disposed here.
            using var channel = GetConnection().CreateModel();

            var props = channel.CreateBasicProperties();
            props.DeliveryMode = 2; // 2 = persistent delivery mode

            channel.BasicPublish(
                exchange: payload.Exchange,
                routingKey: payload.RoutingKey,
                basicProperties: props,
                body: Encoding.UTF8.GetBytes(payload.Content)
            );
        }

        /// <summary>
        /// Consumes messages from <paramref name="queueName"/> and passes each body, decoded
        /// as UTF-8, to <paramref name="processor"/>. Blocks the calling thread until
        /// <paramref name="cancellationToken"/> is cancelled (forever when the default token
        /// is used), then returns normally.
        /// </summary>
        /// <param name="queueName">Queue to consume from.</param>
        /// <param name="processor">Callback invoked once per received message.</param>
        /// <param name="cancellationToken">Optional token that stops consumption and unblocks the caller.</param>
        /// <exception cref="ArgumentNullException"><paramref name="queueName"/> or <paramref name="processor"/> is null.</exception>
        public static void ReceiveMessages(
            string queueName,
            Action<string> processor,
            CancellationToken cancellationToken = default)
        {
            if (queueName == null)
            {
                throw new ArgumentNullException(nameof(queueName));
            }

            if (processor == null)
            {
                throw new ArgumentNullException(nameof(processor));
            }

            // The connection is shared, so it is deliberately not disposed here;
            // only the channel belongs to this call.
            using var channel = GetConnection().CreateModel();

            // At most two unacknowledged messages in flight. The prefetch limit only
            // applies with manual acknowledgements, which is why autoAck is off below.
            channel.BasicQos(prefetchSize: 0, prefetchCount: 2, global: false);

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (_, eventArgs) =>
            {
                try
                {
                    var message = Encoding.UTF8.GetString(eventArgs.Body.ToArray());
                    processor(message);
                }
                catch
                {
                    // Reject without requeue so a message that always fails cannot loop
                    // forever; the broker drops it or dead-letters it if so configured.
                    channel.BasicNack(eventArgs.DeliveryTag, multiple: false, requeue: false);
                    throw;
                }

                // Acknowledge only after the processor succeeded, so a crash mid-processing
                // leaves the message on the broker for redelivery.
                channel.BasicAck(eventArgs.DeliveryTag, multiple: false);
            };

            channel.BasicConsume(
                queue: queueName,
                autoAck: false,
                consumer: consumer
            );

            // Park this thread while the consumer runs on the client's own threads.
            cancellationToken.WaitHandle.WaitOne();
        }
    }
}
メッセージモデル
namespace MessagingService.Models
{
    /// <summary>
    /// A message to publish: its text plus the exchange and routing key to publish it with.
    /// </summary>
    public class MessagePayload
    {
        /// <summary>Message text.</summary>
        public string Content { get; set; }

        /// <summary>Name of the exchange to publish to.</summary>
        public string Exchange { get; set; }

        /// <summary>Routing key to publish with.</summary>
        public string RoutingKey { get; set; }
    }
}
設定ファイル例
<configuration>
<appSettings>
<!-- mqConfig: semicolon-separated connection settings, in this order: host;port;virtual host;user name;password. -->
<!-- NOTE(review): the credentials below look like placeholders; confirm real secrets are not stored in plain text. -->
<add key="mqConfig" value="192.168.1.10;5672;my_vhost;admin;password123" />
<!-- exchangeName: exchange the sender publishes to. -->
<add key="exchangeName" value="direct_exchange" />
<!-- messageQueue: queue the receiver consumes from; the sender also uses this value as the routing key. -->
<add key="messageQueue" value="message_processing" />
</appSettings>
</configuration>
メッセージ送信実装
// Build the outgoing message from app settings. The "messageQueue" value is used as the
// routing key, and the content is messageObj serialized to JSON.
var outgoing = new MessagePayload
{
    Exchange = ConfigurationManager.AppSettings["exchangeName"],
    RoutingKey = ConfigurationManager.AppSettings["messageQueue"],
    Content = JsonConvert.SerializeObject(messageObj),
};

RabbitOperator.SendMessage(outgoing);
メッセージ受信処理
var targetQueue = ConfigurationManager.AppSettings["messageQueue"];

// ReceiveMessages blocks for as long as the consumer runs, so give it a dedicated
// thread (LongRunning) instead of permanently occupying a thread-pool worker.
Task.Factory.StartNew(
    () => RabbitOperator.ReceiveMessages(targetQueue, ProcessMessage),
    TaskCreationOptions.LongRunning);
/// <summary>
/// Callback handed to <c>RabbitOperator.ReceiveMessages</c>; invoked with the text of each received message.
/// </summary>
/// <param name="jsonMessage">Message text; presumably the JSON produced by the sender (TODO confirm).</param>
private void ProcessMessage(string jsonMessage)
{
// Message processing logic goes here.
}
メッセージ消費はWindowsサービスとして常時実行し、キューを監視します。