欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

spring kafka框架中@KafkaListener 注解解讀和使用案例

 更新時(shí)間:2023年02月20日 09:37:33   作者:占星安啦  
Kafka 目前主要作為一個(gè)分布式的發(fā)布訂閱式的消息系統(tǒng)使用,也是目前最流行的消息隊(duì)列系統(tǒng)之一,這篇文章主要介紹了kafka @KafkaListener 注解解讀,需要的朋友可以參考下

簡(jiǎn)介

Kafka 目前主要作為一個(gè)分布式的發(fā)布訂閱式的消息系統(tǒng)使用,也是目前最流行的消息隊(duì)列系統(tǒng)之一。因此,也越來(lái)越多的框架對(duì) kafka 做了集成,比如本文將要說(shuō)到的 spring-kafka。

Kafka 既然作為一個(gè)消息發(fā)布訂閱系統(tǒng),就包括消息生成者和消息消費(fèi)者。本文主要講述的 spring-kafka 框架的 kafkaListener 注解的深入解讀和使用案例。

解讀

源碼解讀

@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE })

@Retention(RetentionPolicy.RUNTIME)

@MessageMapping

@Documented

@Repeatable(KafkaListeners.class)

public @interface KafkaListener {
   /**

    * 消費(fèi)者的id,當(dāng)GroupId沒(méi)有被配置的時(shí)候,默認(rèn)id為GroupId

    */

   String id() default "";
   /**

    * 監(jiān)聽(tīng)容器工廠,當(dāng)監(jiān)聽(tīng)時(shí)需要區(qū)分單數(shù)據(jù)還是多數(shù)據(jù)消費(fèi)需要配置containerFactory      屬性

    */

   String containerFactory() default "";
   /**

    * 需要監(jiān)聽(tīng)的Topic,可監(jiān)聽(tīng)多個(gè),和 topicPattern 屬性互斥
*/

   String[] topics() default {};
   /**

    * 需要監(jiān)聽(tīng)的Topic的正則表達(dá)。和 topics,topicPartitions屬性互斥
    */

   String topicPattern() default "";
   /**

    * 可配置更加詳細(xì)的監(jiān)聽(tīng)信息,必須監(jiān)聽(tīng)某個(gè)Topic中的指定分區(qū),或者從offset為200的偏移量開(kāi)始監(jiān)聽(tīng),可配置該參數(shù), 和 topicPattern 屬性互斥
    */

   TopicPartition[] topicPartitions() default {};
   /**

    *偵聽(tīng)器容器組 

    */

   String containerGroup() default "";
   /**

    * 監(jiān)聽(tīng)異常處理器,配置BeanName

    */

   String errorHandler() default "";
   /**

    * 消費(fèi)組ID 

    */

   String groupId() default "";
   /**

    * id是否為GroupId

    */

   boolean idIsGroup() default true;
   /**

    * 消費(fèi)者Id前綴

    */

   String clientIdPrefix() default "";
   /**

    * 真實(shí)監(jiān)聽(tīng)容器的BeanName,需要在 BeanName前加 "__"

    */

   String beanRef() default "__listener";
}

使用案例

ConsumerRecord 類消費(fèi)

使用 ConsumerRecord 類接收有一定的好處,ConsumerRecord 類里面包含分區(qū)信息、消息頭、消息體等內(nèi)容,如果業(yè)務(wù)需要獲取這些參數(shù)時(shí),使用 ConsumerRecord 會(huì)是個(gè)不錯(cuò)的選擇。如果使用具體的類型接收消息體則更加方便,比如說(shuō)用 String 類型去接收消息體。

這里我們編寫(xiě)一個(gè) Listener 方法,監(jiān)聽(tīng) "topic1"Topic,并把 ConsumerRecord 里面所包含的內(nèi)容打印到控制臺(tái)中:

@Component

public class Listener {
    private static final Logger log = LoggerFactory.getLogger(Listener.class);
    @KafkaListener(id = "consumer", topics = "topic1")

    public void consumerListener(ConsumerRecord record) {

        log.info("topic.quick.consumer receive : " + record.toString());

    }
}

批量消費(fèi)

批量消費(fèi)在現(xiàn)實(shí)業(yè)務(wù)場(chǎng)景中是很有實(shí)用性的。因?yàn)榕肯M(fèi)可以增大 kafka 消費(fèi)吞吐量, 提高性能。

批量消費(fèi)實(shí)現(xiàn)步驟:

1、重新創(chuàng)建一份新的消費(fèi)者配置,配置為一次拉取 10 條消息

2、創(chuàng)建一個(gè)監(jiān)聽(tīng)容器工廠,命名為:batchContainerFactory,設(shè)置其為批量消費(fèi)并設(shè)置并發(fā)量為 5,這個(gè)并發(fā)量根據(jù)分區(qū)數(shù)決定,必須小于等于分區(qū)數(shù),否則會(huì)有線程一直處于空閑狀態(tài)。

3、創(chuàng)建一個(gè)分區(qū)數(shù)為 8 的 Topic。

4、創(chuàng)建監(jiān)聽(tīng)方法,設(shè)置消費(fèi) id 為 “batchConsumer”,clientID 前綴為“batch”,監(jiān)聽(tīng)“batch”,使用“batchContainerFactory” 工廠創(chuàng)建該監(jiān)聽(tīng)容器。

@Component

public class BatchListener {
    private static final Logger log= LoggerFactory.getLogger(BatchListener.class);
    private Map consumerProps() {

        Map props = new HashMap<>();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);

        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");

        //一次拉取消息數(shù)量

        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10");

        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,

                NumberDeserializers.IntegerDeserializer.class);

        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,

                StringDeserializer.class);

        return props;

    }
    @Bean("batchContainerFactory")

    public ConcurrentKafkaListenerContainerFactory listenerContainer() {

        ConcurrentKafkaListenerContainerFactory container

                = new ConcurrentKafkaListenerContainerFactory();

        container.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerProps()));

        //設(shè)置并發(fā)量,小于或等于Topic的分區(qū)數(shù)

        container.setConcurrency(5);

        //必須 設(shè)置為批量監(jiān)聽(tīng)

        container.setBatchListener(true);

        return container;

    }
    @Bean

    public NewTopic batchTopic() {

        return new NewTopic("topic.batch", 8, (short) 1);

    }
    @KafkaListener(id = "batchConsumer",clientIdPrefix = "batch"

            ,topics = {"topic.batch"},containerFactory = "batchContainerFactory")

    public void batchListener(List data) {

        log.info("topic.batch  receive : ");

        for (String s : data) {

            log.info(  s);

        }

    }

}

監(jiān)聽(tīng) Topic 中指定的分區(qū)

使用 @KafkaListener 注解的 topicPartitions 屬性監(jiān)聽(tīng)不同的 partition 分區(qū)。

@TopicPartition:topic-- 需要監(jiān)聽(tīng)的 Topic 的名稱,partitions – 需要監(jiān)聽(tīng) Topic 的分區(qū) id。

partitionOffsets – 可以設(shè)置從某個(gè)偏移量開(kāi)始監(jiān)聽(tīng),@PartitionOffset:partition – 分區(qū) Id,非數(shù)組,initialOffset – 初始偏移量。

@Bean

public NewTopic batchWithPartitionTopic() {

    return new NewTopic("topic.batch.partition", 8, (short) 1);

}
@KafkaListener(id = "batchWithPartition",clientIdPrefix = "bwp",containerFactory = "batchContainerFactory",

        topicPartitions = {

                @TopicPartition(topic = "topic.batch.partition",partitions = {"1","3"}),

                @TopicPartition(topic = "topic.batch.partition",partitions = {"0","4"},

                        partitionOffsets = @PartitionOffset(partition = "2",initialOffset = "100"))

        }

)

public void batchListenerWithPartition(List data) {

    log.info("topic.batch.partition  receive : ");

    for (String s : data) {

        log.info(s);

    }

}

注解方式獲取消息頭及消息體

當(dāng)你接收的消息包含請(qǐng)求頭,以及你監(jiān)聽(tīng)方法需要獲取該消息非常多的字段時(shí)可以通過(guò)這種方式。。這里使用的是默認(rèn)的監(jiān)聽(tīng)容器工廠創(chuàng)建的,如果你想使用批量消費(fèi),把對(duì)應(yīng)的類型改為 List 即可,比如 List data , List key。

@Payload:獲取的是消息的消息體,也就是發(fā)送內(nèi)容

@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY):獲取發(fā)送消息的 key

@Header(KafkaHeaders.RECEIVED_PARTITION_ID):獲取當(dāng)前消息是從哪個(gè)分區(qū)中監(jiān)聽(tīng)到的

@Header(KafkaHeaders.RECEIVED_TOPIC):獲取監(jiān)聽(tīng)的 TopicName

