Java中的Phaser并發(fā)階段器詳解
Phaser并發(fā)階段器
Phaser由JDK1.7提出,是一個復(fù)雜強大的同步輔助類,是對同步工具類CountDownLatch和CyclicBarrier的綜合升級,能夠支持分階段實現(xiàn)等待的業(yè)務(wù)場景。
我們可以回憶下CountDownLatch講的是先指定N個線程,在N個線程干完活之前,其它線程都需要等待(導(dǎo)游等待旅游團所有人上車才能開車),而CyclicBarrier講的是先指定N個線程。等N個線程到齊了大家同時干活(多個驢友相約去旅游,先到的需要等待后來的),而Phaser是兩者的結(jié)合,可以理解為先指定N個線程,等N個線程到齊后開始干第一階段的活,等第一階段所有的線程都干完活了,接著N個線程開始干第二階段的活,直到所有的階段完成工作,程序結(jié)束,當然需要注意的是每個階段可以根據(jù)業(yè)務(wù)需要新增或者刪除一些線程,并不是開始指定多少個線程每個階段就必須有多少個線程。
入門體驗
看了概念可能不容易理解,從一個小demo入手體驗下
public class PhaserDemo1 {
// 指定隨機種子
private static Random random = new Random(System.currentTimeMillis());
public static void main(String[] args) {
Phaser phaser = new Phaser();
// 將線程注冊到phaser
phaser.register();
for (int i = 0; i <5 ; i++) {
Task task = new Task(phaser);
task.start();
}
phaser.arriveAndAwaitAdvance();
System.out.println("all task execute close");
}
static class Task extends Thread{
Phaser phaser;
public Task(Phaser phaser){
this.phaser = phaser;
this.phaser.register();
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName()+"開始執(zhí)行");
TimeUnit.SECONDS.sleep(random.nextInt(5));
System.out.println(Thread.currentThread().getName()+"執(zhí)行完畢");
// 類似CountDownLatch中的 await
phaser.arriveAndAwaitAdvance();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}不知道有沒有這樣的疑惑,phaser.register是向phaser去注冊這個線程,那么為什么主線程也需要注冊呢?
其實很簡單主線程需要等待所有子線程執(zhí)行完畢才能繼續(xù)往下面執(zhí)行所以必須要phaser.arriveAndAwaitAdvance();阻塞等待,而這個語句是意思當前線程已經(jīng)到達屏障,在此等待一段時間等條件滿足后需要向下一個屏障繼續(xù)執(zhí)行,如果沒有主線程的phaser.register,直接調(diào)用phaser.arriveAndAwaitAdvance,在源碼中提到可能會有異常,所以必須在主程序中注冊phaser.register();
/* <p>It is a usage error for an unregistered party to invoke this
* method. However, this error may result in an {@code
* IllegalStateException} only upon some subsequent operation on
* this phaser, if ever.
*/
譯:
未注冊方調(diào)用此函數(shù)是一個使用錯誤方法。但是,這個錯誤可能會導(dǎo)致
{@codeIllegalStateException}僅在一些后續(xù)操作這個相位器,如果有的話。Phaser解決分科考試問題
從體驗的示例中其實沒看出其優(yōu)勢在哪里,上訴場景完全可以采用CountDownLatch,所以現(xiàn)在換一種場景來說明Phaser的優(yōu)勢。
假設(shè)某校舉行期末考試,有三門考試語文、數(shù)學(xué)、英語,每門課允許學(xué)生提前交卷,只有當所有學(xué)生完成考試后才能舉行下一次的考試,這就是典型的分階段任務(wù)處理,示例圖如下。

將上訴場景語義化如下
public class PhaserExam {
public static Random random = new Random(System.currentTimeMillis());
public static void main(String[] args) {
// 一次初始化2個 相當于兩次register
Phaser phaser = new Phaser(2);
for (int i = 0; i <2 ; i++) {
Exam exam = new Exam(phaser,random.nextLong());
exam.start();
}
}
static class Exam extends Thread{
Phaser phaser;
Long id;
public Exam(Phaser phaser,Long id){
this.phaser = phaser;
this.id = id;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName()+"===開始語文考試");
TimeUnit.SECONDS.sleep(random.nextInt(5));
System.out.println(Thread.currentThread().getName()+"===結(jié)束語文考試");
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName()+"===開始數(shù)學(xué)考試");
TimeUnit.SECONDS.sleep(random.nextInt(5));
System.out.println(Thread.currentThread().getName()+"===結(jié)束數(shù)學(xué)考試");
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName()+"===開始英語考試");
TimeUnit.SECONDS.sleep(random.nextInt(5));
System.out.println(Thread.currentThread().getName()+"===結(jié)束英語考試");
phaser.arriveAndAwaitAdvance();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}代碼執(zhí)行結(jié)果如下,可以看到三個階段都是等待所有線程執(zhí)行完畢后才往下執(zhí)行,相當于多個柵欄。

