欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

spring-Kafka中的@KafkaListener深入源碼解讀

 更新時間:2023年02月20日 09:45:33   作者:柏油  
本文主要通過深入了解源碼,梳理從spring啟動到真正監(jiān)聽kafka消息的這套流程,從spring啟動開始處理@KafkaListener,本文結(jié)合實例流程圖給大家講解的非常詳細,需要的朋友參考下

前言

本文主要通過深入了解源碼,梳理從spring啟動到真正監(jiān)聽kafka消息的這套流程

一、總體流程

從spring啟動開始處理@KafkaListener,到start消息監(jiān)聽整體流程圖

二、源碼解讀

1、postProcessAfterInitialization

KafkaListenerAnnotationBeanPostProcessor#postProcessAfterInitialization

	public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
		if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
		    Class<?> targetClass = AopUtils.getTargetClass(bean);
		    
		    // 掃描@KafkaListener注解
			Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
			
			......
			
			if (annotatedMethods.isEmpty()) {
				this.nonAnnotatedClasses.add(bean.getClass());
				this.logger.trace(() -> "No @KafkaListener annotations found on bean type: " + bean.getClass());
			}
			else {
				// Non-empty set of methods
				for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
					Method method = entry.getKey();
					// 遍歷掃描到的所有@KafkaListener注解并開始處理
					for (KafkaListener listener : entry.getValue()) {
						processKafkaListener(listener, method, bean, beanName);
					}
				}
				this.logger.debug(() -> annotatedMethods.size() + " @KafkaListener methods processed on bean '"
							+ beanName + "': " + annotatedMethods);
			}
			// 處理在類上的@KafkaListener注解
			if (hasClassLevelListeners) {
				processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
			}
		}
		return bean;
	}

1.1、processKafkaListener

KafkaListenerAnnotationBeanPostProcessor#processKafkaListener

	protected void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean, String beanName) {
		Method methodToUse = checkProxy(method, bean);
		MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();
		endpoint.setMethod(methodToUse);
		processListener(endpoint, kafkaListener, bean, methodToUse, beanName);
	}

1.2、processListener

KafkaListenerAnnotationBeanPostProcessor#processListener

將每個kafkaListener轉(zhuǎn)變成MethodKafkaListenerEndpoint并注冊到KafkaListenerEndpointRegistrar容器,方便后續(xù)統(tǒng)一啟動監(jiān)聽

	protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener,
			Object bean, Object adminTarget, String beanName) {

		String beanRef = kafkaListener.beanRef();
		if (StringUtils.hasText(beanRef)) {
			this.listenerScope.addListener(beanRef, bean);
		}
		endpoint.setBean(bean);
		endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
		endpoint.setId(getEndpointId(kafkaListener));
		endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId()));
		endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener));
		endpoint.setTopics(resolveTopics(kafkaListener));
		endpoint.setTopicPattern(resolvePattern(kafkaListener));
		endpoint.setClientIdPrefix(resolveExpressionAsString(kafkaListener.clientIdPrefix(), "clientIdPrefix"));
		String group = kafkaListener.containerGroup();

        ......
      
        // 注冊已經(jīng)封裝好的消費端-endpoint
		this.registrar.registerEndpoint(endpoint, factory);
		
		if (StringUtils.hasText(beanRef)) {
			this.listenerScope.removeListener(beanRef);
		}
	}

1.3、registerEndpoint

