概要
.NET 環境で MQTT 通信を実装する際、HslCommunication ライブラリを利用することで、サーバー(ブローカー)およびクライアント機能を比較的簡易に構築できます。本稿では、同ライブラリを用いた基本的なサーバーとクライアントのコード構成について解説します。
サーバー(ブローカー)側の実装
サーバー側では、指定したポートでリスニングを行い、クライアントの接続、切断、メッセージ受信イベントをハンドリングします。認証処理もここで定義可能です。
using HslCommunication.MQTT;
using System;
using System.Text;
namespace MqttBrokerApp
{
public class BrokerService
{
private MqttServer _broker;
public void Initialize(int port = 6666)
{
_broker = new MqttServer();
_broker.ServerStart(port);
// 各種イベントの登録
_broker.ClientVerification += ValidateCredentials;
_broker.OnClientConnected += HandleClientConnect;
_broker.OnClientDisConnected += HandleClientDisconnect;
_broker.OnClientApplicationMessageReceive += HandleIncomingMessage;
Console.WriteLine($"MQTT Broker started on port {port}");
}
// 接続認証コールバック
private int ValidateCredentials(string clientId, string user, string pass)
{
Console.WriteLine($"Auth request from {clientId}");
return 0; // 0 を返すと認証成功
}
// 接続イベント
private void HandleClientConnect(MqttSession session)
{
LogInfo($"Client Online: {session.ClientId} [{session.EndPoint}]");
}
// 切断イベント
private void HandleClientDisconnect(MqttSession session)
{
LogInfo($"Client Offline: {session.ClientId}");
}
// メッセージ受信イベント
private void HandleIncomingMessage(MqttSession session, MqttClientApplicationMessage message)
{
var content = Encoding.UTF8.GetString(message.Payload);
LogInfo($"Msg from {session.ClientId} on '{message.Topic}': {content}");
}
private void LogInfo(string message)
{
Console.WriteLine($"[{DateTime.Now:HH:mm:ss}] {message}");
}
}
class Program
{
static void Main(string[] args)
{
var service = new BrokerService();
service.Initialize();
// サーバーを維持するために待機
Console.ReadLine();
}
}
}
クライアント側の実装
クライアント側では、サーバーへ接続し、特定のトピックを購読および発行します。ここでは接続後にユーザー入力を受け付け、メッセージとしてpublishする構成としています。
using HslCommunication.MQTT;
using System;
using System.Text;
using System.Threading;
namespace MqttClientApp
{
public class DeviceSimulator
{
private MqttClient _client;
private readonly string _topicName = "device/telemetry";
public void Configure()
{
var options = new MqttConnectionOptions
{
ClientId = "Simulator_01",
IpAddress = "127.0.0.1",
Port = 6666,
Credentials = new MqttCredential("admin", "password")
};
_client = new MqttClient(options);
}
public void EstablishConnection()
{
var result = _client.ConnectServer();
if (result.IsSuccess)
{
Console.WriteLine("Connection established.");
}
else
{
Console.WriteLine($"Connection failed: {result.Message}");
}
}
public void SubscribeTopic()
{
_client.OnMqttMessageReceived += OnMessageReceived;
_client.SubscribeMessage(_topicName);
Console.WriteLine($"Subscribed to {_topicName}");
}
public void SendData(string data)
{
var message = new MqttApplicationMessage
{
Topic = _topicName,
Payload = Encoding.UTF8.GetBytes(data),
QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce,
Retain = false
};
_client.PublishMessage(message);
}
private void OnMessageReceived(string topic, byte[] payload)
{
var text = Encoding.UTF8.GetString(payload);
Console.WriteLine($"Received on {topic}: {text}");
}
}
class Program
{
static void Main(string[] args)
{
var device = new DeviceSimulator();
device.Configure();
device.EstablishConnection();
device.SubscribeTopic();
Console.WriteLine("Enter message to publish (type 'exit' to quit):");
while (true)
{
var input = Console.ReadLine();
if (string.IsNullOrEmpty(input) || input.ToLower() == "exit")
{
break;
}
device.SendData(input);
}
}
}
}