欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Spring3?中?RabbitMQ?的使用與常見場景分析

 更新時間:2025年02月11日 10:11:09   作者:s:103  
本文介紹了Spring3中RabbitMQ的使用,涵蓋了RabbitMQ的基本知識、五種模式、數(shù)據(jù)隔離、消費者確認、死信交換機、延遲功能、消息堆積解決方法、高可用性以及消息重復消費問題的解決方案,感興趣的朋友跟隨小編一起看看吧

一、初識 MQ

傳統(tǒng)的單體架構,分布式架構的同步調用里,無論是方法調用,還是 OpenFeign 難免會有以下問題:

  • 擴展性差(高耦合,需要依賴對應的服務,同樣的事件,不斷有新需求,這個事件的業(yè)務代碼會越來越臃腫,這些子業(yè)務都寫在一起)
  • 性能下降(等待響應,最終整個業(yè)務的響應時長就是每次遠程調用的執(zhí)行時長之和)
  • 級聯(lián)失?。ㄈ绻且粋€事務,一個服務失敗就會導致全部回滾,若是分布式事務就更加麻煩了,但其實一些行為的失敗不應該導致整體回滾)
  • 服務宕機(如果服務調用者未考慮服務提供者的性能,導致提供者因為過度請求而宕機)

但如果不是很要求同步調用,其實也可以用異步調用,如果是單體架構,你可能很快能想到一個解決方案,就是阻塞隊列實現(xiàn)消息通知:

但是在分布式架構下,可能就需要一個中間件級別的阻塞隊列,這就是我們要學習的 Message Queue 消息隊列,簡稱 MQ,而現(xiàn)在流行的 MQ 還不少,在實現(xiàn)其基本的消息通知功能外,還有一些不錯的擴展

以 RabbitMQ 和 Kafka 為例:

RabbitMQKafka
公司/社區(qū)RabbitApache
開發(fā)語言ErlangScala & Java
協(xié)議支持AMQP,XMPP,SMTP,STOMP自定義協(xié)議
可用性
單機吞吐量一般非常高(Kafka 亮點)
消息延遲微秒級毫秒以內
消息可靠性一般

消息延遲指的是,消息到隊列,并在隊列中“就緒”的時間與預期時間的差距,其實就是數(shù)據(jù)在中間件中流動的耗時,預期時間可以是現(xiàn)在、幾毫秒后、幾秒后、幾天后…

據(jù)統(tǒng)計,目前國內消息隊列使用最多的還是 RabbitMQ,再加上其各方面都比較均衡,穩(wěn)定性也好,因此我們課堂上選擇 RabbitMQ 來學習。

二、RabbitMQ 安裝

Docker 安裝 RabbitMQ:

mkdir /root/mq
cd /root/mq
docker rm mq-server -f
docker rmi rabbitmq:3.8-management -f
docker volume rm mq-plugins -f
docker pull rabbitmq:3.8-management
# 插件數(shù)據(jù)卷最好還是直接掛載 volume,而不是掛載我們的目錄
docker run \
--name mq-server \
-e RABBITMQ_DEFAULT_USER=xxx \
-e RABBITMQ_DEFAULT_PASS=xxx \
--hostname mq1 \
-v mq-plugins:/plugins \
-p 15672:15672 \
-p 5672:5672 \
-d rabbitmq:3.8-management

三、RabbitMQ 基本知識

(1)架構

15672:RabbitMQ 提供的管理控制臺的端口

5672:RabbitMQ 的消息發(fā)送處理接口

用戶名密碼就是安裝時,啟動容器時指定的用戶名密碼

MQ 對應的就是這里的消息代理 Broker:

RabbitMQ 詳細架構圖:

其中包含幾個概念:

  • publisher:生產者,也就是發(fā)送消息的一方
  • consumer:消費者,也就是消費消息的一方
  • queue:隊列,存儲消息。生產者投遞的消息會暫存在消息隊列中,等待消費者處理
  • exchange:交換機,負責消息路由。生產者發(fā)送的消息由交換機決定投遞到哪個隊列。
  • virtual host:虛擬主機,起到數(shù)據(jù)隔離的作用。每個虛擬主機相互獨立,有各自的 exchange、queue

現(xiàn)在你可能只認識生產者、消費者、隊列,其他是什么呢?

其實你可以理解為 MQ 也是存儲東西的,存儲的就是消息,virtual host 就是數(shù)據(jù)庫,queue 就是表,消息就是一行數(shù)據(jù),而 MQ 有特殊的機制,消息先通過 exchange 再決定前往哪個 queue

管理控制臺的使用就不多說了

(2)五大模式

這只是最常見的五種模式:

簡單模式

工作模式

發(fā)布訂閱模式

關聯(lián)交換機的隊列都能收到一份消息,廣播

路由模式

關聯(lián)交換機時,提供 routing key(可以是多個,隊列之間可以重復),發(fā)布消息時提供一個 routing key,由此發(fā)送給指定的隊列

值得注意的是,簡單模式和工作模式,其實也是有交換機的,任何隊列都會綁定一個默認交換機 "",類型是 direct,routing key 為隊列的名稱

主題模式

