C#MQTT協(xié)議服務(wù)器與客戶端通訊實(shí)現(xiàn)(客戶端包含斷開重連模塊)
1 DLL版本
MQTTnet.DLL版本-2.7.5.0
基于比較老的項(xiàng)目中應(yīng)用的DLL,其他更高版本變化可能較大,謹(jǐn)慎參考。
2 服務(wù)器
開啟服務(wù)器
關(guān)閉服務(wù)器
綁定事件【客戶端連接服務(wù)器事件】
綁定事件【客戶端斷開(服務(wù)器)連接事件】
綁定事件【客戶端訂閱主題事件】
綁定事件【客戶端退訂主題事件】
綁定事件【接收客戶端(發(fā)送)消息事件】
using System; using System.Net; using MQTTnet; using MQTTnet.Server; namespace Demo_MQTT.Model { public class ServerModel { private static MqttServer _mqttServer = null; private readonly Action<string> _callbackLog; public ServerModel(Action<string> callbackLog) { _callbackLog = callbackLog; } /// <summary> /// 綁定客戶端連接服務(wù)器事件 /// </summary> private void MqttServer_ClientConnected(object sender, MqttClientConnectedEventArgs e) { WriteLog($"客戶端[{e.Client.ClientId}]已連接 {DateTime.Now:yyyy-MM-dd HH:mm:ss}{Environment.NewLine}"); } /// <summary> /// 綁定客戶端斷開連接事件 /// </summary> private void MqttServer_ClientDisconnected(object sender, MqttClientDisconnectedEventArgs e) { WriteLog($"客戶端[{e.Client.ClientId}]已斷開連接 {DateTime.Now:yyyy-MM-dd HH:mm:ss}{Environment.NewLine}"); } /// <summary> /// 綁定客戶端訂閱主題事件 /// </summary> private void Server_ClientSubscribedTopic(object sensor, MqttClientSubscribedTopicEventArgs e) { WriteLog($">>> 客戶端{(lán)e.ClientId}訂閱主題{e.TopicFilter.Topic}"); } /// <summary> /// 綁定客戶端退訂主題事件 /// </summary> /// <param name="e"></param> private void Server_ClientUnsubscribedTopic(object sensor, MqttClientUnsubscribedTopicEventArgs e) { WriteLog($">>> 客戶端{(lán)e.ClientId}退訂主題{e.TopicFilter}"); } /// <summary> /// 綁定接收客戶端消息事件 /// </summary> private void MqttServer_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e) { WriteLog($"接收到{e.ClientId}發(fā)送來的消息! {DateTime.Now:yyyy-MM-dd HH:mm:ss} {Environment.NewLine}"); } private void WriteLog(string log) { _callbackLog?.Invoke(log); } /// <summary> /// 開啟服務(wù)器 /// </summary> /// <param name="ip">IP地址</param> /// <param name="port">端口號(hào)</param> public void StartServer(string ip, int port) { if (_mqttServer == null) { var optionsBuilder = new MqttServerOptionsBuilder() .WithDefaultEndpointBoundIPAddress(IPAddress.Parse(ip)) .WithConnectionBacklog(1000) .WithDefaultEndpointPort(port); IMqttServerOptions options = optionsBuilder.Build(); try { _mqttServer = new MqttFactory().CreateMqttServer() as MqttServer; _mqttServer.ClientConnected += MqttServer_ClientConnected; _mqttServer.ClientDisconnected += MqttServer_ClientDisconnected; _mqttServer.ApplicationMessageReceived += MqttServer_ApplicationMessageReceived; _mqttServer.ClientSubscribedTopic += Server_ClientSubscribedTopic; _mqttServer.ClientUnsubscribedTopic += Server_ClientUnsubscribedTopic; _mqttServer.StartAsync(options); } catch (Exception ex) { Console.WriteLine(ex.Message); return; } WriteLog($"MQTT服務(wù)器啟動(dòng)成功 {DateTime.Now:yyyy-MM-dd HH:mm:ss}{Environment.NewLine}"); } } /// <summary> /// 關(guān)閉服務(wù)器 /// </summary> public void CloseServer() { _mqttServer?.StopAsync(); } } }
3 客戶端
連接服務(wù)器
屬性:客戶端連接狀態(tài)
客戶端斷開重連線程
獲取所有訂閱主題
訂閱主題
退訂主題
發(fā)送消息
綁定事件【客戶端連接服務(wù)器事件】
綁定事件【客戶端斷開(服務(wù)器)連接事件】
綁定事件【客戶端接收消息事件】
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; using MQTTnet; using MQTTnet.Client; using MQTTnet.Protocol; namespace Demo_MQTT.Model { public class ClientModel { /// <summary> /// 記錄所有訂閱主題,用于斷開重連時(shí)重新訂閱主題 /// </summary> private readonly List<string> _subscribeTopics = new List<string>(); private MqttClient _mqttClient = null; private string _serverIp; private int _nServerPort; private bool _isRunningReConnectThreadStart = false; private string _clienID; /// <summary> /// 接受消息回調(diào)函數(shù),參數(shù):主題,消息內(nèi)容 /// </summary> private readonly Action<string, byte[]> _callbackReceived; private readonly Action<string> _callbackLog; /// <summary> /// 構(gòu)造函數(shù) /// </summary> /// <param name="callbackReceived">接受消息回調(diào)函數(shù),參數(shù):主題,消息內(nèi)容</param> /// <param name="callbackLog"></param> public ClientModel(Action<string, byte[]> callbackReceived, Action<string> callbackLog) { _callbackReceived = callbackReceived; _callbackLog = callbackLog; } /// <summary> /// 連接服務(wù)器 /// </summary> private async void ConnectServer() { try { if (_mqttClient == null) { _mqttClient = new MqttFactory().CreateMqttClient() as MqttClient; _mqttClient.Connected += (s, a) => WriteLog($"【{_clienID}】已連接到MQTT服務(wù)器!"); _mqttClient.Disconnected += (s, a) => WriteLog($"【{_clienID}】已斷開MQTT連接!"); _mqttClient.ApplicationMessageReceived += (sender, args) => { _callbackReceived?.Invoke(args.ApplicationMessage.Topic, args.ApplicationMessage.Payload); }; } if (_mqttClient.IsConnected) return; IMqttClientOptions options = new MqttClientOptions { ChannelOptions = new MqttClientTcpOptions() { Server = _serverIp, Port = _nServerPort }, CleanSession = true }; _clienID = options.ClientId; await _mqttClient.ConnectAsync(options); if (_mqttClient.IsConnected) { ReConnectThreadStart(); SubscribeAsync(); } } catch (Exception ex) { WriteLog("連接到MQTT服務(wù)器失敗!"); } } /// <summary> /// 客戶端重連服務(wù)器線程-啟動(dòng) /// </summary> /// <returns></returns> private void ReConnectThreadStart() { if (_isRunningReConnectThreadStart) return; if (_mqttClient != null) { new Task(() => { _isRunningReConnectThreadStart = true; Thread.Sleep(5000); while (true) { Thread.Sleep(1000); if (!IsConnect) { WriteLog($"客戶端[{_clienID}]斷開連接,嘗試重新連接服務(wù)器中..."); int i; for (i = 0; i < 60; i++) { if (IsConnect) break; WriteLog($"嘗試第{i + 1}次連接服務(wù)器"); ConnectServer(); Thread.Sleep(1000); if (IsConnect) break; } _isRunningReConnectThreadStart = i < 60; } if (!_isRunningReConnectThreadStart) break; } }).Start(); } } private void WriteLog(string log) { _callbackLog?.Invoke(log); } /// <summary> /// 客戶端連接狀態(tài) /// </summary> public bool IsConnect => _mqttClient?.IsConnected == true; /// <summary> /// 連接服務(wù)器 /// </summary> /// <param name="serverIp">服務(wù)器IP</param> /// <param name="serverPort">服務(wù)器端口</param> /// <param name="topic"></param> public async void ConnectServer(string serverIp, int serverPort) { _serverIp = serverIp; _nServerPort = serverPort; await Task.Run(() => { ConnectServer(); }); } /// <summary> /// 關(guān)閉客戶端,斷開客戶端和服務(wù)器的連接 /// </summary> public void CloseClient() { _mqttClient.DisconnectAsync(); } /// <summary> /// 發(fā)送消息 /// </summary> /// <param name="topic">發(fā)送主題</param> /// <param name="cmd">發(fā)送內(nèi)容</param> [Obsolete("Obsolete")] public void PublishAsync(string topic, string cmd) { var bytes = Encoding.UTF8.GetBytes(cmd); var mode = MqttQualityOfServiceLevel.AtMostOnce; var appMsg = new MqttApplicationMessage(topic, bytes, mode, false); _mqttClient.PublishAsync(appMsg);//發(fā)送消息 } /// <summary> /// 訂閱主題 /// </summary> /// <param name="topics">主題標(biāo)識(shí)</param> public void SubscribeAsync(params string[] topics) { foreach (var topic in topics) { if (!_subscribeTopics.Contains(topic)) { _subscribeTopics.Add(topic); } } var topicFilters = _subscribeTopics.Select(topic => new TopicFilter(topic, MqttQualityOfServiceLevel.AtMostOnce)).ToList(); _mqttClient?.SubscribeAsync(topicFilters); } /// <summary> /// 退訂已訂閱主題 /// </summary> /// <param name="topics">主題標(biāo)識(shí)</param> public void UnSubscribeAsync(params string[] topics) { if (topics == null || topics.Length == 0) return; var topicFilters = topics.Select(topic => new TopicFilter(topic, MqttQualityOfServiceLevel.AtMostOnce)).ToList(); _mqttClient.SubscribeAsync(topicFilters); } /// <summary> /// 獲取所有訂閱主題 /// </summary> public string[] GetAllTopic => _subscribeTopics.ToArray(); } }
到此這篇關(guān)于C#MQTT協(xié)議服務(wù)器與客戶端通訊實(shí)現(xiàn)(客戶端包含斷開重連模塊)的文章就介紹到這了,更多相關(guān)C#中MQTT服務(wù)器與客戶端通訊內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
在Winform動(dòng)態(tài)啟動(dòng)、控制臺(tái)命令行的方法
winForm 程序輸出類型為 windows 程序(不是命令行程序)在運(yùn)行時(shí)想輸入一些信息編譯開發(fā)調(diào)試,如何實(shí)現(xiàn)這一功能2013-02-02WPF實(shí)現(xiàn)背景燈光隨鼠標(biāo)閃動(dòng)效果
這篇文章主要為大家詳細(xì)介紹了WPF實(shí)現(xiàn)背景燈光隨鼠標(biāo)閃動(dòng)效果,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2020-08-08基于C#實(shí)現(xiàn)簡單的隨機(jī)抽獎(jiǎng)小程序
臨近春節(jié),大街小巷的地方都有抽獎(jiǎng)活動(dòng),那么基于C#是如何實(shí)現(xiàn)簡單的抽獎(jiǎng)程序的呢,下面小編給大家分享了具體代碼,有需要的朋友參考下2016-01-01

C#實(shí)現(xiàn)程序等待延遲執(zhí)行的方法

基于C#的電視臺(tái)節(jié)目表接口調(diào)用代碼