SpringBoot整合MQTT并實(shí)現(xiàn)異步線程調(diào)用的問題
為什么選擇MQTT
MQTT的定義相信很多人都能講的頭頭是道,本文章也不討論什么高大上的東西,旨在用最簡(jiǎn)單直觀的方式讓每一位剛接觸的同行們可以最快的應(yīng)用起來
先從使用MQTT需要什么開始分析:
- 消息服務(wù)器
- 不同應(yīng)用/設(shè)備之間的頻繁交互
- 可能涉及一對(duì)多的消息傳遞
基于SpringBoot通過注解實(shí)現(xiàn)對(duì)mqtt消息處理的異步調(diào)用
使用背景
生產(chǎn)環(huán)境下, 由于mqtt 生產(chǎn)者生產(chǎn)的消息逐漸增多, 可能會(huì)導(dǎo)致消息堆積. 因此需要消費(fèi)者去快速的消費(fèi).
而其中的一個(gè)方案便是使用異步線程去加速消費(fèi)消息. 下面介紹下思路
我們可以在原來的mqtt工具類上面進(jìn)行改裝.
首先創(chuàng)建一個(gè)類MqttMessageListener并繼承IMqttMessageListener實(shí)現(xiàn)messageArrived, 用于處理這些消息(業(yè)務(wù)編寫)
然后改寫mqtt客戶端訂閱的方法, 注入MqttMessageListener, 并在訂閱方法中新增該參數(shù)
在然后在啟動(dòng)類開啟異步線程, 編寫一個(gè)配置類配置線程池參數(shù)并且在messageArrived加上@Async開啟異步線程調(diào)用
代碼實(shí)現(xiàn)
基礎(chǔ)代碼
指沒有開啟線程池的代碼
MqttPushClient 主要定義了連接參數(shù)
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* @Author
* @Date
* @Description 連接至EMQ X 服務(wù)器,獲取mqtt連接,發(fā)布消息
*/
@Component
public class MqttPushClient{
private static final Logger log = LoggerFactory.getLogger(MqttPushClient.class);
@Autowired
private PushCallback pushCallback;
private static MqttClient client;
public static void setClient(MqttClient client) {
MqttPushClient.client = client;
}
public static MqttClient getClient() {
return client;
}
public void connect(String host, String clientID, String username, String password, int timeout, int keepalive, List<String> topicList) {
MqttClient client;
try {
client = new MqttClient(host, clientID, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
if (username != null) {
options.setUserName(username);
}
if (password != null) {
options.setPassword(password.toCharArray());
}
options.setConnectionTimeout(timeout);
options.setKeepAliveInterval(keepalive);
MqttPushClient.setClient(client);
try {
//設(shè)置回調(diào)類
client.setCallback(pushCallback);
//client.connect(options);
IMqttToken iMqttToken = client.connectWithResult(options);
boolean complete = iMqttToken.isComplete();
log.info("MQTT連接"+(complete?"成功":"失敗"));
/** 訂閱主題 **/
for (String topic : topicList) {
log.info("連接訂閱主題:{}", topic);
client.subscribe(topic, 0);
}
} catch (Exception e) {
e.printStackTrace();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
PushCallback 回調(diào)類, 實(shí)現(xiàn)重連, 消息發(fā)送監(jiān)聽, 消息接收監(jiān)聽
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @Author
* @Date
* @Description 消息回調(diào),處理接收的消息
*/
@Component
public class PushCallback implements MqttCallback {
private static final Logger log = LoggerFactory.getLogger(PushCallback.class);
@Autowired
private MqttConfiguration mqttConfiguration;
@Autowired
private MqttTopic mqttTopic;
@Override
public void connectionLost(Throwable cause) { // 連接丟失后,一般在這里面進(jìn)行重連
log.info("連接斷開,正在重連");
MqttPushClient mqttPushClient = mqttConfiguration.getMqttPushClient();
if (null != mqttPushClient) {
mqttPushClient.connect(mqttConfiguration.getHost(), mqttConfiguration.getClientid(), mqttConfiguration.getUsername(),
mqttConfiguration.getPassword(), mqttConfiguration.getTimeout(), mqttConfiguration.getKeepalive(), mqttConfiguration.getTopic());
log.info("已重連");
}
}
/**
* 發(fā)送消息,消息到達(dá)后處理方法
* @param token
*/
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
int messageId = token.getMessageId();
String[] topics = token.getTopics();
log.info("消息發(fā)送完成,messageId={},topics={}",messageId,topics.toString());
}
/**
* 訂閱主題接收到消息處理方法
* @param topic
* @param message
*/
@Override
public void messageArrived(String topic, MqttMessage message) {
// subscribe后得到的消息會(huì)執(zhí)行到這里面,這里在控制臺(tái)有輸出
String messageStr = new String(message.getPayload());
// messageDistribute.distribute(topic, messageStr);
log.info("接收的主題:" + topic + ";接收到的信息:" + messageStr);
}
}
MqttConfiguration 配置了mqtt相關(guān)參數(shù), 并初始化連接(mqtt在這里啟動(dòng))
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* @Author
* @Date mqtt配置及連接
* @Description
*/
@Slf4j
@Component
@Configuration
@ConfigurationProperties(MqttConfiguration.PREFIX)
public class MqttConfiguration {
@Autowired
private MqttPushClient mqttPushClient;
/**
* 指定配置文件application-local.properties中的屬性名前綴
*/
public static final String PREFIX = "std.mqtt";
private String host;
private String clientId;
private String userName;
private String password;
private int timeout;
private int keepAlive;
private List<String> topic;
public String getClientid() {
return clientId;
}
public void setClientid(String clientid) {
this.clientId = clientid;
}
public String getUsername() {
return userName;
}
public void setUsername(String username) {
this.userName = username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public int getTimeout() {
return timeout;
}
public void setTimeout(int timeout) {
this.timeout = timeout;
}
public int getKeepalive() {
return keepAlive;
}
public void setKeepalive(int keepalive) {
this.keepAlive = keepalive;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public List<String> getTopic() {
return topic;
}
public void setTopic(List<String> topic) {
this.topic = topic;
}
/**
* 連接至mqtt服務(wù)器,獲取mqtt連接
* @return
*/
@Bean
public MqttPushClient getMqttPushClient() {
//連接至mqtt服務(wù)器,獲取mqtt連接
mqttPushClient.connect(host, clientId, userName, password, timeout, keepAlive, topic);
return mqttPushClient;
}
}
properties.yml 配置文件
std.mqtt:
host: tcp://x.x.x.x:1883
username: your_username
password: your_password
#MQTT-連接服務(wù)器默認(rèn)客戶端ID
clientid: your_clientid
#連接超時(shí)
timeout: 1000
# deviceId
deviceId: your_deviceId
# mqtt-topic
topic[0]: your_tpoic
TopicOperation 定義了發(fā)布訂閱的方法
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
/**
* @Author chy
*/
public class TopicOperation {
private static final Logger log = LoggerFactory.getLogger(TopicOperation.class);
/**
* 訂閱主題
* @param topic 主題名稱
*/
public static void subscribe(String topic) {
try {
MqttClient client = MqttPushClient.getClient();
if (client == null) {
return;
};
client.subscribe(topic, 0);
log.info("訂閱主題:{}",topic);
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 發(fā)布主題
*
* @param topic
* @param pushMessage
*/
public static void publish(String topic, String pushMessage) {
log.info("SEND TO MQTT -- topic : {}, message : {}", topic, pushMessage);
MqttMessage message = new MqttMessage();
message.setQos(0);
// 非持久化
message.setRetained(false);
message.setPayload(pushMessage.getBytes());
MqttClient client = MqttPushClient.getClient();
if (client == null) {
return;
};
MqttTopic mTopic = client.getTopic(topic);
if (null == mTopic) {
log.error("主題不存在:{}",mTopic);
}
try {
mTopic.publish(message);
} catch (Exception e) {
log.error("mqtt發(fā)送消息異常:",e);
}
}
}
定義了發(fā)布和訂閱的相關(guān)主題
import com.sxd.onlinereservation.exception.BusinessException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
* @Author
* @Date topic名稱
* @Description
*/
@Component
public class MqttTopic {
@Value("${std.mqtt.deviceId}")
private String[] deviceId;
public String getSubscribeTopic(String type){
switch (type){
case "appointTopic":
return String.format("/v1/%s/service/appointTopic", deviceId[0]);
default:
throw new BusinessException("mqtt 訂閱主題獲取錯(cuò)誤");
}
}
public String getPublishTopic(String type) {
switch (type){
//1.0接口立即取號(hào)發(fā)布主題
case "appointTopic":
return String.format("/v1/%s/service/appointTopic", deviceId[1]);
default:
throw new BusinessException("mqtt 發(fā)布主題獲取錯(cuò)誤");
}
}
}
ps: 如果想要使用該工具類進(jìn)行消息發(fā)送和接收看下面demo
//消息發(fā)布操作
TopicOperation.publish(mqttTopic.getPublishTopic("appointTopic"), "消息體"));
//消息訂閱操作
TopicOperation.subscribe(mqttTopic.getSubscribeTopic("appointTopic"), "消息體"));
異步線程處理實(shí)現(xiàn)
總結(jié)
- 創(chuàng)建消息監(jiān)聽類 , 用于監(jiān)聽消息并進(jìn)行業(yè)務(wù)處理
- 在原來訂閱時(shí), 注入并使用第一步創(chuàng)建的監(jiān)聽類
- 通過注解開啟異步線程并配置處理方式
創(chuàng)建消息監(jiān)聽類 , 用于監(jiān)聽消息并進(jìn)行業(yè)務(wù)處理
@Slf4j
@Component
public class MqttMessageListener implements IMqttMessageListener {
@Resource
private BusinessService businessService;
@Autowired
private MqttTopic mqttTopic;
@Autowired
private ThreeCallmachineService threeCallmachineService;
@Autowired
private BusinessHallService businessHallService;
@Autowired
private BusinessMaterialService businessMaterialService;
@Autowired
private BusinessWaitService businessWaitService;
@Autowired
private AppointmentService appointmentService;
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
String messageStr = new String(message.getPayload());
log.info("接收的主題:" + topic + ";接收到的信息:" + messageStr);
//進(jìn)行 業(yè)務(wù)處理
}
}
在原來訂閱時(shí), 注入并使用第一步創(chuàng)建的監(jiān)聽類
注入了
MqttMessageListener, 并且在訂閱時(shí)加入client.subscribe(topic, mqttMessageListener);
修改MqttPushClient (必須)
@Component
public class MqttPushClient{
private static final Logger log = LoggerFactory.getLogger(MqttPushClient.class);
@Autowired
private PushCallback pushCallback;
@Autowired //這里進(jìn)行了注入操作
private MqttMessageListener mqttMessageListener;
private static MqttClient client;
public static void setClient(MqttClient client) {
MqttPushClient.client = client;
}
public static MqttClient getClient() {
return client;
}
public void connect(String host, String clientID, String username, String password, int timeout, int keepalive, List<String> topicList) {
MqttClient client;
try {
client = new MqttClient(host, clientID, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
if (username != null) {
options.setUserName(username);
}
if (password != null) {
options.setPassword(password.toCharArray());
}
options.setConnectionTimeout(timeout);
options.setKeepAliveInterval(keepalive);
MqttPushClient.setClient(client);
try {
//設(shè)置回調(diào)類
client.setCallback(pushCallback);
//client.connect(options);
IMqttToken iMqttToken = client.connectWithResult(options);
boolean complete = iMqttToken.isComplete();
log.info("MQTT連接"+(complete?"成功":"失敗"));
/** 訂閱主題 **/
for (String topic : topicList) {
log.info("連接訂閱主題:{}", topic);
//client.subscribe(topic, 0);
client.subscribe(topic, mqttMessageListener);
}
} catch (Exception e) {
e.printStackTrace();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
如果業(yè)務(wù)還使用了手動(dòng)訂閱, 則也需要在訂閱的類上面注入MqttMessageListener , 并且在訂閱方法中作為參數(shù)使用. 但是我們需要將方法改成非靜態(tài)的, 因此在使用該方法時(shí)我們需要new該對(duì)象然后才能夠調(diào)用. 但是手動(dòng)訂閱很少用到. 因此有無此步驟都可
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
/**
* @Author chy
* @Date
* @Description
*/
public class TopicOperation {
private static final Logger log = LoggerFactory.getLogger(TopicOperation.class);
//注入MqttMessageListener
@Autowired
private MqttMessageListener mqttMessageListener;
/**
* 訂閱主題
* @param topic 主題名稱
*/
public void subscribe(String topic) {
try {
MqttClient client = MqttPushClient.getClient();
if (client == null) {
return;
};
//client.subscribe(topic, 0);
//在訂閱方法中作為參數(shù)使用
client.subscribe(topic, mqttMessageListener);
log.info("訂閱主題:{}",topic);
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 發(fā)布主題
*
* @param topic
* @param pushMessage
*/
public static void publish(String topic, String pushMessage) {
log.info("SEND TO MQTT -- topic : {}, message : {}", topic, pushMessage);
MqttMessage message = new MqttMessage();
message.setQos(0);
// 非持久化
message.setRetained(false);
message.setPayload(pushMessage.getBytes());
MqttClient client = MqttPushClient.getClient();
if (client == null) {
return;
};
MqttTopic mTopic = client.getTopic(topic);
if (null == mTopic) {
log.error("主題不存在:{}",mTopic);
}
try {
mTopic.publish(message);
} catch (Exception e) {
log.error("mqtt發(fā)送消息異常:",e);
}
}
}
通過注解開啟異步線程并配置處理方式 啟動(dòng)類開啟 @EnableAsync(proxyTargetClass=true )
@SpringBootApplication
@MapperScan(basePackages = "com.x.x.mapper")
@EnableTransactionManagement
@EnableAsync(proxyTargetClass=true )
public class XXApplication {
public static void main(String[] args) {
SpringApplication.run(XXApplication.class, args);
}
}
配置類配置線程池參數(shù)
@Slf4j
@Configuration
public class ExecutorConfig {
@Bean
public Executor asyncServiceExecutor() {
log.info("start asyncServiceExecutor");
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//配置核心線程數(shù)
executor.setCorePoolSize(9);
//配置最大線程數(shù)
executor.setMaxPoolSize(20);
//配置隊(duì)列大小
executor.setQueueCapacity(200);
//配置線程池中的線程的名稱前綴
executor.setThreadNamePrefix("sxd-async-service-");
// 設(shè)置拒絕策略:當(dāng)pool已經(jīng)達(dá)到max size的時(shí)候,如何處理新任務(wù)
// CALLER_RUNS:不在新線程中執(zhí)行任務(wù),而是有調(diào)用者所在的線程來執(zhí)行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//執(zhí)行初始化
executor.initialize();
return executor;
}
}
MqttMessageListener的實(shí)現(xiàn)方法messageArrived開啟@Async("asyncServiceExecutor")
@Slf4j
@Component
public class MqttMessageListener implements IMqttMessageListener {
@Resource
private BusinessService businessService;
@Autowired
private MqttTopic mqttTopic;
@Autowired
private ThreeCallmachineService threeCallmachineService;
@Autowired
private BusinessHallService businessHallService;
@Autowired
private BusinessMaterialService businessMaterialService;
@Autowired
private BusinessWaitService businessWaitService;
@Autowired
private AppointmentService appointmentService;
@Override
@Async("asyncServiceExecutor")
public void messageArrived(String topic, MqttMessage message) throws Exception {
String messageStr = new String(message.getPayload());
log.info("接收的主題:" + topic + ";接收到的信息:" + messageStr);
System.out.println("線程名稱:【" + Thread.currentThread().getName() + "】");
//進(jìn)行 業(yè)務(wù)處理
}
}
到此這篇關(guān)于SpringBoot整合MQTT并實(shí)現(xiàn)異步線程調(diào)用的文章就介紹到這了,更多相關(guān)SpringBoot異步線程調(diào)用內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- Spring?Boot異步線程間數(shù)據(jù)傳遞的四種方式
- springboot?正確的在異步線程中使用request的示例代碼
- SpringBoot使用異步線程池實(shí)現(xiàn)生產(chǎn)環(huán)境批量數(shù)據(jù)推送
- SpringBoot?異步線程間傳遞上下文方式
- Spring Boot之@Async異步線程池示例詳解
- SpringBoot獲取HttpServletRequest的3種方式總結(jié)
- SpringBoot詳細(xì)講解異步任務(wù)如何獲取HttpServletRequest
- 在 Spring Boot 中使用異步線程時(shí)的 HttpServletRequest 復(fù)用問題記錄
相關(guān)文章
詳解基于java的Socket聊天程序——客戶端(附demo)
這篇文章主要介紹了詳解基于java的Socket聊天程序——客戶端(附demo),客戶端設(shè)計(jì)主要分成兩個(gè)部分,分別是socket通訊模塊設(shè)計(jì)和UI相關(guān)設(shè)計(jì)。有興趣的可以了解一下。2016-12-12
SpringBoot應(yīng)用啟動(dòng)失?。憾丝谡加脤?dǎo)致Tomcat啟動(dòng)失敗的問題分析與解決方法
在開發(fā)和運(yùn)維過程中,應(yīng)用程序啟動(dòng)失敗是我們經(jīng)常遇到的一個(gè)問題,尤其是在 Web 應(yīng)用程序中,涉及到 Web 服務(wù)器的配置時(shí),今天我們將探討一個(gè)常見的啟動(dòng)錯(cuò)誤,尤其是在使用 Spring Boot 和內(nèi)嵌 Tomcat 服務(wù)器時(shí),需要的朋友可以參考下2024-11-11
SpringBoot 回滾操作的幾種實(shí)現(xiàn)方式
回滾操作是一種常見的操作,用于撤銷之前執(zhí)行的操作,本文主要介紹了SpringBoot回滾操作的幾種實(shí)現(xiàn)方式,包含基于異常類型的回滾、基于自定義邏輯的回滾和基于數(shù)據(jù)庫(kù)狀態(tài)的回滾,感興趣的可以了解一下2024-03-03
使用Java第三方實(shí)現(xiàn)發(fā)送短信功能
這篇文章主要介紹了使用Java第三方實(shí)現(xiàn)發(fā)送短信功能,在一些開發(fā)中,經(jīng)常需要有給用戶發(fā)送短信接收驗(yàn)證碼的功能,那么在Java中該如何實(shí)現(xiàn)呢,今天我們就一起來看一看2023-03-03
RocketMQ實(shí)現(xiàn)隨緣分BUG小功能示例詳解
這篇文章主要為大家介紹了RocketMQ實(shí)現(xiàn)隨緣分BUG小功能示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-08-08
jvm雙親委派 vs 破壞雙親委派理解加載器的權(quán)責(zé)分配
這篇文章主要為大家介紹了jvm雙親委派 vs 破壞雙親委派對(duì)比來理解加載器的權(quán)責(zé)分配,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-10-10

