Java并發(fā)工具之CyclicBarrier使用詳解
1、CyclicBarrier簡介
CyclicBarrier是一個(gè)同步器,允許一組線程相互之間等待,直到到達(dá)某個(gè)公共屏障點(diǎn) (common barrier point),再繼續(xù)執(zhí)行。
因?yàn)镃yclicBarrier 的計(jì)數(shù)器是可以循環(huán)利用的,所以稱它為循環(huán)(Cyclic) 的 Barrier。
CyclicBarrier常用于多線程計(jì)算數(shù)據(jù),當(dāng)所有線程都完成執(zhí)行后,在CyclicBarrier回調(diào)線程中合并計(jì)算。

2、使用介紹
CyclicBarrier默認(rèn)的構(gòu)造方法是CyclicBarrier(int parties)其參數(shù)表示屏障(barrier )攔截的線程數(shù) 量,每個(gè)線程調(diào)用await方法告訴CyclicBarrier我已經(jīng)到達(dá)了屏障,然后當(dāng)前線程被阻塞。
public class CyclicBarrierDemo {
static CyclicBarrier c = new CyclicBarrier(2);
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
try {
c.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
//在子線程輸出1
System.out.println("1");
}
}).start();
try {
c.await();
//在主線程輸出2
System.out.println("2");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
因?yàn)橹骶€程和子線程的調(diào)度是由CPU決定的,兩個(gè)線程都有可能先執(zhí)行,所以既有可能先執(zhí)行System.out.println("1");,也有可能先執(zhí)行 System.out.println("2");。
如果把new CyclicBarrier(2)修改成new CyclicBarrier(3),則主線程和子線程會永遠(yuǎn)等待, 因?yàn)闆]有第三個(gè)線程執(zhí)行await方法,即沒有第三個(gè)線程到達(dá)屏障,所以之前到達(dá)屏障的兩個(gè) 線程都不會繼續(xù)執(zhí)行。
此外CyclicBarrier還可以設(shè)置回調(diào)函數(shù),它是一個(gè) Runnable 實(shí)例,用于在線程到達(dá)屏障時(shí),優(yōu)先執(zhí)行 Runnable 實(shí)例,可以處理更復(fù)雜的業(yè)務(wù)場景。
public class CyclicBarrierDemo {
//實(shí)例化CyclicBarrier,并指定回調(diào)函數(shù)
static CyclicBarrier c = new CyclicBarrier(2,new TestRunable());
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
try {
c.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("1");
}
}).start();
try {
c.await();
System.out.println("2");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
static class TestRunable implements Runnable{
@Override
public void run() {
System.out.println("3");
}
}
}
輸出結(jié)果:
312
CyclicBarrier方法
- await() 該方法被調(diào)用時(shí)表示當(dāng)前線程已經(jīng)到達(dá)公共屏障點(diǎn) (common barrier point),當(dāng)前線程阻塞進(jìn)入休眠狀態(tài),直到所有線程都到達(dá)屏障點(diǎn),當(dāng)前線程才會被喚醒。
- await(long timeout, TimeUnit unit) await()的重載方法,可以指定阻塞時(shí)長;
- getParties() 返回參與相互等待的線程數(shù);
- isBroken() 判斷此屏障是否處于中斷狀態(tài)。如果因?yàn)闃?gòu)造或最后一次重置而導(dǎo)致中斷或超時(shí),從而使一個(gè)或多個(gè)參與者擺脫此屏障點(diǎn),或者因?yàn)楫惓6鴮?dǎo)致某個(gè)屏障操作失敗,則返回true;否則返回false;
- reset() 將屏障重置為其初始狀態(tài);
- getNumberWaiting() 返回當(dāng)前在屏障處等待的參與者數(shù)目,此方法主要用于調(diào)試。
3、使用案例
如果項(xiàng)目中有這樣一個(gè)需求,計(jì)算每個(gè)客戶指定年份消費(fèi)總金額,然后對所有年份金額進(jìn)行匯總。我們可以使用一個(gè)固定數(shù)量的線程計(jì)算每年消費(fèi)總金額,將相應(yīng)的結(jié)果存儲在一個(gè)集合中,當(dāng)所有線程完成執(zhí)行其操作時(shí),在CyclicBarrier回調(diào)線程中進(jìn)行匯總以及顯示。代碼如下:
public class UserConsumeInfo {
private String userName;
private List<UserAggreInfo> userAggreInfos;
//省略必要的get set 方法......
}
public class UserAggreInfo {
//年份
private Integer year;
//消費(fèi)金額
private Integer amount;
public UserAggreInfo(Integer year, Integer amount) {
this.year = year;
this.amount = amount;
}
//省略必要的get set 方法......
}
public class CyclicBarrierTest {
private CyclicBarrier cyclicBarrier;
//保存客戶消費(fèi)數(shù)據(jù)
private List<UserConsumeInfo> partialResults = new CopyOnWriteArrayList<>();
private Random random = new Random();
//要統(tǒng)計(jì)的年份
private List<Integer> years;
//統(tǒng)計(jì)的用戶
private List<String> users;
/**
* 構(gòu)造函數(shù)
* @param years
* @param users
*/
public CyclicBarrierTest(List<Integer> years, List<String> users) {
this.years = years;
this.users = users;
}
/**
* 開始計(jì)算用戶消費(fèi)信息
*/
public void startCalc(){
cyclicBarrier = new CyclicBarrier(users.size(), new AggregatorThread());
for (int i = 0; i < users.size(); i++) {
Thread worker = new Thread(new CalcUserAmountThread(users.get(i)));
//把用戶設(shè)置為當(dāng)前線程名稱
worker.setName(users.get(i));
worker.start();
}
}
/**
* 計(jì)算用戶消費(fèi)金額線程
*/
final class CalcUserAmountThread implements Runnable{
//用戶名
private String userName;
public CalcUserAmountThread(String userName) {
this.userName = userName;
}
@Override
public void run() {
UserConsumeInfo userConsumeInfo=new UserConsumeInfo();
userConsumeInfo.setUserName(userName);
//計(jì)算每年的消費(fèi)金額
List<UserAggreInfo> userAggreInfos=new ArrayList<>();
for (int i = 0; i < years.size(); i++) {
Integer num = random.nextInt(1000);
System.out.println("用戶:"+userName+" "+years.get(i)+"年,消費(fèi):"+num+"元");
userAggreInfos.add(new UserAggreInfo(years.get(i),num));
}
userConsumeInfo.setUserAggreInfos(userAggreInfos);
partialResults.add(userConsumeInfo);
try {
System.out.println("用戶:"+userName + "到達(dá)barrier,等待其他用戶.");
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
}
}
}
/**
* 聚合線程,算出用戶總的消費(fèi)金額
*/
class AggregatorThread implements Runnable {
@Override
public void run() {
System.out.println("=====統(tǒng)計(jì)用戶消費(fèi)信息=======");
int sum = 0;//所有用戶消費(fèi)金額
for (UserConsumeInfo user : partialResults) {
String userName = user.getUserName();
System.out.println("用戶:"+userName+"消費(fèi)信息如下:");
for (UserAggreInfo aggre : user.getUserAggreInfos()) {
System.out.print("\t"+aggre.getYear()+"年,"+"消費(fèi)"+aggre.getAmount()+"元"+"\t");
sum+=aggre.getAmount();
}
System.out.println();
}
System.out.println("所有用戶消費(fèi)總金額:"+sum);
}
}
}測試:
public static void main(String[] args) {
List<String> userResults=new ArrayList<>();
userResults.add("張三");
userResults.add("李四");
userResults.add("王二");
userResults.add("小米");
userResults.add("小明");
userResults.add("小紅");
userResults.add("小張");
List<Integer> yearResults=new ArrayList<>();
yearResults.add(2018);
yearResults.add(2019);
yearResults.add(2020);
CyclicBarrierTest demo = new CyclicBarrierTest(yearResults,userResults);
demo.startCalc();
}
輸出結(jié)果:
用戶:小明 2018年,消費(fèi):310元
用戶:小紅 2018年,消費(fèi):589元
用戶:李四 2018年,消費(fèi):557元
用戶:小米 2018年,消費(fèi):946元
用戶:張三 2018年,消費(fèi):150元
用戶:王二 2018年,消費(fèi):17元
用戶:小張 2018年,消費(fèi):228元
用戶:小明 2019年,消費(fèi):29元
用戶:王二 2019年,消費(fèi):741元
用戶:小張 2019年,消費(fèi):380元
用戶:王二 2020年,消費(fèi):650元
用戶:小紅 2019年,消費(fèi):412元
用戶:李四 2019年,消費(fèi):453元
用戶:王二到達(dá)barrier,等待其他用戶.
用戶:小明 2020年,消費(fèi):582元
用戶:小米 2019年,消費(fèi):524元
用戶:張三 2019年,消費(fèi):691元
用戶:小米 2020年,消費(fèi):777元
用戶:小明到達(dá)barrier,等待其他用戶.
用戶:李四 2020年,消費(fèi):262元
用戶:小紅 2020年,消費(fèi):631元
用戶:小張 2020年,消費(fèi):34元
用戶:小紅到達(dá)barrier,等待其他用戶.
用戶:李四到達(dá)barrier,等待其他用戶.
用戶:小米到達(dá)barrier,等待其他用戶.
用戶:張三 2020年,消費(fèi):802元
用戶:小張到達(dá)barrier,等待其他用戶.
用戶:張三到達(dá)barrier,等待其他用戶.
=====統(tǒng)計(jì)用戶消費(fèi)信息=======
用戶:王二消費(fèi)信息如下:
2018年,消費(fèi)17元 2019年,消費(fèi)741元 2020年,消費(fèi)650元 共消費(fèi):1408
用戶:小明消費(fèi)信息如下:
2018年,消費(fèi)310元 2019年,消費(fèi)29元 2020年,消費(fèi)582元 共消費(fèi):921
用戶:小米消費(fèi)信息如下:
2018年,消費(fèi)946元 2019年,消費(fèi)524元 2020年,消費(fèi)777元 共消費(fèi):2247
用戶:李四消費(fèi)信息如下:
2018年,消費(fèi)557元 2019年,消費(fèi)453元 2020年,消費(fèi)262元 共消費(fèi):1272
用戶:小紅消費(fèi)信息如下:
2018年,消費(fèi)589元 2019年,消費(fèi)412元 2020年,消費(fèi)631元 共消費(fèi):1632
用戶:小張消費(fèi)信息如下:
2018年,消費(fèi)228元 2019年,消費(fèi)380元 2020年,消費(fèi)34元 共消費(fèi):642
用戶:張三消費(fèi)信息如下:
2018年,消費(fèi)150元 2019年,消費(fèi)691元 2020年,消費(fèi)802元 共消費(fèi):1643
所有用戶消費(fèi)總金額:9765
4、CyclicBarrier和CountDownLatch的區(qū)別
CountDownLatch的計(jì)數(shù)器只能使用一次,而CyclicBarrier的計(jì)數(shù)器可以使用reset()方法重 置。
所以CyclicBarrier能處理更為復(fù)雜的業(yè)務(wù)場景。
例如,如果計(jì)算發(fā)生錯誤,可以重置計(jì)數(shù) 器,并讓線程重新執(zhí)行一次。
CyclicBarrier還提供其他有用的方法,比如getNumberWaiting方法可以獲得Cyclic-Barrier 阻塞的線程數(shù)量。
isBroken()方法用來了解阻塞的線程是否被中斷
到此這篇關(guān)于Java并發(fā)工具之CyclicBarrier使用詳解的文章就介紹到這了,更多相關(guān)CyclicBarrier使用詳解內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java根據(jù)模板實(shí)現(xiàn)填充word內(nèi)容并轉(zhuǎn)換為pdf
這篇文章主要為大家詳細(xì)介紹了java如何根據(jù)模板實(shí)現(xiàn)填充word內(nèi)容并轉(zhuǎn)換為pdf,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2025-04-04
springAOP的三種實(shí)現(xiàn)方式示例代碼
這篇文章主要介紹了springAOP的三種實(shí)現(xiàn)方式,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-07-07
聊聊BeanUtils.copyProperties和clone()方法的區(qū)別
這篇文章主要介紹了聊聊BeanUtils.copyProperties和clone()方法的區(qū)別,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-09-09
使用springmvc運(yùn)行流程分析,手寫spring框架嘗試
這篇文章主要介紹了使用springmvc運(yùn)行流程分析,手寫spring框架嘗試,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-10-10
Java 中解決Unsupported major.minor version 51.0的問題
本文主要介紹解決Unsupported major.minor version 51.0的問題, 這里給大家整理了詳細(xì)資料,有需要的小伙伴可以參考下2016-08-08

