欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RabbitMQ 如何解決消息冪等性的問(wèn)題

 更新時(shí)間:2021年07月05日 10:00:00   作者:王小白_Ada  
這篇文章主要介紹了RabbitMQ 如何解決消息冪等性的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教

前言

關(guān)于MQ消費(fèi)者的冪等性問(wèn)題,在于MQ的重試機(jī)制,因?yàn)榫W(wǎng)絡(luò)原因或客戶端延遲消費(fèi)導(dǎo)致重復(fù)消費(fèi)。使用MQ重試機(jī)制需要注意的事項(xiàng)以及如何解決消費(fèi)者冪等性問(wèn)題以下將逐一講解。

1. RabbitMQ自動(dòng)重試機(jī)制

消費(fèi)者在消費(fèi)消息的時(shí)候,如果消費(fèi)者業(yè)務(wù)邏輯出現(xiàn)程序異常,這個(gè)時(shí)候我們?nèi)绾翁幚恚?/p>

使用重試機(jī)制,RabbitMQ默認(rèn)開(kāi)啟重試機(jī)制。

實(shí)現(xiàn)原理:

  • @RabbitHandler注解 底層使用Aop攔截,如果程序(消費(fèi)者)沒(méi)有拋出異常,自動(dòng)提交事務(wù)
  • 如果Aop使用異常通知攔截獲取到異常后,自動(dòng)實(shí)現(xiàn)補(bǔ)償機(jī)制,消息緩存在RabbitMQ服務(wù)器端

注意:

  • 默認(rèn)會(huì)一直重試到消費(fèi)者不拋異常為止,這樣顯然不好。我們需要修改重試機(jī)制策略,如間隔3s重試一次)

配置:

spring:
  rabbitmq:
    # 連接地址
    host: 127.0.0.1
    # 端口號(hào)
    port: 5672
    # 賬號(hào)
    username: guest
    # 密碼
    password: guest
    # 地址(類似于數(shù)據(jù)庫(kù)的概念)
    virtual-host: /admin_vhost
    # 消費(fèi)者監(jiān)聽(tīng)相關(guān)配置
    listener:
      simple:
        retry:
          # 開(kāi)啟消費(fèi)者(程序出現(xiàn)異常)重試機(jī)制,默認(rèn)開(kāi)啟并一直重試
          enabled: true
          # 最大重試次數(shù)
          max-attempts: 5
          # 重試間隔時(shí)間(毫秒)
          initial-interval: 3000

2. 如何合理選擇重試機(jī)制?

情況1: 消費(fèi)者獲取到消息后,調(diào)用第三方接口,但接口暫時(shí)無(wú)法訪問(wèn),是否需要重試? 需要重試,可能是因?yàn)榫W(wǎng)絡(luò)原因短暫不能訪問(wèn)

情況2: 消費(fèi)者獲取到消息后,拋出數(shù)據(jù)轉(zhuǎn)換異常,是否需要重試? 不需要重試,因?yàn)閷儆诔绦騜ug需要重新發(fā)布版本

總結(jié):對(duì)于情況2,如果消費(fèi)者代碼拋出異常是需要發(fā)布新版本才能解決的問(wèn)題,那么不需要重試,重試也無(wú)濟(jì)于事。應(yīng)該采用日志記錄+定時(shí)任務(wù)job進(jìn)行健康檢查+人工進(jìn)行補(bǔ)償

3. 調(diào)用第三方接口自動(dòng)實(shí)現(xiàn)補(bǔ)償機(jī)制

我們知道了,RabbitMQ在消費(fèi)者消費(fèi)發(fā)生異常時(shí),會(huì)自動(dòng)進(jìn)行補(bǔ)償機(jī)制,所以我們(消費(fèi)者)在調(diào)用第三方接口時(shí),可以根據(jù)返回結(jié)果判斷是否成功:

  • 成功:正常消費(fèi)
  • 失?。菏謩?dòng)拋處一個(gè)異常,這時(shí)RabbitMQ自動(dòng)給我們做重試 (補(bǔ)償)。

4. 如何解決消費(fèi)者冪等性問(wèn)題

防止重復(fù)消費(fèi) (MQ重試機(jī)制需要注意的問(wèn)題)

