欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Android MQTT與WebSocket協(xié)議詳細講解

 更新時間:2022年11月01日 08:31:51   作者:幸大叔  
MQTT(消息隊列遙測傳輸)是ISO 標準(ISO/IEC PRF 20922)下基于發(fā)布/訂閱范式的消息協(xié)議。它工作在TCP/IP協(xié)議族上,是為硬件性能低下的遠程設(shè)備以及網(wǎng)絡(luò)狀況糟糕的情況下而設(shè)計的發(fā)布/訂閱型消息協(xié)議

MQTT

MQTT是一個極其輕量級的發(fā)布/訂閱消息傳輸協(xié)議,對于需要較小代碼占用空間或網(wǎng)絡(luò)帶寬非常寶貴的遠程連接非常有用

有如下特點:

  • 開放消息協(xié)議,簡單易實現(xiàn);
  • 發(fā)布訂閱模式,一對多消息發(fā)布;
  • 基于TCP/IP網(wǎng)絡(luò)連接,提供有序,無損,雙向連接;
  • 1字節(jié)固定報頭,2字節(jié)心跳報文,最小化傳輸開銷和協(xié)議交換,有效減少網(wǎng)絡(luò)流量;
  • 消息QoS支持,可靠傳輸保證。

添加依賴

        maven { url "https://repo.eclipse.org/content/repositories/paho-snapshots/" }

    implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
    implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'

首先,連接服務(wù)器,SERVER_HOST為主機名,CLIENT_ID為客戶端唯一標識,還需要用戶名和密碼,當然,這些一般由服務(wù)器返回。

    fun connect() {
        try {
            //MemoryPersistence設(shè)置clientId的保存形式,默認為以內(nèi)存保存
            client = MqttAsyncClient(SERVER_HOST, CLIENT_ID, MemoryPersistence())
            //MQTT連接設(shè)置
            options = MqttConnectOptions()
            with(options) {
                //是否清空session,true表示每次連接到服務(wù)器都以新的身份,false表示服務(wù)器會保留客戶的連接記錄
                isCleanSession = true
                //用戶名和密碼
                userName = USERNAME
                password = PASSWORD.toCharArray()
                //超時時間,單位是秒
                connectionTimeout = 30
                //會話心跳時間,單位是秒,服務(wù)器每隔30秒向客戶端發(fā)送消息判斷客戶端是否在線
                keepAliveInterval = 30
            }
            //設(shè)置回調(diào)
            client!!.setCallback(PushCallBack())
            client!!.connect(options, context, iMqttActionListener)
        } catch (e: MqttException) {
            e.printStackTrace()
        }
    }

心跳包:在空閑時間內(nèi),如果客戶端沒有其他任何報文發(fā)送,必須發(fā)送一個PINGREQ報文到服務(wù)器,而如果服務(wù)端在 1.5 倍心跳時間內(nèi)沒有收到客戶端消息,則會主動斷開客戶端的連接,發(fā)送其遺囑消息給所有訂閱者,而服務(wù)端收到PINGREQ報文之后,立即返回PINGRESP報文給客戶端。

如果兩個客戶端的 clientID 一樣,那么服務(wù)端記錄第一個客戶端連接之后再收到第二個客戶端連接請求,則會向第一個客戶端發(fā)送 Disconnect 報文斷開連接,并連接第二個客戶端。

遺囑消息:MqttConnectOptions的setWill方法可以設(shè)置遺囑消息,客戶端沒有主動向服務(wù)端發(fā)起disconnect斷開連接消息,但服務(wù)端檢測到和客戶端的連接已斷開,此時服務(wù)端將該客戶端設(shè)置的遺囑消息發(fā)送出去,遺囑Topic中不能存在通配符。

應(yīng)用場景:客戶端掉線之后,可以及時通知到所有訂閱該遺囑topic的客戶端。

訂閱消息。

    fun subscribeMessage(topic: String, qos: Int) {
        client?.let {
            try {
                it.subscribe(topic, qos)
            } catch (e: MqttException) {
                e.printStackTrace()
            }
        }
    }