路由模式的基礎上,隊列關聯(lián)交換機時 routing key 可以是帶通配符的

routing key 的單詞通過 . 分割, # 匹配 n 個單詞(n ≥ 0),* 只匹配一個單詞

例如 #.red:

可以匹配的 routing key:p1.red、red、p2.p1.red

在發(fā)布消息時,要使用具體的 routing key,交換機發(fā)送給匹配的隊列

(3)數(shù)據(jù)隔離 隔離 virtual host

隔離用戶(賦予訪問權限)

四、RabbitMQ 基本使用 Spring AMQP

引入 RabbitMQ 相關的 SDK,可以通過創(chuàng)建連接 Connection、創(chuàng)建通道 Channel,用 Channel 進行操作,接受消息也差不多,不過多演示:

public class PublisherTest {
    @Test
    public void testSendMessage() throws IOException, TimeoutException {
        // 1.建立連接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.設置連接參數(shù),分別是:主機名、端口號、vhost、用戶名、密碼
        factory.setHost("xx.xx.xx.xx");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("xxx");
        factory.setPassword("xxx");
        // 1.2.建立連接
        Connection connection = factory.newConnection();
        // 2.創(chuàng)建通道Channel
        Channel channel = connection.createChannel();
        // 3.創(chuàng)建隊列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);
        // 4.發(fā)送消息
        String message = "hello, rabbitmq!";
        channel.basicPublish("", queueName, null, message.getBytes());
        System.out.println("發(fā)送消息成功:【" + message + "】");
        // 5.關閉通道和連接
        channel.close();
        connection.close();
    }
}

但比較麻煩,Spring AMQP 框架可以自動裝配 RabbitMQ 的操作對象 RabbitTemplate,這樣我們就可以更方便的操作 MQ,并充分發(fā)揮其特性

<!--AMQP依賴,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

默認包含 RabbitMQ 的實現(xiàn),如果你想對接其他 AMQP 協(xié)議的 MQ,得自己實現(xiàn)其抽象封裝的接口

(1)發(fā)送消息

注意,下面是 Spring3 的寫法,所以會有點不一樣,可能看不懂,稍后解釋!

消息發(fā)送器封裝:

@Repository
@RequiredArgsConstructor
@Slf4j
public class RabbitMQSender {
    private final static ThreadPoolExecutor EXECUTOR = ThreadPoolUtil.getIoTargetThreadPool("Rabbit-MQ-Thread");
    private final RabbitTemplate rabbitTemplate;
    @PostConstruct
    public void init() {
        rabbitTemplate.setTaskExecutor(EXECUTOR);
    }
    private final static Function<Throwable, ? extends CorrelationData.Confirm> ON_FAILURE = ex -> {
        log.error("處理 ack 回執(zhí)失敗, {}", ex.getMessage());
        return null;
    };
    private MessagePostProcessor delayMessagePostProcessor(long delay) {
        return message -> {
            // 小于 0 也是立即執(zhí)行
            // setDelay 才是給 RabbitMQ 看的,setReceivedDelay 是給 publish-returns 看的
            message.getMessageProperties().setDelay((int) Math.max(delay, 0));
            return message;
        };
    };
    private CorrelationData newCorrelationData() {
        return new CorrelationData(UUIDUtil.uuid32());
    }
    /**
     * @param exchange 交換機
     * @param routingKey routing key
     * @param msg 消息
     * @param delay 延遲時間(如果是延遲交換機,delay 才有效)
     * @param maxRetries 最大重試機會
     * @param <T> 消息的對象類型
     */
    private <T> void send(String exchange, String routingKey, T msg, long delay, int maxRetries){
        log.info("準備發(fā)送消息,exchange: {}, routingKey: {}, msg: {}, delay: {}s, maxRetries: {}",
                exchange, routingKey, msg, TimeUnit.MILLISECONDS.toSeconds(delay), maxRetries);
        CorrelationData correlationData = newCorrelationData();
        MessagePostProcessor delayMessagePostProcessor = delayMessagePostProcessor(delay);
        correlationData.getFuture().exceptionallyAsync(ON_FAILURE, EXECUTOR).thenAcceptAsync(new Consumer<>() {
            private int retryCount = 0; // 一次 send 從始至終都用的是一個 Consumer 對象,所以作用的都是同一個計數(shù)器
            @Override
            public void accept(CorrelationData.Confirm confirm) {
                Optional.ofNullable(confirm).ifPresent(c -> {
                    if(c.isAck()) {
                        log.info("ACK {} 消息成功到達,{}", correlationData.getId(), c.getReason());
                    } else {
                        log.warn("NACK {} 消息未能到達,{}", correlationData.getId(), c.getReason());
                        if(retryCount >= maxRetries) {
                            log.error("次數(shù)到達上限 {}", maxRetries);
                            return;
                        }
                        retryCount++;
                        log.warn("開始第 {} 次重試", retryCount);
                        CorrelationData cd = newCorrelationData();
                        cd.getFuture().exceptionallyAsync(ON_FAILURE, EXECUTOR).thenAcceptAsync(this, EXECUTOR);
                        rabbitTemplate.convertAndSend(exchange, routingKey, msg, delayMessagePostProcessor, cd);
                    }
                });
            }
        }, EXECUTOR);
        rabbitTemplate.convertAndSend(exchange, routingKey, msg, delayMessagePostProcessor, correlationData);
    }
    public void sendMessage(String exchange, String routingKey, Object msg) {
        send(exchange, routingKey, msg, 0, 0);
    }
    public void sendDelayMessage(String exchange, String routingKey, Object msg, long delay){
        send(exchange, routingKey, msg, delay, 0);
    }
    public void sendWithConfirm(String exchange, String routingKey, Object msg, int maxReties) {
        send(exchange, routingKey, msg, 0, maxReties);
    }
    public void sendDelayMessageWithConfirm(String exchange, String routingKey, Object msg, long delay, int maxReties) {
        send(exchange, routingKey, msg, delay, maxReties);
    }
}

