關(guān)于golang監(jiān)聽rabbitmq消息隊列任務(wù)斷線自動重連接的問題
golang監(jiān)聽消息隊列rabbitmq任務(wù)腳本,當(dāng)rabbimq消息隊列斷開連接后自動重試,重新喚起協(xié)程執(zhí)行任務(wù)
需求背景:
goalng常駐內(nèi)存任務(wù)腳本監(jiān)聽rbmq執(zhí)行任務(wù)
任務(wù)腳本由supervisor來管理
當(dāng)rabbitmq長時間斷開連接會出現(xiàn)如下圖 進程處于fatal狀態(tài)
假如因為不可抗拒因素,rabbitmq服務(wù)器內(nèi)存滿了或者其它原因?qū)е聄abbitmq消息隊列服務(wù)停止了
如果是短時間的停止重啟,supervisor是可以即時喚醒該程序。如果服務(wù)器長時間沒有恢復(fù)正常運行,程序就會出現(xiàn)fatal進程啟動失敗的狀態(tài),此時可以通過告警來提醒開發(fā)人員
如果以上告警能時時通知運維人員此問題可以略過了。今天討論的是如果在長時間斷開連接還能在服務(wù)器恢復(fù)正常情況下自動實現(xiàn)重連。
代碼實現(xiàn)一:
消費者:
package main import ( "fmt" "github.com/ichunt2019/golang-rbmq-sl/utils/rabbitmq" ) type RecvPro struct { } //// 實現(xiàn)消費者 消費消息失敗 自動進入延時嘗試 嘗試3次之后入庫db /* 返回值 error 為nil 則表示該消息消費成功 否則消息會進入ttl延時隊列 重復(fù)嘗試消費3次 3次后消息如果還是失敗 消息就執(zhí)行失敗 進入告警 FailAction */ func (t *RecvPro) Consumer(dataByte []byte) error { //time.Sleep(500*time.Microsecond) //return errors.New("頂頂頂頂") fmt.Println(string(dataByte)) //time.Sleep(1*time.Second) return nil //消息已經(jīng)消費3次 失敗了 請進行處理 如果消息 消費3次后 仍然失敗 此處可以根據(jù)情況 對消息進行告警提醒 或者 補償 入庫db 釘釘告警等等 func (t *RecvPro) FailAction(err error,dataByte []byte) error { fmt.Println(err) fmt.Println("任務(wù)處理失敗了,我要進入db日志庫了") fmt.Println("任務(wù)處理失敗了,發(fā)送釘釘消息通知主人") func main() { t := &RecvPro{} //rabbitmq.Recv(rabbitmq.QueueExchange{ // "a_test_0001", // "", // "amqp://guest:guest@192.168.2.232:5672/", //},t,5) /* runNums: 表示任務(wù)并發(fā)處理數(shù)量 一般建議 普通任務(wù)1-3 就可以了 */ err := rabbitmq.Recv(rabbitmq.QueueExchange{ "a_test_0001", "hello_go", "direct", "amqp://guest:guest@192.168.1.169:5672/", },t,4) if(err != nil){ fmt.Println(err) }
rabbitmq代碼
package rabbitmq import ( "errors" "strconv" "time" //"errors" "fmt" "github.com/streadway/amqp" "log" ) // 定義全局變量,指針類型 var mqConn *amqp.Connection var mqChan *amqp.Channel // 定義生產(chǎn)者接口 type Producer interface { MsgContent() string } type RetryProducer interface { // 定義接收者接口 type Receiver interface { Consumer([]byte) error FailAction(error , []byte) error // 定義RabbitMQ對象 type RabbitMQ struct { connection *amqp.Connection Channel *amqp.Channel dns string QueueName string // 隊列名稱 RoutingKey string // key名稱 ExchangeName string // 交換機名稱 ExchangeType string // 交換機類型 producerList []Producer retryProducerList []RetryProducer receiverList []Receiver // 定義隊列交換機對象 type QueueExchange struct { QuName string // 隊列名稱 RtKey string // key值 ExName string // 交換機名稱 ExType string // 交換機類型 Dns string //鏈接地址 // 鏈接rabbitMQ func (r *RabbitMQ)MqConnect() (err error){ mqConn, err = amqp.Dial(r.dns) r.connection = mqConn // 賦值給RabbitMQ對象 if err != nil { fmt.Printf("rbmq鏈接失敗 :%s \n", err) } return // 關(guān)閉mq鏈接 func (r *RabbitMQ)CloseMqConnect() (err error){ err = r.connection.Close() if err != nil{ fmt.Printf("關(guān)閉mq鏈接失敗 :%s \n", err) func (r *RabbitMQ)MqOpenChannel() (err error){ mqConn := r.connection r.Channel, err = mqConn.Channel() //defer mqChan.Close() fmt.Printf("MQ打開管道失敗:%s \n", err) return err func (r *RabbitMQ)CloseMqChannel() (err error){ r.Channel.Close() // 創(chuàng)建一個新的操作對象 func NewMq(q QueueExchange) RabbitMQ { return RabbitMQ{ QueueName:q.QuName, RoutingKey:q.RtKey, ExchangeName: q.ExName, ExchangeType: q.ExType, dns:q.Dns, func (mq *RabbitMQ) sendMsg (body string) (err error) { err = mq.MqOpenChannel() ch := mq.Channel log.Printf("Channel err :%s \n", err) defer mq.Channel.Close() if mq.ExchangeName != "" { if mq.ExchangeType == ""{ mq.ExchangeType = "direct" } err = ch.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false, nil) if err != nil { log.Printf("ExchangeDeclare err :%s \n", err) // 用于檢查隊列是否存在,已經(jīng)存在不需要重復(fù)聲明 _, err = ch.QueueDeclare(mq.QueueName, true, false, false, false, nil) log.Printf("QueueDeclare err :%s \n", err) // 綁定任務(wù) if mq.RoutingKey != "" && mq.ExchangeName != "" { err = ch.QueueBind(mq.QueueName, mq.RoutingKey, mq.ExchangeName, false, nil) log.Printf("QueueBind err :%s \n", err) if mq.ExchangeName != "" && mq.RoutingKey != ""{ err = mq.Channel.Publish( mq.ExchangeName, // exchange mq.RoutingKey, // routing key false, // mandatory false, // immediate amqp.Publishing { ContentType: "text/plain", Body: []byte(body), }) }else{ "", // exchange mq.QueueName, // routing key /* 發(fā)送延時消息 */ func (mq *RabbitMQ)sendDelayMsg(body string,ttl int64) (err error){ err =mq.MqOpenChannel() return if ttl <= 0{ return errors.New("發(fā)送延時消息,ttl參數(shù)是必須的") table := make(map[string]interface{},3) table["x-dead-letter-routing-key"] = mq.RoutingKey table["x-dead-letter-exchange"] = mq.ExchangeName table["x-message-ttl"] = ttl*1000 //fmt.Printf("%+v",table) //fmt.Printf("%+v",mq) ttlstring := strconv.FormatInt(ttl,10) queueName := fmt.Sprintf("%s_delay_%s",mq.QueueName ,ttlstring) routingKey := fmt.Sprintf("%s_delay_%s",mq.QueueName ,ttlstring) _, err = ch.QueueDeclare(queueName, true, false, false, false, table) return if routingKey != "" && mq.ExchangeName != "" { err = ch.QueueBind(queueName, routingKey, mq.ExchangeName, false, nil) header := make(map[string]interface{},1) header["retry_nums"] = 0 var ttl_exchange string var ttl_routkey string if(mq.ExchangeName != "" ){ ttl_exchange = mq.ExchangeName ttl_exchange = "" if mq.RoutingKey != "" && mq.ExchangeName != ""{ ttl_routkey = routingKey ttl_routkey = queueName err = mq.Channel.Publish( ttl_exchange, // exchange ttl_routkey, // routing key false, // mandatory false, // immediate amqp.Publishing { ContentType: "text/plain", Body: []byte(body), Headers:header, }) func (mq *RabbitMQ) sendRetryMsg (body string,retry_nums int32,args ...string) { err :=mq.MqOpenChannel() //原始路由key oldRoutingKey := args[0] //原始交換機名 oldExchangeName := args[1] table["x-dead-letter-routing-key"] = oldRoutingKey if oldExchangeName != "" { table["x-dead-letter-exchange"] = oldExchangeName mq.ExchangeName = "" table["x-dead-letter-exchange"] = "" table["x-message-ttl"] = int64(20000) _, err = ch.QueueDeclare(mq.QueueName, true, false, false, false, table) header["retry_nums"] = retry_nums + int32(1) ttl_routkey = mq.RoutingKey ttl_routkey = mq.QueueName //fmt.Printf("ttl_exchange:%s,ttl_routkey:%s \n",ttl_exchange,ttl_routkey) fmt.Printf("MQ任務(wù)發(fā)送失敗:%s \n", err) // 監(jiān)聽接收者接收任務(wù) 消費者 func (mq *RabbitMQ) ListenReceiver(receiver Receiver) { // 獲取消費通道,確保rabbitMQ一個一個發(fā)送消息 err = ch.Qos(1, 0, false) msgList, err := ch.Consume(mq.QueueName, "", false, false, false, false, nil) log.Printf("Consume err :%s \n", err) for msg := range msgList { retry_nums,ok := msg.Headers["retry_nums"].(int32) if(!ok){ retry_nums = int32(0) // 處理數(shù)據(jù) err := receiver.Consumer(msg.Body) if err!=nil { //消息處理失敗 進入延時嘗試機制 if retry_nums < 3{ fmt.Println(string(msg.Body)) fmt.Printf("消息處理失敗 消息開始進入嘗試 ttl延時隊列 \n") retry_msg(msg.Body,retry_nums,QueueExchange{ mq.QueueName, mq.RoutingKey, mq.ExchangeName, mq.ExchangeType, mq.dns, }) }else{ //消息失敗 入庫db fmt.Printf("消息處理3次后還是失敗了 入庫db 釘釘告警 \n") receiver.FailAction(err,msg.Body) } err = msg.Ack(true) if err != nil { fmt.Printf("確認消息未完成異常:%s \n", err) }else { // 確認消息,必須為false fmt.Printf("消息消費ack失敗 err :%s \n", err) //消息處理失敗之后 延時嘗試 func retry_msg(msg []byte,retry_nums int32,queueExchange QueueExchange){ //原始隊列名稱 交換機名稱 oldQName := queueExchange.QuName oldExchangeName := queueExchange.ExName oldRoutingKey := queueExchange.RtKey if oldRoutingKey == "" || oldExchangeName == ""{ oldRoutingKey = oldQName if queueExchange.QuName != "" { queueExchange.QuName = queueExchange.QuName + "_retry_3"; if queueExchange.RtKey != "" { queueExchange.RtKey = queueExchange.RtKey + "_retry_3"; queueExchange.RtKey = queueExchange.QuName + "_retry_3"; //fmt.Printf("%+v",queueExchange) mq := NewMq(queueExchange) _ = mq.MqConnect() defer func(){ _ = mq.CloseMqConnect() }() //fmt.Printf("%+v",queueExchange) mq.sendRetryMsg(string(msg),retry_nums,oldRoutingKey,oldExchangeName) func Send(queueExchange QueueExchange,msg string) (err error){ err = mq.MqConnect() mq.CloseMqConnect() err = mq.sendMsg(msg) //發(fā)送延時消息 func SendDelay(queueExchange QueueExchange,msg string,ttl int64)(err error){ err = mq.sendDelayMsg(msg,ttl) runNums 開啟并發(fā)執(zhí)行任務(wù)數(shù)量 func Recv(queueExchange QueueExchange,receiver Receiver,runNums int) (err error){ //鏈接rabbitMQ if(err != nil){ //rbmq斷開鏈接后 協(xié)程退出釋放信號 taskQuit:= make(chan struct{}, 1) //嘗試鏈接rbmq tryToLinkC := make(chan struct{}, 1) //開始執(zhí)行任務(wù) for i:=1;i<=runNums;i++{ go Recv2(mq,receiver,taskQuit); //如果rbmq斷開連接后 嘗試重新建立鏈接 var tryToLink = func() { for { err = mq.MqConnect() if(err == nil){ tryToLinkC <- struct{}{} break time.Sleep(time.Second * 10) for{ select { case <- taskQuit ://rbmq斷開連接后 開始嘗試重新建立鏈接 go tryToLink() <-tryToLinkC //建立鏈接成功后 重新開啟協(xié)程執(zhí)行任務(wù) fmt.Println("重新開啟新的協(xié)程執(zhí)行任務(wù)") go Recv2(mq,receiver,taskQuit); time.Sleep(time.Millisecond*100) func Recv2(mq RabbitMQ,receiver Receiver,taskQuit chan<- struct{}){ defer func() { fmt.Println("rbmq鏈接失敗,協(xié)程任務(wù)退出~~~~~~~~~~~~~~~~~~~~") taskQuit <- struct{}{} }() // 驗證鏈接是否正常 err := mq.MqOpenChannel() if(err != nil){ mq.ListenReceiver(receiver) type retryPro struct { msgContent string
實現(xiàn)重連方式很多,下面實現(xiàn)方式比較簡單
1.Recv方法創(chuàng)建ampq鏈接
2.啟動協(xié)程開始執(zhí)行任務(wù)
MqOpenChannel 打開一個channel通道處理amqp消息
拿到消息 處理任務(wù)
3,協(xié)程中捕獲異常發(fā)送消息到taskQuit <- struct{}{}
4,主進程監(jiān)聽taskQuit管道 開始嘗試重新鏈接amqp 直到鏈接成功
5,重新鏈接成功后啟動新的協(xié)程處理任務(wù)
主要代碼分析:
/* runNums 開啟并發(fā)執(zhí)行任務(wù)數(shù)量 */ func Recv(queueExchange QueueExchange,receiver Receiver,runNums int) (err error){ mq := NewMq(queueExchange) //鏈接rabbitMQ err = mq.MqConnect() if(err != nil){ return } //rbmq斷開鏈接后 協(xié)程退出釋放信號 taskQuit:= make(chan struct{}, 1) //嘗試鏈接rbmq tryToLinkC := make(chan struct{}, 1) //開始執(zhí)行任務(wù) for i:=1;i<=runNums;i++{ go Recv2(mq,receiver,taskQuit); //如果rbmq斷開連接后 嘗試重新建立鏈接 var tryToLink = func() { for { err = mq.MqConnect() if(err == nil){ tryToLinkC <- struct{}{} break } time.Sleep(time.Second * 10) } for{ select { case <- taskQuit ://rbmq斷開連接后 開始嘗試重新建立鏈接 go tryToLink() <-tryToLinkC //建立鏈接成功后 重新開啟協(xié)程執(zhí)行任務(wù) fmt.Println("重新開啟新的協(xié)程執(zhí)行任務(wù)") go Recv2(mq,receiver,taskQuit); time.Sleep(time.Millisecond*100) } func Recv2(mq RabbitMQ,receiver Receiver,taskQuit chan<- struct{}){ defer func() { fmt.Println("rbmq鏈接失敗,協(xié)程任務(wù)退出~~~~~~~~~~~~~~~~~~~~") taskQuit <- struct{}{} return }() // 驗證鏈接是否正常 err := mq.MqOpenChannel() if(err != nil){ mq.ListenReceiver(receiver)
到此這篇關(guān)于golang監(jiān)聽rabbitmq消息隊列任務(wù)斷線自動重連接的文章就介紹到這了,更多相關(guān)golang rabbitmq斷線自動重連內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
利用golang實現(xiàn)封裝trycatch異常處理實例代碼
Go語言追求簡潔優(yōu)雅,所以go語言不支持傳統(tǒng)的 try…catch…finally 這種異常,最近發(fā)現(xiàn)了不錯的trycatch包,下面這篇文章主要跟大家分享了關(guān)于利用golang實現(xiàn)封裝trycatch異常處理的實例代碼,需要的朋友可以參考下。2017-07-07GOLANG使用Context管理關(guān)聯(lián)goroutine的方法
這篇文章主要介紹了GOLANG使用Context管理關(guān)聯(lián)goroutine的方法,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-01-01Go gorilla securecookie庫的安裝使用詳解
這篇文章主要介紹了Go gorilla securecookie庫的安裝使用詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-08-08Golang實現(xiàn)CronJob(定時任務(wù))的方法詳解
這篇文章主要為大家詳細介紹了Golang如何通過一個單 pod 去實現(xiàn)一個常駐服務(wù),去跑定時任務(wù)(CronJob),文中的示例代碼講解詳細,需要的可以參考下2023-04-04GoFrame通用類型變量gvar與interface基本使用對比
這篇文章主要為大家介紹了GoFrame通用類型變量gvar與interface基本使用對比,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-06-06Go項目與Docker結(jié)合實現(xiàn)高效部署深入探究
在現(xiàn)代軟件開發(fā)中,使用Docker部署應(yīng)用程序已經(jīng)成為一種標準實踐,本文將深入探討如何將Go項目與Docker結(jié)合,實現(xiàn)高效、可靠的部署過程,通過詳細的步驟和豐富的示例,你將能夠迅速掌握這一流程2023-12-12