Java多線程之循環(huán)柵欄技術(shù)CyclicBarrier使用探索
第1章:引言
工作中,咱們經(jīng)常會(huì)遇到需要多個(gè)線程協(xié)同工作的情況。CyclicBarrier,直譯過來就是“循環(huán)屏障”。它是Java中用于管理一組線程,并讓它們?cè)谀硞€(gè)點(diǎn)上同步的工具。簡(jiǎn)單來說,咱們可以把一群線程想象成一隊(duì)馬拉雪橇的馴鹿,CyclicBarrier就像是一個(gè)指定的集合點(diǎn),所有馴鹿必須到齊了,才能繼續(xù)下一段旅程。
不過別擔(dān)心,這聽起來比實(shí)際復(fù)雜。實(shí)際上,CyclicBarrier提供了一種簡(jiǎn)單的方式來達(dá)到這個(gè)同步目的。它通過一個(gè)計(jì)數(shù)器來實(shí)現(xiàn),這個(gè)計(jì)數(shù)器初始值是線程的數(shù)量。當(dāng)一個(gè)線程到達(dá)屏障點(diǎn)時(shí),計(jì)數(shù)器就減一。當(dāng)計(jì)數(shù)器減到0時(shí),表示所有線程都到齊了,然后咱們可以執(zhí)行一些操作,或者繼續(xù)執(zhí)行下一步。
第2章:CyclicBarrier基礎(chǔ)
要深入理解CyclicBarrier,咱們首先得知道它是怎么工作的。CyclicBarrier在Java的java.util.concurrent
包中,是并發(fā)編程的一部分。它主要用于讓一組線程互相等待,直到所有線程都達(dá)到了一個(gè)公共屏障點(diǎn)(Barrier Point),然后這些線程才繼續(xù)執(zhí)行。
讓小黑舉個(gè)簡(jiǎn)單的例子。假設(shè)咱們有一個(gè)任務(wù),需要四個(gè)線程同時(shí)開始執(zhí)行。這就可以用CyclicBarrier來實(shí)現(xiàn)。小黑寫了下面這段代碼,來展示基本的用法:
import java.util.concurrent.CyclicBarrier; public class CyclicBarrierExample { public static void main(String[] args) { // 創(chuàng)建一個(gè)新的CyclicBarrier,其中包括4個(gè)線程 CyclicBarrier barrier = new CyclicBarrier(4, () -> System.out.println("所有線程到達(dá)屏障點(diǎn),可以繼續(xù)執(zhí)行!")); // 創(chuàng)建四個(gè)線程 for (int i = 0; i < 4; i++) { int threadNum = i; new Thread(() -> { try { System.out.println("線程 " + threadNum + " 正在執(zhí)行任務(wù)"); Thread.sleep(1000); // 模擬任務(wù)執(zhí)行時(shí)間 System.out.println("線程 " + threadNum + " 到達(dá)屏障點(diǎn)"); barrier.await(); // 等待其他線程 System.out.println("線程 " + threadNum + " 繼續(xù)執(zhí)行其他任務(wù)"); } catch (Exception e) { e.printStackTrace(); } }).start(); } } }
在這個(gè)例子中,咱們創(chuàng)建了一個(gè)CyclicBarrier
實(shí)例,這個(gè)實(shí)例要求四個(gè)線程都達(dá)到屏障點(diǎn)后才能繼續(xù)執(zhí)行。每個(gè)線程在執(zhí)行自己的任務(wù)后,會(huì)調(diào)用barrier.await();
來等待其他線程。所有線程都調(diào)用了await()
方法后,計(jì)數(shù)器變?yōu)?,屏障就被克服了,每個(gè)線程繼續(xù)執(zhí)行它們之后的任務(wù)。
第3章:CyclicBarrier的核心特性
了解了CyclicBarrier的基本用法后,咱們來深入探討一下它的核心特性。這些特性讓CyclicBarrier成為并發(fā)編程中一個(gè)非常有用的工具,特別是在處理多線程同步問題時(shí)。
重用性
CyclicBarrier的一個(gè)顯著特點(diǎn)是它的重用性。這意味著一旦所有等待線程都到達(dá)屏障,它就可以重置并重用。這個(gè)特性使得CyclicBarrier非常適合于那些需要多次等待一組線程到達(dá)同一點(diǎn)的情況。
讓小黑用一個(gè)例子來說明這一點(diǎn)。假設(shè)咱們有一個(gè)處理數(shù)據(jù)的多階段任務(wù),每個(gè)階段都需要所有線程完成后才能進(jìn)入下一階段。這里就可以運(yùn)用CyclicBarrier的重用性。
import java.util.concurrent.CyclicBarrier; public class CyclicBarrierReuseExample { private static final int THREAD_COUNT = 3; public static void main(String[] args) { CyclicBarrier barrier = new CyclicBarrier(THREAD_COUNT, () -> System.out.println("所有線程完成當(dāng)前階段,準(zhǔn)備進(jìn)入下一階段!")); for (int i = 0; i < THREAD_COUNT; i++) { int threadNum = i; new Thread(() -> { try { for (int phase = 1; phase <= 3; phase++) { // 假設(shè)有三個(gè)階段 System.out.println("線程 " + threadNum + " 完成階段 " + phase); barrier.await(); // 等待其他線程 } } catch (Exception e) { e.printStackTrace(); } }).start(); } } }
在這個(gè)例子中,每個(gè)線程都會(huì)經(jīng)歷三個(gè)階段。每個(gè)階段都使用相同的CyclicBarrier來同步線程。線程完成一個(gè)階段后,就會(huì)等待其他線程。一旦所有線程都完成了該階段,CyclicBarrier就會(huì)重置,讓線程開始下一個(gè)階段。
同步輔助功能
CyclicBarrier還提供了一個(gè)同步輔助功能:當(dāng)所有線程都到達(dá)屏障時(shí),可以執(zhí)行一個(gè)預(yù)定義的動(dòng)作。這是通過在CyclicBarrier的構(gòu)造函數(shù)中提供一個(gè)Runnable來實(shí)現(xiàn)的。
這個(gè)功能非常有用,因?yàn)樗试S咱們?cè)谒芯€程都到達(dá)屏障后,執(zhí)行一些處理,比如更新共享資源、合并結(jié)果等。小黑再來給大家展示一個(gè)例子:
import java.util.concurrent.CyclicBarrier; public class CyclicBarrierActionExample { public static void main(String[] args) { CyclicBarrier barrier = new CyclicBarrier(3, () -> System.out.println("全部線程已到達(dá)屏障點(diǎn),執(zhí)行屏障動(dòng)作")); for (int i = 0; i < 3; i++) { new Thread(() -> { try { System.out.println(Thread.currentThread().getName() + " 正在執(zhí)行任務(wù)"); Thread.sleep(2000); // 模擬任務(wù)執(zhí)行時(shí)間 barrier.await(); // 等待其他線程 System.out.println(Thread.currentThread().getName() + " 繼續(xù)執(zhí)行后續(xù)任務(wù)"); } catch (Exception e) { e.printStackTrace(); } }).start(); } } }
在這個(gè)代碼中,當(dāng)所有線程都到達(dá)屏障點(diǎn)時(shí),會(huì)執(zhí)行一段指定的代碼,即打印出“全部線程已到達(dá)屏障點(diǎn),執(zhí)行屏障動(dòng)作”。這樣的設(shè)計(jì)使得CyclicBarrier不僅僅是一個(gè)同步工具,還可以作為線程間協(xié)調(diào)的一種手段。
通過這些特性,CyclicBarrier成為了處理復(fù)雜同步問題的有力工具。它不僅能確保線程在繼續(xù)執(zhí)行前達(dá)到某個(gè)公共點(diǎn),還能夠在所有線程都準(zhǔn)備好后執(zhí)行。
第4章:CyclicBarrier的實(shí)際應(yīng)用場(chǎng)景
并行計(jì)算
一個(gè)典型的應(yīng)用場(chǎng)景是并行計(jì)算。假設(shè)咱們有一個(gè)大數(shù)據(jù)集,需要進(jìn)行復(fù)雜的數(shù)據(jù)處理,這個(gè)處理過程可以分解為多個(gè)獨(dú)立的子任務(wù),每個(gè)子任務(wù)由一個(gè)單獨(dú)的線程處理。但在進(jìn)行下一步處理之前,必須確保所有子任務(wù)都完成了當(dāng)前步驟。這里就是CyclicBarrier大顯身手的時(shí)候。
來看看下面這個(gè)例子,小黑寫了一段代碼,模擬了這種情況:
import java.util.concurrent.CyclicBarrier; public class ParallelComputationExample { private static final int THREAD_COUNT = 4; public static void main(String[] args) { CyclicBarrier barrier = new CyclicBarrier(THREAD_COUNT, () -> System.out.println("所有子任務(wù)處理完成,準(zhǔn)備進(jìn)入下一步!")); for (int i = 0; i < THREAD_COUNT; i++) { new Thread(new Worker(i, barrier)).start(); } } static class Worker implements Runnable { private final int threadNumber; private final CyclicBarrier barrier; Worker(int threadNumber, CyclicBarrier barrier) { this.threadNumber = threadNumber; this.barrier = barrier; } @Override public void run() { try { System.out.println("線程 " + threadNumber + " 正在處理任務(wù)"); Thread.sleep(2000); // 模擬任務(wù)處理時(shí)間 System.out.println("線程 " + threadNumber + " 完成任務(wù),等待其他線程"); barrier.await(); // 等待其他線程完成 } catch (Exception e) { e.printStackTrace(); } } } }
在這個(gè)例子中,咱們創(chuàng)建了四個(gè)線程,每個(gè)線程都代表一個(gè)數(shù)據(jù)處理任務(wù)。這些線程在完成自己的任務(wù)后,會(huì)等待其他線程完成,然后一起進(jìn)入下一步。
第5章:深入CyclicBarrier的API
咱們已經(jīng)看到了CyclicBarrier在實(shí)際場(chǎng)景中的一些應(yīng)用,現(xiàn)在小黑要帶大家更深入地了解一下CyclicBarrier的API。理解這些API對(duì)于充分利用CyclicBarrier的功能是至關(guān)重要的。
基本方法
CyclicBarrier提供了一些核心的方法來控制線程間的同步:
CyclicBarrier(int parties)
: 這是CyclicBarrier的構(gòu)造函數(shù),parties
指的是必須調(diào)用await
方法的線程數(shù)量。CyclicBarrier(int parties, Runnable barrierAction)
: 這個(gè)構(gòu)造函數(shù)除了指定線程數(shù)外,還可以指定當(dāng)所有線程都到達(dá)屏障時(shí),要執(zhí)行的操作。await()
: 線程調(diào)用這個(gè)方法告訴CyclicBarrier它已到達(dá)屏障點(diǎn)。如果所有線程都到達(dá)屏障,它們就會(huì)繼續(xù)執(zhí)行;否則,調(diào)用await
的線程會(huì)阻塞,等待其他線程。
示例:使用CyclicBarrier同步任務(wù)
為了更好地理解這些API,小黑準(zhǔn)備了一個(gè)具體的例子。假設(shè)咱們有一個(gè)任務(wù),需要多個(gè)線程協(xié)作完成,每個(gè)線程執(zhí)行完各自的部分后,需要等待其他線程也執(zhí)行完畢,然后統(tǒng)一進(jìn)行下一步操作。
import java.util.concurrent.CyclicBarrier; public class CyclicBarrierApiExample { public static void main(String[] args) { // 定義一個(gè)新的CyclicBarrier,需要3個(gè)線程協(xié)作 CyclicBarrier barrier = new CyclicBarrier(3, () -> System.out.println("所有線程準(zhǔn)備就緒,開始下一步操作")); for (int i = 0; i < 3; i++) { new Thread(new Task(barrier), "線程 " + i).start(); } } static class Task implements Runnable { private final CyclicBarrier barrier; Task(CyclicBarrier barrier) { this.barrier = barrier; } @Override public void run() { try { System.out.println(Thread.currentThread().getName() + " 正在執(zhí)行任務(wù)"); Thread.sleep(1000); // 模擬任務(wù)執(zhí)行時(shí)間 System.out.println(Thread.currentThread().getName() + " 完成任務(wù),等待其他線程"); barrier.await(); // 等待其他線程 System.out.println(Thread.currentThread().getName() + " 開始執(zhí)行后續(xù)操作"); } catch (Exception e) { e.printStackTrace(); } } } }
在這個(gè)例子中,每個(gè)線程都在執(zhí)行它的任務(wù)。完成任務(wù)后,它會(huì)等待其他線程也完成任務(wù)。只有當(dāng)所有線程都執(zhí)行了barrier.await()
方法之后,才會(huì)執(zhí)行CyclicBarrier的barrierAction
,即打印出“所有線程準(zhǔn)備就緒,開始下一步操作”。
異常處理
處理異常也是使用CyclicBarrier時(shí)需要考慮的一個(gè)重要方面。如果任何線程在等待過程中被中斷或超時(shí),或者屏障被重置,或者屏障的await
方法被中斷,BrokenBarrierException
或InterruptedException
將會(huì)被拋出。這些異常需要被妥善處理,以確保程序的健壯性和正確性。
第6章:CyclicBarrier的高級(jí)用法
動(dòng)態(tài)調(diào)整參與線程數(shù)
CyclicBarrier提供了一種機(jī)制,允許在運(yùn)行時(shí)動(dòng)態(tài)調(diào)整等待的線程數(shù)量。這在一些動(dòng)態(tài)變化的并發(fā)場(chǎng)景中非常有用,比如線程數(shù)量會(huì)根據(jù)任務(wù)的不同而變化。
為了展示這個(gè)特性,小黑寫了以下的例子。在這個(gè)例子中,咱們會(huì)創(chuàng)建一個(gè)CyclicBarrier,并在運(yùn)行時(shí)根據(jù)需要?jiǎng)討B(tài)調(diào)整它的屏障點(diǎn):
import java.util.concurrent.CyclicBarrier; public class DynamicCyclicBarrierExample { public static void main(String[] args) { // 初始時(shí),屏障點(diǎn)設(shè)置為3 CyclicBarrier barrier = new CyclicBarrier(3, () -> System.out.println("屏障點(diǎn)動(dòng)作執(zhí)行")); for (int i = 0; i < 2; i++) { // 初始只啟動(dòng)兩個(gè)線程 new Thread(new Worker(barrier), "線程 " + i).start(); } // 動(dòng)態(tài)調(diào)整屏障點(diǎn),現(xiàn)在需要4個(gè)線程到達(dá)屏障點(diǎn) barrier.reset(); // 重置CyclicBarrier,這也會(huì)打破任何當(dāng)前等待的線程 barrier = new CyclicBarrier(4, () -> System.out.println("新的屏障點(diǎn)動(dòng)作執(zhí)行")); for (int i = 0; i < 4; i++) { // 現(xiàn)在啟動(dòng)四個(gè)線程 new Thread(new Worker(barrier), "線程 " + i).start(); } } static class Worker implements Runnable { private final CyclicBarrier barrier; Worker(CyclicBarrier barrier) { this.barrier = barrier; } @Override public void run() { try { System.out.println(Thread.currentThread().getName() + " 到達(dá)屏障點(diǎn)"); barrier.await(); System.out.println(Thread.currentThread().getName() + " 繼續(xù)執(zhí)行"); } catch (Exception e) { e.printStackTrace(); } } } }
在這個(gè)例子中,咱們先創(chuàng)建了一個(gè)需要3個(gè)線程到達(dá)的CyclicBarrier。但后來因?yàn)樾枨笞兓覀兺ㄟ^調(diào)用reset()
方法重置了CyclicBarrier,并創(chuàng)建了一個(gè)新的CyclicBarrier,這次需要4個(gè)線程。這展示了如何根據(jù)實(shí)際情況調(diào)整同步點(diǎn)的數(shù)量。
結(jié)合其他并發(fā)工具使用
CyclicBarrier還可以與Java的其他并發(fā)工具一起使用,以解決更復(fù)雜的并發(fā)問題。例如,可以將其與ExecutorService結(jié)合使用,以管理線程池中的一組任務(wù)。
看看下面的例子,小黑展示了如何將CyclicBarrier與線程池結(jié)合使用:
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class CyclicBarrierWithExecutorServiceExample { public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(4); CyclicBarrier barrier = new CyclicBarrier(4, () -> System.out.println("所有任務(wù)完成,準(zhǔn)備下一輪執(zhí)行")); for (int i = 0; i < 4; i++) { executorService.execute(new Worker(barrier)); } executorService.shutdown(); } static class Worker implements Runnable { private final CyclicBarrier barrier; Worker(CyclicBarrier barrier) { this.barrier = barrier; } @Override public void run() { try { System.out.println(Thread.currentThread().getName() + " 正在執(zhí)行任務(wù)"); Thread.sleep(1000); barrier.await(); System.out.println(Thread.currentThread().getName() + " 任務(wù)完成"); } catch (Exception e) { e.printStackTrace(); } } } }
第7章:CyclicBarrier的問題和解決方案
1. BrokenBarrierException的處理
當(dāng)參與CyclicBarrier的某個(gè)線程在等待期間被中斷,或者CyclicBarrier被重置,或者在屏障點(diǎn)等待的線程超時(shí)時(shí),就會(huì)拋出BrokenBarrierException
異常。這通常意味著CyclicBarrier無法正常工作。
解決這個(gè)問題的關(guān)鍵是要正確處理這個(gè)異常。咱們可以設(shè)置適當(dāng)?shù)漠惓L幚磉壿?,確保即使在出現(xiàn)異常時(shí),程序也能以一種預(yù)期的方式繼續(xù)運(yùn)行。例如,可以在捕獲到BrokenBarrierException
時(shí)重置CyclicBarrier,或者采取其他恢復(fù)措施。
2. 超時(shí)的處理
如果咱們希望線程在等待達(dá)到屏障點(diǎn)的過程中不要無限期地等待,可以使用await(long timeout, TimeUnit unit)
方法,為等待設(shè)置一個(gè)超時(shí)時(shí)間。如果在指定的時(shí)間內(nèi)沒有所有的線程都到達(dá)屏障點(diǎn),就會(huì)拋出TimeoutException
。
處理超時(shí)的策略可能包括重試機(jī)制或者回退邏輯。但重要的是要確保所有的線程在超時(shí)后都能正確地處理這種情況,避免資源泄漏或者線程阻塞。
示例代碼:處理異常和超時(shí)
下面是一個(gè)示例,展示了如何在CyclicBarrier中處理BrokenBarrierException
和超時(shí)異常:
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class CyclicBarrierExceptionHandlingExample { public static void main(String[] args) { CyclicBarrier barrier = new CyclicBarrier(3); for (int i = 0; i < 3; i++) { new Thread(new Task(barrier), "線程 " + i).start(); } } static class Task implements Runnable { private final CyclicBarrier barrier; Task(CyclicBarrier barrier) { this.barrier = barrier; } @Override public void run() { try { System.out.println(Thread.currentThread().getName() + " 正在執(zhí)行任務(wù)"); Thread.sleep(1000); System.out.println(Thread.currentThread().getName() + " 到達(dá)屏障點(diǎn),等待其他線程"); barrier.await(2, TimeUnit.SECONDS); // 設(shè)置超時(shí)時(shí)間為2秒 } catch (InterruptedException e) { System.out.println(Thread.currentThread().getName() + " 被中斷"); } catch (BrokenBarrierException e) { System.out.println(Thread.currentThread().getName() + " 檢測(cè)到屏障損壞"); } catch (TimeoutException e) { System.out.println(Thread.currentThread().getName() + " 等待超時(shí)"); } System.out.println(Thread.currentThread().getName() + " 繼續(xù)執(zhí)行后續(xù)操作"); } } }
在這個(gè)例子中,每個(gè)線程在執(zhí)行任務(wù)后會(huì)嘗試等待其他線程,但如果等待超過2秒,就會(huì)拋出TimeoutException
。同時(shí),這個(gè)代碼也演示了如何處理InterruptedException
和BrokenBarrierException
,確保線程在異常發(fā)生時(shí)能夠正確地繼續(xù)執(zhí)行。
3. CyclicBarrier重置問題
在使用CyclicBarrier時(shí),還可能遇到需要重置屏障的情況。這可以通過調(diào)用reset()
方法實(shí)現(xiàn),但要注意這個(gè)操作會(huì)打破正在等待的線程。因此,在重置CyclicBarrier之前,需要確保所有線程都已經(jīng)離開屏障點(diǎn),或者咱們?cè)敢饨邮艽驍嗨鼈兊牡却^程。
總結(jié)
- 基本用法:CyclicBarrier主要用于協(xié)調(diào)多個(gè)線程,確保它們?cè)诶^續(xù)執(zhí)行之前在某個(gè)公共點(diǎn)同步。
- 重用性:一個(gè)CyclicBarrier可以被重復(fù)使用,這對(duì)于那些分階段執(zhí)行的多線程任務(wù)非常有用。
- 異常處理:正確處理
BrokenBarrierException
和TimeoutException
對(duì)于構(gòu)建健壯的并發(fā)應(yīng)用至關(guān)重要。 - 與其他工具的結(jié)合:CyclicBarrier可以與Java的其他并發(fā)工具,如ExecutorService,配合使用,以處理更復(fù)雜的并發(fā)場(chǎng)景。
學(xué)習(xí)并發(fā)編程是一個(gè)持續(xù)的過程。技術(shù)總是在發(fā)展,新的挑戰(zhàn)總是在出現(xiàn)。保持好奇心,不斷學(xué)習(xí),小黑相信你會(huì)在這條路上越走越遠(yuǎn)!
以上就是Java多線程之循環(huán)柵欄技術(shù)CyclicBarrier使用探索的詳細(xì)內(nèi)容,更多關(guān)于Java多線程CyclicBarrier的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
全面匯總SpringBoot和SpringClould常用注解
Java注解是附加在代碼中的一些元信息,用于一些工具在編譯、運(yùn)行時(shí)進(jìn)行解析和使用,起到說明、配置的功能,這篇文章就帶你來了解一下2021-08-08Java service層獲取HttpServletRequest工具類的方法
今天小編就為大家分享一篇關(guān)于Java service層獲取HttpServletRequest工具類的方法,小編覺得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來看看吧2018-12-12Java日常練習(xí)題,每天進(jìn)步一點(diǎn)點(diǎn)(50)
下面小編就為大家?guī)硪黄狫ava基礎(chǔ)的幾道練習(xí)題(分享)。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧,希望可以幫到你2021-08-08Spring Boot 3.x 集成 Eureka Server/Cl
隨著SpringBoot 3.x版本的開發(fā)嘗試,本文記錄了在集成Eureka Server/Client時(shí)所遇到的問題和解決方案,文中詳細(xì)介紹了搭建服務(wù)、配置文件和測(cè)試步驟,感興趣的朋友跟隨小編一起看看吧2024-09-09Maven項(xiàng)目部署到服務(wù)器設(shè)置訪問路徑以及配置虛擬目錄的方法
今天小編就為大家分享一篇關(guān)于Maven項(xiàng)目部署到服務(wù)器設(shè)置訪問路徑以及配置虛擬目錄的方法,小編覺得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來看看吧2019-02-02Mybatis-plus中的@EnumValue注解使用詳解
這篇文章主要介紹了Mybatis-plus中的@EnumValue注解使用詳解,在PO類中,如果我們直接使用枚舉類型去映射數(shù)據(jù)庫(kù)的對(duì)應(yīng)字段保存時(shí),往往就會(huì)因?yàn)轭愋筒黄ヅ鋵?dǎo)致映射失敗,Mybatis-plus提供了一種解決辦法,就是使用@EnumValue注解,需要的朋友可以參考下2024-02-02