欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

詳解Java如何實(shí)現(xiàn)多線程步調(diào)一致

 更新時(shí)間:2023年07月05日 08:25:27   作者:Shawn_Shawn  
本章節(jié)主要講解另外兩個(gè)線程同步器:CountDownLatch和CyclicBarrier的用法,使用場(chǎng)景以及實(shí)現(xiàn)原理,感興趣的小伙伴可以了解一下

CountDownLatch的用法

CountDownLatch主要有兩個(gè)方法:

  • countDown():用于使計(jì)數(shù)器減一,一般是執(zhí)行任務(wù)的線程調(diào)用。
  • await():調(diào)用該方法的線程處于等待狀態(tài),一般是主線程調(diào)用。

這里需要注意的是

countDown()方法并沒有規(guī)定一個(gè)線程只能調(diào)用一次,當(dāng)同一個(gè)線程調(diào)用多次countDown()方法時(shí),每次都會(huì)使計(jì)數(shù)器減一;

await()方法也并沒有規(guī)定只能有一個(gè)線程執(zhí)行該方法,如果多個(gè)線程同時(shí)執(zhí)行await()方法,那么這幾個(gè)線程都將處于等待狀態(tài),并且以共享模式享有同一個(gè)鎖。

如下是其使用示例:

private static void countDownLatch() throws InterruptedException {
 ? ?CountDownLatch latch = new CountDownLatch(5);
 ? ?ExecutorService service = Executors.newFixedThreadPool(5);
?
 ? ?for (int i = 0; i < 4; i++) {
 ? ? ?service.submit(
 ? ? ? ?  () -> {
 ? ? ? ? ? ?action();
 ? ? ? ? ? ?latch.countDown();
 ? ? ? ?  });
 ?  }
?
 ? ?service.awaitTermination(50, TimeUnit.MILLISECONDS);
 ? ?latch.await();
 ? ?System.out.println("Done");
 ? ?service.shutdownNow();
  }
?
 ?private static void action() {
 ? ?System.out.printf("當(dāng)前線程[%s], 正在執(zhí)行。。。\n", Thread.currentThread().getName());
  }
?

CountDownLatch實(shí)現(xiàn)原理

// java.util.concurrent.CountDownLatch
public class CountDownLatch {
 ? ?/**
 ? ? * Synchronization control For CountDownLatch.
 ? ? * Uses AQS state to represent count.
 ? ? */
 ? ?private static final class Sync extends AbstractQueuedSynchronizer {
 ? ? ? ?private static final long serialVersionUID = 4982264981922014374L;
?
 ? ? ? ?Sync(int count) {
 ? ? ? ? ? ?setState(count);
 ? ? ?  }
?
 ? ? ? ?int getCount() {
 ? ? ? ? ? ?return getState();
 ? ? ?  }
?
 ? ? ? ?protected int tryAcquireShared(int acquires) {
 ? ? ? ? ? ?return (getState() == 0) ? 1 : -1;
 ? ? ?  }
?
 ? ? ? ?protected boolean tryReleaseShared(int releases) {
 ? ? ? ? ? ?// Decrement count; signal when transition to zero
 ? ? ? ? ? ?for (;;) {
 ? ? ? ? ? ? ? ?int c = getState();
 ? ? ? ? ? ? ? ?if (c == 0)
 ? ? ? ? ? ? ? ? ? ?return false;
 ? ? ? ? ? ? ? ?int nextc = c - 1;
 ? ? ? ? ? ? ? ?if (compareAndSetState(c, nextc))
 ? ? ? ? ? ? ? ? ? ?return nextc == 0;
 ? ? ? ? ?  }
 ? ? ?  }
 ?  }
?
 ? ?private final Sync sync;
 ? ?public CountDownLatch(int count) {
 ? ? ? ?if (count < 0) throw new IllegalArgumentException("count < 0");
 ? ? ? ?this.sync = new Sync(count);
 ?  }
?
 ? ?public void await() throws InterruptedException {
 ? ? ? ?sync.acquireSharedInterruptibly(1);
 ?  }
 ? ?public boolean await(long timeout, TimeUnit unit)
 ? ? ? ?throws InterruptedException {
 ? ? ? ?return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
 ?  }
 ? ?public void countDown() {
 ? ? ? ?sync.releaseShared(1);
 ?  }
?
}

從上述摘錄的源代碼來看,CountDownLatch的實(shí)現(xiàn)還是依賴于AQS,這跟之前討論過的ReentrantLock的實(shí)現(xiàn)原理基本一致。

接下來,我們主要分析一下await()countDown()的實(shí)現(xiàn)原理。

