Java中RabbitMQ的幾種消息確認機制
??RabbitMQ的消息確認機制
RabbitMQ消息確認機制指的是在消息傳遞過程中,發(fā)送方發(fā)送消息后,接收方需要對消息進行確認,以確保消息被正確地接收和處理。RabbitMQ的消息確認機制分為兩種:
生產者確認機制:生產者發(fā)送消息后,需要等待RabbitMQ服務器的確認消息,以確保消息已經被成功地發(fā)送到RabbitMQ服務器。如果RabbitMQ服務器沒有收到消息或者消息發(fā)送失敗,生產者會收到一個確認消息,從而可以進行重發(fā)或者其他處理。
消費者確認機制:消費者接收到消息后,需要向RabbitMQ服務器發(fā)送確認消息,以告訴服務器已經成功地接收并處理了該消息。如果消費者沒有發(fā)送確認消息,RabbitMQ服務器會認為該消息沒有被正確地處理,從而會將該消息重新發(fā)送給其他消費者進行處理。
在RabbitMQ中,消息確認機制是通過ACK機制來實現(xiàn)的。ACK代表Acknowledgement,即確認消息。當消息發(fā)送方發(fā)送消息后,接收方需要向消息發(fā)送方發(fā)送ACK消息,以表示已經成功地接收和處理了該消息。如果消息發(fā)送方沒有收到ACK消息,就會認為該消息沒有被正確地處理,從而進行重發(fā)或者其他處理。
總之,RabbitMQ的消息確認機制可以保證消息的可靠性,從而提高系統(tǒng)的穩(wěn)定性和可靠性。
??消息可靠抵達-ConfirmCallback
RabbitMQ的消息確認機制確保了消息的可靠抵達,其中ConfirmCallback是其中一種實現(xiàn)方式。
ConfirmCallback是一個回調函數(shù),用于在消息被確認時進行回調,以確保消息已經被正確地發(fā)送到RabbitMQ Broker并被處理。當生產者發(fā)送消息時,可以通過調用channel的confirmSelect()方法將channel設置為confirm模式,然后通過添加ConfirmCallback回調函數(shù)來處理消息確認。
當消息被發(fā)送到Broker后,如果Broker成功地將消息路由到目標隊列,則會調用ConfirmCallback回調函數(shù)的handleAck()方法,表示消息已被確認。如果Broker無法將消息路由到目標隊列,則會調用handleNack()方法,表示消息未被確認。
使用ConfirmCallback可以確保消息已經被正確地發(fā)送到RabbitMQ Broker并被處理,從而避免了消息丟失或重復發(fā)送的情況。同時,ConfirmCallback還可以在消息未被確認時進行重試或記錄日志等操作,以確保消息的可靠性和穩(wěn)定性。
ConfirmCallback使用說明:
在配置文件中配置:spring.rabbitmq.publisher-confirms=true
在創(chuàng)建 connectionFactory 的時候設置 PublisherConfirms(true) 選項,開啟
confirmcallback 。
CorrelationData:用來表示當前消息唯一性。
消息只要被 broker 接收到就會執(zhí)行 confirmCallback,如果是 cluster 模式,需要所有broker 接收到才會調用 confirmCallback。
被 broker 接收到只能表示 message 已經到達服務器,并不能保證消息一定會被投遞 到目標 queue 里。所以需要用到接下來的 returnCallback 。
??消息可靠抵達-ReturnCallback
RabbitMQ的ReturnCallback機制是為了解決消息無法路由到指定隊列的問題。當發(fā)送的消息無法被路由到指定隊列時,RabbitMQ會將消息返回給生產者,這時候如果生產者設置了ReturnCallback回調函數(shù),就可以在回調函數(shù)中處理這種情況。
ReturnCallback機制的使用場景一般是在消息發(fā)送時,指定了mandatory參數(shù)為true,表示如果消息無法被路由到指定隊列,則將消息返回給生產者。如果mandatory參數(shù)為false,則消息會被直接丟棄。
當生產者設置了ReturnCallback回調函數(shù)后,RabbitMQ在將消息返回給生產者時,會觸發(fā)該回調函數(shù)。在ReturnCallback回調函數(shù)中,可以處理消息無法路由的情況,例如重發(fā)消息、記錄日志等。
下面是一個使用ReturnCallback機制的示例代碼:
channel.addReturnListener(new ReturnCallback() { @Override public void handle(ReturnedMessage returnedMessage) { String message = new String(returnedMessage.getBody()); System.out.println("Message returned: " + message); } }); channel.basicPublish(exchangeName, routingKey, true, null, message.getBytes());
在上面的代碼中,我們通過addReturnListener方法設置了ReturnCallback回調函數(shù),當消息無法路由到指定隊列時,會觸發(fā)該回調函數(shù)。在回調函數(shù)中,我們將返回的消息打印出來,以便處理。
需要注意的是,ReturnCallback機制只有在消息被發(fā)送到交換機后,才會觸發(fā)。如果消息發(fā)送的交換機不存在,或者路由鍵不符合任何綁定規(guī)則,消息會被直接丟棄,不會觸發(fā)ReturnCallback回調函數(shù)。
在配置文件中配置:
spring.rabbitmq.publisher-returns=true spring.rabbitmq.template.mandatory=true
confrim 模式只能保證消息到達 broker,不能保證消息準確投遞到目標 queue 里。在有 些業(yè)務場景下,我們需要保證消息一定要投遞到目標 queue 里,此時就需要用到return 退回模式。
這樣如果未能投遞到目標 queue 里將調用 returnCallback ,可以記錄下詳細到投遞數(shù)據(jù),定期的巡檢或者自動糾錯都需要這些數(shù)據(jù)
??RabbitMQ自動確認和手動確認
RabbitMQ消息確認機制是一種保證消息可靠抵達的機制。在RabbitMQ中,消息確認機制分為兩種模式:自動確認模式和手動確認模式。
在自動確認模式下,當消費者收到消息并將其處理完畢后,RabbitMQ會自動將該消息標記為已確認,然后將其從隊列中刪除。這種模式比較簡單,但是存在消息丟失的風險,因為如果消費者在處理消息的過程中出現(xiàn)異常,消息就會被丟失。
在手動確認模式下,當消費者收到消息并將其處理完畢后,需要向RabbitMQ發(fā)送一個確認消息,告訴RabbitMQ該消息已經被處理完畢,可以從隊列中刪除。如果消費者在處理消息的過程中出現(xiàn)異常,可以選擇不發(fā)送確認消息,這樣消息就不會被從隊列中刪除,可以重新被其他消費者獲取。
手動確認模式可以保證消息不會被丟失,但是需要消費者編寫額外的代碼來處理確認消息的發(fā)送。此外,手動確認模式還可以設置確認模式為批量確認模式,即一次性確認多個消息,可以提高消息處理的效率。
總結:
- 自動ACK模式
在自動ACK模式下,消息一旦被消費者接收到,就會自動被確認。這種模式下,消息一旦被發(fā)送到消費者,就會從隊列中刪除,無論消費者是否已經成功消費該消息。
- 手動ACK模式
在手動ACK模式下,消費者必須手動發(fā)送ACK消息來確認消息已經被成功消費。如果消費者沒有發(fā)送ACK消息,RabbitMQ服務器就會認為該消息還沒有被消費,會將該消息重新發(fā)送給其他消費者。這種模式下,消費者可以選擇性地確認消息,只有當消費者成功消費一條消息并確認后,該消息才會從隊列中刪除。
手動ACK模式可以保證消息的可靠性,但是需要消費者在處理消息時進行額外的處理,消費者需要在處理消息后發(fā)送ACK消息,否則消息會被重新發(fā)送。
在 RabbitMQ 中,消費者可以通過設置 channel.basicAck(deliveryTag, multiple) 方法來發(fā)送 ack 消息。其中,deliveryTag 表示消息的唯一標識符,multiple 表示是否批量確認。如果 multiple 為 true,表示要確認該 deliveryTag 及其之前的所有消息;如果 multiple 為 false,表示只確認該 deliveryTag 指定的一條消息。
需要注意的是,如果消費者在處理消息時發(fā)生了異常,消息并沒有被成功處理,那么不應該發(fā)送 ack 消息,而應該將消息重新放回隊列中,讓其他消費者來處理
??RabbitMQ處理消息方法
RabbitMQ提供了一些基本的方法來處理消息,這些方法包括:
basic.publish
: 發(fā)布消息到指定的交換機上。basic.consume
: 消費消息,啟動一個消費者來監(jiān)聽指定隊列上的消息。basic.ack
: 確認消息已經被消費,告訴RabbitMQ可以刪除該消息。basic.nack
: 否認消息已經被消費,告訴RabbitMQ需要重新發(fā)送該消息。basic.reject
: 拒絕消息,告訴RabbitMQ不需要再次發(fā)送該消息。basic.get
: 獲取指定隊列上的一條消息。basic.cancel
: 取消消費者的消費,停止監(jiān)聽指定隊列上的消息。
這些方法都是基于AMQP協(xié)議定義的,可以使用RabbitMQ提供的客戶端庫或者自己實現(xiàn)AMQP協(xié)議來調用這些方法。
到此這篇關于Java中RabbitMQ的幾種消息確認機制的文章就介紹到這了,更多相關Java RabbitMQ消息確認機制內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Spring MVC 關于controller的字符編碼問題
在使用springMVC框架構建web應用,客戶端常會請求字符串、整型、json等格式的數(shù)據(jù),通常使用@ResponseBody注解使 controller回應相應的數(shù)據(jù)而不是去渲染某個頁面。2017-03-03Spring實戰(zhàn)之使用注解實現(xiàn)聲明式事務操作示例
這篇文章主要介紹了Spring實戰(zhàn)之使用注解實現(xiàn)聲明式事務操作,結合實例形式詳細分析了spring使用注解實現(xiàn)聲明式事務相關配置、接口實現(xiàn)與使用技巧,需要的朋友可以參考下2020-01-01