RabbitMQ冪等性與優(yōu)先級(jí)及惰性詳細(xì)全面講解
1. 冪等性
概念
用戶對(duì)于同一操作發(fā)起的一次請(qǐng)求或者多次請(qǐng)求的結(jié)果是一致的,不會(huì)因?yàn)槎啻吸c(diǎn)擊而產(chǎn)生了副作用。 舉個(gè)最簡(jiǎn)單的例子,那就是支付,用戶購(gòu)買商品后支付,支付扣款成功,但是返回結(jié)果的時(shí)候網(wǎng)絡(luò)異常, 此時(shí)錢已經(jīng)扣了,用戶再次點(diǎn)擊按鈕,此時(shí)會(huì)進(jìn)行第二次扣款,返回結(jié)果成功,用戶查詢余額發(fā)現(xiàn)多扣錢 了,流水記錄也變成了兩條。在以前的單應(yīng)用系統(tǒng)中,我們只需要把數(shù)據(jù)操作放入事務(wù)中即可,發(fā)生錯(cuò)誤立即回滾,但是再響應(yīng)客戶端的時(shí)候也有可能出現(xiàn)網(wǎng)絡(luò)中斷或者異常等等
消息重復(fù)消費(fèi)
消費(fèi)者在消費(fèi) MQ 中的消息時(shí),MQ 已把消息發(fā)送給消費(fèi)者,消費(fèi)者在給 MQ 返回 ack 時(shí)網(wǎng)絡(luò)中斷, 故 MQ 未收到確認(rèn)信息,該條消息會(huì)重新發(fā)給其他的消費(fèi)者,或者在網(wǎng)絡(luò)重連后再次發(fā)送給該消費(fèi)者,但實(shí)際上該消費(fèi)者已成功消費(fèi)了該條消息,造成消費(fèi)者消費(fèi)了重復(fù)的消息。
解決思路
MQ 消費(fèi)者的冪等性的解決一般使用全局 ID 或者寫個(gè)唯一標(biāo)識(shí)比如時(shí)間戳 或者 UUID 或者訂單消費(fèi)者消費(fèi) MQ 中的消息也可利用 MQ 的該 id 來(lái)判斷,或者可按自己的規(guī)則生成一個(gè)全局唯一 id,每次消費(fèi)消息時(shí)用該 id 先判斷該消息是否已消費(fèi)過(guò)。
消費(fèi)端的冪等性保障
在海量訂單生成的業(yè)務(wù)高峰期,生產(chǎn)端有可能就會(huì)重復(fù)發(fā)生了消息,這時(shí)候消費(fèi)端就要實(shí)現(xiàn)冪等性, 這就意味著我們的消息永遠(yuǎn)不會(huì)被消費(fèi)多次,即使我們收到了一樣的消息。
業(yè)界主流的冪等性有兩種操作:a. 唯一 ID+指紋碼機(jī)制,利用數(shù)據(jù)庫(kù)主鍵去重, b.利用 redis 的原子性去實(shí)現(xiàn)
唯一ID+指紋碼機(jī)制
指紋碼:我們的一些規(guī)則或者時(shí)間戳加別的服務(wù)給到的唯一信息碼,它并不一定是我們系統(tǒng)生成的,基本都是由我們的業(yè)務(wù)規(guī)則拼接而來(lái),但是一定要保證唯一性,然后就利用查詢語(yǔ)句進(jìn)行判斷這個(gè) id 是否存在數(shù)據(jù)庫(kù)中,優(yōu)勢(shì)就是實(shí)現(xiàn)簡(jiǎn)單就一個(gè)拼接,然后查詢判斷是否重復(fù);劣勢(shì)就是在高并發(fā)時(shí),如果是單個(gè)數(shù)據(jù)庫(kù)就會(huì)有寫入性能瓶頸當(dāng)然也可以采用分庫(kù)分表提升性能,但也不是我們最推薦的方式。
note Redis 原子性
利用 redis 執(zhí)行 setnx 命令,天然具有冪等性。從而實(shí)現(xiàn)不重復(fù)消費(fèi)
2. 優(yōu)先級(jí)隊(duì)列
使用場(chǎng)景
在我們系統(tǒng)中有一個(gè)訂單催付的場(chǎng)景,我們的客戶在天貓下的訂單,淘寶會(huì)及時(shí)將訂單推送給我們,如果在用戶設(shè)定的時(shí)間內(nèi)未付款那么就會(huì)給用戶推送一條短信提醒,很簡(jiǎn)單的一個(gè)功能對(duì)吧。
但是,tmall 商家對(duì)我們來(lái)說(shuō),肯定是要分大客戶和小客戶的對(duì)吧,比如像蘋果,小米這樣大商家一年起碼能給我們創(chuàng)造很大的利潤(rùn),所以理應(yīng)當(dāng)然,他們的訂單必須得到優(yōu)先處理,而曾經(jīng)我們的后端系統(tǒng)是使用 redis 來(lái)存放的定時(shí)輪詢,大家都知道 redis 只能用 List 做一個(gè)簡(jiǎn)簡(jiǎn)單單的消息隊(duì)列,并不能實(shí)現(xiàn)一個(gè)優(yōu)先級(jí)的場(chǎng)景,所以訂單量大了后采用 RabbitMQ 進(jìn)行改造和優(yōu)化,如果發(fā)現(xiàn)是大客戶的訂單給一個(gè)相對(duì)比較高的優(yōu)先級(jí), 否則就是默認(rèn)優(yōu)先級(jí)。
如何添加?
控制臺(tái)頁(yè)面添加

