Kafka的基本使用及環(huán)境安裝
認識Kafka
消息隊列
消息隊列是分布式系統(tǒng)和現(xiàn)代應(yīng)用架構(gòu)中至關(guān)重要的中間件。它的核心作用是解耦、異步和削峰填谷,像一個高效的“通信員”和“緩沖池”協(xié)調(diào)不同組件之間的工作。
消息隊列的核心概念
生產(chǎn)者: 產(chǎn)生消息(數(shù)據(jù)、任務(wù)請求、事件通知)并發(fā)送到隊列的應(yīng)用程序或服務(wù)。
消息隊列: 一個臨時的、持久化的存儲區(qū)域(通?;趦?nèi)存、磁盤或數(shù)據(jù)庫),用于存放生產(chǎn)者發(fā)送的消息。消息按照先進先出的順序存儲,但很多隊列支持優(yōu)先級、延遲等特性。
消費者: 從隊列中獲取消息并進行處理的應(yīng)用程序或服務(wù)。
消息: 隊列中傳輸?shù)臄?shù)據(jù)單元,通常包含有效載荷(實際數(shù)據(jù))和元數(shù)據(jù)(如ID、時間戳、優(yōu)先級等)。
核心價值與解決的問題
解耦:
問題: 系統(tǒng)組件(服務(wù))之間直接調(diào)用會導(dǎo)致緊密耦合。一個組件的變更、故障或性能瓶頸會直接影響其他依賴它的組件。擴展也變得困難。
解決: 生產(chǎn)者只需將消息發(fā)送到隊列,無需知道誰(消費者)會處理它,消費者只需從隊列訂閱消息,無需知道消息是誰(生產(chǎn)者)發(fā)送的。雙方只依賴隊列,不直接依賴對方,大大降低了耦合度。系統(tǒng)更靈活、更易于維護和擴展。
異步:
問題: 同步調(diào)用要求調(diào)用方(生產(chǎn)者)必須等待被調(diào)用方(消費者)處理完成并返回結(jié)果才能繼續(xù)執(zhí)行。如果處理耗時很長,調(diào)用方會被阻塞,資源利用率低,用戶體驗差(如網(wǎng)頁卡頓)。
解決: 生產(chǎn)者發(fā)送消息到隊列后即可返回,無需等待消費者處理。消費者在后臺異步地從隊列拉取消息進行處理。這顯著提高了系統(tǒng)的吞吐量和響應(yīng)速度。
削峰填谷:
問題: 系統(tǒng)流量往往存在高峰和低谷。高峰期如果請求量遠超消費者處理能力,會導(dǎo)致系統(tǒng)過載、崩潰或請求超時。低谷期資源又可能閑置。
解決: 隊列作為緩沖區(qū),在流量高峰時積壓請求,平滑地將大量請求暫存起來。消費者按照自己的穩(wěn)定處理能力從隊列中拉取消息進行處理,避免了瞬間洪峰壓垮下游系統(tǒng)。在流量低谷時,消費者可以繼續(xù)處理隊列中積壓的消息。
冗余與可靠性:
問題: 直接調(diào)用時,如果消費者臨時不可用(故障、重啟、維護),生產(chǎn)者的請求會丟失或失敗。
解決: 消息隊列通常提供消息持久化功能(將消息寫入磁盤)。即使消費者暫時離線,消息也會安全存儲在隊列中,待消費者恢復(fù)后繼續(xù)處理,確保消息不丟失。許多隊列還提供確認機制(ACK),消費者處理成功后才會從隊列中移除消息。
可伸縮性:
問題: 單一消費者處理能力有限,難以應(yīng)對增長的業(yè)務(wù)量。
解決: 可以很容易地增加消費者的數(shù)量(水平擴展),讓多個消費者并行地從同一個隊列中拉取消息進行處理,顯著提高系統(tǒng)的整體吞吐量。隊列本身也可以做成分布式集群來應(yīng)對高吞吐量需求。
順序保證:
問題: 在分布式環(huán)境中保證消息處理的嚴格順序很困難。
解決: 雖然完全全局有序很難,但許多消息隊列能保證分區(qū)有序或隊列有序(在單個隊列/分區(qū)內(nèi),消息按照發(fā)送順序被消費)。這對于某些需要保證因果關(guān)系的業(yè)務(wù)場景(如賬戶流水)非常重要。
緩沖:
問題: 生產(chǎn)者和消費者的處理速度不一致。
解決: 隊列天然提供了緩沖能力,允許生產(chǎn)者和消費者以各自不同的速率工作,不會互相拖累。
常見的消息隊列有RabbitMQ,Kafka,RocketMQ。這里主要介紹Kafka。
Kafka
Kafka 通常指 Apache Kafka,這是一個開源的、分布式的、高吞吐量、低延遲的流處理平臺。它最初由 LinkedIn 開發(fā),后來捐贈給了 Apache 軟件基金會,并迅速成為大數(shù)據(jù)和實時數(shù)據(jù)處理領(lǐng)域的核心基礎(chǔ)設(shè)施之一。
Kafka 不僅僅是一個消息隊列,它是一個高吞吐、低延遲、分布式、持久化、可水平擴展的流數(shù)據(jù)平臺。它設(shè)計之初就是為了處理持續(xù)產(chǎn)生、體量巨大、需要實時處理的“數(shù)據(jù)流”。
ZooKeeper是一個開源的分布式應(yīng)用程序協(xié)調(diào)軟件,而Kafka是分布式事件處理平臺,底層是使用分布式架構(gòu)設(shè)計,所以Kafka的多個節(jié)點之間是采用zookeeper來實現(xiàn)協(xié)調(diào)調(diào)度的。
ZooKeeper
ZooKeeper是一個開源的分布式應(yīng)用程序協(xié)調(diào)軟件,而Kafka是分布式事件處理平臺,底層是使用分布式架構(gòu)設(shè)計,所以Kafka的多個節(jié)點之間是采用zookeeper來實現(xiàn)協(xié)調(diào)調(diào)度的。
Zookeeper的核心作用
ZooKeeper的數(shù)據(jù)存儲結(jié)構(gòu)可以簡單地理解為一個Tree結(jié)構(gòu),而Tree結(jié)構(gòu)上的每一個節(jié)點可以用于存儲數(shù)據(jù),所以一般情況下,我們可以將分布式系統(tǒng)的元數(shù)據(jù)(環(huán)境信息以及系統(tǒng)配置信息)保存在ZooKeeper節(jié)點中。
ZooKeeper創(chuàng)建數(shù)據(jù)節(jié)點時,會根據(jù)業(yè)務(wù)場景創(chuàng)建臨時節(jié)點或永久(持久)節(jié)點。永久節(jié)點就是無論客戶端是否連接上ZooKeeper都一直存在的節(jié)點,而臨時節(jié)點指的是客戶端連接時創(chuàng)建,斷開連接后刪除的節(jié)點。同時,ZooKeeper也提供了Watch(監(jiān)控)機制用于監(jiān)控節(jié)點的變化,然后通知對應(yīng)的客戶端進行相應(yīng)的變化。Kafka軟件中就內(nèi)置了ZooKeeper的客戶端,用于進行ZooKeeper的連接和通信。
Kafka的基本使用
環(huán)境安裝
我們這里先安裝簡單的Windows單機環(huán)境。在安裝之前務(wù)必先安裝Java8。
下載Kafka:Kafka下載地址Apache Kafka: A Distributed Streaming Platform.
https://kafka.apache.org/downloads
選擇版本為2.13-3.8.0
下載完成后進行解壓,解壓目錄放在非系統(tǒng)盤根目錄下。為了訪問方便,可以將解壓后的文件夾名稱修改為Kafka
Kafka的文件目錄
bin | linux系統(tǒng)下可執(zhí)行腳本文件 |
bin/windows | windows系統(tǒng)下可執(zhí)行腳本文件 |
config | 配置文件 |
libs | 依賴類庫 |
licenses | 許可信息 |
site-docs | 文檔 |
logs | 服務(wù)日志 |
啟動zookeeper
當(dāng)前版本的Kafka軟件仍然依賴Zookeeper,所以啟動Kafka之前,需要先啟動Zookeeper,Kafka軟件內(nèi)置了Zookeeper,所以無需額外安裝,直接調(diào)用啟動腳本即可。
1. 進入Kafka解壓縮文件夾的config目錄,修改zookeeper.properties配置文件
修改dataDir配置,用于設(shè)置ZooKeeper數(shù)據(jù)存儲位置,該路徑如果不存在會自動創(chuàng)建。
dataDir=D:/kafka/data/zk
在kafka解壓縮后的目錄中創(chuàng)建Zookeeper啟動腳本文件:zk.cmd。
輸入:
call bin/windows/zookeeper-server-start.bat config/zookeeper.properties
上述指令就是調(diào)用zookeeper啟動命令,同時指定配置文件
雙擊啟動即可:
啟動完成。
啟動Kafka
進入Kafka解壓縮文件夾的config目錄,修改server.properties配置文件.
設(shè)置Kafka數(shù)據(jù)的存儲目錄。如果文件目錄不存在,會自動生成。
在kafka解壓縮后的目錄中創(chuàng)建Kafka啟動腳本文件:kfk.cmd。
輸入:
call bin/windows/kafka-server-start.bat config/server.properties
雙擊啟動即可:
DOS窗口中,輸入jps指令,查看當(dāng)前啟動的軟件進程:
這里名稱為QuorumPeerMain的就是ZooKeeper軟件進程,名稱為Kafka的就是Kafka系統(tǒng)進程。此時,說明Kafka已經(jīng)可以正常使用了。
消息主題
在發(fā)布訂閱模型中,為了讓消費者對感興趣的消息進行消費,而不是消費所有消息,所以就定義了主題(Topic),也就是說將不同的消息進行分類,分成不同的主題(Topic),然后消息生產(chǎn)者在生成消息時,就會向指定的主題(Topic)中發(fā)送,而消息消費者也可以訂閱自己感興趣的主題(Topic)并從中獲取消息。
有很多種方式都可以操作Kafka消息中的主題(Topic):命令行、第三方工具、Java API、自動創(chuàng)建。而對于初學(xué)者來講,掌握基本的命令行操作是必要的。所以接下來,我們采用命令行進行操作。
創(chuàng)建主題
使用命令行方式創(chuàng)建主題test
打開DOS窗口,在確保Zookeeper和Kafkfa啟動的情況下,進入Kafkfa解壓目錄下的bin/windows目錄。
輸入如下命令創(chuàng)建主題test: kafka-topics.bat --bootstrap-server localhost:9092 --create --topic test
test主題創(chuàng)建完成。
查詢主題
輸入如下命令進行主題查詢:kafka-topics.bat --bootstrap-server localhost:9092 --list
修改主題
kafka-topics.bat --bootstrap-server localhost:9092 --topic test --alter --partitions 2
上述命令將test主題的分區(qū)數(shù)量設(shè)置為2.關(guān)于分區(qū)的信息,后面會詳細介紹。
發(fā)送數(shù)據(jù)
命令行操作
使用命令行方式發(fā)送:
kafka-console-producer.bat --bootstrap-server localhost:9092 --topic test
上述操作就是在控制臺生成數(shù)據(jù),hello kafka 這里的數(shù)據(jù)需要回車,才會發(fā)送到Kafka服務(wù)器。
JavaAPI操作
引入依賴
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.8.0</version> </dependency>
編寫生產(chǎn)者
public class ProducerTest { public static void main(String[] args) { // 配置屬性集合 Map<String, Object> configMap = new HashMap<>(); // 配置屬性:Kafka服務(wù)器集群地址 configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // 配置屬性:Kafka生產(chǎn)的數(shù)據(jù)為KV對,所以在生產(chǎn)數(shù)據(jù)進行傳輸前需要分別對K,V進行對應(yīng)的序列化操作 configMap.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); configMap.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // 創(chuàng)建Kafka生產(chǎn)者對象,建立Kafka連接 // 構(gòu)造對象時,需要傳遞配置參數(shù) KafkaProducer<String, String> producer = new KafkaProducer<>(configMap); // 準備數(shù)據(jù),定義泛型 // 構(gòu)造對象時需要傳遞 【Topic主題名稱】,【Key】,【Value】三個參數(shù) for (int i = 0; i < 10; i++) { ProducerRecord<String, String> record = new ProducerRecord<String, String>( "test", "key" + i, "value" + i ); // 生產(chǎn)(發(fā)送)數(shù)據(jù) producer.send(record); } // 關(guān)閉生產(chǎn)者連接 producer.close(); } }
消費數(shù)據(jù)
命令行操作
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
JavaAPI操作
public class ConsumerTest { public static void main(String[] args) { // 創(chuàng)建配置對象 Map<String, Object> configMap = new HashMap<>(); configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // 反序列化類配置 configMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); configMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 組ID配置 configMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test"); // 創(chuàng)建消費者對象 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configMap); // 從kafka主題中獲取對象 訂閱主題 consumer.subscribe(Collections.singleton("test")); // 消費者從Kafka主題中拉取數(shù)據(jù) while (true) { ConsumerRecords<String, String> datas = consumer.poll(100); for (ConsumerRecord<String, String> data : datas) { System.out.println(data); } } // 關(guān)閉消費者對象 // consumer.close(); } }
到此這篇關(guān)于Kafka的基本使用的文章就介紹到這了,更多相關(guān)Kafka使用內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- springboot使用kafka的過程
- Python使用Apache Kafka時Poll拉取速度慢的解決方法
- SpringBoot使用Kafka來優(yōu)化接口請求的并發(fā)方式
- 如何使用Apache Kafka 構(gòu)建實時數(shù)據(jù)處理應(yīng)用
- springboot使用kafka事務(wù)的示例代碼
- Spring Kafka中@KafkaListener注解的參數(shù)與使用小結(jié)
- springboot使用@KafkaListener監(jiān)聽多個kafka配置實現(xiàn)
- springboot連接kafka集群的使用示例
- spring?kafka?@KafkaListener詳解與使用過程
相關(guān)文章
方法參數(shù)屬性params,@PathVariable和@RequestParam用法及區(qū)別
這篇文章主要介紹了方法參數(shù)屬性params,@PathVariable和@RequestParam用法及區(qū)別說明,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-10-10使用IntelliJ IDEA查看類的繼承關(guān)系圖形(圖文詳解)
這篇文章主要介紹了使用IntelliJ IDEA查看類的繼承關(guān)系圖形,本文通過圖文并茂的形式給大家介紹的非常詳細,對大家的工作或?qū)W習(xí)具有一定的參考借鑒價值,需要的朋友可以參考下2020-03-03Java數(shù)據(jù)結(jié)構(gòu)之HashMap源碼深入分析
Java HashMap是一種基于哈希表實現(xiàn)的鍵值對存儲結(jié)構(gòu),可以實現(xiàn)快速的數(shù)據(jù)查找和存儲。它是線程不安全的,但在單線程環(huán)境中運行效率高,被廣泛應(yīng)用于Java開發(fā)中2023-04-04Eclipse+Java+Swing實現(xiàn)學(xué)生成績管理系統(tǒng)的實例代碼
這篇文章主要介紹了Eclipse+Java+Swing實現(xiàn)學(xué)生成績管理系統(tǒng),本文通過實例代碼給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-01-01簡化API提升開發(fā)效率RestTemplate與HttpClient?OkHttp關(guān)系詳解
這篇文章主要為大家介紹了簡化API,提升開發(fā)效率,RestTemplate與HttpClient?OkHttp關(guān)系介紹,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-10-10SpringCloud OpenFeign 自定義響應(yīng)解碼器的問題記錄
我們在使用 Spring Cloud 微服務(wù)的時候,通常將返回結(jié)果使用一個JsonResult 類進行封裝,本文重點介紹SpringCloud OpenFeign 自定義響應(yīng)解碼器的問題記錄,感興趣的朋友跟隨小編一起看看吧2024-06-06SpringBoot2零基礎(chǔ)到精通之profile功能與自定義starter
SpringBoot是一種整合Spring技術(shù)棧的方式(或者說是框架),同時也是簡化Spring的一種快速開發(fā)的腳手架,本篇讓我們一起學(xué)習(xí)profile功能與自定義starter2022-03-03關(guān)于spring-security(記住密碼,CSRF)
文章主要介紹了Spring Security中的PersistentTokenRepository、CSRF保護機制以及如何在登錄頁面添加記住我功能,并分享了相關(guān)實現(xiàn)代碼和配置2024-11-11IDEA無法打開Marketplace的三種解決方案(推薦)
這篇文章主要介紹了IDEA無法打開Marketplace的三種解決方案(推薦),本文通過圖文并茂的形式給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-11-11