Java如何利用線程池和Redis實現(xiàn)高效數(shù)據(jù)入庫
利用線程池和Redis實現(xiàn)高效數(shù)據(jù)入庫
在高并發(fā)環(huán)境中,進行數(shù)據(jù)入庫是一項具有挑戰(zhàn)性的任務。
本文將介紹如何利用線程池和Redis實現(xiàn)數(shù)據(jù)的實時緩存和批量入庫處理,確保系統(tǒng)的性能和穩(wěn)定性。
主要思路和組件介紹
思路概述
在高并發(fā)情況下,數(shù)據(jù)入庫需要解決兩個主要問題:實時性和穩(wěn)定性。
通過將數(shù)據(jù)首先存儲在Redis緩存中,可以快速響應和處理大量的數(shù)據(jù)請求,然后利用線程池定期批量將數(shù)據(jù)從Redis取出并入庫,以減少數(shù)據(jù)庫壓力和提高整體性能。
主要組件
- BatchDataStorageService:核心服務類,負責數(shù)據(jù)的實時緩存和定期批量入庫處理。
- CacheService:簡易緩存服務類,使用ConcurrentHashMap實現(xiàn)內(nèi)存緩存,用于快速存取和處理數(shù)據(jù)。
- RedisUtils:封裝了對Redis的操作,用于數(shù)據(jù)的持久化存儲和高速讀取。
- BatchWorker:實現(xiàn)了Runnable接口,處理從Redis中讀取數(shù)據(jù)并執(zhí)行批量入庫的任務。
- BatchTimeoutCommitThread:定時監(jiān)控數(shù)據(jù)是否達到設定的批次或超時時間,并觸發(fā)數(shù)據(jù)入庫操作。
詳細代碼解析
- BatchDataStorageService
package io.jack.service.impl; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * 數(shù)據(jù)批量入庫服務類 */ @Component @Slf4j public class BatchDataStorageService implements InitializingBean { /** * 最大批次數(shù)量 */ @Value("${app.db.maxBatchCount:800}") private int maxBatchCount; /** * 最大線程數(shù) */ @Value("${app.db.maxBatchThreads:100}") private int maxBatchThreads; /** * 超時時間,單位毫秒 */ @Value("${app.db.batchTimeout:3000}") private int batchTimeout; /** * 當前批次數(shù)量 */ private int batchCount = 0; /** * 當前批次號 */ private static long batchNo = 0; /** * 線程池執(zhí)行器 */ private ExecutorService executorService = null; /** * 緩存服務 */ @Resource private CacheService cacheService; /** * 設備實時服務 */ @Resource private DeviceRealTimeService deviceRealTimeService; /** * Redis工具類 */ @Resource private RedisUtils redisUtils; /** * 初始化線程池 */ @Override public void afterPropertiesSet() { executorService = Executors.newFixedThreadPool(maxBatchThreads); } /** * 保存設備實時數(shù)據(jù) * * @param deviceRealTimeDTO 設備實時數(shù)據(jù)傳輸對象 */ public void saveRealTimeData(DeviceRealTimeDTO deviceRealTimeDTO) { final String failedCacheKey = "device:real_time:failed_records"; try { // 生成批次和持續(xù)時間的緩存鍵 String durationKey = "device:real_time:batchDuration" + batchNo; String batchKey = "device:real_time:batch" + batchNo; // 如果當前批次持續(xù)時間不存在,則創(chuàng)建并啟動超時處理線程 if (!cacheService.exists(durationKey)) { cacheService.put(durationKey, System.currentTimeMillis()); new BatchTimeoutCommitThread(batchKey, durationKey, failedCacheKey).start(); } // 將設備實時數(shù)據(jù)加入當前批次 cacheService.lPush(batchKey, deviceRealTimeDTO); if (++batchCount >= maxBatchCount) { // 達到最大批次,執(zhí)行入庫邏輯 dataStorage(durationKey, batchKey, failedCacheKey); } } catch (Exception ex) { log.warn("[DB:FAILED] 設備上報記錄入批處理集合異常: " + ex.getMessage() + ", DeviceRealTimeDTO: " + JSON.toJSONString(deviceRealTimeDTO), ex); cacheService.lPush(failedCacheKey, deviceRealTimeDTO); } finally { updateRealTimeData(deviceRealTimeDTO); } } /** * 更新實時數(shù)據(jù)到Redis * * @param deviceRealTimeDTO 設備實時數(shù)據(jù)傳輸對象 */ private void updateRealTimeData(DeviceRealTimeDTO deviceRealTimeDTO) { redisUtils.set("real_time:" + deviceRealTimeDTO.getDeviceId(), JSONArray.toJSONString(deviceRealTimeDTO)); } /** * 批量入庫處理 * * @param durationKey 持續(xù)時間標識 * @param batchKey 批次標識 * @param failedCacheKey 錯誤記錄標識 */ private void dataStorage(String durationKey, String batchKey, String failedCacheKey) { batchNo++; batchCount = 0; cacheService.del(durationKey); if (batchNo >= Long.MAX_VALUE) { batchNo = 0; } executorService.execute(new BatchWorker(batchKey, failedCacheKey)); } /** * 批量工作線程 */ private class BatchWorker implements Runnable { private final String failedCacheKey; private final String batchKey; public BatchWorker(String batchKey, String failedCacheKey) { this.batchKey = batchKey; this.failedCacheKey = failedCacheKey; } @Override public void run() { final List<DeviceRealTimeDTO> deviceRealTimeDTOList = new ArrayList<>(); try { // 從緩存中獲取批量數(shù)據(jù) DeviceRealTimeDTO deviceRealTimeDTO = cacheService.lPop(batchKey); while (deviceRealTimeDTO != null) { deviceRealTimeDTOList.add(deviceRealTimeDTO); deviceRealTimeDTO = cacheService.lPop(batchKey); } long timeMillis = System.currentTimeMillis(); try { // 將DTO轉(zhuǎn)換為實體對象并批量入庫 List<DeviceRealTimeEntity> deviceRealTimeEntityList = ConvertUtils.sourceToTarget(deviceRealTimeDTOList, DeviceRealTimeEntity.class); deviceRealTimeService.insertBatch(deviceRealTimeEntityList); } finally { cacheService.del(batchKey); log.info("[DB:BATCH_WORKER] 批次:" + batchKey + ",保存設備上報記錄數(shù):" + deviceRealTimeDTOList.size() + ", 耗時:" + (System.currentTimeMillis() - timeMillis) + "ms"); } } catch (Exception e) { log.warn("[DB:FAILED] 設備上報記錄批量入庫失?。? + e.getMessage() + ", DeviceRealTimeDTO: " + deviceRealTimeDTOList.size(), e); for (DeviceRealTimeDTO deviceRealTimeDTO : deviceRealTimeDTOList) { cacheService.lPush(failedCacheKey, deviceRealTimeDTO); } } } } /** * 批次超時提交線程 */ class BatchTimeoutCommitThread extends Thread { private final String batchKey; private final String durationKey; private final String failedCacheKey; public BatchTimeoutCommitThread(String batchKey, String durationKey, String failedCacheKey) { this.batchKey = batchKey; this.durationKey = durationKey; this.failedCacheKey = failedCacheKey; this.setName("batch-thread-" + batchKey); } @Override public void run() { try { Thread.sleep(batchTimeout); } catch (InterruptedException e) { log.error("[DB] 內(nèi)部錯誤,直接提交:" + e.getMessage()); } if (cacheService.exists(durationKey)) { // 達到最大批次的超時間,執(zhí)行入庫邏輯 dataStorage(durationKey, batchKey, failedCacheKey); } } } }
- CacheService
package io.jack.service; import org.springframework.beans.factory.InitializingBean; import org.springframework.stereotype.Component; import java.util.LinkedList; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; /** * 緩存服務類,提供簡易的緩存機制 */ @Component public class CacheService implements InitializingBean { /** * 內(nèi)存緩存,用于存儲數(shù)據(jù) */ private Map<String, Object> objectCache = new ConcurrentHashMap<>(); /** * 統(tǒng)計緩存,用于統(tǒng)計計數(shù) */ private Map<String, AtomicLong> statCache = new ConcurrentHashMap<>(); /** * 初始化統(tǒng)計緩存 */ @Override public void afterPropertiesSet() { statCache.put("terminals", new AtomicLong(0)); statCache.put("connections", new AtomicLong(0)); } /** * 增加指定統(tǒng)計項的計數(shù) * * @param statName 統(tǒng)計項名稱 * @return 增加后的計數(shù)值 */ public long incr (String statName) { statCache.putIfAbsent(statName, new AtomicLong(0)); return statCache.get(statName).incrementAndGet(); } /** * 減少指定統(tǒng)計項的計數(shù) * * @param statName 統(tǒng)計項名稱 * @return 減少后的計數(shù)值 */ public long decr(String statName) { statCache.putIfAbsent(statName, new AtomicLong(0)); return statCache.get(statName).decrementAndGet(); } /** * 獲取指定統(tǒng)計項的當前計數(shù)值 * * @param statName 統(tǒng)計項名稱 * @return 當前計數(shù)值 */ public long stat(String statName) { statCache.putIfAbsent(statName, new AtomicLong(0)); return statCache.get(statName).get(); } /** * 存儲數(shù)據(jù) * * @param key 緩存鍵 * @param object 緩存數(shù)據(jù) */ public <T> void put(String key, T object) { objectCache.put(key, object); } /** * 獲取數(shù)據(jù) * * @param key 緩存鍵 * @return 緩存數(shù)據(jù) */ public <T> T get(String key) { return (T) objectCache.get(key); } /** * 刪除數(shù)據(jù) * * @param key 緩存鍵 */ public void remove(String key) { objectCache.remove(key); } /** * 存儲哈希表數(shù)據(jù) * * @param key 哈希表鍵 * @param subkey 哈希表子鍵 * @param value 哈希表值 */ public void hSet(String key, String subkey, Object value) { synchronized (objectCache) { Map<String, Object> submap = (Map<String, Object>) objectCache.computeIfAbsent(key, k -> new ConcurrentHashMap<>()); submap.put(subkey, value); } } /** * 獲取哈希表數(shù)據(jù) * * @param key 哈希表鍵 * @param subkey 哈希表子鍵 * @return 哈希表值 */ public <T> T hGet(String key, String subkey) { synchronized (objectCache) { Map<String, Object> submap = (Map<String, Object>) objectCache.get(key); return submap != null ? (T) submap.get(subkey) : null; } } /** * 判斷哈希表子鍵是否存在 * * @param key 哈希表鍵 * @param subkey 哈希表子鍵 * @return 是否存在 */ public boolean hExists(String key, String subkey) { synchronized (objectCache) { Map<String, Object> submap = (Map<String, Object>) objectCache.get(key); return submap != null && submap.containsKey(subkey); } } /** * 將數(shù)據(jù)推入列表 * * @param key 列表鍵 * @param value 列表值 */ public void lPush(String key, Object value) { synchronized (objectCache) { LinkedList<Object> queue = (LinkedList<Object>) objectCache.computeIfAbsent(key, k -> new LinkedList<>()); queue.addLast(value); } } /** * 從列表中彈出數(shù)據(jù) * * @param key 列表鍵 * @return 列表值 */ public <T> T lPop(String key) { synchronized (objectCache) { LinkedList<Object> queue = (LinkedList<Object>) objectCache.get(key); return queue != null && !queue.isEmpty() ? (T) queue.removeLast() : null; } } /** * 刪除緩存數(shù)據(jù) * * @param key 緩存鍵 */ public void del(String key) { objectCache.remove(key); } /** * 判斷緩存鍵是否存在 * * @param key 緩存鍵 * @return 是否存在 */ public boolean exists(String key) { return objectCache.containsKey(key); } }
詳細講解
BatchDataStorageService
字段和初始化:
maxBatchCount
:配置文件中指定的最大批次大小,用于控制每批處理的數(shù)據(jù)量。maxBatchThreads
:線程池的最大線程數(shù),影響處理并發(fā)能力。batchTimeout
:批次超時時間,用于控制數(shù)據(jù)處理的最遲時間。batchCount
:當前批次中的數(shù)據(jù)條數(shù),用于判斷是否需要提交批次數(shù)據(jù)。batchNo
:批次號,用于標識不同的批次。executorService
:用于執(zhí)行批量處理任務的線程池。cacheService
、deviceRealTimeService
、redisUtils
:分別用于緩存操作、數(shù)據(jù)存儲和 Redis 操作。
方法詳解:
afterPropertiesSet
:初始化線程池,以便在后續(xù)操作中執(zhí)行批量處理任務。saveRealTimeData
:- 將實時數(shù)據(jù)推入緩存中,檢查是否需要提交批次數(shù)據(jù)。
- 如果超時或數(shù)據(jù)量達到閾值,則調(diào)用
dataStorage
方法處理數(shù)據(jù)。
updateRealTimeData
:將數(shù)據(jù)更新到 Redis,確保實時數(shù)據(jù)的可用性。dataStorage
:- 執(zhí)行批量數(shù)據(jù)的存儲操作,并提交工作線程處理數(shù)據(jù)。
BatchWorker
:- 從緩存中獲取數(shù)據(jù)并執(zhí)行入庫操作,將成功的數(shù)據(jù)記錄日志,將失敗的數(shù)據(jù)放入失敗緩存。
BatchTimeoutCommitThread
:- 處理批次超時邏輯,即使在未滿批次的情況下也會提交數(shù)據(jù),確保數(shù)據(jù)及時處理。
CacheService
字段:
objectCache
:用于存儲普通緩存數(shù)據(jù)。statCache
:用于存儲統(tǒng)計數(shù)據(jù),例如計數(shù)器等。
方法詳解:
put/get/remove
:基本的緩存操作,支持存儲、獲取和刪除數(shù)據(jù)。hSet/hGet/hExists
:- 對哈希表進行操作,支持設置、獲取和檢查鍵值對。
lPush/lPop
:- 對列表進行操作,支持推入和彈出數(shù)據(jù)。
incr/decr/stat
:- 對統(tǒng)計數(shù)據(jù)進行操作,支持增加、減少和獲取當前值。
總結(jié)
本文介紹了如何在高并發(fā)環(huán)境下利用線程池和Redis實現(xiàn)高效的數(shù)據(jù)入庫。通過將數(shù)據(jù)首先存入Redis緩存,并利用線程池定期批量處理入庫操作,能夠有效提升系統(tǒng)的性能和穩(wěn)定性。完整代碼包括核心的BatchDataStorageService服務類、CacheService緩存服務類以及RedisUtils工具類,均提供了詳細的注釋和解析,以便讀者理解和實現(xiàn)類似的高并發(fā)數(shù)據(jù)處理系統(tǒng)
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關文章
繼承JpaRepository后,找不到findOne()方法的解決
這篇文章主要介紹了繼承JpaRepository后,找不到findOne()方法的解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-08-08Docker環(huán)境下Spring Boot應用內(nèi)存飆升分析與解決場景分析
當運行一個Spring Boot項目時,如果未設置JVM內(nèi)存參數(shù),Spring Boot默認會采用JVM自身默認的配置策略,接下來通過本文給大家介紹Docker環(huán)境下Spring Boot應用內(nèi)存飆升分析與解決方法,需要的朋友參考下吧2021-08-08mybatisplus where QueryWrapper加括號嵌套查詢方式
這篇文章主要介紹了mybatisplus where QueryWrapper加括號嵌套查詢方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教。2022-01-01用Java設計模式中的觀察者模式開發(fā)微信公眾號的例子
這篇文章主要介紹了用Java設計模式中的觀察者模式開發(fā)微信公眾號的例子,這里Java的微信SDK等部分便不再詳述,只注重關鍵部分和開發(fā)過程中觀察者模式優(yōu)點的體現(xiàn),需要的朋友可以參考下2016-02-02Java畢業(yè)設計實戰(zhàn)之仿小米電子產(chǎn)品售賣商城系統(tǒng)的實現(xiàn)
這是一個使用了java+SpringBoot+Vue+MySQL+Redis+ElementUI開發(fā)的仿小米商城系統(tǒng),是一個畢業(yè)設計的實戰(zhàn)練習,具有小米商城該有的所有基礎功能,感興趣的朋友快來看看吧2022-01-01