欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Flink作業(yè)Task運(yùn)行源碼解析

 更新時間:2022年12月01日 10:50:33   作者:xiangel  
這篇文章主要為大家介紹了Flink作業(yè)Task運(yùn)行源碼解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

引言

上一篇我們分析了Flink部署集群的過程和作業(yè)提交的方式,本篇我們來分析下,具體作業(yè)是如何被調(diào)度和計(jì)算的。具體分為2個部分來介紹

  • 作業(yè)運(yùn)行的整體框架,對相關(guān)的重要角色有深入了解
  • 計(jì)算流程,重點(diǎn)是如何調(diào)度具體的operator機(jī)制

概覽

首先我們來了解下整體的框架 JobMaster: 計(jì)算框架的主節(jié)點(diǎn),負(fù)責(zé)運(yùn)行單個JobGraph,包括任務(wù)的調(diào)度,資源申請和TaskManager的管理等。 TaskExecutor: 負(fù)責(zé)多個Task的具體執(zhí)行 Dispatcher接收到submitJob的請求后,會生成一個JobMaster實(shí)例(具體為Dispatcher創(chuàng)建JobManagerRunner,JobManagerRunner創(chuàng)建JobMaster),下面來具體介紹下JobMaster和TaskExecutor的內(nèi)部信息

調(diào)度框架

JobMaster

    private final SchedulerNG schedulerNG;
    private final ShuffleMaster<?> shuffleMaster;
    private final SlotPoolService slotPoolService;
    private final LeaderRetrievalService resourceManagerLeaderRetriever;
    private final BlobWriter blobWriter;
    private final JobMasterPartitionTracker partitionTracker;
    private HeartbeatManager<TaskExecutorToJobManagerHeartbeatPayload, AllocatedSlotReport>
            taskManagerHeartbeatManager;
    private HeartbeatManager<Void, Void> resourceManagerHeartbeatManager;

JobMaster作為整個任務(wù)調(diào)度計(jì)算的主節(jié)點(diǎn),需要和一些外部角色進(jìn)行交互,具體的如下:

  • resourceManagerLeaderRetriever: 負(fù)責(zé)和resourceManager間的通訊
  • slotPoolService: 用于管理slotpool的,slot資源管理,負(fù)責(zé)slot的申請、釋放等。
  • partitionTracker: 負(fù)責(zé)算子計(jì)算結(jié)果數(shù)據(jù)分區(qū)的跟蹤
  • schedulerNG:內(nèi)部的調(diào)度引擎,負(fù)責(zé)job的調(diào)度處理
  • shuffleMaster: 數(shù)據(jù)shuffle處理
  • taskManagerHeartbeatManager:記錄和taskManager間的心跳信息,
  • resourceManagerHeartbeatManager:記錄和resourceManager間的心跳

ScheduleNG

ScheduleNG實(shí)際負(fù)責(zé)job調(diào)度處理,包括生成ExecutionGraph,作業(yè)的調(diào)度執(zhí)行,任務(wù)出錯處理等。其實(shí)現(xiàn)類為DefaultScheduler

  • SchedulingStrategy:任務(wù)調(diào)度的策略,實(shí)現(xiàn)類為PipelinedRegionSchedulingStrategy,按pipeline region的粒度來調(diào)度任務(wù)
  • ExecutionGraphFactory:其實(shí)現(xiàn)類為DefaultExecutionGraphFactory,創(chuàng)建ExecutionGraph的工廠類

TaskExecutor

實(shí)際任務(wù)運(yùn)行的節(jié)點(diǎn),該類負(fù)責(zé)多個任務(wù)的運(yùn)行,首先我們看看其實(shí)現(xiàn)了TaskExecutorGateway接口,TaskExecutorGateway定義了各類可以調(diào)用的功能接口,具體內(nèi)容見下表

