SpringBoot實現(xiàn)EMQ設(shè)備的上下線告警
前言

我遇到了一個難題,即在使用 EMQ X 4.4.10 的開源版本時,我需要實現(xiàn)設(shè)備的上下線狀態(tài)監(jiān)控,但該 4.4.10開源版本 并未內(nèi)置設(shè)備上下線提醒模塊,只有企業(yè)版才內(nèi)置了該模塊。這為我?guī)砹艘恍┘夹g(shù)上的難題,迫使我必須另辟蹊徑,通過其他方法來監(jiān)聽設(shè)備的上下線狀態(tài)。為了解決這個問題,我采取了以下步驟:
首先,我修改了 EMQ X 的 acl.config 文件,添加了以下規(guī)則:
{allow, all, subscribe, ["$SYS/brokers/+/clients/#"]}.圖示:

這個規(guī)則允許訂閱 $SYS/brokers/+/clients/# 主題的所有客戶端。
接下來,我使用 Spring Boot 創(chuàng)建了一個應(yīng)用程序,其中我設(shè)置了與 EMQ X 代理的連接。在這個應(yīng)用程序中,我創(chuàng)建了一個監(jiān)聽器,用于訂閱 $SYS/brokers/+/clients/# 主題,以感知設(shè)備的上下線信息。
通過這個解決方案,我能夠?qū)崟r監(jiān)控設(shè)備的連接和斷開事件,而不受 EMQ X 開源版本的功能限制。這使我能夠根據(jù)設(shè)備狀態(tài)采取適當(dāng)?shù)拇胧热绨l(fā)送通知或執(zhí)行其他自定義操作。這個方法允許我靈活地定制解決方案,以滿足我的特定需求,而無需依賴 EMQ X 的功能。
EMQ簡介

EMQ ( Erlang MQTT Broker )是一種基于 Erlang 編程語言開發(fā)的開源消息傳遞代理( MQTT broker ),專門用于支持 MQTT ( Message Queuing Telemetry Transport )協(xié)議。MQTT是一種輕量級、高效的消息傳遞協(xié)議,最初設(shè)計用于連接受限的設(shè)備,如嵌入式系統(tǒng)和物聯(lián)網(wǎng)設(shè)備。 EMQ 作為 MQTT broker ,提供了可靠的消息傳遞機制,使設(shè)備能夠相互通信,同時也可與后端應(yīng)用程序集成,使其成為物聯(lián)網(wǎng)生態(tài)系統(tǒng)中的重要組成部分。
環(huán)境
EMQX安裝方式:DockerEMQX版本:4.4.10開源版本- 操作系統(tǒng):
CentOS 7

下載

準備工作
安裝EMQX

