欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Golang微服務框架Kratos實現(xiàn)分布式任務隊列Asynq的方法詳解

 更新時間:2023年09月02日 08:36:57   作者:喵個咪  
任務隊列(Task Queue) 一般用于跨線程或跨計算機分配工作的一種機制,在Golang語言里面,我們有像Asynq和Machinery這樣的類似于Celery的分布式任務隊列,本文就給大家詳細介紹一下Golang微服務框架Kratos實現(xiàn)分布式任務隊列Asynq的方法,需要的朋友可以參考下

Golang微服務框架Kratos實現(xiàn)分布式任務隊列Asynq

任務隊列(Task Queue) 一般用于跨線程或跨計算機分配工作的一種機制。其本質是生產(chǎn)者消費者模型,生產(chǎn)者發(fā)送任務到消息隊列,消費者負責處理任務。

任務隊列的輸入是稱為 任務(Task) 的工作單元。專用的工作進程不斷監(jiān)視任務隊列以查找要執(zhí)行的新工作。

在Golang語言里面,我們有像AsynqMachinery這樣的類似于 Celery 的分布式任務隊列。

什么是任務隊列

消息隊列(Message Queue),一般來說知道的人不少。比如常見的:kafka、Rabbitmq、RocketMQ等。

任務隊列(Task Queue),聽說過這個概念的人不會太多,清楚它的概念的人怕是更少。

這兩個概念是有關系的,他們是怎樣的關系呢?任務隊列(Task Queue)是消息隊列(Message Queue)的超集。任務隊列是構建在消息隊列之上的。消息隊列是任務隊列的一部分。

提起分布式任務隊列(Distributed Task Queue),就不得不提 Python Celery。故而,下面我們來看Celery的架構圖,以此來講解。其他的任務隊列也并不會與之有太大的差異性,基礎的原理是一致的。

celery_framework.png

Celery 的架構中,由多臺 Server 發(fā)起 異步任務(Async Task) ,發(fā)送任務到 Broker 的隊列中,其中的 Celery Beat 進程可負責發(fā)起定時任務。當 Task 到達 Broker 后,會將其分發(fā)給相應的 Celery Worker 進行處理。當 Task 處理完成后,其結果存儲至 Backend 。

在上述過程中的 Broker Backend , Celery 并沒有去實現(xiàn),而是使用了已有的開源實現(xiàn),例如 RabbitMQ 作為 Broker 提供消息隊列服務, Redis 作為 Backend 提供結果存儲服務。Celery 就像是抽象了消息隊列架構中 Producer 、 Consumer 的實現(xiàn),將消息隊列中基本單位 “消息” 抽象成了任務隊列中的“任務”,并將異步、定時任務的發(fā)起和結果存儲等操作進行了封裝,讓開發(fā)者可以忽略 AMQP、RabbitMQ 等實現(xiàn)細節(jié),為開發(fā)帶來便利。

綜上所述,Celery 作為任務隊列是基于消息隊列的進一步封裝,其實現(xiàn)依賴消息隊列。

任務隊列的應用場景

我們現(xiàn)在知道了任務隊列是什么,也知道了它的工作原理。但是,我們并不知道它可以用來做什么。下面,我們就來看看,它到底用在什么樣的場景下。

  1. 分布式任務:可以將任務分發(fā)到多個工作者進程或機器上執(zhí)行,以提高任務處理速度。
  2. 定時任務:可以在指定時間執(zhí)行任務。例如:每天定時備份數(shù)據(jù)、日志歸檔、心跳測試、運維巡檢。支持 crontab 定時模式
  3. 后臺任務:可以在后臺執(zhí)行耗時任務,例如圖像處理、數(shù)據(jù)分析等,不影響用戶界面的響應。
  4. 解耦任務:可以將任務與主程序解耦,以提高代碼的可讀性和可維護性,解耦應用程序最直接的好處就是可擴展性和并發(fā)性能的提高。支持并發(fā)執(zhí)行任務,同時支持自動動態(tài)擴展。
  5. 實時處理:可以支持實時處理任務,例如即時通訊、消息隊列等。

