欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Spark內(nèi)存調(diào)優(yōu)指南

 更新時(shí)間:2023年03月07日 16:57:08   作者:朝朝mumu  
這篇文章主要為大家介紹了Spark內(nèi)存調(diào)優(yōu)指南數(shù)據(jù)序列化分析詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

引言

本文是關(guān)于Spark優(yōu)化性能與內(nèi)存使用的最佳實(shí)踐,翻譯整理自Tuning - Spark 3.3.2 Documentation。由于spark內(nèi)存計(jì)算的特性,很多因素都會(huì)影響Spark的表現(xiàn):CPU、網(wǎng)絡(luò)帶寬或者內(nèi)存。一般來說,數(shù)據(jù)可以全部裝入內(nèi)存,則帶寬是瓶頸;有時(shí)你需要進(jìn)行調(diào)優(yōu),主要是兩個(gè)方面:數(shù)據(jù)序列化和內(nèi)存使用。

數(shù)據(jù)序列化

在分布式應(yīng)用中數(shù)據(jù)序列化扮演著至關(guān)重要的角色。序列化對(duì)象的速度很慢,或者消耗大量字節(jié)的格式,會(huì)大大降低計(jì)算速度。通常情況下,這將是你優(yōu)化Spark應(yīng)用時(shí)首先要調(diào)整的東西。Spark的目標(biāo)是在易用性(允許你在操作中使用任何Java類型)和性能之間取得平衡。它提供了兩個(gè)序列化庫:

Java serialization:默認(rèn)是這個(gè),Java序列化很靈活,但往往相當(dāng)慢,而且導(dǎo)致許多類的序列化格式很大。

Kryo serialization:Spark也可以使用Kyro庫更快地序列化對(duì)象。Kryo明顯比Java序列化更快、更緊湊(通常高達(dá)10倍),但不支持所有的Serializable類型,并要求你提前注冊(cè)你將在程序中使用的類以獲得最佳性能。

使用Kryo注冊(cè)并不是想象中十分晦澀難懂的操作,多數(shù)情況僅需一行代碼就行!

可以在 SparkConf 里設(shè)置conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")來初始化Kryo。建議在網(wǎng)絡(luò)密集型應(yīng)用里使用Kyro序列化。從Spark2.0開始,在Shuffle RDD階段的一些簡單類型已經(jīng)自動(dòng)使用了Kyro序列化。

想要注冊(cè)自定義類使用Kyro,只需如下操作:

val conf = new SparkConf().setMaster(...)......
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)

如果代碼對(duì)象很大,你需要增大spark.kryoserializer.buffer配置。如果你沒有注冊(cè)你的自定義類,Kryo仍然會(huì)生效,但是它不得不隨對(duì)象存儲(chǔ)全類名,這很浪費(fèi)資源。

內(nèi)存調(diào)優(yōu)

這部分將首先概述Spark的內(nèi)存管理,然后討論用戶可以采取的具體策略,以便在我們的應(yīng)用程序中更有效地利用內(nèi)存。特別是,我們將描述如何確定你的對(duì)象的內(nèi)存使用情況,以及如何通過改變你的數(shù)據(jù)結(jié)構(gòu)改善它,或通過以序列化的格式存儲(chǔ)數(shù)據(jù)。然后,我們將介紹調(diào)整Spark的緩存大小和Java的垃圾收集器。

內(nèi)存管理概述

眾所周知的是Spark的內(nèi)存主要分為2大塊:執(zhí)行與存儲(chǔ)(execution and storage)。執(zhí)行內(nèi)存就是計(jì)算用的,如shuffle/join/sort這些,存儲(chǔ)內(nèi)存則用于緩存和跨集群傳遞數(shù)據(jù)。在Spark中這兩塊內(nèi)存是統(tǒng)一區(qū)域管理的,名為M。當(dāng)沒有執(zhí)行內(nèi)存需求時(shí),存儲(chǔ)內(nèi)存可以獲取全部內(nèi)存,反之亦然。執(zhí)行可以在必要時(shí)驅(qū)逐存儲(chǔ)占用的內(nèi)存空間,直到存儲(chǔ)內(nèi)存占用降低至某一界限R。換言之,R描述了M中的一個(gè)子區(qū)域,其中緩存的塊永遠(yuǎn)不會(huì)被驅(qū)逐。由于執(zhí)行中的復(fù)雜性,存儲(chǔ)可能不會(huì)驅(qū)逐執(zhí)行內(nèi)存。

