RocketMQ根據(jù)Tag進行消息過濾
一、概述
RocketMQ的消費者可以根據(jù)Tag進行消息過濾,也支持自定義屬性過濾。消息過濾目前是在Broker端實現(xiàn)的,優(yōu)點是減少了對于Consumer無用消息的網(wǎng)絡傳輸,缺點是增加了Broker的負擔、而且實現(xiàn)相對復雜。RocketMQ支持兩種方式的消息過濾。一種是Tag過濾,另外一種是SQL過濾。下面我們分別介紹一下。
二、Tag過濾
在大多數(shù)情況下,Tag是個簡單而有用的設計,其可以來選擇您想要的消息。下面我們通過一個示例演示:
(1)、生產(chǎn)者發(fā)送消息
public class MQProducer { public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException { // 創(chuàng)建DefaultMQProducer類并設定生產(chǎn)者名稱 DefaultMQProducer mqProducer = new DefaultMQProducer("producer-group-test"); // 設置NameServer地址,如果是集群的話,使用分號;分隔開 mqProducer.setNamesrvAddr("10.0.90.86:9876"); // 消息最大長度 默認4M mqProducer.setMaxMessageSize(4096); // 發(fā)送消息超時時間,默認3000 mqProducer.setSendMsgTimeout(3000); // 發(fā)送消息失敗重試次數(shù),默認2 mqProducer.setRetryTimesWhenSendAsyncFailed(2); // 啟動消息生產(chǎn)者 mqProducer.start(); String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"}; for (int i = 0; i < 10; i++) { String tag = tags[i % tags.length]; String msg = "hello, 這是第" + (i + 1) + "條消息"; // 創(chuàng)建消息,并指定Topic(主題),Tag(標簽)和消息內(nèi)容 Message message = new Message("FilterMessageTopic", tag, msg.getBytes(RemotingHelper.DEFAULT_CHARSET)); // 發(fā)送同步消息到一個Broker,可以通過sendResult返回消息是否成功送達 SendResult sendResult = mqProducer.send(message); System.out.println(sendResult); } // 如果不再發(fā)送消息,關(guān)閉Producer實例 mqProducer.shutdown(); } }
啟動生產(chǎn)者,如下可看到,10條消息成功發(fā)送到Broker中。
SendResult [sendStatus=SEND_OK, msgId=AC6E004E14E418B4AAC28D8648020000, offsetMsgId=0A005A5600002A9F000000000000548C, messageQueue=MessageQueue [topic=FilterMessageTopic, brokerName=broker-a, queueId=1], queueOffset=3] SendResult [sendStatus=SEND_OK, msgId=AC6E004E14E418B4AAC28D8648110001, offsetMsgId=0A005A5600002A9F000000000000555D, messageQueue=MessageQueue [topic=FilterMessageTopic, brokerName=broker-a, queueId=2], queueOffset=3] SendResult [sendStatus=SEND_OK, msgId=AC6E004E14E418B4AAC28D86481D0002, offsetMsgId=0A005A5600002A9F000000000000562E, messageQueue=MessageQueue [topic=FilterMessageTopic, brokerName=broker-a, queueId=3], queueOffset=3] SendResult [sendStatus=SEND_OK, msgId=AC6E004E14E418B4AAC28D8648220003, offsetMsgId=0A005A5600002A9F00000000000056FF, messageQueue=MessageQueue [topic=FilterMessageTopic, brokerName=broker-a, queueId=0], queueOffset=2] SendResult [sendStatus=SEND_OK, msgId=AC6E004E14E418B4AAC28D8648290004, offsetMsgId=0A005A5600002A9F00000000000057D0, messageQueue=MessageQueue [topic=FilterMessageTopic, brokerName=broker-a, queueId=1], queueOffset=4] SendResult [sendStatus=SEND_OK, msgId=AC6E004E14E418B4AAC28D8648300005, offsetMsgId=0A005A5600002A9F00000000000058A1, messageQueue=MessageQueue [topic=FilterMessageTopic, brokerName=broker-a, queueId=2], queueOffset=4] SendResult [sendStatus=SEND_OK, msgId=AC6E004E14E418B4AAC28D8648370006, offsetMsgId=0A005A5600002A9F0000000000005972, messageQueue=MessageQueue [topic=FilterMessageTopic, brokerName=broker-a, queueId=3], queueOffset=4] SendResult [sendStatus=SEND_OK, msgId=AC6E004E14E418B4AAC28D86483D0007, offsetMsgId=0A005A5600002A9F0000000000005A43, messageQueue=MessageQueue [topic=FilterMessageTopic, brokerName=broker-a, queueId=0], queueOffset=3] SendResult [sendStatus=SEND_OK, msgId=AC6E004E14E418B4AAC28D8648430008, offsetMsgId=0A005A5600002A9F0000000000005B14, messageQueue=MessageQueue [topic=FilterMessageTopic, brokerName=broker-a, queueId=1], queueOffset=5] SendResult [sendStatus=SEND_OK, msgId=AC6E004E14E418B4AAC28D8648490009, offsetMsgId=0A005A5600002A9F0000000000005BE5, messageQueue=MessageQueue [topic=FilterMessageTopic, brokerName=broker-a, queueId=2], queueOffset=5]
(2)、消費者訂閱消息
主要是通過mqPushConsumer.subscribe("FilterMessageTopic", "TagA || TagC || TagD") 指定需要訂閱的Tag,如果訂閱所有Tag的話,則傳入*即可。
public class MQConsumer { public static void main(String[] args) throws MQClientException { // 創(chuàng)建DefaultMQPushConsumer類并設定消費者名稱 DefaultMQPushConsumer mqPushConsumer = new DefaultMQPushConsumer("consumer-group-test"); // 設置NameServer地址,如果是集群的話,使用分號;分隔開 mqPushConsumer.setNamesrvAddr("10.0.90.86:9876"); // 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費 // 如果不是第一次啟動,那么按照上次消費的位置繼續(xù)消費 mqPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); // 設置消費模型,集群還是廣播,默認為集群 mqPushConsumer.setMessageModel(MessageModel.CLUSTERING); // 消費者最小線程量 mqPushConsumer.setConsumeThreadMin(5); // 消費者最大線程量 mqPushConsumer.setConsumeThreadMax(10); // 設置一次消費消息的條數(shù),默認是1 mqPushConsumer.setConsumeMessageBatchMaxSize(1); // 訂閱一個或者多個Topic,以及Tag來過濾需要消費的消息,如果訂閱該主題下的所有tag,則使用* // 本例中,只訂閱Tag為: TagA 、 TagC 、 TagD的消息 mqPushConsumer.subscribe("FilterMessageTopic", "TagA || TagC || TagD"); // 注冊回調(diào)實現(xiàn)類來處理從broker拉取回來的消息 mqPushConsumer.registerMessageListener(new MessageListenerConcurrently() { // 監(jiān)聽類實現(xiàn)MessageListenerConcurrently接口即可,重寫consumeMessage方法接收數(shù)據(jù) @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext consumeConcurrentlyContext) { MessageExt messageExt = msgList.get(0); String body = new String(messageExt.getBody(), StandardCharsets.UTF_8); System.out.println("消費者接收到消息: " + messageExt.toString() + "---消息內(nèi)容為:" + body); // 標記該消息已經(jīng)被成功消費 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 啟動消費者實例 mqPushConsumer.start(); } }
如下,可看到消費者端接收到6條消息。
消費者接收到消息: MessageExt [brokerName=broker-a, queueId=0, storeSize=209, queueOffset=2, sysFlag=0, bornTimestamp=1646019187746, bornHost=/10.0.90.115:55652, storeTimestamp=1646019187082, storeHost=/10.0.90.86:10911, msgId=0A005A5600002A9F00000000000056FF, commitLogOffset=22271, bodyCRC=1188153005, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='FilterMessageTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=4, CONSUME_START_TIME=1646019218458, UNIQ_KEY=AC6E004E14E418B4AAC28D8648220003, CLUSTER=DefaultCluster, TAGS=TagD}, body=[104, 101, 108, 108, 111, 44, 32, -24, -65, -103, -26, -104, -81, -25, -84, -84, 52, -26, -99, -95, -26, -74, -120, -26, -127, -81], transactionId='null'}]---消息內(nèi)容為:hello, 這是第4條消息
消費者接收到消息: MessageExt [brokerName=broker-a, queueId=0, storeSize=209, queueOffset=3, sysFlag=0, bornTimestamp=1646019187773, bornHost=/10.0.90.115:55652, storeTimestamp=1646019187109, storeHost=/10.0.90.86:10911, msgId=0A005A5600002A9F0000000000005A43, commitLogOffset=23107, bodyCRC=1559045667, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='FilterMessageTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=4, CONSUME_START_TIME=1646019218458, UNIQ_KEY=AC6E004E14E418B4AAC28D86483D0007, CLUSTER=DefaultCluster, TAGS=TagC}, body=[104, 101, 108, 108, 111, 44, 32, -24, -65, -103, -26, -104, -81, -25, -84, -84, 56, -26, -99, -95, -26, -74, -120, -26, -127, -81], transactionId='null'}]---消息內(nèi)容為:hello, 這是第8條消息
消費者接收到消息: MessageExt [brokerName=broker-a, queueId=1, storeSize=209, queueOffset=5, sysFlag=0, bornTimestamp=1646019187779, bornHost=/10.0.90.115:55652, storeTimestamp=1646019187115, storeHost=/10.0.90.86:10911, msgId=0A005A5600002A9F0000000000005B14, commitLogOffset=23316, bodyCRC=858737949, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='FilterMessageTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=6, CONSUME_START_TIME=1646019219467, UNIQ_KEY=AC6E004E14E418B4AAC28D8648430008, CLUSTER=DefaultCluster, TAGS=TagD}, body=[104, 101, 108, 108, 111, 44, 32, -24, -65, -103, -26, -104, -81, -25, -84, -84, 57, -26, -99, -95, -26, -74, -120, -26, -127, -81], transactionId='null'}]---消息內(nèi)容為:hello, 這是第9條消息
消費者接收到消息: MessageExt [brokerName=broker-a, queueId=1, storeSize=209, queueOffset=3, sysFlag=0, bornTimestamp=1646019187715, bornHost=/10.0.90.115:55652, storeTimestamp=1646019187057, storeHost=/10.0.90.86:10911, msgId=0A005A5600002A9F000000000000548C, commitLogOffset=21644, bodyCRC=553127401, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='FilterMessageTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=6, CONSUME_START_TIME=1646019219468, UNIQ_KEY=AC6E004E14E418B4AAC28D8648020000, CLUSTER=DefaultCluster, TAGS=TagA}, body=[104, 101, 108, 108, 111, 44, 32, -24, -65, -103, -26, -104, -81, -25, -84, -84, 49, -26, -99, -95, -26, -74, -120, -26, -127, -81], transactionId='null'}]---消息內(nèi)容為:hello, 這是第1條消息
消費者接收到消息: MessageExt [brokerName=broker-a, queueId=3, storeSize=209, queueOffset=3, sysFlag=0, bornTimestamp=1646019187741, bornHost=/10.0.90.115:55652, storeTimestamp=1646019187077, storeHost=/10.0.90.86:10911, msgId=0A005A5600002A9F000000000000562E, commitLogOffset=22062, bodyCRC=604888532, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='FilterMessageTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=5, CONSUME_START_TIME=1646019219472, UNIQ_KEY=AC6E004E14E418B4AAC28D86481D0002, CLUSTER=DefaultCluster, TAGS=TagC}, body=[104, 101, 108, 108, 111, 44, 32, -24, -65, -103, -26, -104, -81, -25, -84, -84, 51, -26, -99, -95, -26, -74, -120, -26, -127, -81], transactionId='null'}]---消息內(nèi)容為:hello, 這是第3條消息
消費者接收到消息: MessageExt [brokerName=broker-a, queueId=2, storeSize=209, queueOffset=4, sysFlag=0, bornTimestamp=1646019187760, bornHost=/10.0.90.115:55652, storeTimestamp=1646019187097, storeHost=/10.0.90.86:10911, msgId=0A005A5600002A9F00000000000058A1, commitLogOffset=22689, bodyCRC=1109661328, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='FilterMessageTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=6, CONSUME_START_TIME=1646019219473, UNIQ_KEY=AC6E004E14E418B4AAC28D8648300005, CLUSTER=DefaultCluster, TAGS=TagA}, body=[104, 101, 108, 108, 111, 44, 32, -24, -65, -103, -26, -104, -81, -25, -84, -84, 54, -26, -99, -95, -26, -74, -120, -26, -127, -81], transactionId='null'}]---消息內(nèi)容為:hello, 這是第6條消息
具體分析如下:
// 消息發(fā)送時總共5個Tag String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"}; // Tag計算方法 tag = i % tags.length = i % 5
- 第1條消息,i = 0,消息的標簽tag = tags[i % tags.length] = tags[0] = TagA;
- 第2條消息,i = 1,消息的標簽tag = tags[i % tags.length] = tags[1] = TagB;
- 第3條消息,i = 2,消息的標簽tag = tags[i % tags.length] = tags[2] = TagC;
- 第4條消息,i = 3,消息的標簽tag = tags[i % tags.length] = tags[3] = TagD;
- 第5條消息,i = 4,消息的標簽tag = tags[i % tags.length] = tags[4] = TagE;
- 第6條消息,i = 5,消息的標簽tag = tags[i % tags.length] = tags[0] = TagA;
- 第7條消息,i = 6,消息的標簽tag = tags[i % tags.length] = tags[1] = TagB;
- 第8條消息,i = 7,消息的標簽tag = tags[i % tags.length] = tags[2] = TagC;
- 第9條消息,i = 8,消息的標簽tag = tags[i % tags.length] = tags[3] = TagD;
- 第10條消息,i = 9,消息的標簽tag = tags[i % tags.length] = tags[4] = TagE;
因為消費者端只訂閱了 TagA 、 TagC 、 TagD的消息,所以對應上面的,消費者端只會收到六條消息,即第1、3、4、6、8、9條消息。
三、根據(jù)自定義屬性進行過濾 (SQL過濾)
通過Tag過濾消息可以很方便地選擇您想要的消息,但是對于比較復雜的場合,使用Tag過濾的話可能不太滿足條件。在這種情況下,可以使用SQL表達式篩選消息。SQL特性可以通過發(fā)送消息時的屬性來進行計算。
RocketMQ只定義了一些基本語法來支持這個特性。
- 數(shù)值比較,比如:>,>=,<,<=,BETWEEN,=;
- 字符比較,比如:=,<>,IN;
- IS NULL 或者 IS NOT NULL;
- 邏輯符號 AND,OR,NOT;
常量支持類型為:
- 數(shù)值,比如:123,3.1415;
- 字符,比如:'abc',必須用單引號包裹起來;
- NULL,特殊的常量
- 布爾值,TRUE 或 FALSE
注意,只有使用push推送模式的消費者才能用使用SQL92標準的sql語句,pull拉取模式的消費者是不支持這個功能的。
下面我們通過一個示例演示:
(1)、生產(chǎn)者發(fā)送消息
生產(chǎn)者發(fā)送消息時,通過putUserProperty來設置消息的屬性,實際上就是通過一個Map將用戶自定義的屬性保存到消息的properties屬性中。
public class MQProducer { public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException { // 創(chuàng)建DefaultMQProducer類并設定生產(chǎn)者名稱 DefaultMQProducer mqProducer = new DefaultMQProducer("producer-group-test"); // 設置NameServer地址,如果是集群的話,使用分號;分隔開 mqProducer.setNamesrvAddr("10.0.90.86:9876"); // 消息最大長度 默認4M mqProducer.setMaxMessageSize(4096); // 發(fā)送消息超時時間,默認3000 mqProducer.setSendMsgTimeout(3000); // 發(fā)送消息失敗重試次數(shù),默認2 mqProducer.setRetryTimesWhenSendAsyncFailed(2); // 啟動消息生產(chǎn)者 mqProducer.start(); for (int i = 0; i < 10; i++) { String msg = "hello, 這是第" + (i + 1) + "條消息"; // 創(chuàng)建消息,并指定Topic(主題),Tag(標簽)和消息內(nèi)容 Message message = new Message("FilterMessageTopic", "", msg.getBytes(RemotingHelper.DEFAULT_CHARSET)); // 設置用戶的一些自定義屬性,本質(zhì)上就是保存到一個Map中:private Map<String, String> properties message.putUserProperty("num", String.valueOf(i)); message.putUserProperty("info", i % 2 == 0 ? "aaa" : "bbb"); // 發(fā)送同步消息到一個Broker,可以通過sendResult返回消息是否成功送達 SendResult sendResult = mqProducer.send(message); System.out.println(sendResult); } // 如果不再發(fā)送消息,關(guān)閉Producer實例 mqProducer.shutdown(); } }
SendResult [sendStatus=SEND_OK, msgId=AC6E00924AE418B4AAC28DE709D90000, offsetMsgId=0A005A5600002A9F0000000000006E4E, messageQueue=MessageQueue [topic=FilterMessageTopic, brokerName=broker-a, queueId=2], queueOffset=11]
SendResult [sendStatus=SEND_OK, msgId=AC6E00924AE418B4AAC28DE709EF0001, offsetMsgId=0A005A5600002A9F0000000000006F24, messageQueue=MessageQueue [topic=FilterMessageTopic, brokerName=broker-a, queueId=3], queueOffset=11]
SendResult [sendStatus=SEND_OK, msgId=AC6E00924AE418B4AAC28DE709F50002, offsetMsgId=0A005A5600002A9F0000000000006FFA, messageQueue=MessageQueue [topic=FilterMessageTopic, brokerName=broker-a, queueId=0], queueOffset=9]
SendResult [sendStatus=SEND_OK, msgId=AC6E00924AE418B4AAC28DE709FB0003, offsetMsgId=0A005A5600002A9F00000000000070D0, messageQueue=MessageQueue [topic=FilterMessageTopic, brokerName=broker-a, queueId=1], queueOffset=11]
SendResult [sendStatus=SEND_OK, msgId=AC6E00924AE418B4AAC28DE70A020004, offsetMsgId=0A005A5600002A9F00000000000071A6, messageQueue=MessageQueue [topic=FilterMessageTopic, brokerName=broker-a, queueId=2], queueOffset=12]
SendResult [sendStatus=SEND_OK, msgId=AC6E00924AE418B4AAC28DE70A0C0005, offsetMsgId=0A005A5600002A9F000000000000727C, messageQueue=MessageQueue [topic=FilterMessageTopic, brokerName=broker-a, queueId=3], queueOffset=12]
SendResult [sendStatus=SEND_OK, msgId=AC6E00924AE418B4AAC28DE70A120006, offsetMsgId=0A005A5600002A9F0000000000007352, messageQueue=MessageQueue [topic=FilterMessageTopic, brokerName=broker-a, queueId=0], queueOffset=10]
SendResult [sendStatus=SEND_OK, msgId=AC6E00924AE418B4AAC28DE70A1C0007, offsetMsgId=0A005A5600002A9F0000000000007428, messageQueue=MessageQueue [topic=FilterMessageTopic, brokerName=broker-a, queueId=1], queueOffset=12]
SendResult [sendStatus=SEND_OK, msgId=AC6E00924AE418B4AAC28DE70A210008, offsetMsgId=0A005A5600002A9F00000000000074FE, messageQueue=MessageQueue [topic=FilterMessageTopic, brokerName=broker-a, queueId=2], queueOffset=13]
SendResult [sendStatus=SEND_OK, msgId=AC6E00924AE418B4AAC28DE70A270009, offsetMsgId=0A005A5600002A9F00000000000075D4, messageQueue=MessageQueue [topic=FilterMessageTopic, brokerName=broker-a, queueId=3], queueOffset=13]
(2)、消費者消費消息
消費者端使用如下接口指定SQL過濾的語法:
public void subscribe(finalString topic, final MessageSelector messageSelector) // 用MessageSelector.bySql來使用sql篩選消息 MessageSelector messageSelector = MessageSelector.bySql("xxxx");
public class MQConsumer { public static void main(String[] args) throws MQClientException { // 創(chuàng)建DefaultMQPushConsumer類并設定消費者名稱 DefaultMQPushConsumer mqPushConsumer = new DefaultMQPushConsumer("consumer-group-test"); // 設置NameServer地址,如果是集群的話,使用分號;分隔開 mqPushConsumer.setNamesrvAddr("10.0.90.86:9876"); // 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費 // 如果不是第一次啟動,那么按照上次消費的位置繼續(xù)消費 mqPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); // 設置消費模型,集群還是廣播,默認為集群 mqPushConsumer.setMessageModel(MessageModel.CLUSTERING); // 消費者最小線程量 mqPushConsumer.setConsumeThreadMin(5); // 消費者最大線程量 mqPushConsumer.setConsumeThreadMax(10); // 設置一次消費消息的條數(shù),默認是1 mqPushConsumer.setConsumeMessageBatchMaxSize(1); // 用MessageSelector.bySql來使用sql篩選消息 mqPushConsumer.subscribe("FilterMessageTopic", MessageSelector.bySql("(num between 0 and 5 ) and (info = 'aaa')")); // 注冊回調(diào)實現(xiàn)類來處理從broker拉取回來的消息 mqPushConsumer.registerMessageListener(new MessageListenerConcurrently() { // 監(jiān)聽類實現(xiàn)MessageListenerConcurrently接口即可,重寫consumeMessage方法接收數(shù)據(jù) @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext consumeConcurrentlyContext) { MessageExt messageExt = msgList.get(0); String body = new String(messageExt.getBody(), StandardCharsets.UTF_8); System.out.println("消費者接收到消息: " + messageExt.toString() + "---消息內(nèi)容為:" + body); // 標記該消息已經(jīng)被成功消費 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 啟動消費者實例 mqPushConsumer.start(); } }
我們直接運行消費者,發(fā)現(xiàn)啟動報錯了,如下:
Exception in thread "main" org.apache.rocketmq.client.exception.MQClientException: CODE: 1 DESC: The broker does not support consumer to filter message by SQL92
這個錯誤是由于RocketMQ默認是關(guān)閉了屬性過濾功能的,如果需要使用該功能,需要開啟enablePropertyFilter的屬性,將該屬性置為true才可以。也就是我們需要在RocketMQ的配置文件中添加如下配置:
// 開啟屬性過濾功能
enablePropertyFilter=true
重新啟動RocketMQ后,再次運行消費者,如下可看到,消費者接收到三條消息:
消費者接收到消息: MessageExt [brokerName=broker-a, queueId=2, storeSize=214, queueOffset=11, sysFlag=0, bornTimestamp=1646025528795, bornHost=/10.0.90.115:59083, storeTimestamp=1646025527395, storeHost=/10.0.90.86:10911, msgId=0A005A5600002A9F0000000000006E4E, commitLogOffset=28238, bodyCRC=553127401, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='FilterMessageTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=14, num=0, CONSUME_START_TIME=1646025559201, UNIQ_KEY=AC6E00924AE418B4AAC28DE709D90000, CLUSTER=DefaultCluster, info=aaa}, body=[104, 101, 108, 108, 111, 44, 32, -24, -65, -103, -26, -104, -81, -25, -84, -84, 49, -26, -99, -95, -26, -74, -120, -26, -127, -81], transactionId='null'}]---消息內(nèi)容為:hello, 這是第1條消息
消費者接收到消息: MessageExt [brokerName=broker-a, queueId=0, storeSize=214, queueOffset=9, sysFlag=0, bornTimestamp=1646025528821, bornHost=/10.0.90.115:59083, storeTimestamp=1646025527417, storeHost=/10.0.90.86:10911, msgId=0A005A5600002A9F0000000000006FFA, commitLogOffset=28666, bodyCRC=604888532, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='FilterMessageTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=11, num=2, CONSUME_START_TIME=1646025559201, UNIQ_KEY=AC6E00924AE418B4AAC28DE709F50002, CLUSTER=DefaultCluster, info=aaa}, body=[104, 101, 108, 108, 111, 44, 32, -24, -65, -103, -26, -104, -81, -25, -84, -84, 51, -26, -99, -95, -26, -74, -120, -26, -127, -81], transactionId='null'}]---消息內(nèi)容為:hello, 這是第3條消息
消費者接收到消息: MessageExt [brokerName=broker-a, queueId=2, storeSize=214, queueOffset=12, sysFlag=0, bornTimestamp=1646025528834, bornHost=/10.0.90.115:59083, storeTimestamp=1646025527433, storeHost=/10.0.90.86:10911, msgId=0A005A5600002A9F00000000000071A6, commitLogOffset=29094, bodyCRC=689155475, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='FilterMessageTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=14, num=4, CONSUME_START_TIME=1646025559201, UNIQ_KEY=AC6E00924AE418B4AAC28DE70A020004, CLUSTER=DefaultCluster, info=aaa}, body=[104, 101, 108, 108, 111, 44, 32, -24, -65, -103, -26, -104, -81, -25, -84, -84, 53, -26, -99, -95, -26, -74, -120, -26, -127, -81], transactionId='null'}]---消息內(nèi)容為:hello, 這是第5條消息
分析:
生產(chǎn)者發(fā)送消息的時候,添加了用戶自定義屬性num、info,通過上述控制臺輸出消息的properties屬性我們也可以看到。num的值其實就是0-9,info的值是偶數(shù)的時候為aaa,奇數(shù)的時候為bbb。消費者通過MessageSelector.bySql("(num between 0 and 5 ) and (info = 'aaa')")指定的過濾條件是:num在[0,5]之間并且info的值為aaa。因此,同時滿足這兩個條件的就只有三條消息。
到此這篇關(guān)于RocketMQ根據(jù)Tag進行消息過濾的文章就介紹到這了,更多相關(guān)RocketMQ消息過濾內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
淺談Java中OutOfMemoryError問題產(chǎn)生原因
本文主要介紹了淺談Java中OutOfMemoryError問題產(chǎn)生原因,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2023-06-06Spring Boot快速實現(xiàn) IP地址解析的示例詳解
這篇文章主要介紹了Spring Boot快速實現(xiàn)IP地址解析,本文通過示例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2022-08-08SpringBoot詳解MySQL如何實現(xiàn)讀寫分離
當響應的瓶頸在數(shù)據(jù)庫的時候,就要考慮數(shù)據(jù)庫的讀寫分離,當然還可以分庫分表,那是單表數(shù)據(jù)量特別大,當單表數(shù)據(jù)量不是特別大,但是請求量比較大的時候,就要考慮讀寫分離了.具體的話,還是要看自己的業(yè)務...如果還是很慢,那就要分庫分表了...我們這篇就簡單講一下讀寫分離2022-09-09openEuler?搭建java開發(fā)環(huán)境的詳細過程
這篇文章主要介紹了openEuler?搭建java開發(fā)環(huán)境,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2022-06-06Spring Boot前后端分離開發(fā)模式中的跨域問題及解決方法
本文介紹了解決Spring Boot前端Vue跨域問題的實戰(zhàn)經(jīng)驗,并提供了后端和前端的配置示例,通過配置后端和前端,我們可以輕松解決跨域問題,實現(xiàn)正常的前后端交互,需要的朋友可以參考下2023-09-09IDEA的Web項目右鍵無法創(chuàng)建Servlet問題解決辦法
這篇文章主要介紹了IDEA的Web項目右鍵無法創(chuàng)建Servlet問題解決辦法的相關(guān)資料,在IDEA中新建Servlet時發(fā)現(xiàn)缺失選項,可以通過在pom.xml文件中添加servlet依賴解決,文中通過圖文介紹的非常詳細,需要的朋友可以參考下2024-10-10