MQTT SpringBoot整合實戰教程
MQTT-SpringBoot
創建簡單 SpringBoot 項目
導入必須依賴
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.study</groupId>
<artifactId>MqttDemo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>SpringBootMqttDemo</name>
<description>SpringBootMqttDemo</description>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<spring-boot.version>2.6.13</spring-boot.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- spring boot項目web開發的起步依賴 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- spring boot項目集成消息中間件基礎依賴 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<!-- Spring Integration MQTT support (Eclipse Paho based channel adapters). -->
<!-- No explicit version: the imported spring-boot-dependencies BOM manages it, which keeps
     this module on the same Spring Integration release as spring-boot-starter-integration
     instead of pinning an older 5.4.x artifact next to a newer core. -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>
<!-- lombok依賴 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!-- fastjson依賴 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<!-- Repackages the build output into an executable Spring Boot jar. -->
<plugin>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-maven-plugin</artifactId>
    <version>${spring-boot.version}</version>
    <configuration>
        <mainClass>com.study.mqtt.demo.MqttDemoApplication</mainClass>
        <!-- The former skip=true setting was removed: it disabled the repackage execution
             declared below, so the resulting jar had no Main-Class entry and could not be
             started with "java -jar". -->
    </configuration>
    <executions>
        <execution>
            <id>repackage</id>
            <goals>
                <goal>repackage</goal>
            </goals>
        </execution>
    </executions>
</plugin>
</plugins>
</build>
</project>
增加MQTT相關配置
application.yml
# MQTT settings; the keys must be nested under spring.mqtt so that they are bound
# to the class annotated with @ConfigurationProperties(prefix = "spring.mqtt").
spring:
  mqtt:
    # MQTT broker address
    url: tcp://192.168.40.128:1883
    # Client ID used by the subscribing (inbound) connection
    subClientId: sub_client_id_1
    # Topic(s) to subscribe to; the inbound adapter splits this value on ","
    subTopic: lq/iot/demo/
    # Client ID used by the publishing (outbound) connection
    pubClientId: pub_client_id_1
    # User name
    username: admin
    # Password
    password: admin123456
編寫對應Java類
配置類
MqttConfig.java
package com.study.mqtt.demo.domain;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
 * MQTT connection settings bound from the {@code spring.mqtt} configuration prefix.
 *
 * <p>Accessors are generated by Lombok's {@code @Data}.
 */
@ConfigurationProperties(prefix = "spring.mqtt")
@Data
public class MqttConfig {

    /** User name, bound from {@code spring.mqtt.username}. */
    private String username;

    /** Password, bound from {@code spring.mqtt.password}. */
    private String password;

    /** Broker URL, bound from {@code spring.mqtt.url}. */
    private String url;

    /** Client ID of the subscribing client, bound from {@code spring.mqtt.subClientId}. */
    private String subClientId;

    /** Subscription topic(s), bound from {@code spring.mqtt.subTopic}. */
    private String subTopic;

    /** Client ID of the publishing client, bound from {@code spring.mqtt.pubClientId}. */
    private String pubClientId;
}
啟動類增加開啟配置
MqttDemoApplication.java
package com.study.mqtt.demo;
import com.study.mqtt.demo.domain.MqttConfig;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
@SpringBootApplication
@EnableConfigurationProperties(value = MqttConfig.class)
public class MqttDemoApplication {
public static void main(String[] args) {
SpringApplication.run(MqttDemoApplication.class, args);
}
}創(chuàng)建MQTT連接工廠類
MqttFactory.java
package com.study.mqtt.demo.factory;
import com.study.mqtt.demo.domain.MqttConfig;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
/**
 * Declares the Paho client factory bean used by the MQTT channel adapters.
 */
@Configuration
public class MqttFactory {

    @Autowired
    private MqttConfig mqttConfig;

    /**
     * Builds a client factory whose connect options come from {@link MqttConfig}.
     *
     * <p>User name and password are optional: each is applied only when configured,
     * so a setup without credentials (for example a broker that accepts anonymous
     * clients) no longer fails with a {@link NullPointerException} at startup.
     *
     * @return the client factory carrying the configured connect options
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[]{mqttConfig.getUrl()});
        options.setCleanSession(true);

        String username = mqttConfig.getUsername();
        if (username != null) {
            options.setUserName(username);
        }

        // Calling toCharArray() on a missing password was the startup NPE; skip it when absent.
        String password = mqttConfig.getPassword();
        if (password != null) {
            options.setPassword(password.toCharArray());
        }

        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(options);
        return factory;
    }
}
接收消息處理類
ReceiveMsgHandler.java
package com.study.mqtt.demo.handler;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;
/**
 * Message handler that prints every received message to standard output.
 */
@Component
public class ReceiveMsgHandler implements MessageHandler {

    /**
     * Prints the whole message, then the value of the {@code mqtt_receivedTopic}
     * header, then the payload.
     *
     * @param message the received message
     * @throws MessagingException declared by the {@link MessageHandler} contract
     */
    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        System.out.println("接收到消息對象:" + message);

        final Object topic = message.getHeaders().get("mqtt_receivedTopic");
        final Object body = message.getPayload();

