rabbitmq消息ACK確認(rèn)機(jī)制及發(fā)送失敗處理方式
rabbitmq為確保消息發(fā)送和接收成功,采用ack機(jī)制。
(1)生產(chǎn)者producter發(fā)送消息到mq時(shí),mq會(huì)發(fā)送ack給producter告知消息是否投遞成功;
(2)消費(fèi)者consumer接收處理消息后,consumer會(huì)發(fā)送ack給mq告知消息是否處理成功;
通過(guò)ack機(jī)制,確保消息能夠被producter成功發(fā)送和consumer成功接收處理,保證消息不丟失。
1、消息發(fā)送
rabbitmq消息發(fā)送分為兩個(gè)階段:
(1)producter將消息發(fā)送到broker,即發(fā)送到exchage交換機(jī);
(2)消息通過(guò)交換機(jī)exchange被路由到隊(duì)列queue;
消息只有被正確投遞到隊(duì)列queue中,才算發(fā)送成功。
消息發(fā)送代碼:
public boolean send(String queueName, String json, String msgId){ Message message = MessageBuilder.withBody(json.getBytes()).setCorrelationId(msgId).build(); message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);//設(shè)置消息持久化 CorrelationDataExt correlationData = new CorrelationDataExt(); correlationData.setId(msgId); correlationData.setData(json); rabbitTemplate.setEncoding("UTF-8"); rabbitTemplate.setMandatory(true);//設(shè)置手工ack確認(rèn) rabbitTemplate.setConfirmCallback(this);//ack回調(diào) rabbitTemplate.setReturnCallback(this);//回退回調(diào) rabbitTemplate.convertAndSend(queueName, message, correlationData); return true; }
在消息發(fā)送之前,我們要設(shè)置ack機(jī)制相關(guān)參數(shù):
- setMandatory:設(shè)置手工確認(rèn)ack;
- setConfirmCallback:設(shè)置消息發(fā)送到exchange結(jié)果回調(diào);
- setReturnCallback:設(shè)置消息投遞到queue失敗回退時(shí)回調(diào);
通過(guò)上述兩個(gè)回調(diào)方法,我們能夠?qū)Πl(fā)送失敗的消息進(jìn)行重發(fā)處理,確保消息不丟失。
2、消息發(fā)送失敗
根據(jù)rabbitmq發(fā)送過(guò)程,消息發(fā)送失敗的有三種情況會(huì)出現(xiàn):
(1)producter連接mq失敗,消息沒(méi)有發(fā)送到mq
(2)producter連接mq成功,但是發(fā)送到exchange失敗
(3)消息發(fā)送到exchange成功,但是路由到queue失??;
3、發(fā)送失敗處理
(1)producter連接mq失敗,消息沒(méi)有發(fā)送到mq
這種情況下,在發(fā)送消息時(shí)可以通過(guò)捕捉AmqpException異常,將消息保存db中后續(xù)進(jìn)行重發(fā)處理。
try{ rabbitTemplate.convertAndSend(queueName, message, correlationData); }catch (Exception e){ logger.error("連接MQ失敗", e); //todo 存儲(chǔ)到db中進(jìn)行重發(fā) }
(2)producter連接mq成功,但是發(fā)送到exchange失敗
通過(guò)實(shí)現(xiàn)ConfirmCallback接口,對(duì)發(fā)送結(jié)果進(jìn)行處理。
@Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { String msgId = correlationData.getId(); if(ack){ //發(fā)送成功 logger.debug("ack,消息投遞到exchange成功,msgId:{}",msgId); }else{ //發(fā)送失敗,重試 logger.error("ack,消息投遞exchange失敗,msgId:{},原因{}" ,msgId, cause); } }
confirm方法有3個(gè)參數(shù),correlationData是消息發(fā)送時(shí)攜帶的數(shù)據(jù)對(duì)象,ack消息是否成功發(fā)送到exchange,cause是發(fā)送失敗時(shí)的原因。
通過(guò)ack我們可以判斷發(fā)送到exchange是否成功,如果ack=false,則我們進(jìn)行失敗處理。
但是這里存在一個(gè)問(wèn)題,correlationData里面只有一個(gè)id屬性,沒(méi)有關(guān)于消息內(nèi)容的屬性,對(duì)于數(shù)據(jù)失敗處理非常不方便。
為解決此問(wèn)題,我們可以自定義一個(gè)CorrelationData擴(kuò)展對(duì)象,繼承CorrelationData,并添加自己想要保存數(shù)據(jù)的屬性,在消息發(fā)送時(shí),攜帶相關(guān)數(shù)據(jù)在該對(duì)象上即可。
自定義CorrelationData對(duì)象:
/** * CorrelationData的自定義實(shí)現(xiàn),用于拿到消息內(nèi)容 */ public class CorrelationDataExt extends CorrelationData { //數(shù)據(jù) private volatile Object data; //隊(duì)列 private String queueName; public Object getData() { return data; } public void setData(Object data) { this.data = data; } public String getQueueName() { return queueName; } public void setQueueName(String queueName) { this.queueName = queueName; } }
重寫(xiě)發(fā)送方法,使用CorrelationDataExt對(duì)象攜帶數(shù)據(jù):
public boolean send(String queueName, String json, String msgId){ Message message = MessageBuilder.withBody(json.getBytes()).setCorrelationId(msgId).build(); message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);//設(shè)置消息持久化 //使用自定義的數(shù)據(jù)對(duì)象 CorrelationDataExt correlationData = new CorrelationDataExt(); correlationData.setId(msgId); correlationData.setData(json); correlationData.setQueueName(queueName); rabbitTemplate.setEncoding("UTF-8"); rabbitTemplate.setMandatory(true);//設(shè)置手工ack確認(rèn) rabbitTemplate.setConfirmCallback(this);//設(shè)置發(fā)送成功回調(diào) rabbitTemplate.setReturnCallback(this);//設(shè)置消息回退回調(diào) try{ rabbitTemplate.convertAndSend(queueName, message, correlationData);//使用amqp default exchange direct }catch (Exception e){ logger.error("MQ連接失敗,請(qǐng)聯(lián)系管理員處理!!!!"); //保存到db重發(fā) saveToDB(msgId, json, queueName, "90"); } return true; }
重寫(xiě)confirm方法,對(duì)CorrelationData進(jìn)行處理:
@Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { String msgId = correlationData.getId(); if(ack){ //發(fā)送成功 logger.debug("ack,消息投遞到exchange成功,msgId:{}",msgId); }else{ //發(fā)送失敗,重試 logger.error("ack,消息投遞exchange失敗,msgId:{},原因{}" ,msgId, cause); if(correlationData instanceof CorrelationDataExt){ CorrelationDataExt correlationDataExt = (CorrelationDataExt) correlationData; String message = (String) correlationDataExt.getData(); String queueName = ((CorrelationDataExt) correlationData).getQueueName(); saveToDB(msgId, message, queueName, "91"); }else{ logger.info("correlationData對(duì)象不包含數(shù)據(jù)"); } } }
(3)消息發(fā)送到exchange成功,但是路由到queue失敗
通過(guò)實(shí)現(xiàn)ReturnCallback接口,對(duì)回退消息進(jìn)行重發(fā)處理。
@Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { logger.error("消息發(fā)送失敗-消息回退,應(yīng)答碼:{},原因:{},交換機(jī):{},路由鍵:{}", replyCode, replyText, exchange, routingKey); String msgId = message.getMessageProperties().getCorrelationId(); String data = new String(message.getBody()); saveToDB(msgId, data, routingKey, "92"); }
關(guān)于對(duì)失敗消息的處理,我這里是統(tǒng)一保存到DB中,后續(xù)通過(guò)定時(shí)任務(wù)進(jìn)行重發(fā)處理的。
通過(guò)以上3個(gè)方面對(duì)失敗消息的處理,可以確保消息能夠成功發(fā)送到mq,確保不丟失。
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
java二維數(shù)組實(shí)現(xiàn)推箱子小游戲
這篇文章主要為大家詳細(xì)介紹了java二維數(shù)組實(shí)現(xiàn)推箱子小游戲,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2020-11-11springboot實(shí)現(xiàn)郵箱驗(yàn)證碼功能
這篇文章主要為大家詳細(xì)介紹了springboot實(shí)現(xiàn)郵箱驗(yàn)證碼功能,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2020-02-02Feign Client 超時(shí)時(shí)間配置不生效的解決
這篇文章主要介紹了Feign Client 超時(shí)時(shí)間配置不生效的解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-09-09Java實(shí)現(xiàn)動(dòng)態(tài)獲取圖片驗(yàn)證碼的示例代碼
這篇文章主要介紹了Java實(shí)現(xiàn)動(dòng)態(tài)獲取圖片驗(yàn)證碼的示例代碼,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-08-08詳解Spring-Boot集成Spring session并存入redis
這篇文章主要介紹了詳解Spring-Boot集成Spring session并存入redis,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-05-05Java實(shí)現(xiàn)跳躍表(skiplist)的簡(jiǎn)單實(shí)例
這篇文章主要介紹了Java編程中跳躍表的概念和實(shí)現(xiàn)原理,并簡(jiǎn)要敘述了它的結(jié)構(gòu),具有一定參考價(jià)值,需要的朋友可以了解下。2017-09-09Spring @Transactional注解的聲明式事務(wù)簡(jiǎn)化業(yè)務(wù)邏輯中的事務(wù)管理
這篇文章主要為大家介紹了Spring @Transactional注解的聲明式事務(wù)簡(jiǎn)化業(yè)務(wù)邏輯中的事務(wù)管理,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-10-10Spring?Boot?中的?Native?SQL基本概念及使用方法
在本文中,我們介紹了 Spring Boot 中的 Native SQL,以及如何使用 JdbcTemplate 和 NamedParameterJdbcTemplate 來(lái)執(zhí)行自定義的 SQL 查詢(xún)或更新語(yǔ)句,需要的朋友跟隨小編一起看看吧2023-07-07詳解Spring Boot 目錄文件結(jié)構(gòu)
這篇文章主要介紹了Spring Boot 目錄文件結(jié)構(gòu)的相關(guān)資料,文中示例代碼非常詳細(xì),幫助大家更好的理解和學(xué)習(xí),感興趣的朋友可以了解下2020-07-07