Java線程的并發(fā)工具類實現(xiàn)原理解析
在JDK的并發(fā)包里提供了幾個非常有用的并發(fā)工具類。CountDownLatch、CyclicBarrier和Semaphore工具類提供了一種并發(fā)流程控制的手段,Exchanger工具類則提供了在線程間交換數(shù)據(jù)的一種手段。本章會配合一些應(yīng)用場景來介紹如何使用這些工具類。
一、fork/join
1. Fork-Join原理
在必要的情況下,將一個大任務(wù),拆分(fork)成若干個小任務(wù),然后再將一個個小任務(wù)的結(jié)果進(jìn)行匯總(join)。
適用場景:大數(shù)據(jù)量統(tǒng)計類任務(wù)。

2. 工作竊取
Fork/Join在實現(xiàn)上,大任務(wù)拆分出來的小任務(wù)會被分發(fā)到不同的隊列里面,每一個隊列都會用一個線程來消費(fèi),這是為了獲取任務(wù)時的多線程競爭,但是某些線程會提前消費(fèi)完自己的隊列。而有些線程沒有及時消費(fèi)完隊列,這個時候,完成了任務(wù)的線程就會去竊取那些沒有消費(fèi)完成的線程的任務(wù)隊列,為了減少線程競爭,F(xiàn)ork/Join使用雙端隊列來存取小任務(wù),分配給這個隊列的線程會一直從頭取得一個任務(wù)然后執(zhí)行,而竊取線程總是從隊列的尾端拉取task。
3. 代碼實現(xiàn)
我們要使用 ForkJoin 框架,必須首先創(chuàng)建一個 ForkJoin 任務(wù)。它提供在任務(wù)中執(zhí)行 fork 和 join 的操作機(jī)制,通常我們不直接繼承 ForkjoinTask 類,只需要直接繼承其子類。
1、RecursiveAction,用于沒有返回結(jié)果的任務(wù)。
2、RecursiveTask,用于有返回值的任務(wù)。
task 要通過 ForkJoinPool 來執(zhí)行,使用 invoke、execute、submit提交,兩者的區(qū)別是:invoke 是同步執(zhí)行,調(diào)用之后需要等待任務(wù)完成,才能執(zhí)行后面的代碼;execute、submit 是異步執(zhí)行。
示例1:長度400萬的隨機(jī)數(shù)組求和,使用RecursiveTask 。
/**
* 隨機(jī)產(chǎn)生ARRAY_LENGTH長的的隨機(jī)數(shù)組
*/
public class MakeArray {
// 數(shù)組長度
public static final int ARRAY_LENGTH = 4000000;
public static int[] makeArray() {
// new一個隨機(jī)數(shù)發(fā)生器
Random r = new Random();
int[] result = new int[ARRAY_LENGTH];
for (int i = 0; i < ARRAY_LENGTH; i++) {
// 用隨機(jī)數(shù)填充數(shù)組
result[i] = r.nextInt(ARRAY_LENGTH * 3);
}
return result;
}
}
public class SumArray {
private static class SumTask extends RecursiveTask<Integer> {
// 閾值
private final static int THRESHOLD = MakeArray.ARRAY_LENGTH / 10;
private int[] src;
private int fromIndex;
private int toIndex;
public SumTask(int[] src, int fromIndex, int toIndex) {
this.src = src;
this.fromIndex = fromIndex;
this.toIndex = toIndex;
}
@Override
protected Integer compute() {
// 任務(wù)的大小是否合適
if ((toIndex - fromIndex) < THRESHOLD) {
System.out.println(" from index = " + fromIndex + " toIndex=" + toIndex);
int count = 0;
for (int i = fromIndex; i <= toIndex; i++) {
count = count + src[i];
}
return count;
} else {
// fromIndex....mid.....toIndex
int mid = (fromIndex + toIndex) / 2;
SumTask left = new SumTask(src, fromIndex, mid);
SumTask right = new SumTask(src, mid + 1, toIndex);
invokeAll(left, right);
return left.join() + right.join();
}
}
}
public static void main(String[] args) {
int[] src = MakeArray.makeArray();
// new出池的實例
ForkJoinPool pool = new ForkJoinPool();
// new出Task的實例
SumTask innerFind = new SumTask(src, 0, src.length - 1);
long start = System.currentTimeMillis();
// invoke阻塞方法
pool.invoke(innerFind);
System.out.println("Task is Running.....");
System.out.println("The count is " + innerFind.join()
+ " spend time:" + (System.currentTimeMillis() - start) + "ms");
}
}
示例2:遍歷指定目錄(含子目錄)下面的txt文件。
public class FindDirsFiles extends RecursiveAction {
private File path;
public FindDirsFiles(File path) {
this.path = path;
}
@Override
protected void compute() {
List<FindDirsFiles> subTasks = new ArrayList<>();
File[] files = path.listFiles();
if (files!=null){
for (File file : files) {
if (file.isDirectory()) {
// 對每個子目錄都新建一個子任務(wù)。
subTasks.add(new FindDirsFiles(file));
} else {
// 遇到文件,檢查。
if (file.getAbsolutePath().endsWith("txt")){
System.out.println("文件:" + file.getAbsolutePath());
}
}
}
if (!subTasks.isEmpty()) {
// 在當(dāng)前的 ForkJoinPool 上調(diào)度所有的子任務(wù)。
for (FindDirsFiles subTask : invokeAll(subTasks)) {
subTask.join();
}
}
}
}
public static void main(String [] args){
try {
// 用一個 ForkJoinPool 實例調(diào)度總?cè)蝿?wù)
ForkJoinPool pool = new ForkJoinPool();
FindDirsFiles task = new FindDirsFiles(new File("F:/"));
// 異步提交
pool.execute(task);
// 主線程做自己的業(yè)務(wù)工作
System.out.println("Task is Running......");
Thread.sleep(1);
int otherWork = 0;
for(int i=0;i<100;i++){
otherWork = otherWork+i;
}
System.out.println("Main Thread done sth......,otherWork=" + otherWork);
System.out.println("Task end");
} catch (Exception e) {
e.printStackTrace();
}
}
}
二、CountDownLatch
閉鎖,CountDownLatch 這個類能夠使一個線程等待其他線程完成各自的工作后再執(zhí)行。例如,應(yīng)用程序的主線程希望在負(fù)責(zé)啟動框架服務(wù)的線程已經(jīng)啟動所有的框架服務(wù)之后再執(zhí)行。
CountDownLatch 是通過一個計數(shù)器來實現(xiàn)的,計數(shù)器的初始值為初始任務(wù)的數(shù)量。每當(dāng)完成了一個任務(wù)后,計數(shù)器的值就會減 1(CountDownLatch.countDown()方法)。當(dāng)計數(shù)器值到達(dá) 0 時,它表示所有的已經(jīng)完成了任務(wù),然后在閉鎖上等待 CountDownLatch.await()方法的線程就可以恢復(fù)執(zhí)行任務(wù)。
示例代碼:
public class CountDownLatchTest {
private static CountDownLatch countDownLatch = new CountDownLatch(2);
private static class BusinessThread extends Thread {
@Override
public void run() {
try {
System.out.println("BusinessThread " + Thread.currentThread().getName() + " start....");
Thread.sleep(3000);
System.out.println("BusinessThread " + Thread.currentThread().getName() + " end.....");
// 計數(shù)器減1
countDownLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
System.out.println("main start....");
new BusinessThread().start();
new BusinessThread().start();
// 等待countDownLatch計數(shù)器為零后執(zhí)行后面代碼
countDownLatch.await();
System.out.println("main end");
}
}
注意點(diǎn):
1、CountDownLatch(2)并不代表對應(yīng)兩個線程。
2、一個線程中可以多次countDownLatch.countDown(),比如在一個線程中countDown兩次或者多次。
三、CyclicBarrier
CyclicBarrier 的字面意思是可循環(huán)使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組線程到達(dá)一個屏障(也可以叫同步點(diǎn))時被阻塞,直到最后一個線程到達(dá)屏障時,屏障才會開門,所有被屏障攔截的線程才會繼續(xù)運(yùn)行。
CyclicBarrier 默認(rèn)的構(gòu)造方法是 CyclicBarrier(int parties),其參數(shù)表示屏障攔截的線程數(shù)量,每個線程調(diào)用 await 方法告訴 CyclicBarrier 我已經(jīng)到達(dá)了屏障,然后當(dāng)前線程被阻塞。
CyclicBarrier 還提供一個更高級的構(gòu)造函數(shù) CyclicBarrie(r int parties,Runnable barrierAction),用于在線程全部到達(dá)屏障時,優(yōu)先執(zhí)行 barrierAction,方便處理更復(fù)雜的業(yè)務(wù)場景。
示例代碼:
public class CyclicBarrierTest {
private static CyclicBarrier barrier = new CyclicBarrier(4, new CollectThread());
/**
* 存放子線程工作結(jié)果的容器
*/
private static ConcurrentHashMap<String, Long> resultMap = new ConcurrentHashMap<>();
public static void main(String[] args) {
for (int i = 0; i < 4; i++) {
Thread thread = new Thread(new SubThread());
thread.start();
}
}
/**
* 匯總的任務(wù)
*/
private static class CollectThread implements Runnable {
@Override
public void run() {
StringBuilder result = new StringBuilder();
for (Map.Entry<String, Long> workResult : resultMap.entrySet()) {
result.append("[" + workResult.getValue() + "]");
}
System.out.println(" the result = " + result);
System.out.println("colletThread end.....");
}
}
/**
* 相互等待的子線程
*/
private static class SubThread implements Runnable {
@Override
public void run() {
long id = Thread.currentThread().getId();
resultMap.put(Thread.currentThread().getId() + "", id);
try {
Thread.sleep(1000 + id);
System.out.println("Thread_" + id + " end1.....");
barrier.await();
Thread.sleep(1000 + id);
System.out.println("Thread_" + id + " end2.....");
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
注意: 一個線程中可以多次await();
四、Semaphore
Semaphore(信號量)是用來控制同時訪問特定資源的線程數(shù)量,它通過協(xié)調(diào)各個線程,以保證合理的使用公共資源。應(yīng)用場景 Semaphore 可以用于做流量控制,特別是公用資源有限的應(yīng)用場景,比如數(shù)據(jù)庫連接池數(shù)量。
方法:常用的前4個。
| 方法 | 描述 |
|---|---|
| acquire() | 獲取連接 |
| release() | 歸還連接數(shù) |
| intavailablePermits() | 返回此信號量中當(dāng)前可用的許可證數(shù) |
| intgetQueueLength() | 返回正在等待獲取許可證的線程數(shù) |
| void reducePermit(s int reduction) | 減少 reduction 個許可證,是個 protected 方法 |
| Collection getQueuedThreads() | 返回所有等待獲取許可證的線程集合,是個 protected 方法 |
示例代碼:模擬數(shù)據(jù)庫連接池。
/**
* 數(shù)據(jù)庫連接
*/
public class SqlConnectImpl implements Connection {
/**
* 得到一個數(shù)據(jù)庫連接
*/
public static final Connection fetchConnection(){
return new SqlConnectImpl();
}
// 省略其他代碼
}
/**
* 連接池代碼
*/
public class DBPoolSemaphore {
private final static int POOL_SIZE = 10;
// 兩個指示器,分別表示池子還有可用連接和已用連接
private final Semaphore useful;
private final Semaphore useless;
// 存放數(shù)據(jù)庫連接的容器
private static LinkedList<Connection> pool = new LinkedList<Connection>();
// 初始化池
static {
for (int i = 0; i < POOL_SIZE; i++) {
pool.addLast(SqlConnectImpl.fetchConnection());
}
}
public DBPoolSemaphore() {
this.useful = new Semaphore(10);
this.useless = new Semaphore(0);
}
/**
* 歸還連接
*/
public void returnConnect(Connection connection) throws InterruptedException {
if (connection != null) {
System.out.println("當(dāng)前有" + useful.getQueueLength() + "個線程等待數(shù)據(jù)庫連接!!"
+ "可用連接數(shù):" + useful.availablePermits());
useless.acquire();
synchronized (pool) {
pool.addLast(connection);
}
useful.release();
}
}
/**
* 從池子拿連接
*/
public Connection takeConnect() throws InterruptedException {
useful.acquire();
Connection connection;
synchronized (pool) {
connection = pool.removeFirst();
}
useless.release();
return connection;
}
}
/**
* 測試代碼
*/
public class AppTest {
private static DBPoolSemaphore dbPool = new DBPoolSemaphore();
private static class BusiThread extends Thread {
@Override
public void run() {
// 讓每個線程持有連接的時間不一樣
Random r = new Random();
long start = System.currentTimeMillis();
try {
Connection connect = dbPool.takeConnect();
System.out.println("Thread_" + Thread.currentThread().getId()
+ "_獲取數(shù)據(jù)庫連接共耗時【" + (System.currentTimeMillis() - start) + "】ms.");
//模擬業(yè)務(wù)操作,線程持有連接查詢數(shù)據(jù)
Thread.sleep(100 + r.nextInt(100));
System.out.println("查詢數(shù)據(jù)完成,歸還連接!");
dbPool.returnConnect(connect);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
for (int i = 0; i < 50; i++) {
Thread thread = new BusiThread();
thread.start();
}
}
}
當(dāng)然,你也可以使用一個 semaphore 來實現(xiàn),不過需要注意的是 semaphore 的初始數(shù)量為10并不是固定的,如果你后面歸還連接時 dbPool.returnConnect(new SqlConnectImpl()); 的話,那么他的數(shù)量會變成 11 。
五、Exchange
Exchanger(交換者)是一個用于線程間協(xié)作的工具類。Exchanger 用于進(jìn)行線程間的數(shù)據(jù)交換。它提供一個同步點(diǎn),在這個同步點(diǎn),兩個線程可以交換彼此的數(shù)據(jù)。這兩個線程通過 exchange() 方法交換數(shù)據(jù),如果第一個線程先執(zhí)行 exchange() 方法,它會一直等待第二個線程也執(zhí)行 exchange() 方法,當(dāng)兩個線程都到達(dá)同步點(diǎn)時,這兩個線程就可以交換數(shù)據(jù),將本線程生產(chǎn)出來的數(shù)據(jù)傳遞給對方。
但是這種只能在兩個線程種傳遞,適用面過于狹窄。
六、Callable、Future、FutureTask
- Runnable 是一個接口,在它里面只聲明了一個 run()方法,由于 run()方法返回值為 void 類型,所以在執(zhí)行完任務(wù)之后無法返回任何結(jié)果。
- Callable 位于 java.util.concurrent 包下,它也是一個接口,在它里面也只聲明了一個方法,只不過這個方法叫做 call(),這是一個泛型接口,call()函數(shù)返回的類型就是傳遞進(jìn)來的 V 類型。
- Future 就是對于具體的 Runnable 或者 Callable 任務(wù)的執(zhí)行結(jié)果進(jìn)行取消、查詢是否完成、獲取結(jié)果。必要時可以通過 get 方法獲取執(zhí)行結(jié)果,該方法會阻塞直到任務(wù)返回結(jié)果。
- FutureTask 因為 Future 只是一個接口,所以是無法直接用來創(chuàng)建對象使用的,因此就有了 FutureTask 。
關(guān)系圖示:

所以,我們可以通過 FutureTask 把一個 Callable 包裝成 Runnable,然后再通過這個 FutureTask 拿到 Callable 運(yùn)行后的返回值。
示例代碼:
public class FutureTaskTest {
private static class CallableTest implements Callable<Integer> {
private int sum = 0;
@Override
public Integer call() throws Exception {
System.out.println("Callable 子線程開始計算!");
for (int i = 0; i < 5000; i++) {
if (Thread.currentThread().isInterrupted()) {
System.out.println("Callable 子線程計算任務(wù)中斷!");
return null;
}
sum = sum + i;
System.out.println("sum=" + sum);
}
System.out.println("Callable 子線程計算結(jié)束!結(jié)果為: " + sum);
return sum;
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
CallableTest callableTest = new CallableTest();
// 包裝
FutureTask<Integer> futureTask = new FutureTask<>(callableTest);
new Thread(futureTask).start();
Random r = new Random();
if (r.nextInt(100) > 50) {
// 如果r.nextInt(100) > 50則計算返回結(jié)果
System.out.println("sum = " + futureTask.get());
} else {
// 如果r.nextInt(100) <= 50則取消計算
System.out.println("Cancel...");
futureTask.cancel(true);
}
}
}
都讀到這里了,來個 點(diǎn)贊、評論、關(guān)注、收藏 吧!
文章作者:IT王小二
首發(fā)地址:https://www.itwxe.com/posts/e4f648cd/
版權(quán)聲明:文章內(nèi)容遵循 署名-非商業(yè)性使用-禁止演繹 4.0 國際 進(jìn)行許可,轉(zhuǎn)載請在文章頁面明顯位置給出作者與原文鏈接。
到此這篇關(guān)于Java線程的并發(fā)工具類實現(xiàn)原理解析的文章就介紹到這了,更多相關(guān)java線程并發(fā)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot?整合?Quartz?定時任務(wù)框架詳解
這篇文章主要介紹了SpringBoot整合Quartz定時任務(wù)框架詳解,Quartz是一個完全由Java編寫的開源作業(yè)調(diào)度框架,為在Java應(yīng)用程序中進(jìn)行作業(yè)調(diào)度提供了簡單卻強(qiáng)大的機(jī)制2022-08-08
Springcloud服務(wù)注冊consul客戶端過程解析
這篇文章主要介紹了Springcloud服務(wù)注冊consul客戶端過程解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-08-08
Spring Boot2發(fā)布調(diào)用REST服務(wù)實現(xiàn)方法
這篇文章主要介紹了Spring Boot2發(fā)布調(diào)用REST服務(wù)實現(xiàn)方法,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-04-04
使用Jenkins自動化構(gòu)建工具進(jìn)行敏捷開發(fā)
這篇文章主要為大家介紹了使用Jenkins自動化構(gòu)建工具進(jìn)行敏捷開發(fā),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步早日升職加薪2022-04-04
子線程任務(wù)發(fā)生異常時主線程事務(wù)回滾示例過程
這篇文章主要為大家介紹了子線程任務(wù)發(fā)生了異常時主線程事務(wù)如何回滾的示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步2022-03-03

