SpringBoot如何獲取Kafka的Topic列表
寫在前面
眾所周知,kafka是現(xiàn)代流行的消息隊列,它使用經(jīng)典的消息訂閱發(fā)布模式實現(xiàn)消息的流轉(zhuǎn),大部分代碼結(jié)合kafka使用都是使用它的生產(chǎn)者和消費者來實現(xiàn)消息的傳遞,那么對于kafka的主題的管理怎么使用代碼實現(xiàn)呢,這是今天要講的主題
命令行模式
kafka要結(jié)合zookeeper使用,因為它把元數(shù)據(jù)信息交給了zookeeper管理,其實使用命令行命令很容易就能對topic進行管理,主要使用的命令是kafka-topics.sh
創(chuàng)建主題 kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --replication-factor 3 --partitions 3 查看主題列表 kafka-topics.sh --zookeeper localhost:2181 --list 查看主題狀態(tài) kafka-topics.sh --describe ?--zookeeper 127.0.0.1:2181 --topic TestTopic?
代碼模式
那么話說回來如何使用代碼實現(xiàn)topic的管理呢,那么現(xiàn)在就來看一下代碼的實現(xiàn)方式,此處使用springboot2框架實現(xiàn)。
首先引進依賴kafka的相關(guān)
<dependency> ? ? <groupId>org.springframework.kafka</groupId> ?? ?<artifactId>spring-kafka</artifactId> </dependency>
創(chuàng)建一個測試類進行測試
public static void main(String[] args) {
? ? ? ? Properties properties = ?new Properties();
? ? ? ? properties.put("bootstrap.servers", "10.0.59.11:9093");
? ? ? ? properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
? ? ? ? properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
? ? ? ? AdminClient adminClient = AdminClient.create(properties);
? ? ? ? ListTopicsResult result = adminClient.listTopics();
? ? ? ? KafkaFuture<Set<String>> names = result.names();
? ? ? ? try {
? ? ? ? ? ? names.get().forEach((k)->{
? ? ? ? ? ? ? ? System.out.println(k);
? ? ? ? ? ? });
? ? ? ? } catch (InterruptedException | ExecutionException e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? }
? ? ? ? adminClient.close();
? ? }這里面最主要的就是AdminClient這個類,AdminClient實現(xiàn)了Admin接口,Admin里面定義了許多和kafka配置相關(guān)的東西
讓我們依次來看一下
public abstract class AdminClient implements Admin {
? ? public AdminClient() {
? ? }
? ? public static AdminClient create(Properties props) {
? ? ? ? return (AdminClient)Admin.create(props);
? ? }
? ? public static AdminClient create(Map<String, Object> conf) {
? ? ? ? return (AdminClient)Admin.create(conf);
? ? }
}而Admin接口里有以下方法
static Admin create(Properties props) {
? ? ? ? return KafkaAdminClient.createInternal(new AdminClientConfig(props, true), (TimeoutProcessorFactory)null);
? ? }
? ? static Admin create(Map<String, Object> conf) {
? ? ? ? return KafkaAdminClient.createInternal(new AdminClientConfig(conf, true), (TimeoutProcessorFactory)null);
? ? }
? ? default void close() {
? ? ? ? this.close(9223372036854775807L, TimeUnit.MILLISECONDS);
? ? }
? ? /** @deprecated */
? ? @Deprecated
? ? default void close(long duration, TimeUnit unit) {
? ? ? ? this.close(Duration.ofMillis(unit.toMillis(duration)));
? ? }
? ? void close(Duration var1);
? ? default CreateTopicsResult createTopics(Collection<NewTopic> newTopics) {
? ? ? ? return this.createTopics(newTopics, new CreateTopicsOptions());
? ? }
? ? CreateTopicsResult createTopics(Collection<NewTopic> var1, CreateTopicsOptions var2);
? ? default DeleteTopicsResult deleteTopics(Collection<String> topics) {
? ? ? ? return this.deleteTopics(topics, new DeleteTopicsOptions());
? ? }
? ? DeleteTopicsResult deleteTopics(Collection<String> var1, DeleteTopicsOptions var2);
? ? default ListTopicsResult listTopics() {
? ? ? ? return this.listTopics(new ListTopicsOptions());
? ? }
? ? ListTopicsResult listTopics(ListTopicsOptions var1);
? ? default DescribeTopicsResult describeTopics(Collection<String> topicNames) {
? ? ? ? return this.describeTopics(topicNames, new DescribeTopicsOptions());
? ? }
? ? DescribeTopicsResult describeTopics(Collection<String> var1, DescribeTopicsOptions var2);
? ? default DescribeClusterResult describeCluster() {
? ? ? ? return this.describeCluster(new DescribeClusterOptions());
? ? }
? ? DescribeClusterResult describeCluster(DescribeClusterOptions var1);
? ? default DescribeAclsResult describeAcls(AclBindingFilter filter) {
? ? ? ? return this.describeAcls(filter, new DescribeAclsOptions());
? ? }
? ? DescribeAclsResult describeAcls(AclBindingFilter var1, DescribeAclsOptions var2);通過名稱我們可以看出,里面有創(chuàng)建Topic,有刪除Topic,有列出所有Topic,有描述Topic
我們通過這些方法可以管理Kafka的Topic
最后我們來看一下實現(xiàn)效果

