golang實(shí)現(xiàn)redis的延時(shí)消息隊(duì)列功能示例
前言
在學(xué)習(xí)過程中發(fā)現(xiàn)redis的zset還可以用來實(shí)現(xiàn)輕量級(jí)的延時(shí)消息隊(duì)列功能,雖然可靠性還有待提高,但是對(duì)于一些對(duì)數(shù)據(jù)可靠性要求不那么高的功能要求完全可以實(shí)現(xiàn)。本次主要采用了redis中zset中的zadd, zrangebyscore 和 zdel來實(shí)現(xiàn)一個(gè)小demo。
提前準(zhǔn)備 安裝redis, redis-go
因?yàn)橛玫氖莔acOS, 直接
$ brew install redis $ go get github.com/garyburd/redigo/redis
又因?yàn)楸容^懶,生成任務(wù)的唯一id時(shí),直接采用了bson中的objectId,所以:
$ go get gopkg.in/mgo.v2/bson
唯一id不是必須有,但如果之后有實(shí)際應(yīng)用需要攜帶,便于查找相應(yīng)任務(wù)。
生產(chǎn)者
通過一個(gè)for循環(huán)生成10w個(gè)任務(wù), 每一個(gè)任務(wù)有不同的時(shí)間
func producer() {
count := 0
//生成100000個(gè)任務(wù)
for count < 100000 {
count++
dealTime := int64(rand.Intn(5)) + time.Now().Unix()
uuid := bson.NewObjectId().Hex()
redis.Client.AddJob(&job.JobMessage{
Id: uuid,
DealTime: dealTime,
}, + int64(dealTime))
}
}
其中AddJob函數(shù)在另一個(gè)包中, 將上一個(gè)函數(shù)中隨機(jī)生成的時(shí)間作為需要處理的時(shí)間戳.
// 添加任務(wù)
func (client *RedisClient) AddJob(msg *job.JobMessage, dealTime int64) {
conn := client.Get()
defer conn.Close()
key := "JOB_MESSAGE_QUEUE"
conn.Do("zadd", key, dealTime, util.JsonEncode(msg))
}
消費(fèi)者
消費(fèi)者處理流程分為兩個(gè)步驟:
- 獲取小于等于當(dāng)前時(shí)間戳的任務(wù)
- 通過刪除當(dāng)前任務(wù)來判斷誰獲得了當(dāng)前任務(wù)
因?yàn)樵讷@取小于等于當(dāng)前時(shí)間戳的任務(wù)時(shí),可能有多個(gè)go routine同時(shí)讀到了當(dāng)前任務(wù),而只有一個(gè)任務(wù)可以來處理當(dāng)前任務(wù)。因此我們需要通過一個(gè)方案來判斷究竟由誰來處理這個(gè)任務(wù)(當(dāng)然如果只有一個(gè)消費(fèi)者可以讀到就直接處理):這個(gè)時(shí)候可以通過redis的刪除操作來獲取,因?yàn)閯h除指定value時(shí)只有成功的操作才會(huì)返回不為0,所以我們可以認(rèn)為刪除當(dāng)前隊(duì)列成功的那個(gè)go routine拿到了當(dāng)前的任務(wù)。
下面是代碼:
// 消費(fèi)者
func consumer() {
// 啟動(dòng)10個(gè)go routine一起去拿
count := 0
for count < 10 {
go func() {
for {
jobs := redis.Client.GetJob()
if len(jobs) <= 0 {
time.Sleep(time.Second * 1)
continue
}
currentJob := jobs[0]
// 如果當(dāng)前搶redis隊(duì)列成功,
if redis.Client.DelJob(currentJob) > 0 {
var jobMessage job.JobMessage
util.JsonDecode(currentJob, &jobMessage) //自定義的json解析函數(shù)
handleMessage(&jobMessage)
}
}
}()
count++
}
}
// 處理任務(wù)用函數(shù)
func handleMessage(msg *job.JobMessage) {
fmt.Printf("deal job: %s, require time: %d \n", msg.Id, msg.DealTime)
go func() {
countChan <- true
}()
}
redis部分的代碼,獲取任務(wù)和刪除任務(wù)
// 獲取任務(wù)
func (client *RedisClient) GetJob() []string {
conn := client.Get()
defer conn.Close()
key := "JOB_MESSAGE_QUEUE"
timeNow := time.Now().Unix()
ret, err := redis.Strings(conn.Do("zrangebyscore", key, 0, timeNow, "limit", 0, 1))
if err != nil {
panic(err)
}
return ret
}
// 刪除當(dāng)前任務(wù), 用來判斷是否搶到了當(dāng)前任務(wù)
func (client *RedisClient) DelJob(value string) int {
conn := client.Get()
defer conn.Close()
key := "JOB_MESSAGE_QUEUE"
ret, err := redis.Int(conn.Do("zrem", key, value))
if err != nil {
panic(err)
}
return ret
}
代碼大抵如此。最后跑起來之后,大概每3-4秒鐘能夠處理掉1w個(gè)任務(wù),速度上確實(shí)是...
以上就是本文的全部內(nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
- Go語言的隊(duì)列和堆棧實(shí)現(xiàn)方法
- 用golang實(shí)現(xiàn)一個(gè)定時(shí)器任務(wù)隊(duì)列實(shí)例
- Django使用Celery異步任務(wù)隊(duì)列的使用
- Golang實(shí)現(xiàn)基于Redis的可靠延遲隊(duì)列
- 基于Golang實(shí)現(xiàn)延遲隊(duì)列(DelayQueue)
- Golang微服務(wù)框架Kratos實(shí)現(xiàn)Kafka消息隊(duì)列的方法
- Go高級(jí)特性探究之優(yōu)先級(jí)隊(duì)列詳解
- Go語言隊(duì)列的四種實(shí)現(xiàn)及使用場景
相關(guān)文章
Go語言狀態(tài)機(jī)的實(shí)現(xiàn)
本文主要介紹了Go語言狀態(tài)機(jī)的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-03-03
CentOS 32 bit安裝golang 1.7的步驟詳解
Go是Google開發(fā)的一種編譯型,并發(fā)型,并具有垃圾回收功能的編程語言。在發(fā)布了6個(gè)rc版本之后,Go 1.7終于正式發(fā)布了。本文主要介紹了在CentOS 32 bit安裝golang 1.7的步驟,文中給出了詳細(xì)的步驟,相信對(duì)大家的學(xué)習(xí)和理解具有一定的參考借鑒價(jià)值,下面來一起看看吧。2016-12-12
使用golang獲取linux上文件的訪問/創(chuàng)建/修改時(shí)間
這篇文章主要介紹了使用golang獲取linux上文件的訪問/創(chuàng)建/修改時(shí)間,非常不錯(cuò),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2018-08-08
Golang算法問題之?dāng)?shù)組按指定規(guī)則排序的方法分析
這篇文章主要介紹了Golang算法問題之?dāng)?shù)組按指定規(guī)則排序的方法,結(jié)合實(shí)例形式分析了Go語言數(shù)組排序相關(guān)算法原理與操作技巧,需要的朋友可以參考下2017-02-02

