欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

使用java?實現(xiàn)mqtt兩種常用方式

 更新時間:2022年11月23日 09:48:23   作者:houxian1103  
在開發(fā)MQTT時有兩種方式一種是使用Paho Java 原生庫來完成,一種是使用spring boot 來完成,這篇文章主要介紹了使用java?實現(xiàn)mqtt兩種方式,需要的朋友可以參考下

前言

在開發(fā)MQTT時有兩種方式一種是使用Paho Java 原生庫來完成,一種是使用spring boot 來完成。

Paho Java 庫實現(xiàn)

Eclipse Paho Java Client (opens new window)是用 Java 編寫的 MQTT 客戶端庫(MQTT Java Client),可用于 JVM 或其他 Java 兼容平臺(例如Android)。
Eclipse Paho Java Client 提供了MqttAsyncClient 和 MqttClient 異步和同步 API

  • 通過 Maven 安裝 Paho Java
<dependency>
  <groupId>org.eclipse.paho</groupId>
	<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
	<version>1.2.2</version>
</dependency>
  • Paho Java 使用示例

Java 體系中 Paho Java 是比較穩(wěn)定、廣泛應(yīng)用的 MQTT 客戶端庫,本示例包含 Java 語言的 Paho Java 連接 EMQX Broker,并進(jìn)行消息收發(fā)完整代碼:

package io.emqx;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;


public class App {
    public static void main(String[] args) {
        String subTopic = "testtopic/#";
        String pubTopic = "testtopic/1";
        String content = "Hello World";
        int qos = 2;
        String broker = "tcp://broker.emqx.io:1883";
        String clientId = "emqx_test";
        MemoryPersistence persistence = new MemoryPersistence();

        try {
            MqttClient client = new MqttClient(broker, clientId, persistence);

            // MQTT 連接選項
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setUserName("emqx_test");
            connOpts.setPassword("emqx_test_password".toCharArray());
            // 保留會話
            connOpts.setCleanSession(true);

            // 設(shè)置回調(diào)
            client.setCallback(new PushCallback());

            // 建立連接
            System.out.println("Connecting to broker: " + broker);
            client.connect(connOpts);

            System.out.println("Connected");
            System.out.println("Publishing message: " + content);

            // 訂閱
            client.subscribe(subTopic);

            // 消息發(fā)布所需參數(shù)
            MqttMessage message = new MqttMessage(content.getBytes());
            message.setQos(qos);
            client.publish(pubTopic, message);
            System.out.println("Message published");

            client.disconnect();
            System.out.println("Disconnected");
            client.close();
            System.exit(0);
        } catch (MqttException me) {
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
            me.printStackTrace();
        }
    }
}

回調(diào)消息處理類 OnMessageCallback.java

package io.emqx;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class OnMessageCallback implements MqttCallback {
    public void connectionLost(Throwable cause) {
        // 連接丟失后,一般在這里面進(jìn)行重連
        System.out.println("連接斷開,可以做重連");
    }

    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // subscribe后得到的消息會執(zhí)行到這里面
        System.out.println("接收消息主題:" + topic);
        System.out.println("接收消息Qos:" + message.getQos());
        System.out.println("接收消息內(nèi)容:" + new String(message.getPayload()));
    }

    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("deliveryComplete---------" + token.isComplete());
    }
}

好的上述就實現(xiàn)了簡單的 MQTT的連接和消息收發(fā)。

spring boot集成mqtt

spring boot 環(huán)境

spring-boot 版本 2.2.2
spring-integration的版本為:5.4.3
Spring Integration提供了入站適配器和出站適配器以支持MQTT協(xié)議。

Maven 依賴:

<!-- https://mvnrepository.com/artifact/org.springframework.integration/spring-integration-mqtt -->

 <dependency>
    <groupId>org.springframework.boot</groupId> 
    <artifactId>spring-boot-starter-integration</artifactId> 
 </dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>5.4.3</version>
 </dependency>

配置文件 application.yml:

spring:
  mqtt:
    username:
    password:
    url: tcp://ip:port
    clientId: clientId
    topic: default
    completionTimeout: 2000

核心代碼

配置類

@Data
@Configuration
@ConfigurationProperties(prefix = "spring.mqtt")
public class MqttConfiguration {

    private String username;
    private String password;
    private String url;
    private String clientId;
    private String topic = "TOPIC_DEFAULT";
    private Integer completionTimeout = 2000;

