欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java線程的并發(fā)工具類實(shí)現(xiàn)原理解析

 更新時(shí)間:2021年06月21日 10:44:05   作者:IT王小二  
本文給大家講解Java線程的并發(fā)工具類的一些知識(shí),通過適用場(chǎng)景分析大數(shù)據(jù)量統(tǒng)計(jì)類任務(wù)的實(shí)現(xiàn)原理和封裝,多個(gè)示例代碼講解的非常詳細(xì),對(duì)java線程并發(fā)工具類相關(guān)知識(shí)感興趣的朋友一起學(xué)習(xí)下吧

在JDK的并發(fā)包里提供了幾個(gè)非常有用的并發(fā)工具類。CountDownLatch、CyclicBarrier和Semaphore工具類提供了一種并發(fā)流程控制的手段,Exchanger工具類則提供了在線程間交換數(shù)據(jù)的一種手段。本章會(huì)配合一些應(yīng)用場(chǎng)景來(lái)介紹如何使用這些工具類。

一、fork/join

1. Fork-Join原理

在必要的情況下,將一個(gè)大任務(wù),拆分(fork)成若干個(gè)小任務(wù),然后再將一個(gè)個(gè)小任務(wù)的結(jié)果進(jìn)行匯總(join)。

適用場(chǎng)景:大數(shù)據(jù)量統(tǒng)計(jì)類任務(wù)。

2. 工作竊取

Fork/Join在實(shí)現(xiàn)上,大任務(wù)拆分出來(lái)的小任務(wù)會(huì)被分發(fā)到不同的隊(duì)列里面,每一個(gè)隊(duì)列都會(huì)用一個(gè)線程來(lái)消費(fèi),這是為了獲取任務(wù)時(shí)的多線程競(jìng)爭(zhēng),但是某些線程會(huì)提前消費(fèi)完自己的隊(duì)列。而有些線程沒有及時(shí)消費(fèi)完隊(duì)列,這個(gè)時(shí)候,完成了任務(wù)的線程就會(huì)去竊取那些沒有消費(fèi)完成的線程的任務(wù)隊(duì)列,為了減少線程競(jìng)爭(zhēng),F(xiàn)ork/Join使用雙端隊(duì)列來(lái)存取小任務(wù),分配給這個(gè)隊(duì)列的線程會(huì)一直從頭取得一個(gè)任務(wù)然后執(zhí)行,而竊取線程總是從隊(duì)列的尾端拉取task。

3. 代碼實(shí)現(xiàn)

我們要使用 ForkJoin 框架,必須首先創(chuàng)建一個(gè) ForkJoin 任務(wù)。它提供在任務(wù)中執(zhí)行 fork 和 join 的操作機(jī)制,通常我們不直接繼承 ForkjoinTask 類,只需要直接繼承其子類。

1、RecursiveAction,用于沒有返回結(jié)果的任務(wù)。
2、RecursiveTask,用于有返回值的任務(wù)。

task 要通過 ForkJoinPool 來(lái)執(zhí)行,使用 invoke、execute、submit提交,兩者的區(qū)別是:invoke 是同步執(zhí)行,調(diào)用之后需要等待任務(wù)完成,才能執(zhí)行后面的代碼;execute、submit 是異步執(zhí)行。

示例1:長(zhǎng)度400萬(wàn)的隨機(jī)數(shù)組求和,使用RecursiveTask 。

/**
 * 隨機(jī)產(chǎn)生ARRAY_LENGTH長(zhǎng)的的隨機(jī)數(shù)組
 */
public class MakeArray {
    // 數(shù)組長(zhǎng)度
    public static final int ARRAY_LENGTH = 4000000;

    public static int[] makeArray() {
        // new一個(gè)隨機(jī)數(shù)發(fā)生器
        Random r = new Random();
        int[] result = new int[ARRAY_LENGTH];
        for (int i = 0; i < ARRAY_LENGTH; i++) {
            // 用隨機(jī)數(shù)填充數(shù)組
            result[i] = r.nextInt(ARRAY_LENGTH * 3);
        }
        return result;
    }
}

