欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java kafka如何實(shí)現(xiàn)自定義分區(qū)類和攔截器

 更新時(shí)間:2020年06月09日 09:42:23   作者:護(hù)花使者  
這篇文章主要介紹了Java kafka如何實(shí)現(xiàn)自定義分區(qū)類和攔截器,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下

生產(chǎn)者發(fā)送到對(duì)應(yīng)的分區(qū)有以下幾種方式:

(1)指定了patition,則直接使用;(可以查閱對(duì)應(yīng)的java api, 有多種參數(shù))

(2)未指定patition但指定key,通過(guò)對(duì)key的value進(jìn)行hash出一個(gè)patition;

(3)patition和key都未指定,使用輪詢選出一個(gè)patition。

但是kafka提供了,自定義分區(qū)算法的功能,由業(yè)務(wù)手動(dòng)實(shí)現(xiàn)分布:

1、實(shí)現(xiàn)一個(gè)自定義分區(qū)類,CustomPartitioner實(shí)現(xiàn)Partitioner

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

public class CustomPartitioner implements Partitioner {

  /**
   *
   * @param topic 當(dāng)前的發(fā)送的topic
   * @param key  當(dāng)前的key值
   * @param keyBytes 當(dāng)前的key的字節(jié)數(shù)組
   * @param value 當(dāng)前的value值
   * @param valueBytes 當(dāng)前的value的字節(jié)數(shù)組
   * @param cluster
   * @return
   */
  @Override
  public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    //這邊根據(jù)返回值就是分區(qū)號(hào), 這邊就是固定發(fā)送到三號(hào)分區(qū)
    return 3;
  }

  @Override
  public void close() {

  }
  @Override
  public void configure(Map<String, ?> configs) {

  }

}

2、producer配置文件指定,具體的分區(qū)類

// 具體的分區(qū)類
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "kafka.CustomPartitioner");

技巧:可以使用ProducerConfig中提供的配置ProducerConfig

kafka producer攔截器

攔截器(interceptor)是在Kafka 0.10版本被引入的。

interceptor使得用戶在消息發(fā)送前以及producer回調(diào)邏輯前有機(jī)會(huì)對(duì)消息做一些定制化需求,比如修改消息等。

許用戶指定多個(gè)interceptor按序作用于同一條消息從而形成一個(gè)攔截鏈(interceptor chain)。

所使用的類為:

org.apache.kafka.clients.producer.ProducerInterceptor

我們可以編碼測(cè)試下:

1、定義消息攔截器,實(shí)現(xiàn)消息處理(可以是加時(shí)間戳等等,unid等等。)

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;
import java.util.UUID;

public class MessageInterceptor implements ProducerInterceptor<String, String> {

  @Override
  public void configure(Map<String, ?> configs) {
    System.out.println("這是MessageInterceptor的configure方法");
  }

  /**
   * 這個(gè)是消息發(fā)送之前進(jìn)行處理
   *
   * @param record
   * @return
   */
  @Override
  public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
    // 創(chuàng)建一個(gè)新的record,把uuid入消息體的最前部
    System.out.println("為消息添加uuid");
    return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),
        UUID.randomUUID().toString().replace("-", "") + "," + record.value());
  }

  /**
   * 這個(gè)是生產(chǎn)者回調(diào)函數(shù)調(diào)用之前處理
   * @param metadata
   * @param exception
   */
  @Override
  public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    System.out.println("MessageInterceptor攔截器的onAcknowledgement方法");
  }

  @Override
  public void close() {
    System.out.println("MessageInterceptor close 方法");
  }
}

2、定義計(jì)數(shù)攔截器

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CounterInterceptor implements ProducerInterceptor<String, String>{
  private int errorCounter = 0;
  private int successCounter = 0;

  @Override
  public void configure(Map<String, ?> configs) {
    System.out.println("這是CounterInterceptor的configure方法");
  }

  @Override
  public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
    System.out.println("CounterInterceptor計(jì)數(shù)過(guò)濾器不對(duì)消息做任何操作");
    return record;
  }

  @Override
  public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    // 統(tǒng)計(jì)成功和失敗的次數(shù)
    System.out.println("CounterInterceptor過(guò)濾器執(zhí)行統(tǒng)計(jì)失敗和成功數(shù)量");
    if (exception == null) {
      successCounter++;
    } else {
      errorCounter++;
    }
  }

  @Override
  public void close() {
    // 保存結(jié)果
    System.out.println("Successful sent: " + successCounter);
    System.out.println("Failed sent: " + errorCounter);
  }
}

3、producer客戶端:

