欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Spring?Boot?整合RocketMq實現(xiàn)消息過濾功能

 更新時間:2022年06月07日 14:29:43   作者:劍圣無痕  
這篇文章主要介紹了Spring?Boot?整合RocketMq實現(xiàn)消息過濾,本文講解了RocketMQ實現(xiàn)消息過濾,針對不同的業(yè)務場景選擇合適的方案即可,需要的朋友可以參考下

簡介

消息過濾是指消費者一端在消費消息時,對消息進行選擇性過濾,只消費符合過濾條件的消息。 RocketMQ的消息過濾機制大致分為兩種:標簽過濾和類過濾。其中標簽過濾又分為Tag過濾和SQL92過濾。

根據(jù)TAG過濾消息

消息發(fā)送端只能設置一個tag,消息接收端可以設置多個tag。

生產(chǎn)者

 public void sendTagMessage()
   {
       String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
       for(int i=0;i<10;i++)
       {
           String tag = tags[i % tags.length];
           logger.info("sendTagMessage tag is :{}",tag);
           String msg = "hello, 這是第" + (i + 1) + "條消息";
           org.springframework.messaging.Message<String> msg1 = MessageBuilder.withPayload(msg).build(); 
           rocketMQTemplate.convertAndSend("test-tag-rocketmq" + ":" + tag, msg1);
       }
   }

說明:示例中循環(huán)發(fā)送了10條消息,每條消息設置了一個tag發(fā)送過濾消息的格式為:topic:tag的形式,注意發(fā)送端只能設定一個tag。

消費者

@Component
@RocketMQMessageListener(consumerGroup="test-tagrocketmq-group",topic="test-tag-rocketmq",selectorExpression="TagA || TagC || TagD",selectorType=SelectorType.TAG, messageModel = MessageModel.CLUSTERING)
public class TagConsumer implements RocketMQListener<Object>
{
    private Logger logger =LoggerFactory.getLogger(getClass());
    @Override
    public void onMessage(Object o)
    {
        String msg=JSON.toJSONString(o);
        logger.info("send TagA || TagC || TagD  succss content is:{}", msg);
    }
}

說明:

  • selectorType:指定消息通過的tag的方式,默認為SelectorType.TAG
  • messageModel:指定消息的消費模式,默認為MessageModel.CLUSTERING模式每條消息只能由一個消費者消費,而MessageModel.BROADCASTING模式為廣播模式,所有訂閱者都能消費。
  • selectorExpression :指定那些Tag消息能夠被消費,多個采用||分割。

測試結(jié)果

從結(jié)果我可以看出第2條為TAGC、第7條為TAGC、第8條為TAGD,第3條為TAGD,第5條為TAGA,第0條為TAGA,而消費端監(jiān)聽的TAG為TAGA、TAGC、TAGD所以對于不符合條件的消息進行了過濾。

根據(jù)SQL表達式過濾消息

SQL表達式方式可以根據(jù)發(fā)送消息時輸入的屬性進行一些計算。

RocketMQ的SQL表達式語法 只定義了一些基本的語法功能。

  • 數(shù)字比較,如>,>=,<,<=,BETWEEN,=;
  • 字符比較,如:=,<>,IN;IS NULL or IS NOT NULL;
  • 邏輯運算符:AND, OR, NOT;
  • 常量類型:
  • 數(shù)值,如:123, 3.1415;
  • 字符, 如:‘abc’, 必須使用單引號;
  • NULL,特殊常量
  • Boolean, TRUE or FALSE;

生產(chǎn)者

   public void sendSQLMessage()
   {
       String msg = "hello, 這是第1條消息";
       org.springframework.messaging.Message<String> message = MessageBuilder.withPayload(msg).build() ;
       Map<String, Object> headers = new HashMap<>() ;
       headers.put("i", 5) ;
       rocketMQTemplate.convertAndSend("test-sql-rocketmq", message, headers);
   }

說明:傳遞了參數(shù)為5進行條件判斷。

消費者

@Component
@RocketMQMessageListener(consumerGroup="test-sqlrocketmq-group",topic="test-sql-rocketmq",selectorExpression = "i=5",selectorType=SelectorType.SQL92, messageModel = MessageModel.CLUSTERING)
public class SQLConsumer implements RocketMQListener<MessageExt>
{
    private Logger logger =LoggerFactory.getLogger(getClass());
    @Override
    public void onMessage(MessageExt message)
    {
        String msg=new String(message.getBody());
        String paramStr=JSON.toJSONString(message.getProperties());
        //消息內(nèi)容
        logger.info("send succss content is:{}", msg);
        //消息參數(shù)
        logger.info("send mssage parma is:{}", paramStr);
    }
}

