java多線程之并發(fā)工具類CountDownLatch,CyclicBarrier和Semaphore
CountDownLatch
CountDownLatch允許一個(gè)或多個(gè)線程等待其他線程完成操作。
假設(shè)一個(gè)Excel文件有多個(gè)sheet,我們需要去記錄每個(gè)sheet有多少行數(shù)據(jù),
這時(shí)我們就可以使用CountDownLatch實(shí)現(xiàn)主線程等待所有sheet線程完成sheet的解析操作后,再繼續(xù)執(zhí)行自己的任務(wù)。
public class CountDownLatchTest { private static class WorkThread extends Thread { private CountDownLatch cdl; public WorkThread(String name, CountDownLatch cdl) { super(name); this.cdl = cdl; } public void run() { System.out.println(this.getName() + "啟動(dòng)了,時(shí)間為" + System.currentTimeMillis()); System.out.println(this.getName() + "我要統(tǒng)計(jì)每個(gè)sheet的行數(shù)"); try { cdl.await(); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(this.getName() + "執(zhí)行完了,時(shí)間為" + System.currentTimeMillis()); } } private static class sheetThread extends Thread { private CountDownLatch cdl; public sheetThread(String name, CountDownLatch cdl) { super(name); this.cdl = cdl; } public void run() { try { System.out.println(this.getName() + "啟動(dòng)了,時(shí)間為" + System.currentTimeMillis()); Thread.sleep(1000); //模擬任務(wù)執(zhí)行耗時(shí) cdl.countDown(); System.out.println(this.getName() + "執(zhí)行完了,時(shí)間為" + System.currentTimeMillis() + " sheet的行數(shù)為:" + (int) (Math.random()*100)); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws Exception { CountDownLatch cdl = new CountDownLatch(2); WorkThread wt0 = new WorkThread("WorkThread", cdl ); wt0.start(); sheetThread dt0 = new sheetThread("sheetThread1", cdl); sheetThread dt1 = new sheetThread("sheetThread2", cdl); dt0.start(); dt1.start(); } }
執(zhí)行結(jié)果:
WorkThread啟動(dòng)了,時(shí)間為1640054503027
WorkThread我要統(tǒng)計(jì)每個(gè)sheet的行數(shù)
sheetThread1啟動(dòng)了,時(shí)間為1640054503028
sheetThread2啟動(dòng)了,時(shí)間為1640054503029
sheetThread2執(zhí)行完了,時(shí)間為1640054504031 sheet的行數(shù)為:6
sheetThread1執(zhí)行完了,時(shí)間為1640054504031 sheet的行數(shù)為:44
WorkThread執(zhí)行完了,時(shí)間為1640054505036
可以看到,首先WorkThread執(zhí)行await后開(kāi)始等待,WorkThread在等待sheetThread1和sheetThread2都執(zhí)行完自己的任務(wù)后,WorkThread立刻繼續(xù)執(zhí)行后面的代碼。
CountDownLatch的構(gòu)造函數(shù)接收一個(gè)int類型的參數(shù)作為計(jì)數(shù)器,如果你想等待N個(gè)點(diǎn)完成,這里就傳入N。
當(dāng)我們調(diào)用CountDownLatch的countDown方法時(shí),N就會(huì)減1,CountDownLatch的await方法會(huì)阻塞當(dāng)前線程,直到N變成零。
由于countDown方法可以用在任何地方,所以這里說(shuō)的N個(gè)點(diǎn),可以是N個(gè)線程,也可以是1個(gè)線程里的N個(gè)執(zhí)行步驟。
用在多個(gè)線程時(shí),只需要把這個(gè)CountDownLatch的引用傳遞到線程里即可。
我們繼續(xù)根據(jù)上面的測(cè)試案例流程,一步一步的分析CountDownLatch 源碼。
第一步看CountDownLatch的構(gòu)造方法,傳入一個(gè)不能小于0的int類型的參數(shù)作為計(jì)數(shù)器
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
/** * Synchronization control For CountDownLatch. * Uses AQS state to represent count. */ private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } int getCount() { return getState(); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } }
看它的注釋,說(shuō)的非常清楚,Sync就是CountDownLatch的同步控制器了,而它也是繼承了AQS,并且第3行注釋說(shuō)到使用了AQS的state去代表count值。
第二步就是工作線程調(diào)用await()方法
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
如果線程中斷,拋出異常,否則開(kāi)始調(diào)用tryAcquireShared(1),其內(nèi)部類Sync的實(shí)現(xiàn)也非常簡(jiǎn)單,就是判斷state也就是CountDownLatch的計(jì)數(shù)是否等于0,
如果等于0,則該方法返回1,第5行的if判斷不成立,否則該方法返回-1,第5行的if判斷成立,繼續(xù)執(zhí)行doAcquireSharedInterruptibly(1)。
/** * Acquires in shared interruptible mode. * @param arg the acquire argument */ private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
這個(gè)方法其實(shí)就是去獲取共享模式下的鎖,獲取失敗就park住。正如我們測(cè)試案例中的WorkThread線程應(yīng)該次數(shù)就被park住了,那么它又是何時(shí)被喚醒的呢?
下面就到countDown()方法了
public void countDown() { sync.releaseShared(1); }
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
tryReleaseShared(1)方法嘗試去釋放共享鎖
protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }
在for循環(huán)中,先獲取CountDownLatch的計(jì)數(shù)也就是當(dāng)前state,如果等于0返回false,否則將state更新為state-1,并返回最新的state是否等于0。
因此在我們的測(cè)試案例中,我們需要調(diào)用兩次countDown方法,才會(huì)將全局的state更新為0,然后繼續(xù)執(zhí)行doReleaseShared()方法。
/** * Release action for shared mode -- signals successor and ensures * propagation. (Note: For exclusive mode, release just amounts * to calling unparkSuccessor of head if it needs signal.) */ private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
/** * Wakes up node's successor, if one exists. * * @param node the node */ private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
LockSupport.unpark(s.thread),喚醒線程的方法被調(diào)用后,WorkThread線程就可以繼續(xù)執(zhí)行了。
至此我們簡(jiǎn)單分析了整個(gè)測(cè)試案例中CountDownLatch的代碼流程。
Semaphore
Semaphore(信號(hào)量)是用來(lái)控制同時(shí)訪問(wèn)特定資源的線程數(shù)量,相當(dāng)于一個(gè)并發(fā)控制器,構(gòu)造的時(shí)候傳入可供管理的信號(hào)量的數(shù)值,這個(gè)數(shù)值就是用來(lái)控制并發(fā)數(shù)量的,
每個(gè)線程執(zhí)行前先通過(guò)acquire方法獲取信號(hào),執(zhí)行后通過(guò)release歸還信號(hào) 。每次acquire返回成功后,Semaphore可用的信號(hào)量就會(huì)減少一個(gè),如果沒(méi)有可用的信號(hào),
acquire調(diào)用就會(huì)阻塞,等待有release調(diào)用釋放信號(hào)后,acquire才會(huì)得到信號(hào)并返回。
下面我們看個(gè)測(cè)試案例
public class SemaphoreTest { public static void main(String[] args) { final Semaphore semaphore = new Semaphore(5); Runnable runnable = () -> { try { semaphore.acquire(); System.out.println(Thread.currentThread().getName() + "獲得了信號(hào)量>>>>>,時(shí)間為" + System.currentTimeMillis()); Thread.sleep(1000); System.out.println(Thread.currentThread().getName() + "釋放了信號(hào)量<<<<<,時(shí)間為" + System.currentTimeMillis()); } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(); } }; Thread[] threads = new Thread[10]; for (int i = 0; i < threads.length; i++) threads[i] = new Thread(runnable); for (int i = 0; i < threads.length; i++) threads[i].start(); } }
執(zhí)行結(jié)果:
Thread-0獲得了信號(hào)量>>>>>,時(shí)間為1640058647604
Thread-1獲得了信號(hào)量>>>>>,時(shí)間為1640058647604
Thread-2獲得了信號(hào)量>>>>>,時(shí)間為1640058647604
Thread-3獲得了信號(hào)量>>>>>,時(shí)間為1640058647605
Thread-4獲得了信號(hào)量>>>>>,時(shí)間為1640058647605
Thread-0釋放了信號(hào)量<<<<<,時(shí)間為1640058648606
Thread-1釋放了信號(hào)量<<<<<,時(shí)間為1640058648606
Thread-5獲得了信號(hào)量>>>>>,時(shí)間為1640058648607
Thread-4釋放了信號(hào)量<<<<<,時(shí)間為1640058648607
Thread-3釋放了信號(hào)量<<<<<,時(shí)間為1640058648607
Thread-7獲得了信號(hào)量>>>>>,時(shí)間為1640058648607
Thread-8獲得了信號(hào)量>>>>>,時(shí)間為1640058648607
Thread-2釋放了信號(hào)量<<<<<,時(shí)間為1640058648606
Thread-6獲得了信號(hào)量>>>>>,時(shí)間為1640058648607
Thread-9獲得了信號(hào)量>>>>>,時(shí)間為1640058648607
Thread-7釋放了信號(hào)量<<<<<,時(shí)間為1640058649607
Thread-6釋放了信號(hào)量<<<<<,時(shí)間為1640058649607
Thread-8釋放了信號(hào)量<<<<<,時(shí)間為1640058649607
Thread-9釋放了信號(hào)量<<<<<,時(shí)間為1640058649608
Thread-5釋放了信號(hào)量<<<<<,時(shí)間為1640058649607
我們使用for循環(huán)同時(shí)創(chuàng)建10個(gè)線程,首先是線程 0 1 2 3 4獲得了信號(hào)量,再后面的10行打印結(jié)果中,線程1到5分別釋放信號(hào)量,相同線程間隔也是1000毫秒,然后線程5 6 7 8 9才能繼續(xù)獲得信號(hào)量,而且保持最大獲取信號(hào)量的線程數(shù)小于等于5。
看下Semaphore的構(gòu)造方法
public Semaphore(int permits) { sync = new NonfairSync(permits); }
public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
它支持傳入一個(gè)int類型的permits,一個(gè)布爾類型的fair,因此Semaphore也有公平模式與非公平模式。
/** * Synchronization implementation for semaphore. Uses AQS state * to represent permits. Subclassed into fair and nonfair * versions. */ abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1192457210091910933L; Sync(int permits) { setState(permits); } final int getPermits() { return getState(); } final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } } final void reducePermits(int reductions) { for (;;) { int current = getState(); int next = current - reductions; if (next > current) // underflow throw new Error("Permit count underflow"); if (compareAndSetState(current, next)) return; } } final int drainPermits() { for (;;) { int current = getState(); if (current == 0 || compareAndSetState(current, 0)) return current; } } }
第9行代碼可見(jiàn)Semaphore也是通過(guò)AQS的state來(lái)作為信號(hào)量的計(jì)數(shù)的
第12行 getPermits() 方法獲取當(dāng)前的可用的信號(hào)量,即還有多少線程可以同時(shí)獲得信號(hào)量
第15行nonfairTryAcquireShared方法嘗試獲取共享鎖,邏輯就是直接將可用信號(hào)量減去該方法請(qǐng)求獲取的數(shù)量,更新state并返回該值。
第24行tryReleaseShared 方法嘗試釋放共享鎖,邏輯就是直接將可用信號(hào)量加上該方法請(qǐng)求釋放的數(shù)量,更新state并返回。
再看下Semaphore的公平鎖
/** * Fair version */ static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L; FairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } }
看嘗試獲取共享鎖的方法中,多了個(gè) if (hasQueuedPredecessors) 的判斷,在java多線程6:ReentrantLock,
分析過(guò)hasQueuedPredecessors其實(shí)就是判斷當(dāng)前等待隊(duì)列中是否存在等待線程,并判斷第一個(gè)等待的線程(head.next)是否是當(dāng)前線程。
CyclicBarrier
CyclicBarrier的字面意思是可循環(huán)使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組線程到達(dá)一個(gè)屏障(也可以叫同步點(diǎn))時(shí)被阻塞,直到最后一個(gè)線程到達(dá)屏障時(shí),屏障才會(huì)開(kāi)門,所有被屏障攔截的線程才會(huì)繼續(xù)運(yùn)行。
一組線程同時(shí)被喚醒,讓我們想到了ReentrantLock的Condition,它的signalAll方法可以喚醒a(bǔ)wait在同一個(gè)condition的所有線程。
下面我們還是從一個(gè)簡(jiǎn)單的測(cè)試案例先了解下CyclicBarrier的用法
public class CyclicBarrierTest extends Thread { private CyclicBarrier cb; private int sleepSecond; public CyclicBarrierTest(CyclicBarrier cb, int sleepSecond) { this.cb = cb; this.sleepSecond = sleepSecond; } public void run() { try { System.out.println(this.getName() + "開(kāi)始, 時(shí)間為" + System.currentTimeMillis()); Thread.sleep(sleepSecond * 1000); cb.await(); System.out.println(this.getName() + "結(jié)束, 時(shí)間為" + System.currentTimeMillis()); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { Runnable runnable = new Runnable() { public void run() { System.out.println("CyclicBarrier的barrierAction開(kāi)始運(yùn)行, 時(shí)間為" + System.currentTimeMillis()); } }; CyclicBarrier cb = new CyclicBarrier(2, runnable); CyclicBarrierTest cbt0 = new CyclicBarrierTest(cb, 3); CyclicBarrierTest cbt1 = new CyclicBarrierTest(cb, 6); cbt0.start(); cbt1.start(); } }
執(zhí)行結(jié)果:
Thread-1開(kāi)始, 時(shí)間為1640069673534
Thread-0開(kāi)始, 時(shí)間為1640069673534
CyclicBarrier的barrierAction開(kāi)始運(yùn)行, 時(shí)間為1640069679536
Thread-1結(jié)束, 時(shí)間為1640069679536
Thread-0結(jié)束, 時(shí)間為1640069679536
可以看到Thread-0和Thread-1同時(shí)運(yùn)行,而自定義的線程barrierAction是在6000毫秒后開(kāi)始執(zhí)行,說(shuō)明Thread-0在await之后,等待了3000毫秒,和Thread-1一起繼續(xù)執(zhí)行的。
看下CyclicBarrier 的一個(gè)更高級(jí)的構(gòu)造函數(shù)
public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }
parties就是設(shè)定需要多少線程在屏障前等待,只有調(diào)用await方法的線程數(shù)達(dá)到才能喚醒所有的線程,還有注意因?yàn)槭褂肅yclicBarrier的線程都會(huì)阻塞在await方法上,所以在線程池中使用CyclicBarrier時(shí)要特別小心,如果線程池的線程過(guò)少,那么就會(huì)發(fā)生死鎖。
Runnable barrierAction用于在線程到達(dá)屏障時(shí),優(yōu)先執(zhí)行barrierAction,方便處理更復(fù)雜的業(yè)務(wù)場(chǎng)景。
/** * Main barrier code, covering the various policies. */ private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count; if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
首先是ReentrantLock加鎖,全局的count值-1,然后判斷count是否等于0,如果不等于0,則循環(huán),condition執(zhí)行await等待,直到觸發(fā)、中斷、中斷或超時(shí),如果count值等于0,先執(zhí)行barrierAction線程,然后condition開(kāi)始喚醒所有等待的線程。
簡(jiǎn)單是使用之后,有人會(huì)覺(jué)得CyclicBarrier
和CountDownLatch
有點(diǎn)像,其實(shí)它們兩者有些細(xì)微的差別:
1:CountDownLatch
是在多個(gè)線程都進(jìn)行了latch.countDown()
后才會(huì)觸發(fā)事件,喚醒a(bǔ)wait()在latch上的線程,而執(zhí)行countDown()的線程,是不會(huì)阻塞的;
CyclicBarrier
是一個(gè)柵欄,用于同步所有調(diào)用await()方法的線程,線程執(zhí)行了await()方法之后并不會(huì)執(zhí)行之后的代碼,而只有當(dāng)執(zhí)行await()方法的線程數(shù)等于指定的parties之后,這些執(zhí)行了await()方法的線程才會(huì)同時(shí)運(yùn)行。
2:CountDownLatch
不能循環(huán)使用,計(jì)數(shù)器減為0就減為0了,不能被重置;CyclicBarrier本是就是支持循環(huán)使用parties,而且提供了reset()方法,可以重置計(jì)數(shù)器。
總結(jié)
本篇文章就到這里了,希望能夠給你帶來(lái)幫助,也希望您能夠多多關(guān)注腳本之家的更多內(nèi)容!
相關(guān)文章
Spring Boot(四)之使用JWT和Spring Security保護(hù)REST API
這篇文章主要介紹了Spring Boot(四)之使用JWT和Spring Security保護(hù)REST API的相關(guān)知識(shí),需要的朋友可以參考下2017-04-04Java利用Socket和IO流實(shí)現(xiàn)文件的上傳與下載
本文主要介紹了Java利用Socket和IO流實(shí)現(xiàn)文件的上傳與下載,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-04-04mybatis打印的sql日志不寫入到log文件的問(wèn)題及解決
這篇文章主要介紹了mybatis打印的sql日志不寫入到log文件的問(wèn)題及解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-08-08基于@RequestParam與@RequestBody使用對(duì)比
這篇文章主要介紹了@RequestParam與@RequestBody的使用對(duì)比,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-10-10java算法Leecode刷題統(tǒng)計(jì)有序矩陣中的負(fù)數(shù)
這篇文章主要為大家介紹了java算法Leecode刷題統(tǒng)計(jì)有序矩陣中的負(fù)數(shù)示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-10-10Spring Boot集成MyBatis訪問(wèn)數(shù)據(jù)庫(kù)的方法
這篇文章主要介紹了Spring Boot集成MyBatis訪問(wèn)數(shù)據(jù)庫(kù)的方法,需要的朋友可以參考下2017-04-04