Spring?Boot?整合RocketMq實現(xiàn)消息過濾功能
簡介
消息過濾是指消費者一端在消費消息時,對消息進行選擇性過濾,只消費符合過濾條件的消息。 RocketMQ的消息過濾機制大致分為兩種:標簽過濾和類過濾。其中標簽過濾又分為Tag過濾和SQL92過濾。
根據(jù)TAG過濾消息
消息發(fā)送端只能設置一個tag,消息接收端可以設置多個tag。
生產(chǎn)者
public void sendTagMessage() { String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"}; for(int i=0;i<10;i++) { String tag = tags[i % tags.length]; logger.info("sendTagMessage tag is :{}",tag); String msg = "hello, 這是第" + (i + 1) + "條消息"; org.springframework.messaging.Message<String> msg1 = MessageBuilder.withPayload(msg).build(); rocketMQTemplate.convertAndSend("test-tag-rocketmq" + ":" + tag, msg1); } }
說明:示例中循環(huán)發(fā)送了10條消息,每條消息設置了一個tag發(fā)送過濾消息的格式為:topic:tag的形式,注意發(fā)送端只能設定一個tag。
消費者
@Component @RocketMQMessageListener(consumerGroup="test-tagrocketmq-group",topic="test-tag-rocketmq",selectorExpression="TagA || TagC || TagD",selectorType=SelectorType.TAG, messageModel = MessageModel.CLUSTERING) public class TagConsumer implements RocketMQListener<Object> { private Logger logger =LoggerFactory.getLogger(getClass()); @Override public void onMessage(Object o) { String msg=JSON.toJSONString(o); logger.info("send TagA || TagC || TagD succss content is:{}", msg); } }
說明:
- selectorType:指定消息通過的tag的方式,默認為SelectorType.TAG
- messageModel:指定消息的消費模式,默認為MessageModel.CLUSTERING模式每條消息只能由一個消費者消費,而MessageModel.BROADCASTING模式為廣播模式,所有訂閱者都能消費。
- selectorExpression :指定那些Tag消息能夠被消費,多個采用||分割。
測試結(jié)果
從結(jié)果我可以看出第2條為TAGC、第7條為TAGC、第8條為TAGD,第3條為TAGD,第5條為TAGA,第0條為TAGA,而消費端監(jiān)聽的TAG為TAGA、TAGC、TAGD所以對于不符合條件的消息進行了過濾。
根據(jù)SQL表達式過濾消息
SQL表達式方式可以根據(jù)發(fā)送消息時輸入的屬性進行一些計算。
RocketMQ的SQL表達式語法 只定義了一些基本的語法功能。
- 數(shù)字比較,如>,>=,<,<=,BETWEEN,=;
- 字符比較,如:=,<>,IN;IS NULL or IS NOT NULL;
- 邏輯運算符:AND, OR, NOT;
- 常量類型:
- 數(shù)值,如:123, 3.1415;
- 字符, 如:‘abc’, 必須使用單引號;
- NULL,特殊常量
- Boolean, TRUE or FALSE;
生產(chǎn)者
public void sendSQLMessage() { String msg = "hello, 這是第1條消息"; org.springframework.messaging.Message<String> message = MessageBuilder.withPayload(msg).build() ; Map<String, Object> headers = new HashMap<>() ; headers.put("i", 5) ; rocketMQTemplate.convertAndSend("test-sql-rocketmq", message, headers); }
說明:傳遞了參數(shù)為5進行條件判斷。
消費者
@Component @RocketMQMessageListener(consumerGroup="test-sqlrocketmq-group",topic="test-sql-rocketmq",selectorExpression = "i=5",selectorType=SelectorType.SQL92, messageModel = MessageModel.CLUSTERING) public class SQLConsumer implements RocketMQListener<MessageExt> { private Logger logger =LoggerFactory.getLogger(getClass()); @Override public void onMessage(MessageExt message) { String msg=new String(message.getBody()); String paramStr=JSON.toJSONString(message.getProperties()); //消息內(nèi)容 logger.info("send succss content is:{}", msg); //消息參數(shù) logger.info("send mssage parma is:{}", paramStr); } }
說明:
- selectorType:指定消息通過的tag的方式,默認為SelectorType.CLUSTERING
- messageModel:指定消息的消費模式,默認為MessageModel.CLUSTERING模式每條消息只能由一個消費者消費,而MessageModel.BROADCASTING模式為廣播模式,所有訂閱者都能消費。
- selectorExpression : 采用rocketMQ支持的表達式。例如i=5
啟動程序報錯The broker does not support consumer to filter message by SQL92
原因:默認情況下broke沒有開啟對SQL語法的支持,需要修改配置
1.打開rocketmq服務下的broke.conf文件,添加如下配置即可。
2.重啟broke服務即可.
測試結(jié)果
說明:只有滿足SQL條件能進行消費。
總結(jié)
本文講解了RocketMQ實現(xiàn)消息過濾,針對不同的業(yè)務場景選擇合適的方案即可,如果疑問,請隨時反饋,
到此這篇關(guān)于Spring Boot 整合RocketMq實現(xiàn)消息過濾的文章就介紹到這了,更多相關(guān)Spring Boot消息過濾內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Mybatis-plus動態(tài)條件查詢QueryWrapper的使用案例
mybatis-plus框架功能很強大,把很多功能都集成了,下面這篇文章主要給大家介紹了關(guān)于Mybatis-plus動態(tài)條件查詢QueryWrapper的使用教程,文中通過圖文介紹的非常詳細,需要的朋友可以參考下2022-07-07Mybatis-plus中QueryWrapper的多種用法小結(jié)
本文主要介紹了Mybatis-plus中QueryWrapper的多種用法小結(jié),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2023-04-04java使用MulticastSocket實現(xiàn)組播
這篇文章主要為大家詳細介紹了java使用MulticastSocket實現(xiàn)組播,具有一定的參考價值,感興趣的小伙伴們可以參考一下2019-01-01Java調(diào)用HTTPS接口實現(xiàn)繞過SSL認證
SSL認證是確保通信安全的重要手段,有的時候為了方便調(diào)用,我們會繞過SSL認證,這篇文章主要介紹了Java如何調(diào)用HTTPS接口實現(xiàn)繞過SSL認證,需要的可以參考下2023-11-11關(guān)于JWT與cookie和token的區(qū)別說明
這篇文章主要介紹了JWT與cookie和token的區(qū)別說明,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-10-10