欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

C#MQTT協(xié)議服務(wù)器與客戶端通訊實(shí)現(xiàn)(客戶端包含斷開重連模塊)

 更新時(shí)間:2025年07月01日 11:10:04   作者:風(fēng),停下  
本文主要介紹了C#MQTT協(xié)議服務(wù)器與客戶端通訊實(shí)現(xiàn),包括客戶端包含斷開重連模塊,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧

1 DLL版本

MQTTnet.DLL版本-2.7.5.0
基于比較老的項(xiàng)目中應(yīng)用的DLL,其他更高版本變化可能較大,謹(jǐn)慎參考。

2 服務(wù)器

開啟服務(wù)器
關(guān)閉服務(wù)器
綁定事件【客戶端連接服務(wù)器事件】
綁定事件【客戶端斷開(服務(wù)器)連接事件】
綁定事件【客戶端訂閱主題事件】
綁定事件【客戶端退訂主題事件】
綁定事件【接收客戶端(發(fā)送)消息事件】

using System;
using System.Net;
using MQTTnet;
using MQTTnet.Server;

namespace Demo_MQTT.Model
{
    public class ServerModel
    {
        private static MqttServer _mqttServer = null;


        private readonly Action<string> _callbackLog;

        public ServerModel(Action<string> callbackLog)
        {
            _callbackLog = callbackLog;
        }

        /// <summary>
        /// 綁定客戶端連接服務(wù)器事件
        /// </summary>
        private void MqttServer_ClientConnected(object sender, MqttClientConnectedEventArgs e)
        {
            WriteLog($"客戶端[{e.Client.ClientId}]已連接 {DateTime.Now:yyyy-MM-dd HH:mm:ss}{Environment.NewLine}");
        }

        /// <summary>
        /// 綁定客戶端斷開連接事件
        /// </summary>
        private void MqttServer_ClientDisconnected(object sender, MqttClientDisconnectedEventArgs e)
        {
            WriteLog($"客戶端[{e.Client.ClientId}]已斷開連接 {DateTime.Now:yyyy-MM-dd HH:mm:ss}{Environment.NewLine}");
        }

        /// <summary>
        /// 綁定客戶端訂閱主題事件
        /// </summary>
        private void Server_ClientSubscribedTopic(object sensor, MqttClientSubscribedTopicEventArgs e)
        {
            WriteLog($">>> 客戶端{(lán)e.ClientId}訂閱主題{e.TopicFilter.Topic}");
        }

        /// <summary>
        /// 綁定客戶端退訂主題事件
        /// </summary>
        /// <param name="e"></param>
        private void Server_ClientUnsubscribedTopic(object sensor, MqttClientUnsubscribedTopicEventArgs e)
        {
            WriteLog($">>> 客戶端{(lán)e.ClientId}退訂主題{e.TopicFilter}");

        }

        /// <summary>
        /// 綁定接收客戶端消息事件
        /// </summary>
        private void MqttServer_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
        {
            WriteLog($"接收到{e.ClientId}發(fā)送來的消息! {DateTime.Now:yyyy-MM-dd HH:mm:ss} {Environment.NewLine}");
        }

        private void WriteLog(string log)
        {
            _callbackLog?.Invoke(log);
        }

        /// <summary>
        /// 開啟服務(wù)器
        /// </summary>
        /// <param name="ip">IP地址</param>
        /// <param name="port">端口號(hào)</param>
        public void StartServer(string ip, int port)
        {
            if (_mqttServer == null)
            {
                var optionsBuilder = new MqttServerOptionsBuilder()
                    .WithDefaultEndpointBoundIPAddress(IPAddress.Parse(ip))
                    .WithConnectionBacklog(1000)
                    .WithDefaultEndpointPort(port);

                IMqttServerOptions options = optionsBuilder.Build();

                try
                {
                    _mqttServer = new MqttFactory().CreateMqttServer() as MqttServer;
                    _mqttServer.ClientConnected += MqttServer_ClientConnected;
                    _mqttServer.ClientDisconnected += MqttServer_ClientDisconnected;
                    _mqttServer.ApplicationMessageReceived += MqttServer_ApplicationMessageReceived;

                    _mqttServer.ClientSubscribedTopic += Server_ClientSubscribedTopic;
                    _mqttServer.ClientUnsubscribedTopic += Server_ClientUnsubscribedTopic;

                    _mqttServer.StartAsync(options);
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Message);
                    return;
                }

                WriteLog($"MQTT服務(wù)器啟動(dòng)成功 {DateTime.Now:yyyy-MM-dd HH:mm:ss}{Environment.NewLine}");
            }
        }

