欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java AQS中CyclicBarrier回環(huán)柵欄的使用

 更新時間:2023年02月02日 09:24:29   作者:飛奔的小付  
這篇文章主要介紹了Java中的 CyclicBarrier詳解,CyclicBarrier沒有顯示繼承哪個父類或者實(shí)現(xiàn)哪個父接口, 所有AQS和重入鎖不是通過繼承實(shí)現(xiàn)的,而是通過組合實(shí)現(xiàn)的,下文相關(guān)內(nèi)容需要的小伙伴可以參考一下

一. 簡介

CyclicBarrier ,回環(huán)柵欄(循環(huán)屏障),通過它可以實(shí)現(xiàn)讓一組線程等待至某個狀態(tài)(屏障點(diǎn))之后再全部同時執(zhí)行。叫做回環(huán)是因?yàn)楫?dāng)所有等待線程都被釋放以后,CyclicBarrier可以被重用。

二. CyclicBarrier的使用

構(gòu)造方法

//parties表示屏障攔截的線程數(shù)量,每個線程調(diào)用await方法告訴CyclicBarrier我已經(jīng)到達(dá)了屏障,然后當(dāng)前線程被阻塞。
public CyclicBarrier(int parties) {
        this(parties, null);
    }
 //用于在線程到達(dá)屏障時,優(yōu)先執(zhí)行 barrierAction,
 //方便處理更復(fù)雜的業(yè)務(wù)場景(該線程的執(zhí)行時機(jī)是在到達(dá)屏障之后再執(zhí)行)
 public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

重要方法

//指定數(shù)量的線程全部調(diào)用await()方法時,這些線程不再阻塞
// BrokenBarrierException 表示柵欄已經(jīng)被破壞,破壞的原因可能是其中一個線程 await() 時被中斷或者超時
public int await() throws InterruptedException, BrokenBarrierException
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException
//循環(huán)重置
public void reset() {}

三. CyclicBarrier的應(yīng)用場景

CyclicBarrier 可以用于多線程計算數(shù)據(jù),最后合并計算結(jié)果的場景。

例子一 ,多個線程調(diào)用await之后阻塞,等到達(dá)到屏障攔截的線程數(shù)量之后,再一起執(zhí)行

public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
        for (int i = 0; i < 5; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println(Thread.currentThread().getName()
                                + "開始等待其他線程");
                        cyclicBarrier.await();
                        System.out.println(Thread.currentThread().getName() + "開始執(zhí)行");
                        //TODO 模擬業(yè)務(wù)處理
                        Thread.sleep(5000);
                        System.out.println(Thread.currentThread().getName() + "執(zhí)行完畢");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }

Thread-0開始等待其他線程
Thread-2開始等待其他線程
Thread-1開始等待其他線程
Thread-4開始等待其他線程
Thread-3開始等待其他線程
Thread-2開始執(zhí)行
Thread-1開始執(zhí)行
Thread-0開始執(zhí)行

例子二 用于多線程計算數(shù)據(jù),最后合并計算結(jié)果的場景

  //保存每個學(xué)生的平均成績
    private ConcurrentHashMap<String, Integer> map=new ConcurrentHashMap<String,Integer>();
    private ExecutorService threadPool= Executors.newFixedThreadPool(3);
    private CyclicBarrier cb=new CyclicBarrier(3,()->{
        int result=0;
        Set<String> set = map.keySet();
        for(String s:set){
            result+=map.get(s);
        }
        System.out.println("三人平均成績?yōu)?"+(result/3)+"分");
    });
    public void count(){
        for(int i=0;i<3;i++){
            threadPool.execute(new Runnable(){
                @Override
                public void run() {
                    //獲取學(xué)生平均成績
                    int score=(int)(Math.random()*40+60);
                    map.put(Thread.currentThread().getName(), score);
                    System.out.println(Thread.currentThread().getName()
                            +"同學(xué)的平均成績?yōu)椋?+score);
                    try {
                        //執(zhí)行完運(yùn)行await(),等待所有學(xué)生平均成績都計算完畢
                        cb.await();
                    } catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
    public static void main(String[] args) {
        CyclicBarrierTest2 cb=new CyclicBarrierTest2();
        cb.count();
    }

pool-1-thread-3同學(xué)的平均成績?yōu)椋?2
pool-1-thread-1同學(xué)的平均成績?yōu)椋?4
pool-1-thread-2同學(xué)的平均成績?yōu)椋?5
三人平均成績?yōu)?73分

例子三 利用CyclicBarrier的計數(shù)器能夠重置,屏障可以重復(fù)使用的特性

public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger();
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                5, 5, 1000, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(100),
                (r) -> new Thread(r, counter.addAndGet(1) + " 號 "),
                new ThreadPoolExecutor.AbortPolicy());
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5,
                () -> System.out.println("裁判:比賽開始~~"));
        for (int i = 0; i < 10; i++) {
            threadPoolExecutor.submit(new Runner(cyclicBarrier));
        }
    }
    static class Runner extends Thread{
        private CyclicBarrier cyclicBarrier;
        public Runner (CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }
        @Override
        public void run() {
            try {
                int sleepMills = ThreadLocalRandom.current().nextInt(1000);
                Thread.sleep(sleepMills);
                System.out.println(Thread.currentThread().getName() + " 選手已就位, 準(zhǔn)備共用時: " + sleepMills + "ms" + cyclicBarrier.getNumberWaiting());
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }catch(BrokenBarrierException e){
                e.printStackTrace();
            }
        }
    }

