go語(yǔ)言實(shí)現(xiàn)mqtt協(xié)議的實(shí)踐
一、什么是MQTT
MQTT(Message Queuing Telemetry Transport,消息隊(duì)列遙測(cè)傳輸協(xié)議),是一種基于發(fā)布/訂閱(publish/subscribe)模式的“輕量級(jí)”通訊協(xié)議,該協(xié)議構(gòu)建于TCP/IP協(xié)議上,由IBM在1999年發(fā)布。
MQTT最大優(yōu)點(diǎn)在于,可以以極少的代碼和有限的帶寬,為連接遠(yuǎn)程設(shè)備提供實(shí)時(shí)可靠的消息服務(wù)。作為一種低開(kāi)銷(xiāo)、低帶寬占用的即時(shí)通訊協(xié)議,使其在物聯(lián)網(wǎng)、小型設(shè)備、移動(dòng)應(yīng)用等方面有較廣泛的應(yīng)用
MQTT是一個(gè)基于客戶端-服務(wù)器的消息發(fā)布/訂閱傳輸協(xié)議。MQTT協(xié)議是輕量、簡(jiǎn)單、開(kāi)放和易于實(shí)現(xiàn)的,這些特點(diǎn)使它適用范圍非常廣泛。在很多情況下,包括受限的環(huán)境中,如:機(jī)器與機(jī)器(M2M)通信和物聯(lián)網(wǎng)(IoT)。其在,通過(guò)衛(wèi)星鏈路通信傳感器、偶爾撥號(hào)的醫(yī)療設(shè)備、智能家居、及一些小型化設(shè)備中已廣泛使用
MQTT還有一個(gè)特點(diǎn)就是客戶端之間不用相互通信, MQTT通信更像是郵箱服務(wù),發(fā)布者發(fā)布消息到服務(wù)器,接收者只要訂閱了其服務(wù)在線后即可收到
實(shí)現(xiàn)MQTT協(xié)議需要客戶端和服務(wù)器端通訊完成,在通訊過(guò)程中,MQTT協(xié)議中有三種身份:發(fā)布者(Publish)、代理(Broker)(服務(wù)器)、訂閱者(Subscribe)。其中,消息的發(fā)布者和訂閱者都是客戶端,消息代理是服務(wù)器,消息發(fā)布者可以同時(shí)是訂閱者。
MQTT傳輸?shù)南⒎譃椋褐黝}(Topic)和負(fù)載(payload)兩部分:
(1)Topic,可以理解為消息的類(lèi)型,訂閱者訂閱(Subscribe)后,就會(huì)收到該主題的消息內(nèi)容(payload);
(2)payload,可以理解為消息的內(nèi)容,是指訂閱者具體要使用的內(nèi)容。
Topic就是消息名,payload就是消息體
MQTT會(huì)構(gòu)建底層網(wǎng)絡(luò)傳輸:它將建立客戶端到服務(wù)器的連接,提供兩者之間的一個(gè)有序的、無(wú)損的、基于字節(jié)流的雙向傳輸。
當(dāng)應(yīng)用數(shù)據(jù)通過(guò)MQTT網(wǎng)絡(luò)發(fā)送時(shí),MQTT會(huì)把與之相關(guān)的**服務(wù)質(zhì)量(QoS)和主題名(Topic)**相關(guān)連。
二、Go語(yǔ)言MQTT服務(wù)器Broker的搭建
服務(wù)端用erlang編寫(xiě)的一個(gè)開(kāi)源項(xiàng)目:emqqtd
# 下載安裝包 wget https://github.com/emqx/emqx/releases/download/v4.0.4/emqx-ubuntu18.04-v4.0.4.zip cd mqttd/emqx . ├── bin ├── data ├── erts-10.5.2 ├── etc ├── lib ├── log └── releases # 開(kāi)啟服務(wù) ./bin/emqx start # 查看狀態(tài) ./bin/emqx_ctl status # 停止服務(wù) ./bin/emqx stop
找到自己的IP,訪問(wèn)http://[你的IP]:18083/#/clients
- 用戶名:admin
- 密碼:public
即可進(jìn)入服務(wù)器的控制臺(tái)
三、Go客戶端訪問(wèn)簡(jiǎn)單API
客戶端用golang客戶端的庫(kù):“github.com/eclipse/paho.mqtt.golang”
# 下載依賴包 go get -u github.com/eclipse/paho.mqtt.golang
實(shí)例如下:
編寫(xiě)了兩個(gè)函數(shù)一個(gè)發(fā)布一個(gè)訂閱,傳入?yún)?shù)即可服務(wù)
修改EMQServerAddress
為你服務(wù)器的IP
package main // 與后端mqtt服務(wù)交互 import ( "fmt" mqtt "github.com/eclipse/paho.mqtt.golang" "log" "os" "strconv" "time" ) const EMQServerAddress = "你的IP" // 創(chuàng)建全局mqtt publish消息處理 handler var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { fmt.Println("Push Message:") fmt.Printf("TOPIC: %s\n", msg.Topic()) fmt.Printf("MSG: %s\n", msg.Payload()) } // 創(chuàng)建全局mqtt sub消息處理 handler var messageSubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { fmt.Println("收到訂閱消息:") fmt.Printf("Sub Client Topic : %s \n", msg.Topic()) fmt.Printf("Sub Client msg : %s \n", msg.Payload()) } // 連接的回掉函數(shù) var connectHandler mqtt.OnConnectHandler =func(client mqtt.Client) { fmt.Println("新的連接!" + " Connected") } // 丟失連接的回掉函數(shù) var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) { fmt.Printf("Connect loss: %v\n", err) } func init() { // 配置錯(cuò)誤提示 mqtt.DEBUG = log.New(os.Stdout, " [mqttDEBUG]", 0) mqtt.ERROR = log.New(os.Stdout, " [mqttERROR]", 0) } /** * @Description: 發(fā)布訂閱 * @param clientID * @param addr * @param topic * @param payload */ func Push(topic string, qos byte, retain bool, payload string) { // opts ClientOptions 用于設(shè)置 broker,端口,客戶端 id ,用戶名密碼等選項(xiàng) opts := mqtt.NewClientOptions().AddBroker("tcp://" + EMQServerAddress + ":1883").SetClientID("test_push") opts.SetKeepAlive(60 * time.Second) // Message callback handler,在沒(méi)有任何訂閱時(shí),發(fā)布端調(diào)用此函數(shù) opts.SetDefaultPublishHandler(messagePubHandler) opts.SetPingTimeout(1 * time.Second) opts.OnConnect = connectHandler opts.OnConnectionLost = connectLostHandler client := mqtt.NewClient(opts) if token := client.Connect(); token.Wait() && token.Error() != nil { panic(token.Error()) } //發(fā)布消息 // qos是服務(wù)質(zhì)量: ==1: 一次, >=1: 至少一次, <=1:最多一次 // retained: 表示mqtt服務(wù)器要保留這次推送的信息,如果有新的訂閱者出現(xiàn),就會(huì)把這消息推送給它(持久化推送) token := client.Publish(topic, qos, retain, payload) token.Wait() fmt.Println("Push Data : "+topic, "Data Size is "+strconv.Itoa(len(payload))) fmt.Println("Disconnect with broker") client.Disconnect(250) } /** * @Description: 訂閱與取消訂閱 * @param clientID * @param addr * @param topic * @param isSub */ func Subscription(topic string, qos byte, isSub bool, handleFun func([]byte)) { opts := mqtt.NewClientOptions().AddBroker("tcp://" + EMQServerAddress + ":1883").SetClientID("sub_test") opts.SetKeepAlive(60 * time.Second) opts.SetPingTimeout(1 * time.Second) opts.OnConnect = func(client mqtt.Client) { fmt.Println("New Subscription! Connected" + " => " + topic) } opts.OnConnectionLost = connectLostHandler client := mqtt.NewClient(opts) if token := client.Connect(); token.Wait() && token.Error() != nil { panic(token.Error()) } if isSub { // 訂閱消息 if token := client.Subscribe(topic, qos, func(client mqtt.Client, msg mqtt.Message) { fmt.Printf("Receive Subscribe Message :") fmt.Printf("Sub Client Topic : %s, Data size is %d \n", msg.Topic(), len(msg.Payload())) if len(msg.Payload()) > 0 { handleFun(msg.Payload()) } }); token.Wait() && token.Error() != nil { fmt.Println(token.Error()) os.Exit(1) } } else { // 取消訂閱 if token := client.Unsubscribe(topic); token.Wait() && token.Error() != nil { fmt.Println(token.Error()) os.Exit(1) } } }
學(xué)習(xí)資料:
到此這篇關(guān)于go語(yǔ)言實(shí)現(xiàn)mqtt協(xié)議的實(shí)踐的文章就介紹到這了,更多相關(guān)go語(yǔ)言 mqtt協(xié)議內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
使用Go語(yǔ)言進(jìn)行安卓開(kāi)發(fā)的詳細(xì)教程
本文將介紹如何使用Go語(yǔ)言進(jìn)行安卓開(kāi)發(fā),我們將探討使用Go語(yǔ)言進(jìn)行安卓開(kāi)發(fā)的優(yōu)點(diǎn)、準(zhǔn)備工作、基本概念和示例代碼,通過(guò)本文的學(xué)習(xí),你將了解如何使用Go語(yǔ)言構(gòu)建高效的安卓應(yīng)用程序,需要的朋友可以參考下2023-11-11golang json.Marshal 特殊html字符被轉(zhuǎn)義的解決方法
今天小編就為大家分享一篇golang json.Marshal 特殊html字符被轉(zhuǎn)義的解決方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2019-08-08Golang使用Gin框架實(shí)現(xiàn)HTTP上傳文件過(guò)程介紹
由于需求中有文件上傳這一個(gè)需求,在這里我們就學(xué)習(xí)一下go語(yǔ)言如何上傳文件。本文主要通過(guò)表單的方式進(jìn)行文件上傳操作,本文實(shí)例為大家分享了Go實(shí)現(xiàn)文件上傳操作的具體代碼,供大家參考,具體內(nèi)容如下2023-04-04Go語(yǔ)言net包RPC遠(yuǎn)程調(diào)用三種方式http與json-rpc及tcp
這篇文章主要為大家介紹了Go語(yǔ)言net包RPC遠(yuǎn)程調(diào)用三種方式分別使用http與json-rpc及tcp的示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助2021-11-11golang實(shí)現(xiàn)redis的延時(shí)消息隊(duì)列功能示例
這篇文章主要介紹了golang實(shí)現(xiàn)redis的延時(shí)消息隊(duì)列功能,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-11-11Go語(yǔ)言實(shí)現(xiàn)新春祝福二維碼的生成
二維碼現(xiàn)在是隨處度可以看到,買(mǎi)東西,支付,添加好友只要你掃一掃就能完成整個(gè)工作,簡(jiǎn)單且方便。所以利用這個(gè)新春佳節(jié)做一個(gè)帶著新春祝福的二維碼吧2023-02-02Go語(yǔ)言實(shí)現(xiàn)簡(jiǎn)單Web服務(wù)器的方法
這篇文章主要介紹了Go語(yǔ)言實(shí)現(xiàn)簡(jiǎn)單Web服務(wù)器的方法,實(shí)例分析了Web服務(wù)器的實(shí)現(xiàn)原理與相關(guān)技巧,具有一定參考借鑒價(jià)值,需要的朋友可以參考下2015-02-02go語(yǔ)言基礎(chǔ)教程之(包、變量和函數(shù))
這篇文章主要介紹了go語(yǔ)言基礎(chǔ)教程之(包、變量和函數(shù))的相關(guān)資料,需要的朋友可以參考下2023-07-07