到這里請注意,通過Phaser類的構(gòu)造方法構(gòu)建的party數(shù),也就是線程數(shù)需要和循環(huán)的次數(shù)對應(yīng),不然可能影響后續(xù)階段器的正常運行。
兩個重要狀態(tài)
在Phaser內(nèi)有2個重要狀態(tài),分別是phase和party,乍一看很難理解,他們的定義如下。
phase就是階段,如上面提到的語文、數(shù)學(xué)、英語考試這每個考試對應(yīng)一個階段,不過phase是從0開始的,當所有任務(wù)執(zhí)行完畢,準備進入下一個階段時phase就會加一。
party對應(yīng)注冊到Phaser線程數(shù),party初始值有兩種形式
- 方法一就是通過Phaser的有參構(gòu)造初始化party值。
- 方法二采用動態(tài)注冊方法phaser.register()或phaser.bulkRegister(線程數(shù))指定線程數(shù),注銷線程調(diào)用phaser.arriveAndDeregister()方法party值會減一。
Phaser常用API
Phaser常用API總結(jié)如下所示
// 獲取Phaser階段數(shù),默認0 public final int getPhase(); // 向Phaser注冊一個線程 public int register(); // 向Phaser注冊多個線程 public int bulkRegister(int parties); // 獲取已經(jīng)注冊的線程數(shù),也就是重要狀態(tài)party的值 public int getRegisteredParties(); // 到達并且等待其它線程到達 public int arriveAndAwaitAdvance(); // 到達后注銷不等待其它線程,繼續(xù)往下執(zhí)行 public int arriveAndDeregister(); // 已到達線程數(shù) public int getArrivedParties(); // 未到達線程數(shù) public int getUnarrivedParties(); // Phaser是否結(jié)束 只有當party的數(shù)量是0或者調(diào)用方法forceTermination時才會結(jié)束 public boolean isTerminated(); // 結(jié)束Phaser public void forceTermination();
代碼演示如下
public class PhaserApiTest {
public static void main(String[] args) throws InterruptedException {
Phaser phaser = new Phaser(5);
System.out.println("當前階段"+phaser.getPhase());
System.out.println("注冊線程數(shù)==="+phaser.getRegisteredParties());
// 向phaser注冊一個線程
phaser.register();
System.out.println("注冊線程數(shù)==="+phaser.getRegisteredParties());
// 向phaser注冊多個線程,批量注冊
phaser.bulkRegister(4);
System.out.println("注冊線程數(shù)==="+phaser.getRegisteredParties());
new Thread(()->{
// 到達且等待
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName()+"===執(zhí)行1");
}).start();
new Thread(()->{
// 到達不等待,從phaser中注銷一個線程
phaser.arriveAndDeregister();
System.out.println(Thread.currentThread().getName()+"===執(zhí)行2");
}).start();
TimeUnit.SECONDS.sleep(3);
System.out.println("已到達線程數(shù)==="+phaser.getArrivedParties());
System.out.println("未到達線程數(shù)==="+phaser.getUnarrivedParties());
System.out.println("Phaser是否結(jié)束"+phaser.isTerminated());
phaser.forceTermination();
System.out.println("Phaser是否結(jié)束"+phaser.isTerminated());
}
}執(zhí)行結(jié)果如下所示

arriveAndAwaitAdvance解析
arriveAndAwaitAdvance是Phaser中一個重要實現(xiàn)阻塞的API,其實arriveAndAwaitAdvance是由arrive方法和awaitAdvance方法合并而來,兩個方法的作用分別為
- arrive:到達屏障但不阻塞,返回值為到達的階段號。
- awaitAdvance(int):接收一個 int 值的階段號,在指定的屏障處阻塞。
測試代碼如下
public class PhaserTestArrive {
public static Random random = new Random(System.currentTimeMillis());
public static void main(String[] args) {
Phaser phaser = new Phaser(5);
for (int i = 0; i <5 ; i++) {
new Task(i,phaser).start();
}
phaser.register();
// 主線程需要調(diào)用arrive的原因是主線程注冊的第六個線程還未到達,需要手動到達,才能調(diào)用awaitAdvance阻塞屏障
phaser.arrive();
// 因為Phaser線程數(shù)為6,所以即使5個線程已經(jīng)到達,但是還差主線程的一個,目前階段數(shù)就是0
phaser.awaitAdvance(0);
System.out.println("all task is end");
}
static class Task extends Thread{
Phaser phaser;
public Task(int num,Phaser phaser){
super("Thread--"+String.valueOf(num));
this.phaser = phaser;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName()+"===task1 is start");
TimeUnit.SECONDS.sleep(random.nextInt(3));
System.out.println(Thread.currentThread().getName()+"===task1 is end");
// 到達且不等待
phaser.arrive();
System.out.println(Thread.currentThread().getName()+"===task2 is start");
TimeUnit.SECONDS.sleep(random.nextInt(3));
System.out.println(Thread.currentThread().getName()+"===task2 is end");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}中斷響應(yīng)
我們需要特別注意的就是Phaser所有API中只有awaitAdvanceInterruptibly是響應(yīng)中斷的,其余全部不會響應(yīng)中斷所以不需要對其進行異常處理,演示如下
public static void main(String[] args) {
Phaser phaser = new Phaser(3);
Thread T1 = new Thread(()->{
try {
phaser.awaitAdvanceInterruptibly(phaser.getPhase());
} catch (InterruptedException e) {
System.out.println("中斷異常");
e.printStackTrace();
}
//phaser.arriveAndAwaitAdvance();
});
T1.start();
T1.interrupt();
phaser.arriveAndAwaitAdvance();
}
到此這篇關(guān)于Java中的Phaser并發(fā)階段器詳解的文章就介紹到這了,更多相關(guān)Phaser并發(fā)階段器內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
GsonFormat快速生成JSon實體類的實現(xiàn)
GsonFormat主要用于使用Gson庫將JSONObject格式的String?解析成實體,本文主要介紹了GsonFormat快速生成JSon實體類的實現(xiàn),具有一定的參考價值,感興趣的可以了解一下2023-05-05
FastJson踩坑:@JsonField在反序列化時失效的解決
這篇文章主要介紹了FastJson踩坑:@JsonField在反序列化時失效的解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-06-06
java mybatis框架實現(xiàn)多表關(guān)系查詢功能
這篇文章主要介紹了java mybatis框架實現(xiàn)多表關(guān)系查詢,基于Maven框架的整體設(shè)計 —— 一多一的關(guān)系,文中通過實例代碼給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-10-10

