.NET6環(huán)境下實現(xiàn)MQTT通信及詳細(xì)代碼演示
前言: MQTT廣泛應(yīng)用于工業(yè)物聯(lián)網(wǎng)、智能家居、各類智能制造或各類自動化場景等。MQTT是一個基于客戶端-服務(wù)器的消息發(fā)布/訂閱傳輸協(xié)議,在很多受限的環(huán)境下,比如說機器與機器通信、機器與物聯(lián)網(wǎng)通信等。好了,科普的廢話不多說,下面直接通過.NET環(huán)境來實現(xiàn)一套MQTT通信demo,實現(xiàn)服務(wù)端與客戶端的雙邊消息發(fā)布與訂閱的功能和演示。
開發(fā)環(huán)境:
VS2022 + .NET 6 + Webapi / 控制臺
1、新建一個webapi項目,用來后面做測試使用
2、新建一個繼承自IHostedService的服務(wù),用于隨著webapi程序的啟動而自動執(zhí)行。(最終代碼在文末)
3、引入 MQTTNet 包,該項目提供了.net環(huán)境下的MQTT通信協(xié)議支持,這款框架很優(yōu)秀,此處直接引用它來進(jìn)行使用。
4、在上面的MqttHostService類里面,開始方法里面新增初始化MQTT服務(wù)端的一些功能,例如 IP、端口號、事件等等。
5、mqtt服務(wù)端支持的一系列功能很多,大佬們可以自行去嘗試一些新發(fā)現(xiàn),此處只使用若干個簡單功能。
6、添加客戶端連接事件、連接關(guān)閉事件
7、由于事件要用的可能有點多,此處就不一一例舉了,可以直接看以下的代碼,以及有關(guān)注釋來理解。
8、事件觸發(fā)時候,打印輸出
9、輸出之前,記錄一個當(dāng)前事件名稱標(biāo)記一下,用于可以更加清楚看出是哪個事件輸出的。
10、對MqttHostService類進(jìn)行注冊,用于程序啟動時候跟隨啟動。
11、上面貌似設(shè)計的不是很友好,所以把mqtt服務(wù)實例單獨弄出來,寫入到單獨的類里面做成屬性,供方便調(diào)用。
12、把先前的一些東西改一下,換成使用上面步驟的屬性來直接調(diào)用使用。
13、運行一下,看看是否可以成功,顯示服務(wù)已啟動,說明服務(wù)啟動時OK的了.
14、新增一個控制臺程序 MqttClient,用于模擬客戶端。
15、創(chuàng)建客戶端啟動以及有關(guān)配置信息和有關(guān)事件,如圖。具體使用可以看代碼注釋,就不過多解釋了。
16、在program類里面,調(diào)用客戶端啟動方法,用于測試使用。
17、上面客戶端對應(yīng)的三個事件的實現(xiàn)如圖,同時進(jìn)行有關(guān)信息的打印輸出。
18、啟動服務(wù)端,然后啟動客戶端,可以看到服務(wù)端有一個連接失敗的消息,這個是因為上面配置的客戶端用戶名是admin,密碼是1234567,而服務(wù)端配置的規(guī)則是,用戶名是admin 密碼是123456
19、密碼改回正常匹配項以后,再重新運行試試看,可以看到客戶端與服務(wù)端連接上了。
20、如果關(guān)閉客戶端,也可以看到服務(wù)端會進(jìn)入客戶端關(guān)閉事件內(nèi)。
21、把上面主題訂閱的內(nèi)容寫到連接成功以后的事件里面,不然客戶端連接期間,可能就執(zhí)行了主題訂閱,會存在訂閱失敗的情況。改為寫入到連接成功以后的事件里面,可以保證主題訂閱肯定是在客戶端連接成功以后才執(zhí)行的。
22、接下來測試服務(wù)端消息推送,在MqttService服務(wù)里面,新增一個方法,用來執(zhí)行mqtt服務(wù)端發(fā)布主題消息使用。有關(guān)配置信息和消息格式,如圖所示。
23、新增一個API控制器,用來測試使用。API參數(shù)直接拿來進(jìn)行消息的推送使用。
24、運行服務(wù)端和客戶端,并訪問剛剛新增的api接口,手動隨意輸入一條消息,可以看到客戶端訂閱的主題消息已經(jīng)被實時接收到了。
25、接下來對客戶端新增一個消息推送的方法,用來測試客戶端消息發(fā)布的功能。有關(guān)消息格式和調(diào)用,如圖所示,以及注釋部分的說明。
26、客戶端program類里面,客戶端連接以后,通過手動回車,來執(zhí)行客戶端發(fā)布消息。
27、再次啟動服務(wù)端和客戶端
28、然后客戶端內(nèi)按一下回車,執(zhí)行消息發(fā)布功能??梢钥吹剑?wù)端成功接收到了客戶端發(fā)過來的主題消息。
29、接下來測試客戶端與客戶端之間的消息發(fā)布與訂閱,為了模擬多客戶端效果,把上面客戶端已經(jīng)編譯好的文件拷貝一份出來。
30、然后本地的代碼進(jìn)行一些修改,用來當(dāng)做第二個客戶端程序。所以客戶端id也進(jìn)行變更為 testclient02
31、對客戶端訂閱的主題,也改成 topic_02
32、啟動服務(wù)端,以及拷貝出來的客戶端1,和上面修改了部分代碼的客戶端2,保證都已經(jīng)連接上服務(wù)端。
33、調(diào)用服務(wù)端的api接口,由于服務(wù)端發(fā)布的消息是發(fā)布給topic_01的,所以只有客戶端1可以接收到消息。
34、客戶端1執(zhí)行回車,用于發(fā)布一段消息給主題 topic_02,可以看到客戶端01發(fā)布的消息,同時被服務(wù)端和客戶端02接收到了。因為服務(wù)端是總指揮,所以客戶端發(fā)布的消息都會經(jīng)過服務(wù)端,從而服務(wù)端都可以接收到連接的客戶端發(fā)布的所有消息。
35、測試數(shù)據(jù)保持,下面先對客戶端1進(jìn)行斷開,然后再重新連接客戶端1,可以看到客戶端1直接接收到了它訂閱的主題的上一次最新的消息內(nèi)容,這個就是消息里面,Retain屬性設(shè)為True的結(jié)果,用于讓服務(wù)端記憶該主題消息使用的。如果設(shè)為false,就沒有這個效果了,大佬們也可以自己嘗試。
36、最終的服務(wù)端代碼:
MqttHostService:
public class MqttHostService : IHostedService, IDisposable { public void Dispose() { } const string ServerClientId = "SERVER"; public Task StartAsync(CancellationToken cancellationToken) { MqttServerOptionsBuilder optionsBuilder = new MqttServerOptionsBuilder(); optionsBuilder.WithDefaultEndpoint(); optionsBuilder.WithDefaultEndpointPort(10086); // 設(shè)置 服務(wù)端 端口號 optionsBuilder.WithConnectionBacklog(1000); // 最大連接數(shù) MqttServerOptions options = optionsBuilder.Build(); MqttService._mqttServer = new MqttFactory().CreateMqttServer(options); MqttService._mqttServer.ClientConnectedAsync += _mqttServer_ClientConnectedAsync; //客戶端連接事件 MqttService._mqttServer.ClientDisconnectedAsync += _mqttServer_ClientDisconnectedAsync; // 客戶端關(guān)閉事件 MqttService._mqttServer.ApplicationMessageNotConsumedAsync += _mqttServer_ApplicationMessageNotConsumedAsync; // 消息接收事件 MqttService._mqttServer.ClientSubscribedTopicAsync += _mqttServer_ClientSubscribedTopicAsync; // 客戶端訂閱主題事件 MqttService._mqttServer.ClientUnsubscribedTopicAsync += _mqttServer_ClientUnsubscribedTopicAsync; // 客戶端取消訂閱事件 MqttService._mqttServer.StartedAsync += _mqttServer_StartedAsync; // 啟動后事件 MqttService._mqttServer.StoppedAsync += _mqttServer_StoppedAsync; // 關(guān)閉后事件 MqttService._mqttServer.InterceptingPublishAsync += _mqttServer_InterceptingPublishAsync; // 消息接收事件 MqttService._mqttServer.ValidatingConnectionAsync += _mqttServer_ValidatingConnectionAsync; // 用戶名和密碼驗證有關(guān) MqttService._mqttServer.StartAsync(); return Task.CompletedTask; } /// <summary> /// 客戶端訂閱主題事件 /// </summary> /// <param name="arg"></param> /// <returns></returns> private Task _mqttServer_ClientSubscribedTopicAsync(ClientSubscribedTopicEventArgs arg) { Console.WriteLine($"ClientSubscribedTopicAsync:客戶端ID=【{arg.ClientId}】訂閱的主題=【{arg.TopicFilter}】 "); return Task.CompletedTask; } /// <summary> /// 關(guān)閉后事件 /// </summary> /// <param name="arg"></param> /// <returns></returns> private Task _mqttServer_StoppedAsync(EventArgs arg) { Console.WriteLine($"StoppedAsync:MQTT服務(wù)已關(guān)閉……"); return Task.CompletedTask; } /// <summary> /// 用戶名和密碼驗證有關(guān) /// </summary> /// <param name="arg"></param> /// <returns></returns> private Task _mqttServer_ValidatingConnectionAsync(ValidatingConnectionEventArgs arg) { arg.ReasonCode = MqttConnectReasonCode.Success; if ((arg.Username ?? string.Empty)!="admin" || (arg.Password??String.Empty)!="123456") { arg.ReasonCode = MqttConnectReasonCode.Banned; Console.WriteLine($"ValidatingConnectionAsync:客戶端ID=【{arg.ClientId}】用戶名或密碼驗證錯誤 "); } return Task.CompletedTask; } /// <summary> /// 消息接收事件 /// </summary> /// <param name="arg"></param> /// <returns></returns> private Task _mqttServer_InterceptingPublishAsync(InterceptingPublishEventArgs arg) { if (string.Equals(arg.ClientId, ServerClientId)) { return Task.CompletedTask; } Console.WriteLine($"InterceptingPublishAsync:客戶端ID=【{arg.ClientId}】 Topic主題=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等級=【{arg.ApplicationMessage.QualityOfServiceLevel}】"); return Task.CompletedTask; } /// <summary> /// 啟動后事件 /// </summary> /// <param name="arg"></param> /// <returns></returns> private Task _mqttServer_StartedAsync(EventArgs arg) { Console.WriteLine($"StartedAsync:MQTT服務(wù)已啟動……"); return Task.CompletedTask; } /// <summary> /// 客戶端取消訂閱事件 /// </summary> /// <param name="arg"></param> /// <returns></returns> private Task _mqttServer_ClientUnsubscribedTopicAsync(ClientUnsubscribedTopicEventArgs arg) { Console.WriteLine($"ClientUnsubscribedTopicAsync:客戶端ID=【{arg.ClientId}】已取消訂閱的主題=【{arg.TopicFilter}】 "); return Task.CompletedTask; } private Task _mqttServer_ApplicationMessageNotConsumedAsync(ApplicationMessageNotConsumedEventArgs arg) { Console.WriteLine($"ApplicationMessageNotConsumedAsync:發(fā)送端ID=【{arg.SenderId}】 Topic主題=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等級=【{arg.ApplicationMessage.QualityOfServiceLevel}】"); return Task.CompletedTask; } /// <summary> /// 客戶端斷開時候觸發(fā) /// </summary> /// <param name="arg"></param> /// <returns></returns> /// <exception cref="NotImplementedException"></exception> private Task _mqttServer_ClientDisconnectedAsync(ClientDisconnectedEventArgs arg) { Console.WriteLine($"ClientDisconnectedAsync:客戶端ID=【{arg.ClientId}】已斷開, 地址=【{arg.Endpoint}】 "); return Task.CompletedTask; } /// <summary> /// 客戶端連接時候觸發(fā) /// </summary> /// <param name="arg"></param> /// <returns></returns> private Task _mqttServer_ClientConnectedAsync(ClientConnectedEventArgs arg) { Console.WriteLine($"ClientConnectedAsync:客戶端ID=【{arg.ClientId}】已連接, 用戶名=【{arg.UserName}】地址=【{arg.Endpoint}】 "); return Task.CompletedTask; } public Task StopAsync(CancellationToken cancellationToken) { return Task.CompletedTask; } }
MqttService:
public class MqttService { public static MqttServer _mqttServer { get; set; } public static void PublishData(string data) { var message = new MqttApplicationMessage { Topic = "topic_01", Payload = Encoding.Default.GetBytes(data), QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce, Retain = true // 服務(wù)端是否保留消息。true為保留,如果有新的訂閱者連接,就會立馬收到該消息。 }; _mqttServer.InjectApplicationMessage(new InjectedMqttApplicationMessage(message) // 發(fā)送消息給有訂閱 topic_01的客戶端 { SenderClientId = "Server_01" }).GetAwaiter().GetResult(); } }
37、最終的客戶端代碼:
MqttClientService:
public class MqttClientService { public static IMqttClient _mqttClient; public void MqttClientStart() { var optionsBuilder = new MqttClientOptionsBuilder() .WithTcpServer("127.0.0.1", 10086) // 要訪問的mqtt服務(wù)端的 ip 和 端口號 .WithCredentials("admin", "123456") // 要訪問的mqtt服務(wù)端的用戶名和密碼 .WithClientId("testclient02") // 設(shè)置客戶端id .WithCleanSession() .WithTls(new MqttClientOptionsBuilderTlsParameters { UseTls = false // 是否使用 tls加密 }); var clientOptions = optionsBuilder.Build(); _mqttClient = new MqttFactory().CreateMqttClient(); _mqttClient.ConnectedAsync += _mqttClient_ConnectedAsync; // 客戶端連接成功事件 _mqttClient.DisconnectedAsync += _mqttClient_DisconnectedAsync; // 客戶端連接關(guān)閉事件 _mqttClient.ApplicationMessageReceivedAsync += _mqttClient_ApplicationMessageReceivedAsync; // 收到消息事件 _mqttClient.ConnectAsync(clientOptions); } /// <summary> /// 客戶端連接關(guān)閉事件 /// </summary> /// <param name="arg"></param> /// <returns></returns> private Task _mqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg) { Console.WriteLine($"客戶端已斷開與服務(wù)端的連接……"); return Task.CompletedTask; } /// <summary> /// 客戶端連接成功事件 /// </summary> /// <param name="arg"></param> /// <returns></returns> private Task _mqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg) { Console.WriteLine($"客戶端已連接服務(wù)端……"); // 訂閱消息主題 // MqttQualityOfServiceLevel: (QoS): 0 最多一次,接收者不確認(rèn)收到消息,并且消息不被發(fā)送者存儲和重新發(fā)送提供與底層 TCP 協(xié)議相同的保證。 // 1: 保證一條消息至少有一次會傳遞給接收方。發(fā)送方存儲消息,直到它從接收方收到確認(rèn)收到消息的數(shù)據(jù)包。一條消息可以多次發(fā)送或傳遞。 // 2: 保證每條消息僅由預(yù)期的收件人接收一次。級別2是最安全和最慢的服務(wù)質(zhì)量級別,保證由發(fā)送方和接收方之間的至少兩個請求/響應(yīng)(四次握手)。 _mqttClient.SubscribeAsync("topic_02", MqttQualityOfServiceLevel.AtLeastOnce); return Task.CompletedTask; } /// <summary> /// 收到消息事件 /// </summary> /// <param name="arg"></param> /// <returns></returns> private Task _mqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg) { Console.WriteLine($"ApplicationMessageReceivedAsync:客戶端ID=【{arg.ClientId}】接收到消息。 Topic主題=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等級=【{arg.ApplicationMessage.QualityOfServiceLevel}】"); return Task.CompletedTask; } public void Publish(string data) { var message = new MqttApplicationMessage { Topic = "topic_02", Payload = Encoding.Default.GetBytes(data), QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce, Retain = true // 服務(wù)端是否保留消息。true為保留,如果有新的訂閱者連接,就會立馬收到該消息。 }; _mqttClient.PublishAsync(message); } }
38、后記:MQTT以上演示已經(jīng)完畢,可以看到它的一些特性,跟websocket很接近,但是又比websocket通信更加靈活。其實,實際上MQTT的客戶端在現(xiàn)實生產(chǎn)環(huán)境場景下,并不需要咱們開發(fā)者進(jìn)行開發(fā),很多硬件設(shè)備都支持提供MQTT協(xié)議的通信客戶端,所以只需要自己搭建一個服務(wù)端,就可以實現(xiàn)實時監(jiān)控各種設(shè)備推送過來的各種信號數(shù)據(jù)。同時客戶端支持發(fā)布消息給其他客戶端,所以就實現(xiàn)了設(shè)備與設(shè)備之間的一對一信號通信的效果了。如果需要下發(fā)信號給硬件設(shè)備,MQTT服務(wù)端也可以直接下發(fā)給某個指定設(shè)備來進(jìn)行實現(xiàn)即可。上面案例只提供入門方案,如果有感興趣的大佬,可以自己去拓展一下,來達(dá)到更好的效果。
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
如何使用pm2守護(hù)你的.NET Core應(yīng)用程序詳解
pm2是nodejs的一個帶有負(fù)載均衡功能的應(yīng)用進(jìn)程管理器的模塊,下面這篇文章主要給大家介紹了關(guān)于如何使用pm2守護(hù)你的.NET Core應(yīng)用程序的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),需要的朋友可以參考下2018-10-10- 就是有時候窗口不能夠成功置頂,這時需要重新切換下標(biāo)簽,就可以置頂了,本文介紹C# SetWindowPos實現(xiàn)窗口置頂?shù)姆椒?/div> 2012-12-12
asp.net core集成kindeditor實現(xiàn)圖片上傳功能
這篇文章主要為大家詳細(xì)介紹了asp.net core集成kindeditor實現(xiàn)圖片上傳功能,文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下2016-11-11ASP.NET MVC+EF框架+EasyUI實現(xiàn)權(quán)限管系列
在學(xué)習(xí)MVC之前,我們有必要知道這些知識點(自動屬性,隱式類型var,對象初始化器和集合初始化器,匿名類,擴展方法,Lambda表達(dá)式),如果你還不知道的話就請看我下面的簡單的介紹,看下面我建立的項目的初步圖像,然后下篇我們開始簡單的介紹。2014-11-11最新評論