SpringKafka消息消費(fèi)之@KafkaListener與消費(fèi)組配置方式
引言
Apache Kafka作為高吞吐量的分布式消息系統(tǒng),在大數(shù)據(jù)處理和微服務(wù)架構(gòu)中扮演著關(guān)鍵角色。
Spring Kafka為Java開發(fā)者提供了簡(jiǎn)潔易用的Kafka消費(fèi)者API,特別是通過@KafkaListener注解,極大地簡(jiǎn)化了消息消費(fèi)的實(shí)現(xiàn)過程。
本文將深入探討Spring Kafka的消息消費(fèi)機(jī)制,重點(diǎn)關(guān)注@KafkaListener注解的使用方法和消費(fèi)組配置策略,幫助開發(fā)者構(gòu)建高效穩(wěn)定的消息消費(fèi)系統(tǒng)。
一、Spring Kafka消費(fèi)者基礎(chǔ)配置
使用Spring Kafka進(jìn)行消息消費(fèi)的第一步是配置消費(fèi)者工廠和監(jiān)聽器容器工廠。
這些配置定義了消費(fèi)者的基本行為,包括服務(wù)器地址、消息反序列化方式等。
@Configuration @EnableKafka public class KafkaConsumerConfig { @Bean public ConsumerFactory<String, Object> consumerFactory() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 使JsonDeserializer信任所有包 props.put(JsonDeserializer.TRUSTED_PACKAGES, "*"); return new DefaultKafkaConsumerFactory<>(props); } @Bean public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } }
二、@KafkaListener注解使用
@KafkaListener是Spring Kafka提供的核心注解,用于將方法標(biāo)記為Kafka消息監(jiān)聽器。
通過簡(jiǎn)單的注解配置,就能實(shí)現(xiàn)消息的自動(dòng)消費(fèi)和處理。
@Service public class KafkaConsumerService { // 基本用法:監(jiān)聽單個(gè)主題 @KafkaListener(topics = "test-topic", groupId = "test-group") public void listen(String message) { System.out.println("接收到消息:" + message); } // 監(jiān)聽多個(gè)主題 @KafkaListener(topics = {"topic1", "topic2"}, groupId = "multi-topic-group") public void listenMultipleTopics(String message) { System.out.println("從多個(gè)主題接收到消息:" + message); } // 指定分區(qū)監(jiān)聽 @KafkaListener(topicPartitions = { @TopicPartition(topic = "partitioned-topic", partitions = {"0", "1"}) }, groupId = "partitioned-group") public void listenPartitions(String message) { System.out.println("從特定分區(qū)接收到消息:" + message); } // 使用ConsumerRecord獲取消息元數(shù)據(jù) @KafkaListener(topics = "metadata-topic", groupId = "metadata-group") public void listenWithMetadata(ConsumerRecord<String, String> record) { System.out.println("主題:" + record.topic() + ",分區(qū):" + record.partition() + ",偏移量:" + record.offset() + ",鍵:" + record.key() + ",值:" + record.value()); } // 批量消費(fèi) @KafkaListener(topics = "batch-topic", groupId = "batch-group", containerFactory = "batchListenerFactory") public void listenBatch(List<String> messages) { System.out.println("接收到批量消息,數(shù)量:" + messages.size()); messages.forEach(message -> System.out.println("批量消息:" + message)); } }
配置批量消費(fèi)需要額外的批處理監(jiān)聽器容器工廠:
@Bean public ConcurrentKafkaListenerContainerFactory<String, String> batchListenerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setBatchListener(true); // 啟用批量監(jiān)聽 factory.getContainerProperties().setPollTimeout(3000); // 輪詢超時(shí)時(shí)間 return factory; }
三、消費(fèi)組配置與負(fù)載均衡
Kafka的消費(fèi)組機(jī)制是實(shí)現(xiàn)消息消費(fèi)負(fù)載均衡的關(guān)鍵。同一組內(nèi)的多個(gè)消費(fèi)者實(shí)例會(huì)自動(dòng)分配主題分區(qū),確保每個(gè)分區(qū)只被一個(gè)消費(fèi)者處理,實(shí)現(xiàn)并行消費(fèi)。
// 配置消費(fèi)組屬性 @Bean public ConsumerFactory<String, Object> consumerFactory() { Map<String, Object> props = new HashMap<>(); // 基本配置 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); // 消費(fèi)組配置 props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-application-group"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 禁用自動(dòng)提交 props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500); // 單次輪詢最大記錄數(shù) props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000); // 會(huì)話超時(shí)時(shí)間 props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000); // 心跳間隔 return new DefaultKafkaConsumerFactory<>(props); }
多個(gè)消費(fèi)者可以通過配置相同的組ID來實(shí)現(xiàn)負(fù)載均衡:
// 消費(fèi)者1 @KafkaListener(topics = "shared-topic", groupId = "shared-group") public void consumer1(String message) { System.out.println("消費(fèi)者1接收到消息:" + message); } // 消費(fèi)者2 @KafkaListener(topics = "shared-topic", groupId = "shared-group") public void consumer2(String message) { System.out.println("消費(fèi)者2接收到消息:" + message); }
當(dāng)這兩個(gè)消費(fèi)者同時(shí)運(yùn)行時(shí),Kafka會(huì)自動(dòng)將主題分區(qū)分配給它們,每個(gè)消費(fèi)者只處理分配給它的分區(qū)中的消息。
四、手動(dòng)提交偏移量
在某些場(chǎng)景下,自動(dòng)提交偏移量可能無法滿足需求,此時(shí)可以配置手動(dòng)提交。手動(dòng)提交允許更精確地控制消息消費(fèi)的確認(rèn)時(shí)機(jī),確保在消息完全處理后才提交偏移量。
@Configuration public class ManualCommitConfig { @Bean public ConcurrentKafkaListenerContainerFactory<String, String> manualCommitFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); return factory; } } @Service public class ManualCommitService { @KafkaListener(topics = "manual-commit-topic", groupId = "manual-group", containerFactory = "manualCommitFactory") public void listenWithManualCommit(String message, Acknowledgment ack) { try { System.out.println("處理消息:" + message); // 處理消息的業(yè)務(wù)邏輯 // ... // 成功處理后確認(rèn)消息 ack.acknowledge(); } catch (Exception e) { // 異常處理,可以選擇不確認(rèn) System.err.println("消息處理失?。? + e.getMessage()); } } }
五、錯(cuò)誤處理與重試機(jī)制
消息消費(fèi)過程中可能會(huì)遇到各種異常,Spring Kafka提供了全面的錯(cuò)誤處理機(jī)制,包括重試、死信隊(duì)列等。
@Bean public ConcurrentKafkaListenerContainerFactory<String, String> retryListenerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); // 配置重試 factory.setRetryTemplate(retryTemplate()); // 配置恢復(fù)回調(diào) factory.setRecoveryCallback(context -> { ConsumerRecord<String, String> record = (ConsumerRecord<String, String>) context.getAttribute("record"); System.err.println("重試失敗,發(fā)送到死信隊(duì)列:" + record.value()); // 可以將消息發(fā)送到死信主題 // kafkaTemplate.send("dead-letter-topic", record.value()); return null; }); return factory; } private RetryTemplate retryTemplate() { RetryTemplate template = new RetryTemplate(); // 固定間隔重試策略 FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy(); backOffPolicy.setBackOffPeriod(1000); // 1秒重試間隔 template.setBackOffPolicy(backOffPolicy); // 簡(jiǎn)單重試策略 SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(); retryPolicy.setMaxAttempts(3); // 最大重試次數(shù) template.setRetryPolicy(retryPolicy); return template; } @KafkaListener(topics = "retry-topic", groupId = "retry-group", containerFactory = "retryListenerFactory") public void listenWithRetry(String message) { System.out.println("接收到需要重試處理的消息:" + message); // 模擬處理失敗 if (message.contains("error")) { throw new RuntimeException("處理失敗,將重試"); } System.out.println("消息處理成功"); }
總結(jié)
Spring Kafka通過@KafkaListener注解和靈活的消費(fèi)組配置,為開發(fā)者提供了強(qiáng)大的消息消費(fèi)能力。
本文介紹了基本配置、@KafkaListener的使用方法、消費(fèi)組機(jī)制、手動(dòng)提交偏移量以及錯(cuò)誤處理策略。
在實(shí)際應(yīng)用中,開發(fā)者應(yīng)根據(jù)業(yè)務(wù)需求選擇合適的消費(fèi)模式和配置策略,以實(shí)現(xiàn)高效可靠的消息處理。
合理利用消費(fèi)組可以實(shí)現(xiàn)負(fù)載均衡和水平擴(kuò)展,而手動(dòng)提交偏移量和錯(cuò)誤處理機(jī)制則能提升系統(tǒng)的健壯性。
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
springboot集成Feign的實(shí)現(xiàn)示例
Feign是聲明式HTTP客戶端,用于簡(jiǎn)化微服務(wù)之間的REST調(diào)用,本文就來介紹一下springboot集成Feign的實(shí)現(xiàn)示例,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2024-11-11Spring Boot實(shí)現(xiàn)圖片上傳/加水印一把梭操作實(shí)例代碼
這篇文章主要給大家介紹了關(guān)于Spring Boot實(shí)現(xiàn)圖片上傳/加水印一把梭操作的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2018-11-11Caused by: java.lang.ClassNotFoundException: org.apache.comm
這篇文章主要介紹了Caused by: java.lang.ClassNotFoundException: org.objectweb.asm.Type異常,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-07-07java實(shí)現(xiàn)兩臺(tái)服務(wù)器間文件復(fù)制的方法
這篇文章主要介紹了java實(shí)現(xiàn)兩臺(tái)服務(wù)器間文件復(fù)制的方法,是對(duì)單臺(tái)服務(wù)器上文件復(fù)制功能的升級(jí)與改進(jìn),具有一定參考借鑒價(jià)值,需要的朋友可以參考下2015-01-01