欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Spring?Boot?整合RocketMq實(shí)現(xiàn)消息過(guò)濾功能

 更新時(shí)間:2022年06月07日 14:29:43   作者:劍圣無(wú)痕  
這篇文章主要介紹了Spring?Boot?整合RocketMq實(shí)現(xiàn)消息過(guò)濾,本文講解了RocketMQ實(shí)現(xiàn)消息過(guò)濾,針對(duì)不同的業(yè)務(wù)場(chǎng)景選擇合適的方案即可,需要的朋友可以參考下

簡(jiǎn)介

消息過(guò)濾是指消費(fèi)者一端在消費(fèi)消息時(shí),對(duì)消息進(jìn)行選擇性過(guò)濾,只消費(fèi)符合過(guò)濾條件的消息。 RocketMQ的消息過(guò)濾機(jī)制大致分為兩種:標(biāo)簽過(guò)濾和類過(guò)濾。其中標(biāo)簽過(guò)濾又分為Tag過(guò)濾和SQL92過(guò)濾。

根據(jù)TAG過(guò)濾消息

消息發(fā)送端只能設(shè)置一個(gè)tag,消息接收端可以設(shè)置多個(gè)tag。

生產(chǎn)者

 public void sendTagMessage()
   {
       String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
       for(int i=0;i<10;i++)
       {
           String tag = tags[i % tags.length];
           logger.info("sendTagMessage tag is :{}",tag);
           String msg = "hello, 這是第" + (i + 1) + "條消息";
           org.springframework.messaging.Message<String> msg1 = MessageBuilder.withPayload(msg).build(); 
           rocketMQTemplate.convertAndSend("test-tag-rocketmq" + ":" + tag, msg1);
       }
   }

說(shuō)明:示例中循環(huán)發(fā)送了10條消息,每條消息設(shè)置了一個(gè)tag發(fā)送過(guò)濾消息的格式為:topic:tag的形式,注意發(fā)送端只能設(shè)定一個(gè)tag。

消費(fèi)者

@Component
@RocketMQMessageListener(consumerGroup="test-tagrocketmq-group",topic="test-tag-rocketmq",selectorExpression="TagA || TagC || TagD",selectorType=SelectorType.TAG, messageModel = MessageModel.CLUSTERING)
public class TagConsumer implements RocketMQListener<Object>
{
    private Logger logger =LoggerFactory.getLogger(getClass());
    @Override
    public void onMessage(Object o)
    {
        String msg=JSON.toJSONString(o);
        logger.info("send TagA || TagC || TagD  succss content is:{}", msg);
    }
}

說(shuō)明:

  • selectorType:指定消息通過(guò)的tag的方式,默認(rèn)為SelectorType.TAG
  • messageModel:指定消息的消費(fèi)模式,默認(rèn)為MessageModel.CLUSTERING模式每條消息只能由一個(gè)消費(fèi)者消費(fèi),而MessageModel.BROADCASTING模式為廣播模式,所有訂閱者都能消費(fèi)。
  • selectorExpression :指定那些Tag消息能夠被消費(fèi),多個(gè)采用||分割。

測(cè)試結(jié)果

從結(jié)果我可以看出第2條為TAGC、第7條為TAGC、第8條為TAGD,第3條為TAGD,第5條為TAGA,第0條為TAGA,而消費(fèi)端監(jiān)聽(tīng)的TAG為TAGA、TAGC、TAGD所以對(duì)于不符合條件的消息進(jìn)行了過(guò)濾。

根據(jù)SQL表達(dá)式過(guò)濾消息

SQL表達(dá)式方式可以根據(jù)發(fā)送消息時(shí)輸入的屬性進(jìn)行一些計(jì)算。

RocketMQ的SQL表達(dá)式語(yǔ)法 只定義了一些基本的語(yǔ)法功能。

  • 數(shù)字比較,如>,>=,<,<=,BETWEEN,=;
  • 字符比較,如:=,<>,IN;IS NULL or IS NOT NULL;
  • 邏輯運(yùn)算符:AND, OR, NOT;
  • 常量類型:
  • 數(shù)值,如:123, 3.1415;
  • 字符, 如:‘abc’, 必須使用單引號(hào);
  • NULL,特殊常量
  • Boolean, TRUE or FALSE;

生產(chǎn)者

   public void sendSQLMessage()
   {
       String msg = "hello, 這是第1條消息";
       org.springframework.messaging.Message<String> message = MessageBuilder.withPayload(msg).build() ;
       Map<String, Object> headers = new HashMap<>() ;
       headers.put("i", 5) ;
       rocketMQTemplate.convertAndSend("test-sql-rocketmq", message, headers);
   }

說(shuō)明:傳遞了參數(shù)為5進(jìn)行條件判斷。

消費(fèi)者

@Component
@RocketMQMessageListener(consumerGroup="test-sqlrocketmq-group",topic="test-sql-rocketmq",selectorExpression = "i=5",selectorType=SelectorType.SQL92, messageModel = MessageModel.CLUSTERING)
public class SQLConsumer implements RocketMQListener<MessageExt>
{
    private Logger logger =LoggerFactory.getLogger(getClass());
    @Override
    public void onMessage(MessageExt message)
    {
        String msg=new String(message.getBody());
        String paramStr=JSON.toJSONString(message.getProperties());
        //消息內(nèi)容
        logger.info("send succss content is:{}", msg);
        //消息參數(shù)
        logger.info("send mssage parma is:{}", paramStr);
    }
}

