欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot如何獲取Kafka的Topic列表

 更新時間:2022年09月29日 16:04:21   作者:AnthonyJing  
這篇文章主要介紹了SpringBoot如何獲取Kafka的Topic列表問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教

寫在前面   

眾所周知,kafka是現(xiàn)代流行的消息隊列,它使用經(jīng)典的消息訂閱發(fā)布模式實現(xiàn)消息的流轉(zhuǎn),大部分代碼結(jié)合kafka使用都是使用它的生產(chǎn)者和消費者來實現(xiàn)消息的傳遞,那么對于kafka的主題的管理怎么使用代碼實現(xiàn)呢,這是今天要講的主題 

命令行模式

kafka要結(jié)合zookeeper使用,因為它把元數(shù)據(jù)信息交給了zookeeper管理,其實使用命令行命令很容易就能對topic進行管理,主要使用的命令是kafka-topics.sh

創(chuàng)建主題
kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --replication-factor 3 --partitions 3
查看主題列表
kafka-topics.sh --zookeeper localhost:2181 --list
查看主題狀態(tài)
kafka-topics.sh --describe ?--zookeeper 127.0.0.1:2181 --topic TestTopic?

代碼模式

那么話說回來如何使用代碼實現(xiàn)topic的管理呢,那么現(xiàn)在就來看一下代碼的實現(xiàn)方式,此處使用springboot2框架實現(xiàn)。

首先引進依賴kafka的相關(guān)

<dependency>
? ? <groupId>org.springframework.kafka</groupId>
?? ?<artifactId>spring-kafka</artifactId>
</dependency>

創(chuàng)建一個測試類進行測試

public static void main(String[] args) {
? ? ? ? Properties properties = ?new Properties();
? ? ? ? properties.put("bootstrap.servers", "10.0.59.11:9093");
? ? ? ? properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
? ? ? ? properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
? ? ? ? AdminClient adminClient = AdminClient.create(properties);
? ? ? ? ListTopicsResult result = adminClient.listTopics();
? ? ? ? KafkaFuture<Set<String>> names = result.names();
? ? ? ? try {
? ? ? ? ? ? names.get().forEach((k)->{
? ? ? ? ? ? ? ? System.out.println(k);
? ? ? ? ? ? });
? ? ? ? } catch (InterruptedException | ExecutionException e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? }
? ? ? ? adminClient.close();
? ? }

這里面最主要的就是AdminClient這個類,AdminClient實現(xiàn)了Admin接口,Admin里面定義了許多和kafka配置相關(guān)的東西

讓我們依次來看一下

public abstract class AdminClient implements Admin {
? ? public AdminClient() {
? ? }

? ? public static AdminClient create(Properties props) {
? ? ? ? return (AdminClient)Admin.create(props);
? ? }

? ? public static AdminClient create(Map<String, Object> conf) {
? ? ? ? return (AdminClient)Admin.create(conf);
? ? }
}

而Admin接口里有以下方法

static Admin create(Properties props) {
? ? ? ? return KafkaAdminClient.createInternal(new AdminClientConfig(props, true), (TimeoutProcessorFactory)null);
? ? }

? ? static Admin create(Map<String, Object> conf) {
? ? ? ? return KafkaAdminClient.createInternal(new AdminClientConfig(conf, true), (TimeoutProcessorFactory)null);
? ? }

? ? default void close() {
? ? ? ? this.close(9223372036854775807L, TimeUnit.MILLISECONDS);
? ? }

? ? /** @deprecated */
? ? @Deprecated
? ? default void close(long duration, TimeUnit unit) {
? ? ? ? this.close(Duration.ofMillis(unit.toMillis(duration)));
? ? }

? ? void close(Duration var1);

? ? default CreateTopicsResult createTopics(Collection<NewTopic> newTopics) {
? ? ? ? return this.createTopics(newTopics, new CreateTopicsOptions());
? ? }

? ? CreateTopicsResult createTopics(Collection<NewTopic> var1, CreateTopicsOptions var2);

? ? default DeleteTopicsResult deleteTopics(Collection<String> topics) {
? ? ? ? return this.deleteTopics(topics, new DeleteTopicsOptions());
? ? }

? ? DeleteTopicsResult deleteTopics(Collection<String> var1, DeleteTopicsOptions var2);

? ? default ListTopicsResult listTopics() {
? ? ? ? return this.listTopics(new ListTopicsOptions());
? ? }

? ? ListTopicsResult listTopics(ListTopicsOptions var1);

