高并發(fā)下Redis精確計數(shù)與時間窗口過期的方法詳解
引言
在實時數(shù)據(jù)處理系統(tǒng)中,我們經(jīng)常需要統(tǒng)計某個事件在特定時間窗口內(nèi)的發(fā)生次數(shù),例如:
- 統(tǒng)計用戶每小時訪問次數(shù)
- 限制設(shè)備每分鐘請求頻率
- 廣告曝光按小時去重計數(shù)
這類需求通常面臨兩個核心挑戰(zhàn):
- 高并發(fā)計數(shù):多臺服務(wù)器同時讀寫同一個計數(shù)器
- 精確時間窗口:數(shù)據(jù)到點自動過期,避免累積
本文將詳細(xì)介紹如何基于 Redis 實現(xiàn)高性能、高可用的計數(shù)方案,并提供完整的Java代碼實現(xiàn)。
一、Redis計數(shù)方案選型
1.1 為什么選擇Redis
方案 | QPS | 數(shù)據(jù)一致性 | 實現(xiàn)復(fù)雜度 |
---|---|---|---|
數(shù)據(jù)庫+事務(wù) | ~1K | 強一致 | 高 |
本地緩存 | ~100K | 最終一致 | 中 |
Redis原子操作 | 50K+ | 強一致 | 低 |
Redis的單線程模型天然適合計數(shù)場景,提供INCR/INCRBY等原子命令。
1.2 Key設(shè)計原則
// 格式:業(yè)務(wù)前綴:appId:deviceId:ip:時間窗口 String key = "flow:count:app123:device456:127.0.0.1:2023080117";
- 包含所有維度信息
- 時間窗口按小時切分(可調(diào)整)
- 添加業(yè)務(wù)前綴避免沖突
二、基礎(chǔ)實現(xiàn)方案
2.1 簡單INCRBY實現(xiàn)
public void incrementCount(String key, int delta) { redisTemplate.opsForValue().increment(key, delta); }
問題:沒有過期時間,會導(dǎo)致數(shù)據(jù)無限堆積
2.2 增加過期時間
public void incrementWithExpire(String key, int delta, long ttlSeconds) { redisTemplate.opsForValue().increment(key, delta); redisTemplate.expire(key, ttlSeconds, TimeUnit.SECONDS); }
新問題:每次操作都設(shè)置TTL,造成冗余Redis調(diào)用
三、優(yōu)化方案:精準(zhǔn)TTL控制
3.1 判斷Key是否首次寫入
我們需要確保TTL只在Key創(chuàng)建時設(shè)置一次,兩種實現(xiàn)方式:
方案A:Lua腳本(推薦)
private static final String LUA_SCRIPT = "local current = redis.call('INCRBY', KEYS[1], ARGV[1])\n" + "if current == tonumber(ARGV[1]) then\n" + " redis.call('EXPIRE', KEYS[1], ARGV[2])\n" + "end\n" + "return current"; public Long incrementAtomically(String key, int delta, long ttl) { return redisTemplate.execute( new DefaultRedisScript<>(LUA_SCRIPT, Long.class), Collections.singletonList(key), String.valueOf(delta), String.valueOf(ttl) ); }
優(yōu)勢:
- 完全原子性執(zhí)行
- 單次網(wǎng)絡(luò)往返
- 精準(zhǔn)判斷首次寫入
方案B:SETNX+INCRBY
public void incrementWithNX(String key, int delta, long ttl) { redisTemplate.executePipelined((RedisCallback<Object>) connection -> { StringRedisConnection conn = (StringRedisConnection) connection; conn.setNX(key, "0"); // 嘗試初始化 conn.incrBy(key, delta); if (conn.setNX(key + ":lock", "1")) { // 簡易鎖判斷首次 conn.expire(key, ttl); conn.expire(key + ":lock", 10); } return null; }); }
適用場景:Redis版本<2.6(不支持Lua)
四、完整生產(chǎn)級實現(xiàn)
4.1 時間窗口計算
public long calculateTtlToNextHour() { LocalDateTime now = LocalDateTime.now(); LocalDateTime nextHour = now.plusHours(1).truncatedTo(ChronoUnit.HOURS); return ChronoUnit.SECONDS.between(now, nextHour); }
4.2 Kafka消費者集成
@Component @RequiredArgsConstructor public class FlowCounter { private final RedisTemplate<String, String> redisTemplate; private static final String KEY_PREFIX = "flow:count:"; @KafkaListener(topics = "${kafka.topic}") public void handleMessages(List<Message> messages) { Map<String, Integer> countMap = messages.stream() .collect(Collectors.toMap( this::buildKey, msg -> 1, Integer::sum )); countMap.forEach((k, v) -> incrementAtomically(k, v, calculateTtlToNextHour()) ); } ??????? private String buildKey(Message msg) { return String.format("%s%s:%s:%s:%s", KEY_PREFIX, msg.getAppId(), msg.getDeviceId(), msg.getIp(), LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHH")) ); } }
4.3 查詢接口
public long getCurrentCount(String appId, String deviceId, String ip) { String key = buildKey(appId, deviceId, ip); String val = redisTemplate.opsForValue().get(key); return val != null ? Long.parseLong(val) : 0L; }
五、性能優(yōu)化技巧
5.1 Pipeline批量處理
redisTemplate.executePipelined((RedisCallback<Object>) connection -> { StringRedisConnection conn = (StringRedisConnection) connection; countMap.forEach((k, v) -> { conn.incrBy(k, v); // 可結(jié)合Lua腳本進一步優(yōu)化 }); return null; });
5.2 本地預(yù)聚合
// 在內(nèi)存中先合并相同Key的計數(shù) Map<String, Integer> localCount = messages.stream() .collect(Collectors.toMap( this::buildKey, m -> 1, Integer::sum ));
5.3 集群部署注意事項
使用{}強制哈希標(biāo)簽,保證相同Key路由到同一節(jié)點
"{flow}:count:app123:..."
考慮分片策略避免熱點
六、異常處理與監(jiān)控
6.1 Redis重試機制
@Retryable(maxAttempts = 3, backoff = @Backoff(delay = 100)) public void safeIncrement(String key, int delta) { // 業(yè)務(wù)邏輯 }
6.2 監(jiān)控指標(biāo)
# TYPE redis_operations_total counter redis_operations_total{operation="incr"} 12345 redis_operations_total{operation="expire"} 678
6.3 數(shù)據(jù)補償
@Scheduled(fixedRate = 3600000) public void checkDataConsistency() { // 對比DB與Redis計數(shù)差異 }
七、方案對比總結(jié)
方案 | 優(yōu)點 | 缺點 | 適用場景 |
---|---|---|---|
Lua腳本 | 原子性強,性能最佳 | 需要Redis 2.6+ | 新項目首選 |
SETNX+INCR | 兼容舊版 | 有競態(tài)風(fēng)險 | 遺留系統(tǒng) |
純INCR+TTL | 實現(xiàn)簡單 | TTL冗余 | 不推薦生產(chǎn) |
結(jié)語
通過本文的方案,我們實現(xiàn)了:
- 單機50K+ QPS的計數(shù)能力
- 精確到小時的時間窗口控制
- 分布式環(huán)境下的強一致性
最佳實踐建議:
- 生產(chǎn)環(huán)境優(yōu)先選擇Lua腳本方案
- 對于超高并發(fā)場景(如雙11),可增加本地緩存層
- 定期檢查Redis內(nèi)存使用情況
以上就是高并發(fā)下Redis精確計數(shù)與時間窗口過期的方法詳解的詳細(xì)內(nèi)容,更多關(guān)于Redis高并發(fā)精確計數(shù)的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
redis實現(xiàn)存儲帖子的點贊狀態(tài)和數(shù)量的示例代碼
使用Redis來實現(xiàn)點贊功能是一種高效的選擇,因為Redis是一個內(nèi)存數(shù)據(jù)庫,適用于處理高并發(fā)的數(shù)據(jù)操作,這篇文章主要介紹了redis實現(xiàn)存儲帖子的點贊狀態(tài)和數(shù)量的示例代碼,需要的朋友可以參考下2023-09-09Redis高級玩法之利用SortedSet實現(xiàn)多維度排序的方法
Redis的SortedSet是可以根據(jù)score進行排序的,以手機應(yīng)用商店的熱門榜單排序為例,根據(jù)下載量倒序排列。接下來通過本文給大家分享Redis高級玩法之利用SortedSet實現(xiàn)多維度排序的方法,一起看看吧2019-07-07