欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java并發(fā)工具Fork/Join原理

 更新時間:2023年06月25日 08:52:39   作者:架構(gòu)狂人  
這篇文章主要為大家介紹了Java并發(fā)工具Fork/Join原理,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

正文

我們一直講,并發(fā)編程可以分為三個層面的問題,分別是分工、協(xié)作和互斥,當你關(guān)注于任務(wù)的時候,你會發(fā)現(xiàn)你的視角已經(jīng)從并發(fā)編程的細節(jié)中跳出來了,你應(yīng)用的更多的是現(xiàn)實世界的思維模式,類比的往往是現(xiàn)實世界里的分工,所以我把線程池、Future、CompletableFuture和CompletionService都列到了分工里面。

下面我用現(xiàn)實世界里的工作流程圖描述了并發(fā)編程領(lǐng)域的簡單并行任務(wù)、聚合任務(wù)和批量并行任務(wù),輔以這些流程圖,相信你一定能將你的思維模式轉(zhuǎn)換到現(xiàn)實世界里來。

從上到下,依次為簡單并行任務(wù)、聚合任務(wù)和批量并行任務(wù)示意圖

上面提到的簡單并行、聚合、批量并行這三種任務(wù)模型,基本上能夠覆蓋日常工作中的并發(fā)場景了,但還是不夠全面,因為還有一種“分治”的任務(wù)模型沒有覆蓋到。 分治,顧名思義,即分而治之,是一種解決復雜問題的思維方法和模式;具體來講,指的是 把一個復雜的問題分解成多個相似的子問題,然后再把子問題分解成更小的子問題,直到子問題簡單到可以直接求解。理論上來講,解決每一個問題都對應(yīng)著一個任務(wù),所以對于問題的分治,實際上就是對于任務(wù)的分治。

分治思想在很多領(lǐng)域都有廣泛的應(yīng)用,例如算法領(lǐng)域有分治算法(歸并排序、快速排序都屬于分治算法,二分法查找也是一種分治算法);大數(shù)據(jù)領(lǐng)域知名的計算框架MapReduce背后的思想也是分治。既然分治這種任務(wù)模型如此普遍,那Java顯然也需要支持,Java并發(fā)包里提供了一種叫做Fork/Join的并行計算框架,就是用來支持分治這種任務(wù)模型的。

分治任務(wù)模型

這里你需要先深入了解一下分治任務(wù)模型,分治任務(wù)模型可分為兩個階段:一個階段是 任務(wù)分解,也就是將任務(wù)迭代地分解為子任務(wù),直至子任務(wù)可以直接計算出結(jié)果;另一個階段是 結(jié)果合并,即逐層合并子任務(wù)的執(zhí)行結(jié)果,直至獲得最終結(jié)果。下圖是一個簡化的分治任務(wù)模型圖,你可以對照著理解。

簡版分治任務(wù)模型圖

在這個分治任務(wù)模型里,任務(wù)和分解后的子任務(wù)具有相似性,這種相似性往往體現(xiàn)在任務(wù)和子任務(wù)的算法是相同的,但是計算的數(shù)據(jù)規(guī)模是不同的。具備這種相似性的問題,我們往往都采用遞歸算法。

Fork/Join的使用

Fork/Join是一個并行計算的框架,主要就是用來支持分治任務(wù)模型的,這個計算框架里的 Fork對應(yīng)的是分治任務(wù)模型里的任務(wù)分解,Join對應(yīng)的是結(jié)果合并。Fork/Join計算框架主要包含兩部分,一部分是 分治任務(wù)的線程池ForkJoinPool,另一部分是 分治任務(wù)ForkJoinTask。這兩部分的關(guān)系類似于ThreadPoolExecutor和Runnable的關(guān)系,都可以理解為提交任務(wù)到線程池,只不過分治任務(wù)有自己獨特類型ForkJoinTask。

ForkJoinTask是一個抽象類,它的方法有很多,最核心的是fork()方法和join()方法,其中fork()方法會異步地執(zhí)行一個子任務(wù),而join()方法則會阻塞當前線程來等待子任務(wù)的執(zhí)行結(jié)果。ForkJoinTask有兩個子類——RecursiveAction和RecursiveTask,通過名字你就應(yīng)該能知道,它們都是用遞歸的方式來處理分治任務(wù)的。這兩個子類都定義了抽象方法compute(),不過區(qū)別是RecursiveAction定義的compute()沒有返回值,而RecursiveTask定義的compute()方法是有返回值的。這兩個子類也是抽象類,在使用的時候,需要你定義子類去擴展。

