Android MQTT與WebSocket協(xié)議詳細(xì)講解
MQTT
MQTT是一個(gè)極其輕量級(jí)的發(fā)布/訂閱消息傳輸協(xié)議,對(duì)于需要較小代碼占用空間或網(wǎng)絡(luò)帶寬非常寶貴的遠(yuǎn)程連接非常有用
有如下特點(diǎn):
- 開放消息協(xié)議,簡(jiǎn)單易實(shí)現(xiàn);
- 發(fā)布訂閱模式,一對(duì)多消息發(fā)布;
- 基于TCP/IP網(wǎng)絡(luò)連接,提供有序,無(wú)損,雙向連接;
- 1字節(jié)固定報(bào)頭,2字節(jié)心跳報(bào)文,最小化傳輸開銷和協(xié)議交換,有效減少網(wǎng)絡(luò)流量;
- 消息QoS支持,可靠傳輸保證。
添加依賴
maven { url "https://repo.eclipse.org/content/repositories/paho-snapshots/" }
implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
首先,連接服務(wù)器,SERVER_HOST為主機(jī)名,CLIENT_ID為客戶端唯一標(biāo)識(shí),還需要用戶名和密碼,當(dāng)然,這些一般由服務(wù)器返回。
fun connect() {
try {
//MemoryPersistence設(shè)置clientId的保存形式,默認(rèn)為以內(nèi)存保存
client = MqttAsyncClient(SERVER_HOST, CLIENT_ID, MemoryPersistence())
//MQTT連接設(shè)置
options = MqttConnectOptions()
with(options) {
//是否清空session,true表示每次連接到服務(wù)器都以新的身份,false表示服務(wù)器會(huì)保留客戶的連接記錄
isCleanSession = true
//用戶名和密碼
userName = USERNAME
password = PASSWORD.toCharArray()
//超時(shí)時(shí)間,單位是秒
connectionTimeout = 30
//會(huì)話心跳時(shí)間,單位是秒,服務(wù)器每隔30秒向客戶端發(fā)送消息判斷客戶端是否在線
keepAliveInterval = 30
}
//設(shè)置回調(diào)
client!!.setCallback(PushCallBack())
client!!.connect(options, context, iMqttActionListener)
} catch (e: MqttException) {
e.printStackTrace()
}
}
心跳包:在空閑時(shí)間內(nèi),如果客戶端沒有其他任何報(bào)文發(fā)送,必須發(fā)送一個(gè)PINGREQ報(bào)文到服務(wù)器,而如果服務(wù)端在 1.5 倍心跳時(shí)間內(nèi)沒有收到客戶端消息,則會(huì)主動(dòng)斷開客戶端的連接,發(fā)送其遺囑消息給所有訂閱者,而服務(wù)端收到PINGREQ報(bào)文之后,立即返回PINGRESP報(bào)文給客戶端。
如果兩個(gè)客戶端的 clientID 一樣,那么服務(wù)端記錄第一個(gè)客戶端連接之后再收到第二個(gè)客戶端連接請(qǐng)求,則會(huì)向第一個(gè)客戶端發(fā)送 Disconnect 報(bào)文斷開連接,并連接第二個(gè)客戶端。
遺囑消息:MqttConnectOptions的setWill方法可以設(shè)置遺囑消息,客戶端沒有主動(dòng)向服務(wù)端發(fā)起disconnect斷開連接消息,但服務(wù)端檢測(cè)到和客戶端的連接已斷開,此時(shí)服務(wù)端將該客戶端設(shè)置的遺囑消息發(fā)送出去,遺囑Topic中不能存在通配符。
應(yīng)用場(chǎng)景:客戶端掉線之后,可以及時(shí)通知到所有訂閱該遺囑topic的客戶端。
訂閱消息。
fun subscribeMessage(topic: String, qos: Int) {
client?.let {
try {
it.subscribe(topic, qos)
} catch (e: MqttException) {
e.printStackTrace()
}
}
}
MQTT是一種發(fā)布/訂閱的消息協(xié)議, 通過(guò)設(shè)定的主題topic,發(fā)布者向topic發(fā)送的payload負(fù)載消息會(huì)經(jīng)過(guò)服務(wù)器,轉(zhuǎn)發(fā)到所有訂閱該topic的訂閱者。topic有兩個(gè)通配符,“+” 和 “#”,與 “/” 組合使用,“+” 只能表示一級(jí)topic,“#” 可以表示任意層級(jí),例如,訂閱topic為 “guangdong/+”,發(fā)布者發(fā)布的topic可以是 guangdong,guangdong/shenzhen,但是不能是guangdong/shenzhen/nanshan,而訂閱的topic如果是 “guangdong/#” 則可以收到。
Qos為服務(wù)質(zhì)量等級(jí),qos=0,表示發(fā)送方只發(fā)一次,不管是否發(fā)成功,也不管接收方是否成功接收,適用于不重要的數(shù)據(jù)傳輸;qos=1,表示消息至少有一次到達(dá)接收方,發(fā)送方發(fā)送消息,需要等待接收方返回應(yīng)答消息,如果發(fā)送方在一定時(shí)間內(nèi)未收到應(yīng)答,發(fā)送方將繼續(xù)發(fā)送,直到收到應(yīng)答消息,刪除本地消息緩存,不再發(fā)送。適用于需要收到所有消息,客戶端可以處理重復(fù)消息;qos = 2:確保消息只一次到達(dá)接收方,一般我們都會(huì)選擇2。
發(fā)布消息
fun publish(topic: String, msg: String, isRetained: Boolean, qos: Int) {
try {
client?.let {
val message = MqttMessage()
message.qos = qos
message.isRetained = isRetained
message.payload = msg.toByteArray()
it.publish(topic, message)
}
} catch (e: MqttPersistenceException) {
e.printStackTrace()
} catch (e: MqttException) {
e.printStackTrace()
}
}
payload為負(fù)載消息,字節(jié)流類型,是 MQTT 通信傳輸?shù)恼鎸?shí)數(shù)據(jù)。retain是保留信息,服務(wù)端將保留對(duì)應(yīng)topic最新的一條消息記錄,保留消息的作用是每次客戶端連上都會(huì)收到其topic的最后一條保留消息。
整個(gè)封裝類如下:
class MQTTManager private constructor(private val context: Context) {
private var client: MqttAsyncClient? = null
private lateinit var options: MqttConnectOptions
private val iMqttActionListener = object : IMqttActionListener {
override fun onSuccess(asyncActionToken: IMqttToken?) {
//連接成功,開始訂閱
subscribeMessage(TOPIC, 2)
}
override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
//連接失敗
}
}
companion object {
@Volatile
private var instance: MQTTManager? = null
fun getInstance(context: Context): MQTTManager = instance ?: synchronized(this) {
instance ?: MQTTManager(context).also {
instance = it
}
}
}
/**
* 連接服務(wù)器
*/
fun connect() {
try {
//MemoryPersistence設(shè)置clientId的保存形式,默認(rèn)為以內(nèi)存保存
client = MqttAsyncClient(SERVER_HOST, CLIENT_ID, MemoryPersistence())
//MQTT連接設(shè)置
options = MqttConnectOptions()
with(options) {
//是否清空session,true表示每次連接到服務(wù)器都以新的身份,false表示服務(wù)器會(huì)保留客戶的連接記錄
isCleanSession = true
//用戶名和密碼
userName = USERNAME
password = PASSWORD.toCharArray()
//超時(shí)時(shí)間,單位是秒
connectionTimeout = 30
//會(huì)話心跳時(shí)間,單位是秒,服務(wù)器每隔30秒向客戶端發(fā)送消息判斷客戶端是否在線
keepAliveInterval = 30
//自動(dòng)重連
isAutomaticReconnect = true
}
//設(shè)置回調(diào)
client!!.setCallback(PushCallBack())
client!!.connect(options, context, iMqttActionListener)
} catch (e: MqttException) {
e.printStackTrace()
}
}
/**
* 訂閱消息
*/
fun subscribeMessage(topic: String, qos: Int) {
client?.let {
try {
it.subscribe(topic, qos)
} catch (e: MqttException) {
e.printStackTrace()
}
}
}
/**
* 發(fā)布消息
*/
fun publish(topic: String, msg: String, isRetained: Boolean, qos: Int) {
try {
client?.let {
val message = MqttMessage()
message.qos = qos
message.isRetained = isRetained
message.payload = msg.toByteArray()
it.publish(topic, message)
}
} catch (e: MqttPersistenceException) {
e.printStackTrace()
} catch (e: MqttException) {
e.printStackTrace()
}
}
/**
* 斷開連接
*/
fun disconnect() {
client?.takeIf {
it.isConnected
}?.let {
try {
it.disconnect()
instance = null
} catch (e: MqttException) {
e.printStackTrace()
}
}
}
/**
* 判斷是否連接
*/
fun isConnected() = if (client != null) client!!.isConnected else false
inner class PushCallBack : MqttCallback {
override fun connectionLost(cause: Throwable?) {
//斷開連接
}
override fun messageArrived(topic: String?, message: MqttMessage?) {
//接收消息回調(diào)
}
override fun deliveryComplete(token: IMqttDeliveryToken?) {
//發(fā)布消息完成后的回調(diào)
}
}
}WebSocket
WebSocket是應(yīng)用層的一種協(xié)議,是建立在TCP協(xié)議基礎(chǔ)上的,主要特點(diǎn)就是全雙工通信,允許服務(wù)器主動(dòng)發(fā)送信息給客戶端。
這里使用OKHTTP進(jìn)行WebSocket開發(fā)。
class WebSocketManager private constructor() {
private val client: OkHttpClient = OkHttpClient.Builder().writeTimeout(5, TimeUnit.SECONDS)
.readTimeout(5, TimeUnit.SECONDS)
.connectTimeout(5, TimeUnit.SECONDS)
.build()
private var webSocket: WebSocket? = null
companion object {
val instance: WebSocketManager by lazy(mode = LazyThreadSafetyMode.SYNCHRONIZED) { WebSocketManager() }
}
fun connect(url: String) {
webSocket?.cancel()
val request = Request.Builder().url(url).build()
webSocket = client.newWebSocket(request, object : WebSocketListener() {
//連接成功后回調(diào)
override fun onOpen(webSocket: WebSocket, response: Response) {
super.onOpen(webSocket, response)
}
//服務(wù)器發(fā)送消息給客戶端時(shí)回調(diào)
override fun onMessage(webSocket: WebSocket, text: String) {
super.onMessage(webSocket, text)
}
//服務(wù)器發(fā)送的byte類型消息
override fun onMessage(webSocket: WebSocket, bytes: ByteString) {
super.onMessage(webSocket, bytes)
}
//服務(wù)器連接關(guān)閉中
override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
super.onClosing(webSocket, code, reason)
}
//服務(wù)器連接已關(guān)閉
override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
super.onClosed(webSocket, code, reason)
}
//服務(wù)器連接失敗
override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
super.onFailure(webSocket, t, response)
}
})
}
//發(fā)送消息
private fun sendMessage(message: String) {
webSocket?.send(message)
}
private fun close(code: Int, reason: String) {
webSocket?.close(code, reason)
}
}總結(jié)
WebSocket是一種簡(jiǎn)單的報(bào)文協(xié)議,著重解決客戶端與服務(wù)端不能雙向通信的問(wèn)題。
MQTT是基于TCP的發(fā)布/訂閱消息傳輸協(xié)議,客戶端可以創(chuàng)建和訂閱任意主題,并向主題發(fā)布或接收消息,此外,有許多為物聯(lián)網(wǎng)優(yōu)化的特性,比如服務(wù)質(zhì)量等級(jí)Qos,層級(jí)主題,遺囑信息等。
MQTT和WebSocket都是應(yīng)用層協(xié)議,都使用TCP協(xié)議來(lái)確保可靠傳輸,都支持雙向通信,都使用二進(jìn)制編碼。WebSocket更簡(jiǎn)單靈活,MQTT相對(duì)復(fù)雜,但功能強(qiáng)大,大家可以根據(jù)自己的使用場(chǎng)景來(lái)選擇 。
到此這篇關(guān)于Android MQTT與WebSocket協(xié)議詳細(xì)講解的文章就介紹到這了,更多相關(guān)Android MQTT與WebSocket內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- 在Node.js下運(yùn)用MQTT協(xié)議實(shí)現(xiàn)即時(shí)通訊及離線推送的方法
- Django連接MQTT的示例代碼
- vue3+vite2+mqtt連接遇到的坑及解決
- vue3使用mqtt的示例代碼
- MQTT.js 入門使用教程
- 詳解JS HTML Web端使用MQTT通訊測(cè)試
- VUE3+mqtt封裝解決多頁(yè)面使用需重復(fù)連接等問(wèn)題(附實(shí)例)
- 解決spring-integration-mqtt頻繁報(bào)Lost connection錯(cuò)誤問(wèn)題
- JS?連接MQTT的使用方法
- 使用java?實(shí)現(xiàn)mqtt兩種常用方式
- MQTT Client實(shí)現(xiàn)消息推送功能的方法詳解
相關(guān)文章
Android itemDecoration接口實(shí)現(xiàn)吸頂懸浮標(biāo)題
這篇文章主要介紹了Android中使用itemdecoration實(shí)現(xiàn)吸頂懸浮標(biāo)題,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-11-11
android閱讀器長(zhǎng)按選擇文字功能實(shí)現(xiàn)代碼
本篇文章主要介紹了android閱讀器長(zhǎng)按選擇文字功能實(shí)現(xiàn)代碼,具有一定的參考價(jià)值,有興趣的可以了解一下2017-07-07
android之json數(shù)據(jù)過(guò)長(zhǎng)打印不全問(wèn)題的解決
這篇文章主要介紹了android之json數(shù)據(jù)過(guò)長(zhǎng)打印不全問(wèn)題的解決,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-04-04
Android 快速實(shí)現(xiàn)防止網(wǎng)絡(luò)重復(fù)請(qǐng)求&按鈕重復(fù)點(diǎn)擊的方法
下面小編就為大家分享一篇Android 快速實(shí)現(xiàn)防止網(wǎng)絡(luò)重復(fù)請(qǐng)求&按鈕重復(fù)點(diǎn)擊的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2018-01-01
Android SeekBar 自定義thumb旋轉(zhuǎn)動(dòng)畫效果
某些音樂(lè)播放或者視頻播放的界面上,資源還在加載時(shí),進(jìn)度條的原點(diǎn)(thumb)會(huì)顯示一個(gè)轉(zhuǎn)圈的效果。這篇文章主要介紹了Android SeekBar 自定義thumb thumb旋轉(zhuǎn)動(dòng)畫效果,需要的朋友可以參考下2021-11-11
Android中ProgressDialog的dismiss()與cancel()方法的區(qū)別
本文主要介紹了Android中ProgressDialog的dismiss()與cancel()方法的區(qū)別,具有很好的參考價(jià)值。下面跟著小編一起來(lái)看下吧2017-04-04

