Java的CyclicBarrier循環(huán)屏障解析
CyclicBarrier循環(huán)屏障
CyclicBarrier和CountDownLatch一樣,是一個同步工具類,它允許一組線程相互等待直到達到某個common barrier point。
在程序中CyclicBarrier是非常有用的,它適用于一組線程必須互相等待的情況。barrier被稱為周期是因為等待的線程在釋放后可以重用。
構(gòu)造函數(shù)
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {
this(parties, null);
}CyclicBarrier有兩種構(gòu)造方法,默認是指定一個線程數(shù)量,并把它賦給等待狀態(tài)的線程count,還有一個barrierAction任務(wù),它是在滿足barrier時,所有的線程準備好之前,才執(zhí)行這個runnable,這是最后一個線程來執(zhí)行的。parties是表示必須有幾個線程要達到barrier,count是表示當前未達到barrier的線程數(shù)量,只有count=0時,線程才能繼續(xù)執(zhí)行。
看一下它的示例。
public static void main(String[] args) {
CyclicBarrier c = new CyclicBarrier(5,new Runnable() {
@Override
public void run() {
System.out.println("開始");
}
});
ExecutorService es = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
es.execute(new Task(i, c));
}
es.shutdown();
}
static public class Task implements Runnable {
private int i;
private CyclicBarrier cyclicBarrier;
public Task(int i,CyclicBarrier cyclicBarrier) {
this.i = i;
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
System.out.println(i+"準備完畢");
try {
cyclicBarrier.await();
Thread.sleep(100);
System.out.println(i+"加入了");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}執(zhí)行結(jié)果
0準備完畢
4準備完畢
3準備完畢
1準備完畢
2準備完畢
開始
4加入了
1加入了
2加入了
3加入了
0加入了
看一下它的阻塞await方法。
/**
* 如果當前線程不是最后一個達到的它就會休眠直到出現(xiàn)下面的任意一種情況:
* 1. 最后一個線程達到
* 2. 一些其他的線程中斷了當前線程
* 3. 一些其他的線程中斷了正在等待的線程
* 4. 一些其他線程在等待過程中超時了
* 5. 一些其他的線程在barrier被重置
* 當前線程在等待的過程中如果被中斷直接會拋出異常并且當前線程的中斷狀態(tài)會被清除
* 如果在等待的過程中一些線程被中斷,其他的等待線程就會拋出異常,barrier就會被設(shè)置成broken為true的狀態(tài)
* 如果當前線程是最后一個達到的話,如果構(gòu)造方法中有runnable任務(wù),直接由最后一個線程去執(zhí)行,其他線程繼續(xù)等待
* 在barrier action中,如果出現(xiàn)異常會由當前線程傳播并且barrier被設(shè)置成broken為true的狀態(tài)
*/
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // 永遠不會進入catch
}
}主要調(diào)用了dowait方法,看一下這個方法,方法比較長。
/**
* Main barrier code, covering the various policies.
*/
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}首先會創(chuàng)建一個靜態(tài)內(nèi)部類Generation,這個類中只有一個屬性,broken,表示當前屏障是否被中斷,默認為false,未中斷.
如果當前broken為true,就會傳播這個異常.
如果當前線程被中斷,broken為中斷狀態(tài),調(diào)用breakBarrier方法,把broken設(shè)為true,重置count.喚醒其他的線程.
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}如果前面的條件都不成立,那就count自減,表示當前線程已經(jīng)完成了任務(wù),當count=0的時候,表示所有的任務(wù)都已經(jīng)完成,此時在判斷是否構(gòu)造函數(shù)中有傳入的runnable,如果command不為空,總是最后一個線程去執(zhí)行,如果在執(zhí)行這個command的過程中出現(xiàn)異常,再次調(diào)用breakBarrier方法重置count.
執(zhí)行成功,調(diào)用nextGeneration方法,喚醒所有在等待隊列上的線程,重置count.再次把broken設(shè)為false.
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}如果當前count不為0,那就進行自旋,沒有超時直接釋放鎖進入等待隊列,如果線程在釋放鎖進入等待隊列的過程中被中斷直接重置count,拋出異常.
在自旋過程中,如果發(fā)生了broken狀態(tài)的變化或者超時,直接拋出異常,并做相應(yīng)的處理,最后釋放鎖.
總結(jié)
CyclicBarrier的count是可以重用的,這是與CountDownLatch最大的區(qū)別.
barrier中任務(wù)是由最有一個線程執(zhí)行的,并且會執(zhí)行構(gòu)造方法中傳入的任務(wù).這個任務(wù)總是在到達barrier時執(zhí)行.
到此這篇關(guān)于Java的CyclicBarrier循環(huán)屏障解析的文章就介紹到這了,更多相關(guān)CyclicBarrier循環(huán)屏障內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
linux的shell命令檢測某個java程序是否執(zhí)行
ps -ef |grep java|grep2016-04-04
詳解在springboot中使用Mybatis Generator的兩種方式
這篇文章主要介紹了詳解在springboot中使用Mybatis Generator的兩種方式,本文將介紹到在springboot的項目中如何去配置和使用MBG以及MBG生成代碼的兩種方式,非常具有實用價值,需要的朋友可以參考下2018-11-11
SpringBoot開發(fā)案例 分布式集群共享Session詳解
這篇文章主要介紹了SpringBoot開發(fā)案例 分布式集群共享Session詳解,在分布式系統(tǒng)中,為了提升系統(tǒng)性能,通常會對單體項目進行拆分,分解成多個基于功能的微服務(wù),可能還會對單個微服務(wù)進行水平擴展,保證服務(wù)高可用,需要的朋友可以參考下2019-07-07
spring-boot-maven-plugin?配置有啥用
這篇文章主要介紹了spring-boot-maven-plugin?配置是干啥的,這個是SpringBoot的Maven插件,主要用來打包的,通常打包成jar或者war文件,本文通過示例代碼給大家介紹的非常詳細,需要的朋友可以參考下2022-08-08
Mybatis往Mapper.xml文件中傳遞多個參數(shù)問題
這篇文章主要介紹了Mybatis往Mapper.xml文件中傳遞多個參數(shù)問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-05-05

