欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RabbitMq如何做到消息的可靠性投遞

 更新時(shí)間:2022年12月17日 16:32:07   作者:Onemorelight95  
現(xiàn)在的一些互聯(lián)網(wǎng)項(xiàng)目或者是高并發(fā)的項(xiàng)目中很少有沒有引入消息隊(duì)列的。 引入消息隊(duì)列可以給這個(gè)項(xiàng)目帶來(lái)很多的好處,這篇文章主要為大家介紹了RabbitMq如何做到消息的可靠性投遞,有需要的朋友可以借鑒參考下

如何保證消息不丟失

在使用RabbitMQ的時(shí)候,我們需要保證消息不能丟失,消息從生產(chǎn)者生產(chǎn)出來(lái)一直到消費(fèi)者消費(fèi)成功,這條鏈路是這樣的:

消息的可靠投遞分為了兩大內(nèi)容:發(fā)送端的確認(rèn)(p->broker和exchange->queue)和消費(fèi)端的確認(rèn)(queue->c)。

發(fā)送端的確認(rèn)

Rabbit提供了兩種方式來(lái)保證發(fā)送端的消息可靠性投遞:confirm 確認(rèn)模式

和return 退回模式。

confirm 確認(rèn)模式:消息從 producer 到達(dá) exchange 則會(huì)給 producer 發(fā)送一個(gè)應(yīng)答,我們需要開啟confirm模式,才能接收到這條應(yīng)答。開啟方式是將Channel.Confirm(noWait bool)參數(shù)設(shè)置為false,表示同意發(fā)送者將當(dāng)前channel信道設(shè)置為confirm模式。

return 退回模式:消息從 exchange–>queue 投遞失敗,會(huì)將消息退回給producer。

消費(fèi)端的確認(rèn)

消息從Queue發(fā)送到消費(fèi)端之后,消費(fèi)端會(huì)發(fā)送一個(gè)確認(rèn)消息:Consumer Ack,有兩種確認(rèn)方式:自動(dòng)確認(rèn)和手動(dòng)確認(rèn)。

在編碼中,關(guān)于消息的確認(rèn)方式,我們需要在消費(fèi)者端調(diào)用Consumer函數(shù)時(shí),設(shè)置第三個(gè)參數(shù):autoAck是false還是true(false表示手動(dòng),true表示自動(dòng))。

自動(dòng)確認(rèn)是指,當(dāng)消息一旦被Consumer接收到,則自動(dòng)確認(rèn)收到,并將相應(yīng) message 從 RabbitMQ 的消息緩存中移除。

但是在實(shí)際業(yè)務(wù)處理中,很可能消息接收到,業(yè)務(wù)處理出現(xiàn)異常,那么該消息就會(huì)丟失。如果設(shè)置了手動(dòng)確認(rèn)方式,則需要在業(yè)務(wù)處理成功后,調(diào)用ch.Ack(false),手動(dòng)簽收,如果出現(xiàn)異常,則調(diào)用d.Reject(true)讓其自動(dòng)重新發(fā)送消息。

Go 實(shí)現(xiàn)

安裝操作庫(kù)

安裝API庫(kù)

Go可以使用streadway/amqp庫(kù)來(lái)操作rabbit,使用以下命令來(lái)安裝:

go get github.com/streadway/amqp

封裝rabbitmq

接下來(lái)我們對(duì)streadway/amqp庫(kù)的內(nèi)容進(jìn)行一個(gè)二次封裝,封裝為一個(gè)rabbitmq.go文件:

package rabbitmq
import (
	"encoding/json"
	"github.com/streadway/amqp"
	"log"
)
// RabbitMQ RabbitMQ結(jié)構(gòu)
type RabbitMQ struct {
	channel  *amqp.Channel
	Name     string
	exchange string
}
// Connect 連接服務(wù)器
func Connect(s string) *RabbitMQ {
	//連接rabbitmq
	conn, e := amqp.Dial(s)
	failOnError(e, "連接Rabbitmq服務(wù)器失??!")
	ch, e := conn.Channel()
	failOnError(e, "無(wú)法打開頻道!")
	mq := new(RabbitMQ)
	mq.channel = ch
	return mq
}
// New 初始化消息隊(duì)列
//第一個(gè)參數(shù):rabbitmq服務(wù)器的鏈接,第二個(gè)參數(shù):隊(duì)列名字
func New(s string, name string) *RabbitMQ {
	//連接rabbitmq
	conn, e := amqp.Dial(s)
	failOnError(e, "連接Rabbitmq服務(wù)器失敗!")
	ch, e := conn.Channel()
	failOnError(e, "無(wú)法打開頻道!")
	q, e := ch.QueueDeclare(
		name,  //隊(duì)列名
		false, //是否開啟持久化
		true,  //不使用時(shí)刪除
		false, //排他
		false, //不等待
		nil,   //參數(shù)
	)
	failOnError(e, "初始化消息隊(duì)列失?。?)
	mq := new(RabbitMQ)
	mq.channel = ch
	mq.Name = q.Name
	return mq
}
// QueueDeclare 聲明queue
func (q *RabbitMQ) QueueDeclare(queue string) {
	_, e := q.channel.QueueDeclare(queue, false, true, false, false, nil)
	failOnError(e, "聲明queue失??!")
}
// QueueDelete 刪除queue
func (q *RabbitMQ) QueueDelete(queue string) {
	_, e := q.channel.QueueDelete(queue, false, true, false)
	failOnError(e, "刪除queue失??!")
}
// Qos 配置queue參數(shù)
func (q *RabbitMQ) Qos() {
	e := q.channel.Qos(1, 0, false)
	failOnError(e, "無(wú)法設(shè)置QoS")
}
// NewExchange 初始化交換機(jī)
//第一個(gè)參數(shù):rabbitmq服務(wù)器的鏈接,第二個(gè)參數(shù):交換機(jī)名字,第三個(gè)參數(shù):交換機(jī)類型
func NewExchange(s string, name string, typename string) {
	//連接rabbitmq
	conn, e := amqp.Dial(s)
	failOnError(e, "連接Rabbitmq服務(wù)器失敗!")
	ch, e := conn.Channel()
	failOnError(e, "無(wú)法打開頻道!")
	e = ch.ExchangeDeclare(
		name,     // name
		typename, // type
		true,     // durable
		false,    // auto-deleted
		false,    // internal
		false,    // no-wait
		nil,      // arguments
	)
	failOnError(e, "初始化交換機(jī)失敗!")
}
// ExchangeDelete 刪除交換機(jī)
func (q *RabbitMQ) ExchangeDelete(exchange string) {
	e := q.channel.ExchangeDelete(exchange, false, true)
	failOnError(e, "刪除交換機(jī)失敗!")
}
// Bind 綁定消息隊(duì)列到exchange
func (q *RabbitMQ) Bind(exchange string, key string) {
	e := q.channel.QueueBind(
		q.Name,
		key,
		exchange,
		false,
		nil,
	)
	failOnError(e, "綁定隊(duì)列失??!")
	q.exchange = exchange
}
// Send 向消息隊(duì)列發(fā)送消息
//Send方法可以往某個(gè)消息隊(duì)列發(fā)送消息
func (q *RabbitMQ) Send(queue string, body interface{}) {
	str, e := json.Marshal(body)
	failOnError(e, "消息序列化失?。?)
	e = q.channel.Publish(
		"",    //交換
		queue, //路由鍵
		false, //必填
		false, //立即
		amqp.Publishing{
			ReplyTo: q.Name,
			Body:    []byte(str),
		})
	msg := "向隊(duì)列:" + q.Name + "發(fā)送消息失?。?
	failOnError(e, msg)
}
// Publish 向exchange發(fā)送消息
//Publish方法可以往某個(gè)exchange發(fā)送消息
func (q *RabbitMQ) Publish(exchange string, body interface{}, key string) {
	str, e := json.Marshal(body)
	failOnError(e, "消息序列化失??!")
	e = q.channel.Publish(
		exchange,
		key,
		false,
		false,
		amqp.Publishing{ReplyTo: q.Name,
			Body: []byte(str)},
	)
	failOnError(e, "向交換機(jī)發(fā)送消息失??!")
}
// Consume 接收某個(gè)消息隊(duì)列的消息
func (q *RabbitMQ) Consume() <-chan amqp.Delivery {
	c, e := q.channel.Consume(
		q.Name, //指定從哪個(gè)隊(duì)列中接收消息
		"",
		true,
		false,
		false,
		false,
		nil,
	)
	failOnError(e, "接收消息失??!")
	return c
}
// Close 關(guān)閉隊(duì)列連接
func (q *RabbitMQ) Close() {
	q.channel.Close()
}
//錯(cuò)誤處理函數(shù)
func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

發(fā)送端的確認(rèn)

首先初始化消息隊(duì)列的時(shí)候,我們要開啟confirm模式,才能接收到這條應(yīng)答。開啟方式是將Channel.Confirm(noWait bool)參數(shù)設(shè)置為false,表示同意發(fā)送者將當(dāng)前channel信道設(shè)置為confirm模式。

func New(s string, name string) *RabbitMQ {
	conn, e := amqp.Dial(s)
	failOnError(e, "連接Rabbitmq服務(wù)器失?。?)
	ch, e := conn.Channel()
	failOnError(e, "無(wú)法打開頻道!")
	q, e := ch.QueueDeclare(
		name,  //隊(duì)列名
		false, //是否開啟持久化
		true,  //不使用時(shí)刪除
		false, //排他
		false, //不等待
		nil,   //參數(shù)
	)
	failOnError(e, "初始化消息隊(duì)列失敗!")
	mq := new(RabbitMQ)
	mq.channel = ch
	mq.Name = q.Name
	// 設(shè)置為confirm模式
	mq.channel.Confirm(false)
	return mq
}

然后在封裝庫(kù)中創(chuàng)建一個(gè)函數(shù)handleConfirm()用于接收來(lái)自Borker的回復(fù):

func (q *RabbitMQ) ConfirmFromBroker(ch chan amqp.Confirmation) chan amqp.Confirmation {
	return q.channel.NotifyPublish(ch)
}

生產(chǎn)者

生產(chǎn)者端在向Broker發(fā)送消息的時(shí)候,我們使用一個(gè)無(wú)緩沖的通道來(lái)接收來(lái)自Broker的回復(fù),然后創(chuàng)建一個(gè)協(xié)程監(jiān)聽這個(gè)無(wú)緩沖通道。

func main() {
	producer := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue")
	// 指定為topic類型
	rabbitmq.NewExchange("amqp://guest:guest@35.76.111.125:5672/", "exchange1", "fanout")
	confirm := producer.ConfirmFromBroker(make(chan amqp.Confirmation))
	go handleConfirm(confirm)
	var i int
	for {
		time.Sleep(time.Second)
		producer.Publish("exchange1", "fanout message: "+strconv.Itoa(i), "")
		i++
	}
}
func handleConfirm(confirm <-chan amqp.Confirmation) {
	for {
		select {
		case message := <-confirm:
			fmt.Println("接收到來(lái)自Broker的回復(fù):", message)
		}
	}
}

運(yùn)行結(jié)果:

接收到來(lái)自Broker的回復(fù): {1 true}
接收到來(lái)自Broker的回復(fù): {2 true}
接收到來(lái)自Broker的回復(fù): {3 true}
接收到來(lái)自Broker的回復(fù): {4 true}
接收到來(lái)自Broker的回復(fù): {5 true}

消費(fèi)端的確認(rèn)

首先將Consume函數(shù)的第三個(gè)參數(shù)autoAck參數(shù)標(biāo)記為false:

// Consume 接收某個(gè)消息隊(duì)列的消息
func (q *RabbitMQ) Consume() <-chan amqp.Delivery {
	c, e := q.channel.Consume(
		q.Name,
		"",
		false, // 不自動(dòng)確認(rèn)消息
		false,
		false,
		false,
		nil,
	)
	failOnError(e, "接收消息失??!")
	return c
}

在消費(fèi)者端我們采用公平派遣模式,即隊(duì)列發(fā)送消息給消費(fèi)者的時(shí)候,不再采用輪詢機(jī)制,而是一個(gè)消費(fèi)者消費(fèi)完消息之后,會(huì)調(diào)用Ack(false)函數(shù)向隊(duì)列發(fā)送一個(gè)回復(fù),隊(duì)列每次會(huì)將消息優(yōu)先發(fā)送給消費(fèi)完消息的消費(fèi)者(回復(fù)過(guò))。

消費(fèi)端限流:

實(shí)現(xiàn)公平派遣模式我們需要設(shè)置消費(fèi)者端一次只能消費(fèi)一條消息,之前我們已經(jīng)進(jìn)行了封裝,直接在消費(fèi)者端調(diào)用即可:

// Qos 配置queue參數(shù)
func (q *RabbitMQ) Qos() {
	e := q.channel.Qos(1, 0, false)
	failOnError(e, "無(wú)法設(shè)置QoS")
}

生產(chǎn)者

func main() {
	producer := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue")
	// 指定為direct類型
	rabbitmq.NewExchange("amqp://guest:guest@35.76.111.125:5672/", "exchange", "direct")
	i := 0
	for {
		time.Sleep(time.Second)
		producer.Publish("exchange", "routing message: "+strconv.Itoa(i), "key1")
		i = i + 1
	}
}

消費(fèi)者1

消費(fèi)者2在消費(fèi)第三條消息的時(shí)候,假設(shè)發(fā)生了錯(cuò)誤,我們調(diào)用d.Reject(true)函數(shù)讓隊(duì)列重新發(fā)送消息。

func main() {
	//第一個(gè)參數(shù)指定rabbitmq服務(wù)器的鏈接,第二個(gè)參數(shù)指定創(chuàng)建隊(duì)列的名字
	consumer1 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue1")
	// 指定一次只消費(fèi)一條消息,直到消費(fèi)完才重新接收
	consumer1.Qos()
	// 隊(duì)列綁定到exchange
	consumer1.Bind("exchange", "key1")
	//接收消息
	msgs := consumer1.Consume()
	go func() {
		var i int
		for d := range msgs {
			time.Sleep(time.Second * 1)
			log.Printf("Consumer1 received a message: %s", d.Body)
			// 假設(shè)消費(fèi)第三條消息的時(shí)候出現(xiàn)了錯(cuò)誤,我們就調(diào)用d.Reject(true),隊(duì)列會(huì)重新發(fā)送消息給消費(fèi)者
			if i == 2 {
				d.Reject(true)
			} else {
				// 消息消費(fèi)成功之后就回復(fù)
				d.Ack(false)
			}
			i++
		}
	}()
	select {}
}

消費(fèi)者2

func main() {
	//第一個(gè)參數(shù)指定rabbitmq服務(wù)器的鏈接,第二個(gè)參數(shù)指定創(chuàng)建隊(duì)列的名字
	consumer2 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue1")
	// 指定一次只消費(fèi)一條消息,直到消費(fèi)完才重新接收
	consumer2.Qos()
	// 隊(duì)列綁定到exchange
	consumer2.Bind("exchange", "key1")
	//接收消息
	msgs := consumer2.Consume()
	go func() {
		for d := range msgs {
			time.Sleep(time.Second * 5)
			log.Printf("Consumer2 received a message: %s", d.Body)
			// 消息消費(fèi)成功之后就回復(fù)
			d.Ack(false)
		}
	}()
	select {}
}

運(yùn)行結(jié)果:

# 消費(fèi)者1
2022/11/06 19:55:08 Consumer1 received a message: "routing message: 0"
2022/11/06 19:55:10 Consumer1 received a message: "routing message: 2"
2022/11/06 19:55:11 Consumer1 received a message: "routing message: 3"
2022/11/06 19:55:12 Consumer1 received a message: "routing message: 3"
2022/11/06 19:55:13 Consumer1 received a message: "routing message: 4"
2022/11/06 19:55:14 Consumer1 received a message: "routing message: 6"

# 消費(fèi)者2
2022/11/06 19:55:13 Consumer2 received a message: "routing message: 1"

到此這篇關(guān)于RabbitMq如何做到消息的可靠性投遞的文章就介紹到這了,更多相關(guān)RabbitMq消息可靠性投遞內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Go并發(fā)編程中的錯(cuò)誤恢復(fù)機(jī)制與代碼持續(xù)執(zhí)行實(shí)例探索

    Go并發(fā)編程中的錯(cuò)誤恢復(fù)機(jī)制與代碼持續(xù)執(zhí)行實(shí)例探索

    這篇文章主要為大家介紹了Go并發(fā)編程中的錯(cuò)誤恢復(fù)機(jī)制與代碼持續(xù)執(zhí)行實(shí)例探索,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2024-01-01
  • 數(shù)據(jù)競(jìng)爭(zhēng)和內(nèi)存重分配Golang slice并發(fā)不安全問(wèn)題解決

    數(shù)據(jù)競(jìng)爭(zhēng)和內(nèi)存重分配Golang slice并發(fā)不安全問(wèn)題解決

    這篇文章主要為大家介紹了數(shù)據(jù)競(jìng)爭(zhēng)和內(nèi)存重分配Golang slice并發(fā)不安全問(wèn)題解決,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-10-10
  • Go語(yǔ)言使用buffer讀取文件的實(shí)現(xiàn)示例

    Go語(yǔ)言使用buffer讀取文件的實(shí)現(xiàn)示例

    本文主要介紹了Go語(yǔ)言使用buffer讀取文件的實(shí)現(xiàn)示例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2023-04-04
  • go語(yǔ)言中的return語(yǔ)句

    go語(yǔ)言中的return語(yǔ)句

    這篇文章主要介紹了go語(yǔ)言中的return語(yǔ)句,文章圍繞主題展開詳細(xì)的內(nèi)容介紹,具有一定的參考價(jià)值,需要的小伙伴可以參考一下,希望對(duì)你的學(xué)習(xí)有所幫助
    2022-05-05
  • 讓goland支持proto文件類型的實(shí)現(xiàn)

    讓goland支持proto文件類型的實(shí)現(xiàn)

    這篇文章主要介紹了讓goland支持proto文件類型的實(shí)現(xiàn)操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2020-12-12
  • Go語(yǔ)言zip文件的讀寫操作

    Go語(yǔ)言zip文件的讀寫操作

    本文主要介紹了Go語(yǔ)言zip文件的讀寫操作,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2023-04-04
  • go通過(guò)benchmark對(duì)代碼進(jìn)行性能測(cè)試詳解

    go通過(guò)benchmark對(duì)代碼進(jìn)行性能測(cè)試詳解

    在開發(fā)中我們要想編寫高性能的代碼,或者優(yōu)化代碼的性能時(shí),你首先得知道當(dāng)前代碼的性能,在go中可以使用testing包的benchmark來(lái)做基準(zhǔn)測(cè)試 ,文中有詳細(xì)的代碼示例,感興趣的小伙伴可以參考一下
    2023-04-04
  • golang結(jié)構(gòu)體與json格式串實(shí)例代碼

    golang結(jié)構(gòu)體與json格式串實(shí)例代碼

    本文通過(guò)實(shí)例代碼給大家介紹了golang結(jié)構(gòu)體與json格式串的相關(guān)知識(shí),非常不錯(cuò),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2018-10-10
  • go語(yǔ)言中排序sort的使用方法示例

    go語(yǔ)言中排序sort的使用方法示例

    golang中也實(shí)現(xiàn)了排序算法的包sort包,下面這篇文章就來(lái)給大家介紹了關(guān)于go語(yǔ)言中排序sort的使用方法,文中通過(guò)示例代碼介紹的非常詳細(xì),需要的朋友可以參考借鑒,下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2018-06-06
  • Go語(yǔ)言對(duì)JSON進(jìn)行編碼和解碼的方法

    Go語(yǔ)言對(duì)JSON進(jìn)行編碼和解碼的方法

    這篇文章主要介紹了Go語(yǔ)言對(duì)JSON進(jìn)行編碼和解碼的方法,涉及Go語(yǔ)言操作json的技巧,具有一定參考借鑒價(jià)值,需要的朋友可以參考下
    2015-02-02

最新評(píng)論