@Header(KafkaHeaders.RECEIVED_TIMESTAMP):獲取時(shí)間戳

@KafkaListener(id = "params", topics = "topic.params")

public void otherListener(@Payload String data,

                         @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,

                         @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,

                         @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,

                         @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {

    log.info("topic.params receive : \n"+

            "data : "+data+"\n"+

            "key : "+key+"\n"+

            "partitionId : "+partition+"\n"+

            "topic : "+topic+"\n"+

            "timestamp : "+ts+"\n"

    );

}

使用 Ack 機(jī)制確認(rèn)消費(fèi)

Kafka 是通過(guò)最新保存偏移量進(jìn)行消息消費(fèi)的,而且確認(rèn)消費(fèi)的消息并不會(huì)立刻刪除,所以我們可以重復(fù)的消費(fèi)未被刪除的數(shù)據(jù),當(dāng)?shù)谝粭l消息未被確認(rèn),而第二條消息被確認(rèn)的時(shí)候,Kafka 會(huì)保存第二條消息的偏移量,也就是說(shuō)第一條消息再也不會(huì)被監(jiān)聽(tīng)器所獲取,除非是根據(jù)第一條消息的偏移量手動(dòng)獲取。Kafka 的 ack 機(jī)制可以有效的確保消費(fèi)不被丟失。因?yàn)樽詣?dòng)提交是在 kafka 拉取到數(shù)據(jù)之后就直接提交,這樣很容易丟失數(shù)據(jù),尤其是在需要事物控制的時(shí)候。

使用 Kafka 的 Ack 機(jī)制比較簡(jiǎn)單,只需簡(jiǎn)單的三步即可:

  • 設(shè)置 ENABLE_AUTO_COMMIT_CONFIG=false,禁止自動(dòng)提交
  • 設(shè)置 AckMode=MANUAL_IMMEDIATE
  • 監(jiān)聽(tīng)方法加入 Acknowledgment ack 參數(shù)

4.使用 Consumer.seek 方法,可以指定到某個(gè)偏移量的位置

@Component

public class AckListener {

    private static final Logger log = LoggerFactory.getLogger(AckListener.class);
    private Map consumerProps() {

        Map props = new HashMap<>();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");

        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);

        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        return props;

    }
    @Bean("ackContainerFactory")

    public ConcurrentKafkaListenerContainerFactory ackContainerFactory() {

        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();

        factory.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerProps()));

        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);

        factory.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerProps()));

        return factory;

    }
    @KafkaListener(id = "ack", topics = "topic.ack", containerFactory = "ackContainerFactory")

    public void ackListener(ConsumerRecord record, Acknowledgment ack) {

        log.info("topic.quick.ack receive : " + record.value());

        ack.acknowledge();

    }

}

解決重復(fù)消費(fèi)

上一節(jié)中使用 ack 手動(dòng)提交偏移量時(shí),假如 consumer 掛了重啟,那它將從 committed offset 位置開(kāi)始重新消費(fèi),而不是 consume offset 位置。這也就意味著有可能重復(fù)消費(fèi)。

在 0.9 客戶端中,有 3 種 ack 策略:

策略 1: 自動(dòng)的,周期性的 ack。

策略 2:consumer.commitSync(),調(diào)用 commitSync,手動(dòng)同步 ack。每處理完 1 條消息,commitSync 1 次。

策略 3:consumer. commitASync(),手動(dòng)異步 ack。、

那么使用策略 2,提交每處理完 1 條消息,就發(fā)送一次 commitSync。那這樣是不是就可以解決 “重復(fù)消費(fèi)” 了呢?如下代碼:

while (true) {

        List buffer = new ArrayList<>();

        ConsumerRecords records = consumer.poll(100);

        for (ConsumerRecord record : records) {

            buffer.add(record);

        }

        insertIntoDb(buffer);    //消除處理,存到db

        consumer.commitSync();   //同步發(fā)送ack

        buffer.clear();

    }

}

答案是否定的!因?yàn)樯厦娴?insertIntoDb 和 commitSync 做不到原子操作:如果在數(shù)據(jù)處理完成,commitSync 的時(shí)候掛了,服務(wù)器再次重啟,消息仍然會(huì)重復(fù)消費(fèi)。

     那么如何解決重復(fù)消費(fèi)的問(wèn)題呢?答案是自己保存 committed offset,而不是依賴 kafka 的集群保存 committed offset,把消息的處理和保存 offset 做成一個(gè)原子操作,并且對(duì)消息加入唯一 id, 進(jìn)行判重。

