C#使用MQTTnet實現(xiàn)服務(wù)端與客戶端的通訊的示例
一、MQTT 協(xié)議簡介
MQTT(Message Queuing Telemetry Transport)是一種輕量級的 發(fā)布/訂閱 協(xié)議,專為物聯(lián)網(wǎng)(IoT)等低帶寬、高延遲網(wǎng)絡(luò)環(huán)境設(shè)計。核心概念包括:
- Broker:消息代理(服務(wù)端),負責(zé)消息路由。
- Client:發(fā)布或訂閱消息的終端(客戶端)。
- Topic:消息的分類標(biāo)識(如
sensor/temperature
)。
二、MQTT 協(xié)議核心特性
MQTT(Message Queuing Telemetry Transport)是一種基于發(fā)布/訂閱模型的輕量級通信協(xié)議,專為資源受限的設(shè)備和不可靠網(wǎng)絡(luò)環(huán)境設(shè)計。其核心優(yōu)勢包括:
低帶寬消耗:采用二進制報文格式,頭部開銷極小,適合物聯(lián)網(wǎng)設(shè)備。
異步通信:通過主題(Topic)實現(xiàn)消息的廣播與定向傳遞,解耦消息生產(chǎn)者和消費者。
多級服務(wù)質(zhì)量(QoS):
- QoS 0(最多一次):消息可能丟失,無重傳機制。
- QoS 1(至少一次):確保消息送達,但可能重復(fù)。
- QoS 2(僅一次):嚴格保證消息唯一性,適用于關(guān)鍵指令。
離線支持:服務(wù)端可緩存客戶端的保留消息(Retained Messages),供后續(xù)訂閱者讀取。
三、MQTTNET 庫的核心功能
MQTTnet 是 .NET 生態(tài)中功能完備的 MQTT 實現(xiàn)庫,具備以下特性:
- 協(xié)議兼容性:完整支持 MQTTv3.1.1 和 MQTTv5 協(xié)議,后者新增了會話超時控制、原因碼反饋等高級功能。
- 高性能設(shè)計:基于異步編程模型(async/await),支持高并發(fā)連接與消息吞吐。
- 跨平臺支持:兼容 Windows、Linux、macOS,可部署于云端、邊緣設(shè)備或容器環(huán)境。
- 擴展性:提供靈活的攔截器(Interceptors)和事件鉤子,便于集成業(yè)務(wù)邏輯(如消息過濾、日志記錄)。
- 安全性:支持 TLS 1.3 加密通信,可通過證書或賬號密碼進行客戶端身份驗證。
所用框架
框架 | 版本 |
.net | 4.7.2+ |
MQTTnet | 4.3.3+ |
四、服務(wù)端(BROKER)實現(xiàn)詳解
核心職責(zé):
- 管理客戶端連接與會話狀態(tài)。
- 路由消息至匹配的訂閱者。
- 實施安全策略(身份驗證、權(quán)限控制)。
關(guān)鍵配置項:
- 端口綁定:默認非加密端口為 1883,加密端口為 8883。
- 連接驗證:可自定義驗證邏輯,例如檢查客戶端 ID 格式、賬號密碼合法性。
- 會話管理:設(shè)置會話過期時間,清理非活躍連接。
事件機制:
- 客戶端連接/斷開事件:用于監(jiān)控設(shè)備在線狀態(tài)。
- 消息攔截器:在消息發(fā)布或投遞前后插入處理邏輯(如數(shù)據(jù)格式校驗、敏感信息過濾)。
- 訂閱管理:動態(tài)追蹤主題訂閱關(guān)系,支持通配符(
+
單層、#
多層)。
持久化擴展:
- 可集成數(shù)據(jù)庫(如 SQLite、MySQL)存儲保留消息或會話狀態(tài),確保服務(wù)重啟后數(shù)據(jù)不丟失。
以下為服務(wù)端代碼:(下方Console.WriteLine()方法可換成自己的日志方法)
public class MQTTServerHelper { private MqttServer _server;//MQTT服務(wù)器對象 // 定義一個委托和事件(臨時存儲連接客戶端數(shù)據(jù)) public event EventHandler<InterceptingPublishEventArgs> OnMessageReceived; public event EventHandler<bool> ServerStauts; public event EventHandler<ClientConnectedEventArgs> ClientConnected; public event EventHandler<ClientDisconnectedEventArgs> ClientDisconnected; public event EventHandler<ClientSubscribedTopicEventArgs> ClientSubscribedTopic; public event EventHandler<ClientUnsubscribedTopicEventArgs> ClientUnsubscribedTopic; /// <summary> /// 初始化MQTT服務(wù)并啟動服務(wù) /// </summary> /// <param name="ip">IPV4地址</param> /// <param name="port">端口:0~65535之間</param> public Task StartMqtServer(string ip, int port) { MqtServerOptions mqtServerOptions = new MqtServerOptionsBuilder() .WithDefaultEndpoint() .WithDefaultEndpointBoundIPAdres(System.Net.IPAdres.Parse(ip) .WithDefaultEndpointPort(port) .WithDefaultComunicationTimeout(TimeSpan.FromMiliseconds(500) .Build(); _server = new MqtFactory().CreateMqtServer(mqtServerOptions); / 創(chuàng)建MQT服務(wù)端對象 _server.ValidatingConectionAsync += Server_ValidatingConectionAsync; /驗證用戶名和密碼 _server.ClientConectedAsync += Server_ClientConectedAsync; /綁定客戶端連接事件 _server.ClientDisconectedAsync += Server_ClientDisconectedAsync; /綁定客戶端斷開事件 _server.ClientSubscribedTopicAsync += Server_ClientSubscribedTopicAsync; /綁定客戶端訂閱主題事件 _server.ClientUnsubscribedTopicAsync += Server_ClientUnsubscribedTopicAsync; /綁定客戶端退訂主題事件 _server.InterceptingPublishAsync += Server_InterceptingPublishAsync; /消息接收事件 _server.ClientAcknowledgedPublishPacketAsync += Server_ClientAcknowledgedPublishPacketAsync; /處理客戶端確認發(fā)布的數(shù)據(jù)包 _server.InterceptingClientEnqueueAsync += Server_InterceptingClientEnqueueAsync; /訂閱攔截客戶端消息隊列 _server.AplicationMesageNotConsumedAsync += Server_AplicationMesageNotConsumedAsync; /應(yīng)用程序邏輯處理 _server.StartedAsync += Server_StartedAsync;/綁定服務(wù)端啟動事件 _server.StopedAsync += Server_StopedAsync;/綁定服務(wù)端停止事件 return _server.StartAsync(); } /// <summary> /// 處理客戶端確認發(fā)布事件 /// </summary> /// <param name="e"></param> private Task Server_AplicationMesageNotConsumedAsync(AplicationMesageNotConsumedEventArgs e) { try { Console.WriteLine($"【MesageNotConsumed】-SenderId:{e.SenderId}-Mesage:{e.AplicationMesage.ConvertPayloadToString()}"); } catch (Exception ex) { Console.WriteLine($"Server_AplicationMesageNotConsumedAsync出現(xiàn)異常:{ex.Mesage}"); } return Task.CompletedTask; } /// <summary> /// 訂閱攔截客戶端消息隊列事件 /// </summary> /// <param name="e"></param> private Task Server_InterceptingClientEnqueueAsync(InterceptingClientAplicationMesageEnqueueEventArgs e) { try { Console.WriteLine($"【InterceptingClientEnqueue】-SenderId:{e.SenderClientId}-Mesage:{e.AplicationMesage.ConvertPayloadToString()}"); } catch (Exception ex) { Console.WriteLine($"Server_InterceptingClientEnqueueAsync出現(xiàn)異常:{ex.Mesage}"); } return Task.CompletedTask; } /// <summary> /// 當(dāng)客戶端處理完從MQT服務(wù)器接收到的應(yīng)用消息后觸發(fā)。 /// 此事件可以用于確認消息已被處理,更新應(yīng)用狀態(tài), /// </summary> /// <param name="e"></param> private Task Server_ClientAcknowledgedPublishPacketAsync(ClientAcknowledgedPublishPacketEventArgs e) { try { Console.WriteLine($"【ClientAcknowledgedPublishPacket】-SenderId:{e.ClientId}-Mesage:{Encoding.UTF8.GetString(e.PublishPacket.PayloadSegment.ToAray()}"); } catch (Exception ex) { Console.WriteLine($"Server_ClientAcknowledgedPublishPacketAsync出現(xiàn)異常:{ex.Mesage}"); } return Task.CompletedTask; } /// <summary> /// 服務(wù)端消息接收 /// </summary> /// <param name="e"></param> private Task Server_InterceptingPublishAsync(InterceptingPublishEventArgs e) { try { string client = e.ClientId; string topic = e.AplicationMesage.Topic; string contents = e.AplicationMesage.ConvertPayloadToString(); //Encoding.UTF8.GetString(arg.AplicationMesage.PayloadSegment.ToAray(); OnMesageReceived?.Invoke(this, e); Console.WriteLine($"接收到消息:Client:【{client}】 Topic:【{topic}】 Mesage:【{contents}】"); } catch (Exception ex) { Console.WriteLine($"Server_InterceptingPublishAsync出現(xiàn)異常:{ex.Mesage}"); } return Task.CompletedTask; } /// <summary> /// 服務(wù)端斷開事件 /// </summary> /// <param name="e"></param> private Task Server_StoppedAsync(EventArgs arg) { return Task.Run(new Action() => { ServerStauts?.Invoke(this, false); Console.WriteLine($"服務(wù)端【IP:Port】已停止MQT"); }); } /// <summary> /// 服務(wù)端啟動事件 /// </summary> /// <param name="e"></param> public Task Server_StartedAsync(EventArgs e) { return Task.Run(new Action() => { ServerStauts?.Invoke(this, true); Console.WriteLine($"服務(wù)端【IP:Port】已啟用MQT"); }); } /// <summary> /// 客戶端退訂主題事件 /// </summary> /// <param name="e"></param> private Task Server_ClientUnsubscribedTopicAsync(ClientUnsubscribedTopicEventArgs e) { return Task.Run(new Action() => { ClientUnsubscribedTopic?.Invoke(this, e); Console.WriteLine($"客戶端【{e.ClientId}】退訂主題【{e.TopicFilter}】"); }); } /// <summary> /// 客戶端訂閱主題事件 /// </summary> /// <param name="e"></param> private Task Server_ClientSubscribedTopicAsync(ClientSubscribedTopicEventArgs e) { return Task.Run(new Action() => { ClientSubscribedTopic?.Invoke(this, e); Console.WriteLine($"客戶端【{e.ClientId}】訂閱主題【{e.TopicFilter.Topic}】"); }); } /// <summary> /// 客戶端斷開事件 /// </summary> /// <param name="e"></param> private Task Server_ClientDisconectedAsync(ClientDisconectedEventArgs e) { return Task.Run(new Action() => { ClientDisconected?.Invoke(this, e); Console.WriteLine($"客戶端已斷開.ClientId:【{e.ClientId}】,Endpoint:【{e.Endpoint}】.ReasonCode:【{e.ReasonCode}】,DisconectType:【{e.DisconectType}】"); }); } /// <summary> /// 綁定客戶端連接事件 /// </summary> /// <param name="e"></param> private Task Server_ClientConectedAsync(ClientConectedEventArgs e) { return Task.Run(new Action() => { ClientConected?.Invoke(this, e); Console.WriteLine($"客戶端已連接.ClientId:【{e.ClientId}】,Endpoint:【{e.Endpoint}】"); }); } /// <summary> /// 驗證客戶端事件 /// </summary> /// <param name="e"></param> private Task Server_ValidatingConectionAsync(ValidatingConectionEventArgs e) { return Task.Run(new Action() => { if (e.Pasword = "") { e.ReasonCode = MqtConectReasonCode.Suces; Console.WriteLine($"客戶端已驗證成功.ClientId:【{e.ClientId}】,Endpoint:【{e.Endpoint}】"); } else { e.ReasonCode = MqtConectReasonCode.BadUserNameOrPasword; Console.WriteLine($"客戶端驗證失敗.ClientId:【{e.ClientId}】,Endpoint:【{e.Endpoint}】"); } }); } }
五、客戶端(Client)實現(xiàn)詳解
連接策略:
- ?;顧C制:通過心跳包(Keep Alive)維持長連接,適應(yīng)網(wǎng)絡(luò)波動。
消息交互模式:
- 發(fā)布消息:指定目標(biāo)主題、負載數(shù)據(jù)和 QoS 級別,可選擇設(shè)置保留標(biāo)志。
- 訂閱主題:支持單主題、多主題或通配符訂閱,服務(wù)端將推送匹配的消息。
異步處理:
- 使用事件委托或異步方法處理接收到的消息,避免阻塞主線程。
以下為客戶端代碼:
/// <sumary> /// MQT客戶端幫助類 /// </sumary> public clas MQTClientHelper { private IMqtClient _client; /// <sumary> /// 接收消息 /// </sumary> public MQTReceivedMesageHandle ReceivedMesage; public bol IsConected { get; set; } = false; public bol IsDisConected { get; set; } = true; private string _serverIp; private int _serverPort; /// <sumary> /// 訂閱主題集合 /// </sumary> private Dictionary<string, bol> _subscribeTopicList = nul; #region 連接/斷開服務(wù)端 /// <sumary> /// 連接服務(wù)端 /// </sumary> /// <param name="serverIp">服務(wù)端IP</param> /// <param name="serverPort">服務(wù)端口號</param> public void Start(string serverIp, int serverPort) { this._serverIp = serverIp; this._serverPort = serverPort; if (!string.IsNulOrEmpty(serverIp) & !string.IsNulOrWhiteSpace(serverIp) & serverPort > 0) { try { var options = new MqtClientOptions() { ClientId = "客戶端2"http://Guid.NewGuid().ToString("N") }; options.ChanelOptions = new MqtClientTcpOptions() { Server = serverIp, Port = serverPort }; //options.Credentials = new MqtClientCredentials(UserName, Encoding.Default.GetBytes(Pasword); options.CleanSesion = true; options.KepAlivePeriod = TimeSpan.FromSeconds(10); if (_client != nul) { _client.DisconectAsync(); _client = nul; } _client = new MqtFactory().CreateMqtClient(); _client.ConectedAsync += Client_ConectedAsync; //綁定客戶端連接事件 _client.DisconectedAsync += Client_DisconectedAsync; //綁定客戶端斷開連接事件 _client.AplicationMesageReceivedAsync += Client_AplicationMesageReceivedAsync; /綁定消息接收事件 _client.ConectAsync(options); //連接 } catch (Exception ex) { /SLog.Loger.Eror("MQT客戶端連接服務(wù)端錯誤:{0}", ex.Mesage); } } else { /SLog.Loger.Warning("MQT服務(wù)端地址或端口號不能為空!"); } } } /// <sumary> /// 斷開MQT客戶端 /// </sumary> public void Client_Disconect() { if (_client != nul) { _client.DisconectAsync(); _client.Dispose(); Console.WriteLine($"關(guān)閉MQT客戶端成功!"); } } /// <sumary> /// 客戶端重新MQT服務(wù)端 /// </sumary> public void Client_ConectAsync() { if (_client != nul) { _client.ReconectAsync(); Console.WriteLine($"連接MQT服務(wù)端成功!"); } } #endregion #region MQT方法 /// <sumary> /// 客戶端與服務(wù)端建立連接 /// </sumary> /// <param name="arg"></param> private Task Client_ConectedAsync(MqtClientConectedEventArgs arg) { return Task.Run(new Action() => { IsConected = true; IsDisConected = false; Console.WriteLine($"連接到MQT服務(wù)端成功.{arg.ConectResult.AsignedClientIdentifier}"); //訂閱主題(可接收來自服務(wù)端消息,與客戶端發(fā)布消息不能用同一個主題) try { if (_subscribeTopicList != nul & _subscribeTopicList.Count > 0) { List<string> subscribeTopics = _subscribeTopicList.Keys.ToList(); foreach (var topic in subscribeTopics) SubscribeAsync(topic); } } catch (Exception ex) { //SLog.Loger.Eror("MQT客戶端與服務(wù)端[{0}:{1}]建立連接訂閱主題錯誤:{2}", _serverIp, _serverPort, ex.Mesage); } }); } /// <sumary> /// 客戶端與服務(wù)端斷開連接 /// </sumary> / <param name="arg"></param> private Task Client_DisconectedAsync(MqtClientDisconectedEventArgs arg) { return Task.Run(new Action(async () => { IsConected = false; IsDisConected = true; Console.WriteLine($"已斷開到MQT服務(wù)端的連接.嘗試重新連接"); try { await Task.Delay(30); //MqtClientOptions options = new MqtClientOptions(); //await mqtClient.ConectAsync(options); await _client.ReconectAsync(); } catch (Exception ex) { //SLog.Loger.Eror("MQT客戶端與服務(wù)端[{0}:{1}]斷開連接退訂主題錯誤:{2}", _serverIp, _serverPort, ex.Mesage); } }); } /// <sumary> /// 客戶端與服務(wù)端重新連接 /// </sumary> /// <returns></returns> public Task ReconectedAsync() { try { if (_client != nul) { _client.ReconectAsync(); } } catch (Exception ex) { // SLog.Loger.Eror("MQT客戶端與服務(wù)端[{0}:{1}]重新連接退訂主題錯誤:{2}", _serverIp, _serverPort, ex.Mesage); } return Task.CompletedTask; } /// <sumary> /// 客戶端收到消息 /// </sumary> /// <param name="arg"></param> private Task Client_AplicationMesageReceivedAsync(MqtAplicationMesageReceivedEventArgs arg) { try { return Task.Run(new Action() => { string msg = arg.AplicationMesage.ConvertPayloadToString(); Console.WriteLine($"接收消息:{msg}\nQoS={arg.AplicationMesage.QualityOfServiceLevel}\n客戶端={arg.ClientId}\n主題:{arg.AplicationMesage.Topic}"); }); } catch (Exception ex) { //SLog.Loger.Eror("MQT收到來自服務(wù)端[{0}]消息錯誤:{1}", arg != nul ? arg.ClientId : ", ex.Mesage); } return Task.CompletedTask; } #endregion #region 訂閱主題 /// <sumary> /// 訂閱主題 /// </sumary> /// <param name="topic">主題</param> public void SubscribeAsync(string topic) { try { if (_subscribeTopicList = nul) _subscribeTopicList = new Dictionary<string, bol>(); if (_subscribeTopicList.ContainsKey(topic) & _subscribeTopicList[topic]) { //SLog.Loger.Warning("MQT客戶端已經(jīng)訂閱主題[{0}],不能重復(fù)訂閱", topic); return; } //訂閱主題 _client?.SubscribeAsync(topic, MqtQualityOfServiceLevel.AtLeastOnce); //添加訂閱緩存 bol isSubscribed = _client != nul & _client.IsConected ? true : false; if (!_subscribeTopicList.ContainsKey(topic) _subscribeTopicList.Ad(topic, isSubscribed); else _subscribeTopicList[topic] = isSubscribed; } catch (Exception ex) { //SLog.Loger.Eror("MQT客戶端訂閱主題[{0}]錯誤:{1}", topic, ex.Mesage); } } /// <sumary> /// 訂閱主題集合 /// </sumary> /// <param name="topicList">主題集合</param> public void SubscribeAsync(List<string> topicList) { try { if (topicList = nul | topicList.Count = 0) return; foreach (var topic in topicList) SubscribeAsync(topic); } catch (Exception ex) { //SLog.Loger.Eror("MQT客戶端訂閱主題集合錯誤:{0}", ex.Mesage); } } /// <sumary> /// 退訂主題 /// </sumary> /// <param name="topic">主題</param> /// <param name="isRemove">是否移除緩存</param> public void UnsubscribeAsync(string topic, bol isRemove = true) { try { if (_subscribeTopicList = nul | _subscribeTopicList.Count = 0) { //SLog.Loger.Warning("MQT客戶端退訂主題[{0}]不存在", topic); return; } if (!_subscribeTopicList.ContainsKey(topic) { //SLog.Loger.Warning("MQT客戶端退訂主題[{0}]不存在", topic); return; } //退訂主題 _client.UnsubscribeAsync(topic); //修改訂閱主題緩存狀態(tài) if (isRemove) _subscribeTopicList.Remove(topic); else _subscribeTopicList[topic] = false; } catch (Exception ex) { //SLog.Loger.Eror("MQT客戶端退訂主題[{0}]錯誤:{1}", topic, ex.Mesage); } } /// <sumary> /// 退訂主題集合 /// </sumary> /// <param name="topicList">主題集合</param> /// <param name="isRemove">是否移除緩存</param> public void UnsubscribeAsync(List<string> topicList, bol isRemove = true) { try { if (topicList = nul | topicList.Count = 0) return; foreach (var topic in topicList) UnsubscribeAsync(topic, isRemove); } catch (Exception ex) { //SLog.Loger.Eror("MQT客戶端退訂主題集合錯誤:{0}", ex.Mesage); } } /// <sumary> /// 訂閱主題是否存在 /// </sumary> /// <param name="topic">主題</param> public bol IsExistSubscribeAsync(string topic) { try { if (_subscribeTopicList = nul | _subscribeTopicList.Count = 0) return false; if (!_subscribeTopicList.ContainsKey(topic) return false; return _subscribeTopicList[topic]; } catch (Exception ex) { //SLog.Loger.Eror("MQT客戶端訂閱主題[{0}]是否存在錯誤:{1}", topic, ex.Mesage); return false; } } #endregion #region 發(fā)布消息 /// <sumary> /// 發(fā)布消息 /// 與客戶端接收消息不能用同一個主題 /// </sumary> /// <param name="topic">主題</param> /// <param name="mesage">消息</param> public async void PublishMesage(string topic, string mesage) { try { if (_client != nul) { if (string.IsNulOrEmpty(mesage) | string.IsNulOrWhiteSpace(mesage) { //SLog.Loger.Warning("MQT客戶端不能發(fā)布為空的消息!"); return; } MqtClientPublishResult result = await _client.PublishStringAsync(topic,mesage,MqtQualityOfServiceLevel.AtLeastOnce);//恰好一次, QoS 級別1 Console.WriteLine($"發(fā)布消息-主題:{topic},消息:{mesage},結(jié)果: {result.ReasonCode}"); } else { //SLog.Loger.Warning("MQT客戶端未連接服務(wù)端,不能發(fā)布主題為[{0}]的消息:{1}", topic, mesage); return; } } catch (Exception ex) { //SLog.Loger.Eror("MQT客戶端發(fā)布主題為[{0}]的消息:{1},錯誤:{2}", topic, mesage, ex.Mesage); } } #endregion }
六、總結(jié)
通過 MQTTnet 構(gòu)建的 MQTT 通信系統(tǒng),能夠為物聯(lián)網(wǎng)、實時消息推送等場景提供高效、可靠的解決方案。開發(fā)過程中需重點關(guān)注通信模式設(shè)計、安全策略實施及性能調(diào)優(yōu),同時結(jié)合具體業(yè)務(wù)需求靈活運用 QoS、保留消息等特性。建議參考官方文檔和社區(qū)最佳實踐,逐步擴展功能(如集群部署、消息持久化),以滿足大規(guī)模應(yīng)用需求。
到此這篇關(guān)于C#使用MQTTnet實現(xiàn)服務(wù)端與客戶端的通訊的示例的文章就介紹到這了,更多相關(guān)C# MQTTnet通訊內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
C#實現(xiàn)六大設(shè)計原則之單一職責(zé)原則
這篇文章介紹了C#實現(xiàn)六大設(shè)計原則之單一職責(zé)原則的方法,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-02-02winform模擬鼠標(biāo)按鍵的具體實現(xiàn)
這篇文章介紹了winform模擬鼠標(biāo)按鍵的具體實現(xiàn),有需要的朋友可以參考一下2013-10-10使用DateTime的ParseExact方法實現(xiàn)特殊日期時間的方法詳解
本篇文章是對使用DateTime的ParseExact方法實現(xiàn)特殊日期時間的方法進行了詳細的分析介紹,需要的朋友參考下2013-05-05