SpringKafka消息消費之@KafkaListener與消費組配置方式
引言
Apache Kafka作為高吞吐量的分布式消息系統(tǒng),在大數(shù)據(jù)處理和微服務架構中扮演著關鍵角色。
Spring Kafka為Java開發(fā)者提供了簡潔易用的Kafka消費者API,特別是通過@KafkaListener注解,極大地簡化了消息消費的實現(xiàn)過程。
本文將深入探討Spring Kafka的消息消費機制,重點關注@KafkaListener注解的使用方法和消費組配置策略,幫助開發(fā)者構建高效穩(wěn)定的消息消費系統(tǒng)。
一、Spring Kafka消費者基礎配置
使用Spring Kafka進行消息消費的第一步是配置消費者工廠和監(jiān)聽器容器工廠。
這些配置定義了消費者的基本行為,包括服務器地址、消息反序列化方式等。
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// 使JsonDeserializer信任所有包
props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}二、@KafkaListener注解使用
@KafkaListener是Spring Kafka提供的核心注解,用于將方法標記為Kafka消息監(jiān)聽器。
通過簡單的注解配置,就能實現(xiàn)消息的自動消費和處理。
@Service
public class KafkaConsumerService {
// 基本用法:監(jiān)聽單個主題
@KafkaListener(topics = "test-topic", groupId = "test-group")
public void listen(String message) {
System.out.println("接收到消息:" + message);
}
// 監(jiān)聽多個主題
@KafkaListener(topics = {"topic1", "topic2"}, groupId = "multi-topic-group")
public void listenMultipleTopics(String message) {
System.out.println("從多個主題接收到消息:" + message);
}
// 指定分區(qū)監(jiān)聽
@KafkaListener(topicPartitions = {
@TopicPartition(topic = "partitioned-topic", partitions = {"0", "1"})
}, groupId = "partitioned-group")
public void listenPartitions(String message) {
System.out.println("從特定分區(qū)接收到消息:" + message);
}
// 使用ConsumerRecord獲取消息元數(shù)據(jù)
@KafkaListener(topics = "metadata-topic", groupId = "metadata-group")
public void listenWithMetadata(ConsumerRecord<String, String> record) {
System.out.println("主題:" + record.topic() +
",分區(qū):" + record.partition() +
",偏移量:" + record.offset() +
",鍵:" + record.key() +
",值:" + record.value());
}
// 批量消費
@KafkaListener(topics = "batch-topic", groupId = "batch-group",
containerFactory = "batchListenerFactory")
public void listenBatch(List<String> messages) {
System.out.println("接收到批量消息,數(shù)量:" + messages.size());
messages.forEach(message -> System.out.println("批量消息:" + message));
}
}配置批量消費需要額外的批處理監(jiān)聽器容器工廠:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> batchListenerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true); // 啟用批量監(jiān)聽
factory.getContainerProperties().setPollTimeout(3000); // 輪詢超時時間
return factory;
}三、消費組配置與負載均衡
Kafka的消費組機制是實現(xiàn)消息消費負載均衡的關鍵。同一組內的多個消費者實例會自動分配主題分區(qū),確保每個分區(qū)只被一個消費者處理,實現(xiàn)并行消費。
// 配置消費組屬性
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
Map<String, Object> props = new HashMap<>();
// 基本配置
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
// 消費組配置
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-application-group");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 禁用自動提交
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500); // 單次輪詢最大記錄數(shù)
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000); // 會話超時時間
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000); // 心跳間隔
return new DefaultKafkaConsumerFactory<>(props);
}多個消費者可以通過配置相同的組ID來實現(xiàn)負載均衡:
// 消費者1
@KafkaListener(topics = "shared-topic", groupId = "shared-group")
public void consumer1(String message) {
System.out.println("消費者1接收到消息:" + message);
}
// 消費者2
@KafkaListener(topics = "shared-topic", groupId = "shared-group")
public void consumer2(String message) {
System.out.println("消費者2接收到消息:" + message);
}當這兩個消費者同時運行時,Kafka會自動將主題分區(qū)分配給它們,每個消費者只處理分配給它的分區(qū)中的消息。
四、手動提交偏移量
在某些場景下,自動提交偏移量可能無法滿足需求,此時可以配置手動提交。手動提交允許更精確地控制消息消費的確認時機,確保在消息完全處理后才提交偏移量。
@Configuration
public class ManualCommitConfig {
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> manualCommitFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
return factory;
}
}
@Service
public class ManualCommitService {
@KafkaListener(topics = "manual-commit-topic",
groupId = "manual-group",
containerFactory = "manualCommitFactory")
public void listenWithManualCommit(String message, Acknowledgment ack) {
try {
System.out.println("處理消息:" + message);
// 處理消息的業(yè)務邏輯
// ...
// 成功處理后確認消息
ack.acknowledge();
} catch (Exception e) {
// 異常處理,可以選擇不確認
System.err.println("消息處理失?。? + e.getMessage());
}
}
}五、錯誤處理與重試機制
消息消費過程中可能會遇到各種異常,Spring Kafka提供了全面的錯誤處理機制,包括重試、死信隊列等。
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> retryListenerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// 配置重試
factory.setRetryTemplate(retryTemplate());
// 配置恢復回調
factory.setRecoveryCallback(context -> {
ConsumerRecord<String, String> record =
(ConsumerRecord<String, String>) context.getAttribute("record");
System.err.println("重試失敗,發(fā)送到死信隊列:" + record.value());
// 可以將消息發(fā)送到死信主題
// kafkaTemplate.send("dead-letter-topic", record.value());
return null;
});
return factory;
}
private RetryTemplate retryTemplate() {
RetryTemplate template = new RetryTemplate();
// 固定間隔重試策略
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(1000); // 1秒重試間隔
template.setBackOffPolicy(backOffPolicy);
// 簡單重試策略
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(3); // 最大重試次數(shù)
template.setRetryPolicy(retryPolicy);
return template;
}
@KafkaListener(topics = "retry-topic", groupId = "retry-group",
containerFactory = "retryListenerFactory")
public void listenWithRetry(String message) {
System.out.println("接收到需要重試處理的消息:" + message);
// 模擬處理失敗
if (message.contains("error")) {
throw new RuntimeException("處理失敗,將重試");
}
System.out.println("消息處理成功");
}總結
Spring Kafka通過@KafkaListener注解和靈活的消費組配置,為開發(fā)者提供了強大的消息消費能力。
本文介紹了基本配置、@KafkaListener的使用方法、消費組機制、手動提交偏移量以及錯誤處理策略。
在實際應用中,開發(fā)者應根據(jù)業(yè)務需求選擇合適的消費模式和配置策略,以實現(xiàn)高效可靠的消息處理。
合理利用消費組可以實現(xiàn)負載均衡和水平擴展,而手動提交偏移量和錯誤處理機制則能提升系統(tǒng)的健壯性。
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關文章
Spring Boot實現(xiàn)圖片上傳/加水印一把梭操作實例代碼
這篇文章主要給大家介紹了關于Spring Boot實現(xiàn)圖片上傳/加水印一把梭操作的相關資料,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2018-11-11
Caused by: java.lang.ClassNotFoundException: org.apache.comm
這篇文章主要介紹了Caused by: java.lang.ClassNotFoundException: org.objectweb.asm.Type異常,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-07-07

