欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

spring?kafka?@KafkaListener詳解與使用過(guò)程

 更新時(shí)間:2023年02月20日 09:20:06   作者:石臻臻的雜貨鋪  
這篇文章主要介紹了spring-kafka?@KafkaListener詳解與使用,本文結(jié)合實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下

說(shuō)明

  • 從2.2.4版開(kāi)始,您可以直接在注釋上指定Kafka使用者屬性,這些屬性將覆蓋在使用者工廠(chǎng)中配置的具有相同名稱(chēng)的所有屬性。您不能通過(guò)這種方式指定group.id和client.id屬性。他們將被忽略;
  • 可以使用#{…?}或?qū)傩哉嘉环?{…?})在SpEL上配置注釋上的大多數(shù)屬性。

比如:

   @KafkaListener(id = "consumer-id",topics = "SHI_TOPIC1",concurrency = "${listen.concurrency:3}",
            clientIdPrefix = "myClientId")

屬性concurrency將會(huì)從容器中獲取listen.concurrency的值,如果不存在就默認(rèn)用3

@KafkaListener詳解

id 監(jiān)聽(tīng)器的id

①. 消費(fèi)者線(xiàn)程命名規(guī)則

填寫(xiě):

2020-11-19 14:24:15 c.d.b.k.KafkaListeners 120 [INFO] 線(xiàn)程:Thread[consumer-id5-1-C-1,5,main]-groupId:BASE-DEMO consumer-id5 消費(fèi)

沒(méi)有填寫(xiě)ID:

2020-11-19 10:41:26 c.d.b.k.KafkaListeners 137 [INFO] 線(xiàn)程:Thread[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1,5,main] consumer-id7

②.在相同容器中的監(jiān)聽(tīng)器ID不能重復(fù)

否則會(huì)報(bào)錯(cuò)

Caused by: java.lang.IllegalStateException: Another endpoint is already registered with id

③.會(huì)覆蓋消費(fèi)者工廠(chǎng)的消費(fèi)組GroupId

假如配置文件屬性配置了消費(fèi)組kafka.consumer.group-id=BASE-DEMO
正常情況它是該容器中的默認(rèn)消費(fèi)組
但是如果設(shè)置了 @KafkaListener(id = "consumer-id7", topics = {"SHI_TOPIC3"})
那么當(dāng)前消費(fèi)者的消費(fèi)組就是consumer-id7 ;

當(dāng)然如果你不想要他作為groupId的話(huà) 可以設(shè)置屬性idIsGroup = false;那么還是會(huì)使用默認(rèn)的GroupId;

④. 如果配置了屬性groupId,則其優(yōu)先級(jí)最高

 @KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3",groupId = "groupId-test")

例如上面代碼中最終這個(gè)消費(fèi)者的消費(fèi)組GroupId是 “groupId-test”

該id屬性(如果存在)將用作Kafka消費(fèi)者group.id屬性,并覆蓋消費(fèi)者工廠(chǎng)中的已配置屬性(如果存在)您還可以groupId顯式設(shè)置或?qū)⑵湓O(shè)置idIsGroup為false,以恢復(fù)使用使用者工廠(chǎng)的先前行為group.id。

groupId 消費(fèi)組名

指定該消費(fèi)組的消費(fèi)組名; 關(guān)于消費(fèi)組名的配置可以看看上面的 id 監(jiān)聽(tīng)器的id

如何獲取消費(fèi)者 group.id

在監(jiān)聽(tīng)器中調(diào)用KafkaUtils.getConsumerGroupId()可以獲得當(dāng)前的groupId; 可以在日志中打印出來(lái); 可以知道是哪個(gè)客戶(hù)端消費(fèi)的;

topics 指定要監(jiān)聽(tīng)哪些topic(與topicPattern、topicPartitions 三選一)

可以同時(shí)監(jiān)聽(tīng)多個(gè)
topics = {"SHI_TOPIC3","SHI_TOPIC4"}

topicPattern 匹配Topic進(jìn)行監(jiān)聽(tīng)(與topics、topicPartitions 三選一) topicPartitions 顯式分區(qū)分配

可以為監(jiān)聽(tīng)器配置明確的主題和分區(qū)(以及可選的初始偏移量)

@KafkaListener(id = "thing2", topicPartitions =
        { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
          @TopicPartition(topic = "topic2", partitions = "0",
             partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
        })
public void listen(ConsumerRecord<?, ?> record) {
    ...
}

上面例子意思是 監(jiān)聽(tīng)topic1的0,1分區(qū);監(jiān)聽(tīng)topic2的第0分區(qū),并且第1分區(qū)從offset為100的開(kāi)始消費(fèi);

errorHandler 異常處理

實(shí)現(xiàn)KafkaListenerErrorHandler; 然后做一些異常處理;

@Component
public class KafkaDefaultListenerErrorHandler implements KafkaListenerErrorHandler {
    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
        return null;
    }

    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
    	//do someting
        return null;
    }
}

