欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java如何利用線程池和Redis實現(xiàn)高效數(shù)據(jù)入庫

 更新時間:2025年02月08日 16:15:55   作者:微笑聽雨。  
文章介紹了如何利用線程池和Redis在高并發(fā)環(huán)境中實現(xiàn)高效的數(shù)據(jù)入庫,通過將數(shù)據(jù)首先存儲在Redis緩存中,然后利用線程池定期批量入庫處理,確保系統(tǒng)的性能和穩(wěn)定性,主要組件包括BatchDataStorageService、CacheService和RedisUtils等

利用線程池和Redis實現(xiàn)高效數(shù)據(jù)入庫

在高并發(fā)環(huán)境中,進行數(shù)據(jù)入庫是一項具有挑戰(zhàn)性的任務。

本文將介紹如何利用線程池和Redis實現(xiàn)數(shù)據(jù)的實時緩存和批量入庫處理,確保系統(tǒng)的性能和穩(wěn)定性。

主要思路和組件介紹

思路概述

在高并發(fā)情況下,數(shù)據(jù)入庫需要解決兩個主要問題:實時性和穩(wěn)定性。

通過將數(shù)據(jù)首先存儲在Redis緩存中,可以快速響應和處理大量的數(shù)據(jù)請求,然后利用線程池定期批量將數(shù)據(jù)從Redis取出并入庫,以減少數(shù)據(jù)庫壓力和提高整體性能。

主要組件

  1. BatchDataStorageService:核心服務類,負責數(shù)據(jù)的實時緩存和定期批量入庫處理。
  2. CacheService:簡易緩存服務類,使用ConcurrentHashMap實現(xiàn)內(nèi)存緩存,用于快速存取和處理數(shù)據(jù)。
  3. RedisUtils:封裝了對Redis的操作,用于數(shù)據(jù)的持久化存儲和高速讀取。
  4. BatchWorker:實現(xiàn)了Runnable接口,處理從Redis中讀取數(shù)據(jù)并執(zhí)行批量入庫的任務。
  5. BatchTimeoutCommitThread:定時監(jiān)控數(shù)據(jù)是否達到設定的批次或超時時間,并觸發(fā)數(shù)據(jù)入庫操作。

詳細代碼解析

  • BatchDataStorageService
package io.jack.service.impl;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 數(shù)據(jù)批量入庫服務類
 */
@Component
@Slf4j
public class BatchDataStorageService implements InitializingBean {

    /**
     * 最大批次數(shù)量
     */
    @Value("${app.db.maxBatchCount:800}")
    private int maxBatchCount;

    /**
     * 最大線程數(shù)
     */
    @Value("${app.db.maxBatchThreads:100}")
    private int maxBatchThreads;

    /**
     * 超時時間,單位毫秒
     */
    @Value("${app.db.batchTimeout:3000}")
    private int batchTimeout;

    /**
     * 當前批次數(shù)量
     */
    private int batchCount = 0;

    /**
     * 當前批次號
     */
    private static long batchNo = 0;

    /**
     * 線程池執(zhí)行器
     */
    private ExecutorService executorService = null;

    /**
     * 緩存服務
     */
    @Resource
    private CacheService cacheService;

    /**
     * 設備實時服務
     */
    @Resource
    private DeviceRealTimeService deviceRealTimeService;

    /**
     * Redis工具類
     */
    @Resource
    private RedisUtils redisUtils;

    /**
     * 初始化線程池
     */
    @Override
    public void afterPropertiesSet() {
        executorService = Executors.newFixedThreadPool(maxBatchThreads);
    }

