Apache Flink的網(wǎng)絡(luò)協(xié)議棧詳細(xì)介紹

▼ 造成反壓(2)
與沒(méi)有流量控制的接收端反壓機(jī)制不同,Credit 提供了更直接的控制:如果接收端的處理速度跟不上,最終它的 Credit 會(huì)減少成 0,此時(shí)發(fā)送端就不會(huì)在向網(wǎng)絡(luò)中發(fā)送數(shù)據(jù)(數(shù)據(jù)會(huì)被序列化到 Buffer 中并緩存在發(fā)送端)。由于反壓只發(fā)生在邏輯鏈路上,因此沒(méi)必要阻斷從多路復(fù)用的 TCP 連接中讀取數(shù)據(jù),也就不會(huì)影響其他的接收者接收和處理數(shù)據(jù)。
▼ Credit-based 的優(yōu)勢(shì)與問(wèn)題
由于通過(guò) Credit-based 流控機(jī)制,多路復(fù)用中的一個(gè)信道不會(huì)由于反壓阻塞其他邏輯信道,因此整體資源利用率會(huì)增加。此外,通過(guò)完全控制正在發(fā)送的數(shù)據(jù)量,我們還能夠加快 Checkpoint alignment:如果沒(méi)有流量控制,通道需要一段時(shí)間才能填滿(mǎn)網(wǎng)絡(luò)協(xié)議棧的內(nèi)部緩沖區(qū)并表明接收端不再讀取數(shù)據(jù)了。在這段時(shí)間里,大量的 Buffer 不會(huì)被處理。任何 Checkpoint barrier(觸發(fā) Checkpoint 的消息)都必須在這些數(shù)據(jù) Buffer 后排隊(duì),因此必須等到所有這些數(shù)據(jù)都被處理后才能夠觸發(fā) Checkpoint(“Barrier 不會(huì)在數(shù)據(jù)之前被處理!”)。
但是,來(lái)自接收方的附加通告消息(向發(fā)送端通知 Credit)可能會(huì)產(chǎn)生一些額外的開(kāi)銷(xiāo),尤其是在使用 SSL 加密信道的場(chǎng)景中。此外,單個(gè)輸入通道( Input channel)不能使用緩沖池中的所有 Buffer,因?yàn)榇嬖跓o(wú)法共享的 Exclusive buffer。新的流控協(xié)議也有可能無(wú)法做到立即發(fā)送盡可能多的數(shù)據(jù)(如果生成數(shù)據(jù)的速度快于接收端反饋 Credit 的速度),這時(shí)則可能增長(zhǎng)發(fā)送數(shù)據(jù)的時(shí)間。雖然這可能會(huì)影響作業(yè)的性能,但由于其所有優(yōu)點(diǎn),通常新的流量控制會(huì)表現(xiàn)得更好??赡軙?huì)通過(guò)增加單個(gè)通道的獨(dú)占 Buffer 數(shù)量,這會(huì)增大內(nèi)存開(kāi)銷(xiāo)。然而,與先前實(shí)現(xiàn)相比,總體內(nèi)存使用可能仍然會(huì)降低,因?yàn)榈讓拥木W(wǎng)絡(luò)協(xié)議棧不再需要緩存大量數(shù)據(jù),因?yàn)槲覀兛偸强梢粤⒓磳⑵鋫鬏數(shù)?Flink(一定會(huì)有相應(yīng)的 Buffer 接收數(shù)據(jù))。
在使用新的 Credit-based 流量控制時(shí),可能還會(huì)注意到另一件事:由于我們?cè)诎l(fā)送方和接收方之間緩沖較少的數(shù)據(jù),反壓可能會(huì)更早的到來(lái)。然而,這是我們所期望的,因?yàn)榫彺娓鄶?shù)據(jù)并沒(méi)有真正獲得任何好處。如果要緩存更多的數(shù)據(jù)并且保留 Credit-based 流量控制,可以考慮通過(guò)增加單個(gè)輸入共享 Buffer 的數(shù)量。
注意:如果需要關(guān)閉 Credit-based 流量控制,可以將這個(gè)配置添加到 flink-conf.yaml 中:taskmanager.network.credit-model:false。但是,此參數(shù)已過(guò)時(shí),最終將與非 Credit-based 流控制代碼一起刪除。
4.序列號(hào)與反序列化
下圖從上面的擴(kuò)展了更高級(jí)別的視圖,其中包含網(wǎng)絡(luò)協(xié)議棧及其周?chē)M件的更多詳細(xì)信息,從發(fā)送算子發(fā)送記錄(Record)到接收算子獲取它:
在生成 Record 并將其傳遞出去之后,例如通過(guò) Collector#collect(),它被傳遞給 RecordWriter,RecordWriter 會(huì)將 Java 對(duì)象序列化為字節(jié)序列,最終存儲(chǔ)在 Buffer 中按照上面所描述的在網(wǎng)絡(luò)協(xié)議棧中進(jìn)行處理。RecordWriter 首先使用 SpanningRecordSerializer 將 Record 序列化為靈活的堆上字節(jié)數(shù)組。然后,它嘗試將這些字節(jié)寫(xiě)入目標(biāo)網(wǎng)絡(luò) Channel 的 Buffer 中。我們將在下面的章節(jié)回到這一部分。
在接收方,底層網(wǎng)絡(luò)協(xié)議棧(Netty)將接收到的 Buffer 寫(xiě)入相應(yīng)的輸入通道(Channel)。流任務(wù)的線(xiàn)程最終從這些隊(duì)列中讀取并嘗試在 RecordReader 的幫助下通過(guò) SpillingAdaptiveSpanningRecordDeserializer 將累積的字節(jié)反序列化為 Java 對(duì)象。與序列化器類(lèi)似,這個(gè)反序列化器還必須處理特殊情況,例如跨越多個(gè)網(wǎng)絡(luò) Buffer 的 Record,或者因?yàn)橛涗洷旧肀染W(wǎng)絡(luò)緩沖區(qū)大(默認(rèn)情況下為32KB,通過(guò) taskmanager.memory.segment-size 設(shè)置)或者因?yàn)樾蛄谢?Record 時(shí),目標(biāo) Buffer 中已經(jīng)沒(méi)有足夠的剩余空間保存序列化后的字節(jié)數(shù)據(jù),在這種情況下,F(xiàn)link 將使用這些字節(jié)空間并繼續(xù)將其余字節(jié)寫(xiě)入新的網(wǎng)絡(luò) Buffer 中。
4.1 將網(wǎng)絡(luò) Buffer 寫(xiě)入 Netty
在上圖中,Credit-based 流控制機(jī)制實(shí)際上位于“Netty Server”(和“Netty Client”)組件內(nèi)部,RecordWriter 寫(xiě)入的 Buffer 始終以空狀態(tài)(無(wú)數(shù)據(jù))添加到 Subpartition 中,然后逐漸向其中填寫(xiě)序列化后的記錄。但是 Netty 在什么時(shí)候真正的獲取并發(fā)送這些 Buffer 呢?顯然,不能是 Buffer 中只要有數(shù)據(jù)就發(fā)送,因?yàn)榭缇€(xiàn)程(寫(xiě)線(xiàn)程與發(fā)送線(xiàn)程)的數(shù)據(jù)交換與同步會(huì)造成大量的額外開(kāi)銷(xiāo),并且會(huì)造成緩存本身失去意義(如果是這樣的話(huà),不如直接將將序列化后的字節(jié)發(fā)到網(wǎng)絡(luò)上而不必引入中間的 Buffer)。
在 Flink 中,有三種情況可以使 Netty 服務(wù)端使用(發(fā)送)網(wǎng)絡(luò) Buffer:
寫(xiě)入 Record 時(shí) Buffer 變滿(mǎn),或者 Buffer 超時(shí)未被發(fā)送,或 發(fā)送特殊消息,例如 Checkpoint barrier。
▼ 在 Buffer 滿(mǎn)后發(fā)送
RecordWriter 將 Record 序列化到本地的序列化緩沖區(qū)中,并將這些序列化后的字節(jié)逐漸寫(xiě)入位于相應(yīng) Result subpartition 隊(duì)列中的一個(gè)或多個(gè)網(wǎng)絡(luò) Buffer中。雖然單個(gè) RecordWriter 可以處理多個(gè) Subpartition,但每個(gè) Subpartition 只會(huì)有一個(gè) RecordWriter 向其寫(xiě)入數(shù)據(jù)。另一方面,Netty 服務(wù)端線(xiàn)程會(huì)從多個(gè) Result subpartition 中讀取并像上面所說(shuō)的那樣將數(shù)據(jù)寫(xiě)入適當(dāng)?shù)亩嗦窂?fù)用信道。這是一個(gè)典型的生產(chǎn)者 - 消費(fèi)者模式,網(wǎng)絡(luò)緩沖區(qū)位于生產(chǎn)者與消費(fèi)者之間,如下圖所示。在(1)序列化和(2)將數(shù)據(jù)寫(xiě)入 Buffer 之后,RecordWriter 會(huì)相應(yīng)地更新緩沖區(qū)的寫(xiě)入索引。一旦 Buffer 完全填滿(mǎn),RecordWriter 會(huì)(3)為當(dāng)前 Record 剩余的字節(jié)或者下一個(gè) Record 從其本地緩沖池中獲取新的 Buffer,并將新的 Buffer 添加到相應(yīng) Subpartition 的隊(duì)列中。這將(4)通知 Netty服務(wù)端線(xiàn)程有新的數(shù)據(jù)可發(fā)送(如果 Netty 還不知道有可用的數(shù)據(jù)的話(huà)4)。每當(dāng) Netty 有能力處理這些通知時(shí),它將(5)從隊(duì)列中獲取可用 Buffer 并通過(guò)適當(dāng)?shù)?TCP 通道發(fā)送它。
注釋4:如果隊(duì)列中有更多已完成的 Buffer,我們可以假設(shè) Netty 已經(jīng)收到通知。
▼ 在 Buffer 超時(shí)后發(fā)送
為了支持低延遲應(yīng)用,我們不能只等到 Buffer 滿(mǎn)了才向下游發(fā)送數(shù)據(jù)。因?yàn)榭赡艽嬖谶@種情況,某種通信信道沒(méi)有太多數(shù)據(jù),等到 Buffer 滿(mǎn)了在發(fā)送會(huì)不必要地增加這些少量 Record 的處理延遲。因此,F(xiàn)link 提供了一個(gè)定期 Flush 線(xiàn)程(the output flusher)每隔一段時(shí)間會(huì)將任何緩存的數(shù)據(jù)全部寫(xiě)出??梢酝ㄟ^(guò) StreamExecutionEnvironment#setBufferTimeout 配置 Flush 的間隔,并作為延遲5的上限(對(duì)于低吞吐量通道)。下圖顯示了它與其他組件的交互方式:RecordWriter 如前所述序列化數(shù)據(jù)并寫(xiě)入網(wǎng)絡(luò) Buffer,但同時(shí),如果 Netty 還不知道有數(shù)據(jù)可以發(fā)送,Output flusher 會(huì)(3,4)通知 Netty 服務(wù)端線(xiàn)程數(shù)據(jù)可讀(類(lèi)似與上面的“buffer已滿(mǎn)”的場(chǎng)景)。當(dāng) Netty 處理此通知(5)時(shí),它將消費(fèi)(獲取并發(fā)送)Buffer 中的可用數(shù)據(jù)并更新 Buffer 的讀取索引。Buffer 會(huì)保留在隊(duì)列中——從 Netty 服務(wù)端對(duì)此 Buffer 的任何進(jìn)一步操作將在下次從讀取索引繼續(xù)讀取。
注釋5:嚴(yán)格來(lái)說(shuō),Output flusher 不提供任何保證——它只向 Netty 發(fā)送通知,而 Netty 線(xiàn)程會(huì)按照能力與意愿進(jìn)行處理。這也意味著如果存在反壓,則 Output flusher 是無(wú)效的。
▼ 特殊消息后發(fā)送
一些特殊的消息如果通過(guò) RecordWriter 發(fā)送,也會(huì)觸發(fā)立即 Flush 緩存的數(shù)據(jù)。其中最重要的消息包括 Checkpoint barrier 以及 end-of-partition 事件,這些事件應(yīng)該盡快被發(fā)送,而不應(yīng)該等待 Buffer 被填滿(mǎn)或者 Output flusher 的下一次 Flush。
▼ 進(jìn)一步的討論
與小于 1.5 版本的 Flink 不同,請(qǐng)注意(a)網(wǎng)絡(luò) Buffer 現(xiàn)在會(huì)被直接放在 Subpartition 的隊(duì)列中,(b)網(wǎng)絡(luò) Buffer 不會(huì)在 Flush 之后被關(guān)閉。這給我們帶來(lái)了一些好處:
同步開(kāi)銷(xiāo)較少(Output flusher 和 RecordWriter 是相互獨(dú)立的) 在高負(fù)荷情況下,Netty 是瓶頸(直接的網(wǎng)絡(luò)瓶頸或反壓),我們?nèi)匀豢梢栽谖赐瓿傻?Buffer 中填充數(shù)據(jù) Netty 通知顯著減少
但是,在低負(fù)載情況下,可能會(huì)出現(xiàn) CPU 使用率和 TCP 數(shù)據(jù)包速率的增加。這是因?yàn)椋現(xiàn)link 將使用任何可用的 CPU 計(jì)算能力來(lái)嘗試維持所需的延遲。一旦負(fù)載增加,F(xiàn)link 將通過(guò)填充更多的 Buffer 進(jìn)行自我調(diào)整。由于同步開(kāi)銷(xiāo)減少,高負(fù)載場(chǎng)景不會(huì)受到影響,甚至可以實(shí)現(xiàn)更高的吞吐。
4.2 BufferBuilder 和 BufferConsumer
更深入地了解 Flink 中是如何實(shí)現(xiàn)生產(chǎn)者 - 消費(fèi)者機(jī)制,需要仔細(xì)查看 Flink 1.5 中引入的 BufferBuilder 和 BufferConsumer 類(lèi)。雖然讀取是以 Buffer 為粒度,但寫(xiě)入它是按 Record 進(jìn)行的,因此是 Flink 中所有網(wǎng)絡(luò)通信的核心路徑。因此,我們需要在任務(wù)線(xiàn)程(Task thread)和 Netty 線(xiàn)程之間實(shí)現(xiàn)輕量級(jí)連接,這意味著盡量小的同步開(kāi)銷(xiāo)。你可以通過(guò)查看源代碼獲取更加詳細(xì)的信息。
5. 延遲與吞吐
引入網(wǎng)絡(luò) Buffer 的目是獲得更高的資源利用率和更高的吞吐,代價(jià)是讓 Record 在 Buffer 中等待一段時(shí)間。雖然可以通過(guò) Buffer 超時(shí)給出此等待時(shí)間的上限,但可能很想知道有關(guān)這兩個(gè)維度(延遲和吞吐)之間權(quán)衡的更多信息,顯然,無(wú)法兩者同時(shí)兼得。下圖顯示了不同的 Buffer 超時(shí)時(shí)間下的吞吐,超時(shí)時(shí)間從 0 開(kāi)始(每個(gè) Record 直接 Flush)到 100 毫秒(默認(rèn)值),測(cè)試在具有 100 個(gè)節(jié)點(diǎn)每個(gè)節(jié)點(diǎn) 8 個(gè) Slot 的群集上運(yùn)行,每個(gè)節(jié)點(diǎn)運(yùn)行沒(méi)有業(yè)務(wù)邏輯的 Task 因此只用于測(cè)試網(wǎng)絡(luò)協(xié)議棧的能力。為了進(jìn)行比較,我們還測(cè)試了低延遲改進(jìn)(如上所述)之前的 Flink 1.4 版本。
如圖,使用 Flink 1.5+,即使是非常低的 Buffer 超時(shí)(例如1ms)(對(duì)于低延遲場(chǎng)景)也提供高達(dá)超時(shí)默認(rèn)參數(shù)(100ms)75% 的最大吞吐,但會(huì)緩存更少的數(shù)據(jù)。
6.結(jié)論
了解 Result partition,批處理和流式計(jì)算的不同網(wǎng)絡(luò)連接以及調(diào)度類(lèi)型,Credit-Based 流量控制以及 Flink 網(wǎng)絡(luò)協(xié)議棧內(nèi)部的工作機(jī)理,有助于更好的理解網(wǎng)絡(luò)協(xié)議棧相關(guān)的參數(shù)以及作業(yè)的行為。后續(xù)我們會(huì)推出更多 Flink 網(wǎng)絡(luò)棧的相關(guān)內(nèi)容,并深入更多細(xì)節(jié),包括運(yùn)維相關(guān)的監(jiān)控指標(biāo)(Metrics),進(jìn)一步的網(wǎng)絡(luò)調(diào)優(yōu)策略以及需要避免的常見(jiàn)錯(cuò)誤等。
以上就是小編為大家?guī)?lái)的Apache Flink的網(wǎng)絡(luò)協(xié)議棧詳細(xì)介紹的全部?jī)?nèi)容,希望能對(duì)您有所幫助,小伙伴們有空可以來(lái)腳本之家網(wǎng)站,我們的網(wǎng)站上還有許多其它的資料等著小伙伴來(lái)挖掘哦!
相關(guān)文章
IOT通信協(xié)議有哪些?物聯(lián)網(wǎng)七大通信協(xié)議對(duì)比介紹
物聯(lián)網(wǎng)簡(jiǎn)稱(chēng)iot,本文中為大家的是物聯(lián)網(wǎng)的7大協(xié)議以及對(duì)比,有需要的朋友可以閱讀本文參考一下2019-06-285G時(shí)代 HTTP和DNS協(xié)議將發(fā)生哪些變化?
HTTP和DNS這兩種協(xié)議幾乎已經(jīng)成為家喻戶(hù)曉,現(xiàn)在5G時(shí)代的來(lái)臨,這些協(xié)議也都將發(fā)生巨大的變化,本文就介紹下HTTP和DNS這些協(xié)議在未來(lái)會(huì)如何發(fā)展的,如何改變的2017-07-05最常用路由協(xié)議RIP-1/2 OSPF IS-IS BGP的特點(diǎn)對(duì)比
RIP協(xié)議是最早的路由協(xié)議,OSPF是目前應(yīng)用最廣泛的IGP協(xié)議,IS-IS是另外一種鏈路狀態(tài)型的路由協(xié)議,BGP協(xié)議是唯一的EGP協(xié)議,那么這幾種路由協(xié)議有什么特點(diǎn)和不同呢?下面就2017-04-06如何屏蔽https網(wǎng)站、禁止訪(fǎng)問(wèn)https、禁止跳轉(zhuǎn)https的方法
由于網(wǎng)絡(luò)安全形勢(shì)越發(fā)嚴(yán)峻,為了保護(hù)用戶(hù)隱私和網(wǎng)絡(luò)安全,越來(lái)越多的網(wǎng)站都開(kāi)啟了HTTPS,如何禁止訪(fǎng)問(wèn)HTTPS網(wǎng)站、如何屏蔽HTTPS網(wǎng)站就成為重要的網(wǎng)絡(luò)管理工作,下面就來(lái)看2017-03-29無(wú)線(xiàn)網(wǎng)絡(luò)IEEE802.11/IEEE802.3協(xié)議標(biāo)準(zhǔn)和區(qū)別
IEEE802協(xié)議是一種物理協(xié)議集,而以太網(wǎng)協(xié)議是由一組IEEE 802.3標(biāo)準(zhǔn)定義的局域網(wǎng)協(xié)議集,下面就為大家介紹下IEEE802.11/IEEE802.3協(xié)議標(biāo)準(zhǔn)和區(qū)別,大家了解下吧2017-03-27LoRa與ZigBee有什么區(qū)別?LoRa與ZigBee技術(shù)全面分析
ZigBee是基于IEEE802.15.4標(biāo)準(zhǔn)的低功耗局域網(wǎng)協(xié)議,LoRa 是LPWAN通信技術(shù)中的一種,那么兩者之間有什么區(qū)別和聯(lián)系呢?下面就詳情來(lái)為大家解析下,希望對(duì)大家有幫助2017-03-26- 報(bào)文是HTTP協(xié)議下請(qǐng)求和響應(yīng)的信息基礎(chǔ),這里就帶大家來(lái)逐步解讀HTTP報(bào)文的組成及含義,需要的朋友可以參考下2016-06-16
SIP協(xié)議錯(cuò)誤代碼code大全(中英文對(duì)照)
這篇文章主要介紹了SIP協(xié)議錯(cuò)誤代碼code大全(中英文對(duì)照),需要的朋友可以參考下2016-05-16- 這篇文章主要為大家詳細(xì)介紹了查看電腦MAC地址的方法,教大家如何查看電腦的MAC地址,感興趣的小伙伴們可以參考一下2016-04-06
- 這篇文章主要介紹了圖解TCP/IP協(xié)議,幫助大家輕松學(xué)會(huì)TCP/IP協(xié)議,需要的朋友可以參考下2015-12-16