Golang微服務(wù)框架Kratos實(shí)現(xiàn)分布式任務(wù)隊(duì)列Asynq的方法詳解
Golang微服務(wù)框架Kratos實(shí)現(xiàn)分布式任務(wù)隊(duì)列Asynq
任務(wù)隊(duì)列(Task Queue) 一般用于跨線程或跨計(jì)算機(jī)分配工作的一種機(jī)制。其本質(zhì)是生產(chǎn)者消費(fèi)者模型,生產(chǎn)者發(fā)送任務(wù)到消息隊(duì)列,消費(fèi)者負(fù)責(zé)處理任務(wù)。
任務(wù)隊(duì)列的輸入是稱為 任務(wù)(Task)
的工作單元。專用的工作進(jìn)程不斷監(jiān)視任務(wù)隊(duì)列以查找要執(zhí)行的新工作。
在Golang語言里面,我們有像Asynq和Machinery這樣的類似于 Celery
的分布式任務(wù)隊(duì)列。
什么是任務(wù)隊(duì)列
消息隊(duì)列(Message Queue),一般來說知道的人不少。比如常見的:kafka、Rabbitmq、RocketMQ等。
任務(wù)隊(duì)列(Task Queue),聽說過這個(gè)概念的人不會(huì)太多,清楚它的概念的人怕是更少。
這兩個(gè)概念是有關(guān)系的,他們是怎樣的關(guān)系呢?任務(wù)隊(duì)列(Task Queue)是消息隊(duì)列(Message Queue)的超集。任務(wù)隊(duì)列是構(gòu)建在消息隊(duì)列之上的。消息隊(duì)列是任務(wù)隊(duì)列的一部分。
提起分布式任務(wù)隊(duì)列(Distributed Task Queue),就不得不提 Python
的Celery。故而,下面我們來看Celery的架構(gòu)圖,以此來講解。其他的任務(wù)隊(duì)列也并不會(huì)與之有太大的差異性,基礎(chǔ)的原理是一致的。
在 Celery
的架構(gòu)中,由多臺(tái) Server 發(fā)起 異步任務(wù)(Async Task)
,發(fā)送任務(wù)到 Broker
的隊(duì)列中,其中的 Celery Beat
進(jìn)程可負(fù)責(zé)發(fā)起定時(shí)任務(wù)。當(dāng) Task
到達(dá) Broker
后,會(huì)將其分發(fā)給相應(yīng)的 Celery Worker
進(jìn)行處理。當(dāng) Task
處理完成后,其結(jié)果存儲(chǔ)至 Backend
。
在上述過程中的 Broker
和 Backend
, Celery
并沒有去實(shí)現(xiàn),而是使用了已有的開源實(shí)現(xiàn),例如 RabbitMQ
作為 Broker
提供消息隊(duì)列服務(wù), Redis
作為 Backend
提供結(jié)果存儲(chǔ)服務(wù)。Celery 就像是抽象了消息隊(duì)列架構(gòu)中 Producer
、 Consumer
的實(shí)現(xiàn),將消息隊(duì)列中基本單位 “消息”
抽象成了任務(wù)隊(duì)列中的“任務(wù)”,并將異步、定時(shí)任務(wù)的發(fā)起和結(jié)果存儲(chǔ)等操作進(jìn)行了封裝,讓開發(fā)者可以忽略 AMQP、RabbitMQ 等實(shí)現(xiàn)細(xì)節(jié),為開發(fā)帶來便利。
綜上所述,Celery 作為任務(wù)隊(duì)列是基于消息隊(duì)列的進(jìn)一步封裝,其實(shí)現(xiàn)依賴消息隊(duì)列。
任務(wù)隊(duì)列的應(yīng)用場(chǎng)景
我們現(xiàn)在知道了任務(wù)隊(duì)列是什么,也知道了它的工作原理。但是,我們并不知道它可以用來做什么。下面,我們就來看看,它到底用在什么樣的場(chǎng)景下。
- 分布式任務(wù):可以將任務(wù)分發(fā)到多個(gè)工作者進(jìn)程或機(jī)器上執(zhí)行,以提高任務(wù)處理速度。
- 定時(shí)任務(wù):可以在指定時(shí)間執(zhí)行任務(wù)。例如:每天定時(shí)備份數(shù)據(jù)、日志歸檔、心跳測(cè)試、運(yùn)維巡檢。支持 crontab 定時(shí)模式
- 后臺(tái)任務(wù):可以在后臺(tái)執(zhí)行耗時(shí)任務(wù),例如圖像處理、數(shù)據(jù)分析等,不影響用戶界面的響應(yīng)。
- 解耦任務(wù):可以將任務(wù)與主程序解耦,以提高代碼的可讀性和可維護(hù)性,解耦應(yīng)用程序最直接的好處就是可擴(kuò)展性和并發(fā)性能的提高。支持并發(fā)執(zhí)行任務(wù),同時(shí)支持自動(dòng)動(dòng)態(tài)擴(kuò)展。
- 實(shí)時(shí)處理:可以支持實(shí)時(shí)處理任務(wù),例如即時(shí)通訊、消息隊(duì)列等。
Asynq概述
Asynq是一個(gè)使用Go語言實(shí)現(xiàn)的分布式任務(wù)隊(duì)列和異步處理庫,它由Redis提供支持,它提供了輕量級(jí)的、易于使用的API,并且具有高可擴(kuò)展性和高可定制化性。其作者Ken Hibino,任職于Google。
Asynq主要由以下幾個(gè)組件組成:
- 任務(wù)(Task):需要被異步執(zhí)行的操作;
- 處理器(Processor):負(fù)責(zé)執(zhí)行任務(wù)的工作進(jìn)程;
- 隊(duì)列(Queue):存放待執(zhí)行任務(wù)的隊(duì)列;
- 調(diào)度器(Scheduler):根據(jù)規(guī)則將任務(wù)分配給不同的處理器進(jìn)行執(zhí)行。
通過使用Asynq,我們可以非常輕松的實(shí)現(xiàn)異步任務(wù)處理,同時(shí)還可以提供高效率、高可擴(kuò)展性和高自定義性的處理方案。
Asynq的特點(diǎn)
- 保證至少執(zhí)行一次任務(wù)
- 任務(wù)寫入Redis后可以持久化
- 任務(wù)失敗之后,會(huì)自動(dòng)重試
- worker崩潰自動(dòng)恢復(fù)
- 可是實(shí)現(xiàn)任務(wù)的優(yōu)先級(jí)
- 任務(wù)可以進(jìn)行編排
- 任務(wù)可以設(shè)定執(zhí)行時(shí)間或者最長(zhǎng)可執(zhí)行的時(shí)間
- 支持中間件
- 可以使用 unique-option 來避免任務(wù)重復(fù)執(zhí)行,實(shí)現(xiàn)唯一性
- 支持 Redis Cluster 和 Redis Sentinels 以達(dá)成高可用性
- 作者提供了Web UI & CLI Tool讓大家查看任務(wù)的執(zhí)行情況
Asynq可視化監(jiān)控
Asynq提供了兩種監(jiān)控手段:CLI和Web UI。
命令行工具CLI
go install github.com/hibiken/asynq/tools/asynq@latest
Web UI
Asynqmon是一個(gè)基于Web的工具,用于監(jiān)視管理Asynq的任務(wù)和隊(duì)列,有關(guān)詳細(xì)的信息可以參閱工具的README。
Web UI我們可以通過Docker的方式來進(jìn)行安裝:
docker pull hibiken/asynqmon:latest docker run -d \ --name asynq \ -p 8080:8080 \ hibiken/asynqmon:latest --redis-addr=host.docker.internal:6379
安裝好Web UI之后,我們就可以打開瀏覽器訪問管理后臺(tái)了:http://localhost:8080
- 儀表盤
- 任務(wù)視圖
- 性能
Kratos下實(shí)現(xiàn)分布式任務(wù)隊(duì)列
我們將分布式任務(wù)隊(duì)列以 transport.Server
的形式整合進(jìn)微服務(wù)框架 Kratos
。
目前,go里面有兩個(gè)分布式任務(wù)隊(duì)列可用:
我已經(jīng)對(duì)這兩個(gè)庫進(jìn)行了支持:
創(chuàng)建Kratos服務(wù)端
因?yàn)樗蕾嘡edis,因此,我們可以使用Docker的方式安裝Redis的服務(wù)器:
docker pull bitnami/redis:latest docker run -itd \ --name redis-test \ -p 6379:6379 \ -e ALLOW_EMPTY_PASSWORD=yes \ bitnami/redis:latest
然后,我們需要在項(xiàng)目中安裝Asynq的依賴庫:
go get -u github.com/tx7do/kratos-transport/transport/asynq
接著,我們?cè)诖a當(dāng)中引入庫,并且創(chuàng)建出來 Server
:
import github.com/tx7do/kratos-transport/transport/asynq const ( localRedisAddr = "127.0.0.1:6379" ) ctx := context.Background() srv := asynq.NewServer( asynq.WithAddress(localRedisAddr), ) if err := srv.Start(ctx); err != nil { panic(err) } defer srv.Stop(ctx)
注冊(cè)任務(wù)回調(diào)
const ( testTask1 = "test_task_1" testDelayTask = "test_delay_task" testPeriodicTask = "test_periodic_task" ) type DelayTask struct { Message string `json:"message"` } func DelayTaskBinder() Any { return &DelayTask{} } func handleTask1(taskType string, taskData *DelayTask) error { LogInfof("Task Type: [%s], Payload: [%s]", taskType, taskData.Message) return nil } func handleDelayTask(taskType string, taskData *DelayTask) error { LogInfof("Delay Task Type: [%s], Payload: [%s]", taskType, taskData.Message) return nil } func handlePeriodicTask(taskType string, taskData *DelayTask) error { LogInfof("Periodic Task Type: [%s], Payload: [%s]", taskType, taskData.Message) return nil } var err error err = srv.RegisterSubscriber(testTask1, func(taskType string, payload MessagePayload) error { switch t := payload.(type) { case *DelayTask: return handleTask1(taskType, t) default: LogError("invalid payload struct type:", t) return errors.New("invalid payload struct type") } }, DelayTaskBinder, ) err = srv.RegisterSubscriber(testDelayTask, func(taskType string, payload MessagePayload) error { switch t := payload.(type) { case *DelayTask: return handleDelayTask(taskType, t) default: LogError("invalid payload struct type:", t) return errors.New("invalid payload struct type") } }, DelayTaskBinder, ) err = srv.RegisterSubscriber(testPeriodicTask, func(taskType string, payload MessagePayload) error { switch t := payload.(type) { case *DelayTask: return handlePeriodicTask(taskType, t) default: LogError("invalid payload struct type:", t) return errors.New("invalid payload struct type") } }, DelayTaskBinder, )
此步驟,相當(dāng)于是異步隊(duì)列中訂閱了某一類型任務(wù)。最終它由 asynq.Server
來執(zhí)行。
創(chuàng)建新任務(wù)
新建任務(wù),有兩個(gè)方法: NewTask
和 NewPeriodicTask
,內(nèi)部分別對(duì)應(yīng)著 asynq.Client
和 asynq.Scheduler
。
NewTask
是通過 asynq.Client
將任務(wù)直接入了隊(duì)列。
普通任務(wù)
普通任務(wù)通常是入列后立即執(zhí)行的(如果不需要排隊(duì)的),下面就是最簡(jiǎn)單的任務(wù),一個(gè)類型(Type),一個(gè)負(fù)載數(shù)據(jù)(Payload)就構(gòu)成了一個(gè)最簡(jiǎn)單的任務(wù):
err = srv.NewTask(testTask1, &DelayTask{Message: "delay task"}, )
當(dāng)然,你也可以添加一些的參數(shù),比如重試次數(shù)、超時(shí)時(shí)間、過期時(shí)間等……
// 最多重試3次,10秒超時(shí),20秒后過期 err = srv.NewTask(testTask1, &DelayTask{Message: "delay task"}, asynq.MaxRetry(10), asynq.Timeout(10*time.Second), asynq.Deadline(time.Now().Add(20*time.Second)), )
延遲任務(wù)(Delay Task)
延遲任務(wù),顧名思義,也就是推遲到指定時(shí)間執(zhí)行的任務(wù),我們可以有兩個(gè)參數(shù)可以注入: ProcessAt
和 ProcessIn
。
ProcessIn
指的是從現(xiàn)在開始推遲多少時(shí)間執(zhí)行:
// 3秒后執(zhí)行 err = srv.NewTask(testDelayTask, &DelayTask{Message: "delay task"}, asynq.ProcessIn(3*time.Second), )
ProcessAt
指的是在指定的某一個(gè)具體時(shí)間執(zhí)行:
// 1小時(shí)后的時(shí)間點(diǎn)執(zhí)行 oneHourLater := now.Add(time.Hour) err = srv.NewTask(testDelayTask, &DelayTask{Message: "delay task"}, asynq.ProcessAt(oneHourLater), )
周期性任務(wù)(Periodic Task)
周期性任務(wù) asynq.Scheduler
內(nèi)部是通過Crontab來實(shí)現(xiàn)定時(shí)的,定時(shí)器到點(diǎn)之后,就調(diào)度任務(wù)。它默認(rèn)使用的是UTC時(shí)區(qū)。
// 每分鐘執(zhí)行一次 _, err = srv.NewPeriodicTask( "*/1 * * * ?", testPeriodicTask, &DelayTask{Message: "periodic task"}, )
需要注意的是,若要保證周期性任務(wù)的持續(xù)調(diào)度執(zhí)行, asynq.Scheduler
必須要一直運(yùn)行著,否則調(diào)度將不會(huì)發(fā)生。調(diào)度器本身不參與任務(wù)的執(zhí)行,但是沒有它的存在,調(diào)度將不不復(fù)存在,也不會(huì)發(fā)生。
示例代碼
示例代碼可以在單元測(cè)試代碼中找到:kratos-transport/transport/asynq/server_test.go at main · tx7do/kratos-transport · GitHub
以上就是Golang微服務(wù)框架Kratos實(shí)現(xiàn)分布式任務(wù)隊(duì)列Asynq的方法詳解的詳細(xì)內(nèi)容,更多關(guān)于Golang Kratos實(shí)現(xiàn)Asynq的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
go通過benchmark對(duì)代碼進(jìn)行性能測(cè)試詳解
在開發(fā)中我們要想編寫高性能的代碼,或者優(yōu)化代碼的性能時(shí),你首先得知道當(dāng)前代碼的性能,在go中可以使用testing包的benchmark來做基準(zhǔn)測(cè)試 ,文中有詳細(xì)的代碼示例,感興趣的小伙伴可以參考一下2023-04-04