Flink狀態(tài)和容錯(cuò)源碼解析
引言
- 計(jì)算模型
- DataStream基礎(chǔ)框架
- 事件時(shí)間和窗口
- 狀態(tài)和容錯(cuò)
- 部署&調(diào)度
- 存儲(chǔ)體系
- 底層支撐
Flink中提供了State(狀態(tài))這個(gè)概念來(lái)保存中間計(jì)算結(jié)果和緩存數(shù)據(jù),按照不同的場(chǎng)景,F(xiàn)link提供了多種不同類型的State,同時(shí)為了實(shí)現(xiàn)Exactly once的語(yǔ)義,F(xiàn)link參考Chandy-Lamport算法實(shí)現(xiàn)了Asynchronous Barrier Snapshotting算法(簡(jiǎn)稱ABS),本篇我們來(lái)了解Flink狀態(tài)的底層實(shí)現(xiàn)及如何進(jìn)行快照處理。
概述
Flink中提供了2種State,一種是Keyed State,是用在Keyed DataStream(即每條記錄是有一個(gè)key)。即每個(gè)key對(duì)應(yīng)有一個(gè)狀態(tài)信息,用于數(shù)據(jù)處理場(chǎng)景中需要按key記錄狀態(tài)的場(chǎng)景,如風(fēng)控場(chǎng)景下記錄用戶的狀態(tài),window操作中記錄窗口的數(shù)據(jù)信息也是用keyed State來(lái)實(shí)現(xiàn)的。另一種是Operator State,即算子的狀態(tài),如在KafkaSource中記錄消費(fèi)的偏移量。本篇將從狀態(tài)的管理、數(shù)據(jù)的存儲(chǔ)和備份恢復(fù)等來(lái)介紹底層機(jī)制。
State
Keyed State
Keyed State按照存儲(chǔ)數(shù)據(jù)類型的不同,分為如下幾類
- ValueState: 保存一個(gè)值,支持查詢和更新
- ListState: 保存一個(gè)元素列表
- ReducingState: 保存一個(gè)單值,表示添加到狀態(tài)的所有數(shù)據(jù)的聚合,有定義一個(gè)ReduceFunction來(lái)進(jìn)行聚合處理
- AggregatingState<IN, OUT>: 保留一個(gè)單值,也是添加到狀態(tài)的所有數(shù)據(jù)的聚合,和ReducingState不同的是,輸入數(shù)據(jù)和結(jié)果值的類型可以不同。
- MapState: 維護(hù)的一個(gè)映射列表 創(chuàng)建這些狀態(tài)時(shí)需要同時(shí)定義一個(gè)StateDescriptor,這里面定義一個(gè)狀態(tài)的名字(唯一名稱),通過(guò)StateDescriptor就可以來(lái)引用具體狀態(tài)實(shí)例。
狀態(tài)實(shí)例管理及數(shù)據(jù)存儲(chǔ)
從Keyed State的定義來(lái)看,這里的關(guān)系應(yīng)該是一個(gè)key到State的映射,而在使用時(shí)卻沒(méi)有看到這個(gè)key,另外具體的狀態(tài)是如何保存的,本節(jié)我們來(lái)深入分析
從上圖可以看出,各種不同的State,是通過(guò)KeyedStateStore來(lái)獲取到的,KeyedStateStore只是一個(gè)代理類,其底層是調(diào)用KeyedStateBackend來(lái)負(fù)責(zé)具體的處理,具體的實(shí)現(xiàn)類有如下2個(gè)
- HeapKeyedStateBackend: 基于內(nèi)存的StateBackend,把數(shù)據(jù)保存在Java的Heap里面
- RocksDBKeyedStateBackend: 狀態(tài)數(shù)據(jù)保存在RocksDB中
下面我們從如下幾個(gè)功能點(diǎn)來(lái)看具體各個(gè)StateBackend的實(shí)現(xiàn)
- 各個(gè)實(shí)例的管理
- 內(nèi)部數(shù)據(jù)保存
- 數(shù)據(jù)過(guò)期處理
- 數(shù)據(jù)快照處理
- 數(shù)據(jù)重分布
HeapKeyedStateBackend
基于內(nèi)存的StateBackend,將數(shù)據(jù)保存在Java的Heap中,適合小數(shù)據(jù)狀態(tài)場(chǎng)景
各個(gè)實(shí)例的管理
private final Map<String, StateTable<K, ?, ?>> registeredKVStates;
使用一個(gè)Map來(lái)保存各個(gè)不同的KeyedState,key為定義的名字,StateTable中存儲(chǔ)的具體的數(shù)據(jù)。StateTable的內(nèi)部在下面內(nèi)部數(shù)據(jù)保存中介紹
- 內(nèi)部數(shù)據(jù)保存 內(nèi)部數(shù)據(jù)存儲(chǔ)是通過(guò)StateTable來(lái)管理的,里面定義了一個(gè)StateMap的數(shù)組來(lái)存儲(chǔ)具體的數(shù)據(jù),具體的數(shù)據(jù)通過(guò)計(jì)算key的KeyGroupIndex來(lái)確定把數(shù)據(jù)存放到哪個(gè)StateMap。
- StateMap:存儲(chǔ)數(shù)據(jù),具體的實(shí)現(xiàn)有CopyOnWriteStateMap和CopyOnWriteSkipListStateMap2個(gè)。
- KeyGroup:將key分組管理,這樣方便在后續(xù)通過(guò)savepoint來(lái)恢復(fù)任務(wù)時(shí)如果調(diào)整了并行度,這樣方便對(duì)key按KeyGroup重新分布。keyGroup的計(jì)算為將key做hash處理后按最大并行度(maxParallelism)取余
- 數(shù)據(jù)過(guò)期處理 如果有設(shè)置了數(shù)據(jù)過(guò)期處理,這種生成的State會(huì)每個(gè)value都帶上一個(gè)時(shí)間戳數(shù)據(jù),如MapState是每個(gè)value都帶一個(gè)時(shí)間戳,具體的實(shí)例類型也會(huì)不同,對(duì)應(yīng)為如下幾個(gè)
- TtlValueState
- TtlListState
- TtlReducingState
- TtlAggregatingState
- TtlMapState 具體數(shù)據(jù)的清理處理通過(guò)TtlIncrementalCleanup類來(lái)實(shí)現(xiàn)
- 數(shù)據(jù)快照處理 HeapKeyedStateBackend的snapshot處理由類HeapSnapshotStrategy來(lái)負(fù)責(zé),調(diào)用的方法為asyncSnapshot(),只支持全量的快照處理。
- 數(shù)據(jù)重分布 在實(shí)際的數(shù)據(jù)處理中由于并行度設(shè)置的不合理,在日常運(yùn)維過(guò)程中會(huì)涉及到并行度的調(diào)整,算子的并行度調(diào)整后,那對(duì)key的分布也會(huì)進(jìn)行調(diào)整,這樣就會(huì)導(dǎo)致keyedState的數(shù)據(jù)進(jìn)行重分布。Flink中引入了一個(gè)KeyGroup的概念,其對(duì)key做了個(gè)分組管理,KeyGroup的個(gè)數(shù)為最大并行度,具體實(shí)現(xiàn)為將key進(jìn)行hash后然分類(hash值對(duì)KeyGroup數(shù)取余)到其中一個(gè)KeyGroup中。
RocksDBKeyedStateBackend
使用RocksDB來(lái)存儲(chǔ)狀態(tài),這個(gè)State backend可以存儲(chǔ)非常大的狀態(tài),如果超過(guò)了內(nèi)存可以split到磁盤上。同樣我們分如下幾個(gè)階段來(lái)了解相關(guān)具體的實(shí)現(xiàn)
- 各個(gè)實(shí)例的管理 通過(guò)如下的Map來(lái)存儲(chǔ)定義的各個(gè)狀態(tài)信息,和前面HeapKeyedStateBackend類似,key為自己定義的名稱
LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation;
- 內(nèi)部數(shù)據(jù)保存 從上面可以看到每個(gè)key對(duì)應(yīng)的value是RocksDbKvStateInfo,這個(gè)其中有如下2個(gè)屬性 ColumnFamilyHandle:rocksdb庫(kù)中的類,提供了一個(gè)handle對(duì)應(yīng)到rocksdb的ColumnFamily。
public static class RocksDbKvStateInfo implements AutoCloseable { public final ColumnFamilyHandle columnFamilyHandle; public final RegisteredStateMetaInfoBase metaInfo; }
- 數(shù)據(jù)過(guò)期處理 通過(guò)RocksDB的CompactionFilter[1]功能來(lái)實(shí)現(xiàn)數(shù)據(jù)過(guò)期的處理,RocksDB項(xiàng)目中專門有個(gè)FlinkCompactionFilter的類用于Flink項(xiàng)目 CompactionFilter提供了一種在rocksdb進(jìn)行compaction時(shí)候,根據(jù)自定義邏輯去刪除/修改key/value對(duì)的方法。如根據(jù)業(yè)務(wù)的ttl屬性刪除過(guò)期keys。
- 數(shù)據(jù)快照處理 RocksDBKeyedStateBackend支持2種快照處理方式,全量和增量,由于RocksDB底層是使用lsm樹來(lái)進(jìn)行存儲(chǔ)的,所以比較方便實(shí)現(xiàn)增量數(shù)據(jù)的獲取
- 數(shù)據(jù)恢復(fù)處理 和前面HeapKeyedStateBackend的類似,這里就不再展開了。
OperatorState
接下面我們深入了解下OperatorState,在實(shí)際使用中如我們消費(fèi)了kafka的數(shù)據(jù)需要記錄kafka消費(fèi)的offset,還有一些場(chǎng)景需要將一些信息分發(fā)到所有的任務(wù)。這里需要使用到一類新的狀態(tài)處理,這種狀態(tài)是與每個(gè)算子綁定的,F(xiàn)link提供了如下3個(gè)類來(lái)支持
- ListState:通過(guò)一個(gè)list來(lái)保存相關(guān)的狀態(tài)信息,如果并行度調(diào)整了,其中的數(shù)據(jù)按新的并行度重新進(jìn)行分布處理;
- UnionListState: 保存數(shù)據(jù)時(shí)和ListState類似,但是在出錯(cuò)和從savepoint恢復(fù)數(shù)據(jù)時(shí)的策略會(huì)不一樣,其會(huì)將所有的狀態(tài)通過(guò)廣播的方式發(fā)給下游的每個(gè)任務(wù),然后下游的任務(wù)來(lái)選擇自己需要的部分;
- BroadcastState: 用于每個(gè)任務(wù)上的狀態(tài)都要一樣的場(chǎng)景,這個(gè)在數(shù)據(jù)恢復(fù)時(shí)是把狀態(tài)復(fù)制到所有的任務(wù)。
類似KeyedState,這里也是通過(guò)一個(gè)Backend來(lái)管理對(duì)應(yīng)的狀態(tài)數(shù)據(jù),其接口定義為:OperatorStateStore。其實(shí)現(xiàn)類目前只有一個(gè):DefaultOperatorStateBackend,我們也通過(guò)以下幾個(gè)部分來(lái)分別了解DefaultOperatorStateBackend的實(shí)現(xiàn)
- 各個(gè)實(shí)例的管理
private final Map<String, PartitionableListState<?>> registeredOperatorStates; private final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates;
這里通過(guò)2個(gè)Map來(lái)分別管理ListState(含UnionListState,后面都統(tǒng)一使用ListState來(lái)代指)和BroadcastState。 2. 內(nèi)部數(shù)據(jù)保存 ListState的實(shí)現(xiàn)類為PartitionableListState,底層通過(guò)一個(gè)ArrayList來(lái)保存數(shù)據(jù)。BroadcastState定義的接口是kv的,所以其實(shí)現(xiàn)類HeapBroadcastState使用Map來(lái)存儲(chǔ)相應(yīng)的數(shù)據(jù)。 3. 數(shù)據(jù)過(guò)期處理 operatorState不涉及到數(shù)據(jù)過(guò)期的處理 4. 數(shù)據(jù)快照處理 DefaultOperatorStateBackendSnapshotStrategy類來(lái)負(fù)責(zé)具體的快照處理,調(diào)用的方法為asyncSnapshot,分別對(duì)ListState和BroadcastState進(jìn)行快照處理 5. 數(shù)據(jù)重分布 數(shù)據(jù)重分布的策略前面介紹各個(gè)State時(shí)已經(jīng)介紹了,這里就不再重復(fù)介紹了。
上層封裝
Flink中對(duì)狀態(tài)的backend和checkpoint存儲(chǔ)策略進(jìn)行了封裝定義。 StateBackend:定義一個(gè)Streaming應(yīng)用的state如何在集群中的本地存儲(chǔ)。不同的實(shí)現(xiàn)使用不同的數(shù)據(jù)結(jié)構(gòu)來(lái)存儲(chǔ)對(duì)應(yīng)的狀態(tài)。其具體實(shí)現(xiàn)有如下2個(gè):
- HashMapStateBackend:將狀態(tài)保存在TaskManager的內(nèi)存中(JVM heap),其底層KeyedStateBackend使用的是HeapKeyedStateBackend,OperatorStateBackend使用的是DefaultOperatorStateBackend
- EmbeddedRocksDBStateBackend:狀態(tài)保存在一個(gè)嵌入的Rockdb實(shí)例中,其底層實(shí)現(xiàn)KeyedStateBackend使用的是RocksDBKeyedStateBackend,OperatorStateBackend使用的和HashMapStateBackend一致,也是DefaultOperatorStateBackend 說(shuō)明:原來(lái)的RocksDBStateBackend、FsStateBackend和MemoryStateBackend不建議使用了。 另外對(duì)checkpoint的存儲(chǔ),CheckpointStorage接口定義了在Streaming應(yīng)用中StateBackend在容錯(cuò)性方面如何存儲(chǔ)其狀態(tài)數(shù)據(jù)。其實(shí)現(xiàn)有如下2個(gè)
- JobManagerCheckpointStorage:將checkpoints狀態(tài)數(shù)據(jù)存儲(chǔ)到JobManager的內(nèi)存中,savepoints保存到文件系統(tǒng)
- FileSystemCheckpointStorage:將checkpoints狀態(tài)存儲(chǔ)到文件系統(tǒng),如保存到HDFS上。
總結(jié)
本篇我們了解了Flink的相關(guān)的狀態(tài)的內(nèi)容和checkpoint的保存策略。
- State分為KeyedState和OperatorState,KeyedState主要存儲(chǔ)針對(duì)記錄級(jí)別的狀態(tài),如window操作時(shí)的狀態(tài)。OperatorState主要存儲(chǔ)針對(duì)算子的狀態(tài),如消費(fèi)kafka的offset信息等;每類狀態(tài)分別提供了不同種類的狀態(tài)類來(lái)支持不同場(chǎng)景的狀態(tài)保存需求
- Flink底層通過(guò)StateBackend來(lái)保存對(duì)應(yīng)的狀態(tài)數(shù)據(jù),主要有通過(guò)內(nèi)存保存和使用RocksDB保存這2類,另外在其具體實(shí)現(xiàn)中為了方便并行度調(diào)整后對(duì)狀態(tài)的重新拆分處理,引入了KeyGroup的概念
- 在用戶使用層,對(duì)上述2種狀態(tài)進(jìn)行了封裝,接口為:StateBackend,來(lái)管理KeyedState和OperatorState,另外將checkpoint的存儲(chǔ)策略從原來(lái)的StateBackend中拆分出來(lái)。目前支持有基于內(nèi)存和外部存儲(chǔ)的2類checkpoint存儲(chǔ)策略。 最后由于checkpoint的觸發(fā)以及任務(wù)恢復(fù)的處理與整體計(jì)算處理比較緊密,這塊等介紹完Flink部署模式后再來(lái)詳細(xì)梳理checkpoint的處理過(guò)程。
參考文檔
以上就是Flink狀態(tài)和容錯(cuò)源碼解析的詳細(xì)內(nèi)容,更多關(guān)于Flink狀態(tài)容錯(cuò)的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
解決 IDEA 創(chuàng)建 Gradle 項(xiàng)目沒(méi)有src目錄問(wèn)題
這篇文章主要介紹了解決 IDEA 創(chuàng)建 Gradle 項(xiàng)目沒(méi)有src目錄問(wèn)題,本文圖文并茂給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2018-06-06spring-cloud入門之spring-cloud-config(配置中心)
這篇文章主要介紹了spring-cloud入門之spring-cloud-config(配置中心),小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2018-01-01Java使用Queryable-pageable實(shí)現(xiàn)分頁(yè)效果
這篇文章主要為大家介紹了Java如何使用Queryable-pageable從而實(shí)現(xiàn)分頁(yè)效果,文中的示例代碼簡(jiǎn)潔易懂,感興趣的小伙伴可以動(dòng)手嘗試一下2022-06-06IDEA使用Tomcat運(yùn)行web項(xiàng)目教程分享
在非Spring Boot項(xiàng)目中運(yùn)行Nacos示例,需要手動(dòng)配置Tomcat容器,本文介紹了如何在IDEA中配置Tomcat,并詳細(xì)解決了配置過(guò)程中可能遇到的異常情況,步驟包括修改IDEA項(xiàng)目結(jié)構(gòu)、添加Web模塊、配置Artifacts和Tomcat Server2024-10-10JavaScript中new運(yùn)算符的實(shí)現(xiàn)過(guò)程解析
這篇文章主要介紹了JavaScript中new運(yùn)算符的實(shí)現(xiàn)過(guò)程解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-10-10SpringBoot集成Swagger2生成接口文檔的方法示例
我們提供Restful接口的時(shí)候,API文檔是尤為的重要,它承載著對(duì)接口的定義,描述等,本文主要介紹了SpringBoot集成Swagger2生成接口文檔的方法示例,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2018-12-12SpringMVC xml文件路徑在web.xml中的配置方式
這篇文章主要介紹了SpringMVC xml文件路徑在web.xml中的配置方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-09-09java token生成和校驗(yàn)的實(shí)例代碼
這篇文章主要介紹了java token生成和校驗(yàn)的實(shí)例代碼,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-09-09