Asynq概述

Asynq是一個使用Go語言實現(xiàn)的分布式任務隊列和異步處理庫,它由Redis提供支持,它提供了輕量級的、易于使用的API,并且具有高可擴展性和高可定制化性。其作者Ken Hibino,任職于Google。

Asynq主要由以下幾個組件組成:

  • 任務(Task):需要被異步執(zhí)行的操作;
  • 處理器(Processor):負責執(zhí)行任務的工作進程;
  • 隊列(Queue):存放待執(zhí)行任務的隊列;
  • 調度器(Scheduler):根據(jù)規(guī)則將任務分配給不同的處理器進行執(zhí)行。

asynq_framework.png

通過使用Asynq,我們可以非常輕松的實現(xiàn)異步任務處理,同時還可以提供高效率、高可擴展性和高自定義性的處理方案。

Asynq的特點

  • 保證至少執(zhí)行一次任務
  • 任務寫入Redis后可以持久化
  • 任務失敗之后,會自動重試
  • worker崩潰自動恢復
  • 可是實現(xiàn)任務的優(yōu)先級
  • 任務可以進行編排
  • 任務可以設定執(zhí)行時間或者最長可執(zhí)行的時間
  • 支持中間件
  • 可以使用 unique-option 來避免任務重復執(zhí)行,實現(xiàn)唯一性
  • 支持 Redis Cluster 和 Redis Sentinels 以達成高可用性
  • 作者提供了Web UI & CLI Tool讓大家查看任務的執(zhí)行情況

Asynq可視化監(jiān)控

Asynq提供了兩種監(jiān)控手段:CLI和Web UI。

命令行工具CLI

go install github.com/hibiken/asynq/tools/asynq@latest

Web UI

Asynqmon是一個基于Web的工具,用于監(jiān)視管理Asynq的任務和隊列,有關詳細的信息可以參閱工具的README。

Web UI我們可以通過Docker的方式來進行安裝:

docker pull hibiken/asynqmon:latest
docker run -d \
    --name asynq \
    -p 8080:8080 \
    hibiken/asynqmon:latest --redis-addr=host.docker.internal:6379

安裝好Web UI之后,我們就可以打開瀏覽器訪問管理后臺了:http://localhost:8080

  • 儀表盤

asynq_web_ui_dashboard.png

  • 任務視圖

asynq_web_ui_task_view.png

  • 性能

asynq_web_ui_metrics.png

Kratos下實現(xiàn)分布式任務隊列

我們將分布式任務隊列以 transport.Server 的形式整合進微服務框架 Kratos 。

目前,go里面有兩個分布式任務隊列可用:

我已經(jīng)對這兩個庫進行了支持:

創(chuàng)建Kratos服務端

因為它依賴Redis,因此,我們可以使用Docker的方式安裝Redis的服務器:

docker pull bitnami/redis:latest
docker run -itd \
    --name redis-test \
    -p 6379:6379 \
    -e ALLOW_EMPTY_PASSWORD=yes \
    bitnami/redis:latest

然后,我們需要在項目中安裝Asynq的依賴庫:

go get -u github.com/tx7do/kratos-transport/transport/asynq

接著,我們在代碼當中引入庫,并且創(chuàng)建出來 Server

import github.com/tx7do/kratos-transport/transport/asynq
const (
	localRedisAddr = "127.0.0.1:6379"
)
ctx := context.Background()
srv := asynq.NewServer(
    asynq.WithAddress(localRedisAddr),
)
if err := srv.Start(ctx); err != nil {
    panic(err)
}
defer srv.Stop(ctx)

注冊任務回調

