欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

使用java?實(shí)現(xiàn)mqtt兩種常用方式

 更新時(shí)間:2022年11月23日 09:48:23   作者:houxian1103  
在開(kāi)發(fā)MQTT時(shí)有兩種方式一種是使用Paho Java 原生庫(kù)來(lái)完成,一種是使用spring boot 來(lái)完成,這篇文章主要介紹了使用java?實(shí)現(xiàn)mqtt兩種方式,需要的朋友可以參考下

前言

在開(kāi)發(fā)MQTT時(shí)有兩種方式一種是使用Paho Java 原生庫(kù)來(lái)完成,一種是使用spring boot 來(lái)完成。

Paho Java 庫(kù)實(shí)現(xiàn)

Eclipse Paho Java Client (opens new window)是用 Java 編寫的 MQTT 客戶端庫(kù)(MQTT Java Client),可用于 JVM 或其他 Java 兼容平臺(tái)(例如Android)。
Eclipse Paho Java Client 提供了MqttAsyncClient 和 MqttClient 異步和同步 API

  • 通過(guò) Maven 安裝 Paho Java
<dependency>
  <groupId>org.eclipse.paho</groupId>
	<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
	<version>1.2.2</version>
</dependency>
  • Paho Java 使用示例

Java 體系中 Paho Java 是比較穩(wěn)定、廣泛應(yīng)用的 MQTT 客戶端庫(kù),本示例包含 Java 語(yǔ)言的 Paho Java 連接 EMQX Broker,并進(jìn)行消息收發(fā)完整代碼:

package io.emqx;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;


public class App {
    public static void main(String[] args) {
        String subTopic = "testtopic/#";
        String pubTopic = "testtopic/1";
        String content = "Hello World";
        int qos = 2;
        String broker = "tcp://broker.emqx.io:1883";
        String clientId = "emqx_test";
        MemoryPersistence persistence = new MemoryPersistence();

        try {
            MqttClient client = new MqttClient(broker, clientId, persistence);

            // MQTT 連接選項(xiàng)
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setUserName("emqx_test");
            connOpts.setPassword("emqx_test_password".toCharArray());
            // 保留會(huì)話
            connOpts.setCleanSession(true);

            // 設(shè)置回調(diào)
            client.setCallback(new PushCallback());

            // 建立連接
            System.out.println("Connecting to broker: " + broker);
            client.connect(connOpts);

            System.out.println("Connected");
            System.out.println("Publishing message: " + content);

            // 訂閱
            client.subscribe(subTopic);

            // 消息發(fā)布所需參數(shù)
            MqttMessage message = new MqttMessage(content.getBytes());
            message.setQos(qos);
            client.publish(pubTopic, message);
            System.out.println("Message published");

            client.disconnect();
            System.out.println("Disconnected");
            client.close();
            System.exit(0);
        } catch (MqttException me) {
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
            me.printStackTrace();
        }
    }
}

回調(diào)消息處理類 OnMessageCallback.java

package io.emqx;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class OnMessageCallback implements MqttCallback {
    public void connectionLost(Throwable cause) {
        // 連接丟失后,一般在這里面進(jìn)行重連
        System.out.println("連接斷開(kāi),可以做重連");
    }

    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // subscribe后得到的消息會(huì)執(zhí)行到這里面
        System.out.println("接收消息主題:" + topic);
        System.out.println("接收消息Qos:" + message.getQos());
        System.out.println("接收消息內(nèi)容:" + new String(message.getPayload()));
    }

    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("deliveryComplete---------" + token.isComplete());
    }
}

好的上述就實(shí)現(xiàn)了簡(jiǎn)單的 MQTT的連接和消息收發(fā)。

spring boot集成mqtt

spring boot 環(huán)境

spring-boot 版本 2.2.2
spring-integration的版本為:5.4.3
Spring Integration提供了入站適配器和出站適配器以支持MQTT協(xié)議。

Maven 依賴:

<!-- https://mvnrepository.com/artifact/org.springframework.integration/spring-integration-mqtt -->

 <dependency>
    <groupId>org.springframework.boot</groupId> 
    <artifactId>spring-boot-starter-integration</artifactId> 
 </dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>5.4.3</version>
 </dependency>

配置文件 application.yml:

spring:
  mqtt:
    username:
    password:
    url: tcp://ip:port
    clientId: clientId
    topic: default
    completionTimeout: 2000

核心代碼

配置類

@Data
@Configuration
@ConfigurationProperties(prefix = "spring.mqtt")
public class MqttConfiguration {

    private String username;
    private String password;
    private String url;
    private String clientId;
    private String topic = "TOPIC_DEFAULT";
    private Integer completionTimeout = 2000;

    /**
     * 注冊(cè)MQTT客戶端工廠
     * @return
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory(){
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        //如果設(shè)置為 false,客戶端和服務(wù)器將在客戶端、服務(wù)器和連接重新啟動(dòng)時(shí)保持狀態(tài)。隨著狀態(tài)的保持:
        //  即使客戶端、服務(wù)器或連接重新啟動(dòng),消息傳遞也將可靠地滿足指定的 QOS。服務(wù)器將訂閱視為持久的。
        // 如果設(shè)置為 true,客戶端和服務(wù)器將不會(huì)在客戶端、服務(wù)器或連接重新啟動(dòng)時(shí)保持狀態(tài)。
        options.setCleanSession(true);
        //該值以秒為單位,必須>0,定義了客戶端等待與 MQTT 服務(wù)器建立網(wǎng)絡(luò)連接的最大時(shí)間間隔。
        // 默認(rèn)超時(shí)為 30 秒。值 0 禁用超時(shí)處理,這意味著客戶端將等待直到網(wǎng)絡(luò)連接成功或失敗。
        options.setConnectionTimeout(0);
        //此值以秒為單位,定義發(fā)送或接收消息之間的最大時(shí)間間隔,必須>0
        options.setKeepAliveInterval(90);
        //自動(dòng)重新連接
        options.setAutomaticReconnect(true);
        options.setUserName(this.getUsername());
        options.setPassword(this.getPassword().toCharArray());
        options.setServerURIs(new String[]{this.getUrl()});

        factory.setConnectionOptions(options);
        return factory;
    }
}
@Slf4j
@AllArgsConstructor
@Configuration
@IntegrationComponentScan
public class MqttInboundConfiguration {

    private MqttConfiguration mqttConfig;
    private MqttPahoClientFactory factory;
    private MqttMessageReceiver mqttMessageReceiver;

    /**
     * 此處可以使用其他消息通道
     * Spring Integration默認(rèn)的消息通道,它允許將消息發(fā)送給一個(gè)訂閱者,然后阻礙發(fā)送直到消息被接收。
     *
     * @return
     */
    @Bean
    public MessageChannel mqttInBoundChannel() {
        return new DirectChannel();
    }