(2)接受消息

監(jiān)聽器:

  • RabbitTemplate 是可以主動獲取消息的,也可以不實時監(jiān)聽,但是一般情況都是監(jiān)聽,有消息就執(zhí)行
  • 監(jiān)聽的是 queue,若 queue 不存在,就會根據(jù)注解創(chuàng)建一遍
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "xxx"),
    exchange = @Exchange(name = "xxx", delayed = "true"),
    key = {"xxx"}
))
public void xxx(X x) {
}

(3)聲明交換機與隊列

可以通過 @Bean 創(chuàng)建 Bean 對象的方式去聲明,可以自行搜索,我更喜歡監(jiān)聽器注解的形式,而且 Bean 的方式,可能會因為配置不完全一樣,導致其他配置類的交換機隊列無法聲明(現(xiàn)象如此,底層為啥我不知道)

(4)消息轉換器

消息是一個字符串,但為了滿足更多需求,需要將一個對象序列化成一個字符串,但默認的序列化實現(xiàn)貌似用的是 java 對象的序列化,這種方式可能得同一個程序的 java 類才能反序列化成功,所以我們應該選擇分布式的序列化方式,比如 json

@Configuration
@RequiredArgsConstructor
@Slf4j
public class MessageConverterConfig {
    @Bean
    public MessageConverter messageConverter(){
        // 1. 定義消息轉換器
        Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(JsonUtil.OBJECT_MAPPER);
        // 2. 配置自動創(chuàng)建消息 id,用于識別不同消息
        jackson2JsonMessageConverter.setCreateMessageIds(Boolean.TRUE);
        return jackson2JsonMessageConverter;
    }
}

這里的 JsonUtil.OBJECT_MAPPER,就是框架的或者自己實現(xiàn)的 ObjectMapper

(5)配置文件

spring:
  rabbitmq:
    host: ${xxx.mq.host} # rabbitMQ 的 ip 地址
    port: ${xxx.mq.port} # 端口
    username: ${xxx.mq.username}
    password: ${xxx.mq.password}
    virtual-host: ${xxx.mq.virtual-host}
    publisher-confirm-type: correlated
    publisher-returns: true
    template:
      mandatory: true # 若是 false 則直接丟棄了,并不會發(fā)送者回執(zhí)
    listener:
      simple:
        prefetch: 1 # 預取為一個(消費完才能拿下一個)
        concurrency: 2 # 消費者最少 2 個線程
        max-concurrency: 10 # 消費者最多 10 個線程
        auto-startup: true # 為 false 監(jiān)聽者不會實時創(chuàng)建和監(jiān)聽,為 true 監(jiān)聽的過程中,若 queue 不存在,會再根據(jù)注解進行創(chuàng)建,創(chuàng)建后只監(jiān)聽 queue,declare = "false" 才是不自動聲明
        default-requeue-rejected: false # 拒絕后不 requeue(成為死信,若沒有綁定死信交換機,就真的丟了)
        acknowledge-mode: auto # 消費者執(zhí)行成功 ack、異常 nack(manual 為手動、none 代表無論如何都是 ack)
        retry: # 這個屬于 spring amqp 的 retry 機制
          enabled: false # 不開啟失敗重試
#          initial-interval: 1000
#          multiplier: 2
#          max-attempts: 3
#          stateless: true # true 代表沒有狀態(tài),若有消費者包含事務,這里改為 false

五、常見問題

(1)RabbitMQ 如何保證消息可靠性

保證消息可靠性、不丟失。主要從三個層面考慮

如果報錯可以先記錄到日志中,再去修復數(shù)據(jù)(保底)

1、生產者確認機制

生產者確認機制,確保生產者的消息能到達隊列

publisher-confirm,針對的是消息從發(fā)送者到交換機的可靠性,成功則進行下一步,失敗返回 NACK