接下來我們就來實現(xiàn)一下,看看如何用Fork/Join這個并行計算框架計算斐波那契數(shù)列(下面的代碼源自Java官方示例)。首先我們需要創(chuàng)建一個分治任務(wù)線程池以及計算斐波那契數(shù)列的分治任務(wù),之后通過調(diào)用分治任務(wù)線程池的 invoke() 方法來啟動分治任務(wù)。由于計算斐波那契數(shù)列需要有返回值,所以Fibonacci 繼承自RecursiveTask。分治任務(wù)Fibonacci 需要實現(xiàn)compute()方法,這個方法里面的邏輯和普通計算斐波那契數(shù)列非常類似,區(qū)別之處在于計算 Fibonacci(n - 1) 使用了異步子任務(wù),這是通過 f1.fork() 這條語句實現(xiàn)的。

static void main(String[] args){
  //創(chuàng)建分治任務(wù)線程池
  ForkJoinPool fjp =
    new ForkJoinPool(4);
  //創(chuàng)建分治任務(wù)
  Fibonacci fib =
    new Fibonacci(30);
  //啟動分治任務(wù)
  Integer result =
    fjp.invoke(fib);
  //輸出結(jié)果
  System.out.println(result);
}
//遞歸任務(wù)
static class Fibonacci extends
    RecursiveTask<Integer>{
  final int n;
  Fibonacci(int n){this.n = n;}
  protected Integer compute(){
    if (n <= 1)
      return n;
    Fibonacci f1 =
      new Fibonacci(n - 1);
    //創(chuàng)建子任務(wù)
    f1.fork();
    Fibonacci f2 =
      new Fibonacci(n - 2);
    //等待子任務(wù)結(jié)果,并合并結(jié)果
    return f2.compute() + f1.join();
  }
}

ForkJoinPool工作原理

Fork/Join并行計算的核心組件是ForkJoinPool,所以下面我們就來簡單介紹一下ForkJoinPool的工作原理。

ThreadPoolExecutor本質(zhì)上是一個生產(chǎn)者-消費者模式的實現(xiàn),內(nèi)部有一個任務(wù)隊列,這個任務(wù)隊列是生產(chǎn)者和消費者通信的媒介;ThreadPoolExecutor可以有多個工作線程,但是這些工作線程都共享一個任務(wù)隊列。

ForkJoinPool本質(zhì)上也是一個生產(chǎn)者-消費者的實現(xiàn),但是更加智能,你可以參考下面的ForkJoinPool工作原理圖來理解其原理。ThreadPoolExecutor內(nèi)部只有一個任務(wù)隊列,而ForkJoinPool內(nèi)部有多個任務(wù)隊列,當我們通過ForkJoinPool的invoke()或者submit()方法提交任務(wù)時,F(xiàn)orkJoinPool根據(jù)一定的路由規(guī)則把任務(wù)提交到一個任務(wù)隊列中,如果任務(wù)在執(zhí)行過程中會創(chuàng)建出子任務(wù),那么子任務(wù)會提交到工作線程對應(yīng)的任務(wù)隊列中。

如果工作線程對應(yīng)的任務(wù)隊列空了,是不是就沒活兒干了呢?不是的,F(xiàn)orkJoinPool支持一種叫做“ 任務(wù)竊取”的機制,如果工作線程空閑了,那它可以“竊取”其他工作任務(wù)隊列里的任務(wù),例如下圖中,線程T2對應(yīng)的任務(wù)隊列已經(jīng)空了,它可以“竊取”線程T1對應(yīng)的任務(wù)隊列的任務(wù)。如此一來,所有的工作線程都不會閑下來了。

ForkJoinPool中的任務(wù)隊列采用的是雙端隊列,工作線程正常獲取任務(wù)和“竊取任務(wù)”分別是從任務(wù)隊列不同的端消費,這樣能避免很多不必要的數(shù)據(jù)競爭。我們這里介紹的僅僅是簡化后的原理,F(xiàn)orkJoinPool的實現(xiàn)遠比我們這里介紹的復雜,如果你感興趣,建議去看它的源碼。

ForkJoinPool工作原理圖

模擬MapReduce統(tǒng)計單詞數(shù)量

學習MapReduce有一個入門程序,統(tǒng)計一個文件里面每個單詞的數(shù)量,下面我們來看看如何用Fork/Join并行計算框架來實現(xiàn)。