    /**
     * 保存設備實時數(shù)據(jù)
     * 
     * @param deviceRealTimeDTO 設備實時數(shù)據(jù)傳輸對象
     */
    public void saveRealTimeData(DeviceRealTimeDTO deviceRealTimeDTO) {
        final String failedCacheKey = "device:real_time:failed_records";

        try {
            // 生成批次和持續(xù)時間的緩存鍵
            String durationKey = "device:real_time:batchDuration" + batchNo;
            String batchKey = "device:real_time:batch" + batchNo;

            // 如果當前批次持續(xù)時間不存在,則創(chuàng)建并啟動超時處理線程
            if (!cacheService.exists(durationKey)) {
                cacheService.put(durationKey, System.currentTimeMillis());
                new BatchTimeoutCommitThread(batchKey, durationKey, failedCacheKey).start();
            }

            // 將設備實時數(shù)據(jù)加入當前批次
            cacheService.lPush(batchKey, deviceRealTimeDTO);
            if (++batchCount >= maxBatchCount) {
                // 達到最大批次,執(zhí)行入庫邏輯
                dataStorage(durationKey, batchKey, failedCacheKey);
            }

        } catch (Exception ex) {
            log.warn("[DB:FAILED] 設備上報記錄入批處理集合異常: " + ex.getMessage() + ", DeviceRealTimeDTO: " + JSON.toJSONString(deviceRealTimeDTO), ex);
            cacheService.lPush(failedCacheKey, deviceRealTimeDTO);
        } finally {
            updateRealTimeData(deviceRealTimeDTO);
        }
    }

    /**
     * 更新實時數(shù)據(jù)到Redis
     * 
     * @param deviceRealTimeDTO 設備實時數(shù)據(jù)傳輸對象
     */
    private void updateRealTimeData(DeviceRealTimeDTO deviceRealTimeDTO) {
        redisUtils.set("real_time:" + deviceRealTimeDTO.getDeviceId(), JSONArray.toJSONString(deviceRealTimeDTO));
    }

    /**
     * 批量入庫處理
     * 
     * @param durationKey 持續(xù)時間標識
     * @param batchKey    批次標識
     * @param failedCacheKey 錯誤記錄標識
     */
    private void dataStorage(String durationKey, String batchKey, String failedCacheKey) {
        batchNo++;
        batchCount = 0;
        cacheService.del(durationKey);
        if (batchNo >= Long.MAX_VALUE) {
            batchNo = 0;
        }
        executorService.execute(new BatchWorker(batchKey, failedCacheKey));
    }

    /**
     * 批量工作線程
     */
    private class BatchWorker implements Runnable {

        private final String failedCacheKey;
        private final String batchKey;

        public BatchWorker(String batchKey, String failedCacheKey) {
            this.batchKey = batchKey;
            this.failedCacheKey = failedCacheKey;
        }

        @Override
        public void run() {
            final List<DeviceRealTimeDTO> deviceRealTimeDTOList = new ArrayList<>();
            try {
                // 從緩存中獲取批量數(shù)據(jù)
                DeviceRealTimeDTO deviceRealTimeDTO = cacheService.lPop(batchKey);
                while (deviceRealTimeDTO != null) {
                    deviceRealTimeDTOList.add(deviceRealTimeDTO);
                    deviceRealTimeDTO = cacheService.lPop(batchKey);
                }

                long timeMillis = System.currentTimeMillis();

                try {
                    // 將DTO轉(zhuǎn)換為實體對象并批量入庫
                    List<DeviceRealTimeEntity> deviceRealTimeEntityList = ConvertUtils.sourceToTarget(deviceRealTimeDTOList, DeviceRealTimeEntity.class);
                    deviceRealTimeService.insertBatch(deviceRealTimeEntityList);
                } finally {
                    cacheService.del(batchKey);
                    log.info("[DB:BATCH_WORKER] 批次:" + batchKey + ",保存設備上報記錄數(shù):" + deviceRealTimeDTOList.size() + ", 耗時:" + (System.currentTimeMillis() - timeMillis) + "ms");
                }
            } catch (Exception e) {
                log.warn("[DB:FAILED] 設備上報記錄批量入庫失?。? + e.getMessage() + ", DeviceRealTimeDTO: " + deviceRealTimeDTOList.size(), e);
                for (DeviceRealTimeDTO deviceRealTimeDTO : deviceRealTimeDTOList) {
                    cacheService.lPush(failedCacheKey, deviceRealTimeDTO);
                }
            }
        }
    }

    /**
     * 批次超時提交線程
     */
    class BatchTimeoutCommitThread extends Thread {

        private final String batchKey;
        private final String durationKey;
        private final String failedCacheKey;

        public BatchTimeoutCommitThread(String batchKey, String durationKey, String failedCacheKey) {
            this.batchKey = batchKey;
            this.durationKey = durationKey;
            this.failedCacheKey = failedCacheKey;
            this.setName("batch-thread-" + batchKey);
        }

