欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Kafka簡單客戶端編程實例

 更新時間:2017年11月08日 11:33:31   作者:liuyazhuang  
這篇文章主要為大家詳細介紹了Kafka簡單客戶端編程實例,利用Kafka的API進行客戶端編程,具有一定的參考價值,感興趣的小伙伴們可以參考一下

今天,我們給大家?guī)硪黄绾卫肒afka的API進行客戶端編程的文章,這篇文章很簡單,就是利用Kafka的API創(chuàng)建一個生產(chǎn)者和消費者,生產(chǎn)者不斷向Kafka寫入消息,消費者則不斷消費Kafka的消息。下面是具體的實例代碼。

一、創(chuàng)建配置類Config

這個類很簡單,只是存放了兩個常量,一個是話題TOPIC,一個是線程數(shù)THREADS

package com.lya.kafka; 
 
/** 
 * 配置項 
 * @author liuyazhuang 
 * 
 */ 
public class Config { 
  
 /** 
  * 話題 
  */ 
 public static final String TOPIC = "wordcount"; 
 /** 
  * 線程數(shù) 
  */ 
 public static final Integer THREADS = 1; 
} 

二、編程生產(chǎn)者類ProducerDemo

這個類的主要作用就是向Kafka寫入相應(yīng)的消息,并且將消息寫入wordcount話題。

package com.lya.kafka; 
 
import java.util.Properties; 
 
import kafka.javaapi.producer.Producer; 
import kafka.producer.KeyedMessage; 
import kafka.producer.ProducerConfig; 
 
/** 
 * 生產(chǎn)者實例 
 * @author liuyazhuang 
 * 
 */ 
public class ProducerDemo { 
 public static void main(String[] args) throws Exception { 
  Properties props = new Properties(); 
  props.put("zk.connect", "192.168.209.121:2181"); 
  props.put("metadata.broker.list","192.168.209.121:9092"); 
  props.put("serializer.class", "kafka.serializer.StringEncoder"); 
  props.put("zk.connectiontimeout.ms", "15000"); 
  ProducerConfig config = new ProducerConfig(props); 
  Producer<String, String> producer = new Producer<String, String>(config); 
 
  // 發(fā)送業(yè)務(wù)消息 
  // 讀取文件 讀取內(nèi)存數(shù)據(jù)庫 讀socket端口 
  for (int i = 1; i <= 100; i++) { 
   Thread.sleep(500); 
   producer.send(new KeyedMessage<String, String>(Config.TOPIC, 
     "this number ===>>> " + i)); 
  } 
 
 } 
} 

三、編寫消息者類ConsumerDemo

這個類的主要作用就是消費Kafka中wordcount話題的消息。

package com.lya.kafka; 
 
import java.util.HashMap; 
import java.util.List; 
import java.util.Map; 
import java.util.Properties; 
 
import kafka.consumer.Consumer; 
import kafka.consumer.ConsumerConfig; 
import kafka.consumer.KafkaStream; 
import kafka.javaapi.consumer.ConsumerConnector; 
import kafka.message.MessageAndMetadata; 
 
/** 
 * 消費者實例 
 * @author liuyazhuang 
 * 
 */ 
public class ConsumerDemo { 
  
 
 public static void main(String[] args) { 
   
  Properties props = new Properties(); 
  props.put("zookeeper.connect", "192.168.209.121:2181"); 
  props.put("group.id", "1111"); 
  props.put("auto.offset.reset", "smallest"); 
  props.put("zk.connectiontimeout.ms", "15000"); 
 
  ConsumerConfig config = new ConsumerConfig(props); 
  ConsumerConnector consumer =Consumer.createJavaConsumerConnector(config); 
  Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); 
  topicCountMap.put(Config.TOPIC, Config.THREADS); 
  Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); 
  List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(Config.TOPIC); 
   
  for(final KafkaStream<byte[], byte[]> kafkaStream : streams){ 
   new Thread(new Runnable() { 
    @Override 
    public void run() { 
     for(MessageAndMetadata<byte[], byte[]> mm : kafkaStream){ 
      String msg = new String(mm.message()); 
      System.out.println(msg); 
     } 
    } 
    
   }).start(); 
   
  } 
 } 
} 

四、運行實例