產(chǎn)生原因:網(wǎng)絡(luò)延遲傳輸中,消費(fèi)者出現(xiàn)異?;蛘呦M(fèi)者延遲消費(fèi),會(huì)造成進(jìn)行MQ重試補(bǔ)償,在重試過(guò)程中,可能會(huì)造成重復(fù)消費(fèi)。

面試題:MQ中消費(fèi)者如何保證冪等性問(wèn)題,不被重復(fù)消費(fèi)?

在這里插入圖片描述

偽代碼:

生產(chǎn)者核心代碼:

請(qǐng)求頭設(shè)置消息id(messageId)

@Component
public class FanoutProducer {
 @Autowired
 private AmqpTemplate amqpTemplate;

 public void send(String queueName) {
  String msg = "my_fanout_msg:" + System.currentTimeMillis();
  //請(qǐng)求頭設(shè)置消息id(messageId)
  Message message = MessageBuilder.withBody(msg.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON)
    .setContentEncoding("utf-8").setMessageId(UUID.randomUUID() + "").build();
  System.out.println(msg + ":" + msg);
  amqpTemplate.convertAndSend(queueName, message);
 }
}

消費(fèi)者核心代碼:

@RabbitListener(queues = "fanout_email_queue")
 public void process(Message message) throws Exception {
  // 獲取消息Id
  String messageId = message.getMessageProperties().getMessageId();
  String msg = new String(message.getBody(), "UTF-8");
  //② 判斷唯一Id是否被消費(fèi),消息消費(fèi)成功后將id和狀態(tài)保存在日志表中,我們從(①步驟)表中獲取并判斷messageId的狀態(tài)即可
  //從redis中獲取messageId的value
  String value = redisUtils.get(messageId)+"";
  if(value.equals("1") ){ //表示已經(jīng)消費(fèi)
   return; //結(jié)束
  }
  System.out.println("郵件消費(fèi)者獲取生產(chǎn)者消息" + "messageId:" + messageId + ",消息內(nèi)容:" + msg);
  JSONObject jsonObject = JSONObject.parseObject(msg);
  // 獲取email參數(shù)
  String email = jsonObject.getString("email");
  // 請(qǐng)求地址
  String emailUrl = "http://127.0.0.1:8083/sendEmail?email=" + email;
  JSONObject result = HttpClientUtils.httpGet(emailUrl);
  if (result == null) {
   // 因?yàn)榫W(wǎng)絡(luò)原因,造成無(wú)法訪問(wèn),繼續(xù)重試
   throw new Exception("調(diào)用接口失敗!");
  }
  System.out.println("執(zhí)行結(jié)束....");
  //① 執(zhí)行到這里已經(jīng)消費(fèi)成功,我們可以修改messageId的狀態(tài),并存入日志表(可以存到redis中,key為消息Id、value為狀態(tài))
 }

5. SpringBoot整合RabbitMQ應(yīng)答模式(ACK)

1.修改配置simple下添加 acknowledge-mode: manual:

spring:
  rabbitmq:
    # 連接地址
    host: 127.0.0.1
    # 端口號(hào)
    port: 5672
    # 賬號(hào)
    username: guest
    # 密碼
    password: guest
    # 地址(類似于數(shù)據(jù)庫(kù)的概念)
    virtual-host: /admin_vhost
    # 消費(fèi)者監(jiān)聽(tīng)相關(guān)配置
    listener:
      simple:
        retry:
          # 開(kāi)啟消費(fèi)者(程序出現(xiàn)異常)重試機(jī)制,默認(rèn)開(kāi)啟并一直重試
          enabled: true
          # 最大重試次數(shù)
          max-attempts: 5
          # 重試間隔時(shí)間(毫秒)
          initial-interval: 3000
        # 開(kāi)啟手動(dòng)ack
        acknowledge-mode: manual

