詳解SpringBoot整合RabbitMQ如何實(shí)現(xiàn)消息確認(rèn)
簡介
本文介紹SpringBoot整合RabbitMQ如何進(jìn)行消息的確認(rèn)。
生產(chǎn)者消息確認(rèn)
介紹
發(fā)送消息確認(rèn):用來確認(rèn)消息從 producer發(fā)送到 broker 然后broker 的 exchange 到 queue過程中,消息是否成功投遞。
如果消息和隊(duì)列是可持久化的,那么確認(rèn)消息會將消息寫入磁盤之后發(fā)出;如果是鏡像隊(duì)列,所有鏡像接受成功后發(fā)確認(rèn)消息。
流程
- 如果消息沒有到達(dá)exchange,則confirm回調(diào),ack=false
- 如果消息到達(dá)exchange,則confirm回調(diào),ack=true
- exchange到queue成功,則不回調(diào)return
- exchange到queue失敗,則回調(diào)return(需設(shè)置mandatory=true,否則不會回調(diào),這樣消息就丟了)
配置
application.yml
# 發(fā)送者開啟 confirm 確認(rèn)機(jī)制 spring.rabbitmq.publisher-confirms=true # 發(fā)送者開啟 return 確認(rèn)機(jī)制 spring.rabbitmq.publisher-returns=true
ConfirmCallback
ConfirmCallback:消息只要被 RabbitMQ broker 接收到就會觸發(fā)confirm方法。
@Slf4j @Component public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (!ack) { log.error("confirm==>發(fā)送到broker失敗\r\n" + "correlationData={}\r\n" + "ack={}\r\n" + "cause={}", correlationData, ack, cause); } else { log.info("confirm==>發(fā)送到broker成功\r\n" + "correlationData={}\r\n" + "ack={}\r\n" + "cause={}", correlationData, ack, cause); } } }
correlationData:對象內(nèi)部有id (消息的唯一性)和Message。
(若ack為false,則Message不為null,可將Message數(shù)據(jù) 重新投遞;若ack是true,則correlationData為null)
ack:消息投遞到exchange 的狀態(tài),true表示成功。
cause:表示投遞失敗的原因。 (若ack為false,則cause不為null;若ack是true,則cause為null)
給每一條信息添加一個dataId,放在CorrelationData,這樣在RabbitConfirmCallback返回失敗時(shí)可以知道哪個消息失敗。
public void send(String dataId, String exchangeName, String rountingKey, String message){ CorrelationData correlationData = new CorrelationData(); correlationData.setId(dataId); rabbitTemplate.convertAndSend(exchangeName, rountingKey, message, correlationData); } public String receive(String queueName){ return String.valueOf(rabbitTemplate.receiveAndConvert(queueName)); }
2.1版本開始,CorrelationData對象具有ListenableFuture,可用于獲取結(jié)果,而不是在rabbitTemplate上使用ConfirmCallback。
CorrelationData cd1 = new CorrelationData(); this.templateWithConfirmsEnabled.convertAndSend("exchange", queue.getName(), "foo", cd1); assertTrue(cd1.getFuture().get(10, TimeUnit.SECONDS).isAck());
ReturnCallback
ReturnCallback:如果消息未能投遞到目標(biāo) queue 里將觸發(fā)returnedMessage方法。
若向 queue 投遞消息未成功,可記錄下當(dāng)前消息的詳細(xì)投遞數(shù)據(jù),方便后續(xù)做重發(fā)或者補(bǔ)償?shù)炔僮鳌?/p>
注意:需要rabbitTemplate.setMandatory(true);
當(dāng)mandatory設(shè)置為true時(shí),若exchange根據(jù)自身類型和消息routingKey無法找到一個合適的queue存儲消息,那么broker會調(diào)用basic.return方法將消息返還給生產(chǎn)者。當(dāng)mandatory設(shè)置為false時(shí),出現(xiàn)上述情況broker會直接將消息丟棄。
代碼:
@Slf4j @Component public class ReturnCallbackService implements RabbitTemplate.ReturnCallback { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("returnedMessage==> \r\n" + "message={}\r\n" + "replyCode={}\r\n" + "replyText={}\r\n" + "exchange={}\r\n" + "routingKey={}", message, replyCode, replyText, exchange, routingKey); } }
message(消息體)、replyCode(響應(yīng)code)、replyText(響應(yīng)內(nèi)容)、exchange(交換機(jī))、routingKey(隊(duì)列)。
注冊ConfirmCallback和ReturnCallback
整合后的寫法
package com.example.config; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.annotation.PostConstruct; @Slf4j @Configuration public class RabbitCallbackConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback { @Bean RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnCallback(this); return rabbitTemplate; } // 下邊這樣寫也可以 // @Autowired // private RabbitTemplate rabbitTemplate; // @PostConstruct // public void init() { // rabbitTemplate.setMandatory(true); // rabbitTemplate.setReturnCallback(this); // rabbitTemplate.setConfirmCallback(this); // } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (!ack) { log.error("confirm==>發(fā)送到broker失敗\r\n" + "correlationData={}\r\n" + "ack={}\r\n" + "cause={}", correlationData, ack, cause); } else { log.info("confirm==>發(fā)送到broker成功\r\n" + "correlationData={}\r\n" + "ack={}\r\n" + "cause={}", correlationData, ack, cause); } } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("returnedMessage==> \r\n" + "message={}\r\n" + "replyCode={}\r\n" + "replyText={}\r\n" + "exchange={}\r\n" + "routingKey={}", message, replyCode, replyText, exchange, routingKey); } }
消費(fèi)者消息確認(rèn)
介紹
確認(rèn)方式 | 簡介 | 詳述 |
---|---|---|
auto(默認(rèn)) | 根據(jù)消息消費(fèi)的情況,智能判定 | 若消費(fèi)者拋出異常,則mq不會收到確認(rèn)消息,mq會一直此消息發(fā)出去 若消費(fèi)者沒有拋出異常,則mq會收到確認(rèn)消息,mq不會再次將此消息發(fā)出去。 若消費(fèi)者在消費(fèi)時(shí)所在服務(wù)掛了,mq不會再次將此消息發(fā)出去。 |
none | mq發(fā)出消息后直接確認(rèn)消息 | |
manual | 消費(fèi)端手動確認(rèn)消息 | 消費(fèi)者調(diào)用 ack、nack、reject 幾種方法進(jìn)行確認(rèn),可以在業(yè)務(wù)失敗后進(jìn)行一些操作,如果消息未被 ACK 則消息還會存在于MQ,mq會一直將此消息發(fā)出去。 如果某個服務(wù)忘記 ACK 了,則 RabbitMQ 不會再發(fā)送數(shù)據(jù)給它,因?yàn)?RabbitMQ 認(rèn)為該服務(wù)的處理能力有限。 |
只要消息沒有被消費(fèi)者確認(rèn)(包括沒有自動確認(rèn)),會導(dǎo)致消息一直被失敗消費(fèi),死循環(huán)導(dǎo)致消耗大量資源。正確的處理方式是:發(fā)生異常,將消息記錄到db,再通過補(bǔ)償機(jī)制來補(bǔ)償消息,或者記錄消息的重復(fù)次數(shù),進(jìn)行重試,超過幾次后再放到db中。
消息確認(rèn)三種方式配置方法
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.direct.acknowledge-mode=manual
手動確認(rèn)三種方式
basicAck,basicNack,basicReject
basicAck
含義
表示成功確認(rèn),使用此回執(zhí)方法后,消息會被RabbitMQ broker 刪除。
函數(shù)原型
void basicAck(long deliveryTag, boolean multiple)
deliveryTag
- 消息投遞序號
- 每次消費(fèi)消息或者消息重新投遞后,deliveryTag都會增加。手動消息確認(rèn)模式下,我們可以對指定deliveryTag的消息進(jìn)行ack、nack、reject等操作。
multiple
- 是否批量確認(rèn)
- 值為 true 則會一次性 ack所有小于當(dāng)前消息 deliveryTag 的消息。
示例: 假設(shè)我先發(fā)送三條消息deliveryTag分別是5、6、7,可它們都沒有被確認(rèn),當(dāng)我發(fā)第四條消息此時(shí)deliveryTag為8,multiple設(shè)置為 true,會將5、6、7、8的消息全部進(jìn)行確認(rèn)。
實(shí)例
@RabbitHandler public void process(String content, Channel channel, Message message){ channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }
basicNack
含義
表示失敗確認(rèn),一般在消費(fèi)消息業(yè)務(wù)異常時(shí)用到此方法,可以將消息重新投遞入隊(duì)列。
函數(shù)原型
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
- deliveryTag:表示消息投遞序號。
- multiple:是否批量確認(rèn)。
- requeue:值為 true 消息將重新入隊(duì)列。
basicReject
含義
拒絕消息,與basicNack區(qū)別在于不能進(jìn)行批量操作,其他用法很相似。
函數(shù)原型
void basicReject(long deliveryTag, boolean requeue)
- deliveryTag:表示消息投遞序號。
- requeue:值為 true 消息將重新入隊(duì)列。
以上就是詳解SpringBoot整合RabbitMQ如何實(shí)現(xiàn)消息確認(rèn)的詳細(xì)內(nèi)容,更多關(guān)于SpringBoot RabbitMQ消息確認(rèn)的資料請關(guān)注腳本之家其它相關(guān)文章!
- RabbitMQ的ACK確認(rèn)機(jī)制保障消費(fèi)端消息的可靠性詳解
- springboot實(shí)現(xiàn)rabbitmq消息確認(rèn)的示例代碼
- RabbitMQ的消息確認(rèn)機(jī)制的詳細(xì)總結(jié)
- 一文總結(jié)RabbitMQ中的消息確認(rèn)機(jī)制
- RabbitMQ消息確認(rèn)機(jī)制剖析
- SpringBoot整合RabbitMQ實(shí)現(xiàn)消息確認(rèn)機(jī)制
- springboot + rabbitmq 如何實(shí)現(xiàn)消息確認(rèn)機(jī)制(踩坑經(jīng)驗(yàn))
- Java中RabbitMQ的幾種消息確認(rèn)機(jī)制
相關(guān)文章
SpringBoot開發(fā)技巧之使用AOP記錄日志示例解析
這篇文章主要為大家介紹了SpringBoot開發(fā)技巧之如何利用AOP記錄日志的示例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步2021-10-10java實(shí)用型-高并發(fā)下RestTemplate的正確使用說明
這篇文章主要介紹了java實(shí)用型-高并發(fā)下RestTemplate的正確使用說明,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-10-10springboot整合mybatis實(shí)現(xiàn)簡單的一對多級聯(lián)查詢功能
這篇文章主要介紹了springboot整合mybatis實(shí)現(xiàn)簡單的一對多級聯(lián)查詢功能,分步驟通過實(shí)例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-08-08Java批量操作文件系統(tǒng)的實(shí)現(xiàn)示例
文件上傳和下載是java web中常見的操作,本文主要介紹了Java批量操作文件系統(tǒng)的實(shí)現(xiàn)示例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2024-03-03SpringBoot服務(wù)訪問路徑動態(tài)處理方式
這篇文章主要介紹了SpringBoot服務(wù)訪問路徑動態(tài)處理方式,具有很好的參考價(jià)值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-12-12