欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Kafka Java Producer代碼實例詳解

 更新時間:2020年06月04日 10:05:31   作者:liuming_1992  
這篇文章主要介紹了Kafka Java Producer代碼實例詳解,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下

根據(jù)業(yè)務需要可以使用Kafka提供的Java Producer API進行產(chǎn)生數(shù)據(jù),并將產(chǎn)生的數(shù)據(jù)發(fā)送到Kafka對應Topic的對應分區(qū)中,入口類為:Producer

Kafka的Producer API主要提供下列三個方法:

  •   public void send(KeyedMessage<K,V> message) 發(fā)送單條數(shù)據(jù)到Kafka集群
  •   public void send(List<KeyedMessage<K,V>> messages) 發(fā)送多條數(shù)據(jù)(數(shù)據(jù)集)到Kafka集群
  •   public void close() 關(guān)閉Kafka連接資源

一、JavaKafkaProducerPartitioner:自定義的數(shù)據(jù)分區(qū)器,功能是:決定輸入的key/value鍵值對的message發(fā)送到Topic的那個分區(qū)中,返回分區(qū)id,范圍:[0,分區(qū)數(shù)量); 這里的實現(xiàn)比較簡單,根據(jù)key中的數(shù)字決定分區(qū)的值。具體代碼如下:

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

/**
 * Created by gerry on 12/21.
 */
public class JavaKafkaProducerPartitioner implements Partitioner {

  /**
   * 無參構(gòu)造函數(shù)
   */
  public JavaKafkaProducerPartitioner() {
    this(new VerifiableProperties());
  }

  /**
   * 構(gòu)造函數(shù),必須給定
   *
   * @param properties 上下文
   */
  public JavaKafkaProducerPartitioner(VerifiableProperties properties) {
    // nothings
  }

  @Override
  public int partition(Object key, int numPartitions) {
    int num = Integer.valueOf(((String) key).replaceAll("key_", "").trim());
    return num % numPartitions;
  }
}

二、 JavaKafkaProducer:通過Kafka提供的API進行數(shù)據(jù)產(chǎn)生操作的測試類;具體代碼如下:

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.log4j.Logger;

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.ThreadLocalRandom;

/**
 * Created by gerry on 12/21.
 */
public class JavaKafkaProducer {
  private Logger logger = Logger.getLogger(JavaKafkaProducer.class);
  public static final String TOPIC_NAME = "test";
  public static final char[] charts = "qazwsxedcrfvtgbyhnujmikolp1234567890".toCharArray();
  public static final int chartsLength = charts.length;


  public static void main(String[] args) {
    String brokerList = "192.168.187.149:9092";
    brokerList = "192.168.187.149:9092,192.168.187.149:9093,192.168.187.149:9094,192.168.187.149:9095";
    brokerList = "192.168.187.146:9092";
    Properties props = new Properties();
    props.put("metadata.broker.list", brokerList);
    /**
     * 0表示不等待結(jié)果返回<br/>
     * 1表示等待至少有一個服務器返回數(shù)據(jù)接收標識<br/>
     * -1表示必須接收到所有的服務器返回標識,及同步寫入<br/>
     * */
    props.put("request.required.acks", "0");
    /**
     * 內(nèi)部發(fā)送數(shù)據(jù)是異步還是同步
     * sync:同步, 默認
     * async:異步
     */
    props.put("producer.type", "async");
    /**
     * 設置序列化的類
     * 可選:kafka.serializer.StringEncoder
     * 默認:kafka.serializer.DefaultEncoder
     */
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    /**
     * 設置分區(qū)類
     * 根據(jù)key進行數(shù)據(jù)分區(qū)
     * 默認是:kafka.producer.DefaultPartitioner ==> 安裝key的hash進行分區(qū)
     * 可選:kafka.serializer.ByteArrayPartitioner ==> 轉(zhuǎn)換為字節(jié)數(shù)組后進行hash分區(qū)
     */
    props.put("partitioner.class", "JavaKafkaProducerPartitioner");

    // 重試次數(shù)
    props.put("message.send.max.retries", "3");

    // 異步提交的時候(async),并發(fā)提交的記錄數(shù)
    props.put("batch.num.messages", "200");

    // 設置緩沖區(qū)大小,默認10KB
    props.put("send.buffer.bytes", "102400");

    // 2. 構(gòu)建Kafka Producer Configuration上下文
    ProducerConfig config = new ProducerConfig(props);

    // 3. 構(gòu)建Producer對象
    final Producer<String, String> producer = new Producer<String, String>(config);

    // 4. 發(fā)送數(shù)據(jù)到服務器,并發(fā)線程發(fā)送
    final AtomicBoolean flag = new AtomicBoolean(true);
    int numThreads = 50;
    ExecutorService pool = Executors.newFixedThreadPool(numThreads);
    for (int i = 0; i < 5; i++) {
      pool.submit(new Thread(new Runnable() {
        @Override
        public void run() {
          while (flag.get()) {
            // 發(fā)送數(shù)據(jù)
            KeyedMessage message = generateKeyedMessage();
            producer.send(message);
            System.out.println("發(fā)送數(shù)據(jù):" + message);

            // 休眠一下
            try {
              int least = 10;
              int bound = 100;
              Thread.sleep(ThreadLocalRandom.current().nextInt(least, bound));
            } catch (InterruptedException e) {
              e.printStackTrace();
            }
          }

          System.out.println(Thread.currentThread().getName() + " shutdown....");
        }
      }, "Thread-" + i));

    }

    // 5. 等待執(zhí)行完成
    long sleepMillis = 600000;
    try {
      Thread.sleep(sleepMillis);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    flag.set(false);

    // 6. 關(guān)閉資源

    pool.shutdown();
    try {
      pool.awaitTermination(6, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
    } finally {
      producer.close(); // 最后之后調(diào)用
    }
  }

  /**
   * 產(chǎn)生一個消息
   *
   * @return
   */
  private static KeyedMessage<String, String> generateKeyedMessage() {
    String key = "key_" + ThreadLocalRandom.current().nextInt(10, 99);
    StringBuilder sb = new StringBuilder();
    int num = ThreadLocalRandom.current().nextInt(1, 5);
    for (int i = 0; i < num; i++) {
      sb.append(generateStringMessage(ThreadLocalRandom.current().nextInt(3, 20))).append(" ");
    }
    String message = sb.toString().trim();
    return new KeyedMessage(TOPIC_NAME, key, message);
  }

  /**
   * 產(chǎn)生一個給定長度的字符串
   *
   * @param numItems
   * @return
   */
  private static String generateStringMessage(int numItems) {
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < numItems; i++) {
      sb.append(charts[ThreadLocalRandom.current().nextInt(chartsLength)]);
    }
    return sb.toString();
  }
}

三、Pom.xml依賴配置如下

<properties>
  <kafka.version>0.8.2.1</kafka.version>
</properties>

<dependencies>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>${kafka.version}</version>
  </dependency>
</dependencies>

以上就是本文的全部內(nèi)容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。

相關(guān)文章

