Kafka producer端開發(fā)代碼實例
一、producer工作流程
producer使用用戶啟動producer的線程,將待發(fā)送的消息封裝到一個ProducerRecord類實例,然后將其序列化之后發(fā)送給partitioner,再由后者確定目標分區(qū)后一同發(fā)送到位于producer程序中的一塊內(nèi)存緩沖區(qū)中。而producer的另外一個線程(Sender線程)則負責實時從該緩沖區(qū)中提取出準備就緒的消息封裝進一個批次(batch),統(tǒng)一發(fā)送給對應的broker,具體流程如下圖:

二、producer示例程序開發(fā)
首先引入kafka相關依賴,在pom.xml文件中加入如下依賴:
<!--kafka--> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.12</artifactId> <version>2.2.0</version> </dependency>
在resources下面創(chuàng)建kafka-producer.properties配置文件,用于設置kafka參數(shù),內(nèi)容如下:
bootstrap.servers=192.168.184.128:9092,192.168.184.128:9093,192.168.184.128:9094 key.serializer=org.apache.kafka.common.serialization.StringSerializer value.serializer=org.apache.kafka.common.serialization.StringSerializer acks=-1 retries=3 batch.size=323840 linger.ms=10 buffer.memory=33554432 max.block.ms=3000
其中,前三個參數(shù)必須明確指定,因為這三個參數(shù)沒有默認值(注:kafka的producer參數(shù)配置可以參考http://kafka.apache.org/documentation/),然后編寫producer發(fā)送消息的代碼:
/**
* Kafka發(fā)送消息測試
* @throws IOException
*/
public void sendMsg() throws IOException {
//1.構(gòu)造properties對象
Properties properties = new Properties();
FileInputStream fileInputStream = new FileInputStream("F:\\javaCode\\jvmdemo\\src\\main\\resources\\kafka-producer.properties");
properties.load(fileInputStream);
fileInputStream.close();
//2.構(gòu)造kafkaProducer對象
KafkaProducer producer = new KafkaProducer(properties);
for (int i = 0; i < 100; i++) {
//3.構(gòu)造待發(fā)送消息的producerRecord對象,并指定消息要發(fā)送到哪個topic,消息的key和value
ProducerRecord testTopic = new ProducerRecord("testTopic", Integer.toString(i), Integer.toString(i));
//4.調(diào)用kafkaProducer對象的send方法發(fā)送消息
producer.send(testTopic);
}
//5.關閉kafkaProducer
producer.close();
}
然后登陸kafka所在服務器,執(zhí)行以下命令監(jiān)聽消息:
cd /usr/local/kafka/bin
./kafka-console-consumer.sh --bootstrap-server 192.168.184.128:9092,192.168.184.128:9093,192.168.184.128:9094 --topic testTopic --from-beginning
運行sendMsg方法,注意觀察消費端,

可以看到有0-99之間的數(shù)字依次被消費到,說明消息發(fā)送成功。
三、異步和同步發(fā)送消息
上面發(fā)送消息的示例程序中,沒有對發(fā)送結(jié)果進行處理,如果消息發(fā)送失敗我們也是無法得知的,這種方法在實際應用中是不推薦的。在實際使用場景中,一般使用異步和同步兩種常見發(fā)送方式。Java版本producer的send方法會返回一個Future對象,如果調(diào)用Future.get()方法就會無限等待返回結(jié)果,實現(xiàn)同步發(fā)送的效果,否則就是異步發(fā)送。
1.異步發(fā)送消息
Java版本producer的send()方法提供了回調(diào)類參數(shù)來實現(xiàn)異步發(fā)送以及對發(fā)送結(jié)果進行的響應,具體代碼如下:
/**
* 異步發(fā)送消息
*
* @throws IOException
*/
public void sendMsg() throws IOException {
//1.構(gòu)造properties對象
Properties properties = new Properties();
FileInputStream fileInputStream = new FileInputStream("F:\\javaCode\\jvmdemo\\src\\main\\resources\\kafka-producer.properties");
properties.load(fileInputStream);
fileInputStream.close();
//2.構(gòu)造kafkaProducer對象
KafkaProducer producer = new KafkaProducer(properties);
for (int i = 0; i < 100; i++) {
//3.構(gòu)造待發(fā)送消息的producerRecord對象,并指定消息要發(fā)送到哪個topic,消息的key和value
ProducerRecord testTopic = new ProducerRecord("testTopic", Integer.toString(i), Integer.toString(i));
//4.調(diào)用kafkaProducer對象的send方法發(fā)送消息,傳入Callback回調(diào)參數(shù)
producer.send(testTopic, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
if (null == exception) {
//消息發(fā)送成功后的處理
System.out.println("消息發(fā)送成功");
} else {
//消息發(fā)送失敗后的處理
System.out.println("消息發(fā)送失敗");
}
}
});
}
//5.關閉kafkaProducer
producer.close();
}
以上代碼中,send方法第二個參數(shù)傳入一個匿名內(nèi)部類對象,也可以傳入實現(xiàn)了org.apache.kafka.clients.producer.Callback接口的類對象。同時onCompletion方法的兩個入?yún)ecordMetadata和exception不會同時為空,當消息發(fā)送成功后,exception為null,消息發(fā)送失敗后recordMetadata為null。因此可以按照兩個入?yún)⑦M行成功和失敗邏輯的處理。
其次,Kafka發(fā)送消息失敗的類型包含兩類,可重試異常和不可重試異常。所有的可重試異常都繼承自org.apache.kafka.common.errors.RetriableException抽象類,理論上所有沒有繼承RetriableException 類的其他異常都屬于不可重試異常,鑒于此,可以在消息發(fā)送失敗后,按照是否可以重試,來進行不同的處理邏輯處理:
//4.調(diào)用kafkaProducer對象的send方法發(fā)送消息
producer.send(testTopic, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
if (null == exception) {
//消息發(fā)送成功后的處理
System.out.println("消息發(fā)送成功");
} else {
if(exception instanceof RetriableException){
// 可重試異常
System.out.println("可重試異常");
}else{
// 不可重試異常
System.out.println("不可重試異常");
}
}
}
});
2.同步發(fā)送消息
同步發(fā)送和異步發(fā)送是通過Java的Futrue來區(qū)分的,調(diào)用Future.get()無限等待結(jié)果返回,即實現(xiàn)了同步發(fā)送的結(jié)果,具體代碼如下:
// 發(fā)送消息
Future future = producer.send(testTopic);
try {
// 調(diào)用get方法等待結(jié)果返回,發(fā)送失敗則會拋出異常
future.get();
} catch (Exception e) {
System.out.println("消息發(fā)送失敗");
}
四、其他高級特性
1.消息分區(qū)機制
kafka producer提供了分區(qū)策略以及分區(qū)器(partitioner)用于確定將消息發(fā)送到指定topic的哪個分區(qū)中。默認分區(qū)器根據(jù)murmur2算法計算消息key的哈希值,然后對總分區(qū)數(shù)求模確認消息要被發(fā)送的目標分區(qū)號(這點讓我想起了redis集群中key值的分配方法),這樣就確保了相同key的消息被發(fā)送到相同分區(qū)。若消息沒有key值,將采用輪詢的方式確保消息在topic的所有分區(qū)上均勻分配。
除了使用kafka默認的分區(qū)機制,也可以通過實現(xiàn)org.apache.kafka.clients.producer.Partitioner接口來自定義分區(qū)器,此時需要在構(gòu)造KafkaProducer的 properties中增加partitioner.class來指明分區(qū)器實現(xiàn)類,如:partitioner.class=com.demo.service.CustomerPartitioner。
2.消息序列化
在本篇開始的producer示例程序中,在構(gòu)造KafkaProducer對象的時候,有兩個配置項
- key.serializer=org.apache.kafka.common.serialization.StringSerializer
- value.serializer=org.apache.kafka.common.serialization.StringSerializer
分別用于配置消息key和value的序列化方式為String類型,除此之外,Kafka中還提供了如下默認的序列化器:
ByteArraySerializer:本質(zhì)上什么也不做,因為網(wǎng)絡中傳輸就是以字節(jié)傳輸?shù)模?/p>
ByteBufferSerializer:序列化ByteBuffer消息;
BytesSerializer:序列化kafka自定義的Bytes類型;
IntegerSerializer:序列化Integer類型;
DoubleSerializer:序列化Double類型;
LongSerializer:序列化Long類型;
如果要自定義序列化器,則需要實現(xiàn)org.apache.kafka.common.serialization.Serializer接口,并且將key.serializer和value.serializer配置為自定義的序列化器。
3.消息壓縮
消息壓縮可以顯著降低磁盤占用以及帶寬占用,從而有效提升I/O密集型應用性能,但是引入壓縮同時會消耗額外的CPU,因此壓縮是I/O性能和CPU資源的平衡。kafka目前支持3種壓縮算法:CZIP,Snappy和LZ4,性能測試結(jié)果顯示三種壓縮算法的性能如下:LZ4>>Snappy>GZIP,目前啟用LZ4進行消息壓縮的producer的吞吐量是最高的。
默認情況下Kafka是不壓縮消息的,但是可以通過在創(chuàng)建KafkaProducer 對象的時候設置producer端參數(shù)compression.type來開啟消息壓縮,如配置compression.type=LZ4。那么什么時候開啟壓縮呢?首先判斷是否啟用壓縮的依據(jù)是I/O資源消耗與CPU資源消耗的對比,如果環(huán)境上I/O資源非常緊張,比如producer程序占用了大量的網(wǎng)絡帶寬或broker端的磁盤占用率很高,而producer端的CPU資源非常富裕,那么就可以考慮為producer開啟壓縮。
4.無消息丟失配置
在使用KafkaProducer.send()方法發(fā)送消息的時候,其實是把消息放入緩沖區(qū)中,再由一個專屬I/O線程負責從緩沖區(qū)提取消息并封裝消息到batch中,然后再發(fā)送出去。如果在I/O線程將消息發(fā)送出去之前,producer奔潰了,那么所有的消息都將丟失。同時,存在多消息發(fā)送時候由于網(wǎng)絡抖動導致消息亂序的問題,為了解決這兩個問題,可以通過在producer端以及broker端進行配置進行避免。
4.1 producer端配置
max.block.ms=3000:設置block的時長,當緩沖區(qū)被填滿或者metadata丟失時產(chǎn)生block,停止接收新的消息;
acks=all:等待所有follower都響應了發(fā)送消息認為消息發(fā)送成功;
retries=Integer.MAX_VALUE:設置重試次數(shù),設置一個比較大的值可以保證消息不丟失;
max.in.flight.requests.per.connection=1:限制producer在單個broker連接上能夠發(fā)送的未響應請求的數(shù)量,從而防止同topic統(tǒng)一分區(qū)下消息亂序問題;
除了設置以上參數(shù)之外,在發(fā)送消息的時候,應該盡量使用帶有回調(diào)參數(shù)的send方法來處理發(fā)送結(jié)果,如果數(shù)據(jù)發(fā)送失敗,則顯示調(diào)用KafkaProducer.close(0)方法來立即關閉producer,防止消息亂序。
4.2 broker端配置
unclean.leader.election.enable=false:關閉unclean leader選舉,即不允許非ISR中的副本被選舉為leader;
replication.factor>=3:至少使用3個副本保存數(shù)據(jù);
min.issync.replicas>1:控制某條消息至少被寫入到ISR中多少個副本才算成功,當且僅當producer端acks參數(shù)設置為all或者-1時,該參數(shù)才有效。
最后,確保replication.factor>min.issync.replicas,如果兩者相等,那么只要有一個副本掛掉,分區(qū)就無法工作,推薦配置replication.factor=min.issync.replicas+1。
關于producer端的開發(fā)就介紹到這兒,下一篇將介紹consumer端的開發(fā)。
以上就是本文的全部內(nèi)容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。
相關文章
JavaSE面試題之this與super關鍵字的區(qū)別詳解
this關鍵字用于引用當前對象的引用,super關鍵字用于引用父類對象的引用,下面這篇文章主要給大家介紹了關于JavaSE面試題之this與super關鍵字區(qū)別的相關資料,需要的朋友可以參考下2023-12-12
spring cloud gateway如何獲取請求的真實地址
這篇文章主要介紹了spring cloud gateway如何獲取請求的真實地址問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-05-05
springboot logback調(diào)整mybatis日志級別無效的解決
這篇文章主要介紹了springboot logback調(diào)整mybatis日志級別無效的解決,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-10-10
Java文件處理之使用XWPFDocument導出Word文檔
最近因項目開發(fā)的需要,整理了一份用JAVA導出WORD文檔,下面這篇文章主要給大家介紹了關于Java文件處理之使用XWPFDocument導出Word文檔的相關資料,需要的朋友可以參考下2023-12-12
使用SpringBoot與Thrift實現(xiàn)RPC通信的方式詳解
在微服務架構(gòu)的世界里,服務間的通信機制選擇成為了關鍵決策之一,RPC因其簡潔、高效的特點備受青睞,本文將詳細探討如何利用Spring?Boot和Thrift框架構(gòu)建RPC通信,讓讀者理解其內(nèi)在原理及實現(xiàn)方式,需要的朋友可以參考下2023-10-10

