欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java中的Phaser使用詳解

 更新時(shí)間:2023年11月30日 09:36:57   作者:小柴林  
這篇文章主要介紹了Java中的Phaser使用詳解,與其他障礙不同,注冊(cè)在phaser上進(jìn)行同步的parties數(shù)量可能會(huì)隨時(shí)間變化,任務(wù)可以隨時(shí)進(jìn)行注冊(cè),需要的朋友可以參考下

Java中的Phaser

JDK1.7新特性

可重用的同步屏障,其功能類似于CyclicBarrier和CountDownLatch,但支持更靈活的用法。

  • Registration:與其他障礙不同,注冊(cè)在phaser上進(jìn)行同步的parties數(shù)量可能會(huì)隨時(shí)間變化。任務(wù)可以隨時(shí)進(jìn)行注冊(cè)(使用方法register,bulkRegister或建立初始方數(shù)目的構(gòu)造函數(shù)形式),也可以選擇在任何到達(dá)時(shí)注銷(使用ArcadeAndDeregister)。與大多數(shù)基本同步結(jié)構(gòu)一樣,registration和deregistration會(huì)只影響內(nèi)部計(jì)數(shù)。它們不會(huì)建立任何進(jìn)一步的內(nèi)部簿記,因此任務(wù)無法查詢它們是否已注冊(cè)。 (但是,可以通過通過子類來引入這種記帳方法。)
  • Synchronization:像CyclicBarrier一樣,Phaser可能會(huì)反復(fù)等待。arriveAndAwaitAdvance方法的作用類似于CyclicBarrier.await。每一代的phaser都有一個(gè)關(guān)聯(lián)的相數(shù)。相數(shù)從零開始,并在所有各方到達(dá)移相器時(shí)被遞增,在達(dá)到Integer.MAX_VALUE后回繞為零。通過使用任何注冊(cè)方可以調(diào)用的兩種方法,階段編號(hào)的使用可以在到達(dá)移相器時(shí)以及在等待其他人時(shí)獨(dú)立控制動(dòng)作。
  • Arrival:arrive和arriveAndDeregister方法記錄到達(dá)。這些方法不會(huì)阻塞,但是會(huì)返回一個(gè)關(guān)聯(lián)的到達(dá)phase數(shù)。也就是說,到達(dá)應(yīng)用到的相位器的phase數(shù)。當(dāng)給定階段的最終參與者到達(dá)時(shí),將執(zhí)行可選操作,并且階段將前進(jìn)。這些動(dòng)作由觸發(fā)相位超前的一方執(zhí)行,并由覆蓋方法onAdvance(int,int)安排,該方法也控制終止。重寫此方法類似于為CyclicBarrier提供屏障操作,但比它更具靈活性。
  • Waiting:方法awaitAdvance需要一個(gè)指示到達(dá)階段編號(hào)的參數(shù),并在移相器前進(jìn)到(或已經(jīng)處于)另一個(gè)階段時(shí)返回。與使用CyclicBarrier的類似構(gòu)造不同,即使等待線程被中斷,方法awaitAdvance仍繼續(xù)等待。也提供中斷版本和超時(shí)版本,但是任務(wù)中斷或超時(shí)等待時(shí)遇到的異常不會(huì)更改相位器的狀態(tài)。如有必要,通??梢栽谡{(diào)用forceTermination之后,在這些異常的處理程序內(nèi)執(zhí)行任何關(guān)聯(lián)的恢復(fù)。在ForkJoinPool中執(zhí)行的任務(wù)也可以使用相位器,當(dāng)其他任務(wù)被阻止等待階段前進(jìn)時(shí),它將確保足夠的并行度來執(zhí)行任務(wù)。
  • Termination:移相器可能會(huì)進(jìn)入終止?fàn)顟B(tài),可以使用方法isTerminated檢查該狀態(tài)。終止后,所有同步方法將立即返回,而無需等待提前,如負(fù)的返回值所示。同樣,終止注冊(cè)的嘗試也無效。當(dāng)onAdvance的調(diào)用返回true時(shí),將觸發(fā)終止。如果注銷導(dǎo)致注冊(cè)方的數(shù)量變?yōu)榱?,則默認(rèn)實(shí)現(xiàn)返回true。如下所示,當(dāng)相位器以固定的迭代次數(shù)控制動(dòng)作時(shí),通常方便的方法是在當(dāng)前相位數(shù)達(dá)到閾值時(shí)覆蓋此方法以終止。方法forceTermination也可用于突然釋放等待線程并允許它們終止。
  • Tiering:hasers可以分層(即,以樹形結(jié)構(gòu)構(gòu)建)以減少競(jìng)爭(zhēng)。相反,可以設(shè)置具有大量參與方的相位器,否則它們將承受沉重的同步競(jìng)爭(zhēng)成本,以便子相位器組共享一個(gè)公共父級(jí)。即使這會(huì)招致更大的每次操作開銷,也可能會(huì)大大提高吞吐量。在分層的相位器樹中,將自動(dòng)管理子相位器及其父級(jí)的注冊(cè)和注銷。只要子相位器的注冊(cè)方數(shù)量不為零(在Phaser(Phaser,int)構(gòu)造函數(shù),寄存器或bulkRegister中確定),子相位器就會(huì)向其父級(jí)注冊(cè)。每當(dāng)調(diào)用到達(dá)和取消注冊(cè)的結(jié)果而導(dǎo)致注冊(cè)方的數(shù)量為零時(shí),子相位器都會(huì)從其父級(jí)注銷。
  • Monitoring:雖然同步方法只能由注冊(cè)方調(diào)用,但是相位器的當(dāng)前狀態(tài)可以由任何調(diào)用者監(jiān)視。在任何給定時(shí)刻,總共有g(shù)etRegisteredParties參與者,其中g(shù)etArrivedParties已到達(dá)當(dāng)前階段(getPhase)。當(dāng)其余(getUnarrivedParties)方到達(dá)時(shí),該階段前進(jìn)。這些方法返回的值可能反映瞬態(tài),因此通常對(duì)于同步控制沒有用。方法toString以便于非正式監(jiān)視的形式返回這些狀態(tài)查詢的快照。

