Spring3?中?RabbitMQ?的使用與常見場景分析
一、初識 MQ
傳統(tǒng)的單體架構,分布式架構的同步調用里,無論是方法調用,還是 OpenFeign 難免會有以下問題:
- 擴展性差(高耦合,需要依賴對應的服務,同樣的事件,不斷有新需求,這個事件的業(yè)務代碼會越來越臃腫,這些子業(yè)務都寫在一起)
- 性能下降(等待響應,最終整個業(yè)務的響應時長就是每次遠程調用的執(zhí)行時長之和)
- 級聯(lián)失?。ㄈ绻且粋€事務,一個服務失敗就會導致全部回滾,若是分布式事務就更加麻煩了,但其實一些行為的失敗不應該導致整體回滾)
- 服務宕機(如果服務調用者未考慮服務提供者的性能,導致提供者因為過度請求而宕機)
但如果不是很要求同步調用,其實也可以用異步調用,如果是單體架構,你可能很快能想到一個解決方案,就是阻塞隊列實現(xiàn)消息通知:
但是在分布式架構下,可能就需要一個中間件級別的阻塞隊列,這就是我們要學習的 Message Queue 消息隊列,簡稱 MQ,而現(xiàn)在流行的 MQ 還不少,在實現(xiàn)其基本的消息通知功能外,還有一些不錯的擴展
以 RabbitMQ 和 Kafka 為例:
RabbitMQ | Kafka | |
---|---|---|
公司/社區(qū) | Rabbit | Apache |
開發(fā)語言 | Erlang | Scala & Java |
協(xié)議支持 | AMQP,XMPP,SMTP,STOMP | 自定義協(xié)議 |
可用性 | 高 | 高 |
單機吞吐量 | 一般 | 非常高(Kafka 亮點) |
消息延遲 | 微秒級 | 毫秒以內 |
消息可靠性 | 高 | 一般 |
消息延遲指的是,消息到隊列,并在隊列中“就緒”的時間與預期時間的差距,其實就是數(shù)據(jù)在中間件中流動的耗時,預期時間可以是現(xiàn)在、幾毫秒后、幾秒后、幾天后…
據(jù)統(tǒng)計,目前國內消息隊列使用最多的還是 RabbitMQ,再加上其各方面都比較均衡,穩(wěn)定性也好,因此我們課堂上選擇 RabbitMQ 來學習。
二、RabbitMQ 安裝
Docker 安裝 RabbitMQ:
mkdir /root/mq cd /root/mq docker rm mq-server -f docker rmi rabbitmq:3.8-management -f docker volume rm mq-plugins -f docker pull rabbitmq:3.8-management # 插件數(shù)據(jù)卷最好還是直接掛載 volume,而不是掛載我們的目錄 docker run \ --name mq-server \ -e RABBITMQ_DEFAULT_USER=xxx \ -e RABBITMQ_DEFAULT_PASS=xxx \ --hostname mq1 \ -v mq-plugins:/plugins \ -p 15672:15672 \ -p 5672:5672 \ -d rabbitmq:3.8-management
三、RabbitMQ 基本知識
(1)架構
15672:RabbitMQ 提供的管理控制臺的端口
5672:RabbitMQ 的消息發(fā)送處理接口
用戶名密碼就是安裝時,啟動容器時指定的用戶名密碼
MQ 對應的就是這里的消息代理 Broker:
RabbitMQ 詳細架構圖:
其中包含幾個概念:
publisher
:生產者,也就是發(fā)送消息的一方consumer
:消費者,也就是消費消息的一方queue
:隊列,存儲消息。生產者投遞的消息會暫存在消息隊列中,等待消費者處理exchange
:交換機,負責消息路由。生產者發(fā)送的消息由交換機決定投遞到哪個隊列。virtual host
:虛擬主機,起到數(shù)據(jù)隔離的作用。每個虛擬主機相互獨立,有各自的 exchange、queue
現(xiàn)在你可能只認識生產者、消費者、隊列,其他是什么呢?
其實你可以理解為 MQ 也是存儲東西的,存儲的就是消息,virtual host 就是數(shù)據(jù)庫,queue 就是表,消息就是一行數(shù)據(jù),而 MQ 有特殊的機制,消息先通過 exchange 再決定前往哪個 queue
管理控制臺的使用就不多說了
(2)五大模式
這只是最常見的五種模式:
簡單模式
工作模式
發(fā)布訂閱模式
關聯(lián)交換機的隊列都能收到一份消息,廣播
路由模式
關聯(lián)交換機時,提供 routing key(可以是多個,隊列之間可以重復),發(fā)布消息時提供一個 routing key,由此發(fā)送給指定的隊列
值得注意的是,簡單模式和工作模式,其實也是有交換機的,任何隊列都會綁定一個默認交換機
""
,類型是 direct,routing key 為隊列的名稱
主題模式
路由模式的基礎上,隊列關聯(lián)交換機時 routing key 可以是帶通配符的
routing key 的單詞通過
.
分割,#
匹配 n 個單詞(n ≥ 0),*
只匹配一個單詞例如 #.red:
可以匹配的 routing key:p1.red、red、p2.p1.red
在發(fā)布消息時,要使用具體的 routing key,交換機發(fā)送給匹配的隊列
(3)數(shù)據(jù)隔離 隔離 virtual host
隔離用戶(賦予訪問權限)
四、RabbitMQ 基本使用 Spring AMQP
引入 RabbitMQ 相關的 SDK,可以通過創(chuàng)建連接 Connection、創(chuàng)建通道 Channel,用 Channel 進行操作,接受消息也差不多,不過多演示:
public class PublisherTest { @Test public void testSendMessage() throws IOException, TimeoutException { // 1.建立連接 ConnectionFactory factory = new ConnectionFactory(); // 1.1.設置連接參數(shù),分別是:主機名、端口號、vhost、用戶名、密碼 factory.setHost("xx.xx.xx.xx"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("xxx"); factory.setPassword("xxx"); // 1.2.建立連接 Connection connection = factory.newConnection(); // 2.創(chuàng)建通道Channel Channel channel = connection.createChannel(); // 3.創(chuàng)建隊列 String queueName = "simple.queue"; channel.queueDeclare(queueName, false, false, false, null); // 4.發(fā)送消息 String message = "hello, rabbitmq!"; channel.basicPublish("", queueName, null, message.getBytes()); System.out.println("發(fā)送消息成功:【" + message + "】"); // 5.關閉通道和連接 channel.close(); connection.close(); } }
但比較麻煩,Spring AMQP 框架可以自動裝配 RabbitMQ 的操作對象 RabbitTemplate,這樣我們就可以更方便的操作 MQ,并充分發(fā)揮其特性
<!--AMQP依賴,包含RabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
默認包含 RabbitMQ 的實現(xiàn),如果你想對接其他 AMQP 協(xié)議的 MQ,得自己實現(xiàn)其抽象封裝的接口
(1)發(fā)送消息
注意,下面是 Spring3 的寫法,所以會有點不一樣,可能看不懂,稍后解釋!
消息發(fā)送器封裝:
@Repository @RequiredArgsConstructor @Slf4j public class RabbitMQSender { private final static ThreadPoolExecutor EXECUTOR = ThreadPoolUtil.getIoTargetThreadPool("Rabbit-MQ-Thread"); private final RabbitTemplate rabbitTemplate; @PostConstruct public void init() { rabbitTemplate.setTaskExecutor(EXECUTOR); } private final static Function<Throwable, ? extends CorrelationData.Confirm> ON_FAILURE = ex -> { log.error("處理 ack 回執(zhí)失敗, {}", ex.getMessage()); return null; }; private MessagePostProcessor delayMessagePostProcessor(long delay) { return message -> { // 小于 0 也是立即執(zhí)行 // setDelay 才是給 RabbitMQ 看的,setReceivedDelay 是給 publish-returns 看的 message.getMessageProperties().setDelay((int) Math.max(delay, 0)); return message; }; }; private CorrelationData newCorrelationData() { return new CorrelationData(UUIDUtil.uuid32()); } /** * @param exchange 交換機 * @param routingKey routing key * @param msg 消息 * @param delay 延遲時間(如果是延遲交換機,delay 才有效) * @param maxRetries 最大重試機會 * @param <T> 消息的對象類型 */ private <T> void send(String exchange, String routingKey, T msg, long delay, int maxRetries){ log.info("準備發(fā)送消息,exchange: {}, routingKey: {}, msg: {}, delay: {}s, maxRetries: {}", exchange, routingKey, msg, TimeUnit.MILLISECONDS.toSeconds(delay), maxRetries); CorrelationData correlationData = newCorrelationData(); MessagePostProcessor delayMessagePostProcessor = delayMessagePostProcessor(delay); correlationData.getFuture().exceptionallyAsync(ON_FAILURE, EXECUTOR).thenAcceptAsync(new Consumer<>() { private int retryCount = 0; // 一次 send 從始至終都用的是一個 Consumer 對象,所以作用的都是同一個計數(shù)器 @Override public void accept(CorrelationData.Confirm confirm) { Optional.ofNullable(confirm).ifPresent(c -> { if(c.isAck()) { log.info("ACK {} 消息成功到達,{}", correlationData.getId(), c.getReason()); } else { log.warn("NACK {} 消息未能到達,{}", correlationData.getId(), c.getReason()); if(retryCount >= maxRetries) { log.error("次數(shù)到達上限 {}", maxRetries); return; } retryCount++; log.warn("開始第 {} 次重試", retryCount); CorrelationData cd = newCorrelationData(); cd.getFuture().exceptionallyAsync(ON_FAILURE, EXECUTOR).thenAcceptAsync(this, EXECUTOR); rabbitTemplate.convertAndSend(exchange, routingKey, msg, delayMessagePostProcessor, cd); } }); } }, EXECUTOR); rabbitTemplate.convertAndSend(exchange, routingKey, msg, delayMessagePostProcessor, correlationData); } public void sendMessage(String exchange, String routingKey, Object msg) { send(exchange, routingKey, msg, 0, 0); } public void sendDelayMessage(String exchange, String routingKey, Object msg, long delay){ send(exchange, routingKey, msg, delay, 0); } public void sendWithConfirm(String exchange, String routingKey, Object msg, int maxReties) { send(exchange, routingKey, msg, 0, maxReties); } public void sendDelayMessageWithConfirm(String exchange, String routingKey, Object msg, long delay, int maxReties) { send(exchange, routingKey, msg, delay, maxReties); } }
(2)接受消息
監(jiān)聽器:
- RabbitTemplate 是可以主動獲取消息的,也可以不實時監(jiān)聽,但是一般情況都是監(jiān)聽,有消息就執(zhí)行
- 監(jiān)聽的是 queue,若 queue 不存在,就會根據(jù)注解創(chuàng)建一遍
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "xxx"), exchange = @Exchange(name = "xxx", delayed = "true"), key = {"xxx"} )) public void xxx(X x) { }
(3)聲明交換機與隊列
可以通過 @Bean 創(chuàng)建 Bean 對象的方式去聲明,可以自行搜索,我更喜歡監(jiān)聽器注解的形式,而且 Bean 的方式,可能會因為配置不完全一樣,導致其他配置類的交換機隊列無法聲明(現(xiàn)象如此,底層為啥我不知道)
(4)消息轉換器
消息是一個字符串,但為了滿足更多需求,需要將一個對象序列化成一個字符串,但默認的序列化實現(xiàn)貌似用的是 java 對象的序列化,這種方式可能得同一個程序的 java 類才能反序列化成功,所以我們應該選擇分布式的序列化方式,比如 json
@Configuration @RequiredArgsConstructor @Slf4j public class MessageConverterConfig { @Bean public MessageConverter messageConverter(){ // 1. 定義消息轉換器 Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(JsonUtil.OBJECT_MAPPER); // 2. 配置自動創(chuàng)建消息 id,用于識別不同消息 jackson2JsonMessageConverter.setCreateMessageIds(Boolean.TRUE); return jackson2JsonMessageConverter; } }
這里的 JsonUtil.OBJECT_MAPPER,就是框架的或者自己實現(xiàn)的 ObjectMapper
(5)配置文件
spring: rabbitmq: host: ${xxx.mq.host} # rabbitMQ 的 ip 地址 port: ${xxx.mq.port} # 端口 username: ${xxx.mq.username} password: ${xxx.mq.password} virtual-host: ${xxx.mq.virtual-host} publisher-confirm-type: correlated publisher-returns: true template: mandatory: true # 若是 false 則直接丟棄了,并不會發(fā)送者回執(zhí) listener: simple: prefetch: 1 # 預取為一個(消費完才能拿下一個) concurrency: 2 # 消費者最少 2 個線程 max-concurrency: 10 # 消費者最多 10 個線程 auto-startup: true # 為 false 監(jiān)聽者不會實時創(chuàng)建和監(jiān)聽,為 true 監(jiān)聽的過程中,若 queue 不存在,會再根據(jù)注解進行創(chuàng)建,創(chuàng)建后只監(jiān)聽 queue,declare = "false" 才是不自動聲明 default-requeue-rejected: false # 拒絕后不 requeue(成為死信,若沒有綁定死信交換機,就真的丟了) acknowledge-mode: auto # 消費者執(zhí)行成功 ack、異常 nack(manual 為手動、none 代表無論如何都是 ack) retry: # 這個屬于 spring amqp 的 retry 機制 enabled: false # 不開啟失敗重試 # initial-interval: 1000 # multiplier: 2 # max-attempts: 3 # stateless: true # true 代表沒有狀態(tài),若有消費者包含事務,這里改為 false
五、常見問題
(1)RabbitMQ 如何保證消息可靠性
保證消息可靠性、不丟失。主要從三個層面考慮
如果報錯可以先記錄到日志中,再去修復數(shù)據(jù)(保底)
1、生產者確認機制
生產者確認機制,確保生產者的消息能到達隊列
publisher-confirm,針對的是消息從發(fā)送者到交換機的可靠性,成功則進行下一步,失敗返回 NACK
private final static ThreadPoolExecutor EXECUTOR = ThreadPoolUtil.getIoTargetThreadPool("Rabbit-MQ-Thread"); private final RabbitTemplate rabbitTemplate; @PostConstruct public void init() { rabbitTemplate.setTaskExecutor(EXECUTOR); } private final static Function<Throwable, ? extends CorrelationData.Confirm> ON_FAILURE = ex -> { log.error("處理 ack 回執(zhí)失敗, {}", ex.getMessage()); return null; }; private MessagePostProcessor delayMessagePostProcessor(long delay) { return message -> { // 小于 0 也是立即執(zhí)行 // setDelay 才是給 RabbitMQ 看的,setReceivedDelay 是給 publish-returns 看的 message.getMessageProperties().setDelay((int) Math.max(delay, 0)); return message; }; }; private CorrelationData newCorrelationData() { return new CorrelationData(UUIDUtil.uuid32()); } /** * @param exchange 交換機 * @param routingKey routing key * @param msg 消息 * @param delay 延遲時間(如果是延遲交換機,delay 才有效) * @param maxRetries 最大重試機會 * @param <T> 消息的對象類型 */ private <T> void send(String exchange, String routingKey, T msg, long delay, int maxRetries){ log.info("準備發(fā)送消息,exchange: {}, routingKey: {}, msg: {}, delay: {}s, maxRetries: {}", exchange, routingKey, msg, TimeUnit.MILLISECONDS.toSeconds(delay), maxRetries); CorrelationData correlationData = newCorrelationData(); MessagePostProcessor delayMessagePostProcessor = delayMessagePostProcessor(delay); correlationData.getFuture().exceptionallyAsync(ON_FAILURE, EXECUTOR).thenAcceptAsync(new Consumer<>() { private int retryCount = 0; // 一次 send 從始至終都用的是一個 Consumer 對象,所以作用的都是同一個計數(shù)器 @Override public void accept(CorrelationData.Confirm confirm) { Optional.ofNullable(confirm).ifPresent(c -> { if(c.isAck()) { log.info("ACK {} 消息成功到達,{}", correlationData.getId(), c.getReason()); } else { log.warn("NACK {} 消息未能到達,{}", correlationData.getId(), c.getReason()); if(retryCount >= maxRetries) { log.error("次數(shù)到達上限 {}", maxRetries); return; } retryCount++; log.warn("開始第 {} 次重試", retryCount); CorrelationData cd = newCorrelationData(); cd.getFuture().exceptionallyAsync(ON_FAILURE, EXECUTOR).thenAcceptAsync(this, EXECUTOR); rabbitTemplate.convertAndSend(exchange, routingKey, msg, delayMessagePostProcessor, cd); } }); } }, EXECUTOR); rabbitTemplate.convertAndSend(exchange, routingKey, msg, delayMessagePostProcessor, correlationData); }
Spring3 的 RabbitMQ Confirm,需要配置為 correlated,發(fā)送消息時提供 CorrelationData,也就是與消息關聯(lián)的數(shù)據(jù),包括發(fā)送者確認時的回調方法
要想提供 Confirm 的回調辦法,需要配置 correlationData.getFuture() 返回的 CompletableFuture 對象(新的 JUC 工具類,可以查一查如何使用)
配置后,在未來根據(jù)回調函數(shù)進行處理(當然也可以直接設置在 RabbitTemplate 對象的 ConfirmCallBack)
還可以自己實現(xiàn)消息的發(fā)送者重試:
publisher-returns,針對的是消息從交換機到隊列的可靠性,成功則返回 ACK,失敗觸發(fā) returns 的回調方法
@Component @RequiredArgsConstructor @Slf4j public class PublisherReturnsCallBack implements RabbitTemplate.ReturnsCallback { // 不存在 routing key 對應的隊列,那在我看來轉發(fā)到零個是合理的現(xiàn)象,但在這里也認為是路由失?。∕Q 認為消息一定至少要進入一個隊列,之后才能被處理,這就是可靠性)(反正就是回執(zhí)了,你愛咋處理是你自己的事情) @Override public void returnedMessage(ReturnedMessage returnedMessage) { // 可能一些版本的 mq 會因為是延時交換機,導致發(fā)送者回執(zhí),只要沒有 NACK 這種情況其實并不是不可靠(其實我也不知道有沒有版本會忽略) // 但是其實不忽略也不錯,畢竟者本來就是特殊情況,一般交換機是不存儲的,但是這個臨時存儲消息 // 但這樣也就代表了,延時后消息路由失敗是沒法再次處理的(因為我們交給延時交換機后就不管了,可靠性有 mq 自己保持) MessageProperties messageProperties = returnedMessage.getMessage().getMessageProperties(); // 這里的 message 并不是原本的 message,是額外的封裝,x-delay 在 publish-returns 里面封裝到 receiveDelay 里了 Integer delay = messageProperties.getReceivedDelay(); // 如果不是延時交換機,卻設置了 delay 大于 0,是不會延時的,所以是其他原因導致的(以防萬一把消息記錄到日志里) if(Objects.nonNull(delay) && delay.compareTo(0) > 0) { log.info("交換機 {}, 路由鍵 {} 消息 {} 延遲 {} s", returnedMessage.getExchange(), returnedMessage.getRoutingKey(), messageProperties, TimeUnit.MILLISECONDS.toSeconds(delay)); return; } log.warn("publisher-returns 發(fā)送者回執(zhí)(應答碼{},應答內容{})(消息 {} 成功到達交換機 {},但路由失敗,路由鍵為 {})", returnedMessage.getReplyCode(), returnedMessage.getReplyText(), returnedMessage.getMessage(), returnedMessage.getExchange(), returnedMessage.getRoutingKey()); } }
RabbitMQSender:
private final static ThreadPoolExecutor EXECUTOR = ThreadPoolUtil.getIoTargetThreadPool("Rabbit-MQ-Thread"); private final RabbitTemplate rabbitTemplate; private final PublisherReturnsCallBack publisherReturnsCallBack; @PostConstruct public void init() { rabbitTemplate.setTaskExecutor(EXECUTOR); // 設置統(tǒng)一的 publisher-returns(confirm 也可以設置統(tǒng)一的,但最好還是在發(fā)送時設置在 future 里) // rabbitTemplate 的 publisher-returns 同一時間只能存在一個 // 因為 publisher confirm 后,其實 exchange 有沒有轉發(fā)成功,publisher 沒必要每次發(fā)送都關注這個 exchange 的內部職責,更多的是“系統(tǒng)與 MQ 去約定” rabbitTemplate.setReturnsCallback(publisherReturnsCallBack); }
同理你也可以按照自己的想法進行重試…
在測試練習階段里,這個過程是異步回調的,如果是單元測試,發(fā)送完消息進程就結束了,可能就沒回調,程序就結束了,自然就看不到回調時的日志
如果既沒有 ACK 也沒有 NACK,也沒有發(fā)布者回執(zhí),那就相當于這個消息銷聲匿跡了,沒有任何的回應,那么就會拋出異常,我們可以處理這個異常,比如打印日志、重發(fā)之類的…
private final static Function<Throwable, ? extends CorrelationData.Confirm> ON_FAILURE = ex -> { log.error("處理 ack 回執(zhí)失敗, {}", ex.getMessage()); return null; };
2、持久化
消息隊列的數(shù)據(jù)持久化,確保消息未消費前在隊列中不會丟失,其中的交換機、隊列、和消息都要做持久化
默認都是持久化的
3、消費者確認
隊列的消息出隊列,并不會立即刪除,而是等待消費者返回 ACK 或者 NACK
消費者要什么時候發(fā)送 ACK 呢?
- 1)RabbitMQ投遞消息給消費者
- 2)消費者獲取消息后,返回ACK給RabbitMQ
- 3)RabbitMQ刪除消息
- 4)消費者宕機,消息尚未處理
如果出現(xiàn)這種場景,就是不可靠的,所以應該是消息處理后,再發(fā)送 ACK
Spring AMQP 有三種消費者確認模式:
- manual,手段 ack,自己用 rabbitTemplate 去發(fā)送 ACK/NACK(這個比較麻煩,不用 RabbitListener 接受消息才必須用這個)
- auto,配合 RabbitListener 注解,代碼若出現(xiàn)異常,NACK,成功則 ACK
- none,獲得消息后直接 ACK,無論是否執(zhí)行成功
出現(xiàn) NACK 后要如何處理(此過程還在我們的服務器):
- 拒絕(默認)
- 重新入隊列
- 返回 ACK,消費者重新發(fā)布消息指定的交換機
@Configuration @RequiredArgsConstructor @Slf4j public class MessageRecovererConfig { @Bean public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) { return new RejectAndDontRequeueRecoverer(); // nack、直接 reject 和不 requeue,成為死信(默認) // return new ImmediateRequeueMessageRecoverer(); // nack、requeue // return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error"); // ack、發(fā)送給指定的交換機,confirm 機制需要設置到 rabbitTemplate 里 } }
Spring 提供的 retry 機制,在消費者出現(xiàn)異常時利用本地重試,而不是無限制的requeue到mq隊列。
spring: rabbitmq: listener: simple: acknowledge-mode: auto # 消費者執(zhí)行成功 ack、異常 nack(manual 為手動、none 代表無論如何都是 ack) retry: # 這個屬于 spring amqp 的 retry 機制 enabled: false # 不開啟失敗重試 initial-interval: 1000 # 第一次重試時間間隔 multiplier: 3 # 每次重試間隔的倍數(shù) max-attempts: 4 # 最大接受次數(shù) stateless: true # true 代表沒有狀態(tài),若有消費者包含事務,這里改為 false
解釋:第一次失敗,一秒后重試、第二次失敗,三秒后重試,第三次失敗,九秒后重試,第四次失敗就沒機會了(SpringAMQP會拋出異常AmqpRejectAndDontRequeueException)
失敗之后根據(jù)對應的處理策略進行處理
(2)死信交換機
消息過期、消息執(zhí)行失敗并且不重試也不重新入隊列,堆積過多等情況,消息會成為死信,若隊列綁定死信交換機,則轉發(fā)給死信交換機,若沒有則直接丟棄
隊列1 -> 死信交換機 -> 隊列2,這個過程是消息隊列內部保證的可靠性,消息也沒有包含原發(fā)送者的信息,甚至連接已經(jīng)斷開了,所以沒有 publisher-confirm 也沒有 publisher-returns
這個機制和 republish 有點像,但是有本質的區(qū)別,republish 是消費者重發(fā),而這里是隊列將死信轉發(fā)給死信交換機
死信的情況:
- nack && requeue == false
- 超時未消費
- 隊列滿了,由于隊列的特性,隊列頭會先成為死信
(3)延遲功能如何實現(xiàn)
剛才提到死信的誕生可能是超時未消費,那么其實這個點也可以簡單的實現(xiàn)一個延遲隊列:
隊列為一個不被監(jiān)聽的專門用來延遲消息發(fā)送的緩沖帶,其死信交換機才是目標交換機,
message.getMessageProperties().setExpiration("1000");
設置的是過期時間,其本意并不是延遲,是可以實現(xiàn)延遲~
另外,隊列本身也能設置 ttl 過期時間,但并不是隊列的過期時間(顯然不合理,截止后無論啥都丟了,冤不冤啊,至少我想不到這種場景),而是隊列中的消息存活的最大時間,消息的過期時間和這個取一個最小值才是真實的過期時間
值得注意的是,雖然能實現(xiàn)延時消息的功能,但是
實現(xiàn)復雜延遲可能不準確,因為隊列的特性,如果隊列頭未出隊列,哪怕其后者出現(xiàn)死信,也只能乖乖等前面的先出去之后才能前往死信交換機(例如消息的 ttl 分別為 9s、3s、1s,最終三個消息會被同時轉發(fā),因為“最長壽的”排在了前面)
這種方式的順序優(yōu)先級大于時間優(yōu)先級
而 RabbitMQ 也提供了一個插件,叫 DelayExchange 延時交換機,專門用來實現(xiàn)延時功能
Scheduling Messages with RabbitMQ | RabbitMQ
請自行上網(wǎng)下載
延時交換機的聲明:
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "delay.queue", durable = "true"), exchange = @Exchange(name = "delay.direct", delayed = "true"), key = "delay" )) public void listenDelayMessage(String msg){ log.info("接收到delay.queue的延遲消息:{}", msg); }
延時消息的發(fā)送:
private MessagePostProcessor delayMessagePostProcessor(long delay) { return message -> { // 小于 0 也是立即執(zhí)行 message.getMessageProperties().setDelay((int) Math.max(delay, 0)); return message; }; };
這里設置的是 Delay,不是過期時間,哪怕超過了時間也不叫做死信
期間一直存在延時交換機的硬存里,延遲消息插件內部會維護一個本地數(shù)據(jù)庫表,同時使用 Elang Timers 功能實現(xiàn)計時。如果消息的延遲時間設置較長,可能會導致堆積的延遲消息非常多,會帶來較大的CPU開銷,同時延遲消息的時間會存在誤差。
(4)消息堆積如何解決
死信的成因還可能是堆疊過多
我在實際的開發(fā)中,沒遇到過這種情況,不過,如果發(fā)生了堆積的問題,解決方案也所有很多的
- 提高消費者的消費能力 ,可以使用多線程消費任務
- 增加更多消費者,提高消費速度,使用工作隊列模式, 設置多個消費者消費消費同一個隊列中的消息
- 擴大隊列容積,提高堆積上限
但是,RabbitMQ 隊列占的是內存,間接性的落盤,提高上限最終的結果很有可能就是反復落庫,特別不穩(wěn)定,且并沒有解決消息堆積過多的問題
我們可以使用 RabbitMQ 惰性隊列,惰性隊列的好處主要是
- 接收到消息后直接存入磁盤而非內存,雖然慢,但沒有間歇性的 page-out,性能比較穩(wěn)定
- 消費者要消費消息時才會從磁盤中讀取并加載到內存,正常消費后就刪除了
- 基于磁盤存儲,消息上限高,支持數(shù)百萬條的消息存儲
聲明方式:
而要設置一個隊列為惰性隊列,只需要在聲明隊列時,指定x-queue-mode屬性為lazy即可。可以通過命令行將一個運行中的隊列修改為惰性隊列:
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
命令解讀:
- rabbitmqctl :RabbitMQ的命令行工具
- set_policy :添加一個策略
- Lazy :策略名稱,可以自定義
- "^lazy-queue$" :用正則表達式匹配隊列的名字
- '{"queue-mode":"lazy"}' :設置隊列模式為lazy模式
- --apply-to queues :策略的作用對象,是所有的隊列
x-queue-mode 參數(shù)的值為 lazy
@RabbitListener(bindings = @QueueBinding( exchange = @Exchange(name = "xxx"), value = @Queue(name = "xxx", arguments = @Argument(name = "x-queue-mode", value = "lazy")), key = "xxx" ))
交換機、隊列擴展屬性叫參數(shù),消息的拓展屬性叫頭部,擴展屬性一般都以 x- 開頭(extra)
消息堆積問題的解決方案?
- 隊列上綁定多個消費者,提高消費速度
- 使用惰性隊列,可以再mq中保存更多消息
惰性隊列的優(yōu)點有哪些?
- 基于磁盤存儲,消息上限高
- 沒有間歇性的 page-out,性能比較穩(wěn)定
惰性隊列的缺點有哪些?
- 基于磁盤存儲,消息時效性會降低
- 性能受限于磁盤的IO
(5)高可用如何保證
RabbitMQ 在服務大規(guī)模項目時,一般情況下不會像數(shù)據(jù)庫那樣存儲的瓶頸,用惰性隊列已經(jīng)是很頂天了的,其特性和用途不會有太極端的存儲壓力
更多的是在并發(fā)情況下,處理消息的能力有瓶頸,可能出現(xiàn)節(jié)點宕機的情況,而避免單節(jié)點宕機,數(shù)據(jù)丟失、無法提供服務等問題需要解決,也就是需要保證高可用性
Erlang 是一種面向并發(fā)的語言,天然支持集群模式,RabbitMQ 的集群有兩種模式:
- 普通集群:是一種分布式集群,將隊列分散到集群的各個節(jié)點,從而提高整個集群的并發(fā)能力
- 鏡像集群:是一種主從集群,在普通集群的基礎上,添加了主從備份的功能,提高集群的數(shù)據(jù)可用性
鏡像集群雖然支持主從,但主從同步并不是強一致的,某些情況下可能有數(shù)據(jù)丟失的風險(雖然重啟能解決,但那不是強一致,而是最終一致),因此 RabbitMQ 3.8 以后,推出了新的功能:仲裁隊列來代替鏡像集群,底層采用 Raft 協(xié)議確保主從的數(shù)據(jù)一致性
1、普通集群
- 各個節(jié)點之間,實時同步 MQ 元數(shù)據(jù)(一些靜態(tài)的共享的數(shù)據(jù)):
- 交換機的信息隊列的信息
但不包括隊列中的消息(動態(tài)的數(shù)據(jù)不同步)
監(jiān)聽隊列的時候,如果監(jiān)聽的節(jié)點不存在該隊列(只是知道元數(shù)據(jù)),當前節(jié)點會訪問隊列所在的節(jié)點,該節(jié)點返回數(shù)據(jù)到當前節(jié)點并返回給監(jiān)聽者
隊列所在節(jié)點宕機,隊列中的消息就會“丟失”(是在重啟之前,這個消息就消失無法被處理的意思)
如何部署,上網(wǎng)搜搜就行
2、鏡像集群
各個節(jié)點之間,實時同步 MQ 元數(shù)據(jù)(一些靜態(tài)的共享的數(shù)據(jù)):
- 交換機的信息
- 隊列的信息
本質是主從模式,創(chuàng)建隊列的節(jié)點為主節(jié)點,其他節(jié)點為鏡像節(jié)點,隊列中的消息會從主節(jié)點備份到鏡像節(jié)點中
注意
- 像 Redis 那樣的主從集群,同步都是全部同步來著
- 但 RabbitMQ 集群的主從模式比較特別,他的粒度是隊列,而不是全部
也就是說,一個隊列的主節(jié)點,可能是另一個隊列的鏡像節(jié)點,所以分析某個場景的時候,要確認是哪個隊列,單獨進行觀察分析討論
- 不同隊列之間只有交互,不會相互影響數(shù)據(jù)同步
針對某一個隊列,所有寫操作都在主節(jié)點完成,然后同步給鏡像節(jié)點,讀操作任何一個都 ok
主節(jié)點宕機,鏡像節(jié)成為新的主節(jié)點
鏡像集群有三種模式:
- exactly 準確模式,指定副本數(shù) count = 主節(jié)點數(shù) 1 + 鏡像節(jié)點數(shù),集群會盡可能的維護這個數(shù)值,如果鏡像節(jié)點出現(xiàn)故障,就在另一個節(jié)點上創(chuàng)建鏡像,比較建議這種模式,可以設置為 N/2 + 1
- all 全部模式,count = N,主節(jié)點外全部都是鏡像節(jié)點
- nodes 模式,指定鏡像節(jié)點名稱列表,隨機一個作為主節(jié)點,如果列表里的節(jié)點都不存在或不可用,則創(chuàng)建隊列時的節(jié)點作為主節(jié)點,之后訪問集群,列表中的節(jié)點若存在才會創(chuàng)建鏡像節(jié)點
沒有鏡像節(jié)點其實就相當于普通模式了
如何配置上網(wǎng)搜搜就行,比較麻煩,需要設置策略,以及匹配的隊列(不同隊列分開來討論,可以設置不同的策略)
3、仲裁隊列
RabbitMQ 3.8 以后,推出了新的功能仲裁隊列來
- 代替鏡像集群,都是主從模式,支持主從數(shù)據(jù)同步,默認是 exactly count = 5
- 約定大于配置,使用非常簡單沒有復雜的配置,隊列的類型選擇 Quorum 即可
- 底層采用 Raft 協(xié)議確保主從的數(shù)據(jù)強一致性
Spring Boot 配置:
仲裁隊列聲明:
@RabbitListener(bindings = @QueueBinding( exchange = @Exchange(name = "xxx"), value = @Queue(name = "xxx", arguments = @Argument(name = "x-queue-type", value = "quorum")), key = "xxx" ))
隊列不聲明默認就是普通集群,這里聲明的仲裁隊列也只是針對一個隊列
(6)消息重復消費問題
在保證MQ消息不重復的情況下,MQ 的一條消息被消費者消費了多次
消費者消費消息成功后,在給MQ發(fā)送消息確認的時候出現(xiàn)了網(wǎng)絡異?;蛘呤欠斟礄C,MQ 遲遲沒有接收到 ACK 也沒有 NACK,此時 MQ 不會將發(fā)送的消息刪除,按兵不動,消費者重新監(jiān)聽或者有其他消費者的時候,交由它消費,而這條消息如果在之前就消費過了的話,則會導致重復消費
解決方案:
- 消息消費的業(yè)務本身具有冪等性,再次處理相同消息時不會產生副作用,一些時候可能需要用到分布式鎖去維護冪等性
- 比如一個訂單的狀態(tài)設置為結束,那重復消費的結果一致
- 記錄消息的唯一標識,如果消費過了的,則不再消費
- 消費成功將 id 緩存起來,消費時查詢緩存里是否有這條消息
- 設置允許的緩存時間時,你不必想得太極端,一般很快就有消費者繼續(xù)監(jiān)聽拿到消息,哪怕真有那個情況,這里帶來的損失大概率可以忽略不記了,一切要結合實際情況!
有時候兩種方案沒有嚴格的界定
到此這篇關于Spring3 中 RabbitMQ 的使用與常見場景的文章就介紹到這了,更多相關Spring3 RabbitMQ 使用內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
- SpringBoot 整合 RabbitMQ 的使用方式(代碼示例)
- RabbitMQ在Spring Boot中的使用步驟
- Springboot RabbitMQ 消息隊列使用示例詳解
- Spring Boot中RabbitMQ自動配置的介紹、原理和使用方法
- 詳解SpringBoot中使用RabbitMQ的RPC功能
- SpringMVC和rabbitmq集成的使用案例
- SpringBoot+RabbitMq具體使用的幾種姿勢
- 詳解Spring Cloud Stream使用延遲消息實現(xiàn)定時任務(RabbitMQ)
- SpringBoot之RabbitMQ的使用方法
- spring boot使用RabbitMQ實現(xiàn)topic 主題
相關文章
SpringBoot整合MongoDB實現(xiàn)文檔存儲功能
MongoDB是可以應用于各種規(guī)模的企業(yè)、各個行業(yè)以及各類應用程序的開源數(shù)據(jù)庫,本文將結合MongoDB和SpringBoot實現(xiàn)文檔存儲功能,需要的可以參考下2024-12-12如何在JDK 9中更簡潔使用 try-with-resources 語句
本文詳細介紹了自 JDK 7 引入的 try-with-resources 語句的原理和用法,以及介紹了 JDK 9 對 try-with-resources 的改進,使得用戶可以更加方便、簡潔的使用 try-with-resources 語句。,需要的朋友可以參考下2019-06-06JAVA PDF操作之實現(xiàn)截取N頁和多個PDF合并
這篇文章主要為大家詳細介紹了java關于PDF的一些操作,例如截取N頁并生成新文件,轉圖片以及多個PDF合并,文中的示例代碼講解詳細,感興趣的可以了解下2025-01-01關于@JsonProperty,@NotNull,@JsonIgnore的具體使用
這篇文章主要介紹了關于@JsonProperty,@NotNull,@JsonIgnore的具體使用,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-08-08