public class SumArray {
    private static class SumTask extends RecursiveTask<Integer> {

        // 閾值
        private final static int THRESHOLD = MakeArray.ARRAY_LENGTH / 10;
        private int[] src;
        private int fromIndex;
        private int toIndex;

        public SumTask(int[] src, int fromIndex, int toIndex) {
            this.src = src;
            this.fromIndex = fromIndex;
            this.toIndex = toIndex;
        }

        @Override
        protected Integer compute() {
            // 任務(wù)的大小是否合適
            if ((toIndex - fromIndex) < THRESHOLD) {
                System.out.println(" from index = " + fromIndex + " toIndex=" + toIndex);
                int count = 0;
                for (int i = fromIndex; i <= toIndex; i++) {
                    count = count + src[i];
                }
                return count;
            } else {
                // fromIndex....mid.....toIndex
                int mid = (fromIndex + toIndex) / 2;
                SumTask left = new SumTask(src, fromIndex, mid);
                SumTask right = new SumTask(src, mid + 1, toIndex);
                invokeAll(left, right);
                return left.join() + right.join();
            }
        }
    }

    public static void main(String[] args) {

        int[] src = MakeArray.makeArray();
        // new出池的實(shí)例
        ForkJoinPool pool = new ForkJoinPool();
        // new出Task的實(shí)例
        SumTask innerFind = new SumTask(src, 0, src.length - 1);

        long start = System.currentTimeMillis();

        // invoke阻塞方法
        pool.invoke(innerFind);
        System.out.println("Task is Running.....");

        System.out.println("The count is " + innerFind.join()
                + " spend time:" + (System.currentTimeMillis() - start) + "ms");

    }
}

示例2:遍歷指定目錄(含子目錄)下面的txt文件。

public class FindDirsFiles extends RecursiveAction {

    private File path;

    public FindDirsFiles(File path) {
        this.path = path;
    }

    @Override
    protected void compute() {
        List<FindDirsFiles> subTasks = new ArrayList<>();

        File[] files = path.listFiles();
        if (files!=null){
            for (File file : files) {
                if (file.isDirectory()) {
                    // 對(duì)每個(gè)子目錄都新建一個(gè)子任務(wù)。
                    subTasks.add(new FindDirsFiles(file));
                } else {
                    // 遇到文件,檢查。
                    if (file.getAbsolutePath().endsWith("txt")){
                        System.out.println("文件:" + file.getAbsolutePath());
                    }
                }
            }
            if (!subTasks.isEmpty()) {
                // 在當(dāng)前的 ForkJoinPool 上調(diào)度所有的子任務(wù)。
                for (FindDirsFiles subTask : invokeAll(subTasks)) {
                    subTask.join();
                }
            }
        }
    }

