Kafka簡單客戶端編程實例
今天,我們給大家?guī)硪黄绾卫肒afka的API進行客戶端編程的文章,這篇文章很簡單,就是利用Kafka的API創(chuàng)建一個生產(chǎn)者和消費者,生產(chǎn)者不斷向Kafka寫入消息,消費者則不斷消費Kafka的消息。下面是具體的實例代碼。
一、創(chuàng)建配置類Config
這個類很簡單,只是存放了兩個常量,一個是話題TOPIC,一個是線程數(shù)THREADS
package com.lya.kafka; /** * 配置項 * @author liuyazhuang * */ public class Config { /** * 話題 */ public static final String TOPIC = "wordcount"; /** * 線程數(shù) */ public static final Integer THREADS = 1; }
二、編程生產(chǎn)者類ProducerDemo
這個類的主要作用就是向Kafka寫入相應(yīng)的消息,并且將消息寫入wordcount話題。
package com.lya.kafka; import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; /** * 生產(chǎn)者實例 * @author liuyazhuang * */ public class ProducerDemo { public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put("zk.connect", "192.168.209.121:2181"); props.put("metadata.broker.list","192.168.209.121:9092"); props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("zk.connectiontimeout.ms", "15000"); ProducerConfig config = new ProducerConfig(props); Producer<String, String> producer = new Producer<String, String>(config); // 發(fā)送業(yè)務(wù)消息 // 讀取文件 讀取內(nèi)存數(shù)據(jù)庫 讀socket端口 for (int i = 1; i <= 100; i++) { Thread.sleep(500); producer.send(new KeyedMessage<String, String>(Config.TOPIC, "this number ===>>> " + i)); } } }
三、編寫消息者類ConsumerDemo
這個類的主要作用就是消費Kafka中wordcount話題的消息。
package com.lya.kafka; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.MessageAndMetadata; /** * 消費者實例 * @author liuyazhuang * */ public class ConsumerDemo { public static void main(String[] args) { Properties props = new Properties(); props.put("zookeeper.connect", "192.168.209.121:2181"); props.put("group.id", "1111"); props.put("auto.offset.reset", "smallest"); props.put("zk.connectiontimeout.ms", "15000"); ConsumerConfig config = new ConsumerConfig(props); ConsumerConnector consumer =Consumer.createJavaConsumerConnector(config); Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(Config.TOPIC, Config.THREADS); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(Config.TOPIC); for(final KafkaStream<byte[], byte[]> kafkaStream : streams){ new Thread(new Runnable() { @Override public void run() { for(MessageAndMetadata<byte[], byte[]> mm : kafkaStream){ String msg = new String(mm.message()); System.out.println(msg); } } }).start(); } } }
四、運行實例
首先,運行消費者類ConsumerDemo
運行結(jié)果如下:
沒有打印任何信息。
此時,我們運行生產(chǎn)者類ProducerDemo
我們再次打開消費者的控制臺查看如下:
打印出了生產(chǎn)者生產(chǎn)的消息。
至此,Kafka簡單客戶端編程實例結(jié)束。
以上就是本文的全部內(nèi)容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
SpringBoot應(yīng)用程序轉(zhuǎn)換成WAR文件詳解
其實一般使用SpringBoot使用打成jar包比較省事的,但也有很多童鞋是習慣使用WAR包的,下面這篇文章主要給大家介紹了關(guān)于SpringBoot轉(zhuǎn)換WAR的相關(guān)資料,需要的朋友可以參考下2022-11-11Spring?多數(shù)據(jù)源方法級別注解實現(xiàn)過程
多數(shù)據(jù)源管理是Spring框架中非常重要的一部分,它可以提高應(yīng)用程序的靈活性和可靠性,從而更好地滿足業(yè)務(wù)需求,這篇文章主要介紹了Spring?多數(shù)據(jù)源方法級別注解實現(xiàn),需要的朋友可以參考下2023-07-07java SelectableChannel的使實例用法講解
在本篇文章里小編給大家整理的是一篇關(guān)于java SelectableChannel的使實例用法講解內(nèi)容,有興趣的朋友們可以學習下。2021-03-03SpringBoot內(nèi)嵌tomcat處理有特殊字符轉(zhuǎn)義的問題
這篇文章主要介紹了SpringBoot內(nèi)嵌tomcat處理有特殊字符轉(zhuǎn)義的問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-06-06