欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Flink JobGraph生成源碼解析

 更新時(shí)間:2022年12月01日 10:41:06   作者:xiangel  
這篇文章主要為大家介紹了Flink JobGraph生成源碼解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

引言

在DataStream基礎(chǔ)中,由于其中的內(nèi)容較多,只是介紹了JobGraph的結(jié)果,而沒(méi)有涉及到StreamGraph到JobGraph的轉(zhuǎn)換過(guò)程。本篇我們來(lái)介紹下JobGraph的生成的詳情,重點(diǎn)是Operator可以串聯(lián)成Chain的條件

概念

首先我們來(lái)回顧下JobGraph中的相關(guān)概念

  • JobVertex:job的頂點(diǎn),即對(duì)應(yīng)的計(jì)算邏輯(這里用的是Vertex, 而前面用的是Node,有點(diǎn)差異),通過(guò)inputs記錄了所有來(lái)源的Edge,而輸出是ArrayList來(lái)記錄
  • JobEdge: job的邊,記錄了源Vertex和目標(biāo)表Vertex.
  • IntermediateDataSet: 定義了一個(gè)中間數(shù)據(jù)集,但并沒(méi)有存儲(chǔ),只是記錄了一個(gè)Producer(JobVertex)和一個(gè)Consumer(JobEdge)

JobGraph生成

前面我們?cè)诮榻B部署的時(shí)候,有介紹具體是通過(guò)PipelineExecutor的execute()方法來(lái)提交對(duì)應(yīng)的任務(wù),StreamGraph到JobGraph的轉(zhuǎn)換邏輯就是在該方法中處理的,具體是通過(guò)如下方法來(lái)進(jìn)行處理

public static JobGraph getJobGraph(
            @Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration)

最后執(zhí)行轉(zhuǎn)換的類為FlinkPipelineTranslator,調(diào)用的是其中的translateToJobGraph方法。

JobGraph translateToJobGraph(
            Pipeline pipeline, Configuration optimizerConfiguration, int defaultParallelism);

這里有2個(gè)不同的實(shí)現(xiàn)類

  • StreamGraphTranslator:對(duì)StreamGraph的Pipeline進(jìn)行轉(zhuǎn)換處理
  • PlanTranslator:對(duì)Plan類型的Pipeline進(jìn)行轉(zhuǎn)換處理,用于SQL場(chǎng)景。 而這2個(gè)分別對(duì)應(yīng)到2個(gè)不同的類來(lái)生成JobGraph,分別如下:
  • StreamingJobGraphGenerator
  • JobGraphGenerator 本篇我們重點(diǎn)介紹StreamGraph到JobGraph的轉(zhuǎn)換StreamingJobGraphGenerator, JogGraphGenerator這塊等到介紹FlinkSQL的時(shí)候來(lái)介紹。StreamingJobGraphGenerator類中具體轉(zhuǎn)換處理的邏輯如下:
 private JobGraph createJobGraph() {
        preValidate();
        jobGraph.setJobType(streamGraph.getJobType());
![image.png](https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/0603957ea34f4d6b9af96b686bd5fdb1~tplv-k3u1fbpfcp-watermark.image?)
        jobGraph.enableApproximateLocalRecovery(
                streamGraph.getCheckpointConfig().isApproximateLocalRecoveryEnabled());
        // Generate deterministic hashes for the nodes in order to identify them across
        // submission iff they didn't change.
        Map<Integer, byte[]> hashes =
                defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);
        // Generate legacy version hashes for backwards compatibility
        List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
        for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
            legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
        }
        setChaining(hashes, legacyHashes);
        setPhysicalEdges();
        setSlotSharingAndCoLocation();
        setManagedMemoryFraction(
                Collections.unmodifiableMap(jobVertices),
                Collections.unmodifiableMap(vertexConfigs),
                Collections.unmodifiableMap(chainedConfigs),
                id -> streamGraph.getStreamNode(id).getManagedMemoryOperatorScopeUseCaseWeights(),
                id -> streamGraph.getStreamNode(id).getManagedMemorySlotScopeUseCases());
        configureCheckpointing();
        jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());
        final Map<String, DistributedCache.DistributedCacheEntry> distributedCacheEntries =
                JobGraphUtils.prepareUserArtifactEntries(
                        streamGraph.getUserArtifacts().stream()
                                .collect(Collectors.toMap(e -> e.f0, e -> e.f1)),
                        jobGraph.getJobID());
        for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
                distributedCacheEntries.entrySet()) {
            jobGraph.addUserArtifact(entry.getKey(), entry.getValue());
        }
        // set the ExecutionConfig last when it has been finalized
        try {
            jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
        } catch (IOException e) {
            throw new IllegalConfigurationException(
                    "Could not serialize the ExecutionConfig."
                            + "This indicates that non-serializable types (like custom serializers) were registered");
        }
        addVertexIndexPrefixInVertexName();
        setVertexDescription();
        return jobGraph;
    }

