欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Apache Flink的網(wǎng)絡(luò)協(xié)議棧詳細介紹

  發(fā)布時間:2019-06-28 17:26:24   作者:佚名   我要評論
Flink 的網(wǎng)絡(luò)協(xié)議棧是組成 flink-runtime 模塊的核心組件之一,本文中介紹了Apache Flink網(wǎng)絡(luò)協(xié)議棧,感興趣的朋友可以閱讀本文參考一下

Flink 的網(wǎng)絡(luò)協(xié)議棧是組成 flink-runtime 模塊的核心組件之一,是每個 Flink 作業(yè)的核心。它連接所有 TaskManager 的各個子任務(wù)(Subtask),因此,對于 Flink 作業(yè)的性能包括吞吐與延遲都至關(guān)重要。與 TaskManager 和 JobManager 之間通過基于 Akka 的 RPC 通信的控制通道不同,TaskManager 之間的網(wǎng)絡(luò)協(xié)議棧依賴于更加底層的 Netty API。

本文將首先介紹 Flink 暴露給流算子(Stream operator)的高層抽象,然后詳細介紹 Flink 網(wǎng)絡(luò)協(xié)議棧的物理實現(xiàn)和各種優(yōu)化、優(yōu)化的效果以及 Flink 在吞吐量和延遲之間的權(quán)衡。

Apache Flink的網(wǎng)絡(luò)協(xié)議棧詳細介紹

1.邏輯視圖

Flink 的網(wǎng)絡(luò)協(xié)議棧為彼此通信的子任務(wù)提供以下邏輯視圖,例如在 A 通過 keyBy() 操作進行數(shù)據(jù) Shuffle :

這一過程建立在以下三種基本概念的基礎(chǔ)上:

▼ 子任務(wù)輸出類型(ResultPartitionType):

Pipelined(有限的或無限的):一旦產(chǎn)生數(shù)據(jù)就可以持續(xù)向下游發(fā)送有限數(shù)據(jù)流或無限數(shù)據(jù)流。 Blocking:僅在生成完整結(jié)果后向下游發(fā)送數(shù)據(jù)。

▼ 調(diào)度策略:

同時調(diào)度所有任務(wù)(Eager):同時部署作業(yè)的所有子任務(wù)(用于流作業(yè))。

上游產(chǎn)生第一條記錄部署下游(Lazy):一旦任何生產(chǎn)者生成任何輸出,就立即部署下游任務(wù)。

上游產(chǎn)生完整數(shù)據(jù)部署下游:當(dāng)任何或所有生產(chǎn)者生成完整數(shù)據(jù)后,部署下游任務(wù)。

▼ 數(shù)據(jù)傳輸:

高吞吐:Flink 不是一個一個地發(fā)送每條記錄,而是將若干記錄緩沖到其網(wǎng)絡(luò)緩沖區(qū)中并一次性發(fā)送它們。這降低了每條記錄的發(fā)送成本因此提高了吞吐量。 低延遲:當(dāng)網(wǎng)絡(luò)緩沖區(qū)超過一定的時間未被填滿時會觸發(fā)超時發(fā)送,通過減小超時時間,可以通過犧牲一定的吞吐來獲取更低的延遲。

我們將在下面深入 Flink 網(wǎng)絡(luò)協(xié)議棧的物理實現(xiàn)時看到關(guān)于吞吐延遲的優(yōu)化。對于這一部分,讓我們詳細說明輸出類型與調(diào)度策略。首先,需要知道的是子任務(wù)的輸出類型和調(diào)度策略是緊密關(guān)聯(lián)的,只有兩者的一些特定組合才是有效的。

Pipelined 結(jié)果是流式輸出,需要目標(biāo) Subtask 正在運行以便接收數(shù)據(jù)。因此需要在上游 Task 產(chǎn)生數(shù)據(jù)之前或者產(chǎn)生第一條數(shù)據(jù)的時候調(diào)度下游目標(biāo) Task 運行。批處理作業(yè)生成有界結(jié)果數(shù)據(jù),而流式處理作業(yè)產(chǎn)生無限結(jié)果數(shù)據(jù)。

