欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot整合MQTT小結(jié)匯總

 更新時間:2022年01月26日 16:14:44   作者:Be?Moster  
MQTT 客戶端是運行 MQTT 庫并通過網(wǎng)絡連接到 MQTT 代理的任何設(shè)備,是一種基于發(fā)布/訂閱(publish/subscribe)模式的“輕量級”通訊協(xié)議,該協(xié)議構(gòu)建于 TCP/IP 協(xié)議上,由 IBM 于 1999 年發(fā)明,對SpringBoot整合MQTT相關(guān)知識感興趣的朋友一起看看吧

前言:

        這幾天在準備面試的過程中做的一個小demo,主要是用通過SpringBoot實現(xiàn)一個與MQTT服務交互通信,也是看著別人的項目改的,這兩個技術(shù)之前都沒有接觸過,希望記錄一下可以分享給大家,也好久沒更新了,借此機會更新一波blog。在正式的開始這個項目前還是學了一下SSM和SpringBoot的基礎(chǔ),上手起來不會這么的無力。期間也是查閱了很多的資料和詢問了諸多大佬。

好了話不多說,一步步的搭建項目和原理詳解就在下面了

一、什么是mqtt

        MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸協(xié)議),是一種基于發(fā)布/訂閱(publish/subscribe)模式的“輕量級”通訊協(xié)議,該協(xié)議構(gòu)建于 TCP/IP 協(xié)議上,由 IBM 于 1999 年發(fā)明。MQTT 協(xié)議的主要特征是開放、簡單、輕量級和易于實現(xiàn),這些特征使得它適用于受約束的應用環(huán)境,如:

網(wǎng)絡受限:網(wǎng)絡帶寬較低且傳輸不可靠
終端受限:協(xié)議運行在嵌入式設(shè)備上,嵌入式終端的處理器、內(nèi)存等是受限的

        通過 MQTT 協(xié)議,目前已經(jīng)擴展出了數(shù)十種 MQTT 服務器端程序,可以通過 PHP、Java、Python、C、C# 等語言向 MQTT 發(fā)送消息。由于開放源代碼、耗電量小等特點,MQTT 非常適用于物聯(lián)網(wǎng)領(lǐng)域,如傳感器與服務器的通信、傳感器信息采集等。

二、主要思想

發(fā)布/訂閱模式

        訂閱發(fā)布模式定義了一種一對多的依賴關(guān)系,讓多個訂閱者對象同時監(jiān)聽某一個主題對象。這個主題對象在自身狀態(tài)變化時,會通知所有訂閱者對象,使它們能夠自動更新自己的狀態(tài)。

       將一個系統(tǒng)分割成一系列相互協(xié)作的類有一個很不好的副作用,那就是需要維護相應對象間的一致性,這樣會給維護、擴展和重用都帶來不便。當一個對象的改變需要同時改變其他對象,而且它不知道具體有多少對象需要改變時,就可以使用訂閱發(fā)布模式了。

       一個抽象模型有兩個方面,其中一方面依賴于另一方面,這時訂閱發(fā)布模式可以將這兩者封裝在獨立的對象中,使它們各自獨立地改變和復用。訂閱發(fā)布模式所做的工作其實就是在解耦合。讓耦合的雙方都依賴于抽象,而不是依賴于具體,從而使得各自的變化都不會影響另一邊的變化。

發(fā)布/訂閱模式并不是 MQTT 協(xié)議特有的模式,像我們很多消息中間件都有使用發(fā)布/訂閱模式,這里你是不是想說,這不就是我們所說的觀察者模式嘛,還真不是,這兩個模式很容易混淆。觀察者模式只有觀察者 + 被觀察者兩個角色,而發(fā)布/訂閱模式還有一個經(jīng)紀人 Broker;往更深層次的講觀察者和被觀察者,是松耦合的關(guān)系,而發(fā)布者和訂閱者,則完全不存在耦合。

在我們?nèi)粘懗绦驎r,經(jīng)常遇到下面這種情況:

public void 前端業(yè)務/硬件業(yè)務()
{
    刷新界面();
    更新數(shù)據(jù)庫();
    對界面更新數(shù)據(jù)();
    ………………………………
}

當有前端和硬件業(yè)務產(chǎn)生時,需要依次要去執(zhí)行:刷新界面()、更新數(shù)據(jù)庫()、對界面更新數(shù)據(jù)()等操作。表面上看代碼寫得很工整,其實這里面有很多的問題:

