Springboot項(xiàng)目消費(fèi)Kafka數(shù)據(jù)的方法
一、引入依賴(lài)
你需要在 pom.xml 中添加 spring-kafka 相關(guān)依賴(lài):
<dependencies> <!-- Spring Boot Web --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- Spring Kafka --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <!-- Spring Boot Starter for Logging (optional but useful for debugging) --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId> </dependency> <!-- Spring Boot Starter for Testing --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
二、添加Kafka配置
在 application.yml 或 application.properties 文件中配置 Kafka 連接屬性:
application.yml 示例:
spring: kafka: bootstrap-servers: localhost:9092 # Kafka服務(wù)器地址 consumer: group-id: my-consumer-group # 消費(fèi)者組ID auto-offset-reset: earliest # 消費(fèi)者從頭開(kāi)始讀?。ㄈ绻麤](méi)有已提交的偏移量) key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 設(shè)置key的反序列化器 value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 設(shè)置value的反序列化器為字符串 listener: missing-topics-fatal: false # 如果主題不存在,不拋出致命錯(cuò)誤
application.properties 示例:
spring.kafka.bootstrap-servers=localhost:9092 spring.kafka.consumer.group-id=my-consumer-group spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.listener.missing-topics-fatal=false spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 設(shè)置key的反序列化器 spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 設(shè)置value的反序列化器為字符串
注意:spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 設(shè)置key的反序列化器
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 設(shè)置value的反序列化器為字符串
以上配置說(shuō)明Kafka生產(chǎn)的數(shù)據(jù)是json字符串,那么消費(fèi)接收的數(shù)據(jù)默認(rèn)也是json字符串,如果接收消息想用對(duì)象接受,需要自定義序列化器,比如以下配置
spring: kafka: producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer # 對(duì) Key 使用 StringSerializer value-serializer: org.springframework.kafka.support.serializer.ErrorHandlingSerializer # 對(duì) Value 使用 ErrorHandlingSerializer properties: spring.json.value.default.type: com.example.Order # 默認(rèn)的 JSON 反序列化目標(biāo)類(lèi)型為 Order
三、創(chuàng)建 Kafka 消費(fèi)者
創(chuàng)建一個(gè) Kafka 消費(fèi)者類(lèi)來(lái)處理消息。你可以使用 @KafkaListener 注解來(lái)監(jiān)聽(tīng) Kafka 中的消息
(一)Kafka生產(chǎn)的消息是JSON 字符串
1、方式一
如果消息是 JSON 字符串,你可以使用 StringDeserializer 獲取消息后,再使用 ObjectMapper 將其轉(zhuǎn)換為
Java 對(duì)象(如 Order)。
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.stereotype.Service; @Service @EnableKafka // 啟用 Kafka 消費(fèi)者 public class KafkaConsumer { private final ObjectMapper objectMapper = new ObjectMapper(); // 監(jiān)聽(tīng) Kafka 中的 order-topic 主題 @KafkaListener(topics = "order-topic", groupId = "order-consumer-group") public void consumeOrder(String message) { try { // 將 JSON 字符串反序列化為 Order 對(duì)象 Order order = objectMapper.readValue(message, Order.class); System.out.println("Received order: " + order); } catch (Exception e) { e.printStackTrace(); } } }
說(shuō)明:
@KafkaListener(topics = “my-topic”, groupId = “my-consumer-group”):
topics 表示監(jiān)聽(tīng)的 Kafka 主題,groupId 表示消費(fèi)者所屬的消費(fèi)者組。
listen(String message): 該方法會(huì)被調(diào)用來(lái)處理收到的每條消息。在此示例中,我們打印出消息內(nèi)容。
2、方式二:需要直接訪問(wèn)消息元數(shù)據(jù)
可以通過(guò) ConsumerRecord 來(lái)接收 Kafka 消息。這種方式適用于需要直接訪問(wèn)消息元數(shù)據(jù)(如
topic、partition、offset)的場(chǎng)景,也適合手動(dòng)管理消息消費(fèi)和偏移量提交的情況。
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Service; @Service public class KafkaConsumer { // 監(jiān)聽(tīng) Kafka 中的 order-topic 主題 @KafkaListener(topics = "order-topic", groupId = "order-consumer-group") public void consumeOrder(ConsumerRecord<String, String> record) { // 獲取消息的詳細(xì)信息 String key = record.key(); // 獲取消息的 key String value = record.value(); // 獲取消息的 value String topic = record.topic(); // 獲取消息的 topic int partition = record.partition(); // 獲取消息的分區(qū) long offset = record.offset(); // 獲取消息的偏移量 long timestamp = record.timestamp(); // 獲取消息的時(shí)間戳 // 處理消息(這里我們只是打印消息) System.out.println("Consumed record: "); System.out.println("Key: " + key); System.out.println("Value: " + value); System.out.println("Topic: " + topic); System.out.println("Partition: " + partition); System.out.println("Offset: " + offset); System.out.println("Timestamp: " + timestamp); } }
(二)Kafka生產(chǎn)的消息是對(duì)象Order
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Service; @Service public class KafkaConsumer { // 監(jiān)聽(tīng) Kafka 中的 order-topic 主題 @KafkaListener(topics = "order-topic", groupId = "order-consumer-group") public void consumeOrder(ConsumerRecord<String, Order> record) { // 獲取消息的詳細(xì)信息 String key = record.key(); // 獲取消息的 key Order value = record.value(); // 獲取消息的 value String topic = record.topic(); // 獲取消息的 topic int partition = record.partition(); // 獲取消息的分區(qū) long offset = record.offset(); // 獲取消息的偏移量 long timestamp = record.timestamp(); // 獲取消息的時(shí)間戳 // 處理消息(這里我們只是打印消息) System.out.println("Consumed record: "); System.out.println("Key: " + key); System.out.println("Value: " + value); System.out.println("Topic: " + topic); System.out.println("Partition: " + partition); System.out.println("Offset: " + offset); System.out.println("Timestamp: " + timestamp); } }
四、創(chuàng)建 啟動(dòng)類(lèi)
確保你的 Spring Boot 啟動(dòng)類(lèi)正確配置了 Spring Boot 應(yīng)用程序啟動(dòng)。
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class KafkaConsumerApplication { public static void main(String[] args) { SpringApplication.run(KafkaConsumerApplication.class, args); } }
五、配置 Kafka 生產(chǎn)者(可選)
(一)消息類(lèi)型為json串
import org.springframework.kafka.core.KafkaTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.stereotype.Service; import com.fasterxml.jackson.databind.ObjectMapper; @Service @EnableKafka public class KafkaProducer { @Autowired private KafkaTemplate<String, String> kafkaTemplate; // 發(fā)送的是 String 類(lèi)型消息 private ObjectMapper objectMapper = new ObjectMapper(); // Jackson ObjectMapper 用于序列化 // 發(fā)送訂單到 Kafka public void sendOrder(String topic, Order order) { try { // 將 Order 對(duì)象轉(zhuǎn)換為 JSON 字符串 String orderJson = objectMapper.writeValueAsString(order); // 發(fā)送 JSON 字符串到 Kafka kafkaTemplate.send(topic, orderJson); // 發(fā)送字符串消息 System.out.println("Order JSON sent to Kafka: " + orderJson); } catch (Exception e) { e.printStackTrace(); } } }
(二)消息類(lèi)型為對(duì)象Order
import org.springframework.kafka.core.KafkaTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.stereotype.Service; @Service @EnableKafka public class KafkaProducer { @Autowired private KafkaTemplate<String, Order> kafkaTemplate; // 發(fā)送訂單到 Kafka public void sendOrder(String topic, Order order) { kafkaTemplate.send(topic, order); // 發(fā)送訂單對(duì)象,Spring Kafka 會(huì)自動(dòng)將 Order 轉(zhuǎn)換為 JSON } }
六、啟動(dòng) Kafka 服務(wù)
啟動(dòng) Kafka 服務(wù)
bin/kafka-server-start.sh config/server.properties
七、測(cè)試 Kafka 消費(fèi)者
你可以通過(guò)向 Kafka 發(fā)送消息來(lái)測(cè)試消費(fèi)者是否工作正常。假設(shè)你已經(jīng)在 Kafka 中創(chuàng)建了一個(gè)名為 my-topic 的主題,可以使用 KafkaProducer 來(lái)發(fā)送消息:
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class KafkaController { @Autowired private KafkaProducer kafkaProducer; @GetMapping("/sendOrder") public String sendOrder() { Order order = new Order(); order.setOrderId(1L); order.setUserId(123L); order.setProduct("Laptop"); order.setQuantity(2); order.setStatus("Created"); kafkaProducer.sendOrder("order-topic", order); return "Order sent!"; } }
當(dāng)你訪問(wèn) /sendOrder端點(diǎn)時(shí),KafkaProducer 會(huì)將消息發(fā)送到 Kafka,KafkaConsumer 會(huì)接收到這條消息并打印出來(lái)。
九、測(cè)試和調(diào)試
你可以通過(guò)查看 Kafka 消費(fèi)者日志,確保消息已經(jīng)被成功消費(fèi)。你還可以使用 KafkaTemplate 發(fā)送消息,并確保 Kafka 生產(chǎn)者和消費(fèi)者之間的連接正常。
十、 結(jié)語(yǔ)
至此,你已經(jīng)在 Spring Boot 中成功配置并實(shí)現(xiàn)了 Kafka 消費(fèi)者和生產(chǎn)者。你可以根據(jù)需要擴(kuò)展功能,例如處理更復(fù)雜的消息類(lèi)型、批量消費(fèi)等。
到此這篇關(guān)于Springboot項(xiàng)目如何消費(fèi)Kafka數(shù)據(jù)的文章就介紹到這了,更多相關(guān)Springboot消費(fèi)Kafka數(shù)據(jù)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- SpringBoot集成Kafka的實(shí)現(xiàn)示例
- SpringBoot整合Kafka完成生產(chǎn)消費(fèi)的方案
- SpringBoot 整合 Avro 與 Kafka的詳細(xì)過(guò)程
- springboot使用kafka推送數(shù)據(jù)到服務(wù)端的操作方法帶認(rèn)證
- SpringBoot使用Kafka來(lái)優(yōu)化接口請(qǐng)求的并發(fā)方式
- 如何使用SpringBoot集成Kafka實(shí)現(xiàn)用戶(hù)數(shù)據(jù)變更后發(fā)送消息
- Spring Boot 集成 Kafka的詳細(xì)步驟
- SpringKafka錯(cuò)誤處理(重試機(jī)制與死信隊(duì)列)
相關(guān)文章
mybatis-flex與springBoot整合的實(shí)現(xiàn)示例
Mybatis-flex提供了簡(jiǎn)單易用的API,開(kāi)發(fā)者只需要簡(jiǎn)單的配置即可使用,本文主要介紹了mybatis-flex與springBoot整合,具有一定的參考價(jià)值,感興趣的可以了解一下2024-01-01談?wù)凷pring Boot 數(shù)據(jù)源加載及其多數(shù)據(jù)源簡(jiǎn)單實(shí)現(xiàn)(小結(jié))
這篇文章主要介紹了談?wù)凷pring Boot 數(shù)據(jù)源加載及其多數(shù)據(jù)源簡(jiǎn)單實(shí)現(xiàn),小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2019-04-04Mybatis步驟分解實(shí)現(xiàn)一個(gè)增刪改查程序
MybatisPlus是國(guó)產(chǎn)的第三方插件, 它封裝了許多常用的CURDapi,免去了我們寫(xiě)mapper.xml的重復(fù)勞動(dòng)。本文將整合MybatisPlus實(shí)現(xiàn)增刪改查功能,感興趣的可以了解一下2022-05-05Java虛擬機(jī)執(zhí)行引擎知識(shí)總結(jié)
這篇文章主要介紹了有關(guān)Java虛擬機(jī)執(zhí)行引擎的知識(shí),文中實(shí)例簡(jiǎn)單易懂,方便大家更好的學(xué)習(xí),有興趣的朋友可以了解下2020-06-06簡(jiǎn)單了解Spring中BeanFactory與FactoryBean的區(qū)別
這篇文章主要介紹了簡(jiǎn)單了解Spring中BeanFactory與FactoryBean的區(qū)別,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-12-12javaSystem.out.println()輸出byte[]、char[]異常的問(wèn)題詳析
這篇文章主要給大家介紹了關(guān)于javaSystem.out.println()輸出byte[]、char[]異常問(wèn)題的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來(lái)一起看看啊2019-01-01解決打開(kāi)的idea項(xiàng)目maven不生效問(wèn)題
這篇文章主要給大家介紹了關(guān)于如何解決打開(kāi)的idea項(xiàng)目maven不生效問(wèn)題,最近在配置maven時(shí),發(fā)現(xiàn)無(wú)論配置幾遍,IDEA中的maven配置總會(huì)還原成默認(rèn)的,所以這里給大家分享下解決辦法,需要的朋友可以參考下2023-07-07