private final static ThreadPoolExecutor EXECUTOR = ThreadPoolUtil.getIoTargetThreadPool("Rabbit-MQ-Thread");
private final RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
    rabbitTemplate.setTaskExecutor(EXECUTOR);
}
private final static Function<Throwable, ? extends CorrelationData.Confirm> ON_FAILURE = ex -> {
    log.error("處理 ack 回執(zhí)失敗, {}", ex.getMessage());
    return null;
};
private MessagePostProcessor delayMessagePostProcessor(long delay) {
    return message -> {
        // 小于 0 也是立即執(zhí)行
        // setDelay 才是給 RabbitMQ 看的,setReceivedDelay 是給 publish-returns 看的
        message.getMessageProperties().setDelay((int) Math.max(delay, 0));
        return message;
    };
};
private CorrelationData newCorrelationData() {
    return new CorrelationData(UUIDUtil.uuid32());
}
/**
     * @param exchange 交換機
     * @param routingKey routing key
     * @param msg 消息
     * @param delay 延遲時間(如果是延遲交換機,delay 才有效)
     * @param maxRetries 最大重試機會
     * @param <T> 消息的對象類型
     */
private <T> void send(String exchange, String routingKey, T msg, long delay, int maxRetries){
    log.info("準備發(fā)送消息,exchange: {}, routingKey: {}, msg: {}, delay: {}s, maxRetries: {}",
             exchange, routingKey, msg, TimeUnit.MILLISECONDS.toSeconds(delay), maxRetries);
    CorrelationData correlationData = newCorrelationData();
    MessagePostProcessor delayMessagePostProcessor = delayMessagePostProcessor(delay);
    correlationData.getFuture().exceptionallyAsync(ON_FAILURE, EXECUTOR).thenAcceptAsync(new Consumer<>() {
        private int retryCount = 0; // 一次 send 從始至終都用的是一個 Consumer 對象,所以作用的都是同一個計數(shù)器
        @Override
        public void accept(CorrelationData.Confirm confirm) {
            Optional.ofNullable(confirm).ifPresent(c -> {
                if(c.isAck()) {
                    log.info("ACK {} 消息成功到達,{}", correlationData.getId(), c.getReason());
                } else {
                    log.warn("NACK {} 消息未能到達,{}", correlationData.getId(), c.getReason());
                    if(retryCount >= maxRetries) {
                        log.error("次數(shù)到達上限 {}", maxRetries);
                        return;
                    }
                    retryCount++;
                    log.warn("開始第 {} 次重試", retryCount);
                    CorrelationData cd = newCorrelationData();
                    cd.getFuture().exceptionallyAsync(ON_FAILURE, EXECUTOR).thenAcceptAsync(this, EXECUTOR);
                    rabbitTemplate.convertAndSend(exchange, routingKey, msg, delayMessagePostProcessor, cd);
                }
            });
        }
    }, EXECUTOR);
    rabbitTemplate.convertAndSend(exchange, routingKey, msg, delayMessagePostProcessor, correlationData);
}

Spring3 的 RabbitMQ Confirm,需要配置為 correlated,發(fā)送消息時提供 CorrelationData,也就是與消息關聯(lián)的數(shù)據(jù),包括發(fā)送者確認時的回調方法

要想提供 Confirm 的回調辦法,需要配置 correlationData.getFuture() 返回的 CompletableFuture 對象(新的 JUC 工具類,可以查一查如何使用)

配置后,在未來根據(jù)回調函數(shù)進行處理(當然也可以直接設置在 RabbitTemplate 對象的 ConfirmCallBack)

還可以自己實現(xiàn)消息的發(fā)送者重試:

publisher-returns,針對的是消息從交換機到隊列的可靠性,成功則返回 ACK,失敗觸發(fā) returns 的回調方法

@Component
@RequiredArgsConstructor
@Slf4j
public class PublisherReturnsCallBack implements RabbitTemplate.ReturnsCallback {
    // 不存在 routing key 對應的隊列,那在我看來轉發(fā)到零個是合理的現(xiàn)象,但在這里也認為是路由失?。∕Q 認為消息一定至少要進入一個隊列,之后才能被處理,這就是可靠性)(反正就是回執(zhí)了,你愛咋處理是你自己的事情)
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        // 可能一些版本的 mq 會因為是延時交換機,導致發(fā)送者回執(zhí),只要沒有 NACK 這種情況其實并不是不可靠(其實我也不知道有沒有版本會忽略)
        // 但是其實不忽略也不錯,畢竟者本來就是特殊情況,一般交換機是不存儲的,但是這個臨時存儲消息
        // 但這樣也就代表了,延時后消息路由失敗是沒法再次處理的(因為我們交給延時交換機后就不管了,可靠性有 mq 自己保持)
        MessageProperties messageProperties = returnedMessage.getMessage().getMessageProperties();
        // 這里的 message 并不是原本的 message,是額外的封裝,x-delay 在 publish-returns 里面封裝到 receiveDelay 里了
        Integer delay = messageProperties.getReceivedDelay();
        // 如果不是延時交換機,卻設置了 delay 大于 0,是不會延時的,所以是其他原因導致的(以防萬一把消息記錄到日志里)
        if(Objects.nonNull(delay) && delay.compareTo(0) > 0) {
            log.info("交換機 {}, 路由鍵 {} 消息 {} 延遲 {} s", returnedMessage.getExchange(), returnedMessage.getRoutingKey(), messageProperties, TimeUnit.MILLISECONDS.toSeconds(delay));
            return;
        }
        log.warn("publisher-returns 發(fā)送者回執(zhí)(應答碼{},應答內容{})(消息 {} 成功到達交換機 {},但路由失敗,路由鍵為 {})",
                returnedMessage.getReplyCode(), returnedMessage.getReplyText(), returnedMessage.getMessage(),
                returnedMessage.getExchange(), returnedMessage.getRoutingKey());
    }
}

