欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringKafka消息消費(fèi)之@KafkaListener與消費(fèi)組配置方式

 更新時(shí)間:2025年05月23日 08:47:05   作者:程序媛學(xué)姐  
這篇文章主要介紹了SpringKafka消息消費(fèi)之@KafkaListener與消費(fèi)組配置方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教

引言

Apache Kafka作為高吞吐量的分布式消息系統(tǒng),在大數(shù)據(jù)處理和微服務(wù)架構(gòu)中扮演著關(guān)鍵角色。

Spring Kafka為Java開發(fā)者提供了簡(jiǎn)潔易用的Kafka消費(fèi)者API,特別是通過@KafkaListener注解,極大地簡(jiǎn)化了消息消費(fèi)的實(shí)現(xiàn)過程。

本文將深入探討Spring Kafka的消息消費(fèi)機(jī)制,重點(diǎn)關(guān)注@KafkaListener注解的使用方法和消費(fèi)組配置策略,幫助開發(fā)者構(gòu)建高效穩(wěn)定的消息消費(fèi)系統(tǒng)。

一、Spring Kafka消費(fèi)者基礎(chǔ)配置

使用Spring Kafka進(jìn)行消息消費(fèi)的第一步是配置消費(fèi)者工廠和監(jiān)聽器容器工廠。

這些配置定義了消費(fèi)者的基本行為,包括服務(wù)器地址、消息反序列化方式等。

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // 使JsonDeserializer信任所有包
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = 
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

二、@KafkaListener注解使用

@KafkaListener是Spring Kafka提供的核心注解,用于將方法標(biāo)記為Kafka消息監(jiān)聽器。

通過簡(jiǎn)單的注解配置,就能實(shí)現(xiàn)消息的自動(dòng)消費(fèi)和處理。

@Service
public class KafkaConsumerService {

    // 基本用法:監(jiān)聽單個(gè)主題
    @KafkaListener(topics = "test-topic", groupId = "test-group")
    public void listen(String message) {
        System.out.println("接收到消息:" + message);
    }
    
    // 監(jiān)聽多個(gè)主題
    @KafkaListener(topics = {"topic1", "topic2"}, groupId = "multi-topic-group")
    public void listenMultipleTopics(String message) {
        System.out.println("從多個(gè)主題接收到消息:" + message);
    }
    
    // 指定分區(qū)監(jiān)聽
    @KafkaListener(topicPartitions = {
        @TopicPartition(topic = "partitioned-topic", partitions = {"0", "1"})
    }, groupId = "partitioned-group")
    public void listenPartitions(String message) {
        System.out.println("從特定分區(qū)接收到消息:" + message);
    }
    
    // 使用ConsumerRecord獲取消息元數(shù)據(jù)
    @KafkaListener(topics = "metadata-topic", groupId = "metadata-group")
    public void listenWithMetadata(ConsumerRecord<String, String> record) {
        System.out.println("主題:" + record.topic() + 
                          ",分區(qū):" + record.partition() +
                          ",偏移量:" + record.offset() +
                          ",鍵:" + record.key() +
                          ",值:" + record.value());
    }
    
    // 批量消費(fèi)
    @KafkaListener(topics = "batch-topic", groupId = "batch-group", 
                  containerFactory = "batchListenerFactory")
    public void listenBatch(List<String> messages) {
        System.out.println("接收到批量消息,數(shù)量:" + messages.size());
        messages.forEach(message -> System.out.println("批量消息:" + message));
    }
}

配置批量消費(fèi)需要額外的批處理監(jiān)聽器容器工廠:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> batchListenerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = 
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);  // 啟用批量監(jiān)聽
    factory.getContainerProperties().setPollTimeout(3000);  // 輪詢超時(shí)時(shí)間
    return factory;
}

三、消費(fèi)組配置與負(fù)載均衡

Kafka的消費(fèi)組機(jī)制是實(shí)現(xiàn)消息消費(fèi)負(fù)載均衡的關(guān)鍵。同一組內(nèi)的多個(gè)消費(fèi)者實(shí)例會(huì)自動(dòng)分配主題分區(qū),確保每個(gè)分區(qū)只被一個(gè)消費(fèi)者處理,實(shí)現(xiàn)并行消費(fèi)。

