java中RabbitMQ高級(jí)應(yīng)用
1、消息可靠性投遞
在使用 RabbitMQ
的時(shí)候,生產(chǎn)者在進(jìn)行消息投遞的時(shí)候如果想知道消息是否成功的投遞到對(duì)應(yīng)的交換機(jī)和隊(duì)列中,有兩種方式可以用來(lái)控制消息投遞的可靠性模式 。
由上圖的整個(gè)消息的投遞過(guò)程來(lái)看,生產(chǎn)者的消息進(jìn)入到中間件中會(huì)首先到達(dá)交換機(jī),然后再?gòu)慕粨Q機(jī)傳遞到隊(duì)列中去,也就是分為兩步走戰(zhàn)略。那么消息的丟失情況也就是會(huì)出現(xiàn)在這兩個(gè)階段中,RabbitMQ 貼心的為我們提供了針對(duì)于這兩個(gè)部分的可靠新傳遞模式:
- confirm 模式。
- return 模式。
利用這兩個(gè)回調(diào)模式來(lái)確保消息的傳遞可靠。
1.1、確認(rèn)模式
消息從生產(chǎn)者到交換機(jī)之間傳遞會(huì)返回一個(gè) confirmCallback
的回調(diào)??梢灾苯釉?rabbitTemplate
實(shí)例中進(jìn)行確認(rèn)邏輯的設(shè)置。如果是使用 XML
配置的話需要在工廠配置開啟 publisher-confirms="true",YAML
的配置就直接 publisher-confirm-type: correlated,他默認(rèn)是 NONE
,需要手動(dòng)開啟。
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:spring-rabbitmq.xml") public class Producer { @Autowired private RabbitTemplate rabbitTemplate; @Test public void producer() throws InterruptedException { rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean b, String s) { System.out.println(); if (!b) { // 消息重發(fā)之類的處理 System.out.println(s); } else { System.out.println("交換機(jī)成功接收消息"); } } }); rabbitTemplate.convertAndSend("default_exchange", "default_queue", "hello world & beordie"); TimeUnit.SECONDS.sleep(5); } }
上面的確認(rèn)是由一個(gè) confirm
的函數(shù)執(zhí)行的,里面攜帶了三個(gè)參數(shù),第一個(gè)是配置的相關(guān)信息,第二個(gè)表示交換機(jī)是否成功的接收到消息,第三個(gè)參數(shù)是指沒有成功接收消息的原因。
1.2、退回模式
從交換機(jī)到消息隊(duì)列投遞失敗會(huì)返回一個(gè) returnCallback
。在工廠配置中開啟回退模式 publisher-returns="true" ,設(shè)置交換機(jī)處理消息失敗的模式(默認(rèn) false 直接將消息進(jìn)行丟棄),添加退回處理的邏輯。
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:spring-rabbitmq.xml") public class Producer { @Autowired private RabbitTemplate rabbitTemplate; @Test public void producer() throws InterruptedException { rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { // 重發(fā)邏輯處理 System.out.println(message.getBody() + " 投遞消息隊(duì)列失敗"); } }); rabbitTemplate.convertAndSend("default_exchange", "default_queue", "hello world & beordie"); TimeUnit.SECONDS.sleep(5); } }
returnedMessage
中攜帶五個(gè)參數(shù)、分別指的是消息對(duì)象、錯(cuò)誤碼、錯(cuò)誤信息、交換機(jī)、路由鍵。
1.3、確認(rèn)機(jī)制
在消費(fèi)者抓取消息隊(duì)列中的數(shù)據(jù)取消費(fèi)之后會(huì)有一個(gè)確認(rèn)機(jī)制進(jìn)行消息的確認(rèn),防止因?yàn)樽ト∠⒅蟮珱]有消費(fèi)成功而導(dǎo)致的消息丟失。有三種確認(rèn)方式:
自動(dòng)確認(rèn):
acknowledge="none"
手動(dòng)確認(rèn):
acknowledge="manual"
根據(jù)異常情況確認(rèn):
acknowledge="auto"
其中自動(dòng)確認(rèn)是指一旦消息被消費(fèi)者抓取就自動(dòng)默認(rèn)成功,并將消息從消息隊(duì)列中進(jìn)行移除,如果這個(gè)時(shí)候消費(fèi)端消費(fèi)出現(xiàn)問(wèn)題,那么也會(huì)是默認(rèn)消息消費(fèi)成功,但是實(shí)際上是沒有消費(fèi)成功的,也就是當(dāng)前的消息丟失了。默認(rèn)的情況就是自動(dòng)確認(rèn)機(jī)制。
如果設(shè)置手動(dòng)確認(rèn)的方式,就需要在正常消費(fèi)消息之后進(jìn)行回調(diào)確認(rèn) channel.basicAck()
,手動(dòng)簽收。如果業(yè)務(wù)處理過(guò)程中發(fā)生了異常則調(diào)用 channel.basicNack()
重新發(fā)送消息。
首先需要在隊(duì)列綁定時(shí)進(jìn)行確認(rèn)機(jī)制的配置,設(shè)置為手動(dòng)簽收。
<!-- 綁定隊(duì)列 --> <rabbit:listener-container connection-factory="rabbitFactory" auto-declare="true" acknowledge="manual"> <rabbit:listener ref="rabbirConsumer" queue-names="default_queue"/> </rabbit:listener-container>
生產(chǎn)者一端不用更改,只需要改變消費(fèi)者的實(shí)現(xiàn)進(jìn)行消息自動(dòng)簽收就可以了,正常執(zhí)行業(yè)務(wù)則簽收消息,業(yè)務(wù)發(fā)生錯(cuò)誤則選擇消息拒簽,消息重發(fā)或者丟棄。
public class ConsumerAck implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { // 消息唯一ID long tag = message.getMessageProperties().getDeliveryTag(); try { String msg = new String(message.getBody(), "utf-8"); channel.basicAck(tag, true); System.out.println("接收消息: " + msg); } catch (Exception e) { System.out.println("接收消息異常"); channel.basicNack(tag, true, true); e.printStackTrace(); } } }
里面涉及三個(gè)簡(jiǎn)單的簽收函數(shù),一是正確簽收的 basicAck
,二是單條拒簽的 basicReject
,三是批量拒簽的 basicNack
。
- basicAck 第一個(gè)參數(shù)表示消息在通道中的唯一ID,只針對(duì)當(dāng)前的 Channel;第二個(gè)參數(shù)表示是否批量同意,如果是 false 的話只會(huì)同意簽收當(dāng)前ID的一條消息,將其從消息隊(duì)列中進(jìn)行刪除,而如果是 true 的話將會(huì)把此ID之前的消息一起給同意簽收了。
- basicReject 第一個(gè)參數(shù)依舊表示消息的唯一ID,第二個(gè)參數(shù)表示是否重新回隊(duì)發(fā)送,false 表示直接丟棄該條消息或者有死信隊(duì)列可以接收, true 則表示重新回隊(duì)進(jìn)行消息發(fā)送,所有操作只針對(duì)當(dāng)前的消息。
- basicNack 比第二個(gè)多了一個(gè)參數(shù),也就是處于中間位置的布爾值,表示是否批量進(jìn)行。
2、消費(fèi)端限流
在用戶請(qǐng)求和DB服務(wù)處理之間增加消息中間件的隔離,使得突發(fā)流量全部讓消息隊(duì)列來(lái)抗,降低服務(wù)端被沖垮的可能性。讓所有的請(qǐng)求都往隊(duì)列中存,消費(fèi)端只需要?jiǎng)蛩俚娜〕鱿⑦M(jìn)行消費(fèi),這樣就能保證運(yùn)行效率,也不會(huì)因?yàn)楹笈_(tái)的阻塞而導(dǎo)致客戶端得不到正常的響應(yīng)(當(dāng)然指的是一些不需要同步回顯的任務(wù))。
只需要在消費(fèi)者綁定消息隊(duì)列時(shí)指定取出消息的速率即可,需要使用手動(dòng)簽收的方式,每進(jìn)行一次的簽收才會(huì)從隊(duì)列中再取出下一條數(shù)據(jù)。
<!-- 綁定隊(duì)列 --> <rabbit:listener-container connection-factory="rabbitFactory" auto-declare="true" acknowledge="manual" prefetch="1"> <rabbit:listener ref="rabbirConsumer" queue-names="default_queue"/> </rabbit:listener-container>
3、消息過(guò)期時(shí)間
消息隊(duì)列提供了存儲(chǔ)在隊(duì)列中消息的過(guò)期時(shí)間,分為兩個(gè)方向的實(shí)現(xiàn),一個(gè)是針對(duì)于整個(gè)隊(duì)列中的所有消息,也就是隊(duì)列的過(guò)期時(shí)間,另一個(gè)是針對(duì)當(dāng)前消息的過(guò)期時(shí)間,也就是針對(duì)于單條消息單獨(dú)設(shè)置。
隊(duì)列的過(guò)期時(shí)間設(shè)置很簡(jiǎn)單,只需要在創(chuàng)建隊(duì)列時(shí)進(jìn)行過(guò)期時(shí)間的指定即可,也可以通過(guò)控制臺(tái)直接創(chuàng)建指定過(guò)期時(shí)間。一旦隊(duì)列過(guò)期時(shí)間到了,隊(duì)列中還未被消費(fèi)的消息都將過(guò)期,進(jìn)行隊(duì)列的過(guò)期處理。
<rabbit:queue id="default_queue" name="default_queue" auto-declare="true"> <rabbit:queue-arguments> <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/> </rabbit:queue-arguments> </rabbit:queue>
單條消息的過(guò)期時(shí)間需要在發(fā)送的時(shí)候進(jìn)行單獨(dú)的指定,發(fā)送的時(shí)候指定配置的額外信息,配置的編寫由配置類完成。
如果一條消息的過(guò)期時(shí)間到了,但是他此時(shí)處于隊(duì)列的中間,那么他將不會(huì)被處理,只有當(dāng)之后處理到時(shí)候才會(huì)進(jìn)行判斷是否過(guò)期。
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { // 設(shè)置 message 的過(guò)期時(shí)間 message.getMessageProperties().setExpiration("5000"); // 返回該消息 return message; } }; rabbitTemplate.convertAndSend("exchange", "route", "msg", messagePostProcessor);
如果說(shuō)同時(shí)設(shè)置了消息的過(guò)期時(shí)間和隊(duì)列的過(guò)期時(shí)間,那么最終的過(guò)期時(shí)間由最短的時(shí)間進(jìn)行決定,也就是說(shuō)如果當(dāng)前消息的過(guò)期時(shí)間沒到,但是整個(gè)隊(duì)列的過(guò)期時(shí)間到了,那么隊(duì)列中的所有消息也自然就過(guò)期了,執(zhí)行過(guò)期的處理策略。
4、死信隊(duì)列
4.1、死信概念
死信隊(duì)列指的是死信交換機(jī),當(dāng)一條消息成為死信之后可以重新發(fā)送到另一個(gè)交換機(jī)進(jìn)行處理,而進(jìn)行處理的這個(gè)交換機(jī)就叫做死信交換機(jī)。
- 消息成為死信消息有幾種情況
隊(duì)列的消息長(zhǎng)度達(dá)到限制
消費(fèi)者拒接消息的時(shí)候不把消息重新放入隊(duì)列中
隊(duì)列存在消息過(guò)期設(shè)置,消息超時(shí)未被消費(fèi)
消息存在過(guò)期時(shí)間,在投遞給消費(fèi)者時(shí)發(fā)現(xiàn)過(guò)期
在創(chuàng)建隊(duì)列時(shí)可以在配置中指定相關(guān)的信息,例如死信交換機(jī)、隊(duì)列長(zhǎng)度等等,之后的一系列工作就不由程序員進(jìn)行操作了,MQ 會(huì)自己完成配置過(guò)的事件響應(yīng)。
<rabbit:queue id="default_queue" name="default_queue" auto-declare="true"> <rabbit:queue-arguments> <!-- 死信交換機(jī) --> <entry key="x-dead-letter-exchange" value-type="dlx_exchane"/> <!-- 路由 --> <entry key="x-dead-letter-routing-key" value-type="dlx_routing"/> <!-- 隊(duì)列過(guò)期時(shí)間 --> <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/> <!-- 隊(duì)列長(zhǎng)度 --> <entry key="x-max-length" value-type="java.lang.Integer" value="10"/> </rabbit:queue-arguments> </rabbit:queue>
4.2、延遲隊(duì)列
延遲隊(duì)列指的是消息在進(jìn)入隊(duì)列后不會(huì)立即被消費(fèi),只有到達(dá)指定時(shí)間之后才會(huì)被消費(fèi),也就是需要有一個(gè)時(shí)間的判斷條件。
消息隊(duì)列實(shí)際上是沒有提供對(duì)延遲隊(duì)列的實(shí)現(xiàn)的,但是可以通過(guò) TTL
+ 死信隊(duì)列
的方式完成,設(shè)置一個(gè)隊(duì)列,不被任何的消費(fèi)者所消費(fèi),所有的消息進(jìn)入都會(huì)被保存在里面,設(shè)置隊(duì)列的過(guò)期時(shí)間,一旦隊(duì)列過(guò)期將所有的消息過(guò)渡到綁定的死信隊(duì)列中。
再由具體的消費(fèi)者來(lái)消費(fèi)死信隊(duì)列中的消息,這樣就實(shí)現(xiàn)了延遲隊(duì)列的功能。
例如實(shí)現(xiàn)一個(gè)下單超時(shí)支付取消訂單的功能:
到此這篇關(guān)于java中RabbitMQ高級(jí)應(yīng)用的文章就介紹到這了,更多相關(guān)java RabbitMQ內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- Java?RabbitMQ消息隊(duì)列詳解常見問(wèn)題
- Java實(shí)現(xiàn)Kafka生產(chǎn)者消費(fèi)者代碼實(shí)例
- Java多線程并發(fā)生產(chǎn)者消費(fèi)者設(shè)計(jì)模式實(shí)例解析
- Java多線程生產(chǎn)者消費(fèi)者模式實(shí)現(xiàn)過(guò)程解析
- Java多線程 生產(chǎn)者消費(fèi)者模型實(shí)例詳解
- Java多線程 BlockingQueue實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型詳解
- Java編寫簡(jiǎn)易rabbitmq生產(chǎn)者與消費(fèi)者的代碼
相關(guān)文章
spring使用RedisTemplate的操作類訪問(wèn)Redis
本篇文章主要介紹了spring使用RedisTemplate的操作類訪問(wèn)Redis,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-05-05Java?Rabbitmq中四種集群架構(gòu)的區(qū)別詳解
這篇文章主要為大家詳細(xì)介紹了Java?Rabbitmq中四種集群架構(gòu)的區(qū)別,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來(lái)幫助2022-02-02java遍歷http請(qǐng)求request的所有參數(shù)實(shí)現(xiàn)方法
下面小編就為大家?guī)?lái)一篇java遍歷http請(qǐng)求request的所有參數(shù)實(shí)現(xiàn)方法。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2016-09-09Java中的System.arraycopy()淺復(fù)制方法詳解
這篇文章主要介紹了Java中的System.arraycopy()淺復(fù)制方法詳解,Java數(shù)組的復(fù)制操作可以分為深度復(fù)制和淺度復(fù)制,簡(jiǎn)單來(lái)說(shuō)深度復(fù)制,可以將對(duì)象的值和對(duì)象的內(nèi)容復(fù)制;淺復(fù)制是指對(duì)對(duì)象引用的復(fù)制,需要的朋友可以參考下2023-11-11Java注解中@Component和@Bean的區(qū)別
這篇文章主要介紹了@Component和@Bean的區(qū)別,在這給大家簡(jiǎn)單介紹下作用對(duì)象不同:@Component 注解作用于類,而 @Bean 注解作用于方法,具體實(shí)例代碼參考下本文2024-03-03啟動(dòng)Springboot項(xiàng)目時(shí)找不到Mapper的問(wèn)題及解決
這篇文章主要介紹了啟動(dòng)Springboot項(xiàng)目時(shí)找不到Mapper的問(wèn)題及解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-11-11java使用任務(wù)架構(gòu)執(zhí)行任務(wù)調(diào)度示例
在Java 5.0之前啟動(dòng)一個(gè)任務(wù)是通過(guò)調(diào)用Thread類的start()方法來(lái)實(shí)現(xiàn)的,5.0里提供了一個(gè)新的任務(wù)執(zhí)行架構(gòu)使你可以輕松地調(diào)度和控制任務(wù)的執(zhí)行,并且可以建立一個(gè)類似數(shù)據(jù)庫(kù)連接池的線程池來(lái)執(zhí)行任務(wù),下面看一個(gè)示例2014-01-01