欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot 整合 Avro 與 Kafka的詳細(xì)過程

 更新時間:2024年12月02日 14:15:25   作者:程序猿進(jìn)階  
本文介紹了如何在Spring Boot中使用Avro和Kafka進(jìn)行數(shù)據(jù)的序列化和反序列化,并通過MyBatisPlus將數(shù)據(jù)存入數(shù)據(jù)庫,感興趣的朋友跟隨小編一起看看吧

【需求】:生產(chǎn)者發(fā)送數(shù)據(jù)至 kafka 序列化使用 Avro,消費(fèi)者通過 Avro 進(jìn)行反序列化,并將數(shù)據(jù)通過 MyBatisPlus 存入數(shù)據(jù)庫。

一、環(huán)境介紹

【1】Apache Avro 1.8;【2】Spring Kafka 1.2;【3】Spring Boot 1.5;【4】Maven 3.5;

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.codenotfound</groupId>
  <artifactId>spring-kafka-avro</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>spring-kafka-avro</name>
  <description>Spring Kafka - Apache Avro Serializer Deserializer Example</description>
  <url>https://www.codenotfound.com/spring-kafka-apache-avro-serializer-deserializer-example.html</url>
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.4.RELEASE</version>
  </parent>
  <properties>
    <java.version>1.8</java.version>
    <spring-kafka.version>1.2.2.RELEASE</spring-kafka.version>
    <avro.version>1.8.2</avro.version>
  </properties>
  <dependencies>
    <!-- spring-boot -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
    <!-- spring-kafka -->
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <version>${spring-kafka.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka-test</artifactId>
      <version>${spring-kafka.version}</version>
      <scope>test</scope>
    </dependency>
    <!-- avro -->
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
      <version>${avro.version}</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <!-- spring-boot-maven-plugin -->
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
      <!-- avro-maven-plugin -->
      <plugin>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro-maven-plugin</artifactId>
        <version>${avro.version}</version>
        <executions>
          <execution>
            <phase>generate-sources</phase>
            <goals>
              <goal>schema</goal>
            </goals>
            <configuration>
              <sourceDirectory>${project.basedir}/src/main/resources/avro/</sourceDirectory>
              <outputDirectory>${project.build.directory}/generated/avro</outputDirectory>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

二、Avro 文件

【1】Avro 依賴于由使用JSON定義的原始類型組成的架構(gòu)。對于此示例,我們將使用Apache Avro入門指南中的“用戶”模式,如下所示。該模式存儲在src / main / resources / avro下的 user.avsc文件中。我這里使用的是 electronicsPackage.avsc。namespace 指定你生成 java 類時指定的 package 路徑,name 表時生成的文件。

{"namespace": "com.yd.cyber.protocol.avro",
 "type": "record",
 "name": "ElectronicsPackage",
 "fields": [
     {"name":"package_number","type":["string","null"],"default": null},
     {"name":"frs_site_code","type":["string","null"],"default": null},
     {"name":"frs_site_code_type","type":["string","null"],"default":null},
     {"name":"end_allocate_code","type":["string","null"],"default": null},
     {"name":"code_1","type":["string","null"],"default": null},
     {"name":"aggregat_package_code","type":["string","null"],"default": null}
    ]
}

【2】Avro附帶了代碼生成功能,該代碼生成功能使我們可以根據(jù)上面定義的“用戶”模式自動創(chuàng)建Java類。一旦生成了相關(guān)的類,就無需直接在程序中使用架構(gòu)。這些類可以使用 avro-tools.jar 或項(xiàng)目是Maven 項(xiàng)目,調(diào)用 Maven Projects 進(jìn)行 compile 自動生成 electronicsPackage.java 文件:如下是通過 maven 的方式

【3】這將導(dǎo)致生成一個 electronicsPackage.java 類,該類包含架構(gòu)和許多 Builder構(gòu)造 electronicsPackage對象的方法。

三、為 Kafka 主題生成 Avro消息

Kafka Byte 在其主題中存儲和傳輸數(shù)組。但是,當(dāng)我們使用 Avro對象時,我們需要在這些 Byte數(shù)組之間進(jìn)行轉(zhuǎn)換。在0.9.0.0版之前,Kafka Java API使用 Encoder/ Decoder接口的實(shí)現(xiàn)來處理轉(zhuǎn)換,但是在新API中,這些已經(jīng)被 Serializer/ Deserializer接口實(shí)現(xiàn)代替。Kafka附帶了許多 內(nèi)置(反)序列化器,但不包括Avro。為了解決這個問題,我們將創(chuàng)建一個 AvroSerializer類,該類Serializer專門為 Avro對象實(shí)現(xiàn)接口。然后,我們實(shí)現(xiàn)將 serialize() 主題名稱和數(shù)據(jù)對象作為輸入的方法,在本例中,該對象是擴(kuò)展的 Avro對象 SpecificRecordBase。該方法將Avro對象序列化為字節(jié)數(shù)組并返回結(jié)果。這個類屬于通用類,一次配置多次使用。

package com.yd.cyber.web.avro;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
/**
 *  avro序列化類
 * @author zzx
 * @creat 2020-03-11-19:17
 */
public class AvroSerializer<T extends SpecificRecordBase> implements Serializer<T> {
    @Override
    public void close() {}
    @Override
    public void configure(Map<String, ?> arg0, boolean arg1) {}
    @Override
    public byte[] serialize(String topic, T data) {
        if(data == null) {
            return null;
        }
        DatumWriter<T> writer = new SpecificDatumWriter<>(data.getSchema());
        ByteArrayOutputStream byteArrayOutputStream  = new ByteArrayOutputStream();
        BinaryEncoder binaryEncoder  = EncoderFactory.get().directBinaryEncoder(byteArrayOutputStream , null);
        try {
            writer.write(data, binaryEncoder);
            binaryEncoder.flush();
            byteArrayOutputStream.close();
        }catch (IOException e) {
            throw new SerializationException(e.getMessage());
        }
        return byteArrayOutputStream.toByteArray();
    }
}

四、AvroConfig 配置類

Avro 配置信息在 AvroConfig 配置類中,現(xiàn)在,我們需要更改,AvroConfig 開始使用我們的自定義 Serializer實(shí)現(xiàn)。這是通過將“ VALUE_SERIALIZER_CLASS_CONFIG”屬性設(shè)置為 AvroSerializer該類來完成的。此外,我們更改了ProducerFactory 和KafkaTemplate 通用類型,使其指定 ElectronicsPackage 而不是 String。當(dāng)我們有多個序列化的時候,這個配置文件需要多次需求,添加自己需要序列化的對象。

package com.yd.cyber.web.avro;
/**
 * @author zzx
 * @creat 2020-03-11-20:23
 */
@Configuration
@EnableKafka
public class AvroConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.producer.max-request-size}")
    private String maxRequestSize;
    @Bean
    public Map<String, Object> avroProducerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxRequestSize);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroSerializer.class);
        return props;
    }
    @Bean
    public ProducerFactory<String, ElectronicsPackage> elProducerFactory() {
        return new DefaultKafkaProducerFactory<>(avroProducerConfigs());
    }
    @Bean
    public KafkaTemplate<String, ElectronicsPackage> elKafkaTemplate() {
        return new KafkaTemplate<>(elProducerFactory());
    }
}