3 號 選手已就位, 準(zhǔn)備共用時: 223ms0
2 號 選手已就位, 準(zhǔn)備共用時: 315ms1
5 號 選手已就位, 準(zhǔn)備共用時: 471ms2
1 號 選手已就位, 準(zhǔn)備共用時: 556ms3
4 號 選手已就位, 準(zhǔn)備共用時: 923ms4
裁判:比賽開始~~
3 號 選手已就位, 準(zhǔn)備共用時: 285ms0
2 號 選手已就位, 準(zhǔn)備共用時: 413ms1
1 號 選手已就位, 準(zhǔn)備共用時: 533ms2
5 號 選手已就位, 準(zhǔn)備共用時: 661ms3
4 號 選手已就位, 準(zhǔn)備共用時: 810ms4
裁判:比賽開始~~

四. CyclicBarrier與CountDownLatch的區(qū)別

  • CountDownLatch的計數(shù)器只能使用一次,而CyclicBarrier的計數(shù)器可以使用reset() 方法重置。所以CyclicBarrier能處理更為復(fù)雜的業(yè)務(wù)場景,比如如果計算發(fā)生錯誤,可以重置計數(shù)器,并讓線程們重新執(zhí)行一次
  • CyclicBarrier還提供getNumberWaiting(可以獲得CyclicBarrier阻塞的線程數(shù)量)、isBroken(用來知道阻塞的線程是否被中斷)等方法。
  • CountDownLatch會阻塞主線程,CyclicBarrier不會阻塞主線程,只會阻塞子線程。
  • CountDownLatch和CyclicBarrier都能夠?qū)崿F(xiàn)線程之間的等待,只不過它們側(cè)重點(diǎn)不同。CountDownLatch一般用于一個或多個線程,等待其他線程執(zhí)行完任務(wù)后,再執(zhí)行。CyclicBarrier一般用于一組線程互相等待至某個狀態(tài),然后這一組線程再同時執(zhí)行。
  • CyclicBarrier 還可以提供一個 barrierAction,合并多線程計算結(jié)果。
  • CyclicBarrier是通過ReentrantLock的"獨(dú)占鎖"和Conditon來實(shí)現(xiàn)一組線程的阻塞喚醒的,而CountDownLatch則是通過AQS的“共享鎖”實(shí)現(xiàn)

五. CyclicBarrier源碼解析

以例子以為例,thread0 從await方法開始,await會調(diào)用dowait

public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

那么dowait里又做了什么

private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        //如果狀態(tài)為0就將其修改為1,并設(shè)置當(dāng)前線程,調(diào)用await時外面肯定是lock
        lock.lock();
        try {
          //每一個棧欄算是一代
            final Generation g = generation;
            if (g.broken)
                throw new BrokenBarrierException();
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
			//每個線程執(zhí)行一次,count自減一
            int index = --count;
            // count減到0,說明幾個線程都到達(dá)屏障,就會重置 進(jìn)入下一個屏障
            if (index == 0) {  
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }
            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                    if (!timed)
                    //trip就是一個條件隊列condition,入條件等待隊列,單向鏈表結(jié)構(gòu)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        Thread.currentThread().interrupt();
                    }
                }
                if (g.broken)
                    throw new BrokenBarrierException();
                if (g != generation)
                    return index;
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

入隊阻塞的方法在await()中,下面看一下

public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
                //入隊 創(chuàng)建節(jié)點(diǎn)
            Node node = addConditionWaiter();
            // 釋放鎖,這樣其他線程才能獲取鎖,執(zhí)行
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
            	//不在當(dāng)前隊列中就阻塞
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

addConditionWaiter中創(chuàng)建節(jié)點(diǎn),對于thread0,頭節(jié)點(diǎn)是自己,lastWaiter也是自己

/**
*	添加條件等待節(jié)點(diǎn)
*/
private Node addConditionWaiter() {
            Node t = lastWaiter;
            //節(jié)點(diǎn)取消 則移除.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            //創(chuàng)建一個節(jié)點(diǎn),thread=Thread.currentThread(),waitstatus= Node.CONDITION =-2
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            //如果上一個節(jié)點(diǎn)為null,則將該節(jié)點(diǎn)設(shè)置為頭節(jié)點(diǎn)
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

釋放鎖

final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }
 public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
 protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            //釋放鎖時state變?yōu)?了
            if (c == 0) {
                free = true;
                //將當(dāng)前線程設(shè)置為null
                setExclusiveOwnerThread(null);
            }
            //修改state狀態(tài)值
            setState(c);
            return free;
        }