調(diào)用的時(shí)候 填寫(xiě)beanName;例如errorHandler="kafkaDefaultListenerErrorHandler"

containerFactory 監(jiān)聽(tīng)器工廠(chǎng)

指定生成監(jiān)聽(tīng)器的工廠(chǎng)類(lèi);

例如我寫(xiě)一個(gè) 批量消費(fèi)的工廠(chǎng)類(lèi)

    /**
     * 監(jiān)聽(tīng)器工廠(chǎng) 批量消費(fèi)
     * @return
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaConsumerFactory());
        //設(shè)置為批量消費(fèi),每個(gè)批次數(shù)量在Kafka配置參數(shù)中設(shè)置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
        factory.setBatchListener(true);
        return factory;
    }

使用containerFactory = "batchFactory"

clientIdPrefix 客戶(hù)端前綴

會(huì)覆蓋消費(fèi)者工廠(chǎng)的kafka.consumer.client-id屬性; 最為前綴后面接 -n n是數(shù)字

concurrency并發(fā)數(shù)

會(huì)覆蓋消費(fèi)者工廠(chǎng)中的concurrency ,這里的并發(fā)數(shù)就是多線(xiàn)程消費(fèi); 比如說(shuō)單機(jī)情況下,你設(shè)置了3; 相當(dāng)于就是啟動(dòng)了3個(gè)客戶(hù)端來(lái)分配消費(fèi)分區(qū);分布式情況 總線(xiàn)程數(shù)=concurrency*機(jī)器數(shù)量; 并不是設(shè)置越多越好,具體如何設(shè)置請(qǐng)看Java concurrency之集合

    /**
     * 監(jiān)聽(tīng)器工廠(chǎng) 
     * @return
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> concurrencyFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaConsumerFactory());
        factory.setConcurrency(6);
        return factory;
    }
    @KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3", containerFactory = "concurrencyFactory",concurrency = "1)

雖然使用的工廠(chǎng)是concurrencyFactory(concurrency配置了6); 但是他最終生成的監(jiān)聽(tīng)器數(shù)量 是1;

properties 配置其他屬性

kafka中的屬性看org.apache.kafka.clients.consumer.ConsumerConfig ;
同名的都可以修改掉;

用法

    @KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3", containerFactory = "concurrencyFactory",concurrency = "1"
            , clientIdPrefix = "myClientId5",groupId = "groupId-test",
            properties = {
                    "enable.auto.commit:false","max.poll.interval.ms:6000" },errorHandler="kafkaDefaultListenerErrorHandler")

@KafkaListener使用

KafkaListenerEndpointRegistry

    @Autowired
    private KafkaListenerEndpointRegistry registry;
       //.... 獲取所有注冊(cè)的監(jiān)聽(tīng)器
        registry.getAllListenerContainers();

設(shè)置入?yún)Ⅱ?yàn)證器

當(dāng)您將Spring Boot與驗(yàn)證啟動(dòng)器一起使用時(shí),將LocalValidatorFactoryBean自動(dòng)配置:如下

@Configuration
@EnableKafka
public class Config implements KafkaListenerConfigurer {

    @Autowired
    private LocalValidatorFactoryBean validator;
    ...

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
      registrar.setValidator(this.validator);
    }
}

使用

@KafkaListener(id="validated", topics = "annotated35", errorHandler = "validationErrorHandler",
      containerFactory = "kafkaJsonListenerContainerFactory")
public void validatedListener(@Payload @Valid ValidatedClass val) {
    ...
}

@Bean
public KafkaListenerErrorHandler validationErrorHandler() {
    return (m, e) -> {
        ...
    };
}

spring-kafka官方文檔

擴(kuò)展:Spring for Apache Kafka @KafkaListener使用及注意事項(xiàng)

官方文檔:   https://docs.spring.io/spring-kafka/reference/html/

 @KafkaListener

The @KafkaListener annotation is used to designate a bean method as a listener for a listener container. The bean is wrapped in a MessagingMessageListenerAdapter configured with various features, such as converters to convert the data, if necessary, to match the method parameters.

If, say, six TopicPartition instances are provided and the concurrency is 3; each container gets two partitions. For five TopicPartition instances, two containers get two partitions, and the third gets one. If the concurrency is greater than the number of TopicPartitions, the concurrency is adjusted down such that each container gets one partition.

You can now configure a KafkaListenerErrorHandler to handle exceptions. See Handling Exceptions for more information.

By default, the @KafkaListener id property is now used as the group.id property, overriding the property configured in the consumer factory (if present). Further, you can explicitly configure the groupId on the annotation. Previously, you would have needed a separate container factory (and consumer factory) to use different group.id values for listeners. To restore the previous behavior of using the factory configured group.id, set the idIsGroup property on the annotation to false.

示例:

   demo類(lèi):

public class Listener {

    @KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
    public void listen(String data) {
        ...
    }

}</code>

配置類(lèi)及注解:
@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
                        kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        ...
        return props;
    }
}

到此這篇關(guān)于spring-kafka @KafkaListener詳解與使用的文章就介紹到這了,更多相關(guān)spring-kafka使用內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • SpringBoot @FixMethodOrder 如何調(diào)整單元測(cè)試順序

    SpringBoot @FixMethodOrder 如何調(diào)整單元測(cè)試順序

    這篇文章主要介紹了SpringBoot @FixMethodOrder 調(diào)整單元測(cè)試順序方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-09-09
  • java web實(shí)現(xiàn)自動(dòng)登錄功能

    java web實(shí)現(xiàn)自動(dòng)登錄功能

    這篇文章主要為大家詳細(xì)介紹了java web實(shí)現(xiàn)自動(dòng)登錄功能,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2017-10-10
  • SpringBoot項(xiàng)目中連接SQL Server的三種方式

    SpringBoot項(xiàng)目中連接SQL Server的三種方式

    連接SQL Server是許多Spring Boot項(xiàng)目中常見(jiàn)的需求之一,本文主要介紹了SpringBoot項(xiàng)目中連接SQL Server的三種方式,具有一定的參考價(jià)值 ,感興趣的可以了解一下
    2023-09-09
  • Java基于Javafaker生成測(cè)試數(shù)據(jù)

    Java基于Javafaker生成測(cè)試數(shù)據(jù)

    這篇文章主要介紹了Java基于Javafaker生成測(cè)試數(shù)據(jù)的方法,幫助大家更好的理解和使用Java,感興趣的朋友可以了解下
    2020-12-12
  • Hibernate核心類(lèi)和接口的詳細(xì)介紹

    Hibernate核心類(lèi)和接口的詳細(xì)介紹

    今天小編就為大家分享一篇關(guān)于Hibernate核心類(lèi)和接口的詳細(xì)介紹,小編覺(jué)得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來(lái)看看吧
    2019-03-03
  • java long轉(zhuǎn)String +Codeforces110A案例

    java long轉(zhuǎn)String +Codeforces110A案例

    這篇文章主要介紹了java long轉(zhuǎn)String +Codeforces110A案例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2020-09-09
  • XFire構(gòu)建web service客戶(hù)端的五種方式

    XFire構(gòu)建web service客戶(hù)端的五種方式

    本篇文章主要介紹了XFire構(gòu)建web service客戶(hù)端的五種方式。具有很好的參考價(jià)值,下面跟著小編一起來(lái)看下吧
    2017-01-01
  • Java?MyBatis實(shí)戰(zhàn)之QueryWrapper中and和or拼接技巧大全

    Java?MyBatis實(shí)戰(zhàn)之QueryWrapper中and和or拼接技巧大全

    在Java中QueryWrapper是MyBatis-Plus框架中的一個(gè)查詢(xún)構(gòu)造器,它提供了豐富的查詢(xún)方法,其中包括and和or方法,可以用于構(gòu)建復(fù)雜的查詢(xún)條件,這篇文章主要給大家介紹了關(guān)于Java?MyBatis實(shí)戰(zhàn)之QueryWrapper中and和or拼接技巧的相關(guān)資料,需要的朋友可以參考下
    2024-07-07
  • Lombok在idea中的使用教程

    Lombok在idea中的使用教程

    Lombok是一個(gè)可以通過(guò)簡(jiǎn)單的注解形式,來(lái)幫助我們簡(jiǎn)化消除一些必須有但顯得很臃腫(如果getter、setter方法)的Java代碼的工具,通過(guò)使用對(duì)應(yīng)的注解,可以在編譯源碼的時(shí)候生成對(duì)應(yīng)的方法,這篇文章主要介紹了Lombok在idea中的使用,需要的朋友可以參考下
    2023-03-03
  • Java應(yīng)用服務(wù)器之tomcat會(huì)話(huà)復(fù)制集群配置的示例詳解

    Java應(yīng)用服務(wù)器之tomcat會(huì)話(huà)復(fù)制集群配置的示例詳解

    這篇文章主要介紹了Java應(yīng)用服務(wù)器之tomcat會(huì)話(huà)復(fù)制集群配置的相關(guān)知識(shí),本文通過(guò)圖文并茂的形式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-07-07

最新評(píng)論