首先,這完全是面向過程開發(fā),根本不適合大型項目。
第二,代碼維護量太大。設(shè)想一下,如果產(chǎn)生業(yè)務后要執(zhí)行10多個操作,那這將是個多么大,多少復雜的類呀,時間一長,可能連開發(fā)者自己都不知道如何去維護了。
第三,擴展性差。如果產(chǎn)生業(yè)務后,要增加一個聲音提示()功能,怎么辦呢?沒錯,只能加在前端業(yè)務/硬件業(yè)務()這個函數(shù)中,這樣一來,就違反了“開放-關(guān)閉原則”。而且修改了原有的函數(shù),那么在測試時,除了要測新增功能外,還要做原功能的回歸測試;在一個大型項目中,做一次回歸測試可能要花費大約兩周左右的時間,而且前提是新增功能沒有影響原來功能及產(chǎn)生新的bug。
那么如何把前端業(yè)務/硬件業(yè)務()函數(shù)同其他函數(shù)進行解耦合呢?別著急,下面就介紹今天的主角----訂閱發(fā)布模式。見下圖:

上面的流程就是對有告警信息產(chǎn)生()這個函數(shù)的描述。我們要做的,就是把產(chǎn)生告警和它需要通知的事件進行解耦,讓它們之間沒有相互依賴的關(guān)系,解耦合圖如下:

事件觸發(fā)者被抽象出來,稱為消息發(fā)布者,即圖中的P。事件接受都被抽象出來,稱為消息訂閱者,即圖中的S。P與S之間通過Broker(即訂閱器)連接。這樣就實現(xiàn)了P與S的解耦。首先,P就把消息發(fā)送到指定的訂閱器上,從始至終,它并不知道也不關(guān)心要把消息發(fā)向哪個S。S如果想接收消息,就要向訂閱器進行訂閱,訂閱成功后,S就可以接收來自Broker的消息了,從始至終,S并不知道也不關(guān)心消息來源于哪個具體的P。同理,S還可以向Broker進行退訂操作,成功退訂后,S就無法接收到來自指定Broker的消息了。這樣就完美的解決了P與S之間的解耦。

三、MQTT重要概念

3.1 MQTT Client

publisher 和 subscriber 都屬于 MQTT Client,之所以有發(fā)布者和訂閱者這個概念,其實是一種相對的概念,就是指當前客戶端是在發(fā)布消息還是在接收消息,發(fā)布和訂閱的功能也可以由同一個 MQTT Client 實現(xiàn)。

MQTT 客戶端是運行 MQTT 庫并通過網(wǎng)絡連接到 MQTT 代理的任何設(shè)備(從微控制器到成熟的服務器)。例如,MQTT 客戶端可以是一個非常小的、資源受限的設(shè)備,它通過無線網(wǎng)絡進行連接并具有一個最低限度的庫?;旧希魏问褂?TCP/IP 協(xié)議使用 MQTT 設(shè)備的都可以稱之為 MQTT Client。MQTT 協(xié)議的客戶端實現(xiàn)非常簡單直接,易于實施是 MQTT 非常適合小型設(shè)備的原因之一。MQTT 客戶端庫可用于多種編程語言。例如,Android、Arduino、C、C++、C#、Go、iOS、Java、JavaScript 和 .NET。        

3.2 MQTT Broker

與 MQTT Client 對應的就是 MQTT Broker,Broker 是任何發(fā)布/訂閱協(xié)議的核心,根據(jù)實現(xiàn)的不同,代理可以處理多達數(shù)百萬連接的 MQTT Client。

Broker 負責接收所有消息,過濾消息,確定是哪個Client 訂閱了每條消息,并將消息發(fā)送給對應的 Client,Broker 還負責保存會話數(shù)據(jù),這些數(shù)據(jù)包括訂閱的和錯過的消息。Broker 還負責客戶端的身份驗證和授權(quán)。

3.3 MQTT Connection

MQTT 協(xié)議基于 TCP/IP??蛻舳撕痛矶夹枰幸粋€ TCP/IP 協(xié)議支持。

 MQTT 連接始終位于一個客戶端和代理之間??蛻舳藦牟恢苯酉嗷ミB接。要發(fā)起連接,客戶端向代理發(fā)送 CONNECT 消息。代理使用 CONNACK 消息和狀態(tài)代碼進行響應。建立連接后,代理將保持打開狀態(tài),直到客戶端發(fā)送斷開連接命令或連接中斷。

3.4 MQTT主要參數(shù)