批處理作業(yè)也可能以阻塞方式產(chǎn)生結(jié)果,具體取決于所使用的算子和連接模式。在這種情況下,必須等待上游 Task 先生成完整的結(jié)果,然后才能調(diào)度下游的接收 Task 運行。這能夠提高批處理作業(yè)的效率并且占用更少的資源。

下表總結(jié)了 Task 輸出類型以及調(diào)度策略的有效組合:

注釋:

目前 Flink 未使用 批處理 / 流計算統(tǒng)一完成后,可能適用于流式作業(yè)

此外,對于具有多個輸入的子任務(wù),調(diào)度以兩種方式啟動:當(dāng)所有或者任何上游任務(wù)產(chǎn)生第一條數(shù)據(jù)或者產(chǎn)生完整數(shù)據(jù)時調(diào)度任務(wù)運行。要調(diào)整批處理作業(yè)中的輸出類型和調(diào)度策略,可以參考 ExecutionConfig#setExecutionMode()——尤其是 ExecutionMode,以及 ExecutionConfig#setDefaultInputDependencyConstraint()。

2.物理數(shù)據(jù)傳輸

為了理解物理數(shù)據(jù)連接,請回想一下,在 Flink 中,不同的任務(wù)可以通過 Slotsharing group 共享相同 Slot。TaskManager 還可以提供多個 Slot,以允許將同一任務(wù)的多個子任務(wù)調(diào)度到同一個 TaskManager 上。

對于下圖所示的示例,我們假設(shè) 2 個并發(fā)為 4 的任務(wù)部署在 2 個 TaskManager 上,每個 TaskManager 有兩個 Slot。TaskManager 1 執(zhí)行子任務(wù) A.1,A.2,B.1 和 B.2,TaskManager 2 執(zhí)行子任務(wù) A.3,A.4,B.3 和 B.4。在 A 和 B 之間是 Shuffle 連接類型,比如來自于 A 的 keyBy() 操作,在每個 TaskManager 上會有 2x4 個邏輯連接,其中一些是本地的,另一些是遠程的:

不同任務(wù)(遠程)之間的每個網(wǎng)絡(luò)連接將在 Flink 的網(wǎng)絡(luò)堆棧中獲得自己的 TCP 通道。但是,如果同一任務(wù)的不同子任務(wù)被調(diào)度到同一個 TaskManager,則它們與同一個 TaskManager 的網(wǎng)絡(luò)連接將多路復(fù)用并共享同一個 TCP 信道以減少資源使用。在我們的例子中,這適用于 A.1→B.3,A.1→B.4,以及 A.2→B.3 和 A.2→B.4,如下圖所示:

每個子任務(wù)的輸出結(jié)果稱為 ResultPartition,每個 ResultPartition 被分成多個單獨的 ResultSubpartition- 每個邏輯通道一個。Flink 的網(wǎng)絡(luò)協(xié)議棧在這一點的處理上,不再處理單個記錄,而是將一組序列化的記錄填充到網(wǎng)絡(luò)緩沖區(qū)中進行處理。每個子任務(wù)本地緩沖區(qū)中最多可用 Buffer 數(shù)目為(每個發(fā)送方和接收方各一個):

#channels * buffers-per-channel + floating-buffers-per-gate

單個 TaskManager 上的網(wǎng)絡(luò)層 Buffer 總數(shù)通常不需要配置。有關(guān)如何在需要時進行配置的詳細信息,請參閱配置網(wǎng)絡(luò)緩沖區(qū)的文檔。

▼ 造成反壓(1)

