欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Spring Boot集群管理工具KafkaAdminClient使用方法解析

 更新時間:2020年02月25日 10:25:56   作者:---WeiGeH  
這篇文章主要介紹了Spring Boot集群管理工具KafkaAdminClient使用方法解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下

原理介紹

在Kafka官網(wǎng)中這么描述AdminClient:The AdminClient API supports managing and inspecting topics, brokers, acls, and other Kafka objects. 具體的KafkaAdminClient包含了一下幾種功能(以Kafka1.0.0版本為準):

  • 創(chuàng)建Topic:createTopics(Collection<NewTopic> newTopics)
  • 刪除Topic:deleteTopics(Collection<String> topics)
  • 羅列所有Topic:listTopics()
  • 查詢Topic:describeTopics(Collection<String> topicNames)
  • 查詢集群信息:describeCluster()
  • 查詢ACL信息:describeAcls(AclBindingFilter filter)
  • 創(chuàng)建ACL信息:createAcls(Collection<AclBinding> acls)
  • 刪除ACL信息:deleteAcls(Collection<AclBindingFilter> filters)
  • 查詢配置信息:describeConfigs(Collection<ConfigResource> resources)
  • 修改配置信息:alterConfigs(Map<ConfigResource, Config> configs)
  • 修改副本的日志目錄:alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment)
  • 查詢節(jié)點的日志目錄信息:describeLogDirs(Collection<Integer> brokers)
  • 查詢副本的日志目錄信息:describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas)
  • 增加分區(qū):createPartitions(Map<String, NewPartitions> newPartitions)

其內(nèi)部原理是使用Kafka自定義的一套二進制協(xié)議來實現(xiàn),詳細可以參見Kafka協(xié)議。主要實現(xiàn)步驟:

客戶端根據(jù)方法的調(diào)用創(chuàng)建相應的協(xié)議請求,比如創(chuàng)建Topic的createTopics方法,其內(nèi)部就是發(fā)送CreateTopicRequest請求。
客戶端發(fā)送請求至Kafka Broker。

Kafka Broker處理相應的請求并回執(zhí),比如與CreateTopicRequest對應的是CreateTopicResponse。
客戶端接收相應的回執(zhí)并進行解析處理。

和協(xié)議有關(guān)的請求和回執(zhí)的類基本都在org.apache.kafka.common.requests包中,AbstractRequest和AbstractResponse是這些請求和回執(zhí)類的兩個基本父類。

代碼如下

@Component
public class KafkaConfig{

   // 配置Kafka
  public Properties getProps(){
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
/*    props.put("retries", 2); // 重試次數(shù)
    props.put("batch.size", 16384); // 批量發(fā)送大小
    props.put("buffer.memory", 33554432); // 緩存大小,根據(jù)本機內(nèi)存大小配置
    props.put("linger.ms", 1000); // 發(fā)送頻率,滿足任務一個條件發(fā)送*/
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    return props;
  }

}
@RestController
public class KafkaTopicManager {

  @Autowired
  private KafkaConfig kafkaConfig;

  @GetMapping("createTopic")
  public void createTopic(){
    AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps());

    NewTopic newTopic = new NewTopic("test1",4, (short) 1);
    Collection<NewTopic> newTopicList = new ArrayList<>();
    newTopicList.add(newTopic);
    adminClient.createTopics(newTopicList);

    adminClient.close();
  }
  @GetMapping("deleteTopic")
  public void deleteTopic(){
    AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps());
    adminClient.deleteTopics(Arrays.asList("test1"));
    adminClient.close();
  }
  @GetMapping("listAllTopic")
  public void listAllTopic(){
    AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps());
    ListTopicsResult result = adminClient.listTopics();
    KafkaFuture<Set<String>> names = result.names();
    try {
      names.get().forEach((k)->{
        System.out.println(k);
      });
    } catch (InterruptedException | ExecutionException e) {
      e.printStackTrace();
    }
    adminClient.close();
  }
  @GetMapping("getTopic")
  public void getTopic(){
    AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps());

    DescribeTopicsResult describeTopics = adminClient.describeTopics(Arrays.asList("syn-test"));

    Collection<KafkaFuture<TopicDescription>> values = describeTopics.values().values();

    if(values.isEmpty()){
      System.out.println("找不到描述信息");
    }else{
      for (KafkaFuture<TopicDescription> value : values) {
        System.out.println(value);
      }
    }
    adminClient.close();
  }
}