await() 步驟如下:

public final void acquireSharedInterruptibly(int arg)
 ?throws InterruptedException {
 ?if (Thread.interrupted())
 ? ?throw new InterruptedException();
 ?if (tryAcquireShared(arg) < 0) // 查看state是否為0,如果為0,直接返回,如果不為0,則調(diào)用doAcquireSharedInterruptibly()阻塞等待state變?yōu)?.
 ? ?doAcquireSharedInterruptibly(arg);
}

countDown() 步驟如下:

public final boolean releaseShared(int arg) {
 ?if (tryReleaseShared(arg)) { // state--,如果state變?yōu)?,則執(zhí)行doReleaseShared(),喚醒等待隊(duì)列中的線程
 ? ?doReleaseShared();
 ? ?return true;
  }
 ?return false;
}

CyclicBarrier的用法

CyclicBarrier主要是有兩個(gè)方法:

await():計(jì)數(shù)器減1,當(dāng)計(jì)數(shù)器的值為0,喚醒等待的線程。

reset():重置計(jì)數(shù)器

public class CyclicBarrierDemo {
?
 ?public static void main(String args[]) throws InterruptedException, BrokenBarrierException {
?
 ? ?CyclicBarrier barrier = new CyclicBarrier(4);
 ? ?Party first = new Party(1000, barrier, "PARTY-1");
 ? ?Party second = new Party(2000, barrier, "PARTY-2");
 ? ?Party third = new Party(3000, barrier, "PARTY-3");
 ? ?Party fourth = new Party(4000, barrier, "PARTY-4");
?
 ? ?first.start();
 ? ?second.start();
 ? ?third.start();
 ? ?fourth.start();
?
 ? ?System.out.println(Thread.currentThread().getName() + " has finished");
  }
}
?
class Party extends Thread {
 ?private int duration;
 ?private CyclicBarrier barrier;
?
 ?public Party(int duration, CyclicBarrier barrier, String name) {
 ? ?super(name);
 ? ?this.duration = duration;
 ? ?this.barrier = barrier;
  }
?
 ?@Override
 ?public void run() {
 ? ?try {
 ? ? ?Thread.sleep(duration);
 ? ? ?System.out.println(Thread.currentThread().getName() + " is calling await()");
 ? ? ?barrier.await();
 ? ? ?System.out.println(Thread.currentThread().getName() + " has started running again");
 ?  } catch (InterruptedException | BrokenBarrierException e) {
 ? ? ?e.printStackTrace();
 ?  }
  }
}

CyclicBarrier實(shí)現(xiàn)原理

CyclicBarrierCountDownLatch不同,CountDownLatch的同步是基于AQS同步器,而CyclicBarrier的同步是基于條件變量的,部分源代碼如下:

public class CyclicBarrier {
 ? ?private static class Generation {
 ? ? ? ?Generation() {} ? ? ? ? ? ? ? ? // prevent access constructor creation
 ? ? ? ?boolean broken; ? ? ? ? ? ? ? ? // initially false
 ?  }
?
 ? ?/** The lock for guarding barrier entry */
 ? ?private final ReentrantLock lock = new ReentrantLock();
 ? ?/** Condition to wait on until tripped */
 ? ?private final Condition trip = lock.newCondition();
 ? ?/** The number of parties */
 ? ?private final int parties;
 ? ?/** The command to run when tripped */
 ? ?private final Runnable barrierCommand;
 ? ?/** The current generation */
 ? ?private Generation generation = new Generation();
?
 ? ?/**
 ? ? * Number of parties still waiting. Counts down from parties to 0
 ? ? * on each generation.  It is reset to parties on each new
 ? ? * generation or when broken.
 ? ? */
 ? ?private int count;
?
 ? ?private void nextGeneration() {
 ? ? ? ?// signal completion of last generation
 ? ? ? ?trip.signalAll();
 ? ? ? ?// set up next generation
 ? ? ? ?count = parties;
 ? ? ? ?generation = new Generation();
 ?  }
?
 ? ?private void breakBarrier() {
 ? ? ? ?generation.broken = true;
 ? ? ? ?count = parties;
 ? ? ? ?trip.signalAll();
 ?  }
?
 ? ?/**
 ? ? * Main barrier code, covering the various policies.
 ? ? */
 ? ?private int dowait(boolean timed, long nanos)
 ? ? ? ?throws InterruptedException, BrokenBarrierException,
 ? ? ? ? ? ? ? TimeoutException {
 ? ? ? ?final ReentrantLock lock = this.lock;
 ? ? ? ?lock.lock();
 ? ? ? ?try {
 ? ? ? ? ? ?final Generation g = generation;
            // 如果generation.broken為true的話,說明這個(gè)屏障已經(jīng)損壞,當(dāng)某個(gè)線程await的時(shí)候,直接拋出異常
 ? ? ? ? ? ?if (g.broken)
 ? ? ? ? ? ? ? ?throw new BrokenBarrierException();
?
 ? ? ? ? ? ?if (Thread.interrupted()) {
 ? ? ? ? ? ? ? ?breakBarrier();
 ? ? ? ? ? ? ? ?throw new InterruptedException();
 ? ? ? ? ?  }
?
 ? ? ? ? ? ?int index = --count;
 ? ? ? ? ? ?// 當(dāng)count == 0時(shí),說明所有線程都已經(jīng)到屏障處,
 ? ? ? ? ? ?if (index == 0) { ?// tripped
 ? ? ? ? ? ? ? ?boolean ranAction = false;
 ? ? ? ? ? ? ? ?try {
 ? ? ? ? ? ? ? ? ? ?final Runnable command = barrierCommand;
 ? ? ? ? ? ? ? ? ? ?if (command != null)
 ? ? ? ? ? ? ? ? ? ? ? ?command.run();
 ? ? ? ? ? ? ? ? ? ?ranAction = true;
 ? ? ? ? ? ? ? ? ? ?// 執(zhí)行條件變量的signalAll方法喚醒等待的線程。
 ? ? ? ? ? ? ? ? ? ?// 實(shí)現(xiàn)屏障的循環(huán)使用,重置
 ? ? ? ? ? ? ? ? ? ?nextGeneration();
 ? ? ? ? ? ? ? ? ? ?return 0;
 ? ? ? ? ? ? ?  } finally {
 ? ? ? ? ? ? ? ? ? ?if (!ranAction)
 ? ? ? ? ? ? ? ? ? ? ? ?breakBarrier();
 ? ? ? ? ? ? ?  }
 ? ? ? ? ?  }
?
 ? ? ? ? ? ?// loop until tripped, broken, interrupted, or timed out
 ? ? ? ? ? ?// 如果count!= 0,說明有線程還未到屏障處,則在鎖條件變量trip上等待。
 ? ? ? ? ? ?for (;;) {
 ? ? ? ? ? ? ? ?try {
 ? ? ? ? ? ? ? ? ? ?if (!timed)
 ? ? ? ? ? ? ? ? ? ? ? ?trip.await();
 ? ? ? ? ? ? ? ? ? ?else if (nanos > 0L)
 ? ? ? ? ? ? ? ? ? ? ? ?nanos = trip.awaitNanos(nanos);
 ? ? ? ? ? ? ?  } catch (InterruptedException ie) {
 ? ? ? ? ? ? ? ? ? ?if (g == generation && ! g.broken) {
 ? ? ? ? ? ? ? ? ? ? ? ?breakBarrier();
 ? ? ? ? ? ? ? ? ? ? ? ?throw ie;
 ? ? ? ? ? ? ? ? ?  } else {
 ? ? ? ? ? ? ? ? ? ? ? ?// We're about to finish waiting even if we had not
 ? ? ? ? ? ? ? ? ? ? ? ?// been interrupted, so this interrupt is deemed to
 ? ? ? ? ? ? ? ? ? ? ? ?// "belong" to subsequent execution.
 ? ? ? ? ? ? ? ? ? ? ? ?Thread.currentThread().interrupt();
 ? ? ? ? ? ? ? ? ?  }
 ? ? ? ? ? ? ?  }
?
 ? ? ? ? ? ? ? ?if (g.broken)
 ? ? ? ? ? ? ? ? ? ?throw new BrokenBarrierException();
?
 ? ? ? ? ? ? ? ?if (g != generation)
 ? ? ? ? ? ? ? ? ? ?return index;
?
 ? ? ? ? ? ? ? ?if (timed && nanos <= 0L) {
 ? ? ? ? ? ? ? ? ? ?breakBarrier();
 ? ? ? ? ? ? ? ? ? ?throw new TimeoutException();
 ? ? ? ? ? ? ?  }
 ? ? ? ? ?  }
 ? ? ?  } finally {
 ? ? ? ? ? ?lock.unlock();
 ? ? ?  }
 ?  }
?
 ?//....
 ? ?public int await() throws InterruptedException, BrokenBarrierException {
 ? ? ? ?try {
 ? ? ? ? ? ?return dowait(false, 0L);
 ? ? ?  } catch (TimeoutException toe) {
 ? ? ? ? ? ?throw new Error(toe); // cannot happen
 ? ? ?  }
 ?  }
 ? ?public int await(long timeout, TimeUnit unit)
 ? ? ? ?throws InterruptedException,
 ? ? ? ? ? ? ? BrokenBarrierException,
 ? ? ? ? ? ? ? TimeoutException {
 ? ? ? ?return dowait(true, unit.toNanos(timeout));
 ?  }
 ? ?public void reset() {
 ? ? ? ?final ReentrantLock lock = this.lock;
 ? ? ? ?lock.lock();
 ? ? ? ?try {
 ? ? ? ? ? ?breakBarrier(); ? // break the current generation
 ? ? ? ? ? ?nextGeneration(); // start a new generation
 ? ? ?  } finally {
 ? ? ? ? ? ?lock.unlock();
 ? ? ?  }
 ?  }
?
}

