詳解java CountDownLatch和CyclicBarrier在內(nèi)部實(shí)現(xiàn)和場(chǎng)景上的區(qū)別
前言
CountDownLatch和CyclicBarrier兩個(gè)同為java并發(fā)編程的重要工具類(lèi),它們?cè)谥T多多線程并發(fā)或并行場(chǎng)景中得到了廣泛的應(yīng)用。但兩者就其內(nèi)部實(shí)現(xiàn)和使用場(chǎng)景而言是各有所側(cè)重的。
內(nèi)部實(shí)現(xiàn)差異
前者更多依賴(lài)經(jīng)典的AQS機(jī)制和CAS機(jī)制來(lái)控制器內(nèi)部狀態(tài)的更迭和計(jì)數(shù)器本身的變化,而后者更多依靠可重入Lock等機(jī)制來(lái)控制其內(nèi)部并發(fā)安全性和一致性。
public class { //Synchronization control For CountDownLatch. //Uses AQS state to represent count. private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } int getCount() { return getState(); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } } private final Sync sync; ... ...// }
public class CyclicBarrier { /** * Each use of the barrier is represented as a generation instance. * The generation changes whenever the barrier is tripped, or * is reset. There can be many generations associated with threads * using the barrier - due to the non-deterministic way the lock * may be allocated to waiting threads - but only one of these * can be active at a time (the one to which {@code count} applies) * and all the rest are either broken or tripped. * There need not be an active generation if there has been a break * but no subsequent reset. */ private static class Generation { boolean broken = false; } /** The lock for guarding barrier entry */ private final ReentrantLock lock = new ReentrantLock(); /** Condition to wait on until tripped */ private final Condition trip = lock.newCondition(); /** The number of parties */ private final int parties; /* The command to run when tripped */ private final Runnable barrierCommand; /** The current generation */ private Generation generation = new Generation(); /** * Number of parties still waiting. Counts down from parties to 0 * on each generation. It is reset to parties on each new * generation or when broken. */ private int count; /** * Updates state on barrier trip and wakes up everyone. * Called only while holding lock. */ private void nextGeneration() { // signal completion of last generation trip.signalAll(); // set up next generation count = parties; generation = new Generation(); } /** * Sets current barrier generation as broken and wakes up everyone. * Called only while holding lock. */ private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); } /** * Main barrier code, covering the various policies. */ private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count; if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } } ... ... // }
實(shí)戰(zhàn) - 展示各自的使用場(chǎng)景
/** *類(lèi)說(shuō)明:共5個(gè)初始化子線程,6個(gè)閉鎖扣除點(diǎn),扣除完畢后,主線程和業(yè)務(wù)線程才能繼續(xù)執(zhí)行 */ public class UseCountDownLatch { static CountDownLatch latch = new CountDownLatch(6); /*初始化線程*/ private static class InitThread implements Runnable{ public void run() { System.out.println("Thread_"+Thread.currentThread().getId() +" ready init work......"); latch.countDown(); for(int i =0;i<2;i++) { System.out.println("Thread_"+Thread.currentThread().getId() +" ........continue do its work"); } } } /*業(yè)務(wù)線程等待latch的計(jì)數(shù)器為0完成*/ private static class BusiThread implements Runnable{ public void run() { try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } for(int i =0;i<3;i++) { System.out.println("BusiThread_"+Thread.currentThread().getId() +" do business-----"); } } } public static void main(String[] args) throws InterruptedException { new Thread(new Runnable() { public void run() { SleepTools.ms(1); System.out.println("Thread_"+Thread.currentThread().getId() +" ready init work step 1st......"); latch.countDown(); System.out.println("begin step 2nd......."); SleepTools.ms(1); System.out.println("Thread_"+Thread.currentThread().getId() +" ready init work step 2nd......"); latch.countDown(); } }).start(); new Thread(new BusiThread()).start(); for(int i=0;i<=3;i++){ Thread thread = new Thread(new InitThread()); thread.start(); } latch.await(); System.out.println("Main do ites work........"); } }
/** *類(lèi)說(shuō)明:共4個(gè)子線程,他們?nèi)客瓿晒ぷ骱螅怀鲎约航Y(jié)果, *再被統(tǒng)一釋放去做自己的事情,而交出的結(jié)果被另外的線程拿來(lái)拼接字符串 */ class UseCyclicBarrier { private static CyclicBarrier barrier = new CyclicBarrier(4,new CollectThread()); //存放子線程工作結(jié)果的容器 private static ConcurrentHashMap<String,Long> resultMap = new ConcurrentHashMap<String,Long>(); public static void main(String[] args) { for(int i=0;i<4;i++){ Thread thread = new Thread(new SubThread()); thread.start(); } } /*匯總的任務(wù)*/ private static class CollectThread implements Runnable{ @Override public void run() { StringBuilder result = new StringBuilder(); for(Map.Entry<String,Long> workResult:resultMap.entrySet()){ result.append("["+workResult.getValue()+"]"); } System.out.println(" the result = "+ result); System.out.println("do other business........"); } } /*相互等待的子線程*/ private static class SubThread implements Runnable{ @Override public void run() { long id = Thread.currentThread().getId(); resultMap.put(Thread.currentThread().getId()+"",id); try { Thread.sleep(1000+id); System.out.println("Thread_"+id+" ....do something "); barrier.await(); Thread.sleep(1000+id); System.out.println("Thread_"+id+" ....do its business "); barrier.await(); } catch (Exception e) { e.printStackTrace(); } } } }
兩者總結(jié)
1. Cyclicbarrier結(jié)果匯總的Runable線程可以重復(fù)被執(zhí)行,通過(guò)多次觸發(fā)await()方法,countdownlatch可以調(diào)用await()方法多次;cyclicbarrier若沒(méi)有結(jié)果匯總,則調(diào)用一次await()就夠了;
2. New cyclicbarrier(threadCount)的線程數(shù)必須與實(shí)際的用戶(hù)線程數(shù)一致;
3. 協(xié)調(diào)線程同時(shí)運(yùn)行:countDownLatch協(xié)調(diào)工作線程執(zhí)行,是由外面線程協(xié)調(diào);cyclicbarrier是由工作線程之間相互協(xié)調(diào)運(yùn)行;
4. 從構(gòu)造函數(shù)上看出:countDownlatch控制運(yùn)行的計(jì)數(shù)器數(shù)量和線程數(shù)沒(méi)有關(guān)系;cyclicbarrier構(gòu)造中傳入的線程數(shù)等于實(shí)際執(zhí)行線程數(shù);
5. countDownLatch在不能基于執(zhí)行子線程的運(yùn)行結(jié)果做處理,而cyclicbarrier可以;
6. 就使用場(chǎng)景而言,countdownlatch 更適用于框架加載前的一系列初始化工作等場(chǎng)景; cyclicbarrier更適用于需要多個(gè)用戶(hù)線程執(zhí)行后,將運(yùn)行結(jié)果匯總再計(jì)算等典型場(chǎng)景;
到此這篇關(guān)于詳解java CountDownLatch和CyclicBarrier在內(nèi)部實(shí)現(xiàn)和場(chǎng)景上的區(qū)別的文章就介紹到這了,更多相關(guān)java CountDownLatch和CyclicBarrier區(qū)別內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
spring-boot-maven-plugin引入出現(xiàn)爆紅(已解決)
這篇文章主要介紹了spring-boot-maven-plugin引入出現(xiàn)爆紅(已解決),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-03-03Java調(diào)用SSE流式接口并流式返回給前端實(shí)現(xiàn)打字輸出效果
在Web開(kāi)發(fā)中,有時(shí)我們需要將文件以流的形式返回給前端,下面這篇文章主要給大家介紹了關(guān)于Java調(diào)用SSE流式接口并流式返回給前端實(shí)現(xiàn)打字輸出效果的相關(guān)資料,需要的朋友可以參考下2024-08-08Java中StringBuilder字符串類(lèi)型的操作方法及API整理
Java中的StringBuffer類(lèi)繼承于AbstractStringBuilder,用來(lái)創(chuàng)建非線程安全的字符串類(lèi)型對(duì)象,下面即是對(duì)Java中StringBuilder字符串類(lèi)型的操作方法及API整理2016-05-05SpringBoot集成thymeleaf渲染html模板的步驟詳解
這篇文章主要給大家詳細(xì)介紹了SpringBoot集成thymeleaf如何使實(shí)現(xiàn)html模板的渲染,文中有詳細(xì)的代碼示例,具有一定的參考價(jià)值,需要的朋友可以參考下2023-06-06java返回集合為null還是空集合及空集合的三種寫(xiě)法小結(jié)
這篇文章主要介紹了java返回集合為null還是空集合及空集合的三種寫(xiě)法小結(jié),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-11-11淺談Spring Security LDAP簡(jiǎn)介
這篇文章主要介紹了淺談Spring Security LDAP簡(jiǎn)介,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-05-05Mybatis 級(jí)聯(lián)刪除的實(shí)現(xiàn)
這篇文章主要介紹了Mybatis 級(jí)聯(lián)刪除的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-11-11Java并發(fā)編程之線程池實(shí)現(xiàn)原理詳解
池化思想是一種空間換時(shí)間的思想,期望使用預(yù)先創(chuàng)建好的對(duì)象來(lái)減少頻繁創(chuàng)建對(duì)象的性能開(kāi)銷(xiāo),java中有多種池化思想的應(yīng)用,例如:數(shù)據(jù)庫(kù)連接池、線程池等,下面就來(lái)具體講講2023-05-05