欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Springboot詳解RocketMQ實(shí)現(xiàn)廣播消息流程

 更新時(shí)間:2022年06月22日 11:35:21   作者:12程序猿  
RocketMQ作為一款純java、分布式、隊(duì)列模型的開源消息中間件,支持事務(wù)消息、順序消息、批量消息、定時(shí)消息、消息回溯等,本篇我們了解如何實(shí)現(xiàn)廣播消息

RocketMQ消息模式主要有兩種:廣播模式、集群模式(負(fù)載均衡模式)

廣播模式是每個(gè)消費(fèi)者,都會(huì)消費(fèi)消息;

負(fù)載均衡模式是每一個(gè)消費(fèi)只會(huì)被某一個(gè)消費(fèi)者消費(fèi)一次;

我們業(yè)務(wù)上一般用的是負(fù)載均衡模式,當(dāng)然一些特殊場(chǎng)景需要用到廣播模式,比如發(fā)送一個(gè)信息到郵箱,手機(jī),站內(nèi)提示;

我們可以通過@RocketMQMessageListenermessageModel屬性值來(lái)設(shè)置,MessageModel.BROADCASTING是廣播模式,MessageModel.CLUSTERING是默認(rèn)集群負(fù)載均衡模式

下面來(lái)介紹下 springboot+rockermq 整合實(shí)現(xiàn) 廣播消息

  • 創(chuàng)建Springboot項(xiàng)目,添加rockermq 依賴
<!--rocketMq依賴-->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.1</version>
</dependency>
  • 配置rocketmq

# 端口
server:
  port: 8083

# 配置 rocketmq
rocketmq:
  name-server: 127.0.0.1:9876
  #生產(chǎn)者
  producer:
    #生產(chǎn)者組名,規(guī)定在一個(gè)應(yīng)用里面必須唯一
    group: group1
    #消息發(fā)送的超時(shí)時(shí)間 默認(rèn)3000ms
    send-message-timeout: 3000
    #消息達(dá)到4096字節(jié)的時(shí)候,消息就會(huì)被壓縮。默認(rèn) 4096
    compress-message-body-threshold: 4096
    #最大的消息限制,默認(rèn)為128K
    max-message-size: 4194304
    #同步消息發(fā)送失敗重試次數(shù)
    retry-times-when-send-failed: 3
    #在內(nèi)部發(fā)送失敗時(shí)是否重試其他代理,這個(gè)參數(shù)在有多個(gè)broker時(shí)才生效
    retry-next-server: true
    #異步消息發(fā)送失敗重試的次數(shù)
    retry-times-when-send-async-failed: 3

  • 生產(chǎn)端:新建一個(gè) controller 來(lái)做消息發(fā)送

生產(chǎn)端按正常發(fā)送邏輯發(fā)送消息即可

package com.example.springbootrocketdemo.controller;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * 廣播消息
 * @author qzz
 */
@RestController
public class RocketMQBroadCOntroller {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    /**
     * 發(fā)送廣播消息
     */
    @RequestMapping("/testBroadSend")
    public void testSyncSend(){
        //參數(shù)一:topic   如果想添加tag,可以使用"topic:tag"的寫法
        //參數(shù)二:消息內(nèi)容
        for(int i=0;i<10;i++){
            rocketMQTemplate.convertAndSend("test-topic-broad","test-message"+i);
        }
    }
}
  • 創(chuàng)建兩個(gè)消費(fèi)者來(lái)消費(fèi)消息

我們先集群負(fù)載均衡測(cè)試,加上messageModel=MessageModel.CLUSTERING

消費(fèi)者1:

package com.example.springbootrocketdemo.config;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
 * 廣播消息
 * 配置RocketMQ監(jiān)聽
 * MessageModel.CLUSTERING:集群模式
 * MessageModel.BROADCASTING:廣播模式
 * @author qzz
 */
@Service
@RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.CLUSTERING)
public class RocketMQBroadConsumerListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("集群模式 消費(fèi)者1,消費(fèi)消息:"+s);
    }
}

消費(fèi)者2: 與消費(fèi)者1在 同一個(gè)consumerGroup 和 topic

package com.example.springbootrocketdemo.config;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
 * 廣播消息
 * 配置RocketMQ監(jiān)聽
 * MessageModel.CLUSTERING:集群模式
 * MessageModel.BROADCASTING:廣播模式
 * @author qzz
 */
@Service
@RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.CLUSTERING)
public class RocketMQBroadConsumerListener2 implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("集群模式 消費(fèi)者2,消費(fèi)消息:"+s);
    }
}
  • 啟動(dòng)服務(wù),測(cè)試 集群模式消費(fèi)

集群模式測(cè)試: 兩個(gè)消費(fèi)者平攤 消息

  • 把上面兩個(gè)消費(fèi)者的 messageModel 屬性值修改成 廣播模式

