java代碼mqtt接收發(fā)送消息方式
java代碼mqtt接收發(fā)送消息
mqtt消息第一用到不是太熟悉所以寫一篇文章鞏固一下。
前提是你已經(jīng)把mqtt已經(jīng)安裝好,并且啟動好了。
首先我們需要兩部分代碼。
所需依賴
<!-- mqtt --> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency>
連接mqtt部分的代碼塊,因為我不需要發(fā)送消息所以把發(fā)送消息給注釋掉了。
package mqttclient.util; import lombok.extern.slf4j.Slf4j; import mqttclient.callback.MqttMessageCallback2; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.util.Objects; @Component @Slf4j public class MqttClientUtil2 { private String username; private String password; @Value("tcp://127.0.0.1:1883")//這個是安裝mqtt的ip以及端口,1883是mqtt默認端口 private String host; @Value("CYT")//這個隨便寫但是是唯一的。 private String clientId; @Value("cyt/#")這個是mqtt發(fā)送消息的咱們要訂閱的topic,cyt/#代表以cyt/開始的所有topic都接收 private String topic; @Value("${mqtt.connection.timeout}")//IOT_MQTT_Yield會block住timeout的時間去嘗試接收數(shù)據(jù),直到timeout才會退出??梢詫懺谶@里也可以寫在yml配置文件中 private int timeOut; @Value("${mqtt.keep.alive.interval}") private int interval; @Autowired private MqttMessageCallback2 mqttMessageCallback2; private MqttClient mqttClient; private MqttConnectOptions mqttConnectOptions; @PostConstruct private void init(){ connect(host, clientId,topic); } /** * 鏈接mqtt * @param host * @param clientId */ private void connect(String host,String clientId,String topic){ try{ mqttClient = new MqttClient(host,clientId,new MemoryPersistence()); mqttConnectOptions = getMqttConnectOptions(); //設(shè)置回調(diào)函數(shù) mqttClient.setCallback(mqttMessageCallback2); //鏈接mqtt mqttClient.connect(mqttConnectOptions); //訂閱消息 mqttClient.subscribe(topic,2); }catch (Exception e){ log.error("mqtt服務(wù)鏈接異常!"); e.printStackTrace(); } } /** * 設(shè)置鏈接對象信息 * setCleanSession true 斷開鏈接即清楚會話 false 保留鏈接信息 離線還會繼續(xù)發(fā)消息 * @return */ private MqttConnectOptions getMqttConnectOptions(){ MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); /*mqttConnectOptions.setUserName(username); mqttConnectOptions.setPassword(password.toCharArray());*/ mqttConnectOptions.setServerURIs(new String[]{host}); mqttConnectOptions.setKeepAliveInterval(interval); mqttConnectOptions.setConnectionTimeout(timeOut); mqttConnectOptions.setCleanSession(true); return mqttConnectOptions; } /** *mqtt鏈接狀態(tài) * @return */ private boolean isConnect(){ if(Objects.isNull(this.mqttClient)){ return false; } return mqttClient.isConnected(); } /** * 設(shè)置重連 * @throws Exception */ private void reConnect() throws Exception{ if(Objects.nonNull(this.mqttClient)){ log.info("mqtt 服務(wù)已重新鏈接..."); this.mqttClient.connect(this.mqttConnectOptions); } } /** * 斷開鏈接 * @throws Exception */ private void closeConnect() throws Exception{ if(Objects.nonNull(this.mqttClient)){ log.info("mqtt 服務(wù)已斷開鏈接..."); this.mqttClient.disconnect(); } } /* *//** * 發(fā)布消息 * @param topic * @param message * @param qos * @throws Exception *//* public void sendMessage(String topic,String message,int qos) throws Exception { if(Objects.nonNull(this.mqttClient) && this.mqttClient.isConnected()){ MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setPayload(message.getBytes()); mqttMessage.setQos(qos); MqttTopic mqttTopic = mqttClient.getTopic(topic); if(Objects.nonNull(mqttTopic)){ try{ MqttDeliveryToken publish = mqttTopic.publish(mqttMessage); if(publish.isComplete()){ log.info("消息發(fā)送成功---->{}",message); } }catch(Exception e){ log.error("消息發(fā)送異常",e); } } }else{ reConnect(); } }*/ }
接收消息部分
package mqttclient.callback; import lombok.extern.slf4j.Slf4j; import mqttclient.util.ParsingData2; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springframework.stereotype.Component; import java.util.List; @Component @Slf4j public class MqttMessageCallback2 implements MqttCallback { /** * 鏈接丟失時處理 * @param throwable */ @Override public void connectionLost(Throwable throwable) { //可以做重連 或者 其他業(yè)務(wù)處理 } @Override public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { System.out.println("接收到消息topic---->{}"+topic); System.out.println("接收到消息topic---->{}"+mqttMessage); log.info("接收到消息質(zhì)量qos---->{}",mqttMessage.getQos()); System.out.println("接收到消息質(zhì)量qos---->{}"+mqttMessage.getQos()); log.info("接收到消息具體信息---->{}",new String(mqttMessage.getPayload())); System.out.println("接收到消息具體信息---->{}"+mqttMessage.getPayload()); //結(jié)合業(yè)務(wù) 編寫具體信息即可 } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { } }
這個兩個寫完之后只要有數(shù)據(jù)發(fā)送過來,這邊會自動進行接收打印。
是用mqtt網(wǎng)頁版圖形化界面進行模擬數(shù)據(jù)發(fā)送。
安裝mqtt后打開此網(wǎng)站:http://localhost:18083/
默認賬號是:admin / public
登錄后這邊可以設(shè)置中文:
模擬發(fā)送:這幾個地方不用改動但是一定要點擊綠色的連接才可以,進行發(fā)送。
需要修改的部分是:
然后點擊發(fā)送就可以收到信息了。
總結(jié)
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
Java如何實現(xiàn)微信支付v3的支付回調(diào)
這篇文章主要給大家介紹了關(guān)于Java如何實現(xiàn)微信支付v3的支付回調(diào),微信實現(xiàn)支付功能與支付寶實現(xiàn)支付功能是相似的,文中給了詳細的示例代碼,需要的朋友可以參考下2023-07-07快速解決List集合add元素,添加多個對象出現(xiàn)重復(fù)的問題
這篇文章主要介紹了快速解決List集合add元素,添加多個對象出現(xiàn)重復(fù)的問題,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-08-08springboot項目中mapper.xml文件找不到的三種解決方案
這篇文章主要介紹了springboot項目中mapper.xml文件找不到的三種解決方案,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-01-01如何在SpringBoot項目中集成SpringSecurity進行權(quán)限管理
在本文中,我們將討論如何在Spring?Boot項目中集成權(quán)限管理,我們將使用Spring?Security框架,這是一個專門用于實現(xiàn)安全性功能的框架,包括認證和授權(quán),需要的朋友可以參考下2023-07-07