欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

解決SpringBoot整合RocketMQ遇到的坑

 更新時間:2021年07月02日 11:09:06   作者:喝酸奶的小睿睿  
這篇文章主要介紹了解決SpringBoot整合RocketMQ遇到的坑,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教

應(yīng)用場景

在實現(xiàn)RocketMQ消費時,一般會用到@RocketMQMessageListener注解定義Group、Topic以及selectorExpression(數(shù)據(jù)過濾、選擇的規(guī)則)為了能支持動態(tài)篩選數(shù)據(jù),一般都會使用表達式,然后通過apollo或者cloud config進行動態(tài)切換。

引入依賴

 <!-- RocketMq Spring Boot Starter-->
 <dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-spring-boot-starter</artifactId>
  <version>2.0.4</version>
  </dependency>

消費者代碼

@RocketMQMessageListener(consumerGroup = "${rocketmq.group}",topic ="${rocketmq.topic}",selectorExpression = "${rocketmq.selectorExpression}")
public class Consumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("消費到的數(shù)據(jù)為:"+s);
    }
}

問題排查

RocketMQMessageListener整個注解默認(rèn)selectorExpression為*,表示接收當(dāng)前Topic下的所有數(shù)據(jù),如果我們想對tags進行動態(tài)配置,在使用${rocketmq.selectorExpression}表達式時會發(fā)現(xiàn)所有數(shù)據(jù)全被過濾了,跟蹤源碼(ListenerContainerConfiguration.java)發(fā)現(xiàn)在創(chuàng)建listener時selectorExpression的數(shù)據(jù)在通environment環(huán)境變量中獲取對應(yīng)的數(shù)據(jù)后又被覆蓋了,導(dǎo)致整個過濾條件被變更為表達式。

@Override
    public void afterSingletonsInstantiated() {
    	// 獲取所有所有使用了RocketMQMessageListener注解的bean
        Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);
        if (Objects.nonNull(beans)) {
        	// 循環(huán)注冊容器
            beans.forEach(this::registerContainer);
        }
    }
    private void registerContainer(String beanName, Object bean) {
        Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
		// 校驗當(dāng)前bean是否實現(xiàn)了RocketMQListener接口
        if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) {
            throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName());
        }
		// 獲取bean上的annotation
        RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
		// 解析group及topic,可支持表達式
        String consumerGroup = this.environment.resolvePlaceholders(annotation.consumerGroup());
        String topic = this.environment.resolvePlaceholders(annotation.topic());
        boolean listenerEnabled =
            (boolean)rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP)
                .getOrDefault(topic, true);
        if (!listenerEnabled) {
            log.debug(
                "Consumer Listener (group:{},topic:{}) is not enabled by configuration, will ignore initialization.",
                consumerGroup, topic);
            return;
        }
        validate(annotation);
        String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),
            counter.incrementAndGet());
        GenericApplicationContext genericApplicationContext = (GenericApplicationContext)applicationContext;
		// 注冊bean的,調(diào)用createRocketMQListenerContainer
        genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,
            () -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
        DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,
            DefaultRocketMQListenerContainer.class);
        if (!container.isRunning()) {
            try {
                container.start();
            } catch (Exception e) {
                log.error("Started container failed. {}", container, e);
                throw new RuntimeException(e);
            }
        }
        log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
    }
    private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean,
        RocketMQMessageListener annotation) {
        DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer();        
        container.setRocketMQMessageListener(annotation);        
        String nameServer = environment.resolvePlaceholders(annotation.nameServer());
        nameServer = StringUtils.isEmpty(nameServer) ? rocketMQProperties.getNameServer() : nameServer;
        String accessChannel = environment.resolvePlaceholders(annotation.accessChannel());
        container.setNameServer(nameServer);
        if (!StringUtils.isEmpty(accessChannel)) {
            container.setAccessChannel(AccessChannel.valueOf(accessChannel));
        }
        container.setTopic(environment.resolvePlaceholders(annotation.topic()));
        // 此處已經(jīng)根據(jù)表達式將數(shù)據(jù)取出
        String tags = environment.resolvePlaceholders(annotation.selectorExpression());
        if (!StringUtils.isEmpty(tags)) {
            container.setSelectorExpression(tags);
        }
        container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup()));
        // 此處將SelectorExpression的數(shù)據(jù)覆蓋成了表達式
        container.setRocketMQMessageListener(annotation);
        container.setRocketMQListener((RocketMQListener)bean);
        container.setObjectMapper(objectMapper);
        container.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
        container.setName(name);  // REVIEW ME, use the same clientId or multiple?
        return container;
    }

問題解決

