欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Golang微服務(wù)框架Kratos實(shí)現(xiàn)分布式任務(wù)隊(duì)列Asynq的方法詳解

 更新時(shí)間:2023年09月02日 08:36:57   作者:喵個(gè)咪  
任務(wù)隊(duì)列(Task Queue) 一般用于跨線程或跨計(jì)算機(jī)分配工作的一種機(jī)制,在Golang語言里面,我們有像Asynq和Machinery這樣的類似于Celery的分布式任務(wù)隊(duì)列,本文就給大家詳細(xì)介紹一下Golang微服務(wù)框架Kratos實(shí)現(xiàn)分布式任務(wù)隊(duì)列Asynq的方法,需要的朋友可以參考下

Golang微服務(wù)框架Kratos實(shí)現(xiàn)分布式任務(wù)隊(duì)列Asynq

任務(wù)隊(duì)列(Task Queue) 一般用于跨線程或跨計(jì)算機(jī)分配工作的一種機(jī)制。其本質(zhì)是生產(chǎn)者消費(fèi)者模型,生產(chǎn)者發(fā)送任務(wù)到消息隊(duì)列,消費(fèi)者負(fù)責(zé)處理任務(wù)。

任務(wù)隊(duì)列的輸入是稱為 任務(wù)(Task) 的工作單元。專用的工作進(jìn)程不斷監(jiān)視任務(wù)隊(duì)列以查找要執(zhí)行的新工作。

在Golang語言里面,我們有像AsynqMachinery這樣的類似于 Celery 的分布式任務(wù)隊(duì)列。

什么是任務(wù)隊(duì)列

消息隊(duì)列(Message Queue),一般來說知道的人不少。比如常見的:kafka、Rabbitmq、RocketMQ等。

任務(wù)隊(duì)列(Task Queue),聽說過這個(gè)概念的人不會(huì)太多,清楚它的概念的人怕是更少。

這兩個(gè)概念是有關(guān)系的,他們是怎樣的關(guān)系呢?任務(wù)隊(duì)列(Task Queue)是消息隊(duì)列(Message Queue)的超集。任務(wù)隊(duì)列是構(gòu)建在消息隊(duì)列之上的。消息隊(duì)列是任務(wù)隊(duì)列的一部分。

提起分布式任務(wù)隊(duì)列(Distributed Task Queue),就不得不提 Python Celery。故而,下面我們來看Celery的架構(gòu)圖,以此來講解。其他的任務(wù)隊(duì)列也并不會(huì)與之有太大的差異性,基礎(chǔ)的原理是一致的。

celery_framework.png

Celery 的架構(gòu)中,由多臺(tái) Server 發(fā)起 異步任務(wù)(Async Task) ,發(fā)送任務(wù)到 Broker 的隊(duì)列中,其中的 Celery Beat 進(jìn)程可負(fù)責(zé)發(fā)起定時(shí)任務(wù)。當(dāng) Task 到達(dá) Broker 后,會(huì)將其分發(fā)給相應(yīng)的 Celery Worker 進(jìn)行處理。當(dāng) Task 處理完成后,其結(jié)果存儲(chǔ)至 Backend

在上述過程中的 Broker Backend , Celery 并沒有去實(shí)現(xiàn),而是使用了已有的開源實(shí)現(xiàn),例如 RabbitMQ 作為 Broker 提供消息隊(duì)列服務(wù), Redis 作為 Backend 提供結(jié)果存儲(chǔ)服務(wù)。Celery 就像是抽象了消息隊(duì)列架構(gòu)中 Producer 、 Consumer 的實(shí)現(xiàn),將消息隊(duì)列中基本單位 “消息” 抽象成了任務(wù)隊(duì)列中的“任務(wù)”,并將異步、定時(shí)任務(wù)的發(fā)起和結(jié)果存儲(chǔ)等操作進(jìn)行了封裝,讓開發(fā)者可以忽略 AMQP、RabbitMQ 等實(shí)現(xiàn)細(xì)節(jié),為開發(fā)帶來便利。

綜上所述,Celery 作為任務(wù)隊(duì)列是基于消息隊(duì)列的進(jìn)一步封裝,其實(shí)現(xiàn)依賴消息隊(duì)列。

任務(wù)隊(duì)列的應(yīng)用場(chǎng)景

