Java并發(fā)之Phaser的全面解析詳解
內(nèi)容概要
Phaser是Java中一個(gè)靈活的同步工具,其優(yōu)點(diǎn)在于支持多階段的任務(wù)拆分與同步,并且能夠動(dòng)態(tài)地注冊(cè)與注銷參與者,它提供了豐富的等待與推進(jìn)機(jī)制,使得開發(fā)者能夠更細(xì)粒度地控制線程的協(xié)調(diào)行為,實(shí)現(xiàn)復(fù)雜的并行任務(wù)處理,相比于其他同步工具,Phaser更加靈活且易于擴(kuò)展,適用于多種并發(fā)場景。
核心概念
在Java中,Phaser
是一個(gè)靈活的同步工具類,它允許多個(gè)線程在一個(gè)或多個(gè)屏障(barrier points)上進(jìn)行協(xié)調(diào),可以把Phaser
想象成一個(gè)多線程聚會(huì)的組織者,它負(fù)責(zé)確保所有參與的線程都到達(dá)某個(gè)階段后再一起進(jìn)行下一步。
舉一個(gè)實(shí)際生活中的場景:假設(shè)正在開發(fā)一個(gè)在線多人游戲,比如,團(tuán)隊(duì)解謎游戲,在這個(gè)游戲中,有幾個(gè)玩家(線程)需要合作完成一系列任務(wù)來通關(guān),每個(gè)任務(wù)都被劃分為幾個(gè)階段,而每個(gè)階段都需要所有玩家共同完成某些操作后才能進(jìn)入下一階段。
這個(gè)場景中,Phaser
就可以發(fā)揮它的作用,可以把每個(gè)階段看作是一個(gè)屏障點(diǎn),每個(gè)玩家線程在完成自己當(dāng)前階段的任務(wù)后會(huì)向Phaser
報(bào)告,然后等待其他玩家完成,一旦所有玩家都完成了當(dāng)前階段的任務(wù),Phaser
就會(huì)像一個(gè)響鈴一樣,通知所有玩家可以進(jìn)入下一階段了。
比如,在解謎游戲的關(guān)卡中,四個(gè)玩家需要分別找到四個(gè)不同的線索,并將這些線索組合起來才能打開通往下一關(guān)的大門,每個(gè)玩家在找到線索后,都會(huì)告知Phaser
自己已經(jīng)完成任務(wù),Phaser
會(huì)等待所有四個(gè)玩家都找到線索后,再通知他們可以將線索組合起來打開大門進(jìn)入下一關(guān)了。
Phaser主要用于解決多個(gè)線程分階段共同完成任務(wù)的同步問題,它可以確保一組線程在達(dá)到某個(gè)屏障點(diǎn)(phase)之前都保持同步,即所有線程都完成了某個(gè)階段的任務(wù)后,才能一起進(jìn)入下一個(gè)階段,這種同步機(jī)制尤其適用于需要多個(gè)線程協(xié)作完成復(fù)雜任務(wù)的情況,比如在線多人游戲、分布式系統(tǒng)、并行計(jì)算等場景。
Phaser在內(nèi)部維護(hù)了一個(gè)狀態(tài)機(jī),用來跟蹤和管理每個(gè)線程的執(zhí)行狀態(tài)以及各個(gè)階段的完成情況,當(dāng)一個(gè)線程完成任務(wù)并達(dá)到屏障點(diǎn)時(shí),會(huì)調(diào)用Phaser的相應(yīng)方法來通知其他線程,然后等待其他線程一起進(jìn)入下一階段,在這個(gè)過程中,Phaser會(huì)管理線程的同步和協(xié)作,確保所有線程都能按照預(yù)定的順序完成各自的任務(wù)。
此外,Phaser還提供了靈活的注冊(cè)和注銷線程的功能,可以動(dòng)態(tài)地添加或刪除參與同步的線程,它還支持中斷和超時(shí)機(jī)制,可以在等待其他線程的過程中被中斷或設(shè)置超時(shí),增強(qiáng)了對(duì)多線程同步的靈活性。
官方文檔:docx.iamqiang.com/jdk11/api/java.base/java/util/concurrent/Phaser.html
代碼案例
下面是一個(gè)簡單的Java示例代碼,演示了如何使用Phaser
來同步多個(gè)線程,以確保它們分階段完成任務(wù),如下代碼:
import java.util.concurrent.Phaser; public class PhaserExample { public static void main(String[] args) throws InterruptedException { // 創(chuàng)建一個(gè)Phaser實(shí)例,初始時(shí)注冊(cè)3個(gè)線程(不包括主線程) Phaser phaser = new Phaser(3); // 創(chuàng)建并啟動(dòng)3個(gè)線程 for (int i = 0; i < 3; i++) { int threadNum = i + 1; // 為了在輸出中區(qū)分線程 new Thread(() -> { System.out.println("線程" + threadNum + ":已經(jīng)準(zhǔn)備好,等待其他線程。"); // 線程在此等待,直到所有線程都到達(dá)這個(gè)屏障點(diǎn) phaser.arriveAndAwaitAdvance(); System.out.println("線程" + threadNum + ":第一階段任務(wù)完成。"); // 模擬第二階段的任務(wù) try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return; } // 再次到達(dá)屏障點(diǎn),等待其他線程 phaser.arriveAndAwaitAdvance(); System.out.println("線程" + threadNum + ":第二階段任務(wù)完成。"); // 模擬第三階段的任務(wù) try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return; } // 最后一次到達(dá)屏障點(diǎn),所有線程都完成后Phaser將自動(dòng)進(jìn)入終止?fàn)顟B(tài) phaser.arriveAndAwaitAdvance(); System.out.println("線程" + threadNum + ":第三階段任務(wù)完成,Phaser任務(wù)結(jié)束。"); }).start(); } // 等待所有線程完成任務(wù) // 注意:在實(shí)際應(yīng)用中,可能不希望主線程在這里阻塞,而是去做其他工作 // 但為了演示目的,讓主線程等待所有工作線程完成 phaser.awaitAdvance(phaser.getPhase()); System.out.println("所有線程的第一階段任務(wù)完成。"); phaser.awaitAdvance(phaser.getPhase() + 1); System.out.println("所有線程的第二階段任務(wù)完成。"); phaser.awaitAdvance(phaser.getPhase() + 1); System.out.println("所有線程的第三階段任務(wù)完成,整個(gè)任務(wù)結(jié)束。"); } }
在上面代碼中,創(chuàng)建了一個(gè)Phaser
實(shí)例并初始注冊(cè)了3個(gè)線程以及主線程,每個(gè)線程都執(zhí)行三個(gè)階段的任務(wù),每個(gè)階段任務(wù)之間都通過phaser.arriveAndAwaitAdvance()
方法進(jìn)行同步,每個(gè)線程在完成當(dāng)前階段的任務(wù)后,都會(huì)在這個(gè)方法上阻塞,直到所有其他線程也完成了它們當(dāng)前階段的任務(wù),在所有線程都完成最后一個(gè)階段的任務(wù)后,Phaser
會(huì)自動(dòng)進(jìn)入終止?fàn)顟B(tài),此時(shí)不會(huì)再有線程被阻塞。
上述代碼輸出如下結(jié)果:
主線程也已經(jīng)準(zhǔn)備好,等待其他線程。
線程x已經(jīng)準(zhǔn)備好,等待其他線程。
線程y已經(jīng)準(zhǔn)備好,等待其他線程。
線程z已經(jīng)準(zhǔn)備好,等待其他線程。
(這里所有線程和主線程都會(huì)等待,直到所有參與者都調(diào)用了arriveAndAwaitAdvance)
主線程第一階段任務(wù)完成(實(shí)際上主線程可能只是監(jiān)控或協(xié)調(diào)其他線程)。
線程x第一階段任務(wù)完成。
線程y第一階段任務(wù)完成。
線程z第一階段任務(wù)完成。
(所有線程和主線程繼續(xù)執(zhí)行,直到它們?cè)俅握{(diào)用arriveAndAwaitAdvance)
主線程第二階段任務(wù)完成(實(shí)際上可能是等待其他線程完成某些任務(wù))。
線程x第二階段任務(wù)完成。
線程y第二階段任務(wù)完成。
線程z第二階段任務(wù)完成。
(所有線程和主線程繼續(xù)執(zhí)行第三階段任務(wù))
主線程第三階段任務(wù)完成(實(shí)際上可能是進(jìn)行一些清理工作或者匯總結(jié)果)。
線程x第三階段任務(wù)完成,Phaser任務(wù)結(jié)束。
線程y第三階段任務(wù)完成,Phaser任務(wù)結(jié)束。
線程z第三階段任務(wù)完成,Phaser任務(wù)結(jié)束。
核心API
Phaser
它允許一組線程互相等待,直到所有線程都到達(dá)某個(gè)屏障(barrier)點(diǎn),Phaser
非常適合用于多階段的任務(wù)拆分和同步,以下是Phaser
中一些重要方法的簡要說明:
Phaser(int parties)
: 構(gòu)造函數(shù),創(chuàng)建一個(gè)新的Phaser
實(shí)例,并設(shè)置注冊(cè)的線程數(shù)(parties),這個(gè)數(shù)字表示在繼續(xù)到下一個(gè)階段之前,必須到達(dá)屏障的線程數(shù)。Phaser()
: 構(gòu)造函數(shù),創(chuàng)建一個(gè)新的Phaser
實(shí)例,但不設(shè)置注冊(cè)的線程數(shù),這通常用于層次結(jié)構(gòu)的Phaser
,其中子Phaser
會(huì)繼承父Phaser
的注冊(cè)線程數(shù)。register()
: 增加一個(gè)到達(dá)屏障所需的線程數(shù),如果調(diào)用此方法的線程尚未注冊(cè),它也會(huì)將自己注冊(cè)為未到達(dá)的線程。arrive()
: 表示當(dāng)前線程已經(jīng)到達(dá)屏障,并減少未到達(dá)的線程數(shù),如果這是最后一個(gè)到達(dá)的線程,并且已經(jīng)設(shè)置了下一個(gè)階段的屏障,那么這個(gè)方法將返回true
,否則返回false
。arriveAndAwaitAdvance()
: 當(dāng)前線程到達(dá)屏障,并等待其他線程也到達(dá),當(dāng)所有線程都到達(dá)后,屏障會(huì)自動(dòng)推進(jìn)到下一個(gè)階段,然后該方法返回,如果當(dāng)前Phaser
被終止,這個(gè)方法會(huì)拋出IllegalStateException
。awaitAdvance(int phase)
: 等待直到屏障推進(jìn)到給定的階段,如果當(dāng)前階段大于或等于給定的階段,那么此方法將立即返回。isTerminated()
: 檢查Phaser
是否已經(jīng)終止,當(dāng)注冊(cè)的線程數(shù)減少到零,且沒有新的線程注冊(cè)時(shí),Phaser
將被終止。getPhase()
: 獲取當(dāng)前屏障的階段號(hào),每個(gè)屏障都有一個(gè)唯一的階段號(hào),初始階段號(hào)為0。getRegisteredParties()
: 獲取當(dāng)前注冊(cè)的線程數(shù)。getArrivedParties()
: 獲取已經(jīng)到達(dá)當(dāng)前屏障的線程數(shù)。getUnarrivedParties()
: 獲取尚未到達(dá)當(dāng)前屏障的線程數(shù),這實(shí)際上是getRegisteredParties()
和getArrivedParties()
之間的差值。forceTermination()
: 強(qiáng)制終止Phaser
,即使還有未到達(dá)的線程,這會(huì)導(dǎo)致所有等待在arriveAndAwaitAdvance()
或awaitAdvance(int)
方法上的線程拋出IllegalStateException
。onAdvance(int phase, int registeredParties)
: 這是一個(gè)受保護(hù)的方法,可以在子類中覆蓋,以便在每個(gè)屏障階段推進(jìn)時(shí)執(zhí)行自定義操作。bulkRegister(int parties)
: 一次性注冊(cè)多個(gè)線程,這通常用于靜態(tài)已知的線程數(shù),或者當(dāng)多個(gè)任務(wù)由同一個(gè)線程代表時(shí)。
核心總結(jié)
Phaser類的目的是允許在并發(fā)編程中同步多個(gè)線程之間的執(zhí)行,它具有如下優(yōu)點(diǎn),如下:
- 更好的可擴(kuò)展性:Phaser類相對(duì)于其他同步工具類(如CyclicBarrier和CountDownLatch)具有更好的可擴(kuò)展性,因?yàn)樗С指嗟膮⑴c者(即線程)同時(shí)進(jìn)行同步。
- 自動(dòng)注銷和清理:當(dāng)所有參與者都完成執(zhí)行后,Phaser會(huì)自動(dòng)注銷并釋放相關(guān)資源,這有助于避免內(nèi)存泄漏和資源浪費(fèi)。
- 靈活的執(zhí)行模式:Phaser類提供了多種執(zhí)行模式,如并行、串行和混合模式,這使得在處理并發(fā)任務(wù)時(shí)更加靈活。
它也有不少缺點(diǎn),如:1、與其他的同步工具類相比,Phaser類的實(shí)現(xiàn)相對(duì)復(fù)雜,因此在某些場景下可能會(huì)引入額外的性能開銷,并且Phaser類具有一定的使用門檻,使用時(shí)深入理解并發(fā)編程和Java并發(fā)API,這可能會(huì)增加學(xué)習(xí)成本。
到此這篇關(guān)于Java并發(fā)之Phaser的全面解析詳解的文章就介紹到這了,更多相關(guān)Java Phaser內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java實(shí)現(xiàn)實(shí)時(shí)監(jiān)控目錄下文件變化的方法
今天小編就為大家分享一篇關(guān)于Java實(shí)現(xiàn)實(shí)時(shí)監(jiān)控目錄下文件變化的方法,小編覺得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來看看吧2019-03-03Java微信公眾平臺(tái)開發(fā)(8) 多媒體消息回復(fù)
這篇文章主要為大家詳細(xì)介紹了Java微信公眾平臺(tái)開發(fā)第八步,微信多媒體消息回復(fù),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-04-04基于springboot?配置文件context-path的坑
這篇文章主要介紹了基于springboot?配置文件context-path的坑,基于很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-01-01基于MybatisPlus插件TenantLineInnerInterceptor實(shí)現(xiàn)多租戶功能
這篇文章主要介紹了基于MybatisPlus插件TenantLineInnerInterceptor實(shí)現(xiàn)多租戶功能,需要的朋友可以參考下2021-11-11AsyncHttpClient?RequestFilter請(qǐng)求篩選源碼解讀
這篇文章主要為大家介紹了AsyncHttpClient?RequestFilter請(qǐng)求篩選源碼解讀,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-12-12java虛擬機(jī)JVM類加載機(jī)制原理(面試必問)
這篇文章主要介紹了面試當(dāng)中必會(huì)問到的java虛擬機(jī)JVM類加載機(jī)制,非常的詳細(xì),有需要的朋友可以借鑒參考下,歡迎多多交流討論2021-08-08Java設(shè)計(jì)模式之策略模式的使用(Strategy?Pattern)
策略模式是一種行為型設(shè)計(jì)模式,用于定義一系列算法并將每個(gè)算法封裝起來,使它們可以互相替換,從而實(shí)現(xiàn)代碼的可維護(hù)性和靈活性,策略模式包含策略接口、具體策略類和上下文類,并通過將算法的選擇與使用分離,使得算法可以獨(dú)立變化2025-03-03SpringBoot3集成iText實(shí)現(xiàn)PDF導(dǎo)出功能
不知道小伙伴們?cè)陧?xiàng)目中有沒有遇到過導(dǎo)出 PDF 的需求,小編在之前的 tienchin 項(xiàng)目中有一個(gè)合同導(dǎo)出的功能,需要將文檔導(dǎo)出為PDF,將文檔導(dǎo)出為 PDF 有很多方案,不同方案的優(yōu)缺點(diǎn)也各不相同,今天小編就和大家演示一個(gè),感興趣的小伙伴跟著小編一起來看看吧2024-10-10