關(guān)于golang監(jiān)聽rabbitmq消息隊(duì)列任務(wù)斷線自動(dòng)重連接的問題
golang監(jiān)聽消息隊(duì)列rabbitmq任務(wù)腳本,當(dāng)rabbimq消息隊(duì)列斷開連接后自動(dòng)重試,重新喚起協(xié)程執(zhí)行任務(wù)
需求背景:
goalng常駐內(nèi)存任務(wù)腳本監(jiān)聽rbmq執(zhí)行任務(wù)
任務(wù)腳本由supervisor來管理

當(dāng)rabbitmq長(zhǎng)時(shí)間斷開連接會(huì)出現(xiàn)如下圖 進(jìn)程處于fatal狀態(tài)


假如因?yàn)椴豢煽咕芤蛩兀瑀abbitmq服務(wù)器內(nèi)存滿了或者其它原因?qū)е聄abbitmq消息隊(duì)列服務(wù)停止了
如果是短時(shí)間的停止重啟,supervisor是可以即時(shí)喚醒該程序。如果服務(wù)器長(zhǎng)時(shí)間沒有恢復(fù)正常運(yùn)行,程序就會(huì)出現(xiàn)fatal進(jìn)程啟動(dòng)失敗的狀態(tài),此時(shí)可以通過告警來提醒開發(fā)人員

如果以上告警能時(shí)時(shí)通知運(yùn)維人員此問題可以略過了。今天討論的是如果在長(zhǎng)時(shí)間斷開連接還能在服務(wù)器恢復(fù)正常情況下自動(dòng)實(shí)現(xiàn)重連。

