欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Spark調(diào)優(yōu)多線程并行處理任務(wù)實現(xiàn)方式

 更新時間:2020年08月06日 10:21:41   作者:lshan  
這篇文章主要介紹了Spark調(diào)優(yōu)多線程并行處理任務(wù)實現(xiàn)方式,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下

方式1:

1. 明確 Spark中Job 與 Streaming中 Job 的區(qū)別

1.1 Spark Core

一個 RDD DAG Graph 可以生成一個或多個 Job(Action操作)

一個Job可以認(rèn)為就是會最終輸出一個結(jié)果RDD的一條由RDD組織而成的計算

Job在spark里應(yīng)用里是一個被調(diào)度的單位

1.2 Streaming

一個 batch 的數(shù)據(jù)對應(yīng)一個 DStreamGraph

而一個 DStreamGraph 包含一或多個關(guān)于 DStream 的輸出操作

每一個輸出對應(yīng)于一個Job,一個 DStreamGraph 對應(yīng)一個JobSet,里面包含一個或多個Job

2. Streaming Job的并行度

Job的并行度由兩個配置決定:

spark.scheduler.mode(FIFO/FAIR)
spark.streaming.concurrentJobs

一個 Batch 可能會有多個 Action 執(zhí)行,比如注冊了多個 Kafka 數(shù)據(jù)流,每個Action都會產(chǎn)生一個Job

所以一個 Batch 有可能是一批 Job,也就是 JobSet 的概念

這些 Job 由 jobExecutor 依次提交執(zhí)行

而 JobExecutor 是一個默認(rèn)池子大小為1的線程池,所以只能執(zhí)行完一個Job再執(zhí)行另外一個Job

這里說的池子,大小就是由spark.streaming.concurrentJobs 控制的

concurrentJobs 決定了向 Spark Core 提交Job的并行度

提交一個Job,必須等這個執(zhí)行完了,才會提交第二個

假設(shè)我們把它設(shè)置為2,則會并發(fā)的把 Job 提交給 Spark Core

Spark 有自己的機(jī)制決定如何運行這兩個Job,這個機(jī)制其實就是FIFO或者FAIR(決定了資源的分配規(guī)則)

默認(rèn)是 FIFO,也就是先進(jìn)先出,把 concurrentJobs 設(shè)置為2,但是如果底層是FIFO,那么會優(yōu)先執(zhí)行先提交的Job

雖然如此,如果資源夠兩個job運行,還是會并行運行兩個Job

Spark Streaming 不同Batch任務(wù)可以并行計算么 https://developer.aliyun.com/article/73004

conf.setMaster("local[4]")
conf.set("spark.streaming.concurrentJobs", "3") //job 并行對
conf.set("spark.scheduler.mode", "FIFO")
val sc = new StreamingContext(conf, Seconds(5))

你會發(fā)現(xiàn),不同batch的job其實也可以并行運行的,這里需要有幾個條件:

有延時發(fā)生了,batch無法在本batch完成

concurrentJobs > 1

如果scheduler mode 是FIFO則需要某個Job無法一直消耗掉所有資源

Mode是FAIR則盡力保證你的Job是并行運行的,毫無疑問是可以并行的。

方式2:

場景1:

程序每次處理的數(shù)據(jù)量是波動的,比如周末比工作日多很多,晚八點比凌晨四點多很多。

一個spark程序處理的時間在1-2小時波動是OK的。而spark streaming程序不可以,如果每次處理的時間是1-10分鐘,就很蛋疼。
設(shè)置10分鐘吧,實際上10分鐘的也就那一段高峰時間,如果設(shè)置每次是1分鐘,很多時候會出現(xiàn)程序處理不過來,排隊過多的任務(wù)延遲更久,還可能出現(xiàn)程序崩潰的可能。

場景2:

  • 程序需要處理的相似job數(shù)隨著業(yè)務(wù)的增長越來越多
  • 我們知道spark的api里無相互依賴的stage是并行處理的,但是job之間是串行處理的。
  • spark程序通常是離線處理,比如T+1之類的延遲,時間變長是可以容忍的。而spark streaming是準(zhǔn)實時的,如果業(yè)務(wù)增長導(dǎo)致延遲增加就很不合理。

spark雖然是串行執(zhí)行job,但是是可以把job放到線程池里多線程執(zhí)行的。如何在一個SparkContext中提交多個任務(wù)

DStream.foreachRDD{
   rdd =>
    //創(chuàng)建線程池
    val executors=Executors.newFixedThreadPool(rules.length)
    //將規(guī)則放入線程池
    for( ru <- rules){
     val task= executors.submit(new Callable[String] {
      override def call(): String ={
       //執(zhí)行規(guī)則
       runRule(ru,spark)
      }
     })
    }
    //每次創(chuàng)建的線程池執(zhí)行完所有規(guī)則后shutdown
    executors.shutdown()
  }

