Java并發(fā)編程中的CyclicBarrier線程屏障詳解
一、簡介
CyclicBarrier的字面意思是可循環(huán)使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組線程到達一個屏障(也可以叫同步點)時被阻塞,直到最后一個線程到達屏障時,屏障才會開門,所有被屏障攔截的線程才會繼續(xù)運行。下圖演示了這一過程。

二、與countdownlatch的區(qū)別
它的主要作用其實和CountDownLanch差不多,都是讓一組線程到達一個屏障時被阻塞,直到最后一個線程到達屏障時,屏障會被打開,所有被屏障阻塞的線程才會繼續(xù)執(zhí)行,不過它是可以循環(huán)執(zhí)行的,這是它與CountDownLanch最大的不同。

三、核心參數(shù)
/** The lock for guarding barrier entry */
// 可重入鎖
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
// 條件隊列,具體看AQS
private final Condition trip = lock.newCondition();
/** The number of parties */
// 參與的線程數(shù)量
private final int parties;
/* The command to run when tripped */
// 由最后一個進入 barrier 的線程執(zhí)行的操作
private final Runnable barrierCommand;
/** The current generation */
// 當前代
private Generation generation = new Generation();
// 正在等待進入屏障的線程數(shù)量
private int count;四、構造方法
public CyclicBarrier(int parties, Runnable barrierAction) {
// 參與的線程數(shù)量小于等于0,拋出異常
if (parties <= 0) throw new IllegalArgumentException();
// 設置parties
this.parties = parties;
// 設置count
this.count = parties;
// 設置barrierCommand
this.barrierCommand = barrierAction;
}該構造函數(shù)可以指定關聯(lián)該CyclicBarrier的線程數(shù)量,并且可以指定在所有線程都進入屏障后的執(zhí)行動作,該執(zhí)行動作由最后一個進行屏障的線程執(zhí)行。
五、核心方法
await方法
(1)dowait方法
await中調(diào)用dowait
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
// 保存當前鎖
final ReentrantLock lock = this.lock;
// 鎖定
lock.lock();
try {
// 保存當前代
final Generation g = generation;
if (g.broken) // 屏障被破壞,拋出異常
throw new BrokenBarrierException();
if (Thread.interrupted()) { // 線程被中斷
// 損壞當前屏障,并且喚醒所有的線程,只有擁有鎖的時候才會調(diào)用
breakBarrier();
// 拋出異常
throw new InterruptedException();
}
// 減少正在等待進入屏障的線程數(shù)量
int index = --count;
if (index == 0) { // 正在等待進入屏障的線程數(shù)量為0,所有線程都已經(jīng)進入
// 運行的動作標識
boolean ranAction = false;
try {
// 保存運行動作
final Runnable command = barrierCommand;
if (command != null) // 動作不為空
// 運行
command.run();
// 設置ranAction狀態(tài)
ranAction = true;
// 進入下一代
nextGeneration();
return 0;
} finally {
if (!ranAction) // 沒有運行的動作
// 損壞當前屏障
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
// 無限循環(huán)
for (;;) {
try {
if (!timed) // 沒有設置等待時間
// 等待
trip.await();
else if (nanos > 0L) // 設置了等待時間,并且等待時間大于0
// 等待指定時長
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) { // 等于當前代并且屏障沒有被損壞
// 損壞當前屏障
breakBarrier();
// 拋出異常
throw ie;
} else { // 不等于當前帶后者是屏障被損壞
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
// 中斷當前線程
Thread.currentThread().interrupt();
}
}
if (g.broken) // 屏障被損壞,拋出異常
throw new BrokenBarrierException();
if (g != generation) // 不等于當前代
// 返回索引
return index;
if (timed && nanos <= 0L) { // 設置了等待時間,并且等待時間小于0
// 損壞屏障
breakBarrier();
// 拋出異常
throw new TimeoutException();
}
}
} finally {
// 釋放鎖
lock.unlock();
}
}(2)核心流程

(3)nextGeneration方法
該方法會喚醒所有線程并且重置次數(shù),這也是為什么CyclicBarrier可以循環(huán)調(diào)用的原因
private void nextGeneration() {
// signal completion of last generation
// 喚醒所有線程
trip.signalAll();
// set up next generation
// 恢復正在等待進入屏障的線程數(shù)量
count = parties;
// 新生一代
generation = new Generation();
}六、應用
static class MyThread extends Thread {
private CyclicBarrier cb;
public MyThread(String name, CyclicBarrier cb) {
super(name);
this.cb = cb;
}
public void run() {
System.out.println(Thread.currentThread().getName() + " going to await");
try {
cb.await();
System.out.println(Thread.currentThread().getName() + " continue");
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
CyclicBarrier cb = new CyclicBarrier(3, new Thread("barrierAction") {
public void run() {
System.out.println(Thread.currentThread().getName() + " barrier action");
}
});
MyThread t1 = new MyThread("t1", cb);
MyThread t2 = new MyThread("t2", cb);
t1.start();
t2.start();
System.out.println(Thread.currentThread().getName() + " going to await");
cb.await();
System.out.println(Thread.currentThread().getName() + " continue");
}運行結果:

所以調(diào)用時序為:

到此這篇關于Java并發(fā)編程中的CyclicBarrier線程屏障詳解的文章就介紹到這了,更多相關CyclicBarrier線程屏障內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
SpringBoot開發(fā)案例之配置Druid數(shù)據(jù)庫連接池的示例
本篇文章主要介紹了SpringBoot開發(fā)案例之配置Druid數(shù)據(jù)庫連接池的示例,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-03-03
Java多線程并發(fā)編程(互斥鎖Reentrant Lock)
這篇文章主要介紹了ReentrantLock 互斥鎖,在同一時間只能被一個線程所占有,在被持有后并未釋放之前,其他線程若想獲得該鎖只能等待或放棄,需要的朋友可以參考下2017-05-05