首先,運行消費者類ConsumerDemo
運行結(jié)果如下:

沒有打印任何信息。
此時,我們運行生產(chǎn)者類ProducerDemo
我們再次打開消費者的控制臺查看如下:

打印出了生產(chǎn)者生產(chǎn)的消息。
至此,Kafka簡單客戶端編程實例結(jié)束。

以上就是本文的全部內(nèi)容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。

相關(guān)文章

  • Java棧的應(yīng)用之括號匹配算法實例分析

    Java棧的應(yīng)用之括號匹配算法實例分析

    這篇文章主要介紹了Java棧的應(yīng)用之括號匹配算法,結(jié)合實例形式分析了Java使用棧實現(xiàn)括號匹配算法的相關(guān)原理、操作技巧與注意事項,需要的朋友可以參考下
    2020-03-03
  • Java注解處理器學習之編譯時處理的注解詳析

    Java注解處理器學習之編譯時處理的注解詳析

    編譯時注解相信對每一個java開發(fā)者來說都不陌生,下面這篇文章主要給大家介紹了關(guān)于Java注解處理器學習之編譯時處理的注解的相關(guān)資料,文中通過示例代碼介紹的非常詳細,需要的朋友可以參考借鑒,下面來一起看看吧
    2018-05-05
  • Java 高并發(fā)六:JDK并發(fā)包2詳解

    Java 高并發(fā)六:JDK并發(fā)包2詳解

    本文主要介紹Java高并發(fā)這里整理了詳細資料,并講解了 1. 線程池的基本使用 2. 擴展和增強線程池 3. ForkJoin的知識,有興趣的小伙伴可以參考下
    2016-09-09
  • SpringBoot應(yīng)用程序轉(zhuǎn)換成WAR文件詳解

    SpringBoot應(yīng)用程序轉(zhuǎn)換成WAR文件詳解

    其實一般使用SpringBoot使用打成jar包比較省事的,但也有很多童鞋是習慣使用WAR包的,下面這篇文章主要給大家介紹了關(guān)于SpringBoot轉(zhuǎn)換WAR的相關(guān)資料,需要的朋友可以參考下
    2022-11-11
  • Java高效利用異常處理的技巧總結(jié)

    Java高效利用異常處理的技巧總結(jié)

    這篇文章主要為大家詳細介紹了Java如何高效利用異常處理,從而達到優(yōu)化代碼的效果,文中的示例代碼講解詳細,感興趣的小伙伴可以學習一下
    2023-09-09
  • Spring?多數(shù)據(jù)源方法級別注解實現(xiàn)過程

    Spring?多數(shù)據(jù)源方法級別注解實現(xiàn)過程

    多數(shù)據(jù)源管理是Spring框架中非常重要的一部分,它可以提高應(yīng)用程序的靈活性和可靠性,從而更好地滿足業(yè)務(wù)需求,這篇文章主要介紹了Spring?多數(shù)據(jù)源方法級別注解實現(xiàn),需要的朋友可以參考下
    2023-07-07
  • java SelectableChannel的使實例用法講解

    java SelectableChannel的使實例用法講解

    在本篇文章里小編給大家整理的是一篇關(guān)于java SelectableChannel的使實例用法講解內(nèi)容,有興趣的朋友們可以學習下。
    2021-03-03
  • SpringBoot內(nèi)嵌tomcat處理有特殊字符轉(zhuǎn)義的問題

    SpringBoot內(nèi)嵌tomcat處理有特殊字符轉(zhuǎn)義的問題

    這篇文章主要介紹了SpringBoot內(nèi)嵌tomcat處理有特殊字符轉(zhuǎn)義的問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2023-06-06
  • jar包運行一段時間后莫名其妙掛掉線上問題及處理方案

    jar包運行一段時間后莫名其妙掛掉線上問題及處理方案

    這篇文章主要介紹了jar包運行一段時間后莫名其妙掛掉線上問題及處理方案,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2023-09-09
  • 使用java實現(xiàn)日志工具類分享

    使用java實現(xiàn)日志工具類分享

    這篇文章主要介紹的Java代碼工具類是用于書寫日志信息到指定的文件,并且具有刪除之前日志文件的功能,需要的朋友可以參考下
    2014-03-03

最新評論