極客時(shí)間專欄的案例

案例

用戶通過在線商城下單,會(huì)生成電子訂單,保存在訂單庫;之后物流就會(huì)生成派送單給用戶發(fā)貨,派送單存在派送單庫。為了防止漏派送或者重復(fù)派送,對(duì)賬系統(tǒng)每天都會(huì)校驗(yàn)是否存在異常訂單,將差異寫在訂單庫。

代碼示例:

while(存在未對(duì)賬訂單){
 ?// 查詢未對(duì)賬訂單
 ?pos = getPOrders();
 ?// 查詢派送單
 ?dos = getDOrders();
 ?// 執(zhí)行對(duì)賬操作
 ?diff = check(pos, dos);
 ?// 差異寫入差異庫
 ?save(diff);
} 
?

優(yōu)化

單線程的執(zhí)行示意圖:

目前的對(duì)賬系統(tǒng),由于訂單量和派送單量巨大,所以getPOrders()和getDOrders()相對(duì)較慢,另外這兩個(gè)操作沒有先后順序的依賴。這兩個(gè)操作并行以后,你會(huì)發(fā)現(xiàn),吞吐量近乎單線程的2倍,示意圖如下:

代碼示例:

while(存在未對(duì)賬訂單){
     ?// 查詢未對(duì)賬訂單
     ?Thread T1 = new Thread(()->{
     ? ?pos = getPOrders();
      });
     ?T1.start();
     ?// 查詢派送單
     ?Thread T2 = new Thread(()->{
     ? ?dos = getDOrders();
      });
     ?T2.start();
     ?// 等待 T1、T2 結(jié)束
     ?T1.join();
     ?T2.join();
     ?// 執(zhí)行對(duì)賬操作
     ?diff = check(pos, dos);
     ?// 差異寫入差異庫
     ?save(diff);
 } 
    ?

我們創(chuàng)建兩個(gè)線程T1,T2,并行執(zhí)行g(shù)etPOrders()和getDOrders()。需要等t1,t2都執(zhí)行完畢后,才能執(zhí)行對(duì)賬操作,差異寫入差異庫的操作。

用CountDownLatch實(shí)現(xiàn)線程等待

上述代碼的缺點(diǎn):while循環(huán),每次都會(huì)創(chuàng)建新的線程,創(chuàng)建線程是個(gè)重量級(jí)操作,所以最好能把創(chuàng)建出來的線程重復(fù)利用--線程池

我們創(chuàng)建一個(gè)固定大小為2的線程池,之后在while循環(huán)里重復(fù)利用。但是在線程池方案里,線程根本就不會(huì)退出,執(zhí)行join失效,導(dǎo)致check和save不知道啥時(shí)執(zhí)行。

Join方案無效,那么我們可以采用計(jì)數(shù)器的方案來實(shí)現(xiàn),比如計(jì)數(shù)器的初始值是2,當(dāng)執(zhí)行完getPOrders(),計(jì)數(shù)器減一,變?yōu)?,當(dāng)執(zhí)行完getDOrders()計(jì)數(shù)器再減一,變?yōu)?,當(dāng)計(jì)數(shù)器變?yōu)?的時(shí)候,開始執(zhí)行check,save操作。

利用CountDownLatch就可以實(shí)現(xiàn)

