spring-kafka使消費(fèi)者動(dòng)態(tài)訂閱新增的topic問(wèn)題
一、前言
在Java中使用kafka,方式很多,例如:
- 直接使用kafka-clients這類(lèi)原生的API;
- 也可以使用Spring對(duì)其的包裝API,即spring-kafka,同其它包裝API一樣(如JdbcTemplate、RestTemplate、RedisTemplate等等),KafkaTemplate是其生產(chǎn)者核心類(lèi),KafkaListener是其消費(fèi)者核心注解;
- 也有包裝地更加抽象的SpringCloudStream等。
這里討論的話題是,如何在spring-kafka中,使得一個(gè)消費(fèi)者可以動(dòng)態(tài)訂閱新增的topic?
本文不討論利用SpringCloudConfig或Apollo等分布式配置中心,利用@RefreshScope的方式來(lái)達(dá)到目的,這種方式有點(diǎn)殺雞用牛刀,也會(huì)增加系統(tǒng)復(fù)雜度和維護(hù)成本。
我的環(huán)境:jdk 1.8,Spring 2.1.3.RELEASE,kafka_2.12-2.3.0單節(jié)點(diǎn)。
二、需求分析
上面已經(jīng)提到,spring-kafka通過(guò) @KafkaListener 的方式配置訂閱的topic,最常用的屬性可能是 topics,而要實(shí)現(xiàn)本文的需求,就要使用另一個(gè)屬性 topicPattern,查看它的屬性說(shuō)明:
The topic pattern for this listener.
The entries can be 'topic pattern', a'property-placeholder key' or an 'expression'.
The framework will create acontainer that subscribes to all topics matching the specified pattern to getdynamically assigned partitions.
The pattern matching will be performedperiodically against topics existing at the time of check.
An expression mustbe resolved to the topic pattern (String or Pattern result types are supported).
將其翻譯過(guò)來(lái):
此偵聽(tīng)器的主題模式。條目可以是“主題模式”,“屬性占位符鍵”或“表達(dá)式”。
該框架將創(chuàng)建一個(gè)容器,該容器訂閱與指定模式匹配的所有主題以獲取動(dòng)態(tài)分配的分區(qū)。
模式匹配將針對(duì)檢查時(shí)存在的主題【定期執(zhí)行】。
表達(dá)式必須解析為主題模式(支持字符串或模式結(jié)果類(lèi)型)。
注意:從說(shuō)明信息來(lái)看,topicPattern 已經(jīng)可以做到定期檢查topic列表,然后將新加入的topic分配至某個(gè)消費(fèi)者。
下面列出消費(fèi)端的核心測(cè)試代碼:
@Component public class SinkConsumer { @KafkaListener(topicPattern = "test_topic2.*") public void listen2(ConsumerRecord<?, ?> record) throws Exception { System.out.printf("topic2.* = %s, offset = %d, value = %s \n", record.topic(), record.offset(), record.value()); } }
代碼實(shí)現(xiàn)很簡(jiǎn)潔,就是期待我們新增一個(gè)符合 topicPattern 的topic后,spring-kafka能否自動(dòng)為新建的topic分配到此目標(biāo)消費(fèi)者。
三、測(cè)試運(yùn)行
3.1 啟動(dòng)消費(fèi)者服務(wù)
配置文件中,spring該配的配,kafka該配的配,接著啟動(dòng)即可。
3.2 新建topic
新建 test_topic2_3,剛創(chuàng)建完不能立刻分配到目標(biāo)消費(fèi)者,從 topicPattern 的注釋得知spring-kafka會(huì)定期掃描topic列表,我們要給它幾分鐘等待掃描到新topic,并為它成功分配到目標(biāo)消費(fèi)者后,再去發(fā)送第一條消息(所以可以先去洗個(gè)手,此時(shí)19:02)。
3.3 等待topic被分配到消費(fèi)者
洗手期間的控制臺(tái)日志提示:已為新建的 test_topic2_3 分配到我們的目標(biāo)消費(fèi)者,并將offset設(shè)置到起始位置0,日志如下:
2019-11-15 19:05:12.958 INFO 7768 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-3, groupId=test] Revoking previously assigned partitions [test_topic2_2-0, test_topic2_1-0]
2019-11-15 19:05:12.958 INFO 7768 --- [ntainer#1-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked: [test_topic2_2-0, test_topic2_1-0]
2019-11-15 19:05:12.958 INFO 7768 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-3, groupId=test] (Re-)joining group
2019-11-15 19:05:15.757 INFO 7768 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=test] Attempt to heartbeat failed since group is rebalancing
2019-11-15 19:05:15.761 INFO 7768 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=test] Revoking previously assigned partitions [test_topic-0]
2019-11-15 19:05:15.762 INFO 7768 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked: [test_topic-0]
2019-11-15 19:05:15.762 INFO 7768 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=test] (Re-)joining group
2019-11-15 19:05:16.025 INFO 7768 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-3, groupId=test] Successfully joined group with generation 6
2019-11-15 19:05:16.025 INFO 7768 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=test] Successfully joined group with generation 6
2019-11-15 19:05:16.026 INFO 7768 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-3, groupId=test] Setting newly assigned partitions [test_topic2_2-0, test_topic2_3-0, test_topic2_1-0]
2019-11-15 19:05:16.026 INFO 7768 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=test] Setting newly assigned partitions [test_topic-0]
2019-11-15 19:05:16.028 INFO 7768 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [test_topic-0]
2019-11-15 19:05:16.032 INFO 7768 --- [ntainer#1-0-C-1] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=consumer-3, groupId=test] Resetting offset for partition test_topic2_3-0 to offset 0.
2019-11-15 19:05:16.032 INFO 7768 --- [ntainer#1-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [test_topic2_2-0, test_topic2_3-0, test_topic2_1-0]
3.4 發(fā)送第一條消息
洗手完畢,看到3.3小節(jié)里的日志,然后確認(rèn)成功分配到目標(biāo)消費(fèi)者,且offset被設(shè)為0之后,發(fā)送第一條消息【我是第1個(gè)test_topic2_3的消息】,控制臺(tái)日志打印出此消息信息,代表成功消費(fèi):
topic2.* = test_topic2_3, offset = 0, value = {"date":"2019-11-15 19:11:13","msg":"我是第1個(gè)test_topic2_3的消息"}
3.5 注意事項(xiàng)
若不等到offset被設(shè)為0之后,過(guò)早發(fā)送消息,則會(huì)在消費(fèi)端丟失過(guò)早發(fā)送的消息,并且當(dāng)spring-kafka自動(dòng)設(shè)置offset的時(shí)候,日志提示,offset被設(shè)置為1,而不是起始位置0:
INFO o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=consumer-3, groupId=test] Resetting offset for partition test_topic2_1-0 to offset 1.
在上面的3.1至3.4的整個(gè)過(guò)程中,可能會(huì)日志警告,代表暫時(shí)不能為新增的topic分配到目標(biāo)消費(fèi)者:
WARN o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=test] The following subscribed topics are not assigned to any members: [test_topic2_3]
所以只需等待日志提示可以成功分配到目標(biāo)消費(fèi)者,且offset被設(shè)為0之后,即可發(fā)送第一條消息。
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
SpringBoot實(shí)現(xiàn)發(fā)送短信的示例代碼
這篇文章主要介紹了SpringBoot實(shí)現(xiàn)發(fā)送短信的示例代碼,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-04-04RocketMQ中的消費(fèi)者啟動(dòng)流程解讀
這篇文章主要介紹了RocketMQ中的消費(fèi)者啟動(dòng)流程解讀,RocketMQ是一款高性能、高可靠性的分布式消息中間件,消費(fèi)者是RocketMQ中的重要組成部分,消費(fèi)者負(fù)責(zé)從消息隊(duì)列中獲取消息并進(jìn)行處理,需要的朋友可以參考下2023-10-10Java單例模式實(shí)現(xiàn)靜態(tài)內(nèi)部類(lèi)方法示例
這篇文章主要介紹了Java單例模式實(shí)現(xiàn)靜態(tài)內(nèi)部類(lèi)方法示例,涉及構(gòu)造函數(shù)私有化等相關(guān)內(nèi)容,需要的朋友可以了解下。2017-09-09Java中的Web MVC簡(jiǎn)介_(kāi)動(dòng)力節(jié)點(diǎn)Java學(xué)院整理
MVC模型是一種架構(gòu)型的模式,本身不引入新功能,只是幫助我們將開(kāi)發(fā)的結(jié)構(gòu)組織的更加合理,使展示與模型分離、流程控制邏輯、業(yè)務(wù)邏輯調(diào)用與展示邏輯分離2017-09-09Java多線程編程之ThreadLocal線程范圍內(nèi)的共享變量
這篇文章主要介紹了Java多線程編程之ThreadLocal線程范圍內(nèi)的共享變量,本文講解了ThreadLocal的作用和目的、ThreadLocal的應(yīng)用場(chǎng)景、ThreadLocal的使用實(shí)例等,需要的朋友可以參考下2015-05-05新手學(xué)習(xí)微服務(wù)SpringCloud項(xiàng)目架構(gòu)搭建方法
這篇文章主要介紹了新手學(xué)習(xí)微服務(wù)SpringCloud項(xiàng)目架構(gòu)搭建方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-01-01Java 基礎(chǔ)語(yǔ)法之解析 Java 的包和繼承
包是組織類(lèi)的一種方式,繼承顧名思義,比如誰(shuí)繼承了長(zhǎng)輩的產(chǎn)業(yè),其實(shí)這里的繼承和我們生活中的繼承很類(lèi)似,下面文字將為大家詳細(xì)介紹Java的包和繼承2021-09-09Java21增強(qiáng)對(duì)Emoji表情符號(hào)處理示例詳解
這篇文章主要為大家介紹了Java21增強(qiáng)對(duì)Emoji表情符號(hào)處理示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-11-11Java線程編程中Thread類(lèi)的基礎(chǔ)學(xué)習(xí)教程
這篇文章主要介紹了Java線程編程中Thread類(lèi)的基礎(chǔ)學(xué)習(xí)教程,Thread類(lèi)包含諸多操作線程的方法,非常重要,需要的朋友可以參考下2015-12-12