Java kafka如何實(shí)現(xiàn)自定義分區(qū)類和攔截器
生產(chǎn)者發(fā)送到對(duì)應(yīng)的分區(qū)有以下幾種方式:
(1)指定了patition,則直接使用;(可以查閱對(duì)應(yīng)的java api, 有多種參數(shù))
(2)未指定patition但指定key,通過(guò)對(duì)key的value進(jìn)行hash出一個(gè)patition;
(3)patition和key都未指定,使用輪詢選出一個(gè)patition。
但是kafka提供了,自定義分區(qū)算法的功能,由業(yè)務(wù)手動(dòng)實(shí)現(xiàn)分布:
1、實(shí)現(xiàn)一個(gè)自定義分區(qū)類,CustomPartitioner實(shí)現(xiàn)Partitioner
import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import java.util.Map; public class CustomPartitioner implements Partitioner { /** * * @param topic 當(dāng)前的發(fā)送的topic * @param key 當(dāng)前的key值 * @param keyBytes 當(dāng)前的key的字節(jié)數(shù)組 * @param value 當(dāng)前的value值 * @param valueBytes 當(dāng)前的value的字節(jié)數(shù)組 * @param cluster * @return */ @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { //這邊根據(jù)返回值就是分區(qū)號(hào), 這邊就是固定發(fā)送到三號(hào)分區(qū) return 3; } @Override public void close() { } @Override public void configure(Map<String, ?> configs) { } }
2、producer配置文件指定,具體的分區(qū)類
// 具體的分區(qū)類
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "kafka.CustomPartitioner");
技巧:可以使用ProducerConfig中提供的配置ProducerConfig
kafka producer攔截器
攔截器(interceptor)是在Kafka 0.10版本被引入的。
interceptor使得用戶在消息發(fā)送前以及producer回調(diào)邏輯前有機(jī)會(huì)對(duì)消息做一些定制化需求,比如修改消息等。
許用戶指定多個(gè)interceptor按序作用于同一條消息從而形成一個(gè)攔截鏈(interceptor chain)。
所使用的類為:
org.apache.kafka.clients.producer.ProducerInterceptor
我們可以編碼測(cè)試下:
1、定義消息攔截器,實(shí)現(xiàn)消息處理(可以是加時(shí)間戳等等,unid等等。)
import org.apache.kafka.clients.producer.ProducerInterceptor; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Map; import java.util.UUID; public class MessageInterceptor implements ProducerInterceptor<String, String> { @Override public void configure(Map<String, ?> configs) { System.out.println("這是MessageInterceptor的configure方法"); } /** * 這個(gè)是消息發(fā)送之前進(jìn)行處理 * * @param record * @return */ @Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { // 創(chuàng)建一個(gè)新的record,把uuid入消息體的最前部 System.out.println("為消息添加uuid"); return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(), UUID.randomUUID().toString().replace("-", "") + "," + record.value()); } /** * 這個(gè)是生產(chǎn)者回調(diào)函數(shù)調(diào)用之前處理 * @param metadata * @param exception */ @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { System.out.println("MessageInterceptor攔截器的onAcknowledgement方法"); } @Override public void close() { System.out.println("MessageInterceptor close 方法"); } }
2、定義計(jì)數(shù)攔截器
import java.util.Map; import org.apache.kafka.clients.producer.ProducerInterceptor; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; public class CounterInterceptor implements ProducerInterceptor<String, String>{ private int errorCounter = 0; private int successCounter = 0; @Override public void configure(Map<String, ?> configs) { System.out.println("這是CounterInterceptor的configure方法"); } @Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { System.out.println("CounterInterceptor計(jì)數(shù)過(guò)濾器不對(duì)消息做任何操作"); return record; } @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { // 統(tǒng)計(jì)成功和失敗的次數(shù) System.out.println("CounterInterceptor過(guò)濾器執(zhí)行統(tǒng)計(jì)失敗和成功數(shù)量"); if (exception == null) { successCounter++; } else { errorCounter++; } } @Override public void close() { // 保存結(jié)果 System.out.println("Successful sent: " + successCounter); System.out.println("Failed sent: " + errorCounter); } }
3、producer客戶端:
import org.apache.kafka.clients.producer.*; import java.util.ArrayList; import java.util.List; import java.util.Properties; public class Producer1 { public static void main(String[] args) throws Exception { Properties props = new Properties(); // Kafka服務(wù)端的主機(jī)名和端口號(hào) props.put("bootstrap.servers", "localhost:9092"); // 等待所有副本節(jié)點(diǎn)的應(yīng)答 props.put("acks", "all"); // 消息發(fā)送最大嘗試次數(shù) props.put("retries", 0); // 一批消息處理大小 props.put("batch.size", 16384); // 請(qǐng)求延時(shí),可能生產(chǎn)數(shù)據(jù)太快了 props.put("linger.ms", 1); // 發(fā)送緩存區(qū)內(nèi)存大小,數(shù)據(jù)是先放到生產(chǎn)者的緩沖區(qū) props.put("buffer.memory", 33554432); // key序列化 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // value序列化 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 具體的分區(qū)類 props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "kafka.CustomPartitioner"); //定義攔截器 List<String> interceptors = new ArrayList<>(); interceptors.add("kafka.MessageInterceptor"); interceptors.add("kafka.CounterInterceptor"); props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors); Producer<String, String> producer = new KafkaProducer<>(props); for (int i = 0; i < 1; i++) { producer.send(new ProducerRecord<String, String>("test_0515", i + "", "xxx-" + i), new Callback() { public void onCompletion(RecordMetadata recordMetadata, Exception e) { System.out.println("這是producer回調(diào)函數(shù)"); } }); } /*System.out.println("現(xiàn)在執(zhí)行關(guān)閉producer"); producer.close();*/ producer.close(); } }
總結(jié),我們可以知道攔截器鏈各個(gè)方法的執(zhí)行順序,假如有A、B攔截器,在一個(gè)攔截器鏈中:
(1)執(zhí)行A的configure方法,執(zhí)行B的configure方法
(2)執(zhí)行A的onSend方法,B的onSend方法
(3)生產(chǎn)者發(fā)送完畢后,執(zhí)行A的onAcknowledgement方法,B的onAcknowledgement方法。
(4)執(zhí)行producer自身的callback回調(diào)函數(shù)。
(5)執(zhí)行A的close方法,B的close方法。
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
java web實(shí)現(xiàn)網(wǎng)上手機(jī)銷售系統(tǒng)
這篇文章主要為大家詳細(xì)介紹了java web實(shí)現(xiàn)網(wǎng)上手機(jī)銷售系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-08-08Spring Boot Admin管理監(jiān)控?cái)?shù)據(jù)的方法
本篇文章主要介紹了Spring Boot Admin管理監(jiān)控?cái)?shù)據(jù)的方法,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2017-12-12Java實(shí)現(xiàn)簡(jiǎn)單學(xué)生管理系統(tǒng)
這篇文章主要為大家詳細(xì)介紹了Java實(shí)現(xiàn)簡(jiǎn)單學(xué)生管理系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-07-07java中接口和事件監(jiān)聽(tīng)器的深入理解
這篇文章主要給大家介紹了關(guān)于java中接口和事件監(jiān)聽(tīng)器的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用java具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-12-12java如何用Processing生成馬賽克風(fēng)格的圖像
這篇文章主要介紹了如何用java如何用Processing生成馬賽克風(fēng)格的圖像,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-03-03Java 中的HashMap詳解和使用示例_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理
這篇文章主要介紹了Java 中的HashMap詳解和使用示例_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理,需要的朋友可以參考下2017-05-05Java實(shí)現(xiàn)單例模式之餓漢式、懶漢式、枚舉式
本篇文章主要介紹了Java實(shí)現(xiàn)單例的3種普遍的模式,餓漢式、懶漢式、枚舉式。具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下。2016-10-10