Java?多線程并發(fā)編程提高數據處理效率的詳細過程
??工作場景中遇到這樣一個需求:根據主機的 IP 地址聯動更新其他模型的相關信息。需求很簡單,只涉及一般的數據庫聯動查詢以及更新操作,然而在編碼實現過程中發(fā)現,由于主機的數量很多,導致循環(huán)遍歷查詢、更新時花費很長的時間,調用一次接口大概需要 30-40 min 時間才能完成操作。
??因此,為了有效縮短接口方法的執(zhí)行時間,便考慮使用多線程并發(fā)編程方法,利用多核處理器并行執(zhí)行的能力,通過異步處理數據的方式,便可以大大縮短執(zhí)行時間,提高執(zhí)行效率。
??這里使用可重用固定線程數的線程池 FixedThreadPool,并使用 CountDownLatch 并發(fā)工具類提供的并發(fā)流程控制工具作為配合使用,保證多線程并發(fā)編程過程中的正常運行:
- 首先,通過
Runtime.getRuntime().availableProcessors()方法獲取運行機器的 CPU 線程數,用于后續(xù)設置固定線程池的線程數量。 - 其次,判斷任務的特性,如果為計算密集型任務則設置線程數為
CPU 線程數+1,如果為 IO 密集型任務則設置線程數為2 * CPU 線程數,由于在方法中需要與數據庫進行頻繁的交互,因此屬于 IO 密集型任務。 - 之后,對數據進行分組切割,每個線程處理一個分組的數據,分組的組數與線程數保持一致,并且還要創(chuàng)建計數器對象
CountDownLatch,調用構造函數,初始化參數值為線程數個數,保證主線程等待所有子線程運行結束后,再進行后續(xù)的操作。 - 然后,調用
executorService.execute()方法,重寫run方法編寫業(yè)務邏輯與數據處理代碼,執(zhí)行完當前線程后記得將計數器減1操作。 - 最后,當所有子線程執(zhí)行完成后,關閉線程池。
?在省略工作場景中的業(yè)務邏輯代碼后,通用的處理方法示例如下所示:
public ResponseData updateHostDept() {
// ...
List<Map> hostMapList = mongoTemplate.find(query, Map.class, "host");
// split the hostMapList for the following multi-threads task
// return the number of logical CPUs
int processorsNum = Runtime.getRuntime().availableProcessors();
// set the threadNum as 2*(the number of logical CPUs) for handling IO Tasks,
// if Computing Tasks set the threadNum as (the number of logical CPUs) + 1
int threadNum = processorsNum * 2;
// the number of each group data
int eachGroupNum = hostMapList.size() / threadNum;
List<List<Map>> groupList = new ArrayList<>();
for (int i = 0; i < threadNum; i++) {
int start = i * eachGroupNum;
if (i == threadNum - 1) {
int end = mapList.size();
groupList.add(hostMapList.subList(start, end));
} else {
int end = (i+1) * eachGroupNum;
groupList.add(hostMapList.subList(start, end));
}
}
// update data by using multi-threads asynchronously
ExecutorService executorService = Executors.newFixedThreadPool(threadNum/2);
CountDownLatch countDownLatch = new CountDownLatch(threadNum);
for (List<Map> group : groupList) {
executorService.execute(()->{
try {
for (Map map : group) {
// update the data in mongodb
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// let counter minus one
countDownLatch.countDown();
}
});
}
try {
// main thread donnot execute until all child threads finish
countDownLatch.await();
} catch (Exception e) {
e.printStackTrace();
}
// remember to shutdown the threadPool
executorService.shutdown();
return ResponseData.success();
}
??那么在使用多線程異步更新的策略后,從當初調用接口所需的大概時間為 30-40 min 下降到了 8-10 min,大大提高了執(zhí)行效率。
??需要注意的是,這里使用的
newFixedThreadPool創(chuàng)建線程池,它有一個缺陷就是,它的阻塞隊列默認是一個無界隊列,默認值為Integer.MAX_VALUE極有可能會造成 OOM 問題。因此,一般可以使用ThreadPoolExecutor來創(chuàng)建線程池,自己可以指定等待隊列中的線程個數,避免產生 OOM 問題。
public ResponseData updateHostDept() {
// ...
List<Map> hostMapList = mongoTemplate.find(query, Map.class, "host");
// split the hostMapList for the following multi-threads task
// return the number of logical CPUs
int processorsNum = Runtime.getRuntime().availableProcessors();
// set the threadNum as 2*(the number of logical CPUs) for handling IO Tasks,
// if Computing Tasks set the threadNum as (the number of logical CPUs) + 1
int threadNum = processorsNum * 2;
// the number of each group data
int eachGroupNum = hostMapList.size() / threadNum;
List<List<Map>> groupList = new ArrayList<>();
for (int i = 0; i < threadNum; i++) {
int start = i * eachGroupNum;
if (i == threadNum - 1) {
int end = mapList.size();
groupList.add(hostMapList.subList(start, end));
} else {
int end = (i+1) * eachGroupNum;
groupList.add(hostMapList.subList(start, end));
}
}
// update data by using multi-threads asynchronously
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 8, 30L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100));
CountDownLatch countDownLatch = new CountDownLatch(threadNum);
for (List<Map> group : groupList) {
executor.execute(()->{
try {
for (Map map : group) {
// update the data in mongodb
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// let counter minus one
countDownLatch.countDown();
}
});
}
try {
// main thread donnot execute until all child threads finish
countDownLatch.await();
} catch (Exception e) {
e.printStackTrace();
}
// remember to shutdown the threadPool
executor.shutdown();
return ResponseData.success();
}在上述的代碼中,核心線程數和最大線程數分別為 5 和 8,并沒有設置的很大的值,因為如果如果設置的很大,線程間頻繁的上下文切換也會增加時間消耗,反而不能最大程度上發(fā)揮多線程的優(yōu)勢。至于如何選擇合適的參數,需要根據機器的參數以及任務的類型綜合考慮決定。
??最后補充一點,如果想要通過非編碼的方式獲取機器的 CPU 線程個數也很簡單,windows 系統(tǒng)通過任務管理器,選擇 “性能”,便可以查看 CPU 線程個數的情況,如下圖所示:

??從上圖可以看到,我的機器中內核是八個 CPU,但是通過超線程技術一個物理的 CPU 核心可以模擬成兩個邏輯 CPU 線程,因此我的機器是支持8核16線程的。
到此這篇關于Java 多線程并發(fā)編程提高數據處理效率的文章就介紹到這了,更多相關Java 多線程提高數據處理效率內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
JAVA8 List<List<Integer>> list中再裝一個list轉成一個list操
這篇文章主要介紹了JAVA8 List<List<Integer>> list中再裝一個list轉成一個list操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-08-08
maven 在執(zhí)行package,install,deploy時使用clean與不使用clean的不同之處
有時候用mvn install后,新改的內容不生效,一定要后來使用mvn clean install 才生效,由于之前沒有做記錄,以及記不清是什么情況下才會出現的問題,于是想看看clean和不clean的區(qū)別,感興趣的朋友跟隨小編一起看看吧2021-08-08
spring?boot?validation參數校驗與分組嵌套各種類型及使用小結
參數校驗基本上是controller必做的事情,畢竟前端傳過來的一切都不可信,validation可以簡化這一操作,這篇文章主要介紹了spring?boot?validation參數校驗分組嵌套各種類型及使用小結,需要的朋友可以參考下2023-09-09