用法示例

可以使用Phaser代替CountDownLatch來控制為可變方提供服務(wù)的單發(fā)操作。典型的習(xí)慣用法是將方法設(shè)置為首先注冊(cè),然后開始操作,然后注銷,如下所示:

 void runTasks(List<Runnable> tasks) {
   final Phaser phaser = new Phaser(1); // "1" to register self
   // create and start threads
   for (final Runnable task : tasks) {
     phaser.register();
     new Thread() {
       public void run() {
         phaser.arriveAndAwaitAdvance(); // await all creation
         task.run();
       }
     }.start();
   }
 
   // allow threads to start and deregister self
   phaser.arriveAndDeregister();
 }

使一組線程針對(duì)給定的迭代次數(shù)重復(fù)執(zhí)行操作的一種方法是重寫onAdvance:

void startTasks(List<Runnable> tasks, final int iterations) {
   final Phaser phaser = new Phaser() {
     protected boolean onAdvance(int phase, int registeredParties) {
       return phase >= iterations || registeredParties == 0;
     }
   };
   phaser.register();
   for (final Runnable task : tasks) {
     phaser.register();
     new Thread() {
       public void run() {
         do {
           task.run();
           phaser.arriveAndAwaitAdvance();
         } while (!phaser.isTerminated());
       }
     }.start();
   }
   phaser.arriveAndDeregister(); // deregister self, don't wait
 }

如果主要任務(wù)以后必須等待終止,則它可以重新注冊(cè),然后執(zhí)行類似的循環(huán):

// ...
phaser.register();
while (!phaser.isTerminated())
  phaser.arriveAndAwaitAdvance();

