Apache Flink的網(wǎng)絡(luò)協(xié)議棧詳細介紹

Flink 的網(wǎng)絡(luò)協(xié)議棧是組成 flink-runtime 模塊的核心組件之一,是每個 Flink 作業(yè)的核心。它連接所有 TaskManager 的各個子任務(wù)(Subtask),因此,對于 Flink 作業(yè)的性能包括吞吐與延遲都至關(guān)重要。與 TaskManager 和 JobManager 之間通過基于 Akka 的 RPC 通信的控制通道不同,TaskManager 之間的網(wǎng)絡(luò)協(xié)議棧依賴于更加底層的 Netty API。
本文將首先介紹 Flink 暴露給流算子(Stream operator)的高層抽象,然后詳細介紹 Flink 網(wǎng)絡(luò)協(xié)議棧的物理實現(xiàn)和各種優(yōu)化、優(yōu)化的效果以及 Flink 在吞吐量和延遲之間的權(quán)衡。
Apache Flink的網(wǎng)絡(luò)協(xié)議棧詳細介紹
1.邏輯視圖
Flink 的網(wǎng)絡(luò)協(xié)議棧為彼此通信的子任務(wù)提供以下邏輯視圖,例如在 A 通過 keyBy() 操作進行數(shù)據(jù) Shuffle :
這一過程建立在以下三種基本概念的基礎(chǔ)上:
▼ 子任務(wù)輸出類型(ResultPartitionType):
Pipelined(有限的或無限的):一旦產(chǎn)生數(shù)據(jù)就可以持續(xù)向下游發(fā)送有限數(shù)據(jù)流或無限數(shù)據(jù)流。 Blocking:僅在生成完整結(jié)果后向下游發(fā)送數(shù)據(jù)。
▼ 調(diào)度策略:
同時調(diào)度所有任務(wù)(Eager):同時部署作業(yè)的所有子任務(wù)(用于流作業(yè))。
上游產(chǎn)生第一條記錄部署下游(Lazy):一旦任何生產(chǎn)者生成任何輸出,就立即部署下游任務(wù)。
上游產(chǎn)生完整數(shù)據(jù)部署下游:當(dāng)任何或所有生產(chǎn)者生成完整數(shù)據(jù)后,部署下游任務(wù)。
▼ 數(shù)據(jù)傳輸:
高吞吐:Flink 不是一個一個地發(fā)送每條記錄,而是將若干記錄緩沖到其網(wǎng)絡(luò)緩沖區(qū)中并一次性發(fā)送它們。這降低了每條記錄的發(fā)送成本因此提高了吞吐量。 低延遲:當(dāng)網(wǎng)絡(luò)緩沖區(qū)超過一定的時間未被填滿時會觸發(fā)超時發(fā)送,通過減小超時時間,可以通過犧牲一定的吞吐來獲取更低的延遲。
我們將在下面深入 Flink 網(wǎng)絡(luò)協(xié)議棧的物理實現(xiàn)時看到關(guān)于吞吐延遲的優(yōu)化。對于這一部分,讓我們詳細說明輸出類型與調(diào)度策略。首先,需要知道的是子任務(wù)的輸出類型和調(diào)度策略是緊密關(guān)聯(lián)的,只有兩者的一些特定組合才是有效的。
Pipelined 結(jié)果是流式輸出,需要目標(biāo) Subtask 正在運行以便接收數(shù)據(jù)。因此需要在上游 Task 產(chǎn)生數(shù)據(jù)之前或者產(chǎn)生第一條數(shù)據(jù)的時候調(diào)度下游目標(biāo) Task 運行。批處理作業(yè)生成有界結(jié)果數(shù)據(jù),而流式處理作業(yè)產(chǎn)生無限結(jié)果數(shù)據(jù)。
批處理作業(yè)也可能以阻塞方式產(chǎn)生結(jié)果,具體取決于所使用的算子和連接模式。在這種情況下,必須等待上游 Task 先生成完整的結(jié)果,然后才能調(diào)度下游的接收 Task 運行。這能夠提高批處理作業(yè)的效率并且占用更少的資源。
下表總結(jié)了 Task 輸出類型以及調(diào)度策略的有效組合:
注釋:
目前 Flink 未使用 批處理 / 流計算統(tǒng)一完成后,可能適用于流式作業(yè)
此外,對于具有多個輸入的子任務(wù),調(diào)度以兩種方式啟動:當(dāng)所有或者任何上游任務(wù)產(chǎn)生第一條數(shù)據(jù)或者產(chǎn)生完整數(shù)據(jù)時調(diào)度任務(wù)運行。要調(diào)整批處理作業(yè)中的輸出類型和調(diào)度策略,可以參考 ExecutionConfig#setExecutionMode()——尤其是 ExecutionMode,以及 ExecutionConfig#setDefaultInputDependencyConstraint()。
2.物理數(shù)據(jù)傳輸
為了理解物理數(shù)據(jù)連接,請回想一下,在 Flink 中,不同的任務(wù)可以通過 Slotsharing group 共享相同 Slot。TaskManager 還可以提供多個 Slot,以允許將同一任務(wù)的多個子任務(wù)調(diào)度到同一個 TaskManager 上。
對于下圖所示的示例,我們假設(shè) 2 個并發(fā)為 4 的任務(wù)部署在 2 個 TaskManager 上,每個 TaskManager 有兩個 Slot。TaskManager 1 執(zhí)行子任務(wù) A.1,A.2,B.1 和 B.2,TaskManager 2 執(zhí)行子任務(wù) A.3,A.4,B.3 和 B.4。在 A 和 B 之間是 Shuffle 連接類型,比如來自于 A 的 keyBy() 操作,在每個 TaskManager 上會有 2x4 個邏輯連接,其中一些是本地的,另一些是遠程的:
不同任務(wù)(遠程)之間的每個網(wǎng)絡(luò)連接將在 Flink 的網(wǎng)絡(luò)堆棧中獲得自己的 TCP 通道。但是,如果同一任務(wù)的不同子任務(wù)被調(diào)度到同一個 TaskManager,則它們與同一個 TaskManager 的網(wǎng)絡(luò)連接將多路復(fù)用并共享同一個 TCP 信道以減少資源使用。在我們的例子中,這適用于 A.1→B.3,A.1→B.4,以及 A.2→B.3 和 A.2→B.4,如下圖所示:
每個子任務(wù)的輸出結(jié)果稱為 ResultPartition,每個 ResultPartition 被分成多個單獨的 ResultSubpartition- 每個邏輯通道一個。Flink 的網(wǎng)絡(luò)協(xié)議棧在這一點的處理上,不再處理單個記錄,而是將一組序列化的記錄填充到網(wǎng)絡(luò)緩沖區(qū)中進行處理。每個子任務(wù)本地緩沖區(qū)中最多可用 Buffer 數(shù)目為(每個發(fā)送方和接收方各一個):
#channels * buffers-per-channel + floating-buffers-per-gate
單個 TaskManager 上的網(wǎng)絡(luò)層 Buffer 總數(shù)通常不需要配置。有關(guān)如何在需要時進行配置的詳細信息,請參閱配置網(wǎng)絡(luò)緩沖區(qū)的文檔。
▼ 造成反壓(1)
每當(dāng)子任務(wù)的數(shù)據(jù)發(fā)送緩沖區(qū)耗盡時——數(shù)據(jù)駐留在 Subpartition 的緩沖區(qū)隊列中或位于更底層的基于 Netty 的網(wǎng)絡(luò)堆棧內(nèi),生產(chǎn)者就會被阻塞,無法繼續(xù)發(fā)送數(shù)據(jù),而受到反壓。接收端以類似的方式工作:Netty 收到任何數(shù)據(jù)都需要通過網(wǎng)絡(luò) Buffer 傳遞給 Flink。如果相應(yīng)子任務(wù)的網(wǎng)絡(luò)緩沖區(qū)中沒有足夠可用的網(wǎng)絡(luò) Buffer,F(xiàn)link 將停止從該通道讀取,直到 Buffer 可用。這將反壓該多路復(fù)用上的所有發(fā)送子任務(wù),因此也限制了其他接收子任務(wù)。下圖說明了過載的子任務(wù) B.4,它會導(dǎo)致多路復(fù)用的反壓,也會導(dǎo)致子任務(wù) B.3 無法接受和處理數(shù)據(jù),即使是 B.3 還有足夠的處理能力。
為了防止這種情況發(fā)生,F(xiàn)link 1.5 引入了自己的流量控制機制。
3.Credit-based 流量控制
Credit-based 流量控制可確保發(fā)送端已經(jīng)發(fā)送的任何數(shù)據(jù),接收端都具有足夠的能力(Buffer)來接收。新的流量控制機制基于網(wǎng)絡(luò)緩沖區(qū)的可用性,作為 Flink 之前機制的自然延伸。每個遠程輸入通道(RemoteInputChannel)現(xiàn)在都有自己的一組獨占緩沖區(qū)(Exclusive buffer),而不是只有一個共享的本地緩沖池(LocalBufferPool)。與之前不同,本地緩沖池中的緩沖區(qū)稱為流動緩沖區(qū)(Floating buffer),因為它們會在輸出通道間流動并且可用于每個輸入通道。
數(shù)據(jù)接收方會將自身的可用 Buffer 作為 Credit 告知數(shù)據(jù)發(fā)送方(1 buffer = 1 credit)。每個 Subpartition 會跟蹤下游接收端的 Credit(也就是可用于接收數(shù)據(jù)的 Buffer 數(shù)目)。只有在相應(yīng)的通道(Channel)有 Credit 的時候 Flink 才會向更底層的網(wǎng)絡(luò)協(xié)議棧發(fā)送數(shù)據(jù)(以 Buffer 為粒度),并且每發(fā)送一個 Buffer 的數(shù)據(jù),相應(yīng)的通道上的 Credit 會減 1。除了發(fā)送數(shù)據(jù)本身外,數(shù)據(jù)發(fā)送端還會發(fā)送相應(yīng) Subpartition 中有多少正在排隊發(fā)送的 Buffer 數(shù)(稱之為 Backlog)給下游。數(shù)據(jù)接收端會利用這一信息(Backlog)去申請合適數(shù)量的 Floating buffer 用于接收發(fā)送端的數(shù)據(jù),這可以加快發(fā)送端堆積數(shù)據(jù)的處理。接收端會首先申請和 Backlog 數(shù)量相等的 Buffer,但可能無法申請到全部,甚至一個都申請不到,這時接收端會利用已經(jīng)申請到的 Buffer 進行數(shù)據(jù)接收,并監(jiān)聽是否有新的 Buffer 可用。
Credit-based 的流控使用 Buffers-per-channel 來指定每個 Channel 有多少獨占的 Buffer,使用 Floating-buffers-per-gate 來指定共享的本地緩沖池(Local buffer pool)大小(可選3),通過共享本地緩沖池,Credit-based 流控可以使用的 Buffer 數(shù)目可以達到與原來非 Credit-based 流控同樣的大小。這兩個參數(shù)的默認值是被精心選取的,以保證新的 Credit-based 流控在網(wǎng)絡(luò)健康延遲正常的情況下至少可以達到與原策略相同的吞吐??梢愿鶕?jù)實際的網(wǎng)絡(luò) RRT (round-trip-time)和帶寬對這兩個參數(shù)進行調(diào)整。
注釋3:如果沒有足夠的 Buffer 可用,則每個緩沖池將獲得全局可用 Buffer 的相同份額(±1)。
相關(guān)文章
IOT通信協(xié)議有哪些?物聯(lián)網(wǎng)七大通信協(xié)議對比介紹
物聯(lián)網(wǎng)簡稱iot,本文中為大家的是物聯(lián)網(wǎng)的7大協(xié)議以及對比,有需要的朋友可以閱讀本文參考一下2019-06-285G時代 HTTP和DNS協(xié)議將發(fā)生哪些變化?
HTTP和DNS這兩種協(xié)議幾乎已經(jīng)成為家喻戶曉,現(xiàn)在5G時代的來臨,這些協(xié)議也都將發(fā)生巨大的變化,本文就介紹下HTTP和DNS這些協(xié)議在未來會如何發(fā)展的,如何改變的2017-07-05最常用路由協(xié)議RIP-1/2 OSPF IS-IS BGP的特點對比
RIP協(xié)議是最早的路由協(xié)議,OSPF是目前應(yīng)用最廣泛的IGP協(xié)議,IS-IS是另外一種鏈路狀態(tài)型的路由協(xié)議,BGP協(xié)議是唯一的EGP協(xié)議,那么這幾種路由協(xié)議有什么特點和不同呢?下面就2017-04-06如何屏蔽https網(wǎng)站、禁止訪問https、禁止跳轉(zhuǎn)https的方法
由于網(wǎng)絡(luò)安全形勢越發(fā)嚴(yán)峻,為了保護用戶隱私和網(wǎng)絡(luò)安全,越來越多的網(wǎng)站都開啟了HTTPS,如何禁止訪問HTTPS網(wǎng)站、如何屏蔽HTTPS網(wǎng)站就成為重要的網(wǎng)絡(luò)管理工作,下面就來看2017-03-29無線網(wǎng)絡(luò)IEEE802.11/IEEE802.3協(xié)議標(biāo)準(zhǔn)和區(qū)別
IEEE802協(xié)議是一種物理協(xié)議集,而以太網(wǎng)協(xié)議是由一組IEEE 802.3標(biāo)準(zhǔn)定義的局域網(wǎng)協(xié)議集,下面就為大家介紹下IEEE802.11/IEEE802.3協(xié)議標(biāo)準(zhǔn)和區(qū)別,大家了解下吧2017-03-27LoRa與ZigBee有什么區(qū)別?LoRa與ZigBee技術(shù)全面分析
ZigBee是基于IEEE802.15.4標(biāo)準(zhǔn)的低功耗局域網(wǎng)協(xié)議,LoRa 是LPWAN通信技術(shù)中的一種,那么兩者之間有什么區(qū)別和聯(lián)系呢?下面就詳情來為大家解析下,希望對大家有幫助2017-03-26- 報文是HTTP協(xié)議下請求和響應(yīng)的信息基礎(chǔ),這里就帶大家來逐步解讀HTTP報文的組成及含義,需要的朋友可以參考下2016-06-16
- 這篇文章主要介紹了SIP協(xié)議錯誤代碼code大全(中英文對照),需要的朋友可以參考下2016-05-16
- 這篇文章主要為大家詳細介紹了查看電腦MAC地址的方法,教大家如何查看電腦的MAC地址,感興趣的小伙伴們可以參考一下2016-04-06
- 這篇文章主要介紹了圖解TCP/IP協(xié)議,幫助大家輕松學(xué)會TCP/IP協(xié)議,需要的朋友可以參考下2015-12-16