Java實(shí)現(xiàn)多線程大批量同步數(shù)據(jù)(分頁)
背景
最近遇到個(gè)功能,兩個(gè)月有300w+的數(shù)據(jù),之后還在累加,因一開始該數(shù)據(jù)就全部存儲(chǔ)在mysql表,現(xiàn)需要展示在頁面,還需要關(guān)聯(lián)另一張表的數(shù)據(jù),而且產(chǎn)品要求頁面的查詢條件多達(dá)20個(gè)條件,最終,這個(gè)功能卡的要死,基本查不出來數(shù)據(jù)。
最后是打算把這兩張表的數(shù)據(jù)同時(shí)存儲(chǔ)到MongoDB中去,以提高查詢效率。
一開始同步的時(shí)候,采用單線程,循環(huán)以分頁的模式去同步這兩張表數(shù)據(jù),結(jié)果是…一晚上,只同步了30w數(shù)據(jù),特慢?。?!
最后,改造了一番,2小時(shí),就成功同步了300w+數(shù)據(jù)。
以下是主要邏輯。
線程的個(gè)數(shù)請(qǐng)根據(jù)你自己的服務(wù)器性能酌情設(shè)置。
思路
先通過count查出結(jié)果集的總條數(shù),設(shè)置每個(gè)線程分頁查詢的條數(shù),通過總條數(shù)和單次條數(shù)得到線程數(shù)量,通過改變limit的下標(biāo)實(shí)現(xiàn)分批查詢。
代碼實(shí)現(xiàn)
package com.github.admin.controller.loans; import com.baomidou.mybatisplus.mapper.EntityWrapper; import com.github.admin.model.entity.CaseCheckCallRecord; import com.github.admin.model.entity.duyan.DuyanCallRecordDetail; import com.github.admin.model.entity.loans.CaseCallRemarkRecord; import com.github.admin.service.duyan.DuyanCallRecordDetailService; import com.github.admin.service.loans.CaseCallRemarkRecordService; import com.github.common.constant.MongodbConstant; import com.github.common.util.DingDingMsgSendUtils; import com.github.common.util.ListUtils; import com.github.common.util.Response; import com.github.common.util.concurrent.Executors; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.mongodb.core.MongoTemplate; import org.springframework.data.mongodb.core.query.Criteria; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; /** ?* 多線程同步歷史數(shù)據(jù) ?* @author songfayuan ?* @date 2019-09-26 15:38 ?*/ @Slf4j @RestController @RequestMapping("/demo") public class SynchronizeHistoricalDataController implements DisposableBean { ? ? private ExecutorService executor = Executors.newFixedThreadPool(10, "SynchronizeHistoricalDataController"); ?//newFixedThreadPool 創(chuàng)建一個(gè)定長(zhǎng)線程池,可控制線程最大并發(fā)數(shù),超出的線程會(huì)在隊(duì)列中等待。 ? ? @Value("${spring.profiles.active}") ? ? private String profile; ? ? @Autowired ? ? private DuyanCallRecordDetailService duyanCallRecordDetailService; ? ? @Autowired ? ? private MongoTemplate mongoTemplate; ? ? @Autowired ? ? private CaseCallRemarkRecordService caseCallRemarkRecordService; ? ? /** ? ? ?* 多線程同步通話記錄歷史數(shù)據(jù) ? ? ?* @param params ? ? ?* @return ? ? ?* @throws Exception ? ? ?*/ ? ? @GetMapping("/syncHistoryData") ? ? public Response syncHistoryData(Map<String, Object> params) throws Exception { ? ? ? ? executor.execute(new Runnable() { ? ? ? ? ? ? @Override ? ? ? ? ? ? public void run() { ? ? ? ? ? ? ? ? try { ? ? ? ? ? ? ? ? ? ? logicHandler(params); ? ? ? ? ? ? ? ? } catch (Exception e) { ? ? ? ? ? ? ? ? ? ? log.warn("多線程同步稽查通話記錄歷史數(shù)據(jù)才處理異常,errMsg = {}", e); ? ? ? ? ? ? ? ? ? ? DingDingMsgSendUtils.sendDingDingGroupMsg("【系統(tǒng)消息】" + profile + "環(huán)境,多線程同步稽查通話記錄歷史數(shù)據(jù)才處理異常,errMsg = "+e); ? ? ? ? ? ? ? ? } ? ? ? ? ? ? } ? ? ? ? }); ? ? ? ? return Response.success("請(qǐng)求成功"); ? ? } ? ? /** ? ? ?* 處理數(shù)據(jù)邏輯 ? ? ?* @param params ? ? ?* @throws Exception ? ? ?*/ ? ? private void logicHandler(Map<String, Object> params) throws Exception { ? ? ? ? /******返回結(jié)果:多線程處理完的最終數(shù)據(jù)******/ ? ? ? ? List<DuyanCallRecordDetail> result = new ArrayList<>(); ? ? ? ? /******查詢數(shù)據(jù)庫總的數(shù)據(jù)條數(shù)******/ ? ? ? ? int count = this.duyanCallRecordDetailService.selectCount(new EntityWrapper<DuyanCallRecordDetail>() ? ? ? ? ? ? ? ? .eq("is_delete", 0) ? ? ? ? ? ? ? ? .eq("platform_type", 1)); ? ? ? ? DingDingMsgSendUtils.sendDingDingGroupMsg("【系統(tǒng)消息】" + profile + "環(huán)境,本次需要同步" + count + "條歷史稽查通話記錄數(shù)據(jù)。"); // ? ? ? ?int count = 2620266; ? ? ? ? /******限制每次查詢的條數(shù)******/ ? ? ? ? int num = 1000; ? ? ? ? /******計(jì)算需要查詢的次數(shù)******/ ? ? ? ? int times = count / num; ? ? ? ? if (count % num != 0) { ? ? ? ? ? ? times = times + 1; ? ? ? ? } ? ? ? ? /******每個(gè)線程開始查詢的行數(shù)******/ ? ? ? ? int offset = 0; ? ? ? ? /******添加任務(wù)******/ ? ? ? ? List<Callable<List<DuyanCallRecordDetail>>> tasks = new ArrayList<>(); ? ? ? ? for (int i = 0; i < times; i++) { ? ? ? ? ? ? Callable<List<DuyanCallRecordDetail>> qfe = new ThredQuery(duyanCallRecordDetailService, params, offset, num); ? ? ? ? ? ? tasks.add(qfe); ? ? ? ? ? ? offset = offset + num; ? ? ? ? } ? ? ? ? /******為避免太多任務(wù)的最終數(shù)據(jù)全部存在list導(dǎo)致內(nèi)存溢出,故將任務(wù)再次拆分單獨(dú)處理******/ ? ? ? ? List<List<Callable<List<DuyanCallRecordDetail>>>> smallList = ListUtils.partition(tasks, 10); ? ? ? ? for (List<Callable<List<DuyanCallRecordDetail>>> callableList : smallList) { ? ? ? ? ? ? if (CollectionUtils.isNotEmpty(callableList)) { // ? ? ? ? ? ? ? ?executor.execute(new Runnable() { // ? ? ? ? ? ? ? ? ? ?@Override // ? ? ? ? ? ? ? ? ? ?public void run() { // ? ? ? ? ? ? ? ? ? ? ? ?log.info("任務(wù)拆分執(zhí)行開始:線程{}拆分處理開始...", Thread.currentThread().getName()); // // ? ? ? ? ? ? ? ? ? ? ? ?log.info("任務(wù)拆分執(zhí)行結(jié)束:線程{}拆分處理開始...", Thread.currentThread().getName()); // ? ? ? ? ? ? ? ? ? ?} // ? ? ? ? ? ? ? ?}); ? ? ? ? ? ? ? ? try { ? ? ? ? ? ? ? ? ? ? List<Future<List<DuyanCallRecordDetail>>> futures = executor.invokeAll(callableList); ? ? ? ? ? ? ? ? ? ? /******處理線程返回結(jié)果******/ ? ? ? ? ? ? ? ? ? ? if (futures != null && futures.size() > 0) { ? ? ? ? ? ? ? ? ? ? ? ? for (Future<List<DuyanCallRecordDetail>> future : futures) { ? ? ? ? ? ? ? ? ? ? ? ? ? ? List<DuyanCallRecordDetail> duyanCallRecordDetailList = future.get(); ? ? ? ? ? ? ? ? ? ? ? ? ? ? if (CollectionUtils.isNotEmpty(duyanCallRecordDetailList)){ ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? executor.execute(new Runnable() { ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? @Override ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? public void run() { ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? /******異步存儲(chǔ)******/ ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? log.info("異步存儲(chǔ)MongoDB開始:線程{}拆分處理開始...", Thread.currentThread().getName()); ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? saveMongoDB(duyanCallRecordDetailList); ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? log.info("異步存儲(chǔ)MongoDB結(jié)束:線程{}拆分處理開始...", Thread.currentThread().getName()); ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? }); ? ? ? ? ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? ? ? ? ? ? ? //result.addAll(future.get()); ? ? ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? } catch (Exception e) { ? ? ? ? ? ? ? ? ? ? log.warn("任務(wù)拆分執(zhí)行異常,errMsg = {}", e); ? ? ? ? ? ? ? ? ? ? DingDingMsgSendUtils.sendDingDingGroupMsg("【系統(tǒng)消息】" + profile + "環(huán)境,任務(wù)拆分執(zhí)行異常,errMsg = "+e); ? ? ? ? ? ? ? ? } ? ? ? ? ? ? } ? ? ? ? } ? ? } ? ? /** ? ? ?* 數(shù)據(jù)存儲(chǔ)MongoDB ? ? ?* @param duyanCallRecordDetailList ? ? ?*/ ? ? private void saveMongoDB(List<DuyanCallRecordDetail> duyanCallRecordDetailList) { ? ? ? ? for (DuyanCallRecordDetail duyanCallRecordDetail : duyanCallRecordDetailList) { ? ? ? ? ? ? /******重復(fù)數(shù)據(jù)不同步MongoDB******/ ? ? ? ? ? ? org.springframework.data.mongodb.core.query.Query query = new org.springframework.data.mongodb.core.query.Query(); ? ? ? ? ? ? query.addCriteria(Criteria.where("callUuid").is(duyanCallRecordDetail.getCallUuid())); ? ? ? ? ? ? List<CaseCheckCallRecord> caseCheckCallRecordList = mongoTemplate.find(query, CaseCheckCallRecord.class, MongodbConstant.CASE_CHECK_CALL_RECORD); ? ? ? ? ? ? if (CollectionUtils.isNotEmpty(caseCheckCallRecordList)) { ? ? ? ? ? ? ? ? log.warn("call_uuid = {}在MongoDB已經(jīng)存在數(shù)據(jù),后面數(shù)據(jù)將被舍棄...", duyanCallRecordDetail.getCallUuid()); ? ? ? ? ? ? ? ? continue; ? ? ? ? ? ? } ? ? ? ? ? ? /******關(guān)聯(lián)填寫的記錄******/ ? ? ? ? ? ? CaseCallRemarkRecord caseCallRemarkRecord = this.caseCallRemarkRecordService.selectOne(new EntityWrapper<CaseCallRemarkRecord>() ? ? ? ? ? ? ? ? ? ? .eq("is_delete", 0) ? ? ? ? ? ? ? ? ? ? .eq("call_uuid", duyanCallRecordDetail.getCallUuid())); ? ? ? ? ? ? CaseCheckCallRecord caseCheckCallRecord = new CaseCheckCallRecord(); ? ? ? ? ? ? BeanUtils.copyProperties(duyanCallRecordDetail, caseCheckCallRecord); ? ? ? ? ? ? //補(bǔ)充 ? ? ? ? ? ? caseCheckCallRecord.setCollectorUserId(duyanCallRecordDetail.getUserId()); ? ? ? ? ? ?? ? ? ? ? ? ? if (caseCallRemarkRecord != null) { ? ? ? ? ? ? ? ? //補(bǔ)充 ? ? ? ? ? ? ? ? caseCheckCallRecord.setCalleeName(caseCallRemarkRecord.getContactName()); ? ? ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? log.info("正在存儲(chǔ)數(shù)據(jù)到MongoDB:{}", caseCheckCallRecord.toString()); ? ? ? ? ? ? this.mongoTemplate.save(caseCheckCallRecord, MongodbConstant.CASE_CHECK_CALL_RECORD); ? ? ? ? } ? ? } ? ? @Override ? ? public void destroy() throws Exception { ? ? ? ? executor.shutdown(); ? ? } } class ThredQuery implements Callable<List<DuyanCallRecordDetail>> { ? ? /******需要通過構(gòu)造方法把對(duì)應(yīng)的業(yè)務(wù)service傳進(jìn)來 實(shí)際用的時(shí)候把類型變?yōu)閷?duì)應(yīng)的類型******/ ? ? private DuyanCallRecordDetailService myService; ? ? /******查詢條件 根據(jù)條件來定義該類的屬性******/ ? ? private Map<String, Object> params; ? ? /******分頁index******/ ? ? private int offset; ? ? /******數(shù)量******/ ? ? private int num; ? ? public ThredQuery(DuyanCallRecordDetailService myService, Map<String, Object> params, int offset, int num) { ? ? ? ? this.myService = myService; ? ? ? ? this.params = params; ? ? ? ? this.offset = offset; ? ? ? ? this.num = num; ? ? } ? ? @Override ? ? public List<DuyanCallRecordDetail> call() throws Exception { ? ? ? ? /******通過service查詢得到對(duì)應(yīng)結(jié)果******/ ? ? ? ? List<DuyanCallRecordDetail> duyanCallRecordDetailList = myService.selectList(new EntityWrapper<DuyanCallRecordDetail>() ? ? ? ? ? ? ? ? .eq("is_delete", 0) ? ? ? ? ? ? ? ? .eq("platform_type", 1) ? ? ? ? ? ? ? ? .last("limit "+offset+", "+num)); ? ? ? ? return duyanCallRecordDetailList; ? ? } }
ListUtils工具
package com.github.common.util; import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; import java.io.*; import java.util.ArrayList; import java.util.List; /** ?* 描述:List工具類 ?* @author songfayuan ?* 2018年7月22日下午2:23:22 ?*/ @Slf4j public class ListUtils { ?? ? ?? ?/** ?? ? * 描述:list集合深拷貝 ?? ? * @param src ?? ? * @return ?? ? * @throws IOException ?? ? * @throws ClassNotFoundException ?? ? * @author songfayuan ?? ? * 2018年7月22日下午2:35:23 ?? ? */ ?? ?public static <T> List<T> deepCopy(List<T> src) { ?? ??? ?try { ?? ??? ??? ?ByteArrayOutputStream byteout = new ByteArrayOutputStream(); ?? ??? ??? ?ObjectOutputStream out = new ObjectOutputStream(byteout); ?? ??? ??? ?out.writeObject(src); ?? ??? ??? ?ByteArrayInputStream bytein = new ByteArrayInputStream(byteout.toByteArray()); ?? ??? ??? ?ObjectInputStream in = new ObjectInputStream(bytein); ?? ??? ??? ?@SuppressWarnings("unchecked") ?? ??? ??? ?List<T> dest = (List<T>) in.readObject(); ?? ??? ??? ?return dest; ?? ??? ?} catch (ClassNotFoundException e) { ?? ??? ??? ?e.printStackTrace(); ?? ??? ??? ?return null; ?? ??? ?} catch (IOException e) { ?? ??? ??? ?e.printStackTrace(); ?? ??? ??? ?return null; ?? ??? ?} ?? ?} ?? ?/** ?? ? * 描述:對(duì)象深拷貝 ?? ? * @param src ?? ? * @return ?? ? * @throws IOException ?? ? * @throws ClassNotFoundException ?? ? * @author songfayuan ?? ? * 2018年12月14日 ?? ? */ ?? ?public static <T> T objDeepCopy(T src) { ?? ??? ?try { ?? ??? ??? ?ByteArrayOutputStream byteout = new ByteArrayOutputStream(); ?? ??? ??? ?ObjectOutputStream out = new ObjectOutputStream(byteout); ?? ??? ??? ?out.writeObject(src); ?? ??? ??? ?ByteArrayInputStream bytein = new ByteArrayInputStream(byteout.toByteArray()); ?? ??? ??? ?ObjectInputStream in = new ObjectInputStream(bytein); ?? ??? ??? ?@SuppressWarnings("unchecked") ?? ??? ??? ?T dest = (T) in.readObject(); ?? ??? ??? ?return dest; ?? ??? ?} catch (ClassNotFoundException e) { ?? ??? ??? ?log.error("errMsg = {}", e); ?? ??? ??? ?return null; ?? ??? ?} catch (IOException e) { ?? ??? ??? ?log.error("errMsg = {}", e); ?? ??? ??? ?return null; ?? ??? ?} ?? ?} ?? ?/** ?? ? * 將一個(gè)list均分成n個(gè)list,主要通過偏移量來實(shí)現(xiàn)的 ?? ? * @author songfayuan ?? ? * 2018年12月14日 ?? ? */ ?? ?public static <T> List<List<T>> averageAssign(List<T> source, int n) { ?? ??? ?List<List<T>> result = new ArrayList<List<T>>(); ?? ??? ?int remaider = source.size() % n; ?//(先計(jì)算出余數(shù)) ?? ??? ?int number = source.size() / n; ?//然后是商 ?? ??? ?int offset = 0;//偏移量 ?? ??? ?for (int i = 0; i < n; i++) { ?? ??? ??? ?List<T> value = null; ?? ??? ??? ?if (remaider > 0) { ?? ??? ??? ??? ?value = source.subList(i * number + offset, (i + 1) * number + offset + 1); ?? ??? ??? ??? ?remaider--; ?? ??? ??? ??? ?offset++; ?? ??? ??? ?} else { ?? ??? ??? ??? ?value = source.subList(i * number + offset, (i + 1) * number + offset); ?? ??? ??? ?} ?? ??? ??? ?result.add(value); ?? ??? ?} ?? ??? ?return result; ?? ?} ?? ?/** ?? ? * List按指定長(zhǎng)度分割 ?? ? * @param list the list to return consecutive sublists of (需要分隔的list) ?? ? * @param size the desired size of each sublist (the last may be smaller) (分隔的長(zhǎng)度) ?? ? * @author songfayuan ?? ? * @date 2019-07-07 21:37 ?? ? */ ?? ?public static <T> List<List<T>> partition(List<T> list, int size){ ?? ??? ?return ?Lists.partition(list, size); // 使用guava ?? ?} ?? ?/** ?? ? * 測(cè)試 ?? ? * @param args ?? ? */ ?? ?public static void main(String[] args) { ?? ??? ?List<Integer> bigList = new ArrayList<>(); ?? ??? ?for (int i = 0; i < 101; i++){ ?? ??? ??? ?bigList.add(i); ?? ??? ?} ?? ??? ?log.info("bigList長(zhǎng)度為:{}", bigList.size()); ?? ??? ?log.info("bigList為:{}", bigList); ?? ??? ?List<List<Integer>> smallists = partition(bigList, 20); ?? ??? ?log.info("smallists長(zhǎng)度為:{}", smallists.size()); ?? ??? ?for (List<Integer> smallist : smallists) { ?? ??? ??? ?log.info("拆分結(jié)果:{},長(zhǎng)度為:{}", smallist, smallist.size()); ?? ??? ?} ?? ?} }
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
Springboot關(guān)于自定義stater的yml無法提示問題解決方案
這篇文章主要介紹了Springboot關(guān)于自定義stater的yml無法提示問題及解決方案,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-06-06Java?超詳細(xì)講解設(shè)計(jì)模式之原型模式講解
原型模式是用于創(chuàng)建重復(fù)的對(duì)象,同時(shí)又能保證性能。這種類型的設(shè)計(jì)模式屬于創(chuàng)建型模式,它提供了一種創(chuàng)建對(duì)象的最佳方式,今天通過本文給大家介紹下Java?原型設(shè)計(jì)模式,感興趣的朋友一起看看吧2022-03-03關(guān)于Spring項(xiàng)目對(duì)JDBC的支持與基本使用詳解
這段時(shí)間一直在觀看Spring框架,所以下面這篇文章主要給大家介紹了關(guān)于Spring項(xiàng)目對(duì)JDBC的支持與基本使用的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2018-11-11基于SpringBoot和Vue3的博客平臺(tái)發(fā)布、編輯、刪除文章功能實(shí)現(xiàn)
在上一個(gè)教程中,我們已經(jīng)實(shí)現(xiàn)了基于Spring?Boot和Vue3的用戶注冊(cè)與登錄功能。本教程將繼續(xù)引導(dǎo)您實(shí)現(xiàn)博客平臺(tái)的發(fā)布、編輯、刪除文章功能,需要的朋友參考一下2023-04-04springBoot整合Eureka啟動(dòng)失敗的解決方案
這篇文章主要介紹了springBoot整合Eureka啟動(dòng)失敗的解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-07-07Java后臺(tái)通過Collections獲取list集合中最大數(shù),最小數(shù)代碼
這篇文章主要介紹了Java后臺(tái)通過Collections獲取list集合中最大數(shù),最小數(shù)代碼,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2020-08-08詳解如何在Spring?Security中自定義權(quán)限表達(dá)式
這篇文章主要和大家詳細(xì)介紹一下如何在Spring?Security中自定義權(quán)限表達(dá)式,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2022-07-07SpringBoot yaml語法與JRS303校驗(yàn)超詳細(xì)講解
YAML 是 “YAML Ain’t Markup Language”(YAML 不是一種標(biāo)記語言)的遞歸縮寫。在開發(fā)的這種語言時(shí),YAML 的意思其實(shí)是:“Yet Another Markup Language”(仍是一種標(biāo)記語言),本文給大家介紹的非常詳細(xì),需要的朋友可以參考下2022-10-10