SpringBoot 整合 Avro 與 Kafka的詳細(xì)過(guò)程
【需求】:生產(chǎn)者發(fā)送數(shù)據(jù)至 kafka 序列化使用 Avro,消費(fèi)者通過(guò) Avro 進(jìn)行反序列化,并將數(shù)據(jù)通過(guò) MyBatisPlus 存入數(shù)據(jù)庫(kù)。
一、環(huán)境介紹
【1】Apache Avro 1.8;【2】Spring Kafka 1.2;【3】Spring Boot 1.5;【4】Maven 3.5;
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.codenotfound</groupId> <artifactId>spring-kafka-avro</artifactId> <version>0.0.1-SNAPSHOT</version> <name>spring-kafka-avro</name> <description>Spring Kafka - Apache Avro Serializer Deserializer Example</description> <url>https://www.codenotfound.com/spring-kafka-apache-avro-serializer-deserializer-example.html</url> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.4.RELEASE</version> </parent> <properties> <java.version>1.8</java.version> <spring-kafka.version>1.2.2.RELEASE</spring-kafka.version> <avro.version>1.8.2</avro.version> </properties> <dependencies> <!-- spring-boot --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!-- spring-kafka --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>${spring-kafka.version}</version> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka-test</artifactId> <version>${spring-kafka.version}</version> <scope>test</scope> </dependency> <!-- avro --> <dependency> <groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> <version>${avro.version}</version> </dependency> </dependencies> <build> <plugins> <!-- spring-boot-maven-plugin --> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> <!-- avro-maven-plugin --> <plugin> <groupId>org.apache.avro</groupId> <artifactId>avro-maven-plugin</artifactId> <version>${avro.version}</version> <executions> <execution> <phase>generate-sources</phase> <goals> <goal>schema</goal> </goals> <configuration> <sourceDirectory>${project.basedir}/src/main/resources/avro/</sourceDirectory> <outputDirectory>${project.build.directory}/generated/avro</outputDirectory> </configuration> </execution> </executions> </plugin> </plugins> </build> </project>
二、Avro 文件
【1】Avro 依賴(lài)于由使用JSON定義的原始類(lèi)型組成的架構(gòu)。對(duì)于此示例,我們將使用Apache Avro入門(mén)指南中的“用戶”模式,如下所示。該模式存儲(chǔ)在src / main / resources / avro下的 user.avsc文件中。我這里使用的是 electronicsPackage.avsc。namespace 指定你生成 java 類(lèi)時(shí)指定的 package 路徑,name 表時(shí)生成的文件。
{"namespace": "com.yd.cyber.protocol.avro", "type": "record", "name": "ElectronicsPackage", "fields": [ {"name":"package_number","type":["string","null"],"default": null}, {"name":"frs_site_code","type":["string","null"],"default": null}, {"name":"frs_site_code_type","type":["string","null"],"default":null}, {"name":"end_allocate_code","type":["string","null"],"default": null}, {"name":"code_1","type":["string","null"],"default": null}, {"name":"aggregat_package_code","type":["string","null"],"default": null} ] }
【2】Avro附帶了代碼生成功能,該代碼生成功能使我們可以根據(jù)上面定義的“用戶”模式自動(dòng)創(chuàng)建Java類(lèi)。一旦生成了相關(guān)的類(lèi),就無(wú)需直接在程序中使用架構(gòu)。這些類(lèi)可以使用 avro-tools.jar 或項(xiàng)目是Maven 項(xiàng)目,調(diào)用 Maven Projects 進(jìn)行 compile 自動(dòng)生成 electronicsPackage.java 文件:如下是通過(guò) maven 的方式
【3】這將導(dǎo)致生成一個(gè) electronicsPackage.java 類(lèi),該類(lèi)包含架構(gòu)和許多 Builder構(gòu)造 electronicsPackage對(duì)象的方法。
三、為 Kafka 主題生成 Avro消息
Kafka Byte 在其主題中存儲(chǔ)和傳輸數(shù)組。但是,當(dāng)我們使用 Avro對(duì)象時(shí),我們需要在這些 Byte數(shù)組之間進(jìn)行轉(zhuǎn)換。在0.9.0.0版之前,Kafka Java API使用 Encoder/ Decoder接口的實(shí)現(xiàn)來(lái)處理轉(zhuǎn)換,但是在新API中,這些已經(jīng)被 Serializer/ Deserializer接口實(shí)現(xiàn)代替。Kafka附帶了許多 內(nèi)置(反)序列化器,但不包括Avro。為了解決這個(gè)問(wèn)題,我們將創(chuàng)建一個(gè) AvroSerializer類(lèi),該類(lèi)Serializer專(zhuān)門(mén)為 Avro對(duì)象實(shí)現(xiàn)接口。然后,我們實(shí)現(xiàn)將 serialize() 主題名稱(chēng)和數(shù)據(jù)對(duì)象作為輸入的方法,在本例中,該對(duì)象是擴(kuò)展的 Avro對(duì)象 SpecificRecordBase。該方法將Avro對(duì)象序列化為字節(jié)數(shù)組并返回結(jié)果。這個(gè)類(lèi)屬于通用類(lèi),一次配置多次使用。
package com.yd.cyber.web.avro; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.Map; import org.apache.avro.io.BinaryEncoder; import org.apache.avro.io.DatumWriter; import org.apache.avro.io.EncoderFactory; import org.apache.avro.specific.SpecificDatumWriter; import org.apache.avro.specific.SpecificRecordBase; import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Serializer; /** * avro序列化類(lèi) * @author zzx * @creat 2020-03-11-19:17 */ public class AvroSerializer<T extends SpecificRecordBase> implements Serializer<T> { @Override public void close() {} @Override public void configure(Map<String, ?> arg0, boolean arg1) {} @Override public byte[] serialize(String topic, T data) { if(data == null) { return null; } DatumWriter<T> writer = new SpecificDatumWriter<>(data.getSchema()); ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); BinaryEncoder binaryEncoder = EncoderFactory.get().directBinaryEncoder(byteArrayOutputStream , null); try { writer.write(data, binaryEncoder); binaryEncoder.flush(); byteArrayOutputStream.close(); }catch (IOException e) { throw new SerializationException(e.getMessage()); } return byteArrayOutputStream.toByteArray(); } }
四、AvroConfig 配置類(lèi)
Avro 配置信息在 AvroConfig 配置類(lèi)中,現(xiàn)在,我們需要更改,AvroConfig 開(kāi)始使用我們的自定義 Serializer實(shí)現(xiàn)。這是通過(guò)將“ VALUE_SERIALIZER_CLASS_CONFIG”屬性設(shè)置為 AvroSerializer該類(lèi)來(lái)完成的。此外,我們更改了ProducerFactory 和KafkaTemplate 通用類(lèi)型,使其指定 ElectronicsPackage 而不是 String。當(dāng)我們有多個(gè)序列化的時(shí)候,這個(gè)配置文件需要多次需求,添加自己需要序列化的對(duì)象。
package com.yd.cyber.web.avro; /** * @author zzx * @creat 2020-03-11-20:23 */ @Configuration @EnableKafka public class AvroConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Value("${spring.kafka.producer.max-request-size}") private String maxRequestSize; @Bean public Map<String, Object> avroProducerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxRequestSize); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroSerializer.class); return props; } @Bean public ProducerFactory<String, ElectronicsPackage> elProducerFactory() { return new DefaultKafkaProducerFactory<>(avroProducerConfigs()); } @Bean public KafkaTemplate<String, ElectronicsPackage> elKafkaTemplate() { return new KafkaTemplate<>(elProducerFactory()); } }
五、通過(guò) kafkaTemplate 發(fā)送消息
最后就是通過(guò) Controller類(lèi)調(diào)用 kafkaTemplate 的 send 方法接受一個(gè)Avro electronicsPackage對(duì)象作為輸入。請(qǐng)注意,我們還更新了 kafkaTemplate 泛型類(lèi)型。
package com.yd.cyber.web.controller.aggregation; import com.yd.cyber.protocol.avro.ElectronicsPackage; import com.yd.cyber.web.vo.ElectronicsPackageVO; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.BeanUtils; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; /** * <p> * InnoDB free: 4096 kB 前端控制器 * </p> * * @author zzx * @since 2020-04-19 */ @RestController @RequestMapping("/electronicsPackageTbl") public class ElectronicsPackageController { //日誌 private static final Logger log = LoggerFactory.getLogger(ElectronicsPackageController.class); @Resource private KafkaTemplate<String,ElectronicsPackage> kafkaTemplate; @GetMapping("/push") public void push(){ ElectronicsPackageVO electronicsPackageVO = new ElectronicsPackageVO(); electronicsPackageVO.setElectId(9); electronicsPackageVO.setAggregatPackageCode("9"); electronicsPackageVO.setCode1("9"); electronicsPackageVO.setEndAllocateCode("9"); electronicsPackageVO.setFrsSiteCodeType("9"); electronicsPackageVO.setFrsSiteCode("9"); electronicsPackageVO.setPackageNumber("9"); ElectronicsPackage electronicsPackage = new ElectronicsPackage(); BeanUtils.copyProperties(electronicsPackageVO,electronicsPackage); //發(fā)送消息 kafkaTemplate.send("Electronics_Package",electronicsPackage); log.info("Electronics_Package TOPIC 發(fā)送成功"); } }
六、從 Kafka主題消費(fèi) Avro消息反序列化
收到的消息需要反序列化為 Avro格式。為此,我們創(chuàng)建一個(gè) AvroDeserializer 實(shí)現(xiàn)該 Deserializer接口的類(lèi)。該 deserialize()方法將主題名稱(chēng)和Byte數(shù)組作為輸入,然后將其解碼回Avro對(duì)象。從 targetType類(lèi)參數(shù)中檢索需要用于解碼的模式,該類(lèi)參數(shù)需要作為參數(shù)傳遞給 AvroDeserializer構(gòu)造函數(shù)。
package com.yd.cyber.web.avro; import java.io.ByteArrayInputStream; import java.io.IOException; import java.util.Arrays; import java.util.Map; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.BinaryDecoder; import org.apache.avro.io.DatumReader; import org.apache.avro.io.DecoderFactory; import org.apache.avro.specific.SpecificDatumReader; import org.apache.avro.specific.SpecificRecordBase; import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Deserializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.xml.bind.DatatypeConverter; /** * avro反序列化 * @author fuyx * @creat 2020-03-12-15:19 */ public class AvroDeserializer<T extends SpecificRecordBase> implements Deserializer<T> { //日志系統(tǒng) private static final Logger LOGGER = LoggerFactory.getLogger(AvroDeserializer.class); protected final Class<T> targetType; public AvroDeserializer(Class<T> targetType) { this.targetType = targetType; } @Override public void close() {} @Override public void configure(Map<String, ?> arg0, boolean arg1) {} @Override public T deserialize(String topic, byte[] data) { try { T result = null; if(data == null) { return null; } LOGGER.debug("data='{}'", DatatypeConverter.printHexBinary(data)); ByteArrayInputStream in = new ByteArrayInputStream(data); DatumReader<GenericRecord> userDatumReader = new SpecificDatumReader<>(targetType.newInstance().getSchema()); BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(in, null); result = (T) userDatumReader.read(null, decoder); LOGGER.debug("deserialized data='{}'", result); return result; } catch (Exception ex) { throw new SerializationException( "Can't deserialize data '" + Arrays.toString(data) + "' from topic '" + topic + "'", ex); } finally { } } }
七、反序列化的配置類(lèi)
我將反序列化的配置和序列化的配置都放置在 AvroConfig 配置類(lèi)中。在 AvroConfig 需要被這樣更新了AvroDeserializer用作值“VALUE_DESERIALIZER_CLASS_CONFIG”屬性。我們還更改了 ConsumerFactory 和 ConcurrentKafkaListenerContainerFactory通用類(lèi)型,以使其指定 ElectronicsPackage 而不是 String。將 DefaultKafkaConsumerFactory 通過(guò)1個(gè)新的創(chuàng)造 AvroDeserializer 是需要 “User.class”作為構(gòu)造函數(shù)的參數(shù)。需要使用Class<?> targetType,AvroDeserializer 以將消費(fèi) byte[]對(duì)象反序列化為適當(dāng)?shù)哪繕?biāo)對(duì)象(在此示例中為 ElectronicsPackage 類(lèi))。
@Configuration @EnableKafka public class AvroConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Value("${spring.kafka.producer.max-request-size}") private String maxRequestSize; @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroDeserializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG, "avro"); return props; } @Bean public ConsumerFactory<String, ElectronicsPackage> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new AvroDeserializer<>(ElectronicsPackage.class)); } @Bean public ConcurrentKafkaListenerContainerFactory<String, ElectronicsPackage> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, ElectronicsPackage> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } }
八、消費(fèi)者消費(fèi)消息
消費(fèi)者通過(guò) @KafkaListener 監(jiān)聽(tīng)對(duì)應(yīng)的 Topic ,這里需要注意的是,網(wǎng)上直接獲取對(duì)象的參數(shù)傳的是對(duì)象,比如這里可能需要傳入 ElectronicsPackage 類(lèi),但是我這樣寫(xiě)的時(shí)候,error日志總說(shuō)是返回序列化的問(wèn)題,所以我使用 GenericRecord 對(duì)象接收,也就是我反序列化中定義的對(duì)象,是沒(méi)有問(wèn)題的。然后我將接收到的消息通過(guò) mybatisplus 存入到數(shù)據(jù)庫(kù)。
package com.zzx.cyber.web.controller.dataSource.intercompany; import com.zzx.cyber.web.service.ElectronicsPackageService; import com.zzx.cyber.web.vo.ElectronicsPackageVO; import org.apache.avro.generic.GenericRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.BeanUtils; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Controller; import javax.annotation.Resource; /** * @desc: * @author: zzx * @creatdate 2020/4/1912:21 */ @Controller public class ElectronicsPackageConsumerController { //日志 private static final Logger log = LoggerFactory.getLogger(ElectronicsPackageConsumerController.class); //服務(wù)層 @Resource private ElectronicsPackageService electronicsPackageService; /** * 掃描數(shù)據(jù)測(cè)試 * @param genericRecordne */ @KafkaListener(topics = {"Electronics_Package"}) public void receive(GenericRecord genericRecordne) throws Exception { log.info("數(shù)據(jù)接收:electronicsPackage + "+ genericRecordne.toString()); //業(yè)務(wù)處理類(lèi),mybatispuls 自動(dòng)生成的類(lèi) ElectronicsPackageVO electronicsPackageVO = new ElectronicsPackageVO(); //將收的數(shù)據(jù)復(fù)制過(guò)來(lái) BeanUtils.copyProperties(genericRecordne,electronicsPackageVO); try { //落庫(kù) log.info("數(shù)據(jù)入庫(kù)"); electronicsPackageService.save(electronicsPackageVO); } catch (Exception e) { throw new Exception("插入異常"+e); } } }
到此這篇關(guān)于SpringBoot 整合 Avro 與 Kafka的文章就介紹到這了,更多相關(guān)SpringBoot 整合 Avro 與 Kafka內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- Springboot項(xiàng)目消費(fèi)Kafka數(shù)據(jù)的方法
- SpringBoot集成Kafka的實(shí)現(xiàn)示例
- SpringBoot整合Kafka完成生產(chǎn)消費(fèi)的方案
- springboot使用kafka推送數(shù)據(jù)到服務(wù)端的操作方法帶認(rèn)證
- SpringBoot使用Kafka來(lái)優(yōu)化接口請(qǐng)求的并發(fā)方式
- 如何使用SpringBoot集成Kafka實(shí)現(xiàn)用戶數(shù)據(jù)變更后發(fā)送消息
- Spring Boot 集成 Kafka的詳細(xì)步驟
- SpringKafka錯(cuò)誤處理(重試機(jī)制與死信隊(duì)列)
相關(guān)文章
springboot結(jié)合redis實(shí)現(xiàn)搜索欄熱搜功能及文字過(guò)濾
本文主要介紹了springboot結(jié)合redis實(shí)現(xiàn)搜索欄熱搜功能及文字過(guò)濾,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-02-02Spring ApplicationListener監(jiān)聽(tīng)器用法詳解
這篇文章主要介紹了Spring ApplicationListener監(jiān)聽(tīng)器用法詳解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-11-11使用java將動(dòng)態(tài)網(wǎng)頁(yè)生成靜態(tài)網(wǎng)頁(yè)示例
這篇文章主要介紹了使用java將動(dòng)態(tài)網(wǎng)頁(yè)生成靜態(tài)網(wǎng)頁(yè)示例,需要的朋友可以參考下2014-03-03Spring?使用注解存儲(chǔ)和讀取?Bean對(duì)象操作方法
在?Spring?中,要想更加簡(jiǎn)單的實(shí)現(xiàn)對(duì)?Bean?對(duì)象的儲(chǔ)存和使用,其核心就是使用?注解?,本文主要就是演示如何使用注解實(shí)現(xiàn)對(duì)?Bean?對(duì)象的存取操作,感興趣的朋友跟隨小編一起看看吧2023-08-08解決springmvc整合Mybatis的Log4j日志輸出問(wèn)題
這篇文章主要介紹了解決springmvc整合Mybatis的Log4j日志輸出問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-07-07Spring Boot 中實(shí)現(xiàn)跨域的多種方式小結(jié)
Spring Boot提供了多種方式來(lái)實(shí)現(xiàn)跨域請(qǐng)求,開(kāi)發(fā)者可以根據(jù)具體需求選擇適合的方法,在配置時(shí),要確保不僅考慮安全性,還要兼顧應(yīng)用的靈活性和性能,本文給大家介紹Spring Boot 中實(shí)現(xiàn)跨域的多種方式,感興趣的朋友一起看看吧2024-01-01