圖文并茂講解RocketMQ消息類別
1、同步消息
即時性較強(qiáng),重要的消息,且必須有回執(zhí)的消息,例如短信,通知(轉(zhuǎn)賬成功)

生產(chǎn)者:
public class Producer {
public static void main(String[] args) throws Exception{
DefaultMQProducer producer=new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.23.127:9876");
producer.start();
for (int i = 1; i <= 5; i++) {
Message msg = new Message("topic2",("同步消息:hello rocketmq "+i).getBytes("UTF-8"));
//同步消息發(fā)送
SendResult result = producer.send(msg);
System.out.println("返回結(jié)果:"+result);
}
producer.shutdown();
}
}
消費者:
public class Consumer {
public static void main(String[] args) throws Exception{
DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("group1");
consumer.setNamesrvAddr("192.168.23.127:9876");
consumer.subscribe("topic2","*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt msg : list) {
//System.out.println("收到消息:"+msg);
System.out.println("消息:"+new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功處理, mq 收到這個 標(biāo)記后相同的消息講不會再次發(fā)給消費者
}
});
consumer.start();// 開啟多線程 監(jiān)控消息,持續(xù)運行
System.out.println("接收消息服務(wù)已運行");
}
}測試:

2、異步消息
即時性較弱,但需要有回執(zhí)的消息,例如訂單中的某些信息

生產(chǎn)者:
public class Producer {
public static void main(String[] args) throws Exception{
DefaultMQProducer producer=new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.23.127:9876");
producer.start();
for (int i = 1; i <= 5; i++) {
//異步消息發(fā)送
Message msg = new Message("topic2",("異步消息:hello rocketmq "+i).getBytes("UTF-8"));
producer.send(msg, new SendCallback() {
//表示成功返回結(jié)果
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}
//表示發(fā)送消息失敗
@Override
public void onException(Throwable throwable) {
System.out.println(throwable);
}
});
}
//添加一個休眠操作,確保異步消息返回后能夠輸出
TimeUnit.SECONDS.sleep(10);
producer.shutdown();
}
}
消費者:
public class Consumer {
public static void main(String[] args) throws Exception{
DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("group1");
consumer.setNamesrvAddr("192.168.23.127:9876");
consumer.subscribe("topic2","*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt msg : list) {
//System.out.println("收到消息:"+msg);
System.out.println("消息:"+new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功處理, mq 收到這個 標(biāo)記后相同的消息講不會再次發(fā)給消費者
}
});
consumer.start();// 開啟多線程 監(jiān)控消息,持續(xù)運行
System.out.println("接收消息服務(wù)已運行");
}
}
測試:

3、單向消息
不需要有回執(zhí)的消息,例如日志類消息

生產(chǎn)者:
public class Producer {
public static void main(String[] args) throws Exception{
DefaultMQProducer producer=new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.23.127:9876");
producer.start();
for (int i = 1; i <= 5; i++) {
//單向消息
Message msg = new Message("topic2",("單向消息:hello rocketmq "+i).getBytes("UTF-8"));
producer.sendOneway(msg);
}
//添加一個休眠操作,確保異步消息返回后能夠輸出
TimeUnit.SECONDS.sleep(10);
producer.shutdown();
}
}
消費者代碼同上
測試:

總結(jié) 同步消息
SendResult result = producer.send(msg);
異步消息(回調(diào)處理結(jié)果必須在生產(chǎn)者進(jìn)程結(jié)束前執(zhí)行,否則回調(diào)無法正確執(zhí)行)
producer.send(msg, new SendCallback() {
//表示成功返回結(jié)果
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}
//表示發(fā)送消息失敗
@Override
public void onException(Throwable throwable) {
System.out.println(throwable);
}
});
單向消息
producer.sendOneway(msg);
到此這篇關(guān)于圖文并茂講解RocketMQ消息類別的文章就介紹到這了,更多相關(guān)RocketMQ消息類別內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot+Email發(fā)送郵件的實現(xiàn)示例
Spring?Boot提供了簡單而強(qiáng)大的郵件發(fā)送功能,本文主要介紹了SpringBoot+Email發(fā)送郵件的實現(xiàn)示例,具有一定的參考價值,感興趣的可以了解一下2024-03-03
javax.mail.SendFailedException: Sending failed問題原因
這篇文章主要介紹了javax.mail.SendFailedException: Sending failed問題原因,需要的朋友可以參考下2015-05-05
IntelliJ?IDEA?2023.2最新版激活方法及驗證ja-netfilter配置是否成功
隨著2023.2版本的發(fā)布,用戶們渴望了解如何激活這個最新版的IDE,本文將介紹三種可行的激活方案,包括許可證服務(wù)器、許可證代碼和idea?vmoptions配置,幫助讀者成功激活并充分利用IDEA的功能,感興趣的朋友參考下吧2023-08-08
springboot中RestTemplate發(fā)送HTTP請求的實現(xiàn)示例
RestTemplate是一個 spring-web 提供的執(zhí)行HTTP請求的同步阻塞式工具類,本文就來介紹一下RestTemplate發(fā)送HTTP請求,具有一定的參考價值,感興趣的可以了解一下2024-03-03
IDEA安裝部署Alibaba Cloud Toolkit的實現(xiàn)步驟
Alibaba Cloud Toolkit是阿里云針對IDE平臺為開發(fā)者提供的一款插件,本文主要介紹了IDEA安裝部署Alibaba Cloud Toolkit的實現(xiàn)步驟,具有一定的參考價值,感興趣的可以了解一下2023-08-08
Java中的HttpServletRequestWrapper用法解析
這篇文章主要介紹了Java中的HttpServletRequestWrapper用法解析,HttpServletRequest 對參數(shù)值的獲取實際調(diào)的是org.apache.catalina.connector.Request,沒有提供對應(yīng)的set方法修改屬性,所以不能對前端傳來的參數(shù)進(jìn)行修改,需要的朋友可以參考下2024-01-01