以上就是本文的全部內(nèi)容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。

相關(guān)文章

  • Java兩種方式實現(xiàn)動態(tài)代理

    Java兩種方式實現(xiàn)動態(tài)代理

    Java 在 java.lang.reflect 包中有自己的代理支持,該類(Proxy.java)用于動態(tài)生成代理類,只需傳入目標接口、目標接口的類加載器以及 InvocationHandler 便可為目標接口生成代理類及代理對象。我們稱這個Java技術(shù)為:動態(tài)代理
    2020-10-10
  • Jackson處理Optional時遇到問題的解決與分析

    Jackson處理Optional時遇到問題的解決與分析

    Optional是Java實現(xiàn)函數(shù)式編程的強勁一步,并且?guī)椭诜妒街袑崿F(xiàn),但是Optional的意義顯然不止于此,下面這篇文章主要給大家介紹了關(guān)于Jackson處理Optional時遇到問題的解決與分析的相關(guān)資料,需要的朋友可以參考下
    2022-02-02
  • spring boot實現(xiàn)圖片上傳和下載功能

    spring boot實現(xiàn)圖片上傳和下載功能

    這篇文章主要為大家詳細介紹了spring boot實現(xiàn)圖片上傳和下載功能,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2018-02-02
  • Springboot?Mybatis使用pageHelper如何實現(xiàn)分頁查詢

    Springboot?Mybatis使用pageHelper如何實現(xiàn)分頁查詢

    這篇文章主要介紹了Springboot?Mybatis使用pageHelper如何實現(xiàn)分頁查詢問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2024-05-05
  • Java?Collection?接口和常用方法綜合詳解

    Java?Collection?接口和常用方法綜合詳解

    Collection派生出三個子接口,Set代表不可重復的無序集合、List代表可重復的有序集合、Queue是java提供的隊列實現(xiàn),通過它們不斷的擴展出很多的集合類,接下來我們詳細介紹一下
    2021-11-11
  • Mac系統(tǒng)搭建JDK及JMETER過程解析

    Mac系統(tǒng)搭建JDK及JMETER過程解析

    這篇文章主要介紹了Mac系統(tǒng)搭建JDK及JMETER過程解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2020-08-08
  • MyBatisPlus利用Service實現(xiàn)獲取數(shù)據(jù)列表

    MyBatisPlus利用Service實現(xiàn)獲取數(shù)據(jù)列表

    這篇文章主要為大家詳細介紹了怎樣使用 IServer 提供的 list 方法查詢多條數(shù)據(jù),這些方法將根據(jù)查詢條件獲取多條數(shù)據(jù),感興趣的可以了解一下
    2022-06-06
  • JAVA線程sleep()和wait()詳解及實例

    JAVA線程sleep()和wait()詳解及實例

    這篇文章主要介紹了JAVA線程sleep()和wait()詳解及實例的相關(guān)資料,探討一下sleep()和wait()方法的區(qū)別和實現(xiàn)機制,需要的朋友可以參考下
    2017-05-05
  • java檢查數(shù)組是否有重復元素的方法

    java檢查數(shù)組是否有重復元素的方法

    這篇文章主要介紹了java檢查數(shù)組是否有重復元素的方法,涉及java針對數(shù)組元素的操作技巧,具有一定參考借鑒價值,需要的朋友可以參考下
    2015-07-07
  • Java Map 按Key排序?qū)嵗a

    Java Map 按Key排序?qū)嵗a

    這篇文章主要介紹了Java Map 按Key排序?qū)嵗a的相關(guān)資料,需要的朋友可以參考下
    2017-02-02

最新評論