欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot如何獲取Kafka的Topic列表

 更新時(shí)間:2022年09月29日 16:04:21   作者:AnthonyJing  
這篇文章主要介紹了SpringBoot如何獲取Kafka的Topic列表問題,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教

寫在前面   

眾所周知,kafka是現(xiàn)代流行的消息隊(duì)列,它使用經(jīng)典的消息訂閱發(fā)布模式實(shí)現(xiàn)消息的流轉(zhuǎn),大部分代碼結(jié)合kafka使用都是使用它的生產(chǎn)者和消費(fèi)者來實(shí)現(xiàn)消息的傳遞,那么對于kafka的主題的管理怎么使用代碼實(shí)現(xiàn)呢,這是今天要講的主題 

命令行模式

kafka要結(jié)合zookeeper使用,因?yàn)樗言獢?shù)據(jù)信息交給了zookeeper管理,其實(shí)使用命令行命令很容易就能對topic進(jìn)行管理,主要使用的命令是kafka-topics.sh

創(chuàng)建主題
kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --replication-factor 3 --partitions 3
查看主題列表
kafka-topics.sh --zookeeper localhost:2181 --list
查看主題狀態(tài)
kafka-topics.sh --describe ?--zookeeper 127.0.0.1:2181 --topic TestTopic?

代碼模式

那么話說回來如何使用代碼實(shí)現(xiàn)topic的管理呢,那么現(xiàn)在就來看一下代碼的實(shí)現(xiàn)方式,此處使用springboot2框架實(shí)現(xiàn)。

首先引進(jìn)依賴kafka的相關(guān)

<dependency>
? ? <groupId>org.springframework.kafka</groupId>
?? ?<artifactId>spring-kafka</artifactId>
</dependency>

創(chuàng)建一個(gè)測試類進(jìn)行測試

public static void main(String[] args) {
? ? ? ? Properties properties = ?new Properties();
? ? ? ? properties.put("bootstrap.servers", "10.0.59.11:9093");
? ? ? ? properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
? ? ? ? properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
? ? ? ? AdminClient adminClient = AdminClient.create(properties);
? ? ? ? ListTopicsResult result = adminClient.listTopics();
? ? ? ? KafkaFuture<Set<String>> names = result.names();
? ? ? ? try {
? ? ? ? ? ? names.get().forEach((k)->{
? ? ? ? ? ? ? ? System.out.println(k);
? ? ? ? ? ? });
? ? ? ? } catch (InterruptedException | ExecutionException e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? }
? ? ? ? adminClient.close();
? ? }

這里面最主要的就是AdminClient這個(gè)類,AdminClient實(shí)現(xiàn)了Admin接口,Admin里面定義了許多和kafka配置相關(guān)的東西

讓我們依次來看一下

public abstract class AdminClient implements Admin {
? ? public AdminClient() {
? ? }

? ? public static AdminClient create(Properties props) {
? ? ? ? return (AdminClient)Admin.create(props);
? ? }

? ? public static AdminClient create(Map<String, Object> conf) {
? ? ? ? return (AdminClient)Admin.create(conf);
? ? }
}

而Admin接口里有以下方法

static Admin create(Properties props) {
? ? ? ? return KafkaAdminClient.createInternal(new AdminClientConfig(props, true), (TimeoutProcessorFactory)null);
? ? }

? ? static Admin create(Map<String, Object> conf) {
? ? ? ? return KafkaAdminClient.createInternal(new AdminClientConfig(conf, true), (TimeoutProcessorFactory)null);
? ? }

? ? default void close() {
? ? ? ? this.close(9223372036854775807L, TimeUnit.MILLISECONDS);
? ? }

? ? /** @deprecated */
? ? @Deprecated
? ? default void close(long duration, TimeUnit unit) {
? ? ? ? this.close(Duration.ofMillis(unit.toMillis(duration)));
? ? }

? ? void close(Duration var1);

? ? default CreateTopicsResult createTopics(Collection<NewTopic> newTopics) {
? ? ? ? return this.createTopics(newTopics, new CreateTopicsOptions());
? ? }

? ? CreateTopicsResult createTopics(Collection<NewTopic> var1, CreateTopicsOptions var2);

? ? default DeleteTopicsResult deleteTopics(Collection<String> topics) {
? ? ? ? return this.deleteTopics(topics, new DeleteTopicsOptions());
? ? }

? ? DeleteTopicsResult deleteTopics(Collection<String> var1, DeleteTopicsOptions var2);

? ? default ListTopicsResult listTopics() {
? ? ? ? return this.listTopics(new ListTopicsOptions());
? ? }

? ? ListTopicsResult listTopics(ListTopicsOptions var1);

? ? default DescribeTopicsResult describeTopics(Collection<String> topicNames) {
? ? ? ? return this.describeTopics(topicNames, new DescribeTopicsOptions());
? ? }

? ? DescribeTopicsResult describeTopics(Collection<String> var1, DescribeTopicsOptions var2);

? ? default DescribeClusterResult describeCluster() {
? ? ? ? return this.describeCluster(new DescribeClusterOptions());
? ? }

? ? DescribeClusterResult describeCluster(DescribeClusterOptions var1);

? ? default DescribeAclsResult describeAcls(AclBindingFilter filter) {
? ? ? ? return this.describeAcls(filter, new DescribeAclsOptions());
? ? }

? ? DescribeAclsResult describeAcls(AclBindingFilter var1, DescribeAclsOptions var2);

通過名稱我們可以看出,里面有創(chuàng)建Topic,有刪除Topic,有列出所有Topic,有描述Topic

我們通過這些方法可以管理Kafka的Topic

最后我們來看一下實(shí)現(xiàn)效果 

控制臺(tái)打印里面有3個(gè)Topic

去服務(wù)器命令行驗(yàn)證一下

也是3個(gè)說明代碼沒問題

總結(jié)

網(wǎng)上大多數(shù)關(guān)于kafka的代碼實(shí)現(xiàn)都是關(guān)于生產(chǎn)者和消費(fèi)者的實(shí)現(xiàn),今天主要是使用一下kakfa的配置管理類,實(shí)現(xiàn)對topic的管理,以此記錄作為以后工作中的參考。希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。

相關(guān)文章

最新評論