五、通過 kafkaTemplate 發(fā)送消息

最后就是通過 Controller類調(diào)用 kafkaTemplate 的 send 方法接受一個Avro electronicsPackage對象作為輸入。請注意,我們還更新了 kafkaTemplate 泛型類型。

package com.yd.cyber.web.controller.aggregation;
import com.yd.cyber.protocol.avro.ElectronicsPackage;
import com.yd.cyber.web.vo.ElectronicsPackageVO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
 * <p>
 * InnoDB free: 4096 kB 前端控制器
 * </p>
 *
 * @author zzx
 * @since 2020-04-19
 */
@RestController
@RequestMapping("/electronicsPackageTbl")
public class ElectronicsPackageController {
    //日誌
    private static final Logger log = LoggerFactory.getLogger(ElectronicsPackageController.class);
    @Resource
    private KafkaTemplate<String,ElectronicsPackage> kafkaTemplate;
    @GetMapping("/push")
    public void push(){
        ElectronicsPackageVO electronicsPackageVO = new ElectronicsPackageVO();
        electronicsPackageVO.setElectId(9);
        electronicsPackageVO.setAggregatPackageCode("9");
        electronicsPackageVO.setCode1("9");
        electronicsPackageVO.setEndAllocateCode("9");
        electronicsPackageVO.setFrsSiteCodeType("9");
        electronicsPackageVO.setFrsSiteCode("9");
        electronicsPackageVO.setPackageNumber("9");
        ElectronicsPackage electronicsPackage = new ElectronicsPackage();
        BeanUtils.copyProperties(electronicsPackageVO,electronicsPackage);
        //發(fā)送消息
        kafkaTemplate.send("Electronics_Package",electronicsPackage);
        log.info("Electronics_Package TOPIC 發(fā)送成功");
    }
}