分類方法名說明
Task操作相關(guān)SubmitTask向TaskExecutor提交任務(wù)
Task操作相關(guān)cancelTask取消指定的任務(wù)
Task操作相關(guān)sendOperatorEventToTask發(fā)送算子事件給Task
Slot操作相關(guān)requestSlot給指定的Job分配指定的slot
Slot操作相關(guān)freeSlot釋放對應(yīng)的slot
Slot操作相關(guān)freeInactiveSlots釋放指定Job的未使用的slot
Partition操作相關(guān)updatePartitions更新分區(qū)信息
Partition操作相關(guān)releaseOrPromotePartitions批量釋放或保留分區(qū)
Partition操作相關(guān)releaseClusterPartitions釋放屬于給定datasets的所有集群分區(qū)數(shù)據(jù)
checkpoint操作相關(guān)triggerCheckpoint觸發(fā)指定任務(wù)的checkpoint處理
checkpoint操作相關(guān)confirmCheckpoint確認(rèn)指定任務(wù)的checkpoint
checkpoint操作相關(guān)abortCheckpoint終止給定任務(wù)的checkpoint

Task

一個Task負(fù)責(zé)TaskManager上一個subtask的一次執(zhí)行,Task對Flink Operator進(jìn)行包裝然后運(yùn)行,并提供需要的各類服務(wù),如消費(fèi)輸入數(shù)據(jù),生產(chǎn)數(shù)據(jù)以及和JobManager通訊。Task實(shí)現(xiàn)了Runnable接口,即通過一個單獨(dú)的線程來運(yùn)行,而其中的Flink Operator部分封裝在實(shí)現(xiàn)了TaskInvokable接口的類中,實(shí)現(xiàn)類主要為SourceStreamTask和OneInputStreamTask。下面分別詳細(xì)介紹下這幾個類

  • Task: 對應(yīng)為一個線程,來運(yùn)行具體的Operator的邏輯,并包括相關(guān)的其他的輔助功能,包括如執(zhí)行狀態(tài)的管理、結(jié)果數(shù)據(jù)管理(ResultPartitionWriters)、輸入數(shù)據(jù)(IndexInputGate)以及生成封裝了Operator邏輯的TaskInvokable實(shí)例并運(yùn)行
  • TaskInvokable:封裝了具體Operator的處理邏輯,主要包括有2個方法,restore()和invoke()。restore()方法在invoke()之前調(diào)用,用于恢復(fù)上次的有效狀態(tài)。invoke()方法執(zhí)行具體的處理邏輯。下面我們看看其實(shí)現(xiàn)子類(這里只列了與StreamGraph相關(guān)的實(shí)現(xiàn)類,對于其他的子類沒有展示)

  • SourceStreamTask:用于執(zhí)行StreamSource,即源頭的讀取數(shù)據(jù)類Operator
  • OneInputStreamTask:用于執(zhí)行OneInputStreamOperator,即只有一個輸入的operator
  • TwoInputStreamTask: 用于執(zhí)行TwoInputStreamOperator,有2個輸入的operator
  • MultipleInputStreamTask: 用于執(zhí)行MultipleInputStreamOperator,有多個輸入的operator

計(jì)算框架

計(jì)算框架這節(jié)主要來了解數(shù)據(jù)是如何在Flink中如何處理和流轉(zhuǎn)的。這里我們主要回答以下幾個問題:

  • Flink中整個數(shù)據(jù)的處理流程,單條數(shù)據(jù)是如何在各個算子間流轉(zhuǎn)和處理的
  • 對于算子chain和其他算子其底層實(shí)現(xiàn)區(qū)別是怎樣的,為何chain后的效率會高 我們先以StreamMap算子為例來看整體計(jì)算框架的設(shè)計(jì)
public class StreamMap<IN, OUT> extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {
    private static final long serialVersionUID = 1L;
    public StreamMap(MapFunction<IN, OUT> mapper) {
        super(mapper);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }
    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        output.collect(element.replace(userFunction.map(element.getValue())));
    }
}

這里StreamMap實(shí)現(xiàn)了Input接口,其中在實(shí)現(xiàn)的processElement()方法中實(shí)現(xiàn)了具體的對具體數(shù)據(jù)的操作處理(Operator),并將結(jié)果通過Output接口的collect()方法發(fā)射出去。我們先看看這2個接口定義的方法

基本上2邊是一一對應(yīng)的關(guān)系,Input負(fù)責(zé)處理Element\Watermark\WatermarkStatus\LatencyMarker,而Output負(fù)責(zé)emit這些。這里Input是處理一個輸入的,如果是2個輸入那對應(yīng)的就是TwoInputStreamOperator

算子計(jì)算處理

