Java MQTT實(shí)戰(zhàn)應(yīng)用
一、MQTT協(xié)議
MQTT(Message Queuing Telemetry Transport)是一種輕量級(jí)的發(fā)布/訂閱式消息傳遞協(xié)議,專為物聯(lián)網(wǎng)(IoT)和嵌入式設(shè)備設(shè)計(jì),它簡化了設(shè)備之間的通信,并優(yōu)化帶寬使用。
在MQTT中,消息的發(fā)送者稱為“發(fā)布者”(Publisher)消息的接收者稱為“訂閱者”(Subscriber),而消息的中轉(zhuǎn)站是“代理”(Broker)。發(fā)布者將消息發(fā)布到特定的“主題”(Topic),代理負(fù)責(zé)將消息轉(zhuǎn)發(fā)給所有訂閱了該主題的訂閱者。這種模式解耦了消息的發(fā)送者和接收者,使得系統(tǒng)更加靈活和可擴(kuò)展。
二、MQTT優(yōu)點(diǎn)
- 低功耗、高效、可靠。
- 輕量級(jí):協(xié)議設(shè)計(jì)簡潔,消息頭部開銷小,適用于低帶寬和低功耗設(shè)備。
- 支持發(fā)布/訂閱模式:設(shè)備可以發(fā)布消息到主題,其他設(shè)備可以訂閱對(duì)應(yīng)的主題接收消息。這一模式解耦了消息生產(chǎn)者和消費(fèi)者,簡化了系統(tǒng)架構(gòu),提高了靈活性和可擴(kuò)展性。
- 可拓展性和兼容性:MQTT允許使用不同的傳輸協(xié)議,包括TCP、WebSocket等。它的簡單性使得它易于與其他協(xié)議和服務(wù)集成。
- 持久化會(huì)話:MQTT支持消息持久化,允許設(shè)備在斷線后重新連接時(shí)恢復(fù)之前的會(huì)話狀態(tài),包括未完成的訂閱和未收到的消息隊(duì)列,這對(duì)于網(wǎng)絡(luò)不穩(wěn)定或經(jīng)常斷開的物聯(lián)網(wǎng)環(huán)境尤為重要。
三、三種服務(wù)質(zhì)量等級(jí)
- QoS = 0(最多一次):消息最多被傳遞一次,可能丟失,但不會(huì)重復(fù)。此級(jí)別提供的可靠性最低,一旦消息被客戶端發(fā)送出去,它不會(huì)等待任何確認(rèn),即“Fire and Forget”模式。這意味著發(fā)布者不會(huì)確認(rèn)消息是否到達(dá)Broker,也不會(huì)嘗試重傳失敗的消息)
- QoS = 1(至少一次):消息至少被傳遞一次,可能會(huì)重復(fù),但不會(huì)丟失。此級(jí)別保證消息至少被送達(dá)一次,但有可能被重復(fù)發(fā)送。在QoS 1下,Broker(消息隊(duì)列服務(wù)器)會(huì)發(fā)送PUBACK確認(rèn)消息給客戶端,如果客戶端沒有收到確認(rèn),則會(huì)重發(fā)消息,直到收到確認(rèn)為止。因此,雖然可以確保消息不會(huì)丟失,但也可能導(dǎo)致相同消息被多次接收
- QoS = 2(恰好一次):消息保證被傳遞一次且僅一次,不會(huì)丟失也不會(huì)重復(fù)。這是MQTT提供的最高級(jí)別服務(wù)質(zhì)量,確保每條消息只會(huì)被接收一次,提供最嚴(yán)格的可靠性保證。該機(jī)制通過一個(gè)復(fù)雜的四次握手過程實(shí)現(xiàn),包括消息標(biāo)識(shí)符的確認(rèn)和釋放,確保消息既不丟失也不重復(fù)
四、客戶端、代理、主題
MQTT協(xié)議中,三個(gè)核心概念分別是客戶端(Client)、代理(Broker)和主題(Topic),它們共同構(gòu)成了MQTT通信的基礎(chǔ)框架,實(shí)現(xiàn)了消息的發(fā)布與訂閱機(jī)制。
1. 客戶端(Client):
作用:客戶端可以是消息的發(fā)布者(Publisher)或訂閱者(Subscriber),也可以同時(shí)具備這兩種角色。發(fā)布者負(fù)責(zé)向MQTT系統(tǒng)中的某個(gè)主題發(fā)布消息;訂閱者則訂閱感興趣的主題,以接收來自該主題的消息??蛻舳丝梢允莻鞲衅?、手機(jī)應(yīng)用、服務(wù)器程序等各種設(shè)備或應(yīng)用。
相互關(guān)系:客戶端不直接相互通信,而是通過Broker中轉(zhuǎn)消息。發(fā)布者客戶端向Broker發(fā)送消息,而訂閱者客戶端從Broker接收消息。
2. 代理(Broker):
作用:Broker是MQTT通信的中心節(jié)點(diǎn),它接收來自發(fā)布者客戶端的消息,并根據(jù)消息中的主題分發(fā)給相應(yīng)的訂閱者客戶端。Broker負(fù)責(zé)維護(hù)客戶端的連接狀態(tài)、存儲(chǔ)消息(如果需要持久化)、管理主題的訂閱關(guān)系等。
相互關(guān)系:Broker是客戶端之間的中介,它管理著所有的消息流動(dòng)。每個(gè)客戶端都與Broker建立連接,無論發(fā)布還是訂閱操作,都必須通過Broker來完成。
3. 主題(Topic):
作用:主題是MQTT中消息的分類標(biāo)簽,類似于一個(gè)消息通道或者頻道。每個(gè)消息都會(huì)關(guān)聯(lián)一個(gè)主題,發(fā)布者通過指定主題來決定消息的去向,而訂閱者通過訂閱特定主題來接收相關(guān)消息。
相互關(guān)系:主題是連接發(fā)布者與訂閱者的橋梁。發(fā)布者向特定主題發(fā)布消息,而訂閱者則通過訂閱這些主題來接收消息。Broker根據(jù)主題匹配規(guī)則,確保消息被正確地路由到已訂閱該主題的所有客戶端。主題可以是靜態(tài)的字符串,也可以包含通配符(如"+“和”#”)來實(shí)現(xiàn)靈活的匹配規(guī)則。
五、實(shí)戰(zhàn)應(yīng)用
1. 安裝部署(linux)
-- 拉取鏡像
docker pull emqx/emqx:5.0.26
-- 安裝容器
docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:5.0.26
2. 訪問控制臺(tái)
訪問:ip:18083
默認(rèn)的用戶名密碼:admin/public
3. 客戶端認(rèn)證
4. 創(chuàng)建用戶
5. SpringBoot中整合
5.1 導(dǎo)入jar包
<dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency>
5.2 yml配置
mqtt: #MQTT-服務(wù)器連接地址,如果有多個(gè),用逗號(hào)隔開 host: tcp://192.168.17.101:1883 #MQTT-連接服務(wù)器默認(rèn)客戶端ID,可以隨便寫 clientId: mqtt_test #MQTT-用戶名 username: zhangsan #MQTT-密碼 password: 123456 #MQTT-指定消息的推送和訂閱主題 topic: test #連接超時(shí) timeout: 100 #設(shè)置會(huì)話心跳時(shí)間 keepalive: 10
5.3 MqttConfig.java
@Slf4j @Configuration @ConfigurationProperties("mqtt") @Data public class MqttConfig { String host; String clientId; String topic; String username; String password; Integer timeout; Integer keepalive; // MQTT客戶端的配置類,可以設(shè)置mqtt服務(wù)器的賬號(hào)和密碼 @Bean public MqttConnectOptions mqttConnectOptions() { MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(username); options.setPassword(password.toCharArray()); // 設(shè)置是否自動(dòng)重連 options.setAutomaticReconnect(true); // false 保持會(huì)話不被清理自動(dòng)重連后才能收到訂閱的主題消息(包括離線時(shí)發(fā)布的消息) options.setCleanSession(true); options.setConnectionTimeout(timeout); options.setKeepAliveInterval(keepalive); return options; } // MqttClient 類,MQTT的客戶端類,可以去連接MQTT服務(wù)器 @Bean public MqttClient mqttClient(MqttConnectOptions mqttConnectOptions) { try { MqttClient client = new MqttClient(host, clientId); // 回調(diào)對(duì)象,監(jiān)聽消息的獲取,采用的接口回調(diào),可以獲取對(duì)應(yīng)訂閱到的消息 client.setCallback(new MessageCallback(client, this.topic, mqttConnectOptions)); // 連接 client.connect(mqttConnectOptions()); return client; } catch (Exception e) { e.printStackTrace(); throw new RuntimeException("mqtt 連接異常"); } } }
5.4 MessageCallback.java
/** * consumer 消費(fèi)者,對(duì)收到的消息進(jìn)行處理 */ //@Component @Slf4j public class MessageCallback implements MqttCallbackExtended { private MqttClient client; private String topic; private MqttConnectOptions mqttConnectOptions; public MessageCallback() { } public MessageCallback(MqttClient mqttClient, String topic, MqttConnectOptions mqttConnectOptions) { this.client = mqttClient; this.topic = topic; this.mqttConnectOptions = mqttConnectOptions; } // 在客戶端連接斷開時(shí)觸發(fā) @Override public void connectionLost(Throwable throwable) { if (client != null && !client.isConnected()) { log.info("{}, 連接斷開,正在reconnect....", client.getClientId()); try { client.reconnect(); // client.connect(this.mqttConnectOptions); } catch (MqttException e) { e.printStackTrace(); } } else { log.info("未知異常,連接斷開"); } } // 在客戶端與服務(wù)器連接成功時(shí)觸發(fā) @Override public void connectComplete(boolean b, String url) { log.info("{} 上線了{(lán)} {}", client.getClientId(), b, url); try { client.subscribe(this.topic, 0); } catch (MqttException e) { e.printStackTrace(); } } // 在客戶端收到訂閱的消息時(shí)觸發(fā) @Override public void messageArrived(String topic, MqttMessage message) throws Exception { log.info("接收消息主題 : " + topic); log.info("接收消息內(nèi)容 : " + new String(message.getPayload())); String msg = new String(message.getPayload()); try { JSONObject jsonObject = JSON.parseObject(msg); String clientId = String.valueOf(jsonObject.get("clientid")); if (topic.endsWith("disconnected")) { log.info("設(shè)備{}已掉線", clientId); } else if (topic.endsWith("connected")) { log.info("設(shè)備{}已上線", clientId); } else { log.info("其他主題的消息"); } } catch (JSONException e) { log.error("JSON Format Parsing Exception : {}", msg); } } // 在客戶端發(fā)送消息至服務(wù)器成功時(shí)觸發(fā) @Override public void deliveryComplete(IMqttDeliveryToken token) { log.info("deliveryComplete---------" + token.isComplete()); } }
5.5 MqttUtil.java
@Component @Slf4j public class MqttUtil { @Autowired(required = false) private MqttClient client; /** * 訂閱主題 * * @param topic * @param qos */ public void subscribe(String topic, int qos) { try { client.subscribe(topic, qos); } catch (MqttException e) { e.printStackTrace(); } } /** * 訂閱主題 * * @param topic */ public void subscribe(String topic) { try { client.subscribe(topic); } catch (MqttException e) { e.printStackTrace(); } } /** * 發(fā)布消息 * * @param qos 連接方式 0,1,2 默認(rèn)0 * @param retained 是否保留最新的消息 * @param topic 訂閱主題 * @param pushMessage 消息體 */ public void publish(int qos, boolean retained, String topic, String pushMessage) { MqttMessage message = new MqttMessage(); message.setQos(qos); message.setRetained(retained); message.setPayload(pushMessage.getBytes()); MqttTopic mqttTopic = client.getTopic(topic); if (null == mqttTopic) { log.error("topic not exist"); } MqttDeliveryToken token; try { // 發(fā)送消息 token = mqttTopic.publish(message); token.waitForCompletion(); } catch (MqttPersistenceException e) { e.printStackTrace(); } catch (MqttException e) { e.printStackTrace(); } } /** * 發(fā)布消息 * * @param topic 主題 * @param pushMessage 消息內(nèi)容 */ public void publish(String topic, String pushMessage) { publish(0, true, topic, pushMessage); } }
5.6 MqttController.java
@RestController @Slf4j public class MqttController { @Autowired MqttClient client; @Autowired MqttUtil mqttUtil; @GetMapping("/send") public String send() { try { for (int i = 0; i < 3; i++) { mqttUtil.publish("test", "消息hello" + i); log.info("發(fā)送成功:{}", i); Thread.sleep(1000); } } catch (Exception e) { e.printStackTrace(); } return "SUCCESS"; } }
六、MQTTX官網(wǎng)地址
MQTT客戶端工具M(jìn)QTTX下載地址 : MQTTX:全功能 MQTT 客戶端工具
到此這篇關(guān)于Java MQTT實(shí)戰(zhàn)應(yīng)用的文章就介紹到這了,更多相關(guān)Java MQTT內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
MyBatis-Plus工具使用之EntityWrapper解析
這篇文章主要介紹了MyBatis-Plus工具使用之EntityWrapper解析,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-03-03spark中使用groupByKey進(jìn)行分組排序的示例代碼
這篇文章主要介紹了spark中使用groupByKey進(jìn)行分組排序的實(shí)例代碼,本文通過實(shí)例代碼給大家講解的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-03-03IDEA如何加載resources文件夾下文件相對(duì)路徑
這篇文章主要介紹了IDEA如何加載resources文件夾下文件相對(duì)路徑問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-12-12Java工廠模式優(yōu)雅地創(chuàng)建對(duì)象以及提高代碼復(fù)用率和靈活性
Java工廠模式是一種創(chuàng)建型設(shè)計(jì)模式,通過定義一個(gè)工廠類來封裝對(duì)象的創(chuàng)建過程,將對(duì)象的創(chuàng)建和使用分離,提高代碼的可維護(hù)性和可擴(kuò)展性,同時(shí)可以實(shí)現(xiàn)更好的代碼復(fù)用和靈活性2023-05-05Spring Cloud中關(guān)于Feign的常見問題總結(jié)
這篇文章主要給大家介紹了Spring Cloud中關(guān)于Feign的常見問題,文中通過示例代碼介紹的很詳細(xì),需要的朋友可以參考借鑒,下面來一起看看吧。2017-02-02Spring AOP攔截-三種方式實(shí)現(xiàn)自動(dòng)代理詳解
這篇文章主要介紹了Spring AOP攔截-三種方式實(shí)現(xiàn)自動(dòng)代理詳解,還是比較不錯(cuò)的,這里分享給大家,供需要的朋友參考。2017-11-11