六、從 Kafka主題消費(fèi) Avro消息反序列化

收到的消息需要反序列化為 Avro格式。為此,我們創(chuàng)建一個 AvroDeserializer 實(shí)現(xiàn)該 Deserializer接口的類。該 deserialize()方法將主題名稱和Byte數(shù)組作為輸入,然后將其解碼回Avro對象。從 targetType類參數(shù)中檢索需要用于解碼的模式,該類參數(shù)需要作為參數(shù)傳遞給 AvroDeserializer構(gòu)造函數(shù)。

package com.yd.cyber.web.avro;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.xml.bind.DatatypeConverter;
/**
 *  avro反序列化
 * @author fuyx
 * @creat 2020-03-12-15:19
 */
public class AvroDeserializer<T extends SpecificRecordBase> implements Deserializer<T> {
    //日志系統(tǒng)
    private static final Logger LOGGER = LoggerFactory.getLogger(AvroDeserializer.class);
    protected final Class<T> targetType;
    public AvroDeserializer(Class<T> targetType) {
        this.targetType = targetType;
    }
    @Override
    public void close() {}
    @Override
    public void configure(Map<String, ?> arg0, boolean arg1) {}
    @Override
    public T deserialize(String topic, byte[] data) {
        try {
            T result = null;
            if(data == null) {
                return null;
            }
            LOGGER.debug("data='{}'", DatatypeConverter.printHexBinary(data));
            ByteArrayInputStream in = new ByteArrayInputStream(data);
            DatumReader<GenericRecord> userDatumReader = new SpecificDatumReader<>(targetType.newInstance().getSchema());
            BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(in, null);
            result = (T) userDatumReader.read(null, decoder);
            LOGGER.debug("deserialized data='{}'", result);
            return result;
        } catch (Exception ex) {
            throw new SerializationException(
                    "Can't deserialize data '" + Arrays.toString(data) + "' from topic '" + topic + "'", ex);
        } finally {
        }
    }
}

七、反序列化的配置類

我將反序列化的配置和序列化的配置都放置在 AvroConfig 配置類中。在 AvroConfig 需要被這樣更新了AvroDeserializer用作值“VALUE_DESERIALIZER_CLASS_CONFIG”屬性。我們還更改了 ConsumerFactory 和 ConcurrentKafkaListenerContainerFactory通用類型,以使其指定 ElectronicsPackage 而不是 String。將 DefaultKafkaConsumerFactory 通過1個新的創(chuàng)造 AvroDeserializer 是需要 “User.class”作為構(gòu)造函數(shù)的參數(shù)。需要使用Class<?> targetType,AvroDeserializer 以將消費(fèi) byte[]對象反序列化為適當(dāng)?shù)哪繕?biāo)對象(在此示例中為 ElectronicsPackage 類)。

@Configuration
@EnableKafka
public class AvroConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.producer.max-request-size}")
    private String maxRequestSize;
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "avro");
        return props;
    }
    @Bean
    public ConsumerFactory<String, ElectronicsPackage> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
                new AvroDeserializer<>(ElectronicsPackage.class));
    }
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, ElectronicsPackage> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, ElectronicsPackage> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

八、消費(fèi)者消費(fèi)消息

消費(fèi)者通過 @KafkaListener 監(jiān)聽對應(yīng)的 Topic ,這里需要注意的是,網(wǎng)上直接獲取對象的參數(shù)傳的是對象,比如這里可能需要傳入 ElectronicsPackage 類,但是我這樣寫的時候,error日志總說是返回序列化的問題,所以我使用 GenericRecord 對象接收,也就是我反序列化中定義的對象,是沒有問題的。然后我將接收到的消息通過 mybatisplus 存入到數(shù)據(jù)庫。

package com.zzx.cyber.web.controller.dataSource.intercompany;
import com.zzx.cyber.web.service.ElectronicsPackageService;
import com.zzx.cyber.web.vo.ElectronicsPackageVO;
import org.apache.avro.generic.GenericRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Controller;
import javax.annotation.Resource;
/**
 * @desc:
 * @author: zzx
 * @creatdate 2020/4/1912:21
 */
