欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

java代碼mqtt接收發(fā)送消息方式

 更新時間:2023年09月28日 10:33:10   作者:其妙的太空人  
這篇文章主要介紹了java代碼mqtt接收發(fā)送消息方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教

java代碼mqtt接收發(fā)送消息

mqtt消息第一用到不是太熟悉所以寫一篇文章鞏固一下。

前提是你已經(jīng)把mqtt已經(jīng)安裝好,并且啟動好了。

首先我們需要兩部分代碼。

所需依賴

         <!-- mqtt -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>

連接mqtt部分的代碼塊,因為我不需要發(fā)送消息所以把發(fā)送消息給注釋掉了。

package mqttclient.util;
import lombok.extern.slf4j.Slf4j;
import mqttclient.callback.MqttMessageCallback2;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Objects;
@Component
@Slf4j
public class MqttClientUtil2 {
    private String username;
    private String password;
    @Value("tcp://127.0.0.1:1883")//這個是安裝mqtt的ip以及端口,1883是mqtt默認端口
    private String host;
    @Value("CYT")//這個隨便寫但是是唯一的。
    private String clientId;
    @Value("cyt/#")這個是mqtt發(fā)送消息的咱們要訂閱的topic,cyt/#代表以cyt/開始的所有topic都接收
    private String topic;
    @Value("${mqtt.connection.timeout}")//IOT_MQTT_Yield會block住timeout的時間去嘗試接收數(shù)據(jù),直到timeout才會退出??梢詫懺谶@里也可以寫在yml配置文件中
    private int timeOut;
    @Value("${mqtt.keep.alive.interval}")
    private int interval;
    @Autowired
    private MqttMessageCallback2 mqttMessageCallback2;
    private MqttClient mqttClient;
    private MqttConnectOptions mqttConnectOptions;
    @PostConstruct
    private void init(){
        connect(host, clientId,topic);
    }
    /**
     * 鏈接mqtt
     * @param host
     * @param clientId
     */
    private void connect(String host,String clientId,String topic){
        try{
            mqttClient = new MqttClient(host,clientId,new MemoryPersistence());
            mqttConnectOptions = getMqttConnectOptions();
            //設(shè)置回調(diào)函數(shù)
            mqttClient.setCallback(mqttMessageCallback2);
            //鏈接mqtt
            mqttClient.connect(mqttConnectOptions);
            //訂閱消息
            mqttClient.subscribe(topic,2);
        }catch (Exception e){
            log.error("mqtt服務(wù)鏈接異常!");
            e.printStackTrace();
        }
    }
    /**
     * 設(shè)置鏈接對象信息
     * setCleanSession  true 斷開鏈接即清楚會話  false 保留鏈接信息 離線還會繼續(xù)發(fā)消息
     * @return
     */
    private MqttConnectOptions getMqttConnectOptions(){
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        /*mqttConnectOptions.setUserName(username);
        mqttConnectOptions.setPassword(password.toCharArray());*/
        mqttConnectOptions.setServerURIs(new String[]{host});
        mqttConnectOptions.setKeepAliveInterval(interval);
        mqttConnectOptions.setConnectionTimeout(timeOut);
        mqttConnectOptions.setCleanSession(true);
        return mqttConnectOptions;
    }
    /**
     *mqtt鏈接狀態(tài)
     * @return
     */
    private boolean isConnect(){
        if(Objects.isNull(this.mqttClient)){
            return false;
        }
        return mqttClient.isConnected();
    }
    /**
     * 設(shè)置重連
     * @throws Exception
     */
    private void reConnect() throws Exception{
        if(Objects.nonNull(this.mqttClient)){
            log.info("mqtt 服務(wù)已重新鏈接...");
            this.mqttClient.connect(this.mqttConnectOptions);
        }
    }
    /**
     * 斷開鏈接
     * @throws Exception
     */
    private void closeConnect() throws Exception{
        if(Objects.nonNull(this.mqttClient)){
            log.info("mqtt 服務(wù)已斷開鏈接...");
            this.mqttClient.disconnect();
        }
    }
/*    *//**
     * 發(fā)布消息
     * @param topic
     * @param message
     * @param qos
     * @throws Exception
     *//*
    public void sendMessage(String topic,String message,int qos) throws Exception {
        if(Objects.nonNull(this.mqttClient) && this.mqttClient.isConnected()){
            MqttMessage mqttMessage = new MqttMessage();
            mqttMessage.setPayload(message.getBytes());
            mqttMessage.setQos(qos);
            MqttTopic mqttTopic = mqttClient.getTopic(topic);
            if(Objects.nonNull(mqttTopic)){
                try{
                    MqttDeliveryToken publish = mqttTopic.publish(mqttMessage);
                    if(publish.isComplete()){
                        log.info("消息發(fā)送成功---->{}",message);
                    }
                }catch(Exception e){
                    log.error("消息發(fā)送異常",e);
                }
            }
        }else{
            reConnect();
        }
    }*/
}

接收消息部分

package mqttclient.callback;
import lombok.extern.slf4j.Slf4j;
import mqttclient.util.ParsingData2;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
@Slf4j
public class MqttMessageCallback2 implements MqttCallback {
    /**
     * 鏈接丟失時處理
     * @param throwable
     */
    @Override
    public void connectionLost(Throwable throwable) {
        //可以做重連 或者 其他業(yè)務(wù)處理
    }
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
		System.out.println("接收到消息topic---->{}"+topic);
		System.out.println("接收到消息topic---->{}"+mqttMessage);
        log.info("接收到消息質(zhì)量qos---->{}",mqttMessage.getQos());
		System.out.println("接收到消息質(zhì)量qos---->{}"+mqttMessage.getQos());
        log.info("接收到消息具體信息---->{}",new String(mqttMessage.getPayload()));
		System.out.println("接收到消息具體信息---->{}"+mqttMessage.getPayload());
        //結(jié)合業(yè)務(wù) 編寫具體信息即可
    }
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
    }
}

這個兩個寫完之后只要有數(shù)據(jù)發(fā)送過來,這邊會自動進行接收打印。

是用mqtt網(wǎng)頁版圖形化界面進行模擬數(shù)據(jù)發(fā)送。

安裝mqtt后打開此網(wǎng)站:http://localhost:18083/

默認賬號是:admin / public

登錄后這邊可以設(shè)置中文:

模擬發(fā)送:這幾個地方不用改動但是一定要點擊綠色的連接才可以,進行發(fā)送。

需要修改的部分是:

然后點擊發(fā)送就可以收到信息了。 

總結(jié)

以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。

相關(guān)文章

最新評論