欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

關(guān)于golang監(jiān)聽rabbitmq消息隊列任務(wù)斷線自動重連接的問題

 更新時間:2022年03月03日 15:50:38   作者:孫龍-程序員  
這篇文章主要介紹了golang監(jiān)聽rabbitmq消息隊列任務(wù)斷線自動重連接,本文給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下

golang監(jiān)聽消息隊列rabbitmq任務(wù)腳本,當(dāng)rabbimq消息隊列斷開連接后自動重試,重新喚起協(xié)程執(zhí)行任務(wù)

需求背景:

goalng常駐內(nèi)存任務(wù)腳本監(jiān)聽rbmq執(zhí)行任務(wù)

任務(wù)腳本由supervisor來管理

當(dāng)rabbitmq長時間斷開連接會出現(xiàn)如下圖 進程處于fatal狀態(tài)

假如因為不可抗拒因素,rabbitmq服務(wù)器內(nèi)存滿了或者其它原因?qū)е聄abbitmq消息隊列服務(wù)停止了

如果是短時間的停止重啟,supervisor是可以即時喚醒該程序。如果服務(wù)器長時間沒有恢復(fù)正常運行,程序就會出現(xiàn)fatal進程啟動失敗的狀態(tài),此時可以通過告警來提醒開發(fā)人員

如果以上告警能時時通知運維人員此問題可以略過了。今天討論的是如果在長時間斷開連接還能在服務(wù)器恢復(fù)正常情況下自動實現(xiàn)重連。

代碼實現(xiàn)一:

消費者:

package main
import (
    "fmt"
    "github.com/ichunt2019/golang-rbmq-sl/utils/rabbitmq"
)
type RecvPro struct {
}
//// 實現(xiàn)消費者 消費消息失敗 自動進入延時嘗試  嘗試3次之后入庫db
/*
返回值 error 為nil  則表示該消息消費成功
否則消息會進入ttl延時隊列  重復(fù)嘗試消費3次
3次后消息如果還是失敗 消息就執(zhí)行失敗  進入告警 FailAction
 */