這種設(shè)計(jì)確保了幾個(gè)理想的特性。首先,不使用緩存的應(yīng)用程序可以使用整個(gè)空間來執(zhí)行,避免了不必要的磁盤溢出。其次,使用緩存的應(yīng)用程序可以保留一個(gè)最小的存儲(chǔ)空間(R),其數(shù)據(jù)塊不會(huì)被驅(qū)逐。最后,這種方法為各種工作負(fù)載提供了合理的開箱即用的性能,而不需要用戶對(duì)內(nèi)存的內(nèi)部劃分有專業(yè)認(rèn)識(shí)。

雖然有兩個(gè)相關(guān)的配置,但一般用戶應(yīng)該不需要調(diào)整,因?yàn)槟J(rèn)值適用于大多數(shù)工作負(fù)載。

spark.memory.fraction將M的大小表示為(JVM堆空間-300MB)的一部分(默認(rèn)為0.6)。其余的空間(40%)被保留給用戶數(shù)據(jù)結(jié)構(gòu)、Spark的內(nèi)部元數(shù)據(jù),以及在記錄稀少和異常大的情況下對(duì)OOM錯(cuò)誤的保護(hù)。

spark.memory.storageFraction表示R占M多大一部分(默認(rèn)為0.5)。R是M中的存儲(chǔ)空間,其中的緩存塊對(duì)執(zhí)行的驅(qū)逐免疫。

確定內(nèi)存消耗

并沒有一個(gè)放之四海而皆準(zhǔn)的公式告訴你RDD占用了多少內(nèi)存,對(duì)一個(gè)具體業(yè)務(wù)需要實(shí)踐出真知。

確定一個(gè)數(shù)據(jù)集所需的內(nèi)存消耗量的最佳方法是創(chuàng)建一個(gè)RDD,將其放入緩存,并查看Web UI中的 "Storage "頁面。該頁面將告訴你該RDD占用了多少內(nèi)存。

要估計(jì)一個(gè)特定對(duì)象的內(nèi)存消耗,可以使用SizeEstimator’s estimate 方法。這對(duì)于試驗(yàn)不同的數(shù)據(jù)布局以修整內(nèi)存使用量,以及確定一個(gè)廣播變量在每個(gè)執(zhí)行器堆上所占用的空間是很有用的。

調(diào)整數(shù)據(jù)結(jié)構(gòu)

減少內(nèi)存消耗的第一個(gè)方法是避免那些增加開銷的Java特性,如基于指針的數(shù)據(jù)結(jié)構(gòu)和包裝對(duì)象。有幾種方法可以做到這一點(diǎn)。

  • 將你的數(shù)據(jù)結(jié)構(gòu)設(shè)計(jì)成傾向于對(duì)象的數(shù)組和原始類型,而不是標(biāo)準(zhǔn)的Java或Scala集合類(例如HashMap)fastutil庫為原始類型提供了方便的集合類,與Java標(biāo)準(zhǔn)庫兼容。
  • 盡可能避免使用帶有大量小對(duì)象和指針的嵌套結(jié)構(gòu)。
  • 考慮使用數(shù)字ID或枚舉對(duì)象而不是字符串作為鍵。
  • 如果你的RAM少于32GiB,設(shè)置JVM標(biāo)志-XX:+UseCompressedOops,使指針為四字節(jié)而不是八字節(jié)。你可以在 spark-env.sh 中添加這些選項(xiàng)。

RDD序列化存儲(chǔ)

