如何基于ThreadPoolExecutor創(chuàng)建線程池并操作
日常工作中很多地方很多效率極低的操作,往往可以改串行為并行,執(zhí)行效率往往提高數(shù)倍,廢話不多說先上代碼
1、用到的guava坐標
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>18.0</version>
</dependency>
2、創(chuàng)建一個枚舉保證線程池是單例
package com.hao.service;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
public enum ExecutorManager {
INSTANCE;
private ExecutorManager() {
}
private static int AVAILABLEPROCESSORS = Runtime.getRuntime().availableProcessors();
public static final ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(AVAILABLEPROCESSORS * 50, AVAILABLEPROCESSORS * 80, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(AVAILABLEPROCESSORS * 2000),
new ThreadFactoryBuilder().setNameFormat("ExecutorManager-pool-Thread-%d").build());
}
3、創(chuàng)建一個方法類
package com.hao.service;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.springframework.stereotype.Service;
import com.google.common.base.Preconditions;
@Service
public class ExecutorContext {
public ExecutorService executorService;
private int DEFAULT_WAIT_SECONDS = 2;
@PostConstruct
public void init() {
executorService = ExecutorManager.threadPoolExecutor;
}
public <T> List<T> waitAllFutures(List<Callable<T>> calls, int milliseconds) throws Exception {
Preconditions.checkArgument(null != calls && !calls.isEmpty(), "callable empty.");
LatchedCallables<T> latchAndCallables = wrapCallables(calls);
List<Future<T>> futurres = new LinkedList<>();
for (CountdownedCallable<T> callable : latchAndCallables.wrappedCallables) {
if (null != callable) {
futurres.add(executorService.submit(callable));
}
}
List<T> rets = new ArrayList<>();
if (latchAndCallables.latch.await(milliseconds, TimeUnit.MILLISECONDS)) {
for (CountdownedCallable<T> call : latchAndCallables.wrappedCallables) {
rets.add(call.getResult());
}
} else {
for (Future<T> future : futurres) {
if (!future.isDone()) {
future.cancel(true);
}
}
}
return rets;
}
public <T> List<T> waitAllCallables(List<Callable<T>> calls, int seconds) throws Exception {
Preconditions.checkArgument(null != calls && !calls.isEmpty(), "callable empty.");
LatchedCallables<T> latchAndCallables = wrapCallables(calls);
for (CountdownedCallable<T> callable : latchAndCallables.wrappedCallables) {
executorService.submit(callable);
}
List<T> rets = new ArrayList<>();
if (latchAndCallables.latch.await(seconds, TimeUnit.SECONDS)) {
for (CountdownedCallable<T> call : latchAndCallables.wrappedCallables) {
rets.add(call.getResult());
}
}
return rets;
}
public <T> List<T> waitAllCallables(@SuppressWarnings("unchecked") Callable<T>... calls) throws Exception {
Preconditions.checkNotNull(calls, "callable empty.");
return waitAllCallables(Arrays.asList(calls), DEFAULT_WAIT_SECONDS);
}
private static <T> LatchedCallables<T> wrapCallables(List<Callable<T>> callables) {
CountDownLatch latch = new CountDownLatch(callables.size());
List<CountdownedCallable<T>> wrapped = new ArrayList<>(callables.size());
for (Callable<T> callable : callables) {
wrapped.add(new CountdownedCallable<>(callable, latch));
}
LatchedCallables<T> returnVal = new LatchedCallables<>();
returnVal.latch = latch;
returnVal.wrappedCallables = wrapped;
return returnVal;
}
public static class LatchedCallables<T> {
public CountDownLatch latch;
public List<CountdownedCallable<T>> wrappedCallables;
}
public static class CountdownedCallable<T> implements Callable<T> {
private final Callable<T> wrapped;
private final CountDownLatch latch;
private T result;
public CountdownedCallable(Callable<T> wrapped, CountDownLatch latch) {
this.wrapped = wrapped;
this.latch = latch;
}
@Override
public T call() throws Exception {
try {
result = wrapped.call();
return result;
} finally {
latch.countDown();
}
}
public T getResult() {
return result;
}
}
}
4、創(chuàng)建一個測試類
package com.hao;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import com.hao.bean.Employee;
import com.hao.service.EmployeeService;
import com.hao.service.ExecutorContext;
public class ExecutorTest extends BaseTest {
@Autowired
ExecutorContext executorContext;
@Autowired
EmployeeService employeeService;
@Test
public void test01() {
long t0 = System.currentTimeMillis();
List<Employee> employees = new ArrayList<Employee>();
try {
List<Callable<Integer>> calls = new ArrayList<Callable<Integer>>();
Callable<Integer> able1 = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
Thread.sleep(5000);
Employee employee = employeeService.getById(1L);
employees.add(employee);
return 1;
}
};
calls.add(able1);
Callable<Integer> able2 = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
Thread.sleep(5000);
Employee employee = employeeService.getById(2L);
employees.add(employee);
return 2;
}
};
calls.add(able2);
Callable<Integer> able3 = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
Thread.sleep(5000);
Employee employee = employeeService.getById(3L);
employees.add(employee);
return 3;
}
};
calls.add(able3);
executorContext.waitAllCallables(calls, 5000);
} catch (Exception e) {
e.printStackTrace();
}
for (Employee employee : employees) {
System.out.println(employee);
}
System.out.println(System.currentTimeMillis() - t0);
}
}
5、執(zhí)行結(jié)果如下

次工具類的好處在于能夠像使用普通 service一樣使用線程池完成并行操作,當然不要忘記將 ExecutorContext 置于能被sping掃描到的地方,
否則不能直接使用@Autowired 依賴注入
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
rabbitmq使用springboot實現(xiàn)direct模式(最新推薦)
這篇文章主要介紹了rabbitmq使用springboot實現(xiàn)direct模式,本文通過示例代碼給大家介紹的非常詳細,對大家的學(xué)習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2023-07-07
Java事務(wù)管理學(xué)習之Spring和Hibernate詳解
這篇文章主要給大家介紹了Java事務(wù)管理學(xué)習之Spring和Hibernate的相關(guān)資料,文中通過示例代碼介紹的非常詳細,需要的朋友們可以參考借鑒,下面來一起看看吧。2017-03-03
springboot2.x默認使用的代理是cglib代理操作
這篇文章主要介紹了springboot2.x默認使用的代理是cglib代理操作,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-08-08
SpringBoot+MyBatisPlus+MySQL8實現(xiàn)樹形結(jié)構(gòu)查詢
這篇文章主要為大家詳細介紹了SpringBoot+MyBatisPlus+MySQL8實現(xiàn)樹形結(jié)構(gòu)查詢,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-06-06
SpringMVC 方法四種類型返回值總結(jié)(你用過幾種)
這篇文章主要介紹了SpringMVC 方法四種類型返回值總結(jié)(你用過幾種),小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2019-05-05