import org.apache.kafka.clients.producer.*;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class Producer1 {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    // Kafka服務(wù)端的主機(jī)名和端口號(hào)
    props.put("bootstrap.servers", "localhost:9092");
    // 等待所有副本節(jié)點(diǎn)的應(yīng)答
    props.put("acks", "all");
    // 消息發(fā)送最大嘗試次數(shù)
    props.put("retries", 0);
    // 一批消息處理大小
    props.put("batch.size", 16384);
    // 請(qǐng)求延時(shí),可能生產(chǎn)數(shù)據(jù)太快了
    props.put("linger.ms", 1);
    // 發(fā)送緩存區(qū)內(nèi)存大小,數(shù)據(jù)是先放到生產(chǎn)者的緩沖區(qū)
    props.put("buffer.memory", 33554432);
    // key序列化
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    // value序列化
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    // 具體的分區(qū)類
    props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "kafka.CustomPartitioner");
    //定義攔截器
    List<String> interceptors = new ArrayList<>();
    interceptors.add("kafka.MessageInterceptor");
    interceptors.add("kafka.CounterInterceptor");
    props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

    Producer<String, String> producer = new KafkaProducer<>(props);
    for (int i = 0; i < 1; i++) {
      producer.send(new ProducerRecord<String, String>("test_0515", i + "", "xxx-" + i), new Callback() {
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
          System.out.println("這是producer回調(diào)函數(shù)");
        }
      });
    }
    /*System.out.println("現(xiàn)在執(zhí)行關(guān)閉producer");
    producer.close();*/
    producer.close();
  }
}

總結(jié),我們可以知道攔截器鏈各個(gè)方法的執(zhí)行順序,假如有A、B攔截器,在一個(gè)攔截器鏈中:

(1)執(zhí)行A的configure方法,執(zhí)行B的configure方法

(2)執(zhí)行A的onSend方法,B的onSend方法

(3)生產(chǎn)者發(fā)送完畢后,執(zhí)行A的onAcknowledgement方法,B的onAcknowledgement方法。

(4)執(zhí)行producer自身的callback回調(diào)函數(shù)。

(5)執(zhí)行A的close方法,B的close方法。

以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。

相關(guān)文章

  • 詳細(xì)聊聊JDK中的反模式接口常量

    詳細(xì)聊聊JDK中的反模式接口常量

    這篇文章主要給大家介紹了關(guān)于JDK中反模式接口常量的相關(guān)資料,文中通過(guò)實(shí)例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用jdk具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2022-01-01
  • java web實(shí)現(xiàn)網(wǎng)上手機(jī)銷售系統(tǒng)

    java web實(shí)現(xiàn)網(wǎng)上手機(jī)銷售系統(tǒng)

    這篇文章主要為大家詳細(xì)介紹了java web實(shí)現(xiàn)網(wǎng)上手機(jī)銷售系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2021-08-08
  • Spring Boot Admin管理監(jiān)控?cái)?shù)據(jù)的方法

    Spring Boot Admin管理監(jiān)控?cái)?shù)據(jù)的方法

    本篇文章主要介紹了Spring Boot Admin管理監(jiān)控?cái)?shù)據(jù)的方法,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2017-12-12
  • Java實(shí)現(xiàn)簡(jiǎn)單學(xué)生管理系統(tǒng)

    Java實(shí)現(xiàn)簡(jiǎn)單學(xué)生管理系統(tǒng)

    這篇文章主要為大家詳細(xì)介紹了Java實(shí)現(xiàn)簡(jiǎn)單學(xué)生管理系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2022-07-07
  • 關(guān)于spring中定時(shí)器的使用教程

    關(guān)于spring中定時(shí)器的使用教程

    大家應(yīng)該都有所體會(huì),在很多實(shí)際的web應(yīng)用中,都有需要定時(shí)實(shí)現(xiàn)的服務(wù),下面這篇文章主要給大家介紹了關(guān)于spring中定時(shí)器的使用教程,對(duì)大家具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來(lái)一起看看吧。
    2017-06-06
  • java中接口和事件監(jiān)聽(tīng)器的深入理解

    java中接口和事件監(jiān)聽(tīng)器的深入理解

    這篇文章主要給大家介紹了關(guān)于java中接口和事件監(jiān)聽(tīng)器的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用java具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2019-12-12
  • java如何用Processing生成馬賽克風(fēng)格的圖像

    java如何用Processing生成馬賽克風(fēng)格的圖像

    這篇文章主要介紹了如何用java如何用Processing生成馬賽克風(fēng)格的圖像,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2020-03-03
  • Java 中的HashMap詳解和使用示例_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理

    Java 中的HashMap詳解和使用示例_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理

    這篇文章主要介紹了Java 中的HashMap詳解和使用示例_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理,需要的朋友可以參考下
    2017-05-05
  • 多層嵌套的json的值如何解析/替換

    多層嵌套的json的值如何解析/替換

    這篇文章主要介紹了多層嵌套的json的值如何解析/替換的方法示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-10-10
  • Java實(shí)現(xiàn)單例模式之餓漢式、懶漢式、枚舉式

    Java實(shí)現(xiàn)單例模式之餓漢式、懶漢式、枚舉式

    本篇文章主要介紹了Java實(shí)現(xiàn)單例的3種普遍的模式,餓漢式、懶漢式、枚舉式。具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下。
    2016-10-10

最新評(píng)論