當(dāng)你的對(duì)象仍然太大,無法有效地存儲(chǔ),盡管有這樣的調(diào)整,減少內(nèi)存使用的一個(gè)更簡單的方法是以序列化的形式存儲(chǔ)它們,使用RDD持久化API中的序列化存儲(chǔ)級(jí)別,如MEMORY_ONLY_SER。然后,Spark將把每個(gè)RDD分區(qū)存儲(chǔ)為一個(gè)大的字節(jié)數(shù)組。以序列化形式存儲(chǔ)數(shù)據(jù)的唯一缺點(diǎn)是訪問時(shí)間較慢,因?yàn)楸仨氃谶\(yùn)行中對(duì)每個(gè)對(duì)象進(jìn)行反序列化。如果你想以序列化的形式緩存數(shù)據(jù),我們強(qiáng)烈建議你使用Kryo,因?yàn)樗鼘?dǎo)致的大小比Java序列化小得多(當(dāng)然也比原始Java對(duì)象小)。

GC的調(diào)整

當(dāng)你的程序所存儲(chǔ)的RDD有很大的 "流失 "時(shí),JVM的垃圾回收可能是一個(gè)問題。(在只讀取一次RDD,然后對(duì)其進(jìn)行許多操作的程序中,這通常不是一個(gè)問題)。當(dāng)Java需要驅(qū)逐舊對(duì)象為新對(duì)象騰出空間時(shí),它需要追蹤你所有的Java對(duì)象并找到未使用的對(duì)象。這里需要記住的要點(diǎn)是,垃圾收集的成本與Java對(duì)象的數(shù)量成正比,所以使用對(duì)象較少的數(shù)據(jù)結(jié)構(gòu)(例如,用Ints數(shù)組代替LinkedList)可以大大降低這一成本。一個(gè)更好的方法是以序列化的形式持久化對(duì)象,如上所述:現(xiàn)在每個(gè)RDD分區(qū)將只有一個(gè)對(duì)象(一個(gè)字節(jié)數(shù)組)。在嘗試其他技術(shù)之前,如果GC是一個(gè)問題,首先要嘗試的是使用序列化的緩存。

由于你的任務(wù)的工作內(nèi)存(運(yùn)行任務(wù)所需的空間量)和你的節(jié)點(diǎn)上緩存的RDD之間的干擾,GC也可能成為一個(gè)問題。我們將討論如何控制分配給RDD緩存的空間以緩解這一問題。

測量GC的影響

GC調(diào)整的第一步是收集關(guān)于垃圾收集發(fā)生頻率和花費(fèi)在GC上的時(shí)間的統(tǒng)計(jì)數(shù)據(jù)。這可以通過在Java選項(xiàng)中添加-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps來實(shí)現(xiàn)。下次運(yùn)行Spark作業(yè)時(shí),你會(huì)看到每次發(fā)生GC時(shí),工作節(jié)點(diǎn)的日志中都會(huì)打印出信息。請(qǐng)注意,這些日志將出現(xiàn)在集群的工作節(jié)點(diǎn)上(在其工作目錄的stdout文件中),而不是在你的驅(qū)動(dòng)程序上。

高級(jí)GC調(diào)優(yōu)

為了進(jìn)一步調(diào)整GC,我們首先需要了解一些關(guān)于JVM中內(nèi)存管理的基本信息。

Java的堆空間被分為兩個(gè)區(qū)域 Young 和 Old。Young代是用來存放臨時(shí)的對(duì)象的,而Old代是用來存放壽命較長的對(duì)象的。

年輕一代又被劃分為三個(gè)區(qū)域[Eden, Survivor1, Survivor2]。

對(duì)GC行為的簡化描述:當(dāng)Eden滿時(shí),在Eden上運(yùn)行一個(gè)小的GC,Eden和Survivor1中活著的對(duì)象被復(fù)制到Survivor2。Survivor區(qū)域被交換。如果一個(gè)對(duì)象足夠老或者Survivor2已經(jīng)滿了,它就會(huì)被移到Old。最后,當(dāng)Old接近滿的時(shí)候,一個(gè)Full GC被調(diào)用。