// 配置消費(fèi)組屬性
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    // 基本配置
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    
    // 消費(fèi)組配置
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-application-group");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);  // 禁用自動(dòng)提交
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);  // 單次輪詢最大記錄數(shù)
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);  // 會(huì)話超時(shí)時(shí)間
    props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);  // 心跳間隔
    
    return new DefaultKafkaConsumerFactory<>(props);
}

多個(gè)消費(fèi)者可以通過配置相同的組ID來實(shí)現(xiàn)負(fù)載均衡:

// 消費(fèi)者1
@KafkaListener(topics = "shared-topic", groupId = "shared-group")
public void consumer1(String message) {
    System.out.println("消費(fèi)者1接收到消息:" + message);
}

// 消費(fèi)者2
@KafkaListener(topics = "shared-topic", groupId = "shared-group")
public void consumer2(String message) {
    System.out.println("消費(fèi)者2接收到消息:" + message);
}

當(dāng)這兩個(gè)消費(fèi)者同時(shí)運(yùn)行時(shí),Kafka會(huì)自動(dòng)將主題分區(qū)分配給它們,每個(gè)消費(fèi)者只處理分配給它的分區(qū)中的消息。

四、手動(dòng)提交偏移量

在某些場(chǎng)景下,自動(dòng)提交偏移量可能無法滿足需求,此時(shí)可以配置手動(dòng)提交。手動(dòng)提交允許更精確地控制消息消費(fèi)的確認(rèn)時(shí)機(jī),確保在消息完全處理后才提交偏移量。

@Configuration
public class ManualCommitConfig {
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> manualCommitFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = 
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }
}

@Service
public class ManualCommitService {
    
    @KafkaListener(topics = "manual-commit-topic", 
                  groupId = "manual-group",
                  containerFactory = "manualCommitFactory")
    public void listenWithManualCommit(String message, Acknowledgment ack) {
        try {
            System.out.println("處理消息:" + message);
            // 處理消息的業(yè)務(wù)邏輯
            // ...
            // 成功處理后確認(rèn)消息
            ack.acknowledge();
        } catch (Exception e) {
            // 異常處理,可以選擇不確認(rèn)
            System.err.println("消息處理失?。? + e.getMessage());
        }
    }
}

五、錯(cuò)誤處理與重試機(jī)制

消息消費(fèi)過程中可能會(huì)遇到各種異常,Spring Kafka提供了全面的錯(cuò)誤處理機(jī)制,包括重試、死信隊(duì)列等。

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> retryListenerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = 
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    
    // 配置重試
    factory.setRetryTemplate(retryTemplate());
    
    // 配置恢復(fù)回調(diào)
    factory.setRecoveryCallback(context -> {
        ConsumerRecord<String, String> record = 
            (ConsumerRecord<String, String>) context.getAttribute("record");
        System.err.println("重試失敗,發(fā)送到死信隊(duì)列:" + record.value());
        // 可以將消息發(fā)送到死信主題
        // kafkaTemplate.send("dead-letter-topic", record.value());
        return null;
    });
    
    return factory;
}

private RetryTemplate retryTemplate() {
    RetryTemplate template = new RetryTemplate();
    
    // 固定間隔重試策略
    FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
    backOffPolicy.setBackOffPeriod(1000);  // 1秒重試間隔
    template.setBackOffPolicy(backOffPolicy);
    
    // 簡(jiǎn)單重試策略
    SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
    retryPolicy.setMaxAttempts(3);  // 最大重試次數(shù)
    template.setRetryPolicy(retryPolicy);
    
    return template;
}

@KafkaListener(topics = "retry-topic", groupId = "retry-group", 
               containerFactory = "retryListenerFactory")
public void listenWithRetry(String message) {
    System.out.println("接收到需要重試處理的消息:" + message);
    // 模擬處理失敗
    if (message.contains("error")) {
        throw new RuntimeException("處理失敗,將重試");
    }
    System.out.println("消息處理成功");
}

總結(jié)

Spring Kafka通過@KafkaListener注解和靈活的消費(fèi)組配置,為開發(fā)者提供了強(qiáng)大的消息消費(fèi)能力。