        @Override
        public void run() {
            try {
                Thread.sleep(batchTimeout);
            } catch (InterruptedException e) {
                log.error("[DB] 內(nèi)部錯誤,直接提交:" + e.getMessage());
            }

            if (cacheService.exists(durationKey)) {
                // 達到最大批次的超時間,執(zhí)行入庫邏輯
                dataStorage(durationKey, batchKey, failedCacheKey);
            }
        }
    }
}
  • CacheService
package io.jack.service;

import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;

import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/**
 * 緩存服務類,提供簡易的緩存機制
 */
@Component
public class CacheService implements InitializingBean {

    /**
     * 內(nèi)存緩存,用于存儲數(shù)據(jù)
     */
    private Map<String, Object> objectCache = new ConcurrentHashMap<>();

    /**
     * 統(tǒng)計緩存,用于統(tǒng)計計數(shù)
     */
    private Map<String, AtomicLong> statCache = new ConcurrentHashMap<>();

    /**
     * 初始化統(tǒng)計緩存
     */
    @Override
    public void afterPropertiesSet() {
        statCache.put("terminals", new AtomicLong(0));
        statCache.put("connections", new AtomicLong(0));
    }

    /**
     * 增加指定統(tǒng)計項的計數(shù)
     * 
     * @param statName 統(tǒng)計項名稱
     * @return 增加后的計數(shù)值
     */
    public long incr

(String statName) {
        statCache.putIfAbsent(statName, new AtomicLong(0));
        return statCache.get(statName).incrementAndGet();
    }

    /**
     * 減少指定統(tǒng)計項的計數(shù)
     * 
     * @param statName 統(tǒng)計項名稱
     * @return 減少后的計數(shù)值
     */
    public long decr(String statName) {
        statCache.putIfAbsent(statName, new AtomicLong(0));
        return statCache.get(statName).decrementAndGet();
    }

    /**
     * 獲取指定統(tǒng)計項的當前計數(shù)值
     * 
     * @param statName 統(tǒng)計項名稱
     * @return 當前計數(shù)值
     */
    public long stat(String statName) {
        statCache.putIfAbsent(statName, new AtomicLong(0));
        return statCache.get(statName).get();
    }

    /**
     * 存儲數(shù)據(jù)
     * 
     * @param key 緩存鍵
     * @param object 緩存數(shù)據(jù)
     */
    public <T> void put(String key, T object) {
        objectCache.put(key, object);
    }

    /**
     * 獲取數(shù)據(jù)
     * 
     * @param key 緩存鍵
     * @return 緩存數(shù)據(jù)
     */
    public <T> T get(String key) {
        return (T) objectCache.get(key);
    }

    /**
     * 刪除數(shù)據(jù)
     * 
     * @param key 緩存鍵
     */
    public void remove(String key) {
        objectCache.remove(key);
    }

    /**
     * 存儲哈希表數(shù)據(jù)
     * 
     * @param key 哈希表鍵
     * @param subkey 哈希表子鍵
     * @param value 哈希表值
     */
    public void hSet(String key, String subkey, Object value) {
        synchronized (objectCache) {
            Map<String, Object> submap = (Map<String, Object>) objectCache.computeIfAbsent(key, k -> new ConcurrentHashMap<>());
            submap.put(subkey, value);
        }
    }

    /**
     * 獲取哈希表數(shù)據(jù)
     * 
     * @param key 哈希表鍵
     * @param subkey 哈希表子鍵
     * @return 哈希表值
     */
    public <T> T hGet(String key, String subkey) {
        synchronized (objectCache) {
            Map<String, Object> submap = (Map<String, Object>) objectCache.get(key);
            return submap != null ? (T) submap.get(subkey) : null;
        }
    }

    /**
     * 判斷哈希表子鍵是否存在
     * 
     * @param key 哈希表鍵
     * @param subkey 哈希表子鍵
     * @return 是否存在
     */
    public boolean hExists(String key, String subkey) {
        synchronized (objectCache) {
            Map<String, Object> submap = (Map<String, Object>) objectCache.get(key);
            return submap != null && submap.containsKey(subkey);
        }
    }