Spark中GC調(diào)整的目標(biāo)是確保只有長期存在的RDD被存儲(chǔ)在Old一代,而Young一代有足夠的大小來存儲(chǔ)短期對(duì)象。這將有助于避免全面GC來清理任務(wù)執(zhí)行過程中創(chuàng)建的臨時(shí)對(duì)象。一些可能有用的步驟是。

  • 通過收集GC統(tǒng)計(jì)信息,檢查是否有太多的垃圾收集。如果在一個(gè)任務(wù)完成之前多次調(diào)用Full GC,這意味著沒有足夠的內(nèi)存可用于執(zhí)行任務(wù)。
  • 如果有太多的小GC但沒有太多的大GC,為Eden分配更多的內(nèi)存會(huì)有幫助。你可以將Eden的大小設(shè)置為對(duì)每個(gè)任務(wù)所需內(nèi)存的高估值。如果Eden的大小被確定為E,那么你可以使用選項(xiàng)-Xmn=4/3*E來設(shè)置Young generation的大小。(按4/3的比例增加是為了考慮幸存者區(qū)域所使用的空間)。
  • 在打印的GC統(tǒng)計(jì)中,如果OldGen接近滿了,通過降低spark.memory.fraction來減少用于緩存的內(nèi)存量;緩存更少的對(duì)象比減緩任務(wù)的執(zhí)行要好。另外,也可以考慮減少Young代的大小。這意味著降低-Xmn,如果你已經(jīng)如上設(shè)置。如果沒有,可以嘗試改變JVM的NewRatio參數(shù)的值。許多JVM將其默認(rèn)為2,這意味著老一代占據(jù)了2/3的堆。它應(yīng)該足夠大,以至于這個(gè)分?jǐn)?shù)超過了spark.memory.fraction。
  • 嘗試設(shè)置-XX:+UseG1GC來使用G1GC垃圾收集器。在垃圾收集是一個(gè)瓶頸的情況下,它可以提高性能。注意,對(duì)于大的執(zhí)行器堆大小,用-XX:G1HeapRegionSize增加G1區(qū)域大小可能很重要。
  • 舉個(gè)例子,如果你的任務(wù)是從HDFS讀取數(shù)據(jù),任務(wù)使用的內(nèi)存量可以用從HDFS讀取的數(shù)據(jù)塊的大小來估計(jì)。請(qǐng)注意,解壓后的塊的大小往往是塊的2或3倍。因此,如果我們希望有3或4個(gè)任務(wù)的工作空間,而HDFS塊的大小是128MiB,我們可以估計(jì)Eden的大小是43128MiB。
  • 監(jiān)控垃圾收集的頻率和時(shí)間在新的設(shè)置下如何變化。

我們的經(jīng)驗(yàn)表明,GC調(diào)整的效果取決于你的應(yīng)用程序和可用的內(nèi)存量。網(wǎng)上還描述了許多調(diào)優(yōu)選項(xiàng),但在高層次上,管理完全GC發(fā)生的頻率可以幫助減少開銷。

可以通過在作業(yè)的配置中設(shè)置 spark.executor.defaultJavaOptions 或 spark.executor.extraJavaOptions 來指定執(zhí)行器的 GC 調(diào)整設(shè)置。

其他考慮因素

并行度水平

除非你把每個(gè)操作的并行度設(shè)置得足夠高,否則集群不會(huì)得到充分的利用。Spark會(huì)根據(jù)文件的大小自動(dòng)設(shè)置在每個(gè)文件上運(yùn)行的 map 任務(wù)的數(shù)量(當(dāng)然你可以通過SparkContext.textFile等的可選參數(shù)來控制),而對(duì)于分布式的 "reduce "操作,比如groupByKeyreduceByKey,它會(huì)使用最大的父RDD的分區(qū)數(shù)量。你可以把并行程度作為第二個(gè)參數(shù)傳遞(見spark.PairRDDFunctions文檔),或者設(shè)置配置屬性spark.default.parallelism來改變默認(rèn)值。一般來說,我們建議在你的集群中每個(gè)CPU核有2-3個(gè)任務(wù)。

輸入路徑上的并行Listing

有時(shí),當(dāng)作業(yè)輸入有大量的目錄時(shí),你可能還需要增加目錄列表的并行性,否則這個(gè)過程可能會(huì)花費(fèi)很長的時(shí)間,特別是在針對(duì)S3這樣的對(duì)象存儲(chǔ)時(shí)。如果你的作業(yè)在具有Hadoop輸入格式的RDD上工作(例如,通過SparkContext.sequenceFile),則通過spark.hadoop.mapreduce.input.fileinputformat.list-status.num-reads(目前默認(rèn)為1)控制并行性。

