欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

springboot整合mqtt的詳細(xì)圖文教程

 更新時(shí)間:2023年02月24日 09:22:09   作者:i小喇叭  
MQTT是一種基于發(fā)布/訂閱(publish/subscribe)模式的“輕量級(jí)”通訊協(xié)議,該協(xié)議構(gòu)建于TCP/IP協(xié)議上,由IBM在1999年發(fā)布,下面這篇文章主要給大家介紹了關(guān)于springboot整合mqtt的詳細(xì)圖文教程,需要的朋友可以參考下

springboot 整合 mqtt

最近由于iot越來(lái)越火, 物聯(lián)網(wǎng)的需求越來(lái)越多, 那么理所當(dāng)然的使用mqtt的場(chǎng)景也就越來(lái)越多,

接下來(lái)是我使用springboot整合mqtt的過(guò)程, 以及踩過(guò)的一些坑.

mqtt服務(wù)器使用的是 EMQX, 官網(wǎng) : 這里

搭建的時(shí)候如果你使用的是集群 記得開(kāi)放以下端口:

好了, 搭建成功下一步就是我們的java程序要與mqtt連接, 這里有兩種方式(其實(shí)不止兩種)進(jìn)行連接.

一是 直接使用 MQTT Java 客戶端庫(kù),詳情可以查看官方的例子: MQTT Java 客戶端 我就跳過(guò)了

二是使用 spring integration mqtt也是比較推薦的一種,也是我們主講這種.

第一步 添加 maven dependency

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
            <version>5.5.14</version>
        </dependency>

第二步 添加配置

1 先寫好一些基本配置

mqtt:
 username: test                        # 賬號(hào)
 password: 123456                      # 密碼
 host-url: tcp://127.0.0.1:1883        # mqtt連接tcp地址
 in-client-id: ${random.value}         # 隨機(jī)值,使出入站 client ID 不同
 out-client-id: ${random.value}
 client-id: ${random.int}                   # 客戶端Id,不能相同,采用隨機(jī)數(shù) ${random.value}
 default-topic: test/#,topic/+/+/up         # 默認(rèn)主題
 timeout: 60                                # 超時(shí)時(shí)間
 keepalive: 60                              # 保持連接
 clearSession: true                         # 清除會(huì)話(設(shè)置為false,斷開(kāi)連接,重連后使用原來(lái)的會(huì)話 保留訂閱的主題,能接收離線期間的消息)

2.然后寫一個(gè)對(duì)應(yīng)的類MqttProperties

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * MqttProperties 
 *
 * @author hengzi
 * @date 2022/8/23
 */
@Component
public class MqttProperties {

    /**
     * 用戶名
     */
    @Value("${mqtt.username}")
    private String username;

    /**
     * 密碼
     */
    @Value("${mqtt.password}")
    private String password;

    /**
     * 連接地址
     */
    @Value("${mqtt.host-url}")
    private String hostUrl;

    /**
     * 進(jìn)-客戶Id
     */
    @Value("${mqtt.in-client-id}")
    private String inClientId;

    /**
     * 出-客戶Id
     */
    @Value("${mqtt.out-client-id}")
    private String outClientId;

    /**
     * 客戶Id
     */
    @Value("${mqtt.client-id}")
    private String clientId;

    /**
     * 默認(rèn)連接話題
     */
    @Value("${mqtt.default-topic}")
    private String defaultTopic;

    /**
     * 超時(shí)時(shí)間
     */
    @Value("${mqtt.timeout}")
    private int timeout;

    /**
     * 保持連接數(shù)
     */
    @Value("${mqtt.keepalive}")
    private int keepalive;

    /**是否清除session*/
    @Value("${mqtt.clearSession}")
    private boolean clearSession;

	// ...getter and setter

}

接下來(lái)就是配置一些亂七八糟的東西, 這里有很多概念性的東西 比如 管道channel, 適配器 adapter, 入站Inbound, 出站Outbound,等等等等, 看起來(lái)是非常頭痛的

好吧,那就一個(gè)一個(gè)來(lái),