我們現(xiàn)在知道了任務(wù)隊(duì)列是什么,也知道了它的工作原理。但是,我們并不知道它可以用來做什么。下面,我們就來看看,它到底用在什么樣的場(chǎng)景下。

  1. 分布式任務(wù):可以將任務(wù)分發(fā)到多個(gè)工作者進(jìn)程或機(jī)器上執(zhí)行,以提高任務(wù)處理速度。
  2. 定時(shí)任務(wù):可以在指定時(shí)間執(zhí)行任務(wù)。例如:每天定時(shí)備份數(shù)據(jù)、日志歸檔、心跳測(cè)試、運(yùn)維巡檢。支持 crontab 定時(shí)模式
  3. 后臺(tái)任務(wù):可以在后臺(tái)執(zhí)行耗時(shí)任務(wù),例如圖像處理、數(shù)據(jù)分析等,不影響用戶界面的響應(yīng)。
  4. 解耦任務(wù):可以將任務(wù)與主程序解耦,以提高代碼的可讀性和可維護(hù)性,解耦應(yīng)用程序最直接的好處就是可擴(kuò)展性和并發(fā)性能的提高。支持并發(fā)執(zhí)行任務(wù),同時(shí)支持自動(dòng)動(dòng)態(tài)擴(kuò)展。
  5. 實(shí)時(shí)處理:可以支持實(shí)時(shí)處理任務(wù),例如即時(shí)通訊、消息隊(duì)列等。

Asynq概述

Asynq是一個(gè)使用Go語言實(shí)現(xiàn)的分布式任務(wù)隊(duì)列和異步處理庫,它由Redis提供支持,它提供了輕量級(jí)的、易于使用的API,并且具有高可擴(kuò)展性和高可定制化性。其作者Ken Hibino,任職于Google。

Asynq主要由以下幾個(gè)組件組成:

  • 任務(wù)(Task):需要被異步執(zhí)行的操作;
  • 處理器(Processor):負(fù)責(zé)執(zhí)行任務(wù)的工作進(jìn)程;
  • 隊(duì)列(Queue):存放待執(zhí)行任務(wù)的隊(duì)列;
  • 調(diào)度器(Scheduler):根據(jù)規(guī)則將任務(wù)分配給不同的處理器進(jìn)行執(zhí)行。

asynq_framework.png

通過使用Asynq,我們可以非常輕松的實(shí)現(xiàn)異步任務(wù)處理,同時(shí)還可以提供高效率、高可擴(kuò)展性和高自定義性的處理方案。

Asynq的特點(diǎn)

  • 保證至少執(zhí)行一次任務(wù)
  • 任務(wù)寫入Redis后可以持久化
  • 任務(wù)失敗之后,會(huì)自動(dòng)重試
  • worker崩潰自動(dòng)恢復(fù)
  • 可是實(shí)現(xiàn)任務(wù)的優(yōu)先級(jí)
  • 任務(wù)可以進(jìn)行編排
  • 任務(wù)可以設(shè)定執(zhí)行時(shí)間或者最長(zhǎng)可執(zhí)行的時(shí)間
  • 支持中間件
  • 可以使用 unique-option 來避免任務(wù)重復(fù)執(zhí)行,實(shí)現(xiàn)唯一性
  • 支持 Redis Cluster 和 Redis Sentinels 以達(dá)成高可用性
  • 作者提供了Web UI & CLI Tool讓大家查看任務(wù)的執(zhí)行情況

Asynq可視化監(jiān)控

Asynq提供了兩種監(jiān)控手段:CLI和Web UI。

命令行工具CLI

go install github.com/hibiken/asynq/tools/asynq@latest

Web UI

Asynqmon是一個(gè)基于Web的工具,用于監(jiān)視管理Asynq的任務(wù)和隊(duì)列,有關(guān)詳細(xì)的信息可以參閱工具的README。

Web UI我們可以通過Docker的方式來進(jìn)行安裝:

docker pull hibiken/asynqmon:latest
docker run -d \
    --name asynq \
    -p 8080:8080 \
    hibiken/asynqmon:latest --redis-addr=host.docker.internal:6379

安裝好Web UI之后,我們就可以打開瀏覽器訪問管理后臺(tái)了:http://localhost:8080

  • 儀表盤

asynq_web_ui_dashboard.png

  • 任務(wù)視圖

asynq_web_ui_task_view.png

  • 性能

asynq_web_ui_metrics.png

Kratos下實(shí)現(xiàn)分布式任務(wù)隊(duì)列

我們將分布式任務(wù)隊(duì)列以 transport.Server 的形式整合進(jìn)微服務(wù)框架 Kratos 。

目前,go里面有兩個(gè)分布式任務(wù)隊(duì)列可用:

我已經(jīng)對(duì)這兩個(gè)庫進(jìn)行了支持:

創(chuàng)建Kratos服務(wù)端

因?yàn)樗蕾嘡edis,因此,我們可以使用Docker的方式安裝Redis的服務(wù)器:

docker pull bitnami/redis:latest
docker run -itd \
    --name redis-test \
    -p 6379:6379 \
    -e ALLOW_EMPTY_PASSWORD=yes \
    bitnami/redis:latest

然后,我們需要在項(xiàng)目中安裝Asynq的依賴庫:

