Spring?Boot整合Kafka教程詳解
正文
本教程將介紹如何在 Spring Boot 應(yīng)用程序中使用 Kafka。Kafka 是一個(gè)分布式的發(fā)布-訂閱消息系統(tǒng),它可以處理大量數(shù)據(jù)并提供高吞吐量。
在本教程中,我們將使用 Spring Boot 2.5.4 和 Kafka 2.8.0。
步驟一:添加依賴(lài)項(xiàng)
在 pom.xml 中添加以下依賴(lài)項(xiàng):
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.8.0</version> </dependency>
步驟二:配置 Kafka
在 application.yml
文件中添加以下配置:
sping: kafka: bootstrap-servers: localhost:9092 consumer: group-id: my-group auto-offset-reset: earliest producer: value-serializer: org.apache.kafka.common.serialization.StringSerializer key-serializer: org.apache.kafka.common.serialization.StringSerializer
這里我們配置了 Kafka 的服務(wù)地址為 localhost:9092
,配置了一個(gè)消費(fèi)者組 ID 為 my-group
,并設(shè)置了一個(gè)最早的偏移量來(lái)讀取消息。在生產(chǎn)者方面,我們配置了消息序列化程序?yàn)?StringSerializer
。
步驟三:創(chuàng)建一個(gè)生產(chǎn)者
現(xiàn)在,我們將創(chuàng)建一個(gè) Kafka 生產(chǎn)者,用于發(fā)送消息到 Kafka 服務(wù)器。在這里,我們將創(chuàng)建一個(gè) RESTful 端點(diǎn),用于接收 POST 請(qǐng)求并將消息發(fā)送到 Kafka。
首先,我們將創(chuàng)建一個(gè) KafkaProducerConfig
類(lèi),用于配置 Kafka 生產(chǎn)者:
@Configuration public class KafkaProducerConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } @Bean public ProducerFactory<String, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } }
在上面的代碼中,我們使用 @Configuration
注解將 KafkaProducerConfig
類(lèi)聲明為配置類(lèi)。然后,我們使用 @Value
注解注入配置文件中的 bootstrap-servers
屬性。
接下來(lái),我們創(chuàng)建了一個(gè) producerConfigs
方法,用于設(shè)置 Kafka 生產(chǎn)者的配置。在這里,我們?cè)O(shè)置了 BOOTSTRAP_SERVERS_CONFIG
、KEY_SERIALIZER_CLASS_CONFIG
和 VALUE_SERIALIZER_CLASS_CONFIG
三個(gè)屬性。
然后,我們創(chuàng)建了一個(gè) producerFactory
方法,用于創(chuàng)建 Kafka 生產(chǎn)者工廠。在這里,我們使用了 DefaultKafkaProducerFactory
類(lèi),并傳遞了我們的配置。
最后,我們創(chuàng)建了一個(gè) kafkaTemplate
方法,用于創(chuàng)建 KafkaTemplate
實(shí)例。在這里,我們使用了剛剛創(chuàng)建的生產(chǎn)者工廠作為參數(shù),然后返回 KafkaTemplate
實(shí)例。
接下來(lái),我們將創(chuàng)建一個(gè) RESTful 端點(diǎn),用于接收 POST 請(qǐng)求并將消息發(fā)送到 Kafka。在這里,我們將使用 @RestController
注解創(chuàng)建一個(gè) RESTful 控制器:
@RestController public class KafkaController { @Autowired private KafkaTemplate<String, String> kafkaTemplate; @PostMapping("/send") public void sendMessage(@RequestBody String message) { kafkaTemplate.send("my-topic", message); } }
在上面的代碼中,我們使用 @Autowired
注解將 KafkaTemplate
實(shí)例注入到 KafkaController
類(lèi)中。然后,我們創(chuàng)建了一個(gè) sendMessage
方法,用于發(fā)送消息到 Kafka。
在這里,我們使用 kafkaTemplate.send
方法發(fā)送消息到 my-topic
主題。send 方法返回一個(gè) ListenableFuture
對(duì)象,用于異步處理結(jié)果。
步驟四:創(chuàng)建一個(gè)消費(fèi)者
現(xiàn)在,我們將創(chuàng)建一個(gè) Kafka 消費(fèi)者,用于從 Kafka 服務(wù)器接收消息。在這里,我們將創(chuàng)建一個(gè)消費(fèi)者組,并將其配置為從 my-topic
主題讀取消息。
首先,我們將創(chuàng)建一個(gè) KafkaConsumerConfig
類(lèi),用于配置 Kafka 消費(fèi)者:
@Configuration @EnableKafka public class KafkaConsumerConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Value("${spring.kafka.consumer.group-id}") private String groupId; @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; } @Bean public ConsumerFactory<String, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } }
在上面的代碼中,我們使用 @Configuration
注解將 KafkaConsumerConfig
類(lèi)聲明為配置類(lèi),并使用 @EnableKafka
注解啟用 Kafka。
然后,我們使用 @Value
注解注入配置文件中的 bootstrap-servers
和 consumer.group-id
屬性。
接下來(lái),我們創(chuàng)建了一個(gè) consumerConfigs
方法,用于設(shè)置 Kafka 消費(fèi)者的配置。在這里,我們?cè)O(shè)置了 BOOTSTRAP_SERVERS_CONFIG、GROUP_ID_CONFIG
、AUTO_OFFSET_RESET_CONFIG
、KEY_DESERIALIZER_CLASS_CONFIG
和 VALUE_DESERIALIZER_CLASS_CONFIG
五個(gè)屬性。
然后,我們創(chuàng)建了一個(gè) consumerFactory
方法,用于創(chuàng)建 Kafka 消費(fèi)者工廠。在這里,我們使用了 DefaultKafkaConsumerFactory
類(lèi),并傳遞了我們的配置。
最后,我們創(chuàng)建了一個(gè) kafkaListenerContainerFactory
方法,用于創(chuàng)建一個(gè) ConcurrentKafkaListenerContainerFactory
實(shí)例。在這里,我們將消費(fèi)者工廠注入到 kafkaListenerContainerFactory
實(shí)例中。
接下來(lái),我們將創(chuàng)建一個(gè) Kafka 消費(fèi)者類(lèi) KafkaConsumer
,用于監(jiān)聽(tīng) my-topic
主題并接收消息:
@Service public class KafkaConsumer { @KafkaListener(topics = "my-topic", groupId = "my-group-id") public void consume(String message) { System.out.println("Received message: " + message); } }
在上面的代碼中,我們使用 @KafkaListener
注解聲明了一個(gè)消費(fèi)者方法,用于接收從 my-topic
主題中讀取的消息。在這里,我們將消費(fèi)者組 ID 設(shè)置為 my-group-id
。
現(xiàn)在,我們已經(jīng)完成了 Kafka 生產(chǎn)者和消費(fèi)者的設(shè)置。我們可以使用 mvn spring-boot:run
命令啟動(dòng)應(yīng)用程序,并使用 curl 命令發(fā)送 POST 請(qǐng)求到 http://localhost:8080/send
端點(diǎn),以將消息發(fā)送到 Kafka。然后,我們可以在控制臺(tái)上查看消費(fèi)者接收到的消息。
這就是使用 Spring Boot 和 Kafka 的基本設(shè)置。我們可以根據(jù)需要進(jìn)行更改和擴(kuò)展,以滿(mǎn)足特定的需求。
以上就是Spring Boot整合Kafka教程詳解的詳細(xì)內(nèi)容,更多關(guān)于Spring Boot整合Kafka的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
- Spring?Boot?中使用@KafkaListener并發(fā)批量接收消息的完整代碼
- 基于SpringBoot?使用?Flink?收發(fā)Kafka消息的示例詳解
- SpringBoot如何獲取Kafka的Topic列表
- SpringBoot整合kafka遇到的版本不對(duì)應(yīng)問(wèn)題及解決
- SpringBoot+Nacos+Kafka微服務(wù)流編排的簡(jiǎn)單實(shí)現(xiàn)
- SpringBoot集成Kafka的步驟
- Spring Boot集群管理工具KafkaAdminClient使用方法解析
- Springboot集成Kafka實(shí)現(xiàn)producer和consumer的示例代碼
- Spring?Boot?基于?SCRAM?認(rèn)證集成?Kafka?的過(guò)程詳解
相關(guān)文章
Spring?Cloud?Eureka基礎(chǔ)應(yīng)用及原理
這篇文章主要介紹了Spring?Cloud?Eureka基礎(chǔ)應(yīng)用,Eureka?Client中內(nèi)置一個(gè)負(fù)載均衡器,用來(lái)進(jìn)行基本的負(fù)載均衡,下面我們將通過(guò)搭建一個(gè)簡(jiǎn)單的Eureka例子來(lái)了解Eureka的運(yùn)作原理,感興趣的朋友一起看看吧2022-05-05Mybatis不啟動(dòng)項(xiàng)目直接測(cè)試Mapper的實(shí)現(xiàn)方法
在項(xiàng)目開(kāi)發(fā)中,測(cè)試單個(gè)Mybatis Mapper方法通常需要啟動(dòng)整個(gè)SpringBoot項(xiàng)目,消耗大量時(shí)間,本文介紹通過(guò)Main方法和Mybatis配置類(lèi),快速測(cè)試Mapper功能,無(wú)需啟動(dòng)整個(gè)項(xiàng)目,這方法使用AnnotationConfigApplicationContext容器2024-09-09Java使用OpenCV3.2實(shí)現(xiàn)視頻讀取與播放
這篇文章主要為大家詳細(xì)介紹了Java使用OpenCV3.2實(shí)現(xiàn)視頻讀取與播放,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2019-07-07Spring Cloud Gateway 記錄請(qǐng)求應(yīng)答數(shù)據(jù)日志操作
這篇文章主要介紹了Spring Cloud Gateway 記錄請(qǐng)求應(yīng)答數(shù)據(jù)日志操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-12-12java基于websocket實(shí)現(xiàn)im聊天功能
這篇文章主要為大家介紹了java基于websocket實(shí)現(xiàn)im聊天功能示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-11-11MyBatis通用Mapper中的通用example(排序)詳解
這篇文章主要介紹了MyBatis通用Mapper中的通用example(排序)詳解,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-12-12MapReduce實(shí)現(xiàn)TopN效果示例解析
這篇文章主要為大家介紹了MapReduce實(shí)現(xiàn)TopN效果示例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-07-07Java手寫(xiě)線程池之向JDK線程池進(jìn)發(fā)
在前面的文章自己動(dòng)手寫(xiě)乞丐版線程池中,我們寫(xiě)了一個(gè)非常簡(jiǎn)單的線程池實(shí)現(xiàn),這個(gè)只是一個(gè)非常簡(jiǎn)單的實(shí)現(xiàn),在本篇文章當(dāng)中我們將要實(shí)現(xiàn)一個(gè)和JDK內(nèi)部實(shí)現(xiàn)的線程池非常相似的線程池,需要的可以了解一下2022-10-10