RocketMQ根據(jù)Tag進(jìn)行消息過濾
一、概述
RocketMQ的消費(fèi)者可以根據(jù)Tag進(jìn)行消息過濾,也支持自定義屬性過濾。消息過濾目前是在Broker端實(shí)現(xiàn)的,優(yōu)點(diǎn)是減少了對于Consumer無用消息的網(wǎng)絡(luò)傳輸,缺點(diǎn)是增加了Broker的負(fù)擔(dān)、而且實(shí)現(xiàn)相對復(fù)雜。RocketMQ支持兩種方式的消息過濾。一種是Tag過濾,另外一種是SQL過濾。下面我們分別介紹一下。
二、Tag過濾
在大多數(shù)情況下,Tag是個簡單而有用的設(shè)計(jì),其可以來選擇您想要的消息。下面我們通過一個示例演示:
(1)、生產(chǎn)者發(fā)送消息
public class MQProducer {
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
// 創(chuàng)建DefaultMQProducer類并設(shè)定生產(chǎn)者名稱
DefaultMQProducer mqProducer = new DefaultMQProducer("producer-group-test");
// 設(shè)置NameServer地址,如果是集群的話,使用分號;分隔開
mqProducer.setNamesrvAddr("10.0.90.86:9876");
// 消息最大長度 默認(rèn)4M
mqProducer.setMaxMessageSize(4096);
// 發(fā)送消息超時(shí)時(shí)間,默認(rèn)3000
mqProducer.setSendMsgTimeout(3000);
// 發(fā)送消息失敗重試次數(shù),默認(rèn)2
mqProducer.setRetryTimesWhenSendAsyncFailed(2);
// 啟動消息生產(chǎn)者
mqProducer.start();
String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
String tag = tags[i % tags.length];
String msg = "hello, 這是第" + (i + 1) + "條消息";
// 創(chuàng)建消息,并指定Topic(主題),Tag(標(biāo)簽)和消息內(nèi)容
Message message = new Message("FilterMessageTopic", tag, msg.getBytes(RemotingHelper.DEFAULT_CHARSET));
// 發(fā)送同步消息到一個Broker,可以通過sendResult返回消息是否成功送達(dá)
SendResult sendResult = mqProducer.send(message);
System.out.println(sendResult);
}
// 如果不再發(fā)送消息,關(guān)閉Producer實(shí)例
mqProducer.shutdown();
}
}啟動生產(chǎn)者,如下可看到,10條消息成功發(fā)送到Broker中。
SendResult [sendStatus=SEND_OK, msgId=AC6E004E14E418B4AAC28D8648020000, offsetMsgId=0A005A5600002A9F000000000000548C, messageQueue=MessageQueue [topic=FilterMessageTopic, brokerName=broker-a, queueId=1], queueOffset=3] SendResult [sendStatus=SEND_OK, msgId=AC6E004E14E418B4AAC28D8648110001, offsetMsgId=0A005A5600002A9F000000000000555D, messageQueue=MessageQueue [topic=FilterMessageTopic, brokerName=broker-a, queueId=2], queueOffset=3] SendResult [sendStatus=SEND_OK, msgId=AC6E004E14E418B4AAC28D86481D0002, offsetMsgId=0A005A5600002A9F000000000000562E, messageQueue=MessageQueue [topic=FilterMessageTopic, brokerName=broker-a, queueId=3], queueOffset=3] SendResult [sendStatus=SEND_OK, msgId=AC6E004E14E418B4AAC28D8648220003, offsetMsgId=0A005A5600002A9F00000000000056FF, messageQueue=MessageQueue [topic=FilterMessageTopic, brokerName=broker-a, queueId=0], queueOffset=2] SendResult [sendStatus=SEND_OK, msgId=AC6E004E14E418B4AAC28D8648290004, offsetMsgId=0A005A5600002A9F00000000000057D0, messageQueue=MessageQueue [topic=FilterMessageTopic, brokerName=broker-a, queueId=1], queueOffset=4] SendResult [sendStatus=SEND_OK, msgId=AC6E004E14E418B4AAC28D8648300005, offsetMsgId=0A005A5600002A9F00000000000058A1, messageQueue=MessageQueue [topic=FilterMessageTopic, brokerName=broker-a, queueId=2], queueOffset=4] SendResult [sendStatus=SEND_OK, msgId=AC6E004E14E418B4AAC28D8648370006, offsetMsgId=0A005A5600002A9F0000000000005972, messageQueue=MessageQueue [topic=FilterMessageTopic, brokerName=broker-a, queueId=3], queueOffset=4] SendResult [sendStatus=SEND_OK, msgId=AC6E004E14E418B4AAC28D86483D0007, offsetMsgId=0A005A5600002A9F0000000000005A43, messageQueue=MessageQueue [topic=FilterMessageTopic, brokerName=broker-a, queueId=0], queueOffset=3] SendResult [sendStatus=SEND_OK, msgId=AC6E004E14E418B4AAC28D8648430008, offsetMsgId=0A005A5600002A9F0000000000005B14, messageQueue=MessageQueue [topic=FilterMessageTopic, brokerName=broker-a, queueId=1], queueOffset=5] SendResult [sendStatus=SEND_OK, msgId=AC6E004E14E418B4AAC28D8648490009, offsetMsgId=0A005A5600002A9F0000000000005BE5, messageQueue=MessageQueue [topic=FilterMessageTopic, brokerName=broker-a, queueId=2], queueOffset=5]
(2)、消費(fèi)者訂閱消息
主要是通過mqPushConsumer.subscribe("FilterMessageTopic", "TagA || TagC || TagD") 指定需要訂閱的Tag,如果訂閱所有Tag的話,則傳入*即可。
public class MQConsumer {
public static void main(String[] args) throws MQClientException {
// 創(chuàng)建DefaultMQPushConsumer類并設(shè)定消費(fèi)者名稱
DefaultMQPushConsumer mqPushConsumer = new DefaultMQPushConsumer("consumer-group-test");
// 設(shè)置NameServer地址,如果是集群的話,使用分號;分隔開
mqPushConsumer.setNamesrvAddr("10.0.90.86:9876");
// 設(shè)置Consumer第一次啟動是從隊(duì)列頭部開始消費(fèi)還是隊(duì)列尾部開始消費(fèi)
// 如果不是第一次啟動,那么按照上次消費(fèi)的位置繼續(xù)消費(fèi)
mqPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
// 設(shè)置消費(fèi)模型,集群還是廣播,默認(rèn)為集群
mqPushConsumer.setMessageModel(MessageModel.CLUSTERING);
// 消費(fèi)者最小線程量
mqPushConsumer.setConsumeThreadMin(5);
// 消費(fèi)者最大線程量
mqPushConsumer.setConsumeThreadMax(10);
// 設(shè)置一次消費(fèi)消息的條數(shù),默認(rèn)是1
mqPushConsumer.setConsumeMessageBatchMaxSize(1);
// 訂閱一個或者多個Topic,以及Tag來過濾需要消費(fèi)的消息,如果訂閱該主題下的所有tag,則使用*
// 本例中,只訂閱Tag為: TagA 、 TagC 、 TagD的消息
mqPushConsumer.subscribe("FilterMessageTopic", "TagA || TagC || TagD");
// 注冊回調(diào)實(shí)現(xiàn)類來處理從broker拉取回來的消息
mqPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
// 監(jiān)聽類實(shí)現(xiàn)MessageListenerConcurrently接口即可,重寫consumeMessage方法接收數(shù)據(jù)
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
MessageExt messageExt = msgList.get(0);
String body = new String(messageExt.getBody(), StandardCharsets.UTF_8);
System.out.println("消費(fèi)者接收到消息: " + messageExt.toString() + "---消息內(nèi)容為:" + body);
// 標(biāo)記該消息已經(jīng)被成功消費(fèi)
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 啟動消費(fèi)者實(shí)例
mqPushConsumer.start();
}
}如下,可看到消費(fèi)者端接收到6條消息。
消費(fèi)者接收到消息: MessageExt [brokerName=broker-a, queueId=0, storeSize=209, queueOffset=2, sysFlag=0, bornTimestamp=1646019187746, bornHost=/10.0.90.115:55652, storeTimestamp=1646019187082, storeHost=/10.0.90.86:10911, msgId=0A005A5600002A9F00000000000056FF, commitLogOffset=22271, bodyCRC=1188153005, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='FilterMessageTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=4, CONSUME_START_TIME=1646019218458, UNIQ_KEY=AC6E004E14E418B4AAC28D8648220003, CLUSTER=DefaultCluster, TAGS=TagD}, body=[104, 101, 108, 108, 111, 44, 32, -24, -65, -103, -26, -104, -81, -25, -84, -84, 52, -26, -99, -95, -26, -74, -120, -26, -127, -81], transactionId='null'}]---消息內(nèi)容為:hello, 這是第4條消息
消費(fèi)者接收到消息: MessageExt [brokerName=broker-a, queueId=0, storeSize=209, queueOffset=3, sysFlag=0, bornTimestamp=1646019187773, bornHost=/10.0.90.115:55652, storeTimestamp=1646019187109, storeHost=/10.0.90.86:10911, msgId=0A005A5600002A9F0000000000005A43, commitLogOffset=23107, bodyCRC=1559045667, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='FilterMessageTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=4, CONSUME_START_TIME=1646019218458, UNIQ_KEY=AC6E004E14E418B4AAC28D86483D0007, CLUSTER=DefaultCluster, TAGS=TagC}, body=[104, 101, 108, 108, 111, 44, 32, -24, -65, -103, -26, -104, -81, -25, -84, -84, 56, -26, -99, -95, -26, -74, -120, -26, -127, -81], transactionId='null'}]---消息內(nèi)容為:hello, 這是第8條消息
消費(fèi)者接收到消息: MessageExt [brokerName=broker-a, queueId=1, storeSize=209, queueOffset=5, sysFlag=0, bornTimestamp=1646019187779, bornHost=/10.0.90.115:55652, storeTimestamp=1646019187115, storeHost=/10.0.90.86:10911, msgId=0A005A5600002A9F0000000000005B14, commitLogOffset=23316, bodyCRC=858737949, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='FilterMessageTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=6, CONSUME_START_TIME=1646019219467, UNIQ_KEY=AC6E004E14E418B4AAC28D8648430008, CLUSTER=DefaultCluster, TAGS=TagD}, body=[104, 101, 108, 108, 111, 44, 32, -24, -65, -103, -26, -104, -81, -25, -84, -84, 57, -26, -99, -95, -26, -74, -120, -26, -127, -81], transactionId='null'}]---消息內(nèi)容為:hello, 這是第9條消息
消費(fèi)者接收到消息: MessageExt [brokerName=broker-a, queueId=1, storeSize=209, queueOffset=3, sysFlag=0, bornTimestamp=1646019187715, bornHost=/10.0.90.115:55652, storeTimestamp=1646019187057, storeHost=/10.0.90.86:10911, msgId=0A005A5600002A9F000000000000548C, commitLogOffset=21644, bodyCRC=553127401, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='FilterMessageTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=6, CONSUME_START_TIME=1646019219468, UNIQ_KEY=AC6E004E14E418B4AAC28D8648020000, CLUSTER=DefaultCluster, TAGS=TagA}, body=[104, 101, 108, 108, 111, 44, 32, -24, -65, -103, -26, -104, -81, -25, -84, -84, 49, -26, -99, -95, -26, -74, -120, -26, -127, -81], transactionId='null'}]---消息內(nèi)容為:hello, 這是第1條消息
消費(fèi)者接收到消息: MessageExt [brokerName=broker-a, queueId=3, storeSize=209, queueOffset=3, sysFlag=0, bornTimestamp=1646019187741, bornHost=/10.0.90.115:55652, storeTimestamp=1646019187077, storeHost=/10.0.90.86:10911, msgId=0A005A5600002A9F000000000000562E, commitLogOffset=22062, bodyCRC=604888532, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='FilterMessageTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=5, CONSUME_START_TIME=1646019219472, UNIQ_KEY=AC6E004E14E418B4AAC28D86481D0002, CLUSTER=DefaultCluster, TAGS=TagC}, body=[104, 101, 108, 108, 111, 44, 32, -24, -65, -103, -26, -104, -81, -25, -84, -84, 51, -26, -99, -95, -26, -74, -120, -26, -127, -81], transactionId='null'}]---消息內(nèi)容為:hello, 這是第3條消息
消費(fèi)者接收到消息: MessageExt [brokerName=broker-a, queueId=2, storeSize=209, queueOffset=4, sysFlag=0, bornTimestamp=1646019187760, bornHost=/10.0.90.115:55652, storeTimestamp=1646019187097, storeHost=/10.0.90.86:10911, msgId=0A005A5600002A9F00000000000058A1, commitLogOffset=22689, bodyCRC=1109661328, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='FilterMessageTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=6, CONSUME_START_TIME=1646019219473, UNIQ_KEY=AC6E004E14E418B4AAC28D8648300005, CLUSTER=DefaultCluster, TAGS=TagA}, body=[104, 101, 108, 108, 111, 44, 32, -24, -65, -103, -26, -104, -81, -25, -84, -84, 54, -26, -99, -95, -26, -74, -120, -26, -127, -81], transactionId='null'}]---消息內(nèi)容為:hello, 這是第6條消息
具體分析如下:
// 消息發(fā)送時(shí)總共5個Tag
String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
// Tag計(jì)算方法
tag = i % tags.length = i % 5- 第1條消息,i = 0,消息的標(biāo)簽tag = tags[i % tags.length] = tags[0] = TagA;
- 第2條消息,i = 1,消息的標(biāo)簽tag = tags[i % tags.length] = tags[1] = TagB;
- 第3條消息,i = 2,消息的標(biāo)簽tag = tags[i % tags.length] = tags[2] = TagC;
- 第4條消息,i = 3,消息的標(biāo)簽tag = tags[i % tags.length] = tags[3] = TagD;
- 第5條消息,i = 4,消息的標(biāo)簽tag = tags[i % tags.length] = tags[4] = TagE;
- 第6條消息,i = 5,消息的標(biāo)簽tag = tags[i % tags.length] = tags[0] = TagA;
- 第7條消息,i = 6,消息的標(biāo)簽tag = tags[i % tags.length] = tags[1] = TagB;
- 第8條消息,i = 7,消息的標(biāo)簽tag = tags[i % tags.length] = tags[2] = TagC;
- 第9條消息,i = 8,消息的標(biāo)簽tag = tags[i % tags.length] = tags[3] = TagD;
- 第10條消息,i = 9,消息的標(biāo)簽tag = tags[i % tags.length] = tags[4] = TagE;
因?yàn)橄M(fèi)者端只訂閱了 TagA 、 TagC 、 TagD的消息,所以對應(yīng)上面的,消費(fèi)者端只會收到六條消息,即第1、3、4、6、8、9條消息。
三、根據(jù)自定義屬性進(jìn)行過濾 (SQL過濾)
通過Tag過濾消息可以很方便地選擇您想要的消息,但是對于比較復(fù)雜的場合,使用Tag過濾的話可能不太滿足條件。在這種情況下,可以使用SQL表達(dá)式篩選消息。SQL特性可以通過發(fā)送消息時(shí)的屬性來進(jìn)行計(jì)算。
RocketMQ只定義了一些基本語法來支持這個特性。
- 數(shù)值比較,比如:>,>=,<,<=,BETWEEN,=;
- 字符比較,比如:=,<>,IN;
- IS NULL 或者 IS NOT NULL;
- 邏輯符號 AND,OR,NOT;
常量支持類型為:
- 數(shù)值,比如:123,3.1415;
- 字符,比如:'abc',必須用單引號包裹起來;
- NULL,特殊的常量
- 布爾值,TRUE 或 FALSE
注意,只有使用push推送模式的消費(fèi)者才能用使用SQL92標(biāo)準(zhǔn)的sql語句,pull拉取模式的消費(fèi)者是不支持這個功能的。
下面我們通過一個示例演示:
(1)、生產(chǎn)者發(fā)送消息
生產(chǎn)者發(fā)送消息時(shí),通過putUserProperty來設(shè)置消息的屬性,實(shí)際上就是通過一個Map將用戶自定義的屬性保存到消息的properties屬性中。
public class MQProducer {
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
// 創(chuàng)建DefaultMQProducer類并設(shè)定生產(chǎn)者名稱
DefaultMQProducer mqProducer = new DefaultMQProducer("producer-group-test");
// 設(shè)置NameServer地址,如果是集群的話,使用分號;分隔開
mqProducer.setNamesrvAddr("10.0.90.86:9876");
// 消息最大長度 默認(rèn)4M
mqProducer.setMaxMessageSize(4096);
// 發(fā)送消息超時(shí)時(shí)間,默認(rèn)3000
mqProducer.setSendMsgTimeout(3000);
// 發(fā)送消息失敗重試次數(shù),默認(rèn)2
mqProducer.setRetryTimesWhenSendAsyncFailed(2);
// 啟動消息生產(chǎn)者
mqProducer.start();
for (int i = 0; i < 10; i++) {
String msg = "hello, 這是第" + (i + 1) + "條消息";
// 創(chuàng)建消息,并指定Topic(主題),Tag(標(biāo)簽)和消息內(nèi)容
Message message = new Message("FilterMessageTopic", "", msg.getBytes(RemotingHelper.DEFAULT_CHARSET));
// 設(shè)置用戶的一些自定義屬性,本質(zhì)上就是保存到一個Map中:private Map<String, String> properties
message.putUserProperty("num", String.valueOf(i));
message.putUserProperty("info", i % 2 == 0 ? "aaa" : "bbb");
// 發(fā)送同步消息到一個Broker,可以通過sendResult返回消息是否成功送達(dá)
SendResult sendResult = mqProducer.send(message);
System.out.println(sendResult);
}
// 如果不再發(fā)送消息,關(guān)閉Producer實(shí)例
mqProducer.shutdown();
}
}SendResult [sendStatus=SEND_OK, msgId=AC6E00924AE418B4AAC28DE709D90000, offsetMsgId=0A005A5600002A9F0000000000006E4E, messageQueue=MessageQueue [topic=FilterMessageTopic, brokerName=broker-a, queueId=2], queueOffset=11]
SendResult [sendStatus=SEND_OK, msgId=AC6E00924AE418B4AAC28DE709EF0001, offsetMsgId=0A005A5600002A9F0000000000006F24, messageQueue=MessageQueue [topic=FilterMessageTopic, brokerName=broker-a, queueId=3], queueOffset=11]
SendResult [sendStatus=SEND_OK, msgId=AC6E00924AE418B4AAC28DE709F50002, offsetMsgId=0A005A5600002A9F0000000000006FFA, messageQueue=MessageQueue [topic=FilterMessageTopic, brokerName=broker-a, queueId=0], queueOffset=9]
SendResult [sendStatus=SEND_OK, msgId=AC6E00924AE418B4AAC28DE709FB0003, offsetMsgId=0A005A5600002A9F00000000000070D0, messageQueue=MessageQueue [topic=FilterMessageTopic, brokerName=broker-a, queueId=1], queueOffset=11]
SendResult [sendStatus=SEND_OK, msgId=AC6E00924AE418B4AAC28DE70A020004, offsetMsgId=0A005A5600002A9F00000000000071A6, messageQueue=MessageQueue [topic=FilterMessageTopic, brokerName=broker-a, queueId=2], queueOffset=12]
SendResult [sendStatus=SEND_OK, msgId=AC6E00924AE418B4AAC28DE70A0C0005, offsetMsgId=0A005A5600002A9F000000000000727C, messageQueue=MessageQueue [topic=FilterMessageTopic, brokerName=broker-a, queueId=3], queueOffset=12]
SendResult [sendStatus=SEND_OK, msgId=AC6E00924AE418B4AAC28DE70A120006, offsetMsgId=0A005A5600002A9F0000000000007352, messageQueue=MessageQueue [topic=FilterMessageTopic, brokerName=broker-a, queueId=0], queueOffset=10]
SendResult [sendStatus=SEND_OK, msgId=AC6E00924AE418B4AAC28DE70A1C0007, offsetMsgId=0A005A5600002A9F0000000000007428, messageQueue=MessageQueue [topic=FilterMessageTopic, brokerName=broker-a, queueId=1], queueOffset=12]
SendResult [sendStatus=SEND_OK, msgId=AC6E00924AE418B4AAC28DE70A210008, offsetMsgId=0A005A5600002A9F00000000000074FE, messageQueue=MessageQueue [topic=FilterMessageTopic, brokerName=broker-a, queueId=2], queueOffset=13]
SendResult [sendStatus=SEND_OK, msgId=AC6E00924AE418B4AAC28DE70A270009, offsetMsgId=0A005A5600002A9F00000000000075D4, messageQueue=MessageQueue [topic=FilterMessageTopic, brokerName=broker-a, queueId=3], queueOffset=13]
(2)、消費(fèi)者消費(fèi)消息
消費(fèi)者端使用如下接口指定SQL過濾的語法:
public void subscribe(finalString topic, final MessageSelector messageSelector)
// 用MessageSelector.bySql來使用sql篩選消息
MessageSelector messageSelector = MessageSelector.bySql("xxxx");public class MQConsumer {
public static void main(String[] args) throws MQClientException {
// 創(chuàng)建DefaultMQPushConsumer類并設(shè)定消費(fèi)者名稱
DefaultMQPushConsumer mqPushConsumer = new DefaultMQPushConsumer("consumer-group-test");
// 設(shè)置NameServer地址,如果是集群的話,使用分號;分隔開
mqPushConsumer.setNamesrvAddr("10.0.90.86:9876");
// 設(shè)置Consumer第一次啟動是從隊(duì)列頭部開始消費(fèi)還是隊(duì)列尾部開始消費(fèi)
// 如果不是第一次啟動,那么按照上次消費(fèi)的位置繼續(xù)消費(fèi)
mqPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
// 設(shè)置消費(fèi)模型,集群還是廣播,默認(rèn)為集群
mqPushConsumer.setMessageModel(MessageModel.CLUSTERING);
// 消費(fèi)者最小線程量
mqPushConsumer.setConsumeThreadMin(5);
// 消費(fèi)者最大線程量
mqPushConsumer.setConsumeThreadMax(10);
// 設(shè)置一次消費(fèi)消息的條數(shù),默認(rèn)是1
mqPushConsumer.setConsumeMessageBatchMaxSize(1);
// 用MessageSelector.bySql來使用sql篩選消息
mqPushConsumer.subscribe("FilterMessageTopic", MessageSelector.bySql("(num between 0 and 5 ) and (info = 'aaa')"));
// 注冊回調(diào)實(shí)現(xiàn)類來處理從broker拉取回來的消息
mqPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
// 監(jiān)聽類實(shí)現(xiàn)MessageListenerConcurrently接口即可,重寫consumeMessage方法接收數(shù)據(jù)
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
MessageExt messageExt = msgList.get(0);
String body = new String(messageExt.getBody(), StandardCharsets.UTF_8);
System.out.println("消費(fèi)者接收到消息: " + messageExt.toString() + "---消息內(nèi)容為:" + body);
// 標(biāo)記該消息已經(jīng)被成功消費(fèi)
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 啟動消費(fèi)者實(shí)例
mqPushConsumer.start();
}
}我們直接運(yùn)行消費(fèi)者,發(fā)現(xiàn)啟動報(bào)錯了,如下:
Exception in thread "main" org.apache.rocketmq.client.exception.MQClientException: CODE: 1 DESC: The broker does not support consumer to filter message by SQL92
這個錯誤是由于RocketMQ默認(rèn)是關(guān)閉了屬性過濾功能的,如果需要使用該功能,需要開啟enablePropertyFilter的屬性,將該屬性置為true才可以。也就是我們需要在RocketMQ的配置文件中添加如下配置:
// 開啟屬性過濾功能
enablePropertyFilter=true
重新啟動RocketMQ后,再次運(yùn)行消費(fèi)者,如下可看到,消費(fèi)者接收到三條消息:
消費(fèi)者接收到消息: MessageExt [brokerName=broker-a, queueId=2, storeSize=214, queueOffset=11, sysFlag=0, bornTimestamp=1646025528795, bornHost=/10.0.90.115:59083, storeTimestamp=1646025527395, storeHost=/10.0.90.86:10911, msgId=0A005A5600002A9F0000000000006E4E, commitLogOffset=28238, bodyCRC=553127401, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='FilterMessageTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=14, num=0, CONSUME_START_TIME=1646025559201, UNIQ_KEY=AC6E00924AE418B4AAC28DE709D90000, CLUSTER=DefaultCluster, info=aaa}, body=[104, 101, 108, 108, 111, 44, 32, -24, -65, -103, -26, -104, -81, -25, -84, -84, 49, -26, -99, -95, -26, -74, -120, -26, -127, -81], transactionId='null'}]---消息內(nèi)容為:hello, 這是第1條消息
消費(fèi)者接收到消息: MessageExt [brokerName=broker-a, queueId=0, storeSize=214, queueOffset=9, sysFlag=0, bornTimestamp=1646025528821, bornHost=/10.0.90.115:59083, storeTimestamp=1646025527417, storeHost=/10.0.90.86:10911, msgId=0A005A5600002A9F0000000000006FFA, commitLogOffset=28666, bodyCRC=604888532, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='FilterMessageTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=11, num=2, CONSUME_START_TIME=1646025559201, UNIQ_KEY=AC6E00924AE418B4AAC28DE709F50002, CLUSTER=DefaultCluster, info=aaa}, body=[104, 101, 108, 108, 111, 44, 32, -24, -65, -103, -26, -104, -81, -25, -84, -84, 51, -26, -99, -95, -26, -74, -120, -26, -127, -81], transactionId='null'}]---消息內(nèi)容為:hello, 這是第3條消息
消費(fèi)者接收到消息: MessageExt [brokerName=broker-a, queueId=2, storeSize=214, queueOffset=12, sysFlag=0, bornTimestamp=1646025528834, bornHost=/10.0.90.115:59083, storeTimestamp=1646025527433, storeHost=/10.0.90.86:10911, msgId=0A005A5600002A9F00000000000071A6, commitLogOffset=29094, bodyCRC=689155475, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='FilterMessageTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=14, num=4, CONSUME_START_TIME=1646025559201, UNIQ_KEY=AC6E00924AE418B4AAC28DE70A020004, CLUSTER=DefaultCluster, info=aaa}, body=[104, 101, 108, 108, 111, 44, 32, -24, -65, -103, -26, -104, -81, -25, -84, -84, 53, -26, -99, -95, -26, -74, -120, -26, -127, -81], transactionId='null'}]---消息內(nèi)容為:hello, 這是第5條消息
分析:
生產(chǎn)者發(fā)送消息的時(shí)候,添加了用戶自定義屬性num、info,通過上述控制臺輸出消息的properties屬性我們也可以看到。num的值其實(shí)就是0-9,info的值是偶數(shù)的時(shí)候?yàn)閍aa,奇數(shù)的時(shí)候?yàn)閎bb。消費(fèi)者通過MessageSelector.bySql("(num between 0 and 5 ) and (info = 'aaa')")指定的過濾條件是:num在[0,5]之間并且info的值為aaa。因此,同時(shí)滿足這兩個條件的就只有三條消息。
到此這篇關(guān)于RocketMQ根據(jù)Tag進(jìn)行消息過濾的文章就介紹到這了,更多相關(guān)RocketMQ消息過濾內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
淺談Java中OutOfMemoryError問題產(chǎn)生原因
本文主要介紹了淺談Java中OutOfMemoryError問題產(chǎn)生原因,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-06-06
Spring Boot快速實(shí)現(xiàn) IP地址解析的示例詳解
這篇文章主要介紹了Spring Boot快速實(shí)現(xiàn)IP地址解析,本文通過示例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-08-08
SpringBoot詳解MySQL如何實(shí)現(xiàn)讀寫分離
當(dāng)響應(yīng)的瓶頸在數(shù)據(jù)庫的時(shí)候,就要考慮數(shù)據(jù)庫的讀寫分離,當(dāng)然還可以分庫分表,那是單表數(shù)據(jù)量特別大,當(dāng)單表數(shù)據(jù)量不是特別大,但是請求量比較大的時(shí)候,就要考慮讀寫分離了.具體的話,還是要看自己的業(yè)務(wù)...如果還是很慢,那就要分庫分表了...我們這篇就簡單講一下讀寫分離2022-09-09
openEuler?搭建java開發(fā)環(huán)境的詳細(xì)過程
這篇文章主要介紹了openEuler?搭建java開發(fā)環(huán)境,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-06-06
Spring Boot前后端分離開發(fā)模式中的跨域問題及解決方法
本文介紹了解決Spring Boot前端Vue跨域問題的實(shí)戰(zhàn)經(jīng)驗(yàn),并提供了后端和前端的配置示例,通過配置后端和前端,我們可以輕松解決跨域問題,實(shí)現(xiàn)正常的前后端交互,需要的朋友可以參考下2023-09-09
IDEA的Web項(xiàng)目右鍵無法創(chuàng)建Servlet問題解決辦法
這篇文章主要介紹了IDEA的Web項(xiàng)目右鍵無法創(chuàng)建Servlet問題解決辦法的相關(guān)資料,在IDEA中新建Servlet時(shí)發(fā)現(xiàn)缺失選項(xiàng),可以通過在pom.xml文件中添加servlet依賴解決,文中通過圖文介紹的非常詳細(xì),需要的朋友可以參考下2024-10-10

