Java?RabbitMQ消息隊(duì)列詳解常見問題
消息堆積
消息堆積的產(chǎn)生場(chǎng)景:
- 生產(chǎn)者產(chǎn)生的消息速度大于消費(fèi)者消費(fèi)的速度。解決:增加消費(fèi)者的數(shù)量或速度。
- 沒有消費(fèi)者進(jìn)行消費(fèi)的時(shí)候。解決:死信隊(duì)列、設(shè)置消息有效期。相當(dāng)于對(duì)我們的消息設(shè)置有效期,在規(guī)定的時(shí)間內(nèi)如果沒有消費(fèi)的話,自動(dòng)過期,過期的時(shí)候會(huì)執(zhí)行客戶端回調(diào)監(jiān)聽的方法將消息存放到數(shù)據(jù)庫表記錄,后期實(shí)現(xiàn)補(bǔ)償。
保證消息不丟失
1、生產(chǎn)者使用消息確認(rèn)機(jī)制保證消息百分之百能夠?qū)⑾⑼哆f到MQ成功。
2、MQ服務(wù)器端應(yīng)該將消息持久化到硬盤
3、消費(fèi)者使用手動(dòng)ack機(jī)制確認(rèn)消息消費(fèi)成功
如果MQ服務(wù)器容量滿了怎么辦?
使用死信隊(duì)列將消息存到數(shù)據(jù)庫中去,后期補(bǔ)償消費(fèi)。
死信隊(duì)列
RabbitMQ死信隊(duì)列俗稱,備胎隊(duì)列;消息中間件因?yàn)槟撤N原因拒收該消息后,可以轉(zhuǎn)移到死信隊(duì)列中存放,死信隊(duì)列也可以有交換機(jī)和路由key等。
產(chǎn)生背景:
- 消息投遞到MQ中存放 消息已經(jīng)過期
- 隊(duì)列達(dá)到最大的長(zhǎng)度 (隊(duì)列容器已經(jīng)滿了)生產(chǎn)者拒絕接收消息
- 消費(fèi)者消費(fèi)多次消息失敗,就會(huì)轉(zhuǎn)移存放到死信隊(duì)列中
代碼案例:
maven依賴
<dependencies> <!-- springboot-web組件 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- 添加springboot對(duì)amqp的支持 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> </dependency> <!--fastjson --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.49</version> </dependency> </dependencies>
yml配置
server: # 服務(wù)啟動(dòng)端口配置 port: 8081 servlet: # 應(yīng)用訪問路徑 context-path: / spring: #增加application.druid.yml 的配置文件 # profiles: # active: rabbitmq rabbitmq: ####連接地址 host: www.kaicostudy.com ####端口號(hào) port: 5672 ####賬號(hào) username: kaico ####密碼 password: kaico ### 地址 virtual-host: /kaicoStudy ###模擬演示死信隊(duì)列 kaico: dlx: exchange: kaico_order_dlx_exchange queue: kaico_order_dlx_queue routingKey: kaico.order.dlx ###備胎交換機(jī) order: exchange: kaico_order_exchange queue: kaico_order_queue routingKey: kaico.order
隊(duì)列配置類
@Configuration public class DeadLetterMQConfig { /** * 訂單交換機(jī) */ @Value("${kaico.order.exchange}") private String orderExchange; /** * 訂單隊(duì)列 */ @Value("${kaico.order.queue}") private String orderQueue; /** * 訂單路由key */ @Value("${kaico.order.routingKey}") private String orderRoutingKey; /** * 死信交換機(jī) */ @Value("${kaico.dlx.exchange}") private String dlxExchange; /** * 死信隊(duì)列 */ @Value("${kaico.dlx.queue}") private String dlxQueue; /** * 死信路由 */ @Value("${kaico.dlx.routingKey}") private String dlxRoutingKey; /** * 聲明死信交換機(jī) * * @return DirectExchange */ @Bean public DirectExchange dlxExchange() { return new DirectExchange(dlxExchange); } /** * 聲明死信隊(duì)列 * * @return Queue */ @Bean public Queue dlxQueue() { return new Queue(dlxQueue); } /** * 聲明訂單業(yè)務(wù)交換機(jī) * * @return DirectExchange */ @Bean public DirectExchange orderExchange() { return new DirectExchange(orderExchange); } /** * 綁定死信隊(duì)列到死信交換機(jī) * * @return Binding */ @Bean public Binding binding() { return BindingBuilder.bind(dlxQueue()) .to(dlxExchange()) .with(dlxRoutingKey); } /** * 聲明訂單隊(duì)列,并且綁定死信隊(duì)列 * * @return Queue */ @Bean public Queue orderQueue() { // 訂單隊(duì)列綁定我們的死信交換機(jī) Map<String, Object> arguments = new HashMap<>(2); arguments.put("x-dead-letter-exchange", dlxExchange); arguments.put("x-dead-letter-routing-key", dlxRoutingKey); return new Queue(orderQueue, true, false, false, arguments); } /** * 綁定訂單隊(duì)列到訂單交換機(jī) * * @return Binding */ @Bean public Binding orderBinding() { return BindingBuilder.bind(orderQueue()) .to(orderExchange()) .with(orderRoutingKey); } }
死信隊(duì)列消費(fèi)者
@Component public class OrderDlxConsumer { /** * 死信隊(duì)列監(jiān)聽隊(duì)列回調(diào)的方法 * @param msg */ @RabbitListener(queues = "kaico_order_dlx_queue") public void orderDlxConsumer(String msg) { System.out.println("死信隊(duì)列消費(fèi)訂單消息" + msg); } }
普通隊(duì)列消費(fèi)者
@Component public class OrderConsumer { /** * 監(jiān)聽隊(duì)列回調(diào)的方法 * * @param msg */ @RabbitListener(queues = "kaico_order_queue") public void orderConsumer(String msg) { System.out.println("正常訂單消費(fèi)者消息msg:" + msg); } }
后臺(tái)隊(duì)列管理頁面如下:
部署方式:死信隊(duì)列不能夠和正常隊(duì)列存在同一個(gè)服務(wù)器中,應(yīng)該分服務(wù)器存放。
延遲隊(duì)列
訂單30分鐘未支付,系統(tǒng)自動(dòng)超時(shí)關(guān)閉的實(shí)現(xiàn)方案。
基于任務(wù)調(diào)度實(shí)現(xiàn),效率是非常低。
基于redis過期key實(shí)現(xiàn),key失效時(shí)會(huì)回調(diào)客戶端一個(gè)方法。
用戶下單的時(shí)候,生成一個(gè)令牌(有效期)30分鐘,存放到我們r(jià)edis;缺點(diǎn):非常冗余,會(huì)在表中存放一個(gè)冗余字段。
基于mq的延遲隊(duì)列(最佳方案)rabbitmq情況下。
原理:在我們下單的時(shí)候,往mq投遞一個(gè)消息設(shè)置有效期為30分鐘,但該消息失效的時(shí)候(沒有被消費(fèi)的情況下),執(zhí)行我們客戶端一個(gè)方法告訴我們?cè)撓⒁呀?jīng)失效,這時(shí)候查詢這筆訂單是否已經(jīng)支付。
實(shí)現(xiàn)邏輯:
主要使用死信隊(duì)列來實(shí)現(xiàn)。
想要的代碼:就是正常的消費(fèi)者不消費(fèi)消息,或者沒有正常的消費(fèi)者,在設(shè)置的時(shí)間后進(jìn)入死信隊(duì)列中,然后死信消費(fèi)者實(shí)現(xiàn)相應(yīng)的業(yè)務(wù)邏輯。
RabbitMQ消息冪等問題
RabbitMQ消息自動(dòng)重試機(jī)制
當(dāng)消費(fèi)者業(yè)務(wù)邏輯代碼中,拋出異常自動(dòng)實(shí)現(xiàn)重試 (默認(rèn)是無數(shù)次重試)
應(yīng)該對(duì)RabbitMQ重試次數(shù)實(shí)現(xiàn)限制,比如最多重試5次,每次間隔3s;重試多次還是失敗的情況下,存放到死信隊(duì)列或者存放到數(shù)據(jù)庫表中記錄后期人工補(bǔ)償。因?yàn)橹卦囀〈螖?shù)之后,隊(duì)列會(huì)自動(dòng)刪除這個(gè)消息。
消息重試原理: 在重試的過程中,使用aop攔截我們的消費(fèi)監(jiān)聽方法,也不會(huì)打印這個(gè)錯(cuò)誤日志。如果重試多次還是失敗,達(dá)到最大失敗次數(shù)的時(shí)候才會(huì)打印錯(cuò)誤日志。
如果消費(fèi)多次還是失敗的情況下:
1、自動(dòng)刪除該消息;(消息可能丟失)
解決辦法:
如果充實(shí)多次還是失敗的情況下,最終存放到死信隊(duì)列;
采用表日志記,消費(fèi)失敗錯(cuò)誤日志的日志記錄,后期人工自動(dòng)對(duì)該消息實(shí)現(xiàn)補(bǔ)償。
合理的選擇重試機(jī)制
消費(fèi)者獲取消息后,調(diào)用第三方接口(HTTP請(qǐng)求),但是調(diào)用第三方接口失敗呢?是否需要重試 ?
答:有時(shí)是因?yàn)榫W(wǎng)絡(luò)異常調(diào)用失敗,應(yīng)該需要重試幾次。
消費(fèi)者獲取消息后,應(yīng)該代碼問題拋出數(shù)據(jù)異常,是否需要重試?
答:不需要重試,代碼異常需要重新修改代碼發(fā)布項(xiàng)目。
消費(fèi)者開啟手動(dòng)ack模式
第一步、springboot項(xiàng)目配置需要開啟ack模式
acknowledge-mode: manual
第二步、消費(fèi)者Java代碼
int result = orderMapper.addOrder(orderEntity); if (result >= 0) { // 開啟消息確認(rèn)機(jī)制 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }
rabbitMQ如何解決消息冪等問題
什么是消息冪等性?MQ消費(fèi)者如何保證冪等性?
產(chǎn)生的原因:就是因?yàn)橄M(fèi)者可能會(huì)開啟自動(dòng)重試,重試過程中可能會(huì)導(dǎo)致消費(fèi)者業(yè)務(wù)邏輯代碼重復(fù)執(zhí)行。此刻消息已經(jīng)消費(fèi)了,因?yàn)闃I(yè)務(wù)報(bào)錯(cuò)導(dǎo)致消息重新消費(fèi),這時(shí)會(huì)出現(xiàn)
解決方案:采用消息全局id根據(jù)業(yè)務(wù)來定,根據(jù)業(yè)務(wù)id(全局唯一id)消費(fèi)者可以判斷這條消息已經(jīng)消費(fèi)了。
消費(fèi)者代碼邏輯:
RabbitMQ解決分布式事務(wù)問題
分布式事務(wù):在分布式系統(tǒng)中,因?yàn)榭绶?wù)調(diào)用接口,存在多個(gè)不同的事務(wù),每個(gè)事務(wù)都互不影響。就存在分布式事務(wù)的問題。
解決分布式事務(wù)核心思想:數(shù)據(jù)最終一致性。
分布式領(lǐng)域中名詞:
強(qiáng)一致性 :要么同步速度非常快或者采用鎖的機(jī)制 不允許出現(xiàn)臟讀;
強(qiáng)一致性解決方案:要么數(shù)據(jù)庫A非常迅速的將數(shù)據(jù)同步給數(shù)據(jù)B,或者數(shù)據(jù)庫A沒有同步完成之前數(shù)據(jù)庫B不能夠讀取數(shù)據(jù)。
弱一致性: 允許讀取的數(shù)據(jù)為原來的臟數(shù)據(jù),允許讀取的結(jié)果不一致性。
最終一致性: 在我們的分布式系統(tǒng)中,因?yàn)閿?shù)據(jù)之間同步通過網(wǎng)絡(luò)實(shí)現(xiàn)通訊,短暫的數(shù)據(jù)延遲是允許的,但是最終數(shù)據(jù)必須要一致性。
基于RabbitMQ解決分布式事務(wù)的思路
基于RabbitMQ解決分布式事務(wù)的思路:(采用最終一致性的方案)
- 確認(rèn)我們的生產(chǎn)者消息一定要投遞到MQ中(消息確認(rèn)機(jī)制)投遞失敗 就繼續(xù)重試
- 消費(fèi)者采用手動(dòng)ack的形式確認(rèn)消息實(shí)現(xiàn)消費(fèi) 注意冪等性問題,消費(fèi)失敗的情況下,mq自動(dòng)幫消費(fèi)者重試。
- 保證我們的生產(chǎn)者第一事務(wù)先執(zhí)行,如果執(zhí)行失敗采用補(bǔ)單隊(duì)列(給生產(chǎn)者自己事務(wù)補(bǔ)充,確保生產(chǎn)者第一事務(wù)執(zhí)行完成【數(shù)據(jù)最終一致性】)。
解決思路圖:核心是利用mq發(fā)送消息給其他系統(tǒng)將數(shù)據(jù)修改回來。
到此這篇關(guān)于Java RabbitMQ詳解常見問題的解決的文章就介紹到這了,更多相關(guān)Java RabbitMQ內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- java中RabbitMQ高級(jí)應(yīng)用
- Java實(shí)現(xiàn)Kafka生產(chǎn)者消費(fèi)者代碼實(shí)例
- Java多線程并發(fā)生產(chǎn)者消費(fèi)者設(shè)計(jì)模式實(shí)例解析
- Java多線程生產(chǎn)者消費(fèi)者模式實(shí)現(xiàn)過程解析
- Java多線程 生產(chǎn)者消費(fèi)者模型實(shí)例詳解
- Java多線程 BlockingQueue實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型詳解
- Java編寫簡(jiǎn)易rabbitmq生產(chǎn)者與消費(fèi)者的代碼
相關(guān)文章
java數(shù)據(jù)結(jié)構(gòu)之二分查找法 binarySearch的實(shí)例
這篇文章主要介紹了java數(shù)據(jù)結(jié)構(gòu)之二分查找法 binarySearch的實(shí)例的相關(guān)資料,希望通過本文能幫助到大家,讓大家理解掌握這部分內(nèi)容,需要的朋友可以參考下2017-10-10Java實(shí)現(xiàn)簡(jiǎn)單臺(tái)球游戲
這篇文章主要為大家詳細(xì)介紹了Java實(shí)現(xiàn)簡(jiǎn)單臺(tái)球游戲,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2019-07-07Java中創(chuàng)建線程的兩種方式詳細(xì)說明
這篇文章主要介紹了Java中創(chuàng)建線程的兩種方式詳細(xì)說明,Java使用java.lang.Thread類代表線程,所有的線程對(duì)象都必須是Thread類或其子類的實(shí)例,每個(gè)線程的作用是完成一定的任務(wù),實(shí)際上就是執(zhí)行一段程序流即一段順序執(zhí)行的代碼,需要的朋友可以參考下2023-11-11Java創(chuàng)建類模式_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理
這篇文章主要為大家詳細(xì)介紹了Java創(chuàng)建類模式的相關(guān)方法,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-08-08完美解決gson將Integer默認(rèn)轉(zhuǎn)換成Double的問題
下面小編就為大家?guī)硪黄昝澜鉀Qgson將Integer默認(rèn)轉(zhuǎn)換成Double的問題。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2017-03-03java高效打印一個(gè)二維數(shù)組的實(shí)例(不用遞歸,不用兩個(gè)for循環(huán))
下面小編就為大家?guī)硪黄猨ava高效打印一個(gè)二維數(shù)組的實(shí)例(不用遞歸,不用兩個(gè)for循環(huán))。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2017-03-03詳解在Spring3中使用注解(@Scheduled)創(chuàng)建計(jì)劃任務(wù)
本篇文章主要介紹了詳解在Spring3中使用注解(@Scheduled)創(chuàng)建計(jì)劃任務(wù),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下。2017-03-03