Spark調(diào)優(yōu)多線程并行處理任務(wù)實現(xiàn)方式
方式1:
1. 明確 Spark中Job 與 Streaming中 Job 的區(qū)別
1.1 Spark Core
一個 RDD DAG Graph 可以生成一個或多個 Job(Action操作)
一個Job可以認(rèn)為就是會最終輸出一個結(jié)果RDD的一條由RDD組織而成的計算
Job在spark里應(yīng)用里是一個被調(diào)度的單位
1.2 Streaming
一個 batch 的數(shù)據(jù)對應(yīng)一個 DStreamGraph
而一個 DStreamGraph 包含一或多個關(guān)于 DStream 的輸出操作
每一個輸出對應(yīng)于一個Job,一個 DStreamGraph 對應(yīng)一個JobSet,里面包含一個或多個Job
2. Streaming Job的并行度
Job的并行度由兩個配置決定:
spark.scheduler.mode(FIFO/FAIR)
spark.streaming.concurrentJobs
一個 Batch 可能會有多個 Action 執(zhí)行,比如注冊了多個 Kafka 數(shù)據(jù)流,每個Action都會產(chǎn)生一個Job
所以一個 Batch 有可能是一批 Job,也就是 JobSet 的概念
這些 Job 由 jobExecutor 依次提交執(zhí)行
而 JobExecutor 是一個默認(rèn)池子大小為1的線程池,所以只能執(zhí)行完一個Job再執(zhí)行另外一個Job
這里說的池子,大小就是由spark.streaming.concurrentJobs 控制的
concurrentJobs 決定了向 Spark Core 提交Job的并行度
提交一個Job,必須等這個執(zhí)行完了,才會提交第二個
假設(shè)我們把它設(shè)置為2,則會并發(fā)的把 Job 提交給 Spark Core
Spark 有自己的機(jī)制決定如何運行這兩個Job,這個機(jī)制其實就是FIFO或者FAIR(決定了資源的分配規(guī)則)
默認(rèn)是 FIFO,也就是先進(jìn)先出,把 concurrentJobs 設(shè)置為2,但是如果底層是FIFO,那么會優(yōu)先執(zhí)行先提交的Job
雖然如此,如果資源夠兩個job運行,還是會并行運行兩個Job
Spark Streaming 不同Batch任務(wù)可以并行計算么 https://developer.aliyun.com/article/73004
conf.setMaster("local[4]")
conf.set("spark.streaming.concurrentJobs", "3") //job 并行對
conf.set("spark.scheduler.mode", "FIFO")
val sc = new StreamingContext(conf, Seconds(5))
你會發(fā)現(xiàn),不同batch的job其實也可以并行運行的,這里需要有幾個條件:
有延時發(fā)生了,batch無法在本batch完成
concurrentJobs > 1
如果scheduler mode 是FIFO則需要某個Job無法一直消耗掉所有資源
Mode是FAIR則盡力保證你的Job是并行運行的,毫無疑問是可以并行的。
方式2:
場景1:
程序每次處理的數(shù)據(jù)量是波動的,比如周末比工作日多很多,晚八點比凌晨四點多很多。
一個spark程序處理的時間在1-2小時波動是OK的。而spark streaming程序不可以,如果每次處理的時間是1-10分鐘,就很蛋疼。
設(shè)置10分鐘吧,實際上10分鐘的也就那一段高峰時間,如果設(shè)置每次是1分鐘,很多時候會出現(xiàn)程序處理不過來,排隊過多的任務(wù)延遲更久,還可能出現(xiàn)程序崩潰的可能。
場景2:
- 程序需要處理的相似job數(shù)隨著業(yè)務(wù)的增長越來越多
- 我們知道spark的api里無相互依賴的stage是并行處理的,但是job之間是串行處理的。
- spark程序通常是離線處理,比如T+1之類的延遲,時間變長是可以容忍的。而spark streaming是準(zhǔn)實時的,如果業(yè)務(wù)增長導(dǎo)致延遲增加就很不合理。
spark雖然是串行執(zhí)行job,但是是可以把job放到線程池里多線程執(zhí)行的。如何在一個SparkContext中提交多個任務(wù)
DStream.foreachRDD{
rdd =>
//創(chuàng)建線程池
val executors=Executors.newFixedThreadPool(rules.length)
//將規(guī)則放入線程池
for( ru <- rules){
val task= executors.submit(new Callable[String] {
override def call(): String ={
//執(zhí)行規(guī)則
runRule(ru,spark)
}
})
}
//每次創(chuàng)建的線程池執(zhí)行完所有規(guī)則后shutdown
executors.shutdown()
}
注意點
1.最后需要executors.shutdown()。
- 如果是executors.shutdownNow()會發(fā)生未執(zhí)行完的task強(qiáng)制關(guān)閉線程。
- 如果使用executors.awaitTermination()則會發(fā)生阻塞,不是我們想要的結(jié)果。
- 如果沒有這個shutdowm操作,程序會正常執(zhí)行,但是長時間會產(chǎn)生大量無用的線程池,因為每次foreachRDD都會創(chuàng)建一個線程池。
2.可不可以將創(chuàng)建線程池放到foreachRDD外面?
不可以,這個關(guān)系到對于scala閉包到理解,經(jīng)測試,第一次或者前幾次batch是正常的,后面的batch無線程可用。
3.線程池executor崩潰了就會導(dǎo)致數(shù)據(jù)丟失
原則上是這樣的,但是正常的代碼一般不會發(fā)生executor崩潰。至少我在使用的時候沒遇到過。
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
解析ConcurrentHashMap: put方法源碼分析
ConcurrentHashMap是由Segment數(shù)組結(jié)構(gòu)和HashEntry數(shù)組結(jié)構(gòu)組成。Segment的結(jié)構(gòu)和HashMap類似,是一種數(shù)組和鏈表結(jié)構(gòu),今天給大家普及java面試常見問題---ConcurrentHashMap知識,一起看看吧2021-06-06
Spring數(shù)據(jù)源及配置文件數(shù)據(jù)加密實現(xiàn)過程詳解
這篇文章主要介紹了Spring數(shù)據(jù)源及配置文件數(shù)據(jù)加密實現(xiàn)過程詳解,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-05-05
Spring?Security認(rèn)證器實現(xiàn)過程詳解
一些權(quán)限框架一般都包含認(rèn)證器和決策器,前者處理登陸驗證,后者處理訪問資源的控制,這篇文章主要介紹了Spring?Security認(rèn)證器實現(xiàn)過程,需要的朋友可以參考下2022-06-06
分布式調(diào)度器之Spring Task 的使用詳解
SpringTask是Spring框架中用于任務(wù)調(diào)度的組件,通過簡單的注解就能實現(xiàn)定時任務(wù)的創(chuàng)建和調(diào)度,可以通過配置線程池來實現(xiàn),本文給大家介紹分布式調(diào)度器之Spring Task 的使用,感興趣的朋友跟隨小編一起看看吧2024-10-10

