java多線程教程之如何使用線程池詳解
為什么要用線程池?
諸如 Web 服務器、數(shù)據(jù)庫服務器、文件服務器或郵件服務器之類的許多服務器應用程序都面臨處理來自某些遠程來源的大量短小的任務。請求以某種方式到達服務器,這種方式可能是通過網(wǎng)絡協(xié)議(例如 HTTP、FTP 或 POP)、通過 JMS 隊列或者可能通過輪詢數(shù)據(jù)庫。不管請求如何到達,服務器應用程序中經常出現(xiàn)的情況是:單個任務處理的時間很短而請求的數(shù)目卻是巨大的。
只有當任務都是同類型并且相互獨立時,線程池的性能才能達到最佳。如果將運行時間較長的與運行時間較短的任務混合在一起,那么除非線程池很大,否則將可能造成擁塞,如果提交的任務依賴于其他任務,那么除非線程池無線大,否則將可能造成死鎖。
例如饑餓死鎖:線程池中的任務需要無限等待一些必須由池中其他任務才能提供的資源或條件。
ThreadPoolExecutor的通用構造函數(shù):(在調用完構造函數(shù)之后可以繼續(xù)定制ThreadPoolExecutor)
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory, RejectedExecutionHandler handler){ //... }
飽和策略:
ThreadPoolExecutor允許提供一個BlockingQueue來保存等待執(zhí)行的任務。
當有界隊列被填滿后,飽和策略開始發(fā)揮作用??梢酝ㄟ^調用setRejectedExecutionHandler來修改。
中止是默認的飽和策略,該策略將拋出未檢查的RejectedExecutionException,調用者可以捕獲這個異常,然后根據(jù)需求編寫自己的處理代碼。
調用者運行策略實現(xiàn)了一種調節(jié)機制,該策略既不會拋棄任務,也不會拋出異常,而是將某些任務回退到調用者,從而降低新任務的流量。
例如對于WebServer,當線程池中的所有線程都被占用,并且工作隊列被填滿后,下一個任務在調用execute時在主線程中執(zhí)行。
由于執(zhí)行任務需要一定的時間,因此主線程至少在一段時間內不能提交任何任務,從而使得工作者線程有時間來處理完正在執(zhí)行的任務。
在這期間,主線程不會調用accept,因此到達的請求將被保存在TCP層的隊列中而不是在應用程序的隊列中,如果持續(xù)過載,那么TCP層最終發(fā)現(xiàn)它的請求隊列被填滿,同樣會開始拋棄請求。
因此當服務器過載時,這種過載會逐漸向外蔓延開來---從線程池到工作隊列到應用程序再到TCP層,最終到達客戶端,導致服務器在高負載下實現(xiàn)一種平緩的性能降低。
exec.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
當工作隊列被填滿后,沒有預定于的飽和策略來阻塞execute。而通過Semaphore來現(xiàn)在任務的到達率,可以實現(xiàn)。
/** * 設置信號量的上界設置為線程池的大小加上可排隊任務的數(shù)量,控制正在執(zhí)行和等待執(zhí)行的任務數(shù)量。 */ public class BoundedExecutor { private final Executor exec; private final Semaphore semaphore; public BoundedExecutor(Executor exec,int bound){ this.exec = exec; this.semaphore = new Semaphore(bound); } public void submitTask(final Runnable task) throws InterruptedException{ semaphore.acquire(); try{ exec.execute(new Runnable(){ public void run(){ try{ task.run(); }finally{ semaphore.release(); } } }); }catch(RejectedExecutionException e){ semaphore.release(); } } }
線程工廠
線程池配置信息中可以定制線程工廠,在ThreadFactory中只定義了一個方法newThread,每當線程池需要創(chuàng)建一個新線程時都會調用這個方法。
public interface ThreadFactory{ Thread newThread(Runnable r); }
// 示例:將一個特定于線程池的名字傳遞給MyThread的構造函數(shù),從而可以再線程轉儲和錯誤日志信息中區(qū)分來自不同線程池的線程。 public class MyThreadFactory implements ThreadFactory{ private final String poolName; public MyThreadFactory(String poolName){ this.poolName = poolName; } public Thread newThread(Runnable runnable){ return new MyThread(runnable,poolName); } }
// 示例:為線程指定名字,設置自定義UncaughtExceptionHandler向Logger中寫入信息及維護一些統(tǒng)計信息以及在線程被創(chuàng)建或者終止時把調試消息寫入日志。 public class MyThread extends Thread{ public static final String default_name = "myThread"; private static volatile boolean debugLifecycle = false; private static final AtomicInteger created = new AtomicInteger(); private static final AtomicInteger alive = new AtomicInteger(); private static final Logger log = Logger.getAnonymousLogger(); public MyThread(Runnable runnable){ this(runnable,default_name); } public MyThread(Runnable runnable, String defaultName) { super(runnable,defaultName + "-" + created.incrementAndGet()); setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { @Override public void uncaughtException(Thread t, Throwable e) { log.log(Level.SEVERE,"uncaught in thread " + t.getName(), e); } }); } public void run(){ boolean debug = debugLifecycle; if(debug){ log.log(Level.FINE,"created " + getName()); } try{ alive.incrementAndGet(); super.run(); }finally{ alive.decrementAndGet(); if(debug){ log.log(Level.FINE,"Exiting " + getName()); } } } }
擴展ThreadPoolExecutor
在線程池完成關閉操作時調用terminated,也就是在所有任務都已經完成并且所有工作者線程也已經關閉后。terminated可以用來釋放Executor在其生命周期里分配的各種資源,此外還可以執(zhí)行發(fā)送通知、記錄日志或者收集finalize統(tǒng)計信息等操作。
示例:給線程池添加統(tǒng)計信息
/** * TimingThreadPool中給出了一個自定義的線程池,通過beforeExecute、afterExecute、terminated等方法來添加日志記錄和統(tǒng)計信息收集。 * 為了測量任務的運行時間,beforeExecute必須記錄開始時間并把它保存到一個afterExecute可用訪問的地方。 * 因為這些方法將在執(zhí)行任務的線程中調用,因此beforeExecute可以把值保存到一個ThreadLocal變量中。然后由afterExecute來取。 * 在TimingThreadPool中使用了兩個AtomicLong變量,分別用于記錄已處理的任務和總的處理時間,并通過包含平均任務時間的日志消息。 */ public class TimingThreadPool extends ThreadPoolExecutor{ public TimingThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } private final ThreadLocal<Long> startTime = new ThreadLocal<Long>(); private final Logger log = Logger.getLogger("TimingThreadPool"); private final AtomicLong numTasks = new AtomicLong(); private final AtomicLong totalTime = new AtomicLong(); protected void beforeExecute(Thread t,Runnable r){ super.beforeExecute(t, r); log.fine(String.format("Thread %s: start %s", t,r)); startTime.set(System.nanoTime()); } protected void afterExecute(Throwable t,Runnable r){ try{ long endTime = System.nanoTime(); long taskTime = endTime - startTime.get(); numTasks.incrementAndGet(); totalTime.addAndGet(taskTime); log.fine(String.format("Thread %s: end %s, time=%dns", t,r,taskTime)); }finally{ super.afterExecute(r, t); } } protected void terminated(){ try{ log.info(String.format("Terminated: avg time=%dns", totalTime.get()/numTasks.get())); }finally{ super.terminated(); } } }
#筆記內容參考 《java并發(fā)編程實戰(zhàn)》
總結
以上就是這篇文章的全部內容了,希望本文的內容對大家的學習或者工作具有一定的參考學習價值,如果有疑問大家可以留言交流,謝謝大家對腳本之家的支持。
相關文章
淺析Spring配置中的classpath:與classpath*:的區(qū)別
這篇文章主要介紹了Spring配置中的"classpath:"與"classpath*:"的區(qū)別,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-08-08SpringBoot如何動態(tài)修改Scheduled(系統(tǒng)啟動默認執(zhí)行,動態(tài)修改)
這篇文章主要介紹了SpringBoot如何動態(tài)修改Scheduled(系統(tǒng)啟動默認執(zhí)行,動態(tài)修改)的操作,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-07-07SSH框架網(wǎng)上商城項目第17戰(zhàn)之購物車基本功能
這篇文章主要為大家詳細介紹了SSH框架網(wǎng)上商城項目第17戰(zhàn)之購物車基本功能的實現(xiàn)過程,感興趣的小伙伴們可以參考一下2016-06-06關于jdk環(huán)境變量配置以及javac不是內部或外部命令的解決
這篇文章主要介紹了關于jdk環(huán)境變量配置以及javac不是內部或外部命令的解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-01-01