go get -u github.com/tx7do/kratos-transport/transport/asynq

接著,我們?cè)诖a當(dāng)中引入庫,并且創(chuàng)建出來 Server

import github.com/tx7do/kratos-transport/transport/asynq
const (
	localRedisAddr = "127.0.0.1:6379"
)
ctx := context.Background()
srv := asynq.NewServer(
    asynq.WithAddress(localRedisAddr),
)
if err := srv.Start(ctx); err != nil {
    panic(err)
}
defer srv.Stop(ctx)

注冊(cè)任務(wù)回調(diào)

const (
	testTask1        = "test_task_1"
	testDelayTask    = "test_delay_task"
	testPeriodicTask = "test_periodic_task"
)
type DelayTask struct {
	Message string `json:"message"`
}
func DelayTaskBinder() Any { return &DelayTask{} }
func handleTask1(taskType string, taskData *DelayTask) error {
	LogInfof("Task Type: [%s], Payload: [%s]", taskType, taskData.Message)
	return nil
}
func handleDelayTask(taskType string, taskData *DelayTask) error {
	LogInfof("Delay Task Type: [%s], Payload: [%s]", taskType, taskData.Message)
	return nil
}
func handlePeriodicTask(taskType string, taskData *DelayTask) error {
	LogInfof("Periodic Task Type: [%s], Payload: [%s]", taskType, taskData.Message)
	return nil
}
var err error
err = srv.RegisterSubscriber(testTask1,
    func(taskType string, payload MessagePayload) error {
        switch t := payload.(type) {
        case *DelayTask:
            return handleTask1(taskType, t)
        default:
            LogError("invalid payload struct type:", t)
            return errors.New("invalid payload struct type")
        }
    },
    DelayTaskBinder,
)
err = srv.RegisterSubscriber(testDelayTask,
    func(taskType string, payload MessagePayload) error {
        switch t := payload.(type) {
        case *DelayTask:
            return handleDelayTask(taskType, t)
        default:
            LogError("invalid payload struct type:", t)
            return errors.New("invalid payload struct type")
        }
    },
    DelayTaskBinder,
)
err = srv.RegisterSubscriber(testPeriodicTask,
    func(taskType string, payload MessagePayload) error {
        switch t := payload.(type) {
        case *DelayTask:
            return handlePeriodicTask(taskType, t)
        default:
            LogError("invalid payload struct type:", t)
            return errors.New("invalid payload struct type")
        }
    },
    DelayTaskBinder,
)

此步驟,相當(dāng)于是異步隊(duì)列中訂閱了某一類型任務(wù)。最終它由 asynq.Server 來執(zhí)行。

創(chuàng)建新任務(wù)

新建任務(wù),有兩個(gè)方法: NewTask NewPeriodicTask ,內(nèi)部分別對(duì)應(yīng)著 asynq.Client asynq.Scheduler

NewTask 是通過 asynq.Client 將任務(wù)直接入了隊(duì)列。

普通任務(wù)

普通任務(wù)通常是入列后立即執(zhí)行的(如果不需要排隊(duì)的),下面就是最簡(jiǎn)單的任務(wù),一個(gè)類型(Type),一個(gè)負(fù)載數(shù)據(jù)(Payload)就構(gòu)成了一個(gè)最簡(jiǎn)單的任務(wù):

err = srv.NewTask(testTask1, 
    &DelayTask{Message: "delay task"},
)

當(dāng)然,你也可以添加一些的參數(shù),比如重試次數(shù)、超時(shí)時(shí)間、過期時(shí)間等……

// 最多重試3次,10秒超時(shí),20秒后過期
err = srv.NewTask(testTask1, 
    &DelayTask{Message: "delay task"},
    asynq.MaxRetry(10),
    asynq.Timeout(10*time.Second),
    asynq.Deadline(time.Now().Add(20*time.Second)),
)

延遲任務(wù)(Delay Task)

延遲任務(wù),顧名思義,也就是推遲到指定時(shí)間執(zhí)行的任務(wù),我們可以有兩個(gè)參數(shù)可以注入: ProcessAt ProcessIn 。

ProcessIn 指的是從現(xiàn)在開始推遲多少時(shí)間執(zhí)行:

// 3秒后執(zhí)行
err = srv.NewTask(testDelayTask,
    &DelayTask{Message: "delay task"},
    asynq.ProcessIn(3*time.Second),
)

ProcessAt 指的是在指定的某一個(gè)具體時(shí)間執(zhí)行:

// 1小時(shí)后的時(shí)間點(diǎn)執(zhí)行
oneHourLater := now.Add(time.Hour)
err = srv.NewTask(testDelayTask,
    &DelayTask{Message: "delay task"},
    asynq.ProcessAt(oneHourLater),
)

周期性任務(wù)(Periodic Task)

