golang gin 監(jiān)聽rabbitmq隊列無限消費的案例代碼
golang gin 監(jiān)聽rabbitmq隊列無限消費
連接rabbitmq
package database import ( "github.com/streadway/amqp" "log" "reflect" "yy-data-processing/common/config" ) var RabbitConn *amqp.Connection var RabbitChannel *amqp.Channel func InitRabbitmq() { var err error RabbitConn, err = amqp.Dial(config.Config.RabbitUrl) if err != nil { log.Println("連接RabbitMQ失敗") panic(err) } RabbitChannel, err = RabbitConn.Channel() if err != nil { log.Println("獲取RabbitMQ channel失敗") panic(err) } } // 0表示channel未關(guān)閉,1表示channel已關(guān)閉 func CheckRabbitClosed(ch amqp.Channel) int64 { d := reflect.ValueOf(ch) i := d.FieldByName("closed").Int() return i }
創(chuàng)建生產(chǎn)者
package service import ( "encoding/json" "github.com/streadway/amqp" "log" "yy-data-processing/common/config" "yy-data-processing/common/database" "yy-data-processing/model" ) func Producer() { // 聲明隊列,沒有則創(chuàng)建 // 隊列名稱、是否持久化、所有消費者與隊列斷開時是否自動刪除隊列、是否獨享(不同連接的channel能否使用該隊列) declare, err := database.RabbitChannel.QueueDeclare(config.Config.HawkSaveQueueName, true, false, false, false, nil) if err != nil { log.Printf("聲明隊列 %v 失敗, error: %v", config.Config.HawkSaveQueueName, err) panic(err) } request := model.Request{} marshal, _ := json.Marshal(request ) // exchange、routing key、mandatory、immediate err = database.RabbitChannel.Publish("", declare.Name, false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(marshal), }) if err != nil { log.Printf("生產(chǎn)者發(fā)送消息失敗, error: %v", err) } else { log.Println("生產(chǎn)者發(fā)送消息成功") } }
創(chuàng)建消費者
package service import ( "encoding/json" "log" "os" "strings" "sync" "time" "yy-data-processing/common/config" "yy-data-processing/common/database" "yy-data-processing/model" ) func Consumer() { // 聲明隊列,沒有則創(chuàng)建 // 隊列名稱、是否持久化、所有消費者與隊列斷開時是否自動刪除隊列、是否獨享(不同連接的channel能否使用該隊列) _, err := database.RabbitChannel.QueueDeclare(config.Config.QueueName, true, false, false, false, nil) if err != nil { log.Printf("聲明隊列 %v 失敗, error: %v", config.Config.QueueName, err) panic(err) } // 隊列名稱、consumer、auto-ack、是否獨享 // deliveries是一個管道,有消息到隊列,就會消費,消費者的消息只需要從deliveries這個管道獲取 deliveries, err := database.RabbitChannel.Consume(config.Config.QueueName, "", true, false, false, false, nil) if err != nil { log.Printf("從隊列 %v 獲取數(shù)據(jù)失敗, error: %v", config.Config.QueueName, err) } else { log.Println("從消費隊列獲取任務(wù)成功") } // 阻塞住 for { select { case message := <-deliveries: closed := database.CheckRabbitClosed(*database.RabbitChannel) if closed == 1 { // channel 已關(guān)閉,重連一下 database.InitRabbitmq() } else { msgData := string(message.Body) request := model.Request{} err := json.Unmarshal([]byte(msgData), &request) if err != nil { log.Printf("解析rabbitmq數(shù)據(jù) %v 失敗, error: %v", msgData, err) } else { // TODO... // 處理邏輯 } } } } }
main方法協(xié)程調(diào)用
package main import ( "log" "yy-data-processing/common/config" "yy-data-processing/common/database" "yy-data-processing/router" "yy-data-processing/service" ) func main() { // 初始化路由 routers := router.InitRouters() // 初始化RabbitMQ database.InitRabbitmq() go service.Producer() go service.Consumer() port := config.Config.Port if err := routers.Run(":" + port); err != nil { log.Printf("啟動服務(wù)失敗: ", err) } }
到此這篇關(guān)于golang gin 監(jiān)聽rabbitmq隊列無限消費的文章就介紹到這了,更多相關(guān)golang監(jiān)聽rabbitmq內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
詳解Go多協(xié)程并發(fā)環(huán)境下的錯誤處理
這篇文章主要介紹了詳解Go多協(xié)程并發(fā)環(huán)境下的錯誤處理,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-08-08使用Go語言構(gòu)建高效的二叉搜索樹聯(lián)系簿
樹是一種重要的數(shù)據(jù)結(jié)構(gòu),而二叉搜索樹(BST)則是樹的一種常見形式,在本文中,我們將學(xué)習(xí)如何構(gòu)建一個高效的二叉搜索樹聯(lián)系簿,感興趣的可以了解下2024-01-01Go語言數(shù)據(jù)結(jié)構(gòu)之插入排序示例詳解
這篇文章主要為大家介紹了Go語言數(shù)據(jù)結(jié)構(gòu)之插入排序示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-08-08golang的匿名函數(shù)和普通函數(shù)的區(qū)別解析
匿名函數(shù)是不具名的函數(shù),可以在不定義函數(shù)名的情況下直接使用,通常用于函數(shù)內(nèi)部的局部作用域中,這篇文章主要介紹了golang的匿名函數(shù)和普通函數(shù)的區(qū)別,需要的朋友可以參考下2023-03-03golang連接mysql數(shù)據(jù)庫操作使用示例
這篇文章主要為大家介紹了golang連接mysql數(shù)據(jù)庫操作使用示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步早日升職加薪2022-04-04golang服務(wù)報錯:?write:?broken?pipe的解決方案
在開發(fā)在線客服系統(tǒng)的時候,看到日志里有一些錯誤信息,下面這篇文章主要給大家介紹了關(guān)于golang服務(wù)報錯:?write:?broken?pipe的解決方案,需要的朋友可以參考下2022-09-09