欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

JAVA實(shí)現(xiàn)億級(jí)千萬(wàn)級(jí)數(shù)據(jù)順序?qū)С龅氖纠a

 更新時(shí)間:2025年08月22日 09:42:51   作者:云往而生  
本文主要介紹了JAVA實(shí)現(xiàn)億級(jí)千萬(wàn)級(jí)數(shù)據(jù)順序?qū)С龅氖纠a,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧

前提:主要考慮控制內(nèi)存占用空間,避免出現(xiàn)同時(shí)導(dǎo)出,導(dǎo)致主程序OOM問(wèn)題。

實(shí)現(xiàn)思路:

A.啟用線程池,分頁(yè)讀取數(shù)據(jù)

B.使用 PriorityBlockingQueue 隊(duì)列存儲(chǔ)查詢出來(lái)的數(shù)據(jù),方便寫入線程去

優(yōu)先級(jí)隊(duì)列特性 PriorityBlockingQueue是一個(gè)優(yōu)先級(jí)隊(duì)列,這意味著它里面的元素是按照某種優(yōu)先級(jí)順序進(jìn)行排列的。元素的優(yōu)先級(jí)是通過(guò)元素自身的自然順序(如果元素實(shí)現(xiàn)了Comparable接口)或者通過(guò)一個(gè)自定義的比較器(Comparator)來(lái)確定的。 當(dāng)從隊(duì)列中獲取元素時(shí),具有最高優(yōu)先級(jí)的元素會(huì)被首先返回。例如,在一個(gè)存儲(chǔ)任務(wù)的PriorityBlockingQueue中,緊急任務(wù)可以被定義為具有較高優(yōu)先級(jí),這樣它們就能在普通任務(wù)之前被執(zhí)行。 阻塞隊(duì)列特性 作為一個(gè)阻塞隊(duì)列,PriorityBlockingQueue提供了阻塞操作。當(dāng)隊(duì)列為空時(shí),試圖從隊(duì)列中獲取元素的線程會(huì)被阻塞,直到隊(duì)列中有可用的元素。 同樣,當(dāng)隊(duì)列已滿(不過(guò)PriorityBlockingQueue在理論上是無(wú)界的,這個(gè)情況比較特殊,后面會(huì)詳細(xì)說(shuō))時(shí),試圖向隊(duì)列中添加元素的線程會(huì)被阻塞,直到隊(duì)列中有足夠的空間。這種阻塞特性使得它在多線程環(huán)境下能夠有效地協(xié)調(diào)生產(chǎn)者 - 消費(fèi)者模式

C.開(kāi)啟單獨(dú)的一個(gè)線程,做讀取寫入,利用join()方法 等待所有寫入結(jié)束,直接返回。

CompletableFuture.runAsync(() -> System.out.println("執(zhí)行一個(gè)異步任務(wù)"));

D.利用 CountDownLatch 做讀取數(shù)據(jù)任務(wù)阻塞。

E.字典轉(zhuǎn)換是用的反射。

以下是代碼實(shí)現(xiàn)

1.配置線程池

/**
 * 導(dǎo)出線程池配置
 */
@Configuration
@Slf4j
public class ThreadPoolExportExecutorConfig {
 
    @Bean("ExportServiceExecutor")
    @Primary
    public Executor exportServiceExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //配置核心線程數(shù)
        executor.setCorePoolSize(12);
        //配置最大線程數(shù)
        executor.setMaxPoolSize(20);
        //空閑時(shí)間
        executor.setKeepAliveSeconds(60);
        //配置隊(duì)列大小
        executor.setQueueCapacity(100);
        //配置線程池中的線程的名稱前綴
        executor.setThreadNamePrefix("ExportThread-");
        // rejection-policy:當(dāng)pool已經(jīng)達(dá)到max size的時(shí)候,如何處理新任務(wù)
        // CALLER_RUNS:不在新線程中執(zhí)行任務(wù),而是有調(diào)用者所在的線程來(lái)執(zhí)行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //執(zhí)行初始化
        executor.initialize();
        return executor;
    }
}

2.轉(zhuǎn)換工具類

public class PojoInfoUtil {
 
