java并發(fā)編程_線程池的使用方法(詳解)
一、任務(wù)和執(zhí)行策略之間的隱性耦合
Executor可以將任務(wù)的提交和任務(wù)的執(zhí)行策略解耦
只有任務(wù)是同類型的且執(zhí)行時(shí)間差別不大,才能發(fā)揮最大性能,否則,如將一些耗時(shí)長(zhǎng)的任務(wù)和耗時(shí)短的任務(wù)放在一個(gè)線程池,除非線程池很大,否則會(huì)造成死鎖等問(wèn)題
1.線程饑餓死鎖
類似于:將兩個(gè)任務(wù)提交給一個(gè)單線程池,且兩個(gè)任務(wù)之間相互依賴,一個(gè)任務(wù)等待另一個(gè)任務(wù),則會(huì)發(fā)生死鎖;表現(xiàn)為池不夠
定義:某個(gè)任務(wù)必須等待池中其他任務(wù)的運(yùn)行結(jié)果,有可能發(fā)生饑餓死鎖
2.線程池大小

注意:線程池的大小還受其他的限制,如其他資源池:數(shù)據(jù)庫(kù)連接池
如果每個(gè)任務(wù)都是一個(gè)連接,那么線程池的大小就受制于數(shù)據(jù)庫(kù)連接池的大小
3.配置ThreadPoolExecutor線程池
實(shí)例:
1.通過(guò)Executors的工廠方法返回默認(rèn)的一些實(shí)現(xiàn)
2.通過(guò)實(shí)例化ThreadPoolExecutor(.....)自定義實(shí)現(xiàn)
線程池的隊(duì)列
1.無(wú)界隊(duì)列:任務(wù)到達(dá),線程池飽滿,則任務(wù)在隊(duì)列中等待,如果任務(wù)無(wú)限達(dá)到,則隊(duì)列會(huì)無(wú)限擴(kuò)張
如:?jiǎn)卫凸潭ù笮〉木€程池用的就是此種
2.有界隊(duì)列:如果新任務(wù)到達(dá),隊(duì)列滿則使用飽和策略
3.同步移交:如果線程池很大,將任務(wù)放入隊(duì)列后在移交就會(huì)產(chǎn)生延時(shí),如果任務(wù)生產(chǎn)者很快也會(huì)導(dǎo)致任務(wù)排隊(duì)
SynchronousQueue直接將任務(wù)移交給工作線程
機(jī)制:將一個(gè)任務(wù)放入,必須有一個(gè)線程等待接受,如果沒有,則新增線程,如果線程飽和,則拒絕任務(wù)
如:CacheThreadPool就是使用的這種策略
飽和策略:
setRejectedExecutionHandler來(lái)修改飽和策略
1.終止Abort(默認(rèn)):拋出異常由調(diào)用者處理
2.拋棄Discard
3.拋棄DiscardOldest:拋棄最舊的任務(wù),注意:如果是優(yōu)先級(jí)隊(duì)列將拋棄優(yōu)先級(jí)最高的任務(wù)
4.CallerRuns:回退任務(wù),有調(diào)用者線程自行處理
4.線程工廠ThreadFactoy
每當(dāng)創(chuàng)建線程時(shí):其實(shí)是調(diào)用了線程工廠來(lái)完成
自定義線程工廠:implements ThreadFactory
可以定制該線程工廠的行為:如UncaughtExceptionHandler等
public class MyAppThread extends Thread {
public static final String DEFAULT_NAME = "MyAppThread";
private static volatile boolean debugLifecycle = false;
private static final AtomicInteger created = new AtomicInteger();
private static final AtomicInteger alive = new AtomicInteger();
private static final Logger log = Logger.getAnonymousLogger();
public MyAppThread(Runnable r) {
this(r, DEFAULT_NAME);
}
public MyAppThread(Runnable runnable, String name) {
super(runnable, name + "-" + created.incrementAndGet());
//設(shè)置該線程工廠創(chuàng)建的線程的 未捕獲異常的行為
setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
public void uncaughtException(Thread t,
Throwable e) {
log.log(Level.SEVERE,
"UNCAUGHT in thread " + t.getName(), e);
}
});
}
public void run() {
// Copy debug flag to ensure consistent value throughout.
boolean debug = debugLifecycle;
if (debug) log.log(Level.FINE, "Created " + getName());
try {
alive.incrementAndGet();
super.run();
} finally {
alive.decrementAndGet();
if (debug) log.log(Level.FINE, "Exiting " + getName());
}
}
public static int getThreadsCreated() {
return created.get();
}
public static int getThreadsAlive() {
return alive.get();
}
public static boolean getDebug() {
return debugLifecycle;
}
public static void setDebug(boolean b) {
debugLifecycle = b;
}
}
5.擴(kuò)展ThreadPoolExecutor
可以被自定義子類覆蓋的方法:
1.afterExecute:結(jié)束后,如果拋出RuntimeException則方法不會(huì)執(zhí)行
2.beforeExecute:開始前,如果拋出RuntimeException則任務(wù)不會(huì)執(zhí)行
3.terminated:在線程池關(guān)閉時(shí),可以用來(lái)釋放資源等
二、遞歸算法的并行化
1.循環(huán)
在循環(huán)中,每次循環(huán)操作都是獨(dú)立的
//串行化
void processSequentially(List<Element> elements) {
for (Element e : elements)
process(e);
}
//并行化
void processInParallel(Executor exec, List<Element> elements) {
for (final Element e : elements)
exec.execute(new Runnable() {
public void run() {
process(e);
}
});
}
2.迭代
如果每個(gè)迭代操作是彼此獨(dú)立的,則可以串行執(zhí)行
如:深度優(yōu)先搜索算法;注意:遞歸還是串行的,但是,每個(gè)節(jié)點(diǎn)的計(jì)算是并行的
//串行 計(jì)算compute 和串行迭代
public <T> void sequentialRecursive(List<Node<T>> nodes, Collection<T> results) {
for (Node<T> n : nodes) {
results.add(n.compute());
sequentialRecursive(n.getChildren(), results);
}
}
//并行 計(jì)算compute 和串行迭代
public <T> void parallelRecursive(final Executor exec, List<Node<T>> nodes, final Collection<T> results) {
for (final Node<T> n : nodes) {
exec.execute(() -> results.add(n.compute()));
parallelRecursive(exec, n.getChildren(), results);
}
}
//調(diào)用并行方法的操作
public <T> Collection<T> getParallelResults(List<Node<T>> nodes)
throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
Queue<T> resultQueue = new ConcurrentLinkedQueue<T>();
parallelRecursive(exec, nodes, resultQueue);
exec.shutdown();
exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
return resultQueue;
}
實(shí)例:
public class ConcurrentPuzzleSolver <P, M> {
private final Puzzle<P, M> puzzle;
private final ExecutorService exec;
private final ConcurrentMap<P, Boolean> seen;
protected final ValueLatch<PuzzleNode<P, M>> solution = new ValueLatch<PuzzleNode<P, M>>();
public ConcurrentPuzzleSolver(Puzzle<P, M> puzzle) {
this.puzzle = puzzle;
this.exec = initThreadPool();
this.seen = new ConcurrentHashMap<P, Boolean>();
if (exec instanceof ThreadPoolExecutor) {
ThreadPoolExecutor tpe = (ThreadPoolExecutor) exec;
tpe.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
}
}
private ExecutorService initThreadPool() {
return Executors.newCachedThreadPool();
}
public List<M> solve() throws InterruptedException {
try {
P p = puzzle.initialPosition();
exec.execute(newTask(p, null, null));
// 等待ValueLatch中閉鎖解開,則表示已經(jīng)找到答案
PuzzleNode<P, M> solnPuzzleNode = solution.getValue();
return (solnPuzzleNode == null) ? null : solnPuzzleNode.asMoveList();
} finally {
exec.shutdown();//最終主線程關(guān)閉線程池
}
}
protected Runnable newTask(P p, M m, PuzzleNode<P, M> n) {
return new SolverTask(p, m, n);
}
protected class SolverTask extends PuzzleNode<P, M> implements Runnable {
SolverTask(P pos, M move, PuzzleNode<P, M> prev) {
super(pos, move, prev);
}
public void run() {
//如果有一個(gè)線程找到了答案,則return,通過(guò)ValueLatch中isSet CountDownlatch閉鎖實(shí)現(xiàn);
//為類避免死鎖,將已經(jīng)掃描的節(jié)點(diǎn)放入set集合中,避免繼續(xù)掃描產(chǎn)生死循環(huán)
if (solution.isSet() || seen.putIfAbsent(pos, true) != null){
return; // already solved or seen this position
}
if (puzzle.isGoal(pos)) {
solution.setValue(this);
} else {
for (M m : puzzle.legalMoves(pos))
exec.execute(newTask(puzzle.move(pos, m), m, this));
}
}
}
}
以上這篇java并發(fā)編程_線程池的使用方法(詳解)就是小編分享給大家的全部?jī)?nèi)容了,希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
springboot2.x默認(rèn)使用的代理是cglib代理操作
這篇文章主要介紹了springboot2.x默認(rèn)使用的代理是cglib代理操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-08-08
Java+swing實(shí)現(xiàn)經(jīng)典貪吃蛇游戲
貪吃蛇(也叫做貪食蛇)游戲是一款休閑益智類游戲,有PC和手機(jī)等多平臺(tái)版本。既簡(jiǎn)單又耐玩。本文將通過(guò)java的swing來(lái)實(shí)現(xiàn)這一游戲,需要的可以參考一下2022-01-01
mybatis整合ehcache做三級(jí)緩存的實(shí)現(xiàn)方法
ehcache是一個(gè)快速內(nèi)存緩存框架,java項(xiàng)目里用起來(lái)很方便,下面這篇文章主要給大家介紹了關(guān)于mybatis整合ehcache做三級(jí)緩存的實(shí)現(xiàn)方法,文中通過(guò)實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下2023-06-06
Java中的lambda和stream實(shí)現(xiàn)排序
這篇文章主要介紹了Java中的lambda和stream實(shí)現(xiàn)排序,文章圍繞主題展開詳細(xì)的內(nèi)容介紹,具有一定的參考價(jià)值,需要的小伙伴可以參考一下2022-09-09
mybatis中orderBy(排序字段)和sort(排序方式)引起的bug及解決
這篇文章主要介紹了mybatis中orderBy(排序字段)和sort(排序方式)引起的bug,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-01-01
Java進(jìn)程cpu頻繁100%問(wèn)題解決方案
這篇文章主要介紹了Java進(jìn)程cpu頻繁100%問(wèn)題解決方案,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-10-10
Java中使用MongoDB數(shù)據(jù)庫(kù)實(shí)例Demo
MongoDB是由C++語(yǔ)言編寫的,基于分布式文件存儲(chǔ)的數(shù)據(jù)庫(kù),是一個(gè)介于關(guān)系數(shù)據(jù)庫(kù)和非關(guān)系數(shù)據(jù)庫(kù)之間的產(chǎn)品,是最接近于關(guān)系型數(shù)據(jù)庫(kù)的NoSQL數(shù)據(jù)庫(kù),下面這篇文章主要給大家介紹了關(guān)于Java中使用MongoDB數(shù)據(jù)庫(kù)的相關(guān)資料,需要的朋友可以參考下2023-12-12