在確保相位永遠(yuǎn)不會(huì)環(huán)繞Integer.MAX_VALUE的情況下,可以使用相關(guān)的構(gòu)造來等待特定的相位編號(hào)。例如:

 void awaitPhase(Phaser phaser, int phase) {
   int p = phaser.register(); // assumes caller not already registered
   while (p < phase) {
     if (phaser.isTerminated())
       // ... deal with unexpected termination
     else
       p = phaser.arriveAndAwaitAdvance();
   }
   phaser.arriveAndDeregister();
 }

要使用移相器樹創(chuàng)建一組n個(gè)任務(wù),可以使用以下形式的代碼,假設(shè)Task類的構(gòu)造函數(shù)接受在構(gòu)造時(shí)注冊(cè)的Phaser。在調(diào)用build(new Task [n],0,n,new Phaser())之后,可以啟動(dòng)這些任務(wù),例如通過提交到池中:

 void build(Task[] tasks, int lo, int hi, Phaser ph) {
   if (hi - lo > TASKS_PER_PHASER) {
     for (int i = lo; i < hi; i += TASKS_PER_PHASER) {
       int j = Math.min(i + TASKS_PER_PHASER, hi);
       build(tasks, i, j, new Phaser(ph));
     }
   } else {
     for (int i = lo; i < hi; ++i)
       tasks[i] = new Task(ph);
       // assumes new Task(ph) performs ph.register()
   }
 }

TASKS_PER_PHASER的最佳值主要取決于預(yù)期的同步速率。對(duì)于極小的每階段任務(wù)主體(因此是高比率),低至4的值可能合適,而對(duì)于極大型的任務(wù)階段,則最高可能為數(shù)百。實(shí)施注意事項(xiàng):此實(shí)施將參與方的最大數(shù)量限制為65535。嘗試注冊(cè)其他參與方會(huì)導(dǎo)致IllegalStateException。但是,您可以并且應(yīng)該創(chuàng)建分層的相位器,以容納任意數(shù)量的參與者。

構(gòu)造方法和公共方法

int arriveAndAwaitAdvance()     //當(dāng)前線程到達(dá)屏障后等待滿足條件后再向下一個(gè)屏障執(zhí)行,當(dāng)計(jì)數(shù)不滿足時(shí)線程會(huì)一直阻塞
int arriveAndDeregister()       //使當(dāng)前線程退出,并使parties值減1
int getPhase()                  //獲取已滿足多少次屏障
protected boolean onAdvance(int phase, int registeredParties)  //通過新的屏障時(shí)被調(diào)用,在創(chuàng)建Phaser時(shí)子類實(shí)現(xiàn)邏輯,返回true使phaser失效
int getRegisteredParties()      //獲取注冊(cè)的parties數(shù)量
int register()                 // 每一次調(diào)用就動(dòng)態(tài)遞增一個(gè)parties
int bulkRegister(int parties)   //多個(gè)添加parties
int getArrivedParties()        // 獲取已被使用的parties個(gè)數(shù)
 int getUnarrivedParties()     // 獲取未被使用的parties個(gè)數(shù)
 
int arrive()                    //使parties個(gè)數(shù)加一,并且當(dāng)前線程不用等待其他線程到達(dá)屏障;當(dāng)其他線程計(jì)數(shù)不足,依然會(huì)處于等待狀態(tài)
int awaitAdvance(int phase)     //如果入?yún)hase值和當(dāng)前getPhase()方法值一致,則等待屏障,否則忽略屏障,當(dāng)前線程不可被中斷
int awaitAdvanceInterruptibly(int phase) //當(dāng)前線程可被中斷,當(dāng)前的parties不滿足入?yún)r(shí)跳過屏障
int awaitAdvanceInterruptibly(int phase,long timeout, TimeUnit unit) //當(dāng)前線程可被中斷,當(dāng)前的parties不滿足入?yún)r(shí)跳過屏障或者超過指定時(shí)間
void forceTermination()         //使phaser屏障功能失效
 boolean isTerminated()         //判斷phaser對(duì)象是否呈銷毀狀態(tài)  

到此這篇關(guān)于Java中的Phaser使用詳解的文章就介紹到這了,更多相關(guān)Java中的Phaser內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評(píng)論