注意點

1.最后需要executors.shutdown()。

  • 如果是executors.shutdownNow()會發(fā)生未執(zhí)行完的task強(qiáng)制關(guān)閉線程。
  • 如果使用executors.awaitTermination()則會發(fā)生阻塞,不是我們想要的結(jié)果。
  • 如果沒有這個shutdowm操作,程序會正常執(zhí)行,但是長時間會產(chǎn)生大量無用的線程池,因為每次foreachRDD都會創(chuàng)建一個線程池。

2.可不可以將創(chuàng)建線程池放到foreachRDD外面?

不可以,這個關(guān)系到對于scala閉包到理解,經(jīng)測試,第一次或者前幾次batch是正常的,后面的batch無線程可用。

3.線程池executor崩潰了就會導(dǎo)致數(shù)據(jù)丟失

原則上是這樣的,但是正常的代碼一般不會發(fā)生executor崩潰。至少我在使用的時候沒遇到過。

以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。

相關(guān)文章

  • 解析ConcurrentHashMap: put方法源碼分析

    解析ConcurrentHashMap: put方法源碼分析

    ConcurrentHashMap是由Segment數(shù)組結(jié)構(gòu)和HashEntry數(shù)組結(jié)構(gòu)組成。Segment的結(jié)構(gòu)和HashMap類似,是一種數(shù)組和鏈表結(jié)構(gòu),今天給大家普及java面試常見問題---ConcurrentHashMap知識,一起看看吧
    2021-06-06
  • Spring數(shù)據(jù)源及配置文件數(shù)據(jù)加密實現(xiàn)過程詳解

    Spring數(shù)據(jù)源及配置文件數(shù)據(jù)加密實現(xiàn)過程詳解

    這篇文章主要介紹了Spring數(shù)據(jù)源及配置文件數(shù)據(jù)加密實現(xiàn)過程詳解,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2020-05-05
  • 淺談多線程中的鎖的幾種用法總結(jié)(必看)

    淺談多線程中的鎖的幾種用法總結(jié)(必看)

    下面小編就為大家?guī)硪黄獪\談多線程中的鎖的幾種用法總結(jié)(必看)。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-05-05
  • IDEA加載項目沒有src目錄的問題及解決

    IDEA加載項目沒有src目錄的問題及解決

    這篇文章主要介紹了IDEA加載項目沒有src目錄的問題及解決方案,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2023-12-12
  • Spring?Security認(rèn)證器實現(xiàn)過程詳解

    Spring?Security認(rèn)證器實現(xiàn)過程詳解

    一些權(quán)限框架一般都包含認(rèn)證器和決策器,前者處理登陸驗證,后者處理訪問資源的控制,這篇文章主要介紹了Spring?Security認(rèn)證器實現(xiàn)過程,需要的朋友可以參考下
    2022-06-06
  • 分布式調(diào)度器之Spring Task 的使用詳解

    分布式調(diào)度器之Spring Task 的使用詳解

    SpringTask是Spring框架中用于任務(wù)調(diào)度的組件,通過簡單的注解就能實現(xiàn)定時任務(wù)的創(chuàng)建和調(diào)度,可以通過配置線程池來實現(xiàn),本文給大家介紹分布式調(diào)度器之Spring Task 的使用,感興趣的朋友跟隨小編一起看看吧
    2024-10-10
  • jmap執(zhí)行失敗如何獲取heapdump詳解

    jmap執(zhí)行失敗如何獲取heapdump詳解

    這篇文章主要為大家介紹了jmap執(zhí)行失敗如何獲取heapdump詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-04-04
  • Java代碼實現(xiàn)酒店管理系統(tǒng)

    Java代碼實現(xiàn)酒店管理系統(tǒng)

    這篇文章主要為大家詳細(xì)介紹了Java代碼實現(xiàn)酒店管理系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2022-05-05
  • 新手初學(xué)Java-Map

    新手初學(xué)Java-Map

    Map簡介:將鍵映射到值的對象。一個映射不能包含重復(fù)的鍵;每個鍵最多只能映射到一個值。此接口取代 Dictionary 類,后者完全是一個抽象類,而不是一個接口
    2021-07-07
  • Spring?Boot指定外部配置文件簡單示例

    Spring?Boot指定外部配置文件簡單示例

    Spring Boot可以讓你將配置外部化,這樣你就可以在不同的環(huán)境中使用相同的應(yīng)用程序代碼,這篇文章主要給大家介紹了關(guān)于Spring?Boot指定外部配置文件的相關(guān)資料,需要的朋友可以參考下
    2024-01-01

最新評論