結合線程池實現(xiàn)apache?kafka消費者組的誤區(qū)及解決方法
一個錯誤:多線程使用單一消費者
下圖顯現(xiàn)了一種錯誤的使用KafkaConsumer的方法
- 創(chuàng)建多個線程用來消費kafka數(shù)據(jù)
- 多線程使用同一個KafkaConsumer對象
- 在單線程中使用這個KafkaConsumer對象,完成數(shù)據(jù)拉取、處理、提交偏移量。
這種方式之所以錯誤的原因是:KafkaConsumer是線程不安全的,可能出現(xiàn)把同一批數(shù)據(jù)既給線程A處理,也交給線程B處理重復消費的問題。
一個誤區(qū):多線程就是消費者組
下圖中體現(xiàn)的是一種正常的KafkaConsumer使用方式
- 使用一個KafkaConsumer拉取數(shù)據(jù)
- 拉取數(shù)據(jù)后將一個批次的數(shù)據(jù)交給一個線程去處理
這個處理方式不是錯誤,但是他只是一個消費者在消費kafka消息隊列中的數(shù)據(jù),不是消費者組的方式消費數(shù)據(jù)。無法充分利用kafka分區(qū)提升消息處理的吞吐量。
常規(guī)正確做法:使用線程池實現(xiàn)消費者組
下面的方法是常規(guī)的正確實現(xiàn)方式
- 因為KafkaConsumer是線程不安全的,所以不能跨線程使用KafkaConsumer
- 每個線程持有一個KafkaConsumer對象
- 多個線程的實現(xiàn)可以使用線程池,線程池的線程數(shù)量等于消費者組內(nèi)消費者的數(shù)量
public class MyConsumerGroup { public void groupConsumer(){ ExecutorService executorService = Executors.newFixedThreadPool(6); for (int i = 0; i < 6; i++) { MyConsumer myConsumer = new MyConsumer(); executorService.execute(myConsumer); } } }
MyConsumer方法需要實現(xiàn)Runnable接口,并在run方法中調(diào)用MyConsumer#pollData。MyConsumer的代碼參考本專欄的《消費者Java實現(xiàn)》( 集成apache kafka-clients實現(xiàn)數(shù)據(jù)消費者)
@Override public void run() { MyConsumer myConsumer = new MyConsumer(); myConsumer.pollData(); }
到此這篇關于結合線程池實現(xiàn)apache kafka消費者組的文章就介紹到這了,更多相關apache kafka消費者組內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
JAVA調(diào)用SAP WEBSERVICE服務實現(xiàn)流程圖解
這篇文章主要介紹了JAVA調(diào)用SAP WEBSERVICE服務實現(xiàn)流程圖解,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2020-10-10mybatis執(zhí)行錯誤但sql執(zhí)行正常問題
這篇文章主要介紹了mybatis執(zhí)行錯誤但sql執(zhí)行正常問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-01-01Java中ByteBuffer的allocate方法 和allocateDirect方法的區(qū)別和選用原則解析
在Java中,ByteBuffer是java.nio包中的一個類,用于處理字節(jié)數(shù)據(jù),ByteBuffer提供了兩種方式來分配內(nèi)存:allocate和allocateDirect,這篇文章主要介紹了Java中ByteBuffer的allocate方法 和allocateDirect方法的區(qū)別和選用原則 ,需要的朋友可以參考下2023-12-12劍指Offer之Java算法習題精講N叉樹的遍歷及數(shù)組與字符串
跟著思路走,之后從簡單題入手,反復去看,做過之后可能會忘記,之后再做一次,記不住就反復做,反復尋求思路和規(guī)律,慢慢積累就會發(fā)現(xiàn)質(zhì)的變化2022-03-03