探索Golang?Redis實現(xiàn)發(fā)布訂閱功能實例
引言
用11篇文章實現(xiàn)一個可用的Redis服務(wù),姑且叫EasyRedis吧,希望通過文章將Redis掰開撕碎了呈現(xiàn)給大家,而不是僅僅停留在八股文的層面,并且有非常爽的感覺,歡迎持續(xù)關(guān)注學(xué)習(xí)。
[x] easyredis之TCP服務(wù)
[x] easyredis之網(wǎng)絡(luò)請求序列化協(xié)議(RESP)
[x] easyredis之內(nèi)存數(shù)據(jù)庫
[x] easyredis之過期時間 (時間輪實現(xiàn))
[x] easyredis之持久化 (AOF實現(xiàn))
[x] easyredis之發(fā)布訂閱功能
[ ] easyredis之有序集合(跳表實現(xiàn))
[ ] easyredis之 pipeline 客戶端實現(xiàn)
[ ] easyredis之事務(wù)(原子性/回滾)
[ ] easyredis之連接池
[ ] easyredis之分布式集群存儲
EasyRedis之發(fā)布訂閱
代碼路徑: pubhub/pubhub.go
這個代碼很簡單,總共就200行
發(fā)布訂閱的基本原理:客戶端A/B/C訂閱通道,客戶端D往通道中發(fā)送消息后,客戶端A/B/C可以接收到通道中的消息
效果演示:
底層實現(xiàn)的數(shù)據(jù)結(jié)構(gòu)采用map + list
,map中的key
表示channel,value則用list來存儲同一個channel下的多個客戶端clientN
type Pubhub struct { // 自定義實現(xiàn)的map dataDict dict.ConcurrentDict // 該鎖的顆粒度太大 //locker sync.RWMutex locker *locker.Locker // 自定義一個分布鎖 }
dataDict
就是我們自己實現(xiàn)的map
locker
用來對操作同一個鏈表的不同客戶端加鎖,避免并發(fā)問題
訂閱Subscribe
獲取客戶端發(fā)送來的通道名
加鎖(鎖的原理看文章最后)
遍歷通道,獲取該通道下的客戶端鏈表
將當(dāng)前的客戶端加入到鏈表中即可(前提:沒有訂閱過)
// SUBSCRIBE channel [channel ...] func (p *Pubhub) Subscribe(c abstract.Connection, args [][]byte) protocol.Reply { iflen(args) < 1 { return protocol.NewArgNumErrReply("subscribe") } // 通道名 keys := make([]string, 0, len(args)) for _, arg := range args { keys = append(keys, string(arg)) } // 加鎖 p.locker.Locks(keys...) defer p.locker.Unlocks(keys...) for _, arg := range args { chanName := string(arg) // 記錄當(dāng)前客戶端連接訂閱的通道 c.Subscribe(chanName) // 雙向鏈表,記錄通道下的客戶端連接 var l *list.LinkedList raw, exist := p.dataDict.Get(chanName) if !exist { // 說明該channel第一次使用 l = list.NewLinkedList() p.dataDict.Put(chanName, l) } else { l, _ = raw.(*list.LinkedList) } // 未訂閱 if !l.Contain(func(actual interface{}) bool { return c == actual }) { // 如果不重復(fù),那就記錄訂閱 logger.Debug("subscribe channel [" + chanName + "] success") l.Add(c) } // 回復(fù)客戶端消息 _, err := c.Write(channelMsg(_subscribe, chanName, c.SubCount())) if err != nil { logger.Warn(err) } } return protocol.NewNoReply() }
取消訂閱 Unsubscribe
獲取通道名(如果沒有指定,就是取消當(dāng)前客戶端的所有通道)
加鎖(鎖的原理看文章最后)
獲取該通道下的客戶端鏈表
從鏈表中刪除當(dāng)前的客戶端
// 取消訂閱 // unsubscribes itself from all the channels using the UNSUBSCRIBE command without additional arguments func (p *Pubhub) Unsubscribe(c abstract.Connection, args [][]byte) protocol.Reply { var channels []string iflen(args) < 1 { // 取消全部 channels = c.GetChannels() } else { // 取消指定channel channels = make([]string, len(args)) for i, v := range args { channels[i] = string(v) } } p.locker.Locks(channels...) defer p.locker.Unlocks(channels...) // 說明已經(jīng)沒有訂閱的通道 iflen(channels) == 0 { c.Write(noChannelMsg()) } for _, channel := range channels { // 從客戶端中刪除當(dāng)前通道 c.Unsubscribe(channel) // 獲取鏈表 raw, ok := p.dataDict.Get(channel) if ok { // 從鏈表中刪除當(dāng)前客戶端 l, _ := raw.(*list.LinkedList) l.DelAllByVal(func(actual interface{}) bool { return c == actual }) // 如果鏈表為空,清理map if l.Len() == 0 { p.dataDict.Delete(channel) } } c.Write(channelMsg(_unsubscribe, channel, c.SubCount())) } return protocol.NewNoReply() }
發(fā)布 publish
獲取客戶端的channel
從map將channel作為key得到客戶端鏈表
對鏈表的所有客戶端發(fā)送數(shù)據(jù)即可
func (p *Pubhub) Publish(self abstract.Connection, args [][]byte) protocol.Reply { iflen(args) != 2 { return protocol.NewArgNumErrReply("publish") } channelName := string(args[0]) // 加鎖 p.locker.Locks(channelName) defer p.locker.Unlocks(channelName) raw, ok := p.dataDict.Get(channelName) if ok { var sendSuccess int64 var failedClient = make(map[interface{}]struct{}) // 取出鏈表 l, _ := raw.(*list.LinkedList) // 遍歷鏈表 l.ForEach(func(i int, val interface{}) bool { conn, _ := val.(abstract.Connection) if conn.IsClosed() { failedClient[val] = struct{}{} returntrue } if val == self { //不給自己發(fā)送 returntrue } // 發(fā)送數(shù)據(jù) conn.Write(publisMsg(channelName, string(args[1]))) sendSuccess++ returntrue }) // 剔除客戶端 iflen(failedClient) > 0 { removed := l.DelAllByVal(func(actual interface{}) bool { _, ok := failedClient[actual] return ok }) logger.Debugf("del %d closed client", removed) } // 返回發(fā)送的客戶端數(shù)量 return protocol.NewIntegerReply(sendSuccess) } // 如果channel不存在 return protocol.NewIntegerReply(0) }
鎖的原理
代碼路徑 tool/locker/locker.go
type Pubhub struct { // 自定義實現(xiàn)的map dataDict dict.ConcurrentDict // 該鎖的顆粒度太大 //locker sync.RWMutex locker *locker.Locker // 自定義一個分布鎖 }
在結(jié)構(gòu)體中,當(dāng)有【多個客戶端同時訂閱不同的通道】,通過通道名,可以獲取到不同的客戶端鏈表,也就是不同的客戶端操作不同的鏈表可以并行操作(只有操作同一個鏈表才是互斥),如果我們使用 locker sync.RWMutex
鎖,那就是所有的客戶端持有同一把鎖,一個客戶端只有操作完成一個鏈表,才能允許另一個客戶端操作另外一個鏈表,整個操作只能是串行的。所以我們需要實現(xiàn)一個顆粒度更小的鎖
通過不同的通道名,加不同的鎖即可(盡可能的減小鎖的粒度),同時為了避免死鎖,并行的協(xié)程加鎖的順序要一致。所以代碼中有個排序。
這里做了一個技巧,通過hash將通道名映射成不同的hash值,再通過取余,將鎖固定在一個范圍內(nèi)(將無限多的channel名 轉(zhuǎn)成 有限范圍的值),所以可能存在不同的通道名取余的結(jié)果,用的同一個鎖
type Locker struct { mu []*sync.RWMutex mask uint32 } // 順序加鎖(互斥) func (l *Locker) Locks(keys ...string) { indexs := l.toLockIndex(keys...) for _, index := range indexs { mu := l.mu[index] mu.Lock() } } // 順序解鎖(互斥) func (l *Locker) Unlocks(keys ...string) { indexs := l.toLockIndex(keys...) for _, index := range indexs { mu := l.mu[index] mu.Unlock() } } func (l *Locker) toLockIndex(keys ...string) []uint32 { // 將key轉(zhuǎn)成 切片索引[0,mask] mapIndex := make(map[uint32]struct{}) // 去重 for _, key := range keys { mapIndex[l.spread(utils.Fnv32(key))] = struct{}{} } indices := make([]uint32, 0, len(mapIndex)) for k := range mapIndex { indices = append(indices, k) } // 對索引排序 sort.Slice(indices, func(i, j int) bool { return indices[i] < indices[j] }) return indices }
總結(jié)
鎖相關(guān)的代碼很有實踐意義,建議大家自己的手動敲一下,平時工作中作為自己的代碼小組件使用,絕對可以讓人眼前一亮。
項目代碼地址: https://github.com/gofish2020/easyredis
以上就是探索Golang Redis發(fā)布訂閱功能實例的詳細(xì)內(nèi)容,更多關(guān)于Golang Redis發(fā)布訂閱的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Go結(jié)構(gòu)體指針引發(fā)的值傳遞思考分析
這篇文章主要為大家介紹了Go結(jié)構(gòu)體指針引發(fā)的值傳遞思考分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-12-12