欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

spring-kafka使消費者動態(tài)訂閱新增的topic問題

 更新時間:2022年12月27日 11:31:56   作者:DayDayUp丶  
這篇文章主要介紹了spring-kafka使消費者動態(tài)訂閱新增的topic問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教

一、前言

在Java中使用kafka,方式很多,例如:

  • 直接使用kafka-clients這類原生的API;
  • 也可以使用Spring對其的包裝API,即spring-kafka,同其它包裝API一樣(如JdbcTemplate、RestTemplate、RedisTemplate等等),KafkaTemplate是其生產(chǎn)者核心類,KafkaListener是其消費者核心注解;
  • 也有包裝地更加抽象的SpringCloudStream等。

這里討論的話題是,如何在spring-kafka中,使得一個消費者可以動態(tài)訂閱新增的topic?

本文不討論利用SpringCloudConfig或Apollo等分布式配置中心,利用@RefreshScope的方式來達到目的,這種方式有點殺雞用牛刀,也會增加系統(tǒng)復雜度和維護成本。

我的環(huán)境:jdk 1.8,Spring 2.1.3.RELEASE,kafka_2.12-2.3.0單節(jié)點。

二、需求分析

上面已經(jīng)提到,spring-kafka通過 @KafkaListener 的方式配置訂閱的topic,最常用的屬性可能是 topics,而要實現(xiàn)本文的需求,就要使用另一個屬性 topicPattern,查看它的屬性說明:

The topic pattern for this listener. 
The entries can be 'topic pattern', a'property-placeholder key' or an 'expression'. 
The framework will create acontainer that subscribes to all topics matching the specified pattern to getdynamically assigned partitions. 
The pattern matching will be performedperiodically against topics existing at the time of check. 
An expression mustbe resolved to the topic pattern (String or Pattern result types are supported). 

將其翻譯過來:

此偵聽器的主題模式。條目可以是“主題模式”,“屬性占位符鍵”或“表達式”。
該框架將創(chuàng)建一個容器,該容器訂閱與指定模式匹配的所有主題以獲取動態(tài)分配的分區(qū)。
模式匹配將針對檢查時存在的主題【定期執(zhí)行】。
表達式必須解析為主題模式(支持字符串或模式結果類型)。

注意:從說明信息來看,topicPattern 已經(jīng)可以做到定期檢查topic列表,然后將新加入的topic分配至某個消費者。

下面列出消費端的核心測試代碼:

@Component
public class SinkConsumer {
    @KafkaListener(topicPattern = "test_topic2.*")
    public void listen2(ConsumerRecord<?, ?> record) throws Exception {
        System.out.printf("topic2.* = %s, offset = %d, value = %s \n", record.topic(), record.offset(), record.value());
    }
}

代碼實現(xiàn)很簡潔,就是期待我們新增一個符合 topicPattern 的topic后,spring-kafka能否自動為新建的topic分配到此目標消費者。

三、測試運行

3.1 啟動消費者服務

配置文件中,spring該配的配,kafka該配的配,接著啟動即可。

3.2 新建topic

新建 test_topic2_3,剛創(chuàng)建完不能立刻分配到目標消費者,從 topicPattern 的注釋得知spring-kafka會定期掃描topic列表,我們要給它幾分鐘等待掃描到新topic,并為它成功分配到目標消費者后,再去發(fā)送第一條消息(所以可以先去洗個手,此時19:02)。

3.3 等待topic被分配到消費者

洗手期間的控制臺日志提示:已為新建的 test_topic2_3 分配到我們的目標消費者,并將offset設置到起始位置0,日志如下:

2019-11-15 19:05:12.958  INFO 7768 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-3, groupId=test] Revoking previously assigned partitions [test_topic2_2-0, test_topic2_1-0]
2019-11-15 19:05:12.958  INFO 7768 --- [ntainer#1-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked: [test_topic2_2-0, test_topic2_1-0]
2019-11-15 19:05:12.958  INFO 7768 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-3, groupId=test] (Re-)joining group
2019-11-15 19:05:15.757  INFO 7768 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=test] Attempt to heartbeat failed since group is rebalancing
2019-11-15 19:05:15.761  INFO 7768 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=test] Revoking previously assigned partitions [test_topic-0]
2019-11-15 19:05:15.762  INFO 7768 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked: [test_topic-0]
2019-11-15 19:05:15.762  INFO 7768 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=test] (Re-)joining group
2019-11-15 19:05:16.025  INFO 7768 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-3, groupId=test] Successfully joined group with generation 6
2019-11-15 19:05:16.025  INFO 7768 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=test] Successfully joined group with generation 6
2019-11-15 19:05:16.026  INFO 7768 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-3, groupId=test] Setting newly assigned partitions [test_topic2_2-0, test_topic2_3-0, test_topic2_1-0]
2019-11-15 19:05:16.026  INFO 7768 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=test] Setting newly assigned partitions [test_topic-0]
2019-11-15 19:05:16.028  INFO 7768 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [test_topic-0]
2019-11-15 19:05:16.032  INFO 7768 --- [ntainer#1-0-C-1] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=consumer-3, groupId=test] Resetting offset for partition test_topic2_3-0 to offset 0.
2019-11-15 19:05:16.032  INFO 7768 --- [ntainer#1-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [test_topic2_2-0, test_topic2_3-0, test_topic2_1-0]

3.4 發(fā)送第一條消息

洗手完畢,看到3.3小節(jié)里的日志,然后確認成功分配到目標消費者,且offset被設為0之后,發(fā)送第一條消息【我是第1個test_topic2_3的消息】,控制臺日志打印出此消息信息,代表成功消費:

topic2.* = test_topic2_3, offset = 0, value = {"date":"2019-11-15 19:11:13","msg":"我是第1個test_topic2_3的消息"} 

3.5 注意事項

若不等到offset被設為0之后,過早發(fā)送消息,則會在消費端丟失過早發(fā)送的消息,并且當spring-kafka自動設置offset的時候,日志提示,offset被設置為1,而不是起始位置0:

INFO o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=consumer-3, groupId=test] Resetting offset for partition test_topic2_1-0 to offset 1.

在上面的3.1至3.4的整個過程中,可能會日志警告,代表暫時不能為新增的topic分配到目標消費者:

WARN o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=test] The following subscribed topics are not assigned to any members: [test_topic2_3] 

所以只需等待日志提示可以成功分配到目標消費者,且offset被設為0之后,即可發(fā)送第一條消息。

總結

以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。

相關文章

  • mybatis多個區(qū)間處理方式(雙foreach循環(huán))

    mybatis多個區(qū)間處理方式(雙foreach循環(huán))

    這篇文章主要介紹了mybatis多個區(qū)間處理方式(雙foreach循環(huán)),具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-02-02
  • 關于Java反射給泛型集合賦值問題

    關于Java反射給泛型集合賦值問題

    這篇文章主要介紹了Java反射給泛型集合賦值,需要的朋友可以參考下
    2022-01-01
  • Java開啟線程的四種方法案例詳解

    Java開啟線程的四種方法案例詳解

    這篇文章主要介紹了Java開啟線程的四種方法,本文結合實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2023-02-02
  • SpringBoot中使用tkMapper的方法詳解

    SpringBoot中使用tkMapper的方法詳解

    這篇文章主要介紹了SpringBoot中使用tkMapper的方法詳解
    2022-11-11
  • java簡單實現(xiàn)多線程及線程池實例詳解

    java簡單實現(xiàn)多線程及線程池實例詳解

    這篇文章主要為大家詳細介紹了java簡單實現(xiàn)多線程,及java爬蟲使用線程池實例,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2018-03-03
  • Java Scanner對象中hasNext()與next()方法的使用

    Java Scanner對象中hasNext()與next()方法的使用

    這篇文章主要介紹了Java Scanner對象中hasNext()與next()方法的使用,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-10-10
  • Java反射中java.beans包學習總結

    Java反射中java.beans包學習總結

    本篇文章通過學習Java反射中java.beans包,吧知識點做了總結,并把相關內(nèi)容做了關聯(lián),對此有需要的朋友可以學習參考下。
    2018-02-02
  • 詳解Java多線程和IO流的應用

    詳解Java多線程和IO流的應用

    這篇文章主要介紹了詳解Java多線程和IO流的應用,無論是本地文件復制,還是網(wǎng)絡多線程下載,對于流的使用都是一樣的,需要的朋友可以參考下
    2023-04-04
  • 使用maven自定義插件開發(fā)

    使用maven自定義插件開發(fā)

    這篇文章主要介紹了使用maven自定義插件開發(fā),具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-06-06
  • Java8之Lambda表達式使用解讀

    Java8之Lambda表達式使用解讀

    這篇文章主要介紹了Java8之Lambda表達式使用解讀,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-11-11

最新評論