golang實(shí)現(xiàn)延遲隊(duì)列(delay queue)的兩種實(shí)現(xiàn)
1 延遲隊(duì)列:郵件提醒、訂單自動(dòng)取消
延遲隊(duì)列:處理需要在未來(lái)某個(gè)特定時(shí)間執(zhí)行的任務(wù)。這些任務(wù)被添加到隊(duì)列中,并且指定了一個(gè)執(zhí)行時(shí)間,只有達(dá)到指定的時(shí)間點(diǎn)時(shí)才能從隊(duì)列中取出并執(zhí)行。
應(yīng)用場(chǎng)景:
- 郵件提醒
- 訂單自動(dòng)取消(超過(guò)多少時(shí)間未支付,就取消訂單)
- 對(duì)超時(shí)任務(wù)的處理等
由于任務(wù)的執(zhí)行是在未來(lái)的某個(gè)時(shí)間點(diǎn),因此這些任務(wù)不會(huì)立即執(zhí)行,而是存儲(chǔ)在隊(duì)列中,直到它的預(yù)定執(zhí)行時(shí)間才會(huì)被執(zhí)行。
2 實(shí)現(xiàn)
2.1 simple簡(jiǎn)單版:go自帶的time包實(shí)現(xiàn)
思路:
定義Task結(jié)構(gòu)體,包含
- ExecuteTime time.Time
- Job func()
定義DelayQueue
- TaskQueue []Task
- func AddTask
- func RemoveTask
- ExecuteTask
這種方案存在的問(wèn)題:
Go程序重啟時(shí),存儲(chǔ)在slice中的延遲處理任務(wù)將全部丟失
完整代碼:
package main import ( "fmt" "time" ) /* 基于go實(shí)現(xiàn)延遲隊(duì)列 */ type Task struct { ExecuteTime time.Time Job func() } type DelayQueue struct { Tasks []*Task } func (d *DelayQueue) AddTask(t *Task) { d.Tasks = append(d.Tasks, t) } func (d *DelayQueue) RemoveTask() { //FIFO: remove the first task to enqueue d.Tasks = d.Tasks[1:] } func (d *DelayQueue) ExecuteTask() { for len(d.Tasks) > 0 { //dequeue a task currentTask := d.Tasks[0] if time.Now().Before(currentTask.ExecuteTime) { //if the task execution time is not up, wait time.Sleep(currentTask.ExecuteTime.Sub(time.Now())) } //execute the task currentTask.Job() //remove task who has been executed d.RemoveTask() } } func main() { fmt.Println("start delayQueue") delayQueue := &DelayQueue{} firstTask := &Task{ ExecuteTime: time.Now().Add(time.Second * 1), Job: func() { fmt.Println("executed task 1 after delay") }, } delayQueue.AddTask(firstTask) secondTask := &Task{ ExecuteTime: time.Now().Add(time.Second * 7), Job: func() { fmt.Println("executed task 2 after delay") }, } delayQueue.AddTask(secondTask) delayQueue.ExecuteTask() fmt.Println("all tasks have been done!!!") }
效果:
2.2 complex持久版:go+redis
為了防止Go重啟后存儲(chǔ)到delayQueue的數(shù)據(jù)丟失,我們可以將任務(wù)持久化到redis中。
思路:
初始化redis連接
延遲隊(duì)列采用redis的zset(有序集合)實(shí)現(xiàn)
前置準(zhǔn)備:
# 安裝docker yum install -y yum-utils yum-config-manager \ --add-repo \ https://download.docker.com/linux/centos/docker-ce.repo yum install docker systemctl start docker # docker搭建redis mkdir -p /Users/ziyi2/docker-home/redis docker run -d --name redis -v /Users/ziyi2/docker-home/redis:/data -p 6379:6379 redis
完整代碼:
package main import ( "fmt" "github.com/go-redis/redis" log "github.com/ziyifast/log" "time" ) /* 基于redis zset實(shí)現(xiàn)延遲隊(duì)列 */ var redisdb *redis.Client var DelayQueueKey = "delay-queue" func initClient() (err error) { redisdb = redis.NewClient(&redis.Options{ Addr: "localhost:6379", Password: "", // not set password DB: 0, //use default db }) _, err = redisdb.Ping().Result() if err != nil { log.Errorf("%v", err) return err } return nil } func main() { err := initClient() if err != nil { log.Errorf("init redis client err: %v", err) return } addTaskToQueue("task1", time.Now().Add(time.Second*3).Unix()) addTaskToQueue("task2", time.Now().Add(time.Second*8).Unix()) //執(zhí)行隊(duì)列中的任務(wù) getAndExecuteTask() } // executeTime為unix時(shí)間戳,作為zset中的score。允許redis按照task應(yīng)該執(zhí)行時(shí)間來(lái)進(jìn)行排序 func addTaskToQueue(task string, executeTime int64) { err := redisdb.ZAdd(DelayQueueKey, redis.Z{ Score: float64(executeTime), Member: task, }).Err() if err != nil { panic(err) } } // 從redis中取一個(gè)task并執(zhí)行 func getAndExecuteTask() { for { tasks, err := redisdb.ZRangeByScore(DelayQueueKey, redis.ZRangeBy{ Min: "-inf", Max: fmt.Sprintf("%d", time.Now().Unix()), Offset: 0, Count: 1, }).Result() if err != nil { time.Sleep(time.Second * 1) continue } //處理任務(wù) for _, task := range tasks { fmt.Println("Execute task: ", task) //執(zhí)行完任務(wù)之后用 ZREM 移除該任務(wù) redisdb.ZRem(DelayQueueKey, task) } time.Sleep(time.Second * 1) } }
效果:
redis一直從延遲隊(duì)列中取數(shù)據(jù),如果處理完一批則睡眠1s
- 具體根據(jù)大家的業(yè)務(wù)調(diào)整,此處主要介紹思路
到此這篇關(guān)于golang實(shí)現(xiàn)延遲隊(duì)列(delay queue)的示例代碼的文章就介紹到這了,更多相關(guān)golang 延遲隊(duì)列(delay queue)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
golang使用sort接口實(shí)現(xiàn)排序示例
這篇文章主要介紹了golang使用sort接口實(shí)現(xiàn)排序的方法,簡(jiǎn)單分析了sort接口的功能并實(shí)例演示了基于sort接口的排序?qū)崿F(xiàn)方法,需要的朋友可以參考下2016-07-07go mutex互斥鎖使用Lock和Unlock方法占有釋放資源
Go號(hào)稱(chēng)是為了高并發(fā)而生的,在高并發(fā)場(chǎng)景下,勢(shì)必會(huì)涉及到對(duì)公共資源的競(jìng)爭(zhēng),當(dāng)對(duì)應(yīng)場(chǎng)景發(fā)生時(shí),我們經(jīng)常會(huì)使用 mutex 的 Lock() 和 Unlock() 方法來(lái)占有或釋放資源,雖然調(diào)用簡(jiǎn)單,但 mutex 的內(nèi)部卻涉及挺多的,本文來(lái)好好研究一下2023-09-09Sublime Text3安裝Go語(yǔ)言相關(guān)插件gosublime時(shí)搜不到gosublime的解決方法
本文主要介紹了Sublime Text3安裝Go語(yǔ)言相關(guān)插件gosublime時(shí)搜不到gosublime的解決方法,具有一定的參考價(jià)值,感興趣的可以了解一下2022-01-01