golang redigo發(fā)布訂閱使用的方法
redigo 對 發(fā)布訂閱的使用
redigo 對redis 的發(fā)布訂閱機(jī)制放在pubsub.go 中
訂閱主題后 通過Receive() 函數(shù)接受發(fā)布訂閱主題的消息
// Receive returns a pushed message as a Subscription, Message, Pong or error. // The return value is intended to be used directly in a type switch as // illustrated in the PubSubConn example. func (c PubSubConn) Receive() interface{} { return c.receiveInternal(c.Conn.Receive()) }
返回的是一個(gè)空類型的interface{} , 由于空接口沒有方法, 因此所有的類型都實(shí)現(xiàn)了空接口, 也就是說可以返回任意類型。
具體返回的類型 在receiveInternal() 方法里面可以看到
func (c PubSubConn) receiveInternal(replyArg interface{}, errArg error) interface{} { reply, err := Values(replyArg, errArg) if err != nil { return err } var kind string reply, err = Scan(reply, &kind) if err != nil { return err } switch kind { case "message": var m Message if _, err := Scan(reply, &m.Channel, &m.Data); err != nil { return err } return m case "pmessage": var m Message if _, err := Scan(reply, &m.Pattern, &m.Channel, &m.Data); err != nil { return err } return m case "subscribe", "psubscribe", "unsubscribe", "punsubscribe": s := Subscription{Kind: kind} if _, err := Scan(reply, &s.Channel, &s.Count); err != nil { return err } return s case "pong": var p Pong if _, err := Scan(reply, &p.Data); err != nil { return err } return p } return errors.New("redigo: unknown pubsub notification") }
目前返回 Message Subscription Pone
訂閱的主題
收到消息之后通過注冊的回調(diào)函數(shù)處理的方式, 所以代碼中多了長map存放回調(diào)函數(shù)
package main import ( //"github.com/go-redis/redis" "fmt" "github.com/labstack/gommon/log" "github.com/gomodule/redigo/redis" "time" //"reflect" "unsafe" ) type SubscribeCallback func (channel, message string) type Subscriber struct { client redis.PubSubConn cbMap map[string]SubscribeCallback } func (c *Subscriber) Connect(ip string, port uint16) { conn, err := redis.Dial("tcp", "127.0.0.1:6379") if err != nil { log.Error("redis dial failed.") } c.client = redis.PubSubConn{conn} c.cbMap = make(map[string]SubscribeCallback) go func() { for { log.Info("wait...") switch res := c.client.Receive().(type) { case redis.Message: channel := (*string)(unsafe.Pointer(&res.Channel)) message := (*string)(unsafe.Pointer(&res.Data)) c.cbMap[*channel](*channel, *message) case redis.Subscription: fmt.Printf("%s: %s %d\n", res.Channel, res.Kind, res.Count) case error: log.Error("error handle...") continue } } }() } func (c *Subscriber) Close() { err := c.client.Close() if err != nil{ log.Error("redis close error.") } } func (c *Subscriber) Subscribe(channel interface{}, cb SubscribeCallback) { err := c.client.Subscribe(channel) if err != nil{ log.Error("redis Subscribe error.") } c.cbMap[channel.(string)] = cb } func TestCallback1(chann, msg string){ log.Info("TestCallback1 channel : ", chann, " message : ", msg) } func TestCallback2(chann, msg string){ log.Info("TestCallback2 channel : ", chann, " message : ", msg) } func TestCallback3(chann, msg string){ log.Info("TestCallback3 channel : ", chann, " message : ", msg) } func main() { log.Info("===========main start============") var sub Subscriber sub.Connect("127.0.0.1", 6397) sub.Subscribe("test_chan1", TestCallback1) sub.Subscribe("test_chan2", TestCallback2) sub.Subscribe("test_chan3", TestCallback3) for{ // 這段代碼的作用就是 阻止線程結(jié)束 time.Sleep(1 * time.Second) } }
運(yùn)行main 文件 然后 看到
在redis 客戶端 執(zhí)行 發(fā)布信息
在控制臺(tái) 看到監(jiān)控結(jié)果 回調(diào)函數(shù) 執(zhí)行的結(jié)果
發(fā)布
上面是訂閱的代碼和 代碼要處理 的回調(diào)函數(shù)
發(fā)布直接使用默認(rèn)的 Conn來Send Publish 就可以
redigo 的管道使用方法設(shè)計(jì)到三個(gè)函數(shù) Do 函數(shù)也是下面這個(gè)函數(shù)的合并
- c.Send()
- c.Flush()
- c.Receive()
解釋:
send() 方法吧命令寫到緩沖區(qū), flush() 把緩沖區(qū)的命令刷新到redis 服務(wù)器 receive() 函數(shù)接受redis 給予的 回應(yīng), 三個(gè)操作共同完成一套命令流程。
代碼
package main import( //"github.com/go-redis/redis" "github.com/gomodule/redigo/redis" log "github.com/astaxie/beego/logs" ) func main() { client, err := redis.Dial("tcp", "127.0.0.1:6379") if err != nil { log.Critical("redis dial failed.") } defer client.Close() _, err = client.Do("Publish", "test_chan1", "hello") if err != nil { log.Critical("redis Publish failed.") } _, err = client.Do("Publish", "test_chan2", "hello") if err != nil { log.Critical("redis Publish failed.") } _, err = client.Do("Publish", "test_chan3", "hello") if err != nil { log.Critical("redis Publish failed.") } }
到此這篇關(guān)于golang redigo發(fā)布訂閱使用的方法的文章就介紹到這了,更多相關(guān)golang redigo發(fā)布訂閱內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
golang中l(wèi)og包自定義輸出日志格式與寫入到文件
這篇文章主要給大家介紹了關(guān)于golang中l(wèi)og包自定義輸出日志格式與寫入到文件的相關(guān)資料,日志輸出在任何項(xiàng)目中都極其重要,是有助于后續(xù)我們排查解決程序BUG,需要的朋友可以參考下2023-06-06Golang實(shí)現(xiàn)簡易的rpc調(diào)用
RPC指(Remote Procedure Call Protocol)遠(yuǎn)程過程調(diào)用協(xié)議。本文將實(shí)現(xiàn)利用Golang進(jìn)行rpc調(diào)用(只實(shí)現(xiàn)一個(gè)rpc框架基本的功能,不對性能做保證),需要的可以參考一下2023-03-03詳解golang中make與new的異同點(diǎn)和用法
這篇文章將給大家介紹了go語言中函數(shù)new與make的使用和區(qū)別,關(guān)于go語言中new和make是內(nèi)建的兩個(gè)函數(shù),主要用來創(chuàng)建分配類型內(nèi)存,文中通過代碼示例介紹的非常詳細(xì),具有一定的參考價(jià)值,需要的朋友可以參考下2024-01-01Go語言實(shí)現(xiàn)的可讀性更高的并發(fā)神庫詳解
這篇文章主要為大家介紹了Go語言實(shí)現(xiàn)的可讀性更高的并發(fā)神庫詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-01-01一文帶你搞懂Golang依賴注入的設(shè)計(jì)與實(shí)現(xiàn)
在現(xiàn)代的 web 框架里面,基本都有實(shí)現(xiàn)了依賴注入的功能,可以讓我們很方便地對應(yīng)用的依賴進(jìn)行管理。今天我們來看看 go 里面實(shí)現(xiàn)依賴注入的一種方式,感興趣的可以了解一下2023-01-01Ubuntu下安裝Go語言開發(fā)環(huán)境及編輯器的相關(guān)配置
這篇文章主要介紹了Ubuntu下安裝Go語言開發(fā)環(huán)境及編輯器的相關(guān)配置,編輯器方面介紹了包括Vim和Eclipse,需要的朋友可以參考下2016-02-02gin正確多次讀取http?request?body內(nèi)容實(shí)現(xiàn)詳解
這篇文章主要為大家介紹了gin正確多次讀取http?request?body內(nèi)容實(shí)現(xiàn)詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-01-01