Java中的Phaser使用詳解
Java中的Phaser
JDK1.7新特性
可重用的同步屏障,其功能類似于CyclicBarrier和CountDownLatch,但支持更靈活的用法。
- Registration:與其他障礙不同,注冊(cè)在phaser上進(jìn)行同步的parties數(shù)量可能會(huì)隨時(shí)間變化。任務(wù)可以隨時(shí)進(jìn)行注冊(cè)(使用方法register,bulkRegister或建立初始方數(shù)目的構(gòu)造函數(shù)形式),也可以選擇在任何到達(dá)時(shí)注銷(使用ArcadeAndDeregister)。與大多數(shù)基本同步結(jié)構(gòu)一樣,registration和deregistration會(huì)只影響內(nèi)部計(jì)數(shù)。它們不會(huì)建立任何進(jìn)一步的內(nèi)部簿記,因此任務(wù)無法查詢它們是否已注冊(cè)。 (但是,可以通過通過子類來引入這種記帳方法。)
- Synchronization:像CyclicBarrier一樣,Phaser可能會(huì)反復(fù)等待。arriveAndAwaitAdvance方法的作用類似于CyclicBarrier.await。每一代的phaser都有一個(gè)關(guān)聯(lián)的相數(shù)。相數(shù)從零開始,并在所有各方到達(dá)移相器時(shí)被遞增,在達(dá)到Integer.MAX_VALUE后回繞為零。通過使用任何注冊(cè)方可以調(diào)用的兩種方法,階段編號(hào)的使用可以在到達(dá)移相器時(shí)以及在等待其他人時(shí)獨(dú)立控制動(dòng)作。
- Arrival:arrive和arriveAndDeregister方法記錄到達(dá)。這些方法不會(huì)阻塞,但是會(huì)返回一個(gè)關(guān)聯(lián)的到達(dá)phase數(shù)。也就是說,到達(dá)應(yīng)用到的相位器的phase數(shù)。當(dāng)給定階段的最終參與者到達(dá)時(shí),將執(zhí)行可選操作,并且階段將前進(jìn)。這些動(dòng)作由觸發(fā)相位超前的一方執(zhí)行,并由覆蓋方法onAdvance(int,int)安排,該方法也控制終止。重寫此方法類似于為CyclicBarrier提供屏障操作,但比它更具靈活性。
- Waiting:方法awaitAdvance需要一個(gè)指示到達(dá)階段編號(hào)的參數(shù),并在移相器前進(jìn)到(或已經(jīng)處于)另一個(gè)階段時(shí)返回。與使用CyclicBarrier的類似構(gòu)造不同,即使等待線程被中斷,方法awaitAdvance仍繼續(xù)等待。也提供中斷版本和超時(shí)版本,但是任務(wù)中斷或超時(shí)等待時(shí)遇到的異常不會(huì)更改相位器的狀態(tài)。如有必要,通??梢栽谡{(diào)用forceTermination之后,在這些異常的處理程序內(nèi)執(zhí)行任何關(guān)聯(lián)的恢復(fù)。在ForkJoinPool中執(zhí)行的任務(wù)也可以使用相位器,當(dāng)其他任務(wù)被阻止等待階段前進(jìn)時(shí),它將確保足夠的并行度來執(zhí)行任務(wù)。
- Termination:移相器可能會(huì)進(jìn)入終止?fàn)顟B(tài),可以使用方法isTerminated檢查該狀態(tài)。終止后,所有同步方法將立即返回,而無需等待提前,如負(fù)的返回值所示。同樣,終止注冊(cè)的嘗試也無效。當(dāng)onAdvance的調(diào)用返回true時(shí),將觸發(fā)終止。如果注銷導(dǎo)致注冊(cè)方的數(shù)量變?yōu)榱?,則默認(rèn)實(shí)現(xiàn)返回true。如下所示,當(dāng)相位器以固定的迭代次數(shù)控制動(dòng)作時(shí),通常方便的方法是在當(dāng)前相位數(shù)達(dá)到閾值時(shí)覆蓋此方法以終止。方法forceTermination也可用于突然釋放等待線程并允許它們終止。
- Tiering:hasers可以分層(即,以樹形結(jié)構(gòu)構(gòu)建)以減少競(jìng)爭(zhēng)。相反,可以設(shè)置具有大量參與方的相位器,否則它們將承受沉重的同步競(jìng)爭(zhēng)成本,以便子相位器組共享一個(gè)公共父級(jí)。即使這會(huì)招致更大的每次操作開銷,也可能會(huì)大大提高吞吐量。在分層的相位器樹中,將自動(dòng)管理子相位器及其父級(jí)的注冊(cè)和注銷。只要子相位器的注冊(cè)方數(shù)量不為零(在Phaser(Phaser,int)構(gòu)造函數(shù),寄存器或bulkRegister中確定),子相位器就會(huì)向其父級(jí)注冊(cè)。每當(dāng)調(diào)用到達(dá)和取消注冊(cè)的結(jié)果而導(dǎo)致注冊(cè)方的數(shù)量為零時(shí),子相位器都會(huì)從其父級(jí)注銷。
- Monitoring:雖然同步方法只能由注冊(cè)方調(diào)用,但是相位器的當(dāng)前狀態(tài)可以由任何調(diào)用者監(jiān)視。在任何給定時(shí)刻,總共有g(shù)etRegisteredParties參與者,其中g(shù)etArrivedParties已到達(dá)當(dāng)前階段(getPhase)。當(dāng)其余(getUnarrivedParties)方到達(dá)時(shí),該階段前進(jìn)。這些方法返回的值可能反映瞬態(tài),因此通常對(duì)于同步控制沒有用。方法toString以便于非正式監(jiān)視的形式返回這些狀態(tài)查詢的快照。
用法示例
可以使用Phaser代替CountDownLatch來控制為可變方提供服務(wù)的單發(fā)操作。典型的習(xí)慣用法是將方法設(shè)置為首先注冊(cè),然后開始操作,然后注銷,如下所示:
void runTasks(List<Runnable> tasks) { final Phaser phaser = new Phaser(1); // "1" to register self // create and start threads for (final Runnable task : tasks) { phaser.register(); new Thread() { public void run() { phaser.arriveAndAwaitAdvance(); // await all creation task.run(); } }.start(); } // allow threads to start and deregister self phaser.arriveAndDeregister(); }
使一組線程針對(duì)給定的迭代次數(shù)重復(fù)執(zhí)行操作的一種方法是重寫onAdvance:
void startTasks(List<Runnable> tasks, final int iterations) { final Phaser phaser = new Phaser() { protected boolean onAdvance(int phase, int registeredParties) { return phase >= iterations || registeredParties == 0; } }; phaser.register(); for (final Runnable task : tasks) { phaser.register(); new Thread() { public void run() { do { task.run(); phaser.arriveAndAwaitAdvance(); } while (!phaser.isTerminated()); } }.start(); } phaser.arriveAndDeregister(); // deregister self, don't wait }
如果主要任務(wù)以后必須等待終止,則它可以重新注冊(cè),然后執(zhí)行類似的循環(huán):
// ... phaser.register(); while (!phaser.isTerminated()) phaser.arriveAndAwaitAdvance();
在確保相位永遠(yuǎn)不會(huì)環(huán)繞Integer.MAX_VALUE的情況下,可以使用相關(guān)的構(gòu)造來等待特定的相位編號(hào)。例如:
void awaitPhase(Phaser phaser, int phase) { int p = phaser.register(); // assumes caller not already registered while (p < phase) { if (phaser.isTerminated()) // ... deal with unexpected termination else p = phaser.arriveAndAwaitAdvance(); } phaser.arriveAndDeregister(); }
要使用移相器樹創(chuàng)建一組n個(gè)任務(wù),可以使用以下形式的代碼,假設(shè)Task類的構(gòu)造函數(shù)接受在構(gòu)造時(shí)注冊(cè)的Phaser。在調(diào)用build(new Task [n],0,n,new Phaser())之后,可以啟動(dòng)這些任務(wù),例如通過提交到池中:
void build(Task[] tasks, int lo, int hi, Phaser ph) { if (hi - lo > TASKS_PER_PHASER) { for (int i = lo; i < hi; i += TASKS_PER_PHASER) { int j = Math.min(i + TASKS_PER_PHASER, hi); build(tasks, i, j, new Phaser(ph)); } } else { for (int i = lo; i < hi; ++i) tasks[i] = new Task(ph); // assumes new Task(ph) performs ph.register() } }
TASKS_PER_PHASER的最佳值主要取決于預(yù)期的同步速率。對(duì)于極小的每階段任務(wù)主體(因此是高比率),低至4的值可能合適,而對(duì)于極大型的任務(wù)階段,則最高可能為數(shù)百。實(shí)施注意事項(xiàng):此實(shí)施將參與方的最大數(shù)量限制為65535。嘗試注冊(cè)其他參與方會(huì)導(dǎo)致IllegalStateException。但是,您可以并且應(yīng)該創(chuàng)建分層的相位器,以容納任意數(shù)量的參與者。
構(gòu)造方法和公共方法
int arriveAndAwaitAdvance() //當(dāng)前線程到達(dá)屏障后等待滿足條件后再向下一個(gè)屏障執(zhí)行,當(dāng)計(jì)數(shù)不滿足時(shí)線程會(huì)一直阻塞 int arriveAndDeregister() //使當(dāng)前線程退出,并使parties值減1 int getPhase() //獲取已滿足多少次屏障 protected boolean onAdvance(int phase, int registeredParties) //通過新的屏障時(shí)被調(diào)用,在創(chuàng)建Phaser時(shí)子類實(shí)現(xiàn)邏輯,返回true使phaser失效 int getRegisteredParties() //獲取注冊(cè)的parties數(shù)量 int register() // 每一次調(diào)用就動(dòng)態(tài)遞增一個(gè)parties int bulkRegister(int parties) //多個(gè)添加parties int getArrivedParties() // 獲取已被使用的parties個(gè)數(shù) int getUnarrivedParties() // 獲取未被使用的parties個(gè)數(shù) int arrive() //使parties個(gè)數(shù)加一,并且當(dāng)前線程不用等待其他線程到達(dá)屏障;當(dāng)其他線程計(jì)數(shù)不足,依然會(huì)處于等待狀態(tài) int awaitAdvance(int phase) //如果入?yún)hase值和當(dāng)前getPhase()方法值一致,則等待屏障,否則忽略屏障,當(dāng)前線程不可被中斷 int awaitAdvanceInterruptibly(int phase) //當(dāng)前線程可被中斷,當(dāng)前的parties不滿足入?yún)r(shí)跳過屏障 int awaitAdvanceInterruptibly(int phase,long timeout, TimeUnit unit) //當(dāng)前線程可被中斷,當(dāng)前的parties不滿足入?yún)r(shí)跳過屏障或者超過指定時(shí)間 void forceTermination() //使phaser屏障功能失效 boolean isTerminated() //判斷phaser對(duì)象是否呈銷毀狀態(tài)
到此這篇關(guān)于Java中的Phaser使用詳解的文章就介紹到這了,更多相關(guān)Java中的Phaser內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java String轉(zhuǎn)換時(shí)為null的解決方法
這篇文章主要介紹了Java String轉(zhuǎn)換時(shí)為null的解決方法,需要的朋友可以參考下2017-07-07spring boot實(shí)現(xiàn)驗(yàn)證碼功能
這篇文章主要為大家詳細(xì)介紹了spring boot實(shí)現(xiàn)驗(yàn)證碼功能,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2019-07-07java 定時(shí)器Timer和TimerTask的使用詳解(執(zhí)行和暫停)
這篇文章主要介紹了java 定時(shí)器Timer和TimerTask的使用詳解(執(zhí)行和暫停),本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧2023-11-11Java數(shù)據(jù)結(jié)構(gòu)學(xué)習(xí)之棧和隊(duì)列
這篇文章主要介紹了Java數(shù)據(jù)結(jié)構(gòu)學(xué)習(xí)之棧和隊(duì)列,文中有非常詳細(xì)的代碼示例,對(duì)正在學(xué)習(xí)java的小伙伴們有一定的幫助,需要的朋友可以參考下2021-05-05Java使用POI解析帶圖片的excel文件(簡(jiǎn)潔好用!)
這篇文章主要給大家介紹了關(guān)于Java如何使用POI解析帶圖片的excel文件的相關(guān)資料,最近項(xiàng)目需要讀取excel中的信息,帶圖片,所以這里給大家總結(jié)下,需要的朋友可以參考下2023-08-08使用feign傳遞參數(shù)類型為MultipartFile的問題
這篇文章主要介紹了使用feign傳遞參數(shù)類型為MultipartFile的問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-03-03java報(bào)錯(cuò):找不到或無法加載主類的解決方法簡(jiǎn)單粗暴
本文主要介紹了java報(bào)錯(cuò):找不到或無法加載主類的解決方法簡(jiǎn)單粗暴,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-01-01Java使用訪問者模式解決公司層級(jí)結(jié)構(gòu)圖問題詳解
這篇文章主要介紹了Java使用訪問者模式解決公司層級(jí)結(jié)構(gòu)圖問題,結(jié)合實(shí)例形式分析了訪問者模式的概念、原理及Java使用訪問者模式解決公司曾經(jīng)結(jié)構(gòu)圖問題的相關(guān)操作技巧與注意事項(xiàng),需要的朋友可以參考下2018-04-04SpringBoot深入刨析數(shù)據(jù)層技術(shù)
這篇文章主要介紹了SpringBoot數(shù)據(jù)層技術(shù)的解析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-08-08