依照官方文檔, 要自己保存偏移量, 需要:

  • enable.auto.commit=false, 禁用自動(dòng) ack。
  • 每次取到消息,把對(duì)應(yīng)的 offset 存下來(lái)。
  • 下次重啟,通過(guò) consumer.seek 函數(shù),定位到自己保存的 offset,從那開(kāi)始消費(fèi)。
  • 更進(jìn)一步處理可以對(duì)消息加入唯一 id, 進(jìn)行判重。

到此這篇關(guān)于kafka @KafkaListener 注解解讀的文章就介紹到這了,更多相關(guān)@KafkaListener 注解內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • JAVA annotation入門基礎(chǔ)

    JAVA annotation入門基礎(chǔ)

    以下是JAVA annotation入門基礎(chǔ),新手朋友們可以過(guò)來(lái)參考下。希望對(duì)你有所幫助
    2013-08-08
  • Spring Boot加密配置文件特殊內(nèi)容的示例代碼詳解

    Spring Boot加密配置文件特殊內(nèi)容的示例代碼詳解

    這篇文章主要介紹了Spring Boot加密配置文件特殊內(nèi)容的相關(guān)知識(shí),本文通過(guò)圖文并茂的形式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-05-05
  • 詳解SpringMVC中的異常處理機(jī)制

    詳解SpringMVC中的異常處理機(jī)制

    本篇文章將為大家詳細(xì)介紹一下springmvc的異常處理機(jī)制,用到了ControllerAdvice和ExceptionHandler注解,感興趣的小伙伴可以了解一下
    2022-07-07
  • IOC?容器啟動(dòng)和Bean實(shí)例化兩個(gè)階段詳解

    IOC?容器啟動(dòng)和Bean實(shí)例化兩個(gè)階段詳解

    這篇文章主要為大家介紹了IOC?容器啟動(dòng)和Bean實(shí)例化兩個(gè)階段詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-08-08
  • java抽象類和接口定義與用法詳解

    java抽象類和接口定義與用法詳解

    這篇文章主要介紹了java抽象類和接口定義與用法,結(jié)合實(shí)例形式詳細(xì)分析了java抽象類和接口的基本概念、原理、定義、使用方法及操作注意事項(xiàng),需要的朋友可以參考下
    2020-02-02
  • Java多線程案例之定時(shí)器詳解

    Java多線程案例之定時(shí)器詳解

    定時(shí)器是一種實(shí)際開(kāi)發(fā)中非常常用的組件,?類似于一個(gè)?“鬧鐘”,?達(dá)到一個(gè)設(shè)定的時(shí)間之后,?就執(zhí)行某個(gè)指定好的代碼。本文主要來(lái)和大家聊聊定時(shí)器的原理與使用,需要的可以參考一下
    2023-01-01
  • Java中使用Lambda表達(dá)式和函數(shù)編程示例

    Java中使用Lambda表達(dá)式和函數(shù)編程示例

    這篇文章介紹了Java中使用Lambda表達(dá)式和函數(shù)編程示例,該文章會(huì)演示多個(gè)示列,分別是變量聲明上下文中的lambda、return語(yǔ)句上下文中的lambda、賦值上下文中的lambda、lambda在數(shù)組初始值設(shè)定項(xiàng)上下文中的用法等等,需要的朋友可以參考一下
    2021-10-10
  • 原生Java操作兔子隊(duì)列RabbitMQ

    原生Java操作兔子隊(duì)列RabbitMQ

    這篇文章主要介紹了原生Java操作兔子隊(duì)列RabbitMQ,MQ全稱為Message?Queue,即消息隊(duì)列,“消息隊(duì)列”是在消息的傳輸過(guò)程中保存消息的容器,需要的朋友可以參考下
    2023-05-05
  • 你什么是Elastic Stack(ELK)

    你什么是Elastic Stack(ELK)

    這篇文章主要介紹了你什么是Elastic Stack(ELK),ELK是三款軟件的簡(jiǎn)稱,分別是Elasticsearch、Logstash、Kibana組成,需要的朋友可以參考下
    2023-04-04
  • SpringBoot之Order注解啟動(dòng)順序說(shuō)明

    SpringBoot之Order注解啟動(dòng)順序說(shuō)明

    這篇文章主要介紹了SpringBoot之Order注解啟動(dòng)順序說(shuō)明,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-09-09

最新評(píng)論