首先連接mqtt需要一個(gè)客戶端, 那么我們就開(kāi)一個(gè)客戶端工廠, 這里可以產(chǎn)生很多很多的客戶端

    @Bean
    public MqttPahoClientFactory mqttPahoClientFactory(){
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(mqttProperties.getHostUrl().split(","));
        options.setUserName(mqttProperties.getUsername());
        options.setPassword(mqttProperties.getPassword().toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }

然后再搞兩根管子(channel),一個(gè)出站,一個(gè)入站

    //出站消息管道,
    @Bean
    public MessageChannel mqttOutboundChannel(){
        return new DirectChannel();
    }

    // 入站消息管道
    @Bean
    public MessageChannel mqttInboundChannel(){
        return new DirectChannel();
    }

為了使這些管子能流通 就需要一個(gè)適配器(adapter)

    // Mqtt 管道適配器
    @Bean
    public MqttPahoMessageDrivenChannelAdapter adapter(MqttPahoClientFactory factory){
        return new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getInClientId(),factory,mqttProperties.getDefaultTopic().split(","));
    }

然后定義消息生產(chǎn)者

    // 消息生產(chǎn)者
    @Bean
    public MessageProducer mqttInbound(MqttPahoMessageDrivenChannelAdapter adapter){
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        //入站投遞的通道
        adapter.setOutputChannel(mqttInboundChannel());
        adapter.setQos(1);
        return adapter;
    }

那我們收到消息去哪里處理呢,答案是這里:

    @Bean
    //使用ServiceActivator 指定接收消息的管道為 mqttInboundChannel,投遞到mqttInboundChannel管道中的消息會(huì)被該方法接收并執(zhí)行
    @ServiceActivator(inputChannel = "mqttInboundChannel")
    public MessageHandler handleMessage() {
    	// 這個(gè) mqttMessageHandle 其實(shí)就是一個(gè) MessageHandler 的實(shí)現(xiàn)類(這個(gè)類我放下面)
        return mqttMessageHandle;
		// 你也可以這樣寫
//        return new MessageHandler() {
//            @Override
//            public void handleMessage(Message<?> message) throws MessagingException {
//                // do something
//            }
//        };
    

到這里我們其實(shí)已經(jīng)可以接受到來(lái)自mqtt的消息了

接下來(lái)配置向mqtt發(fā)送消息

配置 出站處理器

    // 出站處理器
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound(MqttPahoClientFactory factory){
        MqttPahoMessageHandler handler = new MqttPahoMessageHandler(mqttProperties.getOutClientId(),factory);
        handler.setAsync(true);
        handler.setConverter(new DefaultPahoMessageConverter());
        handler.setDefaultTopic(mqttProperties.getDefaultTopic().split(",")[0]);
        return handler;
    }

這個(gè) 出站處理器 在我看來(lái)就是讓別人 (MqttPahoMessageHandler)處理了, 我就不處理了,我只管我要發(fā)送什么,至于怎么發(fā)送,由MqttPahoMessageHandler來(lái)完成

接下來(lái)我們定義一個(gè)接口即可

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

/**
 * MqttGateway
 *
 * @author hengzi
 * @date 2022/8/23
 */

@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {

    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String data);

    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) Integer Qos, String data);
}

我們直接調(diào)用這個(gè)接口就可以向mqtt 發(fā)送數(shù)據(jù)

到目前為止,整個(gè)配置文件長(zhǎng)這樣:

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;

/**
 * MqttConfig
 *
 * @author hengzi
 * @date 2022/8/23
 */
@Configuration
public class MqttConfig {


    /**
     *  以下屬性將在配置文件中讀取
     **/
    @Autowired
    private MqttProperties mqttProperties;


    //Mqtt 客戶端工廠
    @Bean
    public MqttPahoClientFactory mqttPahoClientFactory(){
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(mqttProperties.getHostUrl().split(","));
        options.setUserName(mqttProperties.getUsername());
        options.setPassword(mqttProperties.getPassword().toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }

    // Mqtt 管道適配器
    @Bean
    public MqttPahoMessageDrivenChannelAdapter adapter(MqttPahoClientFactory factory){
        return new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getInClientId(),factory,mqttProperties.getDefaultTopic().split(","));
    }


    // 消息生產(chǎn)者
    @Bean
    public MessageProducer mqttInbound(MqttPahoMessageDrivenChannelAdapter adapter){
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        //入站投遞的通道
        adapter.setOutputChannel(mqttInboundChannel());
        adapter.setQos(1);
        return adapter;
    }