RabbitMQSender:

private final static ThreadPoolExecutor EXECUTOR = ThreadPoolUtil.getIoTargetThreadPool("Rabbit-MQ-Thread");
private final RabbitTemplate rabbitTemplate;
private final PublisherReturnsCallBack publisherReturnsCallBack;
@PostConstruct
public void init() {
    rabbitTemplate.setTaskExecutor(EXECUTOR);
    // 設置統(tǒng)一的 publisher-returns(confirm 也可以設置統(tǒng)一的,但最好還是在發(fā)送時設置在 future 里)
    // rabbitTemplate 的 publisher-returns 同一時間只能存在一個
    // 因為 publisher confirm 后,其實 exchange 有沒有轉發(fā)成功,publisher 沒必要每次發(fā)送都關注這個 exchange 的內部職責,更多的是“系統(tǒng)與 MQ 去約定”
    rabbitTemplate.setReturnsCallback(publisherReturnsCallBack);
}

同理你也可以按照自己的想法進行重試…

在測試練習階段里,這個過程是異步回調的,如果是單元測試,發(fā)送完消息進程就結束了,可能就沒回調,程序就結束了,自然就看不到回調時的日志

如果既沒有 ACK 也沒有 NACK,也沒有發(fā)布者回執(zhí),那就相當于這個消息銷聲匿跡了,沒有任何的回應,那么就會拋出異常,我們可以處理這個異常,比如打印日志、重發(fā)之類的…

private final static Function<Throwable, ? extends CorrelationData.Confirm> ON_FAILURE = ex -> {
    log.error("處理 ack 回執(zhí)失敗, {}", ex.getMessage());
    return null;
};

2、持久化

消息隊列的數(shù)據(jù)持久化,確保消息未消費前在隊列中不會丟失,其中的交換機、隊列、和消息都要做持久化

默認都是持久化的

3、消費者確認

隊列的消息出隊列,并不會立即刪除,而是等待消費者返回 ACK 或者 NACK

消費者要什么時候發(fā)送 ACK 呢?

  • 1)RabbitMQ投遞消息給消費者
  • 2)消費者獲取消息后,返回ACK給RabbitMQ
  • 3)RabbitMQ刪除消息
  • 4)消費者宕機,消息尚未處理

如果出現(xiàn)這種場景,就是不可靠的,所以應該是消息處理后,再發(fā)送 ACK

Spring AMQP 有三種消費者確認模式:

  • manual,手段 ack,自己用 rabbitTemplate 去發(fā)送 ACK/NACK(這個比較麻煩,不用 RabbitListener 接受消息才必須用這個)
  • auto,配合 RabbitListener 注解,代碼若出現(xiàn)異常,NACK,成功則 ACK
  • none,獲得消息后直接 ACK,無論是否執(zhí)行成功

出現(xiàn) NACK 后要如何處理(此過程還在我們的服務器):

  • 拒絕(默認)
  • 重新入隊列
  • 返回 ACK,消費者重新發(fā)布消息指定的交換機
@Configuration
@RequiredArgsConstructor
@Slf4j
public class MessageRecovererConfig {
    @Bean
    public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
        return new RejectAndDontRequeueRecoverer(); // nack、直接 reject 和不 requeue,成為死信(默認)
//        return new ImmediateRequeueMessageRecoverer(); // nack、requeue
//        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error"); // ack、發(fā)送給指定的交換機,confirm 機制需要設置到 rabbitTemplate 里
    }
}

Spring 提供的 retry 機制,在消費者出現(xiàn)異常時利用本地重試,而不是無限制的requeue到mq隊列。

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto # 消費者執(zhí)行成功 ack、異常 nack(manual 為手動、none 代表無論如何都是 ack)
        retry: # 這個屬于 spring amqp 的 retry 機制
          enabled: false # 不開啟失敗重試
          initial-interval: 1000 # 第一次重試時間間隔
          multiplier: 3 # 每次重試間隔的倍數(shù)
          max-attempts: 4 # 最大接受次數(shù)
          stateless: true # true 代表沒有狀態(tài),若有消費者包含事務,這里改為 false

解釋:第一次失敗,一秒后重試、第二次失敗,三秒后重試,第三次失敗,九秒后重試,第四次失敗就沒機會了(SpringAMQP會拋出異常AmqpRejectAndDontRequeueException)

失敗之后根據(jù)對應的處理策略進行處理

(2)死信交換機

消息過期、消息執(zhí)行失敗并且不重試也不重新入隊列,堆積過多等情況,消息會成為死信,若隊列綁定死信交換機,則轉發(fā)給死信交換機,若沒有則直接丟棄

隊列1 -> 死信交換機 -> 隊列2,這個過程是消息隊列內部保證的可靠性,消息也沒有包含原發(fā)送者的信息,甚至連接已經(jīng)斷開了,所以沒有 publisher-confirm 也沒有 publisher-returns

