解決SpringBoot整合RocketMQ遇到的坑
應(yīng)用場景
在實(shí)現(xiàn)RocketMQ消費(fèi)時(shí),一般會用到@RocketMQMessageListener注解定義Group、Topic以及selectorExpression(數(shù)據(jù)過濾、選擇的規(guī)則)為了能支持動態(tài)篩選數(shù)據(jù),一般都會使用表達(dá)式,然后通過apollo或者cloud config進(jìn)行動態(tài)切換。
引入依賴
<!-- RocketMq Spring Boot Starter--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.4</version> </dependency>
消費(fèi)者代碼
@RocketMQMessageListener(consumerGroup = "${rocketmq.group}",topic ="${rocketmq.topic}",selectorExpression = "${rocketmq.selectorExpression}")
public class Consumer implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
System.out.println("消費(fèi)到的數(shù)據(jù)為:"+s);
}
}
問題排查
RocketMQMessageListener整個(gè)注解默認(rèn)selectorExpression為*,表示接收當(dāng)前Topic下的所有數(shù)據(jù),如果我們想對tags進(jìn)行動態(tài)配置,在使用${rocketmq.selectorExpression}表達(dá)式時(shí)會發(fā)現(xiàn)所有數(shù)據(jù)全被過濾了,跟蹤源碼(ListenerContainerConfiguration.java)發(fā)現(xiàn)在創(chuàng)建listener時(shí)selectorExpression的數(shù)據(jù)在通environment環(huán)境變量中獲取對應(yīng)的數(shù)據(jù)后又被覆蓋了,導(dǎo)致整個(gè)過濾條件被變更為表達(dá)式。
@Override
public void afterSingletonsInstantiated() {
// 獲取所有所有使用了RocketMQMessageListener注解的bean
Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);
if (Objects.nonNull(beans)) {
// 循環(huán)注冊容器
beans.forEach(this::registerContainer);
}
}
private void registerContainer(String beanName, Object bean) {
Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
// 校驗(yàn)當(dāng)前bean是否實(shí)現(xiàn)了RocketMQListener接口
if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) {
throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName());
}
// 獲取bean上的annotation
RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
// 解析group及topic,可支持表達(dá)式
String consumerGroup = this.environment.resolvePlaceholders(annotation.consumerGroup());
String topic = this.environment.resolvePlaceholders(annotation.topic());
boolean listenerEnabled =
(boolean)rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP)
.getOrDefault(topic, true);
if (!listenerEnabled) {
log.debug(
"Consumer Listener (group:{},topic:{}) is not enabled by configuration, will ignore initialization.",
consumerGroup, topic);
return;
}
validate(annotation);
String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),
counter.incrementAndGet());
GenericApplicationContext genericApplicationContext = (GenericApplicationContext)applicationContext;
// 注冊bean的,調(diào)用createRocketMQListenerContainer
genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,
() -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,
DefaultRocketMQListenerContainer.class);
if (!container.isRunning()) {
try {
container.start();
} catch (Exception e) {
log.error("Started container failed. {}", container, e);
throw new RuntimeException(e);
}
}
log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
}
private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean,
RocketMQMessageListener annotation) {
DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer();
container.setRocketMQMessageListener(annotation);
String nameServer = environment.resolvePlaceholders(annotation.nameServer());
nameServer = StringUtils.isEmpty(nameServer) ? rocketMQProperties.getNameServer() : nameServer;
String accessChannel = environment.resolvePlaceholders(annotation.accessChannel());
container.setNameServer(nameServer);
if (!StringUtils.isEmpty(accessChannel)) {
container.setAccessChannel(AccessChannel.valueOf(accessChannel));
}
container.setTopic(environment.resolvePlaceholders(annotation.topic()));
// 此處已經(jīng)根據(jù)表達(dá)式將數(shù)據(jù)取出
String tags = environment.resolvePlaceholders(annotation.selectorExpression());
if (!StringUtils.isEmpty(tags)) {
container.setSelectorExpression(tags);
}
container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup()));
// 此處將SelectorExpression的數(shù)據(jù)覆蓋成了表達(dá)式
container.setRocketMQMessageListener(annotation);
container.setRocketMQListener((RocketMQListener)bean);
container.setObjectMapper(objectMapper);
container.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
container.setName(name); // REVIEW ME, use the same clientId or multiple?
return container;
}
問題解決
因?yàn)長istenerContainerConfiguration類是實(shí)現(xiàn)了SmartInitializingSingleton接口的afterSingletonsInstantiated方法,我們可以通過反射對selectorExpression的數(shù)據(jù)在ListenerContainerConfiguration進(jìn)行初始化前進(jìn)行解析并賦值回去。
/**
* 在springboot初始化后,RocketMQ容器初始化前利用反射動態(tài)改變數(shù)據(jù)
**/
@Configuration
public class ChangeSelectorExpressionBeforeMQInit implements InitializingBean {
@Autowired
private ApplicationContext applicationContext;
@Autowired
private StandardEnvironment environment;
@Override
public void afterPropertiesSet() throws Exception {
Map<String,Object> beans =applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);
for (Object bean : beans.values()){
Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) {
continue;
}
RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
InvocationHandler invocationHandler = Proxy.getInvocationHandler(annotation);
Field field = invocationHandler.getClass().getDeclaredField("memberValues");
field.setAccessible(true);
Map<String, Object> memberValues = (Map<String, Object>) field.get(invocationHandler);
for (Map.Entry<String,Object> entry: memberValues.entrySet()) {
if(Objects.nonNull(entry)){
memberValues.put(entry.getKey(),environment.resolvePlaceholders(String.valueOf(entry.getValue())));
}
}
}
}
}
初次之外,在2.1.0版本的依賴包中已經(jīng)修復(fù)了此Bug,在不造成依賴沖突的前提下,建議使用2.1.0以上的版本包。
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
- 淺談Springboot整合RocketMQ使用心得
- springBoot整合RocketMQ及坑的示例代碼
- SpringBoot整合RocketMQ實(shí)現(xiàn)消息發(fā)送和接收的詳細(xì)步驟
- Springboot RocketMq實(shí)現(xiàn)過程詳解
- SpringBoot整合RocketMQ實(shí)現(xiàn)發(fā)送同步消息
- Springboot詳解RocketMQ實(shí)現(xiàn)消息發(fā)送與接收流程
- SpringBoot集成RocketMQ的使用示例
- SpringBoot項(xiàng)目嵌入RocketMQ的實(shí)現(xiàn)示例
- SpringBoot+RocketMQ實(shí)現(xiàn)延遲消息的示例代碼
相關(guān)文章
Java的Hibernate框架中一對多的單向和雙向關(guān)聯(lián)映射
建立對SQL語句的映射是Hibernate框架操作數(shù)據(jù)庫的主要手段,這里我們列舉實(shí)例來為大家講解Java的Hibernate框架中一對多的單向和雙向關(guān)聯(lián)映射2016-06-06
Java并發(fā) CompletableFuture異步編程的實(shí)現(xiàn)
這篇文章主要介紹了Java并發(fā) CompletableFuture異步編程的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-01-01
SpringCloud微服務(wù)架構(gòu)實(shí)戰(zhàn)之微服務(wù)治理功能的實(shí)現(xiàn)
這篇文章主要介紹了SpringCloud微服務(wù)架構(gòu)實(shí)戰(zhàn)之微服務(wù)治理,這些治理工具主要包括服務(wù)的注冊與發(fā)現(xiàn)、負(fù)載均衡管理、動態(tài)路由、服務(wù)降級和故障轉(zhuǎn)移、鏈路跟蹤、服務(wù)監(jiān)控等,需要的朋友可以參考下2022-02-02
Java Switch對各類型支持實(shí)現(xiàn)原理
這篇文章主要介紹了Java Switch對各類型支持實(shí)現(xiàn)原理,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-05-05
解決feignclient調(diào)用服務(wù),傳遞的中文數(shù)據(jù)成???問題
這篇文章主要介紹了解決feignclient調(diào)用服務(wù),傳遞的中文數(shù)據(jù)成???問題,具有很好的參考價(jià)值,希望對大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-01-01
java.net.ConnectException: Connection refused問題解決辦法
這篇文章主要介紹了java.net.ConnectException: Connection refused問題解決辦法的相關(guān)資料,需要的朋友可以參考下2016-12-12
Spring?Bean注冊與注入實(shí)現(xiàn)方法詳解
首先,要學(xué)習(xí)Spring中的Bean的注入方式,就要先了解什么是依賴注入。依賴注入是指:讓調(diào)用類對某一接口的實(shí)現(xiàn)類的實(shí)現(xiàn)類的依賴關(guān)系由第三方注入,以此來消除調(diào)用類對某一接口實(shí)現(xiàn)類的依賴。Spring容器中支持的依賴注入方式主要有屬性注入、構(gòu)造函數(shù)注入、工廠方法注入2022-10-10

