深入解析Apache Kafka實(shí)時(shí)流處理平臺(tái)
Apache Kafka
Apache Kafka 是一個(gè)分布式流處理平臺(tái),由 LinkedIn 開(kāi)發(fā)并于 2011 年開(kāi)源,后成為 Apache 軟件基金會(huì)的頂級(jí)項(xiàng)目。Kafka 主要用于構(gòu)建實(shí)時(shí)數(shù)據(jù)管道和流應(yīng)用,能夠處理高吞吐量、低延遲的數(shù)據(jù)發(fā)布與訂閱場(chǎng)景,并支持?jǐn)?shù)據(jù)持久化、多消費(fèi)者組并行消費(fèi)以及容錯(cuò)等功能。
基本概念
主題(Topic):在 Kafka 中,主題是一個(gè)邏輯上的命名空間,是消息發(fā)布的類(lèi)別或頻道。生產(chǎn)者將消息發(fā)送到特定的主題上,而消費(fèi)者從這些主題中拉取消息。
分區(qū)(Partition):每個(gè)主題可以被劃分為多個(gè)分區(qū),每個(gè)分區(qū)都是一個(gè)有序且不可變的消息序列。分區(qū)的設(shè)計(jì)增強(qiáng)了系統(tǒng)的擴(kuò)展性和并行處理能力,同一主題的不同分區(qū)可以分布于不同的服務(wù)器節(jié)點(diǎn)上。
副本(Replica):同一分區(qū)可以在集群內(nèi)的不同 Broker 上有多個(gè)副本,其中一個(gè)為主副本(Leader),其他為跟隨副本(Follower)。通過(guò)復(fù)制機(jī)制,Kafka 提供了數(shù)據(jù)冗余以實(shí)現(xiàn)高可用性。
生產(chǎn)者(Producer):生產(chǎn)者負(fù)責(zé)向 Kafka 主題發(fā)送消息。生產(chǎn)者可以選擇將消息發(fā)送至特定分區(qū),或者讓 Kafka 自動(dòng)基于負(fù)載均衡或其他策略選擇目標(biāo)分區(qū)。
消費(fèi)者(Consumer):消費(fèi)者從主題的分區(qū)中讀取消息。消費(fèi)者可以通過(guò)訂閱一個(gè)或多個(gè)主題來(lái)接收消息。
消費(fèi)者組(Consumer Group):消費(fèi)者組是一組共同消費(fèi)主題的消費(fèi)者的邏輯集合。當(dāng)消費(fèi)者屬于同一個(gè)組時(shí),它們會(huì)集體消費(fèi)主題的所有分區(qū),但每個(gè)分區(qū)只分配給該組內(nèi)的一臺(tái)消費(fèi)者實(shí)例,從而實(shí)現(xiàn)了消息的并行消費(fèi)和負(fù)載均衡。如果組內(nèi)消費(fèi)者數(shù)量超過(guò)分區(qū)數(shù),則多余的消費(fèi)者將處于空閑狀態(tài)。
操作指南
創(chuàng)建主題
在早期版本的 Kafka 中,用戶通常使用 kafka-topics.sh 工具通過(guò) ZooKeeper 連接字符串創(chuàng)建主題。例如:
./kafka-topics.sh --create \ --zookeeper localhost:2181 \ --replication-factor 3 \ --partitions 8 \ --topic my-topic
然而,在較新的 Kafka 版本中,ZooKeeper 的依賴(lài)已被移除,現(xiàn)在推薦直接通過(guò) Kafka 的 AdminClient API 或命令行工具與 Kafka 集群通信:
./kafka-topics.sh --bootstrap-server localhost:9092 \ --topic my-topic \ --partitions 8 \ --replication-factor 3 \ --create
上述命令會(huì)在 Kafka 集群中創(chuàng)建一個(gè)名為 my-topic 的主題,它包含8個(gè)分區(qū),并且每個(gè)分區(qū)都有3份副本。
消費(fèi)主題
要消費(fèi)主題,開(kāi)發(fā)者通常編寫(xiě)代碼實(shí)現(xiàn) Kafka Consumer API。以下是一個(gè)簡(jiǎn)單的 Java 示例:
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "my-consumer-group"); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("my-topic")); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } // 處理記錄后提交偏移量 consumer.commitAsync(); }
在這個(gè)示例中,我們首先設(shè)置了連接 Kafka 集群所需的屬性,定義了消費(fèi)者組名,并指定了消息的鍵值反序列化方式。然后創(chuàng)建了一個(gè) KafkaConsumer 實(shí)例,訂閱了 my-topic 主題,并開(kāi)始循環(huán)消費(fèi)消息。每當(dāng)收到消息時(shí),會(huì)輸出消息的偏移量、鍵和值。
總結(jié)
Kafka 通過(guò)其靈活的主題、分區(qū)和消費(fèi)者組模型,提供了一種高效可靠的消息傳遞系統(tǒng),適用于大規(guī)模實(shí)時(shí)數(shù)據(jù)處理和集成場(chǎng)景。從簡(jiǎn)單到復(fù)雜的應(yīng)用,Kafka 可以支持從日志收集、事件驅(qū)動(dòng)架構(gòu)到大數(shù)據(jù)處理等多種業(yè)務(wù)需求。
以上就是深入解析Apache Kafka實(shí)時(shí)流處理平臺(tái)的詳細(xì)內(nèi)容,更多關(guān)于Apache Kafka流處理的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
- 解決kafka:org.apache.kafka.common.errors.TimeoutException問(wèn)題
- 使用Docker搭建Apache Kafka環(huán)境的詳細(xì)過(guò)程
- Apache?Kafka?分區(qū)重分配的實(shí)現(xiàn)原理解析
- 結(jié)合線程池實(shí)現(xiàn)apache?kafka消費(fèi)者組的誤區(qū)及解決方法
- 在Spring Boot應(yīng)用程序中使用Apache Kafka的方法步驟詳解
- 如何使用Apache Kafka 構(gòu)建實(shí)時(shí)數(shù)據(jù)處理應(yīng)用
相關(guān)文章
java和javascript中過(guò)濾掉img形式的字符串不顯示圖片的方法
這篇文章主要介紹了java和javascript中過(guò)濾掉img形式的字符串不顯示圖片的方法,以實(shí)例形式分別講述了采用java和javascript實(shí)現(xiàn)過(guò)濾掉img形式字符串的技巧,需要的朋友可以參考下2015-02-02java操作mongodb時(shí),對(duì)象bean和DBObject相互轉(zhuǎn)換的方法(推薦)
下面小編就為大家?guī)?lái)一篇java操作mongodb時(shí),對(duì)象bean和DBObject相互轉(zhuǎn)換的方法(推薦)。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2016-11-11Spring中的攔截器HandlerInterceptor詳細(xì)解析
這篇文章主要介紹了Spring中的攔截器HandlerInterceptor詳細(xì)解析,HandlerInterceptor 是 Spring 框架提供的一個(gè)攔截器接口,用于在請(qǐng)求處理過(guò)程中攔截和處理請(qǐng)求,需要的朋友可以參考下2024-01-01springboot下mybatis-plus開(kāi)啟打印sql日志的配置指南
這篇文章主要給大家介紹了關(guān)于springboot下mybatis-plus開(kāi)啟打印sql日志的配置指南的相關(guān)資料,還介紹了關(guān)閉打印的方法,文中通過(guò)實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下2023-03-03List轉(zhuǎn)變?yōu)槎禾?hào)分隔的String(Java7和Java8分別實(shí)現(xiàn))
這篇文章主要介紹了Java7和Java8分別實(shí)現(xiàn)List轉(zhuǎn)變?yōu)槎禾?hào)分隔的String,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-06-06MyBatis XML去除多余AND|OR前綴或逗號(hào)等后綴的操作
這篇文章主要介紹了MyBatis XML去除多余AND|OR前綴或逗號(hào)等后綴的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2021-02-02Java?-jar參數(shù)詳解之掌握J(rèn)ava可執(zhí)行JAR文件的運(yùn)行技巧
做項(xiàng)目的時(shí)候我們肯定接觸過(guò)很多jar包,下面這篇文章主要給大家介紹了關(guān)于Java?-jar參數(shù)詳解之掌握J(rèn)ava可執(zhí)行JAR文件的運(yùn)行技巧,文中通過(guò)代碼介紹的非常詳細(xì),需要的朋友可以參考下2023-11-11