    /**
     * 適配器, 兩個(gè)topic共用一個(gè)adapter
     * 客戶端作為消費(fèi)者,訂閱主題,消費(fèi)消息
     *
     * @param
     * @param
     * @return
     */
    @Bean
    public MessageProducerSupport mqttInbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId()+"-"+System.currentTimeMillis(), factory, mqttConfig.getTopic());

        adapter.setCompletionTimeout(60000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setRecoveryInterval(10000);
        adapter.setQos(0);
        adapter.setOutputChannel(mqttInBoundChannel());
        return adapter;
    }

    /**
     * mqtt入站消息處理工具,對(duì)于指定消息入站通道接收到生產(chǎn)者生產(chǎn)的消息后處理消息的工具。
     *
     * @return
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttInBoundChannel")
    public MessageHandler mqttMessageHandler() {
        return this.mqttMessageReceiver;
    }
}

數(shù)據(jù)接收

@Slf4j
@AllArgsConstructor
@Component
public class MqttMessageReceiver implements MessageHandler {

    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        try {
        
            MessageHeaders headers = message.getHeaders();
            //獲取消息Topic
            String receivedTopic = (String) headers.get(MqttHeaders.RECEIVED_TOPIC);
            log.info("[獲取到的消息的topic :]{} ", receivedTopic);
            //獲取消息體
            String payload = (String) message.getPayload();
            log.info("[獲取到的消息的payload :]{} ", payload);
            //todo ....
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
@Slf4j
@AllArgsConstructor
@Configuration
public class MqttOutboundConfiguration {

    private MqttConfiguration mqttConfig;
    private MqttPahoClientFactory factory;



    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
                mqttConfig.getClientId()+"-"+System.currentTimeMillis() + System.currentTimeMillis(), factory);

        messageHandler.setDefaultQos(0);
        //開(kāi)啟異步
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(mqttConfig.getTopic());
        return messageHandler;
    }
}

發(fā)送者

@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {

    /**
     * 發(fā)送mqtt消息
     * @param topic 主題
     * @param payload 內(nèi)容
     * @return void
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);

    /**
     * 發(fā)送包含qos的消息
     * @param topic 主題
     * @param qos 對(duì)消息處理的幾種機(jī)制。
     *          * 0 表示的是訂閱者沒(méi)收到消息不會(huì)再次發(fā)送,消息會(huì)丟失。<br>
     *          * 1 表示的是會(huì)嘗試重試,一直到接收到消息,但這種情況可能導(dǎo)致訂閱者收到多次重復(fù)消息。<br>
     *          * 2 多了一次去重的動(dòng)作,確保訂閱者收到的消息有一次。
     * @param payload 消息體
     * @return void
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);

    /**
     * 發(fā)送包含qos的消息
     * @param topic 主題
     * @param qos 對(duì)消息處理的幾種機(jī)制。
     *          * 0 表示的是訂閱者沒(méi)收到消息不會(huì)再次發(fā)送,消息會(huì)丟失。<br>
     *          * 1 表示的是會(huì)嘗試重試,一直到接收到消息,但這種情況可能導(dǎo)致訂閱者收到多次重復(fù)消息。<br>
     *          * 2 多了一次去重的動(dòng)作,確保訂閱者收到的消息有一次。
     * @param payload 消息體
     * @return void
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload);
}

@Component
@AllArgsConstructor
public class MqttMessageSender {

    private MqttGateway mqttGateway;

    /**
     * 發(fā)送mqtt消息
     * @param topic 主題
     * @param message 內(nèi)容
     * @return void
     */
    public void send(String topic, String message) {
        mqttGateway.sendToMqtt(topic, message);
    }

    /**
     * 發(fā)送包含qos的消息
     * @param topic 主題
     * @param qos 質(zhì)量
     * @param messageBody 消息體
     * @return void
     */
    public void send(String topic, int qos, JSONObject messageBody){
        mqttGateway.sendToMqtt(topic, qos, messageBody.toString());
    }

    /**
     * 發(fā)送包含qos的消息
     * @param topic 主題
     * @param qos 質(zhì)量
     * @param message 消息體
     * @return void
     */
    public void send(String topic, int qos, byte[] message){
        mqttGateway.sendToMqtt(topic, qos, message);
    }
}

總結(jié)

綜上所述上面就是我們經(jīng)常用的到兩種方式,希望對(duì)你有所幫助

