How Spring Boot handles @KafkaListener messages
Message listener containers
1. KafkaMessageListenerContainer
Provided by Spring to listen for and poll messages, convert them to the specified format, and hand them to the method annotated with @KafkaListener; it is effectively a single consumer.
Looking at its overall code structure:
The entry point is doStart(). Tracing up the hierarchy, the container implements the SmartLifecycle interface, so its start and stop operations are managed by Spring.
ListenerConsumer is the piece that actually polls and consumes messages. It implements the Runnable interface; in short, it is a background thread that polls and processes messages in a loop (a while-true loop that keeps polling).
In doStart(), a ListenerConsumer is created and submitted to a task executor.
These steps together start the message-listening process.
KafkaMessageListenerContainer#doStart
protected void doStart() {
    if (isRunning()) {
        return;
    }
    ContainerProperties containerProperties = getContainerProperties();
    if (!this.consumerFactory.isAutoCommit()) {
        AckMode ackMode = containerProperties.getAckMode();
        if (ackMode.equals(AckMode.COUNT) || ackMode.equals(AckMode.COUNT_TIME)) {
            Assert.state(containerProperties.getAckCount() > 0, "'ackCount' must be > 0");
        }
        if ((ackMode.equals(AckMode.TIME) || ackMode.equals(AckMode.COUNT_TIME))
                && containerProperties.getAckTime() == 0) {
            containerProperties.setAckTime(5000);
        }
    }
    Object messageListener = containerProperties.getMessageListener();
    Assert.state(messageListener != null, "A MessageListener is required");
    if (containerProperties.getConsumerTaskExecutor() == null) {
        SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
                (getBeanName() == null ? "" : getBeanName()) + "-C-");
        containerProperties.setConsumerTaskExecutor(consumerExecutor);
    }
    Assert.state(messageListener instanceof GenericMessageListener, "Listener must be a GenericListener");
    this.listener = (GenericMessageListener<?>) messageListener;
    ListenerType listenerType = ListenerUtils.determineListenerType(this.listener);
    if (this.listener instanceof DelegatingMessageListener) {
        Object delegating = this.listener;
        while (delegating instanceof DelegatingMessageListener) {
            delegating = ((DelegatingMessageListener<?>) delegating).getDelegate();
        }
        listenerType = ListenerUtils.determineListenerType(delegating);
    }
    // The listener consumer is created here
    this.listenerConsumer = new ListenerConsumer(this.listener, listenerType);
    setRunning(true);
    // Submit the consumer to the task executor for execution
    this.listenerConsumerFuture = containerProperties
            .getConsumerTaskExecutor()
            .submitListenable(this.listenerConsumer);
}
KafkaMessageListenerContainer.ListenerConsumer#run
public void run() {
    this.consumerThread = Thread.currentThread();
    if (this.genericListener instanceof ConsumerSeekAware) {
        ((ConsumerSeekAware) this.genericListener).registerSeekCallback(this);
    }
    if (this.transactionManager != null) {
        ProducerFactoryUtils.setConsumerGroupId(this.consumerGroupId);
    }
    this.count = 0;
    this.last = System.currentTimeMillis();
    if (isRunning() && this.definedPartitions != null) {
        try {
            initPartitionsIfNeeded();
        }
        catch (Exception e) {
            this.logger.error("Failed to set initial offsets", e);
        }
    }
    long lastReceive = System.currentTimeMillis();
    long lastAlertAt = lastReceive;
    while (isRunning()) {
        try {
            if (!this.autoCommit && !this.isRecordAck) {
                processCommits();
            }
            processSeeks();
            if (!this.consumerPaused && isPaused()) {
                this.consumer.pause(this.consumer.assignment());
                this.consumerPaused = true;
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Paused consumption from: " + this.consumer.paused());
                }
                publishConsumerPausedEvent(this.consumer.assignment());
            }
            // Poll for records
            ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());
            this.lastPoll = System.currentTimeMillis();
            if (this.consumerPaused && !isPaused()) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Resuming consumption from: " + this.consumer.paused());
                }
                Set<TopicPartition> paused = this.consumer.paused();
                this.consumer.resume(paused);
                this.consumerPaused = false;
                publishConsumerResumedEvent(paused);
            }
            if (records != null && this.logger.isDebugEnabled()) {
                this.logger.debug("Received: " + records.count() + " records");
                if (records.count() > 0 && this.logger.isTraceEnabled()) {
                    this.logger.trace(records.partitions().stream()
                            .flatMap(p -> records.records(p).stream())
                            // map to same format as send metadata toString()
                            .map(r -> r.topic() + "-" + r.partition() + "@" + r.offset())
                            .collect(Collectors.toList()));
                }
            }
            if (records != null && records.count() > 0) {
                if (this.containerProperties.getIdleEventInterval() != null) {
                    lastReceive = System.currentTimeMillis();
                }
                invokeListener(records);
            }
            else {
                if (this.containerProperties.getIdleEventInterval() != null) {
                    long now = System.currentTimeMillis();
                    if (now > lastReceive + this.containerProperties.getIdleEventInterval()
                            && now > lastAlertAt + this.containerProperties.getIdleEventInterval()) {
                        publishIdleContainerEvent(now - lastReceive,
                                this.isConsumerAwareListener ? this.consumer : null, this.consumerPaused);
                        lastAlertAt = now;
                        if (this.genericListener instanceof ConsumerSeekAware) {
                            seekPartitions(getAssignedPartitions(), true);
                        }
                    }
                }
            }
        }
        catch (WakeupException e) {
            // Ignore, we're stopping
        }
        catch (NoOffsetForPartitionException nofpe) {
            this.fatalError = true;
            ListenerConsumer.this.logger.error("No offset and no reset policy", nofpe);
            break;
        }
        catch (Exception e) {
            handleConsumerException(e);
        }
    }
    ProducerFactoryUtils.clearConsumerGroupId();
    if (!this.fatalError) {
        if (this.kafkaTxManager == null) {
            commitPendingAcks();
            try {
                this.consumer.unsubscribe();
            }
            catch (WakeupException e) {
                // No-op. Continue process
            }
        }
    }
    else {
        ListenerConsumer.this.logger.error("No offset and no reset policy; stopping container");
        KafkaMessageListenerContainer.this.stop();
    }
    this.monitorTask.cancel(true);
    if (!this.taskSchedulerExplicitlySet) {
        ((ThreadPoolTaskScheduler) this.taskScheduler).destroy();
    }
    this.consumer.close();
    this.logger.info("Consumer stopped");
}
2. ConcurrentMessageListenerContainer
Concurrent message listening, which effectively creates multiple consumers. The underlying processing is still done by KafkaMessageListenerContainer: this class is just a wrapper around it that creates one KafkaMessageListenerContainer per unit of concurrency, i.e. concurrency consumers in total.
protected void doStart() {
    if (!isRunning()) {
        ContainerProperties containerProperties = getContainerProperties();
        TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();
        if (topicPartitions != null && this.concurrency > topicPartitions.length) {
            this.logger.warn("When specific partitions are provided, the concurrency must be less than or "
                    + "equal to the number of partitions; reduced from " + this.concurrency + " to "
                    + topicPartitions.length);
            this.concurrency = topicPartitions.length;
        }
        setRunning(true);
        // Create 'concurrency' consumers
        for (int i = 0; i < this.concurrency; i++) {
            KafkaMessageListenerContainer<K, V> container;
            if (topicPartitions == null) {
                container = new KafkaMessageListenerContainer<>(this, this.consumerFactory, containerProperties);
            }
            else {
                container = new KafkaMessageListenerContainer<>(this, this.consumerFactory,
                        containerProperties, partitionSubset(containerProperties, i));
            }
            String beanName = getBeanName();
            container.setBeanName((beanName != null ? beanName : "consumer") + "-" + i);
            if (getApplicationEventPublisher() != null) {
                container.setApplicationEventPublisher(getApplicationEventPublisher());
            }
            container.setClientIdSuffix("-" + i);
            container.setAfterRollbackProcessor(getAfterRollbackProcessor());
            container.start();
            this.containers.add(container);
        }
    }
}
3. How @KafkaListener works under the hood
We saw above that KafkaMessageListenerContainer polls and processes messages, but one key step is still missing: how does our business logic get wired into KafkaMessageListenerContainer's processing?
The bridge is the @KafkaListener annotation.
KafkaListenerAnnotationBeanPostProcessor: as the BeanPostProcessor suffix suggests, this hooks into Spring IoC bean initialization, and indeed it does.
This class scans for classes and methods annotated with @KafkaListener, creates the corresponding KafkaMessageListenerContainer through the KafkaListenerContainerFactory, and calls its start method to begin listening — which completes the path from annotation to running consumer. A minimal example is sketched below.
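For illustration, a minimal listener only needs the annotation (the topic and group names here are made up for the example); the post-processor and container factory take care of everything else:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class DemoListener {

    // KafkaListenerAnnotationBeanPostProcessor detects this method and, via the
    // KafkaListenerContainerFactory, builds and starts a listener container for it
    @KafkaListener(topics = "demo-topic", groupId = "demo-group")
    public void onMessage(String message) {
        System.out.println("Received: " + message);
    }
}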
4. How Spring Boot auto-configures Kafka
1. KafkaAutoConfiguration
Auto-configures Kafka-related beans: when beans such as KafkaTemplate, ProducerListener, ConsumerFactory, and ProducerFactory are absent, it creates default instances.
2. KafkaAnnotationDrivenConfiguration
Covers the machinery behind the annotations that spring-kafka provides, such as @KafkaListener.
With @EnableKafka active, Spring picks up this configuration and creates whatever beans are missing. For example, when the configured factory's bean name is not kafkaListenerContainerFactory, it creates a default instance under that bean name. This is why, in Spring Boot, @KafkaListener can process messages normally even without any explicit consumer configuration. Conversely, you can take over by declaring your own factory under that bean name, as sketched below.
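A sketch of such an override (the broker address, group id, and concurrency below are illustrative, not from the original article): declaring your own bean named kafkaListenerContainerFactory replaces the auto-configured default, so @KafkaListener methods that specify no containerFactory will use it.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
public class KafkaOverrideConfig {

    // Same bean name as the auto-configured default ("kafkaListenerContainerFactory"),
    // so this instance wins and the default is not created
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // illustrative group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
        factory.setConcurrency(3); // three KafkaMessageListenerContainers per listener
        return factory;
    }
}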
5. Message processing
Single-record processing
@Configuration
public class KafkaConsumerConfiguration {

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaCustomizeContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(2);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    private ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // bizConfig is the author's own configuration holder
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bizConfig.getReconciliationInstanceKafkaServers());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, bizConfig.getReconciliationInstanceKafkaConsumerGroupId());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
        // the broker holds a fetch until at least this many bytes are available (fetch.min.bytes)
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 300);
        // maximum amount of data returned per fetch request (fetch.max.bytes)
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 10000);
        return props;
    }
}
With this factory, the parameter of the @KafkaListener method is a single record, for example:
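A sketch of a listener bound to the factory above (the topic name is made up for the example):

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class SingleRecordListener {

    // containerFactory points at the custom factory defined above;
    // each invocation receives exactly one record's value
    @KafkaListener(topics = "demo-topic", containerFactory = "kafkaCustomizeContainerFactory")
    public void onMessage(String value) {
        System.out.println("Received single record value: " + value);
    }
}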
Batch processing
@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    public KafkaListenerContainerFactory<?, ?> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // enable batch processing
        factory.setBatchListener(true); // <<<<<<<<<<<<<<<<<<<<<<<<<
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        ...
        return props;
    }
}

// Note: the listener receives a collection
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list) {
    ...
}
With this factory, the @KafkaListener method receives multiple records per invocation.
6. Thread pool notes
If no executor is supplied for the Kafka listener containers, the default is SimpleAsyncTaskExecutor, which does not pool threads at all: it spawns a new thread per task. In effect, each consumer runs on its own dedicated thread. A pooled executor can be plugged in through the container properties, as sketched below.
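A minimal sketch, assuming the spring-kafka version shown above (where ContainerProperties exposes setConsumerTaskExecutor). The pool sizes and bean names are illustrative; because each ListenerConsumer occupies one thread for the container's whole lifetime, the pool must be at least as large as the total concurrency.

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
public class KafkaExecutorConfig {

    @Bean
    public ThreadPoolTaskExecutor kafkaConsumerExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2); // illustrative: must cover the containers' total concurrency
        executor.setMaxPoolSize(2);
        executor.setThreadNamePrefix("kafka-consumer-");
        return executor;
    }

    // assumes a ConsumerFactory<Integer, String> bean is defined elsewhere,
    // e.g. as in the earlier configuration examples
    @Bean
    public ConcurrentKafkaListenerContainerFactory<Integer, String> pooledFactory(
            ConsumerFactory<Integer, String> consumerFactory,
            ThreadPoolTaskExecutor kafkaConsumerExecutor) {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // replaces the default SimpleAsyncTaskExecutor created in doStart()
        factory.getContainerProperties().setConsumerTaskExecutor(kafkaConsumerExecutor);
        return factory;
    }
}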
Summary
To fold Kafka into its ecosystem and make it convenient to use in a Spring environment, Spring developed the spring-kafka module. In essence, it helps developers use Kafka the Spring way.
@KafkaListener is exactly such a tool: within one project you can have both single-record and batch message processing, and switching between them takes only a small configuration change, which is very convenient.
Note that single-record vs. batch processing is entirely spring-kafka's own packaging and has nothing to do with the kafka-client fetch mechanism. If one poll returns 50 records, a single-record listener is simply invoked 50 times in a loop, while a batch listener is invoked once with all 50; either way the dispatch logic lives in Spring. Single-record consumption does not mean kafka-client pulls one message per request.
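Conceptually, that dispatch looks like the following simplified sketch (not the actual spring-kafka source; invokeRecordListener and invokeBatchListener stand in for invoking the @KafkaListener method):

import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

// Simplified sketch; the real logic lives in KafkaMessageListenerContainer.ListenerConsumer
class DispatchSketch {

    void dispatch(ConsumerRecords<Integer, String> records, boolean batchListener) {
        if (batchListener) {
            // batch mode: one invocation receives the whole poll result
            List<ConsumerRecord<Integer, String>> list = new ArrayList<>();
            records.forEach(list::add);
            invokeBatchListener(list);
        }
        else {
            // single-record mode: the container loops, invoking the listener once per record
            for (ConsumerRecord<Integer, String> record : records) {
                invokeRecordListener(record);
            }
        }
    }

    void invokeBatchListener(List<ConsumerRecord<Integer, String>> list) { /* call the listener method */ }

    void invokeRecordListener(ConsumerRecord<Integer, String> record) { /* call the listener method */ }
}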
When using it, pay attention to the beans Spring creates automatically; you can of course override those auto-created instances to fit specific scenarios, as in the kafkaListenerContainerFactory override shown earlier.
The above reflects personal experience; I hope it offers a useful reference.