2.消費(fèi)者增加代碼:

Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); 手動(dòng)ack
channel.basicAck(deliveryTag, false);手動(dòng)簽收
//郵件隊(duì)列
@Component
public class FanoutEamilConsumer {
 @RabbitListener(queues = "fanout_email_queue")
 public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception {
  System.out
    .println(Thread.currentThread().getName() + ",郵件消費(fèi)者獲取生產(chǎn)者消息msg:" + new String(message.getBody(), "UTF-8")
      + ",messageId:" + message.getMessageProperties().getMessageId());
  // 手動(dòng)ack
  Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
  // 手動(dòng)簽收
  channel.basicAck(deliveryTag, false);
 }
}

RabbitMQ 如何保證冪等性,數(shù)據(jù)一致性

mq的作用主要是用來(lái)解耦,削峰,異步,

增加MQ,系統(tǒng)的復(fù)雜性也會(huì)增加很多,

也會(huì)帶來(lái)其他的問(wèn)題,比如MQ掛了怎么辦,怎么保持?jǐn)?shù)據(jù)的冪等性

冪等性問(wèn)題通俗點(diǎn)講就是保證數(shù)據(jù)不被重復(fù)消費(fèi),同時(shí)數(shù)據(jù)也不能少,

也就是數(shù)據(jù)一致性問(wèn)題。

下面是MQ丟失的3種情況

rabbitmq-message-lose

1,生產(chǎn)者發(fā)送消息至MQ的數(shù)據(jù)丟失

解決方法:在生產(chǎn)者端開(kāi)啟comfirm 確認(rèn)模式,你每次寫(xiě)的消息都會(huì)分配一個(gè)唯一的 id,

然后如果寫(xiě)入了 RabbitMQ 中,RabbitMQ 會(huì)給你回傳一個(gè) ack 消息,告訴你說(shuō)這個(gè)消息 ok 了

2,MQ收到消息,暫存內(nèi)存中,還沒(méi)消費(fèi),自己掛掉,數(shù)據(jù)會(huì)都丟失

解決方式:MQ設(shè)置為持久化。將內(nèi)存數(shù)據(jù)持久化到磁盤(pán)中

3,消費(fèi)者剛拿到消息,還沒(méi)處理,掛掉了,MQ又以為消費(fèi)者處理完

解決方式:用 RabbitMQ 提供的 ack 機(jī)制,簡(jiǎn)單來(lái)說(shuō),就是你必須關(guān)閉 RabbitMQ 的自動(dòng) ack,可以通過(guò)一個(gè) api 來(lái)調(diào)用就行,然后每次你自己代碼里確保處理完的時(shí)候,再在程序里 ack 一把。這樣的話,如果你還沒(méi)處理完,不就沒(méi)有 ack 了?那 RabbitMQ 就認(rèn)為你還沒(méi)處理完,這個(gè)時(shí)候 RabbitMQ 會(huì)把這個(gè)消費(fèi)分配給別的 consumer 去處理,消息是不會(huì)丟的。

rabbitmq-message-lose-solution

數(shù)據(jù)重復(fù)的問(wèn)題簡(jiǎn)單的多,就是在消費(fèi)端判斷數(shù)據(jù)是否已經(jīng)被消費(fèi)過(guò)

  • 比如你拿個(gè)數(shù)據(jù)要寫(xiě)庫(kù),你先根據(jù)主鍵查一下,如果這數(shù)據(jù)都有了,你就別插入了,update 一下好吧。
  • 比如你是寫(xiě) Redis,那沒(méi)問(wèn)題了,反正每次都是 set,天然冪等性。
  • 比如你不是上面兩個(gè)場(chǎng)景,那做的稍微復(fù)雜一點(diǎn),你需要讓生產(chǎn)者發(fā)送每條數(shù)據(jù)的時(shí)候,里面加一個(gè)全局唯一的 id,類似訂單 id 之類的東西,然后你這里消費(fèi)到了之后,先根據(jù)這個(gè) id 去比如 Redis 里查一下,之前消費(fèi)過(guò)嗎?如果沒(méi)有消費(fèi)過(guò),你就處理,然后這個(gè) id 寫(xiě) Redis。如果消費(fèi)過(guò)了,那你就別處理了,保證別重復(fù)處理相同的消息即可。
  • 比如基于數(shù)據(jù)庫(kù)的唯一鍵來(lái)保證重復(fù)數(shù)據(jù)不會(huì)重復(fù)插入多條。因?yàn)橛形ㄒ绘I約束了,重復(fù)數(shù)據(jù)插入只會(huì)報(bào)錯(cuò),不會(huì)導(dǎo)致數(shù)據(jù)庫(kù)中出現(xiàn)臟數(shù)據(jù)。