修改EMQX的ACL規(guī)則內(nèi)容
EMQX的Docker容器配置文件所在目錄
# 進入EMQX容器 docker exec -it emqx /bin/sh # 進入配置文件目錄 cd /opt/emqx/etc # 查看acl配置文件 cat acl.conf # 編輯acl配置文件 vi acl.conf
非Docker容器配置文件所在目錄
# 進入配置文件目錄 cd /etc/emqx # 查看acl配置文件 cat acl.conf # 編輯acl配置文件 vi acl.conf
acl的默認文件內(nèi)容
%%--------------------------------------------------------------------
%% [ACL](https://docs.emqx.io/broker/v3/en/config.html)
%%
%% -type(who() :: all | binary() |
%% {ipaddr, esockd_access:cidr()} |
%% {ipaddrs, [esockd_access:cidr()]} |
%% {client, binary()} |
%% {user, binary()}).
%%
%% -type(access() :: subscribe | publish | pubsub).
%%
%% -type(topic() :: binary()).
%%
%% -type(rule() :: {allow, all} |
%% {allow, who(), access(), list(topic())} |
%% {deny, all} |
%% {deny, who(), access(), list(topic())}).
%%--------------------------------------------------------------------
{allow, {user, "dashboard"}, subscribe, ["$SYS/#"]}.
{allow, {ipaddr, "127.0.0.1"}, pubsub, ["$SYS/#", "#"]}.
{deny, all, subscribe, ["$SYS/#", {eq, "#"}]}.
{allow, all}.新增一條ACL規(guī)則
allow, all, subscribe, ["$SYS/brokers/+/clients/#"]}.
allow: 表示這是一個允許(allow)訪問的規(guī)則。這意味著與此規(guī)則匹配的操作將被允許。all: 表示這個規(guī)則適用于所有的客戶端。subscribe: 表示這個規(guī)則定義了對主題的訂閱權(quán)限。$SYS/brokers/+/clients/#: 這是一個主題過濾器,它指定了匹配的主題模式。在這里,$SYS/brokers/+/clients/#表示以$SYS/brokers/開頭,然后是一個通配符 +,它可以匹配單個層級的任何名稱,接著是clients/,最后又有一個 # 通配符,它可以匹配零個或多個層級的名稱。因此,這個主題過濾器匹配了所有以$SYS/brokers/開頭,然后緊跟著clients/的主題。
綜合起來,這個規(guī)則允許所有的客戶端訂閱以
$SYS/brokers/開頭,然后跟著clients/的所有主題。通常,這種規(guī)則用于允許所有客戶端訂閱系統(tǒng)級別的信息或監(jiān)控數(shù)據(jù),如經(jīng)紀人(Broker)的客戶端連接狀態(tài)等。這可以用于監(jiān)視和診斷MQTT Broker的性能和狀態(tài)。
注意:修改完畢之后使用 emqx_ctl reload_acl 重新加載acl規(guī)則或者直接重啟emqx服務(wù)
搭建一個能夠監(jiān)聽EMQX主題的Spring Boot應(yīng)用程序
MQTT相關(guān)依賴
<!-- MQTT相關(guān)依賴 -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>MQTT接受訂閱的主題
$SYS/brokers/+/clients/#
處理設(shè)備上下線事件
獲取 EMQX 消息主題
// 從消息請求頭中獲取消息主題topic String topic = String.valueOf(message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC));
獲取topic最后的節(jié)點字符串
以下方式通過主題 topic 來獲取 ClientId
// topic最后的節(jié)點字符串
String lastPart = extractLastPart(topic);
// 獲取消息內(nèi)容
String payload = StrUtil.str((byte[]) message.getPayload(), "utf-8");
// 判斷設(shè)備是上線或下線消息
if ("connected".equals(lastPart)) {
// 設(shè)備上線消息
clientId = extractClientIdFromTopic(topic);
log.info("設(shè)備上線提醒 -> IMEI:{}", clientId);
} else if ("disconnected".equals(lastPart)) {
// 設(shè)備下線消息
clientId = extractClientIdFromTopic(topic);
log.info("設(shè)備下線警告 -> IMEI:{}", clientId);
}
/**
* 獲取最后一個節(jié)點
*
* @param topic 主題
* @return 節(jié)點內(nèi)容
*/
public static String extractLastPart(String topic) {
// 使用split方法將字符串根據(jù)'/'分割成數(shù)組
String[] parts = topic.split("/");
// 獲取最后一個元素
String lastPart = parts[parts.length - 1];
return lastPart;
}
/**
* 從Topic中提取ClientId
*
* @param topic 主題
* @return ClientId
*/
public static String extractClientIdFromTopic(String topic) {
// 使用正則表達式匹配主題中的ClientId
String regex = "\\$SYS/brokers/[^/]+/clients/([^/]+)/(connected|disconnected)";
Pattern pattern = Pattern.compile(regex);
Matcher matcher = pattern.matcher(topic);
// matcher.groupCount() 是一個方法,用于獲取正則表達式中定義的組數(shù)。在正則表達式中,使用括號 () 來定義捕獲組。在這個情況下,正則表達式 \\$SYS/brokers/[^/]+/clients/([^/]+)/(connected|disconnected) 中有兩組,分別是括號內(nèi)的 ([^/]+) 部分和 (connected|disconnected) 部分。matcher.groupCount() 返回的是正則表達式中捕獲組的數(shù)量
if (matcher.matches() && matcher.groupCount() == 2) {
// 如果正則匹配成功,提取ClientId并返回
return matcher.group(1);
} else {
// 如果匹配失敗,返回null或者拋出異常,視情況而定
return null;
}當(dāng)然你也可以通過解析 payload 來獲取更多詳細信息,可參照官方文檔:客戶端上下線事件
| 主題 (Topic) | 說明 |
|---|---|
| ${clientid}/connected | 上線事件。當(dāng)任意客戶端上線時,EMQX 就會發(fā)布該主題的消息 |
| ${clientid}/disconnected | 下線事件。當(dāng)任意客戶端下線時,EMQX 就會發(fā)布該主題的消息 |
connected 事件消息的 Payload 解析成 JSON 格式如下:
{
"username": "foo",
"ts": 1625572213873,
"sockport": 1883,
"proto_ver": 4,
"proto_name": "MQTT",
"keepalive": 60,
"ipaddress": "127.0.0.1",
"expiry_interval": 0,
"connected_at": 1625572213873,
"connack": 0,
"clientid": "emqtt-8348fe27a87976ad4db3",
"clean_start": true
}disconnected 事件消息的 Payload 解析成 JSON 格式如下:
{
"username": "foo",
"ts": 1625572213873,
"sockport": 1883,
"reason": "tcp_closed",
"proto_ver": 4,
"proto_name": "MQTT",
"ipaddress": "127.0.0.1",
"disconnected_at": 1625572213873,
"clientid": "emqtt-8348fe27a87976ad4db3"
}可以解析 JOSN 數(shù)據(jù)拿到 clientid ,注意此處使用的 JSON 解析工具是 Hutool 的。
// 獲取消息內(nèi)容
String payload = StrUtil.str((byte[]) message.getPayload(), "utf-8");
// 解析JSON字符串
JSONObject payloadJsonObject = JSONUtil.parseObj(payload);
// 獲取ClientId
String clientid = payloadJsonObject.getStr("clientid");實現(xiàn)效果

總結(jié)
- 修改 EMQ X ACL 配置: 你在 EMQ X 中修改了 acl.config 文件,添加了相應(yīng)的 ACL 規(guī)則,允許訂閱 $SYS/brokers/+/clients/# 主題的所有客戶端。這個步驟允許你在開源版本中訪問關(guān)鍵的設(shè)備連接信息。
- 創(chuàng)建 Spring Boot 應(yīng)用程序: 通過創(chuàng)建一個 Spring Boot 應(yīng)用程序,你建立了一個連接到 EMQ X 代理的橋梁。這個應(yīng)用程序充當(dāng)了監(jiān)聽器,用于訂閱 $SYS/brokers/+/clients/# 主題,以實時感知設(shè)備的連接和斷開事件。
- 實時設(shè)備監(jiān)控: 你的解決方案允許你實時監(jiān)控設(shè)備的連接狀態(tài),而無需依賴 EMQ X 企業(yè)版的內(nèi)置功能。這使你能夠快速響應(yīng)設(shè)備狀態(tài)的變化,并采取相應(yīng)的行動,如發(fā)送通知或執(zhí)行自定義操作。
- 定制性: 通過這個方法,你能夠靈活地定制解決方案,以滿足你的特定需求。你可以根據(jù)設(shè)備狀態(tài)采取不同的操作,因此具有更大的靈活性。
- 避免功能限制: 盡管 EMQ X 4.4.10 開源版本沒有內(nèi)置設(shè)備上下線提醒模塊,但你成功地繞過了這個限制,通過自定義配置和應(yīng)用程序開發(fā)來實現(xiàn)了所需的功能。
- 無需升級或許可證: 你的解決方案不需要升級到 EMQ X 企業(yè)版或購買額外的許可證。這使得你可以在開源版本的基礎(chǔ)上構(gòu)建所需的功能。
- 總而言之,你的解決方案是一種巧妙的方式,通過修改配置和創(chuàng)建一個自定義的 Spring Boot 應(yīng)用程序,實現(xiàn)了設(shè)備上下線狀態(tài)的監(jiān)控和管理。這個方法不僅解決了技術(shù)挑戰(zhàn),還提供了靈活性和定制性,以滿足你的特定需求。這是一個創(chuàng)造性的方法,適用于需要在 EMQ X 開源版本中實現(xiàn)設(shè)備監(jiān)控的情況。
以上就是SpringBoot實現(xiàn)EMQ設(shè)備的上下線告警的詳細內(nèi)容,更多關(guān)于SpringBoot EMQ上下線告警的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java 中組合模型之對象結(jié)構(gòu)模式的詳解
這篇文章主要介紹了Java 中組合模型之對象結(jié)構(gòu)模式的詳解的相關(guān)資料,希望通過本文能幫助到大家理解應(yīng)用對象結(jié)構(gòu)模型,需要的朋友可以參考下2017-09-09
ByteArrayInputStream簡介和使用_動力節(jié)點Java學(xué)院整理
ByteArrayInputStream 是字節(jié)數(shù)組輸入流。它繼承于InputStream。這篇文章主要介紹了ByteArrayInputStream簡介和使用_動力節(jié)點Java學(xué)院整理,需要的朋友可以參考下2017-05-05
解決Mybatis-Plus更新方法不更新NULL字段的問題
這篇文章主要介紹了解決Mybatis-Plus更新方法不更新NULL字段的問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-12-12
java后端請求兌現(xiàn)request的中文亂碼問題解決
文章主要講述了在處理處理方案工作中遇到中文亂碼問題的解決過程,通過復(fù)現(xiàn)和分析亂碼問題,發(fā)現(xiàn)是由于解碼規(guī)則和后端服務(wù)編碼不一致導(dǎo)致的,最終通過修改過濾器中的編碼設(shè)置解決了問題2025-02-02
SpringBoot自動配置Quartz的實現(xiàn)步驟
本文主要介紹了SpringBoot自動配置Quartz的實現(xiàn)步驟,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-11-11
SpringBoot實現(xiàn)讀取YML,yaml,properties文件
yml,yaml,properties三種文件都是用來存放配置的文件,一些靜態(tài)數(shù)據(jù),配置的數(shù)據(jù)都會存放到里邊。本文主要為大家整理了SpringBoot實現(xiàn)讀取YML,yaml,properties文件的方法,需要的可以參考一下2023-04-04
springboot+gradle 構(gòu)建多模塊項目的步驟
這篇文章主要介紹了springboot+gradle 構(gòu)建多模塊項目的步驟,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-05-05

