圖文并茂講解RocketMQ消息類別
1、同步消息
即時性較強,重要的消息,且必須有回執(zhí)的消息,例如短信,通知(轉(zhuǎn)賬成功)
生產(chǎn)者:
public class Producer { public static void main(String[] args) throws Exception{ DefaultMQProducer producer=new DefaultMQProducer("group1"); producer.setNamesrvAddr("192.168.23.127:9876"); producer.start(); for (int i = 1; i <= 5; i++) { Message msg = new Message("topic2",("同步消息:hello rocketmq "+i).getBytes("UTF-8")); //同步消息發(fā)送 SendResult result = producer.send(msg); System.out.println("返回結(jié)果:"+result); } producer.shutdown(); } }
消費者:
public class Consumer { public static void main(String[] args) throws Exception{ DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("group1"); consumer.setNamesrvAddr("192.168.23.127:9876"); consumer.subscribe("topic2","*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt msg : list) { //System.out.println("收到消息:"+msg); System.out.println("消息:"+new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功處理, mq 收到這個 標記后相同的消息講不會再次發(fā)給消費者 } }); consumer.start();// 開啟多線程 監(jiān)控消息,持續(xù)運行 System.out.println("接收消息服務(wù)已運行"); } }
測試:
2、異步消息
即時性較弱,但需要有回執(zhí)的消息,例如訂單中的某些信息
生產(chǎn)者:
public class Producer { public static void main(String[] args) throws Exception{ DefaultMQProducer producer=new DefaultMQProducer("group1"); producer.setNamesrvAddr("192.168.23.127:9876"); producer.start(); for (int i = 1; i <= 5; i++) { //異步消息發(fā)送 Message msg = new Message("topic2",("異步消息:hello rocketmq "+i).getBytes("UTF-8")); producer.send(msg, new SendCallback() { //表示成功返回結(jié)果 @Override public void onSuccess(SendResult sendResult) { System.out.println(sendResult); } //表示發(fā)送消息失敗 @Override public void onException(Throwable throwable) { System.out.println(throwable); } }); } //添加一個休眠操作,確保異步消息返回后能夠輸出 TimeUnit.SECONDS.sleep(10); producer.shutdown(); } }
消費者:
public class Consumer { public static void main(String[] args) throws Exception{ DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("group1"); consumer.setNamesrvAddr("192.168.23.127:9876"); consumer.subscribe("topic2","*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt msg : list) { //System.out.println("收到消息:"+msg); System.out.println("消息:"+new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功處理, mq 收到這個 標記后相同的消息講不會再次發(fā)給消費者 } }); consumer.start();// 開啟多線程 監(jiān)控消息,持續(xù)運行 System.out.println("接收消息服務(wù)已運行"); } }
測試:
3、單向消息
不需要有回執(zhí)的消息,例如日志類消息
生產(chǎn)者:
public class Producer { public static void main(String[] args) throws Exception{ DefaultMQProducer producer=new DefaultMQProducer("group1"); producer.setNamesrvAddr("192.168.23.127:9876"); producer.start(); for (int i = 1; i <= 5; i++) { //單向消息 Message msg = new Message("topic2",("單向消息:hello rocketmq "+i).getBytes("UTF-8")); producer.sendOneway(msg); } //添加一個休眠操作,確保異步消息返回后能夠輸出 TimeUnit.SECONDS.sleep(10); producer.shutdown(); } }
消費者代碼同上
測試:
總結(jié) 同步消息
SendResult result = producer.send(msg);
異步消息(回調(diào)處理結(jié)果必須在生產(chǎn)者進程結(jié)束前執(zhí)行,否則回調(diào)無法正確執(zhí)行)
producer.send(msg, new SendCallback() { //表示成功返回結(jié)果 @Override public void onSuccess(SendResult sendResult) { System.out.println(sendResult); } //表示發(fā)送消息失敗 @Override public void onException(Throwable throwable) { System.out.println(throwable); } });
單向消息
producer.sendOneway(msg);
到此這篇關(guān)于圖文并茂講解RocketMQ消息類別的文章就介紹到這了,更多相關(guān)RocketMQ消息類別內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot+Email發(fā)送郵件的實現(xiàn)示例
Spring?Boot提供了簡單而強大的郵件發(fā)送功能,本文主要介紹了SpringBoot+Email發(fā)送郵件的實現(xiàn)示例,具有一定的參考價值,感興趣的可以了解一下2024-03-03javax.mail.SendFailedException: Sending failed問題原因
這篇文章主要介紹了javax.mail.SendFailedException: Sending failed問題原因,需要的朋友可以參考下2015-05-05IntelliJ?IDEA?2023.2最新版激活方法及驗證ja-netfilter配置是否成功
隨著2023.2版本的發(fā)布,用戶們渴望了解如何激活這個最新版的IDE,本文將介紹三種可行的激活方案,包括許可證服務(wù)器、許可證代碼和idea?vmoptions配置,幫助讀者成功激活并充分利用IDEA的功能,感興趣的朋友參考下吧2023-08-08springboot中RestTemplate發(fā)送HTTP請求的實現(xiàn)示例
RestTemplate是一個 spring-web 提供的執(zhí)行HTTP請求的同步阻塞式工具類,本文就來介紹一下RestTemplate發(fā)送HTTP請求,具有一定的參考價值,感興趣的可以了解一下2024-03-03IDEA安裝部署Alibaba Cloud Toolkit的實現(xiàn)步驟
Alibaba Cloud Toolkit是阿里云針對IDE平臺為開發(fā)者提供的一款插件,本文主要介紹了IDEA安裝部署Alibaba Cloud Toolkit的實現(xiàn)步驟,具有一定的參考價值,感興趣的可以了解一下2023-08-08Java中的HttpServletRequestWrapper用法解析
這篇文章主要介紹了Java中的HttpServletRequestWrapper用法解析,HttpServletRequest 對參數(shù)值的獲取實際調(diào)的是org.apache.catalina.connector.Request,沒有提供對應(yīng)的set方法修改屬性,所以不能對前端傳來的參數(shù)進行修改,需要的朋友可以參考下2024-01-01