欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java?RabbitMQ消息隊列詳解常見問題

 更新時間:2022年07月28日 11:52:05   作者:kaico2018  
消息隊列是最古老的中間件之一,從系統(tǒng)之間有通信需求開始,就自然產(chǎn)生了消息隊列。本文告訴什么是消息隊列,為什么需要消息隊列,常見的消息隊列有哪些,RabbitMQ的部署和使用

消息堆積

消息堆積的產(chǎn)生場景:

  • 生產(chǎn)者產(chǎn)生的消息速度大于消費者消費的速度。解決:增加消費者的數(shù)量或速度。
  • 沒有消費者進行消費的時候。解決:死信隊列、設置消息有效期。相當于對我們的消息設置有效期,在規(guī)定的時間內如果沒有消費的話,自動過期,過期的時候會執(zhí)行客戶端回調監(jiān)聽的方法將消息存放到數(shù)據(jù)庫表記錄,后期實現(xiàn)補償。

保證消息不丟失

1、生產(chǎn)者使用消息確認機制保證消息百分之百能夠將消息投遞到MQ成功。

2、MQ服務器端應該將消息持久化到硬盤

3、消費者使用手動ack機制確認消息消費成功

如果MQ服務器容量滿了怎么辦?

使用死信隊列將消息存到數(shù)據(jù)庫中去,后期補償消費。

死信隊列

RabbitMQ死信隊列俗稱,備胎隊列;消息中間件因為某種原因拒收該消息后,可以轉移到死信隊列中存放,死信隊列也可以有交換機和路由key等。

產(chǎn)生背景:

  • 消息投遞到MQ中存放 消息已經(jīng)過期
  • 隊列達到最大的長度 (隊列容器已經(jīng)滿了)生產(chǎn)者拒絕接收消息
  • 消費者消費多次消息失敗,就會轉移存放到死信隊列中

代碼案例:

maven依賴

<dependencies>
        <!-- springboot-web組件 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- 添加springboot對amqp的支持 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <!--fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.49</version>
        </dependency>
    </dependencies>

yml配置

server:
#  服務啟動端口配置
  port: 8081
  servlet:
#    應用訪問路徑
    context-path: /
spring:
  #增加application.druid.yml 的配置文件
#  profiles:
#    active: rabbitmq
  rabbitmq:
    ####連接地址
    host: www.kaicostudy.com
    ####端口號
    port: 5672
    ####賬號
    username: kaico
    ####密碼
    password: kaico
    ### 地址
    virtual-host: /kaicoStudy
###模擬演示死信隊列
kaico:
  dlx:
    exchange: kaico_order_dlx_exchange
    queue: kaico_order_dlx_queue
    routingKey: kaico.order.dlx
  ###備胎交換機
  order:
    exchange: kaico_order_exchange
    queue: kaico_order_queue
    routingKey: kaico.order

隊列配置類

@Configuration
public class DeadLetterMQConfig {
    /**
     * 訂單交換機
     */
    @Value("${kaico.order.exchange}")
    private String orderExchange;
    /**
     * 訂單隊列
     */
    @Value("${kaico.order.queue}")
    private String orderQueue;
    /**
     * 訂單路由key
     */
    @Value("${kaico.order.routingKey}")
    private String orderRoutingKey;
    /**
     * 死信交換機
     */
    @Value("${kaico.dlx.exchange}")
    private String dlxExchange;
    /**
     * 死信隊列
     */
    @Value("${kaico.dlx.queue}")
    private String dlxQueue;
    /**
     * 死信路由
     */
    @Value("${kaico.dlx.routingKey}")
    private String dlxRoutingKey;
    /**
     * 聲明死信交換機
     *
     * @return DirectExchange
     */
    @Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange(dlxExchange);
    }
    /**
     * 聲明死信隊列
     *
     * @return Queue
     */
    @Bean
    public Queue dlxQueue() {
        return new Queue(dlxQueue);
    }
    /**
     * 聲明訂單業(yè)務交換機
     *
     * @return DirectExchange
     */
    @Bean
    public DirectExchange orderExchange() {
        return new DirectExchange(orderExchange);
    }
    /**
     * 綁定死信隊列到死信交換機
     *
     * @return Binding
     */
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(dlxQueue())
                .to(dlxExchange())
                .with(dlxRoutingKey);
    }
    /**
     * 聲明訂單隊列,并且綁定死信隊列
     *
     * @return Queue
     */
    @Bean
    public Queue orderQueue() {
        // 訂單隊列綁定我們的死信交換機
        Map<String, Object> arguments = new HashMap<>(2);
        arguments.put("x-dead-letter-exchange", dlxExchange);
        arguments.put("x-dead-letter-routing-key", dlxRoutingKey);
        return new Queue(orderQueue, true, false, false, arguments);
    }
    /**
     * 綁定訂單隊列到訂單交換機
     *
     * @return Binding
     */
    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(orderQueue())
                .to(orderExchange())
                .with(orderRoutingKey);
    }
}

死信隊列消費者

@Component
public class OrderDlxConsumer {
    /**
     * 死信隊列監(jiān)聽隊列回調的方法
     * @param msg
     */
    @RabbitListener(queues = "kaico_order_dlx_queue")
    public void orderDlxConsumer(String msg) {
        System.out.println("死信隊列消費訂單消息" + msg);
    }
}

普通隊列消費者

@Component
public class OrderConsumer {
    /**
     * 監(jiān)聽隊列回調的方法
     *
     * @param msg
     */
    @RabbitListener(queues = "kaico_order_queue")
    public void orderConsumer(String msg) {
        System.out.println("正常訂單消費者消息msg:" + msg);
    }
}

