golang實(shí)現(xiàn)redis的延時(shí)消息隊(duì)列功能示例
前言
在學(xué)習(xí)過(guò)程中發(fā)現(xiàn)redis的zset還可以用來(lái)實(shí)現(xiàn)輕量級(jí)的延時(shí)消息隊(duì)列功能,雖然可靠性還有待提高,但是對(duì)于一些對(duì)數(shù)據(jù)可靠性要求不那么高的功能要求完全可以實(shí)現(xiàn)。本次主要采用了redis中zset中的zadd, zrangebyscore 和 zdel來(lái)實(shí)現(xiàn)一個(gè)小demo。
提前準(zhǔn)備 安裝redis, redis-go
因?yàn)橛玫氖莔acOS, 直接
$ brew install redis $ go get github.com/garyburd/redigo/redis
又因?yàn)楸容^懶,生成任務(wù)的唯一id時(shí),直接采用了bson中的objectId,所以:
$ go get gopkg.in/mgo.v2/bson
唯一id不是必須有,但如果之后有實(shí)際應(yīng)用需要攜帶,便于查找相應(yīng)任務(wù)。
生產(chǎn)者
通過(guò)一個(gè)for循環(huán)生成10w個(gè)任務(wù), 每一個(gè)任務(wù)有不同的時(shí)間
func producer() { count := 0 //生成100000個(gè)任務(wù) for count < 100000 { count++ dealTime := int64(rand.Intn(5)) + time.Now().Unix() uuid := bson.NewObjectId().Hex() redis.Client.AddJob(&job.JobMessage{ Id: uuid, DealTime: dealTime, }, + int64(dealTime)) } }
其中AddJob函數(shù)在另一個(gè)包中, 將上一個(gè)函數(shù)中隨機(jī)生成的時(shí)間作為需要處理的時(shí)間戳.
// 添加任務(wù) func (client *RedisClient) AddJob(msg *job.JobMessage, dealTime int64) { conn := client.Get() defer conn.Close() key := "JOB_MESSAGE_QUEUE" conn.Do("zadd", key, dealTime, util.JsonEncode(msg)) }
消費(fèi)者
消費(fèi)者處理流程分為兩個(gè)步驟:
- 獲取小于等于當(dāng)前時(shí)間戳的任務(wù)
- 通過(guò)刪除當(dāng)前任務(wù)來(lái)判斷誰(shuí)獲得了當(dāng)前任務(wù)
因?yàn)樵讷@取小于等于當(dāng)前時(shí)間戳的任務(wù)時(shí),可能有多個(gè)go routine同時(shí)讀到了當(dāng)前任務(wù),而只有一個(gè)任務(wù)可以來(lái)處理當(dāng)前任務(wù)。因此我們需要通過(guò)一個(gè)方案來(lái)判斷究竟由誰(shuí)來(lái)處理這個(gè)任務(wù)(當(dāng)然如果只有一個(gè)消費(fèi)者可以讀到就直接處理):這個(gè)時(shí)候可以通過(guò)redis的刪除操作來(lái)獲取,因?yàn)閯h除指定value時(shí)只有成功的操作才會(huì)返回不為0,所以我們可以認(rèn)為刪除當(dāng)前隊(duì)列成功的那個(gè)go routine拿到了當(dāng)前的任務(wù)。
下面是代碼:
// 消費(fèi)者 func consumer() { // 啟動(dòng)10個(gè)go routine一起去拿 count := 0 for count < 10 { go func() { for { jobs := redis.Client.GetJob() if len(jobs) <= 0 { time.Sleep(time.Second * 1) continue } currentJob := jobs[0] // 如果當(dāng)前搶redis隊(duì)列成功, if redis.Client.DelJob(currentJob) > 0 { var jobMessage job.JobMessage util.JsonDecode(currentJob, &jobMessage) //自定義的json解析函數(shù) handleMessage(&jobMessage) } } }() count++ } } // 處理任務(wù)用函數(shù) func handleMessage(msg *job.JobMessage) { fmt.Printf("deal job: %s, require time: %d \n", msg.Id, msg.DealTime) go func() { countChan <- true }() }
redis部分的代碼,獲取任務(wù)和刪除任務(wù)
// 獲取任務(wù) func (client *RedisClient) GetJob() []string { conn := client.Get() defer conn.Close() key := "JOB_MESSAGE_QUEUE" timeNow := time.Now().Unix() ret, err := redis.Strings(conn.Do("zrangebyscore", key, 0, timeNow, "limit", 0, 1)) if err != nil { panic(err) } return ret } // 刪除當(dāng)前任務(wù), 用來(lái)判斷是否搶到了當(dāng)前任務(wù) func (client *RedisClient) DelJob(value string) int { conn := client.Get() defer conn.Close() key := "JOB_MESSAGE_QUEUE" ret, err := redis.Int(conn.Do("zrem", key, value)) if err != nil { panic(err) } return ret }
代碼大抵如此。最后跑起來(lái)之后,大概每3-4秒鐘能夠處理掉1w個(gè)任務(wù),速度上確實(shí)是...
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
Go語(yǔ)言實(shí)現(xiàn)JSON解析的神器詳解
php轉(zhuǎn)go是大趨勢(shì),越來(lái)越多公司的php服務(wù)都在用go進(jìn)行重構(gòu),重構(gòu)過(guò)程中,會(huì)發(fā)現(xiàn)php的json解析操作是真的香。本文和大家分享了一個(gè)Go語(yǔ)言實(shí)現(xiàn)JSON解析的神器,希望對(duì)大家有所幫助2023-01-01詳解Go語(yǔ)言如何實(shí)現(xiàn)并發(fā)安全的map
go語(yǔ)言提供的數(shù)據(jù)類(lèi)型中,只有channel是并發(fā)安全的,基礎(chǔ)map并不是并發(fā)安全的,本文為大家整理了三種實(shí)現(xiàn)了并發(fā)安全的map的方案,有需要的可以參考下2023-12-12Go 語(yǔ)言結(jié)構(gòu)實(shí)例分析
在本篇文章里小編給大家整理的是一篇關(guān)于Go 語(yǔ)言結(jié)構(gòu)實(shí)例分析的相關(guān)知識(shí)點(diǎn),有興趣的朋友們可以學(xué)習(xí)下。2021-07-07一文搞懂Golang 時(shí)間和日期相關(guān)函數(shù)
這篇文章主要介紹了Golang 時(shí)間和日期相關(guān)函數(shù),本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-12-12go語(yǔ)言Timer計(jì)時(shí)器的用法示例詳解
Go語(yǔ)言的標(biāo)準(zhǔn)庫(kù)里提供兩種類(lèi)型的計(jì)時(shí)器Timer和Ticker。這篇文章通過(guò)實(shí)例代碼給大家介紹go語(yǔ)言Timer計(jì)時(shí)器的用法,代碼簡(jiǎn)單易懂,感興趣的朋友跟隨小編一起看看吧2020-05-05