每當(dāng)子任務(wù)的數(shù)據(jù)發(fā)送緩沖區(qū)耗盡時——數(shù)據(jù)駐留在 Subpartition 的緩沖區(qū)隊列中或位于更底層的基于 Netty 的網(wǎng)絡(luò)堆棧內(nèi),生產(chǎn)者就會被阻塞,無法繼續(xù)發(fā)送數(shù)據(jù),而受到反壓。接收端以類似的方式工作:Netty 收到任何數(shù)據(jù)都需要通過網(wǎng)絡(luò) Buffer 傳遞給 Flink。如果相應(yīng)子任務(wù)的網(wǎng)絡(luò)緩沖區(qū)中沒有足夠可用的網(wǎng)絡(luò) Buffer,F(xiàn)link 將停止從該通道讀取,直到 Buffer 可用。這將反壓該多路復(fù)用上的所有發(fā)送子任務(wù),因此也限制了其他接收子任務(wù)。下圖說明了過載的子任務(wù) B.4,它會導(dǎo)致多路復(fù)用的反壓,也會導(dǎo)致子任務(wù) B.3 無法接受和處理數(shù)據(jù),即使是 B.3 還有足夠的處理能力。

為了防止這種情況發(fā)生,F(xiàn)link 1.5 引入了自己的流量控制機制。

3.Credit-based 流量控制

Credit-based 流量控制可確保發(fā)送端已經(jīng)發(fā)送的任何數(shù)據(jù),接收端都具有足夠的能力(Buffer)來接收。新的流量控制機制基于網(wǎng)絡(luò)緩沖區(qū)的可用性,作為 Flink 之前機制的自然延伸。每個遠程輸入通道(RemoteInputChannel)現(xiàn)在都有自己的一組獨占緩沖區(qū)(Exclusive buffer),而不是只有一個共享的本地緩沖池(LocalBufferPool)。與之前不同,本地緩沖池中的緩沖區(qū)稱為流動緩沖區(qū)(Floating buffer),因為它們會在輸出通道間流動并且可用于每個輸入通道。

數(shù)據(jù)接收方會將自身的可用 Buffer 作為 Credit 告知數(shù)據(jù)發(fā)送方(1 buffer = 1 credit)。每個 Subpartition 會跟蹤下游接收端的 Credit(也就是可用于接收數(shù)據(jù)的 Buffer 數(shù)目)。只有在相應(yīng)的通道(Channel)有 Credit 的時候 Flink 才會向更底層的網(wǎng)絡(luò)協(xié)議棧發(fā)送數(shù)據(jù)(以 Buffer 為粒度),并且每發(fā)送一個 Buffer 的數(shù)據(jù),相應(yīng)的通道上的 Credit 會減 1。除了發(fā)送數(shù)據(jù)本身外,數(shù)據(jù)發(fā)送端還會發(fā)送相應(yīng) Subpartition 中有多少正在排隊發(fā)送的 Buffer 數(shù)(稱之為 Backlog)給下游。數(shù)據(jù)接收端會利用這一信息(Backlog)去申請合適數(shù)量的 Floating buffer 用于接收發(fā)送端的數(shù)據(jù),這可以加快發(fā)送端堆積數(shù)據(jù)的處理。接收端會首先申請和 Backlog 數(shù)量相等的 Buffer,但可能無法申請到全部,甚至一個都申請不到,這時接收端會利用已經(jīng)申請到的 Buffer 進行數(shù)據(jù)接收,并監(jiān)聽是否有新的 Buffer 可用。

Credit-based 的流控使用 Buffers-per-channel 來指定每個 Channel 有多少獨占的 Buffer,使用 Floating-buffers-per-gate 來指定共享的本地緩沖池(Local buffer pool)大小(可選3),通過共享本地緩沖池,Credit-based 流控可以使用的 Buffer 數(shù)目可以達到與原來非 Credit-based 流控同樣的大小。這兩個參數(shù)的默認值是被精心選取的,以保證新的 Credit-based 流控在網(wǎng)絡(luò)健康延遲正常的情況下至少可以達到與原策略相同的吞吐??梢愿鶕?jù)實際的網(wǎng)絡(luò) RRT (round-trip-time)和帶寬對這兩個參數(shù)進行調(diào)整。

注釋3:如果沒有足夠的 Buffer 可用,則每個緩沖池將獲得全局可用 Buffer 的相同份額(±1)。

相關(guān)文章

最新評論