Kafka中使用Avro序列化和反序列化詳解
1.自定義序列化器與反序列化器
1.1 定義Order實體類
public class Order implements Serializable { private Integer OrderId; private String title; public Order() { } public Order(Integer orderId, String title) { OrderId = orderId; this.title = title; } // 省略必要的get與set方法 }
1.2 定義Order序列化類
由于Kafka中的數(shù)據(jù)都是字節(jié)數(shù)組,在將消息發(fā)送到Kafka之前需要先將數(shù)據(jù)序列化為字節(jié)數(shù)組。 序列化器的作用就是用于序列化要發(fā)送的消息的。
Kafka使用 org.apache.kafka.common.serialization.Serializer 接口用于定義序列化器,將 泛型指定類型的數(shù)據(jù)轉(zhuǎn)換為字節(jié)數(shù)組。
Kafka提供了如下常用類型的序列化類:
自定義序列化器需要實現(xiàn)org.apache.kafka.common.serialization.Serializer接口,并實現(xiàn)其中 的 serialize 方法。
public class OrderSerializer implements Serializer<Order> { @Override public byte[] serialize(String topic, Order data) { try { if (data == null) return null; Integer orderId = data.getOrderId(); String title = data.getTitle(); int length = 0; byte[] bytes = null; if (null != title) { bytes = title.getBytes("utf-8"); length = bytes.length; } //前4個字節(jié)保存orderId, //第二個4個字節(jié)保存title字段的長度 ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + length); buffer.putInt(orderId); buffer.putInt(length); buffer.put(bytes); return buffer.array(); } catch (UnsupportedEncodingException e) { throw new SerializationException("序列化數(shù)據(jù)異常"); } } }
1.3 生產(chǎn)者代碼
package com.warybee.c1; import com.warybee.model.Order; import com.warybee.serializer.OrderSerializer; import org.apache.kafka.clients.producer.*; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; public class KafkaProducerDemo { public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException { Map<String, Object> configs = new HashMap<>(); // 設(shè)置連接Kafka的初始連接用到的服務(wù)器地址 configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://192.168.235.132:9092"); // 設(shè)置key的序列化類 configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer"); // 設(shè)置自定義的序列化類 configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, OrderSerializer.class); configs.put(ProducerConfig.ACKS_CONFIG,"all"); KafkaProducer<Integer, Order> kafkaProducer=new KafkaProducer<Integer, Order>(configs); //定義order Order order=new Order(); order.setOrderId(1); order.setTitle("iphone13 pro 256G"); ProducerRecord<Integer,Order> producerRecord=new ProducerRecord<Integer,Order> ( "test_order_topic", 0, order.getOrderId(), order); //消息的異步確認 kafkaProducer.send(producerRecord, new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception exception) { if (exception==null){ System.out.println("消息的主題:"+recordMetadata.topic()); System.out.println("消息的分區(qū):"+recordMetadata.partition()); System.out.println("消息的偏移量:"+recordMetadata.offset()); }else { System.out.println("發(fā)送消息異常"); } } }); // 關(guān)閉生產(chǎn)者 kafkaProducer.close(); } }
1.4. 定義Order反序列化器
自定義反序列化類,需要實現(xiàn) org.apache.kafka.common.serialization.Deserializer 接 口,并且實現(xiàn)其中的deserialize方法。
package com.warybee.deserializer; import com.warybee.model.Order; import org.apache.kafka.common.serialization.Deserializer; import java.nio.ByteBuffer; /** * @author joy * @description Order反序列化類 */ public class OrderDeserializer implements Deserializer<Order> { @Override public Order deserialize(String topic, byte[] data) { ByteBuffer allocate = ByteBuffer.allocate(data.length); allocate.put(data); allocate.flip(); Integer orderId = allocate.getInt(); int length = allocate.getInt(); String title = new String(data, 8, length); return new Order(orderId, title); } }
1.5 消費者代碼
package com.warybee.c1; import com.warybee.deserializer.OrderDeserializer; import com.warybee.model.Order; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; /** * @author joy */ public class KafkaConsumerDemo { public static void main(String[] args) { Map<String, Object> configs = new HashMap<>(); // 設(shè)置連接Kafka的初始連接用到的服務(wù)器地址 // 如果是集群,則可以通過此初始連接發(fā)現(xiàn)集群中的其他broker configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://192.168.235.132:9092"); //KEY反序列化類 configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer"); //自定義value反序列化類 configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, OrderDeserializer.class); configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer.demo-3"); configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); //創(chuàng)建消費者對象 KafkaConsumer<Integer, Order> consumer = new KafkaConsumer<Integer, Order>(configs); List<String> topics = new ArrayList<>(); topics.add("test_order_topic"); //消費者訂閱主題 consumer.subscribe(topics); while (true){ //批量拉取主題消息,每3秒拉取一次 ConsumerRecords<Integer, Order> records = consumer.poll(3000); //變量消息 for (ConsumerRecord<Integer, Order> record : records) { System.out.println(record.topic() + "\t" + record.partition() + "\t" + record.offset() + "\t" + record.key() + "\t" + record.value().getTitle()); } } } }
上面實現(xiàn)的序列化與反序列化類,定義繁瑣,不具有通用性,一不小心就會BUG滿天飛,不適合在實際項目中使用,只做了解原理即可。接下來使用Apache Avro來實現(xiàn)序列化和反序列化
2. 使用Avro序列化和反序列化
2.1 Apache Avro介紹
Apache Avro是一種與編程語言無關(guān)的序列化格式。提供了一種共享數(shù)據(jù)文件的方式。Avro 數(shù)據(jù)通過與語言無關(guān)的 schema 來定義。schema 通過 JSON 來描述,數(shù)據(jù)被序列化成二進制文件或 JSON 文件,不過一般會使用二進制文件。Avro 在讀寫文件時需要用到 schema,schema 一般會被內(nèi)嵌在數(shù)據(jù)文件里。
2.2 創(chuàng)建Maven項目
引入依賴
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.0.0</version> </dependency> <!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-classic --> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.2.7</version> </dependency> <!--Apache Avro 依賴--> <dependency> <groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> <version>1.11.0</version> </dependency>
Avro 插件
<build> <plugins> <plugin> <groupId>org.apache.avro</groupId> <artifactId>avro-maven-plugin</artifactId> <version>1.11.0</version> <executions> <execution> <phase>generate-sources</phase> <goals> <goal>schema</goal> </goals> <configuration> <sourceDirectory>${project.basedir}/src/main/java/com/warybee/avro/schema/</sourceDirectory> <outputDirectory>${project.basedir}/src/main/java/</outputDirectory> </configuration> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build>
Avro插件參數(shù)說明:
- sourceDirectory:schema 文件所在目錄
- outputDirectory:根據(jù)schema 文件生成的類文件到哪個目錄
2.3 創(chuàng)建schema 文件
Apache Avro schema 是使用 JSON 定義的。詳細的介紹,可以參考官網(wǎng)文檔
定義一個order.avsc文件(文件所在目錄要與上一步配置的sourceDirectory保持一致),內(nèi)容如下:
{"namespace": "com.warybee.avro", "type": "record", "name": "Order", "fields": [ {"name": "orderId", "type": "int"}, {"name": "title", "type": "string"}, {"name": "num", "type": "int"} ] }
IDEA話,可以安裝一個Apache Avro IDL Schema Support插件,安裝插件后編寫schema 有智能提示。
2.4.Avro生成entity
上面配置了Avro 插件,通過maven命令,生成即可。
mvn install
或者IDEA右鍵->RUN Maven->install
2.5 生產(chǎn)者代碼
package com.warybee.avro; import org.apache.avro.io.BinaryEncoder; import org.apache.avro.io.EncoderFactory; import org.apache.avro.specific.SpecificDatumWriter; import org.apache.kafka.clients.producer.*; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; /** * @author joy */ public class KafkaProducerDemo { public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException { Map<String, Object> configs = new HashMap<>(); // 設(shè)置連接Kafka的初始連接用到的服務(wù)器地址 configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://192.168.235.132:9092"); // 設(shè)置key的序列化類 configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer"); // 設(shè)置value的序列化類 configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); configs.put(ProducerConfig.ACKS_CONFIG,"all"); KafkaProducer<Integer,byte[]> producer=new KafkaProducer<Integer, byte[]>(configs); //發(fā)送100條消息 for (int i = 0; i < 100; i++) { Order order=Order.newBuilder() .setOrderId(i+1) .setTitle("訂單: "+(i+1)+" iphone 13 pro 256G") .setNum(1) .build(); ByteArrayOutputStream out = new ByteArrayOutputStream(); BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, (BinaryEncoder)null); SpecificDatumWriter writer = new SpecificDatumWriter(order.getSchema()); try { writer.write(order, encoder); encoder.flush(); out.close(); } catch (IOException e) { e.printStackTrace(); } ProducerRecord<Integer,byte[]> record=new ProducerRecord<>( "test_avro_topic", 0, order.getOrderId(), out.toByteArray()); //發(fā)送消息 producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception==null){ System.out.println("消息的主題:"+metadata.topic()); System.out.println("消息的分區(qū):"+metadata.partition()); System.out.println("消息的偏移量:"+metadata.offset()); }else { System.out.println("發(fā)送消息異常"); } } }); } // 關(guān)閉生產(chǎn)者 producer.close(); } }
2.6 消費者代碼
package com.warybee.avro; import org.apache.avro.io.Decoder; import org.apache.avro.io.DecoderFactory; import org.apache.avro.specific.SpecificDatumReader; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; /** * @author joy */ public class KafkaConsumerDemo { public static void main(String[] args) { Map<String, Object> configs = new HashMap<>(); // 設(shè)置連接Kafka的初始連接用到的服務(wù)器地址 // 如果是集群,則可以通過此初始連接發(fā)現(xiàn)集群中的其他broker configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://192.168.235.132:9092"); //KEY反序列化類 configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer"); //value反序列化類 configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); //消費者所在的組ID configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer.demo.avro"); configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); //創(chuàng)建消費者對象 KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<Integer, byte[]>(configs); List<String> topics = new ArrayList<>(); topics.add("test_avro_topic"); //消費者訂閱主題 consumer.subscribe(topics); SpecificDatumReader<Order> reader = new SpecificDatumReader<>(Order.getClassSchema()); try { while (true){ //批量拉取主題消息,每3秒拉取一次 ConsumerRecords<Integer, byte[]> records = consumer.poll(3000); for (ConsumerRecord<Integer, byte[]> record : records) { Decoder decoder = DecoderFactory.get().binaryDecoder(record.value(), null); Order order=null; try { order=reader.read(null,decoder); System.out.println("訂單ID:"+order.getOrderId()+"\t" +"訂單標(biāo)題:"+order.getTitle()+"\t" +"數(shù)量:"+order.getNum()); } catch (IOException e) { e.printStackTrace(); } } } }finally { consumer.close(); } } }
到此這篇關(guān)于Kafka中使用Avro序列化和反序列化詳解的文章就介紹到這了,更多相關(guān)Avro序列化和反序列化內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Spring Boot實現(xiàn)qq郵箱驗證碼注冊和登錄驗證功能
這篇文章主要給大家介紹了關(guān)于Spring Boot實現(xiàn)qq郵箱驗證碼注冊和登錄驗證功能的相關(guān)資料,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-12-12Spring?Boot使用線程池處理上萬條數(shù)據(jù)插入功能
這篇文章主要介紹了Spring?Boot使用線程池處理上萬條數(shù)據(jù)插入功能,使用步驟是先創(chuàng)建一個線程池的配置,讓Spring Boot加載,用來定義如何創(chuàng)建一個ThreadPoolTaskExecutor,本文通過實例代碼給大家介紹的非常詳細,需要的朋友參考下吧2022-08-08IDEA(2022.2)搭建Servlet基本框架超詳細步驟
這篇文章主要給大家介紹了關(guān)于IDEA(2022.2)搭建Servlet基本框架超詳細步驟,Servlet容器負責(zé)Servlet和客戶的通信以及調(diào)用Servlet的方法,Servlet和客戶的通信采用"請求/響應(yīng)"的模式,需要的朋友可以參考下2023-10-10Java并發(fā)編程中的CyclicBarrier使用解析
這篇文章主要介紹了Java并發(fā)編程中的CyclicBarrier使用解析,CyclicBarrier從字面意思上來看,循環(huán)柵欄,這篇文章就來分析下是到底是如何實現(xiàn)循環(huán)和柵欄的,需要的朋友可以參考下2023-12-12java 利用反射機制,獲取實體所有屬性和方法,并對屬性賦值
這篇文章主要介紹了 java 利用反射機制,獲取實體所有屬性和方法,并對屬性賦值的相關(guān)資料,需要的朋友可以參考下2017-01-01Java?生成透明圖片的設(shè)置實現(xiàn)demo
這篇文章主要為大家介紹了Java?生成透明圖片的設(shè)置實現(xiàn)demo,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-02-02