spring-Kafka中的@KafkaListener深入源碼解讀
前言
本文主要通過深入了解源碼,梳理從spring啟動到真正監(jiān)聽kafka消息的這套流程
一、總體流程
從spring啟動開始處理@KafkaListener,到start消息監(jiān)聽整體流程圖

二、源碼解讀
1、postProcessAfterInitialization
KafkaListenerAnnotationBeanPostProcessor#postProcessAfterInitialization
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
Class<?> targetClass = AopUtils.getTargetClass(bean);
// 掃描@KafkaListener注解
Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
......
if (annotatedMethods.isEmpty()) {
this.nonAnnotatedClasses.add(bean.getClass());
this.logger.trace(() -> "No @KafkaListener annotations found on bean type: " + bean.getClass());
}
else {
// Non-empty set of methods
for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
Method method = entry.getKey();
// 遍歷掃描到的所有@KafkaListener注解并開始處理
for (KafkaListener listener : entry.getValue()) {
processKafkaListener(listener, method, bean, beanName);
}
}
this.logger.debug(() -> annotatedMethods.size() + " @KafkaListener methods processed on bean '"
+ beanName + "': " + annotatedMethods);
}
// 處理在類上的@KafkaListener注解
if (hasClassLevelListeners) {
processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
}
}
return bean;
}
1.1、processKafkaListener
KafkaListenerAnnotationBeanPostProcessor#processKafkaListener
protected void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean, String beanName) {
Method methodToUse = checkProxy(method, bean);
MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();
endpoint.setMethod(methodToUse);
processListener(endpoint, kafkaListener, bean, methodToUse, beanName);
}1.2、processListener
KafkaListenerAnnotationBeanPostProcessor#processListener
將每個(gè)kafkaListener轉(zhuǎn)變成MethodKafkaListenerEndpoint并注冊到KafkaListenerEndpointRegistrar容器,方便后續(xù)統(tǒng)一啟動監(jiān)聽
protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener,
Object bean, Object adminTarget, String beanName) {
String beanRef = kafkaListener.beanRef();
if (StringUtils.hasText(beanRef)) {
this.listenerScope.addListener(beanRef, bean);
}
endpoint.setBean(bean);
endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
endpoint.setId(getEndpointId(kafkaListener));
endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId()));
endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener));
endpoint.setTopics(resolveTopics(kafkaListener));
endpoint.setTopicPattern(resolvePattern(kafkaListener));
endpoint.setClientIdPrefix(resolveExpressionAsString(kafkaListener.clientIdPrefix(), "clientIdPrefix"));
String group = kafkaListener.containerGroup();
......
// 注冊已經(jīng)封裝好的消費(fèi)端-endpoint
this.registrar.registerEndpoint(endpoint, factory);
if (StringUtils.hasText(beanRef)) {
this.listenerScope.removeListener(beanRef);
}
}1.3、registerEndpoint
KafkaListenerEndpointRegistrar#registerEndpoint
public void registerEndpoint(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
......
KafkaListenerEndpointDescriptor descriptor = new KafkaListenerEndpointDescriptor(endpoint, factory);
synchronized (this.endpointDescriptors) {
// 如果到了需要立即啟動監(jiān)聽的階段就直接注冊并監(jiān)聽(也就是創(chuàng)建消息監(jiān)聽容器并啟動)
if (this.startImmediately) { // Register and start immediately
this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
resolveContainerFactory(descriptor), true);
}
else {
// 一般情況都先走這一步,添加至此列表,待bean后續(xù)的生命周期 統(tǒng)一注冊并啟動
this.endpointDescriptors.add(descriptor);
}
}
}
public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory,
boolean startImmediately) {
......
synchronized (this.listenerContainers) {
......
// 1.創(chuàng)建消息監(jiān)聽容器
MessageListenerContainer container = createListenerContainer(endpoint, factory);
this.listenerContainers.put(id, container);
if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
List<MessageListenerContainer> containerGroup;
if (this.applicationContext.containsBean(endpoint.getGroup())) {
containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
}
else {
containerGroup = new ArrayList<MessageListenerContainer>();
this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
}
containerGroup.add(container);
}
// 2.是否立即啟動消息監(jiān)聽
if (startImmediately) {
startIfNecessary(container);
}
}
}1.4、startIfNecessary
KafkaListenerEndpointRegistry#startIfNecessary
啟動消息監(jiān)聽
private void startIfNecessary(MessageListenerContainer listenerContainer) {
if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
// 啟動消息監(jiān)聽
// 到這一步之后,消息監(jiān)聽以及處理都是KafkaMessageListenerContainer的邏輯
// 到此也就打通了@KafkaListener到MessageListenerContainer消息監(jiān)聽容器的邏輯
listenerContainer.start();
}
}2、afterSingletonsInstantiated
這一步是實(shí)例化(此處的實(shí)例化是已經(jīng)創(chuàng)建對象并完成了初始化操作)之后,緊接著的操作
KafkaListenerAnnotationBeanPostProcessor#afterSingletonsInstantiated
public void afterSingletonsInstantiated() {
this.registrar.setBeanFactory(this.beanFactory);
// 對"注冊員"信息的完善
if (this.beanFactory instanceof ListableBeanFactory) {
Map<String, KafkaListenerConfigurer> instances =
((ListableBeanFactory) this.beanFactory).getBeansOfType(KafkaListenerConfigurer.class);
for (KafkaListenerConfigurer configurer : instances.values()) {
configurer.configureKafkaListeners(this.registrar);
}
}
if (this.registrar.getEndpointRegistry() == null) {
if (this.endpointRegistry == null) {
Assert.state(this.beanFactory != null,
"BeanFactory must be set to find endpoint registry by bean name");
this.endpointRegistry = this.beanFactory.getBean(
KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
KafkaListenerEndpointRegistry.class);
}
this.registrar.setEndpointRegistry(this.endpointRegistry);
}
......
// Actually register all listeners
// 整個(gè)方法這里才是關(guān)鍵
// 創(chuàng)建MessageListenerContainer并注冊
this.registrar.afterPropertiesSet();
}
2.1、afterPropertiesSet
KafkaListenerEndpointRegistrar#afterPropertiesSet
public void afterPropertiesSet() {
registerAllEndpoints();
}2.2、registerAllEndpoints
KafkaListenerEndpointRegistrar#registerAllEndpoints
protected void registerAllEndpoints() {
synchronized (this.endpointDescriptors) {
for (KafkaListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
// 這里是真正的創(chuàng)建ListenerContainer監(jiān)聽對象并注冊
this.endpointRegistry.registerListenerContainer(
descriptor.endpoint, resolveContainerFactory(descriptor));
}
// 啟動時(shí)所有消息監(jiān)聽對象都注冊之后,便將參數(shù)置為true
this.startImmediately = true; // trigger immediate startup
}
}總結(jié)
以上便是整個(gè)流程,總體感覺就是將kafka消息監(jiān)聽融入到spring生命周期中,并完美契合
調(diào)試及相關(guān)源碼版本:
org.springframework.boot::2.3.3.RELEASE spring-kafka:2.5.4.RELEASE
相關(guān)參考:
到此這篇關(guān)于spring-Kafka中的@KafkaListener深入源碼解讀的文章就介紹到這了,更多相關(guān)spring-Kafka @KafkaListener內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java利用反射動態(tài)設(shè)置對象字段值的實(shí)現(xiàn)
橋梁信息維護(hù)需要做到字段級別的權(quán)限控制,本文主要介紹了Java利用反射動態(tài)設(shè)置對象字段值的實(shí)現(xiàn),具有一定的參考價(jià)值,感興趣的可以了解一下2024-01-01
你的Idea還有BUG嗎不妨試試另一個(gè)開發(fā)神器
Spring Tool Suite(STS)就是一個(gè)基于Eclipse的開發(fā)環(huán)境, 用于開發(fā)Spring應(yīng)用程序。本文給大家給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧2020-12-12
Spring?Boot最經(jīng)典的20道面試題你都會了嗎
Spring Boot是現(xiàn)代化的Java應(yīng)用程序開發(fā)框架,具有高度的靈活性和可擴(kuò)展性,下面這篇文章主要給大家介紹了關(guān)于Spring?Boot最經(jīng)典的20道面試題,文中通過代碼介紹的非常詳細(xì),需要的朋友可以參考下2024-06-06
Java Calendar日歷與Date日期的相互轉(zhuǎn)換詳解
這篇文章主要介紹了Java Calendar日歷與Date日期的相互轉(zhuǎn)換詳解,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-08-08