說明:

  • selectorType:指定消息通過的tag的方式,默認為SelectorType.CLUSTERING
  • messageModel:指定消息的消費模式,默認為MessageModel.CLUSTERING模式每條消息只能由一個消費者消費,而MessageModel.BROADCASTING模式為廣播模式,所有訂閱者都能消費。
  • selectorExpression : 采用rocketMQ支持的表達式。例如i=5

啟動程序報錯The broker does not support consumer to filter message by SQL92

原因:默認情況下broke沒有開啟對SQL語法的支持,需要修改配置

1.打開rocketmq服務下的broke.conf文件,添加如下配置即可。

2.重啟broke服務即可.

測試結(jié)果

說明:只有滿足SQL條件能進行消費。

總結(jié)

本文講解了RocketMQ實現(xiàn)消息過濾,針對不同的業(yè)務場景選擇合適的方案即可,如果疑問,請隨時反饋,

到此這篇關(guān)于Spring Boot 整合RocketMq實現(xiàn)消息過濾的文章就介紹到這了,更多相關(guān)Spring Boot消息過濾內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Mybatis-plus動態(tài)條件查詢QueryWrapper的使用案例

    Mybatis-plus動態(tài)條件查詢QueryWrapper的使用案例

    mybatis-plus框架功能很強大,把很多功能都集成了,下面這篇文章主要給大家介紹了關(guān)于Mybatis-plus動態(tài)條件查詢QueryWrapper的使用教程,文中通過圖文介紹的非常詳細,需要的朋友可以參考下
    2022-07-07
  • Java線程間通訊的幾種方法小結(jié)

    Java線程間通訊的幾種方法小結(jié)

    線程通信可以用于控制并發(fā)線程的數(shù)量,本文主要介紹了Java線程間通訊的幾種方法小結(jié),文中通過示例代碼介紹的非常詳細,需要的朋友們下面隨著小編來一起學習學習吧
    2024-01-01
  • Mybatis-plus中QueryWrapper的多種用法小結(jié)

    Mybatis-plus中QueryWrapper的多種用法小結(jié)

    本文主要介紹了Mybatis-plus中QueryWrapper的多種用法小結(jié),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2023-04-04
  • java使用MulticastSocket實現(xiàn)組播

    java使用MulticastSocket實現(xiàn)組播

    這篇文章主要為大家詳細介紹了java使用MulticastSocket實現(xiàn)組播,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2019-01-01
  • 10個Java文件操作必備技巧分享

    10個Java文件操作必備技巧分享

    在我們?nèi)粘5拈_發(fā)中,文件操作是一個非常重要的主題。文件讀寫、文件復制、任意位置讀寫、緩存等技巧都是我們必須要掌握的。本文為大家整理了10個實用的文件操作技巧,希望對大家有所幫助
    2023-04-04
  • 一起來學習JAVA的運算符

    一起來學習JAVA的運算符

    這篇文章主要為大家詳細介紹了JAVA的運算符,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來幫助
    2022-03-03
  • Java調(diào)用HTTPS接口實現(xiàn)繞過SSL認證

    Java調(diào)用HTTPS接口實現(xiàn)繞過SSL認證

    SSL認證是確保通信安全的重要手段,有的時候為了方便調(diào)用,我們會繞過SSL認證,這篇文章主要介紹了Java如何調(diào)用HTTPS接口實現(xiàn)繞過SSL認證,需要的可以參考下
    2023-11-11
  • Java中UUID生成原理及優(yōu)缺點

    Java中UUID生成原理及優(yōu)缺點

    本文將詳細講解UUID的生成原理、特性、實用場景以及優(yōu)缺點,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2023-06-06
  • 關(guān)于JWT與cookie和token的區(qū)別說明

    關(guān)于JWT與cookie和token的區(qū)別說明

    這篇文章主要介紹了JWT與cookie和token的區(qū)別說明,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-10-10
  • SpringMVC 單文件,多文件上傳實現(xiàn)詳解

    SpringMVC 單文件,多文件上傳實現(xiàn)詳解

    這篇文章主要介紹了SpringMVC 單文件,多文件上傳實現(xiàn)詳解,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2019-09-09

最新評論