消費(fèi)者1:

package com.example.springbootrocketdemo.config;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
 * 廣播消息
 * 配置RocketMQ監(jiān)聽
 * MessageModel.CLUSTERING:集群模式
 * MessageModel.BROADCASTING:廣播模式
 * @author qzz
 */
@Service
@RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.BROADCASTING)
public class RocketMQBroadConsumerListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("廣播消息1 廣播模式,消費(fèi)消息:"+s);
    }
}

消費(fèi)者2: 與消費(fèi)者1在 同一個(gè)consumerGroup 和 topic

package com.example.springbootrocketdemo.config;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
 * 廣播消息
 * 配置RocketMQ監(jiān)聽
 * MessageModel.CLUSTERING:集群模式
 * MessageModel.BROADCASTING:廣播模式
 * @author qzz
 */
@Service
@RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.BROADCASTING)
public class RocketMQBroadConsumerListener2 implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("廣播消息2 廣播模式,消費(fèi)消息:"+s);
    }
}
  • 重啟服務(wù),測(cè)試 廣播模式消費(fèi)

廣播模式消費(fèi)下,兩個(gè)消費(fèi)者都消費(fèi)到Topic的所有消息。

測(cè)試成功!

到此這篇關(guān)于Springboot詳解RocketMQ實(shí)現(xiàn)廣播消息流程的文章就介紹到這了,更多相關(guān)Springboot廣播消息內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • java中addMouseListener()方法的使用

    java中addMouseListener()方法的使用

    這篇文章主要介紹了java中addMouseListener()方法的使用,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-12-12
  • springboot整合JSR303校驗(yàn)功能實(shí)現(xiàn)代碼

    springboot整合JSR303校驗(yàn)功能實(shí)現(xiàn)代碼

    這篇文章主要介紹了springboot整合JSR303校驗(yàn)功能實(shí)現(xiàn),JSR303校驗(yàn)方法有統(tǒng)一校驗(yàn)的需求,統(tǒng)一校驗(yàn)實(shí)現(xiàn)以及分組校驗(yàn),本文結(jié)合實(shí)例代碼給大家介紹的非常詳細(xì),需要的朋友可以參考下
    2023-01-01
  • Security框架:如何使用CorsFilter解決前端跨域請(qǐng)求問題

    Security框架:如何使用CorsFilter解決前端跨域請(qǐng)求問題

    這篇文章主要介紹了Security框架:如何使用CorsFilter解決前端跨域請(qǐng)求問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-11-11
  • Java使用easyExcel導(dǎo)出excel數(shù)據(jù)案例

    Java使用easyExcel導(dǎo)出excel數(shù)據(jù)案例

    這篇文章主要介紹了Java使用easyExcel導(dǎo)出excel數(shù)據(jù)案例,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2020-10-10
  • Spring實(shí)現(xiàn)文件上傳(示例代碼)

    Spring實(shí)現(xiàn)文件上傳(示例代碼)

    Spring可以繼承commons-fileupload插件來(lái)實(shí)現(xiàn)文件上傳的功能。分為前端JSP編寫和后臺(tái)Controller的編寫
    2013-10-10
  • mybatis mapper.xml 區(qū)間查詢條件詳解

    mybatis mapper.xml 區(qū)間查詢條件詳解

    這篇文章主要介紹了mybatis mapper.xml 區(qū)間查詢條件詳解,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-09-09
  • java jdk動(dòng)態(tài)代理詳解

    java jdk動(dòng)態(tài)代理詳解

    動(dòng)態(tài)代理類的Class實(shí)例是怎么生成的呢,是通過ProxyGenerator類來(lái)生成動(dòng)態(tài)代理類的class字節(jié)流,把它載入方法區(qū)
    2013-09-09
  • springboot中.yml文件參數(shù)的讀取方式

    springboot中.yml文件參數(shù)的讀取方式

    這篇文章主要介紹了springboot中.yml文件參數(shù)的讀取方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-02-02
  • java迭代器中刪除元素的實(shí)例操作詳解

    java迭代器中刪除元素的實(shí)例操作詳解

    在本篇內(nèi)容里小編給各位分享了一篇關(guān)于java迭代器中刪除元素的實(shí)例操作詳解內(nèi)容,有興趣的朋友們可以學(xué)習(xí)下。
    2021-01-01
  • Java @Pointcut注解表達(dá)式案例詳解

    Java @Pointcut注解表達(dá)式案例詳解

    這篇文章主要介紹了Java @Pointcut注解表達(dá)式案例詳解,本篇文章通過簡(jiǎn)要的案例,講解了該項(xiàng)技術(shù)的了解與使用,以下就是詳細(xì)內(nèi)容,需要的朋友可以參考下
    2021-09-09

最新評(píng)論