欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringKafka消息消費之@KafkaListener與消費組配置方式

 更新時間:2025年05月23日 08:47:05   作者:程序媛學姐  
這篇文章主要介紹了SpringKafka消息消費之@KafkaListener與消費組配置方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教

引言

Apache Kafka作為高吞吐量的分布式消息系統(tǒng),在大數(shù)據(jù)處理和微服務架構中扮演著關鍵角色。

Spring Kafka為Java開發(fā)者提供了簡潔易用的Kafka消費者API,特別是通過@KafkaListener注解,極大地簡化了消息消費的實現(xiàn)過程。

本文將深入探討Spring Kafka的消息消費機制,重點關注@KafkaListener注解的使用方法和消費組配置策略,幫助開發(fā)者構建高效穩(wěn)定的消息消費系統(tǒng)。

一、Spring Kafka消費者基礎配置

使用Spring Kafka進行消息消費的第一步是配置消費者工廠和監(jiān)聽器容器工廠。

這些配置定義了消費者的基本行為,包括服務器地址、消息反序列化方式等。

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // 使JsonDeserializer信任所有包
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = 
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

二、@KafkaListener注解使用

@KafkaListener是Spring Kafka提供的核心注解,用于將方法標記為Kafka消息監(jiān)聽器。

通過簡單的注解配置,就能實現(xiàn)消息的自動消費和處理。

@Service
public class KafkaConsumerService {

    // 基本用法:監(jiān)聽單個主題
    @KafkaListener(topics = "test-topic", groupId = "test-group")
    public void listen(String message) {
        System.out.println("接收到消息:" + message);
    }
    
    // 監(jiān)聽多個主題
    @KafkaListener(topics = {"topic1", "topic2"}, groupId = "multi-topic-group")
    public void listenMultipleTopics(String message) {
        System.out.println("從多個主題接收到消息:" + message);
    }
    
    // 指定分區(qū)監(jiān)聽
    @KafkaListener(topicPartitions = {
        @TopicPartition(topic = "partitioned-topic", partitions = {"0", "1"})
    }, groupId = "partitioned-group")
    public void listenPartitions(String message) {
        System.out.println("從特定分區(qū)接收到消息:" + message);
    }
    
    // 使用ConsumerRecord獲取消息元數(shù)據(jù)
    @KafkaListener(topics = "metadata-topic", groupId = "metadata-group")
    public void listenWithMetadata(ConsumerRecord<String, String> record) {
        System.out.println("主題:" + record.topic() + 
                          ",分區(qū):" + record.partition() +
                          ",偏移量:" + record.offset() +
                          ",鍵:" + record.key() +
                          ",值:" + record.value());
    }
    
    // 批量消費
    @KafkaListener(topics = "batch-topic", groupId = "batch-group", 
                  containerFactory = "batchListenerFactory")
    public void listenBatch(List<String> messages) {
        System.out.println("接收到批量消息,數(shù)量:" + messages.size());
        messages.forEach(message -> System.out.println("批量消息:" + message));
    }
}

配置批量消費需要額外的批處理監(jiān)聽器容器工廠:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> batchListenerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = 
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);  // 啟用批量監(jiān)聽
    factory.getContainerProperties().setPollTimeout(3000);  // 輪詢超時時間
    return factory;
}

三、消費組配置與負載均衡

Kafka的消費組機制是實現(xiàn)消息消費負載均衡的關鍵。同一組內的多個消費者實例會自動分配主題分區(qū),確保每個分區(qū)只被一個消費者處理,實現(xiàn)并行消費。

// 配置消費組屬性
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    // 基本配置
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    
    // 消費組配置
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-application-group");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);  // 禁用自動提交
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);  // 單次輪詢最大記錄數(shù)
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);  // 會話超時時間
    props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);  // 心跳間隔
    
    return new DefaultKafkaConsumerFactory<>(props);
}

多個消費者可以通過配置相同的組ID來實現(xiàn)負載均衡:

// 消費者1
@KafkaListener(topics = "shared-topic", groupId = "shared-group")
public void consumer1(String message) {
    System.out.println("消費者1接收到消息:" + message);
}

// 消費者2
@KafkaListener(topics = "shared-topic", groupId = "shared-group")
public void consumer2(String message) {
    System.out.println("消費者2接收到消息:" + message);
}

當這兩個消費者同時運行時,Kafka會自動將主題分區(qū)分配給它們,每個消費者只處理分配給它的分區(qū)中的消息。

四、手動提交偏移量

在某些場景下,自動提交偏移量可能無法滿足需求,此時可以配置手動提交。手動提交允許更精確地控制消息消費的確認時機,確保在消息完全處理后才提交偏移量。

@Configuration
public class ManualCommitConfig {
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> manualCommitFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = 
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }
}

@Service
public class ManualCommitService {
    