        /// <summary>
        /// 關(guān)閉服務(wù)器
        /// </summary>
        public void CloseServer()
        {
            _mqttServer?.StopAsync();
        }
    }
}

3 客戶端

連接服務(wù)器
屬性:客戶端連接狀態(tài)
客戶端斷開重連線程
獲取所有訂閱主題
訂閱主題
退訂主題
發(fā)送消息
綁定事件【客戶端連接服務(wù)器事件】
綁定事件【客戶端斷開(服務(wù)器)連接事件】
綁定事件【客戶端接收消息事件】

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;

namespace Demo_MQTT.Model
{
    public class ClientModel
    {
        /// <summary>
        /// 記錄所有訂閱主題,用于斷開重連時(shí)重新訂閱主題
        /// </summary>
        private readonly List<string> _subscribeTopics = new List<string>();
        private MqttClient _mqttClient = null;
        private string _serverIp;
        private int _nServerPort;
        private bool _isRunningReConnectThreadStart = false;
        private string _clienID;

        /// <summary>
        /// 接受消息回調(diào)函數(shù),參數(shù):主題,消息內(nèi)容
        /// </summary>
        private readonly Action<string, byte[]> _callbackReceived;
        private readonly Action<string> _callbackLog;

        /// <summary>
        /// 構(gòu)造函數(shù)
        /// </summary>
        /// <param name="callbackReceived">接受消息回調(diào)函數(shù),參數(shù):主題,消息內(nèi)容</param>
        /// <param name="callbackLog"></param>
        public ClientModel(Action<string, byte[]> callbackReceived, Action<string> callbackLog)
        {
            _callbackReceived = callbackReceived;
            _callbackLog = callbackLog;
        }

        /// <summary>
        /// 連接服務(wù)器
        /// </summary>
        private async void ConnectServer()
        {
            try
            {
                if (_mqttClient == null)
                {
                    _mqttClient = new MqttFactory().CreateMqttClient() as MqttClient;
                    _mqttClient.Connected += (s, a) => WriteLog($"【{_clienID}】已連接到MQTT服務(wù)器!");
                    _mqttClient.Disconnected += (s, a) => WriteLog($"【{_clienID}】已斷開MQTT連接!");
                    _mqttClient.ApplicationMessageReceived += (sender, args) =>
                    {
                        _callbackReceived?.Invoke(args.ApplicationMessage.Topic, args.ApplicationMessage.Payload);
                    };
                }
                if (_mqttClient.IsConnected) return;

                IMqttClientOptions options = new MqttClientOptions
                {
                    ChannelOptions = new MqttClientTcpOptions()
                    {
                        Server = _serverIp,
                        Port = _nServerPort
                    },
                    CleanSession = true
                };

                _clienID = options.ClientId;
                await _mqttClient.ConnectAsync(options);
                if (_mqttClient.IsConnected)
                {
                    ReConnectThreadStart();
                    SubscribeAsync();
                }
            }
            catch (Exception ex)
            {
                WriteLog("連接到MQTT服務(wù)器失敗!");
            }
        }

        /// <summary>
        /// 客戶端重連服務(wù)器線程-啟動(dòng)
        /// </summary>
        /// <returns></returns>
        private void ReConnectThreadStart()
        {
            if (_isRunningReConnectThreadStart) return;

            if (_mqttClient != null)
            {
                new Task(() =>
                {
                    _isRunningReConnectThreadStart = true;
                    Thread.Sleep(5000);
                    while (true)
                    {
                        Thread.Sleep(1000);
                        if (!IsConnect)
                        {
                            WriteLog($"客戶端[{_clienID}]斷開連接,嘗試重新連接服務(wù)器中...");
                            int i;
                            for (i = 0; i < 60; i++)
                            {
                                if (IsConnect) break;
                                WriteLog($"嘗試第{i + 1}次連接服務(wù)器");
                                ConnectServer();
                                Thread.Sleep(1000);
                                if (IsConnect) break;
                            }
                            _isRunningReConnectThreadStart = i < 60;
                        }

                        if (!_isRunningReConnectThreadStart) break;
                    }

                }).Start();
            }
        }

        private void WriteLog(string log)
        {
            _callbackLog?.Invoke(log);
        }


        /// <summary>
        /// 客戶端連接狀態(tài)
        /// </summary>
        public bool IsConnect => _mqttClient?.IsConnected == true;

