Spring?Boot?中使用@KafkaListener并發(fā)批量接收消息的完整代碼
kakfa是我們在項(xiàng)目開發(fā)中經(jīng)常使用的消息中間件。由于它的寫性能非常高,因此,經(jīng)常會碰到讀取Kafka消息隊列時擁堵的情況。遇到這種情況時,有時我們不能直接清理整個topic,因?yàn)檫€有別的服務(wù)正在使用該topic。因此只能額外啟動一個相同名稱的consumer-group來加快消息消費(fèi)(如果該topic只有一個分區(qū),再啟動一個新的消費(fèi)者,沒有作用)。
完整的代碼在這里,歡迎加星號、fork。
官方文檔在https://docs.spring.io/spring-kafka/reference/html/_reference.html
###第一步,并發(fā)消費(fèi)###
先看代碼,重點(diǎn)是這我們使用的是ConcurrentKafkaListenerContainerFactory并且設(shè)置了factory.setConcurrency(4); (我的topic有4個分區(qū),為了加快消費(fèi)將并發(fā)設(shè)置為4,也就是有4個KafkaMessageListenerContainer)
@Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(4); factory.setBatchListener(true); factory.getContainerProperties().setPollTimeout(3000); return factory; }
注意也可以直接在application.properties中添加spring.kafka.listener.concurrency=3,然后使用@KafkaListener并發(fā)消費(fèi)。
###第二步,批量消費(fèi)###
然后是批量消費(fèi)。重點(diǎn)是factory.setBatchListener(true);
以及 propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
一個設(shè)啟用批量消費(fèi),一個設(shè)置批量消費(fèi)每次最多消費(fèi)多少條消息記錄。
重點(diǎn)說明一下,我們設(shè)置的ConsumerConfig.MAX_POLL_RECORDS_CONFIG是50,并不是說如果沒有達(dá)到50條消息,我們就一直等待。官方的解釋是"The maximum number of records returned in a single call to poll().", 也就是50表示的是一次poll最多返回的記錄數(shù)。
從啟動日志中可以看到還有個 max.poll.interval.ms = 300000, 也就說每間隔max.poll.interval.ms我們就調(diào)用一次poll。每次poll最多返回50條記錄。
max.poll.interval.ms官方解釋是"The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member. ";
@Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(4); factory.setBatchListener(true); factory.getContainerProperties().setPollTimeout(3000); return factory; } @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> propsMap = new HashMap<>(); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propsConfig.getBroker()); propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, propsConfig.getEnableAutoCommit()); propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, propsConfig.getGroupId()); propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, propsConfig.getAutoOffsetReset()); propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50); return propsMap; }
啟動日志截圖
關(guān)于max.poll.records和max.poll.interval.ms官方解釋截圖:
###第三步,分區(qū)消費(fèi)###
對于只有一個分區(qū)的topic,不需要分區(qū)消費(fèi),因?yàn)闆]有意義。下面的例子是針對有2個分區(qū)的情況(我的完整代碼中有4個listenPartitionX方法,我的topic設(shè)置了4個分區(qū)),讀者可以根據(jù)自己的情況進(jìn)行調(diào)整。
public class MyListener { private static final String TPOIC = "topic02"; @KafkaListener(id = "id0", topicPartitions = { @TopicPartition(topic = TPOIC, partitions = { "0" }) }) public void listenPartition0(List<ConsumerRecord<?, ?>> records) { log.info("Id0 Listener, Thread ID: " + Thread.currentThread().getId()); log.info("Id0 records size " + records.size()); for (ConsumerRecord<?, ?> record : records) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); log.info("Received: " + record); if (kafkaMessage.isPresent()) { Object message = record.value(); String topic = record.topic(); log.info("p0 Received message={}", message); } } } @KafkaListener(id = "id1", topicPartitions = { @TopicPartition(topic = TPOIC, partitions = { "1" }) }) public void listenPartition1(List<ConsumerRecord<?, ?>> records) { log.info("Id1 Listener, Thread ID: " + Thread.currentThread().getId()); log.info("Id1 records size " + records.size()); for (ConsumerRecord<?, ?> record : records) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); log.info("Received: " + record); if (kafkaMessage.isPresent()) { Object message = record.value(); String topic = record.topic(); log.info("p1 Received message={}", message); } } }
關(guān)于分區(qū)和消費(fèi)者關(guān)系,后面會補(bǔ)充,先摘錄如下:
If, say, 6 TopicPartition s are provided and the concurrency is 3; each container will get 2 partitions. For 5 TopicPartition s, 2 containers will get 2 partitions and the third will get 1. If the concurrency is greater than the number of TopicPartitions, the concurrency will be adjusted down such that each container will get one partition.
最后,總結(jié),如果我們的topic有多個分區(qū),經(jīng)過以上步驟可以很好的加快消息消費(fèi)。如果只有一個分區(qū),因?yàn)橐呀?jīng)有一個同名group id在消費(fèi)了,新啟動的一個基本上沒有作用(本人測試結(jié)果)。
具體代碼在這里,歡迎加星號,fork。
到此這篇關(guān)于Spring Boot 中使用@KafkaListener并發(fā)批量接收消息的文章就介紹到這了,更多相關(guān)Spring Boot 使用@KafkaListener并發(fā)批量接收消息內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- Spring?Boot整合Kafka教程詳解
- 基于SpringBoot?使用?Flink?收發(fā)Kafka消息的示例詳解
- SpringBoot如何獲取Kafka的Topic列表
- SpringBoot整合kafka遇到的版本不對應(yīng)問題及解決
- SpringBoot+Nacos+Kafka微服務(wù)流編排的簡單實(shí)現(xiàn)
- SpringBoot集成Kafka的步驟
- Spring Boot集群管理工具KafkaAdminClient使用方法解析
- Springboot集成Kafka實(shí)現(xiàn)producer和consumer的示例代碼
- Spring?Boot?基于?SCRAM?認(rèn)證集成?Kafka?的過程詳解
相關(guān)文章
SpringBoot整合數(shù)據(jù)庫訪問層的實(shí)戰(zhàn)
本文主要介紹了SpringBoot整合數(shù)據(jù)庫訪問層的實(shí)戰(zhàn),主要包含JdbcTemplate和mybatis框架的整合應(yīng)用,具有一定的參考價值,感興趣的可以了解一下2022-03-03Java存儲過程調(diào)用CallableStatement的方法
這篇文章主要介紹了Java存儲過程調(diào)用CallableStatement的方法,幫助大家更好的理解和學(xué)習(xí)Java,感興趣的朋友可以了解下2020-11-11Java SSH 秘鑰連接mysql數(shù)據(jù)庫的方法
這篇文章主要介紹了Java SSH 秘鑰連接mysql數(shù)據(jù)庫的方法,包括引入依賴的代碼和出現(xiàn)異常報錯問題,需要的朋友可以參考下2021-06-06spring boot開發(fā)遇到坑之spring-boot-starter-web配置文件使用教程
Spring Boot支持容器的自動配置,默認(rèn)是Tomcat,當(dāng)然我們也是可以進(jìn)行修改的。這篇文章給大家介紹了spring boot開發(fā)遇到坑之spring-boot-starter-web配置文件使用教程,需要的朋友參考下吧2018-01-01mybatis plus 自動轉(zhuǎn)駝峰配置小結(jié)
SpringBoot提供兩種配置Mybatis的方式,第一種是通過yml或application.properties文件開啟配置,第二種是使用自定義配置類,通過給容器添加一個ConfigurationCustomizer來實(shí)現(xiàn)更靈活的配置,這兩種方法可以根據(jù)項(xiàng)目需求和個人喜好選擇使用2024-10-10詳解Springboot 優(yōu)雅停止服務(wù)的幾種方法
這篇文章主要介紹了詳解Springboot 優(yōu)雅停止服務(wù)的幾種方法 ,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-08-08Java實(shí)現(xiàn)淘寶秒殺聚劃算搶購自動提醒源碼
這篇文章主要為大家詳細(xì)介紹了java實(shí)現(xiàn)淘寶秒殺聚劃算搶購自動提醒源碼,文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下2018-02-02