java ExecutorService CompletionService線程池區(qū)別與選擇
引言
這段時間對業(yè)務系統(tǒng)做了個性能測試,其中使用了較多線程池的技術,故此做一個技術總結。
這次總結的內(nèi)容比較多,主要是四個:
ExecutorService
CompletionService
Runnable
Callable
前兩個是線程池相關接口,后兩個是多線程相關接口。在最后,我會說明什么情況下使用哪個接口,這兩類接口如何搭配使用。
Tips:個人拙見,如有不對,請多多指正。
一、ExecutorService
ExecutorService是一個接口,繼承自Executor。ExecutorService提供了一些常用操作和方法,但是ExecutorService是一個接口,無法實例化。
不過,Java提供了一個幫助類Executors,可以快速獲取一個ExecutorService對象,并使用ExecutorService接口的一些方法。
Executors幫助類提供了多個構造線程池的方法,常用的分為兩類:
直接執(zhí)行的
newCachedThreadPool
newFixedThreadPool
newSingleThreadExecutor
延遲或定時執(zhí)行的
newScheduledThreadPool
newSingleThreadScheduledExecutor
Executors為每種方法提供了一個線程工廠重載。
(一)newCachedThreadPool
創(chuàng)建一個默認的線程池對象,里面的線程和重用,且在第一次使用的時候才創(chuàng)建??梢岳斫鉃榫€程優(yōu)先模式,來一個創(chuàng)一個線程,直到線程處理完成后,再處理其他的任務。
Code:
package com.macro.boot.javaBuiltThreadPool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
public class MyExecutorService {
public static void main(String[] args) {
// 1. 使用幫助類
// ExecutorService executorService = Executors.newCachedThreadPool();
// 2. 提交任務
/* for (int i = 0; i < 20; i++) {
executorService.submit(new MyRunnable(i));
}*/
// 3. 重載方法測試
test();
}
private static void test() {
// 1. 使用幫助類
ExecutorService executorService = Executors.newCachedThreadPool(
new ThreadFactory() {
int n = 1;
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "線程正在執(zhí)行 --->" + n++);
}
}
);
// 2. 提交任務
for (int i = 0; i < 20; i++) {
executorService.submit(new MyRunnable(i));
}
}
}
/**
* 1. 線程類
*/
class MyRunnable implements Runnable {
private int id;
public MyRunnable(int id) {
this.id = id;
}
@Override
public void run() {
String name = Thread.currentThread().getName();
System.out.println(name + "正在執(zhí)行..." + "--->" + id);
}
}輸出:幾乎是一下子就執(zhí)行了,newCachedThreadPool會創(chuàng)建和任務數(shù)同等匹配的線程,直到處理完成任務的線程可以處理新增的任務。
(二)newFixedThreadPool
Code:創(chuàng)建一個可重用固定線程數(shù)量的線程池
package com.macro.boot.javaBuiltThreadPool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
/**
* 創(chuàng)建一個可固定重用次數(shù)的線程池
*/
public class MyNewFixedThreadPool {
public static void main(String[] args) {
/* // nThreads:線程數(shù)量
ExecutorService es = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++) {
es.submit(new MyRunnable(i));
}*/
test();
}
private static void test() {
ExecutorService es = Executors.newFixedThreadPool(5, new ThreadFactory() {
int n = 1;
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "線程" + n++);
}
});
// 提交任務
for (int i = 0; i < 10; i++) {
es.submit(new MyRunnable(i));
}
}
}(三)newSingleThreadExecutor
只有一個線程(線程安全)
package com.macro.boot.javaBuiltThreadPool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
public class MyNewSingleThreadExecutor {
public static void main(String[] args) throws InterruptedException {
/* ExecutorService es = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
es.submit(new MyRunnable(i));
}*/
test();
}
private static void test() throws InterruptedException {
ExecutorService es = Executors.newSingleThreadExecutor(new ThreadFactory() {
int n = 1;
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "線程" + n++);
}
});
for (int i = 0; i < 10; i++) {
Thread.sleep(100);
es.submit(new MyRunnable(i));
}
}
}(四)newScheduledThreadPool
怎么理解這個線程池的延遲時間?很簡單,第一次執(zhí)行的開始時間,加上延遲的時間,就是第二次執(zhí)行的時間。
package com.macro.boot.ScheduledExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class MyScheduledExecutor {
public static void main(String[] args) {
ScheduledExecutorService sec = Executors.newScheduledThreadPool(4);
for (int i = 0; i < 10; i++) {
sec.schedule(new MyRunnable(i), 1, TimeUnit.SECONDS);
}
System.out.println("開始執(zhí)行。。。");
sec.shutdown();
}
}
class MyRunnable implements Runnable {
private int id;
@Override
public String toString() {
return "MyRunnable{" +
"id=" + id +
'}';
}
public MyRunnable(int id) {
this.id = id;
}
@Override
public void run() {
String name = Thread.currentThread().getName();
System.out.println(name + "執(zhí)行了任務" + id);
}
}(五)newSingleThreadScheduledExecutor
newSingleThreadScheduledExecutor和newScheduledThreadPool的區(qū)別是,newSingleThreadScheduledExecutor的第二次執(zhí)行時間,等于第一次開始執(zhí)行的時間,加上執(zhí)行線程所耗費的時間,再加上延遲時間,即等于第二次執(zhí)行的時間。
二、CompletionService
CompletionService是一個接口。
當我們使用ExecutorService啟動多個Callable時,每個Callable返回一個Future,而當我們執(zhí)行Future的get方法獲取結果時,會阻塞線程直到獲取結果。
而CompletionService正是為了解決這個問題,它是Java8的新增接口,它的實現(xiàn)類是ExecutorCompletionService。CompletionService會根據(jù)線程池中Task的執(zhí)行結果按執(zhí)行完成的先后順序排序,任務先完成的可優(yōu)先獲取到。
Code:
package com.macro.boot.completions;
import java.util.concurrent.*;
public class CompletionBoot {
public static void main(String[] args) throws InterruptedException, ExecutionException {
// 實例化線程池
ExecutorService es = Executors.newCachedThreadPool();
ExecutorCompletionService<Integer> ecs = new ExecutorCompletionService<>(es);
for (int i = 0, j = 3; i < 20; i++) {
ecs.submit(new CallableExample(i, j));
}
for (int i = 0; i < 20; i++) {
// take:阻塞方法,從結果隊列中獲取并移除一個已經(jīng)執(zhí)行完成的任務的結果,如果沒有就會阻塞,直到有任務完成返回結果。
Integer integer = ecs.take().get();
// 從結果隊列中獲取并移除一個已經(jīng)執(zhí)行完成的任務的結果,如果沒有就會返回null,該方法不會阻塞。
// Integer integer = ecs.poll().get();
System.out.println(integer);
}
// 不要忘記關閉線程池
es.shutdown();
}
}
class CallableExample implements Callable<Integer> {
/**
* 使用構造方法獲取變量
* */
private int a;
private int b;
public CallableExample(int a, int b) {
this.a = a;
this.b = b;
}
@Override
public Integer call() throws Exception {
return a + b;
}
@Override
public String toString() {
return "CallableExample{" +
"a=" + a +
", b=" + b +
'}';
}
}三、Runnable
Runnable和Callable兩者都是接口,但是也有區(qū)別:
實現(xiàn)Callable接口的任務線程能返回執(zhí)行結果;而實現(xiàn)Runnable接口的任務線程不能返回結果;(重點)
Callable接口的call()方法允許拋出異常;而Runnable接口的run()方法的異常只能在內(nèi)部消化,不能繼續(xù)上拋;
Code:
class MyRunnable02 implements Runnable {
private int i;
public MyRunnable02(int i) {
this.i = i;
}
@Override
public void run() {
String name = Thread.currentThread().getName();
System.out.println(name + "執(zhí)行了... ---> " + i);
}
@Override
public String toString() {
return "MyRunnable{" +
"i=" + i +
'}';
}
}四、Callable
Code:
class CallableExample implements Callable<Integer> {
/**
* 使用構造方法獲取變量
* */
private int a;
private int b;
public CallableExample(int a, int b) {
this.a = a;
this.b = b;
}
@Override
public Integer call() throws Exception {
return a + b;
}
@Override
public String toString() {
return "CallableExample{" +
"a=" + a +
", b=" + b +
'}';
}
}
五、Example
本次Demo:使用線程池,循環(huán)查詢數(shù)據(jù)庫500次。
在最開始的時候,是使用ExecutorServer + Future.get(因為查詢數(shù)據(jù)庫肯定需要獲取結果,所以必須要用Callable,并且get到結果集)。但是get的阻塞操作,實在是太影響速度了,雖然考慮了兩種手段去解決,但是都不了了之。
Code:(只貼線程池的代碼,線程類和獲取連接的類就不放了)
private void executorServerStart() throws SQLException, ClassNotFoundException, ExecutionException, InterruptedException {
// get con
TDConUtils tdConUtils = new TDConUtils();
Connection con = tdConUtils.getCon();
Statement statement = con.createStatement();
// SQL
String sql = "select last_row(value_double) from db1.tb1;";
// ThreadPool
ExecutorService es = Executors.newCachedThreadPool();
// for each
int count = 500;
for (int i = 0; i < count; i++) {
Future<ResultSet> submit = es.submit(new MyThread(i, con, sql));
ResultSet resultSet = submit.get();
// print
while (resultSet.next()) {
System.out.printf("輸出:時間:%s,值:%f \n", resultSet.getTimestamp(1)
, resultSet.getDouble(2));
}
}
es.shutdown();
// close resources
tdConUtils.close(con, statement);
}運行時間:8000ms +
改CompletionService:
Code:
private void completionServerStart() throws SQLException, ClassNotFoundException, InterruptedException, ExecutionException {
// get con
TDConUtils tdConUtils = new TDConUtils();
Connection con = tdConUtils.getCon();
Statement statement = con.createStatement();
// SQL
String sql = "select last_row(value_double) from db1.tb1;";
// ThreadPool
ExecutorService es = Executors.newCachedThreadPool();
//構建ExecutorCompletionService,與線程池關聯(lián)
ExecutorCompletionService<ResultSet> ecs = new ExecutorCompletionService<ResultSet>(es);
// for each
int count = 500;
for (int i = 0; i < count; i++) {
ecs.submit(new MyThread(i, con, sql));
}
for (int i = 0; i < count; i++) {
// 通過take獲取Future結果,此方法會阻塞
ResultSet resultSet = ecs.take().get();
while (resultSet.next()) {
System.out.printf("輸出:時間:%s,值:%f \n", resultSet.getTimestamp(1)
, resultSet.getDouble(2));
}
}
es.shutdown();
tdConUtils.close(con, statement);
}運行時間:300+ms
六、使用小結
分情況。
如果需要獲取結果:線程使用Callable;
如果需要異步獲取結果:線程池使用CompletionService。
如果不需要獲取結果:線程使用Runnable;
如果需要阻塞獲取結果:線程池使用ExecutorService。
以上就是java ExecutorService CompletionService線程池區(qū)別與選擇的詳細內(nèi)容,更多關于ExecutorService CompletionService的資料請關注腳本之家其它相關文章!
相關文章
java synchronized同步靜態(tài)方法和同步非靜態(tài)方法的異同
這篇文章主要介紹了java synchronized同步靜態(tài)方法和同步非靜態(tài)方法的異同的相關資料,需要的朋友可以參考下2017-01-01
詳解SpringBoot 解決攔截器注入Service為空問題
這篇文章主要介紹了詳解SpringBoot 解決攔截器注入Service為空問題的解決,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-06-06