控制臺打印里面有3個Topic
去服務(wù)器命令行驗證一下

也是3個說明代碼沒問題
總結(jié)
網(wǎng)上大多數(shù)關(guān)于kafka的代碼實現(xiàn)都是關(guān)于生產(chǎn)者和消費者的實現(xiàn),今天主要是使用一下kakfa的配置管理類,實現(xiàn)對topic的管理,以此記錄作為以后工作中的參考。希望能給大家一個參考,也希望大家多多支持腳本之家。
- Spring?Boot整合Kafka教程詳解
- Spring?Boot?中使用@KafkaListener并發(fā)批量接收消息的完整代碼
- 基于SpringBoot?使用?Flink?收發(fā)Kafka消息的示例詳解
- SpringBoot整合kafka遇到的版本不對應(yīng)問題及解決
- SpringBoot+Nacos+Kafka微服務(wù)流編排的簡單實現(xiàn)
- SpringBoot集成Kafka的步驟
- Spring Boot集群管理工具KafkaAdminClient使用方法解析
- Springboot集成Kafka實現(xiàn)producer和consumer的示例代碼
- Spring?Boot?基于?SCRAM?認(rèn)證集成?Kafka?的過程詳解
相關(guān)文章
SpringBoot Redis配置Fastjson進行序列化和反序列化實現(xiàn)
這篇文章主要介紹了SpringBoot Redis配置Fastjson進行序列化和反序列化實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-10-10
Springboot攔截器如何獲取@RequestBody參數(shù)
這篇文章主要介紹了Springboot攔截器如何獲取@RequestBody參數(shù)的操作,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-06-06
實例解析使用Java實現(xiàn)基本的音頻播放器的編寫要點
這篇文章主要介紹了使用Java實現(xiàn)基本的音頻播放器的代碼要點實例分享,包括音頻文件的循環(huán)播放等功能實現(xiàn)的關(guān)鍵點,需要的朋友可以參考下2016-01-01
Java?SpringBoot項目如何優(yōu)雅的實現(xiàn)操作日志記錄
這篇文章主要介紹了Java?SpringBoot項目如何優(yōu)雅的實現(xiàn)操作日志記錄,文章圍繞主題展開詳細的內(nèi)容介紹,具有一定的參考價值,需要的朋友可以參考一下2022-08-08
idea一招搞定同步所有配置(導(dǎo)入或?qū)С鏊信渲?
使用intellij idea很長一段時間,軟件相關(guān)的配置也都按照自己習(xí)慣的設(shè)置好,如果需要重裝軟件,還得需要重新設(shè)置,本文就詳細的介紹了idea 同步所有配置,感興趣的可以了解一下2021-07-07

