基于golang的輕量級工作流框架Fastflow
Fastflow 是什么?用一句話來定義它:一個(gè) 基于golang協(xié)程
、支持水平擴(kuò)容
的分布式高性能工作流框架
。
它具有以下特點(diǎn):
- 易用性:工作流模型基于 DAG 來定義,同時(shí)還提供開箱即用的 API,你可以隨時(shí)通過 API 創(chuàng)建、運(yùn)行、暫停工作流等,在開發(fā)新的原子能力時(shí)還提供了開箱即用的分布式鎖功能
- 高性能:得益于 golang 的協(xié)程 與 channel 技術(shù),fastflow 可以在單實(shí)例上并行執(zhí)行數(shù)百、數(shù)千乃至數(shù)萬個(gè)任務(wù)
- 可觀測性:fastflow 基于 Prometheus 的 metrics 暴露了當(dāng)前實(shí)例上的任務(wù)執(zhí)行信息,比如并發(fā)任務(wù)數(shù)、任務(wù)分發(fā)時(shí)間等。
- 可伸縮性:支持水平伸縮,以克服海量任務(wù)帶來的單點(diǎn)瓶頸,同時(shí)通過選舉 Leader 節(jié)點(diǎn)來保障各個(gè)節(jié)點(diǎn)的負(fù)載均衡
- 可擴(kuò)展性:fastflow 準(zhǔn)備了部分開箱即用的任務(wù)操作,比如 http請求、執(zhí)行腳本等,同時(shí)你也可以自行定義新的節(jié)點(diǎn)動作,同時(shí)你可以根據(jù)上下文來決定是否跳過節(jié)點(diǎn)(skip)
- 輕量:它僅僅是一個(gè)基礎(chǔ)框架,而不是一個(gè)完整的產(chǎn)品,這意味著你可以將其很低成本融入到遺留項(xiàng)目而無需部署、依賴另一個(gè)項(xiàng)目,這既是它的優(yōu)點(diǎn)也是缺點(diǎn)——當(dāng)你真的需要一個(gè)開箱即用的產(chǎn)品時(shí)(比如 airflow),你仍然需要少量的代碼開發(fā)才能使用
為什么要開發(fā) Fastflow
組內(nèi)有很多項(xiàng)目都涉及復(fù)雜的任務(wù)流場景,比如離線任務(wù),集群上下架,容器遷移等,這些場景都有幾個(gè)共同的特點(diǎn):
流程耗時(shí)且步驟復(fù)雜,比如創(chuàng)建一個(gè) k8s 集群,需要幾十步操作,其中包含腳本執(zhí)行、接口調(diào)用等,且相互存在依賴關(guān)系。
任務(wù)量巨大,比如容器平臺每天都會有幾十萬的離線任務(wù)需要調(diào)度執(zhí)行、再比如我們管理數(shù)百個(gè)K8S集群,幾乎每天會有集群需要上下節(jié)點(diǎn)、遷移容器等。
我們嘗試過各種解法:
- 硬編碼實(shí)現(xiàn):雖然工作量較小,但是只能滿足某個(gè)場景下的特定工作流,沒有可復(fù)用性。
- airflow:我們最開始的離線任務(wù)引擎就是基于這個(gè)來實(shí)現(xiàn)的,不得不承認(rèn)它的功能很全,也很方便,但是存在幾個(gè)問題
- 由 python 編寫的,我們希望團(tuán)隊(duì)維護(hù)的項(xiàng)目能夠統(tǒng)一語言,更有助于提升工作效率,雖然對一個(gè)有經(jīng)驗(yàn)的程序員來說多語言并不是問題,但是頻繁地在多個(gè)語言間來回切換其實(shí)是不利于高效工作的
- airflow 的任務(wù)執(zhí)行是以 進(jìn)程 來運(yùn)行的,雖然有更好的隔離性,但是顯然因此而犧牲了性能和并發(fā)度。
- 公司內(nèi)的工作流平臺:你可能想象不到一個(gè)世界前十的互聯(lián)網(wǎng)公司,他們內(nèi)部一個(gè)經(jīng)歷了數(shù)年線上考證的運(yùn)維用工作流平臺,會脆弱到承受不了上百工作流的并發(fā),第一次壓測就直接讓他們的服務(wù)癱瘓,進(jìn)而影響到其他業(yè)務(wù)的運(yùn)維任務(wù)。據(jù)團(tuán)隊(duì)反饋稱是因?yàn)槲覀兊墓ぷ髁鹘M成太復(fù)雜,一個(gè)流包含數(shù)十個(gè)任務(wù)節(jié)點(diǎn)才導(dǎo)致了這次意外的服務(wù)過載,隨后半年這個(gè)團(tuán)隊(duì)重寫了一個(gè)新的v2版本。
當(dāng)然 Github 上也還有其他的任務(wù)流引擎,我們也都評估過,無法滿足需求。比如 kubeflow 是基于 Pod 執(zhí)行任務(wù)的,比起 進(jìn)程
更為重量,還有一些項(xiàng)目,要么就是沒有經(jīng)過海量數(shù)據(jù)的考驗(yàn),要么就是沒有考慮可伸縮性,面對大量任務(wù)的執(zhí)行無法水平擴(kuò)容。
Concept
工作流模型
fastflow 的工作流模型基于 DAG(Directed acyclic graph),下圖是一個(gè)簡單的 DAG 示意圖:
在這個(gè)圖中,首先 A 節(jié)點(diǎn)所定義的任務(wù)會被執(zhí)行,當(dāng) A 執(zhí)行完畢后,B、C兩個(gè)節(jié)點(diǎn)所定義的任務(wù)將同時(shí)被觸發(fā),而只有 B、C 兩個(gè)節(jié)點(diǎn)都執(zhí)行成功后,最后的 D 節(jié)點(diǎn)才會被觸發(fā),這就是 fastflow 的工作流模型。
工作流的要素
fastflow 執(zhí)行任務(wù)的過程會涉及到幾個(gè)概念:Dag, Task, Action, DagInstance
Dag
描述了一個(gè)完整流程,它的每個(gè)節(jié)點(diǎn)被稱為 Task
,它定義了各個(gè) Task 的執(zhí)行順序和依賴關(guān)系,你可以通過編程
or yaml
來定義它
一個(gè)編程式定義的DAG
dag := &entity.Dag{ BaseInfo: entity.BaseInfo{ ID: "test-dag", }, Name: "test", Tasks: []entity.Task{ {ID: "task1", ActionName: "PrintAction"}, {ID: "task2", ActionName: "PrintAction", DependOn: []string{"task1"}}, {ID: "task3", ActionName: "PrintAction", DependOn: []string{"task2"}}, }, }
對應(yīng)的yaml如下:
id: "test-dag" name: "test" tasks: - id: "task1" actionName: "PrintAction" - id: ["task2"] actionName: "PrintAction" dependOn: ["task1"] - id: "task3" actionName: "PrintAction" dependOn: ["task2"]
同時(shí) Dag 可以定義這個(gè)工作流所需要的參數(shù),以便于在各個(gè) Task 去消費(fèi)它:
id: "test-dag" name: "test" vars: fileName: desc: "the file name" defaultValue: "file.txt" filePath: desc: "the file path" defaultValue: "/tmp/" tasks: - id: "task1" actionName: "PrintAction" params: writeName: "{{fileName}}" writePath: "{{filePath}}"
Task
它定義了這個(gè)節(jié)點(diǎn)的具體工作,比如是要發(fā)起一個(gè) http 請求,或是執(zhí)行一段腳本等,這些不同動作都通過選擇不同的 Action
來實(shí)現(xiàn),同時(shí)它也可以定義在何種條件下需要跳過 or 阻塞該節(jié)點(diǎn)。
下面這段yaml演示了 Task 如何根據(jù)某些條件來跳過運(yùn)行該節(jié)點(diǎn)。
id: "test-dag" name: "test" vars: fileName: desc: "the file name" defaultValue: "file.txt" tasks: - id: "task1" actionName: "PrintAction" preCheck: - act: skip #you can set "skip" or "block" conditions: - source: vars # source could be "vars" or "share-data" key: "fileName" op: "in" values: ["warn.txt", "error.txt"]
Task 的狀態(tài)有以下幾個(gè):
- init: Task已經(jīng)初始化完畢,等待執(zhí)行
- running: 正在運(yùn)行中
- ending: 當(dāng)執(zhí)行 Action 的 Run 所定義的內(nèi)容后,會進(jìn)入到該狀態(tài)
- retrying: 任務(wù)重試中
- failed: 執(zhí)行失敗
- success: 執(zhí)行成功
- blocked: 任務(wù)已阻塞,需要人工啟動
- skipped: 任務(wù)已跳過
Action
Action 是工作流的核心,定義了該節(jié)點(diǎn)將執(zhí)行什么操作,fastflow攜帶了一些開箱即用的Action,但是一般你都需要根據(jù)具體的業(yè)務(wù)場景自行編寫,它有幾個(gè)關(guān)鍵屬性:
- Name: Required Action的名稱,不可重復(fù),它是與 Task 關(guān)聯(lián)的核心
- Run: Required 需要執(zhí)行的動作,fastflow 將確保該動作僅會被執(zhí)行 一次(ExactlyOnce)
- RunBefore: Optional 在執(zhí)行 Run 之前運(yùn)行,如果有一些前置動作,可以在這里執(zhí)行,RunBefore 有可能會被執(zhí)行多次。
- RunAfter: Optional 在執(zhí)行 Run 之后運(yùn)行,一些長時(shí)間執(zhí)行的任務(wù)內(nèi)容建議放在這里,只要 Task 尚未結(jié)束,節(jié)點(diǎn)發(fā)生故障重啟時(shí)仍然會繼續(xù)執(zhí)行這部分內(nèi)容,
- RetryBefore:Optional 在重試失敗的任務(wù)節(jié)點(diǎn),可以提前執(zhí)行一些清理的動作
自行開發(fā)的 Action 在使用前都必須先注冊到 fastflow,如下所示:
type PrintParams struct { Key string Value string } type PrintAction struct { } // Name define the unique action identity, it will be used by Task func (a *PrintAction) Name() string { return "PrintAction" } func (a *PrintAction) Run(ctx run.ExecuteContext, params interface{}) error { cinput := params.(*ActionParam) fmt.Println("action start: ", time.Now()) fmt.Println(fmt.Sprintf("params: key[%s] value[%s]", cinput.Key, cinput.Value)) return nil } func (a *PrintAction) ParameterNew() interface{} { return &PrintParams{} } func main() { ... // Register action fastflow.RegisterAction([]run.Action{ &PrintAction{}, }) ... }
DagInstance
當(dāng)你開始運(yùn)行一個(gè) Dag 后,則會為本次執(zhí)行生成一個(gè)執(zhí)行記錄,它被稱為 DagInstance
,當(dāng)它生成以后,會由 Leader 實(shí)例將其分發(fā)到一個(gè)健康的 Worker,再由其解析、執(zhí)行。
實(shí)例類型與Module
首先 fastflow 是一個(gè)分布式的框架,意味著你可以部署多個(gè)實(shí)例來分擔(dān)負(fù)載,而實(shí)例被分為兩類角色:
- Leader:此類實(shí)例在運(yùn)行過程中只會存在一個(gè),從 Worker 中進(jìn)行選舉而得出,它負(fù)責(zé)給 Worker 實(shí)例分發(fā)任務(wù),也會監(jiān)聽長時(shí)間得不到執(zhí)行的任務(wù)將其調(diào)度到其他節(jié)點(diǎn)等
- Worker:此類實(shí)例會存在復(fù)數(shù)個(gè),它們負(fù)責(zé)解析 DAG 工作流并以
協(xié)程
執(zhí)行其中的任務(wù)
而不同節(jié)點(diǎn)能夠承擔(dān)不同的功能,其背后是不同的 模塊
在各司其職,不同節(jié)點(diǎn)所運(yùn)行的模塊如下圖所示:
NOTE
- Leader 實(shí)例本質(zhì)上是一個(gè)承擔(dān)了
仲裁者
角色的 Worker,因此它也會分擔(dān)工作負(fù)載。 - 為了實(shí)現(xiàn)更均衡的負(fù)載,以及獲得更好的可擴(kuò)展性,fastflow 沒有選擇加鎖競爭的方式來實(shí)現(xiàn)工作分發(fā)
從上面的圖看,Leader 實(shí)例會比 Worker 實(shí)例多運(yùn)行一些模塊用于執(zhí)行中仲裁者相關(guān)的任務(wù),模塊之間的協(xié)作關(guān)系如下圖所示:
其中各個(gè)模塊的職責(zé)如下:
- Keeper: 每個(gè)節(jié)點(diǎn)都會運(yùn)行 負(fù)責(zé)注冊節(jié)點(diǎn)到存儲中,保持心跳,同時(shí)也會周期性嘗試競選 Leader,防止上任 Leader 故障后阻塞系統(tǒng),這個(gè)模塊同時(shí)也提供了 分布式鎖 功能,我們也可以實(shí)現(xiàn)不同存儲的 Keeper 來滿足特定的需求,比如 Etcd or Zookeepper,目前支持的 Keeper 實(shí)現(xiàn)只有 Mongo
- Store: 每個(gè)節(jié)點(diǎn)都會運(yùn)行 負(fù)責(zé)解耦 Worker 對底層存儲的依賴,通過這個(gè)組件,我們可以實(shí)現(xiàn)利用 Mongo, Mysql 等來作為 fastflow 的后端存儲,目前僅實(shí)現(xiàn)了 Mongo
- Parser:Worker 節(jié)點(diǎn)運(yùn)行 負(fù)責(zé)監(jiān)聽分發(fā)到自己節(jié)點(diǎn)的任務(wù),然后將其 DAG 結(jié)構(gòu)重組為一顆 Task 樹,并渲染好各個(gè)任務(wù)節(jié)點(diǎn)的輸入,接下來通知 Executor 模塊開始執(zhí)行 Task
- Commander:每個(gè)節(jié)點(diǎn)都會運(yùn)行 負(fù)責(zé)封裝一些常見的指令,如停止、重試、繼續(xù)等,下發(fā)到節(jié)點(diǎn)去運(yùn)行
- Executor: Worker 節(jié)點(diǎn)運(yùn)行 按照 Parser 解析好的 Task 樹以 goroutine 運(yùn)行單個(gè)的 Task
- Dispatcher:Leader節(jié)點(diǎn)才會運(yùn)行 負(fù)責(zé)監(jiān)聽等待執(zhí)行的 DAG,并根據(jù) Worker 的健康狀況均勻地分發(fā)任務(wù)
- WatchDog:Leader節(jié)點(diǎn)才會運(yùn)行 負(fù)責(zé)監(jiān)聽執(zhí)行超時(shí)的 Task 將其更新為失敗,同時(shí)也會重新調(diào)度那些一直得不到執(zhí)行的 DagInstance 到其他 Worker
Tips
以上模塊的分布機(jī)制僅僅只是 fastflow 的默認(rèn)實(shí)現(xiàn),你也可以自行決定實(shí)例運(yùn)行的模塊,比如在 Leader 上不再運(yùn)行 Worker 的實(shí)例,讓其專注于任務(wù)調(diào)度。
GetStart
更多例子請參考項(xiàng)目下面的
examples
目錄
準(zhǔn)備一個(gè)Mongo實(shí)例
如果已經(jīng)你已經(jīng)有了可測試的實(shí)例,可以直接替換為你的實(shí)例,如果沒有的話,可以使用Docker容器在本地跑一個(gè),指令如下:
docker run -d --name fastflow-mongo --network host mongo
運(yùn)行 fastflow
運(yùn)行以下示例
package main import ( "fmt" "log" "time" "github.com/shiningrush/fastflow" mongoKeeper "github.com/shiningrush/fastflow/keeper/mongo" "github.com/shiningrush/fastflow/pkg/entity/run" "github.com/shiningrush/fastflow/pkg/mod" mongoStore "github.com/shiningrush/fastflow/store/mongo" ) type PrintAction struct { } // Name define the unique action identity, it will be used by Task func (a *PrintAction) Name() string { return "PrintAction" } func (a *PrintAction) Run(ctx run.ExecuteContext, params interface{}) error { fmt.Println("action start: ", time.Now()) return nil } func main() { // Register action fastflow.RegisterAction([]run.Action{ &PrintAction{}, }) // init keeper, it used to e keeper := mongoKeeper.NewKeeper(&mongoKeeper.KeeperOption{ Key: "worker-1", // if your mongo does not set user/pwd, youshould remove it ConnStr: "mongodb://root:pwd@127.0.0.1:27017/fastflow?authSource=admin", Database: "mongo-demo", Prefix: "test", }) if err := keeper.Init(); err != nil { log.Fatal(fmt.Errorf("init keeper failed: %w", err)) } // init store st := mongoStore.NewStore(&mongoStore.StoreOption{ // if your mongo does not set user/pwd, youshould remove it ConnStr: "mongodb://root:pwd@127.0.0.1:27017/fastflow?authSource=admin", Database: "mongo-demo", Prefix: "test", }) if err := st.Init(); err != nil { log.Fatal(fmt.Errorf("init store failed: %w", err)) } go createDagAndInstance() // start fastflow if err := fastflow.Start(&fastflow.InitialOption{ Keeper: keeper, Store: st, // use yaml to define dag ReadDagFromDir: "./", }); err != nil { panic(fmt.Sprintf("init fastflow failed: %s", err)) } } func createDagAndInstance() { // wait fast start completed time.Sleep(time.Second) // run some dag instance for i := 0; i < 10; i++ { _, err := mod.GetCommander().RunDag("test-dag", nil) if err != nil { log.Fatal(err) } time.Sleep(time.Second * 10) } }
程序運(yùn)行目錄下的test-dag.yaml
id: "test-dag" name: "test" tasks: - id: "task1" actionName: "PrintAction" - id: "task2" actionName: "PrintAction" dependOn: ["task1"] - id: "task3" actionName: "PrintAction" dependOn: ["task2"]
Basic
Task與Task之間的通信
由于任務(wù)都是基于 goroutine
來執(zhí)行,因此任務(wù)之間的 context
是共享的,意味著你完全可以使用以下的代碼:
func (a *UpAction) Run(ctx run.ExecuteContext, params interface{}) error { ctx.WithValue("key", "value") return nil } func (a *DownAction) Run(ctx run.ExecuteContext, params interface{}) error { val := ctx.Context().Value("key") return nil }
但是注意這樣做有個(gè)弊端:當(dāng)節(jié)點(diǎn)重啟時(shí),如果任務(wù)尚未執(zhí)行完畢,那么這部分內(nèi)容會丟失。
如果不想因?yàn)楣收蟧r升級而丟失你的更改,可以使用 ShareData 來傳遞進(jìn)行通信,ShareData 是整個(gè) 在整個(gè) DagInstance 的生命周期都會共享的一塊數(shù)據(jù)空間,每次對它的寫入都會通過 Store
組件持久化,以確保數(shù)據(jù)不會丟失,用法如下:
func (a *UpAction) Run(ctx run.ExecuteContext, params interface{}) error { ctx.ShareData().Set("key", "value") return nil } func (a *DownAction) Run(ctx run.ExecuteContext, params interface{}) error { val := ctx.ShareData().Get("key") return nil }
任務(wù)日志
fastflow 還提供了 Task 粒度的日志記錄,這些日志都會通過 Store
組件持久化,用法如下:
func (a *Action) Run(ctx run.ExecuteContext, params interface{}) error { ctx.Trace("some message") return nil }
使用Dag變量
上面的文章中提到,我們可以在 Dag 中定義一些變量,在創(chuàng)建工作流時(shí)可以對這些變量進(jìn)行賦值,比如以下的Dag,定義了一個(gè)名為 `fileName 的變量
id: "test-dag" name: "test" vars: fileName: desc: "the file name" defaultValue: "file.txt"
隨后我們可以使用 Commander
組件來啟動一個(gè)具體的工作流:
mod.GetCommander().RunDag("test-id", map[string]string{ "fileName": "demo.txt", })
這樣本次啟動的工作流的變量則被賦值為 demo.txt
,接下來我們有兩種方式去消費(fèi)它
1.帶參數(shù)的Action
id: "test-dag" name: "test" vars: fileName: desc: "the file name" defaultValue: "file.txt" tasks: - id: "task1" action: "PrintAction" params: # using {{var}} to consume dag's variable fileName: "{{fileName}}"
PrintAction.go:
type PrintParams struct { FileName string `json:"fileName"` } type PrintAction struct { } // Name define the unique action identity, it will be used by Task func (a *PrintAction) Name() string { return "PrintAction" } func (a *PrintAction) Run(ctx run.ExecuteContext, params interface{}) error { cinput := params.(*ActionParam) fmt.Println(fmt.Sprintf("params: file[%s]", cinput.FileName, cinput.Value)) return nil } func (a *PrintAction) ParameterNew() interface{} { return &PrintParams{} }
2.編程式讀取
fastflow 也提供了相關(guān)函數(shù)來獲取 Dag 變量
func (a *Action) Run(ctx run.ExecuteContext, params interface{}) error { // get variable by name ctx.GetVar("fileName") // iterate variables ctx.IterateVars(func(key, val string) (stop bool) { ... }) return nil }
分布式鎖
如前所述,你可以在直接使用 Keeper
模塊提供的分布式鎖,如下所示:
... mod.GetKeeper().NewMutex("mutex key").Lock(ctx.Context(), mod.LockTTL(time.Second), mod.Reentrant("worker-key1")) ...
其中:
LockTTL
表示你持有該鎖的TTL,到期之后會自動釋放,默認(rèn)30s
Reentrant
用于需要實(shí)現(xiàn)可重入的分布式鎖的場景,作為持有場景的標(biāo)識,默認(rèn)為空,表示該鎖不可重入 歡迎轉(zhuǎn)載,注明出處即可。如果你覺得這篇博文幫助到你了,請點(diǎn)下右下角的推薦讓更多人看到它。
到此這篇關(guān)于基于golang的輕量級工作流框架Fastflow的文章就介紹到這了,更多相關(guān)go Fastflow內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
golang實(shí)現(xiàn)京東支付v2版本的示例代碼
這篇文章主要介紹了golang實(shí)現(xiàn)京東支付v2版本,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-03-03Golang 統(tǒng)計(jì)字符串字?jǐn)?shù)的方法示例
本篇文章主要介紹了Golang 統(tǒng)計(jì)字符串字?jǐn)?shù)的方法示例,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2018-05-05詳解Go多協(xié)程并發(fā)環(huán)境下的錯(cuò)誤處理
這篇文章主要介紹了詳解Go多協(xié)程并發(fā)環(huán)境下的錯(cuò)誤處理,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-08-08淺析go中Ticker,Timer和Tick的用法與區(qū)別
在go面試的時(shí)候,面試官經(jīng)常會問time包的Ticker,Timer以及Tick的區(qū)別,一般在超時(shí)控制的時(shí)候用的比較多,今天就跟隨小編一起來詳細(xì)學(xué)一下這幾個(gè)的區(qū)別吧2023-10-10