springboot基于注解實(shí)現(xiàn)去重表消息防止重復(fù)消費(fèi)
1. 背景/問題
在分布式系統(tǒng)中,消息隊(duì)列(如RocketMQ、Kafka)的 消息重復(fù)消費(fèi) 是常見問題,主要原因包括:
- 網(wǎng)絡(luò)抖動(dòng):生產(chǎn)者或消費(fèi)者因網(wǎng)絡(luò)不穩(wěn)定觸發(fā)消息重發(fā)。
- 消費(fèi)者超時(shí):消費(fèi)者處理時(shí)間過長(zhǎng),消息隊(duì)列誤判為失敗并重新投遞。
- 集群故障轉(zhuǎn)移:消費(fèi)者宕機(jī)后,未完成的消息會(huì)被其他節(jié)點(diǎn)重新拉取。
重復(fù)消費(fèi)帶來的問題:
- 業(yè)務(wù)邏輯多次執(zhí)行(如重復(fù)扣款、重復(fù)生成訂單)。
- 數(shù)據(jù)一致性被破壞(如庫(kù)存超賣、積分累加錯(cuò)誤)。
- 系統(tǒng)資源浪費(fèi),影響性能和穩(wěn)定性。
為了避免這種情況發(fā)生,需要在客戶端實(shí)現(xiàn)一些機(jī)制來確保消息不會(huì)被重復(fù)消費(fèi),例如記錄消費(fèi)者已經(jīng)處理的消息 ID、使用分布式鎖來控制消費(fèi)進(jìn)程的唯一性等。這些機(jī)制能夠保證消息被成功處理,同時(shí)也能夠提高系統(tǒng)的可靠性和穩(wěn)定性。
2. 什么是冪等性
冪等性 是指對(duì)同一操作的多次執(zhí)行所產(chǎn)生的影響與一次執(zhí)行的影響相同。
- 消息消費(fèi)場(chǎng)景:無論消息被消費(fèi)多少次,最終結(jié)果應(yīng)與消費(fèi)一次一致。
- 實(shí)現(xiàn)目標(biāo):通過冪等設(shè)計(jì),確保業(yè)務(wù)邏輯的重復(fù)執(zhí)行不會(huì)產(chǎn)生副作用。
3. 冪等設(shè)計(jì)
核心思路
- 冪等標(biāo)識(shí):為每條消息生成唯一標(biāo)識(shí)(如業(yè)務(wù)ID + 消息ID),記錄其處理狀態(tài)。
- 狀態(tài)管理:通過數(shù)據(jù)庫(kù)或Redis維護(hù)冪等標(biāo)識(shí)的狀態(tài)(如“消費(fèi)中”“已消費(fèi)”)。
- 過期時(shí)間:防止因系統(tǒng)崩潰導(dǎo)致狀態(tài)長(zhǎng)期滯留,需設(shè)置合理的超時(shí)時(shí)間(如10分鐘)。
[消費(fèi)者接收消息]
│
▼
[解析消息,生成唯一冪等標(biāo)識(shí)]
│
▼
[查詢冪等標(biāo)識(shí)狀態(tài)]
│
┌───────┴───────┐
│ 存在且已消費(fèi) │ [返回成功,丟棄消息]
└───────┬───────┘
│
┌───────┴───────┐
│ 存在且消費(fèi)中 │ [延遲消費(fèi),等待重試]
└───────┬───────┘
│
┌───────┴───────┐
│ 不存在 │
└───────┬───────┘
│
[設(shè)置冪等標(biāo)識(shí)為“消費(fèi)中”,并設(shè)置過期時(shí)間]
│
▼
[執(zhí)行業(yè)務(wù)邏輯]
│
▼
[業(yè)務(wù)執(zhí)行成功?]
│
┌───────┴───────┐
│ 是 │ [更新標(biāo)識(shí)為“已消費(fèi)”]
│ │ [刪除或保留標(biāo)識(shí)]
└───────┬───────┘
│
┌───────┴───────┐
│ 否 │ [刪除標(biāo)識(shí),允許重試]
└───────┬───────┘
│
▼
[流程結(jié)束]
4.抽象通用冪等組件
消息防重復(fù)消費(fèi)冪等組件是通用的通常會(huì)提取出來也可供其他模塊/服務(wù) 使用
4.1自定義冪等注解
提供了一種通用的冪等注解,并通過 SpEL 的形式生成去重表全局唯一 Key
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface NoMQDuplicateConsume {
/**
* 設(shè)置防重令牌 Key 前綴
*/
String keyPrefix() default "";
/**
* 通過 SpEL 表達(dá)式生成的唯一 Key
*/
String key();
/**
* 設(shè)置防重令牌 Key 過期時(shí)間,單位秒,默認(rèn) 1 小時(shí)
*/
long keyTimeout() default 3600L;
}
4.2. 定義冪等枚舉
冪等需要設(shè)置兩個(gè)狀態(tài),消費(fèi)中和已消費(fèi),創(chuàng)建對(duì)應(yīng)的枚舉
@RequiredArgsConstructor
public enum IdempotentMQConsumeStatusEnum {
/**
* 消費(fèi)中
*/
CONSUMING("0"),
/**
* 已消費(fèi)
*/
CONSUMED("1");
@Getter
private final String code;
/**
* 如果消費(fèi)狀態(tài)等于消費(fèi)中,返回失敗
*
* @param consumeStatus 消費(fèi)狀態(tài)
* @return 是否消費(fèi)失敗
*/
public static boolean isError(String consumeStatus) {
return Objects.equals(CONSUMING.code, consumeStatus);
}
}
4.3.通過 AOP 的方式進(jìn)行增強(qiáng)注解
如果說方法上加了注解,會(huì)被這段 AOP 代碼以環(huán)繞增強(qiáng)方式執(zhí)行
@Slf4j
@Aspect
@RequiredArgsConstructor
public final class NoMQDuplicateConsumeAspect {
private final StringRedisTemplate stringRedisTemplate;
private static final String LUA_SCRIPT = """
local key = KEYS[1]
local value = ARGV[1]
local expire_time_ms = ARGV[2]
return redis.call('SET', key, value, 'NX', 'GET', 'PX', expire_time_ms)
""";
/**
* 增強(qiáng)方法標(biāo)記 {@link NoMQDuplicateConsume} 注解邏輯
*/
@Around("@annotation(com.nageoffer.onecoupon.framework.idempotent.NoMQDuplicateConsume)")
public Object noMQRepeatConsume(ProceedingJoinPoint joinPoint) throws Throwable {
NoMQDuplicateConsume noMQDuplicateConsume = getNoMQDuplicateConsumeAnnotation(joinPoint);
String uniqueKey = noMQDuplicateConsume.keyPrefix() + SpELUtil.parseKey(noMQDuplicateConsume.key(), ((MethodSignature) joinPoint.getSignature()).getMethod(), joinPoint.getArgs());
String absentAndGet = stringRedisTemplate.execute(
RedisScript.of(LUA_SCRIPT, String.class),
List.of(uniqueKey),
IdempotentMQConsumeStatusEnum.CONSUMING.getCode(),
String.valueOf(TimeUnit.SECONDS.toMillis(noMQDuplicateConsume.keyTimeout()))
);
// 如果不為空證明已經(jīng)有
if (Objects.nonNull(absentAndGet)) {
boolean errorFlag = IdempotentMQConsumeStatusEnum.isError(absentAndGet);
log.warn("[{}] MQ repeated consumption, {}.", uniqueKey, errorFlag ? "Wait for the client to delay consumption" : "Status is completed");
if (errorFlag) {
throw new ServiceException(String.format("消息消費(fèi)者冪等異常,冪等標(biāo)識(shí):%s", uniqueKey));
}
return null;
}
Object result;
try {
// 執(zhí)行標(biāo)記了消息隊(duì)列防重復(fù)消費(fèi)注解的方法原邏輯
result = joinPoint.proceed();
// 設(shè)置防重令牌 Key 過期時(shí)間,單位秒
stringRedisTemplate.opsForValue().set(uniqueKey, IdempotentMQConsumeStatusEnum.CONSUMED.getCode(), noMQDuplicateConsume.keyTimeout(), TimeUnit.SECONDS);
} catch (Throwable ex) {
// 刪除冪等 Key,讓消息隊(duì)列消費(fèi)者重試邏輯進(jìn)行重新消費(fèi)
stringRedisTemplate.delete(uniqueKey);
throw ex;
}
return result;
}
/**
* @return 返回自定義防重復(fù)消費(fèi)注解
*/
public static NoMQDuplicateConsume getNoMQDuplicateConsumeAnnotation(ProceedingJoinPoint joinPoint) throws NoSuchMethodException {
MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
Method targetMethod = joinPoint.getTarget().getClass().getDeclaredMethod(methodSignature.getName(), methodSignature.getMethod().getParameterTypes());
return targetMethod.getAnnotation(NoMQDuplicateConsume.class);
}
lua腳本解釋
local key = KEYS[1] # 第一個(gè) Key,即冪等唯一標(biāo)識(shí) uniqueKey
local value = ARGV[1] # 第一個(gè)參數(shù),即初始化冪等消費(fèi)狀態(tài),為消費(fèi)中
local expire_time_ms = ARGV[2] # 第二個(gè)參數(shù),即冪等 Key 過期時(shí)間
return redis.call('SET', key, value, 'NX', 'GET', 'PX', expire_time_ms)
該腳本的主要作用是:在 Redis 中嘗試以 NX 方式設(shè)置一個(gè)鍵,即如果鍵不存在,則設(shè)置新值,并返回設(shè)置之前的舊值,同時(shí)為該鍵設(shè)置過期時(shí)間(以毫秒為單位)。
獲取到 Redis 里面的 Key 值后,可能會(huì)有三個(gè)流程執(zhí)行:
absentAndGet為空:代表消息是第一次到達(dá),執(zhí)行完 LUA 腳本后,會(huì)在 Redis 設(shè)置 Key 的 Value 值為 0,消費(fèi)中狀態(tài)。absentAndGet為 0:代表已經(jīng)有相同消息到達(dá)并且還沒有處理完,會(huì)通過拋異常的形式讓 RocketMQ 重試。absentAndGet為 1:代表已經(jīng)有相同消息消費(fèi)完成,返回空表示不執(zhí)行任何處理。
4.4.注冊(cè)為 Spring Bean
另外可以看看另一篇基于分布式鎖注解防重復(fù)提交
https://blog.csdn.net/sjsjsbbsbsn/article/details/145131305?spm=1001.2014.3001.5501
public class IdempotentConfiguration {
/**
* 防止消息隊(duì)列消費(fèi)者重復(fù)消費(fèi)消息切面控制器
*/
@Bean
public NoMQDuplicateConsumeAspect noMQDuplicateConsumeAspect(StringRedisTemplate stringRedisTemplate) {
return new NoMQDuplicateConsumeAspect(stringRedisTemplate);
}
}
4.5EL工具類
public class SpELUtil {
/**
* 校驗(yàn)并返回實(shí)際使用的 spEL 表達(dá)式
*
* @param spEl spEL 表達(dá)式
* @return 實(shí)際使用的 spEL 表達(dá)式
*/
public static Object parseKey(String spEl, Method method, Object[] contextObj) {
List<String> spELFlag = ListUtil.of("#", "T(");
Optional<String> optional = spELFlag.stream().filter(spEl::contains).findFirst();
if (optional.isPresent()) {
return parse(spEl, method, contextObj);
}
return spEl;
}
/**
* 轉(zhuǎn)換參數(shù)為字符串
*
* @param spEl spEl 表達(dá)式
* @param contextObj 上下文對(duì)象
* @return 解析的字符串值
*/
public static Object parse(String spEl, Method method, Object[] contextObj) {
DefaultParameterNameDiscoverer discoverer = new DefaultParameterNameDiscoverer();
ExpressionParser parser = new SpelExpressionParser();
Expression exp = parser.parseExpression(spEl);
String[] params = discoverer.getParameterNames(method);
StandardEvaluationContext context = new StandardEvaluationContext();
if (ArrayUtil.isNotEmpty(params)) {
for (int len = 0; len < params.length; len++) {
context.setVariable(params[len], contextObj[len]);
}
}
return exp.getValue(context);
}
}
5.實(shí)戰(zhàn)使用
使用天機(jī)學(xué)堂項(xiàng)目來進(jìn)行實(shí)戰(zhàn)
5.1寫入common模塊

5.2使用

直接加上注解就可以
但是實(shí)際上這里不存在冪等問題,因?yàn)閡serId和courseId設(shè)置了唯一索引,所以這里不存在冪等性,不需要加上冪等注解
到此這篇關(guān)于springboot基于注解實(shí)現(xiàn)去重表消息防止重復(fù)消費(fèi)的文章就介紹到這了,更多相關(guān)springboot注解防止重復(fù)消費(fèi)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- springboot 防止重復(fù)請(qǐng)求防止重復(fù)點(diǎn)擊的操作
- SpringBoot?使用AOP?+?Redis?防止表單重復(fù)提交的方法
- SpringBoot+Redis使用AOP防止重復(fù)提交的實(shí)現(xiàn)
- SpringBoot整合redis+Aop防止重復(fù)提交的實(shí)現(xiàn)
- SpringBoot攔截器實(shí)現(xiàn)項(xiàng)目防止接口重復(fù)提交
- SpringBoot中防止接口重復(fù)提交的有效方法
- SpringBoot利用Redis實(shí)現(xiàn)防止訂單重復(fù)提交的解決方案
- SpringBoot整合ShedLock解決定時(shí)任務(wù)防止重復(fù)執(zhí)行的問題
相關(guān)文章
Java中實(shí)現(xiàn)OCR識(shí)別讀取圖片中的文字
圖片內(nèi)容一般無法編輯,如果想要讀取圖片中的文本,我們需要用到OCR工具,本文將介紹如何在Java中實(shí)現(xiàn)OCR識(shí)別讀取圖片中的文字,文中通過代碼示例介紹的非常詳細(xì),需要的朋友可以參考下2024-04-04
java調(diào)用openoffice將office系列文檔轉(zhuǎn)換為PDF的示例方法
本篇文章主要介紹了java使用openoffice將office系列文檔轉(zhuǎn)換為PDF的示例方法,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下。2017-11-11
Java如何使用HTTPclient訪問url獲得數(shù)據(jù)
這篇文章主要介紹了Java使用HTTPclient訪問url獲得數(shù)據(jù)的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-09-09

