Kafka中的producer攔截器與consumer攔截器詳解
1. producer 攔截器(interceptor)
1.1 介紹
Producer 的Interceptor使得用戶在消息發(fā)送前以及Producer回調(diào)邏輯前有機(jī)會(huì)對(duì)消息做 一些定制化需求,比如修改消息等。Producer允許指定多個(gè)Interceptor按照指定順序作用于一條消 息從而形成一個(gè)攔截鏈(interceptor chain)。
自定義的攔截器(interceptor)需要實(shí)現(xiàn)org.apache.kafka.clients.producer.ProducerInterceptor接口,接口定義如下:
package org.apache.kafka.clients.producer;
import org.apache.kafka.common.Configurable;
public interface ProducerInterceptor<K, V> extends Configurable {
/**
該方法封裝進(jìn)KafkaProducer.send()方法中,方法會(huì)在消息發(fā)送之前被調(diào)用,用戶可以在該方法中對(duì)消息做任
何操作,但最好保證不要修改消息所屬的topic和分區(qū),否則會(huì)影響目標(biāo)分區(qū)的計(jì)算
*/
ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
/**
該方法會(huì)在消息成功提交或發(fā)送失敗之后被調(diào)用,
KafkaProducer.send()異步發(fā)送有回調(diào)通知 callback, onAcknowledgement 的調(diào)用要早于 callback 的調(diào)用。
*/
void onAcknowledgement(RecordMetadata metadata, Exception exception);
/**
關(guān)閉Interceptor,主要用于執(zhí)行一些資源清理工作。
*/
void close();
}
- onSend() : 方法封裝進(jìn)KafkaProducer.send()方法中,方法會(huì)在消息發(fā)送之前被調(diào)用,用戶可以在該方法中對(duì)消息做任
- 何操作,但最好保證不要修改消息所屬的topic和分區(qū),否則會(huì)影響目標(biāo)分區(qū)的計(jì)算;
- onAcknowledgement(): 當(dāng)發(fā)送到服務(wù)器的記錄已被確認(rèn)時(shí),或者當(dāng)發(fā)送記錄在發(fā)送到服務(wù)器之前失敗時(shí)調(diào)用此方法。KafkaProducer.send()異步發(fā)送有回調(diào)通知 callback, onAcknowledgement 的調(diào)用要早于 callback 的調(diào)用。
- close(): 關(guān)閉Interceptor,主要用于執(zhí)行一些資源清理工作。
如果指定了多個(gè)Interceptor,則Producer將按照指定順序調(diào)用它們,如果interceptor出現(xiàn)異常Producer僅僅是捕獲每個(gè) Interceptor拋出的異常記錄到錯(cuò)誤日志中而非在向上傳遞。
1.2 案例
實(shí)現(xiàn)兩個(gè)攔截器(interceptor),組成攔截鏈。第一個(gè)攔截器在消息發(fā)送前,給消息添加header。第二個(gè)攔截器統(tǒng)計(jì)消息的發(fā)送成功數(shù)和失敗數(shù)。
PS:其實(shí)這兩個(gè)功能可以放在一個(gè)interceptor中,這里僅僅是為了演示多個(gè)interceptor。
定義攔截器
攔截器1:
public class MyInterceptor implements ProducerInterceptor<Integer,String> {
/**
* 消息發(fā)送之前調(diào)用
* @param record
* @return
*/
@Override
public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
//消息的主題
String topic = record.topic();
Integer partition = record.partition();
Integer key = record.key();
String value = record.value();
Headers headers = record.headers();
//給header添加時(shí)間戳
String stamp = System.currentTimeMillis()+"";
headers.add("timestamp",stamp.getBytes(StandardCharsets.UTF_8));
ProducerRecord<Integer,String> resultRecord=
new ProducerRecord<>(topic,partition,key,value,headers);
return resultRecord;
}
/**
* 當(dāng)發(fā)送到服務(wù)器的記錄已被確認(rèn)時(shí),或者當(dāng)發(fā)送記錄在發(fā)送到服務(wù)器之前失敗時(shí)調(diào)用此方法
* @param metadata
* @param exception
*/
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}攔截器2:
public class MyInterceptor02 implements ProducerInterceptor<Integer,String> {
private int errorNum=0;
private int successNum=0;
/**
* 消息發(fā)送之前調(diào)用
* @param record
* @return
*/
@Override
public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
return record;
}
/**
* 當(dāng)發(fā)送到服務(wù)器的記錄已被確認(rèn)時(shí),或者當(dāng)發(fā)送記錄在發(fā)送到服務(wù)器之前失敗時(shí)調(diào)用此方法
* @param metadata
* @param exception
*/
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
if (exception==null){
successNum++;
}else {
errorNum++;
}
}
@Override
public void close() {
System.out.println("消息發(fā)送成功數(shù): " + successNum);
System.out.println("消息發(fā)送失敗數(shù): " + errorNum);
}
@Override
public void configure(Map<String, ?> configs) {
}
}生產(chǎn)者
configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"攔截器類全路徑");,多個(gè)攔截器使用,分割
public class KafkaProducerDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
Map<String, Object> configs = new HashMap<>();
// 設(shè)置連接Kafka的初始連接用到的服務(wù)器地址
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://192.168.235.132:9092");
// 設(shè)置key的序列化類
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
// 設(shè)置value的序列化類
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
configs.put(ProducerConfig.ACKS_CONFIG,"all");
//添加攔截器
configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.warybee.interceptor.MyInterceptor,com.warybee.interceptor.MyInterceptor02");
KafkaProducer<Integer,String> kafkaProducer=new KafkaProducer<Integer, String>(configs);
//發(fā)送100條消息
for (int i = 0; i < 100; i++) {
ProducerRecord<Integer,String> producerRecord=new ProducerRecord<>
( "test_topic_1",
0,
i,
"test topic msg "+i);
//消息的異步確認(rèn)
kafkaProducer.send(producerRecord, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
if (exception==null){
System.out.println("消息的主題:"+recordMetadata.topic());
System.out.println("消息的分區(qū):"+recordMetadata.partition());
System.out.println("消息的偏移量:"+recordMetadata.offset());
}else {
System.out.println("發(fā)送消息異常");
}
}
});
}
// 關(guān)閉生產(chǎn)者
kafkaProducer.close();
}
}2 Consumer攔截器(interceptor)
2.1.介紹
消費(fèi)者(Consumer)在拉取了分區(qū)消息之后,要首先經(jīng)過(guò)反序列化器對(duì)key和value進(jìn)行反序列化處理,處理完之后,如果消費(fèi)端設(shè)置了攔截器,則需要經(jīng)過(guò)攔截器的處理之后,才能返回給消費(fèi)者應(yīng)用程 序進(jìn)行處理。
- ConsumerInterceptor允許攔截甚至更改消費(fèi)者接收到的消息。
- 常用在于將第三方組件引入 消費(fèi)者應(yīng)用程序,用于定制的監(jiān)控、日志處理等。
- ConsumerInterceptor方法拋出的異常會(huì)被捕獲、記錄,但是不會(huì)向下傳播。如果用戶配置了 錯(cuò)誤的key或value類型參數(shù),消費(fèi)者不會(huì)拋出異常,而僅僅是記錄下來(lái)。
- 如果有多個(gè)攔截器,則該方法按照KafkaConsumer的configs中配置的順序調(diào)用。
- 從調(diào)用 KafkaConsumer.poll(long) 的同一線程調(diào)用 ConsumerInterceptor 回調(diào)。
自定義的攔截器(interceptor)需要實(shí)現(xiàn)org.apache.kafka.clients.consumer.ConsumerInterceptor接口,接口定義如下:
public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable {
/**
該方法在poll方法返回之前調(diào)用。調(diào)用結(jié)束后poll方法就返回消息了。
該方法可以修改消費(fèi)者消息,返回新的消息。攔截器可以過(guò)濾收到的消息或生成新的消息。
*/
ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
/**
當(dāng)消費(fèi)者提交偏移量時(shí),調(diào)用該方法。通常你可以在該方法中做一些記賬類的動(dòng)作,比如打日志等。
調(diào)用者將忽略此方法拋出的任何異常。
*/
void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
/**
* 關(guān)閉Interceptor之前調(diào)用
*/
void close();
}方法說(shuō)明:
- onConsume 該方法在poll方法返回之前調(diào)用。調(diào)用結(jié)束后poll方法就返回消息了。
- onCommit 當(dāng)消費(fèi)者提交偏移量時(shí),調(diào)用該方法。通常你可以在該方法中做一些記賬類的動(dòng)作,比如打日志等。
2.2 案例
定義攔截器
public class MyConsumerInterceptor implements ConsumerInterceptor<Integer,String> {
@Override
public ConsumerRecords<Integer, String> onConsume(ConsumerRecords<Integer, String> records) {
//在這里可以對(duì)接收到的消息進(jìn)行修改
//如不做處理,直接返回即可
return records;
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
offsets.forEach((tp,offsetAndMetadata) -> {
System.out.println(tp+" : "+offsetAndMetadata.offset());
});
}
@Override
public void close() {
}
/**
* 用于獲取消費(fèi)者的設(shè)置參數(shù)
* @param configs
*/
@Override
public void configure(Map<String, ?> configs) {
configs.forEach((k, v) -> {
System.out.println(k + "\t" + v);
});
}
}消費(fèi)者
在消費(fèi)者客戶端配置中增加如下配置
如果有多個(gè)攔截器,用,分割即可
configs.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.warybee.interceptor.MyConsumerInterceptor");
public class KafkaConsumerDemo {
public static void main(String[] args) {
Map<String, Object> configs = new HashMap<>();
// 設(shè)置連接Kafka的初始連接用到的服務(wù)器地址
// 如果是集群,則可以通過(guò)此初始連接發(fā)現(xiàn)集群中的其他broker
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://192.168.235.132:9092");
//KEY反序列化類
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
//value反序列化類
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer.demo");
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
configs.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.warybee.interceptor.MyConsumerInterceptor");
//創(chuàng)建消費(fèi)者對(duì)象
KafkaConsumer<Integer, String> consumer = new KafkaConsumer<Integer, String>(configs);
List<String> topics = new ArrayList<>();
topics.add("test_topic_1");
//消費(fèi)者訂閱主題
consumer.subscribe(topics);
while (true){
//批量拉取主題消息,每3秒拉取一次
ConsumerRecords<Integer, String> records = consumer.poll(3000);
//變量消息
for (ConsumerRecord<Integer, String> record : records) {
System.out.println(record.topic() + "\t"
+ record.partition() + "\t"
+ record.offset() + "\t"
+ record.key() + "\t"
+ record.value());
}
}
}
}到此這篇關(guān)于Kafka中的producer攔截器與consumer攔截器詳解的文章就介紹到這了,更多相關(guān)producer攔截器與consumer攔截器內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Spring Cloud體系實(shí)現(xiàn)標(biāo)簽路由的方法示例
這篇文章主要介紹了Spring Cloud體系實(shí)現(xiàn)標(biāo)簽路由的方法示例,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2019-05-05
springboot中的Application.properties常用配置
這篇文章主要介紹了springboot中的Application.properties常用配置,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-05-05
SSM框架下如何實(shí)現(xiàn)數(shù)據(jù)從后臺(tái)傳輸?shù)角芭_(tái)
這篇文章主要介紹了SSM框架下如何實(shí)現(xiàn)數(shù)據(jù)從后臺(tái)傳輸?shù)角芭_(tái),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-05-05
java 使用ImageIO.writer從BufferedImage生成jpeg圖像遇到問(wèn)題總結(jié)及解決
這篇文章主要介紹了java 使用ImageIO.writer從BufferedImage生成jpeg圖像遇到問(wèn)題總結(jié)及解決的相關(guān)資料,需要的朋友可以參考下2017-03-03
spring事務(wù)的REQUIRES_NEW源碼示例解析
這篇文章主要為大家介紹了spring事務(wù)的REQUIRES_NEW源碼示例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-09-09
Java實(shí)現(xiàn)從Html文本中提取純文本的方法
今天小編就為大家分享一篇Java實(shí)現(xiàn)從Html文本中提取純文本的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2018-05-05
IDEA報(bào)錯(cuò):java?找不到符號(hào)圖文解決過(guò)程
這篇文章主要給大家介紹了關(guān)于IDEA報(bào)錯(cuò):java?找不到符號(hào)解決的相關(guān)資料,運(yùn)行項(xiàng)目時(shí)Idea報(bào)錯(cuò),提示找不到符號(hào),但是這個(gè)類在項(xiàng)目里是存在的,網(wǎng)上找了很多文章都沒(méi)解決,浪費(fèi)了一個(gè)下午終于弄好了,記錄一下,需要的朋友可以參考下2023-08-08