    // 出站處理器
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound(MqttPahoClientFactory factory){
        MqttPahoMessageHandler handler = new MqttPahoMessageHandler(mqttProperties.getOutClientId(),factory);
        handler.setAsync(true);
        handler.setConverter(new DefaultPahoMessageConverter());
        handler.setDefaultTopic(mqttProperties.getDefaultTopic().split(",")[0]);
        return handler;
    }

    @Bean
    //使用ServiceActivator 指定接收消息的管道為 mqttInboundChannel,投遞到mqttInboundChannel管道中的消息會(huì)被該方法接收并執(zhí)行
    @ServiceActivator(inputChannel = "mqttInboundChannel")
    public MessageHandler handleMessage() {
        return mqttMessageHandle;
    }

    //出站消息管道,
    @Bean
    public MessageChannel mqttOutboundChannel(){
        return new DirectChannel();
    }


    // 入站消息管道
    @Bean
    public MessageChannel mqttInboundChannel(){
        return new DirectChannel();
    }
}

處理消息的 MqttMessageHandle

@Component
public class MqttMessageHandle implements MessageHandler {
    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
     
    }
}

在進(jìn)一步了解之后,發(fā)現(xiàn)可以優(yōu)化的地方,比如channel 的類型是有很多種的, 這里使用的DirectChannel,是Spring Integration默認(rèn)的消息通道,它將消息發(fā)送給為一個(gè)訂閱者,然后阻礙發(fā)送直到消息被接收,傳輸方式都是同步的方式,都是由一個(gè)線程來(lái)運(yùn)行的.

