Java中的Kafka消費者詳解
一、消費者工作流程
1.1 總體工作流程
1.2 消費者組初始化流程
1.3 消費者組詳細消費流程
二、消費者消費消息方式
pull(拉)模 式:consumer采用從broker中主動拉取數(shù)據(jù)。Kafka采用這種方式。
push(推)模式:Kafka沒有采用這種方式,因為由broker決定消息發(fā)送速率,很難適應所有消費者的 消費速率。
例如推送的速度是50m/s,Consumer1、Consumer2就來不及處理消息。
pull模式不足之處是,如 果Kafka沒有數(shù)據(jù),消費者可能會陷入循環(huán)中,一直返回空數(shù)據(jù)
2.1 消費者組
Consumer Group(CG):消費者組,由多個consumer組成。形成一個消費者組的條件,是所有消費者的groupid相同。
消費者組內(nèi)每個消費者負責消費不同分區(qū)的數(shù)據(jù),一個分區(qū)只能由一個組內(nèi)消費者消費。 消費者組之間互不影響。所有的消費者都屬于某個消費者組,即消費者組是邏輯上的一個訂閱者。
如果向消費組中添加更多的消費者,超過主題分區(qū)數(shù)量,則有一部分消費者就會閑置,不會接收任何消息。
消費者組之間互不影響。所有的消費者都屬于某個消費者 組,即消費者組是邏輯上的一個訂閱者。
2.2 消費一個主題
// 0 配置 Properties properties = new Properties(); // 連接 bootstrap.servers properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092"); // 反序列化 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 配置消費者組id properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test5"); // 1 創(chuàng)建一個消費者 "", "hello" KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties); // 2 訂閱主題 first ArrayList<String> topics = new ArrayList<>(); topics.add("first"); kafkaConsumer.subscribe(topics); // 3 消費數(shù)據(jù) while (true){ //1秒消費一次 ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord); } }
2.3 消費一個分區(qū)
需求:創(chuàng)建一個獨立消費者,消費 first 主題 0 號分區(qū)的數(shù)據(jù)。
// 0 配置 Properties properties = new Properties(); // 連接 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092"); // 反序列化 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 組id properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test"); // 1 創(chuàng)建一個消費者 KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties); // 2 訂閱主題對應的分區(qū) ArrayList<TopicPartition> topicPartitions = new ArrayList<>(); topicPartitions.add(new TopicPartition("first",0)); kafkaConsumer.assign(topicPartitions); // 3 消費數(shù)據(jù) while (true){ ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord); } }
三、分區(qū)的分配以及再平衡
1、一個consumer group中有多個consumer組成,一個 topic有多個partition組成,現(xiàn)在的問題是,到底由哪個consumer來消費哪個 partition的數(shù)據(jù)。
2、Kafka有四種主流的分區(qū)分配策略: Range、RoundRobin、Sticky、CooperativeSticky。 可以通過配置參數(shù)partition.assignment.strategy,修改分區(qū)的分配策略。Kafka可以同時使用多個分區(qū)分配策略。
參數(shù)名稱 | 描述 |
heartbeat.interval.ms | Kafka 消費者和 coordinator 之間的心跳時間,默認 3s。該條目的值必須小于 session.timeout.ms,也不應該高于session.timeout.ms 的 1/3。 |
session.timeout.ms | Kafka 消費者和 coordinator 之間連接超時時間,默認 45s。超過該值,該消費者被移除,消費者組執(zhí)行再平衡。 |
max.poll.interval.ms | 消費者處理消息的最大時長,默認是 5 分鐘。超過該值,該消費者被移除,消費者組執(zhí)行再平衡。 |
partition.assignment.strategy | 消 費 者 分 區(qū) 分 配 策 略 , 默 認 策 略 是 Range + CooperativeSticky。Kafka 可以同時使用多個分區(qū)分配策略???以 選 擇 的 策 略 包 括 : Range 、 RoundRobin 、 Sticky 、CooperativeSticky |
3.1 分區(qū)分配策略之Range
- 默認策略是Range + CooperativeSticky。
- 再平衡案例
(1)停止掉 0 號消費者,快速重新發(fā)送消息觀看結(jié)果(45s 以內(nèi),越快越好)。 1 號消費者:消費到 3、4 號分區(qū)數(shù)據(jù)。 2 號消費者:消費到 5、6 號分區(qū)數(shù)據(jù)。 0 號消費者的任務會整體被分配到 1 號消費者或者 2 號消費者。 說明:0 號消費者掛掉后,消費者組需要按照超時時間 45s 來判斷它是否退出,所以需要等待,時間到了 45s 后,判斷它真的退出就會把任務分配給其他 broker 執(zhí)行。
(2)再次重新發(fā)送消息觀看結(jié)果(45s 以后)。 1 號消費者:消費到 0、1、2、3 號分區(qū)數(shù)據(jù)。 2 號消費者:消費到 4、5、6 號分區(qū)數(shù)據(jù)。說明:消費者 0 已經(jīng)被踢出消費者組,所以重新按照 range 方式分配。
3.2 分區(qū)分配策略之RoundRobin
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.RoundRobinAssignor");
- 再平衡案例
(1)停止掉 0 號消費者,快速重新發(fā)送消息觀看結(jié)果(45s 以內(nèi),越快越好)。 1 號消費者:消費到 2、5 號分區(qū)數(shù)據(jù) 2 號消費者:消費到 4、1 號分區(qū)數(shù)據(jù)0 號消費者的任務會按照 RoundRobin 的方式,把數(shù)據(jù)輪詢分成 0 、6 和 3 號分區(qū)數(shù)據(jù), 分別由 1 號消費者或者 2 號消費者消費。說明:0 號消費者掛掉后,消費者組需要按照超時時間 45s 來判斷它是否退出,所以需 要等待,時間到了 45s 后,判斷它真的退出就會把任務分配給其他 broker 執(zhí)行。
(2)再次重新發(fā)送消息觀看結(jié)果(45s 以后)。 1 號消費者:消費到 0、2、4、6 號分區(qū)數(shù)據(jù)2 號消費者:消費到 1、3、5 號分區(qū)數(shù)據(jù)說明:消費者 0 已經(jīng)被踢出消費者組,所以重新按照 RoundRobin 方式分配
3.3 Sticky 以及再平衡
粘性分區(qū)定義:可以理解為分配的結(jié)果帶有“粘性的”。即在執(zhí)行一次新的分配之前,考慮上一次分配的結(jié)果,盡量少的調(diào)整分配的變動,可以節(jié)省大量的開銷。
粘性分區(qū)是 Kafka 從 0.11.x 版本開始引入這種分配策略,首先會盡量均衡的放置分區(qū)到消費者上面,在出現(xiàn)同一消費者組內(nèi)消費者出現(xiàn)問題的時候,會盡量保持原有分配的分區(qū)不變化
再平衡案例
(1)停止掉 0 號消費者,快速重新發(fā)送消息觀看結(jié)果(45s 以內(nèi),越快越好)。 1 號消費者:消費到 2、5、3 號分區(qū)數(shù)據(jù)。 2 號消費者:消費到 4、6 號分區(qū)數(shù)據(jù)。 0 號消費者的任務會按照粘性規(guī)則,盡可能均衡的隨機分成 0 和 1 號分區(qū)數(shù)據(jù),分別由 1 號消費者或者 2 號消費者消費。說明:0 號消費者掛掉后,消費者組需要按照超時時間 45s 來判斷它是否退出,所以需要等待,時間到了 45s 后,判斷它真的退出就會把任務分配給其他 broker 執(zhí)行。
(2)再次重新發(fā)送消息觀看結(jié)果(45s 以后)。 1 號消費者:消費到 2、3、5 號分區(qū)數(shù)據(jù)。 2 號消費者:消費到 0、1、4、6 號分區(qū)數(shù)據(jù)。說明:消費者 0 已經(jīng)被踢出消費者組,所以重新按照粘性方式分配。
四、offset 位移
__consumer_offsets 主題里面采用 key 和 value 的方式存儲數(shù)據(jù)。key 是 group.id+topic+分區(qū)號,value 就是當前 offset 的值。
每隔一段時間,kafka 內(nèi)部會對這個 topic 進行compact,也就是每個 group.id+topic+分區(qū)號就保留最新數(shù)據(jù)。
4.1 自動提交 offset
為了使我們能夠?qū)W⒂谧约旱臉I(yè)務邏輯,Kafka提供了自動提交offset的功能。5s
參數(shù)名稱 | 描述 |
enable.auto.commit | 默認值為 true,消費者會自動周期性地向服務器提交偏移量。 |
auto.commit.interval.ms | 自動提交offset的時間間隔,默認是5s,如果設(shè)置了 enable.auto.commit 的值為 true, 則該值定義了消費者偏移量向 Kafka 提交的頻率,默認 5s。 |
// 自動提交 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true); // 提交時間間隔 properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
4.2 手動提交offset
雖然自動提交offset十分簡單便利,但由于其是基于時間提交的,開發(fā)人員難以把握offset提交的時機。因 此Kafka還提供了手動提交offset的API。
手動提交offset的方法有兩種:分別是commitSync(同步提交)和commitAsync(異步提交)。兩者的相同點是,都會將本次提交的一批數(shù)據(jù)最高的偏移量提交;不同點是,同步提交阻塞當前線程,一直到提交成功,并且會自動失敗重試(由不可控因素導致,也會出現(xiàn)提交失?。?;而異步提交則沒有失敗重試機制,故有可能提交失敗。
commitSync(同步提交):必須等待offset提交完畢,再去消費下一批數(shù)據(jù)。 commitAsync(異步提交) :發(fā)送完提交offset請求后,就開始消費下一批數(shù)據(jù)了。
- 同步提交 offset
由于同步提交 offset 有失敗重試機制,故更加可靠,但是由于一直等待提交結(jié)果,提交的效率比較低。以下為同步提交 offset 的示例。
// 0 配置 Properties properties = new Properties(); // 連接 bootstrap.servers properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092"); // 反序列化 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 配置消費者組id properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test"); // 手動提交 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false); // 1 創(chuàng)建一個消費者 "", "hello" KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties); // 2 訂閱主題 first ArrayList<String> topics = new ArrayList<>(); topics.add("first"); kafkaConsumer.subscribe(topics); // 3 消費數(shù)據(jù) while (true){ ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord); } // 同步提交 offset kafkaConsumer.commitSync(); }
- 異步提交 offset
雖然同步提交 offset 更可靠一些,但是由于其會阻塞當前線程,直到提交成功。因此吞吐量會受到很大的影響。因此更多的情況下,會選用異步提交 offset 的方式。
// 0 配置 Properties properties = new Properties(); // 連接 bootstrap.servers properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092"); // 反序列化 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 配置消費者組id properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test"); // 手動提交 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false); // 1 創(chuàng)建一個消費者 "", "hello" KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties); // 2 訂閱主題 first ArrayList<String> topics = new ArrayList<>(); topics.add("first"); kafkaConsumer.subscribe(topics); // 3 消費數(shù)據(jù) while (true){ ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord); } // 同步提交 offset kafkaConsumer.commitAsync(); }
4.3 指定 Offset 消費
auto.offset.reset = earliest | latest | none 默認是 latest。 當 Kafka 中沒有初始偏移量(消費者組第一次消費)或服務器上不再存在當前偏移量時(例如該數(shù)據(jù)已被刪除),該怎么辦?
(1)earliest:自動將偏移量重置為最早的偏移量,–from-beginning。
(2)latest(默認值):自動將偏移量重置為最新偏移量。
(3)none:如果未找到消費者組的先前偏移量,則向消費者拋出異常。
// 0 配置信息 Properties properties = new Properties(); // 連接 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092"); // 反序列化 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 組id properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test3"); // 1 創(chuàng)建消費者 KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties); // 2 訂閱主題 ArrayList<String> topics = new ArrayList<>(); topics.add("first"); kafkaConsumer.subscribe(topics); // 指定位置進行消費 Set<TopicPartition> assignment = kafkaConsumer.assignment(); // 保證分區(qū)分配方案已經(jīng)制定完畢 while (assignment.size() == 0){ kafkaConsumer.poll(Duration.ofSeconds(1)); assignment = kafkaConsumer.assignment(); } // 指定消費的offset for (TopicPartition topicPartition : assignment) { kafkaConsumer.seek(topicPartition,600); } // 3 消費數(shù)據(jù) while (true){ ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord); } }
4.4 指定時間消費
需求:在生產(chǎn)環(huán)境中,會遇到最近消費的幾個小時數(shù)據(jù)異常,想重新按照時間消費。例如要求按照時間消費前一天的數(shù)據(jù),怎么處理?
// 0 配置信息 Properties properties = new Properties(); // 連接 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092"); // 反序列化 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 組id properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test3"); // 1 創(chuàng)建消費者 KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties); // 2 訂閱主題 ArrayList<String> topics = new ArrayList<>(); topics.add("first"); kafkaConsumer.subscribe(topics); // 指定位置進行消費 Set<TopicPartition> assignment = kafkaConsumer.assignment(); // 保證分區(qū)分配方案已經(jīng)制定完畢 while (assignment.size() == 0){ kafkaConsumer.poll(Duration.ofSeconds(1)); assignment = kafkaConsumer.assignment(); } // 希望把時間轉(zhuǎn)換為對應的offset HashMap<TopicPartition, Long> topicPartitionLongHashMap = new HashMap<>(); // 封裝對應集合 for (TopicPartition topicPartition : assignment) { topicPartitionLongHashMap.put(topicPartition,System.currentTimeMillis() - 1 * 24 * 3600 * 1000); } Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = kafkaConsumer.offsetsForTimes(topicPartitionLongHashMap); // 指定消費的offset for (TopicPartition topicPartition : assignment) { OffsetAndTimestamp offsetAndTimestamp = topicPartitionOffsetAndTimestampMap.get(topicPartition); kafkaConsumer.seek(topicPartition,offsetAndTimestamp.offset()); } // 3 消費數(shù)據(jù) while (true){ ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord); } }
五、消費者事務
重復消費:已經(jīng)消費了數(shù)據(jù),但是 offset 沒提交。
漏消費:先提交 offset 后消費,有可能會造成數(shù)據(jù)的漏消費。
思考:怎么能做到既不漏消費也不重復消費呢?詳看消費者事務。
如果想完成Consumer端的精準一次性消費,那么需要Kafka消費端將消費過程和提交offset過程做原子綁定。此時我們需要將Kafka的offset保存到支持事務的自定義介質(zhì)(比 如MySQL)。
數(shù)據(jù)積壓(消費者如何提高吞吐量)
參數(shù)名稱 | 描述 |
fetch.max.bytes | 默認 Default: 52428800(50 m)。消費者獲取服務器端一批消息最大的字節(jié)數(shù)。如果服務器端一批次的數(shù)據(jù)大于該值(50m)仍然可以拉取回來這批數(shù)據(jù),因此,這不是一個絕對最大值。一批次的大小受 message.max.bytes (broker config)or max.message.bytes (topic config)影響。 |
max.poll.records | 一次 poll 拉取數(shù)據(jù)返回消息的最大條數(shù),默認是 500 條 |
到此這篇關(guān)于Java中的Kafka消費者詳解的文章就介紹到這了,更多相關(guān)Kafka消費者內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java中使用數(shù)組實現(xiàn)棧數(shù)據(jù)結(jié)構(gòu)實例
這篇文章主要介紹了Java中使用數(shù)組實現(xiàn)棧數(shù)據(jù)結(jié)構(gòu)實例,本文先是講解了實現(xiàn)棧至少應該包括以下幾個方法等知識,然后給出代碼實例,需要的朋友可以參考下2015-01-01IDEA2020.1同步系統(tǒng)設(shè)置到GitHub的方法
這篇文章主要介紹了IDEA2020.1同步系統(tǒng)設(shè)置到GitHub的方法,本文通過圖文并茂的形式給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-05-05Java高并發(fā)BlockingQueue重要的實現(xiàn)類詳解
這篇文章主要給大家介紹了關(guān)于Java高并發(fā)BlockingQueue重要的實現(xiàn)類的相關(guān)資料,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2021-01-01Redis 集成Spring的示例代碼(spring-data-redis)
本篇文章主要介紹了Redis 集成Spring的示例代碼(spring-data-redis) ,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-09-09