  • java數(shù)組復制的四種方法效率對比

    java數(shù)組復制的四種方法效率對比

    這篇文章主要介紹了java數(shù)組復制的四種方法效率對比,文中有簡單的代碼示例,以及效率的比較結(jié)果,具有一定參考價值,需要的朋友可以了解下。
    2017-11-11
  • Java之Maven工程打包jar

    Java之Maven工程打包jar

    Maven打包一般可以生成兩種包一種是可以直接運行的包,一種是依賴包(只是編譯包)。Maven默認打包時jar,如果需要修改其他類型,可以修改pom.xml。感興趣的同學可以參考閱讀
    2023-04-04
  • MyBatis逆向工程生成dao層增刪改查的操作

    MyBatis逆向工程生成dao層增刪改查的操作

    這篇文章主要介紹了MyBatis逆向工程生成dao層增刪改查的操作,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-08-08
  • Spark Streaming編程初級實踐詳解

    Spark Streaming編程初級實踐詳解

    這篇文章主要為大家介紹了Spark Streaming編程初級實踐詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2023-04-04
  • SpringBoot項目集成Flyway進行數(shù)據(jù)庫版本控制的詳細教程

    SpringBoot項目集成Flyway進行數(shù)據(jù)庫版本控制的詳細教程

    這篇文章主要介紹了SpringBoot項目集成Flyway進行數(shù)據(jù)庫版本控制,本文分步驟通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-07-07
  • Java實體類之間的相互轉(zhuǎn)換方式

    Java實體類之間的相互轉(zhuǎn)換方式

    這篇文章主要介紹了Java實體類之間的相互轉(zhuǎn)換方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2023-08-08
  • Java Base64算法實際應用之郵件發(fā)送實例分析

    Java Base64算法實際應用之郵件發(fā)送實例分析

    這篇文章主要介紹了Java Base64算法實際應用之郵件發(fā)送,結(jié)合實例形式分析了java字符編碼與郵件發(fā)送相關(guān)操作技巧,需要的朋友可以參考下
    2019-09-09
  • Java 導出excel進行換行的案例

    Java 導出excel進行換行的案例

    這篇文章主要介紹了Java 導出excel進行換行的案例,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-09-09
  • Spring Bean生命周期之Bean元信息的配置與解析階段詳解

    Spring Bean生命周期之Bean元信息的配置與解析階段詳解

    這篇文章主要為大家詳細介紹了Spring Bean生命周期之Bean元信息的配置與解析階段,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來幫助
    2022-03-03
  • java用applet畫圖用到的方法(涉及雙緩沖)

    java用applet畫圖用到的方法(涉及雙緩沖)

    這篇文章主要介紹了java用applet畫圖用到的方法(涉及雙緩沖),然后作為基礎(chǔ)的基礎(chǔ)的基礎(chǔ),必須學習如何讓鍵盤與界面進行交互。下面就是對一個基礎(chǔ)得不能再基礎(chǔ)的applet程序,需要的朋友可以參考下
    2019-06-06

最新評論