欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java API方式調(diào)用Kafka各種協(xié)議的方法

 更新時(shí)間:2017年09月06日 10:10:48   作者:huxihx  
本篇文章主要介紹了Java API方式調(diào)用Kafka各種協(xié)議的方法,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧

眾所周知,Kafka自己實(shí)現(xiàn)了一套二進(jìn)制協(xié)議(binary protocol)用于各種功能的實(shí)現(xiàn),比如發(fā)送消息,獲取消息,提交位移以及創(chuàng)建topic等。具體協(xié)議規(guī)范參見(jiàn):Kafka協(xié)議  這套協(xié)議的具體使用流程為:

1.客戶(hù)端創(chuàng)建對(duì)應(yīng)協(xié)議的請(qǐng)求

2.客戶(hù)端發(fā)送請(qǐng)求給對(duì)應(yīng)的broker

3.broker處理請(qǐng)求,并發(fā)送response給客戶(hù)端

雖然Kafka提供的大量的腳本工具用于各種功能的實(shí)現(xiàn),但很多時(shí)候我們還是希望可以把某些功能以編程的方式嵌入到另一個(gè)系統(tǒng)中。這時(shí)使用Java API的方式就顯得異常地靈活了。本文我將嘗試給出Java API底層框架的一個(gè)范例,同時(shí)也會(huì)針對(duì)“創(chuàng)建topic”和“查看位移”這兩個(gè)主要功能給出對(duì)應(yīng)的例子。 需要提前說(shuō)明的是,本文給出的范例并沒(méi)有考慮Kafka集群開(kāi)啟安全的情況。另外Kafka的KIP4應(yīng)該一直在優(yōu)化命令行工具以及各種管理操作,有興趣的讀者可以關(guān)注這個(gè)KIP。

本文中用到的API依賴(lài)于kafka-clients,所以如果你使用Maven構(gòu)建的話(huà),請(qǐng)加上:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.10.2.0</version>
</dependency>

如果是gradle,請(qǐng)加上:

compile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.10.2.0'

底層框架

/**
   * 發(fā)送請(qǐng)求主方法
   * @param host     目標(biāo)broker的主機(jī)名
   * @param port     目標(biāo)broker的端口
   * @param request    請(qǐng)求對(duì)象
   * @param apiKey    請(qǐng)求類(lèi)型
   * @return       序列化后的response
   * @throws IOException
   */
  public ByteBuffer send(String host, int port, AbstractRequest request, ApiKeys apiKey) throws IOException {
    Socket socket = connect(host, port);
    try {
      return send(request, apiKey, socket);
    } finally {
      socket.close();
    }
  }

  /**
   * 發(fā)送序列化請(qǐng)求并等待response返回
   * @param socket      連向目標(biāo)broker的socket
   * @param request      序列化后的請(qǐng)求
   * @return         序列化后的response
   * @throws IOException
   */
  private byte[] issueRequestAndWaitForResponse(Socket socket, byte[] request) throws IOException {
    sendRequest(socket, request);
    return getResponse(socket);
  }

  /**
   * 發(fā)送序列化請(qǐng)求給socket
   * @param socket      連向目標(biāo)broker的socket
   * @param request      序列化后的請(qǐng)求
   * @throws IOException
   */
  private void sendRequest(Socket socket, byte[] request) throws IOException {
    DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
    dos.writeInt(request.length);
    dos.write(request);
    dos.flush();
  }

  /**
   * 從給定socket處獲取response
   * @param socket      連向目標(biāo)broker的socket
   * @return         獲取到的序列化后的response
   * @throws IOException
   */
  private byte[] getResponse(Socket socket) throws IOException {
    DataInputStream dis = null;
    try {
      dis = new DataInputStream(socket.getInputStream());
      byte[] response = new byte[dis.readInt()];
      dis.readFully(response);
      return response;
    } finally {
      if (dis != null) {
        dis.close();
      }
    }
  }

  /**
   * 創(chuàng)建Socket連接
   * @param hostName     目標(biāo)broker主機(jī)名
   * @param port       目標(biāo)broker服務(wù)端口, 比如9092
   * @return         創(chuàng)建的Socket連接
   * @throws IOException
   */
  private Socket connect(String hostName, int port) throws IOException {
    return new Socket(hostName, port);
  }

  /**
   * 向給定socket發(fā)送請(qǐng)求
   * @param request    請(qǐng)求對(duì)象
   * @param apiKey    請(qǐng)求類(lèi)型, 即屬于哪種請(qǐng)求
   * @param socket    連向目標(biāo)broker的socket
   * @return       序列化后的response
   * @throws IOException
   */
  private ByteBuffer send(AbstractRequest request, ApiKeys apiKey, Socket socket) throws IOException {
    RequestHeader header = new RequestHeader(apiKey.id, request.version(), "client-id", 0);
    ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + request.sizeOf());
    header.writeTo(buffer);
    request.writeTo(buffer);
    byte[] serializedRequest = buffer.array();
    byte[] response = issueRequestAndWaitForResponse(socket, serializedRequest);
    ByteBuffer responseBuffer = ByteBuffer.wrap(response);
    ResponseHeader.parse(responseBuffer);
    return responseBuffer;
  }