因為ListenerContainerConfiguration類是實現(xiàn)了SmartInitializingSingleton接口的afterSingletonsInstantiated方法,我們可以通過反射對selectorExpression的數(shù)據(jù)在ListenerContainerConfiguration進行初始化前進行解析并賦值回去。

/**
 * 在springboot初始化后,RocketMQ容器初始化前利用反射動態(tài)改變數(shù)據(jù)
**/
@Configuration
public class ChangeSelectorExpressionBeforeMQInit implements InitializingBean {
    @Autowired
    private ApplicationContext applicationContext;
    @Autowired
    private StandardEnvironment environment;
    @Override
    public void afterPropertiesSet() throws Exception {
        Map<String,Object> beans =applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);
        for (Object bean : beans.values()){
            Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
            if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) {
                continue;
            }
            RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
            InvocationHandler invocationHandler = Proxy.getInvocationHandler(annotation);
            Field field = invocationHandler.getClass().getDeclaredField("memberValues");
            field.setAccessible(true);
            Map<String, Object> memberValues = (Map<String, Object>) field.get(invocationHandler);
            for (Map.Entry<String,Object> entry: memberValues.entrySet()) {
                if(Objects.nonNull(entry)){
                    memberValues.put(entry.getKey(),environment.resolvePlaceholders(String.valueOf(entry.getValue())));
                }
            }
        }
    }
}

初次之外,在2.1.0版本的依賴包中已經(jīng)修復(fù)了此Bug,在不造成依賴沖突的前提下,建議使用2.1.0以上的版本包。

以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。

相關(guān)文章

  • springmvc中RequestMappingHandlerAdapter與HttpMessageConverter的裝配講解

    springmvc中RequestMappingHandlerAdapter與HttpMessageConverter的

    今天小編就為大家分享一篇關(guān)于springmvc中RequestMappingHandlerAdapter與HttpMessageConverter的裝配講解,小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧
    2019-01-01
  • Mybatis/Mybatis-Plus駝峰式命名映射的實現(xiàn)

    Mybatis/Mybatis-Plus駝峰式命名映射的實現(xiàn)

    本文主要介紹了Mybatis-Plus駝峰式命名映射的實現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2023-07-07
  • 詳解SpringBoot實現(xiàn)事件同步與異步監(jiān)聽

    詳解SpringBoot實現(xiàn)事件同步與異步監(jiān)聽

    這篇文章主要通過示例為大家詳細(xì)介紹了SpringBoot中的事件的用法和原理以及如何實現(xiàn)事件同步與異步監(jiān)聽,快跟隨小編一起學(xué)習(xí)學(xué)習(xí)吧
    2022-06-06
  • java 實現(xiàn)KMP算法

    java 實現(xiàn)KMP算法

    這篇文章主要介紹了java 如何實現(xiàn)KMP算法,幫助大家更好的理解和學(xué)習(xí)Java,感興趣的朋友可以了解下
    2020-12-12
  • idea實現(xiàn)類快捷生成接口方法示例

    idea實現(xiàn)類快捷生成接口方法示例

    這篇文章主要介紹了idea實現(xiàn)類快捷生成接口方法示例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-08-08
  • 總結(jié)Java的Struts框架的異常處理方法

    總結(jié)Java的Struts框架的異常處理方法

    這篇文章主要介紹了Java的Struts框架的異常處理方法,Struts是Java的SSH三大web開發(fā)框架之一,需要的朋友可以參考下
    2015-12-12
  • k8s部署springboot實現(xiàn)前后端分離項目

    k8s部署springboot實現(xiàn)前后端分離項目

    本文主要介紹了k8s部署springboot實現(xiàn)前后端分離項目,主要包括配置文件、鏡像構(gòu)建和容器編排等方面,具有一定的參考價值,感興趣的可以了解一下
    2024-01-01
  • Spring Boot 與 mybatis配置方法

    Spring Boot 與 mybatis配置方法

    這篇文章主要介紹了Spring Boot 與 mybatis配置方法,需要的朋友可以參考下
    2017-06-06
  • Java反射及性能詳細(xì)

    Java反射及性能詳細(xì)

    這篇文章主要介紹了Java反射及性能,現(xiàn)如今的java工程中,反射的使用無處無在。無論是設(shè)計模式中的代理模式,還是紅透半邊天的Spring框架中的IOC,AOP等等,都存在大量反射的影子。下面我們就對該話題進行詳細(xì)介紹,感興趣的小伙伴可以參考一下
    2021-10-10
  • Netty分布式pipeline管道Handler的添加代碼跟蹤解析

    Netty分布式pipeline管道Handler的添加代碼跟蹤解析

    這篇文章主要介紹了Netty分布式pipeline管道Handler的添加代碼跟蹤解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2022-03-03

最新評論