使用java?實現(xiàn)mqtt兩種常用方式
前言
在開發(fā)MQTT時有兩種方式一種是使用Paho Java 原生庫來完成,一種是使用spring boot 來完成。
Paho Java 庫實現(xiàn)
Eclipse Paho Java Client (opens new window)是用 Java 編寫的 MQTT 客戶端庫(MQTT Java Client),可用于 JVM 或其他 Java 兼容平臺(例如Android)。
Eclipse Paho Java Client 提供了MqttAsyncClient 和 MqttClient 異步和同步 API
- 通過 Maven 安裝 Paho Java
<dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.2</version> </dependency>
- Paho Java 使用示例
Java 體系中 Paho Java 是比較穩(wěn)定、廣泛應用的 MQTT 客戶端庫,本示例包含 Java 語言的 Paho Java 連接 EMQX Broker,并進行消息收發(fā)完整代碼:
package io.emqx; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; public class App { public static void main(String[] args) { String subTopic = "testtopic/#"; String pubTopic = "testtopic/1"; String content = "Hello World"; int qos = 2; String broker = "tcp://broker.emqx.io:1883"; String clientId = "emqx_test"; MemoryPersistence persistence = new MemoryPersistence(); try { MqttClient client = new MqttClient(broker, clientId, persistence); // MQTT 連接選項 MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setUserName("emqx_test"); connOpts.setPassword("emqx_test_password".toCharArray()); // 保留會話 connOpts.setCleanSession(true); // 設置回調(diào) client.setCallback(new PushCallback()); // 建立連接 System.out.println("Connecting to broker: " + broker); client.connect(connOpts); System.out.println("Connected"); System.out.println("Publishing message: " + content); // 訂閱 client.subscribe(subTopic); // 消息發(fā)布所需參數(shù) MqttMessage message = new MqttMessage(content.getBytes()); message.setQos(qos); client.publish(pubTopic, message); System.out.println("Message published"); client.disconnect(); System.out.println("Disconnected"); client.close(); System.exit(0); } catch (MqttException me) { System.out.println("reason " + me.getReasonCode()); System.out.println("msg " + me.getMessage()); System.out.println("loc " + me.getLocalizedMessage()); System.out.println("cause " + me.getCause()); System.out.println("excep " + me); me.printStackTrace(); } } }
回調(diào)消息處理類 OnMessageCallback.java
package io.emqx; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; public class OnMessageCallback implements MqttCallback { public void connectionLost(Throwable cause) { // 連接丟失后,一般在這里面進行重連 System.out.println("連接斷開,可以做重連"); } public void messageArrived(String topic, MqttMessage message) throws Exception { // subscribe后得到的消息會執(zhí)行到這里面 System.out.println("接收消息主題:" + topic); System.out.println("接收消息Qos:" + message.getQos()); System.out.println("接收消息內(nèi)容:" + new String(message.getPayload())); } public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("deliveryComplete---------" + token.isComplete()); } }
好的上述就實現(xiàn)了簡單的 MQTT的連接和消息收發(fā)。
spring boot集成mqtt
spring boot 環(huán)境
spring-boot 版本 2.2.2 spring-integration的版本為:5.4.3 Spring Integration提供了入站適配器和出站適配器以支持MQTT協(xié)議。
Maven 依賴:
<!-- https://mvnrepository.com/artifact/org.springframework.integration/spring-integration-mqtt --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> <version>5.4.3</version> </dependency>
配置文件 application.yml:
spring: mqtt: username: password: url: tcp://ip:port clientId: clientId topic: default completionTimeout: 2000
核心代碼
配置類
@Data @Configuration @ConfigurationProperties(prefix = "spring.mqtt") public class MqttConfiguration { private String username; private String password; private String url; private String clientId; private String topic = "TOPIC_DEFAULT"; private Integer completionTimeout = 2000; /** * 注冊MQTT客戶端工廠 * @return */ @Bean public MqttPahoClientFactory mqttClientFactory(){ DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); MqttConnectOptions options = new MqttConnectOptions(); //如果設置為 false,客戶端和服務器將在客戶端、服務器和連接重新啟動時保持狀態(tài)。隨著狀態(tài)的保持: // 即使客戶端、服務器或連接重新啟動,消息傳遞也將可靠地滿足指定的 QOS。服務器將訂閱視為持久的。 // 如果設置為 true,客戶端和服務器將不會在客戶端、服務器或連接重新啟動時保持狀態(tài)。 options.setCleanSession(true); //該值以秒為單位,必須>0,定義了客戶端等待與 MQTT 服務器建立網(wǎng)絡連接的最大時間間隔。 // 默認超時為 30 秒。值 0 禁用超時處理,這意味著客戶端將等待直到網(wǎng)絡連接成功或失敗。 options.setConnectionTimeout(0); //此值以秒為單位,定義發(fā)送或接收消息之間的最大時間間隔,必須>0 options.setKeepAliveInterval(90); //自動重新連接 options.setAutomaticReconnect(true); options.setUserName(this.getUsername()); options.setPassword(this.getPassword().toCharArray()); options.setServerURIs(new String[]{this.getUrl()}); factory.setConnectionOptions(options); return factory; } }
@Slf4j @AllArgsConstructor @Configuration @IntegrationComponentScan public class MqttInboundConfiguration { private MqttConfiguration mqttConfig; private MqttPahoClientFactory factory; private MqttMessageReceiver mqttMessageReceiver; /** * 此處可以使用其他消息通道 * Spring Integration默認的消息通道,它允許將消息發(fā)送給一個訂閱者,然后阻礙發(fā)送直到消息被接收。 * * @return */ @Bean public MessageChannel mqttInBoundChannel() { return new DirectChannel(); } /** * 適配器, 兩個topic共用一個adapter * 客戶端作為消費者,訂閱主題,消費消息 * * @param * @param * @return */ @Bean public MessageProducerSupport mqttInbound() { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId()+"-"+System.currentTimeMillis(), factory, mqttConfig.getTopic()); adapter.setCompletionTimeout(60000); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setRecoveryInterval(10000); adapter.setQos(0); adapter.setOutputChannel(mqttInBoundChannel()); return adapter; } /** * mqtt入站消息處理工具,對于指定消息入站通道接收到生產(chǎn)者生產(chǎn)的消息后處理消息的工具。 * * @return */ @Bean @ServiceActivator(inputChannel = "mqttInBoundChannel") public MessageHandler mqttMessageHandler() { return this.mqttMessageReceiver; } }
數(shù)據(jù)接收
@Slf4j @AllArgsConstructor @Component public class MqttMessageReceiver implements MessageHandler { @Override public void handleMessage(Message<?> message) throws MessagingException { try { MessageHeaders headers = message.getHeaders(); //獲取消息Topic String receivedTopic = (String) headers.get(MqttHeaders.RECEIVED_TOPIC); log.info("[獲取到的消息的topic :]{} ", receivedTopic); //獲取消息體 String payload = (String) message.getPayload(); log.info("[獲取到的消息的payload :]{} ", payload); //todo .... } catch (Exception e) { e.printStackTrace(); } } }
@Slf4j @AllArgsConstructor @Configuration public class MqttOutboundConfiguration { private MqttConfiguration mqttConfig; private MqttPahoClientFactory factory; @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound() { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler( mqttConfig.getClientId()+"-"+System.currentTimeMillis() + System.currentTimeMillis(), factory); messageHandler.setDefaultQos(0); //開啟異步 messageHandler.setAsync(true); messageHandler.setDefaultTopic(mqttConfig.getTopic()); return messageHandler; } }
發(fā)送者
@Component @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") public interface MqttGateway { /** * 發(fā)送mqtt消息 * @param topic 主題 * @param payload 內(nèi)容 * @return void */ void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload); /** * 發(fā)送包含qos的消息 * @param topic 主題 * @param qos 對消息處理的幾種機制。 * * 0 表示的是訂閱者沒收到消息不會再次發(fā)送,消息會丟失。<br> * * 1 表示的是會嘗試重試,一直到接收到消息,但這種情況可能導致訂閱者收到多次重復消息。<br> * * 2 多了一次去重的動作,確保訂閱者收到的消息有一次。 * @param payload 消息體 * @return void */ void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload); /** * 發(fā)送包含qos的消息 * @param topic 主題 * @param qos 對消息處理的幾種機制。 * * 0 表示的是訂閱者沒收到消息不會再次發(fā)送,消息會丟失。<br> * * 1 表示的是會嘗試重試,一直到接收到消息,但這種情況可能導致訂閱者收到多次重復消息。<br> * * 2 多了一次去重的動作,確保訂閱者收到的消息有一次。 * @param payload 消息體 * @return void */ void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload); }
@Component @AllArgsConstructor public class MqttMessageSender { private MqttGateway mqttGateway; /** * 發(fā)送mqtt消息 * @param topic 主題 * @param message 內(nèi)容 * @return void */ public void send(String topic, String message) { mqttGateway.sendToMqtt(topic, message); } /** * 發(fā)送包含qos的消息 * @param topic 主題 * @param qos 質(zhì)量 * @param messageBody 消息體 * @return void */ public void send(String topic, int qos, JSONObject messageBody){ mqttGateway.sendToMqtt(topic, qos, messageBody.toString()); } /** * 發(fā)送包含qos的消息 * @param topic 主題 * @param qos 質(zhì)量 * @param message 消息體 * @return void */ public void send(String topic, int qos, byte[] message){ mqttGateway.sendToMqtt(topic, qos, message); } }
總結(jié)
綜上所述上面就是我們經(jīng)常用的到兩種方式,希望對你有所幫助
到此這篇關(guān)于使用java 實現(xiàn)mqtt兩種方式的文章就介紹到這了,更多相關(guān)java 實現(xiàn) mqtt內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
MyBatis-Plus非表字段的三種處理方法小結(jié)
這篇文章主要介紹了MyBatis-Plus非表字段的三種處理方法小結(jié),具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-08-08Java畢業(yè)設計實戰(zhàn)項目之倉庫管理系統(tǒng)的實現(xiàn)流程
這是一個使用了java+SSM+Maven+Bootstrap+mysql開發(fā)的倉庫管理系統(tǒng),是一個畢業(yè)設計的實戰(zhàn)練習,具有一個倉庫管理系統(tǒng)該有的所有功能,感興趣的朋友快來看看吧2022-01-01SpringBoot整合Shiro的環(huán)境搭建教程
這篇文章主要為大家詳細介紹了SpringBoot整合Shiro的環(huán)境搭建教程,文中的示例代碼講解詳細,具有一定的借鑒價值,感興趣的小伙伴可以了解一下2022-12-12Jenkins+Docker+Gitee+SpringBoot自動化部署
本文主要介紹了Jenkins+Docker+Gitee+SpringBoot自動化部署,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2022-03-03SpringBoot權(quán)限認證Sa-Token的使用總結(jié)
Sa-Token是一款輕量級Java權(quán)限認證框架,適用于快速搭建權(quán)限系統(tǒng),它提供了豐富的功能,包括登錄認證、權(quán)限驗證、角色驗證、Session會話管理等,并且具有良好的社區(qū)支持和文檔資源,下面重點給大家介紹SpringBoot權(quán)限認證Sa-Token的使用,感興趣的朋友一起看看吧2025-02-02