重點(diǎn)我們介紹以下幾點(diǎn)

生成hash值

對(duì)每個(gè)streamNode生成一個(gè)hash值,用于來(lái)標(biāo)識(shí)節(jié)點(diǎn),用于重新提交任務(wù)后涉及恢復(fù)作業(yè)的場(chǎng)景。具體生成hash值的邏輯如下:

  • 如果指定了id信息,如Transformation.getUid(), 就用該值來(lái)生成hash值
  • 否則使用鏈上的輸出node和節(jié)點(diǎn)的輸入nodes的hash值來(lái)生成一個(gè)hash值 對(duì)具體的算法細(xì)節(jié)感興趣的同學(xué)可以深入研究StreamGraphHasherV2的具體內(nèi)容。

生成chain

如果連接的2個(gè)節(jié)點(diǎn)滿足一定的條件,就會(huì)把這2個(gè)節(jié)點(diǎn)放到一個(gè)chain里面,這樣可以避免上下游算子間發(fā)送數(shù)據(jù)的網(wǎng)絡(luò)開銷和序列化反序列化的性能開銷。判斷算子是否可以組成一個(gè)chain的判斷邏輯如下:

    public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
        StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
        return downStreamVertex.getInEdges().size() == 1 && isChainableInput(edge, streamGraph);
    }
    private static boolean isChainableInput(StreamEdge edge, StreamGraph streamGraph) {
        StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
        StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
        if (!(upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
                && areOperatorsChainable(upStreamVertex, downStreamVertex, streamGraph)
                && arePartitionerAndExchangeModeChainable(
                        edge.getPartitioner(),
                        edge.getExchangeMode(),
                        streamGraph.getExecutionConfig().isDynamicGraph())
                && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
                && streamGraph.isChainingEnabled())) {
            return false;
        }
        // check that we do not have a union operation, because unions currently only work
        // through the network/byte-channel stack.
        // we check that by testing that each "type" (which means input position) is used only once
        for (StreamEdge inEdge : downStreamVertex.getInEdges()) {
            if (inEdge != edge && inEdge.getTypeNumber() == edge.getTypeNumber()) {
                return false;
            }
        }
        return true;
    }