        /// <summary>
        /// 連接服務(wù)器
        /// </summary>
        /// <param name="serverIp">服務(wù)器IP</param>
        /// <param name="serverPort">服務(wù)器端口</param>
        /// <param name="topic"></param>
        public async void ConnectServer(string serverIp, int serverPort)
        {
            _serverIp = serverIp;
            _nServerPort = serverPort;

            await Task.Run(() => { ConnectServer(); });
        }

        /// <summary>
        /// 關(guān)閉客戶端,斷開客戶端和服務(wù)器的連接
        /// </summary>
        public void CloseClient()
        {
            _mqttClient.DisconnectAsync();
        }

        /// <summary>
        /// 發(fā)送消息
        /// </summary>
        /// <param name="topic">發(fā)送主題</param>
        /// <param name="cmd">發(fā)送內(nèi)容</param>
        [Obsolete("Obsolete")]
        public void PublishAsync(string topic, string cmd)
        {
            var bytes = Encoding.UTF8.GetBytes(cmd);
            var mode = MqttQualityOfServiceLevel.AtMostOnce;
            var appMsg = new MqttApplicationMessage(topic, bytes, mode, false);
            _mqttClient.PublishAsync(appMsg);//發(fā)送消息
        }

        /// <summary>
        /// 訂閱主題
        /// </summary>
        /// <param name="topics">主題標(biāo)識(shí)</param>
        public void SubscribeAsync(params string[] topics)
        {
            foreach (var topic in topics)
            {
                if (!_subscribeTopics.Contains(topic))
                {
                    _subscribeTopics.Add(topic);
                }
            }
            var topicFilters = _subscribeTopics.Select(topic => new TopicFilter(topic, MqttQualityOfServiceLevel.AtMostOnce)).ToList();
            _mqttClient?.SubscribeAsync(topicFilters);
        }

        /// <summary>
        /// 退訂已訂閱主題
        /// </summary>
        /// <param name="topics">主題標(biāo)識(shí)</param>
        public void UnSubscribeAsync(params string[] topics)
        {
            if (topics == null || topics.Length == 0) return;
            var topicFilters = topics.Select(topic => new TopicFilter(topic, MqttQualityOfServiceLevel.AtMostOnce)).ToList();
            _mqttClient.SubscribeAsync(topicFilters);
        }

        /// <summary>
        /// 獲取所有訂閱主題
        /// </summary>
        public string[] GetAllTopic => _subscribeTopics.ToArray();

    }
}

到此這篇關(guān)于C#MQTT協(xié)議服務(wù)器與客戶端通訊實(shí)現(xiàn)(客戶端包含斷開重連模塊)的文章就介紹到這了,更多相關(guān)C#中MQTT服務(wù)器與客戶端通訊內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • C#純代碼實(shí)現(xiàn)打字游戲

    C#純代碼實(shí)現(xiàn)打字游戲

    這篇文章主要介紹了這篇文章主要為大家詳細(xì)介紹了C#純代碼實(shí)現(xiàn)打字游戲,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2020-05-05
  • C#實(shí)現(xiàn)程序等待延遲執(zhí)行的方法

    C#實(shí)現(xiàn)程序等待延遲執(zhí)行的方法

    這篇文章主要介紹了C#實(shí)現(xiàn)程序等待延遲執(zhí)行的方法,涉及C#動(dòng)態(tài)鏈接庫的使用及延遲的實(shí)現(xiàn)技巧,需要的朋友可以參考下
    2015-09-09
  • C#轉(zhuǎn)換日期類型實(shí)例

    C#轉(zhuǎn)換日期類型實(shí)例

    這篇文章主要介紹了C#轉(zhuǎn)換日期類型的方法,以實(shí)例形式分析了將日期格式轉(zhuǎn)換為Unix時(shí)間戳與時(shí)區(qū)結(jié)合的形式,是比較實(shí)用的技巧,具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2014-12-12
  • 快速了解c# 常量

    快速了解c# 常量

    這篇文章主要介紹了c# 常量的相關(guān)資料,文中講解非常細(xì)致,代碼幫助大家更好的理解和學(xué)習(xí),感興趣的朋友可以了解下
    2020-07-07
  • 基于C#的電視臺(tái)節(jié)目表接口調(diào)用代碼

    基于C#的電視臺(tái)節(jié)目表接口調(diào)用代碼

    這篇文章主要介紹了基于C#的電視臺(tái)節(jié)目表接口調(diào)用代碼的相關(guān)資料,需要的朋友可以參考下
    2016-06-06
  • 最新評(píng)論