我們可以先用二分法遞歸地將一個文件拆分成更小的文件,直到文件里只有一行數(shù)據(jù),然后統(tǒng)計這一行數(shù)據(jù)里單詞的數(shù)量,最后再逐級匯總結(jié)果,你可以對照前面的簡版分治任務(wù)模型圖來理解這個過程。

思路有了,我們馬上來實現(xiàn)。下面的示例程序用一個字符串數(shù)組 String[] fc 來模擬文件內(nèi)容,fc里面的元素與文件里面的行數(shù)據(jù)一一對應(yīng)。關(guān)鍵的代碼在 compute() 這個方法里面,這是一個遞歸方法,前半部分數(shù)據(jù)fork一個遞歸任務(wù)去處理(關(guān)鍵代碼mr1.fork()),后半部分數(shù)據(jù)則在當前任務(wù)中遞歸處理(mr2.compute())。

static void main(String[] args){
  String[] fc = {"hello world",
          "hello me",
          "hello fork",
          "hello join",
          "fork join in world"};
  //創(chuàng)建ForkJoin線程池
  ForkJoinPool fjp =
      new ForkJoinPool(3);
  //創(chuàng)建任務(wù)
  MR mr = new MR(
      fc, 0, fc.length);
  //啟動任務(wù)
  Map<String, Long> result =
      fjp.invoke(mr);
  //輸出結(jié)果
  result.forEach((k, v)->
    System.out.println(k+":"+v));
}
//MR模擬類
static class MR extends
  RecursiveTask<Map<String, Long>> {
  private String[] fc;
  private int start, end;
  //構(gòu)造函數(shù)
  MR(String[] fc, int fr, int to){
    this.fc = fc;
    this.start = fr;
    this.end = to;
  }
  @Override protected
  Map<String, Long> compute(){
    if (end - start == 1) {
      return calc(fc[start]);
    } else {
      int mid = (start+end)/2;
      MR mr1 = new MR(
          fc, start, mid);
      mr1.fork();
      MR mr2 = new MR(
          fc, mid, end);
      //計算子任務(wù),并返回合并的結(jié)果
      return merge(mr2.compute(),
          mr1.join());
    }
  }
  //合并結(jié)果
  private Map<String, Long> merge(
      Map<String, Long> r1,
      Map<String, Long> r2) {
    Map<String, Long> result =
        new HashMap<>();
    result.putAll(r1);
    //合并結(jié)果
    r2.forEach((k, v) -> {
      Long c = result.get(k);
      if (c != null)
        result.put(k, c+v);
      else
        result.put(k, v);
    });
    return result;
  }
  //統(tǒng)計單詞數(shù)量
  private Map<String, Long>
      calc(String line) {
    Map<String, Long> result =
        new HashMap<>();
    //分割單詞
    String [] words =
        line.split("\\s+");
    //統(tǒng)計單詞數(shù)量
    for (String w : words) {
      Long v = result.get(w);
      if (v != null)
        result.put(w, v+1);
      else
        result.put(w, 1L);
    }
    return result;
  }
}

總結(jié)

Fork/Join并行計算框架主要解決的是分治任務(wù)。分治的核心思想是“分而治之”:將一個大的任務(wù)拆分成小的子任務(wù)去解決,然后再把子任務(wù)的結(jié)果聚合起來從而得到最終結(jié)果。這個過程非常類似于大數(shù)據(jù)處理中的MapReduce,所以你可以把Fork/Join看作單機版的MapReduce。

Fork/Join并行計算框架的核心組件是ForkJoinPool。ForkJoinPool支持任務(wù)竊取機制,能夠讓所有線程的工作量基本均衡,不會出現(xiàn)有的線程很忙,而有的線程很閑的狀況,所以性能很好。Java 1.8提供的Stream API里面并行流也是以ForkJoinPool為基礎(chǔ)的。不過需要你注意的是,默認情況下所有的并行流計算都共享一個ForkJoinPool,這個共享的ForkJoinPool默認的線程數(shù)是CPU的核數(shù);如果所有的并行流計算都是CPU密集型計算的話,完全沒有問題,但是如果存在I/O密集型的并行流計算,那么很可能會因為一個很慢的I/O計算而拖慢整個系統(tǒng)的性能。所以 建議用不同的ForkJoinPool執(zhí)行不同類型的計算任務(wù)。

以上就是Java并發(fā)工具Fork/Join原理的詳細內(nèi)容,更多關(guān)于Java并發(fā)工具Fork/Join原理的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

最新評論