具體解讀如下:

  • 下游節(jié)點(diǎn)只有1個(gè)輸入邊
  • 上游節(jié)點(diǎn)和下游節(jié)點(diǎn)是在同一個(gè)SlotSharingGroup,slotSharingGroup在沒(méi)有設(shè)置的情況下,默認(rèn)為default;
  • 上下游節(jié)點(diǎn)的算子的chaining策略是支持chain的,上游算子的chaining策略為ALWAYS\HEAD\HEAD_WITH_SOURCES,下游算子的chaining策略為ALWAYS或者(HEAD_WITH_SOURCES且上游算子為source算子,具體這些策略的說(shuō)明見(jiàn)ChainingStrategy.java
  • 邊的分區(qū)策略是ForwardForConsecutiveHashPartitioner或者分區(qū)策略是ForwardPartitioner且數(shù)據(jù)交換方式(StreamExchangeMode)不是批模式
  • 上下游節(jié)點(diǎn)的并行度一致
  • StreamGraph是允許Chaining的

總結(jié)

本篇介紹了StreamGraph到JobGraph的生成流程,重點(diǎn)是在上下游節(jié)點(diǎn)是需要滿足什么條件才能chain到一起的具體邏輯。

以上就是Flink JobGraph生成源碼解析的詳細(xì)內(nèi)容,更多關(guān)于Flink JobGraph生成的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • SpringCloud微服務(wù)續(xù)約實(shí)現(xiàn)源碼分析詳解

    SpringCloud微服務(wù)續(xù)約實(shí)現(xiàn)源碼分析詳解

    這篇文章主要介紹了SpringCloud微服務(wù)續(xù)約實(shí)現(xiàn)源碼分析,服務(wù)續(xù)期和服務(wù)注冊(cè)非常相似,服務(wù)注冊(cè)在Eureka?Client程序啟動(dòng)之后開啟,并同時(shí)開啟服務(wù)續(xù)期的定時(shí)任務(wù)
    2022-11-11
  • Spring?Cloud?Gateway遠(yuǎn)程命令執(zhí)行漏洞分析(CVE-2022-22947)

    Spring?Cloud?Gateway遠(yuǎn)程命令執(zhí)行漏洞分析(CVE-2022-22947)

    使用Spring Cloud Gateway的應(yīng)用程序在Actuator端點(diǎn)啟用、公開和不安全的情況下容易受到代碼注入的攻擊,攻擊者可以惡意創(chuàng)建允許在遠(yuǎn)程主機(jī)上執(zhí)行任意遠(yuǎn)程執(zhí)行的請(qǐng)求,這篇文章主要介紹了Spring?Cloud?Gateway遠(yuǎn)程命令執(zhí)行漏洞(CVE-2022-22947),需要的朋友可以參考下
    2023-03-03
  • JDK17、JDK19、JDK1.8輕松切換(無(wú)坑版,小白也可以看懂!)

    JDK17、JDK19、JDK1.8輕松切換(無(wú)坑版,小白也可以看懂!)

    在做不同的java項(xiàng)目時(shí)候,因項(xiàng)目需要很可能來(lái)回切換jdk版本,下面這篇文章主要介紹了JDK17、JDK19、JDK1.8輕松切換的相關(guān)資料,文中通過(guò)圖文介紹的非常詳細(xì),需要的朋友可以參考下
    2023-02-02
  • 深入理解JDK8中Stream使用

    深入理解JDK8中Stream使用

    Stream 是 Java8 中處理集合的關(guān)鍵抽象概念,它可以指定你希望對(duì)集合進(jìn)行的操作,可以執(zhí)行非常復(fù)雜的查找、過(guò)濾和映射數(shù)據(jù)等操作。這篇文章主要介紹了JDK8中Stream使用解析,需要的朋友可以參考下
    2021-06-06
  • idea 如何查找類中的某個(gè)方法

    idea 如何查找類中的某個(gè)方法

    這篇文章主要介紹了idea 如何查找類中的某個(gè)方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2021-02-02
  • Springboot如何通過(guò)yml配置文件為靜態(tài)成員變量賦值

    Springboot如何通過(guò)yml配置文件為靜態(tài)成員變量賦值

    這篇文章主要介紹了Springboot如何通過(guò)yml配置文件為靜態(tài)成員變量賦值,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-10-10
  • Effective Java (異常處理)

    Effective Java (異常處理)

    Effective Java (異常處理),需要的朋友可以參考一下
    2013-02-02
  • Spring Boot jpa Service層代碼實(shí)例

    Spring Boot jpa Service層代碼實(shí)例

    這篇文章主要介紹了Spring Boot jpa Service層代碼實(shí)例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2019-10-10
  • 詳解MyBatis延遲加載是如何實(shí)現(xiàn)的

    詳解MyBatis延遲加載是如何實(shí)現(xiàn)的

    MyBatis 的延遲加載(懶加載)特性允許在需要使用關(guān)聯(lián)對(duì)象數(shù)據(jù)時(shí)才進(jìn)行加載,而不是在執(zhí)行主查詢時(shí)就加載所有相關(guān)數(shù)據(jù),我們將通過(guò)以下幾個(gè)方面來(lái)深入了解MyBatis的延遲加載實(shí)現(xiàn)機(jī)制,需要的朋友可以參考下
    2024-07-07
  • 使用迭代器Iterator遍歷Collection問(wèn)題

    使用迭代器Iterator遍歷Collection問(wèn)題

    這篇文章主要介紹了使用迭代器Iterator遍歷Collection問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-11-11

最新評(píng)論