這個機制和 republish 有點像,但是有本質的區(qū)別,republish 是消費者重發(fā),而這里是隊列將死信轉發(fā)給死信交換機

死信的情況:

  • nack && requeue == false
  • 超時未消費
  • 隊列滿了,由于隊列的特性,隊列頭會先成為死信

(3)延遲功能如何實現(xiàn)

剛才提到死信的誕生可能是超時未消費,那么其實這個點也可以簡單的實現(xiàn)一個延遲隊列:

隊列為一個不被監(jiān)聽的專門用來延遲消息發(fā)送的緩沖帶,其死信交換機才是目標交換機,

message.getMessageProperties().setExpiration("1000");

設置的是過期時間,其本意并不是延遲,是可以實現(xiàn)延遲~

另外,隊列本身也能設置 ttl 過期時間,但并不是隊列的過期時間(顯然不合理,截止后無論啥都丟了,冤不冤啊,至少我想不到這種場景),而是隊列中的消息存活的最大時間,消息的過期時間和這個取一個最小值才是真實的過期時間

值得注意的是,雖然能實現(xiàn)延時消息的功能,但是

實現(xiàn)復雜延遲可能不準確,因為隊列的特性,如果隊列頭未出隊列,哪怕其后者出現(xiàn)死信,也只能乖乖等前面的先出去之后才能前往死信交換機(例如消息的 ttl 分別為 9s、3s、1s,最終三個消息會被同時轉發(fā),因為“最長壽的”排在了前面)

這種方式的順序優(yōu)先級大于時間優(yōu)先級

而 RabbitMQ 也提供了一個插件,叫 DelayExchange 延時交換機,專門用來實現(xiàn)延時功能

Scheduling Messages with RabbitMQ | RabbitMQ

請自行上網(wǎng)下載

延時交換機的聲明:

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "delay.queue", durable = "true"),
        exchange = @Exchange(name = "delay.direct", delayed = "true"),
        key = "delay"
))
public void listenDelayMessage(String msg){
    log.info("接收到delay.queue的延遲消息:{}", msg);
}

延時消息的發(fā)送:

private MessagePostProcessor delayMessagePostProcessor(long delay) {
    return message -> {
        // 小于 0 也是立即執(zhí)行
        message.getMessageProperties().setDelay((int) Math.max(delay, 0));
        return message;
    };
};

這里設置的是 Delay,不是過期時間,哪怕超過了時間也不叫做死信

期間一直存在延時交換機的硬存里,延遲消息插件內部會維護一個本地數(shù)據(jù)庫表,同時使用 Elang Timers 功能實現(xiàn)計時。如果消息的延遲時間設置較長,可能會導致堆積的延遲消息非常多,會帶來較大的CPU開銷,同時延遲消息的時間會存在誤差。

(4)消息堆積如何解決

死信的成因還可能是堆疊過多

我在實際的開發(fā)中,沒遇到過這種情況,不過,如果發(fā)生了堆積的問題,解決方案也所有很多的

  • 提高消費者的消費能力 ,可以使用多線程消費任務
  • 增加更多消費者,提高消費速度,使用工作隊列模式, 設置多個消費者消費消費同一個隊列中的消息
  • 擴大隊列容積,提高堆積上限

但是,RabbitMQ 隊列占的是內存,間接性的落盤,提高上限最終的結果很有可能就是反復落庫,特別不穩(wěn)定,且并沒有解決消息堆積過多的問題

我們可以使用 RabbitMQ 惰性隊列,惰性隊列的好處主要是

  • 接收到消息后直接存入磁盤而非內存,雖然慢,但沒有間歇性的 page-out,性能比較穩(wěn)定
  • 消費者要消費消息時才會從磁盤中讀取并加載到內存,正常消費后就刪除了
  • 基于磁盤存儲,消息上限高,支持數(shù)百萬條的消息存儲

聲明方式:

而要設置一個隊列為惰性隊列,只需要在聲明隊列時,指定x-queue-mode屬性為lazy即可。可以通過命令行將一個運行中的隊列修改為惰性隊列:

rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues  

命令解讀:

  • rabbitmqctl :RabbitMQ的命令行工具
  • set_policy :添加一個策略
  • Lazy :策略名稱,可以自定義
  • "^lazy-queue$" :用正則表達式匹配隊列的名字
  • '{"queue-mode":"lazy"}' :設置隊列模式為lazy模式
  • --apply-to queues :策略的作用對象,是所有的隊列

x-queue-mode 參數(shù)的值為 lazy

@RabbitListener(bindings = @QueueBinding(
    exchange = @Exchange(name = "xxx"),
    value = @Queue(name = "xxx", arguments = @Argument(name = "x-queue-mode", value = "lazy")),
    key = "xxx"
))

交換機、隊列擴展屬性叫參數(shù),消息的拓展屬性叫頭部,擴展屬性一般都以 x- 開頭(extra)

消息堆積問題的解決方案?

  • 隊列上綁定多個消費者,提高消費速度
  • 使用惰性隊列,可以再mq中保存更多消息

惰性隊列的優(yōu)點有哪些?

  • 基于磁盤存儲,消息上限高
  • 沒有間歇性的 page-out,性能比較穩(wěn)定