    public static void main(String [] args){
        try {
            // 用一個(gè) ForkJoinPool 實(shí)例調(diào)度總?cè)蝿?wù)
            ForkJoinPool pool = new ForkJoinPool();
            FindDirsFiles task = new FindDirsFiles(new File("F:/"));

            // 異步提交
            pool.execute(task);

            // 主線程做自己的業(yè)務(wù)工作
            System.out.println("Task is Running......");
            Thread.sleep(1);
            int otherWork = 0;
            for(int i=0;i<100;i++){
                otherWork = otherWork+i;
            }
            System.out.println("Main Thread done sth......,otherWork=" + otherWork);
            System.out.println("Task end");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

二、CountDownLatch

閉鎖,CountDownLatch 這個(gè)類能夠使一個(gè)線程等待其他線程完成各自的工作后再執(zhí)行。例如,應(yīng)用程序的主線程希望在負(fù)責(zé)啟動(dòng)框架服務(wù)的線程已經(jīng)啟動(dòng)所有的框架服務(wù)之后再執(zhí)行。

CountDownLatch 是通過一個(gè)計(jì)數(shù)器來(lái)實(shí)現(xiàn)的,計(jì)數(shù)器的初始值為初始任務(wù)的數(shù)量。每當(dāng)完成了一個(gè)任務(wù)后,計(jì)數(shù)器的值就會(huì)減 1(CountDownLatch.countDown()方法)。當(dāng)計(jì)數(shù)器值到達(dá) 0 時(shí),它表示所有的已經(jīng)完成了任務(wù),然后在閉鎖上等待 CountDownLatch.await()方法的線程就可以恢復(fù)執(zhí)行任務(wù)。

示例代碼:

public class CountDownLatchTest {
    private static CountDownLatch countDownLatch = new CountDownLatch(2);

    private static class BusinessThread extends Thread {
        @Override
        public void run() {
            try {
                System.out.println("BusinessThread " + Thread.currentThread().getName() + " start....");
                Thread.sleep(3000);
                System.out.println("BusinessThread " + Thread.currentThread().getName() + " end.....");
                // 計(jì)數(shù)器減1
                countDownLatch.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("main start....");
        new BusinessThread().start();
        new BusinessThread().start();
        // 等待countDownLatch計(jì)數(shù)器為零后執(zhí)行后面代碼
        countDownLatch.await();
        System.out.println("main end");
    }

}

注意點(diǎn):

1、CountDownLatch(2)并不代表對(duì)應(yīng)兩個(gè)線程。

2、一個(gè)線程中可以多次countDownLatch.countDown(),比如在一個(gè)線程中countDown兩次或者多次。

三、CyclicBarrier

CyclicBarrier 的字面意思是可循環(huán)使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組線程到達(dá)一個(gè)屏障(也可以叫同步點(diǎn))時(shí)被阻塞,直到最后一個(gè)線程到達(dá)屏障時(shí),屏障才會(huì)開門,所有被屏障攔截的線程才會(huì)繼續(xù)運(yùn)行。

CyclicBarrier 默認(rèn)的構(gòu)造方法是 CyclicBarrier(int parties),其參數(shù)表示屏障攔截的線程數(shù)量,每個(gè)線程調(diào)用 await 方法告訴 CyclicBarrier 我已經(jīng)到達(dá)了屏障,然后當(dāng)前線程被阻塞。

CyclicBarrier 還提供一個(gè)更高級(jí)的構(gòu)造函數(shù) CyclicBarrie(r int parties,Runnable barrierAction),用于在線程全部到達(dá)屏障時(shí),優(yōu)先執(zhí)行 barrierAction,方便處理更復(fù)雜的業(yè)務(wù)場(chǎng)景。

示例代碼:

public class CyclicBarrierTest {
    private static CyclicBarrier barrier = new CyclicBarrier(4, new CollectThread());

    /**
     * 存放子線程工作結(jié)果的容器
     */
    private static ConcurrentHashMap<String, Long> resultMap = new ConcurrentHashMap<>();

    public static void main(String[] args) {
        for (int i = 0; i < 4; i++) {
            Thread thread = new Thread(new SubThread());
            thread.start();
        }
    }

    /**
     * 匯總的任務(wù)
     */
    private static class CollectThread implements Runnable {

        @Override
        public void run() {
            StringBuilder result = new StringBuilder();
            for (Map.Entry<String, Long> workResult : resultMap.entrySet()) {
                result.append("[" + workResult.getValue() + "]");
            }
            System.out.println(" the result = " + result);
            System.out.println("colletThread end.....");
        }
    }

    /**
     * 相互等待的子線程
     */
    private static class SubThread implements Runnable {

        @Override
        public void run() {
            long id = Thread.currentThread().getId();
            resultMap.put(Thread.currentThread().getId() + "", id);
            try {
                Thread.sleep(1000 + id);
                System.out.println("Thread_" + id + " end1.....");
                barrier.await();
                Thread.sleep(1000 + id);
                System.out.println("Thread_" + id + " end2.....");
                barrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            }

        }
    }
}

注意: 一個(gè)線程中可以多次await();

四、Semaphore

Semaphore(信號(hào)量)是用來(lái)控制同時(shí)訪問特定資源的線程數(shù)量,它通過協(xié)調(diào)各個(gè)線程,以保證合理的使用公共資源。應(yīng)用場(chǎng)景 Semaphore 可以用于做流量控制,特別是公用資源有限的應(yīng)用場(chǎng)景,比如數(shù)據(jù)庫(kù)連接池?cái)?shù)量。

方法:常用的前4個(gè)。

方法 描述
acquire() 獲取連接
release() 歸還連接數(shù)
intavailablePermits() 返回此信號(hào)量中當(dāng)前可用的許可證數(shù)
intgetQueueLength() 返回正在等待獲取許可證的線程數(shù)
void reducePermit(s int reduction) 減少 reduction 個(gè)許可證,是個(gè) protected 方法
Collection getQueuedThreads() 返回所有等待獲取許可證的線程集合,是個(gè) protected 方法

示例代碼:模擬數(shù)據(jù)庫(kù)連接池。

/**
 * 數(shù)據(jù)庫(kù)連接
 */
public class SqlConnectImpl implements Connection {

    /**
     * 得到一個(gè)數(shù)據(jù)庫(kù)連接
     */
    public static final Connection fetchConnection(){
        return new SqlConnectImpl();
    }
    
    // 省略其他代碼
}
/**
 * 連接池代碼
 */
public class DBPoolSemaphore {

    private final static int POOL_SIZE = 10;
    // 兩個(gè)指示器,分別表示池子還有可用連接和已用連接
    private final Semaphore useful;
	private final Semaphore useless;
    // 存放數(shù)據(jù)庫(kù)連接的容器
    private static LinkedList<Connection> pool = new LinkedList<Connection>();

    // 初始化池
    static {
        for (int i = 0; i < POOL_SIZE; i++) {
            pool.addLast(SqlConnectImpl.fetchConnection());
        }
    }

    public DBPoolSemaphore() {
        this.useful = new Semaphore(10);
        this.useless = new Semaphore(0);
    }

    /**
     * 歸還連接
     */
    public void returnConnect(Connection connection) throws InterruptedException {
        if (connection != null) {
            System.out.println("當(dāng)前有" + useful.getQueueLength() + "個(gè)線程等待數(shù)據(jù)庫(kù)連接!!"
                    + "可用連接數(shù):" + useful.availablePermits());
            useless.acquire();
            synchronized (pool) {
                pool.addLast(connection);
            }
            useful.release();
        }
    }

    /**
     * 從池子拿連接
     */
    public Connection takeConnect() throws InterruptedException {
        useful.acquire();
        Connection connection;
        synchronized (pool) {
            connection = pool.removeFirst();
        }
        useless.release();
        return connection;
    }
}
/**
 * 測(cè)試代碼
 */
public class AppTest {

    private static DBPoolSemaphore dbPool = new DBPoolSemaphore();

    private static class BusiThread extends Thread {
        @Override
        public void run() {
            // 讓每個(gè)線程持有連接的時(shí)間不一樣
            Random r = new Random();
            long start = System.currentTimeMillis();
            try {
                Connection connect = dbPool.takeConnect();
                System.out.println("Thread_" + Thread.currentThread().getId()
                        + "_獲取數(shù)據(jù)庫(kù)連接共耗時(shí)【" + (System.currentTimeMillis() - start) + "】ms.");
				//模擬業(yè)務(wù)操作,線程持有連接查詢數(shù)據(jù)
                Thread.sleep(100 + r.nextInt(100));
                System.out.println("查詢數(shù)據(jù)完成,歸還連接!");
                dbPool.returnConnect(connect);
            } catch (InterruptedException e) {
            	e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 50; i++) {
            Thread thread = new BusiThread();
            thread.start();
        }
    }
}

當(dāng)然,你也可以使用一個(gè) semaphore 來(lái)實(shí)現(xiàn),不過需要注意的是 semaphore 的初始數(shù)量為10并不是固定的,如果你后面歸還連接時(shí) dbPool.returnConnect(new SqlConnectImpl()); 的話,那么他的數(shù)量會(huì)變成 11 。

五、Exchange

Exchanger(交換者)是一個(gè)用于線程間協(xié)作的工具類。Exchanger 用于進(jìn)行線程間的數(shù)據(jù)交換。它提供一個(gè)同步點(diǎn),在這個(gè)同步點(diǎn),兩個(gè)線程可以交換彼此的數(shù)據(jù)。這兩個(gè)線程通過 exchange() 方法交換數(shù)據(jù),如果第一個(gè)線程先執(zhí)行 exchange() 方法,它會(huì)一直等待第二個(gè)線程也執(zhí)行 exchange() 方法,當(dāng)兩個(gè)線程都到達(dá)同步點(diǎn)時(shí),這兩個(gè)線程就可以交換數(shù)據(jù),將本線程生產(chǎn)出來(lái)的數(shù)據(jù)傳遞給對(duì)方。

但是這種只能在兩個(gè)線程種傳遞,適用面過于狹窄。

六、Callable、Future、FutureTask

  • Runnable 是一個(gè)接口,在它里面只聲明了一個(gè) run()方法,由于 run()方法返回值為 void 類型,所以在執(zhí)行完任務(wù)之后無(wú)法返回任何結(jié)果。
  • Callable 位于 java.util.concurrent 包下,它也是一個(gè)接口,在它里面也只聲明了一個(gè)方法,只不過這個(gè)方法叫做 call(),這是一個(gè)泛型接口,call()函數(shù)返回的類型就是傳遞進(jìn)來(lái)的 V 類型。
  • Future 就是對(duì)于具體的 Runnable 或者 Callable 任務(wù)的執(zhí)行結(jié)果進(jìn)行取消、查詢是否完成、獲取結(jié)果。必要時(shí)可以通過 get 方法獲取執(zhí)行結(jié)果,該方法會(huì)阻塞直到任務(wù)返回結(jié)果。
  • FutureTask 因?yàn)?Future 只是一個(gè)接口,所以是無(wú)法直接用來(lái)創(chuàng)建對(duì)象使用的,因此就有了 FutureTask 。

關(guān)系圖示:

所以,我們可以通過 FutureTask 把一個(gè) Callable 包裝成 Runnable,然后再通過這個(gè) FutureTask 拿到 Callable 運(yùn)行后的返回值。

示例代碼:

public class FutureTaskTest {

    private static class CallableTest implements Callable<Integer> {
        private int sum = 0;

        @Override
        public Integer call() throws Exception {
            System.out.println("Callable 子線程開始計(jì)算!");
            for (int i = 0; i < 5000; i++) {
                if (Thread.currentThread().isInterrupted()) {
                    System.out.println("Callable 子線程計(jì)算任務(wù)中斷!");
                    return null;
                }
                sum = sum + i;
                System.out.println("sum=" + sum);
            }
            System.out.println("Callable 子線程計(jì)算結(jié)束!結(jié)果為: " + sum);
            return sum;
        }
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CallableTest callableTest = new CallableTest();
        // 包裝
        FutureTask<Integer> futureTask = new FutureTask<>(callableTest);
        new Thread(futureTask).start();

        Random r = new Random();
        if (r.nextInt(100) > 50) {
            // 如果r.nextInt(100) > 50則計(jì)算返回結(jié)果
            System.out.println("sum = " + futureTask.get());
        } else {
            // 如果r.nextInt(100) <= 50則取消計(jì)算
            System.out.println("Cancel...");
            futureTask.cancel(true);
        }
    }
}

都讀到這里了,來(lái)個(gè) 點(diǎn)贊、評(píng)論、關(guān)注、收藏 吧!

文章作者:IT王小二
首發(fā)地址:https://www.itwxe.com/posts/e4f648cd/
版權(quán)聲明:文章內(nèi)容遵循 署名-非商業(yè)性使用-禁止演繹 4.0 國(guó)際 進(jìn)行許可,轉(zhuǎn)載請(qǐng)?jiān)谖恼马?yè)面明顯位置給出作者與原文鏈接。

到此這篇關(guān)于Java線程的并發(fā)工具類實(shí)現(xiàn)原理解析的文章就介紹到這了,更多相關(guān)java線程并發(fā)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • SpringBoot?整合?Quartz?定時(shí)任務(wù)框架詳解

