RabbitMQ的消息確認(rèn)機(jī)制的詳細(xì)總結(jié)
RabbitMQ的消息確認(rèn)機(jī)制
RabbitMQ消息確認(rèn)機(jī)制指的是在消息傳遞過程中,發(fā)送方發(fā)送消息后,接收方需要對(duì)消息進(jìn)行確認(rèn),以確保消息被正確地接收和處理。RabbitMQ的消息確認(rèn)機(jī)制分為兩種:
生產(chǎn)者確認(rèn)機(jī)制:生產(chǎn)者發(fā)送消息后,需要等待RabbitMQ服務(wù)器的確認(rèn)消息,以確保消息已經(jīng)被成功地發(fā)送到RabbitMQ服務(wù)器。如果RabbitMQ服務(wù)器沒有收到消息或者消息發(fā)送失敗,生產(chǎn)者會(huì)收到一個(gè)確認(rèn)消息,從而可以進(jìn)行重發(fā)或者其他處理。
消費(fèi)者確認(rèn)機(jī)制:消費(fèi)者接收到消息后,需要向RabbitMQ服務(wù)器發(fā)送確認(rèn)消息,以告訴服務(wù)器已經(jīng)成功地接收并處理了該消息。如果消費(fèi)者沒有發(fā)送確認(rèn)消息,RabbitMQ服務(wù)器會(huì)認(rèn)為該消息沒有被正確地處理,從而會(huì)將該消息重新發(fā)送給其他消費(fèi)者進(jìn)行處理。
在RabbitMQ中,消息確認(rèn)機(jī)制是通過ACK機(jī)制來實(shí)現(xiàn)的。ACK代表Acknowledgement,即確認(rèn)消息。當(dāng)消息發(fā)送方發(fā)送消息后,接收方需要向消息發(fā)送方發(fā)送ACK消息,以表示已經(jīng)成功地接收和處理了該消息。如果消息發(fā)送方?jīng)]有收到ACK消息,就會(huì)認(rèn)為該消息沒有被正確地處理,從而進(jìn)行重發(fā)或者其他處理。
總之,RabbitMQ的消息確認(rèn)機(jī)制可以保證消息的可靠性,從而提高系統(tǒng)的穩(wěn)定性和可靠性。
消息可靠抵達(dá)-ConfirmCallback
RabbitMQ的消息確認(rèn)機(jī)制確保了消息的可靠抵達(dá),其中ConfirmCallback是其中一種實(shí)現(xiàn)方式。
ConfirmCallback是一個(gè)回調(diào)函數(shù),用于在消息被確認(rèn)時(shí)進(jìn)行回調(diào),以確保消息已經(jīng)被正確地發(fā)送到RabbitMQ Broker并被處理。當(dāng)生產(chǎn)者發(fā)送消息時(shí),可以通過調(diào)用channel的confirmSelect()方法將channel設(shè)置為confirm模式,然后通過添加ConfirmCallback回調(diào)函數(shù)來處理消息確認(rèn)。
當(dāng)消息被發(fā)送到Broker后,如果Broker成功地將消息路由到目標(biāo)隊(duì)列,則會(huì)調(diào)用ConfirmCallback回調(diào)函數(shù)的handleAck()方法,表示消息已被確認(rèn)。如果Broker無法將消息路由到目標(biāo)隊(duì)列,則會(huì)調(diào)用handleNack()方法,表示消息未被確認(rèn)。
使用ConfirmCallback可以確保消息已經(jīng)被正確地發(fā)送到RabbitMQ Broker并被處理,從而避免了消息丟失或重復(fù)發(fā)送的情況。同時(shí),ConfirmCallback還可以在消息未被確認(rèn)時(shí)進(jìn)行重試或記錄日志等操作,以確保消息的可靠性和穩(wěn)定性。
ConfirmCallback使用說明:
在配置文件中配置:spring.rabbitmq.publisher-confirms=true
在創(chuàng)建 connectionFactory 的時(shí)候設(shè)置 PublisherConfirms(true) 選項(xiàng),開啟
confirmcallback 。
CorrelationData:用來表示當(dāng)前消息唯一性。
消息只要被 broker 接收到就會(huì)執(zhí)行 confirmCallback,如果是 cluster 模式,需要所有broker 接收到才會(huì)調(diào)用 confirmCallback。
被 broker 接收到只能表示 message 已經(jīng)到達(dá)服務(wù)器,并不能保證消息一定會(huì)被投遞 到目標(biāo) queue 里。所以需要用到接下來的 returnCallback 。
消息可靠抵達(dá)-ReturnCallback
RabbitMQ的ReturnCallback機(jī)制是為了解決消息無法路由到指定隊(duì)列的問題。當(dāng)發(fā)送的消息無法被路由到指定隊(duì)列時(shí),RabbitMQ會(huì)將消息返回給生產(chǎn)者,這時(shí)候如果生產(chǎn)者設(shè)置了ReturnCallback回調(diào)函數(shù),就可以在回調(diào)函數(shù)中處理這種情況。
ReturnCallback機(jī)制的使用場景一般是在消息發(fā)送時(shí),指定了mandatory參數(shù)為true,表示如果消息無法被路由到指定隊(duì)列,則將消息返回給生產(chǎn)者。如果mandatory參數(shù)為false,則消息會(huì)被直接丟棄。
當(dāng)生產(chǎn)者設(shè)置了ReturnCallback回調(diào)函數(shù)后,RabbitMQ在將消息返回給生產(chǎn)者時(shí),會(huì)觸發(fā)該回調(diào)函數(shù)。在ReturnCallback回調(diào)函數(shù)中,可以處理消息無法路由的情況,例如重發(fā)消息、記錄日志等。
下面是一個(gè)使用ReturnCallback機(jī)制的示例代碼:
channel.addReturnListener(new ReturnCallback() { @Override public void handle(ReturnedMessage returnedMessage) { String message = new String(returnedMessage.getBody()); System.out.println("Message returned: " + message); } }); channel.basicPublish(exchangeName, routingKey, true, null, message.getBytes());
在上面的代碼中,我們通過addReturnListener方法設(shè)置了ReturnCallback回調(diào)函數(shù),當(dāng)消息無法路由到指定隊(duì)列時(shí),會(huì)觸發(fā)該回調(diào)函數(shù)。在回調(diào)函數(shù)中,我們將返回的消息打印出來,以便處理。
需要注意的是,ReturnCallback機(jī)制只有在消息被發(fā)送到交換機(jī)后,才會(huì)觸發(fā)。如果消息發(fā)送的交換機(jī)不存在,或者路由鍵不符合任何綁定規(guī)則,消息會(huì)被直接丟棄,不會(huì)觸發(fā)ReturnCallback回調(diào)函數(shù)。
在配置文件中配置:
spring.rabbitmq.publisher-returns=true spring.rabbitmq.template.mandatory=true
confrim 模式只能保證消息到達(dá) broker,不能保證消息準(zhǔn)確投遞到目標(biāo) queue 里。在有 些業(yè)務(wù)場景下,我們需要保證消息一定要投遞到目標(biāo) queue 里,此時(shí)就需要用到return 退回模式。
這樣如果未能投遞到目標(biāo) queue 里將調(diào)用 returnCallback ,可以記錄下詳細(xì)到投遞數(shù)據(jù),定期的巡檢或者自動(dòng)糾錯(cuò)都需要這些數(shù)據(jù)。
RabbitMQ自動(dòng)確認(rèn)和手動(dòng)確認(rèn)
RabbitMQ消息確認(rèn)機(jī)制是一種保證消息可靠抵達(dá)的機(jī)制。在RabbitMQ中,消息確認(rèn)機(jī)制分為兩種模式:自動(dòng)確認(rèn)模式和手動(dòng)確認(rèn)模式。
在自動(dòng)確認(rèn)模式下,當(dāng)消費(fèi)者收到消息并將其處理完畢后,RabbitMQ會(huì)自動(dòng)將該消息標(biāo)記為已確認(rèn),然后將其從隊(duì)列中刪除。這種模式比較簡單,但是存在消息丟失的風(fēng)險(xiǎn),因?yàn)槿绻M(fèi)者在處理消息的過程中出現(xiàn)異常,消息就會(huì)被丟失。
在手動(dòng)確認(rèn)模式下,當(dāng)消費(fèi)者收到消息并將其處理完畢后,需要向RabbitMQ發(fā)送一個(gè)確認(rèn)消息,告訴RabbitMQ該消息已經(jīng)被處理完畢,可以從隊(duì)列中刪除。如果消費(fèi)者在處理消息的過程中出現(xiàn)異常,可以選擇不發(fā)送確認(rèn)消息,這樣消息就不會(huì)被從隊(duì)列中刪除,可以重新被其他消費(fèi)者獲取。
手動(dòng)確認(rèn)模式可以保證消息不會(huì)被丟失,但是需要消費(fèi)者編寫額外的代碼來處理確認(rèn)消息的發(fā)送。此外,手動(dòng)確認(rèn)模式還可以設(shè)置確認(rèn)模式為批量確認(rèn)模式,即一次性確認(rèn)多個(gè)消息,可以提高消息處理的效率。
總結(jié):
- 自動(dòng)ACK模式
在自動(dòng)ACK模式下,消息一旦被消費(fèi)者接收到,就會(huì)自動(dòng)被確認(rèn)。這種模式下,消息一旦被發(fā)送到消費(fèi)者,就會(huì)從隊(duì)列中刪除,無論消費(fèi)者是否已經(jīng)成功消費(fèi)該消息。
- 手動(dòng)ACK模式
在手動(dòng)ACK模式下,消費(fèi)者必須手動(dòng)發(fā)送ACK消息來確認(rèn)消息已經(jīng)被成功消費(fèi)。如果消費(fèi)者沒有發(fā)送ACK消息,RabbitMQ服務(wù)器就會(huì)認(rèn)為該消息還沒有被消費(fèi),會(huì)將該消息重新發(fā)送給其他消費(fèi)者。這種模式下,消費(fèi)者可以選擇性地確認(rèn)消息,只有當(dāng)消費(fèi)者成功消費(fèi)一條消息并確認(rèn)后,該消息才會(huì)從隊(duì)列中刪除。
手動(dòng)ACK模式可以保證消息的可靠性,但是需要消費(fèi)者在處理消息時(shí)進(jìn)行額外的處理,消費(fèi)者需要在處理消息后發(fā)送ACK消息,否則消息會(huì)被重新發(fā)送。
在 RabbitMQ 中,消費(fèi)者可以通過設(shè)置 channel.basicAck(deliveryTag, multiple) 方法來發(fā)送 ack 消息。其中,deliveryTag 表示消息的唯一標(biāo)識(shí)符,multiple 表示是否批量確認(rèn)。如果 multiple 為 true,表示要確認(rèn)該 deliveryTag 及其之前的所有消息;如果 multiple 為 false,表示只確認(rèn)該 deliveryTag 指定的一條消息。
需要注意的是,如果消費(fèi)者在處理消息時(shí)發(fā)生了異常,消息并沒有被成功處理,那么不應(yīng)該發(fā)送 ack 消息,而應(yīng)該將消息重新放回隊(duì)列中,讓其他消費(fèi)者來
RabbitMQ處理消息方法
RabbitMQ提供了一些基本的方法來處理消息,這些方法包括:
basic.publish
: 發(fā)布消息到指定的交換機(jī)上。basic.consume
: 消費(fèi)消息,啟動(dòng)一個(gè)消費(fèi)者來監(jiān)聽指定隊(duì)列上的消息。basic.ack
: 確認(rèn)消息已經(jīng)被消費(fèi),告訴RabbitMQ可以刪除該消息。basic.nack
: 否認(rèn)消息已經(jīng)被消費(fèi),告訴RabbitMQ需要重新發(fā)送該消息。basic.reject
: 拒絕消息,告訴RabbitMQ不需要再次發(fā)送該消息。basic.get
: 獲取指定隊(duì)列上的一條消息。basic.cancel
: 取消消費(fèi)者的消費(fèi),停止監(jiān)聽指定隊(duì)列上的消息。
這些方法都是基于AMQP協(xié)議定義的,可以使用RabbitMQ提供的客戶端庫或者自己實(shí)現(xiàn)AMQP協(xié)議來調(diào)用這些方法。
到此這篇關(guān)于RabbitMQ的消息確認(rèn)機(jī)制的詳細(xì)總結(jié)的文章就介紹到這了,更多相關(guān)RabbitMQ消息確認(rèn)機(jī)制內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- RabbitMQ的ACK確認(rèn)機(jī)制保障消費(fèi)端消息的可靠性詳解
- springboot實(shí)現(xiàn)rabbitmq消息確認(rèn)的示例代碼
- 一文總結(jié)RabbitMQ中的消息確認(rèn)機(jī)制
- RabbitMQ消息確認(rèn)機(jī)制剖析
- 詳解SpringBoot整合RabbitMQ如何實(shí)現(xiàn)消息確認(rèn)
- SpringBoot整合RabbitMQ實(shí)現(xiàn)消息確認(rèn)機(jī)制
- springboot + rabbitmq 如何實(shí)現(xiàn)消息確認(rèn)機(jī)制(踩坑經(jīng)驗(yàn))
- Java中RabbitMQ的幾種消息確認(rèn)機(jī)制
相關(guān)文章
Java中Stream流對(duì)多個(gè)字段進(jìn)行排序的方法
我們?cè)谔幚頂?shù)據(jù)的時(shí)候經(jīng)常會(huì)需要進(jìn)行排序后再返回給前端調(diào)用,比如按照時(shí)間升序排序,前端展示數(shù)據(jù)就是按時(shí)間先后進(jìn)行排序,下面這篇文章主要給大家介紹了關(guān)于Java中Stream流對(duì)多個(gè)字段進(jìn)行排序的相關(guān)資料,需要的朋友可以參考下2023-10-10SpringBoot整合WxJava開啟消息推送的實(shí)現(xiàn)
本文主要介紹了SpringBoot整合WxJava開啟消息推送,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-03-03Seata AT模式TransactionHook被刪除探究
這篇文章主要為大家介紹了Seata AT模式TransactionHook被刪除探究,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-11-11