隊(duì)列中代碼添加優(yōu)先級(jí)
Map<String, Object> params = new HashMap();
params.put("x-max-priority", 10);
channel.queueDeclare("hello", true, false, false, params);
消息中代碼添加優(yōu)先級(jí)
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(10).build();
注意事項(xiàng):
要讓隊(duì)列實(shí)現(xiàn)優(yōu)先級(jí)需要做的事情有如下事情:隊(duì)列需要設(shè)置為優(yōu)先級(jí)隊(duì)列,消息需要設(shè)置消息的優(yōu)先級(jí),消費(fèi)者需要等待消息已經(jīng)發(fā)送到隊(duì)列中才去消費(fèi)因?yàn)椋@樣才有機(jī)會(huì)對(duì)消息進(jìn)行排序
實(shí)戰(zhàn)
生產(chǎn)者:
package com.jm.rabbitmq.one;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.HashMap;
import java.util.Map;
/**
* 生產(chǎn)者 :發(fā)消息
*/
public class Producer {
//隊(duì)列名稱
public static final String QUEUE_NAME="hello";
//發(fā)消息
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
Map<String, Object> arguments=new HashMap<>();
arguments.put("x-max-priority",10);//官方允許0-255 此處設(shè)置10 允許優(yōu)先級(jí)范圍為0-10 不要設(shè)置過(guò)大,浪費(fèi)cpu和內(nèi)存
channel.queueDeclare(QUEUE_NAME,true,false,false,arguments);
for (int i = 1; i < 11; i++) {
String message="info"+i;
if(i==5){
AMQP.BasicProperties properties=
new AMQP.BasicProperties().builder().priority(5).build();
channel.basicPublish("",QUEUE_NAME,properties,message.getBytes());
}else{
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
}
}
System.out.println("消息發(fā)送完畢");
}
}
消費(fèi)者:
package com.jm.rabbitmq.one;
import com.rabbitmq.client.*;
/**
* 消費(fèi)者 :接收消息的
*/
public class Consumer {
//隊(duì)列的名稱
public static final String QUEUE_NAME="hello";
//接收消息
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
//聲明 接收消息
DeliverCallback deliverCallback= (consumerTag,message) ->{
System.out.println(new String(message.getBody()));
};
//取消消息時(shí)的回調(diào)
CancelCallback cancelCallback= consumerTag ->{
System.out.println("消息消費(fèi)被中斷");
};
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
3. 惰性隊(duì)列
使用場(chǎng)景
RabbitMQ 從 3.6.0 版本開始引入了惰性隊(duì)列的概念。惰性隊(duì)列會(huì)盡可能的將消息存入磁盤中,而在消費(fèi)者消費(fèi)到相應(yīng)的消息時(shí)才會(huì)被加載到內(nèi)存中,它的一個(gè)重要的設(shè)計(jì)目標(biāo)是能夠支持更長(zhǎng)的隊(duì)列,即支持更多的消息存儲(chǔ)。當(dāng)消費(fèi)者由于各種各樣的原因(比如消費(fèi)者下線、宕機(jī)亦或者是由于維護(hù)而關(guān)閉等)而致使長(zhǎng)時(shí)間內(nèi)不能消費(fèi)消息造成堆積時(shí),惰性隊(duì)列就很有必要了。
默認(rèn)情況下,當(dāng)生產(chǎn)者將消息發(fā)送到 RabbitMQ 的時(shí)候,隊(duì)列中的消息會(huì)盡可能的存儲(chǔ)在內(nèi)存之中, 這樣可以更加快速的將消息發(fā)送給消費(fèi)者。即使是持久化的消息,在被寫入磁盤的同時(shí)也會(huì)在內(nèi)存中駐留一份備份。當(dāng)RabbitMQ 需要釋放內(nèi)存的時(shí)候,會(huì)將內(nèi)存中的消息換頁(yè)至磁盤中,這個(gè)操作會(huì)耗費(fèi)較長(zhǎng)的時(shí)間,也會(huì)阻塞隊(duì)列的操作,進(jìn)而無(wú)法接收新的消息。雖然 RabbitMQ 的開發(fā)者們一直在升級(jí)相關(guān)的算法, 但是效果始終不太理想,尤其是在消息量特別大的時(shí)候。
兩種模式
隊(duì)列具備兩種模式:default 和 lazy。默認(rèn)的為default 模式,在3.6.0 之前的版本無(wú)需做任何變更。lazy 模式即為惰性隊(duì)列的模式,可以通過(guò)調(diào)用 channel.queueDeclare 方法的時(shí)候在參數(shù)中設(shè)置,也可以通過(guò) Policy 的方式設(shè)置,如果一個(gè)隊(duì)列同時(shí)使用這兩種方式設(shè)置的話,那么 Policy 的方式具備更高的優(yōu)先級(jí)。 如果要通過(guò)聲明的方式改變已有隊(duì)列的模式的話,那么只能先刪除隊(duì)列,然后再重新聲明一個(gè)新的。
在隊(duì)列聲明的時(shí)候可以通過(guò)“x-queue-mode”參數(shù)來(lái)設(shè)置隊(duì)列的模式,取值為“default”和“lazy”。下面示例中演示了一個(gè)惰性隊(duì)列的聲明細(xì)節(jié):
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-queue-mode", "lazy");
channel.queueDeclare("myqueue", false, false, false, args);
內(nèi)存開銷對(duì)比

在發(fā)送 1 百萬(wàn)條消息,每條消息大概占 1KB 的情況下,普通隊(duì)列占用內(nèi)存是 1.2GB,而惰性隊(duì)列僅僅 占用 1.5MB
到此這篇關(guān)于RabbitMQ冪等性與優(yōu)先級(jí)及惰性詳細(xì)全面講解的文章就介紹到這了,更多相關(guān)RabbitMQ冪等性內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java從源碼看異步任務(wù)計(jì)算FutureTask
這篇文章主要介紹了Java從源碼看異步任務(wù)計(jì)算FutureTask,F(xiàn)utureTask就能夠很好的幫助我們實(shí)現(xiàn)異步計(jì)算,并且可以實(shí)現(xiàn)同步獲取異步任務(wù)的計(jì)算結(jié)果,具體是怎樣實(shí)現(xiàn)的,下面我們就一起來(lái)學(xué)習(xí)下面文章的具體內(nèi)容吧2022-04-04
我用java實(shí)現(xiàn)了王者榮耀的皮膚和英雄技能
上篇文章主要實(shí)現(xiàn)了創(chuàng)建英雄,創(chuàng)建野怪,創(chuàng)建裝備.并且實(shí)現(xiàn)了簡(jiǎn)單的刷怪,購(gòu)買裝備等.本篇文章我優(yōu)化了我的操作界面,并且實(shí)現(xiàn)了英雄技能,英雄皮膚等,需要的朋友可以參考下2021-05-05
基于Java實(shí)現(xiàn)一個(gè)簡(jiǎn)單的單詞本Android App的實(shí)踐
本文基于Java實(shí)現(xiàn)了一個(gè)簡(jiǎn)單的單詞本安卓app,用的是SQLite數(shù)據(jù)庫(kù),文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-01-01
Java之定時(shí)器Timer和定時(shí)任務(wù)TimerTask應(yīng)用以及原理解讀
文章介紹了Java JDK自帶的定時(shí)器Timer和定時(shí)任務(wù)TimerTask的使用和原理,Timer和TimerTask成對(duì)出現(xiàn),Timer是定時(shí)器,TimerTask是定時(shí)任務(wù),TimerTask實(shí)現(xiàn)Runnable接口的run方法,Timer的屬性TimerThreadthread繼承Thread2024-12-12
Java雜談之類和對(duì)象 封裝 構(gòu)造方法以及代碼塊詳解
在現(xiàn)實(shí)世界中,真實(shí)存在的東西,比如吉普車,卡丁車,貨車。我們?cè)谡J(rèn)識(shí)它的時(shí)候就會(huì)在腦海中將它抽象為一種類別叫做車。 好了,那再計(jì)算機(jī)世界中,它同樣的也會(huì)這樣做2021-09-09
springboot2學(xué)習(xí)世界著名程序springboot開發(fā)體驗(yàn)
這篇文章主要為大家介紹了世界著名程序springboot開發(fā)體驗(yàn),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-05-05
Java 靜態(tài)綁定與動(dòng)態(tài)綁定深入分析
這篇文章主要介紹了Java 靜態(tài)綁定與動(dòng)態(tài)綁定深入分析的相關(guān)資料,這里對(duì)java 的動(dòng)態(tài)綁定和靜態(tài)綁定做了詳細(xì)的介紹,對(duì)其進(jìn)行總結(jié)整理,需要的朋友可以參考下2016-11-11
MyBatis-Plus?實(shí)體類注解的實(shí)現(xiàn)示例
MyBatis-Plus作為MyBatis的增強(qiáng)版,提供了一系列實(shí)用的注解,如@TableName、@TableId、@TableField等,旨在簡(jiǎn)化數(shù)據(jù)庫(kù)和Java實(shí)體類之間的映射及CRUD操作,通過(guò)這些注解,開發(fā)者可以輕松實(shí)現(xiàn)表映射、字段映射、邏輯刪除、自動(dòng)填充和樂觀鎖等功能2024-09-09
Java實(shí)現(xiàn)郵件發(fā)送QQ郵箱帶附件
這篇文章主要為大家詳細(xì)介紹了Java實(shí)現(xiàn)郵件發(fā)送QQ郵箱帶附件功能,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2020-03-03

