java中RabbitMQ高級應(yīng)用
1、消息可靠性投遞
在使用 RabbitMQ
的時候,生產(chǎn)者在進行消息投遞的時候如果想知道消息是否成功的投遞到對應(yīng)的交換機和隊列中,有兩種方式可以用來控制消息投遞的可靠性模式 。
由上圖的整個消息的投遞過程來看,生產(chǎn)者的消息進入到中間件中會首先到達交換機,然后再從交換機傳遞到隊列中去,也就是分為兩步走戰(zhàn)略。那么消息的丟失情況也就是會出現(xiàn)在這兩個階段中,RabbitMQ 貼心的為我們提供了針對于這兩個部分的可靠新傳遞模式:
- confirm 模式。
- return 模式。
利用這兩個回調(diào)模式來確保消息的傳遞可靠。
1.1、確認模式
消息從生產(chǎn)者到交換機之間傳遞會返回一個 confirmCallback
的回調(diào)??梢灾苯釉?rabbitTemplate
實例中進行確認邏輯的設(shè)置。如果是使用 XML
配置的話需要在工廠配置開啟 publisher-confirms="true",YAML
的配置就直接 publisher-confirm-type: correlated,他默認是 NONE
,需要手動開啟。
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:spring-rabbitmq.xml") public class Producer { @Autowired private RabbitTemplate rabbitTemplate; @Test public void producer() throws InterruptedException { rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean b, String s) { System.out.println(); if (!b) { // 消息重發(fā)之類的處理 System.out.println(s); } else { System.out.println("交換機成功接收消息"); } } }); rabbitTemplate.convertAndSend("default_exchange", "default_queue", "hello world & beordie"); TimeUnit.SECONDS.sleep(5); } }
上面的確認是由一個 confirm
的函數(shù)執(zhí)行的,里面攜帶了三個參數(shù),第一個是配置的相關(guān)信息,第二個表示交換機是否成功的接收到消息,第三個參數(shù)是指沒有成功接收消息的原因。
1.2、退回模式
從交換機到消息隊列投遞失敗會返回一個 returnCallback
。在工廠配置中開啟回退模式 publisher-returns="true" ,設(shè)置交換機處理消息失敗的模式(默認 false 直接將消息進行丟棄),添加退回處理的邏輯。
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:spring-rabbitmq.xml") public class Producer { @Autowired private RabbitTemplate rabbitTemplate; @Test public void producer() throws InterruptedException { rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { // 重發(fā)邏輯處理 System.out.println(message.getBody() + " 投遞消息隊列失敗"); } }); rabbitTemplate.convertAndSend("default_exchange", "default_queue", "hello world & beordie"); TimeUnit.SECONDS.sleep(5); } }
returnedMessage
中攜帶五個參數(shù)、分別指的是消息對象、錯誤碼、錯誤信息、交換機、路由鍵。
1.3、確認機制
在消費者抓取消息隊列中的數(shù)據(jù)取消費之后會有一個確認機制進行消息的確認,防止因為抓取消息之后但沒有消費成功而導(dǎo)致的消息丟失。有三種確認方式:
自動確認:
acknowledge="none"
手動確認:
acknowledge="manual"
根據(jù)異常情況確認:
acknowledge="auto"
其中自動確認是指一旦消息被消費者抓取就自動默認成功,并將消息從消息隊列中進行移除,如果這個時候消費端消費出現(xiàn)問題,那么也會是默認消息消費成功,但是實際上是沒有消費成功的,也就是當前的消息丟失了。默認的情況就是自動確認機制。
如果設(shè)置手動確認的方式,就需要在正常消費消息之后進行回調(diào)確認 channel.basicAck()
,手動簽收。如果業(yè)務(wù)處理過程中發(fā)生了異常則調(diào)用 channel.basicNack()
重新發(fā)送消息。
首先需要在隊列綁定時進行確認機制的配置,設(shè)置為手動簽收。
<!-- 綁定隊列 --> <rabbit:listener-container connection-factory="rabbitFactory" auto-declare="true" acknowledge="manual"> <rabbit:listener ref="rabbirConsumer" queue-names="default_queue"/> </rabbit:listener-container>
生產(chǎn)者一端不用更改,只需要改變消費者的實現(xiàn)進行消息自動簽收就可以了,正常執(zhí)行業(yè)務(wù)則簽收消息,業(yè)務(wù)發(fā)生錯誤則選擇消息拒簽,消息重發(fā)或者丟棄。
public class ConsumerAck implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { // 消息唯一ID long tag = message.getMessageProperties().getDeliveryTag(); try { String msg = new String(message.getBody(), "utf-8"); channel.basicAck(tag, true); System.out.println("接收消息: " + msg); } catch (Exception e) { System.out.println("接收消息異常"); channel.basicNack(tag, true, true); e.printStackTrace(); } } }
里面涉及三個簡單的簽收函數(shù),一是正確簽收的 basicAck
,二是單條拒簽的 basicReject
,三是批量拒簽的 basicNack
。
- basicAck 第一個參數(shù)表示消息在通道中的唯一ID,只針對當前的 Channel;第二個參數(shù)表示是否批量同意,如果是 false 的話只會同意簽收當前ID的一條消息,將其從消息隊列中進行刪除,而如果是 true 的話將會把此ID之前的消息一起給同意簽收了。
- basicReject 第一個參數(shù)依舊表示消息的唯一ID,第二個參數(shù)表示是否重新回隊發(fā)送,false 表示直接丟棄該條消息或者有死信隊列可以接收, true 則表示重新回隊進行消息發(fā)送,所有操作只針對當前的消息。
- basicNack 比第二個多了一個參數(shù),也就是處于中間位置的布爾值,表示是否批量進行。
2、消費端限流
在用戶請求和DB服務(wù)處理之間增加消息中間件的隔離,使得突發(fā)流量全部讓消息隊列來抗,降低服務(wù)端被沖垮的可能性。讓所有的請求都往隊列中存,消費端只需要勻速的取出消息進行消費,這樣就能保證運行效率,也不會因為后臺的阻塞而導(dǎo)致客戶端得不到正常的響應(yīng)(當然指的是一些不需要同步回顯的任務(wù))。
只需要在消費者綁定消息隊列時指定取出消息的速率即可,需要使用手動簽收的方式,每進行一次的簽收才會從隊列中再取出下一條數(shù)據(jù)。
<!-- 綁定隊列 --> <rabbit:listener-container connection-factory="rabbitFactory" auto-declare="true" acknowledge="manual" prefetch="1"> <rabbit:listener ref="rabbirConsumer" queue-names="default_queue"/> </rabbit:listener-container>
3、消息過期時間
消息隊列提供了存儲在隊列中消息的過期時間,分為兩個方向的實現(xiàn),一個是針對于整個隊列中的所有消息,也就是隊列的過期時間,另一個是針對當前消息的過期時間,也就是針對于單條消息單獨設(shè)置。
隊列的過期時間設(shè)置很簡單,只需要在創(chuàng)建隊列時進行過期時間的指定即可,也可以通過控制臺直接創(chuàng)建指定過期時間。一旦隊列過期時間到了,隊列中還未被消費的消息都將過期,進行隊列的過期處理。
<rabbit:queue id="default_queue" name="default_queue" auto-declare="true"> <rabbit:queue-arguments> <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/> </rabbit:queue-arguments> </rabbit:queue>
單條消息的過期時間需要在發(fā)送的時候進行單獨的指定,發(fā)送的時候指定配置的額外信息,配置的編寫由配置類完成。
如果一條消息的過期時間到了,但是他此時處于隊列的中間,那么他將不會被處理,只有當之后處理到時候才會進行判斷是否過期。
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { // 設(shè)置 message 的過期時間 message.getMessageProperties().setExpiration("5000"); // 返回該消息 return message; } }; rabbitTemplate.convertAndSend("exchange", "route", "msg", messagePostProcessor);
如果說同時設(shè)置了消息的過期時間和隊列的過期時間,那么最終的過期時間由最短的時間進行決定,也就是說如果當前消息的過期時間沒到,但是整個隊列的過期時間到了,那么隊列中的所有消息也自然就過期了,執(zhí)行過期的處理策略。
4、死信隊列
4.1、死信概念
死信隊列指的是死信交換機,當一條消息成為死信之后可以重新發(fā)送到另一個交換機進行處理,而進行處理的這個交換機就叫做死信交換機。
- 消息成為死信消息有幾種情況
隊列的消息長度達到限制
消費者拒接消息的時候不把消息重新放入隊列中
隊列存在消息過期設(shè)置,消息超時未被消費
消息存在過期時間,在投遞給消費者時發(fā)現(xiàn)過期
在創(chuàng)建隊列時可以在配置中指定相關(guān)的信息,例如死信交換機、隊列長度等等,之后的一系列工作就不由程序員進行操作了,MQ 會自己完成配置過的事件響應(yīng)。
<rabbit:queue id="default_queue" name="default_queue" auto-declare="true"> <rabbit:queue-arguments> <!-- 死信交換機 --> <entry key="x-dead-letter-exchange" value-type="dlx_exchane"/> <!-- 路由 --> <entry key="x-dead-letter-routing-key" value-type="dlx_routing"/> <!-- 隊列過期時間 --> <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/> <!-- 隊列長度 --> <entry key="x-max-length" value-type="java.lang.Integer" value="10"/> </rabbit:queue-arguments> </rabbit:queue>
4.2、延遲隊列
延遲隊列指的是消息在進入隊列后不會立即被消費,只有到達指定時間之后才會被消費,也就是需要有一個時間的判斷條件。
消息隊列實際上是沒有提供對延遲隊列的實現(xiàn)的,但是可以通過 TTL
+ 死信隊列
的方式完成,設(shè)置一個隊列,不被任何的消費者所消費,所有的消息進入都會被保存在里面,設(shè)置隊列的過期時間,一旦隊列過期將所有的消息過渡到綁定的死信隊列中。
再由具體的消費者來消費死信隊列中的消息,這樣就實現(xiàn)了延遲隊列的功能。
例如實現(xiàn)一個下單超時支付取消訂單的功能:
到此這篇關(guān)于java中RabbitMQ高級應(yīng)用的文章就介紹到這了,更多相關(guān)java RabbitMQ內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
spring使用RedisTemplate的操作類訪問Redis
本篇文章主要介紹了spring使用RedisTemplate的操作類訪問Redis,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-05-05Java?Rabbitmq中四種集群架構(gòu)的區(qū)別詳解
這篇文章主要為大家詳細介紹了Java?Rabbitmq中四種集群架構(gòu)的區(qū)別,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來幫助2022-02-02java遍歷http請求request的所有參數(shù)實現(xiàn)方法
下面小編就為大家?guī)硪黄猨ava遍歷http請求request的所有參數(shù)實現(xiàn)方法。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2016-09-09Java中的System.arraycopy()淺復(fù)制方法詳解
這篇文章主要介紹了Java中的System.arraycopy()淺復(fù)制方法詳解,Java數(shù)組的復(fù)制操作可以分為深度復(fù)制和淺度復(fù)制,簡單來說深度復(fù)制,可以將對象的值和對象的內(nèi)容復(fù)制;淺復(fù)制是指對對象引用的復(fù)制,需要的朋友可以參考下2023-11-11Java注解中@Component和@Bean的區(qū)別
這篇文章主要介紹了@Component和@Bean的區(qū)別,在這給大家簡單介紹下作用對象不同:@Component 注解作用于類,而 @Bean 注解作用于方法,具體實例代碼參考下本文2024-03-03啟動Springboot項目時找不到Mapper的問題及解決
這篇文章主要介紹了啟動Springboot項目時找不到Mapper的問題及解決方案,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-11-11java使用任務(wù)架構(gòu)執(zhí)行任務(wù)調(diào)度示例
在Java 5.0之前啟動一個任務(wù)是通過調(diào)用Thread類的start()方法來實現(xiàn)的,5.0里提供了一個新的任務(wù)執(zhí)行架構(gòu)使你可以輕松地調(diào)度和控制任務(wù)的執(zhí)行,并且可以建立一個類似數(shù)據(jù)庫連接池的線程池來執(zhí)行任務(wù),下面看一個示例2014-01-01