后臺隊列管理頁面如下:

部署方式:死信隊列不能夠和正常隊列存在同一個服務器中,應該分服務器存放。

延遲隊列

訂單30分鐘未支付,系統(tǒng)自動超時關閉的實現(xiàn)方案。

基于任務調度實現(xiàn),效率是非常低。

基于redis過期key實現(xiàn),key失效時會回調客戶端一個方法。

用戶下單的時候,生成一個令牌(有效期)30分鐘,存放到我們redis;缺點:非常冗余,會在表中存放一個冗余字段。

基于mq的延遲隊列(最佳方案)rabbitmq情況下。

原理:在我們下單的時候,往mq投遞一個消息設置有效期為30分鐘,但該消息失效的時候(沒有被消費的情況下),執(zhí)行我們客戶端一個方法告訴我們該消息已經(jīng)失效,這時候查詢這筆訂單是否已經(jīng)支付。

實現(xiàn)邏輯:

主要使用死信隊列來實現(xiàn)。

想要的代碼:就是正常的消費者不消費消息,或者沒有正常的消費者,在設置的時間后進入死信隊列中,然后死信消費者實現(xiàn)相應的業(yè)務邏輯。

RabbitMQ消息冪等問題

RabbitMQ消息自動重試機制

當消費者業(yè)務邏輯代碼中,拋出異常自動實現(xiàn)重試 (默認是無數(shù)次重試)

應該對RabbitMQ重試次數(shù)實現(xiàn)限制,比如最多重試5次,每次間隔3s;重試多次還是失敗的情況下,存放到死信隊列或者存放到數(shù)據(jù)庫表中記錄后期人工補償。因為重試失敗次數(shù)之后,隊列會自動刪除這個消息。

消息重試原理: 在重試的過程中,使用aop攔截我們的消費監(jiān)聽方法,也不會打印這個錯誤日志。如果重試多次還是失敗,達到最大失敗次數(shù)的時候才會打印錯誤日志。

如果消費多次還是失敗的情況下:

1、自動刪除該消息;(消息可能丟失)

解決辦法:

如果充實多次還是失敗的情況下,最終存放到死信隊列;

采用表日志記,消費失敗錯誤日志的日志記錄,后期人工自動對該消息實現(xiàn)補償。

合理的選擇重試機制

消費者獲取消息后,調用第三方接口(HTTP請求),但是調用第三方接口失敗呢?是否需要重試 ?

答:有時是因為網(wǎng)絡異常調用失敗,應該需要重試幾次。

消費者獲取消息后,應該代碼問題拋出數(shù)據(jù)異常,是否需要重試?

答:不需要重試,代碼異常需要重新修改代碼發(fā)布項目。

消費者開啟手動ack模式

第一步、springboot項目配置需要開啟ack模式

acknowledge-mode: manual

第二步、消費者Java代碼

int result = orderMapper.addOrder(orderEntity);
if (result >= 0) {
    // 開啟消息確認機制
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

rabbitMQ如何解決消息冪等問題

什么是消息冪等性?MQ消費者如何保證冪等性?

產(chǎn)生的原因:就是因為消費者可能會開啟自動重試,重試過程中可能會導致消費者業(yè)務邏輯代碼重復執(zhí)行。此刻消息已經(jīng)消費了,因為業(yè)務報錯導致消息重新消費,這時會出現(xiàn)

解決方案:采用消息全局id根據(jù)業(yè)務來定,根據(jù)業(yè)務id(全局唯一id)消費者可以判斷這條消息已經(jīng)消費了。

消費者代碼邏輯:

RabbitMQ解決分布式事務問題

分布式事務:在分布式系統(tǒng)中,因為跨服務調用接口,存在多個不同的事務,每個事務都互不影響。就存在分布式事務的問題。

解決分布式事務核心思想:數(shù)據(jù)最終一致性。

分布式領域中名詞:

強一致性 :要么同步速度非??旎蛘卟捎面i的機制 不允許出現(xiàn)臟讀;

強一致性解決方案:要么數(shù)據(jù)庫A非常迅速的將數(shù)據(jù)同步給數(shù)據(jù)B,或者數(shù)據(jù)庫A沒有同步完成之前數(shù)據(jù)庫B不能夠讀取數(shù)據(jù)。

弱一致性: 允許讀取的數(shù)據(jù)為原來的臟數(shù)據(jù),允許讀取的結果不一致性。

最終一致性: 在我們的分布式系統(tǒng)中,因為數(shù)據(jù)之間同步通過網(wǎng)絡實現(xiàn)通訊,短暫的數(shù)據(jù)延遲是允許的,但是最終數(shù)據(jù)必須要一致性。

基于RabbitMQ解決分布式事務的思路

基于RabbitMQ解決分布式事務的思路:(采用最終一致性的方案)

  • 確認我們的生產(chǎn)者消息一定要投遞到MQ中(消息確認機制)投遞失敗 就繼續(xù)重試
  • 消費者采用手動ack的形式確認消息實現(xiàn)消費 注意冪等性問題,消費失敗的情況下,mq自動幫消費者重試。
  • 保證我們的生產(chǎn)者第一事務先執(zhí)行,如果執(zhí)行失敗采用補單隊列(給生產(chǎn)者自己事務補充,確保生產(chǎn)者第一事務執(zhí)行完成【數(shù)據(jù)最終一致性】)。

解決思路圖:核心是利用mq發(fā)送消息給其他系統(tǒng)將數(shù)據(jù)修改回來。

到此這篇關于Java RabbitMQ詳解常見問題的解決的文章就介紹到這了,更多相關Java RabbitMQ內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

最新評論