Kafka是什么及如何使用SpringBoot對接Kafka(最新推薦)
繼上一次教大家手把手安裝kafka后,今天我們直接來到入門實操教程,也就是使用SpringBoot該怎么對接和使用kafka。當然,在一開始我們也會比較細致的介紹一下kafka本身。那么話不多說,馬上開始今天的學習吧
一、Kafka與流處理
我們先來看看比較正式的介紹:Kafka是一種流處理平臺,由LinkedIn公司創(chuàng)建,現(xiàn)在是Apache下的開源項目。Kafka通過發(fā)布/訂閱機制實現(xiàn)消息的異步傳輸和處理。它具有高吞吐量、低延遲、可伸縮性和可靠性等優(yōu)點,使其成為了流處理和實時數(shù)據(jù)管道的首選解決方案
介紹其實是比較清晰的,如果你是第一次接觸“流處理”概念,我們也可以做一點解釋,流處理指的是對連續(xù)、實時產(chǎn)生的數(shù)據(jù)流進行實時處理、計算和分析的過程。
假設你正在玩一款在線游戲,其他玩家的動作和游戲事件會實時地傳到服務器上。這些事件就形成了一條數(shù)據(jù)流。在流處理中,我們會對這條數(shù)據(jù)流進行實時處理,例如計算每個玩家的分數(shù)、監(jiān)控游戲區(qū)域內(nèi)的異常情況、統(tǒng)計玩家在線時長等等。這樣,游戲管理員就可以實時地監(jiān)控和管理游戲,而不需要等到游戲結(jié)束才進行操作。
類似的,流處理還可以應用在其他實時性要求比較高的場景中,例如金融交易、物聯(lián)網(wǎng)、實時監(jiān)測等。通過對數(shù)據(jù)流進行實時處理,我們可以更加精準地掌握數(shù)據(jù)變化的情況,并及時做出反應和調(diào)整,
二、Spring Boot與Kafka的整合Demo
1. 新建springboot工程
如果你沒有現(xiàn)成的Spring boot項目,那么我們可以使用IDEA自帶的Spring Initializr 來創(chuàng)建一個spring-boot的項目
此時我們可以直接選擇使用Apache Kafka,另外項目還可以加個Spring Web準備讓前臺調(diào)用
2. 添加Kafka依賴
如果你不是像上述一樣新建的項目,那你也可以選擇在已有的Spring Boot應用程序中使用Kafka,那么你需要在pom.xml文件中添加以下依賴:
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.8.11</version> </dependency>
3. 配置Kafka
在application.properties文件中添加以下配置:
spring.kafka.bootstrap-servers=localhost:9092 spring.kafka.consumer.group-id=test_group
這里我們指定了Kafka服務器的地址和端口,并配置了消費者組的ID,關于消費者組的概念,其實就是某一些消費者具備相同的功能,因此會把他們設為同一個消費者組,這樣他們就不會重復消費同一條消息了。更具體地原理,我們會在之后地篇章中介紹。
4. 創(chuàng)建Kafka生產(chǎn)者
在Kafka中,生產(chǎn)者是發(fā)送消息的應用程序或服務。在Spring Boot中,我們可以使用KafkaTemplate類來創(chuàng)建Kafka生產(chǎn)者
package com.zhanfu.kafkademo.service; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; @Service public class KafkaService { @Autowired private KafkaTemplate<String, String> kafkaTemplate; public void sendMessage(String message) { kafkaTemplate.send("test_topic", message); } }
這里我們使用@Autowired注解來自動注入KafkaTemplate,并使用send方法將消息發(fā)送到名為“test_topic”的Kafka主題中。
5. 創(chuàng)建Kafka消費者
在Kafka中,消費者是接收并處理訂閱主題消息的應用程序或服務。在Spring Boot中,我們可以使用@KafkaListener注解來創(chuàng)建Kafka消費者。
package com.zhanfu.kafkademo.listener; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @Component public class KafkaLis { @KafkaListener(topics = "test_topic", groupId = "test_group") public void receiveMessage(String message) { System.out.println("Received message: " + message); } }
6. 應用程序入口
現(xiàn)在我們已經(jīng)完成了Spring Boot和Kafka的整合。我們可以啟動Spring Boot應用程序,然后發(fā)送消息并消費它,以測試我們的應用程序是否正確地與Kafka集成。
package com.zhanfu.kafkademo.controller; import com.zhanfu.kafkademo.service.KafkaService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RestController; @RestController public class MessageController { @Autowired private KafkaService kafkaService; @GetMapping("/send/{message}") public String sendMessage(@PathVariable String message) { kafkaService.sendMessage(message); return "Message sent successfully"; } }
在這個例子中,我們使用@Autowired注解來自動注入KafkaProducer,并通過發(fā)送消息的方法來調(diào)用sendMessage方法。最終項目整體框架如圖:
三、啟動與驗證
首先自然是啟動 Kafka ,怎么啟動可參考 《上手第一關,手把手教你安裝kafka與可視化工具kafka-eagle》,然后是啟動我們的Spring Boot項目
然后在瀏覽器中輸入
http://127.0.0.1:8080/send/hello
最后檢查我們的項目日志:
可以看到,整個發(fā)送和接收的流程都走通了
四、KafkaTemplate 介紹
不難看出,在Springboot中,使用kafka的關鍵在于 KafkaTemplate
, 它是 Spring 提供的 Kafka 生產(chǎn)者模版,用于向 Kafka 集群發(fā)送消息。并且把 Kafka 的生產(chǎn)者客戶端封裝成了一個 Spring Bean,提供更加方便易用的 API。
它有三個主要屬性:
producerFactory
:生產(chǎn)者工廠類,用于創(chuàng)建 KafkaProducer 實例。defaultTopic
:默認主題名稱,如果在發(fā)送消息時沒有指定主題名稱,則使用該默認主題。messageConverter
:消息轉(zhuǎn)換器,用于將消息對象轉(zhuǎn)換為 Kafka ProducerRecord
它的主要方法:
- send(ProducerRecord<K,V> record):向指定的 Kafka 主題發(fā)送一條消息。ProducerRecord 包含了主題名稱、分區(qū)編號、Key 和 Value 等信息。
- send(String topic, V data):向指定的 Kafka 主題發(fā)送一條消息。
- send(String topic, K key, V data):向指定的 Kafka 主題發(fā)送一條消息,并指定消息的 Key。
- execute(ProducerCallback<K,V> callback):使用回調(diào)方式發(fā)送消息,可以自定義消息的創(chuàng)建過程和錯誤處理過程。
- inTransaction():啟用事務,多個 send 方法調(diào)用將被包裝在一個事務中,保證 Kafka 事務的原子性。
除了上述方法外,KafkaTemplate 還提供了其他方法,如 sendDefault()、sendOffsetsToTransaction() 等,可以根據(jù)實際需要進行選擇和使用。
需要注意的是,在使用 KafkaTemplate 發(fā)送消息時應該注意消息的序列化方式、主題和分區(qū)的選擇以及錯誤處理等問題,以保證消息的可靠性和正確性。
當然,很多同學可能還注意到一個細節(jié),我們在上面的Demo中,我們直接將其 @Autowired進我們的代碼中,這是怎么做到的呢?換句話說,這個 KafkaTemplate
為什么自己就會被spring 容器管理的呢?其實這得益于SpringBoot中對Kafka有了很多自動配置的內(nèi)容。如下:
如上圖,相信對Spring Boot熟悉的同學看到 ConditionalOnClass
、 ConditionalOnMissingBean
應該就明白了。其實Spring Boot 早就貼心的為我們預留了這些自動配置,只要我們引入了 spring-kafka 包,使得項目中出現(xiàn)了 KafkaTemplate 類,那么它就能被自動配置并存入Spring 容器內(nèi)
總結(jié)
今天我們通過一個Demo講解了在SpringBoot中如何對接Kafka,也介紹了下關鍵類 KafkaTemplate ,得益于Spring Boot 的自動配置,開發(fā)者要做的配置內(nèi)容其實并不多,使用也主要是依賴其提供的API,相對簡單,相信大家很容易也都學會了,那么在后面的過程中,我們將繼續(xù)學習其使用,并且會著重講解 Kafka 的原理與結(jié)構(gòu)
到此這篇關于Kafka是什么,以及如何使用SpringBoot對接Kafka的文章就介紹到這了,更多相關SpringBoot對接Kafka內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Spring MVC 4.1.3 + MyBatis零基礎搭建Web開發(fā)框架(注解模式)
本篇文章主要介紹了Spring MVC 4.1.3 + MyBatis零基礎搭建Web開發(fā)框架(注解模式),具有一定的參考價值,感興趣的小伙伴們可以參考一下。2017-03-03SpringBoot的@GetMapping路徑匹配規(guī)則、國際化詳細教程
這篇文章主要介紹了SpringBoot的@GetMapping路徑匹配規(guī)則、國際化,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友參考下吧2023-11-11