SpringBoot集成MQTT示例詳解
引言
特別提醒: 文中提到的MQTT服務(wù)器Apache-Apollo,現(xiàn)在已經(jīng)不維護(hù)。但是客戶端的寫法是通用的。目前我常用的是RabbitMQ加mqtt插件。
MQTT
MQTT(消息隊(duì)列遙測(cè)傳輸)是ISO標(biāo)準(zhǔn)(ISO/IEC PRF 20922)下基于發(fā)布/訂閱范式的消息協(xié)議。它工作在 TCP/IP協(xié)議族上,是為硬件性能低下的遠(yuǎn)程設(shè)備以及網(wǎng)絡(luò)狀況糟糕的情況下而設(shè)計(jì)的發(fā)布/訂閱型消息協(xié)議。國(guó)內(nèi)很多企業(yè)都廣泛使用MQTT作為Android手機(jī)客戶端與服務(wù)器端推送消息的協(xié)議。
特點(diǎn)
MQTT協(xié)議是為大量計(jì)算能力有限,且工作在低帶寬、不可靠的網(wǎng)絡(luò)的遠(yuǎn)程傳感器和控制設(shè)備通訊而設(shè)計(jì)的協(xié)議,它具有以下主要的幾項(xiàng)特性:
- 使用發(fā)布/訂閱消息模式,提供一對(duì)多的消息發(fā)布,解除應(yīng)用程序耦合;
- 對(duì)負(fù)載內(nèi)容屏蔽的消息傳輸;
- 使用TCP/IP提供網(wǎng)絡(luò)連接;
- 有三種消息發(fā)布服務(wù)質(zhì)量;
至多一次:消息發(fā)布完全依賴底層 TCP/IP 網(wǎng)絡(luò)。會(huì)發(fā)生消息丟失或重復(fù)。這一級(jí)別可用于如下情況,環(huán)境傳感器數(shù)據(jù),丟失一次讀記錄無(wú)所謂,因?yàn)椴痪煤筮€會(huì)有第二次發(fā)送。
至少一次:確保消息到達(dá),但消息重復(fù)可能會(huì)發(fā)生。
只有一次:確保消息到達(dá)一次。這一級(jí)別可用于如下情況,在計(jì)費(fèi)系統(tǒng)中,消息重復(fù)或丟失會(huì)導(dǎo)致不正確的結(jié)果。
- 小型傳輸,開(kāi)銷很?。ü潭ㄩL(zhǎng)度的頭部是 2 字節(jié)),協(xié)議交換最小化,以降低網(wǎng)絡(luò)流量;
- 使用Last Will和Testament特性通知有關(guān)各方客戶端異常中斷的機(jī)制。
Apache-Apollo
Apache Apollo是一個(gè)代理服務(wù)器,其是在ActiveMQ基礎(chǔ)上發(fā)展而來(lái)的,可以支持STOMP, AMQP, MQTT, Openwire, SSL, WebSockets 等多種協(xié)議。
原理:服務(wù)器端創(chuàng)建一個(gè)唯一訂閱號(hào),發(fā)送者可以向這個(gè)訂閱號(hào)中發(fā)東西,然后接受者(即訂閱了這個(gè)訂閱號(hào)的人)都會(huì)收到這個(gè)訂閱號(hào)發(fā)出來(lái)的消息。以此來(lái)完成消息的推送。服務(wù)器其實(shí)是一個(gè)消息中轉(zhuǎn)站。
下載
下載地址:http://archive.apache.org/dist/activemq/activemq-apollo/
配置與啟動(dòng)
- 需要安裝JDK環(huán)境
- 在命令行模式下進(jìn)入bin,執(zhí)行apollo create mybroker d:\apache-apollo\broker,創(chuàng)建一個(gè)名為mybroker虛擬主機(jī)(Virtual Host)。需要特別注意的是,生成的目錄就是以后真正啟動(dòng)程序的位置。
- 在命令行模式下進(jìn)入d:\apache-apollo\broker\bin,執(zhí)行apollo-broker run,也可以用apollo-broker-service.exe配置服務(wù)。
- 訪問(wèn)http://127.0.0.1:61680打開(kāi)web管理界面。(密碼查看broker/etc/users.properties)
- 啟動(dòng)端口,看cmd輸出。
SpringBoot2的開(kāi)發(fā)
添加依賴
<!-- spring-boot版本 2.1.0.RELEASE --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency>
自定義配置
# src/main/resources/config/mqtt.properties ################## # MQTT 配置 ################## # 用戶名 mqtt.username=admin # 密碼 mqtt.password=password # 推送信息的連接地址,如果有多個(gè),用逗號(hào)隔開(kāi),如:tcp://127.0.0.1:61613,tcp://192.168.1.61:61613 mqtt.url=tcp://127.0.0.1:61613 ################## # MQTT 生產(chǎn)者 ################## # 連接服務(wù)器默認(rèn)客戶端ID mqtt.producer.clientId=mqttProducer # 默認(rèn)的推送主題,實(shí)際可在調(diào)用接口時(shí)指定 mqtt.producer.defaultTopic=topic1 ################## # MQTT 消費(fèi)者 ################## # 連接服務(wù)器默認(rèn)客戶端ID mqtt.consumer.clientId=mqttConsumer # 默認(rèn)的接收主題,可以訂閱多個(gè)Topic,逗號(hào)分隔 mqtt.consumer.defaultTopic=topic1
配置MQTT發(fā)布和訂閱
import org.apache.commons.lang3.StringUtils;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
/**
* MQTT配置,生產(chǎn)者
*
* @author BBF
*/
@Configuration
public class MqttConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(MqttConfig.class);
private static final byte[] WILL_DATA;
static {
WILL_DATA = "offline".getBytes();
}
/**
* 訂閱的bean名稱
*/
public static final String CHANNEL_NAME_IN = "mqttInboundChannel";
/**
* 發(fā)布的bean名稱
*/
public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";
@Value("${mqtt.username}")
private String username;
@Value("${mqtt.password}")
private String password;
@Value("${mqtt.url}")
private String url;
@Value("${mqtt.producer.clientId}")
private String producerClientId;
@Value("${mqtt.producer.defaultTopic}")
private String producerDefaultTopic;
@Value("${mqtt.consumer.clientId}")
private String consumerClientId;
@Value("${mqtt.consumer.defaultTopic}")
private String consumerDefaultTopic;
/**
* MQTT連接器選項(xiàng)
*
* @return {@link org.eclipse.paho.client.mqttv3.MqttConnectOptions}
*/
@Bean
public MqttConnectOptions getMqttConnectOptions() {
MqttConnectOptions options = new MqttConnectOptions();
// 設(shè)置是否清空session,這里如果設(shè)置為false表示服務(wù)器會(huì)保留客戶端的連接記錄,
// 這里設(shè)置為true表示每次連接到服務(wù)器都以新的身份連接
options.setCleanSession(true);
// 設(shè)置連接的用戶名
options.setUserName(username);
// 設(shè)置連接的密碼
options.setPassword(password.toCharArray());
options.setServerURIs(StringUtils.split(url, ","));
// 設(shè)置超時(shí)時(shí)間 單位為秒
options.setConnectionTimeout(10);
// 設(shè)置會(huì)話心跳時(shí)間 單位為秒 服務(wù)器會(huì)每隔1.5*20秒的時(shí)間向客戶端發(fā)送心跳判斷客戶端是否在線,但這個(gè)方法并沒(méi)有重連的機(jī)制
options.setKeepAliveInterval(20);
// 設(shè)置“遺囑”消息的話題,若客戶端與服務(wù)器之間的連接意外中斷,服務(wù)器將發(fā)布客戶端的“遺囑”消息。
options.setWill("willTopic", WILL_DATA, 2, false);
return options;
}
/**
* MQTT客戶端
*
* @return {@link org.springframework.integration.mqtt.core.MqttPahoClientFactory}
*/
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(getMqttConnectOptions());
return factory;
}
/**
* MQTT信息通道(生產(chǎn)者)
*
* @return {@link org.springframework.messaging.MessageChannel}
*/
@Bean(name = CHANNEL_NAME_OUT)
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
/**
* MQTT消息處理器(生產(chǎn)者)
*
* @return {@link org.springframework.messaging.MessageHandler}
*/
@Bean
@ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
producerClientId,
mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(producerDefaultTopic);
return messageHandler;
}
/**
* MQTT消息訂閱綁定(消費(fèi)者)
*
* @return {@link org.springframework.integration.core.MessageProducer}
*/
@Bean
public MessageProducer inbound() {
// 可以同時(shí)消費(fèi)(訂閱)多個(gè)Topic
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(
consumerClientId, mqttClientFactory(),
StringUtils.split(consumerDefaultTopic, ","));
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
// 設(shè)置訂閱通道
adapter.setOutputChannel(mqttInboundChannel());
return adapter;
}
/**
* MQTT信息通道(消費(fèi)者)
*
* @return {@link org.springframework.messaging.MessageChannel}
*/
@Bean(name = CHANNEL_NAME_IN)
public MessageChannel mqttInboundChannel() {
return new DirectChannel();
}
/**
* MQTT消息處理器(消費(fèi)者)
*
* @return {@link org.springframework.messaging.MessageHandler}
*/
@Bean
@ServiceActivator(inputChannel = CHANNEL_NAME_IN)
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
LOGGER.error("===================={}============", message.getPayload());
}
};
}
}消息發(fā)布器
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
/**
* MQTT生產(chǎn)者消息發(fā)送接口
* <p>MessagingGateway要指定生產(chǎn)者的通道名稱</p>
* @author BBF
*/
@Component
@MessagingGateway(defaultRequestChannel = MqttConfig.CHANNEL_NAME_OUT)
public interface IMqttSender {
/**
* 發(fā)送信息到MQTT服務(wù)器
*
* @param data 發(fā)送的文本
*/
void sendToMqtt(String data);
/**
* 發(fā)送信息到MQTT服務(wù)器
*
* @param topic 主題
* @param payload 消息主體
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
String payload);
/**
* 發(fā)送信息到MQTT服務(wù)器
*
* @param topic 主題
* @param qos 對(duì)消息處理的幾種機(jī)制。
0 表示的是訂閱者沒(méi)收到消息不會(huì)再次發(fā)送,消息會(huì)丟失。
* 1 表示的是會(huì)嘗試重試,一直到接收到消息,但這種情況可能導(dǎo)致訂閱者收到多次重復(fù)消息。
* 2 多了一次去重的動(dòng)作,確保訂閱者收到的消息有一次。
* @param payload 消息主體
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
@Header(MqttHeaders.QOS) int qos,
String payload);
}發(fā)送消息
/**
* MQTT消息發(fā)送
*
* @author BBF
*/
@Controller
@RequestMapping(value = "/")
public class MqttController {
/**
* 注入發(fā)送MQTT的Bean
*/
@Resource
private IMqttSender iMqttSender;
/**
* 發(fā)送MQTT消息
* @param message 消息內(nèi)容
* @return 返回
*/
@ResponseBody
@GetMapping(value = "/mqtt", produces ="text/html")
public ResponseEntity<String> sendMqtt(@RequestParam(value = "msg") String message) {
iMqttSender.sendToMqtt(message);
return new ResponseEntity<>("OK", HttpStatus.OK);
}
}入口類
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;
import org.springframework.context.annotation.PropertySource;
/**
* SpringBoot 入口類
*
* @author BBF
*/
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class,
HibernateJpaAutoConfiguration.class})
@PropertySource(encoding = "UTF-8", value = {"classpath:config/mqtt.properties"})
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}代碼
https://gitee.com/bbfbbf/mqtt-test
以上就是SpringBoot集成MQTT示例詳解的詳細(xì)內(nèi)容,更多關(guān)于SpringBoot集成MQTT的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
JavaBean valication驗(yàn)證實(shí)現(xiàn)方法示例
這篇文章主要介紹了JavaBean valication驗(yàn)證實(shí)現(xiàn)方法,結(jié)合實(shí)例形式分析了JavaBean valication驗(yàn)證相關(guān)概念、原理、用法及操作注意事項(xiàng),需要的朋友可以參考下2020-03-03
Java實(shí)現(xiàn)簡(jiǎn)單通訊錄管理系統(tǒng)
這篇文章主要為大家詳細(xì)介紹了Java實(shí)現(xiàn)簡(jiǎn)單通訊錄管理系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-07-07
解決運(yùn)行jar包出錯(cuò):ClassNotFoundException問(wèn)題
這篇文章主要介紹了解決運(yùn)行jar包出錯(cuò):ClassNotFoundException問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-12-12
Mybatis-Plus中Mapper的接口文件與xml文件相關(guān)的坑記錄
這篇文章主要介紹了Mybatis-Plus中Mapper的接口文件與xml文件相關(guān)的坑記錄,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-01-01
swagger添加權(quán)限驗(yàn)證保證API(接口)安全性(兩種方法)
這篇文章主要介紹了swagger添加權(quán)限驗(yàn)證保證API(接口)安全性(兩種方法),本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-01-01
Spring?WebClient實(shí)戰(zhàn)示例
本文主要介紹了Spring?WebClient實(shí)戰(zhàn)示例,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-01-01