這里我們可以將入站channel改成 ExecutorChannel一個(gè)可以使用多線程的channel

    @Bean
    public ThreadPoolTaskExecutor mqttThreadPoolTaskExecutor()
    {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 最大可創(chuàng)建的線程數(shù)
        int maxPoolSize = 200;
        executor.setMaxPoolSize(maxPoolSize);
        // 核心線程池大小
        int corePoolSize = 50;
        executor.setCorePoolSize(corePoolSize);
        // 隊(duì)列最大長(zhǎng)度
        int queueCapacity = 1000;
        executor.setQueueCapacity(queueCapacity);
        // 線程池維護(hù)線程所允許的空閑時(shí)間
        int keepAliveSeconds = 300;
        executor.setKeepAliveSeconds(keepAliveSeconds);
        // 線程池對(duì)拒絕任務(wù)(無(wú)線程可用)的處理策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }

    // 入站消息管道
    @Bean
    public MessageChannel mqttInboundChannel(){
        // 用線程池
        return new ExecutorChannel(mqttThreadPoolTaskExecutor());
    }

到這里其實(shí)可以運(yùn)行了.

但是這樣配置其實(shí)還是有點(diǎn)多, 有點(diǎn)亂, 于是我查找官網(wǎng), f發(fā)現(xiàn)一種更簡(jiǎn)單的配置方法 叫 Java DSL, 官網(wǎng)連接: Configuring with the Java DSL

我們參考官網(wǎng),稍微改一下,使用 DSL的方式進(jìn)行配置:

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.ExecutorChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

/**
 * MqttConfigV2
 *
 * @author hengzi
 * @date 2022/8/24
 */
@Configuration
public class MqttConfigV2 {

    @Autowired
    private MqttProperties mqttProperties;

    @Autowired
    private MqttMessageHandle mqttMessageHandle;


    //Mqtt 客戶端工廠 所有客戶端從這里產(chǎn)生
    @Bean
    public MqttPahoClientFactory mqttPahoClientFactory(){
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(mqttProperties.getHostUrl().split(","));
        options.setUserName(mqttProperties.getUsername());
        options.setPassword(mqttProperties.getPassword().toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }

    // Mqtt 管道適配器
    @Bean
    public MqttPahoMessageDrivenChannelAdapter adapter(MqttPahoClientFactory factory){
        return new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getInClientId(),factory,mqttProperties.getDefaultTopic().split(","));
    }

    // 消息生產(chǎn)者 (接收,處理來(lái)自mqtt的消息)
    @Bean
    public IntegrationFlow mqttInbound(MqttPahoMessageDrivenChannelAdapter adapter) {
        adapter.setCompletionTimeout(5000);
        adapter.setQos(1);
        return IntegrationFlows.from( adapter)
                .channel(new ExecutorChannel(mqttThreadPoolTaskExecutor()))
                .handle(mqttMessageHandle)
                .get();
    }

    @Bean
    public ThreadPoolTaskExecutor mqttThreadPoolTaskExecutor()
    {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 最大可創(chuàng)建的線程數(shù)
        int maxPoolSize = 200;
        executor.setMaxPoolSize(maxPoolSize);
        // 核心線程池大小
        int corePoolSize = 50;
        executor.setCorePoolSize(corePoolSize);
        // 隊(duì)列最大長(zhǎng)度
        int queueCapacity = 1000;
        executor.setQueueCapacity(queueCapacity);
        // 線程池維護(hù)線程所允許的空閑時(shí)間
        int keepAliveSeconds = 300;
        executor.setKeepAliveSeconds(keepAliveSeconds);
        // 線程池對(duì)拒絕任務(wù)(無(wú)線程可用)的處理策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }

    // 出站處理器 (向 mqtt 發(fā)送消息)
    @Bean
    public IntegrationFlow mqttOutboundFlow(MqttPahoClientFactory factory) {

        MqttPahoMessageHandler handler = new MqttPahoMessageHandler(mqttProperties.getOutClientId(),factory);
        handler.setAsync(true);
        handler.setConverter(new DefaultPahoMessageConverter());
        handler.setDefaultTopic(mqttProperties.getDefaultTopic().split(",")[0]);
        return IntegrationFlows.from( "mqttOutboundChannel").handle(handler).get();
    }

}

這樣看起來(lái)真的簡(jiǎn)單多了, 頭也沒(méi)那么大了, 我要是早知道多好.

好了以上就是配置相關(guān)的, 到這里其實(shí)是已經(jīng)完成springboot 與 mqtt 的整合了.

但其實(shí)我一直有個(gè)想法, 就是我們接收的消息 都是在 handleMessage這個(gè)方法里面執(zhí)行的,

	@Override
    public void handleMessage(Message<?> message) throws MessagingException {
     			
    }

所以我就有了一個(gè)想法, 能不能根據(jù) 我訂閱的主題,在不同的方法執(zhí)行, 對(duì)于這個(gè)問(wèn)題,其實(shí)你用if ... else ...也能實(shí)現(xiàn), 但很明顯,如果我訂閱的主題很多的話, 那寫起來(lái)就很頭痛了.

對(duì)于這個(gè)問(wèn)題,有兩種思路, 一個(gè)是添加Spring Integration的路由 router,根據(jù)不同topic路由到不同的channel, 這個(gè)我也知道能不能實(shí)現(xiàn), 我這里就不討論了.

第二種是, 我也不知道名字改如何叫, 我是參考了 spring@Controller的設(shè)計(jì), 暫且叫他注解模式.

眾所周知,我們的接口都是在類上加 @Controller這個(gè)注解, 就代表這個(gè)類是 http 接口, 再在方法加上 @RequestMapping就能實(shí)現(xiàn)不同的 url 調(diào)用不同的方法.

參數(shù)這個(gè)設(shè)計(jì) 我們?cè)陬惿厦婕?@MqttService就代表這個(gè)類是專門處理mqtt消息的服務(wù)類
同時(shí) 在這個(gè)類的方法上 加上 @MqttTopic就代表 這個(gè)主題由這個(gè)方法處理.

OK, 理論有了,接下來(lái)就是 實(shí)踐.

先定義 兩個(gè)注解

import org.springframework.core.annotation.AliasFor;
import org.springframework.stereotype.Component;

import java.lang.annotation.*;

@Documented
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface MqttService {

    @AliasFor(
            annotation = Component.class
    )
    String value() default "";
}

加上 @Component注解 spring就會(huì)掃描, 并注冊(cè)到IOC容器里

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface MqttTopic {

    /**
     * 主題名字
     */
    String value() default "";

}

參考 @RequestMapping我們使用起來(lái)應(yīng)該是這樣的:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;

/**
 * MqttTopicHandle
 *
 * @author hengzi
 * @date 2022/8/24
 */
@MqttService
public class MqttTopicHandle {

    public static final Logger log = LoggerFactory.getLogger(MqttTopicHandle.class);

	// 這里的 # 號(hào)是通配符
    @MqttTopic("test/#")
    public void test(Message<?> message){
        log.info("test="+message.getPayload());
    }
	
	// 這里的 + 號(hào)是通配符
    @MqttTopic("topic/+/+/up")
    public void up(Message<?> message){
        log.info("up="+message.getPayload());
    }

	// 注意 你必須先訂閱
    @MqttTopic("topic/1/2/down")
    public void down(Message<?> message){
        log.info("down="+message.getPayload());
    }
}

OK 接下來(lái)就是實(shí)現(xiàn)這樣的使用

分析 :

當(dāng)我們收到消息時(shí), 我們從IOC容器中 找到所有 帶 @MqttService注解的類

然后 遍歷這些類, 找到帶有 @MqttTopic的方法

接著 把 @MqttTopic注解的的值 與 接受到的topic 進(jìn)行對(duì)比

如果一致則執(zhí)行這個(gè)方法

廢話少說(shuō), 上代碼

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;

/**
 * MessageHandleService
 *
 * @author hengzi
 * @date 2022/8/24
 */
@Component
public class MqttMessageHandle implements MessageHandler {

    public static final Logger log = LoggerFactory.getLogger(MqttMessageHandle.class);

    // 包含 @MqttService注解 的類(Component)
    public static Map<String, Object> mqttServices;


    /**
     * 所有mqtt到達(dá)的消息都會(huì)在這里處理
     * 要注意這個(gè)方法是在線程池里面運(yùn)行的
     * @param message message
     */
    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        getMqttTopicService(message);
    }

    public Map<String, Object> getMqttServices(){
        if(mqttServices==null){
            mqttServices = SpringUtils.getBeansByAnnotation(MqttService.class);
        }
        return mqttServices;
    }

    public void getMqttTopicService(Message<?> message){
        // 在這里 我們根據(jù)不同的 主題 分發(fā)不同的消息
        String receivedTopic = message.getHeaders().get("mqtt_receivedTopic",String.class);
        if(receivedTopic==null || "".equals(receivedTopic)){
            return;
        }
        for(Map.Entry<String, Object> entry : getMqttServices().entrySet()){
        	// 把所有帶有 @MqttService 的類遍歷
            Class<?> clazz = entry.getValue().getClass();
            // 獲取他所有方法
            Method[] methods = clazz.getDeclaredMethods();
            for ( Method method: methods ){
                if (method.isAnnotationPresent(MqttTopic.class)){
                	// 如果這個(gè)方法有 這個(gè)注解
                    MqttTopic handleTopic = method.getAnnotation(MqttTopic.class);
                    if(isMatch(receivedTopic,handleTopic.value())){
                    	// 并且 這個(gè) topic 匹配成功
                        try {
                            method.invoke(SpringUtils.getBean(clazz),message);
                            return;
                        } catch (IllegalAccessException e) {
                            e.printStackTrace();
                            log.error("代理炸了");
                        } catch (InvocationTargetException e) {
                            log.error("執(zhí)行 {} 方法出現(xiàn)錯(cuò)誤",handleTopic.value(),e);
                        }
                    }
                }
            }
        }
    }


    /**
     * mqtt 訂閱的主題與我實(shí)際的主題是否匹配
     * @param topic 是實(shí)際的主題
     * @param pattern 是我訂閱的主題 可以是通配符模式
     * @return 是否匹配
     */
    public static boolean isMatch(String topic, String pattern){

        if((topic==null) || (pattern==null) ){
            return false;
        }

        if(topic.equals(pattern)){
            // 完全相等是肯定匹配的
            return true;
        }

        if("#".equals(pattern)){
            // # 號(hào)代表所有主題  肯定匹配的
            return true;
        }
        String[] splitTopic = topic.split("/");
        String[] splitPattern = pattern.split("/");

        boolean match = true;

        // 如果包含 # 則只需要判斷 # 前面的
        for (int i = 0; i < splitPattern.length; i++) {
            if(!"#".equals(splitPattern[i])){
                // 不是# 號(hào) 正常判斷
                if(i>=splitTopic.length){
                    // 此時(shí)長(zhǎng)度不相等 不匹配
                    match = false;
                    break;
                }
                if(!splitTopic[i].equals(splitPattern[i]) && !"+".equals(splitPattern[i])){
                    // 不相等 且不等于 +
                    match = false;
                    break;
                }
            }
            else {
                // 是# 號(hào)  肯定匹配的
                break;
            }
        }

        return match;
    }

}

