springboot基于注解實現(xiàn)去重表消息防止重復(fù)消費
1. 背景/問題
在分布式系統(tǒng)中,消息隊列(如RocketMQ、Kafka)的 消息重復(fù)消費 是常見問題,主要原因包括:
- 網(wǎng)絡(luò)抖動:生產(chǎn)者或消費者因網(wǎng)絡(luò)不穩(wěn)定觸發(fā)消息重發(fā)。
- 消費者超時:消費者處理時間過長,消息隊列誤判為失敗并重新投遞。
- 集群故障轉(zhuǎn)移:消費者宕機后,未完成的消息會被其他節(jié)點重新拉取。
重復(fù)消費帶來的問題:
- 業(yè)務(wù)邏輯多次執(zhí)行(如重復(fù)扣款、重復(fù)生成訂單)。
- 數(shù)據(jù)一致性被破壞(如庫存超賣、積分累加錯誤)。
- 系統(tǒng)資源浪費,影響性能和穩(wěn)定性。
為了避免這種情況發(fā)生,需要在客戶端實現(xiàn)一些機制來確保消息不會被重復(fù)消費,例如記錄消費者已經(jīng)處理的消息 ID、使用分布式鎖來控制消費進程的唯一性等。這些機制能夠保證消息被成功處理,同時也能夠提高系統(tǒng)的可靠性和穩(wěn)定性。
2. 什么是冪等性
冪等性 是指對同一操作的多次執(zhí)行所產(chǎn)生的影響與一次執(zhí)行的影響相同。
- 消息消費場景:無論消息被消費多少次,最終結(jié)果應(yīng)與消費一次一致。
- 實現(xiàn)目標:通過冪等設(shè)計,確保業(yè)務(wù)邏輯的重復(fù)執(zhí)行不會產(chǎn)生副作用。
3. 冪等設(shè)計
核心思路
- 冪等標識:為每條消息生成唯一標識(如業(yè)務(wù)ID + 消息ID),記錄其處理狀態(tài)。
- 狀態(tài)管理:通過數(shù)據(jù)庫或Redis維護冪等標識的狀態(tài)(如“消費中”“已消費”)。
- 過期時間:防止因系統(tǒng)崩潰導(dǎo)致狀態(tài)長期滯留,需設(shè)置合理的超時時間(如10分鐘)。
[消費者接收消息] │ ▼ [解析消息,生成唯一冪等標識] │ ▼ [查詢冪等標識狀態(tài)] │ ┌───────┴───────┐ │ 存在且已消費 │ [返回成功,丟棄消息] └───────┬───────┘ │ ┌───────┴───────┐ │ 存在且消費中 │ [延遲消費,等待重試] └───────┬───────┘ │ ┌───────┴───────┐ │ 不存在 │ └───────┬───────┘ │ [設(shè)置冪等標識為“消費中”,并設(shè)置過期時間] │ ▼ [執(zhí)行業(yè)務(wù)邏輯] │ ▼ [業(yè)務(wù)執(zhí)行成功?] │ ┌───────┴───────┐ │ 是 │ [更新標識為“已消費”] │ │ [刪除或保留標識] └───────┬───────┘ │ ┌───────┴───────┐ │ 否 │ [刪除標識,允許重試] └───────┬───────┘ │ ▼ [流程結(jié)束]
4.抽象通用冪等組件
消息防重復(fù)消費冪等組件是通用的通常會提取出來也可供其他模塊/服務(wù) 使用
4.1自定義冪等注解
提供了一種通用的冪等注解,并通過 SpEL 的形式生成去重表全局唯一 Key
@Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) public @interface NoMQDuplicateConsume { /** * 設(shè)置防重令牌 Key 前綴 */ String keyPrefix() default ""; /** * 通過 SpEL 表達式生成的唯一 Key */ String key(); /** * 設(shè)置防重令牌 Key 過期時間,單位秒,默認 1 小時 */ long keyTimeout() default 3600L; }
4.2. 定義冪等枚舉
冪等需要設(shè)置兩個狀態(tài),消費中和已消費,創(chuàng)建對應(yīng)的枚舉
@RequiredArgsConstructor public enum IdempotentMQConsumeStatusEnum { /** * 消費中 */ CONSUMING("0"), /** * 已消費 */ CONSUMED("1"); @Getter private final String code; /** * 如果消費狀態(tài)等于消費中,返回失敗 * * @param consumeStatus 消費狀態(tài) * @return 是否消費失敗 */ public static boolean isError(String consumeStatus) { return Objects.equals(CONSUMING.code, consumeStatus); } }
4.3.通過 AOP 的方式進行增強注解
如果說方法上加了注解,會被這段 AOP 代碼以環(huán)繞增強方式執(zhí)行
@Slf4j @Aspect @RequiredArgsConstructor public final class NoMQDuplicateConsumeAspect { private final StringRedisTemplate stringRedisTemplate; private static final String LUA_SCRIPT = """ local key = KEYS[1] local value = ARGV[1] local expire_time_ms = ARGV[2] return redis.call('SET', key, value, 'NX', 'GET', 'PX', expire_time_ms) """; /** * 增強方法標記 {@link NoMQDuplicateConsume} 注解邏輯 */ @Around("@annotation(com.nageoffer.onecoupon.framework.idempotent.NoMQDuplicateConsume)") public Object noMQRepeatConsume(ProceedingJoinPoint joinPoint) throws Throwable { NoMQDuplicateConsume noMQDuplicateConsume = getNoMQDuplicateConsumeAnnotation(joinPoint); String uniqueKey = noMQDuplicateConsume.keyPrefix() + SpELUtil.parseKey(noMQDuplicateConsume.key(), ((MethodSignature) joinPoint.getSignature()).getMethod(), joinPoint.getArgs()); String absentAndGet = stringRedisTemplate.execute( RedisScript.of(LUA_SCRIPT, String.class), List.of(uniqueKey), IdempotentMQConsumeStatusEnum.CONSUMING.getCode(), String.valueOf(TimeUnit.SECONDS.toMillis(noMQDuplicateConsume.keyTimeout())) ); // 如果不為空證明已經(jīng)有 if (Objects.nonNull(absentAndGet)) { boolean errorFlag = IdempotentMQConsumeStatusEnum.isError(absentAndGet); log.warn("[{}] MQ repeated consumption, {}.", uniqueKey, errorFlag ? "Wait for the client to delay consumption" : "Status is completed"); if (errorFlag) { throw new ServiceException(String.format("消息消費者冪等異常,冪等標識:%s", uniqueKey)); } return null; } Object result; try { // 執(zhí)行標記了消息隊列防重復(fù)消費注解的方法原邏輯 result = joinPoint.proceed(); // 設(shè)置防重令牌 Key 過期時間,單位秒 stringRedisTemplate.opsForValue().set(uniqueKey, IdempotentMQConsumeStatusEnum.CONSUMED.getCode(), noMQDuplicateConsume.keyTimeout(), TimeUnit.SECONDS); } catch (Throwable ex) { // 刪除冪等 Key,讓消息隊列消費者重試邏輯進行重新消費 stringRedisTemplate.delete(uniqueKey); throw ex; } return result; } /** * @return 返回自定義防重復(fù)消費注解 */ public static NoMQDuplicateConsume getNoMQDuplicateConsumeAnnotation(ProceedingJoinPoint joinPoint) throws NoSuchMethodException { MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature(); Method targetMethod = joinPoint.getTarget().getClass().getDeclaredMethod(methodSignature.getName(), methodSignature.getMethod().getParameterTypes()); return targetMethod.getAnnotation(NoMQDuplicateConsume.class); }
lua腳本解釋
local key = KEYS[1] # 第一個 Key,即冪等唯一標識 uniqueKey local value = ARGV[1] # 第一個參數(shù),即初始化冪等消費狀態(tài),為消費中 local expire_time_ms = ARGV[2] # 第二個參數(shù),即冪等 Key 過期時間 return redis.call('SET', key, value, 'NX', 'GET', 'PX', expire_time_ms)
該腳本的主要作用是:在 Redis 中嘗試以 NX
方式設(shè)置一個鍵,即如果鍵不存在,則設(shè)置新值,并返回設(shè)置之前的舊值,同時為該鍵設(shè)置過期時間(以毫秒為單位)。
獲取到 Redis 里面的 Key 值后,可能會有三個流程執(zhí)行:
absentAndGet
為空:代表消息是第一次到達,執(zhí)行完 LUA 腳本后,會在 Redis 設(shè)置 Key 的 Value 值為 0,消費中狀態(tài)。absentAndGet
為 0:代表已經(jīng)有相同消息到達并且還沒有處理完,會通過拋異常的形式讓 RocketMQ 重試。absentAndGet
為 1:代表已經(jīng)有相同消息消費完成,返回空表示不執(zhí)行任何處理。
4.4.注冊為 Spring Bean
另外可以看看另一篇基于分布式鎖注解防重復(fù)提交
https://blog.csdn.net/sjsjsbbsbsn/article/details/145131305?spm=1001.2014.3001.5501
public class IdempotentConfiguration { /** * 防止消息隊列消費者重復(fù)消費消息切面控制器 */ @Bean public NoMQDuplicateConsumeAspect noMQDuplicateConsumeAspect(StringRedisTemplate stringRedisTemplate) { return new NoMQDuplicateConsumeAspect(stringRedisTemplate); } }
4.5EL工具類
public class SpELUtil { /** * 校驗并返回實際使用的 spEL 表達式 * * @param spEl spEL 表達式 * @return 實際使用的 spEL 表達式 */ public static Object parseKey(String spEl, Method method, Object[] contextObj) { List<String> spELFlag = ListUtil.of("#", "T("); Optional<String> optional = spELFlag.stream().filter(spEl::contains).findFirst(); if (optional.isPresent()) { return parse(spEl, method, contextObj); } return spEl; } /** * 轉(zhuǎn)換參數(shù)為字符串 * * @param spEl spEl 表達式 * @param contextObj 上下文對象 * @return 解析的字符串值 */ public static Object parse(String spEl, Method method, Object[] contextObj) { DefaultParameterNameDiscoverer discoverer = new DefaultParameterNameDiscoverer(); ExpressionParser parser = new SpelExpressionParser(); Expression exp = parser.parseExpression(spEl); String[] params = discoverer.getParameterNames(method); StandardEvaluationContext context = new StandardEvaluationContext(); if (ArrayUtil.isNotEmpty(params)) { for (int len = 0; len < params.length; len++) { context.setVariable(params[len], contextObj[len]); } } return exp.getValue(context); } }
5.實戰(zhàn)使用
使用天機學(xué)堂項目來進行實戰(zhàn)
5.1寫入common模塊
5.2使用
直接加上注解就可以
但是實際上這里不存在冪等問題,因為userId和courseId設(shè)置了唯一索引,所以這里不存在冪等性,不需要加上冪等注解
到此這篇關(guān)于springboot基于注解實現(xiàn)去重表消息防止重復(fù)消費的文章就介紹到這了,更多相關(guān)springboot注解防止重復(fù)消費內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- springboot 防止重復(fù)請求防止重復(fù)點擊的操作
- SpringBoot?使用AOP?+?Redis?防止表單重復(fù)提交的方法
- SpringBoot+Redis使用AOP防止重復(fù)提交的實現(xiàn)
- SpringBoot整合redis+Aop防止重復(fù)提交的實現(xiàn)
- SpringBoot攔截器實現(xiàn)項目防止接口重復(fù)提交
- SpringBoot中防止接口重復(fù)提交的有效方法
- SpringBoot利用Redis實現(xiàn)防止訂單重復(fù)提交的解決方案
- SpringBoot整合ShedLock解決定時任務(wù)防止重復(fù)執(zhí)行的問題
相關(guān)文章
java調(diào)用openoffice將office系列文檔轉(zhuǎn)換為PDF的示例方法
本篇文章主要介紹了java使用openoffice將office系列文檔轉(zhuǎn)換為PDF的示例方法,具有一定的參考價值,感興趣的小伙伴們可以參考一下。2017-11-11Java如何使用HTTPclient訪問url獲得數(shù)據(jù)
這篇文章主要介紹了Java使用HTTPclient訪問url獲得數(shù)據(jù)的操作,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-09-09