詳解RabbitMq如何做到消息的可靠性投遞
前言
現(xiàn)在的一些互聯(lián)網(wǎng)項目或者是高并發(fā)的項目中很少有沒有引入消息隊列的。 引入消息隊列可以給這個項目帶來很多的好處:比如
- 削峰
這個就很好的理解,在系統(tǒng)中的請求量是固定的,但是有的時候會多出很多的突發(fā)流量,比如在有秒殺活動的時候,這種瞬時的高流量可能會打垮系統(tǒng),這個時候就可以很好的引入MQ,將這些請求積壓到MQ中,然后消費端在按照自已的能力去處理這里請求
- 解耦合
比如現(xiàn)在有系統(tǒng)A,當(dāng)系統(tǒng)A執(zhí)行完成后,B、C系統(tǒng)需要拿到A系統(tǒng)的結(jié)果才可以繼續(xù)執(zhí)行,如果不引入MQ,A系統(tǒng)還要調(diào)用B、C系統(tǒng),這樣這A、B、C三個系統(tǒng)的耦合性就很大。引入MQ后A系統(tǒng)的執(zhí)行結(jié)果只需要保證將消息投遞到MQ就好,其它的兩個系統(tǒng)只需要監(jiān)聽這個MQ的某個隊列,這樣就降低了這三個系統(tǒng)之間的耦合性。
- 異步
再通過A、B、C這三個系統(tǒng)舉例。A系統(tǒng)在返回給用戶的執(zhí)行結(jié)果前需要完成B、C系統(tǒng)的調(diào)用,這個總的執(zhí)行時間是A+B+C的執(zhí)行時間,如果引入MQ,A系統(tǒng)的執(zhí)行完成后將數(shù)據(jù)投遞到MQ,直接響應(yīng)用戶。B、C再這在通過監(jiān)聽完成數(shù)據(jù)的處理。這樣也降低了用戶的等待時間
除了這些好處,當(dāng)然引入MQ還會有不好的地方:比如
- 數(shù)據(jù)一致性問題
- A系統(tǒng)執(zhí)行完將數(shù)據(jù)投遞到了MQ,B、C在消費的時候如果出現(xiàn)了問題,是不是就導(dǎo)致了數(shù)據(jù)不一致的問題
- 可用性降低
- 一個好好的系統(tǒng),引入一個MQ,如果這個MQ拓機(jī)了呢?這個可能就需要集群來提高M(jìn)Q的高可用。
- 系統(tǒng)的復(fù)雜度提高
- 引入了MQ,我們還需要關(guān)注消息是否被成功的投遞,MQ中的消息被積壓太多怎么辦?消費端是否成功的消費的消息。
這些都是問題,所在是否要引入MQ還需要看業(yè)務(wù)需求
RabbitMq的投遞及消費流程
這里有張投遞消息到消費的流程圖
從這張圖上可看出這也是一種AMQP協(xié)議的實現(xiàn)。消息的提供者先是通過某一個信道將消息發(fā)送到交換機(jī),然后交換機(jī)通通RoutingKey來將消息分發(fā)到某一個隊列上。然后,消費者在臨聽某一個隊列來進(jìn)行消息的消費。
今天我們的主題是如何保證消息的投遞可靠性。那么我們來想想在這個流程中那些位置可能會影響我們消息的投遞可靠性?
從上圖中我們可以總結(jié)出有二個因素影響著消息是否被成功投遞和被成功消費
提供者
- 提供者有沒有將消成功的發(fā)送到MQ并被處理
- 發(fā)送到MQ中的消息有沒有成功的被路由到隊列中
消費者
- 消費者有沒有成功的簽收消息并成功處理。
- 消費者是否可以保證消費者的穩(wěn)定性
提供者如何確保消息的成功投遞
解決這個問題,我們可以通過提供者的發(fā)送方確認(rèn)機(jī)制來實現(xiàn),這個發(fā)送方確認(rèn)機(jī)制又分成三種:
- 單條消息的同步確認(rèn)
- 多條消息的同步確認(rèn)
- 異步消息確認(rèn)
單條消息的同步確認(rèn)
首先要在當(dāng)前的Channel上開啟消息確認(rèn)模式,然后通過waitForConfirms()方法進(jìn)行消息確認(rèn)是否發(fā)送成功。
public static void main(String[] args) throws InterruptedException, TimeoutException, IOException { ConnectionFactory cf = new ConnectionFactory(); cf.setHost("host"); cf.setPort(5672); cf.setUsername("賬號"); cf.setPassword("密碼"); try(Connection connection = cf.newConnection(); Channel channel = connection.createChannel()){ channel.confirmSelect(); Map<String,String> mes = new HashMap<>(); mes.put("name","1111"); String messageStr = objectMapper.writeValueAsString(mes); channel.basicPublish( "exchange.drinks", "drinks.juzi", null, messageStr.getBytes()); boolean isSendSuccess = channel.waitForConfirms(); if(isSendSuccess){ System.out.print("消息發(fā)送成功"); } } }
這樣做的話每次發(fā)完消息后,都會確保消息是否發(fā)送成功。如果發(fā)送失敗的話進(jìn)行相應(yīng)的處理。
多條消息的同步確認(rèn)
多條消息的確認(rèn)和單條的差不多,比如我將發(fā)送消息的代碼放到一個循環(huán)內(nèi)。
public static void main(String[] args) throws InterruptedException, TimeoutException, IOException { ConnectionFactory cf = new ConnectionFactory(); cf.setHost("host"); cf.setPort(5672); cf.setUsername("賬號"); cf.setPassword("密碼"); try(Connection connection = cf.newConnection(); Channel channel = connection.createChannel()){ channel.confirmSelect(); Map<String,String> mes = new HashMap<>(); mes.put("name","1111"); String messageStr = objectMapper.writeValueAsString(mes); for(int i = 0;i < 100;i++){ channel.basicPublish( "exchange.drinks", "drinks.juzi", null, messageStr()); } boolean isSendSuccess = channel.waitForConfirms(); System.out.println(isSendSuccess); } }
這樣的話當(dāng)一批消息發(fā)送完成后,進(jìn)行統(tǒng)一的消息確認(rèn)是否發(fā)送成功,就成了多條的消息確認(rèn),不過并不推薦使用這種確認(rèn)消息的方式
在多條的消息確認(rèn)中,比如我先是發(fā)送了一批的消息,比如這批消息有100條,這個時候如果有其中的一條消息沒有發(fā)送成功,這里返回的也是false,然爾我們并不能知道是具體的哪 一條消息發(fā)送失敗。
異步消息確認(rèn)
異步的消息確認(rèn)是通過一個監(jiān)聽器來實現(xiàn)的,當(dāng)消息發(fā)送后,會接著執(zhí)行下面的邏輯,可能在稍會的一段時間,監(jiān)聽器監(jiān)聽到了Broker的返回,再進(jìn)行邏輯的處理。
public static void main(String[] args) throws InterruptedException, TimeoutException, IOException { ConnectionFactory cf = new ConnectionFactory(); cf.setHost("host"); cf.setPort(5672); cf.setUsername("賬號"); cf.setPassword("密碼"); try(Connection connection = cf.newConnection(); Channel channel = connection.createChannel()){ channel.confirmSelect(); ConfirmListener confirmListener = new ConfirmListener() { @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println("發(fā)送成功:" + deliveryTag + " multiple:" + multiple); } @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("發(fā)送失?。? + deliveryTag); } }; channel.addConfirmListener(confirmListener); Map<String,String> mes = new HashMap<>(); mes.put("name","11111"); String messageStr = objectMapper.writeValueAsString(mes); for(int i = 0;i < 100;i++){ channel.basicPublish( "exchange.drinks", "drinks.juzi", null, messageStr.getBytes()); } Thread.sleep(Integer.MAX_VALUE); } }
當(dāng)成功的發(fā)送消息的時候會回調(diào)監(jiān)聽器中的handleAck
方法,如果沒有發(fā)送成功會回調(diào)handleNack
方法 在這個監(jiān)聽器里面有兩個參數(shù)一個deliveryTag
和multiple
:
- deliveryTag:表示當(dāng)前的Channel發(fā)送的第幾條消息
- multiple:是否在確認(rèn)多條消息
這個異步的雖然在聽覺上感覺比較厲害些,這里也不推薦使用,原因和上面的一樣,我們并不能具休的知道是哪一條消息沒有被確認(rèn)發(fā)送。
綜上:這里更加推薦單條消息確認(rèn),具體選擇哪一種還是要用業(yè)務(wù)做出選擇
注:注意一點是當(dāng)一條消息成功的發(fā)送到Broker,但是如果沒有正確的路由到隊列,那么這時borker也是會返回true,因為Broker確時接收到了消息只是RoutingKey不可達(dá),所以這里也會返回true,并且直接將消息丟棄
消息的返回機(jī)制
這個消息返回機(jī)制的作用就是在當(dāng)一個消息成功的發(fā)送,但是并沒有正確路由到隊列的時候所回調(diào)的。
這也彌補(bǔ)了上面確認(rèn)消息是否發(fā)送成功但沒有路由到隊列所返回true的問題 在使用消息返回機(jī)制的時候在發(fā)送消息時需要將mandatory
置成true。再添加對應(yīng)的監(jiān)聽器。
public static void main(String[] args) throws InterruptedException, TimeoutException, IOException { ConnectionFactory cf = new ConnectionFactory(); cf.setHost("host"); cf.setPort(5672); cf.setUsername("賬號"); cf.setPassword("密碼"); try(Connection connection = cf.newConnection(); Channel channel = connection.createChannel()){ channel.addReturnListener(new ReturnCallback() { @Override public void handle(Return returnMessage) { System.out.println("replyCode:" + returnMessage.getReplyCode() + " replyText:" + returnMessage.getReplyText() + " routingKey:" + returnMessage.getRoutingKey() + " exchange:" + returnMessage.getExchange() + " body:" + new String(returnMessage.getBody())); } }); Map<String,String> mes = new HashMap<>(); mes.put("name","11111"); String messageStr = objectMapper.writeValueAsString(mes); channel.basicPublish( "exchange.drinks", "drinks.juzi1", true, null, messageStr.getBytes()); Thread.sleep(Integer.MAX_VALUE); } }
這里的addReturnListener方法有兩個重載:只不過是handle的參數(shù)不同,一個是參數(shù)都顯示在了參數(shù)列表內(nèi),一個是將參數(shù)封裝到了Return對象內(nèi)。當(dāng)handle被回調(diào)的時候也可以獲取到相應(yīng)的參數(shù)比如:exchange routingkey body。
注:保證消息可靠性投遞的前提是服務(wù)的高可用,服務(wù)不高可用談其它的都是扯
以上就是詳解RabbitMq如何做到消息的可靠性投遞的詳細(xì)內(nèi)容,更多關(guān)于RabbitMq 消息可靠性投遞的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Arthas排查Kubernetes中應(yīng)用頻繁掛掉重啟異常
這篇文章主要為大家介紹了Arthas排查Kubernetes中應(yīng)用頻繁掛掉重啟的異常分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助祝大家多多進(jìn)步2022-02-02詳解Java如何優(yōu)雅的調(diào)用dubbo同時不使用其它jar包
這篇文章主要介紹了如何在不使用他人jar包的情況下優(yōu)雅的進(jìn)行dubbo調(diào)用,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)吧2023-02-02Java使用自定義注解實現(xiàn)為事件源綁定事件監(jiān)聽器操作示例
這篇文章主要介紹了Java使用自定義注解實現(xiàn)為事件源綁定事件監(jiān)聽器操作,結(jié)合實例形式分析了java自定義注解、注解處理、事件監(jiān)聽與響應(yīng)等相關(guān)操作技巧,需要的朋友可以參考下2019-10-10Java線程安全的常用類_動力節(jié)點Java學(xué)院整理
在集合框架中,有些類是線程安全的,這些都是jdk1.1中的出現(xiàn)的。在jdk1.2之后,就出現(xiàn)許許多多非線程安全的類。 下面是這些線程安全的同步的類2017-06-06