    /**
     * 將數(shù)據(jù)推入列表
     * 
     * @param key 列表鍵
     * @param value 列表值
     */
    public void lPush(String key, Object value) {
        synchronized (objectCache) {
            LinkedList<Object> queue = (LinkedList<Object>) objectCache.computeIfAbsent(key, k -> new LinkedList<>());
            queue.addLast(value);
        }
    }

    /**
     * 從列表中彈出數(shù)據(jù)
     * 
     * @param key 列表鍵
     * @return 列表值
     */
    public <T> T lPop(String key) {
        synchronized (objectCache) {
            LinkedList<Object> queue = (LinkedList<Object>) objectCache.get(key);
            return queue != null && !queue.isEmpty() ? (T) queue.removeLast() : null;
        }
    }

    /**
     * 刪除緩存數(shù)據(jù)
     * 
     * @param key 緩存鍵
     */
    public void del(String key) {
        objectCache.remove(key);
    }

    /**
     * 判斷緩存鍵是否存在
     * 
     * @param key 緩存鍵
     * @return 是否存在
     */
    public boolean exists(String key) {
        return objectCache.containsKey(key);
    }
}

詳細講解

BatchDataStorageService

字段和初始化

  • maxBatchCount:配置文件中指定的最大批次大小,用于控制每批處理的數(shù)據(jù)量。
  • maxBatchThreads:線程池的最大線程數(shù),影響處理并發(fā)能力。
  • batchTimeout:批次超時時間,用于控制數(shù)據(jù)處理的最遲時間。
  • batchCount:當前批次中的數(shù)據(jù)條數(shù),用于判斷是否需要提交批次數(shù)據(jù)。
  • batchNo:批次號,用于標識不同的批次。
  • executorService:用于執(zhí)行批量處理任務的線程池。
  • cacheService、deviceRealTimeService、redisUtils:分別用于緩存操作、數(shù)據(jù)存儲和 Redis 操作。

方法詳解

  • afterPropertiesSet:初始化線程池,以便在后續(xù)操作中執(zhí)行批量處理任務。
  • saveRealTimeData
    • 將實時數(shù)據(jù)推入緩存中,檢查是否需要提交批次數(shù)據(jù)。
    • 如果超時或數(shù)據(jù)量達到閾值,則調(diào)用 dataStorage 方法處理數(shù)據(jù)。
  • updateRealTimeData:將數(shù)據(jù)更新到 Redis,確保實時數(shù)據(jù)的可用性。
  • dataStorage
    • 執(zhí)行批量數(shù)據(jù)的存儲操作,并提交工作線程處理數(shù)據(jù)。
  • BatchWorker
    • 從緩存中獲取數(shù)據(jù)并執(zhí)行入庫操作,將成功的數(shù)據(jù)記錄日志,將失敗的數(shù)據(jù)放入失敗緩存。
  • BatchTimeoutCommitThread
    • 處理批次超時邏輯,即使在未滿批次的情況下也會提交數(shù)據(jù),確保數(shù)據(jù)及時處理。

CacheService

字段

  • objectCache:用于存儲普通緩存數(shù)據(jù)。
  • statCache:用于存儲統(tǒng)計數(shù)據(jù),例如計數(shù)器等。

方法詳解

  • put/get/remove:基本的緩存操作,支持存儲、獲取和刪除數(shù)據(jù)。
  • hSet/hGet/hExists
    • 對哈希表進行操作,支持設置、獲取和檢查鍵值對。
  • lPush/lPop
    • 對列表進行操作,支持推入和彈出數(shù)據(jù)。
  • incr/decr/stat
    • 對統(tǒng)計數(shù)據(jù)進行操作,支持增加、減少和獲取當前值。

總結(jié)

本文介紹了如何在高并發(fā)環(huán)境下利用線程池和Redis實現(xiàn)高效的數(shù)據(jù)入庫。通過將數(shù)據(jù)首先存入Redis緩存,并利用線程池定期批量處理入庫操作,能夠有效提升系統(tǒng)的性能和穩(wěn)定性。完整代碼包括核心的BatchDataStorageService服務類、CacheService緩存服務類以及RedisUtils工具類,均提供了詳細的注釋和解析,以便讀者理解和實現(xiàn)類似的高并發(fā)數(shù)據(jù)處理系統(tǒng)

以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。

相關文章

最新評論