到此這篇關(guān)于使用java 實(shí)現(xiàn)mqtt兩種方式的文章就介紹到這了,更多相關(guān)java 實(shí)現(xiàn) mqtt內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • SpringBoot讀取外部配置文件的方法

    SpringBoot讀取外部配置文件的方法

    這篇文章主要介紹了SpringBoot讀取外部配置文件的方法,以端口配置為例,本文通過(guò)示例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2022-02-02
  • 手把手帶你實(shí)現(xiàn)第一個(gè)Mybatis程序

    手把手帶你實(shí)現(xiàn)第一個(gè)Mybatis程序

    這篇文章主要介紹了mybatis實(shí)現(xiàn)過(guò)程詳解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2021-07-07
  • Java導(dǎo)出網(wǎng)頁(yè)表格Excel過(guò)程詳解

    Java導(dǎo)出網(wǎng)頁(yè)表格Excel過(guò)程詳解

    這篇文章主要介紹了Java導(dǎo)出網(wǎng)頁(yè)表格Excel過(guò)程詳解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-07-07
  • MyBatis-Plus非表字段的三種處理方法小結(jié)

    MyBatis-Plus非表字段的三種處理方法小結(jié)

    這篇文章主要介紹了MyBatis-Plus非表字段的三種處理方法小結(jié),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-08-08
  • 詳解如何全注解方式構(gòu)建SpringMVC項(xiàng)目

    詳解如何全注解方式構(gòu)建SpringMVC項(xiàng)目

    這篇文章主要介紹了詳解如何全注解方式構(gòu)建SpringMVC項(xiàng)目,利用Eclipse構(gòu)建SpringMVC項(xiàng)目,非常具有實(shí)用價(jià)值,需要的朋友可以參考下
    2018-10-10
  • Java畢業(yè)設(shè)計(jì)實(shí)戰(zhàn)項(xiàng)目之倉(cāng)庫(kù)管理系統(tǒng)的實(shí)現(xiàn)流程

    Java畢業(yè)設(shè)計(jì)實(shí)戰(zhàn)項(xiàng)目之倉(cāng)庫(kù)管理系統(tǒng)的實(shí)現(xiàn)流程

    這是一個(gè)使用了java+SSM+Maven+Bootstrap+mysql開(kāi)發(fā)的倉(cāng)庫(kù)管理系統(tǒng),是一個(gè)畢業(yè)設(shè)計(jì)的實(shí)戰(zhàn)練習(xí),具有一個(gè)倉(cāng)庫(kù)管理系統(tǒng)該有的所有功能,感興趣的朋友快來(lái)看看吧
    2022-01-01
  • SpringBoot整合Shiro的環(huán)境搭建教程

    SpringBoot整合Shiro的環(huán)境搭建教程

    這篇文章主要為大家詳細(xì)介紹了SpringBoot整合Shiro的環(huán)境搭建教程,文中的示例代碼講解詳細(xì),具有一定的借鑒價(jià)值,感興趣的小伙伴可以了解一下
    2022-12-12
  • Jenkins+Docker+Gitee+SpringBoot自動(dòng)化部署

    Jenkins+Docker+Gitee+SpringBoot自動(dòng)化部署

    本文主要介紹了Jenkins+Docker+Gitee+SpringBoot自動(dòng)化部署,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下


    2022-03-03
  • mybatis-plus分頁(yè)查詢的實(shí)現(xiàn)實(shí)例

    mybatis-plus分頁(yè)查詢的實(shí)現(xiàn)實(shí)例

    頁(yè)查詢是一項(xiàng)常用的數(shù)據(jù)庫(kù)查詢方法,本文主要介紹了mybatis-plus分頁(yè)查詢的實(shí)現(xiàn)實(shí)例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2024-06-06
  • SpringBoot權(quán)限認(rèn)證Sa-Token的使用總結(jié)

    SpringBoot權(quán)限認(rèn)證Sa-Token的使用總結(jié)

    Sa-Token是一款輕量級(jí)Java權(quán)限認(rèn)證框架,適用于快速搭建權(quán)限系統(tǒng),它提供了豐富的功能,包括登錄認(rèn)證、權(quán)限驗(yàn)證、角色驗(yàn)證、Session會(huì)話管理等,并且具有良好的社區(qū)支持和文檔資源,下面重點(diǎn)給大家介紹SpringBoot權(quán)限認(rèn)證Sa-Token的使用,感興趣的朋友一起看看吧
    2025-02-02

最新評(píng)論