golang redigo發(fā)布訂閱使用的方法
redigo 對 發(fā)布訂閱的使用
redigo 對redis 的發(fā)布訂閱機制放在pubsub.go 中
訂閱主題后 通過Receive() 函數(shù)接受發(fā)布訂閱主題的消息
// Receive returns a pushed message as a Subscription, Message, Pong or error.
// The return value is intended to be used directly in a type switch as
// illustrated in the PubSubConn example.
func (c PubSubConn) Receive() interface{} {
return c.receiveInternal(c.Conn.Receive())
}
返回的是一個空類型的interface{} , 由于空接口沒有方法, 因此所有的類型都實現(xiàn)了空接口, 也就是說可以返回任意類型。
具體返回的類型 在receiveInternal() 方法里面可以看到
func (c PubSubConn) receiveInternal(replyArg interface{}, errArg error) interface{} {
reply, err := Values(replyArg, errArg)
if err != nil {
return err
}
var kind string
reply, err = Scan(reply, &kind)
if err != nil {
return err
}
switch kind {
case "message":
var m Message
if _, err := Scan(reply, &m.Channel, &m.Data); err != nil {
return err
}
return m
case "pmessage":
var m Message
if _, err := Scan(reply, &m.Pattern, &m.Channel, &m.Data); err != nil {
return err
}
return m
case "subscribe", "psubscribe", "unsubscribe", "punsubscribe":
s := Subscription{Kind: kind}
if _, err := Scan(reply, &s.Channel, &s.Count); err != nil {
return err
}
return s
case "pong":
var p Pong
if _, err := Scan(reply, &p.Data); err != nil {
return err
}
return p
}
return errors.New("redigo: unknown pubsub notification")
}
目前返回 Message Subscription Pone
訂閱的主題
收到消息之后通過注冊的回調(diào)函數(shù)處理的方式, 所以代碼中多了長map存放回調(diào)函數(shù)
package main
import (
//"github.com/go-redis/redis"
"fmt"
"github.com/labstack/gommon/log"
"github.com/gomodule/redigo/redis"
"time"
//"reflect"
"unsafe"
)
type SubscribeCallback func (channel, message string)
type Subscriber struct {
client redis.PubSubConn
cbMap map[string]SubscribeCallback
}
func (c *Subscriber) Connect(ip string, port uint16) {
conn, err := redis.Dial("tcp", "127.0.0.1:6379")
if err != nil {
log.Error("redis dial failed.")
}
c.client = redis.PubSubConn{conn}
c.cbMap = make(map[string]SubscribeCallback)
go func() {
for {
log.Info("wait...")
switch res := c.client.Receive().(type) {
case redis.Message:
channel := (*string)(unsafe.Pointer(&res.Channel))
message := (*string)(unsafe.Pointer(&res.Data))
c.cbMap[*channel](*channel, *message)
case redis.Subscription:
fmt.Printf("%s: %s %d\n", res.Channel, res.Kind, res.Count)
case error:
log.Error("error handle...")
continue
}
}
}()
}
func (c *Subscriber) Close() {
err := c.client.Close()
if err != nil{
log.Error("redis close error.")
}
}
func (c *Subscriber) Subscribe(channel interface{}, cb SubscribeCallback) {
err := c.client.Subscribe(channel)
if err != nil{
log.Error("redis Subscribe error.")
}
c.cbMap[channel.(string)] = cb
}
func TestCallback1(chann, msg string){
log.Info("TestCallback1 channel : ", chann, " message : ", msg)
}
func TestCallback2(chann, msg string){
log.Info("TestCallback2 channel : ", chann, " message : ", msg)
}
func TestCallback3(chann, msg string){
log.Info("TestCallback3 channel : ", chann, " message : ", msg)
}
func main() {
log.Info("===========main start============")
var sub Subscriber
sub.Connect("127.0.0.1", 6397)
sub.Subscribe("test_chan1", TestCallback1)
sub.Subscribe("test_chan2", TestCallback2)
sub.Subscribe("test_chan3", TestCallback3)
for{ // 這段代碼的作用就是 阻止線程結(jié)束
time.Sleep(1 * time.Second)
}
}
運行main 文件 然后 看到

在redis 客戶端 執(zhí)行 發(fā)布信息

在控制臺 看到監(jiān)控結(jié)果 回調(diào)函數(shù) 執(zhí)行的結(jié)果

發(fā)布
上面是訂閱的代碼和 代碼要處理 的回調(diào)函數(shù)
發(fā)布直接使用默認(rèn)的 Conn來Send Publish 就可以
redigo 的管道使用方法設(shè)計到三個函數(shù) Do 函數(shù)也是下面這個函數(shù)的合并
- c.Send()
- c.Flush()
- c.Receive()
解釋:
send() 方法吧命令寫到緩沖區(qū), flush() 把緩沖區(qū)的命令刷新到redis 服務(wù)器 receive() 函數(shù)接受redis 給予的 回應(yīng), 三個操作共同完成一套命令流程。
代碼
package main
import(
//"github.com/go-redis/redis"
"github.com/gomodule/redigo/redis"
log "github.com/astaxie/beego/logs"
)
func main() {
client, err := redis.Dial("tcp", "127.0.0.1:6379")
if err != nil {
log.Critical("redis dial failed.")
}
defer client.Close()
_, err = client.Do("Publish", "test_chan1", "hello")
if err != nil {
log.Critical("redis Publish failed.")
}
_, err = client.Do("Publish", "test_chan2", "hello")
if err != nil {
log.Critical("redis Publish failed.")
}
_, err = client.Do("Publish", "test_chan3", "hello")
if err != nil {
log.Critical("redis Publish failed.")
}
}
到此這篇關(guān)于golang redigo發(fā)布訂閱使用的方法的文章就介紹到這了,更多相關(guān)golang redigo發(fā)布訂閱內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
golang中l(wèi)og包自定義輸出日志格式與寫入到文件
這篇文章主要給大家介紹了關(guān)于golang中l(wèi)og包自定義輸出日志格式與寫入到文件的相關(guān)資料,日志輸出在任何項目中都極其重要,是有助于后續(xù)我們排查解決程序BUG,需要的朋友可以參考下2023-06-06
一文帶你搞懂Golang依賴注入的設(shè)計與實現(xiàn)
在現(xiàn)代的 web 框架里面,基本都有實現(xiàn)了依賴注入的功能,可以讓我們很方便地對應(yīng)用的依賴進(jìn)行管理。今天我們來看看 go 里面實現(xiàn)依賴注入的一種方式,感興趣的可以了解一下2023-01-01
Ubuntu下安裝Go語言開發(fā)環(huán)境及編輯器的相關(guān)配置
這篇文章主要介紹了Ubuntu下安裝Go語言開發(fā)環(huán)境及編輯器的相關(guān)配置,編輯器方面介紹了包括Vim和Eclipse,需要的朋友可以參考下2016-02-02
gin正確多次讀取http?request?body內(nèi)容實現(xiàn)詳解
這篇文章主要為大家介紹了gin正確多次讀取http?request?body內(nèi)容實現(xiàn)詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-01-01

