???????Golang實現(xiàn)RabbitMQ中死信隊列幾種情況
下面這段教程針對是你已經(jīng)有一些基本的MQ的知識,比如說能夠很清楚的理解queue、exchange等概念,如果你還不是很理解,我建議你先訪問官網(wǎng)查看基本的教程。
1、造成死信隊列的主要原因
- 消費者超時未應(yīng)答
- 隊列的容量有限
- 消費者拒絕了的消息
2、操作邏輯圖
3、代碼實戰(zhàn)
其實整體的思路就是分別創(chuàng)建一個normal_exchange、dead_exchange、normal_queue、dead_queue,然后將normal_exchange與normal_queue進行綁定,將dead_exchange與dead_queue進行綁定,這里比較關(guān)鍵的一個點在于說如何將normal_queue與dead_exchange進行綁定,這樣才能將錯誤的消息傳遞過來。下面就是這段代碼的關(guān)鍵。
// 聲明一個normal隊列 _, err = ch.QueueDeclare( constant.NormalQueue, true, false, false, false, amqp.Table{ //"x-message-ttl": 5000, // 指定過期時間 //"x-max-length": 6, // 指定長度。超過這個長度的消息會發(fā)送到dead_exchange中 "x-dead-letter-exchange": constant.DeadExchange, // 指定死信交換機 "x-dead-letter-routing-key": constant.DeadRoutingKey, // 指定死信routing-key })
3.1 針對原因1:消費者超出時間未應(yīng)答
consumer1.go
package day07 import ( ?? ?amqp "github.com/rabbitmq/amqp091-go" ?? ?"log" ?? ?"v1/utils" ) type Constant struct { ?? ?NormalExchange ? string ?? ?DeadExchange ? ? string ?? ?NormalQueue ? ? ?string ?? ?DeadQueue ? ? ? ?string ?? ?NormalRoutingKey string ?? ?DeadRoutingKey ? string } func Consumer1() { ?? ?// 獲取連接 ?? ?ch := utils.GetChannel() ?? ?// 創(chuàng)建一個變量常量 ?? ?constant := Constant{ ?? ??? ?NormalExchange: ? "normal_exchange", ?? ??? ?DeadExchange: ? ? "dead_exchange", ?? ??? ?NormalQueue: ? ? ?"normal_queue", ?? ??? ?DeadQueue: ? ? ? ?"dead_queue", ?? ??? ?NormalRoutingKey: "normal_key", ?? ??? ?DeadRoutingKey: ? "dead_key", ?? ?} ?? ?// 聲明normal交換機 ?? ?err := ch.ExchangeDeclare( ?? ??? ?constant.NormalExchange, ?? ??? ?amqp.ExchangeDirect, ?? ??? ?true, ?? ??? ?false, ?? ??? ?false, ?? ??? ?false, ?? ??? ?nil, ?? ?) ?? ?utils.FailOnError(err, "Failed to declare a normal exchange") ?? ?// 聲明一個dead交換機 ?? ?err = ch.ExchangeDeclare( ?? ??? ?constant.DeadExchange, ?? ??? ?amqp.ExchangeDirect, ?? ??? ?true, ?? ??? ?false, ?? ??? ?false, ?? ??? ?false, ?? ??? ?nil, ?? ?) ?? ?utils.FailOnError(err, "Failed to declare a dead exchange") ?? ?// 聲明一個normal隊列 ?? ?_, err = ch.QueueDeclare( ?? ??? ?constant.NormalQueue, ?? ??? ?true, ?? ??? ?false, ?? ??? ?false, ?? ??? ?false, ?? ??? ?amqp.Table{ ?? ??? ??? ?"x-message-ttl": 5000, // 指定過期時間 ?? ??? ??? ?//"x-max-length": ? ? ? ? ? ? ?6, ?? ??? ??? ?"x-dead-letter-exchange": ? ?constant.DeadExchange, ? // 指定死信交換機 ?? ??? ??? ?"x-dead-letter-routing-key": constant.DeadRoutingKey, // 指定死信routing-key ?? ??? ?}) ?? ?utils.FailOnError(err, "Failed to declare a normal queue") ?? ?// 聲明一個dead隊列:注意不要給死信隊列設(shè)置消息時間,否者死信隊列里面的信息會再次過期 ?? ?_, err = ch.QueueDeclare( ?? ??? ?constant.DeadQueue, ?? ??? ?true, ?? ??? ?false, ?? ??? ?false, ?? ??? ?false, ?? ??? ?nil) ?? ?utils.FailOnError(err, "Failed to declare a dead queue") ?? ?// 將normal_exchange與normal_queue進行綁定 ?? ?err = ch.QueueBind(constant.NormalQueue, constant.NormalRoutingKey, constant.NormalExchange, false, nil) ?? ?utils.FailOnError(err, "Failed to binding normal_exchange with normal_queue") ?? ?// 將dead_exchange與dead_queue進行綁定 ?? ?err = ch.QueueBind(constant.DeadQueue, constant.DeadRoutingKey, constant.DeadExchange, false, nil) ?? ?utils.FailOnError(err, "Failed to binding dead_exchange with dead_queue") ?? ?// 消費消息 ?? ?msgs, err := ch.Consume(constant.NormalQueue, ?? ??? ?"", ?? ??? ?false, // 這個地方一定要關(guān)閉自動應(yīng)答 ?? ??? ?false, ?? ??? ?false, ?? ??? ?false, ?? ??? ?nil) ?? ?utils.FailOnError(err, "Failed to consume in Consumer1") ?? ?var forever chan struct{} ?? ?go func() { ?? ??? ?for d := range msgs { ?? ??? ??? ?if err := d.Reject(false); err != nil { ?? ??? ??? ??? ?utils.FailOnError(err, "Failed to Reject a message") ?? ??? ??? ?} ?? ??? ?} ?? ?}() ?? ?log.Printf(" [*] Waiting for logs. To exit press CTRL+C") ?? ?<-forever }
consumer2.go
package day07 import ( ?? ?amqp "github.com/rabbitmq/amqp091-go" ?? ?"log" ?? ?"v1/utils" ) func Consumer2() { ?? ?// 拿取信道 ?? ?ch := utils.GetChannel() ?? ?// 聲明一個交換機 ?? ?err := ch.ExchangeDeclare( ?? ??? ?"dead_exchange", ?? ??? ?amqp.ExchangeDirect, ?? ??? ?true, ?? ??? ?false, ?? ??? ?false, ?? ??? ?false, ?? ??? ?nil) ?? ?utils.FailOnError(err, "Failed to Declare a exchange") ?? ?// 接收消息的應(yīng)答 ?? ?msgs, err := ch.Consume("dead_queue", ?? ??? ?"", ?? ??? ?false, ?? ??? ?false, ?? ??? ?false, ?? ??? ?false, ?? ??? ?nil, ?? ?) ?? ?var forever chan struct{} ?? ?go func() { ?? ??? ?for d := range msgs { ?? ??? ??? ?log.Printf("[x] %s", d.Body) ?? ??? ??? ?// 開啟手動應(yīng)答? ?? ??? ??? ?d.Ack(false) ?? ??? ?} ?? ?}() ?? ?log.Printf(" [*] Waiting for logs. To exit press CTRL+C") ?? ?<-forever }
produce.go
package day07 import ( ?? ?"context" ?? ?amqp "github.com/rabbitmq/amqp091-go" ?? ?"strconv" ?? ?"time" ?? ?"v1/utils" ) func Produce() { ?? ?// 獲取信道 ?? ?ch := utils.GetChannel() ?? ?// 聲明一個交換機 ?? ?err := ch.ExchangeDeclare( ?? ??? ?"normal_exchange", ?? ??? ?amqp.ExchangeDirect, ?? ??? ?true, ?? ??? ?false, ?? ??? ?false, ?? ??? ?false, ?? ??? ?nil) ?? ?utils.FailOnError(err, "Failed to declare a exchange") ?? ?ctx, cancer := context.WithTimeout(context.Background(), 5*time.Second) ?? ?defer cancer() ?? ?// 發(fā)送了10條消息 ?? ?for i := 0; i < 10; i++ { ?? ??? ?msg := "Info:" + strconv.Itoa(i) ?? ??? ?ch.PublishWithContext(ctx, ?? ??? ??? ?"normal_exchange", ?? ??? ??? ?"normal_key", ?? ??? ??? ?false, ?? ??? ??? ?false, ?? ??? ??? ?amqp.Publishing{ ?? ??? ??? ??? ?ContentType: "text/plain", ?? ??? ??? ??? ?Body: ? ? ? ?[]byte(msg), ?? ??? ??? ?}) ?? ?} }
3.2 針對原因2:限制一定的長度
只需要改變consumer1.go中的對normal_queue的聲明
// 聲明一個normal隊列 _, err = ch.QueueDeclare( constant.NormalQueue, true, false, false, false, amqp.Table{ //"x-message-ttl": 5000, // 指定過期時間 "x-max-length": 6, "x-dead-letter-exchange": constant.DeadExchange, // 指定死信交換機 "x-dead-letter-routing-key": constant.DeadRoutingKey, // 指定死信routing-key })
3.3 針對原因3:消費者拒絕的消息回到死信隊列中
這里需要完成兩點工作
工作1:需要在consumer1中作出拒絕的操作
go func() { for d := range msgs { if err := d.Reject(false); err != nil { utils.FailOnError(err, "Failed to Reject a message") } } }()
工作2:如果你consume的時候開啟了自動應(yīng)答一定要關(guān)閉
// 消費消息 msgs, err := ch.Consume(constant.NormalQueue, "", false, // 這個地方一定要關(guān)閉自動應(yīng)答 false, false, false, nil)
其他的部分不需要改變,按照問題1中的設(shè)計即可。
到此這篇關(guān)于Golang實現(xiàn)RabbitMQ中死信隊列幾種情況的文章就介紹到這了,更多相關(guān)???????Golang RabbitMQ死信隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
golang實現(xiàn)http服務(wù)器處理靜態(tài)文件示例
這篇文章主要介紹了golang實現(xiàn)http服務(wù)器處理靜態(tài)文件的方法,涉及Go語言基于http協(xié)議處理文件的相關(guān)技巧,需要的朋友可以參考下2016-07-07