spring?kafka?@KafkaListener詳解與使用過(guò)程
說(shuō)明
- 從2.2.4版開(kāi)始,您可以直接在注釋上指定Kafka使用者屬性,這些屬性將覆蓋在使用者工廠(chǎng)中配置的具有相同名稱(chēng)的所有屬性。您不能通過(guò)這種方式指定group.id和client.id屬性。他們將被忽略;
- 可以使用#{…?}或?qū)傩哉嘉环?{…?})在SpEL上配置注釋上的大多數(shù)屬性。
比如:
@KafkaListener(id = "consumer-id",topics = "SHI_TOPIC1",concurrency = "${listen.concurrency:3}", clientIdPrefix = "myClientId")
屬性concurrency
將會(huì)從容器中獲取listen.concurrency
的值,如果不存在就默認(rèn)用3
@KafkaListener詳解
id 監(jiān)聽(tīng)器的id
①. 消費(fèi)者線(xiàn)程命名規(guī)則
填寫(xiě):
2020-11-19 14:24:15 c.d.b.k.KafkaListeners 120 [INFO] 線(xiàn)程:Thread[
consumer-id5-1-C-1
,5,main]-groupId:BASE-DEMO consumer-id5 消費(fèi)
沒(méi)有填寫(xiě)ID:
2020-11-19 10:41:26 c.d.b.k.KafkaListeners 137 [INFO] 線(xiàn)程:Thread[
org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1
,5,main] consumer-id7
②.在相同容器中的監(jiān)聽(tīng)器ID不能重復(fù)
否則會(huì)報(bào)錯(cuò)
Caused by: java.lang.IllegalStateException: Another endpoint is already registered with id
③.會(huì)覆蓋消費(fèi)者工廠(chǎng)的消費(fèi)組GroupId
假如配置文件屬性配置了消費(fèi)組kafka.consumer.group-id=BASE-DEMO
正常情況它是該容器中的默認(rèn)消費(fèi)組
但是如果設(shè)置了 @KafkaListener(id = "consumer-id7", topics = {"SHI_TOPIC3"})
那么當(dāng)前消費(fèi)者的消費(fèi)組就是consumer-id7
;
當(dāng)然如果你不想要他作為groupId的話(huà) 可以設(shè)置屬性idIsGroup = false
;那么還是會(huì)使用默認(rèn)的GroupId;
④. 如果配置了屬性groupId,則其優(yōu)先級(jí)最高
@KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3",groupId = "groupId-test")
例如上面代碼中最終這個(gè)消費(fèi)者的消費(fèi)組GroupId
是 “groupId-test”
該id屬性(如果存在)將用作Kafka消費(fèi)者group.id屬性,并覆蓋消費(fèi)者工廠(chǎng)中的已配置屬性(如果存在)您還可以groupId顯式設(shè)置或?qū)⑵湓O(shè)置idIsGroup為false,以恢復(fù)使用使用者工廠(chǎng)的先前行為group.id。
groupId 消費(fèi)組名
指定該消費(fèi)組的消費(fèi)組名; 關(guān)于消費(fèi)組名的配置可以看看上面的 id 監(jiān)聽(tīng)器的id
如何獲取消費(fèi)者 group.id
在監(jiān)聽(tīng)器中調(diào)用KafkaUtils.getConsumerGroupId()
可以獲得當(dāng)前的groupId; 可以在日志中打印出來(lái); 可以知道是哪個(gè)客戶(hù)端消費(fèi)的;
topics 指定要監(jiān)聽(tīng)哪些topic(與topicPattern、topicPartitions 三選一)
可以同時(shí)監(jiān)聽(tīng)多個(gè)topics = {"SHI_TOPIC3","SHI_TOPIC4"}
topicPattern 匹配Topic進(jìn)行監(jiān)聽(tīng)(與topics、topicPartitions 三選一) topicPartitions 顯式分區(qū)分配
可以為監(jiān)聽(tīng)器配置明確的主題和分區(qū)(以及可選的初始偏移量)
@KafkaListener(id = "thing2", topicPartitions = { @TopicPartition(topic = "topic1", partitions = { "0", "1" }), @TopicPartition(topic = "topic2", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100")) }) public void listen(ConsumerRecord<?, ?> record) { ... }
上面例子意思是 監(jiān)聽(tīng)topic1
的0,1分區(qū);監(jiān)聽(tīng)topic2
的第0分區(qū),并且第1分區(qū)從offset為100的開(kāi)始消費(fèi);
errorHandler 異常處理
實(shí)現(xiàn)KafkaListenerErrorHandler
; 然后做一些異常處理;
@Component public class KafkaDefaultListenerErrorHandler implements KafkaListenerErrorHandler { @Override public Object handleError(Message<?> message, ListenerExecutionFailedException exception) { return null; } @Override public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) { //do someting return null; } }
調(diào)用的時(shí)候 填寫(xiě)beanName;例如errorHandler="kafkaDefaultListenerErrorHandler"
containerFactory 監(jiān)聽(tīng)器工廠(chǎng)
指定生成監(jiān)聽(tīng)器的工廠(chǎng)類(lèi);
例如我寫(xiě)一個(gè) 批量消費(fèi)的工廠(chǎng)類(lèi)
/** * 監(jiān)聽(tīng)器工廠(chǎng) 批量消費(fèi) * @return */ @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> batchFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(kafkaConsumerFactory()); //設(shè)置為批量消費(fèi),每個(gè)批次數(shù)量在Kafka配置參數(shù)中設(shè)置ConsumerConfig.MAX_POLL_RECORDS_CONFIG factory.setBatchListener(true); return factory; }
使用containerFactory = "batchFactory"
clientIdPrefix 客戶(hù)端前綴
會(huì)覆蓋消費(fèi)者工廠(chǎng)的
kafka.consumer.client-id
屬性; 最為前綴后面接-n
n是數(shù)字
concurrency并發(fā)數(shù)
會(huì)覆蓋消費(fèi)者工廠(chǎng)中的concurrency ,這里的并發(fā)數(shù)就是多線(xiàn)程消費(fèi); 比如說(shuō)單機(jī)情況下,你設(shè)置了3; 相當(dāng)于就是啟動(dòng)了3個(gè)客戶(hù)端來(lái)分配消費(fèi)分區(qū);分布式情況 總線(xiàn)程數(shù)=concurrency*機(jī)器數(shù)量; 并不是設(shè)置越多越好,具體如何設(shè)置請(qǐng)看Java concurrency之集合
/** * 監(jiān)聽(tīng)器工廠(chǎng) * @return */ @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> concurrencyFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(kafkaConsumerFactory()); factory.setConcurrency(6); return factory; }
@KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3", containerFactory = "concurrencyFactory",concurrency = "1)
雖然使用的工廠(chǎng)是concurrencyFactory
(concurrency配置了6); 但是他最終生成的監(jiān)聽(tīng)器數(shù)量 是1;
properties 配置其他屬性
kafka中的屬性看org.apache.kafka.clients.consumer.ConsumerConfig
;
同名的都可以修改掉;
用法
@KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3", containerFactory = "concurrencyFactory",concurrency = "1" , clientIdPrefix = "myClientId5",groupId = "groupId-test", properties = { "enable.auto.commit:false","max.poll.interval.ms:6000" },errorHandler="kafkaDefaultListenerErrorHandler")
@KafkaListener使用
KafkaListenerEndpointRegistry
@Autowired private KafkaListenerEndpointRegistry registry; //.... 獲取所有注冊(cè)的監(jiān)聽(tīng)器 registry.getAllListenerContainers();
設(shè)置入?yún)Ⅱ?yàn)證器
當(dāng)您將Spring Boot與驗(yàn)證啟動(dòng)器一起使用時(shí),將LocalValidatorFactoryBean自動(dòng)配置:如下
@Configuration @EnableKafka public class Config implements KafkaListenerConfigurer { @Autowired private LocalValidatorFactoryBean validator; ... @Override public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) { registrar.setValidator(this.validator); } }
使用
@KafkaListener(id="validated", topics = "annotated35", errorHandler = "validationErrorHandler", containerFactory = "kafkaJsonListenerContainerFactory") public void validatedListener(@Payload @Valid ValidatedClass val) { ... } @Bean public KafkaListenerErrorHandler validationErrorHandler() { return (m, e) -> { ... }; }
擴(kuò)展:Spring for Apache Kafka @KafkaListener使用及注意事項(xiàng)
官方文檔: https://docs.spring.io/spring-kafka/reference/html/
@KafkaListener
The @KafkaListener
annotation is used to designate a bean method as a listener for a listener container. The bean is wrapped in a MessagingMessageListenerAdapter
configured with various features, such as converters to convert the data, if necessary, to match the method parameters.
If, say, six TopicPartition
instances are provided and the concurrency
is 3
; each container gets two partitions. For five TopicPartition
instances, two containers get two partitions, and the third gets one. If the concurrency
is greater than the number of TopicPartitions
, the concurrency
is adjusted down such that each container gets one partition.
You can now configure a KafkaListenerErrorHandler
to handle exceptions. See Handling Exceptions for more information.
By default, the @KafkaListener
id
property is now used as the group.id
property, overriding the property configured in the consumer factory (if present). Further, you can explicitly configure the groupId
on the annotation. Previously, you would have needed a separate container factory (and consumer factory) to use different group.id
values for listeners. To restore the previous behavior of using the factory configured group.id
, set the idIsGroup
property on the annotation to false
.
示例:
demo類(lèi):
public class Listener { @KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId") public void listen(String data) { ... } }</code> 配置類(lèi)及注解:
@Configuration @EnableKafka public class KafkaConfig { @Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(3); factory.getContainerProperties().setPollTimeout(3000); return factory; } @Bean public ConsumerFactory<Integer, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString()); ... return props; } }
到此這篇關(guān)于spring-kafka @KafkaListener詳解與使用的文章就介紹到這了,更多相關(guān)spring-kafka使用內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot @FixMethodOrder 如何調(diào)整單元測(cè)試順序
這篇文章主要介紹了SpringBoot @FixMethodOrder 調(diào)整單元測(cè)試順序方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-09-09java web實(shí)現(xiàn)自動(dòng)登錄功能
這篇文章主要為大家詳細(xì)介紹了java web實(shí)現(xiàn)自動(dòng)登錄功能,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-10-10SpringBoot項(xiàng)目中連接SQL Server的三種方式
連接SQL Server是許多Spring Boot項(xiàng)目中常見(jiàn)的需求之一,本文主要介紹了SpringBoot項(xiàng)目中連接SQL Server的三種方式,具有一定的參考價(jià)值 ,感興趣的可以了解一下2023-09-09Java基于Javafaker生成測(cè)試數(shù)據(jù)
這篇文章主要介紹了Java基于Javafaker生成測(cè)試數(shù)據(jù)的方法,幫助大家更好的理解和使用Java,感興趣的朋友可以了解下2020-12-12java long轉(zhuǎn)String +Codeforces110A案例
這篇文章主要介紹了java long轉(zhuǎn)String +Codeforces110A案例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-09-09XFire構(gòu)建web service客戶(hù)端的五種方式
本篇文章主要介紹了XFire構(gòu)建web service客戶(hù)端的五種方式。具有很好的參考價(jià)值,下面跟著小編一起來(lái)看下吧2017-01-01Java?MyBatis實(shí)戰(zhàn)之QueryWrapper中and和or拼接技巧大全
在Java中QueryWrapper是MyBatis-Plus框架中的一個(gè)查詢(xún)構(gòu)造器,它提供了豐富的查詢(xún)方法,其中包括and和or方法,可以用于構(gòu)建復(fù)雜的查詢(xún)條件,這篇文章主要給大家介紹了關(guān)于Java?MyBatis實(shí)戰(zhàn)之QueryWrapper中and和or拼接技巧的相關(guān)資料,需要的朋友可以參考下2024-07-07Java應(yīng)用服務(wù)器之tomcat會(huì)話(huà)復(fù)制集群配置的示例詳解
這篇文章主要介紹了Java應(yīng)用服務(wù)器之tomcat會(huì)話(huà)復(fù)制集群配置的相關(guān)知識(shí),本文通過(guò)圖文并茂的形式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-07-07