springboot整合netty-mqtt-client實(shí)現(xiàn)Mqtt消息的訂閱和發(fā)布示例
1.添加依賴
<dependency> <groupId>org.jetlinks</groupId> <artifactId>netty-mqtt-client</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.13.2</version> <scope>test</scope> </dependency>
2.源碼
application.yml
#mqtt配置 mqtt: username: admin password: 123456 #推送信息的連接地址 url: localhost port: 1884 #默認(rèn)發(fā)送的主題 defaultTopic: topic #clientid clientId: client #連接超時(shí)時(shí)間 單位為秒 completionTimeout: 300 #設(shè)置會(huì)話心跳時(shí)間 單位為秒 keepAliveInterval: 20
MqttProperties.java
import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; @Data @ConfigurationProperties(prefix = "mqtt") public class MqttProperties { private String username; private String password; private String url; private int port; private String clientId; private String defaultTopic; private int completionTimeout; private int keepAliveInterval; }
MqttConfig.java
import com.xingyun.netty.mqtt.prop.MqttProperties; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import lombok.AllArgsConstructor; import org.jetlinks.mqtt.client.*; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @AllArgsConstructor @Configuration @EnableConfigurationProperties(MqttProperties.class) public class MqttConfig { private final MqttProperties mqttProperties; @Bean public MqttClientConfig getMqttClientConfig() { MqttClientConfig mqttClientConfig = new MqttClientConfig(); mqttClientConfig.setClientId(mqttProperties.getClientId()); mqttClientConfig.setUsername(mqttProperties.getClientId()); mqttClientConfig.setPassword(mqttProperties.getPassword()); /*mqttClientConfig.setTimeoutSeconds(mqttProperties.getCompletionTimeout()); mqttClientConfig.setRetryInterval(mqttProperties.getKeepAliveInterval()); mqttClientConfig.setProtocolVersion(MqttVersion.MQTT_3_1_1); mqttClientConfig.setReconnect(true);*/ return mqttClientConfig; } @Bean public MqttClient getMqttClient(){ EventLoopGroup loop = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2); MqttClient mqttClient = new MqttClientImpl(getMqttClientConfig(),null); mqttClient.setEventLoop(loop); mqttClient.setCallback(getMqttClientCallback()); mqttClient.connect(mqttProperties.getUrl(), mqttProperties.getPort()).addListener(future -> { if (future.isSuccess()){ System.out.println("mqtt客戶端已建立連接"); //#為多層通配符,+為單層通配符 mqttClient.on("#",getMqttHandler()); } }); return mqttClient; } @Bean public MqttHandler getMqttHandler(){ return (topic,payload) -> { System.out.println("消息主題:" + topic); System.out.println("消息內(nèi)容:" + payload); }; } @Bean public MqttClientCallback getMqttClientCallback(){ return new MqttClientCallback() { @Override public void connectionLost(Throwable cause) { cause.printStackTrace(); } @Override public void onSuccessfulReconnect() { System.out.println("客戶端已重連"); } }; } }
3.運(yùn)行測(cè)試
客戶端利用不同主題,發(fā)送消息
控制臺(tái)
消息主題:testTopic/001
消息內(nèi)容:PooledSlicedByteBuf(ridx: 0, widx: 15, cap: 15/15, unwrapped: PooledUnsafeDirectByteBuf(ridx: 32, widx: 32, cap: 512))
消息主題:testTopic/001
消息內(nèi)容:PooledSlicedByteBuf(ridx: 0, widx: 15, cap: 15/15, unwrapped: PooledUnsafeDirectByteBuf(ridx: 32, widx: 32, cap: 512))
消息主題:test/sub/001
消息內(nèi)容:PooledSlicedByteBuf(ridx: 0, widx: 15, cap: 15/15, unwrapped: PooledUnsafeDirectByteBuf(ridx: 31, widx: 31, cap: 496))
消息主題:test1
消息內(nèi)容:PooledSlicedByteBuf(ridx: 0, widx: 15, cap: 15/15, unwrapped: PooledUnsafeDirectByteBuf(ridx: 24, widx: 24, cap: 496))
單元測(cè)試發(fā)布消息
MqttSeviceDemo.java
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import org.jetlinks.mqtt.client.MqttClient; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest public class MqttSeviceDemo { @Autowired private MqttClient mqttClient; @Test public void publishMessage(){ String test = "I am client9527"; byte[] bytes = test.getBytes(); ByteBuf byteBuf = Unpooled.copiedBuffer(bytes); mqttClient.publish("test/pub/001",byteBuf); System.out.println("消息已發(fā)布"); } }
客戶端訂閱到消息
到此這篇關(guān)于springboot整合netty-mqtt-client實(shí)現(xiàn)Mqtt消息的訂閱和發(fā)布示例的文章就介紹到這了,更多相關(guān)springboot Mqtt消息訂閱和發(fā)布內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java doGet, doPost方法和文件上傳實(shí)例代碼
這篇文章主要介紹了Java doGet, doPost方法和文件上傳實(shí)例代碼的相關(guān)資料,需要的朋友可以參考下2016-11-11java設(shè)計(jì)模式-代理模式(實(shí)例講解)
下面小編就為大家?guī)?lái)一篇java設(shè)計(jì)模式-代理模式(實(shí)例講解)。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2017-09-09Java實(shí)現(xiàn)的微信公眾號(hào)獲取微信用戶信息示例
這篇文章主要介紹了Java實(shí)現(xiàn)的微信公眾號(hào)獲取微信用戶信息,結(jié)合實(shí)例形式分析了Java微信公眾號(hào)獲取微信用戶信息相關(guān)原理、步驟與操作注意事項(xiàng),需要的朋友可以參考下2019-10-10Java實(shí)現(xiàn)大文件的分片上傳與下載(springboot+vue3)
這篇文章主要為大家詳細(xì)介紹了java基于springboot+vue3如何大文件的分片上傳與下載,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2023-06-06解決Java項(xiàng)目中request流只能獲取一次的問(wèn)題
Java項(xiàng)目開(kāi)發(fā)中可能存在以下幾種情況,你需要在攔截器中統(tǒng)一攔截請(qǐng)求和你項(xiàng)目里可能需要搞一個(gè)統(tǒng)一的異常處理器,這兩種情況是比較常見(jiàn)的,本文將給大家介紹如何解決Java項(xiàng)目中request流只能獲取一次的問(wèn)題,需要的朋友可以參考下2024-02-02