惰性隊列的缺點有哪些?

  • 基于磁盤存儲,消息時效性會降低
  • 性能受限于磁盤的IO

(5)高可用如何保證

RabbitMQ 在服務大規(guī)模項目時,一般情況下不會像數(shù)據(jù)庫那樣存儲的瓶頸,用惰性隊列已經(jīng)是很頂天了的,其特性和用途不會有太極端的存儲壓力

更多的是在并發(fā)情況下,處理消息的能力有瓶頸,可能出現(xiàn)節(jié)點宕機的情況,而避免單節(jié)點宕機,數(shù)據(jù)丟失、無法提供服務等問題需要解決,也就是需要保證高可用性

Erlang 是一種面向并發(fā)的語言,天然支持集群模式,RabbitMQ 的集群有兩種模式:

  • 普通集群:是一種分布式集群,將隊列分散到集群的各個節(jié)點,從而提高整個集群的并發(fā)能力
  • 鏡像集群:是一種主從集群,在普通集群的基礎上,添加了主從備份的功能,提高集群的數(shù)據(jù)可用性

鏡像集群雖然支持主從,但主從同步并不是強一致的,某些情況下可能有數(shù)據(jù)丟失的風險(雖然重啟能解決,但那不是強一致,而是最終一致),因此 RabbitMQ 3.8 以后,推出了新的功能:仲裁隊列來代替鏡像集群,底層采用 Raft 協(xié)議確保主從的數(shù)據(jù)一致性

1、普通集群

  • 各個節(jié)點之間,實時同步 MQ 元數(shù)據(jù)(一些靜態(tài)的共享的數(shù)據(jù)):
  • 交換機的信息隊列的信息

但不包括隊列中的消息(動態(tài)的數(shù)據(jù)不同步)

監(jiān)聽隊列的時候,如果監(jiān)聽的節(jié)點不存在該隊列(只是知道元數(shù)據(jù)),當前節(jié)點會訪問隊列所在的節(jié)點,該節(jié)點返回數(shù)據(jù)到當前節(jié)點并返回給監(jiān)聽者

隊列所在節(jié)點宕機,隊列中的消息就會“丟失”(是在重啟之前,這個消息就消失無法被處理的意思)

如何部署,上網(wǎng)搜搜就行

2、鏡像集群

各個節(jié)點之間,實時同步 MQ 元數(shù)據(jù)(一些靜態(tài)的共享的數(shù)據(jù)):

  • 交換機的信息
  • 隊列的信息

本質是主從模式,創(chuàng)建隊列的節(jié)點為主節(jié)點,其他節(jié)點為鏡像節(jié)點,隊列中的消息會從主節(jié)點備份到鏡像節(jié)點中

注意

  • 像 Redis 那樣的主從集群,同步都是全部同步來著
  • 但 RabbitMQ 集群的主從模式比較特別,他的粒度是隊列,而不是全部

也就是說,一個隊列的主節(jié)點,可能是另一個隊列的鏡像節(jié)點,所以分析某個場景的時候,要確認是哪個隊列,單獨進行觀察分析討論

  • 不同隊列之間只有交互,不會相互影響數(shù)據(jù)同步

針對某一個隊列,所有寫操作都在主節(jié)點完成,然后同步給鏡像節(jié)點,讀操作任何一個都 ok

主節(jié)點宕機,鏡像節(jié)成為新的主節(jié)點

鏡像集群有三種模式:

  • exactly 準確模式,指定副本數(shù) count = 主節(jié)點數(shù) 1 + 鏡像節(jié)點數(shù),集群會盡可能的維護這個數(shù)值,如果鏡像節(jié)點出現(xiàn)故障,就在另一個節(jié)點上創(chuàng)建鏡像,比較建議這種模式,可以設置為 N/2 + 1
  • all 全部模式,count = N,主節(jié)點外全部都是鏡像節(jié)點
  • nodes 模式,指定鏡像節(jié)點名稱列表,隨機一個作為主節(jié)點,如果列表里的節(jié)點都不存在或不可用,則創(chuàng)建隊列時的節(jié)點作為主節(jié)點,之后訪問集群,列表中的節(jié)點若存在才會創(chuàng)建鏡像節(jié)點

沒有鏡像節(jié)點其實就相當于普通模式了

如何配置上網(wǎng)搜搜就行,比較麻煩,需要設置策略,以及匹配的隊列(不同隊列分開來討論,可以設置不同的策略)

3、仲裁隊列

RabbitMQ 3.8 以后,推出了新的功能仲裁隊列來

  • 代替鏡像集群,都是主從模式,支持主從數(shù)據(jù)同步,默認是 exactly count = 5
  • 約定大于配置,使用非常簡單沒有復雜的配置,隊列的類型選擇 Quorum 即可
  • 底層采用 Raft 協(xié)議確保主從的數(shù)據(jù)強一致性

Spring Boot 配置:

仲裁隊列聲明:

@RabbitListener(bindings = @QueueBinding(
    exchange = @Exchange(name = "xxx"),
    value = @Queue(name = "xxx", arguments = @Argument(name = "x-queue-type", value = "quorum")),
    key = "xxx"
))