MQTT是一種發(fā)布/訂閱的消息協(xié)議, 通過設(shè)定的主題topic,發(fā)布者向topic發(fā)送的payload負載消息會經(jīng)過服務(wù)器,轉(zhuǎn)發(fā)到所有訂閱該topic的訂閱者。topic有兩個通配符,“+” 和 “#”,與 “/” 組合使用,“+” 只能表示一級topic,“#” 可以表示任意層級,例如,訂閱topic為 “guangdong/+”,發(fā)布者發(fā)布的topic可以是 guangdong,guangdong/shenzhen,但是不能是guangdong/shenzhen/nanshan,而訂閱的topic如果是 “guangdong/#” 則可以收到。

Qos為服務(wù)質(zhì)量等級,qos=0,表示發(fā)送方只發(fā)一次,不管是否發(fā)成功,也不管接收方是否成功接收,適用于不重要的數(shù)據(jù)傳輸;qos=1,表示消息至少有一次到達接收方,發(fā)送方發(fā)送消息,需要等待接收方返回應(yīng)答消息,如果發(fā)送方在一定時間內(nèi)未收到應(yīng)答,發(fā)送方將繼續(xù)發(fā)送,直到收到應(yīng)答消息,刪除本地消息緩存,不再發(fā)送。適用于需要收到所有消息,客戶端可以處理重復(fù)消息;qos = 2:確保消息只一次到達接收方,一般我們都會選擇2。

發(fā)布消息

    fun publish(topic: String, msg: String, isRetained: Boolean, qos: Int) {
        try {
            client?.let {
                val message = MqttMessage()
                message.qos = qos
                message.isRetained = isRetained
                message.payload = msg.toByteArray()
                it.publish(topic, message)
            }
        } catch (e: MqttPersistenceException) {
            e.printStackTrace()
        } catch (e: MqttException) {
            e.printStackTrace()
        }
    }

payload為負載消息,字節(jié)流類型,是 MQTT 通信傳輸?shù)恼鎸崝?shù)據(jù)。retain是保留信息,服務(wù)端將保留對應(yīng)topic最新的一條消息記錄,保留消息的作用是每次客戶端連上都會收到其topic的最后一條保留消息。

整個封裝類如下:

class MQTTManager private constructor(private val context: Context) {
    private var client: MqttAsyncClient? = null
    private lateinit var options: MqttConnectOptions
    private val iMqttActionListener = object : IMqttActionListener {
        override fun onSuccess(asyncActionToken: IMqttToken?) {
            //連接成功,開始訂閱
            subscribeMessage(TOPIC, 2)
        }
        override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
            //連接失敗
        }
    }
    companion object {
        @Volatile
        private var instance: MQTTManager? = null
        fun getInstance(context: Context): MQTTManager = instance ?: synchronized(this) {
            instance ?: MQTTManager(context).also {
                instance = it
            }
        }
    }
    /**
     * 連接服務(wù)器
     */
    fun connect() {
        try {
            //MemoryPersistence設(shè)置clientId的保存形式,默認為以內(nèi)存保存
            client = MqttAsyncClient(SERVER_HOST, CLIENT_ID, MemoryPersistence())
            //MQTT連接設(shè)置
            options = MqttConnectOptions()
            with(options) {
                //是否清空session,true表示每次連接到服務(wù)器都以新的身份,false表示服務(wù)器會保留客戶的連接記錄
                isCleanSession = true
                //用戶名和密碼
                userName = USERNAME
                password = PASSWORD.toCharArray()
                //超時時間,單位是秒
                connectionTimeout = 30
                //會話心跳時間,單位是秒,服務(wù)器每隔30秒向客戶端發(fā)送消息判斷客戶端是否在線
                keepAliveInterval = 30
                //自動重連
                isAutomaticReconnect = true
            }
            //設(shè)置回調(diào)
            client!!.setCallback(PushCallBack())
            client!!.connect(options, context, iMqttActionListener)
        } catch (e: MqttException) {
            e.printStackTrace()
        }
    }
    /**
     * 訂閱消息
     */
    fun subscribeMessage(topic: String, qos: Int) {
        client?.let {
            try {
                it.subscribe(topic, qos)
            } catch (e: MqttException) {
                e.printStackTrace()
            }
        }
    }
    /**
     * 發(fā)布消息
     */
    fun publish(topic: String, msg: String, isRetained: Boolean, qos: Int) {
        try {
            client?.let {
                val message = MqttMessage()
                message.qos = qos
                message.isRetained = isRetained
                message.payload = msg.toByteArray()
                it.publish(topic, message)
            }
        } catch (e: MqttPersistenceException) {
            e.printStackTrace()
        } catch (e: MqttException) {
            e.printStackTrace()
        }
    }
    /**
     * 斷開連接
     */
    fun disconnect() {
        client?.takeIf {
            it.isConnected
        }?.let {
            try {
                it.disconnect()
                instance = null
            } catch (e: MqttException) {
                e.printStackTrace()
            }
        }
    }
    /**
     * 判斷是否連接
     */
    fun isConnected() = if (client != null) client!!.isConnected else false
    inner class PushCallBack : MqttCallback {
        override fun connectionLost(cause: Throwable?) {
            //斷開連接
        }
        override fun messageArrived(topic: String?, message: MqttMessage?) {
            //接收消息回調(diào)
        }
        override fun deliveryComplete(token: IMqttDeliveryToken?) {
            //發(fā)布消息完成后的回調(diào)
        }
    }
}

