Java多線程實(shí)現(xiàn)第三方數(shù)據(jù)同步
本文實(shí)例為大家分享了Java多線程實(shí)現(xiàn)第三方數(shù)據(jù)同步的具體代碼,供大家參考,具體內(nèi)容如下
一、場景
最近的一項開發(fā)任務(wù)是同步第三方數(shù)據(jù),而第三方數(shù)據(jù)一般有存量數(shù)據(jù)和增量數(shù)據(jù),存量數(shù)據(jù)有100w+。在得知此需求時,進(jìn)行了一定的信息檢索和工具學(xué)習(xí),提前獲取存量數(shù)據(jù)到目標(biāo)庫,再使用kettle進(jìn)行存量數(shù)據(jù)轉(zhuǎn)換;增量數(shù)據(jù)則根據(jù)業(yè)務(wù)方規(guī)定的請求時間,通過定時任務(wù)去獲取增量數(shù)據(jù)并進(jìn)行數(shù)據(jù)轉(zhuǎn)換。在數(shù)據(jù)獲取和轉(zhuǎn)換時,我們應(yīng)該要記錄每一次的請求信息,便于溯源和數(shù)據(jù)對賬!!!
二、獲取數(shù)據(jù)的方式
2.1 遞歸方式
使用遞歸方式時,要求數(shù)據(jù)量少,否則會出現(xiàn)棧溢出或堆溢出!!!并且遞歸方式是單線程,所以會導(dǎo)致同步速度很慢!!!
/** ?? ? * 數(shù)據(jù)同步 - 遞歸方式 ?? ? * 此處存量數(shù)據(jù)只需要請求到數(shù)據(jù)并保存數(shù)據(jù)庫即可,后期通過kettle進(jìn)行轉(zhuǎn)換。 ?? ? * Data為自定義實(shí)體類,這里僅做示例!!! */ ?? ?private void fetchAndSaveDB(int pageIndex, int pageSize) throws Exception { ? ? ? ? log.info("【數(shù)據(jù)同步 - 存量】,第{}次同步,", pageIndex); ? ? ? ? List<Data> datas= getDataByPage(pageIndex,pageSize); ? ? ? ? if (CollectionUtils.isNotEmpty(datas)) { ? ? ? ? ? ? dataService.saveOrUpdateBatch(datas); ? ? ? ? ? ? log.info("【數(shù)據(jù)同步 - 存量】,第{}次同步,同步成功", pageIndex); ? ? ? ? ? ? if (datas.size() < pageSize) { ? ? ? ? ? ? ? ? log.info("【數(shù)據(jù)同步 - 存量】,第{}次同步,獲取數(shù)據(jù)小于每頁獲取條數(shù),證明已全部同步完畢!!!", pageIndex); ? ? ? ? ? ? ? ? return; ? ? ? ? ? ? } ? ? ? ? ? ? // 遞歸操作-直到數(shù)據(jù)同步完畢 ? ? ? ? ? ? fetchAndSaveDB(pageIndex + 1, pageSize); ? ? ? ? } else { ? ? ? ? ? ? log.info("【數(shù)據(jù)同步 - 存量】,第{}次同步,獲取數(shù)據(jù)為空,證明已全部同步完畢!!!", pageIndex); ? ? ? ? ? ? return; ? ? ? ? } ?? ?} ?? ?/**? ?? ? * 獲取分頁數(shù)據(jù),Data為自定義實(shí)體類,這里僅做示例!!! ?? ? */ ? ? private List<Data> getDataByPage(int pageIndex, int pageSize) throws Exception { ? ? ?? ?//通過feign調(diào)用第三方接口獲取數(shù)據(jù) ? ? ? ? String data = dataFeignService.fetchAllData(pageSize, pageIndex); ? ? ? ? JSONObject jsonObject = JSONObject.parseObject(data); ? ? ? ? JSONArray datalist = jsonObject.getJSONArray("datalist"); ? ? ? ? List<Data> datas = datalist.toJavaList(Data.class); ? ? ? ? return datas; ? ? }
2.2 多線程方式
由于遞歸方式是單線程,考慮到數(shù)據(jù)的龐大,且易造成內(nèi)存溢出,因此將遞歸更換成多線程方式,不僅避免了內(nèi)存溢出的情況,且速度大大的提升!!!
public void synAllData() { ? ? ??? ?// 定義原子變量 - 頁數(shù) ? ? ?? ?AtomicInteger pageIndex = new AtomicInteger(0); ? ? ??? ?// 創(chuàng)建線程池 ? ? ??? ?ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10); ? ? ? ? // 100萬數(shù)據(jù) ? ? ? ? int total = 1000000;//數(shù)據(jù)總量 ? ? ? ? int times = total / 1000; ? ? ? ? if (total % 1000!= 0) { ? ? ? ? ? ? times = times + 1; ? ? ? ? } ? ? ? ? LocalDateTime beginLocalDateTime = LocalDateTime.now(); ? ? ? ? log.info("【數(shù)據(jù)同步 - 存量】開始同步時間:{}", beginLocalDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); ? ? ? ? for (int index = 1; index <= times; index++) { ? ? ? ? ? ? fixedThreadPool.submit(new Runnable() { ? ? ? ? ? ? ? ? @Override ? ? ? ? ? ? ? ? public void run() { ? ? ? ? ? ? ? ? ? ? try { ? ? ? ? ? ? ? ? ? ? ? ? multiFetchAndSaveDB(pageIndex.incrementAndGet(), 1000); ? ? ? ? ? ? ? ? ? ? } catch (Exception e) { ? ? ? ? ? ? ? ? ? ? ? ? log.error("并發(fā)獲取并保存數(shù)據(jù)異常:{}", e); ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? } ? ? ? ? ? ? }); ? ? ? ? } ? ? ? ? LocalDateTime endLocalDateTime = LocalDateTime.now(); ? ? ? ? log.info("【數(shù)據(jù)同步 - 存量】同步結(jié)束時間:{},總共耗時:{}分鐘", ? ? ? ? ? ? ? ? endLocalDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")), ? ? ? ? ? ? ? ? Duration.between(beginLocalDateTime, endLocalDateTime).toMinutes()); ? ? } ?? ?/** ? ? ?* 數(shù)據(jù)同步 - 【多線程方式】 ? ? ?* ? ? ?* @throws Exception ? ? ?*/ ? ? private void multiFetchAndSaveDB(int pageIndex, int pageSize) throws Exception { ? ? ? ? log.info("【數(shù)據(jù)同步 - 存量】,第{}次同步,", pageIndex); ? ? ? ? List<Data> datas= getDataByPage(pageIndex, pageSize);//getDataByPage()同上2.1 ? ? ? ? if (CollectionUtils.isNotEmpty(datas)) { ? ? ? ? ? ? log.info("【數(shù)據(jù)同步 - 存量】,第{}次同步,同步成功", pageIndex); ? ? ? ? ? ? if (datas.size() < pageSize) { ? ? ? ? ? ? ? ? log.info("【數(shù)據(jù)同步 - 存量】,第{}次同步,獲取數(shù)據(jù)小于每頁獲取條數(shù),證明已全部同步完畢!!!", pageIndex); ? ? ? ? ? ? ? ? return; ? ? ? ? ? ? } ? ? ? ? } else { ? ? ? ? ? ? log.info("【數(shù)據(jù)同步 - 存量】,第{}次同步,獲取數(shù)據(jù)為空,證明已全部同步完畢!!!", pageIndex); ? ? ? ? ? ? return; ? ? ? ? } ? ? }
三、增量數(shù)據(jù)如何對接
增量數(shù)據(jù)需要寫定時任務(wù),可使用Scheduled注解,并需要將增量數(shù)據(jù)存放到目標(biāo)庫中且進(jìn)行數(shù)據(jù)轉(zhuǎn)換!!!此處就不再提供代碼,可以參考上面的存量數(shù)據(jù)的方式編寫!!!
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
java?Springboot實(shí)現(xiàn)教務(wù)管理系統(tǒng)
這篇文章主要介紹了java?Springboot實(shí)現(xiàn)教務(wù)管理系統(tǒng)的過程,文章圍繞實(shí)現(xiàn)過程展開全文詳細(xì)內(nèi)容,具有一定的參考價值,需要的朋友可以參考一下,希望對你有所幫助2021-11-11Java FileDescriptor總結(jié)_動力節(jié)點(diǎn)Java學(xué)院整理
FileDescriptor 是“文件描述符”??梢员挥脕肀硎鹃_放文件、開放套接字等。接下來通過本文給大家分享Java FileDescriptor總結(jié),感興趣的朋友一起學(xué)習(xí)吧2017-05-05SpringMVC中@RequestMapping注解用法實(shí)例
通過@RequestMapping注解可以定義不同的處理器映射規(guī)則,下面這篇文章主要給大家介紹了關(guān)于SpringMVC中@RequestMapping注解用法的相關(guān)資料,文中通過實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下2022-06-06