欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

一文詳解kafka序列化器和攔截器

 更新時間:2023年03月29日 09:43:54   作者:劉牌  
這篇文章主要為大家介紹了kafka序列化器和攔截器使用示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

介紹

本篇主要介紹kafka的攔截器和序列化器,序列化器是和數(shù)據(jù)在網(wǎng)絡(luò)中的傳輸有關(guān),數(shù)據(jù)在網(wǎng)絡(luò)中的傳輸為字節(jié)流,所以生產(chǎn)者在發(fā)送時需要將其序列化為字節(jié)流,消費者收到消息時,需要將字節(jié)流反序列化為我們能夠識別的對象,我們不難看出,這就是RPC通信,kafka中實現(xiàn)了很多自定義協(xié)議,我們知道,在RPC通信中,只有生產(chǎn)者和消費者的協(xié)議一樣,才能相互傳輸和解析數(shù)據(jù),在使用HTTP時,我們就不用去關(guān)注協(xié)議本身,因為HTTP是TCP的上層建筑,它自己實現(xiàn)了一套協(xié)議,我們不用去關(guān)注,但是使用RPC,我們是面向TCP編程,所以自然得約定和實現(xiàn)自己的協(xié)議,而序列化就是這過程中很重要的一部分。

攔截器是一個隨處可見的詞,基本上很多框架中都有攔截器機制,它的作用主要是對請求進行攔截,我們可以對請求進行過濾和處理,以達到業(yè)務(wù)目的,比如Spring中有HandlerInterceptor攔截器,在kafka種也有攔截器,我們可以自定義攔截器,對消息進行攔截,比如某些異常消息我們不需要發(fā)送,那么就將其攔截下來。

序列化器

數(shù)據(jù)在網(wǎng)絡(luò)中傳輸是以字節(jié)流的形式進行傳輸,在生產(chǎn)者端發(fā)送消息需要先進行序列化,消費者端進行反序列化,序列化的方式有很多,比如jdk,json,protobuf,kryo,hessian,avro等等,在大數(shù)據(jù)量的傳輸中,序列化和反序列化的效率對吞吐量有一定的影響,kafka提供了許多序列化和反序列化器,如StringDeserializerStringSerializer,如果我們需要自定義一個序列化和反序列化器,那么實現(xiàn)Serializer,Deserializer接口即可。

如下,kafka生產(chǎn)者在發(fā)送消息到broker之前需要序列化,消費者從broker獲取消息后需要反序列化。

設(shè)置序列化和反序列化

生產(chǎn)者端設(shè)置序列化

//序列化
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());

消費者端設(shè)置反序列化

props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());

自定義序列化

/**
 * 功能說明: JSON序列化
 * <p>
 * Original @Author: steakliu , 2022-11-02  15:14
 */
public class JsonSerializer<T> implements Serializer<T> {

  @Override
  public byte[] serialize(String topic, T obj) {
    try {
      return obj == null ? null : JSON.toJSONBytes(obj);
    }catch (Exception e){
      throw new SerializationException("json serializing exception");
    }
  }
  
}

自定義反序列化

/**
 * 功能說明:JSON反序列化
 * <p>
 * Original @Author: steakliu-劉牌, 2022-11-11  09:38
 */
public class  JsonDeserializer<T> implements Deserializer<T> {
  @Override
  public T deserialize(String topic, byte[] data) {
    return (T) JSON.parse(data);
  }
}

如上簡單的使用fastjson作為序列化和反序列化工具,演示了自定義kafka的序列化和反序列化機制,我們可以根據(jù)實際情況來設(shè)計不同的序列化反序列化機制,當(dāng)然,不會是像上面這些簡單,如果使用spring,那么spring提供了JSON序列化和反序列化器直接使用。

思考

雖然我們可以自定義序列化和反序列化器,但是自定義序列化和反序列化器在使用上也要保持一些一致,也就是說生產(chǎn)者和消費者要保持使用一種類型的序列化機制,不然會出現(xiàn)消息轉(zhuǎn)換問題,如果我們以kafka的方式向別人提供服務(wù),那么他們就需要使用我們的制定的序列化方式,所以這可能就存在一定的耦合,如果使用Kafka的String序列化和反序列化機制,因為是它是默認(rèn)方式并且是字符串,通用性比較好,所以就不用去考慮序列化和反序列化,直接拿到字符串轉(zhuǎn)為對象,再進行業(yè)務(wù)處理,使用自定義序列化的話,就直接拿到序列化后的對象,不用進行字符串轉(zhuǎn)對象操作。

在實際場景中,我們可以根據(jù)自己的業(yè)務(wù)來使用何種序列化方式,沒有最好的,只有合適的。

攔截器

kafka中消費者和生產(chǎn)者都有攔截器,分別為ConsumerInterceptorProducerInterceptor,只需實現(xiàn)它們即可實現(xiàn)攔截,加入攔截器后,生產(chǎn)者會在發(fā)送消息之前對消息進行攔截處理,消費者在收到消息之前也會經(jīng)過攔截器,那么我們就可以在攔截器中加入一些自己需要的邏輯。

如下消費者攔截器對消息進行攔截,如果有異常消息,則對異常消息進行處理,只要需要對消息進行處理,監(jiān)控等,都可以使用攔截器。

/**
 * 功能說明: 消費者攔截器
 * <p>
 * Original @Author: steakliu-劉牌, 2023-03-15  10:17
 */
public class MyConsumerInterceptor implements ConsumerInterceptor<String, Message> {

