java多線程之并發(fā)工具類CountDownLatch,CyclicBarrier和Semaphore
CountDownLatch
CountDownLatch允許一個或多個線程等待其他線程完成操作。
假設(shè)一個Excel文件有多個sheet,我們需要去記錄每個sheet有多少行數(shù)據(jù),
這時我們就可以使用CountDownLatch實現(xiàn)主線程等待所有sheet線程完成sheet的解析操作后,再繼續(xù)執(zhí)行自己的任務(wù)。
public class CountDownLatchTest {
private static class WorkThread extends Thread {
private CountDownLatch cdl;
public WorkThread(String name, CountDownLatch cdl) {
super(name);
this.cdl = cdl;
}
public void run() {
System.out.println(this.getName() + "啟動了,時間為" + System.currentTimeMillis());
System.out.println(this.getName() + "我要統(tǒng)計每個sheet的行數(shù)");
try {
cdl.await();
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(this.getName() + "執(zhí)行完了,時間為" + System.currentTimeMillis());
}
}
private static class sheetThread extends Thread {
private CountDownLatch cdl;
public sheetThread(String name, CountDownLatch cdl) {
super(name);
this.cdl = cdl;
}
public void run() {
try {
System.out.println(this.getName() + "啟動了,時間為" + System.currentTimeMillis());
Thread.sleep(1000); //模擬任務(wù)執(zhí)行耗時
cdl.countDown();
System.out.println(this.getName() + "執(zhí)行完了,時間為" + System.currentTimeMillis() + " sheet的行數(shù)為:" + (int) (Math.random()*100));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws Exception {
CountDownLatch cdl = new CountDownLatch(2);
WorkThread wt0 = new WorkThread("WorkThread", cdl );
wt0.start();
sheetThread dt0 = new sheetThread("sheetThread1", cdl);
sheetThread dt1 = new sheetThread("sheetThread2", cdl);
dt0.start();
dt1.start();
}
}
執(zhí)行結(jié)果:
WorkThread啟動了,時間為1640054503027
WorkThread我要統(tǒng)計每個sheet的行數(shù)
sheetThread1啟動了,時間為1640054503028
sheetThread2啟動了,時間為1640054503029
sheetThread2執(zhí)行完了,時間為1640054504031 sheet的行數(shù)為:6
sheetThread1執(zhí)行完了,時間為1640054504031 sheet的行數(shù)為:44
WorkThread執(zhí)行完了,時間為1640054505036
可以看到,首先WorkThread執(zhí)行await后開始等待,WorkThread在等待sheetThread1和sheetThread2都執(zhí)行完自己的任務(wù)后,WorkThread立刻繼續(xù)執(zhí)行后面的代碼。
CountDownLatch的構(gòu)造函數(shù)接收一個int類型的參數(shù)作為計數(shù)器,如果你想等待N個點完成,這里就傳入N。
當我們調(diào)用CountDownLatch的countDown方法時,N就會減1,CountDownLatch的await方法會阻塞當前線程,直到N變成零。
由于countDown方法可以用在任何地方,所以這里說的N個點,可以是N個線程,也可以是1個線程里的N個執(zhí)行步驟。
用在多個線程時,只需要把這個CountDownLatch的引用傳遞到線程里即可。
我們繼續(xù)根據(jù)上面的測試案例流程,一步一步的分析CountDownLatch 源碼。
第一步看CountDownLatch的構(gòu)造方法,傳入一個不能小于0的int類型的參數(shù)作為計數(shù)器
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
/**
* Synchronization control For CountDownLatch.
* Uses AQS state to represent count.
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
看它的注釋,說的非常清楚,Sync就是CountDownLatch的同步控制器了,而它也是繼承了AQS,并且第3行注釋說到使用了AQS的state去代表count值。
第二步就是工作線程調(diào)用await()方法
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
如果線程中斷,拋出異常,否則開始調(diào)用tryAcquireShared(1),其內(nèi)部類Sync的實現(xiàn)也非常簡單,就是判斷state也就是CountDownLatch的計數(shù)是否等于0,
如果等于0,則該方法返回1,第5行的if判斷不成立,否則該方法返回-1,第5行的if判斷成立,繼續(xù)執(zhí)行doAcquireSharedInterruptibly(1)。
/**
* Acquires in shared interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
這個方法其實就是去獲取共享模式下的鎖,獲取失敗就park住。正如我們測試案例中的WorkThread線程應(yīng)該次數(shù)就被park住了,那么它又是何時被喚醒的呢?
下面就到countDown()方法了
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared(1)方法嘗試去釋放共享鎖
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
在for循環(huán)中,先獲取CountDownLatch的計數(shù)也就是當前state,如果等于0返回false,否則將state更新為state-1,并返回最新的state是否等于0。
因此在我們的測試案例中,我們需要調(diào)用兩次countDown方法,才會將全局的state更新為0,然后繼續(xù)執(zhí)行doReleaseShared()方法。
/**
* Release action for shared mode -- signals successor and ensures
* propagation. (Note: For exclusive mode, release just amounts
* to calling unparkSuccessor of head if it needs signal.)
*/
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
LockSupport.unpark(s.thread),喚醒線程的方法被調(diào)用后,WorkThread線程就可以繼續(xù)執(zhí)行了。
至此我們簡單分析了整個測試案例中CountDownLatch的代碼流程。
Semaphore
Semaphore(信號量)是用來控制同時訪問特定資源的線程數(shù)量,相當于一個并發(fā)控制器,構(gòu)造的時候傳入可供管理的信號量的數(shù)值,這個數(shù)值就是用來控制并發(fā)數(shù)量的,
每個線程執(zhí)行前先通過acquire方法獲取信號,執(zhí)行后通過release歸還信號 。每次acquire返回成功后,Semaphore可用的信號量就會減少一個,如果沒有可用的信號,
acquire調(diào)用就會阻塞,等待有release調(diào)用釋放信號后,acquire才會得到信號并返回。
下面我們看個測試案例
public class SemaphoreTest {
public static void main(String[] args) {
final Semaphore semaphore = new Semaphore(5);
Runnable runnable = () -> {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "獲得了信號量>>>>>,時間為" + System.currentTimeMillis());
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + "釋放了信號量<<<<<,時間為" + System.currentTimeMillis());
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
};
Thread[] threads = new Thread[10];
for (int i = 0; i < threads.length; i++)
threads[i] = new Thread(runnable);
for (int i = 0; i < threads.length; i++)
threads[i].start();
}
}
執(zhí)行結(jié)果:
Thread-0獲得了信號量>>>>>,時間為1640058647604
Thread-1獲得了信號量>>>>>,時間為1640058647604
Thread-2獲得了信號量>>>>>,時間為1640058647604
Thread-3獲得了信號量>>>>>,時間為1640058647605
Thread-4獲得了信號量>>>>>,時間為1640058647605
Thread-0釋放了信號量<<<<<,時間為1640058648606
Thread-1釋放了信號量<<<<<,時間為1640058648606
Thread-5獲得了信號量>>>>>,時間為1640058648607
Thread-4釋放了信號量<<<<<,時間為1640058648607
Thread-3釋放了信號量<<<<<,時間為1640058648607
Thread-7獲得了信號量>>>>>,時間為1640058648607
Thread-8獲得了信號量>>>>>,時間為1640058648607
Thread-2釋放了信號量<<<<<,時間為1640058648606
Thread-6獲得了信號量>>>>>,時間為1640058648607
Thread-9獲得了信號量>>>>>,時間為1640058648607
Thread-7釋放了信號量<<<<<,時間為1640058649607
Thread-6釋放了信號量<<<<<,時間為1640058649607
Thread-8釋放了信號量<<<<<,時間為1640058649607
Thread-9釋放了信號量<<<<<,時間為1640058649608
Thread-5釋放了信號量<<<<<,時間為1640058649607
我們使用for循環(huán)同時創(chuàng)建10個線程,首先是線程 0 1 2 3 4獲得了信號量,再后面的10行打印結(jié)果中,線程1到5分別釋放信號量,相同線程間隔也是1000毫秒,然后線程5 6 7 8 9才能繼續(xù)獲得信號量,而且保持最大獲取信號量的線程數(shù)小于等于5。
看下Semaphore的構(gòu)造方法
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
它支持傳入一個int類型的permits,一個布爾類型的fair,因此Semaphore也有公平模式與非公平模式。
/**
* Synchronization implementation for semaphore. Uses AQS state
* to represent permits. Subclassed into fair and nonfair
* versions.
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
Sync(int permits) {
setState(permits);
}
final int getPermits() {
return getState();
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}
第9行代碼可見Semaphore也是通過AQS的state來作為信號量的計數(shù)的
第12行 getPermits() 方法獲取當前的可用的信號量,即還有多少線程可以同時獲得信號量
第15行nonfairTryAcquireShared方法嘗試獲取共享鎖,邏輯就是直接將可用信號量減去該方法請求獲取的數(shù)量,更新state并返回該值。
第24行tryReleaseShared 方法嘗試釋放共享鎖,邏輯就是直接將可用信號量加上該方法請求釋放的數(shù)量,更新state并返回。
再看下Semaphore的公平鎖
/**
* Fair version
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
看嘗試獲取共享鎖的方法中,多了個 if (hasQueuedPredecessors) 的判斷,在java多線程6:ReentrantLock,
分析過hasQueuedPredecessors其實就是判斷當前等待隊列中是否存在等待線程,并判斷第一個等待的線程(head.next)是否是當前線程。
CyclicBarrier
CyclicBarrier的字面意思是可循環(huán)使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組線程到達一個屏障(也可以叫同步點)時被阻塞,直到最后一個線程到達屏障時,屏障才會開門,所有被屏障攔截的線程才會繼續(xù)運行。
一組線程同時被喚醒,讓我們想到了ReentrantLock的Condition,它的signalAll方法可以喚醒await在同一個condition的所有線程。
下面我們還是從一個簡單的測試案例先了解下CyclicBarrier的用法
public class CyclicBarrierTest extends Thread {
private CyclicBarrier cb;
private int sleepSecond;
public CyclicBarrierTest(CyclicBarrier cb, int sleepSecond) {
this.cb = cb;
this.sleepSecond = sleepSecond;
}
public void run() {
try {
System.out.println(this.getName() + "開始, 時間為" + System.currentTimeMillis());
Thread.sleep(sleepSecond * 1000);
cb.await();
System.out.println(this.getName() + "結(jié)束, 時間為" + System.currentTimeMillis());
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
Runnable runnable = new Runnable() {
public void run() {
System.out.println("CyclicBarrier的barrierAction開始運行, 時間為" + System.currentTimeMillis());
}
};
CyclicBarrier cb = new CyclicBarrier(2, runnable);
CyclicBarrierTest cbt0 = new CyclicBarrierTest(cb, 3);
CyclicBarrierTest cbt1 = new CyclicBarrierTest(cb, 6);
cbt0.start();
cbt1.start();
}
}
執(zhí)行結(jié)果:
Thread-1開始, 時間為1640069673534
Thread-0開始, 時間為1640069673534
CyclicBarrier的barrierAction開始運行, 時間為1640069679536
Thread-1結(jié)束, 時間為1640069679536
Thread-0結(jié)束, 時間為1640069679536
可以看到Thread-0和Thread-1同時運行,而自定義的線程barrierAction是在6000毫秒后開始執(zhí)行,說明Thread-0在await之后,等待了3000毫秒,和Thread-1一起繼續(xù)執(zhí)行的。
看下CyclicBarrier 的一個更高級的構(gòu)造函數(shù)
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
parties就是設(shè)定需要多少線程在屏障前等待,只有調(diào)用await方法的線程數(shù)達到才能喚醒所有的線程,還有注意因為使用CyclicBarrier的線程都會阻塞在await方法上,所以在線程池中使用CyclicBarrier時要特別小心,如果線程池的線程過少,那么就會發(fā)生死鎖。
Runnable barrierAction用于在線程到達屏障時,優(yōu)先執(zhí)行barrierAction,方便處理更復(fù)雜的業(yè)務(wù)場景。
/**
* Main barrier code, covering the various policies.
*/
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
首先是ReentrantLock加鎖,全局的count值-1,然后判斷count是否等于0,如果不等于0,則循環(huán),condition執(zhí)行await等待,直到觸發(fā)、中斷、中斷或超時,如果count值等于0,先執(zhí)行barrierAction線程,然后condition開始喚醒所有等待的線程。
簡單是使用之后,有人會覺得CyclicBarrier和CountDownLatch有點像,其實它們兩者有些細微的差別:
1:CountDownLatch是在多個線程都進行了latch.countDown()后才會觸發(fā)事件,喚醒await()在latch上的線程,而執(zhí)行countDown()的線程,是不會阻塞的;
CyclicBarrier是一個柵欄,用于同步所有調(diào)用await()方法的線程,線程執(zhí)行了await()方法之后并不會執(zhí)行之后的代碼,而只有當執(zhí)行await()方法的線程數(shù)等于指定的parties之后,這些執(zhí)行了await()方法的線程才會同時運行。
2:CountDownLatch不能循環(huán)使用,計數(shù)器減為0就減為0了,不能被重置;CyclicBarrier本是就是支持循環(huán)使用parties,而且提供了reset()方法,可以重置計數(shù)器。
總結(jié)
本篇文章就到這里了,希望能夠給你帶來幫助,也希望您能夠多多關(guān)注腳本之家的更多內(nèi)容!
相關(guān)文章
Spring Boot(四)之使用JWT和Spring Security保護REST API
這篇文章主要介紹了Spring Boot(四)之使用JWT和Spring Security保護REST API的相關(guān)知識,需要的朋友可以參考下2017-04-04
Java利用Socket和IO流實現(xiàn)文件的上傳與下載
本文主要介紹了Java利用Socket和IO流實現(xiàn)文件的上傳與下載,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2022-04-04
mybatis打印的sql日志不寫入到log文件的問題及解決
這篇文章主要介紹了mybatis打印的sql日志不寫入到log文件的問題及解決方案,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-08-08
基于@RequestParam與@RequestBody使用對比
這篇文章主要介紹了@RequestParam與@RequestBody的使用對比,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-10-10
java算法Leecode刷題統(tǒng)計有序矩陣中的負數(shù)
這篇文章主要為大家介紹了java算法Leecode刷題統(tǒng)計有序矩陣中的負數(shù)示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-10-10
Spring Boot集成MyBatis訪問數(shù)據(jù)庫的方法
這篇文章主要介紹了Spring Boot集成MyBatis訪問數(shù)據(jù)庫的方法,需要的朋友可以參考下2017-04-04

