Spring Boot集成Apache Kafka的實戰(zhàn)指南
Apache Kafka 是一個分布式流處理平臺,廣泛用于構建實時數(shù)據(jù)管道、日志聚合系統(tǒng)和事件溯源架構。Spring Boot 提供了對 Kafka 的良好集成支持,使得開發(fā)者可以非常便捷地在項目中使用 Kafka。
本文將手把手教你如何在 Spring Boot 項目中集成 Kafka,包括生產(chǎn)者(Producer)和消費者(Consumer)的實現(xiàn),并提供完整的代碼示例。
開發(fā)環(huán)境準備
Java 17+
Maven 或 Gradle
Spring Boot 3.x
Apache Kafka 3.0+(本地或遠程)
IDE(如 IntelliJ IDEA、VS Code)
創(chuàng)建 Spring Boot 項目
你可以通過 Spring Initializr 創(chuàng)建一個新的 Spring Boot 項目,選擇以下依賴:
- Spring Web
- Spring for Apache Kafka
或者手動添加 pom.xml 中的依賴:
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
Spring Boot 會自動管理版本兼容性,無需手動指定版本號。
配置 Kafka 連接信息
在 application.yml 或 application.properties 文件中配置 Kafka 相關參數(shù):
application.yml 示例:
spring: kafka: bootstrap-servers: localhost:9092 consumer: group-id: my-group auto-offset-reset: earliest producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer
編寫 Kafka 生產(chǎn)者(Producer)
創(chuàng)建一個服務類用于發(fā)送消息到 Kafka 主題。
KafkaProducer.java
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; @Service public class KafkaProducer { private final KafkaTemplate<String, String> kafkaTemplate; public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } public void sendMessage(String topic, String message) { kafkaTemplate.send(topic, message); System.out.println("Sent message: " + message); } }
編寫 Kafka 消費者(Consumer)
使用 @KafkaListener 注解監(jiān)聽特定主題的消息。
KafkaConsumer.java
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Service; @Service public class KafkaConsumer { @KafkaListener(topics = "test-topic", groupId = "my-group") public void listen(ConsumerRecord<String, String> record) { System.out.printf("Received message: topic - %s, partition - %d, offset - %d, key - %s, value - %s%n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } }
添加 REST 接口用于測試發(fā)送消息
為了方便測試,我們可以創(chuàng)建一個簡單的 REST 控制器來觸發(fā)消息發(fā)送。
KafkaController.java
import org.springframework.web.bind.annotation.*; import org.springframework.beans.factory.annotation.Autowired; @RestController @RequestMapping("/kafka") public class KafkaController { @Autowired private KafkaProducer kafkaProducer; @PostMapping("/send") public String sendMessage(@RequestParam String msg) { kafkaProducer.sendMessage("test-topic", msg); return "Message sent: " + msg; } }
啟動 Kafka 環(huán)境(可選)
如果你還沒有運行 Kafka,可以按照以下步驟快速啟動:
啟動 Zookeeper(Kafka 依賴)
bin/zookeeper-server-start.sh config/zookeeper.properties
啟動 Kafka 服務
bin/kafka-server-start.sh config/server.properties
創(chuàng)建測試 Topic
bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
測試接口
啟動 Spring Boot 應用后,訪問如下接口發(fā)送消息:
POST http://localhost:8080/kafka/send?msg=HelloKafka
觀察控制臺輸出,確認是否收到類似以下內容:
Received message: topic - test-topic, partition - 0, offset - 5, key - null, value - HelloKafka
擴展功能建議
使用 JSON 格式傳輸對象(自定義序列化/反序列化)
多消費者組配置與負載均衡
異常處理與重試機制(@DltHandler, SeekToCurrentErrorHandler)
Kafka Streams 實現(xiàn)實時流處理邏輯
配置 SSL、SASL 安全認證
結合 Spring Cloud Stream 構建云原生事件驅動架構
到此這篇關于Spring Boot集成Apache Kafka的實戰(zhàn)指南的文章就介紹到這了,更多相關SpringBoot集成Apache Kafka內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
一次由Lombok的@AllArgsConstructor注解引發(fā)的錯誤及解決
這篇文章主要介紹了一次由Lombok的@AllArgsConstructor注解引發(fā)的錯誤及解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-09-09