工具類 SpringUtils

import org.springframework.aop.framework.AopContext;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * spring工具類 方便在非spring管理環(huán)境中獲取bean
 * 
 */
@Component
public final class SpringUtils implements BeanFactoryPostProcessor, ApplicationContextAware 
{
    /** Spring應(yīng)用上下文環(huán)境 */
    private static ConfigurableListableBeanFactory beanFactory;

    private static ApplicationContext applicationContext;


    public static Map<String, Object> getBeansByAnnotation(Class clsName) throws BeansException{

        return beanFactory.getBeansWithAnnotation(clsName);
    }

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException 
    {
        SpringUtils.beanFactory = beanFactory;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException 
    {
        SpringUtils.applicationContext = applicationContext;
    }

    /**
     * 獲取對(duì)象
     *
     * @param name
     * @return Object 一個(gè)以所給名字注冊(cè)的bean的實(shí)例
     * @throws org.springframework.beans.BeansException
     *
     */
    @SuppressWarnings("unchecked")
    public static <T> T getBean(String name) throws BeansException
    {
        return (T) beanFactory.getBean(name);
    }

    /**
     * 獲取類型為requiredType的對(duì)象
     *
     * @param clz
     * @return
     * @throws org.springframework.beans.BeansException
     *
     */
    public static <T> T getBean(Class<T> clz) throws BeansException
    {
        T result = (T) beanFactory.getBean(clz);
        return result;
    }

    /**
     * 如果BeanFactory包含一個(gè)與所給名稱匹配的bean定義,則返回true
     *
     * @param name
     * @return boolean
     */
    public static boolean containsBean(String name)
    {
        return beanFactory.containsBean(name);
    }

    /**
     * 判斷以給定名字注冊(cè)的bean定義是一個(gè)singleton還是一個(gè)prototype。 如果與給定名字相應(yīng)的bean定義沒(méi)有被找到,將會(huì)拋出一個(gè)異常(NoSuchBeanDefinitionException)
     *
     * @param name
     * @return boolean
     * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
     *
     */
    public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException
    {
        return beanFactory.isSingleton(name);
    }

    /**
     * @param name
     * @return Class 注冊(cè)對(duì)象的類型
     * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
     *
     */
    public static Class<?> getType(String name) throws NoSuchBeanDefinitionException
    {
        return beanFactory.getType(name);
    }

    /**
     * 如果給定的bean名字在bean定義中有別名,則返回這些別名
     *
     * @param name
     * @return
     * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
     *
     */
    public static String[] getAliases(String name) throws NoSuchBeanDefinitionException
    {
        return beanFactory.getAliases(name);
    }

    /**
     * 獲取aop代理對(duì)象
     * 
     * @param invoker
     * @return
     */
    @SuppressWarnings("unchecked")
    public static <T> T getAopProxy(T invoker)
    {
        return (T) AopContext.currentProxy();
    }

    /**
     * 獲取當(dāng)前的環(huán)境配置,無(wú)配置返回null
     *
     * @return 當(dāng)前的環(huán)境配置
     */
    public static String[] getActiveProfiles()
    {
        return applicationContext.getEnvironment().getActiveProfiles();
    }

}

OK, 大功告成. 終于舒服了, 終于不用寫if...else...了, 個(gè)人感覺(jué)這樣處理起來(lái)會(huì)更加優(yōu)雅. 寫代碼最重要是什么, 是優(yōu)雅~

以上!

參考文章:

附:

動(dòng)態(tài)添加主題方式:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.stereotype.Service;

import java.util.Arrays;

/**
 * MqttService
 *
 * @author hengzi
 * @date 2022/8/25
 */
@Service
public class MqttService {

