Golang微服務框架Kratos實現(xiàn)分布式任務隊列Asynq的方法詳解
Golang微服務框架Kratos實現(xiàn)分布式任務隊列Asynq
任務隊列(Task Queue) 一般用于跨線程或跨計算機分配工作的一種機制。其本質是生產(chǎn)者消費者模型,生產(chǎn)者發(fā)送任務到消息隊列,消費者負責處理任務。
任務隊列的輸入是稱為 任務(Task)
的工作單元。專用的工作進程不斷監(jiān)視任務隊列以查找要執(zhí)行的新工作。
在Golang語言里面,我們有像Asynq和Machinery這樣的類似于 Celery
的分布式任務隊列。
什么是任務隊列
消息隊列(Message Queue),一般來說知道的人不少。比如常見的:kafka、Rabbitmq、RocketMQ等。
任務隊列(Task Queue),聽說過這個概念的人不會太多,清楚它的概念的人怕是更少。
這兩個概念是有關系的,他們是怎樣的關系呢?任務隊列(Task Queue)是消息隊列(Message Queue)的超集。任務隊列是構建在消息隊列之上的。消息隊列是任務隊列的一部分。
提起分布式任務隊列(Distributed Task Queue),就不得不提 Python
的Celery。故而,下面我們來看Celery的架構圖,以此來講解。其他的任務隊列也并不會與之有太大的差異性,基礎的原理是一致的。
在 Celery
的架構中,由多臺 Server 發(fā)起 異步任務(Async Task)
,發(fā)送任務到 Broker
的隊列中,其中的 Celery Beat
進程可負責發(fā)起定時任務。當 Task
到達 Broker
后,會將其分發(fā)給相應的 Celery Worker
進行處理。當 Task
處理完成后,其結果存儲至 Backend
。
在上述過程中的 Broker
和 Backend
, Celery
并沒有去實現(xiàn),而是使用了已有的開源實現(xiàn),例如 RabbitMQ
作為 Broker
提供消息隊列服務, Redis
作為 Backend
提供結果存儲服務。Celery 就像是抽象了消息隊列架構中 Producer
、 Consumer
的實現(xiàn),將消息隊列中基本單位 “消息”
抽象成了任務隊列中的“任務”,并將異步、定時任務的發(fā)起和結果存儲等操作進行了封裝,讓開發(fā)者可以忽略 AMQP、RabbitMQ 等實現(xiàn)細節(jié),為開發(fā)帶來便利。
綜上所述,Celery 作為任務隊列是基于消息隊列的進一步封裝,其實現(xiàn)依賴消息隊列。
任務隊列的應用場景
我們現(xiàn)在知道了任務隊列是什么,也知道了它的工作原理。但是,我們并不知道它可以用來做什么。下面,我們就來看看,它到底用在什么樣的場景下。
- 分布式任務:可以將任務分發(fā)到多個工作者進程或機器上執(zhí)行,以提高任務處理速度。
- 定時任務:可以在指定時間執(zhí)行任務。例如:每天定時備份數(shù)據(jù)、日志歸檔、心跳測試、運維巡檢。支持 crontab 定時模式
- 后臺任務:可以在后臺執(zhí)行耗時任務,例如圖像處理、數(shù)據(jù)分析等,不影響用戶界面的響應。
- 解耦任務:可以將任務與主程序解耦,以提高代碼的可讀性和可維護性,解耦應用程序最直接的好處就是可擴展性和并發(fā)性能的提高。支持并發(fā)執(zhí)行任務,同時支持自動動態(tài)擴展。
- 實時處理:可以支持實時處理任務,例如即時通訊、消息隊列等。
Asynq概述
Asynq是一個使用Go語言實現(xiàn)的分布式任務隊列和異步處理庫,它由Redis提供支持,它提供了輕量級的、易于使用的API,并且具有高可擴展性和高可定制化性。其作者Ken Hibino,任職于Google。
Asynq主要由以下幾個組件組成:
- 任務(Task):需要被異步執(zhí)行的操作;
- 處理器(Processor):負責執(zhí)行任務的工作進程;
- 隊列(Queue):存放待執(zhí)行任務的隊列;
- 調度器(Scheduler):根據(jù)規(guī)則將任務分配給不同的處理器進行執(zhí)行。
通過使用Asynq,我們可以非常輕松的實現(xiàn)異步任務處理,同時還可以提供高效率、高可擴展性和高自定義性的處理方案。
Asynq的特點
- 保證至少執(zhí)行一次任務
- 任務寫入Redis后可以持久化
- 任務失敗之后,會自動重試
- worker崩潰自動恢復
- 可是實現(xiàn)任務的優(yōu)先級
- 任務可以進行編排
- 任務可以設定執(zhí)行時間或者最長可執(zhí)行的時間
- 支持中間件
- 可以使用 unique-option 來避免任務重復執(zhí)行,實現(xiàn)唯一性
- 支持 Redis Cluster 和 Redis Sentinels 以達成高可用性
- 作者提供了Web UI & CLI Tool讓大家查看任務的執(zhí)行情況
Asynq可視化監(jiān)控
Asynq提供了兩種監(jiān)控手段:CLI和Web UI。
命令行工具CLI
go install github.com/hibiken/asynq/tools/asynq@latest
Web UI
Asynqmon是一個基于Web的工具,用于監(jiān)視管理Asynq的任務和隊列,有關詳細的信息可以參閱工具的README。
Web UI我們可以通過Docker的方式來進行安裝:
docker pull hibiken/asynqmon:latest docker run -d \ --name asynq \ -p 8080:8080 \ hibiken/asynqmon:latest --redis-addr=host.docker.internal:6379
安裝好Web UI之后,我們就可以打開瀏覽器訪問管理后臺了:http://localhost:8080
- 儀表盤
- 任務視圖
- 性能
Kratos下實現(xiàn)分布式任務隊列
我們將分布式任務隊列以 transport.Server
的形式整合進微服務框架 Kratos
。
目前,go里面有兩個分布式任務隊列可用:
我已經(jīng)對這兩個庫進行了支持:
創(chuàng)建Kratos服務端
因為它依賴Redis,因此,我們可以使用Docker的方式安裝Redis的服務器:
docker pull bitnami/redis:latest docker run -itd \ --name redis-test \ -p 6379:6379 \ -e ALLOW_EMPTY_PASSWORD=yes \ bitnami/redis:latest
然后,我們需要在項目中安裝Asynq的依賴庫:
go get -u github.com/tx7do/kratos-transport/transport/asynq
接著,我們在代碼當中引入庫,并且創(chuàng)建出來 Server
:
import github.com/tx7do/kratos-transport/transport/asynq const ( localRedisAddr = "127.0.0.1:6379" ) ctx := context.Background() srv := asynq.NewServer( asynq.WithAddress(localRedisAddr), ) if err := srv.Start(ctx); err != nil { panic(err) } defer srv.Stop(ctx)
注冊任務回調
const ( testTask1 = "test_task_1" testDelayTask = "test_delay_task" testPeriodicTask = "test_periodic_task" ) type DelayTask struct { Message string `json:"message"` } func DelayTaskBinder() Any { return &DelayTask{} } func handleTask1(taskType string, taskData *DelayTask) error { LogInfof("Task Type: [%s], Payload: [%s]", taskType, taskData.Message) return nil } func handleDelayTask(taskType string, taskData *DelayTask) error { LogInfof("Delay Task Type: [%s], Payload: [%s]", taskType, taskData.Message) return nil } func handlePeriodicTask(taskType string, taskData *DelayTask) error { LogInfof("Periodic Task Type: [%s], Payload: [%s]", taskType, taskData.Message) return nil } var err error err = srv.RegisterSubscriber(testTask1, func(taskType string, payload MessagePayload) error { switch t := payload.(type) { case *DelayTask: return handleTask1(taskType, t) default: LogError("invalid payload struct type:", t) return errors.New("invalid payload struct type") } }, DelayTaskBinder, ) err = srv.RegisterSubscriber(testDelayTask, func(taskType string, payload MessagePayload) error { switch t := payload.(type) { case *DelayTask: return handleDelayTask(taskType, t) default: LogError("invalid payload struct type:", t) return errors.New("invalid payload struct type") } }, DelayTaskBinder, ) err = srv.RegisterSubscriber(testPeriodicTask, func(taskType string, payload MessagePayload) error { switch t := payload.(type) { case *DelayTask: return handlePeriodicTask(taskType, t) default: LogError("invalid payload struct type:", t) return errors.New("invalid payload struct type") } }, DelayTaskBinder, )
此步驟,相當于是異步隊列中訂閱了某一類型任務。最終它由 asynq.Server
來執(zhí)行。
創(chuàng)建新任務
新建任務,有兩個方法: NewTask
和 NewPeriodicTask
,內部分別對應著 asynq.Client
和 asynq.Scheduler
。
NewTask
是通過 asynq.Client
將任務直接入了隊列。
普通任務
普通任務通常是入列后立即執(zhí)行的(如果不需要排隊的),下面就是最簡單的任務,一個類型(Type),一個負載數(shù)據(jù)(Payload)就構成了一個最簡單的任務:
err = srv.NewTask(testTask1, &DelayTask{Message: "delay task"}, )
當然,你也可以添加一些的參數(shù),比如重試次數(shù)、超時時間、過期時間等……
// 最多重試3次,10秒超時,20秒后過期 err = srv.NewTask(testTask1, &DelayTask{Message: "delay task"}, asynq.MaxRetry(10), asynq.Timeout(10*time.Second), asynq.Deadline(time.Now().Add(20*time.Second)), )
延遲任務(Delay Task)
延遲任務,顧名思義,也就是推遲到指定時間執(zhí)行的任務,我們可以有兩個參數(shù)可以注入: ProcessAt
和 ProcessIn
。
ProcessIn
指的是從現(xiàn)在開始推遲多少時間執(zhí)行:
// 3秒后執(zhí)行 err = srv.NewTask(testDelayTask, &DelayTask{Message: "delay task"}, asynq.ProcessIn(3*time.Second), )
ProcessAt
指的是在指定的某一個具體時間執(zhí)行:
// 1小時后的時間點執(zhí)行 oneHourLater := now.Add(time.Hour) err = srv.NewTask(testDelayTask, &DelayTask{Message: "delay task"}, asynq.ProcessAt(oneHourLater), )
周期性任務(Periodic Task)
周期性任務 asynq.Scheduler
內部是通過Crontab來實現(xiàn)定時的,定時器到點之后,就調度任務。它默認使用的是UTC時區(qū)。
// 每分鐘執(zhí)行一次 _, err = srv.NewPeriodicTask( "*/1 * * * ?", testPeriodicTask, &DelayTask{Message: "periodic task"}, )
需要注意的是,若要保證周期性任務的持續(xù)調度執(zhí)行, asynq.Scheduler
必須要一直運行著,否則調度將不會發(fā)生。調度器本身不參與任務的執(zhí)行,但是沒有它的存在,調度將不不復存在,也不會發(fā)生。
示例代碼
示例代碼可以在單元測試代碼中找到:kratos-transport/transport/asynq/server_test.go at main · tx7do/kratos-transport · GitHub
以上就是Golang微服務框架Kratos實現(xiàn)分布式任務隊列Asynq的方法詳解的詳細內容,更多關于Golang Kratos實現(xiàn)Asynq的資料請關注腳本之家其它相關文章!