SpringBoot整合RocketMQ的詳細(xì)過(guò)程
今天我們來(lái)討論如何在項(xiàng)目開(kāi)發(fā)中優(yōu)雅地使用RocketMQ。本文分為三部分,第一部分實(shí)現(xiàn)SpringBoot與RocketMQ的整合,第二部分解決在使用RocketMQ過(guò)程中可能遇到的一些問(wèn)題并解決他們,第三部分介紹如何封裝RocketMQ以便更好地使用。
1. SpringBoot整合RocketMQ
在SpringBoot中集成RocketMQ,只需要簡(jiǎn)單四步:
1.引入相關(guān)依賴(lài)
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> </dependency>
2.添加RocketMQ的相關(guān)配置
rocketmq:
consumer:
group: springboot_consumer_group
# 一次拉取消息最大值,注意是拉取消息的最大值而非消費(fèi)最大值
pull-batch-size: 10
name-server: 10.5.103.6:9876
producer:
# 發(fā)送同一類(lèi)消息的設(shè)置為同一個(gè)group,保證唯一
group: springboot_producer_group
# 發(fā)送消息超時(shí)時(shí)間,默認(rèn)3000
sendMessageTimeout: 10000
# 發(fā)送消息失敗重試次數(shù),默認(rèn)2
retryTimesWhenSendFailed: 2
# 異步消息重試此處,默認(rèn)2
retryTimesWhenSendAsyncFailed: 2
# 消息最大長(zhǎng)度,默認(rèn)1024 * 1024 * 4(默認(rèn)4M)
maxMessageSize: 4096
# 壓縮消息閾值,默認(rèn)4k(1024 * 4)
compressMessageBodyThreshold: 4096
# 是否在內(nèi)部發(fā)送失敗時(shí)重試另一個(gè)broker,默認(rèn)false
retryNextServer: false3.使用提供的模板工具類(lèi)RocketMQTemplate發(fā)送消息
@RestController
public class NormalProduceController {
@Setter(onMethod_ = @Autowired)
private RocketMQTemplate rocketmqTemplate;
@GetMapping("/test")
public SendResult test() {
Message<String> msg = MessageBuilder.withPayload("Hello,RocketMQ").build();
SendResult sendResult = rocketmqTemplate.send(topic, msg);
}
}4.實(shí)現(xiàn)RocketMQListener接口消費(fèi)消息
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(topic = "your_topic_name", consumerGroup = "your_consumer_group_name")
public class MyConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
// 處理消息的邏輯
System.out.println("Received message: " + message);
}
}以上4步即可實(shí)現(xiàn)SpringBoot與RocketMQ的整合,這部分屬于基礎(chǔ)知識(shí),不做過(guò)多說(shuō)明。
2 使用RocketMQ會(huì)遇到的問(wèn)題
以下是一些在SpringBoot中使用RocketMQ時(shí)常遇到的問(wèn)題,現(xiàn)在為您逐一解決。
2.1 WARN No appenders could be found for logger
啟動(dòng)項(xiàng)目時(shí)會(huì)在日志中看到如下告警
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.InternalThreadLocalMap).
RocketMQLog:WARN Please initialize the logger system properly.
此時(shí)我們只需要在啟動(dòng)類(lèi)中設(shè)置環(huán)境變量 rocketmq.client.logUseSlf4j 為 true 明確指定RocketMQ的日志框架
@SpringBootApplication
public class RocketDemoApplication {
public static void main(String[] args) {
/*
* 指定使用的日志框架,否則將會(huì)告警
* RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.InternalThreadLocalMap).
* RocketMQLog:WARN Please initialize the logger system properly.
*/
System.setProperty("rocketmq.client.logUseSlf4j", "true");
SpringApplication.run(RocketDemoApplication.class, args);
}
}同時(shí)還得在配置文件中調(diào)整日志級(jí)別,不然在控制臺(tái)會(huì)一直看到broker的日志信息
logging:
level:
RocketmqClient: ERROR
io:
netty: ERROR
2.2 不支持LocalDate 和 LocalDateTime
在使用Java8后經(jīng)常會(huì)使用LocalDate/LocalDateTime這兩個(gè)時(shí)間類(lèi)型字段,然而RocketMQ原始配置并不支持Java時(shí)間類(lèi)型,當(dāng)我們發(fā)送的實(shí)體消息中包含上述兩個(gè)字段時(shí),消費(fèi)端在消費(fèi)時(shí)會(huì)出現(xiàn)如下所示的錯(cuò)誤。
比如生產(chǎn)者的代碼如下:
@GetMapping("/test")
public void test(){
//普通消息無(wú)返回值,只負(fù)責(zé)發(fā)送消息?不等待服務(wù)器回應(yīng)且沒(méi)有回調(diào)函數(shù)觸發(fā)。
RocketMessage rocketMessage = RocketMessage.builder().
id(1111L).
message("hello,world")
.localDate(LocalDate.now())
.localDateTime(LocalDateTime.now())
.build();
rocketmqTemplate.convertAndSend(destination,rocketMessage);
}消費(fèi)者的代碼如下:
@Component
@RocketMQMessageListener(consumerGroup = "springboot_consumer_group",topic = "consumer_topic")
public class RocketMQConsumer implements RocketMQListener<RocketMessage> {
@Override
public void onMessage(RocketMessage message) {
System.out.println("消費(fèi)消息-" + message);
}
}消費(fèi)者開(kāi)始消費(fèi)時(shí)會(huì)出現(xiàn)類(lèi)型轉(zhuǎn)換異常錯(cuò)誤Cannot construct instance of java.time.LocalDate,錯(cuò)誤詳情如下:

