深入解析Apache Kafka實時流處理平臺
Apache Kafka
Apache Kafka 是一個分布式流處理平臺,由 LinkedIn 開發(fā)并于 2011 年開源,后成為 Apache 軟件基金會的頂級項目。Kafka 主要用于構(gòu)建實時數(shù)據(jù)管道和流應用,能夠處理高吞吐量、低延遲的數(shù)據(jù)發(fā)布與訂閱場景,并支持數(shù)據(jù)持久化、多消費者組并行消費以及容錯等功能。
基本概念
主題(Topic):在 Kafka 中,主題是一個邏輯上的命名空間,是消息發(fā)布的類別或頻道。生產(chǎn)者將消息發(fā)送到特定的主題上,而消費者從這些主題中拉取消息。
分區(qū)(Partition):每個主題可以被劃分為多個分區(qū),每個分區(qū)都是一個有序且不可變的消息序列。分區(qū)的設計增強了系統(tǒng)的擴展性和并行處理能力,同一主題的不同分區(qū)可以分布于不同的服務器節(jié)點上。
副本(Replica):同一分區(qū)可以在集群內(nèi)的不同 Broker 上有多個副本,其中一個為主副本(Leader),其他為跟隨副本(Follower)。通過復制機制,Kafka 提供了數(shù)據(jù)冗余以實現(xiàn)高可用性。
生產(chǎn)者(Producer):生產(chǎn)者負責向 Kafka 主題發(fā)送消息。生產(chǎn)者可以選擇將消息發(fā)送至特定分區(qū),或者讓 Kafka 自動基于負載均衡或其他策略選擇目標分區(qū)。
消費者(Consumer):消費者從主題的分區(qū)中讀取消息。消費者可以通過訂閱一個或多個主題來接收消息。
消費者組(Consumer Group):消費者組是一組共同消費主題的消費者的邏輯集合。當消費者屬于同一個組時,它們會集體消費主題的所有分區(qū),但每個分區(qū)只分配給該組內(nèi)的一臺消費者實例,從而實現(xiàn)了消息的并行消費和負載均衡。如果組內(nèi)消費者數(shù)量超過分區(qū)數(shù),則多余的消費者將處于空閑狀態(tài)。
操作指南
創(chuàng)建主題
在早期版本的 Kafka 中,用戶通常使用 kafka-topics.sh 工具通過 ZooKeeper 連接字符串創(chuàng)建主題。例如:
./kafka-topics.sh --create \ --zookeeper localhost:2181 \ --replication-factor 3 \ --partitions 8 \ --topic my-topic
然而,在較新的 Kafka 版本中,ZooKeeper 的依賴已被移除,現(xiàn)在推薦直接通過 Kafka 的 AdminClient API 或命令行工具與 Kafka 集群通信:
./kafka-topics.sh --bootstrap-server localhost:9092 \ --topic my-topic \ --partitions 8 \ --replication-factor 3 \ --create
上述命令會在 Kafka 集群中創(chuàng)建一個名為 my-topic 的主題,它包含8個分區(qū),并且每個分區(qū)都有3份副本。
消費主題
要消費主題,開發(fā)者通常編寫代碼實現(xiàn) Kafka Consumer API。以下是一個簡單的 Java 示例:
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "my-consumer-group"); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("my-topic")); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } // 處理記錄后提交偏移量 consumer.commitAsync(); }
在這個示例中,我們首先設置了連接 Kafka 集群所需的屬性,定義了消費者組名,并指定了消息的鍵值反序列化方式。然后創(chuàng)建了一個 KafkaConsumer 實例,訂閱了 my-topic 主題,并開始循環(huán)消費消息。每當收到消息時,會輸出消息的偏移量、鍵和值。
總結(jié)
Kafka 通過其靈活的主題、分區(qū)和消費者組模型,提供了一種高效可靠的消息傳遞系統(tǒng),適用于大規(guī)模實時數(shù)據(jù)處理和集成場景。從簡單到復雜的應用,Kafka 可以支持從日志收集、事件驅(qū)動架構(gòu)到大數(shù)據(jù)處理等多種業(yè)務需求。
以上就是深入解析Apache Kafka實時流處理平臺的詳細內(nèi)容,更多關(guān)于Apache Kafka流處理的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
java和javascript中過濾掉img形式的字符串不顯示圖片的方法
這篇文章主要介紹了java和javascript中過濾掉img形式的字符串不顯示圖片的方法,以實例形式分別講述了采用java和javascript實現(xiàn)過濾掉img形式字符串的技巧,需要的朋友可以參考下2015-02-02java操作mongodb時,對象bean和DBObject相互轉(zhuǎn)換的方法(推薦)
下面小編就為大家?guī)硪黄猨ava操作mongodb時,對象bean和DBObject相互轉(zhuǎn)換的方法(推薦)。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2016-11-11Spring中的攔截器HandlerInterceptor詳細解析
這篇文章主要介紹了Spring中的攔截器HandlerInterceptor詳細解析,HandlerInterceptor 是 Spring 框架提供的一個攔截器接口,用于在請求處理過程中攔截和處理請求,需要的朋友可以參考下2024-01-01springboot下mybatis-plus開啟打印sql日志的配置指南
這篇文章主要給大家介紹了關(guān)于springboot下mybatis-plus開啟打印sql日志的配置指南的相關(guān)資料,還介紹了關(guān)閉打印的方法,文中通過實例代碼介紹的非常詳細,需要的朋友可以參考下2023-03-03List轉(zhuǎn)變?yōu)槎禾柗指舻腟tring(Java7和Java8分別實現(xiàn))
這篇文章主要介紹了Java7和Java8分別實現(xiàn)List轉(zhuǎn)變?yōu)槎禾柗指舻腟tring,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-06-06MyBatis XML去除多余AND|OR前綴或逗號等后綴的操作
這篇文章主要介紹了MyBatis XML去除多余AND|OR前綴或逗號等后綴的操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-02-02Java?-jar參數(shù)詳解之掌握Java可執(zhí)行JAR文件的運行技巧
做項目的時候我們肯定接觸過很多jar包,下面這篇文章主要給大家介紹了關(guān)于Java?-jar參數(shù)詳解之掌握Java可執(zhí)行JAR文件的運行技巧,文中通過代碼介紹的非常詳細,需要的朋友可以參考下2023-11-11