Java線程的并發(fā)工具類實(shí)現(xiàn)原理解析
在JDK的并發(fā)包里提供了幾個(gè)非常有用的并發(fā)工具類。CountDownLatch、CyclicBarrier和Semaphore工具類提供了一種并發(fā)流程控制的手段,Exchanger工具類則提供了在線程間交換數(shù)據(jù)的一種手段。本章會(huì)配合一些應(yīng)用場(chǎng)景來(lái)介紹如何使用這些工具類。
一、fork/join
1. Fork-Join原理
在必要的情況下,將一個(gè)大任務(wù),拆分(fork)成若干個(gè)小任務(wù),然后再將一個(gè)個(gè)小任務(wù)的結(jié)果進(jìn)行匯總(join)。
適用場(chǎng)景:大數(shù)據(jù)量統(tǒng)計(jì)類任務(wù)。
2. 工作竊取
Fork/Join在實(shí)現(xiàn)上,大任務(wù)拆分出來(lái)的小任務(wù)會(huì)被分發(fā)到不同的隊(duì)列里面,每一個(gè)隊(duì)列都會(huì)用一個(gè)線程來(lái)消費(fèi),這是為了獲取任務(wù)時(shí)的多線程競(jìng)爭(zhēng),但是某些線程會(huì)提前消費(fèi)完自己的隊(duì)列。而有些線程沒有及時(shí)消費(fèi)完隊(duì)列,這個(gè)時(shí)候,完成了任務(wù)的線程就會(huì)去竊取那些沒有消費(fèi)完成的線程的任務(wù)隊(duì)列,為了減少線程競(jìng)爭(zhēng),F(xiàn)ork/Join使用雙端隊(duì)列來(lái)存取小任務(wù),分配給這個(gè)隊(duì)列的線程會(huì)一直從頭取得一個(gè)任務(wù)然后執(zhí)行,而竊取線程總是從隊(duì)列的尾端拉取task。
3. 代碼實(shí)現(xiàn)
我們要使用 ForkJoin 框架,必須首先創(chuàng)建一個(gè) ForkJoin 任務(wù)。它提供在任務(wù)中執(zhí)行 fork 和 join 的操作機(jī)制,通常我們不直接繼承 ForkjoinTask 類,只需要直接繼承其子類。
1、RecursiveAction,用于沒有返回結(jié)果的任務(wù)。
2、RecursiveTask,用于有返回值的任務(wù)。
task 要通過 ForkJoinPool 來(lái)執(zhí)行,使用 invoke、execute、submit提交,兩者的區(qū)別是:invoke 是同步執(zhí)行,調(diào)用之后需要等待任務(wù)完成,才能執(zhí)行后面的代碼;execute、submit 是異步執(zhí)行。
示例1:長(zhǎng)度400萬(wàn)的隨機(jī)數(shù)組求和,使用RecursiveTask 。
/** * 隨機(jī)產(chǎn)生ARRAY_LENGTH長(zhǎng)的的隨機(jī)數(shù)組 */ public class MakeArray { // 數(shù)組長(zhǎng)度 public static final int ARRAY_LENGTH = 4000000; public static int[] makeArray() { // new一個(gè)隨機(jī)數(shù)發(fā)生器 Random r = new Random(); int[] result = new int[ARRAY_LENGTH]; for (int i = 0; i < ARRAY_LENGTH; i++) { // 用隨機(jī)數(shù)填充數(shù)組 result[i] = r.nextInt(ARRAY_LENGTH * 3); } return result; } } public class SumArray { private static class SumTask extends RecursiveTask<Integer> { // 閾值 private final static int THRESHOLD = MakeArray.ARRAY_LENGTH / 10; private int[] src; private int fromIndex; private int toIndex; public SumTask(int[] src, int fromIndex, int toIndex) { this.src = src; this.fromIndex = fromIndex; this.toIndex = toIndex; } @Override protected Integer compute() { // 任務(wù)的大小是否合適 if ((toIndex - fromIndex) < THRESHOLD) { System.out.println(" from index = " + fromIndex + " toIndex=" + toIndex); int count = 0; for (int i = fromIndex; i <= toIndex; i++) { count = count + src[i]; } return count; } else { // fromIndex....mid.....toIndex int mid = (fromIndex + toIndex) / 2; SumTask left = new SumTask(src, fromIndex, mid); SumTask right = new SumTask(src, mid + 1, toIndex); invokeAll(left, right); return left.join() + right.join(); } } } public static void main(String[] args) { int[] src = MakeArray.makeArray(); // new出池的實(shí)例 ForkJoinPool pool = new ForkJoinPool(); // new出Task的實(shí)例 SumTask innerFind = new SumTask(src, 0, src.length - 1); long start = System.currentTimeMillis(); // invoke阻塞方法 pool.invoke(innerFind); System.out.println("Task is Running....."); System.out.println("The count is " + innerFind.join() + " spend time:" + (System.currentTimeMillis() - start) + "ms"); } }
示例2:遍歷指定目錄(含子目錄)下面的txt文件。
public class FindDirsFiles extends RecursiveAction { private File path; public FindDirsFiles(File path) { this.path = path; } @Override protected void compute() { List<FindDirsFiles> subTasks = new ArrayList<>(); File[] files = path.listFiles(); if (files!=null){ for (File file : files) { if (file.isDirectory()) { // 對(duì)每個(gè)子目錄都新建一個(gè)子任務(wù)。 subTasks.add(new FindDirsFiles(file)); } else { // 遇到文件,檢查。 if (file.getAbsolutePath().endsWith("txt")){ System.out.println("文件:" + file.getAbsolutePath()); } } } if (!subTasks.isEmpty()) { // 在當(dāng)前的 ForkJoinPool 上調(diào)度所有的子任務(wù)。 for (FindDirsFiles subTask : invokeAll(subTasks)) { subTask.join(); } } } } public static void main(String [] args){ try { // 用一個(gè) ForkJoinPool 實(shí)例調(diào)度總?cè)蝿?wù) ForkJoinPool pool = new ForkJoinPool(); FindDirsFiles task = new FindDirsFiles(new File("F:/")); // 異步提交 pool.execute(task); // 主線程做自己的業(yè)務(wù)工作 System.out.println("Task is Running......"); Thread.sleep(1); int otherWork = 0; for(int i=0;i<100;i++){ otherWork = otherWork+i; } System.out.println("Main Thread done sth......,otherWork=" + otherWork); System.out.println("Task end"); } catch (Exception e) { e.printStackTrace(); } } }
二、CountDownLatch
閉鎖,CountDownLatch 這個(gè)類能夠使一個(gè)線程等待其他線程完成各自的工作后再執(zhí)行。例如,應(yīng)用程序的主線程希望在負(fù)責(zé)啟動(dòng)框架服務(wù)的線程已經(jīng)啟動(dòng)所有的框架服務(wù)之后再執(zhí)行。
CountDownLatch 是通過一個(gè)計(jì)數(shù)器來(lái)實(shí)現(xiàn)的,計(jì)數(shù)器的初始值為初始任務(wù)的數(shù)量。每當(dāng)完成了一個(gè)任務(wù)后,計(jì)數(shù)器的值就會(huì)減 1(CountDownLatch.countDown()方法)。當(dāng)計(jì)數(shù)器值到達(dá) 0 時(shí),它表示所有的已經(jīng)完成了任務(wù),然后在閉鎖上等待 CountDownLatch.await()方法的線程就可以恢復(fù)執(zhí)行任務(wù)。
示例代碼:
public class CountDownLatchTest { private static CountDownLatch countDownLatch = new CountDownLatch(2); private static class BusinessThread extends Thread { @Override public void run() { try { System.out.println("BusinessThread " + Thread.currentThread().getName() + " start...."); Thread.sleep(3000); System.out.println("BusinessThread " + Thread.currentThread().getName() + " end....."); // 計(jì)數(shù)器減1 countDownLatch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException { System.out.println("main start...."); new BusinessThread().start(); new BusinessThread().start(); // 等待countDownLatch計(jì)數(shù)器為零后執(zhí)行后面代碼 countDownLatch.await(); System.out.println("main end"); } }
注意點(diǎn):
1、CountDownLatch(2)并不代表對(duì)應(yīng)兩個(gè)線程。
2、一個(gè)線程中可以多次countDownLatch.countDown(),比如在一個(gè)線程中countDown兩次或者多次。
三、CyclicBarrier
CyclicBarrier 的字面意思是可循環(huán)使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組線程到達(dá)一個(gè)屏障(也可以叫同步點(diǎn))時(shí)被阻塞,直到最后一個(gè)線程到達(dá)屏障時(shí),屏障才會(huì)開門,所有被屏障攔截的線程才會(huì)繼續(xù)運(yùn)行。
CyclicBarrier 默認(rèn)的構(gòu)造方法是 CyclicBarrier(int parties),其參數(shù)表示屏障攔截的線程數(shù)量,每個(gè)線程調(diào)用 await 方法告訴 CyclicBarrier 我已經(jīng)到達(dá)了屏障,然后當(dāng)前線程被阻塞。
CyclicBarrier 還提供一個(gè)更高級(jí)的構(gòu)造函數(shù) CyclicBarrie(r int parties,Runnable barrierAction),用于在線程全部到達(dá)屏障時(shí),優(yōu)先執(zhí)行 barrierAction,方便處理更復(fù)雜的業(yè)務(wù)場(chǎng)景。
示例代碼:
public class CyclicBarrierTest { private static CyclicBarrier barrier = new CyclicBarrier(4, new CollectThread()); /** * 存放子線程工作結(jié)果的容器 */ private static ConcurrentHashMap<String, Long> resultMap = new ConcurrentHashMap<>(); public static void main(String[] args) { for (int i = 0; i < 4; i++) { Thread thread = new Thread(new SubThread()); thread.start(); } } /** * 匯總的任務(wù) */ private static class CollectThread implements Runnable { @Override public void run() { StringBuilder result = new StringBuilder(); for (Map.Entry<String, Long> workResult : resultMap.entrySet()) { result.append("[" + workResult.getValue() + "]"); } System.out.println(" the result = " + result); System.out.println("colletThread end....."); } } /** * 相互等待的子線程 */ private static class SubThread implements Runnable { @Override public void run() { long id = Thread.currentThread().getId(); resultMap.put(Thread.currentThread().getId() + "", id); try { Thread.sleep(1000 + id); System.out.println("Thread_" + id + " end1....."); barrier.await(); Thread.sleep(1000 + id); System.out.println("Thread_" + id + " end2....."); barrier.await(); } catch (Exception e) { e.printStackTrace(); } } } }
注意: 一個(gè)線程中可以多次await();
四、Semaphore
Semaphore(信號(hào)量)是用來(lái)控制同時(shí)訪問特定資源的線程數(shù)量,它通過協(xié)調(diào)各個(gè)線程,以保證合理的使用公共資源。應(yīng)用場(chǎng)景 Semaphore 可以用于做流量控制,特別是公用資源有限的應(yīng)用場(chǎng)景,比如數(shù)據(jù)庫(kù)連接池?cái)?shù)量。
方法:常用的前4個(gè)。
方法 | 描述 |
---|---|
acquire() | 獲取連接 |
release() | 歸還連接數(shù) |
intavailablePermits() | 返回此信號(hào)量中當(dāng)前可用的許可證數(shù) |
intgetQueueLength() | 返回正在等待獲取許可證的線程數(shù) |
void reducePermit(s int reduction) | 減少 reduction 個(gè)許可證,是個(gè) protected 方法 |
Collection getQueuedThreads() | 返回所有等待獲取許可證的線程集合,是個(gè) protected 方法 |
示例代碼:模擬數(shù)據(jù)庫(kù)連接池。
/** * 數(shù)據(jù)庫(kù)連接 */ public class SqlConnectImpl implements Connection { /** * 得到一個(gè)數(shù)據(jù)庫(kù)連接 */ public static final Connection fetchConnection(){ return new SqlConnectImpl(); } // 省略其他代碼 }
/** * 連接池代碼 */ public class DBPoolSemaphore { private final static int POOL_SIZE = 10; // 兩個(gè)指示器,分別表示池子還有可用連接和已用連接 private final Semaphore useful; private final Semaphore useless; // 存放數(shù)據(jù)庫(kù)連接的容器 private static LinkedList<Connection> pool = new LinkedList<Connection>(); // 初始化池 static { for (int i = 0; i < POOL_SIZE; i++) { pool.addLast(SqlConnectImpl.fetchConnection()); } } public DBPoolSemaphore() { this.useful = new Semaphore(10); this.useless = new Semaphore(0); } /** * 歸還連接 */ public void returnConnect(Connection connection) throws InterruptedException { if (connection != null) { System.out.println("當(dāng)前有" + useful.getQueueLength() + "個(gè)線程等待數(shù)據(jù)庫(kù)連接!!" + "可用連接數(shù):" + useful.availablePermits()); useless.acquire(); synchronized (pool) { pool.addLast(connection); } useful.release(); } } /** * 從池子拿連接 */ public Connection takeConnect() throws InterruptedException { useful.acquire(); Connection connection; synchronized (pool) { connection = pool.removeFirst(); } useless.release(); return connection; } }
/** * 測(cè)試代碼 */ public class AppTest { private static DBPoolSemaphore dbPool = new DBPoolSemaphore(); private static class BusiThread extends Thread { @Override public void run() { // 讓每個(gè)線程持有連接的時(shí)間不一樣 Random r = new Random(); long start = System.currentTimeMillis(); try { Connection connect = dbPool.takeConnect(); System.out.println("Thread_" + Thread.currentThread().getId() + "_獲取數(shù)據(jù)庫(kù)連接共耗時(shí)【" + (System.currentTimeMillis() - start) + "】ms."); //模擬業(yè)務(wù)操作,線程持有連接查詢數(shù)據(jù) Thread.sleep(100 + r.nextInt(100)); System.out.println("查詢數(shù)據(jù)完成,歸還連接!"); dbPool.returnConnect(connect); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) { for (int i = 0; i < 50; i++) { Thread thread = new BusiThread(); thread.start(); } } }
當(dāng)然,你也可以使用一個(gè) semaphore 來(lái)實(shí)現(xiàn),不過需要注意的是 semaphore 的初始數(shù)量為10并不是固定的,如果你后面歸還連接時(shí) dbPool.returnConnect(new SqlConnectImpl()); 的話,那么他的數(shù)量會(huì)變成 11 。
五、Exchange
Exchanger(交換者)是一個(gè)用于線程間協(xié)作的工具類。Exchanger 用于進(jìn)行線程間的數(shù)據(jù)交換。它提供一個(gè)同步點(diǎn),在這個(gè)同步點(diǎn),兩個(gè)線程可以交換彼此的數(shù)據(jù)。這兩個(gè)線程通過 exchange() 方法交換數(shù)據(jù),如果第一個(gè)線程先執(zhí)行 exchange() 方法,它會(huì)一直等待第二個(gè)線程也執(zhí)行 exchange() 方法,當(dāng)兩個(gè)線程都到達(dá)同步點(diǎn)時(shí),這兩個(gè)線程就可以交換數(shù)據(jù),將本線程生產(chǎn)出來(lái)的數(shù)據(jù)傳遞給對(duì)方。
但是這種只能在兩個(gè)線程種傳遞,適用面過于狹窄。
六、Callable、Future、FutureTask
- Runnable 是一個(gè)接口,在它里面只聲明了一個(gè) run()方法,由于 run()方法返回值為 void 類型,所以在執(zhí)行完任務(wù)之后無(wú)法返回任何結(jié)果。
- Callable 位于 java.util.concurrent 包下,它也是一個(gè)接口,在它里面也只聲明了一個(gè)方法,只不過這個(gè)方法叫做 call(),這是一個(gè)泛型接口,call()函數(shù)返回的類型就是傳遞進(jìn)來(lái)的 V 類型。
- Future 就是對(duì)于具體的 Runnable 或者 Callable 任務(wù)的執(zhí)行結(jié)果進(jìn)行取消、查詢是否完成、獲取結(jié)果。必要時(shí)可以通過 get 方法獲取執(zhí)行結(jié)果,該方法會(huì)阻塞直到任務(wù)返回結(jié)果。
- FutureTask 因?yàn)?Future 只是一個(gè)接口,所以是無(wú)法直接用來(lái)創(chuàng)建對(duì)象使用的,因此就有了 FutureTask 。
關(guān)系圖示:
所以,我們可以通過 FutureTask 把一個(gè) Callable 包裝成 Runnable,然后再通過這個(gè) FutureTask 拿到 Callable 運(yùn)行后的返回值。
示例代碼:
public class FutureTaskTest { private static class CallableTest implements Callable<Integer> { private int sum = 0; @Override public Integer call() throws Exception { System.out.println("Callable 子線程開始計(jì)算!"); for (int i = 0; i < 5000; i++) { if (Thread.currentThread().isInterrupted()) { System.out.println("Callable 子線程計(jì)算任務(wù)中斷!"); return null; } sum = sum + i; System.out.println("sum=" + sum); } System.out.println("Callable 子線程計(jì)算結(jié)束!結(jié)果為: " + sum); return sum; } } public static void main(String[] args) throws ExecutionException, InterruptedException { CallableTest callableTest = new CallableTest(); // 包裝 FutureTask<Integer> futureTask = new FutureTask<>(callableTest); new Thread(futureTask).start(); Random r = new Random(); if (r.nextInt(100) > 50) { // 如果r.nextInt(100) > 50則計(jì)算返回結(jié)果 System.out.println("sum = " + futureTask.get()); } else { // 如果r.nextInt(100) <= 50則取消計(jì)算 System.out.println("Cancel..."); futureTask.cancel(true); } } }
都讀到這里了,來(lái)個(gè) 點(diǎn)贊、評(píng)論、關(guān)注、收藏 吧!
文章作者:IT王小二
首發(fā)地址:https://www.itwxe.com/posts/e4f648cd/
版權(quán)聲明:文章內(nèi)容遵循 署名-非商業(yè)性使用-禁止演繹 4.0 國(guó)際 進(jìn)行許可,轉(zhuǎn)載請(qǐng)?jiān)谖恼马?yè)面明顯位置給出作者與原文鏈接。
到此這篇關(guān)于Java線程的并發(fā)工具類實(shí)現(xiàn)原理解析的文章就介紹到這了,更多相關(guān)java線程并發(fā)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot?整合?Quartz?定時(shí)任務(wù)框架詳解
這篇文章主要介紹了SpringBoot整合Quartz定時(shí)任務(wù)框架詳解,Quartz是一個(gè)完全由Java編寫的開源作業(yè)調(diào)度框架,為在Java應(yīng)用程序中進(jìn)行作業(yè)調(diào)度提供了簡(jiǎn)單卻強(qiáng)大的機(jī)制2022-08-08Springcloud服務(wù)注冊(cè)consul客戶端過程解析
這篇文章主要介紹了Springcloud服務(wù)注冊(cè)consul客戶端過程解析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-08-08Spring Boot2發(fā)布調(diào)用REST服務(wù)實(shí)現(xiàn)方法
這篇文章主要介紹了Spring Boot2發(fā)布調(diào)用REST服務(wù)實(shí)現(xiàn)方法,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-04-04使用Jenkins自動(dòng)化構(gòu)建工具進(jìn)行敏捷開發(fā)
這篇文章主要為大家介紹了使用Jenkins自動(dòng)化構(gòu)建工具進(jìn)行敏捷開發(fā),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步早日升職加薪2022-04-04子線程任務(wù)發(fā)生異常時(shí)主線程事務(wù)回滾示例過程
這篇文章主要為大家介紹了子線程任務(wù)發(fā)生了異常時(shí)主線程事務(wù)如何回滾的示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步2022-03-03