    /**
     * 注冊MQTT客戶端工廠
     * @return
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory(){
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        //如果設(shè)置為 false,客戶端和服務(wù)器將在客戶端、服務(wù)器和連接重新啟動時保持狀態(tài)。隨著狀態(tài)的保持:
        //  即使客戶端、服務(wù)器或連接重新啟動,消息傳遞也將可靠地滿足指定的 QOS。服務(wù)器將訂閱視為持久的。
        // 如果設(shè)置為 true,客戶端和服務(wù)器將不會在客戶端、服務(wù)器或連接重新啟動時保持狀態(tài)。
        options.setCleanSession(true);
        //該值以秒為單位,必須>0,定義了客戶端等待與 MQTT 服務(wù)器建立網(wǎng)絡(luò)連接的最大時間間隔。
        // 默認(rèn)超時為 30 秒。值 0 禁用超時處理,這意味著客戶端將等待直到網(wǎng)絡(luò)連接成功或失敗。
        options.setConnectionTimeout(0);
        //此值以秒為單位,定義發(fā)送或接收消息之間的最大時間間隔,必須>0
        options.setKeepAliveInterval(90);
        //自動重新連接
        options.setAutomaticReconnect(true);
        options.setUserName(this.getUsername());
        options.setPassword(this.getPassword().toCharArray());
        options.setServerURIs(new String[]{this.getUrl()});

        factory.setConnectionOptions(options);
        return factory;
    }
}
@Slf4j
@AllArgsConstructor
@Configuration
@IntegrationComponentScan
public class MqttInboundConfiguration {

    private MqttConfiguration mqttConfig;
    private MqttPahoClientFactory factory;
    private MqttMessageReceiver mqttMessageReceiver;

    /**
     * 此處可以使用其他消息通道
     * Spring Integration默認(rèn)的消息通道,它允許將消息發(fā)送給一個訂閱者,然后阻礙發(fā)送直到消息被接收。
     *
     * @return
     */
    @Bean
    public MessageChannel mqttInBoundChannel() {
        return new DirectChannel();
    }

    /**
     * 適配器, 兩個topic共用一個adapter
     * 客戶端作為消費(fèi)者,訂閱主題,消費(fèi)消息
     *
     * @param
     * @param
     * @return
     */
    @Bean
    public MessageProducerSupport mqttInbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId()+"-"+System.currentTimeMillis(), factory, mqttConfig.getTopic());

        adapter.setCompletionTimeout(60000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setRecoveryInterval(10000);
        adapter.setQos(0);
        adapter.setOutputChannel(mqttInBoundChannel());
        return adapter;
    }

    /**
     * mqtt入站消息處理工具,對于指定消息入站通道接收到生產(chǎn)者生產(chǎn)的消息后處理消息的工具。
     *
     * @return
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttInBoundChannel")
    public MessageHandler mqttMessageHandler() {
        return this.mqttMessageReceiver;
    }
}

數(shù)據(jù)接收

@Slf4j
@AllArgsConstructor
@Component
public class MqttMessageReceiver implements MessageHandler {

    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        try {
        
            MessageHeaders headers = message.getHeaders();
            //獲取消息Topic
            String receivedTopic = (String) headers.get(MqttHeaders.RECEIVED_TOPIC);
            log.info("[獲取到的消息的topic :]{} ", receivedTopic);
            //獲取消息體
            String payload = (String) message.getPayload();
            log.info("[獲取到的消息的payload :]{} ", payload);
            //todo ....
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
@Slf4j
@AllArgsConstructor
@Configuration
public class MqttOutboundConfiguration {

    private MqttConfiguration mqttConfig;
    private MqttPahoClientFactory factory;



    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
                mqttConfig.getClientId()+"-"+System.currentTimeMillis() + System.currentTimeMillis(), factory);

        messageHandler.setDefaultQos(0);
        //開啟異步
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(mqttConfig.getTopic());
        return messageHandler;
    }
}

發(fā)送者

@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {

    /**
     * 發(fā)送mqtt消息
     * @param topic 主題
     * @param payload 內(nèi)容
     * @return void
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);

    /**
     * 發(fā)送包含qos的消息
     * @param topic 主題
     * @param qos 對消息處理的幾種機(jī)制。
     *          * 0 表示的是訂閱者沒收到消息不會再次發(fā)送,消息會丟失。<br>
     *          * 1 表示的是會嘗試重試,一直到接收到消息,但這種情況可能導(dǎo)致訂閱者收到多次重復(fù)消息。<br>
     *          * 2 多了一次去重的動作,確保訂閱者收到的消息有一次。
     * @param payload 消息體
     * @return void
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);

    /**
     * 發(fā)送包含qos的消息
     * @param topic 主題
     * @param qos 對消息處理的幾種機(jī)制。
     *          * 0 表示的是訂閱者沒收到消息不會再次發(fā)送,消息會丟失。<br>
     *          * 1 表示的是會嘗試重試,一直到接收到消息,但這種情況可能導(dǎo)致訂閱者收到多次重復(fù)消息。<br>
     *          * 2 多了一次去重的動作,確保訂閱者收到的消息有一次。
     * @param payload 消息體
     * @return void
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload);
}

@Component
@AllArgsConstructor
public class MqttMessageSender {

    private MqttGateway mqttGateway;

    /**
     * 發(fā)送mqtt消息
     * @param topic 主題
     * @param message 內(nèi)容
     * @return void
     */
    public void send(String topic, String message) {
        mqttGateway.sendToMqtt(topic, message);
    }

    /**
     * 發(fā)送包含qos的消息
     * @param topic 主題
     * @param qos 質(zhì)量
     * @param messageBody 消息體
     * @return void
     */
    public void send(String topic, int qos, JSONObject messageBody){
        mqttGateway.sendToMqtt(topic, qos, messageBody.toString());
    }

    /**
     * 發(fā)送包含qos的消息
     * @param topic 主題
     * @param qos 質(zhì)量
     * @param message 消息體
     * @return void
     */
    public void send(String topic, int qos, byte[] message){
        mqttGateway.sendToMqtt(topic, qos, message);
    }
}

