RabbitMQ消費端ACK NACK及重回隊列機制詳解
ACK和NACK
當我們使用RabbitMQ時用于網(wǎng)絡異常,業(yè)務處理異?;蛘邩I(yè)務錯誤導致消息無法立即消費時,在這種情況下,傳輸中的信息將無法正常投遞——它們需要被重新投遞。Acknowledgements機制讓服務器和客戶端知道何時需要重新投遞。
當設置autoACK=false 時,就可以使用手工ACK。 其實手工方式包括了手工ACK、手工NACK。
- 手工 ACK 時,會發(fā)送給Broker一個應答,代表消息處理成功,Broker就可回送響應給Pro;
- NACK 則表示消息處理失敗,如果設置了重回隊列,Broker端就會將沒有成功處理的消息重新發(fā)送。
使用方式
1、basicNack
Consumer消費時,若由于業(yè)務異常,可手工 NACK 記錄日志,然后進行補償: basic.nack方法為不確認deliveryTag對應的消息,第二個參數(shù)是否應用于多消息,第三個參數(shù)是否requeue,如果requeue 參數(shù)設置為true ,則RabbitMQ 會重新將這條消息存入隊列,以便可以發(fā)送給下一個訂閱的消費者, 如果requeue 參數(shù)設置為false ,則RabbitMQ立即會把消息從隊列中移除,而不會把它發(fā)送給新的消費者。與basic.reject區(qū)別就是同時支持多個消息,可以nack該消費者先前接收未ack的所有消息。nack后的消息也會被自己消費到。
multiple 參數(shù)設置為false 則表示拒絕編號為deliveryTag的這一條消息,這時候basicNack 和basicReject 方法一樣;multiple 參數(shù)設置為true 則表示拒絕deliveryTag 編號之前所有未被當前消費者確認的消息。
- 參數(shù)1: 消息
- 參數(shù)2: 是否應用于多消息
- 參數(shù)3: 是否重新放回隊列,否則丟棄或者進入死信隊列
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
2、basicAck
如果由于服務器宕機等嚴重問題,就需要手工 ACK 保障Con消費成功:deliveryTag : 每次接收消息+1,可以做此消息處理通道的名字。
void basicAck(long deliveryTag, boolean multiple)
3、basicReject
basic.Reject拒絕deliveryTag對應的消息,第二個參數(shù)是否requeue,true則重新入隊列,否則丟棄或者進入死信隊列。該方法reject后,該消費者還是會消費到該條被reject的消息。reject一次只能拒絕一條消息,如果想要批量拒絕消息,則可以使用Basic.Nack 這個命令。
- 參數(shù)1: 消息
- 參數(shù)2: 是否重新放回隊列,否則丟棄或者進入死信隊列
void basicReject(long deliveryTag, boolean requeue);
4、basicRecover
basic.recover是否恢復消息到隊列,參數(shù)是是否requeue,true則重新入隊列,并且盡可能的將之前recover的消息投遞給其他消費者消費,而不是自己再次消費。false則消息會重新被投遞給自己。
channel.basicRecover(true);
消費端的重回隊列
重回隊列針對沒有處理成功的消息,將消息重新投遞給Broker。
重回隊列會把消費失敗的消息重新添加到隊列尾端,供Consumer重新消費。
一般在實際應用中,都會關閉重回隊列,即設置為false。
在確認ack后再把消息通過basicPublish到隊列尾部,防止正常消息堆積。
channel.basicPublish(message.getMessageProperties().getReceivedExchange(),message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,JSON.toJSONBytes(new Object()));
實際運用
@RabbitListener(queues = "${rabbitmq.one-queue}",
containerFactory = "rabbitListenerContainerFactory")
@RabbitHandler
public void doRmOnduty(Message message, Channel channel) {
try {
String body = new String(message.getBody());
//log.info("消息處理:{}",body);
} catch (Exception e) {
//消費異常,設置重回隊列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
} finally {
//最終確認消息消費成功
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
ACK機制可以保證Con拉取到了消息,若處理失敗了,則隊列中還有這個消息,仍然可以給Con處理。ack機制是 Con 告訴 Broker 當前消息是否成功消費,至于 Broker 如何處理 NACK,取決于 Con 是否設置了 requeue:如果 requeue=false, 則NACK 后 Broker 還是會刪除消息的。
但一般處理消息失敗都是因為代碼邏輯出bug,即使隊列中后來仍然保留該消息,然后再給Con消費,依舊報錯。 當然,若一臺機器宕機,消息還有,還可以給另外機器消費,這種情景下 ACK 很有用。
如果不使用 ACK 機制,直接把出錯消息存庫,便于日后查bug或重新執(zhí)行。
以上就是RabbitMQ消費端ACK NACK及重回隊列機制詳解的詳細內(nèi)容,更多關于RabbitMQ消費端ACK NACK的資料請關注腳本之家其它相關文章!
相關文章
JAVA 文件監(jiān)控 WatchService的示例方法
本篇文章主要介紹了JAVA 文件監(jiān)控 WatchService的示例方法,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-10-10
LoggingEventAsyncDisruptorAppender類執(zhí)行流程源碼解讀
這篇文章主要介紹了LoggingEventAsyncDisruptorAppender類執(zhí)行流程源碼解讀,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-12-12
解決@Test注解在Maven工程的Test.class類中無法使用的問題
這篇文章主要介紹了解決@Test注解在Maven工程的Test.class類中無法使用的問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-03-03
使用java8 API遍歷過濾文件目錄及子目錄和隱藏文件示例詳解
這篇文章主要介紹了使用java8API遍歷過濾文件目錄及子目錄及隱藏文件示例詳解,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-07-07

