java代碼mqtt接收發(fā)送消息方式
java代碼mqtt接收發(fā)送消息
mqtt消息第一用到不是太熟悉所以寫一篇文章鞏固一下。
前提是你已經(jīng)把mqtt已經(jīng)安裝好,并且啟動(dòng)好了。
首先我們需要兩部分代碼。
所需依賴
<!-- mqtt -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>連接mqtt部分的代碼塊,因?yàn)槲也恍枰l(fā)送消息所以把發(fā)送消息給注釋掉了。
package mqttclient.util;
import lombok.extern.slf4j.Slf4j;
import mqttclient.callback.MqttMessageCallback2;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Objects;
@Component
@Slf4j
public class MqttClientUtil2 {
private String username;
private String password;
@Value("tcp://127.0.0.1:1883")//這個(gè)是安裝mqtt的ip以及端口,1883是mqtt默認(rèn)端口
private String host;
@Value("CYT")//這個(gè)隨便寫但是是唯一的。
private String clientId;
@Value("cyt/#")這個(gè)是mqtt發(fā)送消息的咱們要訂閱的topic,cyt/#代表以cyt/開始的所有topic都接收
private String topic;
@Value("${mqtt.connection.timeout}")//IOT_MQTT_Yield會(huì)block住timeout的時(shí)間去嘗試接收數(shù)據(jù),直到timeout才會(huì)退出??梢詫懺谶@里也可以寫在yml配置文件中
private int timeOut;
@Value("${mqtt.keep.alive.interval}")
private int interval;
@Autowired
private MqttMessageCallback2 mqttMessageCallback2;
private MqttClient mqttClient;
private MqttConnectOptions mqttConnectOptions;
@PostConstruct
private void init(){
connect(host, clientId,topic);
}
/**
* 鏈接mqtt
* @param host
* @param clientId
*/
private void connect(String host,String clientId,String topic){
try{
mqttClient = new MqttClient(host,clientId,new MemoryPersistence());
mqttConnectOptions = getMqttConnectOptions();
//設(shè)置回調(diào)函數(shù)
mqttClient.setCallback(mqttMessageCallback2);
//鏈接mqtt
mqttClient.connect(mqttConnectOptions);
//訂閱消息
mqttClient.subscribe(topic,2);
}catch (Exception e){
log.error("mqtt服務(wù)鏈接異常!");
e.printStackTrace();
}
}
/**
* 設(shè)置鏈接對(duì)象信息
* setCleanSession true 斷開鏈接即清楚會(huì)話 false 保留鏈接信息 離線還會(huì)繼續(xù)發(fā)消息
* @return
*/
private MqttConnectOptions getMqttConnectOptions(){
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
/*mqttConnectOptions.setUserName(username);
mqttConnectOptions.setPassword(password.toCharArray());*/
mqttConnectOptions.setServerURIs(new String[]{host});
mqttConnectOptions.setKeepAliveInterval(interval);
mqttConnectOptions.setConnectionTimeout(timeOut);
mqttConnectOptions.setCleanSession(true);
return mqttConnectOptions;
}
/**
*mqtt鏈接狀態(tài)
* @return
*/
private boolean isConnect(){
if(Objects.isNull(this.mqttClient)){
return false;
}
return mqttClient.isConnected();
}
/**
* 設(shè)置重連
* @throws Exception
*/
private void reConnect() throws Exception{
if(Objects.nonNull(this.mqttClient)){
log.info("mqtt 服務(wù)已重新鏈接...");
this.mqttClient.connect(this.mqttConnectOptions);
}
}
/**
* 斷開鏈接
* @throws Exception
*/
private void closeConnect() throws Exception{
if(Objects.nonNull(this.mqttClient)){
log.info("mqtt 服務(wù)已斷開鏈接...");
this.mqttClient.disconnect();
}
}
/* *//**
* 發(fā)布消息
* @param topic
* @param message
* @param qos
* @throws Exception
*//*
public void sendMessage(String topic,String message,int qos) throws Exception {
if(Objects.nonNull(this.mqttClient) && this.mqttClient.isConnected()){
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(message.getBytes());
mqttMessage.setQos(qos);
MqttTopic mqttTopic = mqttClient.getTopic(topic);
if(Objects.nonNull(mqttTopic)){
try{
MqttDeliveryToken publish = mqttTopic.publish(mqttMessage);
if(publish.isComplete()){
log.info("消息發(fā)送成功---->{}",message);
}
}catch(Exception e){
log.error("消息發(fā)送異常",e);
}
}
}else{
reConnect();
}
}*/
}接收消息部分
package mqttclient.callback;
import lombok.extern.slf4j.Slf4j;
import mqttclient.util.ParsingData2;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
@Slf4j
public class MqttMessageCallback2 implements MqttCallback {
/**
* 鏈接丟失時(shí)處理
* @param throwable
*/
@Override
public void connectionLost(Throwable throwable) {
//可以做重連 或者 其他業(yè)務(wù)處理
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
System.out.println("接收到消息topic---->{}"+topic);
System.out.println("接收到消息topic---->{}"+mqttMessage);
log.info("接收到消息質(zhì)量qos---->{}",mqttMessage.getQos());
System.out.println("接收到消息質(zhì)量qos---->{}"+mqttMessage.getQos());
log.info("接收到消息具體信息---->{}",new String(mqttMessage.getPayload()));
System.out.println("接收到消息具體信息---->{}"+mqttMessage.getPayload());
//結(jié)合業(yè)務(wù) 編寫具體信息即可
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
}這個(gè)兩個(gè)寫完之后只要有數(shù)據(jù)發(fā)送過(guò)來(lái),這邊會(huì)自動(dòng)進(jìn)行接收打印。
是用mqtt網(wǎng)頁(yè)版圖形化界面進(jìn)行模擬數(shù)據(jù)發(fā)送。
安裝mqtt后打開此網(wǎng)站:http://localhost:18083/
默認(rèn)賬號(hào)是:admin / public
登錄后這邊可以設(shè)置中文:

模擬發(fā)送:這幾個(gè)地方不用改動(dòng)但是一定要點(diǎn)擊綠色的連接才可以,進(jìn)行發(fā)送。

需要修改的部分是:

然后點(diǎn)擊發(fā)送就可以收到信息了。
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
SpringSecurity在分布式環(huán)境下的使用流程分析
文章介紹了Spring?Security在分布式環(huán)境下的使用,包括單點(diǎn)登錄(SSO)的概念、流程圖以及JWT(JSON?Web?Token)的生成和校驗(yàn),通過(guò)使用JWT和RSA非對(duì)稱加密,可以實(shí)現(xiàn)安全的分布式認(rèn)證,感興趣的朋友一起看看吧2025-02-02
spring boot集成rabbitmq的實(shí)例教程
這篇文章主要給大家介紹了關(guān)于spring boot集成rabbitmq的相關(guān)資料,springboot集成RabbitMQ非常簡(jiǎn)單,文中通過(guò)示例代碼介紹的非常詳細(xì),需要的朋友們可以參考借鑒,下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧。2017-11-11
Spring Security在標(biāo)準(zhǔn)登錄表單中添加一個(gè)額外的字段
這篇文章主要介紹了Spring Security在標(biāo)準(zhǔn)登錄表單中添加一個(gè)額外的字段,我們將重點(diǎn)關(guān)注兩種不同的方法,以展示框架的多功能性以及我們可以使用它的靈活方式。 需要的朋友可以參考下2019-05-05
Spring/SpringBoot?@RequestParam注解無(wú)法讀取application/json格式數(shù)據(jù)問題
RequestParam用于將指定的請(qǐng)求參數(shù)賦值給方法中的形參,可以接受簡(jiǎn)單類型屬性,也可以接受對(duì)象類型,一般用于GET請(qǐng)求,下面這篇文章主要給大家介紹了關(guān)于Spring/SpringBoot?@RequestParam注解無(wú)法讀取application/json格式數(shù)據(jù)問題解決的相關(guān)資料,需要的朋友可以參考下2022-10-10
SpringBoot集成JWT生成token及校驗(yàn)方法過(guò)程解析
這篇文章主要介紹了SpringBoot集成JWT生成token及校驗(yàn)方法過(guò)程解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-04-04
列舉java語(yǔ)言中反射的常用方法及實(shí)例代碼
反射機(jī)制指的是程序在運(yùn)行時(shí)能夠獲取自身的信息。這篇文章主要介紹了列舉java語(yǔ)言中反射的常用方法,需要的朋友可以參考下2019-07-07
Spring Boot Web應(yīng)用開發(fā) CORS 跨域請(qǐng)求支持
本篇文章主要介紹了Spring Boot Web應(yīng)用開發(fā) CORS 跨域請(qǐng)求支持,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2017-05-05
Java實(shí)現(xiàn)獲取前、后N天日期的函數(shù)分享
本文給大家分享的是使用java實(shí)現(xiàn)的獲取當(dāng)前日期前后N天的函數(shù),非常的簡(jiǎn)單實(shí)用,有需要的小伙伴可以參考下。2015-03-03