對(duì)于具有基于文件的數(shù)據(jù)源的Spark SQL,你可以調(diào)整spark.sql.sources.parallelPartitionDiscovery.threshold和spark.sql.sources.parallelPartitionDiscovery.parallelism,以提高列舉并行性。更多細(xì)節(jié)請(qǐng)參考Spark SQL性能調(diào)優(yōu)指南。

Reduce任務(wù)的內(nèi)存使用情況

有時(shí),你會(huì)得到OutOfMemoryError,不是因?yàn)槟愕腞DDs不適合在內(nèi)存中,而是因?yàn)槟愕哪硞€(gè)任務(wù)的工作集,比如groupByKey中的一個(gè)Reduce任務(wù)太大。Spark的shuffle操作(sortByKey、groupByKey、reduceByKey、join等)在每個(gè)任務(wù)中建立一個(gè)哈希表來執(zhí)行分組,而這個(gè)哈希表往往會(huì)很大。這里最簡單的解決方法是提高并行化水平,使每個(gè)任務(wù)的輸入集更小。Spark可以有效地支持短至200毫秒的任務(wù),因?yàn)樗谠S多任務(wù)中重復(fù)使用一個(gè)執(zhí)行器JVM,而且它的任務(wù)啟動(dòng)成本很低,所以你可以安全地將并行化水平提高到超過集群中的核心數(shù)量。

廣播大型變量

使用SparkContext中的廣播功能可以大大減少每個(gè)序列化任務(wù)的大小,以及在集群中啟動(dòng)作業(yè)的成本。如果你的任務(wù)中使用了驅(qū)動(dòng)程序中的任何大型對(duì)象(例如靜態(tài)查詢表),可以考慮將其變成一個(gè)廣播變量。Spark在主程序上打印每個(gè)任務(wù)的序列化大小,所以你可以看一下,以決定你的任務(wù)是否太大;一般來說,大于20KiB的任務(wù)可能值得優(yōu)化。

數(shù)據(jù)位置

數(shù)據(jù)位置可以對(duì)Spark作業(yè)的性能產(chǎn)生重大影響。如果數(shù)據(jù)和對(duì)其進(jìn)行操作的代碼在一起,那么計(jì)算往往會(huì)很快。但如果代碼和數(shù)據(jù)是分開的,一個(gè)必須移動(dòng)到另一個(gè)。通常情況下,將序列化的代碼從一個(gè)地方運(yùn)送到另一個(gè)地方要比運(yùn)送一大塊數(shù)據(jù)快,因?yàn)榇a的大小比數(shù)據(jù)小得多。Spark圍繞這個(gè)數(shù)據(jù)定位的一般原則建立了它的調(diào)度。

數(shù)據(jù)定位是指數(shù)據(jù)離處理它的代碼有多近。根據(jù)數(shù)據(jù)的當(dāng)前位置,有幾個(gè)級(jí)別的定位。按照從最近到最遠(yuǎn)的順序:PROCESS_LOCAL、NODE_LOCAL、NO_PREF、RACK_LOCAL、ANY。

Spark通常的做法是等待一下,希望有一個(gè)繁忙的CPU騰出手來。一旦超時(shí),它就開始把數(shù)據(jù)從遠(yuǎn)處移到空閑的CPU上。每個(gè)級(jí)別之間的回退等待超時(shí)可以單獨(dú)配置,也可以在一個(gè)參數(shù)中全部配置;詳見spark.locality參數(shù)。如果你的任務(wù)很長,看到的定位性很差,你應(yīng)該增加這些設(shè)置,但默認(rèn)值通常很好用。

小結(jié)

這份簡短指南指出了你在調(diào)整Spark應(yīng)用程序時(shí)應(yīng)該知道的主要問題——最重要的是數(shù)據(jù)序列化和內(nèi)存調(diào)整。對(duì)于大多數(shù)程序來說,切換到Kryo序列化并以序列化的形式持久化數(shù)據(jù)將解決大多數(shù)常見的性能問題。

相關(guān)文章

最新評(píng)論