有了這些方法的鋪墊,我們就可以創(chuàng)建具體的請(qǐng)求了。

創(chuàng)建topic

/**
   * 創(chuàng)建topic
   * 由于只是樣例代碼,有些東西就硬編碼寫(xiě)到程序里面了(比如主機(jī)名和端口),各位看官自行修改即可
   * @param topicName       topic名
   * @param partitions      分區(qū)數(shù)
   * @param replicationFactor   副本數(shù)
   * @throws IOException
   */
  public void createTopics(String topicName, int partitions, short replicationFactor) throws IOException {
    Map<String, CreateTopicsRequest.TopicDetails> topics = new HashMap<>();
    // 插入多個(gè)元素便可同時(shí)創(chuàng)建多個(gè)topic
    topics.put(topicName, new CreateTopicsRequest.TopicDetails(partitions, replicationFactor));
    int creationTimeoutMs = 60000;
    CreateTopicsRequest request = new CreateTopicsRequest.Builder(topics, creationTimeoutMs).build();
    ByteBuffer response = send("localhost", 9092, request, ApiKeys.CREATE_TOPICS);
    CreateTopicsResponse.parse(response, request.version());
  }

查看位移

/**
   * 獲取某個(gè)consumer group下的某個(gè)topic分區(qū)的位移
   * @param groupID      group id
   * @param topic       topic名
   * @param parititon     分區(qū)號(hào)
   * @throws IOException
   */
  public void getOffsetForPartition(String groupID, String topic, int parititon) throws IOException {
    TopicPartition tp = new TopicPartition(topic, parititon);
    OffsetFetchRequest request = new OffsetFetchRequest.Builder(groupID, singletonList(tp))
        .setVersion((short)2).build();
    ByteBuffer response = send("localhost", 9092, request, ApiKeys.OFFSET_FETCH);
    OffsetFetchResponse resp = OffsetFetchResponse.parse(response, request.version());
    OffsetFetchResponse.PartitionData partitionData = resp.responseData().get(tp);
    System.out.println(partitionData.offset);
  }
/**
   * 獲取某個(gè)consumer group下所有topic分區(qū)的位移信息
   * @param groupID      group id
   * @return         (topic分區(qū) --> 分區(qū)信息)的map
   * @throws IOException
   */
  public Map<TopicPartition, OffsetFetchResponse.PartitionData> getAllOffsetsForGroup(String groupID) throws IOException {
    OffsetFetchRequest request = new OffsetFetchRequest.Builder(groupID, null).setVersion((short)2).build();
    ByteBuffer response = send("localhost", 9092, request, ApiKeys.OFFSET_FETCH);
    OffsetFetchResponse resp = OffsetFetchResponse.parse(response, request.version());
    return resp.responseData();
  }

okay, 上面就是“創(chuàng)建topic”和“查看位移”的樣例代碼,各位看官可以參考著這兩個(gè)例子構(gòu)建其他類(lèi)型的請(qǐng)求。

以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。