        System.out.println("接收的消息主題:" + topic);
        System.out.println("接收的消息內(nèi)容:" + body);
    }
}
接收消息配置類
MqttInboundConfig.java
package com.study.mqtt.demo.inbound;
import com.study.mqtt.demo.domain.MqttConfig;
import com.study.mqtt.demo.factory.MqttFactory;
import com.study.mqtt.demo.handler.ReceiveMsgHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
@Configuration
public class MqttInboundConfig {
@Autowired
private MqttConfig mqttConfig ;
@Autowired
private ReceiveMsgHandler receiveMsgHandler;
/**
* 配置消息接收通道
* @return
*/
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
/**
* 配置接收適配器
*/
@Bean
public MessageProducer messageProducer(MqttPahoClientFactory mqttPahoClientFactory) {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getUrl() ,
mqttConfig.getSubClientId() ,
mqttPahoClientFactory , mqttConfig.getSubTopic().split(",")) ;
adapter.setConverter(new DefaultPahoMessageConverter());
// 質(zhì)量服務(wù)等級
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter ;
}
/**
* 配置接收消息處理器
* @return
*/
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel") // 指定處理消息使用得通道
public MessageHandler messageHandler() {
return this.receiveMsgHandler ;
}
}發(fā)送消息配置類
MqttOutboundConfig.java
package com.study.mqtt.demo.outbound;
import com.study.mqtt.demo.domain.MqttConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
@Configuration
public class MqttOutboundConfig {
@Autowired
private MqttConfig mqttConfig;
@Autowired
private MqttPahoClientFactory pahoClientFactory ;
@Bean
public MessageChannel mqttOutputChannel() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "mqttOutputChannel")
public MessageHandler mqttOutboundMassageHandler() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttConfig.getUrl() ,
mqttConfig.getPubClientId() , pahoClientFactory ) ;
messageHandler.setAsync(true);
messageHandler.setDefaultQos(0);
messageHandler.setDefaultTopic("default");
return messageHandler ;
}
}發(fā)送消息網(wǎng)關(guān)接口類
MqttGateway.java
package com.study.mqtt.demo.gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
@MessagingGateway(defaultRequestChannel = "mqttOutputChannel")
public interface MqttGateway {
/**
* 發(fā)送mqtt消息
* @param topic 主題
* @param payload 內(nèi)容
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
/**
* 發(fā)送包含qos的消息
* @param topic 主題
* @param qos 對消息處理的幾種機制。
* * 0 發(fā)送成功就算完成,會出現(xiàn)消息丟失
* * 1 增加消息重試機制,消息發(fā)送失敗會重新發(fā)送,會出現(xiàn)重復(fù)消息
* * 2 多了一次去重的動作,確保只有一次消息推給訂閱者。
* @param payload 消息體
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}發(fā)送消息服務(wù)類
MqttMsgSenderService.java
package com.study.mqtt.demo.service;
import com.study.mqtt.demo.gateway.MqttGateway;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
 * Service wrapper around {@link MqttGateway} for sending MQTT messages.
 */
@Service
public class MqttMsgSenderService {

    @Autowired
    private MqttGateway mqttGateway;

    /**
     * Sends a payload to a topic through the gateway.
     *
     * @param topic   target topic
     * @param payload message content
     */
    public void send(String topic, String payload) {
        this.mqttGateway.sendToMqtt(topic, payload);
    }

    /**
     * Sends a payload to a topic through the gateway with an explicit QoS level.
     *
     * @param topic   target topic
     * @param qos     quality-of-service level
     * @param payload message content
     */
    public void send(String topic, int qos, String payload) {
        this.mqttGateway.sendToMqtt(topic, qos, payload);
    }
}
測試驗證
訂閱消息驗證
啟動項目

發送消息
- 主題為配置文件中配置的訂閱主題:lq/iot/demo/
- 發送時間:2025-05-25 21:29:26:439

訂閱收到消息
- 接收到消息的時間:Sun May 25 21:29:26 GMT+08:00 2025
- 接收到的主題:lq/iot/demo/
- 接收到的內容:{ "msg":"spring boot mqtt demo" }

發送消息驗證
- 編寫測試類
- 發送主題:sb/mqtt/test
- 發送內容:hello world ! => 當前時間
package com.study.mqtt.demo;
import com.study.mqtt.demo.service.MqttMsgSenderService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.Date;
@SpringBootTest(classes = MqttDemoApplication.class)
class MqttDemoApplicationTests {
@Autowired
private MqttMsgSenderService mqttMsgSenderService;
@Test
void contextLoads() {
}
@Test
void sendMsg(){
mqttMsgSenderService.send("sb/mqtt/test", "hello world ! => " + new Date());
}
}創(chuàng)建訂閱者
訂閱主題: sb/mqtt/test

運行測試類

訂閱者接收消息
主題:sb/mqtt/test

到此這篇關於MQTT SpringBoot整合的文章就介紹到這了,更多相關MQTT SpringBoot整合內容請搜索腳本之家以前的文章或繼續瀏覽下面的相關文章,希望大家以後多多支持腳本之家!
相關文章
常用json與javabean互轉的方法實現
這篇文章主要介紹了常用json與javabean互轉的方法實現,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧 2021-04-04
Hibernate連接三種數據庫的配置文件
今天小編就為大家分享一篇關於Hibernate連接三種數據庫的配置文件,小編覺得內容挺不錯的,現在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧 2019-03-03
Springboot為什么加載不上application.yml的配置文件
這篇文章主要介紹了Springboot為什么加載不上application.yml的配置文件,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧 2019-10-10