以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。

相關(guān)文章

  • 詳解Java中接口的定義與實(shí)例代碼

    詳解Java中接口的定義與實(shí)例代碼

    這篇文章主要介紹了詳解Java中接口的定義與實(shí)例代碼的相關(guān)資料,需要的朋友可以參考下
    2017-03-03
  • 詳解Java中的內(nèi)存屏障

    詳解Java中的內(nèi)存屏障

    這篇文章主要介紹了Java中的內(nèi)存屏障的相關(guān)資料,幫助大家更好的理解和學(xué)習(xí)使用Java,感興趣的朋友可以了解下
    2021-05-05
  • 快速了解Spring Boot

    快速了解Spring Boot

    這篇文章主要介紹了快速了解Spring Boot,介紹了其環(huán)境準(zhǔn)備,URL中的變量以及模板渲染等內(nèi)容,具有一定參考價(jià)值,需要的朋友可以了解下。
    2017-11-11
  • java連接zookeeper的3種方式小結(jié)

    java連接zookeeper的3種方式小結(jié)

    這篇文章主要介紹了java連接zookeeper的3種方式小結(jié),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-11-11
  • java高并發(fā)的用戶線程和守護(hù)線程詳解

    java高并發(fā)的用戶線程和守護(hù)線程詳解

    本篇文章主要介紹了淺談java中守護(hù)線程與用戶線程,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2021-10-10
  • Java讀寫(xiě)txt文件代碼實(shí)例

    Java讀寫(xiě)txt文件代碼實(shí)例

    這篇文章主要給大家介紹了關(guān)于Java讀寫(xiě)txt文件的相關(guān)資料,近期處理的數(shù)據(jù)規(guī)模比較大,正好又是統(tǒng)計(jì)合并的事情,想著借助excel就可以完成了,然后就了解了下java讀取excel的事情,需要的朋友可以參考下
    2023-09-09
  • SpringBoot集成整合JWT與Shiro流程詳解

    SpringBoot集成整合JWT與Shiro流程詳解

    安全管理是軟件系統(tǒng)必不可少的的功能。根據(jù)經(jīng)典的“墨菲定律”——凡是可能,總會(huì)發(fā)生。如果系統(tǒng)存在安全隱患,最終必然會(huì)出現(xiàn)問(wèn)題,這篇文章主要介紹了SpringBoot集成JWT、Shiro框架的使用
    2022-12-12
  • vue3實(shí)現(xiàn)一個(gè)todo-list

    vue3實(shí)現(xiàn)一個(gè)todo-list

    這篇文章主要為大家詳細(xì)介紹了基于vuejs實(shí)現(xiàn)一個(gè)todolist項(xiàng)目,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下,希望能給你帶來(lái)幫助
    2021-08-08
  • Session過(guò)期后實(shí)現(xiàn)自動(dòng)跳轉(zhuǎn)登錄頁(yè)面

    Session過(guò)期后實(shí)現(xiàn)自動(dòng)跳轉(zhuǎn)登錄頁(yè)面

    這篇文章主要介紹了Session過(guò)期后實(shí)現(xiàn)自動(dòng)跳轉(zhuǎn)登錄頁(yè)面,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-12-12
  • Spring Boot 啟動(dòng)失敗:循環(huán)依賴排查到懶加載配置的過(guò)程解析

    Spring Boot 啟動(dòng)失敗:循環(huán)依賴排查到懶加載配置的過(guò)程解析

    本文我將從一個(gè)真實(shí)的生產(chǎn)環(huán)境故障案例出發(fā),帶你深入了解Spring Boot循環(huán)依賴的檢測(cè)機(jī)制、排查方法和解決方案,通過(guò)系統(tǒng)性分析和實(shí)戰(zhàn)演練幫助掌握如何在復(fù)雜的應(yīng)用中處理循環(huán)依賴,感興趣的朋友跟隨小編一起看看吧
    2025-08-08

最新評(píng)論