代碼示例:

    // 創(chuàng)建 2 個(gè)線程的線程池
    Executor executor = 
     ?Executors.newFixedThreadPool(2);
    while(存在未對(duì)賬訂單){
     ?// 計(jì)數(shù)器初始化為 2
     ?CountDownLatch latch = 
     ? ?new CountDownLatch(2);
     ?// 查詢未對(duì)賬訂單
     ?executor.execute(()-> {
     ? ?pos = getPOrders();
     ? ? // 計(jì)數(shù)器-1
     ? ?latch.countDown();
      });
     ?// 查詢派送單
     ?executor.execute(()-> {
     ? ?dos = getDOrders();
     ? ? ?// 計(jì)數(shù)器-1
     ? ?latch.countDown();
      });
     ?// 等待兩個(gè)查詢操作結(jié)束 計(jì)數(shù)器變?yōu)?
     ?latch.await();
     ?// 執(zhí)行對(duì)賬操作
     ?diff = check(pos, dos);
     ?// 差異寫入差異庫
     ?save(diff);
    }
    ?

進(jìn)一步優(yōu)化性能

前面我們將getPOrders()和getDOrders()并行了,事實(shí)上查詢操作和對(duì)賬操作也是可以并行的。在執(zhí)行對(duì)賬操作的時(shí)候,可以同時(shí)去執(zhí)行下一輪的查詢操作。過程示意圖:

兩次查詢操作能夠和對(duì)賬操作并行,對(duì)賬操作還依賴查詢操作的結(jié)果。這是典型的生產(chǎn)者消費(fèi)者模型。

兩次查詢是生產(chǎn)者,對(duì)賬操作是消費(fèi)者。

需要隊(duì)列來保存生產(chǎn)者生產(chǎn)的數(shù)據(jù),消費(fèi)者從隊(duì)列中消費(fèi)數(shù)據(jù)。

本案例需要兩個(gè)隊(duì)列

訂單隊(duì)列和派送單隊(duì)列,這兩個(gè)隊(duì)列的元素之間是一一對(duì)應(yīng)的。對(duì)賬操作可以從訂單隊(duì)列里取出一個(gè)元素,從派送單隊(duì)列里取出一個(gè)元素,然后對(duì)這兩個(gè)元素進(jìn)行對(duì)賬操作,這樣數(shù)據(jù)一定不會(huì)亂掉

線程T1執(zhí)行訂單的查詢工作,線程T2執(zhí)行派送單的查詢工作,當(dāng)T1 T2都各自生產(chǎn)完1條數(shù)據(jù)后,通知T3執(zhí)行對(duì)賬操作。要求T1和T2的步調(diào)一致,不能一個(gè)太快一個(gè)太慢。只有這樣才能做到各自生產(chǎn)完1條數(shù)據(jù)的時(shí)候,是一一對(duì)應(yīng)的,并且能通知T3。

用CyclicBarrier實(shí)現(xiàn)同步

計(jì)數(shù)器初始化為2,當(dāng)T1 T2生產(chǎn)完一條數(shù)據(jù)都將計(jì)數(shù)器-1,如果計(jì)數(shù)器大于0,則線程T1或者T2等待。如果計(jì)數(shù)器=0,則通知線程T3,并喚醒等待線程T1或者T2,與此同時(shí),計(jì)數(shù)器重置為2.

// 訂單隊(duì)列
Vector<P> pos;
// 派送單隊(duì)列
Vector<D> dos;
// 執(zhí)行回調(diào)的線程池 
Executor executor = 
 ?Executors.newFixedThreadPool(1);
final CyclicBarrier barrier =
 ?new CyclicBarrier(2, ()->{
 ? ?executor.execute(()->check());
  });
void check(){
 ?P p = pos.remove(0);
 ?D d = dos.remove(0);
 ?// 執(zhí)行對(duì)賬操作
 ?diff = check(p, d);
 ?// 差異寫入差異庫
 ?save(diff);
}
void checkAll(){
 ?// 循環(huán)查詢訂單庫
 ?Thread T1 = new Thread(()->{
 ? ?while(存在未對(duì)賬訂單){
 ? ? ?// 查詢訂單庫
 ? ? ?pos.add(getPOrders());
 ? ? ?// 等待
 ? ? ?barrier.await();
 ?  }
  });
 ?T1.start(); ?
 ?// 循環(huán)查詢運(yùn)單庫
 ?Thread T2 = new Thread(()->{
 ? ?while(存在未對(duì)賬訂單){
 ? ? ?// 查詢運(yùn)單庫
 ? ? ?dos.add(getDOrders());
 ? ? ?// 等待
 ? ? ?barrier.await();
 ?  }
  });
 ?T2.start();
}
?

區(qū)別

CountDownLatch用來解決一個(gè)線程等待多個(gè)線程的場(chǎng)景

CyclicBarrier是一組線程之間互相等待,計(jì)數(shù)器可以循環(huán)利用。

以上就是詳解Java如何實(shí)現(xiàn)多線程步調(diào)一致的詳細(xì)內(nèi)容,更多關(guān)于Java多線程同步的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

最新評(píng)論