Kafka是什么及如何使用SpringBoot對(duì)接Kafka(最新推薦)
繼上一次教大家手把手安裝kafka后,今天我們直接來(lái)到入門實(shí)操教程,也就是使用SpringBoot該怎么對(duì)接和使用kafka。當(dāng)然,在一開始我們也會(huì)比較細(xì)致的介紹一下kafka本身。那么話不多說,馬上開始今天的學(xué)習(xí)吧
一、Kafka與流處理
我們先來(lái)看看比較正式的介紹:Kafka是一種流處理平臺(tái),由LinkedIn公司創(chuàng)建,現(xiàn)在是Apache下的開源項(xiàng)目。Kafka通過發(fā)布/訂閱機(jī)制實(shí)現(xiàn)消息的異步傳輸和處理。它具有高吞吐量、低延遲、可伸縮性和可靠性等優(yōu)點(diǎn),使其成為了流處理和實(shí)時(shí)數(shù)據(jù)管道的首選解決方案
介紹其實(shí)是比較清晰的,如果你是第一次接觸“流處理”概念,我們也可以做一點(diǎn)解釋,流處理指的是對(duì)連續(xù)、實(shí)時(shí)產(chǎn)生的數(shù)據(jù)流進(jìn)行實(shí)時(shí)處理、計(jì)算和分析的過程。
假設(shè)你正在玩一款在線游戲,其他玩家的動(dòng)作和游戲事件會(huì)實(shí)時(shí)地傳到服務(wù)器上。這些事件就形成了一條數(shù)據(jù)流。在流處理中,我們會(huì)對(duì)這條數(shù)據(jù)流進(jìn)行實(shí)時(shí)處理,例如計(jì)算每個(gè)玩家的分?jǐn)?shù)、監(jiān)控游戲區(qū)域內(nèi)的異常情況、統(tǒng)計(jì)玩家在線時(shí)長(zhǎng)等等。這樣,游戲管理員就可以實(shí)時(shí)地監(jiān)控和管理游戲,而不需要等到游戲結(jié)束才進(jìn)行操作。
類似的,流處理還可以應(yīng)用在其他實(shí)時(shí)性要求比較高的場(chǎng)景中,例如金融交易、物聯(lián)網(wǎng)、實(shí)時(shí)監(jiān)測(cè)等。通過對(duì)數(shù)據(jù)流進(jìn)行實(shí)時(shí)處理,我們可以更加精準(zhǔn)地掌握數(shù)據(jù)變化的情況,并及時(shí)做出反應(yīng)和調(diào)整,
二、Spring Boot與Kafka的整合Demo
1. 新建springboot工程
如果你沒有現(xiàn)成的Spring boot項(xiàng)目,那么我們可以使用IDEA自帶的Spring Initializr 來(lái)創(chuàng)建一個(gè)spring-boot的項(xiàng)目
此時(shí)我們可以直接選擇使用Apache Kafka,另外項(xiàng)目還可以加個(gè)Spring Web準(zhǔn)備讓前臺(tái)調(diào)用
2. 添加Kafka依賴
如果你不是像上述一樣新建的項(xiàng)目,那你也可以選擇在已有的Spring Boot應(yīng)用程序中使用Kafka,那么你需要在pom.xml文件中添加以下依賴:
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.8.11</version> </dependency>
3. 配置Kafka
在application.properties文件中添加以下配置:
spring.kafka.bootstrap-servers=localhost:9092 spring.kafka.consumer.group-id=test_group
這里我們指定了Kafka服務(wù)器的地址和端口,并配置了消費(fèi)者組的ID,關(guān)于消費(fèi)者組的概念,其實(shí)就是某一些消費(fèi)者具備相同的功能,因此會(huì)把他們?cè)O(shè)為同一個(gè)消費(fèi)者組,這樣他們就不會(huì)重復(fù)消費(fèi)同一條消息了。更具體地原理,我們會(huì)在之后地篇章中介紹。
4. 創(chuàng)建Kafka生產(chǎn)者
在Kafka中,生產(chǎn)者是發(fā)送消息的應(yīng)用程序或服務(wù)。在Spring Boot中,我們可以使用KafkaTemplate類來(lái)創(chuàng)建Kafka生產(chǎn)者
package com.zhanfu.kafkademo.service; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; @Service public class KafkaService { @Autowired private KafkaTemplate<String, String> kafkaTemplate; public void sendMessage(String message) { kafkaTemplate.send("test_topic", message); } }
這里我們使用@Autowired注解來(lái)自動(dòng)注入KafkaTemplate,并使用send方法將消息發(fā)送到名為“test_topic”的Kafka主題中。
5. 創(chuàng)建Kafka消費(fèi)者
在Kafka中,消費(fèi)者是接收并處理訂閱主題消息的應(yīng)用程序或服務(wù)。在Spring Boot中,我們可以使用@KafkaListener注解來(lái)創(chuàng)建Kafka消費(fèi)者。
package com.zhanfu.kafkademo.listener; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @Component public class KafkaLis { @KafkaListener(topics = "test_topic", groupId = "test_group") public void receiveMessage(String message) { System.out.println("Received message: " + message); } }
6. 應(yīng)用程序入口
現(xiàn)在我們已經(jīng)完成了Spring Boot和Kafka的整合。我們可以啟動(dòng)Spring Boot應(yīng)用程序,然后發(fā)送消息并消費(fèi)它,以測(cè)試我們的應(yīng)用程序是否正確地與Kafka集成。
package com.zhanfu.kafkademo.controller; import com.zhanfu.kafkademo.service.KafkaService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RestController; @RestController public class MessageController { @Autowired private KafkaService kafkaService; @GetMapping("/send/{message}") public String sendMessage(@PathVariable String message) { kafkaService.sendMessage(message); return "Message sent successfully"; } }
在這個(gè)例子中,我們使用@Autowired注解來(lái)自動(dòng)注入KafkaProducer,并通過發(fā)送消息的方法來(lái)調(diào)用sendMessage方法。最終項(xiàng)目整體框架如圖:
三、啟動(dòng)與驗(yàn)證
首先自然是啟動(dòng) Kafka ,怎么啟動(dòng)可參考 《上手第一關(guān),手把手教你安裝kafka與可視化工具kafka-eagle》,然后是啟動(dòng)我們的Spring Boot項(xiàng)目
然后在瀏覽器中輸入
http://127.0.0.1:8080/send/hello
最后檢查我們的項(xiàng)目日志:
可以看到,整個(gè)發(fā)送和接收的流程都走通了
四、KafkaTemplate 介紹
不難看出,在Springboot中,使用kafka的關(guān)鍵在于 KafkaTemplate
, 它是 Spring 提供的 Kafka 生產(chǎn)者模版,用于向 Kafka 集群發(fā)送消息。并且把 Kafka 的生產(chǎn)者客戶端封裝成了一個(gè) Spring Bean,提供更加方便易用的 API。
它有三個(gè)主要屬性:
producerFactory
:生產(chǎn)者工廠類,用于創(chuàng)建 KafkaProducer 實(shí)例。defaultTopic
:默認(rèn)主題名稱,如果在發(fā)送消息時(shí)沒有指定主題名稱,則使用該默認(rèn)主題。messageConverter
:消息轉(zhuǎn)換器,用于將消息對(duì)象轉(zhuǎn)換為 Kafka ProducerRecord
它的主要方法:
- send(ProducerRecord<K,V> record):向指定的 Kafka 主題發(fā)送一條消息。ProducerRecord 包含了主題名稱、分區(qū)編號(hào)、Key 和 Value 等信息。
- send(String topic, V data):向指定的 Kafka 主題發(fā)送一條消息。
- send(String topic, K key, V data):向指定的 Kafka 主題發(fā)送一條消息,并指定消息的 Key。
- execute(ProducerCallback<K,V> callback):使用回調(diào)方式發(fā)送消息,可以自定義消息的創(chuàng)建過程和錯(cuò)誤處理過程。
- inTransaction():?jiǎn)⒂檬聞?wù),多個(gè) send 方法調(diào)用將被包裝在一個(gè)事務(wù)中,保證 Kafka 事務(wù)的原子性。
除了上述方法外,KafkaTemplate 還提供了其他方法,如 sendDefault()、sendOffsetsToTransaction() 等,可以根據(jù)實(shí)際需要進(jìn)行選擇和使用。
需要注意的是,在使用 KafkaTemplate 發(fā)送消息時(shí)應(yīng)該注意消息的序列化方式、主題和分區(qū)的選擇以及錯(cuò)誤處理等問題,以保證消息的可靠性和正確性。
當(dāng)然,很多同學(xué)可能還注意到一個(gè)細(xì)節(jié),我們?cè)谏厦娴腄emo中,我們直接將其 @Autowired進(jìn)我們的代碼中,這是怎么做到的呢?換句話說,這個(gè) KafkaTemplate
為什么自己就會(huì)被spring 容器管理的呢?其實(shí)這得益于SpringBoot中對(duì)Kafka有了很多自動(dòng)配置的內(nèi)容。如下:
如上圖,相信對(duì)Spring Boot熟悉的同學(xué)看到 ConditionalOnClass
、 ConditionalOnMissingBean
應(yīng)該就明白了。其實(shí)Spring Boot 早就貼心的為我們預(yù)留了這些自動(dòng)配置,只要我們引入了 spring-kafka 包,使得項(xiàng)目中出現(xiàn)了 KafkaTemplate 類,那么它就能被自動(dòng)配置并存入Spring 容器內(nèi)
總結(jié)
今天我們通過一個(gè)Demo講解了在SpringBoot中如何對(duì)接Kafka,也介紹了下關(guān)鍵類 KafkaTemplate ,得益于Spring Boot 的自動(dòng)配置,開發(fā)者要做的配置內(nèi)容其實(shí)并不多,使用也主要是依賴其提供的API,相對(duì)簡(jiǎn)單,相信大家很容易也都學(xué)會(huì)了,那么在后面的過程中,我們將繼續(xù)學(xué)習(xí)其使用,并且會(huì)著重講解 Kafka 的原理與結(jié)構(gòu)
到此這篇關(guān)于Kafka是什么,以及如何使用SpringBoot對(duì)接Kafka的文章就介紹到這了,更多相關(guān)SpringBoot對(duì)接Kafka內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Spring MVC 4.1.3 + MyBatis零基礎(chǔ)搭建Web開發(fā)框架(注解模式)
本篇文章主要介紹了Spring MVC 4.1.3 + MyBatis零基礎(chǔ)搭建Web開發(fā)框架(注解模式),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下。2017-03-03C++/java 繼承類的多態(tài)詳解及實(shí)例代碼
這篇文章主要介紹了C++/java 繼承類的多態(tài)詳解及實(shí)例代碼的相關(guān)資料,需要的朋友可以參考下2017-02-02SpringBoot的@GetMapping路徑匹配規(guī)則、國(guó)際化詳細(xì)教程
這篇文章主要介紹了SpringBoot的@GetMapping路徑匹配規(guī)則、國(guó)際化,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧2023-11-11詳解如何在Spring Boot中實(shí)現(xiàn)容錯(cuò)機(jī)制
容錯(cuò)機(jī)制是構(gòu)建健壯和可靠的應(yīng)用程序的重要組成部分,它可以幫助應(yīng)用程序在面對(duì)異?;蚬收蠒r(shí)保持穩(wěn)定運(yùn)行,Spring Boot提供了多種機(jī)制來(lái)實(shí)現(xiàn)容錯(cuò),包括異常處理、斷路器、重試和降級(jí)等,本文將介紹如何在Spring Boot中實(shí)現(xiàn)這些容錯(cuò)機(jī)制,需要的朋友可以參考下2023-10-10