說(shuō)明:

  • selectorType:指定消息通過(guò)的tag的方式,默認(rèn)為SelectorType.CLUSTERING
  • messageModel:指定消息的消費(fèi)模式,默認(rèn)為MessageModel.CLUSTERING模式每條消息只能由一個(gè)消費(fèi)者消費(fèi),而MessageModel.BROADCASTING模式為廣播模式,所有訂閱者都能消費(fèi)。
  • selectorExpression : 采用rocketMQ支持的表達(dá)式。例如i=5

啟動(dòng)程序報(bào)錯(cuò)The broker does not support consumer to filter message by SQL92

原因:默認(rèn)情況下broke沒(méi)有開啟對(duì)SQL語(yǔ)法的支持,需要修改配置

1.打開rocketmq服務(wù)下的broke.conf文件,添加如下配置即可。

2.重啟broke服務(wù)即可.

測(cè)試結(jié)果

說(shuō)明:只有滿足SQL條件能進(jìn)行消費(fèi)。

總結(jié)

本文講解了RocketMQ實(shí)現(xiàn)消息過(guò)濾,針對(duì)不同的業(yè)務(wù)場(chǎng)景選擇合適的方案即可,如果疑問(wèn),請(qǐng)隨時(shí)反饋,

到此這篇關(guān)于Spring Boot 整合RocketMq實(shí)現(xiàn)消息過(guò)濾的文章就介紹到這了,更多相關(guān)Spring Boot消息過(guò)濾內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Mybatis-plus動(dòng)態(tài)條件查詢QueryWrapper的使用案例

    Mybatis-plus動(dòng)態(tài)條件查詢QueryWrapper的使用案例

    mybatis-plus框架功能很強(qiáng)大,把很多功能都集成了,下面這篇文章主要給大家介紹了關(guān)于Mybatis-plus動(dòng)態(tài)條件查詢QueryWrapper的使用教程,文中通過(guò)圖文介紹的非常詳細(xì),需要的朋友可以參考下
    2022-07-07
  • Java線程間通訊的幾種方法小結(jié)

    Java線程間通訊的幾種方法小結(jié)

    線程通信可以用于控制并發(fā)線程的數(shù)量,本文主要介紹了Java線程間通訊的幾種方法小結(jié),文中通過(guò)示例代碼介紹的非常詳細(xì),需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2024-01-01
  • Mybatis-plus中QueryWrapper的多種用法小結(jié)

    Mybatis-plus中QueryWrapper的多種用法小結(jié)

    本文主要介紹了Mybatis-plus中QueryWrapper的多種用法小結(jié),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2023-04-04
  • java使用MulticastSocket實(shí)現(xiàn)組播

    java使用MulticastSocket實(shí)現(xiàn)組播

    這篇文章主要為大家詳細(xì)介紹了java使用MulticastSocket實(shí)現(xiàn)組播,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2019-01-01
  • 10個(gè)Java文件操作必備技巧分享

    10個(gè)Java文件操作必備技巧分享

    在我們?nèi)粘5拈_發(fā)中,文件操作是一個(gè)非常重要的主題。文件讀寫、文件復(fù)制、任意位置讀寫、緩存等技巧都是我們必須要掌握的。本文為大家整理了10個(gè)實(shí)用的文件操作技巧,希望對(duì)大家有所幫助
    2023-04-04
  • 一起來(lái)學(xué)習(xí)JAVA的運(yùn)算符

    一起來(lái)學(xué)習(xí)JAVA的運(yùn)算符

    這篇文章主要為大家詳細(xì)介紹了JAVA的運(yùn)算符,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來(lái)幫助
    2022-03-03
  • Java調(diào)用HTTPS接口實(shí)現(xiàn)繞過(guò)SSL認(rèn)證

    Java調(diào)用HTTPS接口實(shí)現(xiàn)繞過(guò)SSL認(rèn)證

    SSL認(rèn)證是確保通信安全的重要手段,有的時(shí)候?yàn)榱朔奖阏{(diào)用,我們會(huì)繞過(guò)SSL認(rèn)證,這篇文章主要介紹了Java如何調(diào)用HTTPS接口實(shí)現(xiàn)繞過(guò)SSL認(rèn)證,需要的可以參考下
    2023-11-11
  • Java中UUID生成原理及優(yōu)缺點(diǎn)

    Java中UUID生成原理及優(yōu)缺點(diǎn)

    本文將詳細(xì)講解UUID的生成原理、特性、實(shí)用場(chǎng)景以及優(yōu)缺點(diǎn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2023-06-06
  • 關(guān)于JWT與cookie和token的區(qū)別說(shuō)明

    關(guān)于JWT與cookie和token的區(qū)別說(shuō)明

    這篇文章主要介紹了JWT與cookie和token的區(qū)別說(shuō)明,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-10-10
  • SpringMVC 單文件,多文件上傳實(shí)現(xiàn)詳解

    SpringMVC 單文件,多文件上傳實(shí)現(xiàn)詳解

    這篇文章主要介紹了SpringMVC 單文件,多文件上傳實(shí)現(xiàn)詳解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2019-09-09

最新評(píng)論