? ? default DescribeTopicsResult describeTopics(Collection<String> topicNames) {
? ? ? ? return this.describeTopics(topicNames, new DescribeTopicsOptions());
? ? }

? ? DescribeTopicsResult describeTopics(Collection<String> var1, DescribeTopicsOptions var2);

? ? default DescribeClusterResult describeCluster() {
? ? ? ? return this.describeCluster(new DescribeClusterOptions());
? ? }

? ? DescribeClusterResult describeCluster(DescribeClusterOptions var1);

? ? default DescribeAclsResult describeAcls(AclBindingFilter filter) {
? ? ? ? return this.describeAcls(filter, new DescribeAclsOptions());
? ? }

? ? DescribeAclsResult describeAcls(AclBindingFilter var1, DescribeAclsOptions var2);

通過名稱我們可以看出,里面有創(chuàng)建Topic,有刪除Topic,有列出所有Topic,有描述Topic

我們通過這些方法可以管理Kafka的Topic

最后我們來看一下實現(xiàn)效果 

控制臺打印里面有3個Topic

去服務(wù)器命令行驗證一下

也是3個說明代碼沒問題

總結(jié)

網(wǎng)上大多數(shù)關(guān)于kafka的代碼實現(xiàn)都是關(guān)于生產(chǎn)者和消費者的實現(xiàn),今天主要是使用一下kakfa的配置管理類,實現(xiàn)對topic的管理,以此記錄作為以后工作中的參考。希望能給大家一個參考,也希望大家多多支持腳本之家。

相關(guān)文章

  • response.setHeader各種用法詳解

    response.setHeader各種用法詳解

    本文主要介紹了response.setHeader各種用法。具有很好的參考價值,下面跟著小編一起來看下吧
    2017-03-03
  • IntelliJ IDEA(2017)安裝和破解的方法

    IntelliJ IDEA(2017)安裝和破解的方法

    這篇文章主要介紹了IntelliJ IDEA(2017)安裝和破解的方法,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-11-11
  • Java實現(xiàn)在線語音識別

    Java實現(xiàn)在線語音識別

    這篇文章主要為大家詳細介紹了Java實現(xiàn)在線語音識別功能,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2018-08-08
  • SpringBoot Redis配置Fastjson進行序列化和反序列化實現(xiàn)

    SpringBoot Redis配置Fastjson進行序列化和反序列化實現(xiàn)

    這篇文章主要介紹了SpringBoot Redis配置Fastjson進行序列化和反序列化實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-10-10
  • Springboot攔截器如何獲取@RequestBody參數(shù)

    Springboot攔截器如何獲取@RequestBody參數(shù)

    這篇文章主要介紹了Springboot攔截器如何獲取@RequestBody參數(shù)的操作,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-06-06
  • 實例解析使用Java實現(xiàn)基本的音頻播放器的編寫要點

    實例解析使用Java實現(xiàn)基本的音頻播放器的編寫要點

    這篇文章主要介紹了使用Java實現(xiàn)基本的音頻播放器的代碼要點實例分享,包括音頻文件的循環(huán)播放等功能實現(xiàn)的關(guān)鍵點,需要的朋友可以參考下
    2016-01-01
  • Java?SpringBoot項目如何優(yōu)雅的實現(xiàn)操作日志記錄

    Java?SpringBoot項目如何優(yōu)雅的實現(xiàn)操作日志記錄

    這篇文章主要介紹了Java?SpringBoot項目如何優(yōu)雅的實現(xiàn)操作日志記錄,文章圍繞主題展開詳細的內(nèi)容介紹,具有一定的參考價值,需要的朋友可以參考一下
    2022-08-08
  • idea一招搞定同步所有配置(導(dǎo)入或?qū)С鏊信渲?

    idea一招搞定同步所有配置(導(dǎo)入或?qū)С鏊信渲?

    使用intellij idea很長一段時間,軟件相關(guān)的配置也都按照自己習(xí)慣的設(shè)置好,如果需要重裝軟件,還得需要重新設(shè)置,本文就詳細的介紹了idea 同步所有配置,感興趣的可以了解一下
    2021-07-07
  • MyBatis全局映射文件實現(xiàn)原理解析

    MyBatis全局映射文件實現(xiàn)原理解析

    這篇文章主要介紹了MyBatis全局映射文件實現(xiàn)原理解析,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2020-08-08
  • java8 使用stream排序空字段排在前面或后面

    java8 使用stream排序空字段排在前面或后面

    這篇文章主要介紹了java8 使用stream排序空字段排在前面或后面的操作,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-07-07

最新評論