spring boot Rabbit高級教程(最新推薦)
消息可靠性
生產者重試機制
首先第一種情況,就是生產者發(fā)送消息時,出現(xiàn)了網絡故障,導致與MQ的連接中斷。
為了解決這個問題,SpringAMQP提供的消息發(fā)送時的重試機制。即:當RabbitTemplate
與MQ連接超時后,多次重試。
修改publisher
模塊的application.yaml
文件,添加下面的內容:
spring: rabbitmq: connection-timeout: 1s # 設置MQ的連接超時時間 template: retry: enabled: true # 開啟超時重試機制 initial-interval: 1000ms # 失敗后的初始等待時間 multiplier: 1 # 失敗后下次的等待時長倍數(shù),下次等待時長 = initial-interval * multiplier max-attempts: 3 # 最大重試次數(shù)
注意:當網絡不穩(wěn)定的時候,利用重試機制可以有效提高消息發(fā)送的成功率。不過SpringAMQP提供的重試機制是阻塞式的重試,也就是說多次重試等待的過程中,當前線程是被阻塞的。
如果對于業(yè)務性能有要求,建議禁用重試機制。如果一定要使用,請合理配置等待時長和重試次數(shù),當然也可以考慮使用異步線程來執(zhí)行發(fā)送消息的代碼。
:::
生產者確認機制
一般情況下,只要生產者與MQ之間的網路連接順暢,基本不會出現(xiàn)發(fā)送消息丟失的情況,因此大多數(shù)情況下我們無需考慮這種問題。
不過,在少數(shù)情況下,也會出現(xiàn)消息發(fā)送到MQ之后丟失的現(xiàn)象,比如:
- MQ內部處理消息的進程發(fā)生了異常
- 生產者發(fā)送消息到達MQ后未找到
Exchange
- 生產者發(fā)送消息到達MQ的
Exchange
后,未找到合適的Queue
,因此無法路由
針對上述情況,RabbitMQ提供了生產者消息確認機制,包括Publisher Confirm
和Publisher Return
兩種。在開啟確認機制的情況下,當生產者發(fā)送消息給MQ后,MQ會根據(jù)消息處理的情況返回不同的回執(zhí)。
- 當消息投遞到MQ,但是路由失敗時,通過Publisher Return返回異常信息,同時返回ack的確認信息,代表投遞成功
- 臨時消息投遞到了MQ,并且入隊成功,返回ACK,告知投遞成功
- 持久消息投遞到了MQ,并且入隊完成持久化,返回ACK ,告知投遞成功
- 其它情況都會返回NACK,告知投遞失敗
其中ack
和nack
屬于Publisher Confirm機制,ack
是投遞成功;nack
是投遞失敗。而return
則屬于Publisher Return機制。
默認兩種機制都是關閉狀態(tài),需要通過配置文件來開啟。
在publisher模塊的application.yaml
中添加配置:
spring: rabbitmq: publisher-confirm-type: correlated # 開啟publisher confirm機制,并設置confirm類型 publisher-returns: true # 開啟publisher return機制
這里publisher-confirm-type
有三種模式可選:
none
:關閉confirm機制simple
:同步阻塞等待MQ的回執(zhí)correlated
:MQ異步回調返回回執(zhí)
一般我們推薦使用correlated
,回調機制。
定義ReturnCallback
每個RabbitTemplate
只能配置一個ReturnCallback
,因此我們可以在配置類中統(tǒng)一設置。我們在publisher模塊定義一個配置類:
內容如下:
package com.itheima.publisher.config; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.ReturnedMessage; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.annotation.Configuration; import javax.annotation.PostConstruct; @Slf4j @AllArgsConstructor @Configuration public class MqConfig { private final RabbitTemplate rabbitTemplate; @PostConstruct public void init(){ rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { @Override public void returnedMessage(ReturnedMessage returned) { log.error("觸發(fā)return callback,"); log.debug("exchange: {}", returned.getExchange()); log.debug("routingKey: {}", returned.getRoutingKey()); log.debug("message: {}", returned.getMessage()); log.debug("replyCode: {}", returned.getReplyCode()); log.debug("replyText: {}", returned.getReplyText()); } }); } }
定義ConfirmCallback
由于每個消息發(fā)送時的處理邏輯不一定相同,因此ConfirmCallback需要在每次發(fā)消息時定義。具體來說,是在調用RabbitTemplate中的convertAndSend方法時,多傳遞一個參數(shù):
這里的CorrelationData中包含兩個核心的東西:
id
:消息的唯一標示,MQ對不同的消息的回執(zhí)以此做判斷,避免混淆SettableListenableFuture
:回執(zhí)結果的Future對象
將來MQ的回執(zhí)就會通過這個Future
來返回,我們可以提前給CorrelationData
中的Future
添加回調函數(shù)來處理消息回執(zhí):
我們新建一個測試,向系統(tǒng)自帶的交換機發(fā)送消息,并且添加ConfirmCallback
:
@Test void testPublisherConfirm() { // 1.創(chuàng)建CorrelationData CorrelationData cd = new CorrelationData(); // 2.給Future添加ConfirmCallback cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() { @Override public void onFailure(Throwable ex) { // 2.1.Future發(fā)生異常時的處理邏輯,基本不會觸發(fā) log.error("send message fail", ex); } @Override public void onSuccess(CorrelationData.Confirm result) { // 2.2.Future接收到回執(zhí)的處理邏輯,參數(shù)中的result就是回執(zhí)內容 if(result.isAck()){ // result.isAck(),boolean類型,true代表ack回執(zhí),false 代表 nack回執(zhí) log.debug("發(fā)送消息成功,收到 ack!"); }else{ // result.getReason(),String類型,返回nack時的異常描述 log.error("發(fā)送消息失敗,收到 nack, reason : {}", result.getReason()); } } }); // 3.發(fā)送消息 rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", cd); }
執(zhí)行結果如下:
可以看到,由于傳遞的RoutingKey
是錯誤的,路由失敗后,觸發(fā)了return callback
,同時也收到了ack。
當我們修改為正確的RoutingKey
以后,就不會觸發(fā)return callback
了,只收到ack。
而如果連交換機都是錯誤的,則只會收到nack。
注意:
開啟生產者確認比較消耗MQ性能,一般不建議開啟。而且大家思考一下觸發(fā)確認的幾種情況:
- 路由失?。阂话闶且驗镽outingKey錯誤導致,往往是編程導致
- 交換機名稱錯誤:同樣是編程錯誤導致
- MQ內部故障:這種需要處理,但概率往往較低。因此只有對消息可靠性要求非常高的業(yè)務才需要開啟,而且僅僅需要開啟ConfirmCallback處理nack就可以了。
數(shù)據(jù)持久化
為了提升性能,默認情況下MQ的數(shù)據(jù)都是在內存存儲的臨時數(shù)據(jù),重啟后就會消失。為了保證數(shù)據(jù)的可靠性,必須配置數(shù)據(jù)持久化,包括:
- 交換機持久化
- 隊列持久化
- 消息持久化
我們以控制臺界面為例來說明。
交換機持久化
在控制臺的Exchanges
頁面,添加交換機時可以配置交換機的Durability
參數(shù):
設置為Durable
就是持久化模式,Transient
就是臨時模式。
隊列持久化
在控制臺的Queues頁面,添加隊列時,同樣可以配置隊列的Durability
參數(shù):
除了持久化以外。
消息持久化
在控制臺發(fā)送消息的時候,可以添加很多參數(shù),而消息的持久化是要配置一個properties
:
說明:在開啟持久化機制以后,如果同時還開啟了生產者確認,那么MQ會在消息持久化以后才發(fā)送ACK回執(zhí),進一步確保消息的可靠性。
不過出于性能考慮,為了減少IO次數(shù),發(fā)送到MQ的消息并不是逐條持久化到數(shù)據(jù)庫的,而是每隔一段時間批量持久化。一般間隔在100毫秒左右,這就會導致ACK有一定的延遲,因此建議生產者確認全部采用異步方式。
LazyQueue
在默認情況下,RabbitMQ會將接收到的信息保存在內存中以降低消息收發(fā)的延遲。但在某些特殊情況下,這會導致消息積壓,比如:
- 消費者宕機或出現(xiàn)網絡故障
- 消息發(fā)送量激增,超過了消費者處理速度
- 消費者處理業(yè)務發(fā)生阻塞
一旦出現(xiàn)消息堆積問題,RabbitMQ的內存占用就會越來越高,直到觸發(fā)內存預警上限。此時RabbitMQ會將內存消息刷到磁盤上,這個行為成為PageOut
. PageOut
會耗費一段時間,并且會阻塞隊列進程。因此在這個過程中RabbitMQ不會再處理新的消息,生產者的所有請求都會被阻塞。
為了解決這個問題,從RabbitMQ的3.6.0版本開始,就增加了Lazy Queues的模式,也就是惰性隊列。惰性隊列的特征如下:
- 接收到消息后直接存入磁盤而非內存
- 消費者要消費消息時才會從磁盤中讀取并加載到內存(也就是懶加載)
- 支持數(shù)百萬條的消息存儲
而在3.12版本之后,LazyQueue已經成為所有隊列的默認格式。因此官方推薦升級MQ為3.12版本或者所有隊列都設置為LazyQueue模式。
控制臺配置Lazy模式
在添加隊列的時候,添加x-queue-mod=lazy
參數(shù)即可設置隊列為Lazy模式:
代碼配置Lazy模式
在利用SpringAMQP聲明隊列的時候,添加x-queue-mod=lazy
參數(shù)也可設置隊列為Lazy模式:
@Bean public Queue lazyQueue(){ return QueueBuilder .durable("lazy.queue") .lazy() // 開啟Lazy模式 .build(); }
這里是通過QueueBuilder
的lazy()
函數(shù)配置Lazy模式。
當然,我們也可以基于注解來聲明隊列并設置為Lazy模式:
@RabbitListener(queuesToDeclare = @Queue( name = "lazy.queue", durable = "true", arguments = @Argument(name = "x-queue-mode", value = "lazy") )) public void listenLazyQueue(String msg){ log.info("接收到 lazy.queue的消息:{}", msg); }
更新已有隊列為lazy模式
對于已經存在的隊列,也可以配置為lazy模式,但是要通過設置policy實現(xiàn)。
可以基于命令行設置policy:
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
命令解讀:
rabbitmqctl
:RabbitMQ的命令行工具set_policy
:添加一個策略Lazy
:策略名稱,可以自定義"^lazy-queue$"
:用正則表達式匹配隊列的名字'{"queue-mode":"lazy"}'
:設置隊列模式為lazy模式--apply-to queues
:策略的作用對象,是所有的隊列
當然,也可以在控制臺配置policy,進入在控制臺的Admin
頁面,點擊Policies
,即可添加配置:
消費者的可靠性
當RabbitMQ向消費者投遞消息以后,需要知道消費者的處理狀態(tài)如何。因為消息投遞給消費者并不代表就一定被正確消費了,可能出現(xiàn)的故障有很多,比如:
- 消息投遞的過程中出現(xiàn)了網絡故障
- 消費者接收到消息后突然宕機
- 消費者接收到消息后,因處理不當導致異常
一旦發(fā)生上述情況,消息也會丟失。因此,RabbitMQ必須知道消費者的處理狀態(tài),一旦消息處理失敗才能重新投遞消息。
但問題來了:RabbitMQ如何得知消費者的處理狀態(tài)呢?
本章我們就一起研究一下消費者處理消息時的可靠性解決方案。
消費者確認機制
為了確認消費者是否成功處理消息,RabbitMQ提供了消費者確認機制(Consumer Acknowledgement)。即:當消費者處理消息結束后,應該向RabbitMQ發(fā)送一個回執(zhí),告知RabbitMQ自己消息處理狀態(tài)。回執(zhí)有三種可選值:
- ack:成功處理消息,RabbitMQ從隊列中刪除該消息
- nack:消息處理失敗,RabbitMQ需要再次投遞消息
- reject:消息處理失敗并拒絕該消息,RabbitMQ從隊列中刪除該消息
一般reject方式用的較少,除非是消息格式有問題,那就是開發(fā)問題了。因此大多數(shù)情況下我們需要將消息處理的代碼通過try catch
機制捕獲,消息處理成功時返回ack,處理失敗時返回nack.
由于消息回執(zhí)的處理代碼比較統(tǒng)一,因此SpringAMQP幫我們實現(xiàn)了消息確認。并允許我們通過配置文件設置ACK處理方式,有三種模式:
**none**:不處理。即消息投遞給消費者后立刻ack,消息會立刻從MQ刪除。非常不安全,不建議使用
**manual**:手動模式。需要自己在業(yè)務代碼中調用api,發(fā)送ack或reject,存在業(yè)務入侵,但更靈活
**auto**:自動模式。SpringAMQP利用AOP對我們的消息處理邏輯做了環(huán)繞增強,當業(yè)務正常執(zhí)行時則自動返回ack. 當業(yè)務出現(xiàn)異常時,根據(jù)異常判斷返回不同結果:
如果是業(yè)務異常,會自動返回nack;
如果是消息處理或校驗異常,自動返回reject;
————————————————
版權聲明:本文為CSDN博主「過去日記」的原創(chuàng)文章,遵循CC 4.0 BY-SA版權協(xié)議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/studycodeday/article/details/133839942
**none**
:不處理。即消息投遞給消費者后立刻ack,消息會立刻從MQ刪除。非常不安全,不建議使用**manual**
:手動模式。需要自己在業(yè)務代碼中調用api,發(fā)送ack
或reject
,存在業(yè)務入侵,但更靈活**auto**
:自動模式。SpringAMQP利用AOP對我們的消息處理邏輯做了環(huán)繞增強,當業(yè)務正常執(zhí)行時則自動返回ack
. 當業(yè)務出現(xiàn)異常時,根據(jù)異常判斷返回不同結果:- 如果是業(yè)務異常,會自動返回
nack
; - 如果是消息處理或校驗異常,自動返回
reject
;
- 如果是業(yè)務異常,會自動返回
通過下面的配置可以修改SpringAMQP的ACK處理方式:
spring: rabbitmq: listener: simple: acknowledge-mode: none # 不做處理
修改consumer服務的SpringRabbitListener類中的方法,模擬一個消息處理的異常:
@RabbitListener(queues = "simple.queue") public void listenSimpleQueueMessage(String msg) throws InterruptedException { log.info("spring 消費者接收到消息:【" + msg + "】"); if (true) { throw new MessageConversionException("故意的"); } log.info("消息處理完成"); }
測試可以發(fā)現(xiàn):當消息處理發(fā)生異常時,消息依然被RabbitMQ刪除了。
我們再次把確認機制修改為auto:
spring: rabbitmq: listener: simple: acknowledge-mode: auto # 自動ack
在異常位置打斷點,再次發(fā)送消息,程序卡在斷點時,可以發(fā)現(xiàn)此時消息狀態(tài)為unacked
(未確定狀態(tài)):
放行以后,由于拋出的是消息轉換異常,因此Spring會自動返回reject
,所以消息依然會被刪除
我們將異常改為RuntimeException類型:
@RabbitListener(queues = "simple.queue") public void listenSimpleQueueMessage(String msg) throws InterruptedException { log.info("spring 消費者接收到消息:【" + msg + "】"); if (true) { throw new RuntimeException("故意的"); } log.info("消息處理完成"); }
在異常位置打斷點,然后再次發(fā)送消息測試,程序卡在斷點時,可以發(fā)現(xiàn)此時消息狀態(tài)為unacked
(未確定狀態(tài))以后,由于拋出的是業(yè)務異常,所以Spring返回ack
,最終消息恢復至Ready
狀態(tài),并且沒有被RabbitMQ刪除
當我們把配置改為auto
時,消息處理失敗后,會回到RabbitMQ,并重新投遞到消費者。
失敗重試機制
當消費者出現(xiàn)異常后,消息會不斷requeue(重入隊)到隊列,再重新發(fā)送給消費者。如果消費者再次執(zhí)行依然出錯,消息會再次requeue到隊列,再次投遞,直到消息處理成功為止。
極端情況就是消費者一直無法執(zhí)行成功,那么消息requeue就會無限循環(huán),導致mq的消息處理飆升,帶來不必要的壓力:
當然,上述極端情況發(fā)生的概率還是非常低的,不過不怕一萬就怕萬一。為了應對上述情況Spring又提供了消費者失敗重試機制:在消費者出現(xiàn)異常時利用本地重試,而不是無限制的requeue到mq隊列。
修改consumer服務的application.yml文件,添加內容:
spring: rabbitmq: listener: simple: retry: enabled: true # 開啟消費者失敗重試 initial-interval: 1000ms # 初識的失敗等待時長為1秒 multiplier: 1 # 失敗的等待時長倍數(shù),下次等待時長 = multiplier * last-interval max-attempts: 3 # 最大重試次數(shù) stateless: true # true無狀態(tài);false有狀態(tài)。如果業(yè)務中包含事務,這里改為false
重啟consumer服務,重復之前的測試??梢园l(fā)現(xiàn):
- 消費者在失敗后消息沒有重新回到MQ無限重新投遞,而是在本地重試了3次
- 本地重試3次以后,拋出了
AmqpRejectAndDontRequeueException
異常。查看RabbitMQ控制臺,發(fā)現(xiàn)消息被刪除了,說明最后SpringAMQP返回的是reject
結論:
- 開啟本地重試時,消息處理過程中拋出異常,不會requeue到隊列,而是在消費者本地重試
- 重試達到最大次數(shù)后,Spring會返回reject,消息會被丟棄
失敗處理策略
在之前的測試中,本地測試達到最大重試次數(shù)后,消息會被丟棄。這在某些對于消息可靠性要求較高的業(yè)務場景下,顯然不太合適了。
因此Spring允許我們自定義重試次數(shù)耗盡后的消息處理策略,這個策略是由MessageRecovery
接口來定義的,它有3個不同實現(xiàn):
RejectAndDontRequeueRecoverer
:重試耗盡后,直接reject
,丟棄消息。默認就是這種方式ImmediateRequeueMessageRecoverer
:重試耗盡后,返回nack
,消息重新入隊RepublishMessageRecoverer
:重試耗盡后,將失敗消息投遞到指定的交換機
比較優(yōu)雅的一種處理方案是RepublishMessageRecoverer
,失敗后將消息投遞到一個指定的,專門存放異常消息的隊列,后續(xù)由人工集中處理。
1)在consumer服務中定義處理失敗消息的交換機和隊列
@Bean public DirectExchange errorMessageExchange(){ return new DirectExchange("error.direct"); } @Bean public Queue errorQueue(){ return new Queue("error.queue", true); } @Bean public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){ return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error"); }
2)定義一個RepublishMessageRecoverer,關聯(lián)隊列和交換機
@Bean public DirectExchange errorMessageExchange(){ return new DirectExchange("error.direct"); } @Bean public Queue errorQueue(){ return new Queue("error.queue", true); } @Bean public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){ return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error"); }
完整代碼如下:
package com.itheima.consumer.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.retry.MessageRecoverer; import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer; import org.springframework.context.annotation.Bean; @Configuration @ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true") public class ErrorMessageConfig { @Bean public DirectExchange errorMessageExchange(){ return new DirectExchange("error.direct"); } @Bean public Queue errorQueue(){ return new Queue("error.queue", true); } @Bean public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){ return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error"); } @Bean public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){ return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error"); } }
業(yè)務冪等性
冪等是一個數(shù)學概念,用函數(shù)表達式來描述是這樣的:f(x) = f(f(x))
,例如求絕對值函數(shù)。
在程序開發(fā)中,則是指同一個業(yè)務,執(zhí)行一次或多次對業(yè)務狀態(tài)的影響是一致的。例如:
- 根據(jù)id刪除數(shù)據(jù)
- 查詢數(shù)據(jù)
- 新增數(shù)據(jù)
但數(shù)據(jù)的更新往往不是冪等的,如果重復執(zhí)行可能造成不一樣的后果。比如:
- 取消訂單,恢復庫存的業(yè)務。如果多次恢復就會出現(xiàn)庫存重復增加的情況
- 退款業(yè)務。重復退款對商家而言會有經濟損失。
所以,我們要盡可能避免業(yè)務被重復執(zhí)行。
然而在實際業(yè)務場景中,由于意外經常會出現(xiàn)業(yè)務被重復執(zhí)行的情況,例如:
- 頁面卡頓時頻繁刷新導致表單重復提交
- 服務間調用的重試
- MQ消息的重復投遞
我們在用戶支付成功后會發(fā)送MQ消息到交易服務,修改訂單狀態(tài)為已支付,就可能出現(xiàn)消息重復投遞的情況。如果消費者不做判斷,很有可能導致消息被消費多次,出現(xiàn)業(yè)務故障。
舉例:
- 假如用戶剛剛支付完成,并且投遞消息到交易服務,交易服務更改訂單為已支付狀態(tài)。
- 由于某種原因,例如網絡故障導致生產者沒有得到確認,隔了一段時間后重新投遞給交易服務。
- 但是,在新投遞的消息被消費之前,用戶選擇了退款,將訂單狀態(tài)改為了已退款狀態(tài)。
- 退款完成后,新投遞的消息才被消費,那么訂單狀態(tài)會被再次改為已支付。業(yè)務異常。
因此,我們必須想辦法保證消息處理的冪等性。這里給出兩種方案:
- 唯一消息ID
- 業(yè)務狀態(tài)判斷
唯一消息ID
這個思路非常簡單:
- 每一條消息都生成一個唯一的id,與消息一起投遞給消費者。
- 消費者接收到消息后處理自己的業(yè)務,業(yè)務處理成功后將消息ID保存到數(shù)據(jù)庫
- 如果下次又收到相同消息,去數(shù)據(jù)庫查詢判斷是否存在,存在則為重復消息放棄處理。
我們該如何給消息添加唯一ID呢?
其實很簡單,SpringAMQP的MessageConverter自帶了MessageID的功能,我們只要開啟這個功能即可。
以Jackson的消息轉換器為例:
@Bean public MessageConverter messageConverter(){ // 1.定義消息轉換器 Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter(); // 2.配置自動創(chuàng)建消息id,用于識別不同消息,也可以在業(yè)務中基于ID判斷是否是重復消息 jjmc.setCreateMessageIds(true); return jjmc; }
業(yè)務判斷
業(yè)務判斷就是基于業(yè)務本身的邏輯或狀態(tài)來判斷是否是重復的請求或消息,不同的業(yè)務場景判斷的思路也不一樣。
處理消息的業(yè)務邏輯是把訂單狀態(tài)從未支付修改為已支付。因此我們就可以在執(zhí)行業(yè)務時判斷訂單狀態(tài)是否是未支付,如果不是則證明訂單已經被處理過,無需重復處理。
相比較而言,消息ID的方案需要改造原有的數(shù)據(jù)庫,所以我更推薦使用業(yè)務判斷的方案。
以支付修改訂單的業(yè)務為例,我們需要修改OrderServiceImpl
中的markOrderPaySuccess
方法:
@Override public void markOrderPaySuccess(Long orderId) { // 1.查詢訂單 Order old = getById(orderId); // 2.判斷訂單狀態(tài) if (old == null || old.getStatus() != 1) { // 訂單不存在或者訂單狀態(tài)不是1,放棄處理 return; } // 3.嘗試更新訂單 Order order = new Order(); order.setId(orderId); order.setStatus(2); order.setPayTime(LocalDateTime.now()); updateById(order); }
上述代碼邏輯上符合了冪等判斷的需求,但是由于判斷和更新是兩步動作,因此在極小概率下可能存在線程安全問題。
我們可以合并上述操作為這樣:
@Override public void markOrderPaySuccess(Long orderId) { // UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1 lambdaUpdate() .set(Order::getStatus, 2) .set(Order::getPayTime, LocalDateTime.now()) .eq(Order::getId, orderId) .eq(Order::getStatus, 1) .update(); }
注意看,上述代碼等同于這樣的SQL語句:
UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1
我們在where條件中除了判斷id以外,還加上了status必須為1的條件。如果條件不符(說明訂單已支付),則SQL匹配不到數(shù)據(jù),根本不會執(zhí)行。
兜底方案
其實思想很簡單:既然MQ通知不一定發(fā)送到交易服務,那么交易服務就必須自己主動去查詢支付狀態(tài)。這樣即便支付服務的MQ通知失敗,我們依然能通過主動查詢來保證訂單狀態(tài)的一致。
流程如下:
圖中黃色線圈起來的部分就是MQ通知失敗后的兜底處理方案,由交易服務自己主動去查詢支付狀態(tài)。
不過需要注意的是,交易服務并不知道用戶會在什么時候支付,如果查詢的時機不正確(比如查詢的時候用戶正在支付中),可能查詢到的支付狀態(tài)也不正確。
那么問題來了,我們到底該在什么時間主動查詢支付狀態(tài)呢?
這個時間是無法確定的,因此,通常我們采取的措施就是利用定時任務定期查詢.
- 首先,支付服務會正在用戶支付成功以后利用MQ消息通知交易服務,完成訂單狀態(tài)同步。
- 其次,為了保證MQ消息的可靠性,我們采用了生產者確認機制、消費者確認、消費者失敗重試等策略,確保消息投遞的可靠性
- 最后,我們還在交易服務設置了定時任務,定期查詢訂單支付狀態(tài)。這樣即便MQ通知失敗,還可以利用定時任務作為兜底方案,確保訂單支付狀態(tài)的最終一致性。
延遲消息
在電商的支付業(yè)務中,對于一些庫存有限的商品,為了更好的用戶體驗,通常都會在用戶下單時立刻扣減商品庫存。例如電影院購票、高鐵購票,下單后就會鎖定座位資源,其他人無法重復購買。
但是這樣就存在一個問題,假如用戶下單后一直不付款,就會一直占有庫存資源,導致其他客戶無法正常交易,最終導致商戶利益受損!
因此,電商中通常的做法就是:對于超過一定時間未支付的訂單,應該立刻取消訂單并釋放占用的庫存。
例如,訂單支付超時時間為30分鐘,則我們應該在用戶下單后的第30分鐘檢查訂單支付狀態(tài),如果發(fā)現(xiàn)未支付,應該立刻取消訂單,釋放庫存。
但問題來了:如何才能準確的實現(xiàn)在下單后第30分鐘去檢查支付狀態(tài)呢?
像這種在一段時間以后才執(zhí)行的任務,我們稱之為延遲任務,而要實現(xiàn)延遲任務,最簡單的方案就是利用MQ的延遲消息了。
在RabbitMQ中實現(xiàn)延遲消息也有兩種方案:
- 死信交換機+TTL
- 延遲消息插件
這一章我們就一起研究下這兩種方案的實現(xiàn)方式,以及優(yōu)缺點。
死信交換機和延遲消息
死信交換機
當一個隊列中的消息滿足下列情況之一時,可以成為死信(dead letter):
- 消費者使用
basic.reject
或basic.nack
聲明消費失敗,并且消息的requeue
參數(shù)設置為false - 消息是一個過期消息,超時無人消費
- 要投遞的隊列消息滿了,無法投遞
如果一個隊列中的消息已經成為死信,并且這個隊列通過**dead-letter-exchange**
屬性指定了一個交換機,那么隊列中的死信就會投遞到這個交換機中,而這個交換機就稱為死信交換機(Dead Letter Exchange)。而此時加入有隊列與死信交換機綁定,則最終死信就會被投遞到這個隊列中。
死信交換機有什么作用呢?
- 收集那些因處理失敗而被拒絕的消息
- 收集那些因隊列滿了而被拒絕的消息
- 收集因TTL(有效期)到期的消息
延遲消息
前面兩種作用場景可以看做是把死信交換機當做一種消息處理的最終兜底方案,與消費者重試時講的RepublishMessageRecoverer
作用類似。
而最后一種場景,大家設想一下這樣的場景:
如圖,有一組綁定的交換機(ttl.fanout
)和隊列(ttl.queue
)。但是ttl.queue
沒有消費者監(jiān)聽,而是設定了死信交換機hmall.direct
,而隊列direct.queue1
則與死信交換機綁定,RoutingKey是blue:
假如我們現(xiàn)在發(fā)送一條消息到ttl.fanout
,RoutingKey為blue,并設置消息的有效期為5000毫秒:
注意:盡管這里的ttl.fanout
不需要RoutingKey,但是當消息變?yōu)樗佬挪⑼哆f到死信交換機時,會沿用之前的RoutingKey,這樣hmall.direct
才能正確路由消息。
消息肯定會被投遞到ttl.queue
之后,由于沒有消費者,因此消息無人消費。5秒之后,消息的有效期到期,成為死信:
死信被再次投遞到死信交換機hmall.direct
,并沿用之前的RoutingKey,也就是blue
:
由于direct.queue1
與hmall.direct
綁定的key是blue,因此最終消息被成功路由到direct.queue1
,如果此時有消費者與direct.queue1
綁定, 也就能成功消費消息了。但此時已經是5秒鐘以后了:
也就是說,publisher發(fā)送了一條消息,但最終consumer在5秒后才收到消息。我們成功實現(xiàn)了延遲消息。
總結
注意:
RabbitMQ的消息過期是基于追溯方式來實現(xiàn)的,也就是說當一個消息的TTL到期以后不一定會被移除或投遞到死信交換機,而是在消息恰好處于隊首時才會被處理。
當隊列中消息堆積很多的時候,過期消息可能不會被按時處理,因此你設置的TTL時間不一定準確。
:::
DelayExchange插件
基于死信隊列雖然可以實現(xiàn)延遲消息,但是太麻煩了。因此RabbitMQ社區(qū)提供了一個延遲消息插件來實現(xiàn)相同的效果。
官方文檔說明:
Scheduling Messages with RabbitMQ | RabbitMQ - Blog
下載
插件下載地址:
GitHub - rabbitmq/rabbitmq-delayed-message-exchange: Delayed Messaging for RabbitMQ
安裝
因為我們是基于Docker安裝,所以需要先查看RabbitMQ的插件目錄對應的數(shù)據(jù)卷。
docker volume inspect mq-plugins
結果如下:
[
{
"CreatedAt": "2024-06-19T09:22:59+08:00",
"Driver": "local",
"Labels": null,
"Mountpoint": "/var/lib/docker/volumes/mq-plugins/_data",
"Name": "mq-plugins",
"Options": null,
"Scope": "local"
}
]
插件目錄被掛載到了/var/lib/docker/volumes/mq-plugins/_data
這個目錄,我們上傳插件到該目錄下。
接下來執(zhí)行命令,安裝插件:
docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
運行結果如下:
聲明延遲交換機
基于注解方式:
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "delay.queue", durable = "true"), exchange = @Exchange(name = "delay.direct", delayed = "true"), key = "delay" )) public void listenDelayMessage(String msg){ log.info("接收到delay.queue的延遲消息:{}", msg); }
基于@Bean
的方式:
package com.itheima.consumer.config; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Slf4j @Configuration public class DelayExchangeConfig { @Bean public DirectExchange delayExchange(){ return ExchangeBuilder .directExchange("delay.direct") // 指定交換機類型和名稱 .delayed() // 設置delay的屬性為true .durable(true) // 持久化 .build(); } @Bean public Queue delayedQueue(){ return new Queue("delay.queue"); } @Bean public Binding delayQueueBinding(){ return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay"); } }
發(fā)送延遲消息
發(fā)送消息時,必須通過x-delay屬性設定延遲時間:
@Test void testPublisherDelayMessage() { // 1.創(chuàng)建消息 String message = "hello, delayed message"; // 2.發(fā)送消息,利用消息后置處理器添加消息頭 rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { // 添加延遲消息屬性 message.getMessageProperties().setDelay(5000); return message; } }); }
:::warning
注意:
延遲消息插件內部會維護一個本地數(shù)據(jù)庫表,同時使用Elang Timers功能實現(xiàn)計時。如果消息的延遲時間設置較長,可能會導致堆積的延遲消息非常多,會帶來較大的CPU開銷,同時延遲消息的時間會存在誤差。
因此,不建議設置延遲時間過長的延遲消息。
到此這篇關于spring boot Rabbit高級教程的文章就介紹到這了,更多相關spring boot Rabbit內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Java并發(fā)編程之常用的多線程實現(xiàn)方式分析
這篇文章主要介紹了Java并發(fā)編程之常用的多線程實現(xiàn)方式,結合實例形式分析了java并發(fā)編程中多線程的相關原理、實現(xiàn)方法與操作注意事項,需要的朋友可以參考下2020-02-02bootstrap.yml如何讀取nacos配置中心的配置文件
這篇文章主要介紹了bootstrap.yml讀取nacos配置中心的配置文件問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-12-12