RabbitMQ進(jìn)階之消息可靠性詳解
消息的可靠性
Rabbitmq消息的投遞過程中,怎么確保消息能不丟失,這是一個(gè)很重要的問題。哪怕我們做了Rabbitmq持久化,也不能保證我們的業(yè)務(wù)消息不會(huì)被丟失。
我們可以從消息的收發(fā)過程中來分析,消息首先要從生產(chǎn)者producer發(fā)送到broker,再?gòu)腷roker把消息發(fā)送給消費(fèi)者consumer。
所以我們總的可以從發(fā)送方(生產(chǎn)者)確認(rèn)和接收方(消費(fèi)者)確認(rèn)來保證消息的可靠性。
異常捕獲機(jī)制
先執(zhí)行業(yè)務(wù)操作,業(yè)務(wù)操作成功后執(zhí)行行消息發(fā)送,消息發(fā)送過程通過try catch 方式捕獲異常, 在異常處理理的代碼塊中執(zhí)行回滾業(yè)務(wù)操作或者執(zhí)行重發(fā)操作等。
這是一種最大努力確保的方式, 并無法保證100%絕對(duì)可靠,因?yàn)檫@里沒有異常并不代表消息就一定投遞成功。
另外,可以通過spring.rabbitmq.template.retry.enabled=true 配置開啟發(fā)送端的重試。
AMQP/RabbitMQ的事務(wù)機(jī)制
沒有捕獲到異常并不能代表消息就一定投遞成功了。 一直到事務(wù)提交后都沒有異常,確實(shí)就說明消息是投遞成功了。
但是,這種方式在性能方面的開銷 比較大,一般也不推薦使用。
- 事務(wù)實(shí)現(xiàn)
channel.txSelect(): 將當(dāng)前信道設(shè)置成事務(wù)模式 channel.txCommit(): 用于提交事務(wù) channel.txRollback(): 用于回滾事務(wù)
發(fā)送端確認(rèn)機(jī)制
RabbitMQ后來引入了一種輕量量級(jí)的方式,叫發(fā)送方確認(rèn)(publisher confirm)機(jī)制。生產(chǎn)者將信 道設(shè)置成confirm(確認(rèn))模式,一旦信道進(jìn)入confirm 模式,所有在該信道上?面發(fā)布的消息都會(huì)被指派 一個(gè)唯一的ID(從1 開始),一旦消息被投遞到所有匹配的隊(duì)列之后(如果消息和隊(duì)列是持久化的,那么 確認(rèn)消息會(huì)在消息持久化后發(fā)出),RabbitMQ 就會(huì)發(fā)送一個(gè)確認(rèn)(Basic.Ack)給生產(chǎn)者(包含消息的唯一 ID),這樣生產(chǎn)者就知道消息已經(jīng)正確送達(dá)了。
RabbitMQ 回傳給生產(chǎn)者的確認(rèn)消息中的deliveryTag 字段包含了確認(rèn)消息的序號(hào),另外,通過設(shè)置channel.basicAck方法中的multiple參數(shù),表示到這個(gè)序號(hào)之前的所有消息是否都已經(jīng)得到了處理了。生產(chǎn)者投遞消息后并不需要一直阻塞著,可以繼續(xù)投遞下一條消息并通過回調(diào)方式處理理ACK響應(yīng)。
如果 RabbitMQ 因?yàn)樽陨韮?nèi)部錯(cuò)誤導(dǎo)致消息丟失等異常情況發(fā)生,就會(huì)響應(yīng)一條nack(Basic.Nack)命令,生產(chǎn)者應(yīng)用程序同樣可以在回調(diào)方法中處理理該 nack 命令。
package confirm; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import util.ConnectionUtil; import java.io.IOException; import java.util.concurrent.TimeoutException; public class PublisherConfirmsProducer { public static void main(String[] args) throws Exception{ Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); // 向RabbitMQ服務(wù)器發(fā)送AMQP命令,將當(dāng)前通道標(biāo)記為發(fā)送方確認(rèn)通道 final AMQP.Confirm.SelectOk selectOk = channel.confirmSelect(); channel.queueDeclare("queue.pc", true, false, false, null); channel.exchangeDeclare("ex.pc", "direct", true, false, null); channel.queueBind("queue.pc", "ex.pc", "key.pc"); try { // 發(fā)送消息 for (int i = 1 ; i < 10000 ; i++){ channel.basicPublish("ex.pc", "key.pc", null, "hello world".getBytes()); } // 同步的方式等待RabbitMQ的確認(rèn)消息 channel.waitForConfirmsOrDie(5000); System.out.println("發(fā)送的消息已經(jīng)得到確認(rèn)"); } catch (IOException ex) { System.out.println("消息被拒收"); } catch (IllegalStateException ex) { System.out.println("發(fā)送消息的通道不是PublisherConfirms通道"); } catch (TimeoutException ex) { System.out.println("等待消息確認(rèn)超時(shí)"); } channel.close(); connection.close(); } }
waitForConfirm方法有個(gè)重載的,可以自定義timeout超時(shí)時(shí)間,超時(shí)后會(huì)拋TimeoutException。類似的有幾個(gè)waitForConfirmsOrDie方法,Broker端在返回nack(Basic.Nack)之后該方法會(huì)拋出java.io.IOException。
需要根據(jù)異常類型來做區(qū)別處理理, TimeoutException超時(shí)是屬于第三狀態(tài)(無法確定成功還是失?。?,而返回Basic.Nack拋出IOException這種是明確的失敗。上面的代碼主要只是演示confirm機(jī)制,實(shí)際上還是同步阻塞模式的,性能并不不是太好。
實(shí)際上,我們也可以通過“批處理理”的方式來改善整體的性能(即批量量發(fā)送消息后僅調(diào)用一次 waitForConfirms方法)。正常情況下這種批量處理的方式效率會(huì)高很多,但是如果發(fā)生了超時(shí)或者nack(失?。┖竽蔷托枰苛恐匕l(fā)消息或者通知上游業(yè)務(wù)批量回滾(因?yàn)槲覀冎恢肋@個(gè)批次中有消息沒投遞成功,而并不知道具體是那條消息投遞失敗了,所以很難針對(duì)性處理),如此看來,批量重發(fā)消息肯定會(huì)造成部分消息重復(fù)。
另外,我們可以通過異步回調(diào)的方式來處理Broker的響應(yīng)。addConfirmListener 方法可以添加ConfirmListener 這個(gè)回調(diào)接口,這個(gè) ConfirmListener 接口包含兩個(gè)方法:handleAck 和handleNack,分別用來處理 RabbitMQ 回傳的 Basic.Ack 和 Basic.Nack。
package confirm; /** * 創(chuàng)建者: 魏紅 * 創(chuàng)建時(shí)間: 2023-02-28 * 描述: */ import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import util.ConnectionUtil; public class PublisherConfirmsProducer2 { public static void main(String[] args) throws Exception { //獲取連接 Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); // 向RabbitMQ服務(wù)器發(fā)送AMQP命令,將當(dāng)前通道標(biāo)記為發(fā)送方確認(rèn)通道 final AMQP.Confirm.SelectOk selectOk = channel.confirmSelect(); channel.queueDeclare("queue.pc", true, false, false, null); channel.exchangeDeclare("ex.pc", "direct", true, false, null); channel.queueBind("queue.pc", "ex.pc", "key.pc"); String message = "hello-"; // 批處理的大小 int batchSize = 10; // 用于對(duì)需要等待確認(rèn)消息的計(jì)數(shù) int outstrandingConfirms = 0; for (int i = 0; i < 10000; i++) { channel.basicPublish("ex.pc", "key.pc", null, (message + i).getBytes()); outstrandingConfirms++; if (outstrandingConfirms == batchSize) { // 此時(shí)已經(jīng)有一個(gè)批次的消息需要同步等待broker的確認(rèn)消息 // 同步等待 channel.waitForConfirmsOrDie(5000); System.out.println("消息已經(jīng)被確認(rèn)了"); outstrandingConfirms = 0; } } if (outstrandingConfirms > 0) { channel.waitForConfirmsOrDie(5000); System.out.println("剩余消息已經(jīng)被確認(rèn)了"); } channel.close(); connection.close(); } }
還可以使用異步方法:
package confirm; /** * 創(chuàng)建者: 魏紅 * 創(chuàng)建時(shí)間: 2023-02-28 * 描述: */ import com.rabbitmq.client.*; import util.ConnectionUtil; import javax.management.loading.MLet; import java.io.IOException; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; public class PublisherConfirmsProducer3 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); // 向RabbitMQ服務(wù)器發(fā)送AMQP命令,將當(dāng)前通道標(biāo)記為發(fā)送方確認(rèn)通道 final AMQP.Confirm.SelectOk selectOk = channel.confirmSelect(); channel.queueDeclare("queue.pc", true, false, false, null); channel.exchangeDeclare("ex.pc", "direct", true, false, null); channel.queueBind("queue.pc", "ex.pc", "key.pc"); // ConfirmCallback clearOutstandingConfirms = new ConfirmCallback() { // @Override // public void handle(long deliveryTag, boolean multiple) throws IOException { // if (multiple) { // System.out.println("編號(hào)小于等于 " + deliveryTag + " 的消息都已經(jīng)被確認(rèn)了"); // } else { // System.out.println("編號(hào)為:" + deliveryTag + " 的消息被確認(rèn)"); // } // } // }; ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>(); ConfirmCallback clearOutstandingConfirms = (deliveryTag, multiple) -> { if (multiple) { System.out.println("編號(hào)小于等于 " + deliveryTag + " 的消息都已經(jīng)被確認(rèn)了"); final ConcurrentNavigableMap<Long, String> headMap = outstandingConfirms.headMap(deliveryTag, true); // 清空outstandingConfirms中已經(jīng)被確認(rèn)的消息信息 headMap.clear(); } else { // 移除已經(jīng)被確認(rèn)的消息 outstandingConfirms.remove(deliveryTag); System.out.println("編號(hào)為:" + deliveryTag + " 的消息被確認(rèn)"); } }; ConfirmCallback confirmCallback = (deliveryTag, multiple) -> { if (multiple) { // 將沒有確認(rèn)的消息記錄到一個(gè)集合中 // 此處省略實(shí)現(xiàn) System.out.println("消息編號(hào)小于等于:" + deliveryTag + " 的消息 不確認(rèn)"); } else { System.out.println("編號(hào)為:" + deliveryTag + " 的消息不確認(rèn)"); } }; // 設(shè)置channel的監(jiān)聽器,處理確認(rèn)的消息和不確認(rèn)的消息 channel.addConfirmListener(clearOutstandingConfirms, confirmCallback); String message = "hello-"; for (int i = 0; i < 500000; i++) { // 獲取下一條即將發(fā)送的消息的消息ID final long nextPublishSeqNo = channel.getNextPublishSeqNo(); channel.basicPublish("ex.pc", "key.pc", null, (message + i).getBytes()); System.out.println("編號(hào)為:" + nextPublishSeqNo + " 的消息已經(jīng)發(fā)送成功,尚未確認(rèn)"); outstandingConfirms.put(nextPublishSeqNo, (message + i)); } // 等待消息被確認(rèn) Thread.sleep(10000); channel.close(); connection.close(); } }
持久化存儲(chǔ)機(jī)制
持久化是提高RabbitMQ可靠性的基礎(chǔ),否則當(dāng)RabbitMQ遇到異常時(shí)(如:重啟、斷電、停機(jī)等)數(shù)據(jù)將會(huì)丟失。主要從以下幾個(gè)方面來保障消息的持久性:
- Exchange的持久化。通過定義時(shí)設(shè)置durable 參數(shù)為ture來保證Exchange相關(guān)的元數(shù)據(jù)不不丟失。
- Queue的持久化。也是通過定義時(shí)設(shè)置durable 參數(shù)為ture來保證Queue相關(guān)的元數(shù)據(jù)不不丟失。
- 消息的持久化。通過將消息的投遞模式 (BasicProperties 中的 deliveryMode 屬性)設(shè)置為 2即可實(shí)現(xiàn)消息的持久化,保證消息自身不丟失。
接收端確認(rèn)機(jī)制
如何保證消息被消費(fèi)者成功消費(fèi)?
前面我們講了生產(chǎn)者發(fā)送確認(rèn)機(jī)制和消息的持久化存儲(chǔ)機(jī)制,然而這依然無法完全保證整個(gè)過程的 可靠性,因?yàn)槿绻⒈幌M(fèi)過程中業(yè)務(wù)處理失敗了但是消息卻已經(jīng)出列了(被標(biāo)記為已消費(fèi)了),我 們又沒有任何重試,那結(jié)果跟消息丟失沒什么分別。
RabbitMQ在消費(fèi)端會(huì)有Ack機(jī)制,即消費(fèi)端消費(fèi)消息后需要發(fā)送Ack確認(rèn)報(bào)文給Broker端,告知自 己是否已消費(fèi)完成,否則可能會(huì)一直重發(fā)消息直到消息過期(AUTO模式)。這也是我們之前一直在講的“最終一致性”、“可恢復(fù)性” 的基礎(chǔ)。
一般而言,我們有如下處理手段:
- 采用NONE模式,消費(fèi)的過程中自行捕獲異常,引發(fā)異常后直接記錄日志并落到異常恢復(fù)表,再通過后臺(tái)定時(shí)任務(wù)掃描異?;謴?fù)表嘗試做重試動(dòng)作。如果業(yè)務(wù)不自行處理則有丟失數(shù)據(jù)的風(fēng)險(xiǎn)
- 采用AUTO(自動(dòng)Ack)模式,不主動(dòng)捕獲異常,當(dāng)消費(fèi)過程中出現(xiàn)異常時(shí)會(huì)將消息放回Queue中,然后消息會(huì)被重新分配到其他消費(fèi)者節(jié)點(diǎn)(如果沒有則還是選擇當(dāng)前節(jié)點(diǎn))重新被消費(fèi),默認(rèn)會(huì)一直重發(fā)消息并直到消費(fèi)完成返回Ack或者一直到過期
- 采用MANUAL(手動(dòng)Ack)模式,消費(fèi)者自行控制流程并手動(dòng)調(diào)用channel相關(guān)的方法返回Ack
package workmode; import com.rabbitmq.client.*; import util.ConnectionUtil; import java.io.IOException; /** * NONE模式,則只要收到消息后就立即確認(rèn)(消息出列,標(biāo)記已消費(fèi)),有丟失數(shù)據(jù)的風(fēng)險(xiǎn) * AUTO模式,看情況確認(rèn),如果此時(shí)消費(fèi)者拋出異常則消息會(huì)返回到隊(duì)列中 * MANUAL模式,需要顯式的調(diào)用當(dāng)前channel的basicAck方法 */ public class Recer2 { public static void main(String[] args) throws Exception { // 1.獲得連接 Connection connection = ConnectionUtil.getConnection(); // 2.獲得通道(信道) final Channel channel = connection.createChannel(); channel.queueDeclare("work_queue",false,false,false,null); // 3.從信道中獲得消息 DefaultConsumer consumer = new DefaultConsumer(channel){ @Override //交付處理(收件人信息,包裹上的快遞標(biāo)簽,協(xié)議的配置,消息) public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String s = new String(body); // System.out.println("【顧客2】吃掉 " + s+" ! 總共吃【"+i+++"】串!"); System.out.println("【消費(fèi)者2】得到 " + s); // 模擬網(wǎng)絡(luò)延遲 try{ Thread.sleep(400); }catch (Exception e){ } // 手動(dòng)確認(rèn)(收件人信息,是否同時(shí)確認(rèn)多個(gè)消息) channel.basicAck(envelope.getDeliveryTag(),false); } }; // 4.監(jiān)聽隊(duì)列 false:手動(dòng)消息確認(rèn) channel.basicConsume("work_queue", false,consumer); } }
本小節(jié)的內(nèi)容總結(jié)起來就如圖所示,本質(zhì)上就是“請(qǐng)求/應(yīng)答”確認(rèn)模式
到此這篇關(guān)于RabbitMQ進(jìn)階之消息可靠性詳解的文章就介紹到這了,更多相關(guān)RabbitMQ消息可靠性內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java Set接口及常用實(shí)現(xiàn)類總結(jié)
Collection的另一個(gè)子接口就是Set,他并沒有我們List常用,并且自身也沒有一些額外的方法,全是繼承自Collection中的,因此我們還是簡(jiǎn)單總結(jié)一下,包括他的常用實(shí)現(xiàn)類HashSet、LinkedHashSet、TreeSet的總結(jié)2023-01-01一次排查@CacheEvict注解失效的經(jīng)歷及解決
這篇文章主要介紹了一次排查@CacheEvict注解失效的經(jīng)歷及解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-12-12Maven多模塊及version修改的實(shí)現(xiàn)方法
這篇文章主要介紹了Maven多模塊及version修改的實(shí)現(xiàn)方法,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2019-06-06Spring Boot 2 實(shí)戰(zhàn):自定義啟動(dòng)運(yùn)行邏輯實(shí)例詳解
這篇文章主要介紹了Spring Boot 2 實(shí)戰(zhàn):自定義啟動(dòng)運(yùn)行邏輯,結(jié)合實(shí)例形式詳細(xì)分析了Spring Boot 2自定義啟動(dòng)運(yùn)行邏輯詳細(xì)操作技巧與注意事項(xiàng),需要的朋友可以參考下2020-05-05java多線程之wait(),notify(),notifyAll()的詳解分析
本篇文章是對(duì)java多線程 wait(),notify(),notifyAll()進(jìn)行了詳細(xì)的分析介紹,需要的朋友參考下2013-06-06JAVA集成Freemarker生成靜態(tài)html過程解析
這篇文章主要介紹了JAVA集成Freemarker生成靜態(tài)html過程解析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-06-06