解決SpringBoot整合RocketMQ遇到的坑
應(yīng)用場景
在實現(xiàn)RocketMQ消費時,一般會用到@RocketMQMessageListener注解定義Group、Topic以及selectorExpression(數(shù)據(jù)過濾、選擇的規(guī)則)為了能支持動態(tài)篩選數(shù)據(jù),一般都會使用表達式,然后通過apollo或者cloud config進行動態(tài)切換。
引入依賴
<!-- RocketMq Spring Boot Starter--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.4</version> </dependency>
消費者代碼
@RocketMQMessageListener(consumerGroup = "${rocketmq.group}",topic ="${rocketmq.topic}",selectorExpression = "${rocketmq.selectorExpression}") public class Consumer implements RocketMQListener<String> { @Override public void onMessage(String s) { System.out.println("消費到的數(shù)據(jù)為:"+s); } }
問題排查
RocketMQMessageListener整個注解默認(rèn)selectorExpression為*,表示接收當(dāng)前Topic下的所有數(shù)據(jù),如果我們想對tags進行動態(tài)配置,在使用${rocketmq.selectorExpression}表達式時會發(fā)現(xiàn)所有數(shù)據(jù)全被過濾了,跟蹤源碼(ListenerContainerConfiguration.java)發(fā)現(xiàn)在創(chuàng)建listener時selectorExpression的數(shù)據(jù)在通environment環(huán)境變量中獲取對應(yīng)的數(shù)據(jù)后又被覆蓋了,導(dǎo)致整個過濾條件被變更為表達式。
@Override public void afterSingletonsInstantiated() { // 獲取所有所有使用了RocketMQMessageListener注解的bean Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class); if (Objects.nonNull(beans)) { // 循環(huán)注冊容器 beans.forEach(this::registerContainer); } } private void registerContainer(String beanName, Object bean) { Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean); // 校驗當(dāng)前bean是否實現(xiàn)了RocketMQListener接口 if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) { throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName()); } // 獲取bean上的annotation RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class); // 解析group及topic,可支持表達式 String consumerGroup = this.environment.resolvePlaceholders(annotation.consumerGroup()); String topic = this.environment.resolvePlaceholders(annotation.topic()); boolean listenerEnabled = (boolean)rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP) .getOrDefault(topic, true); if (!listenerEnabled) { log.debug( "Consumer Listener (group:{},topic:{}) is not enabled by configuration, will ignore initialization.", consumerGroup, topic); return; } validate(annotation); String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(), counter.incrementAndGet()); GenericApplicationContext genericApplicationContext = (GenericApplicationContext)applicationContext; // 注冊bean的,調(diào)用createRocketMQListenerContainer genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class, () -> createRocketMQListenerContainer(containerBeanName, bean, annotation)); DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName, DefaultRocketMQListenerContainer.class); if (!container.isRunning()) { try { container.start(); } catch (Exception e) { log.error("Started container failed. {}", container, e); throw new RuntimeException(e); } } log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName); } private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean, RocketMQMessageListener annotation) { DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer(); container.setRocketMQMessageListener(annotation); String nameServer = environment.resolvePlaceholders(annotation.nameServer()); nameServer = StringUtils.isEmpty(nameServer) ? rocketMQProperties.getNameServer() : nameServer; String accessChannel = environment.resolvePlaceholders(annotation.accessChannel()); container.setNameServer(nameServer); if (!StringUtils.isEmpty(accessChannel)) { container.setAccessChannel(AccessChannel.valueOf(accessChannel)); } container.setTopic(environment.resolvePlaceholders(annotation.topic())); // 此處已經(jīng)根據(jù)表達式將數(shù)據(jù)取出 String tags = environment.resolvePlaceholders(annotation.selectorExpression()); if (!StringUtils.isEmpty(tags)) { container.setSelectorExpression(tags); } container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup())); // 此處將SelectorExpression的數(shù)據(jù)覆蓋成了表達式 container.setRocketMQMessageListener(annotation); container.setRocketMQListener((RocketMQListener)bean); container.setObjectMapper(objectMapper); container.setMessageConverter(rocketMQMessageConverter.getMessageConverter()); container.setName(name); // REVIEW ME, use the same clientId or multiple? return container; }
問題解決
因為ListenerContainerConfiguration類是實現(xiàn)了SmartInitializingSingleton接口的afterSingletonsInstantiated方法,我們可以通過反射對selectorExpression的數(shù)據(jù)在ListenerContainerConfiguration進行初始化前進行解析并賦值回去。
/** * 在springboot初始化后,RocketMQ容器初始化前利用反射動態(tài)改變數(shù)據(jù) **/ @Configuration public class ChangeSelectorExpressionBeforeMQInit implements InitializingBean { @Autowired private ApplicationContext applicationContext; @Autowired private StandardEnvironment environment; @Override public void afterPropertiesSet() throws Exception { Map<String,Object> beans =applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class); for (Object bean : beans.values()){ Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean); if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) { continue; } RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class); InvocationHandler invocationHandler = Proxy.getInvocationHandler(annotation); Field field = invocationHandler.getClass().getDeclaredField("memberValues"); field.setAccessible(true); Map<String, Object> memberValues = (Map<String, Object>) field.get(invocationHandler); for (Map.Entry<String,Object> entry: memberValues.entrySet()) { if(Objects.nonNull(entry)){ memberValues.put(entry.getKey(),environment.resolvePlaceholders(String.valueOf(entry.getValue()))); } } } } }
初次之外,在2.1.0版本的依賴包中已經(jīng)修復(fù)了此Bug,在不造成依賴沖突的前提下,建議使用2.1.0以上的版本包。
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
springmvc中RequestMappingHandlerAdapter與HttpMessageConverter的
今天小編就為大家分享一篇關(guān)于springmvc中RequestMappingHandlerAdapter與HttpMessageConverter的裝配講解,小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧2019-01-01Mybatis/Mybatis-Plus駝峰式命名映射的實現(xiàn)
本文主要介紹了Mybatis-Plus駝峰式命名映射的實現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-07-07詳解SpringBoot實現(xiàn)事件同步與異步監(jiān)聽
這篇文章主要通過示例為大家詳細(xì)介紹了SpringBoot中的事件的用法和原理以及如何實現(xiàn)事件同步與異步監(jiān)聽,快跟隨小編一起學(xué)習(xí)學(xué)習(xí)吧2022-06-06k8s部署springboot實現(xiàn)前后端分離項目
本文主要介紹了k8s部署springboot實現(xiàn)前后端分離項目,主要包括配置文件、鏡像構(gòu)建和容器編排等方面,具有一定的參考價值,感興趣的可以了解一下2024-01-01Netty分布式pipeline管道Handler的添加代碼跟蹤解析
這篇文章主要介紹了Netty分布式pipeline管道Handler的添加代碼跟蹤解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-03-03