原因:RocketMQ內(nèi)置使用的轉(zhuǎn)換器是RocketMQMessageConverter,轉(zhuǎn)換Json時(shí)使用的是MappingJackson2MessageConverter,但是這個(gè)轉(zhuǎn)換器不支持時(shí)間類(lèi)型。
解決辦法:需要自定義消息轉(zhuǎn)換器,將MappingJackson2MessageConverter進(jìn)行替換,并添加支持時(shí)間模塊
@Configuration
public class RocketMQEnhanceConfig {
/**
* 解決RocketMQ Jackson不支持Java時(shí)間類(lèi)型配置
* 源碼參考:{@link org.apache.rocketmq.spring.autoconfigure.MessageConverterConfiguration}
*/
@Bean
@Primary
public RocketMQMessageConverter enhanceRocketMQMessageConverter(){
RocketMQMessageConverter converter = new RocketMQMessageConverter();
CompositeMessageConverter compositeMessageConverter = (CompositeMessageConverter) converter.getMessageConverter();
List<MessageConverter> messageConverterList = compositeMessageConverter.getConverters();
for (MessageConverter messageConverter : messageConverterList) {
if(messageConverter instanceof MappingJackson2MessageConverter){
MappingJackson2MessageConverter jackson2MessageConverter = (MappingJackson2MessageConverter) messageConverter;
ObjectMapper objectMapper = jackson2MessageConverter.getObjectMapper();
objectMapper.registerModules(new JavaTimeModule());
}
}
return converter;
}
}2.3 RockeMQ環(huán)境隔離
在使用RocketMQ時(shí),通常會(huì)在代碼中直接指定消息主題(topic),而且開(kāi)發(fā)環(huán)境和測(cè)試環(huán)境可能共用一個(gè)RocketMQ環(huán)境。如果沒(méi)有進(jìn)行處理,在開(kāi)發(fā)環(huán)境發(fā)送的消息就可能被測(cè)試環(huán)境的消費(fèi)者消費(fèi),測(cè)試環(huán)境發(fā)送的消息也可能被開(kāi)發(fā)環(huán)境的消費(fèi)者消費(fèi),從而導(dǎo)致數(shù)據(jù)混亂的問(wèn)題。
為了解決這個(gè)問(wèn)題,我們可以根據(jù)不同的環(huán)境實(shí)現(xiàn)自動(dòng)隔離。通過(guò)簡(jiǎn)單配置一個(gè)選項(xiàng),如dev、test、prod等不同環(huán)境,所有的消息都會(huì)被自動(dòng)隔離。例如,當(dāng)發(fā)送的消息主題為consumer_topic時(shí),可以自動(dòng)在topic后面加上環(huán)境后綴,如consumer_topic_dev。
那么,我們?cè)撊绾螌?shí)現(xiàn)呢?
可以編寫(xiě)一個(gè)配置類(lèi)實(shí)現(xiàn)BeanPostProcessor,并重寫(xiě)postProcessBeforeInitialization方法,在監(jiān)聽(tīng)器實(shí)例初始化前修改對(duì)應(yīng)的topic。
- BeanPostProcessor是Spring框架中的一個(gè)接口,它的作用是在Spring容器實(shí)例化、配置完bean之后,在bean初始化前后進(jìn)行一些額外的處理工作。
- 具體來(lái)說(shuō),BeanPostProcessor接口定義了兩個(gè)方法:
- postProcessBeforeInitialization(Object bean, String beanName): 在bean初始化之前進(jìn)行處理,可以對(duì)bean做一些修改等操作。
- postProcessAfterInitialization(Object bean, String beanName): 在bean初始化之后進(jìn)行處理,可以進(jìn)行一些清理或者其他操作。
- BeanPostProcessor可以在應(yīng)用程序中對(duì)Bean的創(chuàng)建和初始化過(guò)程進(jìn)行攔截和修改,對(duì)Bean的生命周期進(jìn)行干預(yù)和操作。它可以對(duì)所有的Bean類(lèi)實(shí)例進(jìn)行增強(qiáng)處理,使得開(kāi)發(fā)人員可以在Bean初始化前后自定義一些操作,從而實(shí)現(xiàn)自己的業(yè)務(wù)需求。比如,可以通過(guò)BeanPostProcessor來(lái)實(shí)現(xiàn)注入某些必要的屬性值、加入某一個(gè)對(duì)象等等。
實(shí)現(xiàn)方案如下:
1.在配置文件中增加相關(guān)配置
rocketmq: enhance: # 啟動(dòng)隔離,用于激活配置類(lèi)EnvironmentIsolationConfig # 啟動(dòng)后會(huì)自動(dòng)在topic上拼接激活的配置文件,達(dá)到自動(dòng)隔離的效果 enabledIsolation: true # 隔離環(huán)境名稱(chēng),拼接到topic后,topic_dev,默認(rèn)空字符串 environment: dev
2.新增配置類(lèi),在實(shí)例化消息監(jiān)聽(tīng)者之前把topic修改掉
@Configuration
public class EnvironmentIsolationConfig implements BeanPostProcessor {
@Value("${rocketmq.enhance.enabledIsolation:true}")
private boolean enabledIsolation;
@Value("${rocketmq.enhance.environment:''}")
private String environmentName;
/**
* 在裝載Bean之前實(shí)現(xiàn)參數(shù)修改
*/
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
if(bean instanceof DefaultRocketMQListenerContainer){
DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean;
//拼接Topic
if(enabledIsolation && StringUtils.hasText(environmentName)){
container.setTopic(String.join("_", container.getTopic(),environmentName));
}
return container;
}
return bean;
}
}啟動(dòng)項(xiàng)目可以看到日志中消息監(jiān)聽(tīng)的隊(duì)列已經(jīng)被修改了
2023-03-23 17:04:59.726 [main] INFO o.a.r.s.support.DefaultRocketMQListenerContainer:290 - running container: DefaultRocketMQListenerContainer{consumerGroup='springboot_consumer_group', nameServer='10.5.103.6:9876', topic='consumer_topic_dev', consumeMode=CONCURRENTLY, selectorType=TAG, selectorExpression='*', messageModel=CLUSTERING}
3. RocketMQ二次封裝
在解釋為什么要二次封裝之前先來(lái)看看RocketMQ官方文檔中推薦的最佳實(shí)踐
- 消息發(fā)送成功或者失敗要打印消息日志,用于業(yè)務(wù)排查問(wèn)題。
- 如果消息量較少,建議在消費(fèi)入口方法打印消息,消費(fèi)耗時(shí)等,方便后續(xù)排查問(wèn)題。
- RocketMQ 無(wú)法避免消息重復(fù)(Exactly-Once),所以如果業(yè)務(wù)對(duì)消費(fèi)重復(fù)非常敏感,務(wù)必要在業(yè)務(wù)層面進(jìn)行去重處理??梢越柚P(guān)系數(shù)據(jù)庫(kù)進(jìn)行去重。首先需要確定消息的唯一鍵,可以是msgId,也可以是消息內(nèi)容中的唯一標(biāo)識(shí)字段,例如訂單Id等。
上面三個(gè)步驟基本每次發(fā)送消息或者消費(fèi)消息都要實(shí)現(xiàn),屬于重復(fù)動(dòng)作。
接下來(lái)討論的是在RocketMQ中發(fā)送消息時(shí)選擇何種消息類(lèi)型最為合適。
在RocketMQ中有四種可選格式:
- 發(fā)送Json對(duì)象
- 發(fā)送轉(zhuǎn)Json后的String對(duì)象
- 根據(jù)業(yè)務(wù)封裝對(duì)應(yīng)實(shí)體類(lèi)
- 直接使用原生MessageExt接收。
對(duì)于如何選擇消息類(lèi)型,需要考慮到消費(fèi)者在不查看消息發(fā)送者的情況下,如何獲取消息的含義。因此,在這種情況下,使用第三種方式即根據(jù)業(yè)務(wù)封裝對(duì)應(yīng)實(shí)體類(lèi)的方式最為合適,也是大多數(shù)開(kāi)發(fā)者在發(fā)送消息時(shí)的常用方式。
有了上面兩點(diǎn)結(jié)論以后我們來(lái)看看為什么要對(duì)RocketMQ二次封裝。
3.1 為什么要二次封裝
按照上述最佳實(shí)踐,一個(gè)完整的消息傳遞鏈路從生產(chǎn)到消費(fèi)應(yīng)包括 準(zhǔn)備消息、發(fā)送消息、記錄消息日志、處理發(fā)送失敗、記錄接收消息日志、處理業(yè)務(wù)邏輯、異常處理和異常重試 等步驟。
雖然使用原生RocketMQ可以完成這些動(dòng)作,但每個(gè)生產(chǎn)者和消費(fèi)者都需要編寫(xiě)大量重復(fù)的代碼來(lái)完成相同的任務(wù),這就是需要進(jìn)行二次封裝的原因。我們希望通過(guò)二次封裝,**生產(chǎn)者只需準(zhǔn)備好消息實(shí)體并調(diào)用封裝后的工具類(lèi)發(fā)送,而消費(fèi)者只需處理核心業(yè)務(wù)邏輯,其他公共邏輯會(huì)得到統(tǒng)一處理。 **
在二次封裝中,關(guān)鍵是找出框架在日常使用中所涵蓋的許多操作,以及區(qū)分哪些操作是可變的,哪些是不變的。以上述例子為例,實(shí)際上只有生產(chǎn)者的消息準(zhǔn)備和消費(fèi)者的業(yè)務(wù)處理是可變的操作,需要根據(jù)需求進(jìn)行處理,而其他步驟可以固定下來(lái)形成一個(gè)模板。
當(dāng)然,本文提到的二次封裝不是指對(duì)源代碼進(jìn)行封裝,而是針對(duì)工具的原始使用方式進(jìn)行的封裝??梢詫⑵渑cMybatis和Mybatis-plus區(qū)分開(kāi)來(lái)。這兩者都能完成任務(wù),只不過(guò)Mybatis-plus更為簡(jiǎn)單便捷。
3.2 實(shí)現(xiàn)二次封裝
實(shí)現(xiàn)二次封裝需要?jiǎng)?chuàng)建一個(gè)自定義的starter,這樣其他項(xiàng)目只需要依賴(lài)此starter即可使用封裝功能。同時(shí),在自定義starter中還需要解決文章第二部分中提到的一些問(wèn)題。
代碼結(jié)構(gòu)如下所示:

