RocketMQ消息發(fā)送與消息類別詳解
如果事務(wù)執(zhí)行成功并選擇提交事務(wù),則產(chǎn)生注冊成功消息,進(jìn)入下一步,需要的朋友可以參考下
一、消息發(fā)送
1.1 單生產(chǎn)者單消費(fèi)者消息發(fā)送(OneToOne)
1、新建maven項(xiàng)目recketmqtest
2、導(dǎo)入RocketMQ客戶端坐標(biāo)
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.8.0</version> </dependency>
3、生產(chǎn)者
package com.liming.base; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; /** * @author 黎明 * @version 1.0 * 生產(chǎn)者 * @date 2023/5/21 9:08 */ public class Producer { public static void main(String[] args) throws Exception { /** 1. 誰來發(fā)? 2. 發(fā)給誰? 3. 怎么發(fā)? 4. 發(fā)什么? 5. 發(fā)的結(jié)果是什么? 6. 打掃戰(zhàn)場 **/ // 1、創(chuàng)建一個發(fā)送消息的對象Producer DefaultMQProducer producer = new DefaultMQProducer("group1"); // 2、設(shè)定發(fā)送的命名服務(wù)器地址 producer.setNamesrvAddr("localhost:9876"); // 3.1 啟動發(fā)送的服務(wù) producer.start(); // 4、創(chuàng)建要發(fā)送的消息對象 Message message = new Message("topic1", "tag1","hello recketmq".getBytes()); // 3.2 發(fā)送消息 SendResult sendResult = producer.send(message); System.out.println("返回結(jié)果:" + sendResult); // 5、關(guān)閉連接 producer.shutdown(); } }
4、消費(fèi)者
package com.liming.base; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; /** * @author 黎明 * @version 1.0 * 消費(fèi)者 * @date 2023/5/21 9:17 */ public class Consumer { public static void main(String[] args) throws MQClientException { //1.創(chuàng)建一個接收消息的對象Consumer DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); //2.設(shè)定接收的命名服務(wù)器地址 consumer.setNamesrvAddr("localhost:9876"); //3.設(shè)置接收消息對應(yīng)的topic,對應(yīng)的sub標(biāo)簽為任意 consumer.subscribe("topic1", "*"); //3.開啟監(jiān)聽,用于接收消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { //遍歷消息 for (MessageExt msg : list) { System.out.println("收到的消息:" + msg); byte[] body = msg.getBody(); System.out.println(new String(body)); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //4.啟動接收消息的服務(wù) consumer.start(); System.out.println("接收消息服務(wù)已經(jīng)開啟!"); //5 不要關(guān)閉消費(fèi)者! } }
1.2 單生產(chǎn)者多消費(fèi)者消息發(fā)送(OneToMany)
生產(chǎn)者
//1.創(chuàng)建一個發(fā)送消息的對象Producer DefaultMQProducer producer = new DefaultMQProducer("group1"); //2.設(shè)定發(fā)送的命名服務(wù)器地址 producer.setNamesrvAddr("localhost:9876"); //3.1啟動發(fā)送的服務(wù) producer.start(); for (int i = 0; i < 10; i++) { //4.創(chuàng)建要發(fā)送的消息對象,指定topic,指定內(nèi)容body Message msg = new Message("topic1", ("hello rocketmq"+i).getBytes(); //3.2發(fā)送消息 SendResult result = producer.send(msg); System.out.println("返回結(jié)果:" + result); } //5.關(guān)閉連接 producer.shutdown();
消費(fèi)者(負(fù)載均衡模式:默認(rèn)模式)
//1.創(chuàng)建一個接收消息的對象Consumer DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); //2.設(shè)定接收的命名服務(wù)器地址 consumer.setNamesrvAddr("localhost:9876"); //3.設(shè)置接收消息對應(yīng)的topic,對應(yīng)的sub標(biāo)簽為任意 consumer.subscribe("topic1","*"); //設(shè)置當(dāng)前消費(fèi)者的消費(fèi)模式(默認(rèn)模式:負(fù)載均衡) consumer.setMessageModel(MessageModel.CLUSTERING); //3.開啟監(jiān)聽,用于接收消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { //遍歷消息 for (MessageExt msg : list) { System.out.println("收到消息:"+msg); System.out.println("消息是:"+new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //4.啟動接收消息的服務(wù) consumer.start(); System.out.println("接受消息服務(wù)已經(jīng)開啟!"); //5 不要關(guān)閉消費(fèi)者!
消費(fèi)者(廣播模式)
//1.創(chuàng)建一個接收消息的對象Consumer DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); //2.設(shè)定接收的命名服務(wù)器地址 consumer.setNamesrvAddr("localhost:9876"); //3.設(shè)置接收消息對應(yīng)的topic,對應(yīng)的sub標(biāo)簽為任意 consumer.subscribe("topic1","*"); //設(shè)置當(dāng)前消費(fèi)者的消費(fèi)模式(默認(rèn)模式:負(fù)載均衡) //consumer.setMessageModel(MessageModel.CLUSTERING); //設(shè)置當(dāng)前消費(fèi)者的消費(fèi)模式(廣播模式) consumer.setMessageModel(MessageModel.BROADCASTING); //3.開啟監(jiān)聽,用于接收消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { //遍歷消息 for (MessageExt msg : list) { System.out.println("收到消息:"+msg); System.out.println("消息是:"+new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //4.啟動接收消息的服務(wù) consumer.start(); System.out.println("接受消息服務(wù)已經(jīng)開啟!"); //5 不要關(guān)閉消費(fèi)者!
1.3 多生產(chǎn)者多消費(fèi)者消息發(fā)送(ManyToMany)
多生產(chǎn)者產(chǎn)生的消息可以被同一個消費(fèi)者消費(fèi),也可以被多個消費(fèi)者消費(fèi)
二、消息類別
2.1 同步消息
特征:即時性較強(qiáng),重要的消息,且必須有回執(zhí)的消息,例如短信,通知(轉(zhuǎn)賬成功)
代碼實(shí)現(xiàn)(生產(chǎn)者中):
SendResult result = producer.send(msg);
2.2 異步消息
特征:即時性較弱,但需要有回執(zhí)的消息,例如訂單中的某些信息
代碼實(shí)現(xiàn)(生產(chǎn)者中):
//1.創(chuàng)建一個發(fā)送消息的對象Producer DefaultMQProducer producer = new DefaultMQProducer("group1"); //2.設(shè)定發(fā)送的命名服務(wù)器地址 producer.setNamesrvAddr("localhost:9876"); //3.1啟動發(fā)送的服務(wù) producer.start(); for (int i = 0; i < 10; i++) { //4.創(chuàng)建要發(fā)送的消息對象,指定topic,指定內(nèi)容body Message msg = new Message("topic1", ("hello rocketmq"+i).getBytes("UTF-8")); //3.2 同步消息 //SendResult result = producer.send(msg); //System.out.println("返回結(jié)果:" + result); //異步消息 producer.send(msg, new SendCallback() { //表示成功返回結(jié)果 @Override public void onSuccess(SendResult sendResult) { System.out.println(sendResult); } //表示發(fā)送消息失敗 @Override public void onException(Throwable throwable) { System.out.println(throwable); } }); System.out.println("消息"+i+"發(fā)完了,做業(yè)務(wù)邏輯去了!"); } //休眠10秒 TimeUnit.SECONDS.sleep(10); //5.關(guān)閉連接 producer.shutdown();
2.3 單向消息
特征:不需要有回執(zhí)的消息,例如日志類消息
代碼實(shí)現(xiàn)(生產(chǎn)者中):
producer.sendOneway(msg);
2.4 延時消息
消息發(fā)送時并不直接發(fā)送到消息服務(wù)器,而是根據(jù)設(shè)定的等待時間到達(dá),起到延時到達(dá)的緩沖作用
Message msg = new Message("topic3",("延時消息:hello rocketmq "+i).getBytes("UTF-8")); //設(shè)置延時等級3,這個消息將在10s之后發(fā)送(現(xiàn)在只支持固定的幾個時間,詳看delayTimeLevel) msg.setDelayTimeLevel(3); SendResult result = producer.send(msg); System.out.println("返回結(jié)果:"+result);
目前支持的消息時間:
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
2.5 批量消息
批量發(fā)送消息能顯著提高傳遞小消息的性能.
發(fā)送批量消息:
List<Message> msgList = new ArrayList<Message>(); Message msg1 = new Message("topic1", ("hello rocketmq1").getBytes("UTF-8")); Message msg2 = new Message("topic1", ("hello rocketmq2").getBytes("UTF-8")); Message msg3 = new Message("topic1", ("hello rocketmq3").getBytes("UTF-8")); msgList.add(msg1); msgList.add(msg2); msgList.add(msg3); SendResult result = producer.send(msgList);
注意限制:
- 這些批量消息應(yīng)該有相同的topic
- 相同的waitStoreMsgOK
- 不能是延時消息
- 消息內(nèi)容總長度不超過4M
三、消息過濾
3.1 分類過濾
按照tag過濾信息
生產(chǎn)者:
Message msg = new Message("topic6","tag2",("消息過濾按照tag:hello rocketmq 2").getBytes("UTF-8"));
消費(fèi)者:
//接收消息的時候,除了制定topic,還可以指定接收的tag,*代表任意tag consumer.subscribe("topic6","tag1 || tag2");
3.2 語法過濾(屬性過濾/語法過濾/SQL過濾)
基本語法:
- 數(shù)值比較,比如:>,>=,<,<=,BETWEEN,=;
- 字符比較,比如:=,<>,IN;
- IS NULL 或者 IS NOT NULL;
- 邏輯符號 AND,OR,NOT;
常量支持類型為:
- 數(shù)值,比如:123,3.1415;
- 字符,比如:‘abc’,必須用單引號包裹起來;
- NULL,特殊的常量
- 布爾值,TRUE 或 FALSE
生產(chǎn)者:
//為消息添加屬性 msg.putUserProperty("vip","1"); msg.putUserProperty("age","20");
消費(fèi)者:
//使用消息選擇器來過濾對應(yīng)的屬性,語法格式為類SQL語法 consumer.subscribe("topic7", MessageSelector.bySql("age >= 18")); consumer.subscribe("topic6", MessageSelector.bySql("name = 'litiedan'"));
注意:SQL過濾需要依賴服務(wù)器的功能支持,在broker.conf配置文件中添加對應(yīng)的功能項(xiàng),并開啟對應(yīng)功能
enablePropertyFilter=true
重啟broker
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
或者直接cmd中輸入(D:\software\rocketmq-all-4.8.0-bin-release\bin)
mqadmin.cmd updateBrokerConfig -blocalhost:10911 -kenablePropertyFilter -vtrue
四、事務(wù)消息
1、正常事務(wù)過程(灰色線條)
2、事務(wù)補(bǔ)償過程(藍(lán)色線條)
事務(wù)消息狀態(tài)
- 提交狀態(tài):允許進(jìn)入隊(duì)列,此消息與非事務(wù)消息無區(qū)別
- 回滾狀態(tài):不允許進(jìn)入隊(duì)列,此消息等同于未發(fā)送過
- 中間狀態(tài):完成了half消息的發(fā)送,未對MQ進(jìn)行二次狀態(tài)確認(rèn)
注意:事務(wù)消息僅與生產(chǎn)者有關(guān),與消費(fèi)者無關(guān)
代碼實(shí)現(xiàn):
提交狀態(tài)
//事務(wù)消息使用的生產(chǎn)者是TransactionMQProducer TransactionMQProducer producer = new TransactionMQProducer("group1"); producer.setNamesrvAddr("localhost:9876"); //添加本地事務(wù)對應(yīng)的監(jiān)聽 producer.setTransactionListener(new TransactionListener() { //正常事務(wù)過程 public LocalTransactionState executeLocalTransaction(Message message, Object o) { return LocalTransactionState.COMMIT_MESSAGE; } //事務(wù)補(bǔ)償過程 public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { return null; } }); producer.start(); Message msg = new Message("topic8",("事務(wù)消息:hello rocketmq ").getBytes("UTF-8")); SendResult result = producer.sendMessageInTransaction(msg,null); System.out.println("返回結(jié)果:"+result); producer.shutdown();
回滾狀態(tài)
producer.setTransactionListener(new TransactionListener() { //正常事務(wù) @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { return LocalTransactionState.ROLLBACK_MESSAGE; } //事務(wù)補(bǔ)償 @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { return null; } });
中間狀態(tài)
public static void main(String[] args) throws Exception { TransactionMQProducer producer=new TransactionMQProducer("group1"); producer.setNamesrvAddr("localhost:9876"); producer.setTransactionListener(new TransactionListener() { //正常事務(wù) @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { return LocalTransactionState.UNKNOW; } //事務(wù)補(bǔ)償 正常執(zhí)行UNKNOW才會觸發(fā) @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { System.out.println("事務(wù)補(bǔ)償"); return LocalTransactionState.COMMIT_MESSAGE; } }); producer.start(); Message msg = new Message("topic13", "hello rocketmq".getBytes("UTF-8")); SendResult result = producer.sendMessageInTransaction(msg, null); System.out.println("返回結(jié)果:" + result); //事務(wù)補(bǔ)償生產(chǎn)者一定要一直啟動著 //producer.shutdown(); }
到此這篇關(guān)于RocketMQ消息發(fā)送與消息類別詳解的文章就介紹到這了,更多相關(guān)RocketMQ消息類別內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
mybatis 映射文件中if標(biāo)簽判斷字符串相等的兩種方式
這篇文章主要介紹了mybatis 映射文件中if標(biāo)簽判斷字符串相等的方式,非常不錯,具有一定的參考借鑒價值,需要的朋友可以參考下2018-06-06Java利用Guava?Retry實(shí)現(xiàn)重處理
guava-retrying是谷歌的Guava庫的一個小擴(kuò)展,允許為任意函數(shù)調(diào)用創(chuàng)建可配置的重試策略,比如與正常運(yùn)行時間不穩(wěn)定的遠(yuǎn)程服務(wù)對話的函數(shù)調(diào)用。本文將利用其實(shí)現(xiàn)重處理,感興趣的可以了解一下2022-08-08maven父工程relativepath標(biāo)簽使用解讀
文章主要介紹了在使用Maven構(gòu)建父子工程時如何通過設(shè)置父工程和子工程的pom文件來管理依賴和版本,當(dāng)子工程是Spring Boot項(xiàng)目時,可以通過關(guān)閉`relativePath`標(biāo)簽來繼承Spring Boot的父工程,同時在父工程中使用`dependencyManagement`標(biāo)簽來統(tǒng)一管理Spring Boot的依賴版本2024-11-11