Kafka Java Producer代碼實例詳解
根據(jù)業(yè)務需要可以使用Kafka提供的Java Producer API進行產(chǎn)生數(shù)據(jù),并將產(chǎn)生的數(shù)據(jù)發(fā)送到Kafka對應Topic的對應分區(qū)中,入口類為:Producer
Kafka的Producer API主要提供下列三個方法:
- public void send(KeyedMessage<K,V> message) 發(fā)送單條數(shù)據(jù)到Kafka集群
- public void send(List<KeyedMessage<K,V>> messages) 發(fā)送多條數(shù)據(jù)(數(shù)據(jù)集)到Kafka集群
- public void close() 關(guān)閉Kafka連接資源
一、JavaKafkaProducerPartitioner:自定義的數(shù)據(jù)分區(qū)器,功能是:決定輸入的key/value鍵值對的message發(fā)送到Topic的那個分區(qū)中,返回分區(qū)id,范圍:[0,分區(qū)數(shù)量); 這里的實現(xiàn)比較簡單,根據(jù)key中的數(shù)字決定分區(qū)的值。具體代碼如下:
import kafka.producer.Partitioner; import kafka.utils.VerifiableProperties; /** * Created by gerry on 12/21. */ public class JavaKafkaProducerPartitioner implements Partitioner { /** * 無參構(gòu)造函數(shù) */ public JavaKafkaProducerPartitioner() { this(new VerifiableProperties()); } /** * 構(gòu)造函數(shù),必須給定 * * @param properties 上下文 */ public JavaKafkaProducerPartitioner(VerifiableProperties properties) { // nothings } @Override public int partition(Object key, int numPartitions) { int num = Integer.valueOf(((String) key).replaceAll("key_", "").trim()); return num % numPartitions; } }
二、 JavaKafkaProducer:通過Kafka提供的API進行數(shù)據(jù)產(chǎn)生操作的測試類;具體代碼如下:
import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import org.apache.log4j.Logger; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.ThreadLocalRandom; /** * Created by gerry on 12/21. */ public class JavaKafkaProducer { private Logger logger = Logger.getLogger(JavaKafkaProducer.class); public static final String TOPIC_NAME = "test"; public static final char[] charts = "qazwsxedcrfvtgbyhnujmikolp1234567890".toCharArray(); public static final int chartsLength = charts.length; public static void main(String[] args) { String brokerList = "192.168.187.149:9092"; brokerList = "192.168.187.149:9092,192.168.187.149:9093,192.168.187.149:9094,192.168.187.149:9095"; brokerList = "192.168.187.146:9092"; Properties props = new Properties(); props.put("metadata.broker.list", brokerList); /** * 0表示不等待結(jié)果返回<br/> * 1表示等待至少有一個服務器返回數(shù)據(jù)接收標識<br/> * -1表示必須接收到所有的服務器返回標識,及同步寫入<br/> * */ props.put("request.required.acks", "0"); /** * 內(nèi)部發(fā)送數(shù)據(jù)是異步還是同步 * sync:同步, 默認 * async:異步 */ props.put("producer.type", "async"); /** * 設置序列化的類 * 可選:kafka.serializer.StringEncoder * 默認:kafka.serializer.DefaultEncoder */ props.put("serializer.class", "kafka.serializer.StringEncoder"); /** * 設置分區(qū)類 * 根據(jù)key進行數(shù)據(jù)分區(qū) * 默認是:kafka.producer.DefaultPartitioner ==> 安裝key的hash進行分區(qū) * 可選:kafka.serializer.ByteArrayPartitioner ==> 轉(zhuǎn)換為字節(jié)數(shù)組后進行hash分區(qū) */ props.put("partitioner.class", "JavaKafkaProducerPartitioner"); // 重試次數(shù) props.put("message.send.max.retries", "3"); // 異步提交的時候(async),并發(fā)提交的記錄數(shù) props.put("batch.num.messages", "200"); // 設置緩沖區(qū)大小,默認10KB props.put("send.buffer.bytes", "102400"); // 2. 構(gòu)建Kafka Producer Configuration上下文 ProducerConfig config = new ProducerConfig(props); // 3. 構(gòu)建Producer對象 final Producer<String, String> producer = new Producer<String, String>(config); // 4. 發(fā)送數(shù)據(jù)到服務器,并發(fā)線程發(fā)送 final AtomicBoolean flag = new AtomicBoolean(true); int numThreads = 50; ExecutorService pool = Executors.newFixedThreadPool(numThreads); for (int i = 0; i < 5; i++) { pool.submit(new Thread(new Runnable() { @Override public void run() { while (flag.get()) { // 發(fā)送數(shù)據(jù) KeyedMessage message = generateKeyedMessage(); producer.send(message); System.out.println("發(fā)送數(shù)據(jù):" + message); // 休眠一下 try { int least = 10; int bound = 100; Thread.sleep(ThreadLocalRandom.current().nextInt(least, bound)); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Thread.currentThread().getName() + " shutdown...."); } }, "Thread-" + i)); } // 5. 等待執(zhí)行完成 long sleepMillis = 600000; try { Thread.sleep(sleepMillis); } catch (InterruptedException e) { e.printStackTrace(); } flag.set(false); // 6. 關(guān)閉資源 pool.shutdown(); try { pool.awaitTermination(6, TimeUnit.SECONDS); } catch (InterruptedException e) { } finally { producer.close(); // 最后之后調(diào)用 } } /** * 產(chǎn)生一個消息 * * @return */ private static KeyedMessage<String, String> generateKeyedMessage() { String key = "key_" + ThreadLocalRandom.current().nextInt(10, 99); StringBuilder sb = new StringBuilder(); int num = ThreadLocalRandom.current().nextInt(1, 5); for (int i = 0; i < num; i++) { sb.append(generateStringMessage(ThreadLocalRandom.current().nextInt(3, 20))).append(" "); } String message = sb.toString().trim(); return new KeyedMessage(TOPIC_NAME, key, message); } /** * 產(chǎn)生一個給定長度的字符串 * * @param numItems * @return */ private static String generateStringMessage(int numItems) { StringBuilder sb = new StringBuilder(); for (int i = 0; i < numItems; i++) { sb.append(charts[ThreadLocalRandom.current().nextInt(chartsLength)]); } return sb.toString(); } }
三、Pom.xml依賴配置如下
<properties> <kafka.version>0.8.2.1</kafka.version> </properties> <dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>${kafka.version}</version> </dependency> </dependencies>
以上就是本文的全部內(nèi)容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
SpringBoot項目集成Flyway進行數(shù)據(jù)庫版本控制的詳細教程
這篇文章主要介紹了SpringBoot項目集成Flyway進行數(shù)據(jù)庫版本控制,本文分步驟通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-07-07Java Base64算法實際應用之郵件發(fā)送實例分析
這篇文章主要介紹了Java Base64算法實際應用之郵件發(fā)送,結(jié)合實例形式分析了java字符編碼與郵件發(fā)送相關(guān)操作技巧,需要的朋友可以參考下2019-09-09Spring Bean生命周期之Bean元信息的配置與解析階段詳解
這篇文章主要為大家詳細介紹了Spring Bean生命周期之Bean元信息的配置與解析階段,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來幫助2022-03-03