相關(guān)文章

  • Java線(xiàn)程安全之volatile詳解

    Java線(xiàn)程安全之volatile詳解

    這篇文章主要介紹了Java線(xiàn)程安全之volatile詳解,volatile 的存在,解決了不同內(nèi)存間拷貝的同步問(wèn)題,在每一次使用或者修改時(shí)候,都去原持有內(nèi)存中去拿最新的狀態(tài),需要的朋友可以參考下
    2023-08-08
  • 基于Java匯總Spock框架Mock靜態(tài)資源經(jīng)驗(yàn)

    基于Java匯總Spock框架Mock靜態(tài)資源經(jīng)驗(yàn)

    這篇文章主要介紹了基于Java匯總Spock框架Mock靜態(tài)資源經(jīng)驗(yàn),前面講了?Spock框架Mock對(duì)象、方法經(jīng)驗(yàn)總結(jié),今天分享一下Spock框架中Mock靜態(tài)資源的實(shí)踐經(jīng)驗(yàn)匯總。分成靜態(tài)資源和混合場(chǎng)景,需要的朋友可以參考一下
    2022-02-02
  • 使用IntelliJ IDEA查看類(lèi)的繼承關(guān)系圖形(圖文詳解)

    使用IntelliJ IDEA查看類(lèi)的繼承關(guān)系圖形(圖文詳解)

    這篇文章主要介紹了使用IntelliJ IDEA查看類(lèi)的繼承關(guān)系圖形,本文通過(guò)圖文并茂的形式給大家介紹的非常詳細(xì),對(duì)大家的工作或?qū)W習(xí)具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-03-03
  • MyBatis工廠(chǎng)類(lèi)封裝與簡(jiǎn)化實(shí)現(xiàn)

    MyBatis工廠(chǎng)類(lèi)封裝與簡(jiǎn)化實(shí)現(xiàn)

    工廠(chǎng)類(lèi)的目的是將對(duì)象的創(chuàng)建邏輯封裝在一個(gè)類(lèi)中,以便客戶(hù)端代碼無(wú)需了解具體的實(shí)現(xiàn)細(xì)節(jié),本文主要介紹了MyBatis工廠(chǎng)類(lèi)封裝與簡(jiǎn)化實(shí)現(xiàn),具有一定的參考價(jià)值,感興趣的可以了解一下
    2024-01-01
  • 解決springboot mapper注入報(bào)紅問(wèn)題

    解決springboot mapper注入報(bào)紅問(wèn)題

    這篇文章主要介紹了解決springboot mapper注入報(bào)紅問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-11-11
  • java中l(wèi)ong數(shù)據(jù)類(lèi)型轉(zhuǎn)換為int類(lèi)型

    java中l(wèi)ong數(shù)據(jù)類(lèi)型轉(zhuǎn)換為int類(lèi)型

    這篇文章主要講解Java中基本數(shù)據(jù)類(lèi)型,java long 類(lèi)型與其java int類(lèi)型的轉(zhuǎn)換的幾種方法,希望能給大家做一個(gè)參考
    2016-07-07
  • java實(shí)現(xiàn)音頻文件播放功能

    java實(shí)現(xiàn)音頻文件播放功能

    這篇文章主要為大家詳細(xì)介紹了java實(shí)現(xiàn)音頻文件播放功能,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2018-12-12
  • Spring Boot項(xiàng)目中定制攔截器的方法詳解

    Spring Boot項(xiàng)目中定制攔截器的方法詳解

    這篇文章主要介紹了Spring Boot項(xiàng)目中定制攔截器的方法詳解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2019-10-10
  • IDEA使用Mybatis插件 MyBatisCodeHelper-Pro的圖文教程

    IDEA使用Mybatis插件 MyBatisCodeHelper-Pro的圖文教程

    這篇文章主要介紹了IDEA使用Mybatis插件 MyBatisCodeHelper-Pro的教程,本文通過(guò)圖文并茂的形式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-09-09
  • java讀取excel文件并復(fù)制(copy)文件到指定目錄示例

    java讀取excel文件并復(fù)制(copy)文件到指定目錄示例

    這篇文章主要介紹了java讀取excel文件并復(fù)制文件到指定目錄示例,需要的朋友可以參考下
    2014-02-02

最新評(píng)論