???????Golang實現(xiàn)RabbitMQ中死信隊列幾種情況
下面這段教程針對是你已經(jīng)有一些基本的MQ的知識,比如說能夠很清楚的理解queue、exchange等概念,如果你還不是很理解,我建議你先訪問官網(wǎng)查看基本的教程。
1、造成死信隊列的主要原因
- 消費者超時未應(yīng)答
- 隊列的容量有限
- 消費者拒絕了的消息
2、操作邏輯圖

3、代碼實戰(zhàn)
其實整體的思路就是分別創(chuàng)建一個normal_exchange、dead_exchange、normal_queue、dead_queue,然后將normal_exchange與normal_queue進(jìn)行綁定,將dead_exchange與dead_queue進(jìn)行綁定,這里比較關(guān)鍵的一個點在于說如何將normal_queue與dead_exchange進(jìn)行綁定,這樣才能將錯誤的消息傳遞過來。下面就是這段代碼的關(guān)鍵。
// 聲明一個normal隊列
_, err = ch.QueueDeclare(
constant.NormalQueue,
true,
false,
false,
false,
amqp.Table{
//"x-message-ttl": 5000, // 指定過期時間
//"x-max-length": 6, // 指定長度。超過這個長度的消息會發(fā)送到dead_exchange中
"x-dead-letter-exchange": constant.DeadExchange, // 指定死信交換機(jī)
"x-dead-letter-routing-key": constant.DeadRoutingKey, // 指定死信routing-key
})
3.1 針對原因1:消費者超出時間未應(yīng)答
consumer1.go
package day07
import (
?? ?amqp "github.com/rabbitmq/amqp091-go"
?? ?"log"
?? ?"v1/utils"
)
type Constant struct {
?? ?NormalExchange ? string
?? ?DeadExchange ? ? string
?? ?NormalQueue ? ? ?string
?? ?DeadQueue ? ? ? ?string
?? ?NormalRoutingKey string
?? ?DeadRoutingKey ? string
}
func Consumer1() {
?? ?// 獲取連接
?? ?ch := utils.GetChannel()
?? ?// 創(chuàng)建一個變量常量
?? ?constant := Constant{
?? ??? ?NormalExchange: ? "normal_exchange",
?? ??? ?DeadExchange: ? ? "dead_exchange",
?? ??? ?NormalQueue: ? ? ?"normal_queue",
?? ??? ?DeadQueue: ? ? ? ?"dead_queue",
?? ??? ?NormalRoutingKey: "normal_key",
?? ??? ?DeadRoutingKey: ? "dead_key",
?? ?}
?? ?// 聲明normal交換機(jī)
?? ?err := ch.ExchangeDeclare(
?? ??? ?constant.NormalExchange,
?? ??? ?amqp.ExchangeDirect,
?? ??? ?true,
?? ??? ?false,
?? ??? ?false,
?? ??? ?false,
?? ??? ?nil,
?? ?)
?? ?utils.FailOnError(err, "Failed to declare a normal exchange")
?? ?// 聲明一個dead交換機(jī)
?? ?err = ch.ExchangeDeclare(
?? ??? ?constant.DeadExchange,
?? ??? ?amqp.ExchangeDirect,
?? ??? ?true,
?? ??? ?false,
?? ??? ?false,
?? ??? ?false,
?? ??? ?nil,
?? ?)
?? ?utils.FailOnError(err, "Failed to declare a dead exchange")
?? ?// 聲明一個normal隊列
?? ?_, err = ch.QueueDeclare(
?? ??? ?constant.NormalQueue,
?? ??? ?true,
?? ??? ?false,
?? ??? ?false,
?? ??? ?false,
?? ??? ?amqp.Table{
?? ??? ??? ?"x-message-ttl": 5000, // 指定過期時間
?? ??? ??? ?//"x-max-length": ? ? ? ? ? ? ?6,
?? ??? ??? ?"x-dead-letter-exchange": ? ?constant.DeadExchange, ? // 指定死信交換機(jī)
?? ??? ??? ?"x-dead-letter-routing-key": constant.DeadRoutingKey, // 指定死信routing-key
?? ??? ?})
?? ?utils.FailOnError(err, "Failed to declare a normal queue")
?? ?// 聲明一個dead隊列:注意不要給死信隊列設(shè)置消息時間,否者死信隊列里面的信息會再次過期
?? ?_, err = ch.QueueDeclare(
?? ??? ?constant.DeadQueue,
?? ??? ?true,
?? ??? ?false,
?? ??? ?false,
?? ??? ?false,
?? ??? ?nil)
?? ?utils.FailOnError(err, "Failed to declare a dead queue")
?? ?// 將normal_exchange與normal_queue進(jìn)行綁定
?? ?err = ch.QueueBind(constant.NormalQueue, constant.NormalRoutingKey, constant.NormalExchange, false, nil)
?? ?utils.FailOnError(err, "Failed to binding normal_exchange with normal_queue")
?? ?// 將dead_exchange與dead_queue進(jìn)行綁定
?? ?err = ch.QueueBind(constant.DeadQueue, constant.DeadRoutingKey, constant.DeadExchange, false, nil)
?? ?utils.FailOnError(err, "Failed to binding dead_exchange with dead_queue")
?? ?// 消費消息
?? ?msgs, err := ch.Consume(constant.NormalQueue,
?? ??? ?"",
?? ??? ?false, // 這個地方一定要關(guān)閉自動應(yīng)答
?? ??? ?false,
?? ??? ?false,
?? ??? ?false,
?? ??? ?nil)
?? ?utils.FailOnError(err, "Failed to consume in Consumer1")
?? ?var forever chan struct{}
?? ?go func() {
?? ??? ?for d := range msgs {
?? ??? ??? ?if err := d.Reject(false); err != nil {
?? ??? ??? ??? ?utils.FailOnError(err, "Failed to Reject a message")
?? ??? ??? ?}
?? ??? ?}
?? ?}()
?? ?log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
?? ?<-forever
}consumer2.go
package day07
import (
?? ?amqp "github.com/rabbitmq/amqp091-go"
?? ?"log"
?? ?"v1/utils"
)
func Consumer2() {
?? ?// 拿取信道
?? ?ch := utils.GetChannel()
?? ?// 聲明一個交換機(jī)
?? ?err := ch.ExchangeDeclare(
?? ??? ?"dead_exchange",
?? ??? ?amqp.ExchangeDirect,
?? ??? ?true,
?? ??? ?false,
?? ??? ?false,
?? ??? ?false,
?? ??? ?nil)
?? ?utils.FailOnError(err, "Failed to Declare a exchange")
?? ?// 接收消息的應(yīng)答
?? ?msgs, err := ch.Consume("dead_queue",
?? ??? ?"",
?? ??? ?false,
?? ??? ?false,
?? ??? ?false,
?? ??? ?false,
?? ??? ?nil,
?? ?)
?? ?var forever chan struct{}
?? ?go func() {
?? ??? ?for d := range msgs {
?? ??? ??? ?log.Printf("[x] %s", d.Body)
?? ??? ??? ?// 開啟手動應(yīng)答?
?? ??? ??? ?d.Ack(false)
?? ??? ?}
?? ?}()
?? ?log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
?? ?<-forever
}produce.go
package day07
import (
?? ?"context"
?? ?amqp "github.com/rabbitmq/amqp091-go"
?? ?"strconv"
?? ?"time"
?? ?"v1/utils"
)
func Produce() {
?? ?// 獲取信道
?? ?ch := utils.GetChannel()
?? ?// 聲明一個交換機(jī)
?? ?err := ch.ExchangeDeclare(
?? ??? ?"normal_exchange",
?? ??? ?amqp.ExchangeDirect,
?? ??? ?true,
?? ??? ?false,
?? ??? ?false,
?? ??? ?false,
?? ??? ?nil)
?? ?utils.FailOnError(err, "Failed to declare a exchange")
?? ?ctx, cancer := context.WithTimeout(context.Background(), 5*time.Second)
?? ?defer cancer()
?? ?// 發(fā)送了10條消息
?? ?for i := 0; i < 10; i++ {
?? ??? ?msg := "Info:" + strconv.Itoa(i)
?? ??? ?ch.PublishWithContext(ctx,
?? ??? ??? ?"normal_exchange",
?? ??? ??? ?"normal_key",
?? ??? ??? ?false,
?? ??? ??? ?false,
?? ??? ??? ?amqp.Publishing{
?? ??? ??? ??? ?ContentType: "text/plain",
?? ??? ??? ??? ?Body: ? ? ? ?[]byte(msg),
?? ??? ??? ?})
?? ?}
}3.2 針對原因2:限制一定的長度
只需要改變consumer1.go中的對normal_queue的聲明
// 聲明一個normal隊列
_, err = ch.QueueDeclare(
constant.NormalQueue,
true,
false,
false,
false,
amqp.Table{
//"x-message-ttl": 5000, // 指定過期時間
"x-max-length": 6,
"x-dead-letter-exchange": constant.DeadExchange, // 指定死信交換機(jī)
"x-dead-letter-routing-key": constant.DeadRoutingKey, // 指定死信routing-key
})
3.3 針對原因3:消費者拒絕的消息回到死信隊列中
這里需要完成兩點工作
工作1:需要在consumer1中作出拒絕的操作
go func() {
for d := range msgs {
if err := d.Reject(false); err != nil {
utils.FailOnError(err, "Failed to Reject a message")
}
}
}()
工作2:如果你consume的時候開啟了自動應(yīng)答一定要關(guān)閉
// 消費消息
msgs, err := ch.Consume(constant.NormalQueue,
"",
false, // 這個地方一定要關(guān)閉自動應(yīng)答
false,
false,
false,
nil)
其他的部分不需要改變,按照問題1中的設(shè)計即可。
到此這篇關(guān)于Golang實現(xiàn)RabbitMQ中死信隊列幾種情況的文章就介紹到這了,更多相關(guān)???????Golang RabbitMQ死信隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
golang實現(xiàn)http服務(wù)器處理靜態(tài)文件示例
這篇文章主要介紹了golang實現(xiàn)http服務(wù)器處理靜態(tài)文件的方法,涉及Go語言基于http協(xié)議處理文件的相關(guān)技巧,需要的朋友可以參考下2016-07-07

