基于golang的輕量級工作流框架Fastflow
Fastflow 是什么?用一句話來定義它:一個 基于golang協(xié)程
、支持水平擴容
的分布式高性能工作流框架
。
它具有以下特點:
- 易用性:工作流模型基于 DAG 來定義,同時還提供開箱即用的 API,你可以隨時通過 API 創(chuàng)建、運行、暫停工作流等,在開發(fā)新的原子能力時還提供了開箱即用的分布式鎖功能
- 高性能:得益于 golang 的協(xié)程 與 channel 技術(shù),fastflow 可以在單實例上并行執(zhí)行數(shù)百、數(shù)千乃至數(shù)萬個任務(wù)
- 可觀測性:fastflow 基于 Prometheus 的 metrics 暴露了當前實例上的任務(wù)執(zhí)行信息,比如并發(fā)任務(wù)數(shù)、任務(wù)分發(fā)時間等。
- 可伸縮性:支持水平伸縮,以克服海量任務(wù)帶來的單點瓶頸,同時通過選舉 Leader 節(jié)點來保障各個節(jié)點的負載均衡
- 可擴展性:fastflow 準備了部分開箱即用的任務(wù)操作,比如 http請求、執(zhí)行腳本等,同時你也可以自行定義新的節(jié)點動作,同時你可以根據(jù)上下文來決定是否跳過節(jié)點(skip)
- 輕量:它僅僅是一個基礎(chǔ)框架,而不是一個完整的產(chǎn)品,這意味著你可以將其很低成本融入到遺留項目而無需部署、依賴另一個項目,這既是它的優(yōu)點也是缺點——當你真的需要一個開箱即用的產(chǎn)品時(比如 airflow),你仍然需要少量的代碼開發(fā)才能使用
為什么要開發(fā) Fastflow
組內(nèi)有很多項目都涉及復雜的任務(wù)流場景,比如離線任務(wù),集群上下架,容器遷移等,這些場景都有幾個共同的特點:
流程耗時且步驟復雜,比如創(chuàng)建一個 k8s 集群,需要幾十步操作,其中包含腳本執(zhí)行、接口調(diào)用等,且相互存在依賴關(guān)系。
任務(wù)量巨大,比如容器平臺每天都會有幾十萬的離線任務(wù)需要調(diào)度執(zhí)行、再比如我們管理數(shù)百個K8S集群,幾乎每天會有集群需要上下節(jié)點、遷移容器等。
我們嘗試過各種解法:
- 硬編碼實現(xiàn):雖然工作量較小,但是只能滿足某個場景下的特定工作流,沒有可復用性。
- airflow:我們最開始的離線任務(wù)引擎就是基于這個來實現(xiàn)的,不得不承認它的功能很全,也很方便,但是存在幾個問題
- 由 python 編寫的,我們希望團隊維護的項目能夠統(tǒng)一語言,更有助于提升工作效率,雖然對一個有經(jīng)驗的程序員來說多語言并不是問題,但是頻繁地在多個語言間來回切換其實是不利于高效工作的
- airflow 的任務(wù)執(zhí)行是以 進程 來運行的,雖然有更好的隔離性,但是顯然因此而犧牲了性能和并發(fā)度。
- 公司內(nèi)的工作流平臺:你可能想象不到一個世界前十的互聯(lián)網(wǎng)公司,他們內(nèi)部一個經(jīng)歷了數(shù)年線上考證的運維用工作流平臺,會脆弱到承受不了上百工作流的并發(fā),第一次壓測就直接讓他們的服務(wù)癱瘓,進而影響到其他業(yè)務(wù)的運維任務(wù)。據(jù)團隊反饋稱是因為我們的工作流組成太復雜,一個流包含數(shù)十個任務(wù)節(jié)點才導致了這次意外的服務(wù)過載,隨后半年這個團隊重寫了一個新的v2版本。
當然 Github 上也還有其他的任務(wù)流引擎,我們也都評估過,無法滿足需求。比如 kubeflow 是基于 Pod 執(zhí)行任務(wù)的,比起 進程
更為重量,還有一些項目,要么就是沒有經(jīng)過海量數(shù)據(jù)的考驗,要么就是沒有考慮可伸縮性,面對大量任務(wù)的執(zhí)行無法水平擴容。
Concept
工作流模型
fastflow 的工作流模型基于 DAG(Directed acyclic graph),下圖是一個簡單的 DAG 示意圖:
在這個圖中,首先 A 節(jié)點所定義的任務(wù)會被執(zhí)行,當 A 執(zhí)行完畢后,B、C兩個節(jié)點所定義的任務(wù)將同時被觸發(fā),而只有 B、C 兩個節(jié)點都執(zhí)行成功后,最后的 D 節(jié)點才會被觸發(fā),這就是 fastflow 的工作流模型。
工作流的要素
fastflow 執(zhí)行任務(wù)的過程會涉及到幾個概念:Dag, Task, Action, DagInstance
Dag
描述了一個完整流程,它的每個節(jié)點被稱為 Task
,它定義了各個 Task 的執(zhí)行順序和依賴關(guān)系,你可以通過編程
or yaml
來定義它
一個編程式定義的DAG
dag := &entity.Dag{ BaseInfo: entity.BaseInfo{ ID: "test-dag", }, Name: "test", Tasks: []entity.Task{ {ID: "task1", ActionName: "PrintAction"}, {ID: "task2", ActionName: "PrintAction", DependOn: []string{"task1"}}, {ID: "task3", ActionName: "PrintAction", DependOn: []string{"task2"}}, }, }
對應(yīng)的yaml如下:
id: "test-dag" name: "test" tasks: - id: "task1" actionName: "PrintAction" - id: ["task2"] actionName: "PrintAction" dependOn: ["task1"] - id: "task3" actionName: "PrintAction" dependOn: ["task2"]
同時 Dag 可以定義這個工作流所需要的參數(shù),以便于在各個 Task 去消費它:
id: "test-dag" name: "test" vars: fileName: desc: "the file name" defaultValue: "file.txt" filePath: desc: "the file path" defaultValue: "/tmp/" tasks: - id: "task1" actionName: "PrintAction" params: writeName: "{{fileName}}" writePath: "{{filePath}}"
Task
它定義了這個節(jié)點的具體工作,比如是要發(fā)起一個 http 請求,或是執(zhí)行一段腳本等,這些不同動作都通過選擇不同的 Action
來實現(xiàn),同時它也可以定義在何種條件下需要跳過 or 阻塞該節(jié)點。
下面這段yaml演示了 Task 如何根據(jù)某些條件來跳過運行該節(jié)點。
id: "test-dag" name: "test" vars: fileName: desc: "the file name" defaultValue: "file.txt" tasks: - id: "task1" actionName: "PrintAction" preCheck: - act: skip #you can set "skip" or "block" conditions: - source: vars # source could be "vars" or "share-data" key: "fileName" op: "in" values: ["warn.txt", "error.txt"]
Task 的狀態(tài)有以下幾個:
- init: Task已經(jīng)初始化完畢,等待執(zhí)行
- running: 正在運行中
- ending: 當執(zhí)行 Action 的 Run 所定義的內(nèi)容后,會進入到該狀態(tài)
- retrying: 任務(wù)重試中
- failed: 執(zhí)行失敗
- success: 執(zhí)行成功
- blocked: 任務(wù)已阻塞,需要人工啟動
- skipped: 任務(wù)已跳過
Action
Action 是工作流的核心,定義了該節(jié)點將執(zhí)行什么操作,fastflow攜帶了一些開箱即用的Action,但是一般你都需要根據(jù)具體的業(yè)務(wù)場景自行編寫,它有幾個關(guān)鍵屬性:
- Name: Required Action的名稱,不可重復,它是與 Task 關(guān)聯(lián)的核心
- Run: Required 需要執(zhí)行的動作,fastflow 將確保該動作僅會被執(zhí)行 一次(ExactlyOnce)
- RunBefore: Optional 在執(zhí)行 Run 之前運行,如果有一些前置動作,可以在這里執(zhí)行,RunBefore 有可能會被執(zhí)行多次。
- RunAfter: Optional 在執(zhí)行 Run 之后運行,一些長時間執(zhí)行的任務(wù)內(nèi)容建議放在這里,只要 Task 尚未結(jié)束,節(jié)點發(fā)生故障重啟時仍然會繼續(xù)執(zhí)行這部分內(nèi)容,
- RetryBefore:Optional 在重試失敗的任務(wù)節(jié)點,可以提前執(zhí)行一些清理的動作
自行開發(fā)的 Action 在使用前都必須先注冊到 fastflow,如下所示:
type PrintParams struct { Key string Value string } type PrintAction struct { } // Name define the unique action identity, it will be used by Task func (a *PrintAction) Name() string { return "PrintAction" } func (a *PrintAction) Run(ctx run.ExecuteContext, params interface{}) error { cinput := params.(*ActionParam) fmt.Println("action start: ", time.Now()) fmt.Println(fmt.Sprintf("params: key[%s] value[%s]", cinput.Key, cinput.Value)) return nil } func (a *PrintAction) ParameterNew() interface{} { return &PrintParams{} } func main() { ... // Register action fastflow.RegisterAction([]run.Action{ &PrintAction{}, }) ... }
DagInstance
當你開始運行一個 Dag 后,則會為本次執(zhí)行生成一個執(zhí)行記錄,它被稱為 DagInstance
,當它生成以后,會由 Leader 實例將其分發(fā)到一個健康的 Worker,再由其解析、執(zhí)行。
實例類型與Module
首先 fastflow 是一個分布式的框架,意味著你可以部署多個實例來分擔負載,而實例被分為兩類角色:
- Leader:此類實例在運行過程中只會存在一個,從 Worker 中進行選舉而得出,它負責給 Worker 實例分發(fā)任務(wù),也會監(jiān)聽長時間得不到執(zhí)行的任務(wù)將其調(diào)度到其他節(jié)點等
- Worker:此類實例會存在復數(shù)個,它們負責解析 DAG 工作流并以
協(xié)程
執(zhí)行其中的任務(wù)
而不同節(jié)點能夠承擔不同的功能,其背后是不同的 模塊
在各司其職,不同節(jié)點所運行的模塊如下圖所示:
NOTE
- Leader 實例本質(zhì)上是一個承擔了
仲裁者
角色的 Worker,因此它也會分擔工作負載。 - 為了實現(xiàn)更均衡的負載,以及獲得更好的可擴展性,fastflow 沒有選擇加鎖競爭的方式來實現(xiàn)工作分發(fā)
從上面的圖看,Leader 實例會比 Worker 實例多運行一些模塊用于執(zhí)行中仲裁者相關(guān)的任務(wù),模塊之間的協(xié)作關(guān)系如下圖所示:
其中各個模塊的職責如下:
- Keeper: 每個節(jié)點都會運行 負責注冊節(jié)點到存儲中,保持心跳,同時也會周期性嘗試競選 Leader,防止上任 Leader 故障后阻塞系統(tǒng),這個模塊同時也提供了 分布式鎖 功能,我們也可以實現(xiàn)不同存儲的 Keeper 來滿足特定的需求,比如 Etcd or Zookeepper,目前支持的 Keeper 實現(xiàn)只有 Mongo
- Store: 每個節(jié)點都會運行 負責解耦 Worker 對底層存儲的依賴,通過這個組件,我們可以實現(xiàn)利用 Mongo, Mysql 等來作為 fastflow 的后端存儲,目前僅實現(xiàn)了 Mongo
- Parser:Worker 節(jié)點運行 負責監(jiān)聽分發(fā)到自己節(jié)點的任務(wù),然后將其 DAG 結(jié)構(gòu)重組為一顆 Task 樹,并渲染好各個任務(wù)節(jié)點的輸入,接下來通知 Executor 模塊開始執(zhí)行 Task
- Commander:每個節(jié)點都會運行 負責封裝一些常見的指令,如停止、重試、繼續(xù)等,下發(fā)到節(jié)點去運行
- Executor: Worker 節(jié)點運行 按照 Parser 解析好的 Task 樹以 goroutine 運行單個的 Task
- Dispatcher:Leader節(jié)點才會運行 負責監(jiān)聽等待執(zhí)行的 DAG,并根據(jù) Worker 的健康狀況均勻地分發(fā)任務(wù)
- WatchDog:Leader節(jié)點才會運行 負責監(jiān)聽執(zhí)行超時的 Task 將其更新為失敗,同時也會重新調(diào)度那些一直得不到執(zhí)行的 DagInstance 到其他 Worker
Tips
以上模塊的分布機制僅僅只是 fastflow 的默認實現(xiàn),你也可以自行決定實例運行的模塊,比如在 Leader 上不再運行 Worker 的實例,讓其專注于任務(wù)調(diào)度。
GetStart
更多例子請參考項目下面的
examples
目錄
準備一個Mongo實例
如果已經(jīng)你已經(jīng)有了可測試的實例,可以直接替換為你的實例,如果沒有的話,可以使用Docker容器在本地跑一個,指令如下:
docker run -d --name fastflow-mongo --network host mongo
運行 fastflow
運行以下示例
package main import ( "fmt" "log" "time" "github.com/shiningrush/fastflow" mongoKeeper "github.com/shiningrush/fastflow/keeper/mongo" "github.com/shiningrush/fastflow/pkg/entity/run" "github.com/shiningrush/fastflow/pkg/mod" mongoStore "github.com/shiningrush/fastflow/store/mongo" ) type PrintAction struct { } // Name define the unique action identity, it will be used by Task func (a *PrintAction) Name() string { return "PrintAction" } func (a *PrintAction) Run(ctx run.ExecuteContext, params interface{}) error { fmt.Println("action start: ", time.Now()) return nil } func main() { // Register action fastflow.RegisterAction([]run.Action{ &PrintAction{}, }) // init keeper, it used to e keeper := mongoKeeper.NewKeeper(&mongoKeeper.KeeperOption{ Key: "worker-1", // if your mongo does not set user/pwd, youshould remove it ConnStr: "mongodb://root:pwd@127.0.0.1:27017/fastflow?authSource=admin", Database: "mongo-demo", Prefix: "test", }) if err := keeper.Init(); err != nil { log.Fatal(fmt.Errorf("init keeper failed: %w", err)) } // init store st := mongoStore.NewStore(&mongoStore.StoreOption{ // if your mongo does not set user/pwd, youshould remove it ConnStr: "mongodb://root:pwd@127.0.0.1:27017/fastflow?authSource=admin", Database: "mongo-demo", Prefix: "test", }) if err := st.Init(); err != nil { log.Fatal(fmt.Errorf("init store failed: %w", err)) } go createDagAndInstance() // start fastflow if err := fastflow.Start(&fastflow.InitialOption{ Keeper: keeper, Store: st, // use yaml to define dag ReadDagFromDir: "./", }); err != nil { panic(fmt.Sprintf("init fastflow failed: %s", err)) } } func createDagAndInstance() { // wait fast start completed time.Sleep(time.Second) // run some dag instance for i := 0; i < 10; i++ { _, err := mod.GetCommander().RunDag("test-dag", nil) if err != nil { log.Fatal(err) } time.Sleep(time.Second * 10) } }
程序運行目錄下的test-dag.yaml
id: "test-dag" name: "test" tasks: - id: "task1" actionName: "PrintAction" - id: "task2" actionName: "PrintAction" dependOn: ["task1"] - id: "task3" actionName: "PrintAction" dependOn: ["task2"]
Basic
Task與Task之間的通信
由于任務(wù)都是基于 goroutine
來執(zhí)行,因此任務(wù)之間的 context
是共享的,意味著你完全可以使用以下的代碼:
func (a *UpAction) Run(ctx run.ExecuteContext, params interface{}) error { ctx.WithValue("key", "value") return nil } func (a *DownAction) Run(ctx run.ExecuteContext, params interface{}) error { val := ctx.Context().Value("key") return nil }
但是注意這樣做有個弊端:當節(jié)點重啟時,如果任務(wù)尚未執(zhí)行完畢,那么這部分內(nèi)容會丟失。
如果不想因為故障or升級而丟失你的更改,可以使用 ShareData 來傳遞進行通信,ShareData 是整個 在整個 DagInstance 的生命周期都會共享的一塊數(shù)據(jù)空間,每次對它的寫入都會通過 Store
組件持久化,以確保數(shù)據(jù)不會丟失,用法如下:
func (a *UpAction) Run(ctx run.ExecuteContext, params interface{}) error { ctx.ShareData().Set("key", "value") return nil } func (a *DownAction) Run(ctx run.ExecuteContext, params interface{}) error { val := ctx.ShareData().Get("key") return nil }
任務(wù)日志
fastflow 還提供了 Task 粒度的日志記錄,這些日志都會通過 Store
組件持久化,用法如下:
func (a *Action) Run(ctx run.ExecuteContext, params interface{}) error { ctx.Trace("some message") return nil }
使用Dag變量
上面的文章中提到,我們可以在 Dag 中定義一些變量,在創(chuàng)建工作流時可以對這些變量進行賦值,比如以下的Dag,定義了一個名為 `fileName 的變量
id: "test-dag" name: "test" vars: fileName: desc: "the file name" defaultValue: "file.txt"
隨后我們可以使用 Commander
組件來啟動一個具體的工作流:
mod.GetCommander().RunDag("test-id", map[string]string{ "fileName": "demo.txt", })
這樣本次啟動的工作流的變量則被賦值為 demo.txt
,接下來我們有兩種方式去消費它
1.帶參數(shù)的Action
id: "test-dag" name: "test" vars: fileName: desc: "the file name" defaultValue: "file.txt" tasks: - id: "task1" action: "PrintAction" params: # using {{var}} to consume dag's variable fileName: "{{fileName}}"
PrintAction.go:
type PrintParams struct { FileName string `json:"fileName"` } type PrintAction struct { } // Name define the unique action identity, it will be used by Task func (a *PrintAction) Name() string { return "PrintAction" } func (a *PrintAction) Run(ctx run.ExecuteContext, params interface{}) error { cinput := params.(*ActionParam) fmt.Println(fmt.Sprintf("params: file[%s]", cinput.FileName, cinput.Value)) return nil } func (a *PrintAction) ParameterNew() interface{} { return &PrintParams{} }
2.編程式讀取
fastflow 也提供了相關(guān)函數(shù)來獲取 Dag 變量
func (a *Action) Run(ctx run.ExecuteContext, params interface{}) error { // get variable by name ctx.GetVar("fileName") // iterate variables ctx.IterateVars(func(key, val string) (stop bool) { ... }) return nil }
分布式鎖
如前所述,你可以在直接使用 Keeper
模塊提供的分布式鎖,如下所示:
... mod.GetKeeper().NewMutex("mutex key").Lock(ctx.Context(), mod.LockTTL(time.Second), mod.Reentrant("worker-key1")) ...
其中:
LockTTL
表示你持有該鎖的TTL,到期之后會自動釋放,默認30s
Reentrant
用于需要實現(xiàn)可重入的分布式鎖的場景,作為持有場景的標識,默認為空,表示該鎖不可重入 歡迎轉(zhuǎn)載,注明出處即可。如果你覺得這篇博文幫助到你了,請點下右下角的推薦讓更多人看到它。
到此這篇關(guān)于基于golang的輕量級工作流框架Fastflow的文章就介紹到這了,更多相關(guān)go Fastflow內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Golang 統(tǒng)計字符串字數(shù)的方法示例
本篇文章主要介紹了Golang 統(tǒng)計字符串字數(shù)的方法示例,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-05-05詳解Go多協(xié)程并發(fā)環(huán)境下的錯誤處理
這篇文章主要介紹了詳解Go多協(xié)程并發(fā)環(huán)境下的錯誤處理,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-08-08淺析go中Ticker,Timer和Tick的用法與區(qū)別
在go面試的時候,面試官經(jīng)常會問time包的Ticker,Timer以及Tick的區(qū)別,一般在超時控制的時候用的比較多,今天就跟隨小編一起來詳細學一下這幾個的區(qū)別吧2023-10-10