RabbitMq消息防丟失功能實(shí)現(xiàn)方式講解
1.概述
1.1.數(shù)據(jù)丟失的原因
在消息中有三種可能性造成數(shù)據(jù)丟失:
- 消費(fèi)者消費(fèi)消息失敗
- 生產(chǎn)者生產(chǎn)消息失敗
- MQ數(shù)據(jù)丟失
消費(fèi)者消費(fèi)消息失?。?/p>
RabbitMq存在應(yīng)答機(jī)制,默認(rèn)為自動應(yīng)答,MQ向消費(fèi)者推送一條消息,消費(fèi)者收到這條消息后會返回一個ack(應(yīng)答)給MQ,MQ收到應(yīng)答后會刪除這條消息。
自動應(yīng)答存在一個問題,就是消費(fèi)者收到消息后立馬就會給MQ返回ack,如果消費(fèi)者返回完ack但還沒來的及真正處理這條消息時,消費(fèi)者斷電宕機(jī)了,那么這條消息就丟失了。
這就是由于消費(fèi)者消費(fèi)消息失敗造成的數(shù)據(jù)丟失。
生產(chǎn)者生產(chǎn)數(shù)據(jù)失?。?/p>
生產(chǎn)者向MQ推送了一條消息,但是由于由于諸如網(wǎng)絡(luò)故障等原因mq并沒有收到該條消息,這樣就造成了這條消息的丟失。
MQ數(shù)據(jù)丟失:
MQ的數(shù)據(jù)是存在內(nèi)存中的,諸如斷電等原因可能會造成數(shù)據(jù)的丟失。
1.2.如何防止數(shù)據(jù)丟失
解決以上列舉的數(shù)據(jù)丟失問題的辦法有三種:
- 手動應(yīng)答
- 消息確認(rèn)機(jī)制
- 持久化
手動應(yīng)答:
RabbitMQ默認(rèn)是自動應(yīng)答,消費(fèi)者收到消息后就會自動返回ack給MQ,可以將應(yīng)答模式改為手動應(yīng)答,在消費(fèi)者一側(cè)消息的消費(fèi)動作完成后手動來返回ack給MQ,用來解決“消費(fèi)者消費(fèi)消息失敗”問題。
消息確認(rèn)機(jī)制:
當(dāng)消息隊(duì)列收到消息后,告知生產(chǎn)者,讓生產(chǎn)者感知到自己生產(chǎn)的消息,消息隊(duì)列已經(jīng)接收到,用來解決“生產(chǎn)者生產(chǎn)消息失敗”問題。消息確認(rèn)機(jī)制有兩種實(shí)現(xiàn)方式:
- AMQP事務(wù)
- confirm
持久化:
消息隊(duì)列的消息持久化到磁盤上,用來解決“MQ數(shù)據(jù)丟失”問題。
2.手動應(yīng)答
手動應(yīng)答是通過設(shè)置channel來實(shí)現(xiàn)的,以下為一個完整代碼示例。
配置類:
@Configuration public class config { @Bean public Queue queue(){ return new Queue("queue_01",false); } }
生產(chǎn)者:
@SpringBootTest(classes = Main.class) public class Producer { @Autowired RabbitTemplate rabbitTemplate; @Test public void producerMsg(){ rabbitTemplate.convertAndSend("queue_01","hello_world"); } }
消費(fèi)者:
@Component @Slf4j public class Consumer { @RabbitListener(queues = {"queue_01"}) public void consumerMsg(String msg, Message message,Channel channel){ try { log.info("消費(fèi)者消費(fèi)消息: "+msg); /** * 沒有異常就確認(rèn)消息 * basicAck(long deliveryTag, boolean multiple) * deliveryTag:當(dāng)前消息在隊(duì)列中的的索引; * multiple:為true的話就是批量確認(rèn) */ channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (Exception e) { /** * 有異常就拒收消息 * basicNack(long deliveryTag, boolean multiple, boolean requeue) * requeue:true為將消息重返當(dāng)前消息隊(duì)列,重新發(fā)送給消費(fèi)者; * false將消息丟棄 */ try { channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } catch (Exception ex) { log.error(ex.getMessage()); } } } }
3.消息確認(rèn)機(jī)制
AQMP事務(wù)、confirm其實(shí)都是基于channel的。
3.1.AMQP事務(wù)
AMQP事務(wù)和數(shù)據(jù)庫事務(wù)類似,定義一組對MQ的操作,統(tǒng)一提交,成功則全部一起執(zhí)行,失敗則全部回滾。AMQP事務(wù)在spring boot中的使用很簡單,和數(shù)據(jù)庫的事務(wù)一樣,一個注解就可以搞定。
@GetMapping("/direct/wx/transactional") @Transactional(rollbackFor = Exception.class) public String sendDirectMessageTransactional() { rabbitTemplate.convertAndSend("direct_exchange", "wx","hello world!"); log.info("開啟事務(wù)消息機(jī)制"); try { Thread.sleep(5000); } catch (Exception e) { e.printStackTrace(); } return "ok"; }
3.2.confirm
confirm是基于channel的,一旦channel進(jìn)入confirm模式,所有在該channel上發(fā)布的消息都會被指派一個唯一的ID(從1開始),消息被投遞道匹配隊(duì)列后broker會發(fā)送一個確認(rèn)消息給生產(chǎn)者。如果消息和隊(duì)列是可持久化的(durable為true),那么確認(rèn)消息會在消息被寫入磁盤后發(fā)出。
confirm最大的好處在于異步,生產(chǎn)者在等待上一條消息的確認(rèn)消息的時候可以繼續(xù)往下發(fā)送。
confirm在spring boot中的使用很簡單,在配置文件中開啟即可,并且支持自定義回調(diào)函數(shù):
配置文件:
spring.rabbitmq.publisher-confirms: true
spring.rabbitmq.publisher-returns: true
生產(chǎn)者:
@Slf4j @Component public class RabbitmqService implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback { @Autowired private RabbitTemplate rabbitTemplate; public void sendMessage(String exchange,String routingKey,Object msg) { // 設(shè)置交換機(jī)處理失敗消息的模式 true 表示消息由交換機(jī) 到達(dá)不了隊(duì)列時,會將消息重新返回給生產(chǎn)者 // 如果不設(shè)置這個指令,則交換機(jī)向隊(duì)列推送消息失敗后,不會觸發(fā) setReturnCallback rabbitTemplate.setMandatory(true); //消息消費(fèi)者確認(rèn)收到消息后,手動ack回執(zhí) rabbitTemplate.setConfirmCallback(this); // 暫時關(guān)閉 return 配置 //rabbitTemplate.setReturnCallback(this); //發(fā)送消息 rabbitTemplate.convertAndSend(exchange,routingKey,msg); } /** * 交換機(jī)并未將數(shù)據(jù)丟入指定的隊(duì)列中時,觸發(fā) * channel.basicPublish(exchange_name,next.getKey(), true, properties,next.getValue().getBytes()); * 參數(shù)三:true 表示如果消息無法正常投遞,則return給生產(chǎn)者 ;false 表示直接丟棄 * @param message 消息對象 * @param replyCode 錯誤碼 * @param replyText 錯誤信息 * @param exchange 交換機(jī) * @param routingKey 路由鍵 */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("---- returnedMessage ----replyCode="+replyCode+" replyText="+replyText+" "); } /** * 消息生產(chǎn)者發(fā)送消息至交換機(jī)時觸發(fā),用于判斷交換機(jī)是否成功收到消息 * @param correlationData 相關(guān)配置信息 * @param ack exchange 交換機(jī),判斷交換機(jī)是否成功收到消息 true 表示交換機(jī)收到 * @param cause 失敗原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("---- confirm ----ack="+ack+" cause="+String.valueOf(cause)); log.info("correlationData -->"+correlationData.toString()); if(ack){ // 交換機(jī)接收到 log.info("---- confirm ----ack==true cause="+cause); }else{ // 沒有接收到 log.info("---- confirm ----ack==false cause="+cause); } } }
到此這篇關(guān)于RabbitMq消息防丟失功能實(shí)現(xiàn)方式講解的文章就介紹到這了,更多相關(guān)RabbitMq消息防丟失內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java基于elasticsearch實(shí)現(xiàn)集群管理
這篇文章主要介紹了java基于elasticsearch實(shí)現(xiàn)集群管理,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-02-02SpringCloud Gateway 利用 Mysql 實(shí)現(xiàn)動態(tài)路由的方法
這篇文章主要介紹了SpringCloud Gateway 利用 Mysql 實(shí)現(xiàn)動態(tài)路由的方法,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-02-02解決mybatis-plus新增數(shù)據(jù)自增ID變無序問題
這篇文章主要介紹了解決mybatis-plus新增數(shù)據(jù)自增ID變無序問題,具有很好的參考價(jià)值,希望對大家有所幫助。2023-07-07Java之InputStreamReader類的實(shí)現(xiàn)
這篇文章主要介紹了Java之InputStreamReader類的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-11-11BCryptPasswordEncoder加密與MD5加密的區(qū)別及說明
這篇文章主要介紹了BCryptPasswordEncoder加密與MD5加密的區(qū)別及說明,具有很好的參考價(jià)值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-08-08