欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot如何獲取Kafka的Topic列表

 更新時間:2022年09月29日 16:04:21   作者:AnthonyJing  
這篇文章主要介紹了SpringBoot如何獲取Kafka的Topic列表問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教

寫在前面   

眾所周知,kafka是現(xiàn)代流行的消息隊列,它使用經典的消息訂閱發(fā)布模式實現(xiàn)消息的流轉,大部分代碼結合kafka使用都是使用它的生產者和消費者來實現(xiàn)消息的傳遞,那么對于kafka的主題的管理怎么使用代碼實現(xiàn)呢,這是今天要講的主題 

命令行模式

kafka要結合zookeeper使用,因為它把元數據信息交給了zookeeper管理,其實使用命令行命令很容易就能對topic進行管理,主要使用的命令是kafka-topics.sh

創(chuàng)建主題
kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --replication-factor 3 --partitions 3
查看主題列表
kafka-topics.sh --zookeeper localhost:2181 --list
查看主題狀態(tài)
kafka-topics.sh --describe ?--zookeeper 127.0.0.1:2181 --topic TestTopic?

代碼模式

那么話說回來如何使用代碼實現(xiàn)topic的管理呢,那么現(xiàn)在就來看一下代碼的實現(xiàn)方式,此處使用springboot2框架實現(xiàn)。

首先引進依賴kafka的相關

<dependency>
? ? <groupId>org.springframework.kafka</groupId>
?? ?<artifactId>spring-kafka</artifactId>
</dependency>

創(chuàng)建一個測試類進行測試

public static void main(String[] args) {
? ? ? ? Properties properties = ?new Properties();
? ? ? ? properties.put("bootstrap.servers", "10.0.59.11:9093");
? ? ? ? properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
? ? ? ? properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
? ? ? ? AdminClient adminClient = AdminClient.create(properties);
? ? ? ? ListTopicsResult result = adminClient.listTopics();
? ? ? ? KafkaFuture<Set<String>> names = result.names();
? ? ? ? try {
? ? ? ? ? ? names.get().forEach((k)->{
? ? ? ? ? ? ? ? System.out.println(k);
? ? ? ? ? ? });
? ? ? ? } catch (InterruptedException | ExecutionException e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? }
? ? ? ? adminClient.close();
? ? }

這里面最主要的就是AdminClient這個類,AdminClient實現(xiàn)了Admin接口,Admin里面定義了許多和kafka配置相關的東西

讓我們依次來看一下

public abstract class AdminClient implements Admin {
? ? public AdminClient() {
? ? }

? ? public static AdminClient create(Properties props) {
? ? ? ? return (AdminClient)Admin.create(props);
? ? }

? ? public static AdminClient create(Map<String, Object> conf) {
? ? ? ? return (AdminClient)Admin.create(conf);
? ? }
}

而Admin接口里有以下方法

static Admin create(Properties props) {
? ? ? ? return KafkaAdminClient.createInternal(new AdminClientConfig(props, true), (TimeoutProcessorFactory)null);
? ? }

? ? static Admin create(Map<String, Object> conf) {
? ? ? ? return KafkaAdminClient.createInternal(new AdminClientConfig(conf, true), (TimeoutProcessorFactory)null);
? ? }

? ? default void close() {
? ? ? ? this.close(9223372036854775807L, TimeUnit.MILLISECONDS);
? ? }

? ? /** @deprecated */
? ? @Deprecated
? ? default void close(long duration, TimeUnit unit) {
? ? ? ? this.close(Duration.ofMillis(unit.toMillis(duration)));
? ? }

? ? void close(Duration var1);

? ? default CreateTopicsResult createTopics(Collection<NewTopic> newTopics) {
? ? ? ? return this.createTopics(newTopics, new CreateTopicsOptions());
? ? }

? ? CreateTopicsResult createTopics(Collection<NewTopic> var1, CreateTopicsOptions var2);

? ? default DeleteTopicsResult deleteTopics(Collection<String> topics) {
? ? ? ? return this.deleteTopics(topics, new DeleteTopicsOptions());
? ? }

? ? DeleteTopicsResult deleteTopics(Collection<String> var1, DeleteTopicsOptions var2);

? ? default ListTopicsResult listTopics() {
? ? ? ? return this.listTopics(new ListTopicsOptions());
? ? }

? ? ListTopicsResult listTopics(ListTopicsOptions var1);

? ? default DescribeTopicsResult describeTopics(Collection<String> topicNames) {
? ? ? ? return this.describeTopics(topicNames, new DescribeTopicsOptions());
? ? }

? ? DescribeTopicsResult describeTopics(Collection<String> var1, DescribeTopicsOptions var2);

? ? default DescribeClusterResult describeCluster() {
? ? ? ? return this.describeCluster(new DescribeClusterOptions());
? ? }

? ? DescribeClusterResult describeCluster(DescribeClusterOptions var1);

? ? default DescribeAclsResult describeAcls(AclBindingFilter filter) {
? ? ? ? return this.describeAcls(filter, new DescribeAclsOptions());
? ? }

? ? DescribeAclsResult describeAcls(AclBindingFilter var1, DescribeAclsOptions var2);

通過名稱我們可以看出,里面有創(chuàng)建Topic,有刪除Topic,有列出所有Topic,有描述Topic

我們通過這些方法可以管理Kafka的Topic

最后我們來看一下實現(xiàn)效果 

控制臺打印里面有3個Topic

去服務器命令行驗證一下

也是3個說明代碼沒問題

總結

網上大多數關于kafka的代碼實現(xiàn)都是關于生產者和消費者的實現(xiàn),今天主要是使用一下kakfa的配置管理類,實現(xiàn)對topic的管理,以此記錄作為以后工作中的參考。希望能給大家一個參考,也希望大家多多支持腳本之家。

相關文章

最新評論