    /**
     * ListDTO轉(zhuǎn)換
     *
     * @param <E>       entity類
     * @param <D>       DTO類
     * @param listInfoE listInfoE<E>類對(duì)象
     * @return List<D>  轉(zhuǎn)換后List<D>
     */
    public static <E, D> List<D> listInfoToDTO(List<E> listInfo, Class<D> dtoClass) {
 
        if (CollectionUtils.isEmpty(listInfo)) {
            return Lists.newArrayList();
        }
 
        // 創(chuàng)建 ModelMapper 實(shí)例
        ModelMapper modelMapper = new ModelMapper();
 
        // 設(shè)置匹配策略為基于字段名稱
        modelMapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);
 
        // 使用流和 ModelMapper 進(jìn)行轉(zhuǎn)換
        List<D> list = listInfo.stream()
                .map(entity -> modelMapper.map(entity, dtoClass))
                .collect(Collectors.toList());
 
        // 釋放舊數(shù)據(jù)
        listInfo.clear();
 
        return list;
    }

3.導(dǎo)出工具類

@Component
public class MultiThreadExportUtil {
 
    @Resource(name = "ExportServiceExecutor")
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;
 
    @Autowired
    private DicAdapter dicAdapter;
 
    /**
     *
     * @param exportClazz 導(dǎo)出實(shí)體類
     * @param dictFieldNames 字典轉(zhuǎn)換字段
     * @param fileName 導(dǎo)出文件名稱
     * @param SheetNamePre sheet頁(yè)名前綴
     * @param service 查詢接口類
     * @param queryWrapper 查詢條件
     */
    public void exprtBythread(Class<?> exportClazz,List<String> dictFieldNames,String fileName,String SheetNamePre,IService service, LambdaQueryWrapper<?> queryWrapper){
        HttpServletResponse response = HttpServletUtil.getResponse();
        Map<String, String> threadExportLimit = dicAdapter.getBasicInfoDicMapInfo("threadExportLimit");
 
        if(ObjectUtils.isEmpty(threadExportLimit) || !threadExportLimit.containsKey("limit") || !threadExportLimit.containsKey("maxLimit")){
            throw new BizException("請(qǐng)?zhí)崆芭渲脤?dǎo)出數(shù)量限制");
        }
 
        //每次請(qǐng)求限制條數(shù)
        int limit = Integer.parseInt(threadExportLimit.get("limit"));
        //最大限制條數(shù)
        int maxLimit = Integer.parseInt(threadExportLimit.get("maxLimit"));
 
        int count = service.count(queryWrapper);
 
        if(count > maxLimit) throw new BizException("導(dǎo)出條數(shù)超出最大條數(shù)" + maxLimit + "限制,請(qǐng)調(diào)整查詢條件");
 
 
        //設(shè)置響應(yīng)頭
        response.setContentType("application/vnd.ms-excel");
        response.setCharacterEncoding("utf-8");
        try {
            String name = URLEncoder.encode(fileName, "UTF-8");
            response.setHeader("Content-disposition", "attachment;filename=" + name + ".xlsx");
        } catch (Exception e) {
            throw new BizException(e.getMessage());
        }
 
        // 查詢次數(shù)
        int i = (count + limit - 1) / limit;
 
        AtomicReference<ExcelWriter> excelWriterRef = new AtomicReference<>();
        ServletOutputStream outputStream = null;
 
        try {
            outputStream = response.getOutputStream();
            excelWriterRef.set(EasyExcel.write(outputStream, exportClazz).build());
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
 
        if(i == 0){
            WriteSheet writeSheet = EasyExcel.writerSheet(SheetNamePre).head(exportClazz)
                    .registerWriteHandler(new LongestMatchColumnWidthStyleStrategy()).build();
            excelWriterRef.get().write(null, writeSheet);
            excelWriterRef.get().finish();
            return;
        }
 
 
        //計(jì)數(shù)器
        CountDownLatch countDownLatch = new CountDownLatch(i);
 
 
        // 使用 PriorityBlockingQueue 作為查詢結(jié)果的緩沖區(qū)
        BlockingQueue<ThreadQueryResult> resultQueue = new PriorityBlockingQueue<ThreadQueryResult>();
 
 
        for(int j = 0; j < i; j++){
 
            int pageNo =  j + 1;
            String sheetName = SheetNamePre + pageNo;
            threadPoolTaskExecutor.execute(()->{
                try {
 
                    IPage<T> page = service.page(new Page<>(pageNo, limit), queryWrapper);
 
                    List<T> records = page.getRecords();
 
                    //轉(zhuǎn)換
                    List<?> exportList = PojoInfoUtil.listInfoToDTO(records, exportClazz);
                    //處理字典數(shù)據(jù)
                    convertFieldWithDictionary(exportList,dictFieldNames);
 
                    resultQueue.put(new ThreadQueryResult(pageNo, sheetName, exportList));
 
                }catch (Exception e){
                    e.printStackTrace();
                    LogUtil.info("查詢第" + pageNo + "頁(yè)時(shí),發(fā)生異常" + e.getMessage());
                    throw new BizException("查詢第" + pageNo + "頁(yè)時(shí),發(fā)生異常" + e.getMessage());
                }
            });
            countDownLatch.countDown();
        }
 
        // 啟動(dòng)一個(gè)寫入線程  按照查詢順序讀取 寫入
        CompletableFuture<Void> writeFuture  = CompletableFuture.runAsync(() -> {
            for (int pageNo = 1; pageNo <= i; ) {
                boolean interrupted = false;
                try {
                    while (true) {
                        ThreadQueryResult result = resultQueue.take();
                        if (result.getPageNo() == pageNo) {
 
                            WriteSheet writeSheet = EasyExcel.writerSheet(result.getSheetName()).head(exportClazz)
                                    .registerWriteHandler(new LongestMatchColumnWidthStyleStrategy()).build();
 
                            ExcelWriter excelWriter = excelWriterRef.get();
                            excelWriter.write(result.getRecords(), writeSheet);
                            result.getRecords().clear(); //及時(shí)釋放寫入數(shù)據(jù)
 
                            pageNo++;
                            break;
                        } else {
                            // 如果不是當(dāng)前需要的 pageNo,則放回隊(duì)列
                            resultQueue.put(result);
                        }
                    }
                } catch (InterruptedException e) {
                    interrupted = true;
                    LogUtil.error("寫入線程被中斷: " + e.getMessage());
                } finally {
                    if (interrupted) {
                        Thread.currentThread().interrupt(); // 重新設(shè)置中斷標(biāo)志
                    }
                }
            }
        }, threadPoolTaskExecutor);
 
        // 等待所有任務(wù)完成
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //等待寫入線程完成
        writeFuture.join();
 
        // 關(guān)閉 ExcelWriter
        ExcelWriter excelWriter = excelWriterRef.get();
        if (excelWriter != null) {
            excelWriter.finish();
        }
    }
 
 
 
 
    /**
     * 對(duì)給定的對(duì)象列表中的指定字段進(jìn)行字典轉(zhuǎn)換。
     *
     * @param records   要轉(zhuǎn)換的對(duì)象列表
     * @param fieldDictNames 字段名列表(帶前綴)
     * @param <T>       對(duì)象類型
     */
    public <T> void convertFieldWithDictionary(List<T> records, List<String> fieldDictNames) {
        if (CollectionUtils.isEmpty(records) || CollectionUtils.isEmpty(fieldDictNames)) {
            return;
        }
 
        Map<String, Map<String, String>> dictMap = dicAdapter.handleStaticBasicInfoDicMap();
        try {
            for (String fieldDictName : fieldDictNames) {
                //前綴區(qū)分各實(shí)體類字段
                if(StringUtils.isBlank(fieldDictName) || !fieldDictName.contains("-")){
                    return;
                }
                String fieldName = fieldDictName.split("-")[1];
                for (T record : records) {
 
                    Field field = record.getClass().getDeclaredField(fieldName);
                    field.setAccessible(true);
 
                    Object fieldValue = field.get(record);
                    if (fieldValue != null && dictMap.containsKey(fieldDictName)) {
 
                        Map<String, String> dictValueMap = dictMap.get(fieldDictName);
 
                        if (dictValueMap != null && dictValueMap.containsKey(fieldValue)) {
                            String dictValue = dictValueMap.get(fieldValue);
                            if (dictValue != null) {
                                field.set(record, dictValue);
                            }
                        }
                    }
                }
            }
        } catch(Exception e){
            LogUtil.error("導(dǎo)出轉(zhuǎn)換字典失敗,請(qǐng)聯(lián)系管理員" + e);
            throw new BizException("導(dǎo)出轉(zhuǎn)換字典失敗,請(qǐng)聯(lián)系管理員"+ e);
        }
    }
}

到此這篇關(guān)于JAVA實(shí)現(xiàn)億級(jí)千萬(wàn)級(jí)數(shù)據(jù)順序?qū)С龅氖纠a的文章就介紹到這了,更多相關(guān)JAVA 億級(jí)千萬(wàn)級(jí)順序?qū)С鰞?nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評(píng)論