Flink狀態(tài)和容錯源碼解析
引言
- 計算模型
- DataStream基礎(chǔ)框架
- 事件時間和窗口
- 狀態(tài)和容錯
- 部署&調(diào)度
- 存儲體系
- 底層支撐
Flink中提供了State(狀態(tài))這個概念來保存中間計算結(jié)果和緩存數(shù)據(jù),按照不同的場景,F(xiàn)link提供了多種不同類型的State,同時為了實現(xiàn)Exactly once的語義,F(xiàn)link參考Chandy-Lamport算法實現(xiàn)了Asynchronous Barrier Snapshotting算法(簡稱ABS),本篇我們來了解Flink狀態(tài)的底層實現(xiàn)及如何進行快照處理。
概述
Flink中提供了2種State,一種是Keyed State,是用在Keyed DataStream(即每條記錄是有一個key)。即每個key對應(yīng)有一個狀態(tài)信息,用于數(shù)據(jù)處理場景中需要按key記錄狀態(tài)的場景,如風控場景下記錄用戶的狀態(tài),window操作中記錄窗口的數(shù)據(jù)信息也是用keyed State來實現(xiàn)的。另一種是Operator State,即算子的狀態(tài),如在KafkaSource中記錄消費的偏移量。本篇將從狀態(tài)的管理、數(shù)據(jù)的存儲和備份恢復(fù)等來介紹底層機制。
State
Keyed State
Keyed State按照存儲數(shù)據(jù)類型的不同,分為如下幾類
- ValueState: 保存一個值,支持查詢和更新
- ListState: 保存一個元素列表
- ReducingState: 保存一個單值,表示添加到狀態(tài)的所有數(shù)據(jù)的聚合,有定義一個ReduceFunction來進行聚合處理
- AggregatingState<IN, OUT>: 保留一個單值,也是添加到狀態(tài)的所有數(shù)據(jù)的聚合,和ReducingState不同的是,輸入數(shù)據(jù)和結(jié)果值的類型可以不同。
- MapState: 維護的一個映射列表 創(chuàng)建這些狀態(tài)時需要同時定義一個StateDescriptor,這里面定義一個狀態(tài)的名字(唯一名稱),通過StateDescriptor就可以來引用具體狀態(tài)實例。
狀態(tài)實例管理及數(shù)據(jù)存儲
從Keyed State的定義來看,這里的關(guān)系應(yīng)該是一個key到State的映射,而在使用時卻沒有看到這個key,另外具體的狀態(tài)是如何保存的,本節(jié)我們來深入分析
從上圖可以看出,各種不同的State,是通過KeyedStateStore來獲取到的,KeyedStateStore只是一個代理類,其底層是調(diào)用KeyedStateBackend來負責具體的處理,具體的實現(xiàn)類有如下2個
- HeapKeyedStateBackend: 基于內(nèi)存的StateBackend,把數(shù)據(jù)保存在Java的Heap里面
- RocksDBKeyedStateBackend: 狀態(tài)數(shù)據(jù)保存在RocksDB中
下面我們從如下幾個功能點來看具體各個StateBackend的實現(xiàn)
- 各個實例的管理
- 內(nèi)部數(shù)據(jù)保存
- 數(shù)據(jù)過期處理
- 數(shù)據(jù)快照處理
- 數(shù)據(jù)重分布
HeapKeyedStateBackend
基于內(nèi)存的StateBackend,將數(shù)據(jù)保存在Java的Heap中,適合小數(shù)據(jù)狀態(tài)場景
各個實例的管理
private final Map<String, StateTable<K, ?, ?>> registeredKVStates;
使用一個Map來保存各個不同的KeyedState,key為定義的名字,StateTable中存儲的具體的數(shù)據(jù)。StateTable的內(nèi)部在下面內(nèi)部數(shù)據(jù)保存中介紹
- 內(nèi)部數(shù)據(jù)保存 內(nèi)部數(shù)據(jù)存儲是通過StateTable來管理的,里面定義了一個StateMap的數(shù)組來存儲具體的數(shù)據(jù),具體的數(shù)據(jù)通過計算key的KeyGroupIndex來確定把數(shù)據(jù)存放到哪個StateMap。
- StateMap:存儲數(shù)據(jù),具體的實現(xiàn)有CopyOnWriteStateMap和CopyOnWriteSkipListStateMap2個。
- KeyGroup:將key分組管理,這樣方便在后續(xù)通過savepoint來恢復(fù)任務(wù)時如果調(diào)整了并行度,這樣方便對key按KeyGroup重新分布。keyGroup的計算為將key做hash處理后按最大并行度(maxParallelism)取余
- 數(shù)據(jù)過期處理 如果有設(shè)置了數(shù)據(jù)過期處理,這種生成的State會每個value都帶上一個時間戳數(shù)據(jù),如MapState是每個value都帶一個時間戳,具體的實例類型也會不同,對應(yīng)為如下幾個
- TtlValueState
- TtlListState
- TtlReducingState
- TtlAggregatingState
- TtlMapState 具體數(shù)據(jù)的清理處理通過TtlIncrementalCleanup類來實現(xiàn)
- 數(shù)據(jù)快照處理 HeapKeyedStateBackend的snapshot處理由類HeapSnapshotStrategy來負責,調(diào)用的方法為asyncSnapshot(),只支持全量的快照處理。
- 數(shù)據(jù)重分布 在實際的數(shù)據(jù)處理中由于并行度設(shè)置的不合理,在日常運維過程中會涉及到并行度的調(diào)整,算子的并行度調(diào)整后,那對key的分布也會進行調(diào)整,這樣就會導(dǎo)致keyedState的數(shù)據(jù)進行重分布。Flink中引入了一個KeyGroup的概念,其對key做了個分組管理,KeyGroup的個數(shù)為最大并行度,具體實現(xiàn)為將key進行hash后然分類(hash值對KeyGroup數(shù)取余)到其中一個KeyGroup中。
RocksDBKeyedStateBackend
使用RocksDB來存儲狀態(tài),這個State backend可以存儲非常大的狀態(tài),如果超過了內(nèi)存可以split到磁盤上。同樣我們分如下幾個階段來了解相關(guān)具體的實現(xiàn)
- 各個實例的管理 通過如下的Map來存儲定義的各個狀態(tài)信息,和前面HeapKeyedStateBackend類似,key為自己定義的名稱
LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation;
- 內(nèi)部數(shù)據(jù)保存 從上面可以看到每個key對應(yīng)的value是RocksDbKvStateInfo,這個其中有如下2個屬性 ColumnFamilyHandle:rocksdb庫中的類,提供了一個handle對應(yīng)到rocksdb的ColumnFamily。
public static class RocksDbKvStateInfo implements AutoCloseable { public final ColumnFamilyHandle columnFamilyHandle; public final RegisteredStateMetaInfoBase metaInfo; }
- 數(shù)據(jù)過期處理 通過RocksDB的CompactionFilter[1]功能來實現(xiàn)數(shù)據(jù)過期的處理,RocksDB項目中專門有個FlinkCompactionFilter的類用于Flink項目 CompactionFilter提供了一種在rocksdb進行compaction時候,根據(jù)自定義邏輯去刪除/修改key/value對的方法。如根據(jù)業(yè)務(wù)的ttl屬性刪除過期keys。
- 數(shù)據(jù)快照處理 RocksDBKeyedStateBackend支持2種快照處理方式,全量和增量,由于RocksDB底層是使用lsm樹來進行存儲的,所以比較方便實現(xiàn)增量數(shù)據(jù)的獲取
- 數(shù)據(jù)恢復(fù)處理 和前面HeapKeyedStateBackend的類似,這里就不再展開了。
OperatorState
接下面我們深入了解下OperatorState,在實際使用中如我們消費了kafka的數(shù)據(jù)需要記錄kafka消費的offset,還有一些場景需要將一些信息分發(fā)到所有的任務(wù)。這里需要使用到一類新的狀態(tài)處理,這種狀態(tài)是與每個算子綁定的,F(xiàn)link提供了如下3個類來支持
- ListState:通過一個list來保存相關(guān)的狀態(tài)信息,如果并行度調(diào)整了,其中的數(shù)據(jù)按新的并行度重新進行分布處理;
- UnionListState: 保存數(shù)據(jù)時和ListState類似,但是在出錯和從savepoint恢復(fù)數(shù)據(jù)時的策略會不一樣,其會將所有的狀態(tài)通過廣播的方式發(fā)給下游的每個任務(wù),然后下游的任務(wù)來選擇自己需要的部分;
- BroadcastState: 用于每個任務(wù)上的狀態(tài)都要一樣的場景,這個在數(shù)據(jù)恢復(fù)時是把狀態(tài)復(fù)制到所有的任務(wù)。
類似KeyedState,這里也是通過一個Backend來管理對應(yīng)的狀態(tài)數(shù)據(jù),其接口定義為:OperatorStateStore。其實現(xiàn)類目前只有一個:DefaultOperatorStateBackend,我們也通過以下幾個部分來分別了解DefaultOperatorStateBackend的實現(xiàn)
- 各個實例的管理
private final Map<String, PartitionableListState<?>> registeredOperatorStates; private final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates;
這里通過2個Map來分別管理ListState(含UnionListState,后面都統(tǒng)一使用ListState來代指)和BroadcastState。 2. 內(nèi)部數(shù)據(jù)保存 ListState的實現(xiàn)類為PartitionableListState,底層通過一個ArrayList來保存數(shù)據(jù)。BroadcastState定義的接口是kv的,所以其實現(xiàn)類HeapBroadcastState使用Map來存儲相應(yīng)的數(shù)據(jù)。 3. 數(shù)據(jù)過期處理 operatorState不涉及到數(shù)據(jù)過期的處理 4. 數(shù)據(jù)快照處理 DefaultOperatorStateBackendSnapshotStrategy類來負責具體的快照處理,調(diào)用的方法為asyncSnapshot,分別對ListState和BroadcastState進行快照處理 5. 數(shù)據(jù)重分布 數(shù)據(jù)重分布的策略前面介紹各個State時已經(jīng)介紹了,這里就不再重復(fù)介紹了。
上層封裝
Flink中對狀態(tài)的backend和checkpoint存儲策略進行了封裝定義。 StateBackend:定義一個Streaming應(yīng)用的state如何在集群中的本地存儲。不同的實現(xiàn)使用不同的數(shù)據(jù)結(jié)構(gòu)來存儲對應(yīng)的狀態(tài)。其具體實現(xiàn)有如下2個:
- HashMapStateBackend:將狀態(tài)保存在TaskManager的內(nèi)存中(JVM heap),其底層KeyedStateBackend使用的是HeapKeyedStateBackend,OperatorStateBackend使用的是DefaultOperatorStateBackend
- EmbeddedRocksDBStateBackend:狀態(tài)保存在一個嵌入的Rockdb實例中,其底層實現(xiàn)KeyedStateBackend使用的是RocksDBKeyedStateBackend,OperatorStateBackend使用的和HashMapStateBackend一致,也是DefaultOperatorStateBackend 說明:原來的RocksDBStateBackend、FsStateBackend和MemoryStateBackend不建議使用了。 另外對checkpoint的存儲,CheckpointStorage接口定義了在Streaming應(yīng)用中StateBackend在容錯性方面如何存儲其狀態(tài)數(shù)據(jù)。其實現(xiàn)有如下2個
- JobManagerCheckpointStorage:將checkpoints狀態(tài)數(shù)據(jù)存儲到JobManager的內(nèi)存中,savepoints保存到文件系統(tǒng)
- FileSystemCheckpointStorage:將checkpoints狀態(tài)存儲到文件系統(tǒng),如保存到HDFS上。
總結(jié)
本篇我們了解了Flink的相關(guān)的狀態(tài)的內(nèi)容和checkpoint的保存策略。
- State分為KeyedState和OperatorState,KeyedState主要存儲針對記錄級別的狀態(tài),如window操作時的狀態(tài)。OperatorState主要存儲針對算子的狀態(tài),如消費kafka的offset信息等;每類狀態(tài)分別提供了不同種類的狀態(tài)類來支持不同場景的狀態(tài)保存需求
- Flink底層通過StateBackend來保存對應(yīng)的狀態(tài)數(shù)據(jù),主要有通過內(nèi)存保存和使用RocksDB保存這2類,另外在其具體實現(xiàn)中為了方便并行度調(diào)整后對狀態(tài)的重新拆分處理,引入了KeyGroup的概念
- 在用戶使用層,對上述2種狀態(tài)進行了封裝,接口為:StateBackend,來管理KeyedState和OperatorState,另外將checkpoint的存儲策略從原來的StateBackend中拆分出來。目前支持有基于內(nèi)存和外部存儲的2類checkpoint存儲策略。 最后由于checkpoint的觸發(fā)以及任務(wù)恢復(fù)的處理與整體計算處理比較緊密,這塊等介紹完Flink部署模式后再來詳細梳理checkpoint的處理過程。
參考文檔
以上就是Flink狀態(tài)和容錯源碼解析的詳細內(nèi)容,更多關(guān)于Flink狀態(tài)容錯的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
解決 IDEA 創(chuàng)建 Gradle 項目沒有src目錄問題
這篇文章主要介紹了解決 IDEA 創(chuàng)建 Gradle 項目沒有src目錄問題,本文圖文并茂給大家介紹的非常詳細,具有一定的參考借鑒價值,需要的朋友可以參考下2018-06-06spring-cloud入門之spring-cloud-config(配置中心)
這篇文章主要介紹了spring-cloud入門之spring-cloud-config(配置中心),小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-01-01Java使用Queryable-pageable實現(xiàn)分頁效果
這篇文章主要為大家介紹了Java如何使用Queryable-pageable從而實現(xiàn)分頁效果,文中的示例代碼簡潔易懂,感興趣的小伙伴可以動手嘗試一下2022-06-06JavaScript中new運算符的實現(xiàn)過程解析
這篇文章主要介紹了JavaScript中new運算符的實現(xiàn)過程解析,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習或者工作具有一定的參考學(xué)習價值,需要的朋友可以參考下2019-10-10SpringBoot集成Swagger2生成接口文檔的方法示例
我們提供Restful接口的時候,API文檔是尤為的重要,它承載著對接口的定義,描述等,本文主要介紹了SpringBoot集成Swagger2生成接口文檔的方法示例,需要的朋友們下面隨著小編來一起學(xué)習學(xué)習吧2018-12-12SpringMVC xml文件路徑在web.xml中的配置方式
這篇文章主要介紹了SpringMVC xml文件路徑在web.xml中的配置方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-09-09