???????Golang實(shí)現(xiàn)RabbitMQ中死信隊(duì)列幾種情況
下面這段教程針對(duì)是你已經(jīng)有一些基本的MQ的知識(shí),比如說能夠很清楚的理解queue、exchange等概念,如果你還不是很理解,我建議你先訪問官網(wǎng)查看基本的教程。
1、造成死信隊(duì)列的主要原因
- 消費(fèi)者超時(shí)未應(yīng)答
- 隊(duì)列的容量有限
- 消費(fèi)者拒絕了的消息
2、操作邏輯圖
3、代碼實(shí)戰(zhàn)
其實(shí)整體的思路就是分別創(chuàng)建一個(gè)normal_exchange、dead_exchange、normal_queue、dead_queue,然后將normal_exchange與normal_queue進(jìn)行綁定,將dead_exchange與dead_queue進(jìn)行綁定,這里比較關(guān)鍵的一個(gè)點(diǎn)在于說如何將normal_queue與dead_exchange進(jìn)行綁定,這樣才能將錯(cuò)誤的消息傳遞過來。下面就是這段代碼的關(guān)鍵。
// 聲明一個(gè)normal隊(duì)列 _, err = ch.QueueDeclare( constant.NormalQueue, true, false, false, false, amqp.Table{ //"x-message-ttl": 5000, // 指定過期時(shí)間 //"x-max-length": 6, // 指定長度。超過這個(gè)長度的消息會(huì)發(fā)送到dead_exchange中 "x-dead-letter-exchange": constant.DeadExchange, // 指定死信交換機(jī) "x-dead-letter-routing-key": constant.DeadRoutingKey, // 指定死信routing-key })
3.1 針對(duì)原因1:消費(fèi)者超出時(shí)間未應(yīng)答
consumer1.go
package day07 import ( ?? ?amqp "github.com/rabbitmq/amqp091-go" ?? ?"log" ?? ?"v1/utils" ) type Constant struct { ?? ?NormalExchange ? string ?? ?DeadExchange ? ? string ?? ?NormalQueue ? ? ?string ?? ?DeadQueue ? ? ? ?string ?? ?NormalRoutingKey string ?? ?DeadRoutingKey ? string } func Consumer1() { ?? ?// 獲取連接 ?? ?ch := utils.GetChannel() ?? ?// 創(chuàng)建一個(gè)變量常量 ?? ?constant := Constant{ ?? ??? ?NormalExchange: ? "normal_exchange", ?? ??? ?DeadExchange: ? ? "dead_exchange", ?? ??? ?NormalQueue: ? ? ?"normal_queue", ?? ??? ?DeadQueue: ? ? ? ?"dead_queue", ?? ??? ?NormalRoutingKey: "normal_key", ?? ??? ?DeadRoutingKey: ? "dead_key", ?? ?} ?? ?// 聲明normal交換機(jī) ?? ?err := ch.ExchangeDeclare( ?? ??? ?constant.NormalExchange, ?? ??? ?amqp.ExchangeDirect, ?? ??? ?true, ?? ??? ?false, ?? ??? ?false, ?? ??? ?false, ?? ??? ?nil, ?? ?) ?? ?utils.FailOnError(err, "Failed to declare a normal exchange") ?? ?// 聲明一個(gè)dead交換機(jī) ?? ?err = ch.ExchangeDeclare( ?? ??? ?constant.DeadExchange, ?? ??? ?amqp.ExchangeDirect, ?? ??? ?true, ?? ??? ?false, ?? ??? ?false, ?? ??? ?false, ?? ??? ?nil, ?? ?) ?? ?utils.FailOnError(err, "Failed to declare a dead exchange") ?? ?// 聲明一個(gè)normal隊(duì)列 ?? ?_, err = ch.QueueDeclare( ?? ??? ?constant.NormalQueue, ?? ??? ?true, ?? ??? ?false, ?? ??? ?false, ?? ??? ?false, ?? ??? ?amqp.Table{ ?? ??? ??? ?"x-message-ttl": 5000, // 指定過期時(shí)間 ?? ??? ??? ?//"x-max-length": ? ? ? ? ? ? ?6, ?? ??? ??? ?"x-dead-letter-exchange": ? ?constant.DeadExchange, ? // 指定死信交換機(jī) ?? ??? ??? ?"x-dead-letter-routing-key": constant.DeadRoutingKey, // 指定死信routing-key ?? ??? ?}) ?? ?utils.FailOnError(err, "Failed to declare a normal queue") ?? ?// 聲明一個(gè)dead隊(duì)列:注意不要給死信隊(duì)列設(shè)置消息時(shí)間,否者死信隊(duì)列里面的信息會(huì)再次過期 ?? ?_, err = ch.QueueDeclare( ?? ??? ?constant.DeadQueue, ?? ??? ?true, ?? ??? ?false, ?? ??? ?false, ?? ??? ?false, ?? ??? ?nil) ?? ?utils.FailOnError(err, "Failed to declare a dead queue") ?? ?// 將normal_exchange與normal_queue進(jìn)行綁定 ?? ?err = ch.QueueBind(constant.NormalQueue, constant.NormalRoutingKey, constant.NormalExchange, false, nil) ?? ?utils.FailOnError(err, "Failed to binding normal_exchange with normal_queue") ?? ?// 將dead_exchange與dead_queue進(jìn)行綁定 ?? ?err = ch.QueueBind(constant.DeadQueue, constant.DeadRoutingKey, constant.DeadExchange, false, nil) ?? ?utils.FailOnError(err, "Failed to binding dead_exchange with dead_queue") ?? ?// 消費(fèi)消息 ?? ?msgs, err := ch.Consume(constant.NormalQueue, ?? ??? ?"", ?? ??? ?false, // 這個(gè)地方一定要關(guān)閉自動(dòng)應(yīng)答 ?? ??? ?false, ?? ??? ?false, ?? ??? ?false, ?? ??? ?nil) ?? ?utils.FailOnError(err, "Failed to consume in Consumer1") ?? ?var forever chan struct{} ?? ?go func() { ?? ??? ?for d := range msgs { ?? ??? ??? ?if err := d.Reject(false); err != nil { ?? ??? ??? ??? ?utils.FailOnError(err, "Failed to Reject a message") ?? ??? ??? ?} ?? ??? ?} ?? ?}() ?? ?log.Printf(" [*] Waiting for logs. To exit press CTRL+C") ?? ?<-forever }
consumer2.go
package day07 import ( ?? ?amqp "github.com/rabbitmq/amqp091-go" ?? ?"log" ?? ?"v1/utils" ) func Consumer2() { ?? ?// 拿取信道 ?? ?ch := utils.GetChannel() ?? ?// 聲明一個(gè)交換機(jī) ?? ?err := ch.ExchangeDeclare( ?? ??? ?"dead_exchange", ?? ??? ?amqp.ExchangeDirect, ?? ??? ?true, ?? ??? ?false, ?? ??? ?false, ?? ??? ?false, ?? ??? ?nil) ?? ?utils.FailOnError(err, "Failed to Declare a exchange") ?? ?// 接收消息的應(yīng)答 ?? ?msgs, err := ch.Consume("dead_queue", ?? ??? ?"", ?? ??? ?false, ?? ??? ?false, ?? ??? ?false, ?? ??? ?false, ?? ??? ?nil, ?? ?) ?? ?var forever chan struct{} ?? ?go func() { ?? ??? ?for d := range msgs { ?? ??? ??? ?log.Printf("[x] %s", d.Body) ?? ??? ??? ?// 開啟手動(dòng)應(yīng)答? ?? ??? ??? ?d.Ack(false) ?? ??? ?} ?? ?}() ?? ?log.Printf(" [*] Waiting for logs. To exit press CTRL+C") ?? ?<-forever }
produce.go
package day07 import ( ?? ?"context" ?? ?amqp "github.com/rabbitmq/amqp091-go" ?? ?"strconv" ?? ?"time" ?? ?"v1/utils" ) func Produce() { ?? ?// 獲取信道 ?? ?ch := utils.GetChannel() ?? ?// 聲明一個(gè)交換機(jī) ?? ?err := ch.ExchangeDeclare( ?? ??? ?"normal_exchange", ?? ??? ?amqp.ExchangeDirect, ?? ??? ?true, ?? ??? ?false, ?? ??? ?false, ?? ??? ?false, ?? ??? ?nil) ?? ?utils.FailOnError(err, "Failed to declare a exchange") ?? ?ctx, cancer := context.WithTimeout(context.Background(), 5*time.Second) ?? ?defer cancer() ?? ?// 發(fā)送了10條消息 ?? ?for i := 0; i < 10; i++ { ?? ??? ?msg := "Info:" + strconv.Itoa(i) ?? ??? ?ch.PublishWithContext(ctx, ?? ??? ??? ?"normal_exchange", ?? ??? ??? ?"normal_key", ?? ??? ??? ?false, ?? ??? ??? ?false, ?? ??? ??? ?amqp.Publishing{ ?? ??? ??? ??? ?ContentType: "text/plain", ?? ??? ??? ??? ?Body: ? ? ? ?[]byte(msg), ?? ??? ??? ?}) ?? ?} }
3.2 針對(duì)原因2:限制一定的長度
只需要改變consumer1.go中的對(duì)normal_queue的聲明
// 聲明一個(gè)normal隊(duì)列 _, err = ch.QueueDeclare( constant.NormalQueue, true, false, false, false, amqp.Table{ //"x-message-ttl": 5000, // 指定過期時(shí)間 "x-max-length": 6, "x-dead-letter-exchange": constant.DeadExchange, // 指定死信交換機(jī) "x-dead-letter-routing-key": constant.DeadRoutingKey, // 指定死信routing-key })
3.3 針對(duì)原因3:消費(fèi)者拒絕的消息回到死信隊(duì)列中
這里需要完成兩點(diǎn)工作
工作1:需要在consumer1中作出拒絕的操作
go func() { for d := range msgs { if err := d.Reject(false); err != nil { utils.FailOnError(err, "Failed to Reject a message") } } }()
工作2:如果你consume的時(shí)候開啟了自動(dòng)應(yīng)答一定要關(guān)閉
// 消費(fèi)消息 msgs, err := ch.Consume(constant.NormalQueue, "", false, // 這個(gè)地方一定要關(guān)閉自動(dòng)應(yīng)答 false, false, false, nil)
其他的部分不需要改變,按照問題1中的設(shè)計(jì)即可。
到此這篇關(guān)于Golang實(shí)現(xiàn)RabbitMQ中死信隊(duì)列幾種情況的文章就介紹到這了,更多相關(guān)???????Golang RabbitMQ死信隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Golang實(shí)現(xiàn)基于Redis的可靠延遲隊(duì)列
redisson?delayqueue可以使用redis的有序集合結(jié)構(gòu)實(shí)現(xiàn)延時(shí)隊(duì)列,遺憾的是go語言社區(qū)中并無類似的庫。不過問題不大,本文將用Go語言實(shí)現(xiàn)這一功能,需要的可以參考一下2022-06-06golang實(shí)現(xiàn)http服務(wù)器處理靜態(tài)文件示例
這篇文章主要介紹了golang實(shí)現(xiàn)http服務(wù)器處理靜態(tài)文件的方法,涉及Go語言基于http協(xié)議處理文件的相關(guān)技巧,需要的朋友可以參考下2016-07-07go單體日志采集zincsearch方案實(shí)現(xiàn)
這篇文章主要為大家介紹了go單體日志采集zincsearch方案實(shí)現(xiàn)示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-07-07詳解golang中發(fā)送http請(qǐng)求的幾種常見情況
這篇文章主要介紹了詳解golang中發(fā)送http請(qǐng)求的幾種常見情況,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-12-12