欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

流式圖表拒絕增刪改查之kafka核心消費邏輯下篇

 更新時間:2023年04月12日 15:18:59   作者:在下uptown  
這篇文章主要為大家介紹了流式圖表拒絕增刪改查之kafka核心消費邏輯講解的下篇,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

前篇回顧

kafka消費者線程

突擊檢查八股文,實現(xiàn)線程的方法有哪些?嗯?沒復(fù)習(xí)是吧,行沒關(guān)系,那感謝參加本次面試哈。

常用的幾種方式分別是:

  • 繼承Thread類,重寫run方法
  • 實現(xiàn)Runbale接口,重寫run方法
  • 實現(xiàn)Callable接口,重寫call方法

這里我們直接創(chuàng)捷出一個任務(wù)類實現(xiàn)Runable方法,重寫run方法,一個線程當(dāng)作一個kafka client,所以要在任務(wù)類中聲明一個KafkaConsumer的成員變量,另外創(chuàng)建任務(wù)需要指定當(dāng)前任務(wù)的名稱也就是線程名,還有要監(jiān)聽的topic主題。

private KafkaConsumer<String, String> consumer;
private String topic;
private String threadName;

name和topic通過構(gòu)造方法傳進(jìn)來,同時在構(gòu)造方法里完成對client的初始化操作。

/**
    * 封裝必要信息
    * @param bootServer 生產(chǎn)者ip
    * @param groupId 分組信息
    * @param topic  訂閱主題
    */
   public KafkaConsumerRunnable(String bootServer, String groupId, String topic) {
       this.topic = topic;
       Properties props = new Properties();
       props.put("bootstrap.servers", bootServer);
       props.put("group.id", groupId);
       props.put("enable.auto.commit", "false");
       props.put("auto.offset.reset", "latest");
       props.put("max.poll.records", 5);
       props.put("session.timeout.ms", "60000");
       props.put("max.poll.interval.ms", 300000);
       props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  //鍵反序列化方式
       props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
       this.consumer = new KafkaConsumer&lt;&gt;(props);
   }

這里封裝kafka client的必要信息,入?yún)ootServer為kafka集群ip,groupId為threadName,我們規(guī)定一個線程為一個kafka消費鏈接,消費一個topic。

上一篇線程池保證了任務(wù)不會輕易掛掉,就算掛掉了也會重新提交,所以為了節(jié)省資源不做所謂的同groupId的負(fù)載操作。session.timeout.ms和max.poll.interval.ms可以根據(jù)當(dāng)前的kafka資源靈活配置,不然可能會引發(fā)一些reblance。

enable.auto.commit設(shè)置為false,手動提交offset,auto.offset.reset這塊由于業(yè)務(wù)特殊,本來就是流式圖表瞬時的展示,如果真的出現(xiàn)了數(shù)據(jù)丟失那就丟了吧,從最新的數(shù)據(jù)讀取。

接下來只需要處理下消費邏輯,consumer.subscribe(Collections.singletonList(this.topic))開始訂閱監(jiān)聽kafka數(shù)據(jù),搞一個while true不斷的消費數(shù)據(jù),try catch只需要對WakeupException做處理,kafka客戶端會在關(guān)閉的時候拋出WakeupException異常。

finally里提交offset,無論這條offset對應(yīng)的數(shù)據(jù)消費成功還是失敗都是消費過了,失敗了就過去了。

   @Override
   public void run() {
   consumer.subscribe(Collections.singletonList(this.topic));
   String key = "stream_chart:" + this.name;
   Thread.currentThread().setName(key);
   try {
      while (true) {
         ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
         // 如果隊列中沒有消息 等待KAFKA_TIME_OUT后調(diào)用poll,如果有消息立即消費
         for (ConsumerRecord<String, String> record : records) {
            String value = record.value();
            log.info("線程 {} 消費kafka數(shù)據(jù) -> {} \n", Thread.currentThread().getName(), value);
            RedisConfig.getRedisTemplate().opsForZSet().add(key, value, Instant.now().getEpochSecond() * 1000);
         }
      }
   } catch (WakeupException e) {
      log.info("ignore for shutdown", e);
   } finally {
      consumer.commitAsync();
   }
}

我們消費到數(shù)據(jù)直接放到redis的zset結(jié)構(gòu)里,當(dāng)前的時間戳作為score,最后留一個關(guān)閉客戶端的后門

// 退出后關(guān)掉客戶端
public void shutDown() {
   consumer.wakeup();
}

任務(wù)提交

任務(wù)提交這塊只需要在業(yè)務(wù)service中注入線程池,創(chuàng)建對應(yīng)的KafkaRunable任務(wù)封裝對應(yīng)的信息,執(zhí)行execute即可。