WebSocket

WebSocket是應(yīng)用層的一種協(xié)議,是建立在TCP協(xié)議基礎(chǔ)上的,主要特點就是全雙工通信,允許服務(wù)器主動發(fā)送信息給客戶端。

這里使用OKHTTP進行WebSocket開發(fā)。

class WebSocketManager private constructor() {
    private val client: OkHttpClient = OkHttpClient.Builder().writeTimeout(5, TimeUnit.SECONDS)
        .readTimeout(5, TimeUnit.SECONDS)
        .connectTimeout(5, TimeUnit.SECONDS)
        .build()
    private var webSocket: WebSocket? = null
    companion object {
        val instance: WebSocketManager by lazy(mode = LazyThreadSafetyMode.SYNCHRONIZED) { WebSocketManager() }
    }
    fun connect(url: String) {
        webSocket?.cancel()
        val request = Request.Builder().url(url).build()
        webSocket = client.newWebSocket(request, object : WebSocketListener() {
            //連接成功后回調(diào)
            override fun onOpen(webSocket: WebSocket, response: Response) {
                super.onOpen(webSocket, response)
            }
            //服務(wù)器發(fā)送消息給客戶端時回調(diào)
            override fun onMessage(webSocket: WebSocket, text: String) {
                super.onMessage(webSocket, text)
            }
            //服務(wù)器發(fā)送的byte類型消息
            override fun onMessage(webSocket: WebSocket, bytes: ByteString) {
                super.onMessage(webSocket, bytes)
            }
            //服務(wù)器連接關(guān)閉中
            override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
                super.onClosing(webSocket, code, reason)
            }
            //服務(wù)器連接已關(guān)閉
            override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
                super.onClosed(webSocket, code, reason)
            }
            //服務(wù)器連接失敗
            override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
                super.onFailure(webSocket, t, response)
            }
        })
    }
    //發(fā)送消息
    private fun sendMessage(message: String) {
        webSocket?.send(message)
    }
    private fun close(code: Int, reason: String) {
        webSocket?.close(code, reason)
    }
}

總結(jié)

WebSocket是一種簡單的報文協(xié)議,著重解決客戶端與服務(wù)端不能雙向通信的問題。

MQTT是基于TCP的發(fā)布/訂閱消息傳輸協(xié)議,客戶端可以創(chuàng)建和訂閱任意主題,并向主題發(fā)布或接收消息,此外,有許多為物聯(lián)網(wǎng)優(yōu)化的特性,比如服務(wù)質(zhì)量等級Qos,層級主題,遺囑信息等。

MQTT和WebSocket都是應(yīng)用層協(xié)議,都使用TCP協(xié)議來確??煽總鬏?,都支持雙向通信,都使用二進制編碼。WebSocket更簡單靈活,MQTT相對復(fù)雜,但功能強大,大家可以根據(jù)自己的使用場景來選擇 。

到此這篇關(guān)于Android MQTT與WebSocket協(xié)議詳細講解的文章就介紹到這了,更多相關(guān)Android MQTT與WebSocket內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評論