Spring?boot?集成?MQTT詳情
一、簡介
MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸協(xié)議),是一種基于發(fā)布/訂閱(publish/subscribe)模式的"輕量級"通訊協(xié)議,可以以極少的代碼和有限的帶寬為連接遠(yuǎn)程設(shè)備提供實時可靠的消息服務(wù)。目前在物聯(lián)網(wǎng)、小型設(shè)備、移動應(yīng)用等方面有較廣泛的應(yīng)用。
二、主要特性
- (1)使用發(fā)布/訂閱消息模式,提供一對多的消息發(fā)布,解除應(yīng)用程序耦合。
- (2)對負(fù)載內(nèi)容屏蔽的消息傳輸。
- (3)使用TCP/IP提供網(wǎng)絡(luò)連接。
- (4)有三種消息發(fā)布服務(wù)質(zhì)量:
“至多一次”,消息發(fā)布完全依賴底層TCP/IP網(wǎng)絡(luò)。會發(fā)生消息丟失或重復(fù)。這一級別可用于如下情況,環(huán)境傳感器數(shù)據(jù),丟失一次讀記錄無所謂,因為不久后還會有第二次發(fā)送。這一種方式主要普通APP的推送,倘若你的智能設(shè)備在消息推送時未聯(lián)網(wǎng),推送過去沒收到,再次聯(lián)網(wǎng)也就收不到了。
“至少一次”,確保消息到達(dá),但消息重復(fù)可能會發(fā)生。
“只有一次”,確保消息到達(dá)一次。在一些要求比較嚴(yán)格的計費系統(tǒng)中,可以使用此級別。在計費系統(tǒng)中,消息重復(fù)或丟失會導(dǎo)致不正確的結(jié)果。這種最高質(zhì)量的消息發(fā)布服務(wù)還可以用于即時通訊類的APP的推送,確保用戶收到且只會收到一次。
- (5)小型傳輸,開銷很?。ü潭ㄩL度的頭部是2字節(jié)),協(xié)議交換最小化,以降低網(wǎng)絡(luò)流量。
- (6)使用Last Will和Testament特性通知有關(guān)各方客戶端異常中斷的機(jī)制。
Last Will:即遺言機(jī)制,用于通知同一主題下的其他設(shè)備發(fā)送遺言的設(shè)備已經(jīng)斷開了連接。
Testament:遺囑機(jī)制,功能類似于Last Will。
三、集成步驟
1.引入相關(guān)jar包
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency> 2.核心配置類
@Configuration
public class MqttConfig
{
@Autowired
private MqttProperties mqttProperties;
/**
* 連接器
* @return
*/
@Bean
public MqttConnectOptions getMqttConnectOptions()
{
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
// 設(shè)置是否清空session,false表示服務(wù)器會保留客戶端的連接記錄,true表示每次連接到服務(wù)器都以新的身份連接
mqttConnectOptions.setCleanSession(true);
// 設(shè)置超時時間,默認(rèn)30秒
mqttConnectOptions.setConnectionTimeout(mqttProperties.getConnectionTimeOut());
mqttConnectOptions.setKeepAliveInterval(mqttProperties.getKeepAlive());
mqttConnectOptions.setAutomaticReconnect(true);
// 設(shè)置連接的用戶名
mqttConnectOptions.setUserName(mqttProperties.getUsername());
// 設(shè)置連接的密碼
mqttConnectOptions.setPassword(mqttProperties.getPassword().toCharArray());
//服務(wù)器地址
mqttConnectOptions.setServerURIs(new String[]{mqttProperties.getUrl()});
mqttConnectOptions.setKeepAliveInterval(2);
return mqttConnectOptions;
}
/***
* MQTT客戶端
* @return
*/
@Bean("mqttClientFactory")
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(getMqttConnectOptions());
return factory;
}
/*----------------- 消息生產(chǎn)者的配置 ---------------------*/
/**
* MQTT生產(chǎn)端發(fā)布處理器
*
* @return {@link org.springframework.messaging.MessageHandler}
*/
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperties.getProducerClientId(), mqttClientFactory());
messageHandler.setAsync(true);
return messageHandler;
}
/**
* MQTT生產(chǎn)端發(fā)布通道
* @return
*/
@Bean("mqttOutboundChannel")
public MessageChannel mqttOutboundChannel()
{
return new DirectChannel();
}
/*----------------- 消息消費者的配置 ---------------------*/
/**
* MQTT消費端訂閱通道
*
* @return {@link org.springframework.messaging.MessageChannel}
*/
@Bean(name = "mqttInboundChannel")
public MessageChannel mqttInboundChannel() {
return new DirectChannel();
}
/**
* MQTT消費端連接配置
*
* @param channel {@link org.springframework.messaging.MessageChannel}
* @param factory {@link org.springframework.integration.mqtt.core.MqttPahoClientFactory}
* @return {@link org.springframework.integration.core.MessageProducer}
*/
@Bean
public MessageProducer inbound(
@Qualifier("mqttInboundChannel") MessageChannel channel,
@Qualifier("mqttClientFactory") MqttPahoClientFactory factory) {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getConsumerClientId(), factory, "test");
adapter.setCompletionTimeout(30000);
adapter.setConverter(new DefaultPahoMessageConverter());
// 0 至多一次,數(shù)據(jù)可能丟失
// 1 至少一次,數(shù)據(jù)可能重復(fù)
// 2 只有一次,且僅有一次,最耗性能
adapter.setQos(1);
// 設(shè)置訂閱通道
adapter.setOutputChannel(channel);
return adapter;
}
}@ConfigurationProperties("mqtt")
@Component
public class MqttProperties implements Serializable
{
private static final long serialVersionUID = -1425980007744001158L;
private String url;
private String username;
private String password;
private int keepAlive;
private int connectionTimeOut;
private String producerClientId;
private String producerQos;
private String consumerClientId;
private String consumerQos;
private String consumerTopic;
private int completionTimeout;
private String defaultTopic;
//get、set方法省略
}3.網(wǎng)關(guān)配置
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway
{
void sendToMqtt(byte[] data,@Header(MqttHeaders.TOPIC) String topic);
}4.編寫測試類
@Autowired
private MqttGateway mqttGateway;
@RequestMapping("/sendTest")
public String sendMqttTest(String msg)
{
mqttGateway.send("test",msg);
return "OK";
}5.yml配置信息
mqtt: url: tcp://localhost:1883 username: test password: test1234 keep-alive: 30 connection-timeout: 3000 producerClientId: test-producer producerQos: 1 consumerClientId: test-consumer consumerQos: 1 deafultTopic : test
到此這篇關(guān)于Spring boot 集成 MQTT詳情的文章就介紹到這了,更多相關(guān)Spring boot 集成 MQTT內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
springcloud使用feign調(diào)用服務(wù)時參數(shù)內(nèi)容過大問題
這篇文章主要介紹了springcloud使用feign調(diào)用服務(wù)時參數(shù)內(nèi)容過大問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-03-03
Spring中@DependsOn注解的作用及實現(xiàn)原理解析
這篇文章主要介紹了Spring中@DependsOn注解的作用及實現(xiàn)原理解析,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-03-03
springboot+zookeeper實現(xiàn)分布式鎖的示例代碼
本文主要介紹了springboot+zookeeper實現(xiàn)分布式鎖的示例代碼,文中根據(jù)實例編碼詳細(xì)介紹的十分詳盡,具有一定的參考價值,感興趣的小伙伴們可以參考一下2022-03-03
深入淺析Netty 在 Dubbo 中是如何應(yīng)用的
國內(nèi)知名框架 Dubbo 底層使用的是 Netty 作為網(wǎng)絡(luò)通信,那么內(nèi)部到底是如何使用的呢?今天通過本文給大家詳細(xì)講解,對Netty 在 Dubbo中應(yīng)用相關(guān)知識感興趣的朋友跟隨小編一起看看吧2020-05-05

