Spring?Boot?中使用@KafkaListener并發(fā)批量接收消息的完整代碼
kakfa是我們?cè)陧?xiàng)目開(kāi)發(fā)中經(jīng)常使用的消息中間件。由于它的寫(xiě)性能非常高,因此,經(jīng)常會(huì)碰到讀取Kafka消息隊(duì)列時(shí)擁堵的情況。遇到這種情況時(shí),有時(shí)我們不能直接清理整個(gè)topic,因?yàn)檫€有別的服務(wù)正在使用該topic。因此只能額外啟動(dòng)一個(gè)相同名稱的consumer-group來(lái)加快消息消費(fèi)(如果該topic只有一個(gè)分區(qū),再啟動(dòng)一個(gè)新的消費(fèi)者,沒(méi)有作用)。
完整的代碼在這里,歡迎加星號(hào)、fork。
官方文檔在https://docs.spring.io/spring-kafka/reference/html/_reference.html
###第一步,并發(fā)消費(fèi)###
先看代碼,重點(diǎn)是這我們使用的是ConcurrentKafkaListenerContainerFactory并且設(shè)置了factory.setConcurrency(4); (我的topic有4個(gè)分區(qū),為了加快消費(fèi)將并發(fā)設(shè)置為4,也就是有4個(gè)KafkaMessageListenerContainer)
@Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(4); factory.setBatchListener(true); factory.getContainerProperties().setPollTimeout(3000); return factory; }
注意也可以直接在application.properties中添加spring.kafka.listener.concurrency=3,然后使用@KafkaListener并發(fā)消費(fèi)。
###第二步,批量消費(fèi)###
然后是批量消費(fèi)。重點(diǎn)是factory.setBatchListener(true);
以及 propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
一個(gè)設(shè)啟用批量消費(fèi),一個(gè)設(shè)置批量消費(fèi)每次最多消費(fèi)多少條消息記錄。
重點(diǎn)說(shuō)明一下,我們?cè)O(shè)置的ConsumerConfig.MAX_POLL_RECORDS_CONFIG是50,并不是說(shuō)如果沒(méi)有達(dá)到50條消息,我們就一直等待。官方的解釋是"The maximum number of records returned in a single call to poll().", 也就是50表示的是一次poll最多返回的記錄數(shù)。
從啟動(dòng)日志中可以看到還有個(gè) max.poll.interval.ms = 300000, 也就說(shuō)每間隔max.poll.interval.ms我們就調(diào)用一次poll。每次poll最多返回50條記錄。
max.poll.interval.ms官方解釋是"The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member. ";
@Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(4); factory.setBatchListener(true); factory.getContainerProperties().setPollTimeout(3000); return factory; } @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> propsMap = new HashMap<>(); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propsConfig.getBroker()); propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, propsConfig.getEnableAutoCommit()); propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, propsConfig.getGroupId()); propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, propsConfig.getAutoOffsetReset()); propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50); return propsMap; }
啟動(dòng)日志截圖
關(guān)于max.poll.records和max.poll.interval.ms官方解釋截圖:
###第三步,分區(qū)消費(fèi)###
對(duì)于只有一個(gè)分區(qū)的topic,不需要分區(qū)消費(fèi),因?yàn)闆](méi)有意義。下面的例子是針對(duì)有2個(gè)分區(qū)的情況(我的完整代碼中有4個(gè)listenPartitionX方法,我的topic設(shè)置了4個(gè)分區(qū)),讀者可以根據(jù)自己的情況進(jìn)行調(diào)整。
public class MyListener { private static final String TPOIC = "topic02"; @KafkaListener(id = "id0", topicPartitions = { @TopicPartition(topic = TPOIC, partitions = { "0" }) }) public void listenPartition0(List<ConsumerRecord<?, ?>> records) { log.info("Id0 Listener, Thread ID: " + Thread.currentThread().getId()); log.info("Id0 records size " + records.size()); for (ConsumerRecord<?, ?> record : records) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); log.info("Received: " + record); if (kafkaMessage.isPresent()) { Object message = record.value(); String topic = record.topic(); log.info("p0 Received message={}", message); } } } @KafkaListener(id = "id1", topicPartitions = { @TopicPartition(topic = TPOIC, partitions = { "1" }) }) public void listenPartition1(List<ConsumerRecord<?, ?>> records) { log.info("Id1 Listener, Thread ID: " + Thread.currentThread().getId()); log.info("Id1 records size " + records.size()); for (ConsumerRecord<?, ?> record : records) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); log.info("Received: " + record); if (kafkaMessage.isPresent()) { Object message = record.value(); String topic = record.topic(); log.info("p1 Received message={}", message); } } }
關(guān)于分區(qū)和消費(fèi)者關(guān)系,后面會(huì)補(bǔ)充,先摘錄如下:
If, say, 6 TopicPartition s are provided and the concurrency is 3; each container will get 2 partitions. For 5 TopicPartition s, 2 containers will get 2 partitions and the third will get 1. If the concurrency is greater than the number of TopicPartitions, the concurrency will be adjusted down such that each container will get one partition.
最后,總結(jié),如果我們的topic有多個(gè)分區(qū),經(jīng)過(guò)以上步驟可以很好的加快消息消費(fèi)。如果只有一個(gè)分區(qū),因?yàn)橐呀?jīng)有一個(gè)同名group id在消費(fèi)了,新啟動(dòng)的一個(gè)基本上沒(méi)有作用(本人測(cè)試結(jié)果)。
具體代碼在這里,歡迎加星號(hào),fork。
到此這篇關(guān)于Spring Boot 中使用@KafkaListener并發(fā)批量接收消息的文章就介紹到這了,更多相關(guān)Spring Boot 使用@KafkaListener并發(fā)批量接收消息內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- Spring?Boot整合Kafka教程詳解
- 基于SpringBoot?使用?Flink?收發(fā)Kafka消息的示例詳解
- SpringBoot如何獲取Kafka的Topic列表
- SpringBoot整合kafka遇到的版本不對(duì)應(yīng)問(wèn)題及解決
- SpringBoot+Nacos+Kafka微服務(wù)流編排的簡(jiǎn)單實(shí)現(xiàn)
- SpringBoot集成Kafka的步驟
- Spring Boot集群管理工具KafkaAdminClient使用方法解析
- Springboot集成Kafka實(shí)現(xiàn)producer和consumer的示例代碼
- Spring?Boot?基于?SCRAM?認(rèn)證集成?Kafka?的過(guò)程詳解
相關(guān)文章
SpringBoot整合數(shù)據(jù)庫(kù)訪問(wèn)層的實(shí)戰(zhàn)
本文主要介紹了SpringBoot整合數(shù)據(jù)庫(kù)訪問(wèn)層的實(shí)戰(zhàn),主要包含JdbcTemplate和mybatis框架的整合應(yīng)用,具有一定的參考價(jià)值,感興趣的可以了解一下2022-03-03Java存儲(chǔ)過(guò)程調(diào)用CallableStatement的方法
這篇文章主要介紹了Java存儲(chǔ)過(guò)程調(diào)用CallableStatement的方法,幫助大家更好的理解和學(xué)習(xí)Java,感興趣的朋友可以了解下2020-11-11Java SSH 秘鑰連接mysql數(shù)據(jù)庫(kù)的方法
這篇文章主要介紹了Java SSH 秘鑰連接mysql數(shù)據(jù)庫(kù)的方法,包括引入依賴的代碼和出現(xiàn)異常報(bào)錯(cuò)問(wèn)題,需要的朋友可以參考下2021-06-06spring boot開(kāi)發(fā)遇到坑之spring-boot-starter-web配置文件使用教程
Spring Boot支持容器的自動(dòng)配置,默認(rèn)是Tomcat,當(dāng)然我們也是可以進(jìn)行修改的。這篇文章給大家介紹了spring boot開(kāi)發(fā)遇到坑之spring-boot-starter-web配置文件使用教程,需要的朋友參考下吧2018-01-01mybatis plus 自動(dòng)轉(zhuǎn)駝峰配置小結(jié)
SpringBoot提供兩種配置Mybatis的方式,第一種是通過(guò)yml或application.properties文件開(kāi)啟配置,第二種是使用自定義配置類(lèi),通過(guò)給容器添加一個(gè)ConfigurationCustomizer來(lái)實(shí)現(xiàn)更靈活的配置,這兩種方法可以根據(jù)項(xiàng)目需求和個(gè)人喜好選擇使用2024-10-10詳解Springboot 優(yōu)雅停止服務(wù)的幾種方法
這篇文章主要介紹了詳解Springboot 優(yōu)雅停止服務(wù)的幾種方法 ,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-08-08Java實(shí)現(xiàn)淘寶秒殺聚劃算搶購(gòu)自動(dòng)提醒源碼
這篇文章主要為大家詳細(xì)介紹了java實(shí)現(xiàn)淘寶秒殺聚劃算搶購(gòu)自動(dòng)提醒源碼,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-02-02