    @Autowired
    private MqttPahoMessageDrivenChannelAdapter adapter;


    public void addTopic(String topic) {
        addTopic(topic, 1);
    }

    public void addTopic(String topic,int qos) {
        String[] topics = adapter.getTopic();
        if(!Arrays.asList(topics).contains(topic)){
            adapter.addTopic(topic,qos);
        }
    }

    public void removeTopic(String topic) {
        adapter.removeTopic(topic);
    }

}

直接調(diào)用就行

總結(jié)

到此這篇關(guān)于springboot整合mqtt的文章就介紹到這了,更多相關(guān)springboot整合mqtt內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • SpringAop源碼及調(diào)用過(guò)程概述

    SpringAop源碼及調(diào)用過(guò)程概述

    這篇文章主要介紹了SpringAop源碼及調(diào)用過(guò)程概述,Spring AOP(面向切面編程)是Spring框架的一個(gè)重要特性,它提供了一種在程序運(yùn)行期間動(dòng)態(tài)地將額外的行為織入到代碼中的方式,需要的朋友可以參考下
    2023-10-10
  • Win11系統(tǒng)下載安裝java的詳細(xì)過(guò)程

    Win11系統(tǒng)下載安裝java的詳細(xì)過(guò)程

    這篇文章主要介紹了Win11如何下載安裝java,本文通過(guò)圖文并茂的形式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2023-05-05
  • 帶你重新認(rèn)識(shí)MyBatis的foreach