代碼實(shí)現(xiàn)一:
消費(fèi)者:
package main
import (
"fmt"
"github.com/ichunt2019/golang-rbmq-sl/utils/rabbitmq"
)
type RecvPro struct {
}
//// 實(shí)現(xiàn)消費(fèi)者 消費(fèi)消息失敗 自動(dòng)進(jìn)入延時(shí)嘗試 嘗試3次之后入庫db
/*
返回值 error 為nil 則表示該消息消費(fèi)成功
否則消息會(huì)進(jìn)入ttl延時(shí)隊(duì)列 重復(fù)嘗試消費(fèi)3次
3次后消息如果還是失敗 消息就執(zhí)行失敗 進(jìn)入告警 FailAction
*/
func (t *RecvPro) Consumer(dataByte []byte) error {
//time.Sleep(500*time.Microsecond)
//return errors.New("頂頂頂頂")
fmt.Println(string(dataByte))
//time.Sleep(1*time.Second)
return nil
//消息已經(jīng)消費(fèi)3次 失敗了 請(qǐng)進(jìn)行處理
如果消息 消費(fèi)3次后 仍然失敗 此處可以根據(jù)情況 對(duì)消息進(jìn)行告警提醒 或者 補(bǔ)償 入庫db 釘釘告警等等
func (t *RecvPro) FailAction(err error,dataByte []byte) error {
fmt.Println(err)
fmt.Println("任務(wù)處理失敗了,我要進(jìn)入db日志庫了")
fmt.Println("任務(wù)處理失敗了,發(fā)送釘釘消息通知主人")
func main() {
t := &RecvPro{}
//rabbitmq.Recv(rabbitmq.QueueExchange{
// "a_test_0001",
// "",
// "amqp://guest:guest@192.168.2.232:5672/",
//},t,5)
/*
runNums: 表示任務(wù)并發(fā)處理數(shù)量 一般建議 普通任務(wù)1-3 就可以了
*/
err := rabbitmq.Recv(rabbitmq.QueueExchange{
"a_test_0001",
"hello_go",
"direct",
"amqp://guest:guest@192.168.1.169:5672/",
},t,4)
if(err != nil){
fmt.Println(err)
}rabbitmq代碼
package rabbitmq
import (
"errors"
"strconv"
"time"
//"errors"
"fmt"
"github.com/streadway/amqp"
"log"
)
// 定義全局變量,指針類型
var mqConn *amqp.Connection
var mqChan *amqp.Channel
// 定義生產(chǎn)者接口
type Producer interface {
MsgContent() string
}
type RetryProducer interface {
// 定義接收者接口
type Receiver interface {
Consumer([]byte) error
FailAction(error , []byte) error
// 定義RabbitMQ對(duì)象
type RabbitMQ struct {
connection *amqp.Connection
Channel *amqp.Channel
dns string
QueueName string // 隊(duì)列名稱
RoutingKey string // key名稱
ExchangeName string // 交換機(jī)名稱
ExchangeType string // 交換機(jī)類型
producerList []Producer
retryProducerList []RetryProducer
receiverList []Receiver
// 定義隊(duì)列交換機(jī)對(duì)象
type QueueExchange struct {
QuName string // 隊(duì)列名稱
RtKey string // key值
ExName string // 交換機(jī)名稱
ExType string // 交換機(jī)類型
Dns string //鏈接地址
// 鏈接rabbitMQ
func (r *RabbitMQ)MqConnect() (err error){
mqConn, err = amqp.Dial(r.dns)
r.connection = mqConn // 賦值給RabbitMQ對(duì)象
if err != nil {
fmt.Printf("rbmq鏈接失敗 :%s \n", err)
}
return
// 關(guān)閉mq鏈接
func (r *RabbitMQ)CloseMqConnect() (err error){
err = r.connection.Close()
if err != nil{
fmt.Printf("關(guān)閉mq鏈接失敗 :%s \n", err)
func (r *RabbitMQ)MqOpenChannel() (err error){
mqConn := r.connection
r.Channel, err = mqConn.Channel()
//defer mqChan.Close()
fmt.Printf("MQ打開管道失敗:%s \n", err)
return err
func (r *RabbitMQ)CloseMqChannel() (err error){
r.Channel.Close()
// 創(chuàng)建一個(gè)新的操作對(duì)象
func NewMq(q QueueExchange) RabbitMQ {
return RabbitMQ{
QueueName:q.QuName,
RoutingKey:q.RtKey,
ExchangeName: q.ExName,
ExchangeType: q.ExType,
dns:q.Dns,
func (mq *RabbitMQ) sendMsg (body string) (err error) {
err = mq.MqOpenChannel()
ch := mq.Channel
log.Printf("Channel err :%s \n", err)
defer mq.Channel.Close()
if mq.ExchangeName != "" {
if mq.ExchangeType == ""{
mq.ExchangeType = "direct"
}
err = ch.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false, nil)
if err != nil {
log.Printf("ExchangeDeclare err :%s \n", err)
// 用于檢查隊(duì)列是否存在,已經(jīng)存在不需要重復(fù)聲明
_, err = ch.QueueDeclare(mq.QueueName, true, false, false, false, nil)
log.Printf("QueueDeclare err :%s \n", err)
// 綁定任務(wù)
if mq.RoutingKey != "" && mq.ExchangeName != "" {
err = ch.QueueBind(mq.QueueName, mq.RoutingKey, mq.ExchangeName, false, nil)
log.Printf("QueueBind err :%s \n", err)
if mq.ExchangeName != "" && mq.RoutingKey != ""{
err = mq.Channel.Publish(
mq.ExchangeName, // exchange
mq.RoutingKey, // routing key
false, // mandatory
false, // immediate
amqp.Publishing {
ContentType: "text/plain",
Body: []byte(body),
})
}else{
"", // exchange
mq.QueueName, // routing key
/*
發(fā)送延時(shí)消息
*/
func (mq *RabbitMQ)sendDelayMsg(body string,ttl int64) (err error){
err =mq.MqOpenChannel()
return
if ttl <= 0{
return errors.New("發(fā)送延時(shí)消息,ttl參數(shù)是必須的")
table := make(map[string]interface{},3)
table["x-dead-letter-routing-key"] = mq.RoutingKey
table["x-dead-letter-exchange"] = mq.ExchangeName
table["x-message-ttl"] = ttl*1000
//fmt.Printf("%+v",table)
//fmt.Printf("%+v",mq)
ttlstring := strconv.FormatInt(ttl,10)
queueName := fmt.Sprintf("%s_delay_%s",mq.QueueName ,ttlstring)
routingKey := fmt.Sprintf("%s_delay_%s",mq.QueueName ,ttlstring)
_, err = ch.QueueDeclare(queueName, true, false, false, false, table)
return
if routingKey != "" && mq.ExchangeName != "" {
err = ch.QueueBind(queueName, routingKey, mq.ExchangeName, false, nil)
header := make(map[string]interface{},1)
header["retry_nums"] = 0
var ttl_exchange string
var ttl_routkey string
if(mq.ExchangeName != "" ){
ttl_exchange = mq.ExchangeName
ttl_exchange = ""
if mq.RoutingKey != "" && mq.ExchangeName != ""{
ttl_routkey = routingKey
ttl_routkey = queueName
err = mq.Channel.Publish(
ttl_exchange, // exchange
ttl_routkey, // routing key
false, // mandatory
false, // immediate
amqp.Publishing {
ContentType: "text/plain",
Body: []byte(body),
Headers:header,
})
func (mq *RabbitMQ) sendRetryMsg (body string,retry_nums int32,args ...string) {
err :=mq.MqOpenChannel()
//原始路由key
oldRoutingKey := args[0]
//原始交換機(jī)名
oldExchangeName := args[1]
table["x-dead-letter-routing-key"] = oldRoutingKey
if oldExchangeName != "" {
table["x-dead-letter-exchange"] = oldExchangeName
mq.ExchangeName = ""
table["x-dead-letter-exchange"] = ""
table["x-message-ttl"] = int64(20000)
_, err = ch.QueueDeclare(mq.QueueName, true, false, false, false, table)
header["retry_nums"] = retry_nums + int32(1)
ttl_routkey = mq.RoutingKey
ttl_routkey = mq.QueueName
//fmt.Printf("ttl_exchange:%s,ttl_routkey:%s \n",ttl_exchange,ttl_routkey)
fmt.Printf("MQ任務(wù)發(fā)送失敗:%s \n", err)
// 監(jiān)聽接收者接收任務(wù) 消費(fèi)者
func (mq *RabbitMQ) ListenReceiver(receiver Receiver) {
// 獲取消費(fèi)通道,確保rabbitMQ一個(gè)一個(gè)發(fā)送消息
err = ch.Qos(1, 0, false)
msgList, err := ch.Consume(mq.QueueName, "", false, false, false, false, nil)
log.Printf("Consume err :%s \n", err)
for msg := range msgList {
retry_nums,ok := msg.Headers["retry_nums"].(int32)
if(!ok){
retry_nums = int32(0)
// 處理數(shù)據(jù)
err := receiver.Consumer(msg.Body)
if err!=nil {
//消息處理失敗 進(jìn)入延時(shí)嘗試機(jī)制
if retry_nums < 3{
fmt.Println(string(msg.Body))
fmt.Printf("消息處理失敗 消息開始進(jìn)入嘗試 ttl延時(shí)隊(duì)列 \n")
retry_msg(msg.Body,retry_nums,QueueExchange{
mq.QueueName,
mq.RoutingKey,
mq.ExchangeName,
mq.ExchangeType,
mq.dns,
})
}else{
//消息失敗 入庫db
fmt.Printf("消息處理3次后還是失敗了 入庫db 釘釘告警 \n")
receiver.FailAction(err,msg.Body)
}
err = msg.Ack(true)
if err != nil {
fmt.Printf("確認(rèn)消息未完成異常:%s \n", err)
}else {
// 確認(rèn)消息,必須為false
fmt.Printf("消息消費(fèi)ack失敗 err :%s \n", err)
//消息處理失敗之后 延時(shí)嘗試
func retry_msg(msg []byte,retry_nums int32,queueExchange QueueExchange){
//原始隊(duì)列名稱 交換機(jī)名稱
oldQName := queueExchange.QuName
oldExchangeName := queueExchange.ExName
oldRoutingKey := queueExchange.RtKey
if oldRoutingKey == "" || oldExchangeName == ""{
oldRoutingKey = oldQName
if queueExchange.QuName != "" {
queueExchange.QuName = queueExchange.QuName + "_retry_3";
if queueExchange.RtKey != "" {
queueExchange.RtKey = queueExchange.RtKey + "_retry_3";
queueExchange.RtKey = queueExchange.QuName + "_retry_3";
//fmt.Printf("%+v",queueExchange)
mq := NewMq(queueExchange)
_ = mq.MqConnect()
defer func(){
_ = mq.CloseMqConnect()
}()
//fmt.Printf("%+v",queueExchange)
mq.sendRetryMsg(string(msg),retry_nums,oldRoutingKey,oldExchangeName)
func Send(queueExchange QueueExchange,msg string) (err error){
err = mq.MqConnect()
mq.CloseMqConnect()
err = mq.sendMsg(msg)
//發(fā)送延時(shí)消息
func SendDelay(queueExchange QueueExchange,msg string,ttl int64)(err error){
err = mq.sendDelayMsg(msg,ttl)
runNums 開啟并發(fā)執(zhí)行任務(wù)數(shù)量
func Recv(queueExchange QueueExchange,receiver Receiver,runNums int) (err error){
//鏈接rabbitMQ
if(err != nil){
//rbmq斷開鏈接后 協(xié)程退出釋放信號(hào)
taskQuit:= make(chan struct{}, 1)
//嘗試鏈接rbmq
tryToLinkC := make(chan struct{}, 1)
//開始執(zhí)行任務(wù)
for i:=1;i<=runNums;i++{
go Recv2(mq,receiver,taskQuit);
//如果rbmq斷開連接后 嘗試重新建立鏈接
var tryToLink = func() {
for {
err = mq.MqConnect()
if(err == nil){
tryToLinkC <- struct{}{}
break
time.Sleep(time.Second * 10)
for{
select {
case <- taskQuit ://rbmq斷開連接后 開始嘗試重新建立鏈接
go tryToLink()
<-tryToLinkC //建立鏈接成功后 重新開啟協(xié)程執(zhí)行任務(wù)
fmt.Println("重新開啟新的協(xié)程執(zhí)行任務(wù)")
go Recv2(mq,receiver,taskQuit);
time.Sleep(time.Millisecond*100)
func Recv2(mq RabbitMQ,receiver Receiver,taskQuit chan<- struct{}){
defer func() {
fmt.Println("rbmq鏈接失敗,協(xié)程任務(wù)退出~~~~~~~~~~~~~~~~~~~~")
taskQuit <- struct{}{}
}()
// 驗(yàn)證鏈接是否正常
err := mq.MqOpenChannel()
if(err != nil){
mq.ListenReceiver(receiver)
type retryPro struct {
msgContent string實(shí)現(xiàn)重連方式很多,下面實(shí)現(xiàn)方式比較簡(jiǎn)單

1.Recv方法創(chuàng)建ampq鏈接
2.啟動(dòng)協(xié)程開始執(zhí)行任務(wù)
MqOpenChannel 打開一個(gè)channel通道處理amqp消息
拿到消息 處理任務(wù)
3,協(xié)程中捕獲異常發(fā)送消息到taskQuit <- struct{}{}
4,主進(jìn)程監(jiān)聽taskQuit管道 開始嘗試重新鏈接amqp 直到鏈接成功
5,重新鏈接成功后啟動(dòng)新的協(xié)程處理任務(wù)
主要代碼分析:
/*
runNums 開啟并發(fā)執(zhí)行任務(wù)數(shù)量
*/
func Recv(queueExchange QueueExchange,receiver Receiver,runNums int) (err error){
mq := NewMq(queueExchange)
//鏈接rabbitMQ
err = mq.MqConnect()
if(err != nil){
return
}
//rbmq斷開鏈接后 協(xié)程退出釋放信號(hào)
taskQuit:= make(chan struct{}, 1)
//嘗試鏈接rbmq
tryToLinkC := make(chan struct{}, 1)
//開始執(zhí)行任務(wù)
for i:=1;i<=runNums;i++{
go Recv2(mq,receiver,taskQuit);
//如果rbmq斷開連接后 嘗試重新建立鏈接
var tryToLink = func() {
for {
err = mq.MqConnect()
if(err == nil){
tryToLinkC <- struct{}{}
break
}
time.Sleep(time.Second * 10)
}
for{
select {
case <- taskQuit ://rbmq斷開連接后 開始嘗試重新建立鏈接
go tryToLink()
<-tryToLinkC //建立鏈接成功后 重新開啟協(xié)程執(zhí)行任務(wù)
fmt.Println("重新開啟新的協(xié)程執(zhí)行任務(wù)")
go Recv2(mq,receiver,taskQuit);
time.Sleep(time.Millisecond*100)
}
func Recv2(mq RabbitMQ,receiver Receiver,taskQuit chan<- struct{}){
defer func() {
fmt.Println("rbmq鏈接失敗,協(xié)程任務(wù)退出~~~~~~~~~~~~~~~~~~~~")
taskQuit <- struct{}{}
return
}()
// 驗(yàn)證鏈接是否正常
err := mq.MqOpenChannel()
if(err != nil){
mq.ListenReceiver(receiver)到此這篇關(guān)于golang監(jiān)聽rabbitmq消息隊(duì)列任務(wù)斷線自動(dòng)重連接的文章就介紹到這了,更多相關(guān)golang rabbitmq斷線自動(dòng)重連內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
golang函數(shù)的返回值實(shí)現(xiàn)
本文主要介紹了golang函數(shù)的返回值實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-03-03
利用golang實(shí)現(xiàn)封裝trycatch異常處理實(shí)例代碼
Go語言追求簡(jiǎn)潔優(yōu)雅,所以go語言不支持傳統(tǒng)的 try…catch…finally 這種異常,最近發(fā)現(xiàn)了不錯(cuò)的trycatch包,下面這篇文章主要跟大家分享了關(guān)于利用golang實(shí)現(xiàn)封裝trycatch異常處理的實(shí)例代碼,需要的朋友可以參考下。2017-07-07
GOLANG使用Context管理關(guān)聯(lián)goroutine的方法
這篇文章主要介紹了GOLANG使用Context管理關(guān)聯(lián)goroutine的方法,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-01-01
Go gorilla securecookie庫的安裝使用詳解
這篇文章主要介紹了Go gorilla securecookie庫的安裝使用詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-08-08
Golang實(shí)現(xiàn)CronJob(定時(shí)任務(wù))的方法詳解
這篇文章主要為大家詳細(xì)介紹了Golang如何通過一個(gè)單 pod 去實(shí)現(xiàn)一個(gè)常駐服務(wù),去跑定時(shí)任務(wù)(CronJob),文中的示例代碼講解詳細(xì),需要的可以參考下2023-04-04
GoFrame通用類型變量gvar與interface基本使用對(duì)比
這篇文章主要為大家介紹了GoFrame通用類型變量gvar與interface基本使用對(duì)比,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-06-06
Go項(xiàng)目與Docker結(jié)合實(shí)現(xiàn)高效部署深入探究
在現(xiàn)代軟件開發(fā)中,使用Docker部署應(yīng)用程序已經(jīng)成為一種標(biāo)準(zhǔn)實(shí)踐,本文將深入探討如何將Go項(xiàng)目與Docker結(jié)合,實(shí)現(xiàn)高效、可靠的部署過程,通過詳細(xì)的步驟和豐富的示例,你將能夠迅速掌握這一流程2023-12-12