    SpringBoot?整合?Quartz?定時(shí)任務(wù)框架詳解

    這篇文章主要介紹了SpringBoot整合Quartz定時(shí)任務(wù)框架詳解,Quartz是一個(gè)完全由Java編寫的開源作業(yè)調(diào)度框架,為在Java應(yīng)用程序中進(jìn)行作業(yè)調(diào)度提供了簡(jiǎn)單卻強(qiáng)大的機(jī)制
    2022-08-08
  • Springcloud服務(wù)注冊(cè)consul客戶端過程解析

    Springcloud服務(wù)注冊(cè)consul客戶端過程解析

    這篇文章主要介紹了Springcloud服務(wù)注冊(cè)consul客戶端過程解析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-08-08
  • Spring Boot2發(fā)布調(diào)用REST服務(wù)實(shí)現(xiàn)方法

    Spring Boot2發(fā)布調(diào)用REST服務(wù)實(shí)現(xiàn)方法

    這篇文章主要介紹了Spring Boot2發(fā)布調(diào)用REST服務(wù)實(shí)現(xiàn)方法,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-04-04
  • Java中的ThreadLocal詳解

    Java中的ThreadLocal詳解

    這篇文章主要介紹了Java中的ThreadLocal詳解,ThreadLocal?是一個(gè)線程局部變量,其實(shí)的功用非常簡(jiǎn)單,就是為每一個(gè)使用該變量的線程都提供一個(gè)變量值的副本,是Java中一種較為特殊的線程綁定機(jī)制,需要的朋友可以參考下
    2023-09-09
  • Java實(shí)現(xiàn)猜數(shù)程序

