SpringBoot 多任務(wù)并行+線程池處理的實(shí)現(xiàn)
前言
前幾篇文章著重介紹了后端服務(wù)數(shù)據(jù)庫和多線程并行處理優(yōu)化,并示例了改造前后的偽代碼邏輯。當(dāng)然了,優(yōu)化是無止境的,前人栽樹后人乘涼。作為我們開發(fā)者來說,既然站在了巨人的肩膀上,就要寫出更加優(yōu)化的程序。
SpringBoot開發(fā)案例之JdbcTemplate批量操作
SpringBoot開發(fā)案例之CountDownLatch多任務(wù)并行處理
改造
理論上講,線程越多程序可能更快,但是在實(shí)際使用中我們需要考慮到線程本身的創(chuàng)建以及銷毀的資源消耗,以及保護(hù)操作系統(tǒng)本身的目的。我們通常需要將線程限制在一定的范圍之類,線程池就起到了這樣的作用。
程序邏輯
多任務(wù)并行+線程池處理.png
一張圖能解決的問題,就應(yīng)該盡可能的少BB,當(dāng)然底層原理性的東西還是需要大家去記憶并理解的。
Java 線程池
Java通過Executors提供四種線程池,分別為:
- newCachedThreadPool創(chuàng)建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。
- newFixedThreadPool 創(chuàng)建一個定長線程池,可控制線程最大并發(fā)數(shù),超出的線程會在隊(duì)列中等待。
- newScheduledThreadPool 創(chuàng)建一個定長線程池,支持定時及周期性任務(wù)執(zhí)行。
- newSingleThreadExecutor 創(chuàng)建一個單線程化的線程池,它只會用唯一的工作線程來執(zhí)行任務(wù),保證所有任務(wù)按照指定順序(FIFO, LIFO, 優(yōu)先級)執(zhí)行。
優(yōu)點(diǎn)
- 重用存在的線程,減少對象創(chuàng)建、消亡的開銷,性能佳。
- 可有效控制最大并發(fā)線程數(shù),提高系統(tǒng)資源的使用率,同時避免過多資源競爭,避免堵塞。
- 提供定時執(zhí)行、定期執(zhí)行、單線程、并發(fā)數(shù)控制等功能。
代碼實(shí)現(xiàn)
方式一(CountDownLatch)
/** * 多任務(wù)并行+線程池統(tǒng)計(jì) * 創(chuàng)建時間 2018年4月17日 */ public class StatsDemo { final static SimpleDateFormat sdf = new SimpleDateFormat( "yyyy-MM-dd HH:mm:ss"); final static String startTime = sdf.format(new Date()); /** * IO密集型任務(wù) = 一般為2*CPU核心數(shù)(常出現(xiàn)于線程中:數(shù)據(jù)庫數(shù)據(jù)交互、文件上傳下載、網(wǎng)絡(luò)數(shù)據(jù)傳輸?shù)鹊龋? * CPU密集型任務(wù) = 一般為CPU核心數(shù)+1(常出現(xiàn)于線程中:復(fù)雜算法) * 混合型任務(wù) = 視機(jī)器配置和復(fù)雜度自測而定 */ private static int corePoolSize = Runtime.getRuntime().availableProcessors(); /** * public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime, * TimeUnit unit,BlockingQueue<Runnable> workQueue) * corePoolSize用于指定核心線程數(shù)量 * maximumPoolSize指定最大線程數(shù) * keepAliveTime和TimeUnit指定線程空閑后的最大存活時間 * workQueue則是線程池的緩沖隊(duì)列,還未執(zhí)行的線程會在隊(duì)列中等待 * 監(jiān)控隊(duì)列長度,確保隊(duì)列有界 * 不當(dāng)?shù)木€程池大小會使得處理速度變慢,穩(wěn)定性下降,并且導(dǎo)致內(nèi)存泄露。如果配置的線程過少,則隊(duì)列會持續(xù)變大,消耗過多內(nèi)存。 * 而過多的線程又會 由于頻繁的上下文切換導(dǎo)致整個系統(tǒng)的速度變緩——殊途而同歸。隊(duì)列的長度至關(guān)重要,它必須得是有界的,這樣如果線程池不堪重負(fù)了它可以暫時拒絕掉新的請求。 * ExecutorService 默認(rèn)的實(shí)現(xiàn)是一個無界的 LinkedBlockingQueue。 */ private static ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, corePoolSize+1, 10l, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(1000)); public static void main(String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(5); //使用execute方法 executor.execute(new Stats("任務(wù)A", 1000, latch)); executor.execute(new Stats("任務(wù)B", 1000, latch)); executor.execute(new Stats("任務(wù)C", 1000, latch)); executor.execute(new Stats("任務(wù)D", 1000, latch)); executor.execute(new Stats("任務(wù)E", 1000, latch)); latch.await();// 等待所有人任務(wù)結(jié)束 System.out.println("所有的統(tǒng)計(jì)任務(wù)執(zhí)行完成:" + sdf.format(new Date())); } static class Stats implements Runnable { String statsName; int runTime; CountDownLatch latch; public Stats(String statsName, int runTime, CountDownLatch latch) { this.statsName = statsName; this.runTime = runTime; this.latch = latch; } public void run() { try { System.out.println(statsName+ " do stats begin at "+ startTime); //模擬任務(wù)執(zhí)行時間 Thread.sleep(runTime); System.out.println(statsName + " do stats complete at "+ sdf.format(new Date())); latch.countDown();//單次任務(wù)結(jié)束,計(jì)數(shù)器減一 } catch (InterruptedException e) { e.printStackTrace(); } } } }
方式二(Future)
/** * 多任務(wù)并行+線程池統(tǒng)計(jì) * 創(chuàng)建時間 2018年4月17日 */ public class StatsDemo { final static SimpleDateFormat sdf = new SimpleDateFormat( "yyyy-MM-dd HH:mm:ss"); final static String startTime = sdf.format(new Date()); /** * IO密集型任務(wù) = 一般為2*CPU核心數(shù)(常出現(xiàn)于線程中:數(shù)據(jù)庫數(shù)據(jù)交互、文件上傳下載、網(wǎng)絡(luò)數(shù)據(jù)傳輸?shù)鹊龋? * CPU密集型任務(wù) = 一般為CPU核心數(shù)+1(常出現(xiàn)于線程中:復(fù)雜算法) * 混合型任務(wù) = 視機(jī)器配置和復(fù)雜度自測而定 */ private static int corePoolSize = Runtime.getRuntime().availableProcessors(); /** * public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime, * TimeUnit unit,BlockingQueue<Runnable> workQueue) * corePoolSize用于指定核心線程數(shù)量 * maximumPoolSize指定最大線程數(shù) * keepAliveTime和TimeUnit指定線程空閑后的最大存活時間 * workQueue則是線程池的緩沖隊(duì)列,還未執(zhí)行的線程會在隊(duì)列中等待 * 監(jiān)控隊(duì)列長度,確保隊(duì)列有界 * 不當(dāng)?shù)木€程池大小會使得處理速度變慢,穩(wěn)定性下降,并且導(dǎo)致內(nèi)存泄露。如果配置的線程過少,則隊(duì)列會持續(xù)變大,消耗過多內(nèi)存。 * 而過多的線程又會 由于頻繁的上下文切換導(dǎo)致整個系統(tǒng)的速度變緩——殊途而同歸。隊(duì)列的長度至關(guān)重要,它必須得是有界的,這樣如果線程池不堪重負(fù)了它可以暫時拒絕掉新的請求。 * ExecutorService 默認(rèn)的實(shí)現(xiàn)是一個無界的 LinkedBlockingQueue。 */ private static ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, corePoolSize+1, 10l, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(1000)); public static void main(String[] args) throws InterruptedException { List<Future<String>> resultList = new ArrayList<Future<String>>(); //使用submit提交異步任務(wù),并且獲取返回值為future resultList.add(executor.submit(new Stats("任務(wù)A", 1000))); resultList.add(executor.submit(new Stats("任務(wù)B", 1000))); resultList.add(executor.submit(new Stats("任務(wù)C", 1000))); resultList.add(executor.submit(new Stats("任務(wù)D", 1000))); resultList.add(executor.submit(new Stats("任務(wù)E", 1000))); //遍歷任務(wù)的結(jié)果 for (Future<String> fs : resultList) { try { System.out.println(fs.get());//打印各個線任務(wù)執(zhí)行的結(jié)果,調(diào)用future.get() 阻塞主線程,獲取異步任務(wù)的返回結(jié)果 } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } finally { //啟動一次順序關(guān)閉,執(zhí)行以前提交的任務(wù),但不接受新任務(wù)。如果已經(jīng)關(guān)閉,則調(diào)用沒有其他作用。 executor.shutdown(); } } System.out.println("所有的統(tǒng)計(jì)任務(wù)執(zhí)行完成:" + sdf.format(new Date())); } static class Stats implements Callable<String> { String statsName; int runTime; public Stats(String statsName, int runTime) { this.statsName = statsName; this.runTime = runTime; } public String call() { try { System.out.println(statsName+ " do stats begin at "+ startTime); //模擬任務(wù)執(zhí)行時間 Thread.sleep(runTime); System.out.println(statsName + " do stats complete at "+ sdf.format(new Date())); } catch (InterruptedException e) { e.printStackTrace(); } return call(); } } }
執(zhí)行時間
以上代碼,均是偽代碼,下面是2000+個學(xué)生的真實(shí)測試記錄。
2018-04-17 17:42:29.284 INFO 測試記錄81e51ab031eb4ada92743ddf66528d82-單線程順序執(zhí)行,花費(fèi)時間:3797
2018-04-17 17:42:31.452 INFO 測試記錄81e51ab031eb4ada92743ddf66528d82-多線程并行任務(wù),花費(fèi)時間:2167
2018-04-17 17:42:33.170 INFO 測試記錄81e51ab031eb4ada92743ddf66528d82-多線程并行任務(wù)+線程池,花費(fèi)時間:1717
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
SpringBoot打jar包遇到的xml文件丟失的解決方案
這篇文章主要介紹了SpringBoot打jar包遇到的xml文件丟失的解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-09-09springcloud教程之zuul路由網(wǎng)關(guān)的實(shí)現(xiàn)
這篇文章主要介紹了springcloud教程之zuul路由網(wǎng)關(guān)的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-02-02詳解Spring整合Quartz實(shí)現(xiàn)動態(tài)定時任務(wù)
本篇文章主要介紹了詳解Spring整合Quartz實(shí)現(xiàn)動態(tài)定時任務(wù),具有一定的參考價值,感興趣的小伙伴們可以參考一下。2017-03-03Java數(shù)據(jù)結(jié)構(gòu)二叉樹難點(diǎn)解析
樹是一種重要的非線性數(shù)據(jù)結(jié)構(gòu),直觀地看,它是數(shù)據(jù)元素(在樹中稱為結(jié)點(diǎn))按分支關(guān)系組織起來的結(jié)構(gòu),很象自然界中的樹那樣。樹結(jié)構(gòu)在客觀世界中廣泛存在,如人類社會的族譜和各種社會組織機(jī)構(gòu)都可用樹形象表示2021-10-10Java Socket實(shí)現(xiàn)聊天室附1500行源代碼
Socket是應(yīng)用層與TCP/IP協(xié)議族通信的中間軟件抽象層,它是一組接口。本篇文章手把手帶你通過Java Socket來實(shí)現(xiàn)自己的聊天室,大家可以在過程中查缺補(bǔ)漏,溫故而知新2021-10-10java 讀取網(wǎng)頁內(nèi)容的實(shí)例詳解
這篇文章主要介紹了java 讀取網(wǎng)頁內(nèi)容的實(shí)例詳解的相關(guān)資料,希望通過本文能幫助到大家,讓大家學(xué)習(xí)理解這部分內(nèi)容,需要的朋友可以參考下2017-09-09Java Swing樹狀組件JTree用法實(shí)例詳解
這篇文章主要介紹了Java Swing樹狀組件JTree用法,結(jié)合具體實(shí)例形式分析了Swing組件JTree構(gòu)成樹狀列表的節(jié)點(diǎn)設(shè)置與事件響應(yīng),以及自定義圖形節(jié)點(diǎn)的相關(guān)操作技巧,需要的朋友可以參考下2017-11-11