SpringBoot如何獲取Kafka的Topic列表
寫在前面
眾所周知,kafka是現(xiàn)代流行的消息隊列,它使用經(jīng)典的消息訂閱發(fā)布模式實現(xiàn)消息的流轉(zhuǎn),大部分代碼結(jié)合kafka使用都是使用它的生產(chǎn)者和消費者來實現(xiàn)消息的傳遞,那么對于kafka的主題的管理怎么使用代碼實現(xiàn)呢,這是今天要講的主題
命令行模式
kafka要結(jié)合zookeeper使用,因為它把元數(shù)據(jù)信息交給了zookeeper管理,其實使用命令行命令很容易就能對topic進行管理,主要使用的命令是kafka-topics.sh
創(chuàng)建主題 kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --replication-factor 3 --partitions 3 查看主題列表 kafka-topics.sh --zookeeper localhost:2181 --list 查看主題狀態(tài) kafka-topics.sh --describe ?--zookeeper 127.0.0.1:2181 --topic TestTopic?
代碼模式
那么話說回來如何使用代碼實現(xiàn)topic的管理呢,那么現(xiàn)在就來看一下代碼的實現(xiàn)方式,此處使用springboot2框架實現(xiàn)。
首先引進依賴kafka的相關(guān)
<dependency> ? ? <groupId>org.springframework.kafka</groupId> ?? ?<artifactId>spring-kafka</artifactId> </dependency>
創(chuàng)建一個測試類進行測試
public static void main(String[] args) { ? ? ? ? Properties properties = ?new Properties(); ? ? ? ? properties.put("bootstrap.servers", "10.0.59.11:9093"); ? ? ? ? properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); ? ? ? ? properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); ? ? ? ? AdminClient adminClient = AdminClient.create(properties); ? ? ? ? ListTopicsResult result = adminClient.listTopics(); ? ? ? ? KafkaFuture<Set<String>> names = result.names(); ? ? ? ? try { ? ? ? ? ? ? names.get().forEach((k)->{ ? ? ? ? ? ? ? ? System.out.println(k); ? ? ? ? ? ? }); ? ? ? ? } catch (InterruptedException | ExecutionException e) { ? ? ? ? ? ? e.printStackTrace(); ? ? ? ? } ? ? ? ? adminClient.close(); ? ? }
這里面最主要的就是AdminClient這個類,AdminClient實現(xiàn)了Admin接口,Admin里面定義了許多和kafka配置相關(guān)的東西
讓我們依次來看一下
public abstract class AdminClient implements Admin { ? ? public AdminClient() { ? ? } ? ? public static AdminClient create(Properties props) { ? ? ? ? return (AdminClient)Admin.create(props); ? ? } ? ? public static AdminClient create(Map<String, Object> conf) { ? ? ? ? return (AdminClient)Admin.create(conf); ? ? } }
而Admin接口里有以下方法
static Admin create(Properties props) { ? ? ? ? return KafkaAdminClient.createInternal(new AdminClientConfig(props, true), (TimeoutProcessorFactory)null); ? ? } ? ? static Admin create(Map<String, Object> conf) { ? ? ? ? return KafkaAdminClient.createInternal(new AdminClientConfig(conf, true), (TimeoutProcessorFactory)null); ? ? } ? ? default void close() { ? ? ? ? this.close(9223372036854775807L, TimeUnit.MILLISECONDS); ? ? } ? ? /** @deprecated */ ? ? @Deprecated ? ? default void close(long duration, TimeUnit unit) { ? ? ? ? this.close(Duration.ofMillis(unit.toMillis(duration))); ? ? } ? ? void close(Duration var1); ? ? default CreateTopicsResult createTopics(Collection<NewTopic> newTopics) { ? ? ? ? return this.createTopics(newTopics, new CreateTopicsOptions()); ? ? } ? ? CreateTopicsResult createTopics(Collection<NewTopic> var1, CreateTopicsOptions var2); ? ? default DeleteTopicsResult deleteTopics(Collection<String> topics) { ? ? ? ? return this.deleteTopics(topics, new DeleteTopicsOptions()); ? ? } ? ? DeleteTopicsResult deleteTopics(Collection<String> var1, DeleteTopicsOptions var2); ? ? default ListTopicsResult listTopics() { ? ? ? ? return this.listTopics(new ListTopicsOptions()); ? ? } ? ? ListTopicsResult listTopics(ListTopicsOptions var1); ? ? default DescribeTopicsResult describeTopics(Collection<String> topicNames) { ? ? ? ? return this.describeTopics(topicNames, new DescribeTopicsOptions()); ? ? } ? ? DescribeTopicsResult describeTopics(Collection<String> var1, DescribeTopicsOptions var2); ? ? default DescribeClusterResult describeCluster() { ? ? ? ? return this.describeCluster(new DescribeClusterOptions()); ? ? } ? ? DescribeClusterResult describeCluster(DescribeClusterOptions var1); ? ? default DescribeAclsResult describeAcls(AclBindingFilter filter) { ? ? ? ? return this.describeAcls(filter, new DescribeAclsOptions()); ? ? } ? ? DescribeAclsResult describeAcls(AclBindingFilter var1, DescribeAclsOptions var2);
通過名稱我們可以看出,里面有創(chuàng)建Topic,有刪除Topic,有列出所有Topic,有描述Topic
我們通過這些方法可以管理Kafka的Topic
最后我們來看一下實現(xiàn)效果
控制臺打印里面有3個Topic
去服務(wù)器命令行驗證一下
也是3個說明代碼沒問題
總結(jié)
網(wǎng)上大多數(shù)關(guān)于kafka的代碼實現(xiàn)都是關(guān)于生產(chǎn)者和消費者的實現(xiàn),今天主要是使用一下kakfa的配置管理類,實現(xiàn)對topic的管理,以此記錄作為以后工作中的參考。希望能給大家一個參考,也希望大家多多支持腳本之家。
- Spring?Boot整合Kafka教程詳解
- Spring?Boot?中使用@KafkaListener并發(fā)批量接收消息的完整代碼
- 基于SpringBoot?使用?Flink?收發(fā)Kafka消息的示例詳解
- SpringBoot整合kafka遇到的版本不對應(yīng)問題及解決
- SpringBoot+Nacos+Kafka微服務(wù)流編排的簡單實現(xiàn)
- SpringBoot集成Kafka的步驟
- Spring Boot集群管理工具KafkaAdminClient使用方法解析
- Springboot集成Kafka實現(xiàn)producer和consumer的示例代碼
- Spring?Boot?基于?SCRAM?認證集成?Kafka?的過程詳解
相關(guān)文章
SpringBoot Redis配置Fastjson進行序列化和反序列化實現(xiàn)
這篇文章主要介紹了SpringBoot Redis配置Fastjson進行序列化和反序列化實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-10-10Springboot攔截器如何獲取@RequestBody參數(shù)
這篇文章主要介紹了Springboot攔截器如何獲取@RequestBody參數(shù)的操作,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-06-06實例解析使用Java實現(xiàn)基本的音頻播放器的編寫要點
這篇文章主要介紹了使用Java實現(xiàn)基本的音頻播放器的代碼要點實例分享,包括音頻文件的循環(huán)播放等功能實現(xiàn)的關(guān)鍵點,需要的朋友可以參考下2016-01-01Java?SpringBoot項目如何優(yōu)雅的實現(xiàn)操作日志記錄
這篇文章主要介紹了Java?SpringBoot項目如何優(yōu)雅的實現(xiàn)操作日志記錄,文章圍繞主題展開詳細的內(nèi)容介紹,具有一定的參考價值,需要的朋友可以參考一下2022-08-08idea一招搞定同步所有配置(導(dǎo)入或?qū)С鏊信渲?
使用intellij idea很長一段時間,軟件相關(guān)的配置也都按照自己習(xí)慣的設(shè)置好,如果需要重裝軟件,還得需要重新設(shè)置,本文就詳細的介紹了idea 同步所有配置,感興趣的可以了解一下2021-07-07