ClientId:ClientId 的長度可以是 1-23 個字符,在一個服務器上 ClientId 不能重復。如果超過 23 個字符,則服務器返回 CONNACK 消息中的返回碼為 Identifier Rejected。在 MQTT 3.1.1 中,如果您不需要代理持有狀態(tài),您可以發(fā)送一個空的 ClientId??盏?ClientId 導致連接沒有任何狀態(tài)。在這種情況下,clean session 標志必須設(shè)置為 true,否則代理將拒絕連接。

Clean Session:Clean Session 標志告訴代理客戶端是否要建立持久會話。在持久會話 (CleanSession = false) 中,代理存儲客戶端的所有訂閱以及以服務質(zhì)量(QoS)級別 1 或 2 訂閱的客戶端的所有丟失消息。 如果會話不是持久的 (CleanSession = true ),代理不為客戶端存儲任何內(nèi)容,并清除任何先前持久會話中的所有信息。

Username/Password:MQTT 可以發(fā)送用戶名和密碼進行客戶端認證和授權(quán)。但是,如果此信息未加密或散列,則密碼將以純文本形式發(fā)送。我們強烈建議將用戶名和密碼與安全傳輸一起使用。像 HiveMQ 這樣的代理可以使用 SSL 證書對客戶端進行身份驗證,因此不需要用戶名和密碼。

Will Message:LastWillxxx 表示的是遺愿,client 在連接 broker 的時候?qū)O(shè)立一個遺愿,這個遺愿會保存在 broker 中,當 client 因為非正常原因斷開與 broker 的連接時,broker 會將遺愿發(fā)送給訂閱了這個 topic(訂閱遺愿的 topic)的 client。

KeepAlive:keepAlive 是 client 在連接建立時與 broker 通信的時間間隔,通常以秒為單位。這個時間指的是 client 與 broker 在不發(fā)送消息下所能承受的最大時長。

QOS:此數(shù)字表示消息的服務質(zhì)量 (QoS)。有三個級別:0、1 和 2。服務級別決定了消息到達預期接收者(客戶端或代理)的保證類型。

Payload:這個是每條消息的實際內(nèi)容。MQTT 是數(shù)據(jù)無關(guān)性的??梢园l(fā)送任何文本、圖像、加密數(shù)據(jù)以及二進制數(shù)據(jù)。

timeout:MQTT會嘗試接收數(shù)據(jù),直到timeout時間到后才會退出。

四、軟件和Apollo

4.1 安裝Apollo

Apollo(阿波羅)是攜程框架部門研發(fā)的分布式配置中心,能夠集中化管理應用不同環(huán)境、不同集群的配置,配置修改后能夠?qū)崟r推送到應用端,并且具備規(guī)范的權(quán)限、流程治理等特性,適用于微服務配置管理場景。

服務端基于Spring Boot和Spring Cloud開發(fā),打包后可以直接運行,不需要額外安裝Tomcat等應用容器。

Java客戶端不依賴任何框架,能夠運行于所有Java運行時環(huán)境,同時對Spring/Spring Boot環(huán)境也有較好的支持。

Apollo下載地址

http://xn--apollo-np7ii83deeq211d/

相關(guān)鏈接:

Apollo 官方安裝教程:https://github.com/ctripcorp/apollo/wiki/Quick-Start
Apollo 分布式部署官方指南:https://github.com/ctripcorp/apollo/wiki/%E5%88%86%E5%B8%83%E5%BC%8F%E9%83%A8%E7%BD%B2%E6%8C%87%E5%8D%97
Apollo Github 地址:https://github.com/ctripcorp/apollo

4.1.1 解壓,進入到D:\java\apache-apollo-1.7.1\bin 目錄下,執(zhí)行命令

.\apollo.cmd create mybroker2

4.1.2 進入剛剛創(chuàng)

4.1.2 進入剛剛創(chuàng)建好的mybroker/bin目錄,執(zhí)行:

.\apollo-broker.cmd run

4.1.3 瀏覽器打開地址http://127.0.0.1:61680/,默認用戶名:admin,密碼:password,即可登錄主頁面

4.2 安裝Postman

 4.3 安裝MQTTBox

  Microsoft Store里面就有。 

賬號密碼輸入即可

五、代碼實現(xiàn)

5.1 配置pom.xml

<dependencies>
    <!--導入起步依賴-->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
			<artifactId>spring-boot-starter-integration</artifactId>
			<groupId>org.springframework.integration</groupId>
			<artifactId>spring-integration-stream</artifactId>
			<artifactId>spring-integration-mqtt</artifactId>
			<groupId>org.mybatis.spring.boot</groupId>
			<artifactId>mybatis-spring-boot-starter</artifactId>
			<version>1.3.0</version>
			<artifactId>spring-boot-configuration-processor</artifactId>
			<optional>true</optional>
			<groupId>com.alibaba</groupId>
			<artifactId>fastjson</artifactId>
			<version>1.2.47</version>
		