對于Chain的操作,是通過Output接口的實(shí)現(xiàn)類ChainingOutput.java

    // ChainingOutput.java
    @Override
    public void collect(StreamRecord<T> record) {
        pushToOperator(record);
    }
    protected <X> void pushToOperator(StreamRecord<X> record) {
        try {
            ...
            input.setKeyContextElement(castRecord);
            input.processElement(castRecord);
        } catch (Exception e) {
            throw new ExceptionInChainedOperatorException(e);
        }

這里可以看到在output.collect()方法中把數(shù)據(jù)再推送到了算子,然后算子(input)繼續(xù)執(zhí)行processElement()這樣來實(shí)現(xiàn)了在當(dāng)前線程內(nèi)的pipeline處理,

總結(jié)

本篇我們介紹了Flink是如何來執(zhí)行相應(yīng)的算子來實(shí)現(xiàn)計(jì)算的,主要介紹了TaskExecutor運(yùn)行的Task實(shí)現(xiàn),以及chain算子是如何串行來運(yùn)行的。對于算子之間的數(shù)據(jù)交互這塊我們后面一篇來單獨(dú)介紹。

以上就是Flink作業(yè)Task運(yùn)行源碼解析的詳細(xì)內(nèi)容,更多關(guān)于Flink作業(yè)Task運(yùn)行的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Java常用類庫StringBuffer,Runtime,日期操作類等類庫總結(jié)

    Java常用類庫StringBuffer,Runtime,日期操作類等類庫總結(jié)

    這篇文章主要介紹了Java常用類庫StringBuffer,Runtime,日期操作類等類庫總結(jié),需要的朋友可以參考下
    2020-02-02
  • mall整合SpringTask實(shí)現(xiàn)定時任務(wù)的方法示例

    mall整合SpringTask實(shí)現(xiàn)定時任務(wù)的方法示例

    這篇文章主要介紹了mall整合SpringTask實(shí)現(xiàn)定時任務(wù)的方法示例,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2019-06-06
  • Mybatis如何分割字符串

    Mybatis如何分割字符串

    這篇文章主要介紹了Mybatis如何分割字符串問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2023-12-12
  • Java自動解壓文件實(shí)例代碼

    Java自動解壓文件實(shí)例代碼

    Java自動解壓文件實(shí)例代碼,需要的朋友可以參考一下
    2013-04-04
  • springboot整合sentinel的方法教程

    springboot整合sentinel的方法教程

    這篇文章主要介紹了springboot整合sentinel的方法,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-11-11
  • IDEA中Web項(xiàng)目控制臺亂碼的問題及解決方法

    IDEA中Web項(xiàng)目控制臺亂碼的問題及解決方法

    這篇文章主要介紹了IDEA中Web項(xiàng)目控制臺亂碼的問題及解決方法,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-08-08
  • springboot項(xiàng)目獲取resources相對路徑的方法

    springboot項(xiàng)目獲取resources相對路徑的方法

    這篇文章主要介紹了springboot項(xiàng)目獲取resources相對路徑的方法,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-12-12
  • Java Scanner如何獲取字符串和帶空格的字符串

    Java Scanner如何獲取字符串和帶空格的字符串

    這篇文章主要介紹了Java Scanner如何獲取字符串和帶空格的字符串問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2023-08-08
  • Java?Zip壓縮之簡化文件和文件夾的壓縮操作

    Java?Zip壓縮之簡化文件和文件夾的壓縮操作

    這篇文章主要給大家介紹了關(guān)于Java?Zip壓縮之簡化文件和文件夾的壓縮操作,Zip壓縮是一種常見的文件壓縮格式,它將多個文件和文件夾打包成一個以.zip為后綴的壓縮包,需要的朋友可以參考下
    2023-10-10
  • 使用SpringBoot簡單實(shí)現(xiàn)無感知的刷新 Token功能

    使用SpringBoot簡單實(shí)現(xiàn)無感知的刷新 Token功能

    實(shí)現(xiàn)無感知的刷新 Token 是一種提升用戶體驗(yàn)的常用技術(shù),可以在用戶使用應(yīng)用時自動更新 Token,無需用戶手動干預(yù),這種技術(shù)在需要長時間保持用戶登錄狀態(tài)的應(yīng)用中非常有用,以下是使用Spring Boot實(shí)現(xiàn)無感知刷新Token的一個場景案例和相應(yīng)的示例代碼
    2024-09-09

最新評論