一文吃透消息隊(duì)列RocketMQ實(shí)現(xiàn)消費(fèi)冪等原理
基礎(chǔ)概念
這篇文章,我們聊聊消息隊(duì)列中非常重要的最佳實(shí)踐之一:消費(fèi)冪等。

消費(fèi)冪等是指:當(dāng)出現(xiàn) RocketMQ 消費(fèi)者對(duì)某條消息重復(fù)消費(fèi)的情況時(shí),重復(fù)消費(fèi)的結(jié)果與消費(fèi)一次的結(jié)果是相同的,并且多次消費(fèi)并未對(duì)業(yè)務(wù)系統(tǒng)產(chǎn)生任何負(fù)面影響。
例如,在支付場(chǎng)景下,消費(fèi)者消費(fèi)扣款消息,對(duì)一筆訂單執(zhí)行扣款操作,扣款金額為100元。
如果因網(wǎng)絡(luò)不穩(wěn)定等原因?qū)е驴劭钕⒅貜?fù)投遞,消費(fèi)者重復(fù)消費(fèi)了該扣款消息,但最終的業(yè)務(wù)結(jié)果是只扣款一次,扣費(fèi)100元,且用戶(hù)的扣款記錄中對(duì)應(yīng)的訂單只有一條扣款流水,不會(huì)多次扣除費(fèi)用。那么這次扣款操作是符合要求的,整個(gè)消費(fèi)過(guò)程實(shí)現(xiàn)了消費(fèi)冪等。
適用場(chǎng)景
RocketMQ 消息重復(fù)的場(chǎng)景如下:
發(fā)送時(shí)消息重復(fù)
當(dāng)一條消息已被成功發(fā)送到服務(wù)端并完成持久化,此時(shí)出現(xiàn)了網(wǎng)絡(luò)閃斷或者客戶(hù)端宕機(jī),導(dǎo)致服務(wù)端對(duì)客戶(hù)端應(yīng)答失敗。
如果此時(shí)生產(chǎn)者意識(shí)到消息發(fā)送失敗并嘗試再次發(fā)送消息,消費(fèi)者后續(xù)會(huì)收到兩條內(nèi)容相同但 Message ID 不同的消息。
投遞時(shí)消息重復(fù)
消息消費(fèi)的場(chǎng)景下,消息已投遞到消費(fèi)者并完成業(yè)務(wù)處理,當(dāng)客戶(hù)端給服務(wù)端反饋應(yīng)答的時(shí)候網(wǎng)絡(luò)閃斷。為了保證消息至少被消費(fèi)一次,Broker 服務(wù)端將在網(wǎng)絡(luò)恢復(fù)后再次嘗試投遞之前已被處理過(guò)的消息,消費(fèi)者后續(xù)會(huì)收到兩條內(nèi)容相同并且 Message ID 也相同的消息。
負(fù)載均衡時(shí)消息重復(fù)(包括但不限于網(wǎng)絡(luò)抖動(dòng)、Broker 重啟以及消費(fèi)者應(yīng)用重啟)
Broker 端或客戶(hù)端重啟、擴(kuò)容或縮容時(shí),會(huì)觸發(fā) Rebalance ,此時(shí)消費(fèi)者可能會(huì)收到少量重復(fù)消息。
業(yè)務(wù)唯一標(biāo)識(shí)
因?yàn)椴煌?Message ID 對(duì)應(yīng)的消息內(nèi)容可能相同,有可能出現(xiàn)沖突(重復(fù))的情況,所以真正安全的冪等處理,不建議以 Message ID 作為處理依據(jù)。
最好的方式是以業(yè)務(wù)唯一標(biāo)識(shí)作為冪等處理的關(guān)鍵依據(jù),消息必須攜帶業(yè)務(wù)唯一標(biāo)識(shí)。
消息攜帶業(yè)務(wù)唯一標(biāo)識(shí)一般來(lái)講有兩種方式:
- 消息 Key 存放業(yè)務(wù)唯一標(biāo)識(shí)
Message msg = new Message(TOPIC /* Topic */,
TAG /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
message.setKey("ORDERID_100"); // 訂單編號(hào)
SendResult sendResult = producer.send(message);
- 消息 body 存放業(yè)務(wù)唯一標(biāo)識(shí)
Message msg = new Message(TOPIC /* Topic */,
TAG /* Tag */,
(JSON.toJSONString(orderDTO)).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
message.setKey("ORDERID_100"); // 訂單編號(hào)
SendResult sendResult = producer.send(message);
消費(fèi)者收到消息時(shí),從消息中獲取訂單號(hào)來(lái)實(shí)現(xiàn)消息冪等 :
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt message : msgs) {
// 方法1: 根據(jù)業(yè)務(wù)唯一標(biāo)識(shí)的Key做冪等處理
String orderId = message.getKeys();
// 方法2: 從消息body體重解析出訂單號(hào)
String orderJSON = new String(messageExt.getBody(), "UTF-8");
OrderPO orderPO = JSON.parseObject(orderJSON, OrderPO.class);
String orderId = orderPO.getId();
// TODO 業(yè)務(wù)處理邏輯
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
冪等策略
1 業(yè)務(wù)狀態(tài)機(jī)判斷
為了保證冪等,一定要做業(yè)務(wù)邏輯判斷,筆者認(rèn)為這是保證冪等的首要條件。
筆者曾經(jīng)服務(wù)于神州專(zhuān)車(chē),乘客在用戶(hù)端點(diǎn)擊立即叫車(chē),訂單服務(wù)創(chuàng)建訂單,首先保存到數(shù)據(jù)庫(kù)后,然后將訂單信息同步保存到緩存中。
在訂單的載客生命周期里,訂單的修改操作先修改緩存,然后發(fā)送消息到> MetaQ ,訂單落盤(pán)服務(wù)消費(fèi)消息,并判斷訂單信息是否正常(比如有無(wú)亂序),若訂單數(shù)據(jù)無(wú)誤,則存儲(chǔ)到數(shù)據(jù)庫(kù)中。
訂單狀態(tài)機(jī)按順序分別是:創(chuàng)建、已分配司機(jī)、司機(jī)已出發(fā)、司機(jī)已到達(dá)、司機(jī)已接到乘客、已到達(dá)。
這種設(shè)計(jì)是為了快速提升系統(tǒng)性能,由于網(wǎng)絡(luò)問(wèn)題有非常小的概率,消費(fèi)者會(huì)收到亂序的消息。
當(dāng)訂單狀態(tài)是司機(jī)已到達(dá)時(shí),消費(fèi)者可能會(huì)收到司機(jī)已出發(fā)的消息,也就是先發(fā)的消息因?yàn)榫W(wǎng)絡(luò)原因被延遲消費(fèi)了。
此時(shí),消費(fèi)者需要判斷當(dāng)前的專(zhuān)車(chē)訂單狀態(tài)機(jī),保存最合理的訂單數(shù)據(jù),就可以忽略舊的消息,打印相關(guān)日志即可。
2 全局處理標(biāo)識(shí)
1 數(shù)據(jù)庫(kù)去重表
數(shù)據(jù)庫(kù)去重表有兩個(gè)要點(diǎn) :
- 操作之前先從去重表中通過(guò)唯一業(yè)務(wù)標(biāo)識(shí)查詢(xún)記錄是否存在,若不存在,則進(jìn)行后續(xù)消費(fèi)流程 ;
- 為了避免并發(fā)場(chǎng)景,去重表需要包含業(yè)務(wù)唯一鍵 uniqueKey , 這樣就算并發(fā)插入也不可能插入多條,插入失敗后,拋異常。
舉一個(gè)電商場(chǎng)景的例子:用戶(hù)購(gòu)物車(chē)結(jié)算時(shí),系統(tǒng)會(huì)創(chuàng)建支付訂單。用戶(hù)支付成功后支付訂單的狀態(tài)會(huì)由未支付修改為支付成功,然后系統(tǒng)給用戶(hù)增加積分。
我們可以使用 RocketMQ 事務(wù)消息的方案,該方案能夠發(fā)揮 MQ 的優(yōu)勢(shì):異步和解耦,以及事務(wù)的最終一致性的特性。
在消費(fèi)監(jiān)聽(tīng)器邏輯里,冪等非常重要 。積分表 SQL 如下:
CREATE TABLE `t_points` ( `id` bigint(20) NOT NULL COMMENT '主鍵', `user_id` bigint(20) NOT NULL COMMENT '用戶(hù)id', `order_id` bigint(20) NOT NULL COMMENT '訂單編號(hào)', `points` int(4) NOT NULL COMMENT '積分', `remarks` varchar(128) COLLATE utf8mb4_bin NOT NULL COMMENT '備注', `create_time` datetime NOT NULL, PRIMARY KEY (`id`), UNIQUE KEY `unique_order_Id` (`order_id`) USING BTREE COMMENT '訂單唯一' ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
當(dāng)收到訂單信息后,首先判斷該訂單是否有積分記錄,若沒(méi)有記錄,才插入積分記錄。
就算出現(xiàn)極端并發(fā)場(chǎng)景下,訂單編號(hào)也是唯一鍵,數(shù)據(jù)庫(kù)中也必然不會(huì)存在相同訂單的多條積分記錄。
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for (MessageExt messageExt : msgs) {
String orderJSON = new String(messageExt.getBody(), "UTF-8");
logger.info("orderJSON:" + orderJSON);
OrderPO orderPO = JSON.parseObject(orderJSON, OrderPO.class);
// 首先查詢(xún)是否處理完成
PointsPO pointsPO = pointsMapper.getByOrderId(orderPO.getId());
if (pointsPO == null) {
Long id = SnowFlakeIdGenerator.getUniqueId(1023, 0);
pointsPO = new PointsPO();
pointsPO.setId(id);
pointsPO.setOrderId(orderPO.getId());
pointsPO.setUserId(orderPO.getUserId());
// 添加積分?jǐn)?shù) 30
pointsPO.setPoints(30);
pointsPO.setCreateTime(new Date());
pointsPO.setRemarks("添加積分?jǐn)?shù) 30");
pointsMapper.insert(pointsPO);
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
logger.error("consumeMessage error: ", e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
2 Redis處理標(biāo)志位
在消費(fèi)者接收到消息后,首先判斷 Redis 中是否存在該業(yè)務(wù)主鍵的標(biāo)志位,若存在標(biāo)志位,則認(rèn)為消費(fèi)成功,否則,則執(zhí)行業(yè)務(wù)邏輯,執(zhí)行完成后,在緩存中添加標(biāo)志位。
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for (MessageExt messageExt : msgs) {
String bizKey = messageExt.getKeys(); // 唯一業(yè)務(wù)主鍵
//1. 判斷是否存在標(biāo)志
if(redisTemplate.hasKey(RedisKeyConstants.WAITING_SEND_LOCK + bizKey)) {
continue;
}
//2. 執(zhí)行業(yè)務(wù)邏輯
//TODO do business
//3. 設(shè)置標(biāo)志位
redisTemplate.opsForValue().set(RedisKeyConstants.WAITING_SEND_LOCK + bizKey, "1", 72, TimeUnit.HOURS);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
logger.error("consumeMessage error: ", e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
3 分布式鎖
僅僅有業(yè)務(wù)邏輯判斷是不夠的,為了應(yīng)對(duì)并發(fā)場(chǎng)景,我們可以使用分布式鎖。
分布式鎖一般有三種方案:
- 數(shù)據(jù)庫(kù)樂(lè)觀鎖
- 數(shù)據(jù)庫(kù)悲觀鎖
- Redis 鎖
1 數(shù)據(jù)庫(kù)樂(lè)觀鎖
數(shù)據(jù)樂(lè)觀鎖假設(shè)認(rèn)為數(shù)據(jù)一般情況下不會(huì)造成沖突,所以在數(shù)據(jù)進(jìn)行提交更新的時(shí)候,才會(huì)正式對(duì)數(shù)據(jù)的沖突與否進(jìn)行檢測(cè),如果發(fā)現(xiàn)沖突了,則讓返回用戶(hù)錯(cuò)誤的信息,讓用戶(hù)決定如何去做。
由于樂(lè)觀鎖沒(méi)有了鎖等待,提高了吞吐量,所以樂(lè)觀鎖適合讀多寫(xiě)少的場(chǎng)景。
實(shí)現(xiàn)樂(lè)觀鎖:一般是在數(shù)據(jù)表中加上一個(gè)數(shù)據(jù)版本號(hào) version 字段,表示數(shù)據(jù)被修改的次數(shù),當(dāng)數(shù)據(jù)被修改時(shí),version 值會(huì)加一。
當(dāng)線程 A 要更新數(shù)據(jù)值時(shí),在讀取數(shù)據(jù)的同時(shí)也會(huì)讀取version值,在提交更新時(shí),若剛才讀取到的 version 值為當(dāng)前數(shù)據(jù)庫(kù)中的 version 值相等時(shí)才更新,否則重試更新操作,直到更新成功。
步驟 1 : 查詢(xún)出條目數(shù)據(jù)
select version from my_table where id = #{id}
步驟 2 :修改條目數(shù)據(jù),傳遞版本參數(shù)
update my_table set n = n + 1, version = version + 1 where id=#{id} and version = #{version};
從樂(lè)觀鎖的實(shí)現(xiàn)角度來(lái)講,樂(lè)觀鎖非常容易實(shí)現(xiàn),但它有兩個(gè)缺點(diǎn):
- 對(duì)業(yè)務(wù)的侵入性,添加版本字段;
- 高并發(fā)場(chǎng)景下,只有一個(gè)線程可以修改成功,那么就會(huì)存在大量的失敗 。
消費(fèi)端演示代碼如下:
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for (MessageExt messageExt : msgs) {
String orderJSON = new String(messageExt.getBody(), "UTF-8");
OrderPO orderPO = JSON.parseObject(orderJSON, OrderPO.class);
Long version = orderMapper.selectVersionByOrderId(orderPO.getId()); //版本
orderPO.setVersion(version);
// 對(duì)應(yīng) SQL:update t_order t set version = version + 1 , status = #{status} where id = #{id}
// and version = #{version}
int affectedCount = orderMapper.updateOrder(orderPO);
if(affectedCount == 0) {
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
logger.error("consumeMessage error: ", e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
2 數(shù)據(jù)庫(kù)悲觀鎖
當(dāng)我們要對(duì)一個(gè)數(shù)據(jù)庫(kù)中的一條數(shù)據(jù)進(jìn)行修改的時(shí)候,為了避免同時(shí)被其他人修改,最好的辦法就是直接對(duì)該數(shù)據(jù)進(jìn)行加鎖以防止并發(fā)。
這種借助數(shù)據(jù)庫(kù)鎖機(jī)制在修改數(shù)據(jù)之前先鎖定,再修改的方式被稱(chēng)之為悲觀并發(fā)控制(又名“悲觀鎖”,Pessimistic Concurrency Control,縮寫(xiě)“PCC”)。
之所以叫做悲觀鎖,是因?yàn)檫@是一種對(duì)數(shù)據(jù)的修改抱有悲觀態(tài)度的并發(fā)控制方式。我們一般認(rèn)為數(shù)據(jù)被并發(fā)修改的概率比較大,所以需要在修改之前先加鎖。
悲觀并發(fā)控制實(shí)際上是**“先取鎖再訪問(wèn)”的保守策略**,為數(shù)據(jù)處理的安全提供了保證。
MySQL 悲觀鎖的使用方法如下:
begin; -- 讀取數(shù)據(jù)并加鎖 select ... for update; -- 修改數(shù)據(jù) update ...; commit;
例如,以下代碼將讀取 t_order 表中 id 為 1 的記錄,并將該記錄的 status 字段修改為 3:
begin; select * from t_order where id = 1 for update; update t_order set status = '3' where id = 1; commit;
如果 t_order 表中 id 為 1 的記錄正在被其他事務(wù)修改,則上述代碼會(huì)等待該記錄被釋放鎖后才能繼續(xù)執(zhí)行。
消費(fèi)端演示代碼如下:
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for (MessageExt messageExt : msgs) {
String orderJSON = new String(messageExt.getBody(), "UTF-8");
OrderPO orderPO = JSON.parseObject(orderJSON, OrderPO.class);
Long orderId = orderPo.getId();
//調(diào)用service的修改訂單信息,該方法事務(wù)加鎖, 當(dāng)修改訂單記錄時(shí),該其他線程會(huì)等待該記錄被釋放才能繼續(xù)執(zhí)行
orderService.updateOrderForUpdate(orderId ,orderPO);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
logger.error("consumeMessage error: ", e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
3 Redis鎖
使用數(shù)據(jù)庫(kù)鎖是非常重的一個(gè)操作,我們可以使用更輕量級(jí)的 Redis 鎖來(lái)替換,因?yàn)?Redis 性能高,同時(shí)有非常豐富的生態(tài)(類(lèi)庫(kù))支持不同類(lèi)型的分布式鎖。
我們選擇 Redisson 框架提供的分布式鎖功能,簡(jiǎn)化的示例代碼如下:
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for (MessageExt messageExt : msgs) {
String orderJSON = new String(messageExt.getBody(), "UTF-8");
OrderPO orderPO = JSON.parseObject(orderJSON, OrderPO.class);
Long orderId = orderPo.getId();
RLock lock = redissonClient.getLock("order-lock-" + orderId);
rLock.lock(10, TimeUnit.SECONDS);
// TODO 業(yè)務(wù)邏輯
rLock.unlock();
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
logger.error("consumeMessage error: ", e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
總結(jié)
這篇文章,我們?cè)敿?xì)剖析了如何實(shí)現(xiàn) RocketMQ 消費(fèi)冪等。
1、消費(fèi)冪等:當(dāng)出現(xiàn) RocketMQ 消費(fèi)者對(duì)某條消息重復(fù)消費(fèi)的情況時(shí),重復(fù)消費(fèi)的結(jié)果與消費(fèi)一次的結(jié)果是相同的,并且多次消費(fèi)并未對(duì)業(yè)務(wù)系統(tǒng)產(chǎn)生任何負(fù)面影響。
2、適用場(chǎng)景:發(fā)送時(shí)消息重復(fù)、投遞時(shí)消息重復(fù)、負(fù)載均衡時(shí)消息重復(fù)
3、業(yè)務(wù)唯一標(biāo)識(shí):以業(yè)務(wù)唯一標(biāo)識(shí)作為冪等處理的關(guān)鍵依據(jù),消息必須攜帶業(yè)務(wù)唯一標(biāo)識(shí)。
4、冪等策略:業(yè)務(wù)邏輯代碼中需要判斷業(yè)務(wù)狀態(tài)機(jī),同時(shí)根據(jù)實(shí)際條件選擇全局處理標(biāo)識(shí)和分布式鎖兩種方式處理。

以上就是一文吃透消息隊(duì)列RocketMQ實(shí)現(xiàn)消費(fèi)冪等的詳細(xì)內(nèi)容,更多關(guān)于消息隊(duì)列RocketMQ消費(fèi)冪等的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
java string類(lèi)的常用方法詳細(xì)介紹
在開(kāi)發(fā)過(guò)程中經(jīng)常會(huì)使用到j(luò)ava string類(lèi)的方法,本文將以此問(wèn)題進(jìn)行詳細(xì)介紹2012-11-11
SpringBoot文件上傳同時(shí)接收復(fù)雜參數(shù)的過(guò)程詳解
這篇文章主要介紹了SpringBoot文件上傳同時(shí),接收復(fù)雜參數(shù),本文通過(guò)示例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-12-12
IDEA中make directory as的作用及說(shuō)明
這篇文章主要介紹了IDEA中make directory as的作用及說(shuō)明,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-09-09
SpringBoot整合MongoDB實(shí)現(xiàn)文檔存儲(chǔ)功能
MongoDB是可以應(yīng)用于各種規(guī)模的企業(yè)、各個(gè)行業(yè)以及各類(lèi)應(yīng)用程序的開(kāi)源數(shù)據(jù)庫(kù),本文將結(jié)合MongoDB和SpringBoot實(shí)現(xiàn)文檔存儲(chǔ)功能,需要的可以參考下2024-12-12

