欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

結(jié)合線程池實(shí)現(xiàn)apache?kafka消費(fèi)者組的誤區(qū)及解決方法

 更新時(shí)間:2022年07月07日 11:03:27   作者:字母哥哥  
這篇文章主要介紹了結(jié)合線程池實(shí)現(xiàn)apache?kafka消費(fèi)者組的誤區(qū)及解決方法,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下

一個(gè)錯(cuò)誤:多線程使用單一消費(fèi)者

下圖顯現(xiàn)了一種錯(cuò)誤的使用KafkaConsumer的方法

  • 創(chuàng)建多個(gè)線程用來(lái)消費(fèi)kafka數(shù)據(jù)
  • 多線程使用同一個(gè)KafkaConsumer對(duì)象
  • 在單線程中使用這個(gè)KafkaConsumer對(duì)象,完成數(shù)據(jù)拉取、處理、提交偏移量。

在這里插入圖片描述

這種方式之所以錯(cuò)誤的原因是:KafkaConsumer是線程不安全的,可能出現(xiàn)把同一批數(shù)據(jù)既給線程A處理,也交給線程B處理重復(fù)消費(fèi)的問(wèn)題。

一個(gè)誤區(qū):多線程就是消費(fèi)者組

下圖中體現(xiàn)的是一種正常的KafkaConsumer使用方式

  • 使用一個(gè)KafkaConsumer拉取數(shù)據(jù)
  • 拉取數(shù)據(jù)后將一個(gè)批次的數(shù)據(jù)交給一個(gè)線程去處理

在這里插入圖片描述

這個(gè)處理方式不是錯(cuò)誤,但是他只是一個(gè)消費(fèi)者在消費(fèi)kafka消息隊(duì)列中的數(shù)據(jù),不是消費(fèi)者組的方式消費(fèi)數(shù)據(jù)。無(wú)法充分利用kafka分區(qū)提升消息處理的吞吐量。

常規(guī)正確做法:使用線程池實(shí)現(xiàn)消費(fèi)者組

下面的方法是常規(guī)的正確實(shí)現(xiàn)方式

在這里插入圖片描述

  • 因?yàn)镵afkaConsumer是線程不安全的,所以不能跨線程使用KafkaConsumer
  • 每個(gè)線程持有一個(gè)KafkaConsumer對(duì)象
  • 多個(gè)線程的實(shí)現(xiàn)可以使用線程池,線程池的線程數(shù)量等于消費(fèi)者組內(nèi)消費(fèi)者的數(shù)量
public class MyConsumerGroup {
    public void groupConsumer(){
        ExecutorService executorService = Executors.newFixedThreadPool(6);
        for (int i = 0; i < 6; i++) {
            MyConsumer myConsumer = new MyConsumer();
            executorService.execute(myConsumer);
        }
    }
}

MyConsumer方法需要實(shí)現(xiàn)Runnable接口,并在run方法中調(diào)用MyConsumer#pollData。MyConsumer的代碼參考本專(zhuān)欄的《消費(fèi)者Java實(shí)現(xiàn)》( 集成apache kafka-clients實(shí)現(xiàn)數(shù)據(jù)消費(fèi)者)

@Override
public void run() {
    MyConsumer myConsumer = new MyConsumer();
    myConsumer.pollData();
}

到此這篇關(guān)于結(jié)合線程池實(shí)現(xiàn)apache kafka消費(fèi)者組的文章就介紹到這了,更多相關(guān)apache kafka消費(fèi)者組內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • springboot全局異常處理詳解

    springboot全局異常處理詳解

    本篇文章主要介紹了springboot全局異常處理詳解,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2017-05-05
  • JAVA調(diào)用SAP WEBSERVICE服務(wù)實(shí)現(xiàn)流程圖解

    JAVA調(diào)用SAP WEBSERVICE服務(wù)實(shí)現(xiàn)流程圖解

    這篇文章主要介紹了JAVA調(diào)用SAP WEBSERVICE服務(wù)實(shí)現(xiàn)流程圖解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-10-10
  • mybatis執(zhí)行錯(cuò)誤但sql執(zhí)行正常問(wèn)題

    mybatis執(zhí)行錯(cuò)誤但sql執(zhí)行正常問(wèn)題

    這篇文章主要介紹了mybatis執(zhí)行錯(cuò)誤但sql執(zhí)行正常問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2024-01-01
  • 淺談java常量池

    淺談java常量池

    下面小編就為大家?guī)?lái)一篇淺談java常量池。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2016-06-06
  • Java 中的 xx ≠ null 是什么新語(yǔ)法

    Java 中的 xx ≠ null 是什么新語(yǔ)法

    Java中null是一個(gè)關(guān)鍵字,用來(lái)標(biāo)識(shí)一個(gè)不確定的對(duì)象。因此可以將null賦給引用類(lèi)型變量,但不可以將null賦給基本類(lèi)型變量。本文給大家分享Java 中的 xx ≠ null 是什么新語(yǔ)法,感興趣的朋友一起看看吧
    2021-06-06
  • Java中ByteBuffer的allocate方法 和allocateDirect方法的區(qū)別和選用原則解析

    Java中ByteBuffer的allocate方法 和allocateDirect方法的區(qū)別和選用原則解析

    在Java中,ByteBuffer是java.nio包中的一個(gè)類(lèi),用于處理字節(jié)數(shù)據(jù),ByteBuffer提供了兩種方式來(lái)分配內(nèi)存:allocate和allocateDirect,這篇文章主要介紹了Java中ByteBuffer的allocate方法 和allocateDirect方法的區(qū)別和選用原則 ,需要的朋友可以參考下
    2023-12-12
  • 基于java 線程的幾種狀態(tài)(詳解)

    基于java 線程的幾種狀態(tài)(詳解)

    下面小編就為大家?guī)?lái)一篇基于java 線程的幾種狀態(tài)(詳解)。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就想給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2017-09-09
  • 劍指Offer之Java算法習(xí)題精講N叉樹(shù)的遍歷及數(shù)組與字符串

    劍指Offer之Java算法習(xí)題精講N叉樹(shù)的遍歷及數(shù)組與字符串

    跟著思路走,之后從簡(jiǎn)單題入手,反復(fù)去看,做過(guò)之后可能會(huì)忘記,之后再做一次,記不住就反復(fù)做,反復(fù)尋求思路和規(guī)律,慢慢積累就會(huì)發(fā)現(xiàn)質(zhì)的變化
    2022-03-03
  • Java Servlet3.0異步處理問(wèn)題

    Java Servlet3.0異步處理問(wèn)題

    這篇文章主要介紹了Java中Servlet3.0異步處理的原理以及遇到的問(wèn)題分析,需要的朋友參考一下。
    2017-12-12
  • SWT(JFace)體驗(yàn)之List演示匯總

    SWT(JFace)體驗(yàn)之List演示匯總

    SWT(JFace)體驗(yàn)之List演示代碼匯總
    2009-06-06

最新評(píng)論