Java并發(fā)工具之CyclicBarrier使用詳解
1、CyclicBarrier簡介
CyclicBarrier是一個同步器,允許一組線程相互之間等待,直到到達某個公共屏障點 (common barrier point),再繼續(xù)執(zhí)行。
因為CyclicBarrier 的計數(shù)器是可以循環(huán)利用的,所以稱它為循環(huán)(Cyclic) 的 Barrier。
CyclicBarrier常用于多線程計算數(shù)據(jù),當所有線程都完成執(zhí)行后,在CyclicBarrier回調(diào)線程中合并計算。

2、使用介紹
CyclicBarrier默認的構(gòu)造方法是CyclicBarrier(int parties)其參數(shù)表示屏障(barrier )攔截的線程數(shù) 量,每個線程調(diào)用await方法告訴CyclicBarrier我已經(jīng)到達了屏障,然后當前線程被阻塞。
public class CyclicBarrierDemo {
static CyclicBarrier c = new CyclicBarrier(2);
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
try {
c.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
//在子線程輸出1
System.out.println("1");
}
}).start();
try {
c.await();
//在主線程輸出2
System.out.println("2");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
因為主線程和子線程的調(diào)度是由CPU決定的,兩個線程都有可能先執(zhí)行,所以既有可能先執(zhí)行System.out.println("1");,也有可能先執(zhí)行 System.out.println("2");。
如果把new CyclicBarrier(2)修改成new CyclicBarrier(3),則主線程和子線程會永遠等待, 因為沒有第三個線程執(zhí)行await方法,即沒有第三個線程到達屏障,所以之前到達屏障的兩個 線程都不會繼續(xù)執(zhí)行。
此外CyclicBarrier還可以設置回調(diào)函數(shù),它是一個 Runnable 實例,用于在線程到達屏障時,優(yōu)先執(zhí)行 Runnable 實例,可以處理更復雜的業(yè)務場景。
public class CyclicBarrierDemo {
//實例化CyclicBarrier,并指定回調(diào)函數(shù)
static CyclicBarrier c = new CyclicBarrier(2,new TestRunable());
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
try {
c.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("1");
}
}).start();
try {
c.await();
System.out.println("2");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
static class TestRunable implements Runnable{
@Override
public void run() {
System.out.println("3");
}
}
}
輸出結(jié)果:
312
CyclicBarrier方法
- await() 該方法被調(diào)用時表示當前線程已經(jīng)到達公共屏障點 (common barrier point),當前線程阻塞進入休眠狀態(tài),直到所有線程都到達屏障點,當前線程才會被喚醒。
- await(long timeout, TimeUnit unit) await()的重載方法,可以指定阻塞時長;
- getParties() 返回參與相互等待的線程數(shù);
- isBroken() 判斷此屏障是否處于中斷狀態(tài)。如果因為構(gòu)造或最后一次重置而導致中斷或超時,從而使一個或多個參與者擺脫此屏障點,或者因為異常而導致某個屏障操作失敗,則返回true;否則返回false;
- reset() 將屏障重置為其初始狀態(tài);
- getNumberWaiting() 返回當前在屏障處等待的參與者數(shù)目,此方法主要用于調(diào)試。
3、使用案例
如果項目中有這樣一個需求,計算每個客戶指定年份消費總金額,然后對所有年份金額進行匯總。我們可以使用一個固定數(shù)量的線程計算每年消費總金額,將相應的結(jié)果存儲在一個集合中,當所有線程完成執(zhí)行其操作時,在CyclicBarrier回調(diào)線程中進行匯總以及顯示。代碼如下:
public class UserConsumeInfo {
private String userName;
private List<UserAggreInfo> userAggreInfos;
//省略必要的get set 方法......
}
public class UserAggreInfo {
//年份
private Integer year;
//消費金額
private Integer amount;
public UserAggreInfo(Integer year, Integer amount) {
this.year = year;
this.amount = amount;
}
//省略必要的get set 方法......
}
public class CyclicBarrierTest {
private CyclicBarrier cyclicBarrier;
//保存客戶消費數(shù)據(jù)
private List<UserConsumeInfo> partialResults = new CopyOnWriteArrayList<>();
private Random random = new Random();
//要統(tǒng)計的年份
private List<Integer> years;
//統(tǒng)計的用戶
private List<String> users;
/**
* 構(gòu)造函數(shù)
* @param years
* @param users
*/
public CyclicBarrierTest(List<Integer> years, List<String> users) {
this.years = years;
this.users = users;
}
/**
* 開始計算用戶消費信息
*/
public void startCalc(){
cyclicBarrier = new CyclicBarrier(users.size(), new AggregatorThread());
for (int i = 0; i < users.size(); i++) {
Thread worker = new Thread(new CalcUserAmountThread(users.get(i)));
//把用戶設置為當前線程名稱
worker.setName(users.get(i));
worker.start();
}
}
/**
* 計算用戶消費金額線程
*/
final class CalcUserAmountThread implements Runnable{
//用戶名
private String userName;
public CalcUserAmountThread(String userName) {
this.userName = userName;
}
@Override
public void run() {
UserConsumeInfo userConsumeInfo=new UserConsumeInfo();
userConsumeInfo.setUserName(userName);
//計算每年的消費金額
List<UserAggreInfo> userAggreInfos=new ArrayList<>();
for (int i = 0; i < years.size(); i++) {
Integer num = random.nextInt(1000);
System.out.println("用戶:"+userName+" "+years.get(i)+"年,消費:"+num+"元");
userAggreInfos.add(new UserAggreInfo(years.get(i),num));
}
userConsumeInfo.setUserAggreInfos(userAggreInfos);
partialResults.add(userConsumeInfo);
try {
System.out.println("用戶:"+userName + "到達barrier,等待其他用戶.");
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
}
}
}
/**
* 聚合線程,算出用戶總的消費金額
*/
class AggregatorThread implements Runnable {
@Override
public void run() {
System.out.println("=====統(tǒng)計用戶消費信息=======");
int sum = 0;//所有用戶消費金額
for (UserConsumeInfo user : partialResults) {
String userName = user.getUserName();
System.out.println("用戶:"+userName+"消費信息如下:");
for (UserAggreInfo aggre : user.getUserAggreInfos()) {
System.out.print("\t"+aggre.getYear()+"年,"+"消費"+aggre.getAmount()+"元"+"\t");
sum+=aggre.getAmount();
}
System.out.println();
}
System.out.println("所有用戶消費總金額:"+sum);
}
}
}測試:
public static void main(String[] args) {
List<String> userResults=new ArrayList<>();
userResults.add("張三");
userResults.add("李四");
userResults.add("王二");
userResults.add("小米");
userResults.add("小明");
userResults.add("小紅");
userResults.add("小張");
List<Integer> yearResults=new ArrayList<>();
yearResults.add(2018);
yearResults.add(2019);
yearResults.add(2020);
CyclicBarrierTest demo = new CyclicBarrierTest(yearResults,userResults);
demo.startCalc();
}
輸出結(jié)果:
用戶:小明 2018年,消費:310元
用戶:小紅 2018年,消費:589元
用戶:李四 2018年,消費:557元
用戶:小米 2018年,消費:946元
用戶:張三 2018年,消費:150元
用戶:王二 2018年,消費:17元
用戶:小張 2018年,消費:228元
用戶:小明 2019年,消費:29元
用戶:王二 2019年,消費:741元
用戶:小張 2019年,消費:380元
用戶:王二 2020年,消費:650元
用戶:小紅 2019年,消費:412元
用戶:李四 2019年,消費:453元
用戶:王二到達barrier,等待其他用戶.
用戶:小明 2020年,消費:582元
用戶:小米 2019年,消費:524元
用戶:張三 2019年,消費:691元
用戶:小米 2020年,消費:777元
用戶:小明到達barrier,等待其他用戶.
用戶:李四 2020年,消費:262元
用戶:小紅 2020年,消費:631元
用戶:小張 2020年,消費:34元
用戶:小紅到達barrier,等待其他用戶.
用戶:李四到達barrier,等待其他用戶.
用戶:小米到達barrier,等待其他用戶.
用戶:張三 2020年,消費:802元
用戶:小張到達barrier,等待其他用戶.
用戶:張三到達barrier,等待其他用戶.
=====統(tǒng)計用戶消費信息=======
用戶:王二消費信息如下:
2018年,消費17元 2019年,消費741元 2020年,消費650元 共消費:1408
用戶:小明消費信息如下:
2018年,消費310元 2019年,消費29元 2020年,消費582元 共消費:921
用戶:小米消費信息如下:
2018年,消費946元 2019年,消費524元 2020年,消費777元 共消費:2247
用戶:李四消費信息如下:
2018年,消費557元 2019年,消費453元 2020年,消費262元 共消費:1272
用戶:小紅消費信息如下:
2018年,消費589元 2019年,消費412元 2020年,消費631元 共消費:1632
用戶:小張消費信息如下:
2018年,消費228元 2019年,消費380元 2020年,消費34元 共消費:642
用戶:張三消費信息如下:
2018年,消費150元 2019年,消費691元 2020年,消費802元 共消費:1643
所有用戶消費總金額:9765
4、CyclicBarrier和CountDownLatch的區(qū)別
CountDownLatch的計數(shù)器只能使用一次,而CyclicBarrier的計數(shù)器可以使用reset()方法重 置。
所以CyclicBarrier能處理更為復雜的業(yè)務場景。
例如,如果計算發(fā)生錯誤,可以重置計數(shù) 器,并讓線程重新執(zhí)行一次。
CyclicBarrier還提供其他有用的方法,比如getNumberWaiting方法可以獲得Cyclic-Barrier 阻塞的線程數(shù)量。
isBroken()方法用來了解阻塞的線程是否被中斷
到此這篇關(guān)于Java并發(fā)工具之CyclicBarrier使用詳解的文章就介紹到這了,更多相關(guān)CyclicBarrier使用詳解內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java根據(jù)模板實現(xiàn)填充word內(nèi)容并轉(zhuǎn)換為pdf
這篇文章主要為大家詳細介紹了java如何根據(jù)模板實現(xiàn)填充word內(nèi)容并轉(zhuǎn)換為pdf,文中的示例代碼講解詳細,感興趣的小伙伴可以跟隨小編一起學習一下2025-04-04
聊聊BeanUtils.copyProperties和clone()方法的區(qū)別
這篇文章主要介紹了聊聊BeanUtils.copyProperties和clone()方法的區(qū)別,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-09-09
使用springmvc運行流程分析,手寫spring框架嘗試
這篇文章主要介紹了使用springmvc運行流程分析,手寫spring框架嘗試,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-10-10
Java 中解決Unsupported major.minor version 51.0的問題
本文主要介紹解決Unsupported major.minor version 51.0的問題, 這里給大家整理了詳細資料,有需要的小伙伴可以參考下2016-08-08