const (
	testTask1        = "test_task_1"
	testDelayTask    = "test_delay_task"
	testPeriodicTask = "test_periodic_task"
)
type DelayTask struct {
	Message string `json:"message"`
}
func DelayTaskBinder() Any { return &DelayTask{} }
func handleTask1(taskType string, taskData *DelayTask) error {
	LogInfof("Task Type: [%s], Payload: [%s]", taskType, taskData.Message)
	return nil
}
func handleDelayTask(taskType string, taskData *DelayTask) error {
	LogInfof("Delay Task Type: [%s], Payload: [%s]", taskType, taskData.Message)
	return nil
}
func handlePeriodicTask(taskType string, taskData *DelayTask) error {
	LogInfof("Periodic Task Type: [%s], Payload: [%s]", taskType, taskData.Message)
	return nil
}
var err error
err = srv.RegisterSubscriber(testTask1,
    func(taskType string, payload MessagePayload) error {
        switch t := payload.(type) {
        case *DelayTask:
            return handleTask1(taskType, t)
        default:
            LogError("invalid payload struct type:", t)
            return errors.New("invalid payload struct type")
        }
    },
    DelayTaskBinder,
)
err = srv.RegisterSubscriber(testDelayTask,
    func(taskType string, payload MessagePayload) error {
        switch t := payload.(type) {
        case *DelayTask:
            return handleDelayTask(taskType, t)
        default:
            LogError("invalid payload struct type:", t)
            return errors.New("invalid payload struct type")
        }
    },
    DelayTaskBinder,
)
err = srv.RegisterSubscriber(testPeriodicTask,
    func(taskType string, payload MessagePayload) error {
        switch t := payload.(type) {
        case *DelayTask:
            return handlePeriodicTask(taskType, t)
        default:
            LogError("invalid payload struct type:", t)
            return errors.New("invalid payload struct type")
        }
    },
    DelayTaskBinder,
)

此步驟,相當于是異步隊列中訂閱了某一類型任務。最終它由 asynq.Server 來執(zhí)行。

創(chuàng)建新任務

新建任務,有兩個方法: NewTask NewPeriodicTask ,內部分別對應著 asynq.Client asynq.Scheduler 。

NewTask 是通過 asynq.Client 將任務直接入了隊列。

普通任務

普通任務通常是入列后立即執(zhí)行的(如果不需要排隊的),下面就是最簡單的任務,一個類型(Type),一個負載數(shù)據(jù)(Payload)就構成了一個最簡單的任務:

err = srv.NewTask(testTask1, 
    &DelayTask{Message: "delay task"},
)

當然,你也可以添加一些的參數(shù),比如重試次數(shù)、超時時間、過期時間等……

// 最多重試3次,10秒超時,20秒后過期
err = srv.NewTask(testTask1, 
    &DelayTask{Message: "delay task"},
    asynq.MaxRetry(10),
    asynq.Timeout(10*time.Second),
    asynq.Deadline(time.Now().Add(20*time.Second)),
)

延遲任務(Delay Task)

延遲任務,顧名思義,也就是推遲到指定時間執(zhí)行的任務,我們可以有兩個參數(shù)可以注入: ProcessAt ProcessIn

ProcessIn 指的是從現(xiàn)在開始推遲多少時間執(zhí)行:

// 3秒后執(zhí)行
err = srv.NewTask(testDelayTask,
    &DelayTask{Message: "delay task"},
    asynq.ProcessIn(3*time.Second),
)

ProcessAt 指的是在指定的某一個具體時間執(zhí)行:

// 1小時后的時間點執(zhí)行
oneHourLater := now.Add(time.Hour)
err = srv.NewTask(testDelayTask,
    &DelayTask{Message: "delay task"},
    asynq.ProcessAt(oneHourLater),
)

周期性任務(Periodic Task)

周期性任務 asynq.Scheduler 內部是通過Crontab來實現(xiàn)定時的,定時器到點之后,就調度任務。它默認使用的是UTC時區(qū)。

