欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

結合線程池實現(xiàn)apache?kafka消費者組的誤區(qū)及解決方法

 更新時間:2022年07月07日 11:03:27   作者:字母哥哥  
這篇文章主要介紹了結合線程池實現(xiàn)apache?kafka消費者組的誤區(qū)及解決方法,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下

一個錯誤:多線程使用單一消費者

下圖顯現(xiàn)了一種錯誤的使用KafkaConsumer的方法

  • 創(chuàng)建多個線程用來消費kafka數(shù)據(jù)
  • 多線程使用同一個KafkaConsumer對象
  • 在單線程中使用這個KafkaConsumer對象,完成數(shù)據(jù)拉取、處理、提交偏移量。

在這里插入圖片描述

這種方式之所以錯誤的原因是:KafkaConsumer是線程不安全的,可能出現(xiàn)把同一批數(shù)據(jù)既給線程A處理,也交給線程B處理重復消費的問題。

一個誤區(qū):多線程就是消費者組

下圖中體現(xiàn)的是一種正常的KafkaConsumer使用方式

  • 使用一個KafkaConsumer拉取數(shù)據(jù)
  • 拉取數(shù)據(jù)后將一個批次的數(shù)據(jù)交給一個線程去處理

在這里插入圖片描述

這個處理方式不是錯誤,但是他只是一個消費者在消費kafka消息隊列中的數(shù)據(jù),不是消費者組的方式消費數(shù)據(jù)。無法充分利用kafka分區(qū)提升消息處理的吞吐量。

常規(guī)正確做法:使用線程池實現(xiàn)消費者組

下面的方法是常規(guī)的正確實現(xiàn)方式

在這里插入圖片描述

  • 因為KafkaConsumer是線程不安全的,所以不能跨線程使用KafkaConsumer
  • 每個線程持有一個KafkaConsumer對象
  • 多個線程的實現(xiàn)可以使用線程池,線程池的線程數(shù)量等于消費者組內(nèi)消費者的數(shù)量
public class MyConsumerGroup {
    public void groupConsumer(){
        ExecutorService executorService = Executors.newFixedThreadPool(6);
        for (int i = 0; i < 6; i++) {
            MyConsumer myConsumer = new MyConsumer();
            executorService.execute(myConsumer);
        }
    }
}

MyConsumer方法需要實現(xiàn)Runnable接口,并在run方法中調(diào)用MyConsumer#pollData。MyConsumer的代碼參考本專欄的《消費者Java實現(xiàn)》( 集成apache kafka-clients實現(xiàn)數(shù)據(jù)消費者)

@Override
public void run() {
    MyConsumer myConsumer = new MyConsumer();
    myConsumer.pollData();
}

到此這篇關于結合線程池實現(xiàn)apache kafka消費者組的文章就介紹到這了,更多相關apache kafka消費者組內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • springboot全局異常處理詳解

    springboot全局異常處理詳解

    本篇文章主要介紹了springboot全局異常處理詳解,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-05-05
  • JAVA調(diào)用SAP WEBSERVICE服務實現(xiàn)流程圖解

    JAVA調(diào)用SAP WEBSERVICE服務實現(xiàn)流程圖解

    這篇文章主要介紹了JAVA調(diào)用SAP WEBSERVICE服務實現(xiàn)流程圖解,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2020-10-10
  • mybatis執(zhí)行錯誤但sql執(zhí)行正常問題

    mybatis執(zhí)行錯誤但sql執(zhí)行正常問題

    這篇文章主要介紹了mybatis執(zhí)行錯誤但sql執(zhí)行正常問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2024-01-01
  • 淺談java常量池

    淺談java常量池

    下面小編就為大家?guī)硪黄獪\談java常量池。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2016-06-06
  • Java 中的 xx ≠ null 是什么新語法

    Java 中的 xx ≠ null 是什么新語法

    Java中null是一個關鍵字,用來標識一個不確定的對象。因此可以將null賦給引用類型變量,但不可以將null賦給基本類型變量。本文給大家分享Java 中的 xx ≠ null 是什么新語法,感興趣的朋友一起看看吧
    2021-06-06
  • Java中ByteBuffer的allocate方法 和allocateDirect方法的區(qū)別和選用原則解析

    Java中ByteBuffer的allocate方法 和allocateDirect方法的區(qū)別和選用原則解析

    在Java中,ByteBuffer是java.nio包中的一個類,用于處理字節(jié)數(shù)據(jù),ByteBuffer提供了兩種方式來分配內(nèi)存:allocate和allocateDirect,這篇文章主要介紹了Java中ByteBuffer的allocate方法 和allocateDirect方法的區(qū)別和選用原則 ,需要的朋友可以參考下
    2023-12-12
  • 基于java 線程的幾種狀態(tài)(詳解)

    基于java 線程的幾種狀態(tài)(詳解)

    下面小編就為大家?guī)硪黄趈ava 線程的幾種狀態(tài)(詳解)。小編覺得挺不錯的,現(xiàn)在就想給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-09-09
  • 劍指Offer之Java算法習題精講N叉樹的遍歷及數(shù)組與字符串

    劍指Offer之Java算法習題精講N叉樹的遍歷及數(shù)組與字符串

    跟著思路走,之后從簡單題入手,反復去看,做過之后可能會忘記,之后再做一次,記不住就反復做,反復尋求思路和規(guī)律,慢慢積累就會發(fā)現(xiàn)質(zhì)的變化
    2022-03-03
  • Java Servlet3.0異步處理問題

    Java Servlet3.0異步處理問題

    這篇文章主要介紹了Java中Servlet3.0異步處理的原理以及遇到的問題分析,需要的朋友參考一下。
    2017-12-12
  • SWT(JFace)體驗之List演示匯總

    SWT(JFace)體驗之List演示匯總

    SWT(JFace)體驗之List演示代碼匯總
    2009-06-06

最新評論