</dependencies>

5.2 配置MQTT服務器基本信息

在springBoot配置文件application.yml中配置,添加如下:

#mqtt配置
com:
  mqtt:
    url: tcp://127.0.0.1:61613
    clientId: mqtt_test1234
    topics: topic01,topic02
    username: admin
    password: password
    timeout: 10
    keepalive: 20
    
#指定服務端口 
server:
  port: 8081   #一般沒改過tomcat服務器的端口不用修改

5.3 配置讀取yml文件的類MqttConfiguration

package com.vcarecity.config;
 
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
 * 讀取yml
 */
@Component
@ConfigurationProperties(prefix = "com.mqtt")     //對應yml文件中的com下的mqtt文件配置
public class MqttConfiguration {
	private String url;
	private String clientId;
	private String topics;
    private String username;
    private String password;
    private String timeout;
    private String keepalive;
	public String getUrl() {
		return url;
	}
	public void setUrl(String url) {
		this.url = url;
	public String getUsername() {
		return username;
	public void setUsername(String username) {
		this.username = username;
	public String getPassword() {
		return password;
	public void setPassword(String password) {
		this.password = password;
	public String getClientId() {
		return clientId;
	public void setClientId(String clientId) {
		this.clientId = clientId;
	public String getTopics() {
		return topics;
	public void setTopics(String topics) {
		this.topics = topics;
	public String getTimeout() {
		return timeout;
	public void setTimeout(String timeout) {
		this.timeout = timeout;
	public String getKeepalive() {
		return keepalive;
	public void setKeepalive(String keepalive) {
		this.keepalive = keepalive;
}

5.4  MQTT生產(chǎn)端的Handler處理

package com.vcarecity.mqtt;
 
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import com.vcarecity.config.MqttConfiguration;
/**
 * MQTT生產(chǎn)端
 *
 */
@Configuration
public class MqttOutboundConfiguration {
	@Autowired
	private MqttConfiguration mqttProperties;
	@Bean
	public MessageChannel mqttOutboundChannel() {
		return new DirectChannel();
	}
	public MqttPahoClientFactory mqttClientFactory() {
		DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
		String[] array = mqttProperties.getUrl().split(",");
		MqttConnectOptions options = new MqttConnectOptions();
		options.setServerURIs(array);
		options.setUserName(mqttProperties.getUsername());
		options.setPassword(mqttProperties.getPassword().toCharArray());
		// 接受離線消息
		options.setCleanSession(false); //告訴代理客戶端是否要建立持久會話   false為建立持久會話
		factory.setConnectionOptions(options);
		return factory;
	@ServiceActivator(inputChannel = "mqttOutboundChannel")
	public MessageHandler mqttOutbound() {
		MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
				mqttProperties.getClientId()+"outbound", mqttClientFactory());
		messageHandler.setAsync(true);
		return messageHandler;
}

5.5  MQTT消費端的Handler處理 

實現(xiàn)了對inboundtopic中的主題監(jiān)聽,當有消息推送到inboundtopic主題上時可以接受

package com.vcarecity.mqtt;
 
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import com.vcarecity.config.MqttConfiguration;
/**
 * MQTT消費端
 *
 */
@Configuration
@IntegrationComponentScan
public class MqttInboundConfiguration {
	@Autowired
	private MqttConfiguration mqttProperties;
	@Bean
	public MessageChannel mqttInputChannel() {
		return new DirectChannel();
	}
	public MqttPahoClientFactory mqttClientFactory() {
		DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
		    String[] array = mqttProperties.getUrl().split(",");
			MqttConnectOptions options = new MqttConnectOptions();
			options.setServerURIs(array);
			options.setUserName(mqttProperties.getUsername());
			options.setPassword(mqttProperties.getPassword().toCharArray());
			options.setKeepAliveInterval(2);
			//接受離線消息
			options.setCleanSession(false);
			factory.setConnectionOptions(options);
		return factory;
	//配置client,監(jiān)聽的topic
	public MessageProducer inbound() {
		String[] inboundTopics = mqttProperties.getTopics().split(",");
		MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
				mqttProperties.getClientId()+"_inbound",mqttClientFactory(), inboundTopics);  //對inboundTopics主題進行監(jiān)聽
		adapter.setCompletionTimeout(5000);
		adapter.setQos(1);
		adapter.setConverter(new DefaultPahoMessageConverter());
		adapter.setOutputChannel(mqttInputChannel());
		return adapter;
	//通過通道獲取數(shù)據(jù)
	@ServiceActivator(inputChannel = "mqttInputChannel")  //異步處理
	public MessageHandler handler() {
		return new MessageHandler() {
			@Override
			public void handleMessage(Message<?> message) throws MessagingException {
//				System.out.println("message:"+message);
				System.out.println("----------------------");
				System.out.println("message:"+message.getPayload());
				System.out.println("PacketId:"+message.getHeaders().getId());
				System.out.println("Qos:"+message.getHeaders().get(MqttHeaders.QOS));
				String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
				System.out.println("topic:"+topic);
			}
		};
}

5.6 寫個Controller類來進行訪問控制測試

package com.vcarecity.controller;
 
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.vcarecity.mqtt.MqttGateway;
@RestController
public class MqttPubController {
	
	@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
	@Autowired
	private MqttGateway mqttGateway;
	@RequestMapping("/hello")
	public String hello() {
		return "hello!";
	}
	 @RequestMapping("/sendMqtt")
	    public String sendMqtt(String  sendData){
		 	System.out.println(sendData);
	    	System.out.println("進入sendMqtt-------"+sendData);
	        mqttGateway.sendToMqtt("topic01",(String) sendData);
	        return "Test is OK";
	    }
	@RequestMapping("/sendMqttTopic")
	public String sendMqtt(String  sendData,String topic){
		//System.out.println(sendData+"   "+topic);
		//System.out.println("進入inbound發(fā)送:"+sendData);
		mqttGateway.sendToMqtt(topic,(String) sendData);
		return "Test is OK";
}

六、測試 

直接調(diào)用Controller中的URL進行調(diào)用測試:

6.1測試生產(chǎn)端的Handler

6.2 測試消費端的Handler

使用Postman:

http://localhost:8081/sendMqttTopic?sendData=this is mq55555&topic=topic01

 可以看見測試臺上會出現(xiàn)Message消息,這邊實現(xiàn)的是對inboundtopic中的主題監(jiān)聽實現(xiàn):

剛開始沒有出現(xiàn)上圖效果,查了好久的bug。結(jié)果重啟Apollo就好了

如果我要配置多個client,應該怎么處理呢?這個也簡單

(1)我們只要配置多個通道即可,簡單代碼如下:

//通道2
@Bean
public MessageChannel mqttInputChannelTwo() {
    return new DirectChannel();
}
//配置client2,監(jiān)聽的topic:hell2,hello3
public MessageProducer inbound1() {
    MqttPahoMessageDrivenChannelAdapter adapter =
            new MqttPahoMessageDrivenChannelAdapter(clientId+"_inboundTwo", mqttClientFactory(),
                    "hello2","hello3");
    adapter.setCompletionTimeout(completionTimeout);
    adapter.setConverter(new DefaultPahoMessageConverter());
    adapter.setQos(1);
    adapter.setOutputChannel(mqttInputChannelTwo());
    return adapter;
 
//通過通道2獲取數(shù)據(jù)
@ServiceActivator(inputChannel = "mqttInputChannelTwo")
public MessageHandler handlerTwo() {
    return new MessageHandler() {
       @Override
			public void handleMessage(Message<?> message) throws MessagingException {
//				System.out.println("message:"+message);
				System.out.println("----------------------");
				System.out.println("message:"+message.getPayload());
				System.out.println("PacketId:"+message.getHeaders().getId());
				System.out.println("Qos:"+message.getHeaders().get(MqttHeaders.QOS));
				String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
				System.out.println("topic:"+topic);
    };

 (2)因為我這個項目用的是讀取yml文件的,所以只需要在yml文件中的topics即可,加自己想要的topic。

topics: topic03,topic04,topic01,topic02

 以上測試都可以使用MQTTBox完成

后言:

資料參考: 

Spring官網(wǎng)對MQTT的支持:MQTT Support (spring.io)

Tackoverflow上面關(guān)于MQTT的資料,需要翻閱墻體:

Google上的MQTT論壇

參考文章:

https://blog.csdn.net/tjvictor/article/details/5223309

https://blog.csdn.net/riemann_/article/details/118686072

到此這篇關(guān)于SpringBoot整合MQTT總結(jié)的文章就介紹到這了,更多相關(guān)SpringBoot整合MQTT內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評論