詳解Java如何實現(xiàn)多線程步調(diào)一致
CountDownLatch的用法
CountDownLatch主要有兩個方法:
countDown():用于使計數(shù)器減一,一般是執(zhí)行任務的線程調(diào)用。await():調(diào)用該方法的線程處于等待狀態(tài),一般是主線程調(diào)用。
這里需要注意的是
countDown()方法并沒有規(guī)定一個線程只能調(diào)用一次,當同一個線程調(diào)用多次countDown()方法時,每次都會使計數(shù)器減一;
await()方法也并沒有規(guī)定只能有一個線程執(zhí)行該方法,如果多個線程同時執(zhí)行await()方法,那么這幾個線程都將處于等待狀態(tài),并且以共享模式享有同一個鎖。
如下是其使用示例:
private static void countDownLatch() throws InterruptedException {
? ?CountDownLatch latch = new CountDownLatch(5);
? ?ExecutorService service = Executors.newFixedThreadPool(5);
?
? ?for (int i = 0; i < 4; i++) {
? ? ?service.submit(
? ? ? ? () -> {
? ? ? ? ? ?action();
? ? ? ? ? ?latch.countDown();
? ? ? ? });
? }
?
? ?service.awaitTermination(50, TimeUnit.MILLISECONDS);
? ?latch.await();
? ?System.out.println("Done");
? ?service.shutdownNow();
}
?
?private static void action() {
? ?System.out.printf("當前線程[%s], 正在執(zhí)行。。。\n", Thread.currentThread().getName());
}
?CountDownLatch實現(xiàn)原理
// java.util.concurrent.CountDownLatch
public class CountDownLatch {
? ?/**
? ? * Synchronization control For CountDownLatch.
? ? * Uses AQS state to represent count.
? ? */
? ?private static final class Sync extends AbstractQueuedSynchronizer {
? ? ? ?private static final long serialVersionUID = 4982264981922014374L;
?
? ? ? ?Sync(int count) {
? ? ? ? ? ?setState(count);
? ? ? }
?
? ? ? ?int getCount() {
? ? ? ? ? ?return getState();
? ? ? }
?
? ? ? ?protected int tryAcquireShared(int acquires) {
? ? ? ? ? ?return (getState() == 0) ? 1 : -1;
? ? ? }
?
? ? ? ?protected boolean tryReleaseShared(int releases) {
? ? ? ? ? ?// Decrement count; signal when transition to zero
? ? ? ? ? ?for (;;) {
? ? ? ? ? ? ? ?int c = getState();
? ? ? ? ? ? ? ?if (c == 0)
? ? ? ? ? ? ? ? ? ?return false;
? ? ? ? ? ? ? ?int nextc = c - 1;
? ? ? ? ? ? ? ?if (compareAndSetState(c, nextc))
? ? ? ? ? ? ? ? ? ?return nextc == 0;
? ? ? ? ? }
? ? ? }
? }
?
? ?private final Sync sync;
? ?public CountDownLatch(int count) {
? ? ? ?if (count < 0) throw new IllegalArgumentException("count < 0");
? ? ? ?this.sync = new Sync(count);
? }
?
? ?public void await() throws InterruptedException {
? ? ? ?sync.acquireSharedInterruptibly(1);
? }
? ?public boolean await(long timeout, TimeUnit unit)
? ? ? ?throws InterruptedException {
? ? ? ?return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
? }
? ?public void countDown() {
? ? ? ?sync.releaseShared(1);
? }
?
}從上述摘錄的源代碼來看,CountDownLatch的實現(xiàn)還是依賴于AQS,這跟之前討論過的ReentrantLock的實現(xiàn)原理基本一致。
接下來,我們主要分析一下await()和countDown()的實現(xiàn)原理。
await() 步驟如下:
public final void acquireSharedInterruptibly(int arg)
?throws InterruptedException {
?if (Thread.interrupted())
? ?throw new InterruptedException();
?if (tryAcquireShared(arg) < 0) // 查看state是否為0,如果為0,直接返回,如果不為0,則調(diào)用doAcquireSharedInterruptibly()阻塞等待state變?yōu)?.
? ?doAcquireSharedInterruptibly(arg);
}countDown() 步驟如下:
public final boolean releaseShared(int arg) {
?if (tryReleaseShared(arg)) { // state--,如果state變?yōu)?,則執(zhí)行doReleaseShared(),喚醒等待隊列中的線程
? ?doReleaseShared();
? ?return true;
}
?return false;
}CyclicBarrier的用法
CyclicBarrier主要是有兩個方法:
await():計數(shù)器減1,當計數(shù)器的值為0,喚醒等待的線程。
reset():重置計數(shù)器
public class CyclicBarrierDemo {
?
?public static void main(String args[]) throws InterruptedException, BrokenBarrierException {
?
? ?CyclicBarrier barrier = new CyclicBarrier(4);
? ?Party first = new Party(1000, barrier, "PARTY-1");
? ?Party second = new Party(2000, barrier, "PARTY-2");
? ?Party third = new Party(3000, barrier, "PARTY-3");
? ?Party fourth = new Party(4000, barrier, "PARTY-4");
?
? ?first.start();
? ?second.start();
? ?third.start();
? ?fourth.start();
?
? ?System.out.println(Thread.currentThread().getName() + " has finished");
}
}
?
class Party extends Thread {
?private int duration;
?private CyclicBarrier barrier;
?
?public Party(int duration, CyclicBarrier barrier, String name) {
? ?super(name);
? ?this.duration = duration;
? ?this.barrier = barrier;
}
?
?@Override
?public void run() {
? ?try {
? ? ?Thread.sleep(duration);
? ? ?System.out.println(Thread.currentThread().getName() + " is calling await()");
? ? ?barrier.await();
? ? ?System.out.println(Thread.currentThread().getName() + " has started running again");
? } catch (InterruptedException | BrokenBarrierException e) {
? ? ?e.printStackTrace();
? }
}
}CyclicBarrier實現(xiàn)原理
CyclicBarrier與CountDownLatch不同,CountDownLatch的同步是基于AQS同步器,而CyclicBarrier的同步是基于條件變量的,部分源代碼如下:
public class CyclicBarrier {
? ?private static class Generation {
? ? ? ?Generation() {} ? ? ? ? ? ? ? ? // prevent access constructor creation
? ? ? ?boolean broken; ? ? ? ? ? ? ? ? // initially false
? }
?
? ?/** The lock for guarding barrier entry */
? ?private final ReentrantLock lock = new ReentrantLock();
? ?/** Condition to wait on until tripped */
? ?private final Condition trip = lock.newCondition();
? ?/** The number of parties */
? ?private final int parties;
? ?/** The command to run when tripped */
? ?private final Runnable barrierCommand;
? ?/** The current generation */
? ?private Generation generation = new Generation();
?
? ?/**
? ? * Number of parties still waiting. Counts down from parties to 0
? ? * on each generation. It is reset to parties on each new
? ? * generation or when broken.
? ? */
? ?private int count;
?
? ?private void nextGeneration() {
? ? ? ?// signal completion of last generation
? ? ? ?trip.signalAll();
? ? ? ?// set up next generation
? ? ? ?count = parties;
? ? ? ?generation = new Generation();
? }
?
? ?private void breakBarrier() {
? ? ? ?generation.broken = true;
? ? ? ?count = parties;
? ? ? ?trip.signalAll();
? }
?
? ?/**
? ? * Main barrier code, covering the various policies.
? ? */
? ?private int dowait(boolean timed, long nanos)
? ? ? ?throws InterruptedException, BrokenBarrierException,
? ? ? ? ? ? ? TimeoutException {
? ? ? ?final ReentrantLock lock = this.lock;
? ? ? ?lock.lock();
? ? ? ?try {
? ? ? ? ? ?final Generation g = generation;
// 如果generation.broken為true的話,說明這個屏障已經(jīng)損壞,當某個線程await的時候,直接拋出異常
? ? ? ? ? ?if (g.broken)
? ? ? ? ? ? ? ?throw new BrokenBarrierException();
?
? ? ? ? ? ?if (Thread.interrupted()) {
? ? ? ? ? ? ? ?breakBarrier();
? ? ? ? ? ? ? ?throw new InterruptedException();
? ? ? ? ? }
?
? ? ? ? ? ?int index = --count;
? ? ? ? ? ?// 當count == 0時,說明所有線程都已經(jīng)到屏障處,
? ? ? ? ? ?if (index == 0) { ?// tripped
? ? ? ? ? ? ? ?boolean ranAction = false;
? ? ? ? ? ? ? ?try {
? ? ? ? ? ? ? ? ? ?final Runnable command = barrierCommand;
? ? ? ? ? ? ? ? ? ?if (command != null)
? ? ? ? ? ? ? ? ? ? ? ?command.run();
? ? ? ? ? ? ? ? ? ?ranAction = true;
? ? ? ? ? ? ? ? ? ?// 執(zhí)行條件變量的signalAll方法喚醒等待的線程。
? ? ? ? ? ? ? ? ? ?// 實現(xiàn)屏障的循環(huán)使用,重置
? ? ? ? ? ? ? ? ? ?nextGeneration();
? ? ? ? ? ? ? ? ? ?return 0;
? ? ? ? ? ? ? } finally {
? ? ? ? ? ? ? ? ? ?if (!ranAction)
? ? ? ? ? ? ? ? ? ? ? ?breakBarrier();
? ? ? ? ? ? ? }
? ? ? ? ? }
?
? ? ? ? ? ?// loop until tripped, broken, interrupted, or timed out
? ? ? ? ? ?// 如果count!= 0,說明有線程還未到屏障處,則在鎖條件變量trip上等待。
? ? ? ? ? ?for (;;) {
? ? ? ? ? ? ? ?try {
? ? ? ? ? ? ? ? ? ?if (!timed)
? ? ? ? ? ? ? ? ? ? ? ?trip.await();
? ? ? ? ? ? ? ? ? ?else if (nanos > 0L)
? ? ? ? ? ? ? ? ? ? ? ?nanos = trip.awaitNanos(nanos);
? ? ? ? ? ? ? } catch (InterruptedException ie) {
? ? ? ? ? ? ? ? ? ?if (g == generation && ! g.broken) {
? ? ? ? ? ? ? ? ? ? ? ?breakBarrier();
? ? ? ? ? ? ? ? ? ? ? ?throw ie;
? ? ? ? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? ? ? ? ?// We're about to finish waiting even if we had not
? ? ? ? ? ? ? ? ? ? ? ?// been interrupted, so this interrupt is deemed to
? ? ? ? ? ? ? ? ? ? ? ?// "belong" to subsequent execution.
? ? ? ? ? ? ? ? ? ? ? ?Thread.currentThread().interrupt();
? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? }
?
? ? ? ? ? ? ? ?if (g.broken)
? ? ? ? ? ? ? ? ? ?throw new BrokenBarrierException();
?
? ? ? ? ? ? ? ?if (g != generation)
? ? ? ? ? ? ? ? ? ?return index;
?
? ? ? ? ? ? ? ?if (timed && nanos <= 0L) {
? ? ? ? ? ? ? ? ? ?breakBarrier();
? ? ? ? ? ? ? ? ? ?throw new TimeoutException();
? ? ? ? ? ? ? }
? ? ? ? ? }
? ? ? } finally {
? ? ? ? ? ?lock.unlock();
? ? ? }
? }
?
?//....
? ?public int await() throws InterruptedException, BrokenBarrierException {
? ? ? ?try {
? ? ? ? ? ?return dowait(false, 0L);
? ? ? } catch (TimeoutException toe) {
? ? ? ? ? ?throw new Error(toe); // cannot happen
? ? ? }
? }
? ?public int await(long timeout, TimeUnit unit)
? ? ? ?throws InterruptedException,
? ? ? ? ? ? ? BrokenBarrierException,
? ? ? ? ? ? ? TimeoutException {
? ? ? ?return dowait(true, unit.toNanos(timeout));
? }
? ?public void reset() {
? ? ? ?final ReentrantLock lock = this.lock;
? ? ? ?lock.lock();
? ? ? ?try {
? ? ? ? ? ?breakBarrier(); ? // break the current generation
? ? ? ? ? ?nextGeneration(); // start a new generation
? ? ? } finally {
? ? ? ? ? ?lock.unlock();
? ? ? }
? }
?
}極客時間專欄的案例
案例
用戶通過在線商城下單,會生成電子訂單,保存在訂單庫;之后物流就會生成派送單給用戶發(fā)貨,派送單存在派送單庫。為了防止漏派送或者重復派送,對賬系統(tǒng)每天都會校驗是否存在異常訂單,將差異寫在訂單庫。

代碼示例:
while(存在未對賬訂單){
?// 查詢未對賬訂單
?pos = getPOrders();
?// 查詢派送單
?dos = getDOrders();
?// 執(zhí)行對賬操作
?diff = check(pos, dos);
?// 差異寫入差異庫
?save(diff);
}
?優(yōu)化
單線程的執(zhí)行示意圖:

目前的對賬系統(tǒng),由于訂單量和派送單量巨大,所以getPOrders()和getDOrders()相對較慢,另外這兩個操作沒有先后順序的依賴。這兩個操作并行以后,你會發(fā)現(xiàn),吞吐量近乎單線程的2倍,示意圖如下:

代碼示例:
while(存在未對賬訂單){
?// 查詢未對賬訂單
?Thread T1 = new Thread(()->{
? ?pos = getPOrders();
});
?T1.start();
?// 查詢派送單
?Thread T2 = new Thread(()->{
? ?dos = getDOrders();
});
?T2.start();
?// 等待 T1、T2 結(jié)束
?T1.join();
?T2.join();
?// 執(zhí)行對賬操作
?diff = check(pos, dos);
?// 差異寫入差異庫
?save(diff);
}
?我們創(chuàng)建兩個線程T1,T2,并行執(zhí)行getPOrders()和getDOrders()。需要等t1,t2都執(zhí)行完畢后,才能執(zhí)行對賬操作,差異寫入差異庫的操作。
用CountDownLatch實現(xiàn)線程等待
上述代碼的缺點:while循環(huán),每次都會創(chuàng)建新的線程,創(chuàng)建線程是個重量級操作,所以最好能把創(chuàng)建出來的線程重復利用--線程池
我們創(chuàng)建一個固定大小為2的線程池,之后在while循環(huán)里重復利用。但是在線程池方案里,線程根本就不會退出,執(zhí)行join失效,導致check和save不知道啥時執(zhí)行。
Join方案無效,那么我們可以采用計數(shù)器的方案來實現(xiàn),比如計數(shù)器的初始值是2,當執(zhí)行完getPOrders(),計數(shù)器減一,變?yōu)?,當執(zhí)行完getDOrders()計數(shù)器再減一,變?yōu)?,當計數(shù)器變?yōu)?的時候,開始執(zhí)行check,save操作。
利用CountDownLatch就可以實現(xiàn)
代碼示例:
// 創(chuàng)建 2 個線程的線程池
Executor executor =
?Executors.newFixedThreadPool(2);
while(存在未對賬訂單){
?// 計數(shù)器初始化為 2
?CountDownLatch latch =
? ?new CountDownLatch(2);
?// 查詢未對賬訂單
?executor.execute(()-> {
? ?pos = getPOrders();
? ? // 計數(shù)器-1
? ?latch.countDown();
});
?// 查詢派送單
?executor.execute(()-> {
? ?dos = getDOrders();
? ? ?// 計數(shù)器-1
? ?latch.countDown();
});
?// 等待兩個查詢操作結(jié)束 計數(shù)器變?yōu)?
?latch.await();
?// 執(zhí)行對賬操作
?diff = check(pos, dos);
?// 差異寫入差異庫
?save(diff);
}
?進一步優(yōu)化性能
前面我們將getPOrders()和getDOrders()并行了,事實上查詢操作和對賬操作也是可以并行的。在執(zhí)行對賬操作的時候,可以同時去執(zhí)行下一輪的查詢操作。過程示意圖:

兩次查詢操作能夠和對賬操作并行,對賬操作還依賴查詢操作的結(jié)果。這是典型的生產(chǎn)者消費者模型。
兩次查詢是生產(chǎn)者,對賬操作是消費者。
需要隊列來保存生產(chǎn)者生產(chǎn)的數(shù)據(jù),消費者從隊列中消費數(shù)據(jù)。
本案例需要兩個隊列
訂單隊列和派送單隊列,這兩個隊列的元素之間是一一對應的。對賬操作可以從訂單隊列里取出一個元素,從派送單隊列里取出一個元素,然后對這兩個元素進行對賬操作,這樣數(shù)據(jù)一定不會亂掉
線程T1執(zhí)行訂單的查詢工作,線程T2執(zhí)行派送單的查詢工作,當T1 T2都各自生產(chǎn)完1條數(shù)據(jù)后,通知T3執(zhí)行對賬操作。要求T1和T2的步調(diào)一致,不能一個太快一個太慢。只有這樣才能做到各自生產(chǎn)完1條數(shù)據(jù)的時候,是一一對應的,并且能通知T3。
用CyclicBarrier實現(xiàn)同步
計數(shù)器初始化為2,當T1 T2生產(chǎn)完一條數(shù)據(jù)都將計數(shù)器-1,如果計數(shù)器大于0,則線程T1或者T2等待。如果計數(shù)器=0,則通知線程T3,并喚醒等待線程T1或者T2,與此同時,計數(shù)器重置為2.
// 訂單隊列
Vector<P> pos;
// 派送單隊列
Vector<D> dos;
// 執(zhí)行回調(diào)的線程池
Executor executor =
?Executors.newFixedThreadPool(1);
final CyclicBarrier barrier =
?new CyclicBarrier(2, ()->{
? ?executor.execute(()->check());
});
void check(){
?P p = pos.remove(0);
?D d = dos.remove(0);
?// 執(zhí)行對賬操作
?diff = check(p, d);
?// 差異寫入差異庫
?save(diff);
}
void checkAll(){
?// 循環(huán)查詢訂單庫
?Thread T1 = new Thread(()->{
? ?while(存在未對賬訂單){
? ? ?// 查詢訂單庫
? ? ?pos.add(getPOrders());
? ? ?// 等待
? ? ?barrier.await();
? }
});
?T1.start(); ?
?// 循環(huán)查詢運單庫
?Thread T2 = new Thread(()->{
? ?while(存在未對賬訂單){
? ? ?// 查詢運單庫
? ? ?dos.add(getDOrders());
? ? ?// 等待
? ? ?barrier.await();
? }
});
?T2.start();
}
?區(qū)別
CountDownLatch用來解決一個線程等待多個線程的場景
CyclicBarrier是一組線程之間互相等待,計數(shù)器可以循環(huán)利用。
以上就是詳解Java如何實現(xiàn)多線程步調(diào)一致的詳細內(nèi)容,更多關于Java多線程同步的資料請關注腳本之家其它相關文章!
相關文章
詳解如何在Java項目中實現(xiàn)信號的連續(xù)接收
在Java項目中,信號的連續(xù)接收是一項重要的任務,特別是在處理異步事件或者需要對外部事件做出響應時,本篇博客將介紹如何在Java項目中實現(xiàn)信號的連續(xù)接收,包括信號的監(jiān)聽、處理和停止等步驟,需要的朋友可以參考下2023-11-11
springboot整合redis實現(xiàn)發(fā)送郵箱并驗證
大家好,本篇文章主要講的是springboot整合redis實現(xiàn)發(fā)送郵箱并驗證,感興趣的同學趕快來看一看吧,對你有幫助的話記得收藏一下2022-01-01
使用kotlin集成springboot開發(fā)的超詳細教程
目前大多數(shù)都在使用java集成 springboot進行開發(fā),本文演示僅僅將 java換成 kotlin,其他不變的情況下進行開發(fā),需要的朋友可以參考下2021-09-09
Java的數(shù)據(jù)類型和參數(shù)傳遞(詳解)
下面小編就為大家?guī)硪黄狫ava的數(shù)據(jù)類型和參數(shù)傳遞(詳解)。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-07-07
Java簡單計時的實現(xiàn)案例(可以用來限時循環(huán))
這篇文章主要介紹了Java簡單計時的實現(xiàn)案例(可以用來限時循環(huán)),具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-08-08
一文搞懂SpringMVC中@InitBinder注解的使用
@InitBinder方法可以注冊控制器特定的java.bean.PropertyEditor或Spring Converter和 Formatter組件。本文通過示例為大家詳細講講@InitBinder注解的使用,需要的可以參考一下2022-06-06

