SpringBoot集成MQTT示例詳解
引言
特別提醒: 文中提到的MQTT服務(wù)器Apache-Apollo,現(xiàn)在已經(jīng)不維護(hù)。但是客戶端的寫(xiě)法是通用的。目前我常用的是RabbitMQ加mqtt插件。
MQTT
MQTT(消息隊(duì)列遙測(cè)傳輸)是ISO標(biāo)準(zhǔn)(ISO/IEC PRF 20922)下基于發(fā)布/訂閱范式的消息協(xié)議。它工作在 TCP/IP協(xié)議族上,是為硬件性能低下的遠(yuǎn)程設(shè)備以及網(wǎng)絡(luò)狀況糟糕的情況下而設(shè)計(jì)的發(fā)布/訂閱型消息協(xié)議。國(guó)內(nèi)很多企業(yè)都廣泛使用MQTT作為Android手機(jī)客戶端與服務(wù)器端推送消息的協(xié)議。
特點(diǎn)
MQTT協(xié)議是為大量計(jì)算能力有限,且工作在低帶寬、不可靠的網(wǎng)絡(luò)的遠(yuǎn)程傳感器和控制設(shè)備通訊而設(shè)計(jì)的協(xié)議,它具有以下主要的幾項(xiàng)特性:
- 使用發(fā)布/訂閱消息模式,提供一對(duì)多的消息發(fā)布,解除應(yīng)用程序耦合;
- 對(duì)負(fù)載內(nèi)容屏蔽的消息傳輸;
- 使用TCP/IP提供網(wǎng)絡(luò)連接;
- 有三種消息發(fā)布服務(wù)質(zhì)量;
至多一次:消息發(fā)布完全依賴底層 TCP/IP 網(wǎng)絡(luò)。會(huì)發(fā)生消息丟失或重復(fù)。這一級(jí)別可用于如下情況,環(huán)境傳感器數(shù)據(jù),丟失一次讀記錄無(wú)所謂,因?yàn)椴痪煤筮€會(huì)有第二次發(fā)送。
至少一次:確保消息到達(dá),但消息重復(fù)可能會(huì)發(fā)生。
只有一次:確保消息到達(dá)一次。這一級(jí)別可用于如下情況,在計(jì)費(fèi)系統(tǒng)中,消息重復(fù)或丟失會(huì)導(dǎo)致不正確的結(jié)果。
- 小型傳輸,開(kāi)銷很小(固定長(zhǎng)度的頭部是 2 字節(jié)),協(xié)議交換最小化,以降低網(wǎng)絡(luò)流量;
- 使用Last Will和Testament特性通知有關(guān)各方客戶端異常中斷的機(jī)制。
Apache-Apollo
Apache Apollo是一個(gè)代理服務(wù)器,其是在ActiveMQ基礎(chǔ)上發(fā)展而來(lái)的,可以支持STOMP, AMQP, MQTT, Openwire, SSL, WebSockets 等多種協(xié)議。
原理:服務(wù)器端創(chuàng)建一個(gè)唯一訂閱號(hào),發(fā)送者可以向這個(gè)訂閱號(hào)中發(fā)東西,然后接受者(即訂閱了這個(gè)訂閱號(hào)的人)都會(huì)收到這個(gè)訂閱號(hào)發(fā)出來(lái)的消息。以此來(lái)完成消息的推送。服務(wù)器其實(shí)是一個(gè)消息中轉(zhuǎn)站。
下載
下載地址:http://archive.apache.org/dist/activemq/activemq-apollo/
配置與啟動(dòng)
- 需要安裝JDK環(huán)境
- 在命令行模式下進(jìn)入bin,執(zhí)行apollo create mybroker d:\apache-apollo\broker,創(chuàng)建一個(gè)名為mybroker虛擬主機(jī)(Virtual Host)。需要特別注意的是,生成的目錄就是以后真正啟動(dòng)程序的位置。
- 在命令行模式下進(jìn)入d:\apache-apollo\broker\bin,執(zhí)行apollo-broker run,也可以用apollo-broker-service.exe配置服務(wù)。
- 訪問(wèn)http://127.0.0.1:61680打開(kāi)web管理界面。(密碼查看broker/etc/users.properties)
- 啟動(dòng)端口,看cmd輸出。
SpringBoot2的開(kāi)發(fā)
添加依賴
<!-- spring-boot版本 2.1.0.RELEASE --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency>
自定義配置
# src/main/resources/config/mqtt.properties ################## # MQTT 配置 ################## # 用戶名 mqtt.username=admin # 密碼 mqtt.password=password # 推送信息的連接地址,如果有多個(gè),用逗號(hào)隔開(kāi),如:tcp://127.0.0.1:61613,tcp://192.168.1.61:61613 mqtt.url=tcp://127.0.0.1:61613 ################## # MQTT 生產(chǎn)者 ################## # 連接服務(wù)器默認(rèn)客戶端ID mqtt.producer.clientId=mqttProducer # 默認(rèn)的推送主題,實(shí)際可在調(diào)用接口時(shí)指定 mqtt.producer.defaultTopic=topic1 ################## # MQTT 消費(fèi)者 ################## # 連接服務(wù)器默認(rèn)客戶端ID mqtt.consumer.clientId=mqttConsumer # 默認(rèn)的接收主題,可以訂閱多個(gè)Topic,逗號(hào)分隔 mqtt.consumer.defaultTopic=topic1
配置MQTT發(fā)布和訂閱
import org.apache.commons.lang3.StringUtils; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; /** * MQTT配置,生產(chǎn)者 * * @author BBF */ @Configuration public class MqttConfig { private static final Logger LOGGER = LoggerFactory.getLogger(MqttConfig.class); private static final byte[] WILL_DATA; static { WILL_DATA = "offline".getBytes(); } /** * 訂閱的bean名稱 */ public static final String CHANNEL_NAME_IN = "mqttInboundChannel"; /** * 發(fā)布的bean名稱 */ public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel"; @Value("${mqtt.username}") private String username; @Value("${mqtt.password}") private String password; @Value("${mqtt.url}") private String url; @Value("${mqtt.producer.clientId}") private String producerClientId; @Value("${mqtt.producer.defaultTopic}") private String producerDefaultTopic; @Value("${mqtt.consumer.clientId}") private String consumerClientId; @Value("${mqtt.consumer.defaultTopic}") private String consumerDefaultTopic; /** * MQTT連接器選項(xiàng) * * @return {@link org.eclipse.paho.client.mqttv3.MqttConnectOptions} */ @Bean public MqttConnectOptions getMqttConnectOptions() { MqttConnectOptions options = new MqttConnectOptions(); // 設(shè)置是否清空session,這里如果設(shè)置為false表示服務(wù)器會(huì)保留客戶端的連接記錄, // 這里設(shè)置為true表示每次連接到服務(wù)器都以新的身份連接 options.setCleanSession(true); // 設(shè)置連接的用戶名 options.setUserName(username); // 設(shè)置連接的密碼 options.setPassword(password.toCharArray()); options.setServerURIs(StringUtils.split(url, ",")); // 設(shè)置超時(shí)時(shí)間 單位為秒 options.setConnectionTimeout(10); // 設(shè)置會(huì)話心跳時(shí)間 單位為秒 服務(wù)器會(huì)每隔1.5*20秒的時(shí)間向客戶端發(fā)送心跳判斷客戶端是否在線,但這個(gè)方法并沒(méi)有重連的機(jī)制 options.setKeepAliveInterval(20); // 設(shè)置“遺囑”消息的話題,若客戶端與服務(wù)器之間的連接意外中斷,服務(wù)器將發(fā)布客戶端的“遺囑”消息。 options.setWill("willTopic", WILL_DATA, 2, false); return options; } /** * MQTT客戶端 * * @return {@link org.springframework.integration.mqtt.core.MqttPahoClientFactory} */ @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setConnectionOptions(getMqttConnectOptions()); return factory; } /** * MQTT信息通道(生產(chǎn)者) * * @return {@link org.springframework.messaging.MessageChannel} */ @Bean(name = CHANNEL_NAME_OUT) public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } /** * MQTT消息處理器(生產(chǎn)者) * * @return {@link org.springframework.messaging.MessageHandler} */ @Bean @ServiceActivator(inputChannel = CHANNEL_NAME_OUT) public MessageHandler mqttOutbound() { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler( producerClientId, mqttClientFactory()); messageHandler.setAsync(true); messageHandler.setDefaultTopic(producerDefaultTopic); return messageHandler; } /** * MQTT消息訂閱綁定(消費(fèi)者) * * @return {@link org.springframework.integration.core.MessageProducer} */ @Bean public MessageProducer inbound() { // 可以同時(shí)消費(fèi)(訂閱)多個(gè)Topic MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter( consumerClientId, mqttClientFactory(), StringUtils.split(consumerDefaultTopic, ",")); adapter.setCompletionTimeout(5000); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(1); // 設(shè)置訂閱通道 adapter.setOutputChannel(mqttInboundChannel()); return adapter; } /** * MQTT信息通道(消費(fèi)者) * * @return {@link org.springframework.messaging.MessageChannel} */ @Bean(name = CHANNEL_NAME_IN) public MessageChannel mqttInboundChannel() { return new DirectChannel(); } /** * MQTT消息處理器(消費(fèi)者) * * @return {@link org.springframework.messaging.MessageHandler} */ @Bean @ServiceActivator(inputChannel = CHANNEL_NAME_IN) public MessageHandler handler() { return new MessageHandler() { @Override public void handleMessage(Message<?> message) throws MessagingException { LOGGER.error("===================={}============", message.getPayload()); } }; } }
消息發(fā)布器
import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; /** * MQTT生產(chǎn)者消息發(fā)送接口 * <p>MessagingGateway要指定生產(chǎn)者的通道名稱</p> * @author BBF */ @Component @MessagingGateway(defaultRequestChannel = MqttConfig.CHANNEL_NAME_OUT) public interface IMqttSender { /** * 發(fā)送信息到MQTT服務(wù)器 * * @param data 發(fā)送的文本 */ void sendToMqtt(String data); /** * 發(fā)送信息到MQTT服務(wù)器 * * @param topic 主題 * @param payload 消息主體 */ void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload); /** * 發(fā)送信息到MQTT服務(wù)器 * * @param topic 主題 * @param qos 對(duì)消息處理的幾種機(jī)制。 0 表示的是訂閱者沒(méi)收到消息不會(huì)再次發(fā)送,消息會(huì)丟失。 * 1 表示的是會(huì)嘗試重試,一直到接收到消息,但這種情況可能導(dǎo)致訂閱者收到多次重復(fù)消息。 * 2 多了一次去重的動(dòng)作,確保訂閱者收到的消息有一次。 * @param payload 消息主體 */ void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload); }
發(fā)送消息
/** * MQTT消息發(fā)送 * * @author BBF */ @Controller @RequestMapping(value = "/") public class MqttController { /** * 注入發(fā)送MQTT的Bean */ @Resource private IMqttSender iMqttSender; /** * 發(fā)送MQTT消息 * @param message 消息內(nèi)容 * @return 返回 */ @ResponseBody @GetMapping(value = "/mqtt", produces ="text/html") public ResponseEntity<String> sendMqtt(@RequestParam(value = "msg") String message) { iMqttSender.sendToMqtt(message); return new ResponseEntity<>("OK", HttpStatus.OK); } }
入口類
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration; import org.springframework.context.annotation.PropertySource; /** * SpringBoot 入口類 * * @author BBF */ @SpringBootApplication(exclude = {DataSourceAutoConfiguration.class, HibernateJpaAutoConfiguration.class}) @PropertySource(encoding = "UTF-8", value = {"classpath:config/mqtt.properties"}) public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } }
代碼
https://gitee.com/bbfbbf/mqtt-test
以上就是SpringBoot集成MQTT示例詳解的詳細(xì)內(nèi)容,更多關(guān)于SpringBoot集成MQTT的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
JavaBean valication驗(yàn)證實(shí)現(xiàn)方法示例
這篇文章主要介紹了JavaBean valication驗(yàn)證實(shí)現(xiàn)方法,結(jié)合實(shí)例形式分析了JavaBean valication驗(yàn)證相關(guān)概念、原理、用法及操作注意事項(xiàng),需要的朋友可以參考下2020-03-03Java實(shí)現(xiàn)簡(jiǎn)單通訊錄管理系統(tǒng)
這篇文章主要為大家詳細(xì)介紹了Java實(shí)現(xiàn)簡(jiǎn)單通訊錄管理系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-07-07解決運(yùn)行jar包出錯(cuò):ClassNotFoundException問(wèn)題
這篇文章主要介紹了解決運(yùn)行jar包出錯(cuò):ClassNotFoundException問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-12-12Mybatis-Plus中Mapper的接口文件與xml文件相關(guān)的坑記錄
這篇文章主要介紹了Mybatis-Plus中Mapper的接口文件與xml文件相關(guān)的坑記錄,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-01-01swagger添加權(quán)限驗(yàn)證保證API(接口)安全性(兩種方法)
這篇文章主要介紹了swagger添加權(quán)限驗(yàn)證保證API(接口)安全性(兩種方法),本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-01-01Spring?WebClient實(shí)戰(zhàn)示例
本文主要介紹了Spring?WebClient實(shí)戰(zhàn)示例,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-01-01