// 每分鐘執(zhí)行一次
_, err = srv.NewPeriodicTask(
    "*/1 * * * ?",
    testPeriodicTask,
    &DelayTask{Message: "periodic task"},
)

需要注意的是,若要保證周期性任務的持續(xù)調度執(zhí)行, asynq.Scheduler 必須要一直運行著,否則調度將不會發(fā)生。調度器本身不參與任務的執(zhí)行,但是沒有它的存在,調度將不不復存在,也不會發(fā)生。

示例代碼

示例代碼可以在單元測試代碼中找到:kratos-transport/transport/asynq/server_test.go at main · tx7do/kratos-transport · GitHub

以上就是Golang微服務框架Kratos實現(xiàn)分布式任務隊列Asynq的方法詳解的詳細內容,更多關于Golang Kratos實現(xiàn)Asynq的資料請關注腳本之家其它相關文章!

相關文章

  • 解決go build不去vendor下查找包的問題

    解決go build不去vendor下查找包的問題

    這篇文章主要介紹了解決go build不去vendor下查找包的問題,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-12-12
  • Go語言中的錯誤處理最佳實踐詳解

    Go語言中的錯誤處理最佳實踐詳解

    這篇文章主要為大家詳細介紹了Go語言中的錯誤處理的相關知識,文中的示例代碼講解詳細,對我們深入了解Go語言有一定的幫助,需要的可以參考下
    2023-08-08
  • 使用Go語言實現(xiàn)常見hash算法

    使用Go語言實現(xiàn)常見hash算法

    這篇文章主要為大家詳細介紹了使語言實現(xiàn)各種常見hash算法的相關知識,文中的示例代碼講解詳細,具有一定的借鑒價值,需要的小伙伴可以參考下
    2024-01-01
  • 詳解golang開發(fā)中select多路選擇

    詳解golang開發(fā)中select多路選擇

    這篇文章主要介紹了golang開發(fā)中select多路選擇,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-09-09
  • go語言中的return語句

    go語言中的return語句

    這篇文章主要介紹了go語言中的return語句,文章圍繞主題展開詳細的內容介紹,具有一定的參考價值,需要的小伙伴可以參考一下,希望對你的學習有所幫助
    2022-05-05
  • Go 語言數(shù)組和切片的區(qū)別詳解

    Go 語言數(shù)組和切片的區(qū)別詳解

    本文主要介紹了Go 語言數(shù)組和切片的區(qū)別詳解,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2023-04-04
  • go通過benchmark對代碼進行性能測試詳解

    go通過benchmark對代碼進行性能測試詳解

    在開發(fā)中我們要想編寫高性能的代碼,或者優(yōu)化代碼的性能時,你首先得知道當前代碼的性能,在go中可以使用testing包的benchmark來做基準測試 ,文中有詳細的代碼示例,感興趣的小伙伴可以參考一下
    2023-04-04
  • go語言定義零值可用的類型學習教程

    go語言定義零值可用的類型學習教程

    這篇文章主要為大家介紹了go語言定義零值可用的類型教程學習,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2023-06-06
  • 深入了解Golang中的數(shù)據(jù)類型

    深入了解Golang中的數(shù)據(jù)類型

    在計算機編程中,數(shù)據(jù)類型是非常重要的一個概念。這篇文章將詳細介紹 Golang中的數(shù)據(jù)類型,包括基本類型、復合類型、引用類型以及自定義類型,希望對大家有所幫助
    2023-04-04
  • 一文詳解Golang中的切片數(shù)據(jù)類型

    一文詳解Golang中的切片數(shù)據(jù)類型

    這篇文章主要介紹了一文詳解Golang中的切片數(shù)據(jù)類型,切片是一個種特殊的數(shù)組。是對數(shù)組的一個連續(xù)片段的引用,所以切片是一個引用類型
    2022-09-09

最新評論