周期性任務(wù) asynq.Scheduler 內(nèi)部是通過Crontab來實(shí)現(xiàn)定時(shí)的,定時(shí)器到點(diǎn)之后,就調(diào)度任務(wù)。它默認(rèn)使用的是UTC時(shí)區(qū)。

// 每分鐘執(zhí)行一次
_, err = srv.NewPeriodicTask(
    "*/1 * * * ?",
    testPeriodicTask,
    &DelayTask{Message: "periodic task"},
)

需要注意的是,若要保證周期性任務(wù)的持續(xù)調(diào)度執(zhí)行, asynq.Scheduler 必須要一直運(yùn)行著,否則調(diào)度將不會(huì)發(fā)生。調(diào)度器本身不參與任務(wù)的執(zhí)行,但是沒有它的存在,調(diào)度將不不復(fù)存在,也不會(huì)發(fā)生。

示例代碼

示例代碼可以在單元測(cè)試代碼中找到:kratos-transport/transport/asynq/server_test.go at main · tx7do/kratos-transport · GitHub

以上就是Golang微服務(wù)框架Kratos實(shí)現(xiàn)分布式任務(wù)隊(duì)列Asynq的方法詳解的詳細(xì)內(nèi)容,更多關(guān)于Golang Kratos實(shí)現(xiàn)Asynq的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • 解決go build不去vendor下查找包的問題

    解決go build不去vendor下查找包的問題

    這篇文章主要介紹了解決go build不去vendor下查找包的問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧
    2020-12-12
  • Go語言中的錯(cuò)誤處理最佳實(shí)踐詳解

    Go語言中的錯(cuò)誤處理最佳實(shí)踐詳解

    這篇文章主要為大家詳細(xì)介紹了Go語言中的錯(cuò)誤處理的相關(guān)知識(shí),文中的示例代碼講解詳細(xì),對(duì)我們深入了解Go語言有一定的幫助,需要的可以參考下
    2023-08-08
  • 使用Go語言實(shí)現(xiàn)常見hash算法

    使用Go語言實(shí)現(xiàn)常見hash算法

    這篇文章主要為大家詳細(xì)介紹了使語言實(shí)現(xiàn)各種常見hash算法的相關(guān)知識(shí),文中的示例代碼講解詳細(xì),具有一定的借鑒價(jià)值,需要的小伙伴可以參考下
    2024-01-01
  • 詳解golang開發(fā)中select多路選擇

    詳解golang開發(fā)中select多路選擇

    這篇文章主要介紹了golang開發(fā)中select多路選擇,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-09-09
  • go語言中的return語句

    go語言中的return語句

    這篇文章主要介紹了go語言中的return語句,文章圍繞主題展開詳細(xì)的內(nèi)容介紹,具有一定的參考價(jià)值,需要的小伙伴可以參考一下,希望對(duì)你的學(xué)習(xí)有所幫助
    2022-05-05
  • Go 語言數(shù)組和切片的區(qū)別詳解

    Go 語言數(shù)組和切片的區(qū)別詳解

    本文主要介紹了Go 語言數(shù)組和切片的區(qū)別詳解,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2023-04-04
  • go通過benchmark對(duì)代碼進(jìn)行性能測(cè)試詳解

    go通過benchmark對(duì)代碼進(jìn)行性能測(cè)試詳解

    在開發(fā)中我們要想編寫高性能的代碼,或者優(yōu)化代碼的性能時(shí),你首先得知道當(dāng)前代碼的性能,在go中可以使用testing包的benchmark來做基準(zhǔn)測(cè)試 ,文中有詳細(xì)的代碼示例,感興趣的小伙伴可以參考一下
    2023-04-04
  • go語言定義零值可用的類型學(xué)習(xí)教程

    go語言定義零值可用的類型學(xué)習(xí)教程

    這篇文章主要為大家介紹了go語言定義零值可用的類型教程學(xué)習(xí),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-06-06
  • 深入了解Golang中的數(shù)據(jù)類型

    深入了解Golang中的數(shù)據(jù)類型

    在計(jì)算機(jī)編程中,數(shù)據(jù)類型是非常重要的一個(gè)概念。這篇文章將詳細(xì)介紹 Golang中的數(shù)據(jù)類型,包括基本類型、復(fù)合類型、引用類型以及自定義類型,希望對(duì)大家有所幫助
    2023-04-04
  • 一文詳解Golang中的切片數(shù)據(jù)類型

    一文詳解Golang中的切片數(shù)據(jù)類型

    這篇文章主要介紹了一文詳解Golang中的切片數(shù)據(jù)類型,切片是一個(gè)種特殊的數(shù)組。是對(duì)數(shù)組的一個(gè)連續(xù)片段的引用,所以切片是一個(gè)引用類型
    2022-09-09

最新評(píng)論