springboot整合mqtt客戶端示例分享
用到的工具:
EMQX , mqttx , idea
工具使用都很簡單,自己看看就能會。
訂閱端config代碼:
package com.example.demo.config;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
/**
* @Author: xct
* @Date: 2021/7/30 17:06
* @Description:
*/
@Configuration
public class MqttConsumerConfig {
@Value("${spring.mqtt.username}")
private String username;
@Value("${spring.mqtt.password}")
private String password;
@Value("${spring.mqtt.url}")
private String hostUrl;
@Value("${spring.mqtt.client.id}")
private String clientId;
@Value("${spring.mqtt.default.topic}")
private String defaultTopic;
/**
* 客戶端對象
*/
private MqttClient client;
/**
* 在bean初始化后連接到服務器
* @author xct
* @param
* @return void
* @date 2021/7/30 16:48
*/
@PostConstruct
public void init(){
connect();
}
/**
* 客戶端連接服務端
* @author xct
* @param
* @return void
* @date 2021/7/30 16:01
*/
public void connect(){
try {
//創(chuàng)建MQTT客戶端對象
client = new MqttClient(hostUrl,clientId,new MemoryPersistence());
//連接設置
MqttConnectOptions options = new MqttConnectOptions();
//是否清空session,設置為false表示服務器會保留客戶端的連接記錄,客戶端重連之后能獲取到服務器在客戶端斷開連接期間推送的消息
//設置為true表示每次連接到服務端都是以新的身份
options.setCleanSession(true);
//設置連接用戶名
options.setUserName(username);
//設置連接密碼
options.setPassword(password.toCharArray());
//設置超時時間,單位為秒
options.setConnectionTimeout(100);
//設置心跳時間 單位為秒,表示服務器每隔1.5*20秒的時間向客戶端發(fā)送心跳判斷客戶端是否在線
options.setKeepAliveInterval(20);
//設置遺囑消息的話題,若客戶端和服務器之間的連接意外斷開,服務器將發(fā)布客戶端的遺囑信息
options.setWill("willTopic",(clientId + "與服務器斷開連接").getBytes(),0,false);
//設置回調
client.setCallback(new MqttConsumerCallBack());
client.connect(options);
//訂閱主題
//消息等級,和主題數組一一對應,服務端將按照指定等級給訂閱了主題的客戶端推送消息
int[] qos = {1,1};
//主題
String[] topics = {"topic1","topic2"};
//訂閱主題
client.subscribe(topics,qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 斷開連接
* @author xct
* @param
* @return void
* @date 2021/8/2 09:30
*/
public void disConnect(){
try {
client.disconnect();
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 訂閱主題
* @author xct
* @param topic
* @param qos
* @return void
* @date 2021/7/30 17:12
*/
public void subscribe(String topic,int qos){
try {
client.subscribe(topic,qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
}訂閱端回調代碼:
package com.example.demo.config;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
* @Author: xct
* @Date: 2021/7/30 17:06
* @Description:
*/
public class MqttConsumerCallBack implements MqttCallback {
/**
* 客戶端斷開連接的回調
* @author xct
* @param throwable
* @return void
* @date 2021/7/30 17:14
*/
@Override
public void connectionLost(Throwable throwable) {
System.out.println("與服務器斷開連接,可重連");
}
/**
* 消息到達的回調
* @author xct
* @param topic
* @param message
* @return void
* @date 2021/7/30 17:14
*/
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println(String.format("接收消息主題 : %s",topic));
System.out.println(String.format("接收消息Qos : %d",message.getQos()));
System.out.println(String.format("接收消息內容 : %s",new String(message.getPayload())));
System.out.println(String.format("接收消息retained : %b",message.isRetained()));
//TODO 可以將消息持久化到數據庫中,然后在進行其他操作。
}
/**
* 消息發(fā)布成功的回調
* @author xct
* @param iMqttDeliveryToken
* @return void
* @date 2021/7/30 17:14
*/
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
}測試控制器:
package com.example.demo.controller;
import com.example.demo.config.MqttConsumerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
/**
* @Author: xct
* @Date: 2021/7/30 17:20
* @Description:
*/
@Controller
public class TestController {
@Autowired
private MqttConsumerConfig client;
@Value("${spring.mqtt.client.id}")
private String clientId;
@RequestMapping("connect")
@ResponseBody
public String connect(){
client.connect();
return clientId + "連接到服務器";
}
@RequestMapping("disConnect")
@ResponseBody
public String disConnect(){
client.disConnect();
return clientId + "與服務器斷開連接";
}
}配置文件:
spring:
application:
name: consumer
#MQTT配置信息
mqtt:
#MQTT服務端地址,端口默認為1883,如果有多個,用逗號隔開,如tcp://127.0.0.1:1883,tcp://192.168.60.133:1883
url: tcp://0.0.0.0:1883
#用戶名
username: admin
#密碼
password: public
#客戶端id(不能重復)
client:
id: consumer-id
#MQTT默認的消息推送主題,實際可在調用接口時指定
default:
topic: topic
server:
port: 8082啟動訂閱端代碼,將訂閱端和mqttx都連接到EMQX
確認主題是否正確 發(fā)送即可。
到此這篇關于springboot整合mqtt客戶端示例分享的文章就介紹到這了,更多相關springboot整合mqtt內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
MyBatis在注解上使用動態(tài)SQL方式(@select使用if)
這篇文章主要介紹了MyBatis在注解上使用動態(tài)SQL方式(@select使用if),具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-07-07
JavaSwing GridLayout 網格布局的實現(xiàn)代碼
這篇文章主要介紹了JavaSwing GridLayout 網格布局的實現(xiàn)代碼,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2019-12-12
Spring @Configuration和@Component的區(qū)別
今天小編就為大家分享一篇關于Spring @Configuration和@Component的區(qū)別,小編覺得內容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧2018-12-12
Java聊天室之實現(xiàn)接收和發(fā)送Socket
這篇文章主要為大家詳細介紹了Java簡易聊天室之實現(xiàn)接收和發(fā)送Socket功能,文中的示例代碼講解詳細,具有一定的借鑒價值,需要的可以了解一下2022-10-10
MyBatis-Plus更新對象時將字段值更新為null的四種常見方法
MyBatis-Plus 是一個 MyBatis 的增強工具,在簡化開發(fā)、提高效率方面表現(xiàn)非常出色,而,在使用 MyBatis-Plus 更新對象時,默認情況下是不會將字段值更新為 null 的,如果你需要將某些字段的值更新為 null,有幾種方法可以實現(xiàn),本文將介紹幾種常見的方法2024-11-11
springboot實現(xiàn)的https單向認證和雙向認證(java生成證書)
springboot https單向認證和雙向認證,本文主要介紹了springboot實現(xiàn)的https單向認證和雙向認證,具有一定的參考價值,感興趣的可以了解一下2024-04-04

