解決線程池中ThreadGroup的坑
線程池中ThreadGroup的坑
在Java中每一個(gè)線程都?xì)w屬于某個(gè)線程組管理的一員,例如在主函數(shù)main()主工作流程中產(chǎn)生一個(gè)線程,則產(chǎn)生的線程屬于main這個(gè)線程組管理的一員。簡(jiǎn)單地說,線程組(ThreadGroup)就是由線程組成的管理線程的類,這個(gè)類是java.lang.ThreadGroup類。
定義一個(gè)線程組,通過以下代碼可以實(shí)現(xiàn)。
ThreadGroup group=new ThreadGroup(“groupName”); Thread thread=new Thread(group,”the first thread of group”);
ThreadGroup類中的某些方法,可以對(duì)線程組中的線程產(chǎn)生作用。例如,setMaxPriority()方法可以設(shè)定線程組中的所有線程擁有最大的優(yōu)先權(quán)。
所有線程都隸屬于一個(gè)線程組。那可以是一個(gè)默認(rèn)線程組(不指定group),亦可是一個(gè)創(chuàng)建線程時(shí)明確指定的組。在創(chuàng)建之初,線程被限制到一個(gè)組里,而且不能改變到一個(gè)不同的組。每個(gè)應(yīng)用都至少有一個(gè)線程從屬于系統(tǒng)線程組。若創(chuàng)建多個(gè)線程而不指定一個(gè)組,它們就會(huì)自動(dòng)歸屬于系統(tǒng)線程組。
線程組也必須從屬于其他線程組。必須在構(gòu)建器里指定新線程組從屬于哪個(gè)線程組。若在創(chuàng)建一個(gè)線程組的時(shí)候沒有指定它的歸屬,則同樣會(huì)自動(dòng)成為系統(tǒng)線程組的一名屬下。因此,一個(gè)應(yīng)用程序中的所有線程組最終都會(huì)將系統(tǒng)線程組作為自己的“父”。
那么假如我們需要在線程池中實(shí)現(xiàn)一個(gè)帶自定義ThreadGroup的線程分組,該怎么實(shí)現(xiàn)呢?
我們?cè)诮o線程池(ThreadPoolExecutor)提交任務(wù)的時(shí)候可以通過execute(Runnable command)來將一個(gè)線程任務(wù)加入到該線程池,那么我們是否可以通過new一個(gè)指定了ThreadGroup的Thread實(shí)例來加入線程池來達(dá)到前面說到的目的呢?
ThreadGroup是否可行
通過new Thread(threadGroup,runnable)實(shí)現(xiàn)線程池中任務(wù)分組
public static void main(String[] args) { ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newCachedThreadPool(); final ThreadGroup group = new ThreadGroup("Main_Test_Group"); for (int i = 0; i < 5; i++) { Thread thread = new Thread(group, new Runnable() { @Override public void run() { int sleep = (int)(Math.random() * 10); try { Thread.sleep(1000 * 3); System.out.println(Thread.currentThread().getName()+"執(zhí)行完畢"); System.out.println("當(dāng)前線程組中的運(yùn)行線程數(shù)"+group.activeCount()); } catch (InterruptedException e) { e.printStackTrace(); } } }, group.getName()+" #"+i+""); pool.execute(thread); } }
運(yùn)行結(jié)果
pool-1-thread-3執(zhí)行完畢
pool-1-thread-1執(zhí)行完畢
當(dāng)前線程組中的運(yùn)行線程數(shù)0
pool-1-thread-2執(zhí)行完畢
當(dāng)前線程組中的運(yùn)行線程數(shù)0
當(dāng)前線程組中的運(yùn)行線程數(shù)0
pool-1-thread-4執(zhí)行完畢
pool-1-thread-5執(zhí)行完畢
當(dāng)前線程組中的運(yùn)行線程數(shù)0
當(dāng)前線程組中的運(yùn)行線程數(shù)0
運(yùn)行結(jié)果中可以看到group中的線程并沒有因?yàn)榫€程池啟動(dòng)了這個(gè)線程任務(wù)而運(yùn)行起來.因此通過線程組來對(duì)線程池中的線層任務(wù)分組不可行.
從java.util.concurrent.ThreadPoolExecutor源碼中可以看到如下構(gòu)造函數(shù):
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
如果我們?cè)趯?shí)例化ThreadPoolExecutor時(shí)不指定ThreadFactory,那么將以默認(rèn)的ThreadFactory來創(chuàng)建Thread.
Executors內(nèi)部類DefaultThreadFactory
下面的源碼即是默認(rèn)的Thread工廠
static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }
從唯一的構(gòu)造函數(shù)可以看到DefaultThreadFactory以SecurityManager 實(shí)例中的ThreadGroup來指定線程的group,如果SecurityManager 獲取到的ThreadGroup為null才默認(rèn)以當(dāng)前線程的group來指定.public Thread newThread(Runnable r) 則以group來new 一個(gè)Thead.這樣我們可以在實(shí)例化ThreadPoolExecutor對(duì)象的時(shí)候在其構(gòu)造函數(shù)內(nèi)傳入自定義的ThreadFactory實(shí)例即可達(dá)到目的.
public class MyTheadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; private ThreadGroup defaultGroup; public MyTheadFactory() { SecurityManager s = System.getSecurityManager(); defaultGroup = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } public MyTheadFactory(ThreadGroup group) { this.defaultGroup = group; namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable r) { Thread t = new Thread(defaultGroup, null, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }
ThreadGroup的使用及手寫線程池
監(jiān)聽線程異常關(guān)閉
以下代碼在window下不方便測(cè)試,需在linux 上 測(cè)試
// 以下線程如果強(qiáng)制關(guān)閉的話,是無法打印`線程被殺掉了` // 模擬關(guān)閉 kill PID public static void main(String[] args) { Runtime.getRuntime().addShutdownHook(new Thread( () -> { System.out.println("線程被殺掉了"); })); while(true){ System.out.println("i am working ..."); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }
如何拿到Thread線程中異常
public static void main(String[] args) { Thread thread = new Thread(() -> { try { Thread.sleep(1000); int i = 10/0; } catch (InterruptedException e) { e.printStackTrace(); } }); thread.setUncaughtExceptionHandler((t,e)->{ System.out.println("線程的名字"+ t.getName()); System.out.println(e); }); // 通過注入接口的方式 thread.start(); }
ThreadGroup
注意: threadGroup 設(shè)置為isDaemon 后,會(huì)隨最后一個(gè)線程結(jié)束而銷毀,如果沒有設(shè)置isDaemon ,則需要手動(dòng)調(diào)用 destory()
線程池使用
自己搭建的簡(jiǎn)單線程池實(shí)現(xiàn)
其中ThreadGroup 的應(yīng)用沒有寫,但是我們可以觀察線程關(guān)閉后,檢查ThreadGroup 中是否還有活躍的線程等,具體參考ThreadGroup API
import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.stream.IntStream; /** * @Author: shengjm * @Date: 2020/2/10 9:52 * @Description: */ public class SimpleThreadPool extends Thread{ /** * 線程數(shù)量 */ private int size; private final int queueSize; /** * 默認(rèn)線程隊(duì)列數(shù)量 */ private final static int DEFAULR_TASK_QUEUE_SIZE = 2000; private static volatile int seq = 0; private final static String THREAD_PREFIX = "SIMPLE_THREAD_POLL_"; private final static ThreadGroup GROUP = new ThreadGroup("Pool_Group"); private final static LinkedList<Runnable> TASK_QUEUE = new LinkedList<>(); private final static List<WorkerTask> THREAD_QUEUE = new ArrayList<>(); private final DiscardPolicy discardPolicy; private volatile boolean destory = false; private int min; private int max; private int active; /** * 定義異常策略的實(shí)現(xiàn) */ private final static DiscardPolicy DEFAULT_DISCARD_POLICY = () -> { throw new DiscardException("線程池已經(jīng)被撐爆了,后繼多余的人將丟失"); }; /** * */ public SimpleThreadPool(){ this(4,8,12,DEFAULR_TASK_QUEUE_SIZE,DEFAULT_DISCARD_POLICY); } /** * */ public SimpleThreadPool(int min , int active , int max , int queueSize,DiscardPolicy discardPolicy) { this.min = min; this.active = active; this.max = max; this.queueSize = queueSize; this.discardPolicy = discardPolicy; init(); } /** * 初始化 */ private void init() { for(int i = 0; i < min; i++){ createWorkTask(); } this.size = min; this.start(); } private void createWorkTask(){ WorkerTask task = new WorkerTask(GROUP,THREAD_PREFIX+(seq++)); task.start(); THREAD_QUEUE.add(task); } /** * 線程池自動(dòng)擴(kuò)充 */ @Override public void run() { while(!destory){ System.out.println(this.min +" --- "+this.active+" --- "+this.max + " --- "+ this.size + " --- "+ TASK_QUEUE.size()); try { Thread.sleep(1000); if(TASK_QUEUE.size() > active && size < active){ for (int i = size; i < active;i++){ createWorkTask(); } size = active; }else if(TASK_QUEUE.size() > max && size < max){ for (int i = size; i < max;i++){ createWorkTask(); } size = max; } synchronized (THREAD_QUEUE){ if(TASK_QUEUE.isEmpty() && size > active){ int release = size - active; for (Iterator<WorkerTask> it = THREAD_QUEUE.iterator();it.hasNext();){ if(release <=0){ break; } WorkerTask task = it.next(); task.close(); task.interrupt(); it.remove(); release--; } size = active; } } } catch (InterruptedException e) { break; } } } public void submit(Runnable runnable){ synchronized (TASK_QUEUE){ if(destory){ throw new DiscardException("線程池已經(jīng)被摧毀了..."); } if(TASK_QUEUE.size() > queueSize){ discardPolicy.discard(); } TASK_QUEUE.addLast(runnable); TASK_QUEUE.notifyAll(); } } /** * 關(guān)閉 */ public void shutdown(){ while(!TASK_QUEUE.isEmpty()){ try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } synchronized (THREAD_QUEUE) { int initVal = THREAD_QUEUE.size(); while (initVal > 0) { for (WorkerTask workerTask : THREAD_QUEUE) { if (workerTask.getTaskState() == TaskState.BLOCKED) { workerTask.interrupt(); workerTask.close(); initVal--; } else { try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } } } this.destory = true; } } public int getSize() { return size; } public int getMin() { return min; } public int getMax() { return max; } public int getActive() { return active; } /** * 線程狀態(tài) */ private enum TaskState{ FREE , RUNNING , BLOCKED , DEAD } /** * 自定義異常類 */ public static class DiscardException extends RuntimeException{ public DiscardException(String message){ super(message); } } /** * 定義異常策略 */ @FunctionalInterface public interface DiscardPolicy{ void discard() throws DiscardException; } private static class WorkerTask extends Thread{ private volatile TaskState taskState = TaskState.FREE; public TaskState getTaskState(){ return this.taskState; } public WorkerTask(ThreadGroup group , String name){ super(group , name); } @Override public void run(){ OUTER: while(this.taskState != TaskState.DEAD){ Runnable runnable; synchronized (TASK_QUEUE){ while(TASK_QUEUE.isEmpty()){ try { taskState = TaskState.BLOCKED; TASK_QUEUE.wait(); } catch (InterruptedException e) { break OUTER; } } runnable = TASK_QUEUE.removeFirst(); } if(runnable != null){ taskState = TaskState.RUNNING; runnable.run(); taskState = TaskState.FREE; } } } public void close(){ this.taskState = TaskState.DEAD; } } /** * 測(cè)試 * @param args */ public static void main(String[] args) { SimpleThreadPool simpleThreadPool = new SimpleThreadPool(); // SimpleThreadPool simpleThreadPool = new SimpleThreadPool(6,15,SimpleThreadPool.DEFAULT_DISCARD_POLICY); IntStream.rangeClosed(0,40).forEach(i -> { simpleThreadPool.submit(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("the runnable " + i + "be servered by " + Thread.currentThread()); }); }); // try { // Thread.sleep(15000); // } catch (InterruptedException e) { // e.printStackTrace(); // } simpleThreadPool.shutdown(); } }
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
java連接Access數(shù)據(jù)庫(kù)的方法
這篇文章主要為大家詳細(xì)介紹了java連接Access數(shù)據(jù)庫(kù)的方法,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-05-05Java并發(fā)包工具類CountDownLatch的應(yīng)用詳解
CountDownLatch是Java并發(fā)包中非常實(shí)用的一個(gè)工具類,它可以幫助我們實(shí)現(xiàn)線程之間的同步和協(xié)作。本文主要介紹了CountDownLatch的應(yīng)用場(chǎng)景及最佳實(shí)踐,希望對(duì)大家有所幫助2023-04-04MyBatis框架迭代器模式實(shí)現(xiàn)原理解析
這篇文章主要介紹了MyBatis框架迭代器模式實(shí)現(xiàn)原理解析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-03-03SpringBoot?DataSource數(shù)據(jù)源實(shí)現(xiàn)自動(dòng)配置流程詳解
這篇文章主要介紹了SpringBoot?DataSource數(shù)據(jù)源實(shí)現(xiàn)自動(dòng)配置流程,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)吧2022-10-10