    Java實(shí)現(xiàn)猜數(shù)程序

    這篇文章主要為大家詳細(xì)介紹了Java實(shí)現(xiàn)猜數(shù)程序,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2020-12-12
  • java.security.egd?作用詳解

    java.security.egd?作用詳解

    這篇文章主要為大家介紹了java.security.egd作用詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-08-08
  • java設(shè)計(jì)模式--三種工廠模式詳解

    java設(shè)計(jì)模式--三種工廠模式詳解

    這篇文章主要為大家詳細(xì)介紹了Java設(shè)計(jì)模式之工廠模式,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下,希望能給你帶來(lái)幫助
    2021-07-07
  • 使用Jenkins自動(dòng)化構(gòu)建工具進(jìn)行敏捷開發(fā)

    使用Jenkins自動(dòng)化構(gòu)建工具進(jìn)行敏捷開發(fā)

    這篇文章主要為大家介紹了使用Jenkins自動(dòng)化構(gòu)建工具進(jìn)行敏捷開發(fā),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步早日升職加薪
    2022-04-04
  • java堆排序概念原理介紹

    java堆排序概念原理介紹

    在本篇文章里我們給大家分享了關(guān)于java堆排序的概念原理相關(guān)知識(shí)點(diǎn)內(nèi)容,有需要的朋友們可以學(xué)習(xí)下。
    2018-10-10
  • 子線程任務(wù)發(fā)生異常時(shí)主線程事務(wù)回滾示例過程

    子線程任務(wù)發(fā)生異常時(shí)主線程事務(wù)回滾示例過程

    這篇文章主要為大家介紹了子線程任務(wù)發(fā)生了異常時(shí)主線程事務(wù)如何回滾的示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步
    2022-03-03

最新評(píng)論