Java中RabbitMQ的幾種消息確認(rèn)機(jī)制
??RabbitMQ的消息確認(rèn)機(jī)制
RabbitMQ消息確認(rèn)機(jī)制指的是在消息傳遞過程中,發(fā)送方發(fā)送消息后,接收方需要對消息進(jìn)行確認(rèn),以確保消息被正確地接收和處理。RabbitMQ的消息確認(rèn)機(jī)制分為兩種:
生產(chǎn)者確認(rèn)機(jī)制:生產(chǎn)者發(fā)送消息后,需要等待RabbitMQ服務(wù)器的確認(rèn)消息,以確保消息已經(jīng)被成功地發(fā)送到RabbitMQ服務(wù)器。如果RabbitMQ服務(wù)器沒有收到消息或者消息發(fā)送失敗,生產(chǎn)者會收到一個確認(rèn)消息,從而可以進(jìn)行重發(fā)或者其他處理。
消費(fèi)者確認(rèn)機(jī)制:消費(fèi)者接收到消息后,需要向RabbitMQ服務(wù)器發(fā)送確認(rèn)消息,以告訴服務(wù)器已經(jīng)成功地接收并處理了該消息。如果消費(fèi)者沒有發(fā)送確認(rèn)消息,RabbitMQ服務(wù)器會認(rèn)為該消息沒有被正確地處理,從而會將該消息重新發(fā)送給其他消費(fèi)者進(jìn)行處理。
在RabbitMQ中,消息確認(rèn)機(jī)制是通過ACK機(jī)制來實現(xiàn)的。ACK代表Acknowledgement,即確認(rèn)消息。當(dāng)消息發(fā)送方發(fā)送消息后,接收方需要向消息發(fā)送方發(fā)送ACK消息,以表示已經(jīng)成功地接收和處理了該消息。如果消息發(fā)送方?jīng)]有收到ACK消息,就會認(rèn)為該消息沒有被正確地處理,從而進(jìn)行重發(fā)或者其他處理。
總之,RabbitMQ的消息確認(rèn)機(jī)制可以保證消息的可靠性,從而提高系統(tǒng)的穩(wěn)定性和可靠性。
??消息可靠抵達(dá)-ConfirmCallback
RabbitMQ的消息確認(rèn)機(jī)制確保了消息的可靠抵達(dá),其中ConfirmCallback是其中一種實現(xiàn)方式。
ConfirmCallback是一個回調(diào)函數(shù),用于在消息被確認(rèn)時進(jìn)行回調(diào),以確保消息已經(jīng)被正確地發(fā)送到RabbitMQ Broker并被處理。當(dāng)生產(chǎn)者發(fā)送消息時,可以通過調(diào)用channel的confirmSelect()方法將channel設(shè)置為confirm模式,然后通過添加ConfirmCallback回調(diào)函數(shù)來處理消息確認(rèn)。
當(dāng)消息被發(fā)送到Broker后,如果Broker成功地將消息路由到目標(biāo)隊列,則會調(diào)用ConfirmCallback回調(diào)函數(shù)的handleAck()方法,表示消息已被確認(rèn)。如果Broker無法將消息路由到目標(biāo)隊列,則會調(diào)用handleNack()方法,表示消息未被確認(rèn)。
使用ConfirmCallback可以確保消息已經(jīng)被正確地發(fā)送到RabbitMQ Broker并被處理,從而避免了消息丟失或重復(fù)發(fā)送的情況。同時,ConfirmCallback還可以在消息未被確認(rèn)時進(jìn)行重試或記錄日志等操作,以確保消息的可靠性和穩(wěn)定性。
ConfirmCallback使用說明:
在配置文件中配置:spring.rabbitmq.publisher-confirms=true
在創(chuàng)建 connectionFactory 的時候設(shè)置 PublisherConfirms(true) 選項,開啟
confirmcallback 。
CorrelationData:用來表示當(dāng)前消息唯一性。
消息只要被 broker 接收到就會執(zhí)行 confirmCallback,如果是 cluster 模式,需要所有broker 接收到才會調(diào)用 confirmCallback。
被 broker 接收到只能表示 message 已經(jīng)到達(dá)服務(wù)器,并不能保證消息一定會被投遞 到目標(biāo) queue 里。所以需要用到接下來的 returnCallback 。
??消息可靠抵達(dá)-ReturnCallback
RabbitMQ的ReturnCallback機(jī)制是為了解決消息無法路由到指定隊列的問題。當(dāng)發(fā)送的消息無法被路由到指定隊列時,RabbitMQ會將消息返回給生產(chǎn)者,這時候如果生產(chǎn)者設(shè)置了ReturnCallback回調(diào)函數(shù),就可以在回調(diào)函數(shù)中處理這種情況。
ReturnCallback機(jī)制的使用場景一般是在消息發(fā)送時,指定了mandatory參數(shù)為true,表示如果消息無法被路由到指定隊列,則將消息返回給生產(chǎn)者。如果mandatory參數(shù)為false,則消息會被直接丟棄。
當(dāng)生產(chǎn)者設(shè)置了ReturnCallback回調(diào)函數(shù)后,RabbitMQ在將消息返回給生產(chǎn)者時,會觸發(fā)該回調(diào)函數(shù)。在ReturnCallback回調(diào)函數(shù)中,可以處理消息無法路由的情況,例如重發(fā)消息、記錄日志等。
下面是一個使用ReturnCallback機(jī)制的示例代碼:
channel.addReturnListener(new ReturnCallback() { @Override public void handle(ReturnedMessage returnedMessage) { String message = new String(returnedMessage.getBody()); System.out.println("Message returned: " + message); } }); channel.basicPublish(exchangeName, routingKey, true, null, message.getBytes());
在上面的代碼中,我們通過addReturnListener方法設(shè)置了ReturnCallback回調(diào)函數(shù),當(dāng)消息無法路由到指定隊列時,會觸發(fā)該回調(diào)函數(shù)。在回調(diào)函數(shù)中,我們將返回的消息打印出來,以便處理。
需要注意的是,ReturnCallback機(jī)制只有在消息被發(fā)送到交換機(jī)后,才會觸發(fā)。如果消息發(fā)送的交換機(jī)不存在,或者路由鍵不符合任何綁定規(guī)則,消息會被直接丟棄,不會觸發(fā)ReturnCallback回調(diào)函數(shù)。
在配置文件中配置:
spring.rabbitmq.publisher-returns=true spring.rabbitmq.template.mandatory=true
confrim 模式只能保證消息到達(dá) broker,不能保證消息準(zhǔn)確投遞到目標(biāo) queue 里。在有 些業(yè)務(wù)場景下,我們需要保證消息一定要投遞到目標(biāo) queue 里,此時就需要用到return 退回模式。
這樣如果未能投遞到目標(biāo) queue 里將調(diào)用 returnCallback ,可以記錄下詳細(xì)到投遞數(shù)據(jù),定期的巡檢或者自動糾錯都需要這些數(shù)據(jù)
??RabbitMQ自動確認(rèn)和手動確認(rèn)
RabbitMQ消息確認(rèn)機(jī)制是一種保證消息可靠抵達(dá)的機(jī)制。在RabbitMQ中,消息確認(rèn)機(jī)制分為兩種模式:自動確認(rèn)模式和手動確認(rèn)模式。
在自動確認(rèn)模式下,當(dāng)消費(fèi)者收到消息并將其處理完畢后,RabbitMQ會自動將該消息標(biāo)記為已確認(rèn),然后將其從隊列中刪除。這種模式比較簡單,但是存在消息丟失的風(fēng)險,因為如果消費(fèi)者在處理消息的過程中出現(xiàn)異常,消息就會被丟失。
在手動確認(rèn)模式下,當(dāng)消費(fèi)者收到消息并將其處理完畢后,需要向RabbitMQ發(fā)送一個確認(rèn)消息,告訴RabbitMQ該消息已經(jīng)被處理完畢,可以從隊列中刪除。如果消費(fèi)者在處理消息的過程中出現(xiàn)異常,可以選擇不發(fā)送確認(rèn)消息,這樣消息就不會被從隊列中刪除,可以重新被其他消費(fèi)者獲取。
手動確認(rèn)模式可以保證消息不會被丟失,但是需要消費(fèi)者編寫額外的代碼來處理確認(rèn)消息的發(fā)送。此外,手動確認(rèn)模式還可以設(shè)置確認(rèn)模式為批量確認(rèn)模式,即一次性確認(rèn)多個消息,可以提高消息處理的效率。
總結(jié):
- 自動ACK模式
在自動ACK模式下,消息一旦被消費(fèi)者接收到,就會自動被確認(rèn)。這種模式下,消息一旦被發(fā)送到消費(fèi)者,就會從隊列中刪除,無論消費(fèi)者是否已經(jīng)成功消費(fèi)該消息。
- 手動ACK模式
在手動ACK模式下,消費(fèi)者必須手動發(fā)送ACK消息來確認(rèn)消息已經(jīng)被成功消費(fèi)。如果消費(fèi)者沒有發(fā)送ACK消息,RabbitMQ服務(wù)器就會認(rèn)為該消息還沒有被消費(fèi),會將該消息重新發(fā)送給其他消費(fèi)者。這種模式下,消費(fèi)者可以選擇性地確認(rèn)消息,只有當(dāng)消費(fèi)者成功消費(fèi)一條消息并確認(rèn)后,該消息才會從隊列中刪除。
手動ACK模式可以保證消息的可靠性,但是需要消費(fèi)者在處理消息時進(jìn)行額外的處理,消費(fèi)者需要在處理消息后發(fā)送ACK消息,否則消息會被重新發(fā)送。
在 RabbitMQ 中,消費(fèi)者可以通過設(shè)置 channel.basicAck(deliveryTag, multiple) 方法來發(fā)送 ack 消息。其中,deliveryTag 表示消息的唯一標(biāo)識符,multiple 表示是否批量確認(rèn)。如果 multiple 為 true,表示要確認(rèn)該 deliveryTag 及其之前的所有消息;如果 multiple 為 false,表示只確認(rèn)該 deliveryTag 指定的一條消息。
需要注意的是,如果消費(fèi)者在處理消息時發(fā)生了異常,消息并沒有被成功處理,那么不應(yīng)該發(fā)送 ack 消息,而應(yīng)該將消息重新放回隊列中,讓其他消費(fèi)者來處理
??RabbitMQ處理消息方法
RabbitMQ提供了一些基本的方法來處理消息,這些方法包括:
basic.publish
: 發(fā)布消息到指定的交換機(jī)上。basic.consume
: 消費(fèi)消息,啟動一個消費(fèi)者來監(jiān)聽指定隊列上的消息。basic.ack
: 確認(rèn)消息已經(jīng)被消費(fèi),告訴RabbitMQ可以刪除該消息。basic.nack
: 否認(rèn)消息已經(jīng)被消費(fèi),告訴RabbitMQ需要重新發(fā)送該消息。basic.reject
: 拒絕消息,告訴RabbitMQ不需要再次發(fā)送該消息。basic.get
: 獲取指定隊列上的一條消息。basic.cancel
: 取消消費(fèi)者的消費(fèi),停止監(jiān)聽指定隊列上的消息。
這些方法都是基于AMQP協(xié)議定義的,可以使用RabbitMQ提供的客戶端庫或者自己實現(xiàn)AMQP協(xié)議來調(diào)用這些方法。
到此這篇關(guān)于Java中RabbitMQ的幾種消息確認(rèn)機(jī)制的文章就介紹到這了,更多相關(guān)Java RabbitMQ消息確認(rèn)機(jī)制內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Spring MVC 關(guān)于controller的字符編碼問題
在使用springMVC框架構(gòu)建web應(yīng)用,客戶端常會請求字符串、整型、json等格式的數(shù)據(jù),通常使用@ResponseBody注解使 controller回應(yīng)相應(yīng)的數(shù)據(jù)而不是去渲染某個頁面。2017-03-03Java案例實現(xiàn)不重復(fù)的隨機(jī)數(shù)
這篇文章主要介紹了Java案例實現(xiàn)不重復(fù)的隨機(jī)數(shù),通過創(chuàng)建Set集合對象,可以使用HashSet也可以使用TreeSet,區(qū)別在于TreeSet是排序后的,創(chuàng)建隨機(jī)數(shù)對象,獲取一個隨機(jī)數(shù)去重等操作,需要的朋友可以參考一下2022-04-04java實現(xiàn)輕量型http代理服務(wù)器示例
這篇文章主要介紹了java實現(xiàn)輕量型http代理服務(wù)器示例,需要的朋友可以參考下2014-04-04Spring實戰(zhàn)之使用注解實現(xiàn)聲明式事務(wù)操作示例
這篇文章主要介紹了Spring實戰(zhàn)之使用注解實現(xiàn)聲明式事務(wù)操作,結(jié)合實例形式詳細(xì)分析了spring使用注解實現(xiàn)聲明式事務(wù)相關(guān)配置、接口實現(xiàn)與使用技巧,需要的朋友可以參考下2020-01-01