KafkaListenerEndpointRegistrar#registerEndpoint

	public void registerEndpoint(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
		
	    ......
		
		KafkaListenerEndpointDescriptor descriptor = new KafkaListenerEndpointDescriptor(endpoint, factory);
		synchronized (this.endpointDescriptors) {
		    // 如果到了需要立即啟動監(jiān)聽的階段就直接注冊并監(jiān)聽(也就是創(chuàng)建消息監(jiān)聽容器并啟動)
			if (this.startImmediately) { // Register and start immediately
				this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
						resolveContainerFactory(descriptor), true);
			}
			else {
			    // 一般情況都先走這一步,添加至此列表,待bean后續(xù)的生命周期 統(tǒng)一注冊并啟動
				this.endpointDescriptors.add(descriptor);
			}
		}
	}

	public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory,
			boolean startImmediately) {

        ......
        
		synchronized (this.listenerContainers) {
		
			......
			
			// 1.創(chuàng)建消息監(jiān)聽容器
			MessageListenerContainer container = createListenerContainer(endpoint, factory);
			this.listenerContainers.put(id, container);
			if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
				List<MessageListenerContainer> containerGroup;
				if (this.applicationContext.containsBean(endpoint.getGroup())) {
					containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
				}
				else {
					containerGroup = new ArrayList<MessageListenerContainer>();
					this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
				}
				containerGroup.add(container);
			}
            
            // 2.是否立即啟動消息監(jiān)聽
			if (startImmediately) {
				startIfNecessary(container);
			}
		}
	}

1.4、startIfNecessary

KafkaListenerEndpointRegistry#startIfNecessary
啟動消息監(jiān)聽

	private void startIfNecessary(MessageListenerContainer listenerContainer) {
		if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
		    // 啟動消息監(jiān)聽
		    // 到這一步之后,消息監(jiān)聽以及處理都是KafkaMessageListenerContainer的邏輯
		    // 到此也就打通了@KafkaListener到MessageListenerContainer消息監(jiān)聽容器的邏輯
			listenerContainer.start();
		}
	}

2、afterSingletonsInstantiated

這一步是實例化(此處的實例化是已經(jīng)創(chuàng)建對象并完成了初始化操作)之后,緊接著的操作

KafkaListenerAnnotationBeanPostProcessor#afterSingletonsInstantiated

	public void afterSingletonsInstantiated() {
		this.registrar.setBeanFactory(this.beanFactory);

        // 對"注冊員"信息的完善
		if (this.beanFactory instanceof ListableBeanFactory) {
			Map<String, KafkaListenerConfigurer> instances =
					((ListableBeanFactory) this.beanFactory).getBeansOfType(KafkaListenerConfigurer.class);
			for (KafkaListenerConfigurer configurer : instances.values()) {
				configurer.configureKafkaListeners(this.registrar);
			}
		}

		if (this.registrar.getEndpointRegistry() == null) {
			if (this.endpointRegistry == null) {
				Assert.state(this.beanFactory != null,
						"BeanFactory must be set to find endpoint registry by bean name");
				this.endpointRegistry = this.beanFactory.getBean(
						KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
						KafkaListenerEndpointRegistry.class);
			}
			this.registrar.setEndpointRegistry(this.endpointRegistry);
		}

		......

		// Actually register all listeners
		// 整個方法這里才是關(guān)鍵
		// 創(chuàng)建MessageListenerContainer并注冊
		this.registrar.afterPropertiesSet();
	}

2.1、afterPropertiesSet

KafkaListenerEndpointRegistrar#afterPropertiesSet

	public void afterPropertiesSet() {
		registerAllEndpoints();
	}

2.2、registerAllEndpoints

KafkaListenerEndpointRegistrar#registerAllEndpoints

	protected void registerAllEndpoints() {
		synchronized (this.endpointDescriptors) {
			for (KafkaListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
			    // 這里是真正的創(chuàng)建ListenerContainer監(jiān)聽對象并注冊
				this.endpointRegistry.registerListenerContainer(
						descriptor.endpoint, resolveContainerFactory(descriptor));
			}
			// 啟動時所有消息監(jiān)聽對象都注冊之后,便將參數(shù)置為true
			this.startImmediately = true;  // trigger immediate startup
		}
	}

總結(jié)

以上便是整個流程,總體感覺就是將kafka消息監(jiān)聽融入到spring生命周期中,并完美契合

調(diào)試及相關(guān)源碼版本:

org.springframework.boot::2.3.3.RELEASE
spring-kafka:2.5.4.RELEASE

相關(guān)參考:

