Flink JobGraph生成源碼解析
引言
在DataStream基礎(chǔ)中,由于其中的內(nèi)容較多,只是介紹了JobGraph的結(jié)果,而沒(méi)有涉及到StreamGraph到JobGraph的轉(zhuǎn)換過(guò)程。本篇我們來(lái)介紹下JobGraph的生成的詳情,重點(diǎn)是Operator可以串聯(lián)成Chain的條件
概念
首先我們來(lái)回顧下JobGraph中的相關(guān)概念
- JobVertex:job的頂點(diǎn),即對(duì)應(yīng)的計(jì)算邏輯(這里用的是Vertex, 而前面用的是Node,有點(diǎn)差異),通過(guò)inputs記錄了所有來(lái)源的Edge,而輸出是ArrayList來(lái)記錄
- JobEdge: job的邊,記錄了源Vertex和目標(biāo)表Vertex.
- IntermediateDataSet: 定義了一個(gè)中間數(shù)據(jù)集,但并沒(méi)有存儲(chǔ),只是記錄了一個(gè)Producer(JobVertex)和一個(gè)Consumer(JobEdge)
JobGraph生成
前面我們?cè)诮榻B部署的時(shí)候,有介紹具體是通過(guò)PipelineExecutor的execute()方法來(lái)提交對(duì)應(yīng)的任務(wù),StreamGraph到JobGraph的轉(zhuǎn)換邏輯就是在該方法中處理的,具體是通過(guò)如下方法來(lái)進(jìn)行處理
public static JobGraph getJobGraph( @Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration)
最后執(zhí)行轉(zhuǎn)換的類為FlinkPipelineTranslator,調(diào)用的是其中的translateToJobGraph方法。
JobGraph translateToJobGraph( Pipeline pipeline, Configuration optimizerConfiguration, int defaultParallelism);
這里有2個(gè)不同的實(shí)現(xiàn)類
- StreamGraphTranslator:對(duì)StreamGraph的Pipeline進(jìn)行轉(zhuǎn)換處理
- PlanTranslator:對(duì)Plan類型的Pipeline進(jìn)行轉(zhuǎn)換處理,用于SQL場(chǎng)景。 而這2個(gè)分別對(duì)應(yīng)到2個(gè)不同的類來(lái)生成JobGraph,分別如下:
- StreamingJobGraphGenerator
- JobGraphGenerator 本篇我們重點(diǎn)介紹StreamGraph到JobGraph的轉(zhuǎn)換StreamingJobGraphGenerator, JogGraphGenerator這塊等到介紹FlinkSQL的時(shí)候來(lái)介紹。StreamingJobGraphGenerator類中具體轉(zhuǎn)換處理的邏輯如下:
private JobGraph createJobGraph() { preValidate(); jobGraph.setJobType(streamGraph.getJobType());  jobGraph.enableApproximateLocalRecovery( streamGraph.getCheckpointConfig().isApproximateLocalRecoveryEnabled()); // Generate deterministic hashes for the nodes in order to identify them across // submission iff they didn't change. Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph); // Generate legacy version hashes for backwards compatibility List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size()); for (StreamGraphHasher hasher : legacyStreamGraphHashers) { legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph)); } setChaining(hashes, legacyHashes); setPhysicalEdges(); setSlotSharingAndCoLocation(); setManagedMemoryFraction( Collections.unmodifiableMap(jobVertices), Collections.unmodifiableMap(vertexConfigs), Collections.unmodifiableMap(chainedConfigs), id -> streamGraph.getStreamNode(id).getManagedMemoryOperatorScopeUseCaseWeights(), id -> streamGraph.getStreamNode(id).getManagedMemorySlotScopeUseCases()); configureCheckpointing(); jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings()); final Map<String, DistributedCache.DistributedCacheEntry> distributedCacheEntries = JobGraphUtils.prepareUserArtifactEntries( streamGraph.getUserArtifacts().stream() .collect(Collectors.toMap(e -> e.f0, e -> e.f1)), jobGraph.getJobID()); for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry : distributedCacheEntries.entrySet()) { jobGraph.addUserArtifact(entry.getKey(), entry.getValue()); } // set the ExecutionConfig last when it has been finalized try { jobGraph.setExecutionConfig(streamGraph.getExecutionConfig()); } catch (IOException e) { throw new IllegalConfigurationException( "Could not serialize the ExecutionConfig." + "This indicates that non-serializable types (like custom serializers) were registered"); } addVertexIndexPrefixInVertexName(); setVertexDescription(); return jobGraph; }
重點(diǎn)我們介紹以下幾點(diǎn)
生成hash值
對(duì)每個(gè)streamNode生成一個(gè)hash值,用于來(lái)標(biāo)識(shí)節(jié)點(diǎn),用于重新提交任務(wù)后涉及恢復(fù)作業(yè)的場(chǎng)景。具體生成hash值的邏輯如下:
- 如果指定了id信息,如Transformation.getUid(), 就用該值來(lái)生成hash值
- 否則使用鏈上的輸出node和節(jié)點(diǎn)的輸入nodes的hash值來(lái)生成一個(gè)hash值 對(duì)具體的算法細(xì)節(jié)感興趣的同學(xué)可以深入研究StreamGraphHasherV2的具體內(nèi)容。
生成chain
如果連接的2個(gè)節(jié)點(diǎn)滿足一定的條件,就會(huì)把這2個(gè)節(jié)點(diǎn)放到一個(gè)chain里面,這樣可以避免上下游算子間發(fā)送數(shù)據(jù)的網(wǎng)絡(luò)開銷和序列化反序列化的性能開銷。判斷算子是否可以組成一個(gè)chain的判斷邏輯如下:
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) { StreamNode downStreamVertex = streamGraph.getTargetVertex(edge); return downStreamVertex.getInEdges().size() == 1 && isChainableInput(edge, streamGraph); } private static boolean isChainableInput(StreamEdge edge, StreamGraph streamGraph) { StreamNode upStreamVertex = streamGraph.getSourceVertex(edge); StreamNode downStreamVertex = streamGraph.getTargetVertex(edge); if (!(upStreamVertex.isSameSlotSharingGroup(downStreamVertex) && areOperatorsChainable(upStreamVertex, downStreamVertex, streamGraph) && arePartitionerAndExchangeModeChainable( edge.getPartitioner(), edge.getExchangeMode(), streamGraph.getExecutionConfig().isDynamicGraph()) && upStreamVertex.getParallelism() == downStreamVertex.getParallelism() && streamGraph.isChainingEnabled())) { return false; } // check that we do not have a union operation, because unions currently only work // through the network/byte-channel stack. // we check that by testing that each "type" (which means input position) is used only once for (StreamEdge inEdge : downStreamVertex.getInEdges()) { if (inEdge != edge && inEdge.getTypeNumber() == edge.getTypeNumber()) { return false; } } return true; }
具體解讀如下:
- 下游節(jié)點(diǎn)只有1個(gè)輸入邊
- 上游節(jié)點(diǎn)和下游節(jié)點(diǎn)是在同一個(gè)SlotSharingGroup,slotSharingGroup在沒(méi)有設(shè)置的情況下,默認(rèn)為default;
- 上下游節(jié)點(diǎn)的算子的chaining策略是支持chain的,上游算子的chaining策略為ALWAYS\HEAD\HEAD_WITH_SOURCES,下游算子的chaining策略為ALWAYS或者(HEAD_WITH_SOURCES且上游算子為source算子,具體這些策略的說(shuō)明見(jiàn)ChainingStrategy.java
- 邊的分區(qū)策略是ForwardForConsecutiveHashPartitioner或者分區(qū)策略是ForwardPartitioner且數(shù)據(jù)交換方式(StreamExchangeMode)不是批模式
- 上下游節(jié)點(diǎn)的并行度一致
- StreamGraph是允許Chaining的
總結(jié)
本篇介紹了StreamGraph到JobGraph的生成流程,重點(diǎn)是在上下游節(jié)點(diǎn)是需要滿足什么條件才能chain到一起的具體邏輯。
以上就是Flink JobGraph生成源碼解析的詳細(xì)內(nèi)容,更多關(guān)于Flink JobGraph生成的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
SpringCloud微服務(wù)續(xù)約實(shí)現(xiàn)源碼分析詳解
這篇文章主要介紹了SpringCloud微服務(wù)續(xù)約實(shí)現(xiàn)源碼分析,服務(wù)續(xù)期和服務(wù)注冊(cè)非常相似,服務(wù)注冊(cè)在Eureka?Client程序啟動(dòng)之后開啟,并同時(shí)開啟服務(wù)續(xù)期的定時(shí)任務(wù)2022-11-11Spring?Cloud?Gateway遠(yuǎn)程命令執(zhí)行漏洞分析(CVE-2022-22947)
使用Spring Cloud Gateway的應(yīng)用程序在Actuator端點(diǎn)啟用、公開和不安全的情況下容易受到代碼注入的攻擊,攻擊者可以惡意創(chuàng)建允許在遠(yuǎn)程主機(jī)上執(zhí)行任意遠(yuǎn)程執(zhí)行的請(qǐng)求,這篇文章主要介紹了Spring?Cloud?Gateway遠(yuǎn)程命令執(zhí)行漏洞(CVE-2022-22947),需要的朋友可以參考下2023-03-03JDK17、JDK19、JDK1.8輕松切換(無(wú)坑版,小白也可以看懂!)
在做不同的java項(xiàng)目時(shí)候,因項(xiàng)目需要很可能來(lái)回切換jdk版本,下面這篇文章主要介紹了JDK17、JDK19、JDK1.8輕松切換的相關(guān)資料,文中通過(guò)圖文介紹的非常詳細(xì),需要的朋友可以參考下2023-02-02Springboot如何通過(guò)yml配置文件為靜態(tài)成員變量賦值
這篇文章主要介紹了Springboot如何通過(guò)yml配置文件為靜態(tài)成員變量賦值,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-10-10Spring Boot jpa Service層代碼實(shí)例
這篇文章主要介紹了Spring Boot jpa Service層代碼實(shí)例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-10-10詳解MyBatis延遲加載是如何實(shí)現(xiàn)的
MyBatis 的延遲加載(懶加載)特性允許在需要使用關(guān)聯(lián)對(duì)象數(shù)據(jù)時(shí)才進(jìn)行加載,而不是在執(zhí)行主查詢時(shí)就加載所有相關(guān)數(shù)據(jù),我們將通過(guò)以下幾個(gè)方面來(lái)深入了解MyBatis的延遲加載實(shí)現(xiàn)機(jī)制,需要的朋友可以參考下2024-07-07使用迭代器Iterator遍歷Collection問(wèn)題
這篇文章主要介紹了使用迭代器Iterator遍歷Collection問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-11-11