thread0的大致流程

首先會調(diào)用lock.lock進(jìn)行加鎖,加鎖之后調(diào)用trip.await方法進(jìn)行入隊阻塞,入隊是通過addConditionWaiter添加進(jìn)條件等待隊列,然后通過fullyRelease釋放鎖,設(shè)置當(dāng)前線程為null,然后修改state狀態(tài)值,最后調(diào)用LockSupport.park(this);進(jìn)行阻塞。

threa1的流程大致和thread0一樣,還沒有達(dá)到屏障數(shù)量,入隊的地方和thread0不一樣

private Node addConditionWaiter() {
            Node t = lastWaiter;
            //節(jié)點(diǎn)取消 則移除.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            //創(chuàng)建一個節(jié)點(diǎn),thread=Thread.currentThread(),waitstatus= Node.CONDITION =-2
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            //如果上一個節(jié)點(diǎn)為null,則將該節(jié)點(diǎn)設(shè)置為頭節(jié)點(diǎn)
            if (t == null)
                firstWaiter = node;
            else
            	//thread1執(zhí)行時,t已經(jīng)不為null了
                t.nextWaiter = node;
                //lastWaiter指向當(dāng)前
            lastWaiter = node;
            return node;
        }

thread1流程

thread2執(zhí)行的時候count已經(jīng)減到0,會執(zhí)行nextGeneration方法

			//每個線程執(zhí)行一次,count自減一
            int index = --count;
            // count減到0,說明幾個線程都到達(dá)屏障,就會重置 進(jìn)入下一個屏障
            if (index == 0) {  
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    //開啟下一代屏障
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }
 private void nextGeneration() {
        // 喚醒所有線程,喚醒操作是在同步等待隊列中,所以要將條件等待隊列轉(zhuǎn)換為同步等待隊列
        trip.signalAll();
        // 重置count
        count = parties;
        //創(chuàng)建下一代屏障
        generation = new Generation();
    }
public final void signalAll() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignalAll(first);
        }
  private void doSignalAll(Node first) {
  			//將firstWaiter和lastWaiter都置為null,就沒有首尾節(jié)點(diǎn)了
            lastWaiter = firstWaiter = null;
            //條件隊列的出隊
            do {
            //循環(huán)將條件隊列轉(zhuǎn)同步隊列
                Node next = first.nextWaiter;
                //first的nextWaiter置為null
                first.nextWaiter = null;
                //條件隊列轉(zhuǎn)同步隊列,因?yàn)閱拘咽窃谕疥犃兄?
                transferForSignal(first);
                first = next;
            } while (first != null);
        }

條件隊列的出隊

/**
* 條件隊列轉(zhuǎn)同步隊列
*/
 final boolean transferForSignal(Node node) {
 		//將同步狀態(tài)改為0
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        Node p = enq(node);
        int ws = p.waitStatus;
        //將ws設(shè)置為-1
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }
private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

同步隊列入隊

thread2開始解鎖

 private int dowait(boolean timed, long nanos){
//....
 } finally {
            lock.unlock();
        }
}
 public void unlock() {
        sync.release(1);
    }
public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
            	//喚醒線程0
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
 private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
        //將ws設(shè)置為0
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
        //喚醒線程0
            LockSupport.unpark(s.thread);
    }

Thread0喚醒之后就會獲取鎖,執(zhí)行業(yè)務(wù)邏輯然后再釋放鎖

 public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            // 在這里獲取鎖,再釋放鎖
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) 
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
//CAS嘗試獲取鎖
final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                //cas獲取鎖
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

同步隊列就會出隊

Thread0喚醒之后,執(zhí)行業(yè)務(wù)邏輯之后會unlock,又會喚醒Thread1,Thread1喚醒之后,又會喚醒Thread2。

總結(jié):

await方法,

前半段 釋放鎖 進(jìn)入條件隊列,阻塞線程(Thread0 Thread1),

過渡階段 其他線程調(diào)用singnal/signalAll喚醒(Thread2),條件隊列轉(zhuǎn)同步隊列,可以在釋放鎖的時候喚醒head的后續(xù)節(jié)點(diǎn)所在的線程

后半段 (Thread0)被喚醒的線程獲取鎖(如果有競爭,CAS獲取鎖失敗,還會阻塞),Thread0釋放鎖,喚醒同步隊列中head的后續(xù)節(jié)點(diǎn)所在的線程(獨(dú)占鎖的邏輯)

到此這篇關(guān)于Java AQS中CyclicBarrier回環(huán)柵欄的使用的文章就介紹到這了,更多相關(guān)Java CyclicBarrier內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評論