Springboot詳解RocketMQ實現(xiàn)廣播消息流程
RocketMQ消息模式主要有兩種:廣播模式、集群模式(負載均衡模式)
廣播模式是每個消費者,都會消費消息;
負載均衡模式是每一個消費只會被某一個消費者消費一次;
我們業(yè)務上一般用的是負載均衡模式,當然一些特殊場景需要用到廣播模式,比如發(fā)送一個信息到郵箱,手機,站內(nèi)提示;
我們可以通過@RocketMQMessageListener的messageModel屬性值來設置,MessageModel.BROADCASTING是廣播模式,MessageModel.CLUSTERING是默認集群負載均衡模式
下面來介紹下 springboot+rockermq 整合實現(xiàn) 廣播消息
- 創(chuàng)建Springboot項目,添加rockermq 依賴
<!--rocketMq依賴-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.1</version>
</dependency>- 配置rocketmq
# 端口
server:
port: 8083# 配置 rocketmq
rocketmq:
name-server: 127.0.0.1:9876
#生產(chǎn)者
producer:
#生產(chǎn)者組名,規(guī)定在一個應用里面必須唯一
group: group1
#消息發(fā)送的超時時間 默認3000ms
send-message-timeout: 3000
#消息達到4096字節(jié)的時候,消息就會被壓縮。默認 4096
compress-message-body-threshold: 4096
#最大的消息限制,默認為128K
max-message-size: 4194304
#同步消息發(fā)送失敗重試次數(shù)
retry-times-when-send-failed: 3
#在內(nèi)部發(fā)送失敗時是否重試其他代理,這個參數(shù)在有多個broker時才生效
retry-next-server: true
#異步消息發(fā)送失敗重試的次數(shù)
retry-times-when-send-async-failed: 3
- 生產(chǎn)端:新建一個 controller 來做消息發(fā)送
生產(chǎn)端按正常發(fā)送邏輯發(fā)送消息即可
package com.example.springbootrocketdemo.controller;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* 廣播消息
* @author qzz
*/
@RestController
public class RocketMQBroadCOntroller {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 發(fā)送廣播消息
*/
@RequestMapping("/testBroadSend")
public void testSyncSend(){
//參數(shù)一:topic 如果想添加tag,可以使用"topic:tag"的寫法
//參數(shù)二:消息內(nèi)容
for(int i=0;i<10;i++){
rocketMQTemplate.convertAndSend("test-topic-broad","test-message"+i);
}
}
}- 創(chuàng)建兩個消費者來消費消息
我們先集群負載均衡測試,加上messageModel=MessageModel.CLUSTERING
消費者1:
package com.example.springbootrocketdemo.config;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
* 廣播消息
* 配置RocketMQ監(jiān)聽
* MessageModel.CLUSTERING:集群模式
* MessageModel.BROADCASTING:廣播模式
* @author qzz
*/
@Service
@RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.CLUSTERING)
public class RocketMQBroadConsumerListener implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
System.out.println("集群模式 消費者1,消費消息:"+s);
}
}消費者2: 與消費者1在 同一個consumerGroup 和 topic
package com.example.springbootrocketdemo.config;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
* 廣播消息
* 配置RocketMQ監(jiān)聽
* MessageModel.CLUSTERING:集群模式
* MessageModel.BROADCASTING:廣播模式
* @author qzz
*/
@Service
@RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.CLUSTERING)
public class RocketMQBroadConsumerListener2 implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
System.out.println("集群模式 消費者2,消費消息:"+s);
}
}- 啟動服務,測試 集群模式消費
集群模式測試: 兩個消費者平攤 消息

- 把上面兩個消費者的 messageModel 屬性值修改成 廣播模式
消費者1:
package com.example.springbootrocketdemo.config;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
* 廣播消息
* 配置RocketMQ監(jiān)聽
* MessageModel.CLUSTERING:集群模式
* MessageModel.BROADCASTING:廣播模式
* @author qzz
*/
@Service
@RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.BROADCASTING)
public class RocketMQBroadConsumerListener implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
System.out.println("廣播消息1 廣播模式,消費消息:"+s);
}
}消費者2: 與消費者1在 同一個consumerGroup 和 topic
package com.example.springbootrocketdemo.config;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
* 廣播消息
* 配置RocketMQ監(jiān)聽
* MessageModel.CLUSTERING:集群模式
* MessageModel.BROADCASTING:廣播模式
* @author qzz
*/
@Service
@RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.BROADCASTING)
public class RocketMQBroadConsumerListener2 implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
System.out.println("廣播消息2 廣播模式,消費消息:"+s);
}
}- 重啟服務,測試 廣播模式消費

廣播模式消費下,兩個消費者都消費到Topic的所有消息。
測試成功!
到此這篇關于Springboot詳解RocketMQ實現(xiàn)廣播消息流程的文章就介紹到這了,更多相關Springboot廣播消息內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
springboot整合JSR303校驗功能實現(xiàn)代碼
這篇文章主要介紹了springboot整合JSR303校驗功能實現(xiàn),JSR303校驗方法有統(tǒng)一校驗的需求,統(tǒng)一校驗實現(xiàn)以及分組校驗,本文結(jié)合實例代碼給大家介紹的非常詳細,需要的朋友可以參考下2023-01-01
Security框架:如何使用CorsFilter解決前端跨域請求問題
這篇文章主要介紹了Security框架:如何使用CorsFilter解決前端跨域請求問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-11-11
Java使用easyExcel導出excel數(shù)據(jù)案例
這篇文章主要介紹了Java使用easyExcel導出excel數(shù)據(jù)案例,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-10-10
mybatis mapper.xml 區(qū)間查詢條件詳解
這篇文章主要介紹了mybatis mapper.xml 區(qū)間查詢條件詳解,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-09-09

