Java中的并發(fā)工具類詳細解析
前言
CountDownLatch、 CyclicBarrier 和 Semaphore 工具類提供了一種并發(fā)流程控制的手段
Exchanger 工具類則提供了在線程間交換數(shù)據(jù)的一種手段。
等待多線程完成的 CountDownLatch
CountDownLatch 允許一個或多個線程等待其他線程完成操作。
public class CountDownLatchTest { staticCountDownLatch c = new CountDownLatch(2); public static void main(String[] args) throws InterruptedException { new Thread(new Runnable() { @Override public void run() { System.out.println(1); // N-1;N為0時,退出await方法 c.countDown(); System.out.println(2); c.countDown(); } }).start(); // 帶指定時間的 await 方法——await(long time,TimeUnit unit) c.await(); System.out.println("3"); } }
CountDownLatch 的構(gòu)造函數(shù)接收一個 int 類型的參數(shù)作為計數(shù)器,如果你想等待 N 個點完成,這里就傳入 N。
當我們調(diào)用 CountDownLatch 的 countDown 方法時,N 就會減 1,CountDownLatch 的 await 方法會阻塞當前線程,直到 N 變成零。
由于 countDown 方法可以用在任何地方,所以這里說的 N 個點,可以是 N 個線程,也可以是 1 個線程里 的 N 個執(zhí)行步驟。
用在多個線程時,只需要把這個 CountDownLatch 的引用傳遞到線程 里即可。
計數(shù)器必須大于等于 0,只是等于 0 時候,計數(shù)器就是零,調(diào)用 await 方法時不會阻塞當前線程。
CountDownLatch 不可能重新初始化或者修改 CountDownLatch 對象的內(nèi)部計數(shù)器的值。
一個線程調(diào)用 countDown 方法 happen-before,另外一個線程調(diào)用 await 方法。
同步屏障 CyclicBarrier
CyclicBarrier 的字面意思是可循環(huán)使用(Cyclic)的屏障(Barrier)。
它要做的事情是,讓一組線程到達一個屏障(也可以叫同步點)時被阻塞,直到最后一個線程到達屏障時,屏障才會開門,所有被屏障攔截的線程才會繼續(xù)運行。
CyclicBarrier 簡介
CyclicBarrier 默認的構(gòu)造方法是 CyclicBarrier(int parties),其參數(shù)表示屏障攔截的線程數(shù)量,每個線程調(diào)用 await 方法告訴 CyclicBarrier 我已經(jīng)到達了屏障,然后當前線程被阻塞。
public class CyclicBarrierTest { static CyclicBarrier c = new CyclicBarrier(2); public static void main(String[] args) { new Thread(new Runnable() { @Override public void run() { try { // 到達屏障 c.await(); } catch (Exception e) { } System.out.println(1); } }).start(); try { // 到達屏障 c.await(); } catch (Exception e) { } System.out.println(2); // 1 2 或 2 1 到達順序不唯一 } }
CyclicBarrier 還提供一個更高級的構(gòu)造函數(shù) CyclicBarrier(int parties,Runnable barrier-Action),用于在線程到達屏障時,優(yōu)先執(zhí)行 barrierAction,方便處理更復雜的業(yè)務(wù)場景。
import java.util.concurrent.CyclicBarrier; public class CyclicBarrierTest2 { static CyclicBarrier c = new CyclicBarrier(2, new A()); public static void main(String[] args) { new Thread(new Runnable() { @Override public void run() { try { c.await(); } catch (Exception e) { } System.out.println(1); } }).start(); try { c.await(); } catch (Exception e) { } System.out.println(2); // 3 1 2 /*因為 CyclicBarrier 設(shè)置了攔截線程的數(shù)量是 2, 所以必須等代碼中的第一個線程和線程 A 都執(zhí)行完之后, 才會繼續(xù)執(zhí)行主線程,然后輸出 2*/ } static class A implements Runnable { @Override public void run() { System.out.println(3); } } }
CyclicBarrier 的應(yīng)用場景
CyclicBarrier 可以用于多線程計算數(shù)據(jù),最后合并計算結(jié)果的場景
/** * 銀行流水處理服務(wù)類 * * @authorftf */ public class BankWaterService implements Runnable { /** * 創(chuàng)建 4 個屏障,處理完之后執(zhí)行當前類的 run 方法 */ private CyclicBarrier c = new CyclicBarrier(4, this); /** * 假設(shè)只有 4 個 sheet,所以只啟動 4 個線程 */ private Executor executor = Executors.newFixedThreadPool(4); /** * 保存每個 sheet 計算出的銀流結(jié)果 */ private ConcurrentHashMap<String, Integer> sheetBankWaterCount = new ConcurrentHashMap<String, Integer>(); private void count() { for (int i = 0; i < 4; i++) { executor.execute(new Runnable() { @Override public void run() { // 計算當前 sheet 的銀流數(shù)據(jù),計算代碼省略 sheetBankWaterCount .put(Thread.currentThread().getName(), 1); // 銀流計算完成,插入一個屏障 try { c.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } }); } } @Override public void run() { int result = 0; // 匯總每個 sheet 計算出的結(jié)果 for (Entry<String, Integer> sheet : sheetBankWaterCount.entrySet()) { result += sheet.getValue(); } // 將結(jié)果輸出 sheetBankWaterCount.put("result", result); System.out.println(result); } public static void main(String[] args) { BankWaterService bankWaterCount = new BankWaterService(); bankWaterCount.count(); } }
CyclicBarrier 和 CountDownLatch 的區(qū)別
CountDownLatch 的計數(shù)器只能使用一次,而 CyclicBarrier 的計數(shù)器可以使用 reset() 方法重置。所以 CyclicBarrier 能處理更為復雜的業(yè)務(wù)場景。
例如,如果計算發(fā)生錯誤, 可以重置計數(shù)器,并讓線程重新執(zhí)行一次。
CyclicBarrier 還提供其他有用的方法,比如 getNumberWaiting 方法可以獲得 CyclicBarrier 阻塞的線程數(shù)量。
isBroken()方法用來了解阻塞的線程是否被中斷。
public class CyclicBarrierTest3 { staticCyclicBarrier c = new CyclicBarrier(2); public static void main(String[] args) throws InterruptedException, BrokenBarrierException { Thread thread = new Thread(new Runnable() { @Override public void run() { try { c.await(); } catch (Exception e) { } } }); thread.start(); thread.interrupt(); try { c.await(); } catch (Exception e) { System.out.println(c.isBroken()); //true } } }
控制并發(fā)線程數(shù)的 Semaphore
Semaphore(信號量)是用來控制同時訪問特定資源的線程數(shù)量,它通過協(xié)調(diào)各個線程,以保證合理的使用公共資源。
從字面上很難理解 Semaphore 所表達的含義,只能把它比作是 控制流量的紅綠燈。比如××馬路要限制流量,只允許同時有一百輛車在這條路上行使, 其他的都必須在路口等待,所以前一百輛車會看到綠燈,可以開進這條馬路,后面的車 會看到紅燈,不能駛?cè)?times;×馬路,但是如果前一百輛中有 5 輛車已經(jīng)離開了××馬路,那么 后面就允許有 5 輛車駛?cè)腭R路,這個例子里說的車就是線程,駛?cè)腭R路就表示線程在執(zhí) 行,離開馬路就表示線程執(zhí)行完成,看見紅燈就表示線程被阻塞,不能執(zhí)行。
Semaphore 可以用于做流量控制,特別是公用資源有限的應(yīng)用場景,比如數(shù)據(jù)庫連 接。假如有一個需求,要讀取幾萬個文件的數(shù)據(jù),因為都是 IO 密集型任務(wù),我們可以啟 動幾十個線程并發(fā)地讀取,但是如果讀到內(nèi)存后,還需要存儲到數(shù)據(jù)庫中,而數(shù)據(jù)庫的 連接數(shù)只有 10 個,這時我們必須控制只有 10 個線程同時獲取數(shù)據(jù)庫連接保存數(shù)據(jù),否 則會報錯無法獲取數(shù)據(jù)庫連接。這個時候,就可以使用 Semaphore 來做流量控制
public class SemaphoreTest { private static final int THREAD_COUNT = 30; private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT); private static Semaphore s = new Semaphore(10); public static void main(String[] args) { for (int i = 0; i < THREAD_COUNT; i++) { threadPool.execute(new Runnable() { @Override public void run() { try { s.acquire(); System.out.println("save data"); s.release(); } catch (InterruptedException e) { } } }); } threadPool.shutdown(); } }
雖然有 30 個線程在執(zhí)行,但是只允許 10 個并發(fā)執(zhí)行。Semaphore 的構(gòu) 造方法 Semaphore(int permits)接受一個整型的數(shù)字,表示可用的許可證數(shù)量。 Semaphore(10)表示允許 10 個線程獲取許可證,也就是最大并發(fā)數(shù)是 10。Semaphore 的用法也很簡單,首先線程使用 Semaphore 的 acquire()方法獲取一個許可證,使用完之后調(diào)用 release()方法歸還許可證。還可以用 tryAcquire()方法嘗試獲取許可證。
Semaphore 還提供一些其他方法,具體如下。
- intavailablePermits():返回此信號量中當前可用的許可證數(shù)。
- intgetQueueLength():返回正在等待獲取許可證的線程數(shù)。
- booleanhasQueuedThreads():是否有線程正在等待獲取許可證。
- void reducePermits(int reduction):減少 reduction 個許可證,是個 protected 方 法。
- Collection getQueuedThreads():返回所有等待獲取許可證的線程集合,是個 protected 方法。
線程間交換數(shù)據(jù)的 Exchanger
Exchanger(交換者)是一個用于線程間協(xié)作的工具類。Exchanger 用于進行線程間 的數(shù)據(jù)交換。它提供一個同步點,在這個同步點,兩個線程可以交換彼此的數(shù)據(jù)。這兩個線程通過 exchange 方法交換數(shù)據(jù),如果第一個線程先執(zhí)行 exchange()方法,它會一直等待第二個線程也執(zhí)行 exchange 方法,當兩個線程都到達同步點時,這兩個線程就可以交換數(shù)據(jù),將本線程生產(chǎn)出來的數(shù)據(jù)傳遞給對方。
Exchanger 可以用于遺傳算法,遺傳算法里需要選出兩個人作為交配對象,這時候會 交換兩人的數(shù)據(jù),并使用交叉規(guī)則得出 2 個交配結(jié)果。Exchanger 也可以用于校對工作, 比如我們需要將紙制銀行流水通過人工的方式錄入成電子銀行流水,為了避免錯誤,采 用 AB 崗兩人進行錄入,錄入到 Excel 之后,系統(tǒng)需要加載這兩個 Excel,并對兩個 Excel 數(shù)據(jù)進行校對,看看是否錄入一致
public class ExchangerTest { private static final Exchanger<String> exgr = new Exchanger<String>(); private static ExecutorService threadPool = Executors.newFixedThreadPool(2); public static void main(String[] args) { threadPool.execute(new Runnable() { @Override public void run() { try { String A = "銀行流水 A"; // A 錄入銀行流水數(shù)據(jù) exgr.exchange(A); } catch (InterruptedException e) { } } }); threadPool.execute(new Runnable() { @Override public void run() { try { String B = "銀行流水 B"; // B 錄入銀行流水數(shù)據(jù) String A = exgr.exchange(B); System.out.println("A 和 B 數(shù)據(jù)是否一致:" + A.equals(B) + ",A 錄入的是:" + A + ",B 錄入是:" + B); } catch (InterruptedException e) { } } }); threadPool.shutdown(); } }
如果兩個線程有一個沒有執(zhí)行 exchange()方法,則會一直等待,如果擔心有特殊情 況發(fā)生,避免一直等待,可以使用 exchange(V x,longtimeout,TimeUnit unit)設(shè)置最大等待時長。
到此這篇關(guān)于Java中的并發(fā)工具類詳細解析的文章就介紹到這了,更多相關(guān)Java中的并發(fā)工具內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
詳解Spring Data JPA系列之投影(Projection)的用法
本篇文章主要介紹了詳解Spring Data JPA系列之投影(Projection)的用法,具有一定的參考價值,有興趣的可以了解一下2017-07-07深入理解JVM之Java對象的創(chuàng)建、內(nèi)存布局、訪問定位詳解
這篇文章主要介紹了深入理解JVM之Java對象的創(chuàng)建、內(nèi)存布局、訪問定位,結(jié)合實例形式詳細分析了Java對象的創(chuàng)建、內(nèi)存布局、訪問定位相關(guān)概念、原理、操作技巧與注意事項,需要的朋友可以參考下2019-09-09Mybatis批量修改聯(lián)合主鍵數(shù)據(jù)的兩種方法
最近遇上需要批量修改有聯(lián)合主鍵的表數(shù)據(jù),找很多資料都不是太合適,最終自己摸索總結(jié)了兩種方式可以批量修改數(shù)據(jù),對Mybatis批量修改數(shù)據(jù)相關(guān)知識感興趣的朋友一起看看吧2022-04-04詳解Java8新特性Stream之list轉(zhuǎn)map及問題解決
這篇文章主要介紹了詳解Java8新特性Stream之list轉(zhuǎn)map及問題解決,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2019-09-09