結(jié)合線程池實(shí)現(xiàn)apache?kafka消費(fèi)者組的誤區(qū)及解決方法
一個(gè)錯(cuò)誤:多線程使用單一消費(fèi)者
下圖顯現(xiàn)了一種錯(cuò)誤的使用KafkaConsumer的方法
- 創(chuàng)建多個(gè)線程用來(lái)消費(fèi)kafka數(shù)據(jù)
- 多線程使用同一個(gè)KafkaConsumer對(duì)象
- 在單線程中使用這個(gè)KafkaConsumer對(duì)象,完成數(shù)據(jù)拉取、處理、提交偏移量。
這種方式之所以錯(cuò)誤的原因是:KafkaConsumer是線程不安全的,可能出現(xiàn)把同一批數(shù)據(jù)既給線程A處理,也交給線程B處理重復(fù)消費(fèi)的問(wèn)題。
一個(gè)誤區(qū):多線程就是消費(fèi)者組
下圖中體現(xiàn)的是一種正常的KafkaConsumer使用方式
- 使用一個(gè)KafkaConsumer拉取數(shù)據(jù)
- 拉取數(shù)據(jù)后將一個(gè)批次的數(shù)據(jù)交給一個(gè)線程去處理
這個(gè)處理方式不是錯(cuò)誤,但是他只是一個(gè)消費(fèi)者在消費(fèi)kafka消息隊(duì)列中的數(shù)據(jù),不是消費(fèi)者組的方式消費(fèi)數(shù)據(jù)。無(wú)法充分利用kafka分區(qū)提升消息處理的吞吐量。
常規(guī)正確做法:使用線程池實(shí)現(xiàn)消費(fèi)者組
下面的方法是常規(guī)的正確實(shí)現(xiàn)方式
- 因?yàn)镵afkaConsumer是線程不安全的,所以不能跨線程使用KafkaConsumer
- 每個(gè)線程持有一個(gè)KafkaConsumer對(duì)象
- 多個(gè)線程的實(shí)現(xiàn)可以使用線程池,線程池的線程數(shù)量等于消費(fèi)者組內(nèi)消費(fèi)者的數(shù)量
public class MyConsumerGroup { public void groupConsumer(){ ExecutorService executorService = Executors.newFixedThreadPool(6); for (int i = 0; i < 6; i++) { MyConsumer myConsumer = new MyConsumer(); executorService.execute(myConsumer); } } }
MyConsumer方法需要實(shí)現(xiàn)Runnable接口,并在run方法中調(diào)用MyConsumer#pollData。MyConsumer的代碼參考本專(zhuān)欄的《消費(fèi)者Java實(shí)現(xiàn)》( 集成apache kafka-clients實(shí)現(xiàn)數(shù)據(jù)消費(fèi)者)
@Override public void run() { MyConsumer myConsumer = new MyConsumer(); myConsumer.pollData(); }
到此這篇關(guān)于結(jié)合線程池實(shí)現(xiàn)apache kafka消費(fèi)者組的文章就介紹到這了,更多相關(guān)apache kafka消費(fèi)者組內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
JAVA調(diào)用SAP WEBSERVICE服務(wù)實(shí)現(xiàn)流程圖解
這篇文章主要介紹了JAVA調(diào)用SAP WEBSERVICE服務(wù)實(shí)現(xiàn)流程圖解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-10-10mybatis執(zhí)行錯(cuò)誤但sql執(zhí)行正常問(wèn)題
這篇文章主要介紹了mybatis執(zhí)行錯(cuò)誤但sql執(zhí)行正常問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-01-01Java中ByteBuffer的allocate方法 和allocateDirect方法的區(qū)別和選用原則解析
在Java中,ByteBuffer是java.nio包中的一個(gè)類(lèi),用于處理字節(jié)數(shù)據(jù),ByteBuffer提供了兩種方式來(lái)分配內(nèi)存:allocate和allocateDirect,這篇文章主要介紹了Java中ByteBuffer的allocate方法 和allocateDirect方法的區(qū)別和選用原則 ,需要的朋友可以參考下2023-12-12劍指Offer之Java算法習(xí)題精講N叉樹(shù)的遍歷及數(shù)組與字符串
跟著思路走,之后從簡(jiǎn)單題入手,反復(fù)去看,做過(guò)之后可能會(huì)忘記,之后再做一次,記不住就反復(fù)做,反復(fù)尋求思路和規(guī)律,慢慢積累就會(huì)發(fā)現(xiàn)質(zhì)的變化2022-03-03