本文介紹了基本配置、@KafkaListener的使用方法、消費(fèi)組機(jī)制、手動(dòng)提交偏移量以及錯(cuò)誤處理策略。

在實(shí)際應(yīng)用中,開發(fā)者應(yīng)根據(jù)業(yè)務(wù)需求選擇合適的消費(fèi)模式和配置策略,以實(shí)現(xiàn)高效可靠的消息處理。

合理利用消費(fèi)組可以實(shí)現(xiàn)負(fù)載均衡和水平擴(kuò)展,而手動(dòng)提交偏移量和錯(cuò)誤處理機(jī)制則能提升系統(tǒng)的健壯性。

以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。

相關(guān)文章

  • springboot集成Feign的實(shí)現(xiàn)示例

    springboot集成Feign的實(shí)現(xiàn)示例

    Feign是聲明式HTTP客戶端,用于簡(jiǎn)化微服務(wù)之間的REST調(diào)用,本文就來介紹一下springboot集成Feign的實(shí)現(xiàn)示例,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2024-11-11
  • 使用Java和ffmpeg把音頻和視頻合成視頻的操作方法

    使用Java和ffmpeg把音頻和視頻合成視頻的操作方法

    這篇文章主要介紹了使用Java和ffmpeg把音頻和視頻合成視頻,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的工作或?qū)W習(xí)具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-03-03
  • Java Dubbo框架知識(shí)點(diǎn)梳理

    Java Dubbo框架知識(shí)點(diǎn)梳理

    這篇文章主要介紹了Java Dubbo框架知識(shí)點(diǎn)梳理,通過詳細(xì)的文字講解和代碼實(shí)例,梳理了Dubbo這個(gè)框架,需要的朋友可以參考下
    2021-06-06
  • 詳解多線程及Runable 和Thread的區(qū)別

    詳解多線程及Runable 和Thread的區(qū)別

    這篇文章主要介紹了多線程及Runable 和Thread的區(qū)別,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-04-04
  • Spring Boot實(shí)現(xiàn)圖片上傳/加水印一把梭操作實(shí)例代碼

    Spring Boot實(shí)現(xiàn)圖片上傳/加水印一把梭操作實(shí)例代碼

    這篇文章主要給大家介紹了關(guān)于Spring Boot實(shí)現(xiàn)圖片上傳/加水印一把梭操作的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2018-11-11
  • Caused by: java.lang.ClassNotFoundException: org.apache.commons.collections.Transformer異常

    Caused by: java.lang.ClassNotFoundException: org.apache.comm

    這篇文章主要介紹了Caused by: java.lang.ClassNotFoundException: org.objectweb.asm.Type異常,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-07-07
  • java實(shí)現(xiàn)兩臺(tái)服務(wù)器間文件復(fù)制的方法

    java實(shí)現(xiàn)兩臺(tái)服務(wù)器間文件復(fù)制的方法

    這篇文章主要介紹了java實(shí)現(xiàn)兩臺(tái)服務(wù)器間文件復(fù)制的方法,是對(duì)單臺(tái)服務(wù)器上文件復(fù)制功能的升級(jí)與改進(jìn),具有一定參考借鑒價(jià)值,需要的朋友可以參考下
    2015-01-01
  • Java Calendar類使用案例詳解

    Java Calendar類使用案例詳解

    這篇文章主要介紹了Java Calendar類使用案例詳解,本篇文章通過簡(jiǎn)要的案例,講解了該項(xiàng)技術(shù)的了解與使用,以下就是詳細(xì)內(nèi)容,需要的朋友可以參考下
    2021-08-08
  • spring boot2.0總結(jié)介紹

    spring boot2.0總結(jié)介紹

    今天小編就為大家分享一篇關(guān)于spring boot2.0總結(jié)介紹,小編覺得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來看看吧
    2018-12-12
  • Java 如何同時(shí)返回多個(gè)不同類型

    Java 如何同時(shí)返回多個(gè)不同類型

    這篇文章主要介紹了Java 同時(shí)返回多個(gè)不同類型的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-08-08

最新評(píng)論