SpringBoot如何處理@KafkaListener消息
消息監(jiān)聽容器
1、KafkaMessageListenerContainer
由spring提供用于監(jiān)聽以及拉取消息,并將這些消息按指定格式轉(zhuǎn)換后交給由@KafkaListener注解的方法處理,相當(dāng)于一個(gè)消費(fèi)者;
看看其整體代碼結(jié)構(gòu):

可以發(fā)現(xiàn)其入口方法為doStart(), 往上追溯到實(shí)現(xiàn)了SmartLifecycle接口,很明顯,由spring管理其start和stop操作;
ListenerConsumer, 內(nèi)部真正拉取消息消費(fèi)的是這個(gè)結(jié)構(gòu),其 實(shí)現(xiàn)了Runable接口,簡言之,它就是一個(gè)后臺(tái)線程輪訓(xùn)拉取并處理消息(while true死循環(huán)拉取消息)。
在doStart方法中會(huì)創(chuàng)建ListenerConsumer并交給線程池處理
以上步驟就開啟了消息監(jiān)聽過程。
KafkaMessageListenerContainer#doStart
protected void doStart() {
if (isRunning()) {
return;
}
ContainerProperties containerProperties = getContainerProperties();
if (!this.consumerFactory.isAutoCommit()) {
AckMode ackMode = containerProperties.getAckMode();
if (ackMode.equals(AckMode.COUNT) || ackMode.equals(AckMode.COUNT_TIME)) {
Assert.state(containerProperties.getAckCount() > 0, "'ackCount' must be > 0");
}
if ((ackMode.equals(AckMode.TIME) || ackMode.equals(AckMode.COUNT_TIME))
&& containerProperties.getAckTime() == 0) {
containerProperties.setAckTime(5000);
}
}
Object messageListener = containerProperties.getMessageListener();
Assert.state(messageListener != null, "A MessageListener is required");
if (containerProperties.getConsumerTaskExecutor() == null) {
SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
(getBeanName() == null ? "" : getBeanName()) + "-C-");
containerProperties.setConsumerTaskExecutor(consumerExecutor);
}
Assert.state(messageListener instanceof GenericMessageListener, "Listener must be a GenericListener");
this.listener = (GenericMessageListener<?>) messageListener;
ListenerType listenerType = ListenerUtils.determineListenerType(this.listener);
if (this.listener instanceof DelegatingMessageListener) {
Object delegating = this.listener;
while (delegating instanceof DelegatingMessageListener) {
delegating = ((DelegatingMessageListener<?>) delegating).getDelegate();
}
listenerType = ListenerUtils.determineListenerType(delegating);
}
// 這里創(chuàng)建了監(jiān)聽消費(fèi)者對(duì)象
this.listenerConsumer = new ListenerConsumer(this.listener, listenerType);
setRunning(true);
// 將消費(fèi)者對(duì)象放入到線程池中執(zhí)行
this.listenerConsumerFuture = containerProperties
.getConsumerTaskExecutor()
.submitListenable(this.listenerConsumer);
}KafkaMessageListenerContainer.ListenerConsumer#run
public void run() {
this.consumerThread = Thread.currentThread();
if (this.genericListener instanceof ConsumerSeekAware) {
((ConsumerSeekAware) this.genericListener).registerSeekCallback(this);
}
if (this.transactionManager != null) {
ProducerFactoryUtils.setConsumerGroupId(this.consumerGroupId);
}
this.count = 0;
this.last = System.currentTimeMillis();
if (isRunning() && this.definedPartitions != null) {
try {
initPartitionsIfNeeded();
}
catch (Exception e) {
this.logger.error("Failed to set initial offsets", e);
}
}
long lastReceive = System.currentTimeMillis();
long lastAlertAt = lastReceive;
while (isRunning()) {
try {
if (!this.autoCommit && !this.isRecordAck) {
processCommits();
}
processSeeks();
if (!this.consumerPaused && isPaused()) {
this.consumer.pause(this.consumer.assignment());
this.consumerPaused = true;
if (this.logger.isDebugEnabled()) {
this.logger.debug("Paused consumption from: " + this.consumer.paused());
}
publishConsumerPausedEvent(this.consumer.assignment());
}
// 拉取信息
ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());
this.lastPoll = System.currentTimeMillis();
if (this.consumerPaused && !isPaused()) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("Resuming consumption from: " + this.consumer.paused());
}
Set<TopicPartition> paused = this.consumer.paused();
this.consumer.resume(paused);
this.consumerPaused = false;
publishConsumerResumedEvent(paused);
}
if (records != null && this.logger.isDebugEnabled()) {
this.logger.debug("Received: " + records.count() + " records");
if (records.count() > 0 && this.logger.isTraceEnabled()) {
this.logger.trace(records.partitions().stream()
.flatMap(p -> records.records(p).stream())
// map to same format as send metadata toString()
.map(r -> r.topic() + "-" + r.partition() + "@" + r.offset())
.collect(Collectors.toList()));
}
}
if (records != null && records.count() > 0) {
if (this.containerProperties.getIdleEventInterval() != null) {
lastReceive = System.currentTimeMillis();
}
invokeListener(records);
}
else {
if (this.containerProperties.getIdleEventInterval() != null) {
long now = System.currentTimeMillis();
if (now > lastReceive + this.containerProperties.getIdleEventInterval()
&& now > lastAlertAt + this.containerProperties.getIdleEventInterval()) {
publishIdleContainerEvent(now - lastReceive, this.isConsumerAwareListener
? this.consumer : null, this.consumerPaused);
lastAlertAt = now;
if (this.genericListener instanceof ConsumerSeekAware) {
seekPartitions(getAssignedPartitions(), true);
}
}
}
}
}
catch (WakeupException e) {
// Ignore, we're stopping
}
catch (NoOffsetForPartitionException nofpe) {
this.fatalError = true;
ListenerConsumer.this.logger.error("No offset and no reset policy", nofpe);
break;
}
catch (Exception e) {
handleConsumerException(e);
}
}
ProducerFactoryUtils.clearConsumerGroupId();
if (!this.fatalError) {
if (this.kafkaTxManager == null) {
commitPendingAcks();
try {
this.consumer.unsubscribe();
}
catch (WakeupException e) {
// No-op. Continue process
}
}
}
else {
ListenerConsumer.this.logger.error("No offset and no reset policy; stopping container");
KafkaMessageListenerContainer.this.stop();
}
this.monitorTask.cancel(true);
if (!this.taskSchedulerExplicitlySet) {
((ThreadPoolTaskScheduler) this.taskScheduler).destroy();
}
this.consumer.close();
this.logger.info("Consumer stopped");
}2、ConcurrentMessageListenerContainer
并發(fā)消息監(jiān)聽,相當(dāng)于創(chuàng)建消費(fèi)者;其底層邏輯仍然是通過KafkaMessageListenerContainer實(shí)現(xiàn)處理;從實(shí)現(xiàn)上看就是在KafkaMessageListenerContainer上做了層包裝,有多少的concurrency就創(chuàng)建多個(gè)KafkaMessageListenerContainer,也就是concurrency個(gè)消費(fèi)者。
protected void doStart() {
if (!isRunning()) {
ContainerProperties containerProperties = getContainerProperties();
TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();
if (topicPartitions != null
&& this.concurrency > topicPartitions.length) {
this.logger.warn("When specific partitions are provided, the concurrency must be less than or "
+ "equal to the number of partitions; reduced from " + this.concurrency + " to "
+ topicPartitions.length);
this.concurrency = topicPartitions.length;
}
setRunning(true);
// 創(chuàng)建多個(gè)消費(fèi)者
for (int i = 0; i < this.concurrency; i++) {
KafkaMessageListenerContainer<K, V> container;
if (topicPartitions == null) {
container = new KafkaMessageListenerContainer<>(this, this.consumerFactory,
containerProperties);
}
else {
container = new KafkaMessageListenerContainer<>(this, this.consumerFactory,
containerProperties, partitionSubset(containerProperties, i));
}
String beanName = getBeanName();
container.setBeanName((beanName != null ? beanName : "consumer") + "-" + i);
if (getApplicationEventPublisher() != null) {
container.setApplicationEventPublisher(getApplicationEventPublisher());
}
container.setClientIdSuffix("-" + i);
container.setAfterRollbackProcessor(getAfterRollbackProcessor());
container.start();
this.containers.add(container);
}
}
}3、@KafkaListener底層監(jiān)聽原理
上面已經(jīng)介紹了KafkaMessageListenerContainer的作用是拉取并處理消息,但還缺少關(guān)鍵的一步,即 如何將我們的業(yè)務(wù)邏輯與KafkaMessageListenerContainer的處理邏輯聯(lián)系起來?
那么這個(gè)橋梁就是@KafkaListener注解
KafkaListenerAnnotationBeanPostProcessor, 從后綴BeanPostProcessor就可以知道這是Spring IOC初始化bean相關(guān)的操作,當(dāng)然這里也是;
此類會(huì)掃描帶@KafkaListener注解的類或者方法,通過 KafkaListenerContainerFactory工廠創(chuàng)建對(duì)應(yīng)的KafkaMessageListenerContainer,并調(diào)用start方法啟動(dòng)監(jiān)聽,也就是這樣打通了這條路…
4、Spring Boot 自動(dòng)加載kafka相關(guān)配置
1、KafkaAutoConfiguration
自動(dòng)生成kafka相關(guān)配置,比如當(dāng)缺少這些bean的時(shí)候KafkaTemplate、ProducerListener、ConsumerFactory、ProducerFactory等,默認(rèn)創(chuàng)建bean實(shí)例
2、KafkaAnnotationDrivenConfiguration
主要是針對(duì)于spring-kafka提供的注解背后的相關(guān)操作,比如 @KafkaListener;
在開啟了@EnableKafka注解后,spring會(huì)掃描到此配置并創(chuàng)建缺少的bean實(shí)例,比如當(dāng)配置的工廠beanName不是kafkaListenerContainerFactory的時(shí)候,就會(huì)默認(rèn)創(chuàng)建一個(gè)beanName為kafkaListenerContainerFactory的實(shí)例,這也是為什么在springboot中不用定義consumer的相關(guān)配置也可以通過@KafkaListener正常的處理消息
5、消息處理
單條消息處理
@Configuration
public class KafkaConsumerConfiguration {
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaCustomizeContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(2);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
private ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
private Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bizConfig.getReconciliationInstanceKafkaServers());
props.put(ConsumerConfig.GROUP_ID_CONFIG, bizConfig.getReconciliationInstanceKafkaConsumerGroupId());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 300);
// poll 一次拉取的阻塞的最大時(shí)長,單位:毫秒。這里指的是阻塞拉取需要滿足至少 fetch-min-size 大小的消息
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 10000);
return props;
}
}這種方式的@KafkaLisener中的參數(shù)是單條的。
批量處理
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
public KafkaListenerContainerFactory<?, ?> batchFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// 增加開啟批量處理
factory.setBatchListener(true); // <<<<<<<<<<<<<<<<<<<<<<<<<
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
...
return props;
}
}
// 注意:這里接受的是集合類型
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list) {
...
}這種方式的@KafkaLisener中的參數(shù)是多條的。
6、線程池相關(guān)
如果沒有額外給Kafka指定線程池,底層默認(rèn)用的是SimpleAsyncTaskExecutor類,它不使用線程池,而是為每個(gè)任務(wù)創(chuàng)建新線程。相當(dāng)于一個(gè)消費(fèi)者用一個(gè)獨(dú)立的線程來跑。
總結(jié)
spring為了將kafka融入其生態(tài),方便在spring大環(huán)境下使用kafka,開發(fā)了spring-kafa這一模塊,本質(zhì)上是為了幫助開發(fā)者更好的以spring的方式使用kafka
@KafkaListener就是這么一個(gè)工具,在同一個(gè)項(xiàng)目中既可以有單條的消息處理,也可以配置多條的消息處理,稍微改變下配置即可實(shí)現(xiàn),很是方便
當(dāng)然,@KafkaListener單條或者多條消息處理仍然是spring自行封裝處理,與kafka-client客戶端的拉取機(jī)制無關(guān);比如一次性拉取50條消息,對(duì)于單條處理來說就是循環(huán)50次處理,而多條消息處理則可以一次性處理50條;本質(zhì)上來說這套邏輯都是spring處理的,并不是說單條消費(fèi)就是通過kafka-client一次只拉取一條消息
在使用過程中需要注意spring自動(dòng)的創(chuàng)建的一些bean實(shí)例,當(dāng)然也可以覆蓋其自動(dòng)創(chuàng)建的實(shí)例以滿足特定的需求場景。
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
springboot寶塔簡單部署的實(shí)現(xiàn)示例
通過使用Spring Boot,可以快速構(gòu)建出高效、可擴(kuò)展的應(yīng)用程序,而寶塔面板則提供了簡單易用的網(wǎng)站管理和維護(hù)工具,本文將詳細(xì)介紹如何將Spring Boot應(yīng)用程序與寶塔面板進(jìn)行集成,實(shí)現(xiàn)自動(dòng)化部署、配置管理等操作2023-11-11
java中的Arrays這個(gè)工具類你真的會(huì)用嗎(一文秒懂)
這篇文章主要介紹了java中的Arrays這個(gè)工具類你真的會(huì)用嗎,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-06-06
IDEA插件EasyCode及MyBatis最優(yōu)配置步驟詳解
這篇文章主要介紹了IDEA插件EasyCode MyBatis最優(yōu)配置步驟詳解,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-12-12
SpringMVC 使用JSR-303進(jìn)行校驗(yàn) @Valid示例
本篇文章主要介紹了SpringMVC 使用JSR-303進(jìn)行校驗(yàn) @Valid示例,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2017-02-02
JPA如何使用nativequery多表關(guān)聯(lián)查詢返回自定義實(shí)體類
這篇文章主要介紹了JPA如何使用nativequery多表關(guān)聯(lián)查詢返回自定義實(shí)體類,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-11-11
Java找不到或無法加載主類及編碼錯(cuò)誤問題的解決方案
今天小編就為大家分享一篇關(guān)于Java找不到或無法加載主類及編碼錯(cuò)誤問題的解決方案,小編覺得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來看看吧2019-02-02
SpringCloud如何利用Feign訪問外部http請求
這篇文章主要介紹了SpringCloud如何利用Feign訪問外部http請求,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-03-03

