一文總結(jié)RabbitMQ中的消息確認機制
RabbitMQ的消息確認機制
RabbitMQ消息確認機制指的是在消息傳遞過程中,發(fā)送方發(fā)送消息后,接收方需要對消息進行確認,以確保消息被正確地接收和處理。RabbitMQ的消息確認機制分為兩種:
- 生產(chǎn)者確認機制:生產(chǎn)者發(fā)送消息后,需要等待RabbitMQ服務(wù)器的確認消息,以確保消息已經(jīng)被成功地發(fā)送到RabbitMQ服務(wù)器。如果RabbitMQ服務(wù)器沒有收到消息或者消息發(fā)送失敗,生產(chǎn)者會收到一個確認消息,從而可以進行重發(fā)或者其他處理。
- 消費者確認機制:消費者接收到消息后,需要向RabbitMQ服務(wù)器發(fā)送確認消息,以告訴服務(wù)器已經(jīng)成功地接收并處理了該消息。如果消費者沒有發(fā)送確認消息,RabbitMQ服務(wù)器會認為該消息沒有被正確地處理,從而會將該消息重新發(fā)送給其他消費者進行處理。
在RabbitMQ中,消息確認機制是通過ACK機制來實現(xiàn)的。ACK代表Acknowledgement,即確認消息。當消息發(fā)送方發(fā)送消息后,接收方需要向消息發(fā)送方發(fā)送ACK消息,以表示已經(jīng)成功地接收和處理了該消息。如果消息發(fā)送方?jīng)]有收到ACK消息,就會認為該消息沒有被正確地處理,從而進行重發(fā)或者其他處理。
總之,RabbitMQ的消息確認機制可以保證消息的可靠性,從而提高系統(tǒng)的穩(wěn)定性和可靠性。
消息可靠抵達-ConfirmCallback
RabbitMQ的消息確認機制確保了消息的可靠抵達,其中ConfirmCallback是其中一種實現(xiàn)方式。
ConfirmCallback是一個回調(diào)函數(shù),用于在消息被確認時進行回調(diào),以確保消息已經(jīng)被正確地發(fā)送到RabbitMQ Broker并被處理。當生產(chǎn)者發(fā)送消息時,可以通過調(diào)用channel的confirmSelect()方法將channel設(shè)置為confirm模式,然后通過添加ConfirmCallback回調(diào)函數(shù)來處理消息確認。
當消息被發(fā)送到Broker后,如果Broker成功地將消息路由到目標隊列,則會調(diào)用ConfirmCallback回調(diào)函數(shù)的handleAck()方法,表示消息已被確認。如果Broker無法將消息路由到目標隊列,則會調(diào)用handleNack()方法,表示消息未被確認。
使用ConfirmCallback可以確保消息已經(jīng)被正確地發(fā)送到RabbitMQ Broker并被處理,從而避免了消息丟失或重復發(fā)送的情況。同時,ConfirmCallback還可以在消息未被確認時進行重試或記錄日志等操作,以確保消息的可靠性和穩(wěn)定性。
ConfirmCallback使用說明: 在配置文件中配置:spring.rabbitmq.publisher-confirms=true 在創(chuàng)建 connectionFactory 的時候設(shè)置 PublisherConfirms(true) 選項,開啟 confirmcallback 。 CorrelationData:用來表示當前消息唯一性。 消息只要被 broker 接收到就會執(zhí)行 confirmCallback,如果是 cluster 模式,需要所有broker 接收到才會調(diào)用 confirmCallback。 被 broker 接收到只能表示 message 已經(jīng)到達服務(wù)器,并不能保證消息一定會被投遞 到目標 queue 里。所以需要用到接下來的 returnCallback 。
消息可靠抵達-ReturnCallback
RabbitMQ的ReturnCallback機制是為了解決消息無法路由到指定隊列的問題。當發(fā)送的消息無法被路由到指定隊列時,RabbitMQ會將消息返回給生產(chǎn)者,這時候如果生產(chǎn)者設(shè)置了ReturnCallback回調(diào)函數(shù),就可以在回調(diào)函數(shù)中處理這種情況。
ReturnCallback機制的使用場景一般是在消息發(fā)送時,指定了mandatory參數(shù)為true,表示如果消息無法被路由到指定隊列,則將消息返回給生產(chǎn)者。如果mandatory參數(shù)為false,則消息會被直接丟棄。
當生產(chǎn)者設(shè)置了ReturnCallback回調(diào)函數(shù)后,RabbitMQ在將消息返回給生產(chǎn)者時,會觸發(fā)該回調(diào)函數(shù)。在ReturnCallback回調(diào)函數(shù)中,可以處理消息無法路由的情況,例如重發(fā)消息、記錄日志等。
下面是一個使用ReturnCallback機制的示例代碼:
channel.addReturnListener(new ReturnCallback() { @Override public void handle(ReturnedMessage returnedMessage) { String message = new String(returnedMessage.getBody()); System.out.println("Message returned: " + message); } }); channel.basicPublish(exchangeName, routingKey, true, null, message.getBytes());
在上面的代碼中,我們通過addReturnListener方法設(shè)置了ReturnCallback回調(diào)函數(shù),當消息無法路由到指定隊列時,會觸發(fā)該回調(diào)函數(shù)。在回調(diào)函數(shù)中,我們將返回的消息打印出來,以便處理。
需要注意的是,ReturnCallback機制只有在消息被發(fā)送到交換機后,才會觸發(fā)。如果消息發(fā)送的交換機不存在,或者路由鍵不符合任何綁定規(guī)則,消息會被直接丟棄,不會觸發(fā)ReturnCallback回調(diào)函數(shù)。
在配置文件中配置:
spring.rabbitmq.publisher-returns=true spring.rabbitmq.template.mandatory=true
confrim 模式只能保證消息到達 broker,不能保證消息準確投遞到目標 queue 里。在有 些業(yè)務(wù)場景下,我們需要保證消息一定要投遞到目標 queue 里,此時就需要用到return 退回模式。
這樣如果未能投遞到目標 queue 里將調(diào)用 returnCallback ,可以記錄下詳細到投遞數(shù)據(jù),定期的巡檢或者自動糾錯都需要這些數(shù)據(jù)。
RabbitMQ自動確認和手動確認
RabbitMQ消息確認機制是一種保證消息可靠抵達的機制。在RabbitMQ中,消息確認機制分為兩種模式:自動確認模式和手動確認模式。
在自動確認模式下,當消費者收到消息并將其處理完畢后,RabbitMQ會自動將該消息標記為已確認,然后將其從隊列中刪除。這種模式比較簡單,但是存在消息丟失的風險,因為如果消費者在處理消息的過程中出現(xiàn)異常,消息就會被丟失。
在手動確認模式下,當消費者收到消息并將其處理完畢后,需要向RabbitMQ發(fā)送一個確認消息,告訴RabbitMQ該消息已經(jīng)被處理完畢,可以從隊列中刪除。如果消費者在處理消息的過程中出現(xiàn)異常,可以選擇不發(fā)送確認消息,這樣消息就不會被從隊列中刪除,可以重新被其他消費者獲取。
手動確認模式可以保證消息不會被丟失,但是需要消費者編寫額外的代碼來處理確認消息的發(fā)送。此外,手動確認模式還可以設(shè)置確認模式為批量確認模式,即一次性確認多個消息,可以提高消息處理的效率。
總結(jié):
1.自動ACK模式
在自動ACK模式下,消息一旦被消費者接收到,就會自動被確認。這種模式下,消息一旦被發(fā)送到消費者,就會從隊列中刪除,無論消費者是否已經(jīng)成功消費該消息。
2.手動ACK模式
在手動ACK模式下,消費者必須手動發(fā)送ACK消息來確認消息已經(jīng)被成功消費。如果消費者沒有發(fā)送ACK消息,RabbitMQ服務(wù)器就會認為該消息還沒有被消費,會將該消息重新發(fā)送給其他消費者。這種模式下,消費者可以選擇性地確認消息,只有當消費者成功消費一條消息并確認后,該消息才會從隊列中刪除。
手動ACK模式可以保證消息的可靠性,但是需要消費者在處理消息時進行額外的處理,消費者需要在處理消息后發(fā)送ACK消息,否則消息會被重新發(fā)送。
在 RabbitMQ 中,消費者可以通過設(shè)置 channel.basicAck(deliveryTag, multiple) 方法來發(fā)送 ack 消息。其中,deliveryTag 表示消息的唯一標識符,multiple 表示是否批量確認。如果 multiple 為 true,表示要確認該 deliveryTag 及其之前的所有消息;如果 multiple 為 false,表示只確認該 deliveryTag 指定的一條消息。
需要注意的是,如果消費者在處理消息時發(fā)生了異常,消息并沒有被成功處理,那么不應(yīng)該發(fā)送 ack 消息,而應(yīng)該將消息重新放回隊列中,讓其他消費者來處理。
RabbitMQ處理消息方法
RabbitMQ提供了一些基本的方法來處理消息,這些方法包括:
basic.publish
: 發(fā)布消息到指定的交換機上。basic.consume
: 消費消息,啟動一個消費者來監(jiān)聽指定隊列上的消息。basic.ack
: 確認消息已經(jīng)被消費,告訴RabbitMQ可以刪除該消息。basic.nack
: 否認消息已經(jīng)被消費,告訴RabbitMQ需要重新發(fā)送該消息。basic.reject
: 拒絕消息,告訴RabbitMQ不需要再次發(fā)送該消息。basic.get
: 獲取指定隊列上的一條消息。basic.cancel
: 取消消費者的消費,停止監(jiān)聽指定隊列上的消息。
這些方法都是基于AMQP協(xié)議定義的,可以使用RabbitMQ提供的客戶端庫或者自己實現(xiàn)AMQP協(xié)議來調(diào)用這些方法。
到此這篇關(guān)于一文總結(jié)RabbitMQ中的消息確認機制的文章就介紹到這了,更多相關(guān)RabbitMQ消息確認機制內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java FileInputStream與FileOutputStream使用詳解
這篇文章主要介紹了Java FileInputStream與FileOutputStream使用詳解,本篇文章通過簡要的案例,講解了該項技術(shù)的了解與使用,以下就是詳細內(nèi)容,需要的朋友可以參考下2021-08-08Java使用DateTimeFormatter格式化輸入的日期時間
這篇文章主要介紹了Java使用DateTimeFormatter格式化輸入的日期時間,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-01-01多數(shù)據(jù)源@DS和@Transactional實戰(zhàn)
這篇文章主要介紹了多數(shù)據(jù)源@DS和@Transactional實戰(zhàn),具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-09-09Springboot+aop實現(xiàn)配置多數(shù)據(jù)源的示例代碼
本文介紹了如何使用SpringAOP和注解實現(xiàn)動態(tài)數(shù)據(jù)源切換,通過自定義注解和ThreadLocal存儲數(shù)據(jù)上下文信息,重寫AbstractRoutingDataSource類并使用自定義切面來實現(xiàn)動態(tài)數(shù)據(jù)源的切換,感興趣的可以了解一下2024-11-11簡單了解spring cloud 網(wǎng)關(guān)服務(wù)
這篇文章主要介紹了簡單了解spring cloud 網(wǎng)關(guān)服務(wù),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2019-10-10