@Controller
public class ElectronicsPackageConsumerController {
    //日志
    private static final Logger log  = LoggerFactory.getLogger(ElectronicsPackageConsumerController.class);
    //服務(wù)層
    @Resource
    private ElectronicsPackageService electronicsPackageService;
    /**
     * 掃描數(shù)據(jù)測試
     * @param genericRecordne
     */
    @KafkaListener(topics = {"Electronics_Package"})
    public void receive(GenericRecord genericRecordne) throws Exception {
        log.info("數(shù)據(jù)接收:electronicsPackage + "+  genericRecordne.toString());
        //業(yè)務(wù)處理類,mybatispuls 自動生成的類
        ElectronicsPackageVO electronicsPackageVO = new ElectronicsPackageVO();
        //將收的數(shù)據(jù)復(fù)制過來
        BeanUtils.copyProperties(genericRecordne,electronicsPackageVO);
        try {
            //落庫
            log.info("數(shù)據(jù)入庫");
            electronicsPackageService.save(electronicsPackageVO);
        } catch (Exception e) {
            throw new Exception("插入異常"+e);
        }
    }
}

到此這篇關(guān)于SpringBoot 整合 Avro 與 Kafka的文章就介紹到這了,更多相關(guān)SpringBoot 整合 Avro 與 Kafka內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java中的運(yùn)算符有哪些詳解

    Java中的運(yùn)算符有哪些詳解

    這篇文章主要給大家介紹了關(guān)于Java中運(yùn)算符有哪些的相關(guān)資料,包括算術(shù)運(yùn)算符、關(guān)系運(yùn)算符、邏輯運(yùn)算符、位運(yùn)算符、增量運(yùn)算符和自增/自減運(yùn)算符,文中通過代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2024-10-10
  • springboot結(jié)合redis實(shí)現(xiàn)搜索欄熱搜功能及文字過濾

    springboot結(jié)合redis實(shí)現(xiàn)搜索欄熱搜功能及文字過濾

    本文主要介紹了springboot結(jié)合redis實(shí)現(xiàn)搜索欄熱搜功能及文字過濾,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2022-02-02
  • Spring ApplicationListener監(jiān)聽器用法詳解

    Spring ApplicationListener監(jiān)聽器用法詳解

    這篇文章主要介紹了Spring ApplicationListener監(jiān)聽器用法詳解,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2019-11-11
  • Spring IOC原理詳解

    Spring IOC原理詳解

    這篇文章主要介紹了Spring IOC原理詳解,具有一定借鑒價值,需要的朋友可以參考下。
    2017-12-12
  • 使用java將動態(tài)網(wǎng)頁生成靜態(tài)網(wǎng)頁示例

    使用java將動態(tài)網(wǎng)頁生成靜態(tài)網(wǎng)頁示例

    這篇文章主要介紹了使用java將動態(tài)網(wǎng)頁生成靜態(tài)網(wǎng)頁示例,需要的朋友可以參考下
    2014-03-03
  • Spring?使用注解存儲和讀取?Bean對象操作方法

    Spring?使用注解存儲和讀取?Bean對象操作方法

    在?Spring?中,要想更加簡單的實(shí)現(xiàn)對?Bean?對象的儲存和使用,其核心就是使用?注解?,本文主要就是演示如何使用注解實(shí)現(xiàn)對?Bean?對象的存取操作,感興趣的朋友跟隨小編一起看看吧
    2023-08-08
  • Java效率工具之Lombok的具體使用

    Java效率工具之Lombok的具體使用

    這篇文章主要介紹了Java效率工具之Lombok的具體使用,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-07-07
  • 解決springmvc整合Mybatis的Log4j日志輸出問題

    解決springmvc整合Mybatis的Log4j日志輸出問題

    這篇文章主要介紹了解決springmvc整合Mybatis的Log4j日志輸出問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-07-07
  • GitLab在IDEA中回滾主分支問題

    GitLab在IDEA中回滾主分支問題

    這是工作中遇到的問題,記錄下來,也方便自己后面查看操作步驟,也方便各位遇到這個問題,不至于卡太久,如有錯誤或未考慮完全的地方,望不吝賜教
    2023-11-11
  • Spring Boot 中實(shí)現(xiàn)跨域的多種方式小結(jié)

    Spring Boot 中實(shí)現(xiàn)跨域的多種方式小結(jié)

    Spring Boot提供了多種方式來實(shí)現(xiàn)跨域請求,開發(fā)者可以根據(jù)具體需求選擇適合的方法,在配置時,要確保不僅考慮安全性,還要兼顧應(yīng)用的靈活性和性能,本文給大家介紹Spring Boot 中實(shí)現(xiàn)跨域的多種方式,感興趣的朋友一起看看吧
    2024-01-01

最新評論