3.2.1 消息實(shí)體類(lèi)的封裝
/**
* 消息實(shí)體,所有消息都需要繼承此類(lèi)
* 公眾號(hào):JAVA日知錄
*/
@Data
public abstract class BaseMessage {
/**
* 業(yè)務(wù)鍵,用于RocketMQ控制臺(tái)查看消費(fèi)情況
*/
protected String key;
/**
* 發(fā)送消息來(lái)源,用于排查問(wèn)題
*/
protected String source = "";
/**
* 發(fā)送時(shí)間
*/
protected LocalDateTime sendTime = LocalDateTime.now();
/**
* 重試次數(shù),用于判斷重試次數(shù),超過(guò)重試次數(shù)發(fā)送異常警告
*/
protected Integer retryTimes = 0;
}后面所有發(fā)送的消息實(shí)體都需要繼承此實(shí)體類(lèi)。
3.2.2 消息發(fā)送工具類(lèi)的封裝
@Slf4j
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class RocketMQEnhanceTemplate {
private final RocketMQTemplate template;
@Resource
private RocketEnhanceProperties rocketEnhanceProperties;
public RocketMQTemplate getTemplate() {
return template;
}
/**
* 根據(jù)系統(tǒng)上下文自動(dòng)構(gòu)建隔離后的topic
* 構(gòu)建目的地
*/
public String buildDestination(String topic, String tag) {
topic = reBuildTopic(topic);
return topic + ":" + tag;
}
/**
* 根據(jù)環(huán)境重新隔離topic
* @param topic 原始topic
*/
private String reBuildTopic(String topic) {
if(rocketEnhanceProperties.isEnabledIsolation() && StringUtils.hasText(rocketEnhanceProperties.getEnvironment())){
return topic +"_" + rocketEnhanceProperties.getEnvironment();
}
return topic;
}
/**
* 發(fā)送同步消息
*/
public <T extends BaseMessage> SendResult send(String topic, String tag, T message) {
// 注意分隔符
return send(buildDestination(topic,tag), message);
}
public <T extends BaseMessage> SendResult send(String destination, T message) {
// 設(shè)置業(yè)務(wù)鍵,此處根據(jù)公共的參數(shù)進(jìn)行處理
// 更多的其它基礎(chǔ)業(yè)務(wù)處理...
Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
SendResult sendResult = template.syncSend(destination, sendMessage);
// 此處為了方便查看給日志轉(zhuǎn)了json,根據(jù)選擇選擇日志記錄方式,例如ELK采集
log.info("[{}]同步消息[{}]發(fā)送結(jié)果[{}]", destination, JSONObject.toJSON(message), JSONObject.toJSON(sendResult));
return sendResult;
}
/**
* 發(fā)送延遲消息
*/
public <T extends BaseMessage> SendResult send(String topic, String tag, T message, int delayLevel) {
return send(buildDestination(topic,tag), message, delayLevel);
}
public <T extends BaseMessage> SendResult send(String destination, T message, int delayLevel) {
Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
SendResult sendResult = template.syncSend(destination, sendMessage, 3000, delayLevel);
log.info("[{}]延遲等級(jí)[{}]消息[{}]發(fā)送結(jié)果[{}]", destination, delayLevel, JSONObject.toJSON(message), JSONObject.toJSON(sendResult));
return sendResult;
}
}
這里封裝了一個(gè)消息發(fā)送類(lèi),實(shí)現(xiàn)了日志記錄以及自動(dòng)重建topic的功能(即生產(chǎn)者實(shí)現(xiàn)環(huán)境隔離),后面項(xiàng)目中只需要注入RocketMQEnhanceTemplate來(lái)實(shí)現(xiàn)消息的發(fā)送。
3.2.3 消費(fèi)者的封裝
@Slf4j
public abstract class EnhanceMessageHandler<T extends BaseMessage> {
/**
* 默認(rèn)重試次數(shù)
*/
private static final int MAX_RETRY_TIMES = 3;
/**
* 延時(shí)等級(jí)
*/
private static final int DELAY_LEVEL = EnhanceMessageConstant.FIVE_SECOND;
@Resource
private RocketMQEnhanceTemplate rocketMQEnhanceTemplate;
/**
* 消息處理
*
* @param message 待處理消息
* @throws Exception 消費(fèi)異常
*/
protected abstract void handleMessage(T message) throws Exception;
/**
* 超過(guò)重試次數(shù)消息,需要啟用isRetry
*
* @param message 待處理消息
*/
protected abstract void handleMaxRetriesExceeded(T message);
/**
* 是否需要根據(jù)業(yè)務(wù)規(guī)則過(guò)濾消息,去重邏輯可以在此處處理
* @param message 待處理消息
* @return true: 本次消息被過(guò)濾,false:不過(guò)濾
*/
protected boolean filter(T message) {
return false;
}
/**
* 是否異常時(shí)重復(fù)發(fā)送
*
* @return true: 消息重試,false:不重試
*/
protected abstract boolean isRetry();
/**
* 消費(fèi)異常時(shí)是否拋出異常
* 返回true,則由rocketmq機(jī)制自動(dòng)重試
* false:消費(fèi)異常(如果沒(méi)有開(kāi)啟重試則消息會(huì)被自動(dòng)ack)
*/
protected abstract boolean throwException();
/**
* 最大重試次數(shù)
*
* @return 最大重試次數(shù),默認(rèn)5次
*/
protected int getMaxRetryTimes() {
return MAX_RETRY_TIMES;
}
/**
* isRetry開(kāi)啟時(shí),重新入隊(duì)延遲時(shí)間
* @return -1:立即入隊(duì)重試
*/
protected int getDelayLevel() {
return DELAY_LEVEL;
}
/**
* 使用模板模式構(gòu)建消息消費(fèi)框架,可自由擴(kuò)展或刪減
*/
public void dispatchMessage(T message) {
// 基礎(chǔ)日志記錄被父類(lèi)處理了
log.info("消費(fèi)者收到消息[{}]", JSONObject.toJSON(message));
if (filter(message)) {
log.info("消息id{}不滿(mǎn)足消費(fèi)條件,已過(guò)濾。",message.getKey());
return;
}
// 超過(guò)最大重試次數(shù)時(shí)調(diào)用子類(lèi)方法處理
if (message.getRetryTimes() > getMaxRetryTimes()) {
handleMaxRetriesExceeded(message);
return;
}
try {
long now = System.currentTimeMillis();
handleMessage(message);
long costTime = System.currentTimeMillis() - now;
log.info("消息{}消費(fèi)成功,耗時(shí)[{}ms]", message.getKey(),costTime);
} catch (Exception e) {
log.error("消息{}消費(fèi)異常", message.getKey(),e);
// 是捕獲異常還是拋出,由子類(lèi)決定
if (throwException()) {
//拋出異常,由DefaultMessageListenerConcurrently類(lèi)處理
throw new RuntimeException(e);
}
//此時(shí)如果不開(kāi)啟重試機(jī)制,則默認(rèn)ACK了
if (isRetry()) {
handleRetry(message);
}
}
}
protected void handleRetry(T message) {
// 獲取子類(lèi)RocketMQMessageListener注解拿到topic和tag
RocketMQMessageListener annotation = this.getClass().getAnnotation(RocketMQMessageListener.class);
if (annotation == null) {
return;
}
//重新構(gòu)建消息體
String messageSource = message.getSource();
if(!messageSource.startsWith(EnhanceMessageConstant.RETRY_PREFIX)){
message.setSource(EnhanceMessageConstant.RETRY_PREFIX + messageSource);
}
message.setRetryTimes(message.getRetryTimes() + 1);
SendResult sendResult;
try {
// 如果消息發(fā)送不成功,則再次重新發(fā)送,如果發(fā)送異常則拋出由MQ再次處理(異常時(shí)不走延遲消息)
sendResult = rocketMQEnhanceTemplate.send(annotation.topic(), annotation.selectorExpression(), message, getDelayLevel());
} catch (Exception ex) {
// 此處捕獲之后,相當(dāng)于此條消息被消息完成然后重新發(fā)送新的消息
//由生產(chǎn)者直接發(fā)送
throw new RuntimeException(ex);
}
// 發(fā)送失敗的處理就是不進(jìn)行ACK,由RocketMQ重試
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
throw new RuntimeException("重試消息發(fā)送失敗");
}
}
}
使用模版設(shè)計(jì)模式定義了消息消費(fèi)的骨架,實(shí)現(xiàn)了日志打印,異常處理,異常重試等公共邏輯,消息過(guò)濾(查重)、業(yè)務(wù)處理則交由子類(lèi)實(shí)現(xiàn)。
3.2.4 基礎(chǔ)配置類(lèi)
@Configuration
@EnableConfigurationProperties(RocketEnhanceProperties.class)
public class RocketMQEnhanceAutoConfiguration {
/**
* 注入增強(qiáng)的RocketMQEnhanceTemplate
*/
@Bean
public RocketMQEnhanceTemplate rocketMQEnhanceTemplate(RocketMQTemplate rocketMQTemplate){
return new RocketMQEnhanceTemplate(rocketMQTemplate);
}
/**
* 解決RocketMQ Jackson不支持Java時(shí)間類(lèi)型配置
* 源碼參考:{@link org.apache.rocketmq.spring.autoconfigure.MessageConverterConfiguration}
*/
@Bean
@Primary
public RocketMQMessageConverter enhanceRocketMQMessageConverter(){
RocketMQMessageConverter converter = new RocketMQMessageConverter();
CompositeMessageConverter compositeMessageConverter = (CompositeMessageConverter) converter.getMessageConverter();
List<MessageConverter> messageConverterList = compositeMessageConverter.getConverters();
for (MessageConverter messageConverter : messageConverterList) {
if(messageConverter instanceof MappingJackson2MessageConverter){
MappingJackson2MessageConverter jackson2MessageConverter = (MappingJackson2MessageConverter) messageConverter;
ObjectMapper objectMapper = jackson2MessageConverter.getObjectMapper();
objectMapper.registerModules(new JavaTimeModule());
}
}
return converter;
}
/**
* 環(huán)境隔離配置
*/
@Bean
@ConditionalOnProperty(name="rocketmq.enhance.enabledIsolation", havingValue="true")
public EnvironmentIsolationConfig environmentSetup(RocketEnhanceProperties rocketEnhanceProperties){
return new EnvironmentIsolationConfig(rocketEnhanceProperties);
}
}
public class EnvironmentIsolationConfig implements BeanPostProcessor {
private RocketEnhanceProperties rocketEnhanceProperties;
public EnvironmentIsolationConfig(RocketEnhanceProperties rocketEnhanceProperties) {
this.rocketEnhanceProperties = rocketEnhanceProperties;
}
/**
* 在裝載Bean之前實(shí)現(xiàn)參數(shù)修改
*/
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
if(bean instanceof DefaultRocketMQListenerContainer){
DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean;
if(rocketEnhanceProperties.isEnabledIsolation() && StringUtils.hasText(rocketEnhanceProperties.getEnvironment())){
container.setTopic(String.join("_", container.getTopic(),rocketEnhanceProperties.getEnvironment()));
}
return container;
}
return bean;
}
}
@ConfigurationProperties(prefix = "rocketmq.enhance")
@Data
public class RocketEnhanceProperties {
private boolean enabledIsolation;
private String environment;
}3.3 封裝后的使用
3.3.1 引入依賴(lài)
<dependency> <groupId>com.jianzh5</groupId> <artifactId>cloud-rocket-starter</artifactId> </dependency>
3.3.2 自定義配置
rocketmq:
...
enhance:
# 啟動(dòng)隔離,用于激活配置類(lèi)EnvironmentIsolationConfig
# 啟動(dòng)后會(huì)自動(dòng)在topic上拼接激活的配置文件,達(dá)到自動(dòng)隔離的效果
enabledIsolation: true
# 隔離環(huán)境名稱(chēng),拼接到topic后,topic_dev,默認(rèn)空字符串
environment: dev3.3.3 發(fā)送消息
@RestController
@RequestMapping("enhance")
@Slf4j
public class EnhanceProduceController {
//注入增強(qiáng)后的模板,可以自動(dòng)實(shí)現(xiàn)環(huán)境隔離,日志記錄
@Setter(onMethod_ = @Autowired)
private RocketMQEnhanceTemplate rocketMQEnhanceTemplate;
private static final String topic = "rocket_enhance";
private static final String tag = "member";
/**
* 發(fā)送實(shí)體消息
*/
@GetMapping("/member")
public SendResult member() {
String key = UUID.randomUUID().toString();
MemberMessage message = new MemberMessage();
// 設(shè)置業(yè)務(wù)key
message.setKey(key);
// 設(shè)置消息來(lái)源,便于查詢(xún)
message.setSource("MEMBER");
// 業(yè)務(wù)消息內(nèi)容
message.setUserName("Java日知錄");
message.setBirthday(LocalDate.now());
return rocketMQEnhanceTemplate.send(topic, tag, message);
}
}
注意這里使用的是封裝后的模板工具類(lèi),一旦在配置文件中啟動(dòng)環(huán)境隔離,則生產(chǎn)者的消息也自動(dòng)發(fā)送到隔離后的topic中。
3.3.4 消費(fèi)者
@Slf4j
@Component
@RocketMQMessageListener(
consumerGroup = "enhance_consumer_group",
topic = "rocket_enhance",
selectorExpression = "*",
consumeThreadMax = 5 //默認(rèn)是64個(gè)線(xiàn)程并發(fā)消息,配置 consumeThreadMax 參數(shù)指定并發(fā)消費(fèi)線(xiàn)程數(shù),避免太大導(dǎo)致資源不夠
)
public class EnhanceMemberMessageListener extends EnhanceMessageHandler<MemberMessage> implements RocketMQListener<MemberMessage> {
@Override
protected void handleMessage(MemberMessage message) throws Exception {
// 此時(shí)這里才是最終的業(yè)務(wù)處理,代碼只需要處理資源類(lèi)關(guān)閉異常,其他的可以交給父類(lèi)重試
System.out.println("業(yè)務(wù)消息處理:"+message.getUserName());
}
@Override
protected void handleMaxRetriesExceeded(MemberMessage message) {
// 當(dāng)超過(guò)指定重試次數(shù)消息時(shí)此處方法會(huì)被調(diào)用
// 生產(chǎn)中可以進(jìn)行回退或其他業(yè)務(wù)操作
log.error("消息消費(fèi)失敗,請(qǐng)執(zhí)行后續(xù)處理");
}
/**
* 是否執(zhí)行重試機(jī)制
*/
@Override
protected boolean isRetry() {
return true;
}
@Override
protected boolean throwException() {
// 是否拋出異常,false搭配retry自行處理異常
return false;
}
@Override
protected boolean filter() {
// 消息過(guò)濾
return false;
}
/**
* 監(jiān)聽(tīng)消費(fèi)消息,不需要執(zhí)行業(yè)務(wù)處理,委派給父類(lèi)做基礎(chǔ)操作,父類(lèi)做完基礎(chǔ)操作后會(huì)調(diào)用子類(lèi)的實(shí)際處理類(lèi)型
*/
@Override
public void onMessage(MemberMessage memberMessage) {
super.dispatchMessage(memberMessage);
}
}
為了方便消費(fèi)者對(duì)RocketMQ中的消息進(jìn)行處理,我們可以使用EnhanceMessageHandler來(lái)進(jìn)行消息的處理和邏輯的處理。
消費(fèi)者實(shí)現(xiàn)了RocketMQListener的同時(shí),可以繼承EnhanceMessageHandler來(lái)進(jìn)行公共邏輯的處理,而核心業(yè)務(wù)邏輯需要自己實(shí)現(xiàn)handleMessage方法。 如果需要對(duì)消息進(jìn)行過(guò)濾或者去重的處理,則可以重寫(xiě)父類(lèi)的filter方法進(jìn)行實(shí)現(xiàn)。這樣可以更加方便地對(duì)消息進(jìn)行處理,減輕開(kāi)發(fā)者的工作量。
到此這篇關(guān)于SpringBoot整合RocketMQ的詳細(xì)過(guò)程的文章就介紹到這了,更多相關(guān)SpringBoot整合RocketMQ內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
詳解SpringCloud新一代網(wǎng)關(guān)Gateway
SpringCloud Gateway是Spring Cloud的一個(gè)全新項(xiàng)目,Spring 5.0+ Spring Boot 2.0和Project Reactor等技術(shù)開(kāi)發(fā)的網(wǎng)關(guān),它旨在為微服務(wù)架構(gòu)提供一種簡(jiǎn)單有效的統(tǒng)一的API路由管理方式2021-06-06
spring cloud如何修復(fù)zuul跨域配置異常的問(wèn)題
最近的開(kāi)發(fā)過(guò)程中,使用spring集成了spring-cloud-zuul,在配置zuul跨域的時(shí)候遇到了問(wèn)題,下面這篇文章主要給大家介紹了關(guān)于spring cloud如何修復(fù)zuul跨域配置異常的問(wèn)題,需要的朋友可以參考借鑒,下面來(lái)一起看看吧。2017-09-09
使用ShardingSphere-Proxy實(shí)現(xiàn)分表分庫(kù)
這篇文章介紹了使用ShardingSphere-Proxy實(shí)現(xiàn)分表分庫(kù)的方法,對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2022-02-02
SpringMVC五大組件與執(zhí)行原理分析總結(jié)
這篇文章主要介紹了SpringMVC五大組件與執(zhí)行原理分析總結(jié),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)吧2023-01-01
永久解決 Intellij idea 報(bào)錯(cuò):Error :java 不支持發(fā)行版本5的問(wèn)題
這篇文章主要介紹了永久解決 Intellij idea 報(bào)錯(cuò):Error :java 不支持發(fā)行版本5的問(wèn)題,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-02-02
Java編程實(shí)現(xiàn)提取文章中關(guān)鍵字的方法
這篇文章主要介紹了Java編程實(shí)現(xiàn)提取文章中關(guān)鍵字的方法,較為詳細(xì)的分析了Java提取文章關(guān)鍵字的原理與具體實(shí)現(xiàn)技巧,具有一定參考借鑒價(jià)值,需要的朋友可以參考下2015-11-11
java接口中的代理設(shè)計(jì)模式代碼時(shí)實(shí)踐
這篇文章主要介紹了java接口中的代理設(shè)計(jì)模式代碼時(shí)實(shí)踐,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-07-07