    @KafkaListener(topics = "manual-commit-topic", 
                  groupId = "manual-group",
                  containerFactory = "manualCommitFactory")
    public void listenWithManualCommit(String message, Acknowledgment ack) {
        try {
            System.out.println("處理消息:" + message);
            // 處理消息的業(yè)務邏輯
            // ...
            // 成功處理后確認消息
            ack.acknowledge();
        } catch (Exception e) {
            // 異常處理,可以選擇不確認
            System.err.println("消息處理失?。? + e.getMessage());
        }
    }
}

五、錯誤處理與重試機制

消息消費過程中可能會遇到各種異常,Spring Kafka提供了全面的錯誤處理機制,包括重試、死信隊列等。

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> retryListenerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = 
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    
    // 配置重試
    factory.setRetryTemplate(retryTemplate());
    
    // 配置恢復回調
    factory.setRecoveryCallback(context -> {
        ConsumerRecord<String, String> record = 
            (ConsumerRecord<String, String>) context.getAttribute("record");
        System.err.println("重試失敗,發(fā)送到死信隊列:" + record.value());
        // 可以將消息發(fā)送到死信主題
        // kafkaTemplate.send("dead-letter-topic", record.value());
        return null;
    });
    
    return factory;
}

private RetryTemplate retryTemplate() {
    RetryTemplate template = new RetryTemplate();
    
    // 固定間隔重試策略
    FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
    backOffPolicy.setBackOffPeriod(1000);  // 1秒重試間隔
    template.setBackOffPolicy(backOffPolicy);
    
    // 簡單重試策略
    SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
    retryPolicy.setMaxAttempts(3);  // 最大重試次數(shù)
    template.setRetryPolicy(retryPolicy);
    
    return template;
}

@KafkaListener(topics = "retry-topic", groupId = "retry-group", 
               containerFactory = "retryListenerFactory")
public void listenWithRetry(String message) {
    System.out.println("接收到需要重試處理的消息:" + message);
    // 模擬處理失敗
    if (message.contains("error")) {
        throw new RuntimeException("處理失敗,將重試");
    }
    System.out.println("消息處理成功");
}

總結

Spring Kafka通過@KafkaListener注解和靈活的消費組配置,為開發(fā)者提供了強大的消息消費能力。

本文介紹了基本配置、@KafkaListener的使用方法、消費組機制、手動提交偏移量以及錯誤處理策略。

在實際應用中,開發(fā)者應根據(jù)業(yè)務需求選擇合適的消費模式和配置策略,以實現(xiàn)高效可靠的消息處理。

合理利用消費組可以實現(xiàn)負載均衡和水平擴展,而手動提交偏移量和錯誤處理機制則能提升系統(tǒng)的健壯性。

以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。

相關文章

  • springboot集成Feign的實現(xiàn)示例

    springboot集成Feign的實現(xiàn)示例

    Feign是聲明式HTTP客戶端,用于簡化微服務之間的REST調用,本文就來介紹一下springboot集成Feign的實現(xiàn)示例,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2024-11-11
  • 使用Java和ffmpeg把音頻和視頻合成視頻的操作方法

    使用Java和ffmpeg把音頻和視頻合成視頻的操作方法

    這篇文章主要介紹了使用Java和ffmpeg把音頻和視頻合成視頻,本文通過實例代碼給大家介紹的非常詳細,對大家的工作或學習具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-03-03
  • Java Dubbo框架知識點梳理

    Java Dubbo框架知識點梳理

    這篇文章主要介紹了Java Dubbo框架知識點梳理,通過詳細的文字講解和代碼實例,梳理了Dubbo這個框架,需要的朋友可以參考下
    2021-06-06
  • 詳解多線程及Runable 和Thread的區(qū)別

    詳解多線程及Runable 和Thread的區(qū)別

    這篇文章主要介紹了多線程及Runable 和Thread的區(qū)別,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2019-04-04
  • Spring Boot實現(xiàn)圖片上傳/加水印一把梭操作實例代碼

    Spring Boot實現(xiàn)圖片上傳/加水印一把梭操作實例代碼

    這篇文章主要給大家介紹了關于Spring Boot實現(xiàn)圖片上傳/加水印一把梭操作的相關資料,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2018-11-11
  • Caused by: java.lang.ClassNotFoundException: org.apache.commons.collections.Transformer異常

    Caused by: java.lang.ClassNotFoundException: org.apache.comm

    這篇文章主要介紹了Caused by: java.lang.ClassNotFoundException: org.objectweb.asm.Type異常,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2020-07-07
  • java實現(xiàn)兩臺服務器間文件復制的方法

    java實現(xiàn)兩臺服務器間文件復制的方法

    這篇文章主要介紹了java實現(xiàn)兩臺服務器間文件復制的方法,是對單臺服務器上文件復制功能的升級與改進,具有一定參考借鑒價值,需要的朋友可以參考下
    2015-01-01
  • Java Calendar類使用案例詳解

    Java Calendar類使用案例詳解

    這篇文章主要介紹了Java Calendar類使用案例詳解,本篇文章通過簡要的案例,講解了該項技術的了解與使用,以下就是詳細內容,需要的朋友可以參考下
    2021-08-08
  • spring boot2.0總結介紹

    spring boot2.0總結介紹

    今天小編就為大家分享一篇關于spring boot2.0總結介紹,小編覺得內容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧
    2018-12-12
  • Java 如何同時返回多個不同類型

    Java 如何同時返回多個不同類型

    這篇文章主要介紹了Java 同時返回多個不同類型的方法,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-08-08

最新評論