總結(jié)

綜上所述上面就是我們經(jīng)常用的到兩種方式,希望對你有所幫助

到此這篇關(guān)于使用java 實現(xiàn)mqtt兩種方式的文章就介紹到這了,更多相關(guān)java 實現(xiàn) mqtt內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Spring的連接數(shù)據(jù)庫以及JDBC模板(實例講解)

    Spring的連接數(shù)據(jù)庫以及JDBC模板(實例講解)

    下面小編就為大家?guī)硪黄猄pring的連接數(shù)據(jù)庫以及JDBC模板(實例講解)。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-10-10
  • Java中的functor實現(xiàn)

    Java中的functor實現(xiàn)

    Java中的functor實現(xiàn)...
    2006-12-12
  • Spring?Boot應(yīng)用程序中如何使用Keycloak詳解

    Spring?Boot應(yīng)用程序中如何使用Keycloak詳解

    這篇文章主要為大家介紹了Spring?Boot應(yīng)用程序中如何使用Keycloak詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-05-05
  • 總結(jié)Java調(diào)用Python程序方法

    總結(jié)Java調(diào)用Python程序方法

    這篇文章主要介紹了總結(jié)Java調(diào)用Python程序方法,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-08-08
  • java實現(xiàn)消息隊列的兩種方式(小結(jié))

    java實現(xiàn)消息隊列的兩種方式(小結(jié))

    本文主要介紹了兩種java實現(xiàn)消息隊列的方式,利用Spring消息模板發(fā)送消息和Apache ActiveMQ官方實例發(fā)送消息,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2018-12-12
  • JavaGUI常用窗體組件與面板使用詳解

    JavaGUI常用窗體組件與面板使用詳解

    GUI即圖形用戶界面,它是基于圖形的界面,windows就是一個圖形用戶界面的操作系統(tǒng),而DOS是基于命令提示符的操作系統(tǒng),GUI編程就是編出一個圖形用戶界面的軟件,它使用圖形的方式,以菜單、按鈕、表示、圖文框等標(biāo)準(zhǔn)界面元素組成的用戶操作界面
    2023-03-03
  • java使用POI讀取properties文件并寫到Excel的方法

    java使用POI讀取properties文件并寫到Excel的方法

    這篇文章主要介紹了java使用POI讀取properties文件并寫到Excel的方法,涉及java操作properties文件及Excel文件的相關(guān)技巧,需要的朋友可以參考下
    2015-06-06
  • Java中異常傳播的實現(xiàn)

    Java中異常傳播的實現(xiàn)

    在Java中,異常傳播是一個重要的概念,本文主要介紹了Java中異常傳播的實現(xiàn),具有一定的參考價值,感興趣的可以了解一下
    2024-01-01
  • 聊聊Spring MVC JSON數(shù)據(jù)交互的問題

    聊聊Spring MVC JSON數(shù)據(jù)交互的問題

    我們在開發(fā)中后端經(jīng)常需要接受來自于前端傳遞的Json字符串?dāng)?shù)據(jù),怎么把Json字符串轉(zhuǎn)換為Java對象呢?下面小編給大家?guī)砹薙pring MVC JSON數(shù)據(jù)交互的問題,感興趣的朋友一起看看吧
    2021-10-10
  • Java中Random簡介_動力節(jié)點(diǎn)Java學(xué)院整理

    Java中Random簡介_動力節(jié)點(diǎn)Java學(xué)院整理

    本文詳細(xì)給大家介紹了Java中Random簡介相關(guān)知識,非常不錯,具有參考借鑒價值,需要的朋友參考下吧
    2017-06-06

最新評論