func (t *RecvPro) Consumer(dataByte []byte) error {
    //time.Sleep(500*time.Microsecond)
    //return errors.New("頂頂頂頂")
    fmt.Println(string(dataByte))
    //time.Sleep(1*time.Second)
    return nil
//消息已經(jīng)消費3次 失敗了 請進行處理
如果消息 消費3次后 仍然失敗  此處可以根據(jù)情況 對消息進行告警提醒 或者 補償  入庫db  釘釘告警等等
func (t *RecvPro) FailAction(err error,dataByte []byte) error {
    fmt.Println(err)
    fmt.Println("任務(wù)處理失敗了,我要進入db日志庫了")
    fmt.Println("任務(wù)處理失敗了,發(fā)送釘釘消息通知主人")
func main() {
    t := &RecvPro{}
    //rabbitmq.Recv(rabbitmq.QueueExchange{
    //    "a_test_0001",
    //    "",
    //    "amqp://guest:guest@192.168.2.232:5672/",
    //},t,5)
    /*
        runNums: 表示任務(wù)并發(fā)處理數(shù)量  一般建議 普通任務(wù)1-3    就可以了
     */
    err := rabbitmq.Recv(rabbitmq.QueueExchange{
        "a_test_0001",
        "hello_go",
        "direct",
        "amqp://guest:guest@192.168.1.169:5672/",
    },t,4)
    if(err != nil){
        fmt.Println(err)
    }

rabbitmq代碼

package rabbitmq

import (
    "errors"
    "strconv"
    "time"
    //"errors"
    "fmt"
    "github.com/streadway/amqp"
    "log"
)
// 定義全局變量,指針類型
var mqConn *amqp.Connection
var mqChan *amqp.Channel
// 定義生產(chǎn)者接口
type Producer interface {
    MsgContent() string
}
type RetryProducer interface {
// 定義接收者接口
type Receiver interface {
    Consumer([]byte)    error
    FailAction(error , []byte)  error
// 定義RabbitMQ對象
type RabbitMQ struct {
    connection *amqp.Connection
    Channel *amqp.Channel
    dns string
    QueueName   string            // 隊列名稱
    RoutingKey  string            // key名稱
    ExchangeName string           // 交換機名稱
    ExchangeType string           // 交換機類型
    producerList []Producer
    retryProducerList []RetryProducer
    receiverList []Receiver
// 定義隊列交換機對象
type QueueExchange struct {
    QuName  string           // 隊列名稱
    RtKey   string           // key值
    ExName  string           // 交換機名稱
    ExType  string           // 交換機類型
    Dns     string              //鏈接地址
// 鏈接rabbitMQ
func (r *RabbitMQ)MqConnect() (err error){
    mqConn, err = amqp.Dial(r.dns)
    r.connection = mqConn   // 賦值給RabbitMQ對象
    if err != nil {
        fmt.Printf("rbmq鏈接失敗  :%s \n", err)
    }
    return
// 關(guān)閉mq鏈接
func (r *RabbitMQ)CloseMqConnect() (err error){
    err = r.connection.Close()
    if err != nil{
        fmt.Printf("關(guān)閉mq鏈接失敗  :%s \n", err)
func (r *RabbitMQ)MqOpenChannel() (err error){
    mqConn := r.connection
    r.Channel, err = mqConn.Channel()
    //defer mqChan.Close()
        fmt.Printf("MQ打開管道失敗:%s \n", err)
    return err
func (r *RabbitMQ)CloseMqChannel() (err error){
    r.Channel.Close()
// 創(chuàng)建一個新的操作對象
func NewMq(q QueueExchange) RabbitMQ {
    return RabbitMQ{
        QueueName:q.QuName,
        RoutingKey:q.RtKey,
        ExchangeName: q.ExName,
        ExchangeType: q.ExType,
        dns:q.Dns,
func (mq *RabbitMQ) sendMsg (body string) (err error)  {
    err = mq.MqOpenChannel()
    ch := mq.Channel
        log.Printf("Channel err  :%s \n", err)
    defer mq.Channel.Close()
    if mq.ExchangeName != "" {
        if mq.ExchangeType == ""{
            mq.ExchangeType = "direct"
        }
        err =  ch.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false, nil)
        if err != nil {
            log.Printf("ExchangeDeclare err  :%s \n", err)
    // 用于檢查隊列是否存在,已經(jīng)存在不需要重復(fù)聲明
    _, err = ch.QueueDeclare(mq.QueueName, true, false, false, false, nil)
        log.Printf("QueueDeclare err :%s \n", err)
    // 綁定任務(wù)
    if mq.RoutingKey != "" && mq.ExchangeName != "" {
        err = ch.QueueBind(mq.QueueName, mq.RoutingKey, mq.ExchangeName, false, nil)
            log.Printf("QueueBind err :%s \n", err)
    if mq.ExchangeName != "" && mq.RoutingKey != ""{
        err = mq.Channel.Publish(
            mq.ExchangeName,     // exchange
            mq.RoutingKey, // routing key
            false,  // mandatory
            false,  // immediate
            amqp.Publishing {
                ContentType: "text/plain",
                Body:        []byte(body),
            })
    }else{
            "",     // exchange
            mq.QueueName, // routing key
/*
發(fā)送延時消息
 */
func (mq *RabbitMQ)sendDelayMsg(body string,ttl int64) (err error){
    err =mq.MqOpenChannel()
            return
    if ttl <= 0{
        return errors.New("發(fā)送延時消息,ttl參數(shù)是必須的")
    table := make(map[string]interface{},3)
    table["x-dead-letter-routing-key"] = mq.RoutingKey
    table["x-dead-letter-exchange"] = mq.ExchangeName
    table["x-message-ttl"] = ttl*1000
    //fmt.Printf("%+v",table)
    //fmt.Printf("%+v",mq)
    ttlstring := strconv.FormatInt(ttl,10)
    queueName := fmt.Sprintf("%s_delay_%s",mq.QueueName ,ttlstring)
    routingKey := fmt.Sprintf("%s_delay_%s",mq.QueueName ,ttlstring)
    _, err = ch.QueueDeclare(queueName, true, false, false, false, table)
        return
    if routingKey != "" && mq.ExchangeName != "" {
        err = ch.QueueBind(queueName, routingKey, mq.ExchangeName, false, nil)
    header := make(map[string]interface{},1)
    header["retry_nums"] = 0
    var ttl_exchange string
    var ttl_routkey string
    if(mq.ExchangeName != "" ){
        ttl_exchange = mq.ExchangeName
        ttl_exchange = ""
    if mq.RoutingKey != "" && mq.ExchangeName != ""{
        ttl_routkey = routingKey
        ttl_routkey = queueName
    err = mq.Channel.Publish(
        ttl_exchange,     // exchange
        ttl_routkey, // routing key
        false,  // mandatory
        false,  // immediate
        amqp.Publishing {
            ContentType: "text/plain",
            Body:        []byte(body),
            Headers:header,
        })
func (mq *RabbitMQ) sendRetryMsg (body string,retry_nums int32,args ...string)  {
    err :=mq.MqOpenChannel()
    //原始路由key
    oldRoutingKey := args[0]
    //原始交換機名
    oldExchangeName := args[1]
    table["x-dead-letter-routing-key"] = oldRoutingKey
    if oldExchangeName != "" {
        table["x-dead-letter-exchange"] = oldExchangeName
        mq.ExchangeName = ""
        table["x-dead-letter-exchange"] = ""
    table["x-message-ttl"] = int64(20000)
    _, err = ch.QueueDeclare(mq.QueueName, true, false, false, false, table)
    header["retry_nums"] = retry_nums + int32(1)
        ttl_routkey = mq.RoutingKey
        ttl_routkey = mq.QueueName
    //fmt.Printf("ttl_exchange:%s,ttl_routkey:%s \n",ttl_exchange,ttl_routkey)
        fmt.Printf("MQ任務(wù)發(fā)送失敗:%s \n", err)
// 監(jiān)聽接收者接收任務(wù) 消費者
func (mq *RabbitMQ) ListenReceiver(receiver Receiver) {
    // 獲取消費通道,確保rabbitMQ一個一個發(fā)送消息
    err =  ch.Qos(1, 0, false)
    msgList, err :=  ch.Consume(mq.QueueName, "", false, false, false, false, nil)
        log.Printf("Consume err :%s \n", err)
    for msg := range msgList {
        retry_nums,ok := msg.Headers["retry_nums"].(int32)
        if(!ok){
            retry_nums = int32(0)
        // 處理數(shù)據(jù)
        err := receiver.Consumer(msg.Body)
        if err!=nil {
            //消息處理失敗 進入延時嘗試機制
            if retry_nums < 3{
                fmt.Println(string(msg.Body))
                fmt.Printf("消息處理失敗 消息開始進入嘗試  ttl延時隊列 \n")
                retry_msg(msg.Body,retry_nums,QueueExchange{
                        mq.QueueName,
                        mq.RoutingKey,
                        mq.ExchangeName,
                        mq.ExchangeType,
                        mq.dns,
                    })
            }else{
                //消息失敗 入庫db
                fmt.Printf("消息處理3次后還是失敗了 入庫db 釘釘告警 \n")
                receiver.FailAction(err,msg.Body)
            }
            err = msg.Ack(true)
            if err != nil {
                fmt.Printf("確認消息未完成異常:%s \n", err)
        }else {
            // 確認消息,必須為false
                fmt.Printf("消息消費ack失敗 err :%s \n", err)
//消息處理失敗之后 延時嘗試
func retry_msg(msg []byte,retry_nums int32,queueExchange QueueExchange){
    //原始隊列名稱 交換機名稱
    oldQName := queueExchange.QuName
    oldExchangeName := queueExchange.ExName
    oldRoutingKey := queueExchange.RtKey
    if oldRoutingKey == "" || oldExchangeName == ""{
        oldRoutingKey = oldQName
    if queueExchange.QuName != "" {
        queueExchange.QuName = queueExchange.QuName + "_retry_3";
    if queueExchange.RtKey != "" {
        queueExchange.RtKey = queueExchange.RtKey + "_retry_3";
        queueExchange.RtKey = queueExchange.QuName + "_retry_3";
//fmt.Printf("%+v",queueExchange)
    mq := NewMq(queueExchange)
    _ = mq.MqConnect()
    defer func(){
        _ = mq.CloseMqConnect()
    }()
    //fmt.Printf("%+v",queueExchange)
    mq.sendRetryMsg(string(msg),retry_nums,oldRoutingKey,oldExchangeName)
func Send(queueExchange QueueExchange,msg string) (err error){
    err = mq.MqConnect()
        mq.CloseMqConnect()
    err = mq.sendMsg(msg)
//發(fā)送延時消息
func SendDelay(queueExchange QueueExchange,msg string,ttl int64)(err error){
    err = mq.sendDelayMsg(msg,ttl)
runNums  開啟并發(fā)執(zhí)行任務(wù)數(shù)量
func Recv(queueExchange QueueExchange,receiver Receiver,runNums int) (err error){
    //鏈接rabbitMQ
    if(err != nil){
    //rbmq斷開鏈接后 協(xié)程退出釋放信號
    taskQuit:= make(chan struct{}, 1)
    //嘗試鏈接rbmq
    tryToLinkC := make(chan struct{}, 1)
    //開始執(zhí)行任務(wù)
    for i:=1;i<=runNums;i++{
        go Recv2(mq,receiver,taskQuit);
    //如果rbmq斷開連接后 嘗試重新建立鏈接
    var tryToLink = func() {
        for {
            err = mq.MqConnect()
            if(err == nil){
                tryToLinkC <- struct{}{}
                break
            time.Sleep(time.Second * 10)
    for{
        select {
        case <- taskQuit ://rbmq斷開連接后 開始嘗試重新建立鏈接
             go tryToLink()
            <-tryToLinkC //建立鏈接成功后 重新開啟協(xié)程執(zhí)行任務(wù)
            fmt.Println("重新開啟新的協(xié)程執(zhí)行任務(wù)")
            go Recv2(mq,receiver,taskQuit);
        time.Sleep(time.Millisecond*100)
func Recv2(mq RabbitMQ,receiver Receiver,taskQuit chan<- struct{}){
        defer func() {
            fmt.Println("rbmq鏈接失敗,協(xié)程任務(wù)退出~~~~~~~~~~~~~~~~~~~~")
            taskQuit <- struct{}{}
        }()
        // 驗證鏈接是否正常
        err := mq.MqOpenChannel()
        if(err != nil){
        mq.ListenReceiver(receiver)
type retryPro struct {
    msgContent   string

實現(xiàn)重連方式很多,下面實現(xiàn)方式比較簡單

1.Recv方法創(chuàng)建ampq鏈接

2.啟動協(xié)程開始執(zhí)行任務(wù) 

MqOpenChannel 打開一個channel通道處理amqp消息

拿到消息 處理任務(wù)

  3,協(xié)程中捕獲異常發(fā)送消息到taskQuit <- struct{}{}

  4,主進程監(jiān)聽taskQuit管道 開始嘗試重新鏈接amqp 直到鏈接成功

  5,重新鏈接成功后啟動新的協(xié)程處理任務(wù)

主要代碼分析:

/*
runNums  開啟并發(fā)執(zhí)行任務(wù)數(shù)量
 */
func Recv(queueExchange QueueExchange,receiver Receiver,runNums int) (err error){
    mq := NewMq(queueExchange)
    //鏈接rabbitMQ
    err = mq.MqConnect()
    if(err != nil){
        return
    }
    //rbmq斷開鏈接后 協(xié)程退出釋放信號
    taskQuit:= make(chan struct{}, 1)
    //嘗試鏈接rbmq
    tryToLinkC := make(chan struct{}, 1)
    //開始執(zhí)行任務(wù)
    for i:=1;i<=runNums;i++{
        go Recv2(mq,receiver,taskQuit);

    //如果rbmq斷開連接后 嘗試重新建立鏈接
    var tryToLink = func() {
        for {
            err = mq.MqConnect()
            if(err == nil){
                tryToLinkC <- struct{}{}
                break
            }
            time.Sleep(time.Second * 10)
        }
    for{
        select {
        case <- taskQuit ://rbmq斷開連接后 開始嘗試重新建立鏈接
             go tryToLink()
            <-tryToLinkC //建立鏈接成功后 重新開啟協(xié)程執(zhí)行任務(wù)
            fmt.Println("重新開啟新的協(xié)程執(zhí)行任務(wù)")
            go Recv2(mq,receiver,taskQuit);
        time.Sleep(time.Millisecond*100)
}
func Recv2(mq RabbitMQ,receiver Receiver,taskQuit chan<- struct{}){
        defer func() {
            fmt.Println("rbmq鏈接失敗,協(xié)程任務(wù)退出~~~~~~~~~~~~~~~~~~~~")
            taskQuit <- struct{}{}
            return
        }()
        // 驗證鏈接是否正常
        err := mq.MqOpenChannel()
        if(err != nil){
        mq.ListenReceiver(receiver)

到此這篇關(guān)于golang監(jiān)聽rabbitmq消息隊列任務(wù)斷線自動重連接的文章就介紹到這了,更多相關(guān)golang rabbitmq斷線自動重連內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • golang函數(shù)的返回值實現(xiàn)

    golang函數(shù)的返回值實現(xiàn)

    本文主要介紹了golang函數(shù)的返回值實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2023-03-03
  • 利用golang實現(xiàn)封裝trycatch異常處理實例代碼

    利用golang實現(xiàn)封裝trycatch異常處理實例代碼

    Go語言追求簡潔優(yōu)雅,所以go語言不支持傳統(tǒng)的 try…catch…finally 這種異常,最近發(fā)現(xiàn)了不錯的trycatch包,下面這篇文章主要跟大家分享了關(guān)于利用golang實現(xiàn)封裝trycatch異常處理的實例代碼,需要的朋友可以參考下。
    2017-07-07
  • Go語言實現(xiàn)本地緩存的策略詳解

    Go語言實現(xiàn)本地緩存的策略詳解

    今天給大家分享的是Go語言本地緩存的一些內(nèi)容,主要是結(jié)合bigcache和fastcache兩個優(yōu)秀的開源代碼庫,總結(jié)一些設(shè)計思路和感悟,文章通過代碼示例介紹的非常詳細,需要的朋友可以參考下
    2023-07-07
  • go語言操作es的實現(xiàn)示例

    go語言操作es的實現(xiàn)示例

    本文主要介紹了go語言操作es的實現(xiàn)示例,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2022-04-04
  • GOLANG使用Context管理關(guān)聯(lián)goroutine的方法

    GOLANG使用Context管理關(guān)聯(lián)goroutine的方法

    這篇文章主要介紹了GOLANG使用Context管理關(guān)聯(lián)goroutine的方法,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-01-01
  • Go gorilla securecookie庫的安裝使用詳解

    Go gorilla securecookie庫的安裝使用詳解

    這篇文章主要介紹了Go gorilla securecookie庫的安裝使用詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2022-08-08
  • Golang實現(xiàn)CronJob(定時任務(wù))的方法詳解

    Golang實現(xiàn)CronJob(定時任務(wù))的方法詳解

    這篇文章主要為大家詳細介紹了Golang如何通過一個單 pod 去實現(xiàn)一個常駐服務(wù),去跑定時任務(wù)(CronJob),文中的示例代碼講解詳細,需要的可以參考下
    2023-04-04
  • GoFrame通用類型變量gvar與interface基本使用對比

    GoFrame通用類型變量gvar與interface基本使用對比

    這篇文章主要為大家介紹了GoFrame通用類型變量gvar與interface基本使用對比,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2022-06-06
  • Go項目與Docker結(jié)合實現(xiàn)高效部署深入探究

    Go項目與Docker結(jié)合實現(xiàn)高效部署深入探究

    在現(xiàn)代軟件開發(fā)中,使用Docker部署應(yīng)用程序已經(jīng)成為一種標準實踐,本文將深入探討如何將Go項目與Docker結(jié)合,實現(xiàn)高效、可靠的部署過程,通過詳細的步驟和豐富的示例,你將能夠迅速掌握這一流程
    2023-12-12
  • Go設(shè)計模式之單例模式圖文詳解

    Go設(shè)計模式之單例模式圖文詳解

    單例模式是一種創(chuàng)建型設(shè)計模式,讓你能夠保證一個類只有一個實例,并提供一個訪問該實例的全局節(jié)點,本文就通過圖文給大家介紹一下Go的單例模式,需要的朋友可以參考下
    2023-07-07

最新評論