  @Override
  public ConsumerRecords<String, Message> onConsume(ConsumerRecords<String, Message> records) {
    long currentTimeMillis = System.currentTimeMillis();
    records.forEach(record -> {
      if ("消息異常".equals(record.value().getMessageText())) {
        //處理異常消息
        this.handleMsg(record);
      }
    });
    return records;
  }
  
  private void handleMsg(ConsumerRecord<String, Message> record) {
    //處理異常消息
  }
  @Override
  public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {}
  @Override
  public void close() {}
  @Override
  public void configure(Map<String, ?> configs) { }
}

攔截器可以有多個,如果設(shè)置多個攔截器,那么就形成一個攔截器鏈,一個一個地執(zhí)行。

下面是使用spring-kafka時所配置的攔截器和序列化器的基本配置。

spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    consumer:
      # 反序列化器
      key-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        # 攔截器
        interceptor:
          classes: com.steakliu.kafka.interceptor.MyConsumerInterceptor
        spring:
          json:
            trusted:
              packages: '*'
    producer:
      key-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      properties:
        # 攔截器
        interceptor:
          classes: com.steakliu.kafka.interceptor.MyProducerInterceptor,com.steakliu.kafka.interceptor.MyProducerInterceptor2

總結(jié)

對于攔截器和序列化器,我們上面作了簡單地描述和示例,對于它們,可能我們都不怎么去去用甚至沒有用過,但是還是很有必要去了解的,了解它的設(shè)計和思想,在一些特殊的場景可能會用到。

以上就是一文詳解kafka序列化器和攔截器的詳細內(nèi)容,更多關(guān)于kafka序列化器攔截器的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • ireport數(shù)據(jù)表格報表的簡單使用

    ireport數(shù)據(jù)表格報表的簡單使用

    這篇文章給大家介紹了如何畫一個報表模板,這里介紹下畫表格需要用到的組件,文中通過圖文并茂的形式給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友參考下吧
    2021-10-10
  • Java中的CopyOnWriteArrayList原理詳解

    Java中的CopyOnWriteArrayList原理詳解

    這篇文章主要介紹了Java中的CopyOnWriteArrayList原理詳解,如源碼所示,CopyOnWriteArrayList和ArrayList一樣,都在內(nèi)部維護了一個數(shù)組,操作CopyOnWriteArrayList其實就是在操作內(nèi)部的數(shù)組,需要的朋友可以參考下
    2023-12-12
  • Java查詢MongoDB數(shù)據(jù)庫案例大全

    Java查詢MongoDB數(shù)據(jù)庫案例大全

    這篇文章主要給大家介紹了關(guān)于Java查詢MongoDB數(shù)據(jù)庫的一些相關(guān)案例,Java可以使用MongoDB的官方Java驅(qū)動程序來連接和操作MongoDB數(shù)據(jù)庫,需要的朋友可以參考下
    2023-07-07
  • SpringBoot項目打包成war包并部署在tomcat上運行的操作步驟

    SpringBoot項目打包成war包并部署在tomcat上運行的操作步驟

    我們開發(fā) SpringBoot 項目有時我們會需要打包成 war 包,放入外置的 Tomcat 中進行運行,或者使用工具idea直接啟動,便于開發(fā)調(diào)試,本文給大家分享SpringBoot項目打包成war包并部署在tomcat上運行的操作步驟,感興趣的朋友一起看看吧
    2024-03-03
  • SpringBoot如何使用RequestBodyAdvice進行統(tǒng)一參數(shù)處理

    SpringBoot如何使用RequestBodyAdvice進行統(tǒng)一參數(shù)處理

    這篇文章主要介紹了SpringBoot使用RequestBodyAdvice進行統(tǒng)一參數(shù)處理方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-06-06
  • 如何用Java的swing編寫簡單計算器

    如何用Java的swing編寫簡單計算器

    這篇文章主要給大家介紹了關(guān)于如何用Java的swing編寫簡單計算器的相關(guān)資料,通過本文可以設(shè)計一個圖形界面的簡易計算器,完成簡單的算術(shù)運算符,可以完成加法、減法、乘法、除法和取余運算,需要的朋友可以參考下
    2023-12-12
  • java多線程中的volatile和synchronized用法分析

    java多線程中的volatile和synchronized用法分析

    這篇文章主要介紹了java多線程中的volatile和synchronized用法分析,以實例的形式分析了在多線程中volatile和synchronized的用法區(qū)別與使用原理,具有一定的參考借鑒價值,需要的朋友可以參考下
    2014-12-12
  • SpringCloud中NacosNamingService的作用詳解

    SpringCloud中NacosNamingService的作用詳解

    這篇文章主要介紹了SpringCloud中NacosNamingService的作用詳解,NacosNamingService類完成服務(wù)實例注冊,撤銷與獲取服務(wù)實例操作,NacosNamingService初始化采用單例模式,使用反射生成,需要的朋友可以參考下
    2023-11-11
  • Java源碼解析HashMap的resize函數(shù)

    Java源碼解析HashMap的resize函數(shù)

    今天小編就為大家分享一篇關(guān)于Java源碼解析HashMap的resize函數(shù),小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧
    2019-01-01
  • 解決HttpServletResponse和HttpServletRequest取值的2個坑

    解決HttpServletResponse和HttpServletRequest取值的2個坑

    這篇文章主要介紹了解決HttpServletResponse和HttpServletRequest取值的2個坑問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2023-12-12

最新評論