Spark調(diào)優(yōu)多線程并行處理任務(wù)實(shí)現(xiàn)方式
方式1:
1. 明確 Spark中Job 與 Streaming中 Job 的區(qū)別
1.1 Spark Core
一個(gè) RDD DAG Graph 可以生成一個(gè)或多個(gè) Job(Action操作)
一個(gè)Job可以認(rèn)為就是會(huì)最終輸出一個(gè)結(jié)果RDD的一條由RDD組織而成的計(jì)算
Job在spark里應(yīng)用里是一個(gè)被調(diào)度的單位
1.2 Streaming
一個(gè) batch 的數(shù)據(jù)對(duì)應(yīng)一個(gè) DStreamGraph
而一個(gè) DStreamGraph 包含一或多個(gè)關(guān)于 DStream 的輸出操作
每一個(gè)輸出對(duì)應(yīng)于一個(gè)Job,一個(gè) DStreamGraph 對(duì)應(yīng)一個(gè)JobSet,里面包含一個(gè)或多個(gè)Job
2. Streaming Job的并行度
Job的并行度由兩個(gè)配置決定:
spark.scheduler.mode(FIFO/FAIR)
spark.streaming.concurrentJobs
一個(gè) Batch 可能會(huì)有多個(gè) Action 執(zhí)行,比如注冊(cè)了多個(gè) Kafka 數(shù)據(jù)流,每個(gè)Action都會(huì)產(chǎn)生一個(gè)Job
所以一個(gè) Batch 有可能是一批 Job,也就是 JobSet 的概念
這些 Job 由 jobExecutor 依次提交執(zhí)行
而 JobExecutor 是一個(gè)默認(rèn)池子大小為1的線程池,所以只能執(zhí)行完一個(gè)Job再執(zhí)行另外一個(gè)Job
這里說(shuō)的池子,大小就是由spark.streaming.concurrentJobs 控制的
concurrentJobs 決定了向 Spark Core 提交Job的并行度
提交一個(gè)Job,必須等這個(gè)執(zhí)行完了,才會(huì)提交第二個(gè)
假設(shè)我們把它設(shè)置為2,則會(huì)并發(fā)的把 Job 提交給 Spark Core
Spark 有自己的機(jī)制決定如何運(yùn)行這兩個(gè)Job,這個(gè)機(jī)制其實(shí)就是FIFO或者FAIR(決定了資源的分配規(guī)則)
默認(rèn)是 FIFO,也就是先進(jìn)先出,把 concurrentJobs 設(shè)置為2,但是如果底層是FIFO,那么會(huì)優(yōu)先執(zhí)行先提交的Job
雖然如此,如果資源夠兩個(gè)job運(yùn)行,還是會(huì)并行運(yùn)行兩個(gè)Job
Spark Streaming 不同Batch任務(wù)可以并行計(jì)算么 https://developer.aliyun.com/article/73004
conf.setMaster("local[4]")
conf.set("spark.streaming.concurrentJobs", "3") //job 并行對(duì)
conf.set("spark.scheduler.mode", "FIFO")
val sc = new StreamingContext(conf, Seconds(5))
你會(huì)發(fā)現(xiàn),不同batch的job其實(shí)也可以并行運(yùn)行的,這里需要有幾個(gè)條件:
有延時(shí)發(fā)生了,batch無(wú)法在本batch完成
concurrentJobs > 1
如果scheduler mode 是FIFO則需要某個(gè)Job無(wú)法一直消耗掉所有資源
Mode是FAIR則盡力保證你的Job是并行運(yùn)行的,毫無(wú)疑問(wèn)是可以并行的。
方式2:
場(chǎng)景1:
程序每次處理的數(shù)據(jù)量是波動(dòng)的,比如周末比工作日多很多,晚八點(diǎn)比凌晨四點(diǎn)多很多。
一個(gè)spark程序處理的時(shí)間在1-2小時(shí)波動(dòng)是OK的。而spark streaming程序不可以,如果每次處理的時(shí)間是1-10分鐘,就很蛋疼。
設(shè)置10分鐘吧,實(shí)際上10分鐘的也就那一段高峰時(shí)間,如果設(shè)置每次是1分鐘,很多時(shí)候會(huì)出現(xiàn)程序處理不過(guò)來(lái),排隊(duì)過(guò)多的任務(wù)延遲更久,還可能出現(xiàn)程序崩潰的可能。
場(chǎng)景2:
- 程序需要處理的相似job數(shù)隨著業(yè)務(wù)的增長(zhǎng)越來(lái)越多
- 我們知道spark的api里無(wú)相互依賴的stage是并行處理的,但是job之間是串行處理的。
- spark程序通常是離線處理,比如T+1之類的延遲,時(shí)間變長(zhǎng)是可以容忍的。而spark streaming是準(zhǔn)實(shí)時(shí)的,如果業(yè)務(wù)增長(zhǎng)導(dǎo)致延遲增加就很不合理。
spark雖然是串行執(zhí)行job,但是是可以把job放到線程池里多線程執(zhí)行的。如何在一個(gè)SparkContext中提交多個(gè)任務(wù)
DStream.foreachRDD{ rdd => //創(chuàng)建線程池 val executors=Executors.newFixedThreadPool(rules.length) //將規(guī)則放入線程池 for( ru <- rules){ val task= executors.submit(new Callable[String] { override def call(): String ={ //執(zhí)行規(guī)則 runRule(ru,spark) } }) } //每次創(chuàng)建的線程池執(zhí)行完所有規(guī)則后shutdown executors.shutdown() }
注意點(diǎn)
1.最后需要executors.shutdown()。
- 如果是executors.shutdownNow()會(huì)發(fā)生未執(zhí)行完的task強(qiáng)制關(guān)閉線程。
- 如果使用executors.awaitTermination()則會(huì)發(fā)生阻塞,不是我們想要的結(jié)果。
- 如果沒(méi)有這個(gè)shutdowm操作,程序會(huì)正常執(zhí)行,但是長(zhǎng)時(shí)間會(huì)產(chǎn)生大量無(wú)用的線程池,因?yàn)槊看蝔oreachRDD都會(huì)創(chuàng)建一個(gè)線程池。
2.可不可以將創(chuàng)建線程池放到foreachRDD外面?
不可以,這個(gè)關(guān)系到對(duì)于scala閉包到理解,經(jīng)測(cè)試,第一次或者前幾次batch是正常的,后面的batch無(wú)線程可用。
3.線程池executor崩潰了就會(huì)導(dǎo)致數(shù)據(jù)丟失
原則上是這樣的,但是正常的代碼一般不會(huì)發(fā)生executor崩潰。至少我在使用的時(shí)候沒(méi)遇到過(guò)。
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
解析ConcurrentHashMap: put方法源碼分析
ConcurrentHashMap是由Segment數(shù)組結(jié)構(gòu)和HashEntry數(shù)組結(jié)構(gòu)組成。Segment的結(jié)構(gòu)和HashMap類似,是一種數(shù)組和鏈表結(jié)構(gòu),今天給大家普及java面試常見問(wèn)題---ConcurrentHashMap知識(shí),一起看看吧2021-06-06Spring數(shù)據(jù)源及配置文件數(shù)據(jù)加密實(shí)現(xiàn)過(guò)程詳解
這篇文章主要介紹了Spring數(shù)據(jù)源及配置文件數(shù)據(jù)加密實(shí)現(xiàn)過(guò)程詳解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-05-05IDEA加載項(xiàng)目沒(méi)有src目錄的問(wèn)題及解決
這篇文章主要介紹了IDEA加載項(xiàng)目沒(méi)有src目錄的問(wèn)題及解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-12-12Spring?Security認(rèn)證器實(shí)現(xiàn)過(guò)程詳解
一些權(quán)限框架一般都包含認(rèn)證器和決策器,前者處理登陸驗(yàn)證,后者處理訪問(wèn)資源的控制,這篇文章主要介紹了Spring?Security認(rèn)證器實(shí)現(xiàn)過(guò)程,需要的朋友可以參考下2022-06-06分布式調(diào)度器之Spring Task 的使用詳解
SpringTask是Spring框架中用于任務(wù)調(diào)度的組件,通過(guò)簡(jiǎn)單的注解就能實(shí)現(xiàn)定時(shí)任務(wù)的創(chuàng)建和調(diào)度,可以通過(guò)配置線程池來(lái)實(shí)現(xiàn),本文給大家介紹分布式調(diào)度器之Spring Task 的使用,感興趣的朋友跟隨小編一起看看吧2024-10-10Java代碼實(shí)現(xiàn)酒店管理系統(tǒng)
這篇文章主要為大家詳細(xì)介紹了Java代碼實(shí)現(xiàn)酒店管理系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-05-05