SpringBoot3集成Kafka的方法詳解
一、簡介
Kafka是一個(gè)開源的分布式事件流平臺(tái),常被用于高性能數(shù)據(jù)管道、流分析、數(shù)據(jù)集成和關(guān)鍵任務(wù)應(yīng)用,基于Zookeeper協(xié)調(diào)的處理平臺(tái),也是一種消息系統(tǒng),具有更好的吞吐量、內(nèi)置分區(qū)、復(fù)制和容錯(cuò),這使得它成為大規(guī)模消息處理應(yīng)用程序的一個(gè)很好的解決方案;
二、環(huán)境搭建
1、Kafka部署
1、下載安裝包:kafka_2.13-3.5.0.tgz
2、配置環(huán)境變量
open -e ~/.bash_profile
export KAFKA_HOME=/本地路徑/kafka3.5
export PATH=$PATH:$KAFKA_HOME/bin
source ~/.bash_profile
3、該目錄【kafka3.5/bin】啟動(dòng)zookeeper
zookeeper-server-start.sh ../config/zookeeper.properties
4、該目錄【kafka3.5/bin】啟動(dòng)kafka
kafka-server-start.sh ../config/server.properties
2、Kafka測試
1、生產(chǎn)者
kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic
>id-1-message
>id-2-message
2、消費(fèi)者
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic
id-1-message
id-2-message
3、查看topic列表
kafka-topics.sh --bootstrap-server localhost:9092 --list
test-topic
4、查看消息列表
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning --partition 0
id-1-message
id-2-message
3、可視化工具
配置和部署
1、下載安裝包:kafka-eagle-bin-3.0.2.tar.gz
2、配置環(huán)境變量
open -e ~/.bash_profile
export KE_HOME=/本地路徑/efak-web-3.0.2
export PATH=$PATH:$KE_HOME/bin
source ~/.bash_profile
3、修改配置文件:system-config.properties
efak.zk.cluster.alias=cluster1
cluster1.zk.list=localhost:2181
efak.url=jdbc:mysql://127.0.0.1:3306/kafka-eagle
4、本地新建數(shù)據(jù)庫:kafka-eagle,注意用戶名和密碼是否一致
5、啟動(dòng)命令
efak-web-3.0.2/bin/ke.sh start
命令語法: ./ke.sh {start|stop|restart|status|stats|find|gc|jdk|version|sdate|cluster}
6、本地訪問【localhost:8048】 username:admin password:123456
KSQL語句測試
select * from `test-topic` where `partition` in (0) order by `date` desc limit 5
select * from `test-topic` where `partition` in (0) and msg like '%5%' order by `date` desc limit 3
三、工程搭建
1、工程結(jié)構(gòu)
2、依賴管理
這里關(guān)于依賴的管理就比較復(fù)雜了,首先spring-kafka
組件選擇與boot框架中spring相同的依賴,即6.0.10
版本,在spring-kafka
最近的版本中3.0.8
符合;
但是該版本使用的是kafka-clients
組件的3.3.2
版本,在Spring文檔的kafka模塊中,明確說明spring-boot:3.1
要使用kafka-clients:3.4
,所以從spring-kafka
組件中排除掉,重新依賴kafka-clients
組件;
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>${spring-kafka.version}</version> <exclusions> <exclusion> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka-clients.version}</version> </dependency>
3、配置文件
配置kafka連接地址,監(jiān)聽器的消息應(yīng)答機(jī)制,消費(fèi)者的基礎(chǔ)模式;
spring: # kafka配置 kafka: bootstrap-servers: localhost:9092 listener: missing-topics-fatal: false ack-mode: manual_immediate consumer: group-id: boot-kafka-group enable-auto-commit: false max-poll-records: 10 properties: max.poll.interval.ms: 3600000
四、基礎(chǔ)用法
1、消息生產(chǎn)
模板類KafkaTemplate
用于執(zhí)行高級的操作,封裝各種消息發(fā)送的方法,在該方法中,通過topic
和key
以及消息主體,實(shí)現(xiàn)消息的生產(chǎn);
@RestController public class ProducerWeb { @Resource private KafkaTemplate<String, String> kafkaTemplate; @GetMapping("/send/msg") public String sendMsg (){ try { // 構(gòu)建消息主體 JsonMapper jsonMapper = new JsonMapper(); String msgBody = jsonMapper.writeValueAsString(new MqMsg(7,"boot-kafka-msg")); // 發(fā)送消息 kafkaTemplate.send("boot-kafka-topic","boot-kafka-key",msgBody); } catch (JsonProcessingException e) { e.printStackTrace(); } return "OK" ; } }
2、消息消費(fèi)
編寫消息監(jiān)聽類,通過KafkaListener
注解控制監(jiān)聽的具體信息,在實(shí)現(xiàn)消息生產(chǎn)和消費(fèi)的方法測試后,使用可視化工具kafka-eagle
查看topic和消息列表;
@Component public class ConsumerListener { private static final Logger log = LoggerFactory.getLogger(ConsumerListener.class); @KafkaListener(topics = "boot-kafka-topic") public void listenUser (ConsumerRecord<?,String> record, Acknowledgment acknowledgment) { try { String key = String.valueOf(record.key()); String body = record.value(); log.info("\n=====\ntopic:boot-kafka-topic,key{},body:{}\n=====\n",key,body); } catch (Exception e){ e.printStackTrace(); } finally { acknowledgment.acknowledge(); } } }
五、參考源碼
文檔倉庫:
https://gitee.com/cicadasmile/butte-java-note
到此這篇關(guān)于SpringBoot3集成Kafka的方法詳解的文章就介紹到這了,更多相關(guān)SpringBoot3集成Kafka內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- 如何使用SpringBoot集成Kafka實(shí)現(xiàn)用戶數(shù)據(jù)變更后發(fā)送消息
- SpringBoot集成Kafka 配置工具類的詳細(xì)代碼
- springboot集成kafka消費(fèi)手動(dòng)啟動(dòng)停止操作
- Springboot集成kafka高級應(yīng)用實(shí)戰(zhàn)分享
- Springboot 2.x集成kafka 2.2.0的示例代碼
- SpringBoot集成kafka全面實(shí)戰(zhàn)記錄
- SpringBoot集成Kafka的步驟
- Springboot集成Kafka實(shí)現(xiàn)producer和consumer的示例代碼
- SpringBoot集成Kafka的實(shí)現(xiàn)示例
相關(guān)文章
spring boot openfeign從此和httpClient說再見詳析
這篇文章主要給大家介紹了關(guān)于spring boot openfeign從此和httpClient說再見的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來一起看看吧2018-06-06關(guān)于mybatis-plus邏輯刪除自動(dòng)填充更新時(shí)間的問題
mybatis-plus是對mybatis的增強(qiáng),mybatis-plus更像是面向?qū)ο缶幊?,?shù)據(jù)庫基本CRUD的操作可以不用手動(dòng)編寫SQL語句,大大提高了開發(fā)的效率,這篇文章主要介紹了mybatis-plus邏輯刪除自動(dòng)填充更新時(shí)間問題,需要的朋友可以參考下2022-07-07Java獲取當(dāng)?shù)氐娜粘鋈章鋾r(shí)間代碼分享
這篇文章主要介紹了Java獲取當(dāng)?shù)氐娜粘鋈章鋾r(shí)間代碼分享,國外猿友寫的一個(gè)類,需要的朋友可以參考下2014-06-06Java?Spring?AOP源碼解析之事務(wù)實(shí)現(xiàn)原理
這篇文章主要為大家介紹了Java?Spring?AOP事務(wù)實(shí)現(xiàn)原理,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來幫助2022-01-01Java 中解決Unsupported major.minor version 51.0的問題
本文主要介紹解決Unsupported major.minor version 51.0的問題, 這里給大家整理了詳細(xì)資料,有需要的小伙伴可以參考下2016-08-08Java如何將json字符串與實(shí)體類互相轉(zhuǎn)換
在我們調(diào)用三方平臺(tái)接口時(shí),經(jīng)常需要將我們封裝的實(shí)體類轉(zhuǎn)換為json作為傳參,下面這篇文章主要給大家介紹了關(guān)于Java如何將json字符串與實(shí)體類互相轉(zhuǎn)換的相關(guān)資料,需要的朋友可以參考下2023-11-11mybatis.type-aliases-package的作用及用法說明
這篇文章主要介紹了mybatis.type-aliases-package的作用及用法說明,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-01-01