    帶你重新認(rèn)識(shí)MyBatis的foreach

    這篇文章主要介紹了重新認(rèn)識(shí)MyBatis的foreach,本文提出了一種簡(jiǎn)化<foreach>寫法的設(shè)想,更重要的是通過(guò)解決空集時(shí)生成的SQL語(yǔ)法問(wèn)題,更深刻地理解MyBatis的foreach的生成機(jī)制,需要的朋友可以參考下
    2022-11-11
  • SpringBoot集成Hadoop對(duì)HDFS的文件操作方法

    SpringBoot集成Hadoop對(duì)HDFS的文件操作方法

    這篇文章主要介紹了SpringBoot集成Hadoop對(duì)HDFS的文件操作方法,本文給大家介紹的非常詳細(xì),感興趣的朋友跟隨小編一起看看吧
    2024-07-07
  • springboot結(jié)合redis實(shí)現(xiàn)搜索欄熱搜功能及文字過(guò)濾

    springboot結(jié)合redis實(shí)現(xiàn)搜索欄熱搜功能及文字過(guò)濾

    本文主要介紹了springboot結(jié)合redis實(shí)現(xiàn)搜索欄熱搜功能及文字過(guò)濾,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2022-02-02
  • spring mvc利用ajax向controller傳遞對(duì)象的方法示例

    spring mvc利用ajax向controller傳遞對(duì)象的方法示例

    這篇文章主要給大家介紹了關(guān)于spring mvc利用ajax向controller傳遞對(duì)象的相關(guān)資料,文中通過(guò)示例代碼將步驟介紹的非常詳細(xì),對(duì)大家具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來(lái)跟著小編一起學(xué)習(xí)學(xué)習(xí)吧。
    2017-07-07
  • 淺談Maven resrouce下filtering作用

    淺談Maven resrouce下filtering作用

    Filtering是Maven Resources Plugin的一個(gè)功能,本文主要介紹了淺談Maven resrouce下filtering作用,具有一定的參考價(jià)值,感興趣的可以了解一下
    2024-03-03
  • Java如何實(shí)現(xiàn)支付寶電腦支付基于servlet版本

    Java如何實(shí)現(xiàn)支付寶電腦支付基于servlet版本

    這篇文章主要介紹了Java如何實(shí)現(xiàn)支付寶電腦支付基于servlet版本,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2019-11-11
  • CountDownLatch和Atomic原子操作類源碼解析

    CountDownLatch和Atomic原子操作類源碼解析

    這篇文章主要為大家介紹了CountDownLatch和Atomic原子操作類的源碼解析以及理解應(yīng)用,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步
    2022-03-03
  • java時(shí)間日期使用與查詢代碼詳解

    java時(shí)間日期使用與查詢代碼詳解

    這篇文章主要介紹了java時(shí)間日期使用與查詢代碼詳解,具有一定借鑒價(jià)值,需要的朋友可以參考下。
    2017-11-11

最新評(píng)論