spring-kafka官方文檔
spring容器之refresh方法

到此這篇關(guān)于spring-Kafka中的@KafkaListener深入源碼解讀的文章就介紹到這了,更多相關(guān)spring-Kafka @KafkaListener內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java的Channel通道之FileChannel類詳解

    Java的Channel通道之FileChannel類詳解

    這篇文章主要介紹了Java的Channel通道之FileChannel類詳解,FileChannel類是Java NIO中的一個重要類,用于在文件中進行讀寫操作,它提供了一種高效的方式來處理大文件和隨機訪問文件的需求,需要的朋友可以參考下
    2023-10-10
  • Java利用反射動態(tài)設(shè)置對象字段值的實現(xiàn)

    Java利用反射動態(tài)設(shè)置對象字段值的實現(xiàn)

    橋梁信息維護需要做到字段級別的權(quán)限控制,本文主要介紹了Java利用反射動態(tài)設(shè)置對象字段值的實現(xiàn),具有一定的參考價值,感興趣的可以了解一下
    2024-01-01
  • Java字節(jié)碼中jvm實例用法

    Java字節(jié)碼中jvm實例用法

    在本篇文章里小編給大家整理的是一篇關(guān)于Java字節(jié)碼中jvm實例用法內(nèi)容,有興趣的朋友們可以學習參考下。
    2021-02-02
  • 你的Idea還有BUG嗎不妨試試另一個開發(fā)神器

    你的Idea還有BUG嗎不妨試試另一個開發(fā)神器

    Spring Tool Suite(STS)就是一個基于Eclipse的開發(fā)環(huán)境, 用于開發(fā)Spring應用程序。本文給大家給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友參考下吧
    2020-12-12
  • Java詳解swagger2如何配置使用

    Java詳解swagger2如何配置使用

    編寫和維護接口文檔是每個程序員的職責,根據(jù)Swagger2可以快速幫助我們編寫最新的API接口文檔,再也不用擔心開會前仍忙于整理各種資料了,間接提升了團隊開發(fā)的溝通效率
    2022-06-06
  • 創(chuàng)建Java線程安全類的七種方法

    創(chuàng)建Java線程安全類的七種方法

    線程安全是指某個方法或某段代碼,在多線程中能夠正確的執(zhí)行,不會出現(xiàn)數(shù)據(jù)不一致或數(shù)據(jù)污染的情況,我們把這樣的程序稱之為線程安全的,反之則為非線程安全的,下面這篇文章主要給大家介紹了關(guān)于創(chuàng)建Java線程安全類的七種方法,需要的朋友可以參考下
    2022-06-06
  • Kotlin協(xié)程與并發(fā)深入全面講解

    Kotlin協(xié)程與并發(fā)深入全面講解

    Android官方對協(xié)程的定義-協(xié)程是一種并發(fā)設(shè)計模式,您可以在Android平臺上使用它來簡化異步執(zhí)行的代碼。協(xié)程是在版本1.3中添加到Kotlin的,它基于來自其他語言的既定概念
    2022-11-11
  • Spring?Boot最經(jīng)典的20道面試題你都會了嗎

    Spring?Boot最經(jīng)典的20道面試題你都會了嗎

    Spring Boot是現(xiàn)代化的Java應用程序開發(fā)框架,具有高度的靈活性和可擴展性,下面這篇文章主要給大家介紹了關(guān)于Spring?Boot最經(jīng)典的20道面試題,文中通過代碼介紹的非常詳細,需要的朋友可以參考下
    2024-06-06
  • servlet異步請求的實現(xiàn)

    servlet異步請求的實現(xiàn)

    本文主要介紹了servlet異步請求的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2022-07-07
  • Java Calendar日歷與Date日期的相互轉(zhuǎn)換詳解

    Java Calendar日歷與Date日期的相互轉(zhuǎn)換詳解

    這篇文章主要介紹了Java Calendar日歷與Date日期的相互轉(zhuǎn)換詳解,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2019-08-08

最新評論