詳解Java如何實(shí)現(xiàn)多線程步調(diào)一致
CountDownLatch的用法
CountDownLatch
主要有兩個(gè)方法:
countDown()
:用于使計(jì)數(shù)器減一,一般是執(zhí)行任務(wù)的線程調(diào)用。await()
:調(diào)用該方法的線程處于等待狀態(tài),一般是主線程調(diào)用。
這里需要注意的是
countDown()
方法并沒有規(guī)定一個(gè)線程只能調(diào)用一次,當(dāng)同一個(gè)線程調(diào)用多次countDown()
方法時(shí),每次都會(huì)使計(jì)數(shù)器減一;
await()
方法也并沒有規(guī)定只能有一個(gè)線程執(zhí)行該方法,如果多個(gè)線程同時(shí)執(zhí)行await()
方法,那么這幾個(gè)線程都將處于等待狀態(tài),并且以共享模式享有同一個(gè)鎖。
如下是其使用示例:
private static void countDownLatch() throws InterruptedException { ? ?CountDownLatch latch = new CountDownLatch(5); ? ?ExecutorService service = Executors.newFixedThreadPool(5); ? ? ?for (int i = 0; i < 4; i++) { ? ? ?service.submit( ? ? ? ? () -> { ? ? ? ? ? ?action(); ? ? ? ? ? ?latch.countDown(); ? ? ? ? }); ? } ? ? ?service.awaitTermination(50, TimeUnit.MILLISECONDS); ? ?latch.await(); ? ?System.out.println("Done"); ? ?service.shutdownNow(); } ? ?private static void action() { ? ?System.out.printf("當(dāng)前線程[%s], 正在執(zhí)行。。。\n", Thread.currentThread().getName()); } ?
CountDownLatch實(shí)現(xiàn)原理
// java.util.concurrent.CountDownLatch public class CountDownLatch { ? ?/** ? ? * Synchronization control For CountDownLatch. ? ? * Uses AQS state to represent count. ? ? */ ? ?private static final class Sync extends AbstractQueuedSynchronizer { ? ? ? ?private static final long serialVersionUID = 4982264981922014374L; ? ? ? ? ?Sync(int count) { ? ? ? ? ? ?setState(count); ? ? ? } ? ? ? ? ?int getCount() { ? ? ? ? ? ?return getState(); ? ? ? } ? ? ? ? ?protected int tryAcquireShared(int acquires) { ? ? ? ? ? ?return (getState() == 0) ? 1 : -1; ? ? ? } ? ? ? ? ?protected boolean tryReleaseShared(int releases) { ? ? ? ? ? ?// Decrement count; signal when transition to zero ? ? ? ? ? ?for (;;) { ? ? ? ? ? ? ? ?int c = getState(); ? ? ? ? ? ? ? ?if (c == 0) ? ? ? ? ? ? ? ? ? ?return false; ? ? ? ? ? ? ? ?int nextc = c - 1; ? ? ? ? ? ? ? ?if (compareAndSetState(c, nextc)) ? ? ? ? ? ? ? ? ? ?return nextc == 0; ? ? ? ? ? } ? ? ? } ? } ? ? ?private final Sync sync; ? ?public CountDownLatch(int count) { ? ? ? ?if (count < 0) throw new IllegalArgumentException("count < 0"); ? ? ? ?this.sync = new Sync(count); ? } ? ? ?public void await() throws InterruptedException { ? ? ? ?sync.acquireSharedInterruptibly(1); ? } ? ?public boolean await(long timeout, TimeUnit unit) ? ? ? ?throws InterruptedException { ? ? ? ?return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); ? } ? ?public void countDown() { ? ? ? ?sync.releaseShared(1); ? } ? }
從上述摘錄的源代碼來看,CountDownLatch
的實(shí)現(xiàn)還是依賴于AQS
,這跟之前討論過的ReentrantLock
的實(shí)現(xiàn)原理基本一致。
接下來,我們主要分析一下await()
和countDown()
的實(shí)現(xiàn)原理。
await()
步驟如下:
public final void acquireSharedInterruptibly(int arg) ?throws InterruptedException { ?if (Thread.interrupted()) ? ?throw new InterruptedException(); ?if (tryAcquireShared(arg) < 0) // 查看state是否為0,如果為0,直接返回,如果不為0,則調(diào)用doAcquireSharedInterruptibly()阻塞等待state變?yōu)?. ? ?doAcquireSharedInterruptibly(arg); }
countDown()
步驟如下:
public final boolean releaseShared(int arg) { ?if (tryReleaseShared(arg)) { // state--,如果state變?yōu)?,則執(zhí)行doReleaseShared(),喚醒等待隊(duì)列中的線程 ? ?doReleaseShared(); ? ?return true; } ?return false; }
CyclicBarrier的用法
CyclicBarrier
主要是有兩個(gè)方法:
await()
:計(jì)數(shù)器減1,當(dāng)計(jì)數(shù)器的值為0,喚醒等待的線程。
reset()
:重置計(jì)數(shù)器
public class CyclicBarrierDemo { ? ?public static void main(String args[]) throws InterruptedException, BrokenBarrierException { ? ? ?CyclicBarrier barrier = new CyclicBarrier(4); ? ?Party first = new Party(1000, barrier, "PARTY-1"); ? ?Party second = new Party(2000, barrier, "PARTY-2"); ? ?Party third = new Party(3000, barrier, "PARTY-3"); ? ?Party fourth = new Party(4000, barrier, "PARTY-4"); ? ? ?first.start(); ? ?second.start(); ? ?third.start(); ? ?fourth.start(); ? ? ?System.out.println(Thread.currentThread().getName() + " has finished"); } } ? class Party extends Thread { ?private int duration; ?private CyclicBarrier barrier; ? ?public Party(int duration, CyclicBarrier barrier, String name) { ? ?super(name); ? ?this.duration = duration; ? ?this.barrier = barrier; } ? ?@Override ?public void run() { ? ?try { ? ? ?Thread.sleep(duration); ? ? ?System.out.println(Thread.currentThread().getName() + " is calling await()"); ? ? ?barrier.await(); ? ? ?System.out.println(Thread.currentThread().getName() + " has started running again"); ? } catch (InterruptedException | BrokenBarrierException e) { ? ? ?e.printStackTrace(); ? } } }
CyclicBarrier實(shí)現(xiàn)原理
CyclicBarrier
與CountDownLatch
不同,CountDownLatch
的同步是基于AQS
同步器,而CyclicBarrier
的同步是基于條件變量的,部分源代碼如下:
public class CyclicBarrier { ? ?private static class Generation { ? ? ? ?Generation() {} ? ? ? ? ? ? ? ? // prevent access constructor creation ? ? ? ?boolean broken; ? ? ? ? ? ? ? ? // initially false ? } ? ? ?/** The lock for guarding barrier entry */ ? ?private final ReentrantLock lock = new ReentrantLock(); ? ?/** Condition to wait on until tripped */ ? ?private final Condition trip = lock.newCondition(); ? ?/** The number of parties */ ? ?private final int parties; ? ?/** The command to run when tripped */ ? ?private final Runnable barrierCommand; ? ?/** The current generation */ ? ?private Generation generation = new Generation(); ? ? ?/** ? ? * Number of parties still waiting. Counts down from parties to 0 ? ? * on each generation. It is reset to parties on each new ? ? * generation or when broken. ? ? */ ? ?private int count; ? ? ?private void nextGeneration() { ? ? ? ?// signal completion of last generation ? ? ? ?trip.signalAll(); ? ? ? ?// set up next generation ? ? ? ?count = parties; ? ? ? ?generation = new Generation(); ? } ? ? ?private void breakBarrier() { ? ? ? ?generation.broken = true; ? ? ? ?count = parties; ? ? ? ?trip.signalAll(); ? } ? ? ?/** ? ? * Main barrier code, covering the various policies. ? ? */ ? ?private int dowait(boolean timed, long nanos) ? ? ? ?throws InterruptedException, BrokenBarrierException, ? ? ? ? ? ? ? TimeoutException { ? ? ? ?final ReentrantLock lock = this.lock; ? ? ? ?lock.lock(); ? ? ? ?try { ? ? ? ? ? ?final Generation g = generation; // 如果generation.broken為true的話,說明這個(gè)屏障已經(jīng)損壞,當(dāng)某個(gè)線程await的時(shí)候,直接拋出異常 ? ? ? ? ? ?if (g.broken) ? ? ? ? ? ? ? ?throw new BrokenBarrierException(); ? ? ? ? ? ? ?if (Thread.interrupted()) { ? ? ? ? ? ? ? ?breakBarrier(); ? ? ? ? ? ? ? ?throw new InterruptedException(); ? ? ? ? ? } ? ? ? ? ? ? ?int index = --count; ? ? ? ? ? ?// 當(dāng)count == 0時(shí),說明所有線程都已經(jīng)到屏障處, ? ? ? ? ? ?if (index == 0) { ?// tripped ? ? ? ? ? ? ? ?boolean ranAction = false; ? ? ? ? ? ? ? ?try { ? ? ? ? ? ? ? ? ? ?final Runnable command = barrierCommand; ? ? ? ? ? ? ? ? ? ?if (command != null) ? ? ? ? ? ? ? ? ? ? ? ?command.run(); ? ? ? ? ? ? ? ? ? ?ranAction = true; ? ? ? ? ? ? ? ? ? ?// 執(zhí)行條件變量的signalAll方法喚醒等待的線程。 ? ? ? ? ? ? ? ? ? ?// 實(shí)現(xiàn)屏障的循環(huán)使用,重置 ? ? ? ? ? ? ? ? ? ?nextGeneration(); ? ? ? ? ? ? ? ? ? ?return 0; ? ? ? ? ? ? ? } finally { ? ? ? ? ? ? ? ? ? ?if (!ranAction) ? ? ? ? ? ? ? ? ? ? ? ?breakBarrier(); ? ? ? ? ? ? ? } ? ? ? ? ? } ? ? ? ? ? ? ?// loop until tripped, broken, interrupted, or timed out ? ? ? ? ? ?// 如果count!= 0,說明有線程還未到屏障處,則在鎖條件變量trip上等待。 ? ? ? ? ? ?for (;;) { ? ? ? ? ? ? ? ?try { ? ? ? ? ? ? ? ? ? ?if (!timed) ? ? ? ? ? ? ? ? ? ? ? ?trip.await(); ? ? ? ? ? ? ? ? ? ?else if (nanos > 0L) ? ? ? ? ? ? ? ? ? ? ? ?nanos = trip.awaitNanos(nanos); ? ? ? ? ? ? ? } catch (InterruptedException ie) { ? ? ? ? ? ? ? ? ? ?if (g == generation && ! g.broken) { ? ? ? ? ? ? ? ? ? ? ? ?breakBarrier(); ? ? ? ? ? ? ? ? ? ? ? ?throw ie; ? ? ? ? ? ? ? ? ? } else { ? ? ? ? ? ? ? ? ? ? ? ?// We're about to finish waiting even if we had not ? ? ? ? ? ? ? ? ? ? ? ?// been interrupted, so this interrupt is deemed to ? ? ? ? ? ? ? ? ? ? ? ?// "belong" to subsequent execution. ? ? ? ? ? ? ? ? ? ? ? ?Thread.currentThread().interrupt(); ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? ?if (g.broken) ? ? ? ? ? ? ? ? ? ?throw new BrokenBarrierException(); ? ? ? ? ? ? ? ? ?if (g != generation) ? ? ? ? ? ? ? ? ? ?return index; ? ? ? ? ? ? ? ? ?if (timed && nanos <= 0L) { ? ? ? ? ? ? ? ? ? ?breakBarrier(); ? ? ? ? ? ? ? ? ? ?throw new TimeoutException(); ? ? ? ? ? ? ? } ? ? ? ? ? } ? ? ? } finally { ? ? ? ? ? ?lock.unlock(); ? ? ? } ? } ? ?//.... ? ?public int await() throws InterruptedException, BrokenBarrierException { ? ? ? ?try { ? ? ? ? ? ?return dowait(false, 0L); ? ? ? } catch (TimeoutException toe) { ? ? ? ? ? ?throw new Error(toe); // cannot happen ? ? ? } ? } ? ?public int await(long timeout, TimeUnit unit) ? ? ? ?throws InterruptedException, ? ? ? ? ? ? ? BrokenBarrierException, ? ? ? ? ? ? ? TimeoutException { ? ? ? ?return dowait(true, unit.toNanos(timeout)); ? } ? ?public void reset() { ? ? ? ?final ReentrantLock lock = this.lock; ? ? ? ?lock.lock(); ? ? ? ?try { ? ? ? ? ? ?breakBarrier(); ? // break the current generation ? ? ? ? ? ?nextGeneration(); // start a new generation ? ? ? } finally { ? ? ? ? ? ?lock.unlock(); ? ? ? } ? } ? }
極客時(shí)間專欄的案例
案例
用戶通過在線商城下單,會(huì)生成電子訂單,保存在訂單庫;之后物流就會(huì)生成派送單給用戶發(fā)貨,派送單存在派送單庫。為了防止漏派送或者重復(fù)派送,對(duì)賬系統(tǒng)每天都會(huì)校驗(yàn)是否存在異常訂單,將差異寫在訂單庫。
代碼示例:
while(存在未對(duì)賬訂單){ ?// 查詢未對(duì)賬訂單 ?pos = getPOrders(); ?// 查詢派送單 ?dos = getDOrders(); ?// 執(zhí)行對(duì)賬操作 ?diff = check(pos, dos); ?// 差異寫入差異庫 ?save(diff); } ?
優(yōu)化
單線程的執(zhí)行示意圖:
目前的對(duì)賬系統(tǒng),由于訂單量和派送單量巨大,所以getPOrders()和getDOrders()相對(duì)較慢,另外這兩個(gè)操作沒有先后順序的依賴。這兩個(gè)操作并行以后,你會(huì)發(fā)現(xiàn),吞吐量近乎單線程的2倍,示意圖如下:
代碼示例:
while(存在未對(duì)賬訂單){ ?// 查詢未對(duì)賬訂單 ?Thread T1 = new Thread(()->{ ? ?pos = getPOrders(); }); ?T1.start(); ?// 查詢派送單 ?Thread T2 = new Thread(()->{ ? ?dos = getDOrders(); }); ?T2.start(); ?// 等待 T1、T2 結(jié)束 ?T1.join(); ?T2.join(); ?// 執(zhí)行對(duì)賬操作 ?diff = check(pos, dos); ?// 差異寫入差異庫 ?save(diff); } ?
我們創(chuàng)建兩個(gè)線程T1,T2,并行執(zhí)行g(shù)etPOrders()和getDOrders()。需要等t1,t2都執(zhí)行完畢后,才能執(zhí)行對(duì)賬操作,差異寫入差異庫的操作。
用CountDownLatch實(shí)現(xiàn)線程等待
上述代碼的缺點(diǎn):while循環(huán),每次都會(huì)創(chuàng)建新的線程,創(chuàng)建線程是個(gè)重量級(jí)操作,所以最好能把創(chuàng)建出來的線程重復(fù)利用--線程池
我們創(chuàng)建一個(gè)固定大小為2的線程池,之后在while循環(huán)里重復(fù)利用。但是在線程池方案里,線程根本就不會(huì)退出,執(zhí)行join失效,導(dǎo)致check和save不知道啥時(shí)執(zhí)行。
Join方案無效,那么我們可以采用計(jì)數(shù)器的方案來實(shí)現(xiàn),比如計(jì)數(shù)器的初始值是2,當(dāng)執(zhí)行完getPOrders(),計(jì)數(shù)器減一,變?yōu)?,當(dāng)執(zhí)行完getDOrders()計(jì)數(shù)器再減一,變?yōu)?,當(dāng)計(jì)數(shù)器變?yōu)?的時(shí)候,開始執(zhí)行check,save操作。
利用CountDownLatch就可以實(shí)現(xiàn)
代碼示例:
// 創(chuàng)建 2 個(gè)線程的線程池 Executor executor = ?Executors.newFixedThreadPool(2); while(存在未對(duì)賬訂單){ ?// 計(jì)數(shù)器初始化為 2 ?CountDownLatch latch = ? ?new CountDownLatch(2); ?// 查詢未對(duì)賬訂單 ?executor.execute(()-> { ? ?pos = getPOrders(); ? ? // 計(jì)數(shù)器-1 ? ?latch.countDown(); }); ?// 查詢派送單 ?executor.execute(()-> { ? ?dos = getDOrders(); ? ? ?// 計(jì)數(shù)器-1 ? ?latch.countDown(); }); ?// 等待兩個(gè)查詢操作結(jié)束 計(jì)數(shù)器變?yōu)? ?latch.await(); ?// 執(zhí)行對(duì)賬操作 ?diff = check(pos, dos); ?// 差異寫入差異庫 ?save(diff); } ?
進(jìn)一步優(yōu)化性能
前面我們將getPOrders()和getDOrders()并行了,事實(shí)上查詢操作和對(duì)賬操作也是可以并行的。在執(zhí)行對(duì)賬操作的時(shí)候,可以同時(shí)去執(zhí)行下一輪的查詢操作。過程示意圖:
兩次查詢操作能夠和對(duì)賬操作并行,對(duì)賬操作還依賴查詢操作的結(jié)果。這是典型的生產(chǎn)者消費(fèi)者模型。
兩次查詢是生產(chǎn)者,對(duì)賬操作是消費(fèi)者。
需要隊(duì)列來保存生產(chǎn)者生產(chǎn)的數(shù)據(jù),消費(fèi)者從隊(duì)列中消費(fèi)數(shù)據(jù)。
本案例需要兩個(gè)隊(duì)列
訂單隊(duì)列和派送單隊(duì)列,這兩個(gè)隊(duì)列的元素之間是一一對(duì)應(yīng)的。對(duì)賬操作可以從訂單隊(duì)列里取出一個(gè)元素,從派送單隊(duì)列里取出一個(gè)元素,然后對(duì)這兩個(gè)元素進(jìn)行對(duì)賬操作,這樣數(shù)據(jù)一定不會(huì)亂掉
線程T1執(zhí)行訂單的查詢工作,線程T2執(zhí)行派送單的查詢工作,當(dāng)T1 T2都各自生產(chǎn)完1條數(shù)據(jù)后,通知T3執(zhí)行對(duì)賬操作。要求T1和T2的步調(diào)一致,不能一個(gè)太快一個(gè)太慢。只有這樣才能做到各自生產(chǎn)完1條數(shù)據(jù)的時(shí)候,是一一對(duì)應(yīng)的,并且能通知T3。
用CyclicBarrier實(shí)現(xiàn)同步
計(jì)數(shù)器初始化為2,當(dāng)T1 T2生產(chǎn)完一條數(shù)據(jù)都將計(jì)數(shù)器-1,如果計(jì)數(shù)器大于0,則線程T1或者T2等待。如果計(jì)數(shù)器=0,則通知線程T3,并喚醒等待線程T1或者T2,與此同時(shí),計(jì)數(shù)器重置為2.
// 訂單隊(duì)列 Vector<P> pos; // 派送單隊(duì)列 Vector<D> dos; // 執(zhí)行回調(diào)的線程池 Executor executor = ?Executors.newFixedThreadPool(1); final CyclicBarrier barrier = ?new CyclicBarrier(2, ()->{ ? ?executor.execute(()->check()); }); void check(){ ?P p = pos.remove(0); ?D d = dos.remove(0); ?// 執(zhí)行對(duì)賬操作 ?diff = check(p, d); ?// 差異寫入差異庫 ?save(diff); } void checkAll(){ ?// 循環(huán)查詢訂單庫 ?Thread T1 = new Thread(()->{ ? ?while(存在未對(duì)賬訂單){ ? ? ?// 查詢訂單庫 ? ? ?pos.add(getPOrders()); ? ? ?// 等待 ? ? ?barrier.await(); ? } }); ?T1.start(); ? ?// 循環(huán)查詢運(yùn)單庫 ?Thread T2 = new Thread(()->{ ? ?while(存在未對(duì)賬訂單){ ? ? ?// 查詢運(yùn)單庫 ? ? ?dos.add(getDOrders()); ? ? ?// 等待 ? ? ?barrier.await(); ? } }); ?T2.start(); } ?
區(qū)別
CountDownLatch用來解決一個(gè)線程等待多個(gè)線程的場(chǎng)景
CyclicBarrier是一組線程之間互相等待,計(jì)數(shù)器可以循環(huán)利用。
以上就是詳解Java如何實(shí)現(xiàn)多線程步調(diào)一致的詳細(xì)內(nèi)容,更多關(guān)于Java多線程同步的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
詳解如何在Java項(xiàng)目中實(shí)現(xiàn)信號(hào)的連續(xù)接收
在Java項(xiàng)目中,信號(hào)的連續(xù)接收是一項(xiàng)重要的任務(wù),特別是在處理異步事件或者需要對(duì)外部事件做出響應(yīng)時(shí),本篇博客將介紹如何在Java項(xiàng)目中實(shí)現(xiàn)信號(hào)的連續(xù)接收,包括信號(hào)的監(jiān)聽、處理和停止等步驟,需要的朋友可以參考下2023-11-11springboot整合redis實(shí)現(xiàn)發(fā)送郵箱并驗(yàn)證
大家好,本篇文章主要講的是springboot整合redis實(shí)現(xiàn)發(fā)送郵箱并驗(yàn)證,感興趣的同學(xué)趕快來看一看吧,對(duì)你有幫助的話記得收藏一下2022-01-01使用kotlin集成springboot開發(fā)的超詳細(xì)教程
目前大多數(shù)都在使用java集成 springboot進(jìn)行開發(fā),本文演示僅僅將 java換成 kotlin,其他不變的情況下進(jìn)行開發(fā),需要的朋友可以參考下2021-09-09Java的數(shù)據(jù)類型和參數(shù)傳遞(詳解)
下面小編就為大家?guī)硪黄狫ava的數(shù)據(jù)類型和參數(shù)傳遞(詳解)。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2017-07-07Java簡(jiǎn)單計(jì)時(shí)的實(shí)現(xiàn)案例(可以用來限時(shí)循環(huán))
這篇文章主要介紹了Java簡(jiǎn)單計(jì)時(shí)的實(shí)現(xiàn)案例(可以用來限時(shí)循環(huán)),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2020-08-08一文搞懂SpringMVC中@InitBinder注解的使用
@InitBinder方法可以注冊(cè)控制器特定的java.bean.PropertyEditor或Spring Converter和 Formatter組件。本文通過示例為大家詳細(xì)講講@InitBinder注解的使用,需要的可以參考一下2022-06-06WebUploader實(shí)現(xiàn)圖片上傳功能
這篇文章主要為大家詳細(xì)介紹了WebUploader實(shí)現(xiàn)圖片上傳功能,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-03-03