SpringBoot實現(xiàn)MQTT消息發(fā)送和接收方式
Spring integration交互邏輯
對于發(fā)布者:
1.消息通過消息網(wǎng)關(guān)發(fā)送出去,由 MessageChannel 的實例 DirectChannel 處理發(fā)送的細節(jié)。
2.DirectChannel 收到消息后,內(nèi)部通過 MessageHandler 的實例 MqttPahoMessageHandler 發(fā)送到指定的 Topic。
對于訂閱者:
1.通過注入 MessageProducerSupport 的實例 MqttPahoMessageDrivenChannelAdapter,實現(xiàn)訂閱 Topic 和綁定消息消費的 MessageChannel。
2.同樣由 MessageChannel 的實例 DirectChannel 處理消費細節(jié)。
Channel 消息后會發(fā)送給我們自定義的 MqttInboundMessageHandler 實例進行消費。
可以看到整個處理的流程和前面將的基本一致。Spring Integration 就是抽象出了這么一套消息通信的機制,具體的通信細節(jié)由它集成的中間件來決定
1、maven依賴
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-integration -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
<version>2.5.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.integration/spring-integration-stream -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
<version>5.5.5</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.integration/spring-integration-mqtt -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>5.5.5</version>
</dependency>2、yaml配置文件
#mqtt配置
mqtt:
username: 123
password: 123
#MQTT-服務(wù)器連接地址,如果有多個,用逗號隔開
url: tcp://127.0.0.1:1883
#MQTT-連接服務(wù)器默認客戶端ID
client:
id: ${random.value}
default:
#MQTT-默認的消息推送主題,實際可在調(diào)用接口時指定
topic: topic,mqtt/test/#
#連接超時
completionTimeout: 30003、mqtt生產(chǎn)者消費者配置類
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import java.util.Arrays;
import java.util.List;
/**
* mqtt 推送and接收 消息類
**/
@Configuration
@IntegrationComponentScan
@Slf4j
public class MqttSenderAndReceiveConfig {
private static final byte[] WILL_DATA;
static {
WILL_DATA = "offline".getBytes();
}
@Autowired
private MqttReceiveHandle mqttReceiveHandle;
@Value("${mqtt.username}")
private String username;
@Value("${mqtt.password}")
private String password;
@Value("${mqtt.url}")
private String hostUrl;
@Value("${mqtt.client.id}")
private String clientId;
@Value("${mqtt.default.topic}")
private String defaultTopic;
@Value("${mqtt.completionTimeout}")
private int completionTimeout; //連接超時
/**
* MQTT連接器選項
**/
@Bean(value = "getMqttConnectOptions")
public MqttConnectOptions getMqttConnectOptions1() {
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
// 設(shè)置是否清空session,這里如果設(shè)置為false表示服務(wù)器會保留客戶端的連接記錄,這里設(shè)置為true表示每次連接到服務(wù)器都以新的身份連接
mqttConnectOptions.setCleanSession(true);
// 設(shè)置超時時間 單位為秒
mqttConnectOptions.setConnectionTimeout(10);
mqttConnectOptions.setAutomaticReconnect(true);
mqttConnectOptions.setUserName(username);
mqttConnectOptions.setPassword(password.toCharArray());
mqttConnectOptions.setServerURIs(new String[]{hostUrl});
// 設(shè)置會話心跳時間 單位為秒 服務(wù)器會每隔1.5*20秒的時間向客戶端發(fā)送心跳判斷客戶端是否在線,但這個方法并沒有重連的機制
mqttConnectOptions.setKeepAliveInterval(10);
// 設(shè)置“遺囑”消息的話題,若客戶端與服務(wù)器之間的連接意外中斷,服務(wù)器將發(fā)布客戶端的“遺囑”消息。
//mqttConnectOptions.setWill("willTopic", WILL_DATA, 2, false);
return mqttConnectOptions;
}
/**
* MQTT工廠
**/
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(getMqttConnectOptions1());
return factory;
}
/**
* MQTT信息通道(生產(chǎn)者)
**/
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
/**
* MQTT消息處理器(生產(chǎn)者)
**/
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId + "_producer", mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(defaultTopic);
messageHandler.setAsyncEvents(true); // 消息發(fā)送和傳輸完成會有異步的通知回調(diào)
//設(shè)置轉(zhuǎn)換器 發(fā)送bytes數(shù)據(jù)
DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
converter.setPayloadAsBytes(true);
return messageHandler;
}
/**
* 配置client,監(jiān)聽的topic
* MQTT消息訂閱綁定(消費者)
**/
@Bean
public MessageProducer inbound() {
List<String> topicList = Arrays.asList(defaultTopic.trim().split(","));
String[] topics = new String[topicList.size()];
topicList.toArray(topics);
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(clientId + "_consumer", mqttClientFactory(), topics);
adapter.setCompletionTimeout(completionTimeout);
DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
converter.setPayloadAsBytes(true);
adapter.setConverter(converter);
adapter.setQos(2);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
/**
* MQTT信息通道(消費者)
**/
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
/**
* MQTT消息處理器(消費者)
**/
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
//處理接收消息
mqttReceiveHandle.handle(message);
}
};
}
}4、消息處理類
/**
* mqtt客戶端消息處理類
**/
@Slf4j
@Component
public class MqttReceiveHandle {
public void handle(Message<?> message) {
log.info("收到訂閱消息: {}", message);
String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString();
log.info("消息主題:{}", topic);
Object payLoad = message.getPayload();
byte[] data = (byte[]) payLoad;
Packet packet = Packet.parse(data);
log.info("發(fā)送的Packet數(shù)據(jù){}", JSON.toJSONString(packet));
}
}5、mqtt發(fā)送接口
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
/**
* mqtt發(fā)送消息
* (defaultRequestChannel = "mqttOutboundChannel" 對應(yīng)config配置)
* **/
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
/**
* 發(fā)送信息到MQTT服務(wù)器
*
* @param
*/
void sendToMqttObject(@Header(MqttHeaders.TOPIC) String topic, byte[] payload);
/**
* 發(fā)送信息到MQTT服務(wù)器
*
* @param topic 主題
* @param payload 消息主體
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
/**
* 發(fā)送信息到MQTT服務(wù)器
*
* @param topic 主題
* @param qos 對消息處理的幾種機制。
* 0 表示的是訂閱者沒收到消息不會再次發(fā)送,消息會丟失。
* 1 表示的是會嘗試重試,一直到接收到消息,但這種情況可能導(dǎo)致訂閱者收到多次重復(fù)消息。
* 2 多了一次去重的動作,確保訂閱者收到的消息有一次。
* @param payload 消息主體
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
/**
* 發(fā)送信息到MQTT服務(wù)器
*
* @param topic 主題
* @param payload 消息主體
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, Object payload);
/**
* 發(fā)送信息到MQTT服務(wù)器
*
* @param topic 主題
* @param payload 消息主體
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, byte[] payload);
}6、mqtt事件監(jiān)聽類
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
import org.springframework.integration.mqtt.event.MqttMessageDeliveredEvent;
import org.springframework.integration.mqtt.event.MqttMessageSentEvent;
import org.springframework.integration.mqtt.event.MqttSubscribedEvent;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class MqttListener {
/**
* 連接失敗的事件通知
* @param mqttConnectionFailedEvent
*/
@EventListener(classes = MqttConnectionFailedEvent.class)
public void listenerAction(MqttConnectionFailedEvent mqttConnectionFailedEvent) {
log.info("連接失敗的事件通知");
}
/**
* 已發(fā)送的事件通知
* @param mqttMessageSentEvent
*/
@EventListener(classes = MqttMessageSentEvent.class)
public void listenerAction(MqttMessageSentEvent mqttMessageSentEvent) {
log.info("已發(fā)送的事件通知");
}
/**
* 已傳輸完成的事件通知
* 1.QOS == 0,發(fā)送消息后會即可進行此事件回調(diào),因為不需要等待回執(zhí)
* 2.QOS == 1,發(fā)送消息后會等待ACK回執(zhí),ACK回執(zhí)后會進行此事件通知
* 3.QOS == 2,發(fā)送消息后會等待PubRECV回執(zhí),知道收到PubCOMP后會進行此事件通知
* @param mqttMessageDeliveredEvent
*/
@EventListener(classes = MqttMessageDeliveredEvent.class)
public void listenerAction(MqttMessageDeliveredEvent mqttMessageDeliveredEvent) {
log.info("已傳輸完成的事件通知");
}
/**
* 消息訂閱的事件通知
* @param mqttSubscribedEvent
*/
@EventListener(classes = MqttSubscribedEvent.class)
public void listenerAction(MqttSubscribedEvent mqttSubscribedEvent) {
log.info("消息訂閱的事件通知");
}
}7、接口測試
@Resource
private MqttGateway mqttGateway;
/**
* sendData 消息
* topic 訂閱主題
**/
@RequestMapping(value = "/sendMqtt",method = RequestMethod.POST)
public String sendMqtt(String sendData, String topic) {
MqttMessage mqttMessage = new MqttMessage();
mqttGateway.sendToMqtt(topic, sendData);
//mqttGateway.sendToMqttObject(topic, sendData.getBytes());
return "OK";
}總結(jié)
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
- springboot集成mqtt的實踐開發(fā)
- springboot 實現(xiàn)mqtt物聯(lián)網(wǎng)的示例代碼
- springboot整合netty-mqtt-client實現(xiàn)Mqtt消息的訂閱和發(fā)布示例
- SpringBoot+MQTT+apollo實現(xiàn)訂閱發(fā)布功能的示例
- SpringBoot整合MQTT并實現(xiàn)異步線程調(diào)用的問題
- SpringBoot集成mqtt的多模塊項目配置詳解
- SpringBoot2.0集成MQTT消息推送功能實現(xiàn)
- Springboot整合mqtt服務(wù)的示例代碼
- springboot整合mqtt的詳細圖文教程
- springboot整合mqtt客戶端示例分享
相關(guān)文章
java面向?qū)ο缶幊讨匾拍罾^承和多態(tài)示例解析
這篇文章主要為大家介紹了java面向?qū)ο缶幊痰膬蓚€重要概念繼承和多態(tài)示例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-05-05
Java學(xué)習(xí)基礎(chǔ)之安裝JDK/配置JDK環(huán)境&IEDA工具安裝
這篇文章主要介紹了Java學(xué)習(xí)基礎(chǔ)系列文章的第一篇,主要內(nèi)容是安裝JDK/配置JDK環(huán)境&IEDA工具安裝的相關(guān)資料,需要的朋友可以參考下2020-02-02
利用Spring Validation實現(xiàn)輸入驗證功能
這篇文章主要給大家介紹了如何利用Spring Validation完美的實現(xiàn)輸入驗證功能,文中有詳細的代碼示例,具有一定的參考價值,感興趣的朋友可以借鑒一下2023-06-06
Java二維數(shù)組與動態(tài)數(shù)組ArrayList類詳解
這篇文章主要給大家介紹了關(guān)于Java二維數(shù)組與動態(tài)數(shù)組ArrayList類的相關(guān)資料,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-09-09
Java基礎(chǔ)之switch分支結(jié)構(gòu)詳解
這篇文章主要介紹了Java基礎(chǔ)之switch分支結(jié)構(gòu)詳解,文中有非常詳細的代碼示例,對正在學(xué)習(xí)java的小伙伴們有很大的幫助,需要的朋友可以參考下2021-05-05
SpringBoot在IDEA中實現(xiàn)熱部署(JRebel實用版)
這篇文章主要介紹了SpringBoot在IDEA中實現(xiàn)熱部署(JRebel實用版),文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-05-05
java數(shù)學(xué)歸納法非遞歸求斐波那契數(shù)列的方法
這篇文章主要介紹了java數(shù)學(xué)歸納法非遞歸求斐波那契數(shù)列的方法,涉及java非遞歸算法的使用技巧,具有一定參考借鑒價值,需要的朋友可以參考下2015-07-07

