Kafka Java Producer代碼實(shí)例詳解
根據(jù)業(yè)務(wù)需要可以使用Kafka提供的Java Producer API進(jìn)行產(chǎn)生數(shù)據(jù),并將產(chǎn)生的數(shù)據(jù)發(fā)送到Kafka對(duì)應(yīng)Topic的對(duì)應(yīng)分區(qū)中,入口類為:Producer
Kafka的Producer API主要提供下列三個(gè)方法:
- public void send(KeyedMessage<K,V> message) 發(fā)送單條數(shù)據(jù)到Kafka集群
- public void send(List<KeyedMessage<K,V>> messages) 發(fā)送多條數(shù)據(jù)(數(shù)據(jù)集)到Kafka集群
- public void close() 關(guān)閉Kafka連接資源
一、JavaKafkaProducerPartitioner:自定義的數(shù)據(jù)分區(qū)器,功能是:決定輸入的key/value鍵值對(duì)的message發(fā)送到Topic的那個(gè)分區(qū)中,返回分區(qū)id,范圍:[0,分區(qū)數(shù)量); 這里的實(shí)現(xiàn)比較簡(jiǎn)單,根據(jù)key中的數(shù)字決定分區(qū)的值。具體代碼如下:
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
/**
* Created by gerry on 12/21.
*/
public class JavaKafkaProducerPartitioner implements Partitioner {
/**
* 無(wú)參構(gòu)造函數(shù)
*/
public JavaKafkaProducerPartitioner() {
this(new VerifiableProperties());
}
/**
* 構(gòu)造函數(shù),必須給定
*
* @param properties 上下文
*/
public JavaKafkaProducerPartitioner(VerifiableProperties properties) {
// nothings
}
@Override
public int partition(Object key, int numPartitions) {
int num = Integer.valueOf(((String) key).replaceAll("key_", "").trim());
return num % numPartitions;
}
}
二、 JavaKafkaProducer:通過(guò)Kafka提供的API進(jìn)行數(shù)據(jù)產(chǎn)生操作的測(cè)試類;具體代碼如下:
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.log4j.Logger;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.ThreadLocalRandom;
/**
* Created by gerry on 12/21.
*/
public class JavaKafkaProducer {
private Logger logger = Logger.getLogger(JavaKafkaProducer.class);
public static final String TOPIC_NAME = "test";
public static final char[] charts = "qazwsxedcrfvtgbyhnujmikolp1234567890".toCharArray();
public static final int chartsLength = charts.length;
public static void main(String[] args) {
String brokerList = "192.168.187.149:9092";
brokerList = "192.168.187.149:9092,192.168.187.149:9093,192.168.187.149:9094,192.168.187.149:9095";
brokerList = "192.168.187.146:9092";
Properties props = new Properties();
props.put("metadata.broker.list", brokerList);
/**
* 0表示不等待結(jié)果返回<br/>
* 1表示等待至少有一個(gè)服務(wù)器返回?cái)?shù)據(jù)接收標(biāo)識(shí)<br/>
* -1表示必須接收到所有的服務(wù)器返回標(biāo)識(shí),及同步寫入<br/>
* */
props.put("request.required.acks", "0");
/**
* 內(nèi)部發(fā)送數(shù)據(jù)是異步還是同步
* sync:同步, 默認(rèn)
* async:異步
*/
props.put("producer.type", "async");
/**
* 設(shè)置序列化的類
* 可選:kafka.serializer.StringEncoder
* 默認(rèn):kafka.serializer.DefaultEncoder
*/
props.put("serializer.class", "kafka.serializer.StringEncoder");
/**
* 設(shè)置分區(qū)類
* 根據(jù)key進(jìn)行數(shù)據(jù)分區(qū)
* 默認(rèn)是:kafka.producer.DefaultPartitioner ==> 安裝key的hash進(jìn)行分區(qū)
* 可選:kafka.serializer.ByteArrayPartitioner ==> 轉(zhuǎn)換為字節(jié)數(shù)組后進(jìn)行hash分區(qū)
*/
props.put("partitioner.class", "JavaKafkaProducerPartitioner");
// 重試次數(shù)
props.put("message.send.max.retries", "3");
// 異步提交的時(shí)候(async),并發(fā)提交的記錄數(shù)
props.put("batch.num.messages", "200");
// 設(shè)置緩沖區(qū)大小,默認(rèn)10KB
props.put("send.buffer.bytes", "102400");
// 2. 構(gòu)建Kafka Producer Configuration上下文
ProducerConfig config = new ProducerConfig(props);
// 3. 構(gòu)建Producer對(duì)象
final Producer<String, String> producer = new Producer<String, String>(config);
// 4. 發(fā)送數(shù)據(jù)到服務(wù)器,并發(fā)線程發(fā)送
final AtomicBoolean flag = new AtomicBoolean(true);
int numThreads = 50;
ExecutorService pool = Executors.newFixedThreadPool(numThreads);
for (int i = 0; i < 5; i++) {
pool.submit(new Thread(new Runnable() {
@Override
public void run() {
while (flag.get()) {
// 發(fā)送數(shù)據(jù)
KeyedMessage message = generateKeyedMessage();
producer.send(message);
System.out.println("發(fā)送數(shù)據(jù):" + message);
// 休眠一下
try {
int least = 10;
int bound = 100;
Thread.sleep(ThreadLocalRandom.current().nextInt(least, bound));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + " shutdown....");
}
}, "Thread-" + i));
}
// 5. 等待執(zhí)行完成
long sleepMillis = 600000;
try {
Thread.sleep(sleepMillis);
} catch (InterruptedException e) {
e.printStackTrace();
}
flag.set(false);
// 6. 關(guān)閉資源
pool.shutdown();
try {
pool.awaitTermination(6, TimeUnit.SECONDS);
} catch (InterruptedException e) {
} finally {
producer.close(); // 最后之后調(diào)用
}
}
/**
* 產(chǎn)生一個(gè)消息
*
* @return
*/
private static KeyedMessage<String, String> generateKeyedMessage() {
String key = "key_" + ThreadLocalRandom.current().nextInt(10, 99);
StringBuilder sb = new StringBuilder();
int num = ThreadLocalRandom.current().nextInt(1, 5);
for (int i = 0; i < num; i++) {
sb.append(generateStringMessage(ThreadLocalRandom.current().nextInt(3, 20))).append(" ");
}
String message = sb.toString().trim();
return new KeyedMessage(TOPIC_NAME, key, message);
}
/**
* 產(chǎn)生一個(gè)給定長(zhǎng)度的字符串
*
* @param numItems
* @return
*/
private static String generateStringMessage(int numItems) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < numItems; i++) {
sb.append(charts[ThreadLocalRandom.current().nextInt(chartsLength)]);
}
return sb.toString();
}
}
三、Pom.xml依賴配置如下
<properties>
<kafka.version>0.8.2.1</kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>${kafka.version}</version>
</dependency>
</dependencies>
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
java數(shù)組復(fù)制的四種方法效率對(duì)比
這篇文章主要介紹了java數(shù)組復(fù)制的四種方法效率對(duì)比,文中有簡(jiǎn)單的代碼示例,以及效率的比較結(jié)果,具有一定參考價(jià)值,需要的朋友可以了解下。2017-11-11
Spark Streaming編程初級(jí)實(shí)踐詳解
這篇文章主要為大家介紹了Spark Streaming編程初級(jí)實(shí)踐詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-04-04
SpringBoot項(xiàng)目集成Flyway進(jìn)行數(shù)據(jù)庫(kù)版本控制的詳細(xì)教程
這篇文章主要介紹了SpringBoot項(xiàng)目集成Flyway進(jìn)行數(shù)據(jù)庫(kù)版本控制,本文分步驟通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-07-07
Java Base64算法實(shí)際應(yīng)用之郵件發(fā)送實(shí)例分析
這篇文章主要介紹了Java Base64算法實(shí)際應(yīng)用之郵件發(fā)送,結(jié)合實(shí)例形式分析了java字符編碼與郵件發(fā)送相關(guān)操作技巧,需要的朋友可以參考下2019-09-09
Spring Bean生命周期之Bean元信息的配置與解析階段詳解
這篇文章主要為大家詳細(xì)介紹了Spring Bean生命周期之Bean元信息的配置與解析階段,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來(lái)幫助2022-03-03