隊列不聲明默認就是普通集群,這里聲明的仲裁隊列也只是針對一個隊列

(6)消息重復消費問題

在保證MQ消息不重復的情況下,MQ 的一條消息被消費者消費了多次

消費者消費消息成功后,在給MQ發(fā)送消息確認的時候出現(xiàn)了網(wǎng)絡異?;蛘呤欠斟礄C,MQ 遲遲沒有接收到 ACK 也沒有 NACK,此時 MQ 不會將發(fā)送的消息刪除,按兵不動,消費者重新監(jiān)聽或者有其他消費者的時候,交由它消費,而這條消息如果在之前就消費過了的話,則會導致重復消費

解決方案:

  • 消息消費的業(yè)務本身具有冪等性,再次處理相同消息時不會產生副作用,一些時候可能需要用到分布式鎖去維護冪等性
    • 比如一個訂單的狀態(tài)設置為結束,那重復消費的結果一致
  • 記錄消息的唯一標識,如果消費過了的,則不再消費
    • 消費成功將 id 緩存起來,消費時查詢緩存里是否有這條消息
    • 設置允許的緩存時間時,你不必想得太極端,一般很快就有消費者繼續(xù)監(jiān)聽拿到消息,哪怕真有那個情況,這里帶來的損失大概率可以忽略不記了,一切要結合實際情況!

有時候兩種方案沒有嚴格的界定

到此這篇關于Spring3 中 RabbitMQ 的使用與常見場景的文章就介紹到這了,更多相關Spring3 RabbitMQ 使用內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • SpringCloud遠程服務調用實戰(zhàn)筆記

    SpringCloud遠程服務調用實戰(zhàn)筆記

    本文給大家介紹SpringCloud遠程服務調用實戰(zhàn)筆記,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友參考下吧
    2021-11-11
  • SpringCloud?OpenFeign概述與使用教程

    SpringCloud?OpenFeign概述與使用教程

    OpenFeign源于Netflix的Feign,是http通信的客戶端。屏蔽了網(wǎng)絡通信的細節(jié),直接面向接口的方式開發(fā),讓開發(fā)者感知不到網(wǎng)絡通信細節(jié)。所有遠程調用,都像調用本地方法一樣完成
    2023-02-02
  • Java TCP協(xié)議通信超詳細講解

    Java TCP協(xié)議通信超詳細講解

    TCP/IP是一種面向連接的、可靠的、基于字節(jié)流的傳輸層通信協(xié)議,它會保證數(shù)據(jù)不丟包、不亂序。TCP全名是Transmission Control Protocol,它是位于網(wǎng)絡OSI模型中的第四層
    2022-09-09
  • SpringBoot整合MongoDB實現(xiàn)文檔存儲功能

    SpringBoot整合MongoDB實現(xiàn)文檔存儲功能

    MongoDB是可以應用于各種規(guī)模的企業(yè)、各個行業(yè)以及各類應用程序的開源數(shù)據(jù)庫,本文將結合MongoDB和SpringBoot實現(xiàn)文檔存儲功能,需要的可以參考下
    2024-12-12
  • 如何在JDK 9中更簡潔使用 try-with-resources 語句

    如何在JDK 9中更簡潔使用 try-with-resources 語句

    本文詳細介紹了自 JDK 7 引入的 try-with-resources 語句的原理和用法,以及介紹了 JDK 9 對 try-with-resources 的改進,使得用戶可以更加方便、簡潔的使用 try-with-resources 語句。,需要的朋友可以參考下
    2019-06-06
  • JAVA PDF操作之實現(xiàn)截取N頁和多個PDF合并

    JAVA PDF操作之實現(xiàn)截取N頁和多個PDF合并

    這篇文章主要為大家詳細介紹了java關于PDF的一些操作,例如截取N頁并生成新文件,轉圖片以及多個PDF合并,文中的示例代碼講解詳細,感興趣的可以了解下
    2025-01-01
  • 關于@JsonProperty,@NotNull,@JsonIgnore的具體使用

    關于@JsonProperty,@NotNull,@JsonIgnore的具體使用

    這篇文章主要介紹了關于@JsonProperty,@NotNull,@JsonIgnore的具體使用,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-08-08
  • java中的三種取整函數(shù)總結

    java中的三種取整函數(shù)總結

    下面小編就為大家?guī)硪黄猨ava中的三種取整函數(shù)總結。希望對大家有所幫助。一起跟隨小編過來看看吧,祝大家游戲愉快哦
    2016-11-11
  • java單元測試JUnit框架原理與用法實例教程

    java單元測試JUnit框架原理與用法實例教程

    這篇文章主要介紹了java單元測試JUnit框架原理與用法,結合實例形式較為詳細的分析了java單元測試JUnit框架的概念、原理、使用方法及相關注意事項,需要的朋友可以參考下
    2017-11-11
  • IDEA Debug模式下改變各類型變量值的方法

    IDEA Debug模式下改變各類型變量值的方法

    這篇文章主要介紹了IDEA Debug模式下改變各類型變量值的方法,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2018-04-04

最新評論