Flink作業(yè)Task運(yùn)行源碼解析
引言
上一篇我們分析了Flink部署集群的過程和作業(yè)提交的方式,本篇我們來分析下,具體作業(yè)是如何被調(diào)度和計(jì)算的。具體分為2個部分來介紹
- 作業(yè)運(yùn)行的整體框架,對相關(guān)的重要角色有深入了解
- 計(jì)算流程,重點(diǎn)是如何調(diào)度具體的operator機(jī)制
概覽
首先我們來了解下整體的框架 JobMaster: 計(jì)算框架的主節(jié)點(diǎn),負(fù)責(zé)運(yùn)行單個JobGraph,包括任務(wù)的調(diào)度,資源申請和TaskManager的管理等。 TaskExecutor: 負(fù)責(zé)多個Task的具體執(zhí)行 Dispatcher接收到submitJob的請求后,會生成一個JobMaster實(shí)例(具體為Dispatcher創(chuàng)建JobManagerRunner,JobManagerRunner創(chuàng)建JobMaster),下面來具體介紹下JobMaster和TaskExecutor的內(nèi)部信息
調(diào)度框架
JobMaster
private final SchedulerNG schedulerNG; private final ShuffleMaster<?> shuffleMaster; private final SlotPoolService slotPoolService; private final LeaderRetrievalService resourceManagerLeaderRetriever; private final BlobWriter blobWriter; private final JobMasterPartitionTracker partitionTracker; private HeartbeatManager<TaskExecutorToJobManagerHeartbeatPayload, AllocatedSlotReport> taskManagerHeartbeatManager; private HeartbeatManager<Void, Void> resourceManagerHeartbeatManager;
JobMaster作為整個任務(wù)調(diào)度計(jì)算的主節(jié)點(diǎn),需要和一些外部角色進(jìn)行交互,具體的如下:
- resourceManagerLeaderRetriever: 負(fù)責(zé)和resourceManager間的通訊
- slotPoolService: 用于管理slotpool的,slot資源管理,負(fù)責(zé)slot的申請、釋放等。
- partitionTracker: 負(fù)責(zé)算子計(jì)算結(jié)果數(shù)據(jù)分區(qū)的跟蹤
- schedulerNG:內(nèi)部的調(diào)度引擎,負(fù)責(zé)job的調(diào)度處理
- shuffleMaster: 數(shù)據(jù)shuffle處理
- taskManagerHeartbeatManager:記錄和taskManager間的心跳信息,
- resourceManagerHeartbeatManager:記錄和resourceManager間的心跳
ScheduleNG
ScheduleNG實(shí)際負(fù)責(zé)job調(diào)度處理,包括生成ExecutionGraph,作業(yè)的調(diào)度執(zhí)行,任務(wù)出錯處理等。其實(shí)現(xiàn)類為DefaultScheduler
- SchedulingStrategy:任務(wù)調(diào)度的策略,實(shí)現(xiàn)類為PipelinedRegionSchedulingStrategy,按pipeline region的粒度來調(diào)度任務(wù)
- ExecutionGraphFactory:其實(shí)現(xiàn)類為DefaultExecutionGraphFactory,創(chuàng)建ExecutionGraph的工廠類
TaskExecutor
實(shí)際任務(wù)運(yùn)行的節(jié)點(diǎn),該類負(fù)責(zé)多個任務(wù)的運(yùn)行,首先我們看看其實(shí)現(xiàn)了TaskExecutorGateway接口,TaskExecutorGateway定義了各類可以調(diào)用的功能接口,具體內(nèi)容見下表
分類 | 方法名 | 說明 |
---|---|---|
Task操作相關(guān) | SubmitTask | 向TaskExecutor提交任務(wù) |
Task操作相關(guān) | cancelTask | 取消指定的任務(wù) |
Task操作相關(guān) | sendOperatorEventToTask | 發(fā)送算子事件給Task |
Slot操作相關(guān) | requestSlot | 給指定的Job分配指定的slot |
Slot操作相關(guān) | freeSlot | 釋放對應(yīng)的slot |
Slot操作相關(guān) | freeInactiveSlots | 釋放指定Job的未使用的slot |
Partition操作相關(guān) | updatePartitions | 更新分區(qū)信息 |
Partition操作相關(guān) | releaseOrPromotePartitions | 批量釋放或保留分區(qū) |
Partition操作相關(guān) | releaseClusterPartitions | 釋放屬于給定datasets的所有集群分區(qū)數(shù)據(jù) |
checkpoint操作相關(guān) | triggerCheckpoint | 觸發(fā)指定任務(wù)的checkpoint處理 |
checkpoint操作相關(guān) | confirmCheckpoint | 確認(rèn)指定任務(wù)的checkpoint |
checkpoint操作相關(guān) | abortCheckpoint | 終止給定任務(wù)的checkpoint |
Task
一個Task負(fù)責(zé)TaskManager上一個subtask的一次執(zhí)行,Task對Flink Operator進(jìn)行包裝然后運(yùn)行,并提供需要的各類服務(wù),如消費(fèi)輸入數(shù)據(jù),生產(chǎn)數(shù)據(jù)以及和JobManager通訊。Task實(shí)現(xiàn)了Runnable接口,即通過一個單獨(dú)的線程來運(yùn)行,而其中的Flink Operator部分封裝在實(shí)現(xiàn)了TaskInvokable接口的類中,實(shí)現(xiàn)類主要為SourceStreamTask和OneInputStreamTask。下面分別詳細(xì)介紹下這幾個類
- Task: 對應(yīng)為一個線程,來運(yùn)行具體的Operator的邏輯,并包括相關(guān)的其他的輔助功能,包括如執(zhí)行狀態(tài)的管理、結(jié)果數(shù)據(jù)管理(ResultPartitionWriters)、輸入數(shù)據(jù)(IndexInputGate)以及生成封裝了Operator邏輯的TaskInvokable實(shí)例并運(yùn)行
- TaskInvokable:封裝了具體Operator的處理邏輯,主要包括有2個方法,restore()和invoke()。restore()方法在invoke()之前調(diào)用,用于恢復(fù)上次的有效狀態(tài)。invoke()方法執(zhí)行具體的處理邏輯。下面我們看看其實(shí)現(xiàn)子類(這里只列了與StreamGraph相關(guān)的實(shí)現(xiàn)類,對于其他的子類沒有展示)
- SourceStreamTask:用于執(zhí)行StreamSource,即源頭的讀取數(shù)據(jù)類Operator
- OneInputStreamTask:用于執(zhí)行OneInputStreamOperator,即只有一個輸入的operator
- TwoInputStreamTask: 用于執(zhí)行TwoInputStreamOperator,有2個輸入的operator
- MultipleInputStreamTask: 用于執(zhí)行MultipleInputStreamOperator,有多個輸入的operator
計(jì)算框架
計(jì)算框架這節(jié)主要來了解數(shù)據(jù)是如何在Flink中如何處理和流轉(zhuǎn)的。這里我們主要回答以下幾個問題:
- Flink中整個數(shù)據(jù)的處理流程,單條數(shù)據(jù)是如何在各個算子間流轉(zhuǎn)和處理的
- 對于算子chain和其他算子其底層實(shí)現(xiàn)區(qū)別是怎樣的,為何chain后的效率會高 我們先以StreamMap算子為例來看整體計(jì)算框架的設(shè)計(jì)
public class StreamMap<IN, OUT> extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>> implements OneInputStreamOperator<IN, OUT> { private static final long serialVersionUID = 1L; public StreamMap(MapFunction<IN, OUT> mapper) { super(mapper); chainingStrategy = ChainingStrategy.ALWAYS; } @Override public void processElement(StreamRecord<IN> element) throws Exception { output.collect(element.replace(userFunction.map(element.getValue()))); } }
這里StreamMap實(shí)現(xiàn)了Input接口,其中在實(shí)現(xiàn)的processElement()方法中實(shí)現(xiàn)了具體的對具體數(shù)據(jù)的操作處理(Operator),并將結(jié)果通過Output接口的collect()方法發(fā)射出去。我們先看看這2個接口定義的方法
基本上2邊是一一對應(yīng)的關(guān)系,Input負(fù)責(zé)處理Element\Watermark\WatermarkStatus\LatencyMarker,而Output負(fù)責(zé)emit這些。這里Input是處理一個輸入的,如果是2個輸入那對應(yīng)的就是TwoInputStreamOperator
算子計(jì)算處理
對于Chain的操作,是通過Output接口的實(shí)現(xiàn)類ChainingOutput.java
// ChainingOutput.java @Override public void collect(StreamRecord<T> record) { pushToOperator(record); } protected <X> void pushToOperator(StreamRecord<X> record) { try { ... input.setKeyContextElement(castRecord); input.processElement(castRecord); } catch (Exception e) { throw new ExceptionInChainedOperatorException(e); }
這里可以看到在output.collect()方法中把數(shù)據(jù)再推送到了算子,然后算子(input)繼續(xù)執(zhí)行processElement()這樣來實(shí)現(xiàn)了在當(dāng)前線程內(nèi)的pipeline處理,
總結(jié)
本篇我們介紹了Flink是如何來執(zhí)行相應(yīng)的算子來實(shí)現(xiàn)計(jì)算的,主要介紹了TaskExecutor運(yùn)行的Task實(shí)現(xiàn),以及chain算子是如何串行來運(yùn)行的。對于算子之間的數(shù)據(jù)交互這塊我們后面一篇來單獨(dú)介紹。
以上就是Flink作業(yè)Task運(yùn)行源碼解析的詳細(xì)內(nèi)容,更多關(guān)于Flink作業(yè)Task運(yùn)行的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java常用類庫StringBuffer,Runtime,日期操作類等類庫總結(jié)
這篇文章主要介紹了Java常用類庫StringBuffer,Runtime,日期操作類等類庫總結(jié),需要的朋友可以參考下2020-02-02mall整合SpringTask實(shí)現(xiàn)定時任務(wù)的方法示例
這篇文章主要介紹了mall整合SpringTask實(shí)現(xiàn)定時任務(wù)的方法示例,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2019-06-06IDEA中Web項(xiàng)目控制臺亂碼的問題及解決方法
這篇文章主要介紹了IDEA中Web項(xiàng)目控制臺亂碼的問題及解決方法,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-08-08springboot項(xiàng)目獲取resources相對路徑的方法
這篇文章主要介紹了springboot項(xiàng)目獲取resources相對路徑的方法,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-12-12使用SpringBoot簡單實(shí)現(xiàn)無感知的刷新 Token功能
實(shí)現(xiàn)無感知的刷新 Token 是一種提升用戶體驗(yàn)的常用技術(shù),可以在用戶使用應(yīng)用時自動更新 Token,無需用戶手動干預(yù),這種技術(shù)在需要長時間保持用戶登錄狀態(tài)的應(yīng)用中非常有用,以下是使用Spring Boot實(shí)現(xiàn)無感知刷新Token的一個場景案例和相應(yīng)的示例代碼2024-09-09