java ExecutorService CompletionService線程池區(qū)別與選擇
引言
這段時(shí)間對(duì)業(yè)務(wù)系統(tǒng)做了個(gè)性能測(cè)試,其中使用了較多線程池的技術(shù),故此做一個(gè)技術(shù)總結(jié)。
這次總結(jié)的內(nèi)容比較多,主要是四個(gè):
ExecutorService
CompletionService
Runnable
Callable
前兩個(gè)是線程池相關(guān)接口,后兩個(gè)是多線程相關(guān)接口。在最后,我會(huì)說(shuō)明什么情況下使用哪個(gè)接口,這兩類接口如何搭配使用。
Tips:個(gè)人拙見(jiàn),如有不對(duì),請(qǐng)多多指正。
一、ExecutorService
ExecutorService是一個(gè)接口,繼承自Executor。ExecutorService提供了一些常用操作和方法,但是ExecutorService是一個(gè)接口,無(wú)法實(shí)例化。
不過(guò),Java提供了一個(gè)幫助類Executors,可以快速獲取一個(gè)ExecutorService對(duì)象,并使用ExecutorService接口的一些方法。
Executors幫助類提供了多個(gè)構(gòu)造線程池的方法,常用的分為兩類:
直接執(zhí)行的
newCachedThreadPool
newFixedThreadPool
newSingleThreadExecutor
延遲或定時(shí)執(zhí)行的
newScheduledThreadPool
newSingleThreadScheduledExecutor
Executors為每種方法提供了一個(gè)線程工廠重載。
(一)newCachedThreadPool
創(chuàng)建一個(gè)默認(rèn)的線程池對(duì)象,里面的線程和重用,且在第一次使用的時(shí)候才創(chuàng)建??梢岳斫鉃榫€程優(yōu)先模式,來(lái)一個(gè)創(chuàng)一個(gè)線程,直到線程處理完成后,再處理其他的任務(wù)。
Code:
package com.macro.boot.javaBuiltThreadPool; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; public class MyExecutorService { public static void main(String[] args) { // 1. 使用幫助類 // ExecutorService executorService = Executors.newCachedThreadPool(); // 2. 提交任務(wù) /* for (int i = 0; i < 20; i++) { executorService.submit(new MyRunnable(i)); }*/ // 3. 重載方法測(cè)試 test(); } private static void test() { // 1. 使用幫助類 ExecutorService executorService = Executors.newCachedThreadPool( new ThreadFactory() { int n = 1; @Override public Thread newThread(Runnable r) { return new Thread(r, "線程正在執(zhí)行 --->" + n++); } } ); // 2. 提交任務(wù) for (int i = 0; i < 20; i++) { executorService.submit(new MyRunnable(i)); } } } /** * 1. 線程類 */ class MyRunnable implements Runnable { private int id; public MyRunnable(int id) { this.id = id; } @Override public void run() { String name = Thread.currentThread().getName(); System.out.println(name + "正在執(zhí)行..." + "--->" + id); } }
輸出:幾乎是一下子就執(zhí)行了,newCachedThreadPool會(huì)創(chuàng)建和任務(wù)數(shù)同等匹配的線程,直到處理完成任務(wù)的線程可以處理新增的任務(wù)。
(二)newFixedThreadPool
Code:創(chuàng)建一個(gè)可重用固定線程數(shù)量的線程池
package com.macro.boot.javaBuiltThreadPool; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; /** * 創(chuàng)建一個(gè)可固定重用次數(shù)的線程池 */ public class MyNewFixedThreadPool { public static void main(String[] args) { /* // nThreads:線程數(shù)量 ExecutorService es = Executors.newFixedThreadPool(5); for (int i = 0; i < 10; i++) { es.submit(new MyRunnable(i)); }*/ test(); } private static void test() { ExecutorService es = Executors.newFixedThreadPool(5, new ThreadFactory() { int n = 1; @Override public Thread newThread(Runnable r) { return new Thread(r, "線程" + n++); } }); // 提交任務(wù) for (int i = 0; i < 10; i++) { es.submit(new MyRunnable(i)); } } }
(三)newSingleThreadExecutor
只有一個(gè)線程(線程安全)
package com.macro.boot.javaBuiltThreadPool; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; public class MyNewSingleThreadExecutor { public static void main(String[] args) throws InterruptedException { /* ExecutorService es = Executors.newSingleThreadExecutor(); for (int i = 0; i < 10; i++) { es.submit(new MyRunnable(i)); }*/ test(); } private static void test() throws InterruptedException { ExecutorService es = Executors.newSingleThreadExecutor(new ThreadFactory() { int n = 1; @Override public Thread newThread(Runnable r) { return new Thread(r, "線程" + n++); } }); for (int i = 0; i < 10; i++) { Thread.sleep(100); es.submit(new MyRunnable(i)); } } }
(四)newScheduledThreadPool
怎么理解這個(gè)線程池的延遲時(shí)間?很簡(jiǎn)單,第一次執(zhí)行的開(kāi)始時(shí)間,加上延遲的時(shí)間,就是第二次執(zhí)行的時(shí)間。
package com.macro.boot.ScheduledExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public class MyScheduledExecutor { public static void main(String[] args) { ScheduledExecutorService sec = Executors.newScheduledThreadPool(4); for (int i = 0; i < 10; i++) { sec.schedule(new MyRunnable(i), 1, TimeUnit.SECONDS); } System.out.println("開(kāi)始執(zhí)行。。。"); sec.shutdown(); } } class MyRunnable implements Runnable { private int id; @Override public String toString() { return "MyRunnable{" + "id=" + id + '}'; } public MyRunnable(int id) { this.id = id; } @Override public void run() { String name = Thread.currentThread().getName(); System.out.println(name + "執(zhí)行了任務(wù)" + id); } }
(五)newSingleThreadScheduledExecutor
newSingleThreadScheduledExecutor和newScheduledThreadPool的區(qū)別是,newSingleThreadScheduledExecutor的第二次執(zhí)行時(shí)間,等于第一次開(kāi)始執(zhí)行的時(shí)間,加上執(zhí)行線程所耗費(fèi)的時(shí)間,再加上延遲時(shí)間,即等于第二次執(zhí)行的時(shí)間。
二、CompletionService
CompletionService是一個(gè)接口。
當(dāng)我們使用ExecutorService啟動(dòng)多個(gè)Callable時(shí),每個(gè)Callable返回一個(gè)Future,而當(dāng)我們執(zhí)行Future的get方法獲取結(jié)果時(shí),會(huì)阻塞線程直到獲取結(jié)果。
而CompletionService正是為了解決這個(gè)問(wèn)題,它是Java8的新增接口,它的實(shí)現(xiàn)類是ExecutorCompletionService。CompletionService會(huì)根據(jù)線程池中Task的執(zhí)行結(jié)果按執(zhí)行完成的先后順序排序,任務(wù)先完成的可優(yōu)先獲取到。
Code:
package com.macro.boot.completions; import java.util.concurrent.*; public class CompletionBoot { public static void main(String[] args) throws InterruptedException, ExecutionException { // 實(shí)例化線程池 ExecutorService es = Executors.newCachedThreadPool(); ExecutorCompletionService<Integer> ecs = new ExecutorCompletionService<>(es); for (int i = 0, j = 3; i < 20; i++) { ecs.submit(new CallableExample(i, j)); } for (int i = 0; i < 20; i++) { // take:阻塞方法,從結(jié)果隊(duì)列中獲取并移除一個(gè)已經(jīng)執(zhí)行完成的任務(wù)的結(jié)果,如果沒(méi)有就會(huì)阻塞,直到有任務(wù)完成返回結(jié)果。 Integer integer = ecs.take().get(); // 從結(jié)果隊(duì)列中獲取并移除一個(gè)已經(jīng)執(zhí)行完成的任務(wù)的結(jié)果,如果沒(méi)有就會(huì)返回null,該方法不會(huì)阻塞。 // Integer integer = ecs.poll().get(); System.out.println(integer); } // 不要忘記關(guān)閉線程池 es.shutdown(); } } class CallableExample implements Callable<Integer> { /** * 使用構(gòu)造方法獲取變量 * */ private int a; private int b; public CallableExample(int a, int b) { this.a = a; this.b = b; } @Override public Integer call() throws Exception { return a + b; } @Override public String toString() { return "CallableExample{" + "a=" + a + ", b=" + b + '}'; } }
三、Runnable
Runnable和Callable兩者都是接口,但是也有區(qū)別:
實(shí)現(xiàn)Callable接口的任務(wù)線程能返回執(zhí)行結(jié)果;而實(shí)現(xiàn)Runnable接口的任務(wù)線程不能返回結(jié)果;(重點(diǎn))
Callable接口的call()方法允許拋出異常;而Runnable接口的run()方法的異常只能在內(nèi)部消化,不能繼續(xù)上拋;
Code:
class MyRunnable02 implements Runnable { private int i; public MyRunnable02(int i) { this.i = i; } @Override public void run() { String name = Thread.currentThread().getName(); System.out.println(name + "執(zhí)行了... ---> " + i); } @Override public String toString() { return "MyRunnable{" + "i=" + i + '}'; } }
四、Callable
Code:
class CallableExample implements Callable<Integer> { /** * 使用構(gòu)造方法獲取變量 * */ private int a; private int b; public CallableExample(int a, int b) { this.a = a; this.b = b; } @Override public Integer call() throws Exception { return a + b; } @Override public String toString() { return "CallableExample{" + "a=" + a + ", b=" + b + '}'; } }
五、Example
本次Demo:使用線程池,循環(huán)查詢數(shù)據(jù)庫(kù)500次。
在最開(kāi)始的時(shí)候,是使用ExecutorServer + Future.get(因?yàn)椴樵償?shù)據(jù)庫(kù)肯定需要獲取結(jié)果,所以必須要用Callable,并且get到結(jié)果集)。但是get的阻塞操作,實(shí)在是太影響速度了,雖然考慮了兩種手段去解決,但是都不了了之。
Code:(只貼線程池的代碼,線程類和獲取連接的類就不放了)
private void executorServerStart() throws SQLException, ClassNotFoundException, ExecutionException, InterruptedException { // get con TDConUtils tdConUtils = new TDConUtils(); Connection con = tdConUtils.getCon(); Statement statement = con.createStatement(); // SQL String sql = "select last_row(value_double) from db1.tb1;"; // ThreadPool ExecutorService es = Executors.newCachedThreadPool(); // for each int count = 500; for (int i = 0; i < count; i++) { Future<ResultSet> submit = es.submit(new MyThread(i, con, sql)); ResultSet resultSet = submit.get(); // print while (resultSet.next()) { System.out.printf("輸出:時(shí)間:%s,值:%f \n", resultSet.getTimestamp(1) , resultSet.getDouble(2)); } } es.shutdown(); // close resources tdConUtils.close(con, statement); }
運(yùn)行時(shí)間:8000ms +
改CompletionService:
Code:
private void completionServerStart() throws SQLException, ClassNotFoundException, InterruptedException, ExecutionException { // get con TDConUtils tdConUtils = new TDConUtils(); Connection con = tdConUtils.getCon(); Statement statement = con.createStatement(); // SQL String sql = "select last_row(value_double) from db1.tb1;"; // ThreadPool ExecutorService es = Executors.newCachedThreadPool(); //構(gòu)建ExecutorCompletionService,與線程池關(guān)聯(lián) ExecutorCompletionService<ResultSet> ecs = new ExecutorCompletionService<ResultSet>(es); // for each int count = 500; for (int i = 0; i < count; i++) { ecs.submit(new MyThread(i, con, sql)); } for (int i = 0; i < count; i++) { // 通過(guò)take獲取Future結(jié)果,此方法會(huì)阻塞 ResultSet resultSet = ecs.take().get(); while (resultSet.next()) { System.out.printf("輸出:時(shí)間:%s,值:%f \n", resultSet.getTimestamp(1) , resultSet.getDouble(2)); } } es.shutdown(); tdConUtils.close(con, statement); }
運(yùn)行時(shí)間:300+ms
六、使用小結(jié)
分情況。
如果需要獲取結(jié)果:線程使用Callable;
如果需要異步獲取結(jié)果:線程池使用CompletionService。
如果不需要獲取結(jié)果:線程使用Runnable;
如果需要阻塞獲取結(jié)果:線程池使用ExecutorService。
以上就是java ExecutorService CompletionService線程池區(qū)別與選擇的詳細(xì)內(nèi)容,更多關(guān)于ExecutorService CompletionService的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java實(shí)現(xiàn)隨機(jī)驗(yàn)證碼功能實(shí)例代碼
在這里,我們使用servlet來(lái)實(shí)現(xiàn)隨機(jī)驗(yàn)證碼的實(shí)現(xiàn),有需要的朋友可以參考一下2013-08-08java synchronized同步靜態(tài)方法和同步非靜態(tài)方法的異同
這篇文章主要介紹了java synchronized同步靜態(tài)方法和同步非靜態(tài)方法的異同的相關(guān)資料,需要的朋友可以參考下2017-01-01詳解SpringBoot 解決攔截器注入Service為空問(wèn)題
這篇文章主要介紹了詳解SpringBoot 解決攔截器注入Service為空問(wèn)題的解決,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2018-06-06