這里有個坑需要注意下,第二次突擊檢查八股文,線程池提交方法submitexecute的區(qū)別說一下。不知道的立刻去熟讀并背誦。

public class TestTheadPool {
    public static void main(String[] args) {
        ExecutorService executorService= Executors.newFixedThreadPool(1);
        executorService.submit(new task("submit"));
        executorService.execute(new task("execute"));
    }
}
class task implements  Runnable{
    private String name;
    public task(String name) {
        this.name = name;
    }
    @Override
    public void run() {
        System.out.println(this.name + " start task");
        int i=1/0;
    }
}

熟悉的同學(xué)通過示例代碼可以看出來,submit提交的線程不會拋出異常代碼,只有獲取Future返回值并執(zhí)行g(shù)et方法才會捕獲到異常。這塊涉及到異步的東西不再贅述

try {
    Future<?> submit = executorService.submit(new task("submit"));
    submit.get();
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}

所以我們要使用execute執(zhí)行,不然kafka消費線程里消費失敗了攔截不到就不會被重新提交,導(dǎo)致線程掛掉。

以上就是流式圖表拒絕增刪改查之kafka核心消費邏輯下篇的詳細(xì)內(nèi)容,更多關(guān)于kafka消費流式圖表的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • 基于java實現(xiàn)租車管理系統(tǒng)

    基于java實現(xiàn)租車管理系統(tǒng)

    這篇文章主要為大家詳細(xì)介紹了基于java實現(xiàn)租車管理系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2020-12-12
  • Java實現(xiàn)二維碼、條形碼功能(案例代碼)

    Java實現(xiàn)二維碼、條形碼功能(案例代碼)

    ZXing是一個開放源碼的,用Java實現(xiàn)的多種格式的1D/2D條碼圖像處理庫,它包含了聯(lián)系到其他語言的端口,Zxing可以實現(xiàn)使用手機的內(nèi)置的攝像頭完成條形碼的掃描及解碼,這篇文章主要介紹了Java實現(xiàn)二維碼、條形碼等功能,需要的朋友可以參考下
    2024-01-01
  • SpringBoot項目啟動后再請求遠(yuǎn)程接口的解決方式

    SpringBoot項目啟動后再請求遠(yuǎn)程接口的解決方式

    Spring?Boot是由Pivotal團隊提供的全新框架,其設(shè)計目的是用來簡化Spring應(yīng)用的創(chuàng)建、運行、調(diào)試、部署等,這篇文章主要介紹了SpringBoot項目啟動后再請求遠(yuǎn)程接口的實現(xiàn)方式?,需要的朋友可以參考下
    2023-02-02
  • Java如何實現(xiàn)長圖文生成的示例代碼

    Java如何實現(xiàn)長圖文生成的示例代碼

    這篇文章主要介紹了Java如何實現(xiàn)長圖文生成的示例代碼,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-08-08
  • 淺談mybatis返回單一對象或?qū)ο罅斜淼膯栴}

    淺談mybatis返回單一對象或?qū)ο罅斜淼膯栴}

    這篇文章主要介紹了淺談mybatis返回單一對象或?qū)ο罅斜淼膯栴},具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-08-08
  • SpringMVC自定義攔截器登錄檢測功能的實現(xiàn)代碼

    SpringMVC自定義攔截器登錄檢測功能的實現(xiàn)代碼

    這篇文章主要介紹了SpringMVC自定義攔截器登錄檢測功能的實現(xiàn),本文通過實例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2021-08-08
  • java web實現(xiàn)簡單留言板功能

    java web實現(xiàn)簡單留言板功能

    這篇文章主要為大家詳細(xì)介紹了java web實現(xiàn)簡單留言板功能,文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2020-11-11
  • IDEA之啟動參數(shù),配置文件默認(rèn)參數(shù)的操作

    IDEA之啟動參數(shù),配置文件默認(rèn)參數(shù)的操作

    這篇文章主要介紹了IDEA之啟動參數(shù),配置文件默認(rèn)參數(shù)的操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2021-01-01
  • Java中Spring的Security使用詳解

    Java中Spring的Security使用詳解

    這篇文章主要介紹了Java中Spring的Security使用詳解,在web應(yīng)用開發(fā)中,安全無疑是十分重要的,選擇Spring Security來保護web應(yīng)用是一個非常好的選擇,需要的朋友可以參考下
    2023-07-07
  • Java中String、StringBuffer、StringBuilder的區(qū)別介紹

    Java中String、StringBuffer、StringBuilder的區(qū)別介紹

    這篇文章主要介紹了Java中String、StringBuffer、StringBuilder的區(qū)別介紹,本文講解了可變與不可變、是否多線程安全、gBuilder與StringBuffer共同點等內(nèi)容,需要的朋友可以參考下
    2015-06-06

最新評論