Spark自定義累加器的使用實(shí)例詳解
累加器(accumulator)是Spark中提供的一種分布式的變量機(jī)制,其原理類似于mapreduce,即分布式的改變,然后聚合這些改變。累加器的一個(gè)常見用途是在調(diào)試時(shí)對作業(yè)執(zhí)行過程中的事件進(jìn)行計(jì)數(shù)。
累加器簡單使用
Spark內(nèi)置的提供了Long和Double類型的累加器。下面是一個(gè)簡單的使用示例,在這個(gè)例子中我們在過濾掉RDD中奇數(shù)的同時(shí)進(jìn)行計(jì)數(shù),最后計(jì)算剩下整數(shù)的和。
val sparkConf = new SparkConf().setAppName("Test").setMaster("local[2]") val sc = new SparkContext(sparkConf) val accum = sc.longAccumulator("longAccum") //統(tǒng)計(jì)奇數(shù)的個(gè)數(shù) val sum = sc.parallelize(Array(1,2,3,4,5,6,7,8,9),2).filter(n=>{ if(n%2!=0) accum.add(1L) n%2==0 }).reduce(_+_) println("sum: "+sum) println("accum: "+accum.value) sc.stop()
結(jié)果為:
sum: 20
accum: 5
這是結(jié)果正常的情況,但是在使用累加器的過程中如果對于spark的執(zhí)行過程理解的不夠深入就會遇到兩類典型的錯(cuò)誤:少加(或者沒加)、多加。
自定義累加器
自定義累加器類型的功能在1.X版本中就已經(jīng)提供了,但是使用起來比較麻煩,在2.0版本后,累加器的易用性有了較大的改進(jìn),而且官方還提供了一個(gè)新的抽象類:AccumulatorV2來提供更加友好的自定義類型累加器的實(shí)現(xiàn)方式。官方同時(shí)給出了一個(gè)實(shí)現(xiàn)的示例:CollectionAccumulator類,這個(gè)類允許以集合的形式收集spark應(yīng)用執(zhí)行過程中的一些信息。例如,我們可以用這個(gè)類收集Spark處理數(shù)據(jù)時(shí)的一些細(xì)節(jié),當(dāng)然,由于累加器的值最終要匯聚到driver端,為了避免 driver端的outofmemory問題,需要對收集的信息的規(guī)模要加以控制,不宜過大。
繼承AccumulatorV2類,并復(fù)寫它的所有方法
package spark import constant.Constant import org.apache.spark.util.AccumulatorV2 import util.getFieldFromConcatString import util.setFieldFromConcatString open class SessionAccmulator : AccumulatorV2<String, String>() { private var result = Constant.SESSION_COUNT + "=0|"+ Constant.TIME_PERIOD_1s_3s + "=0|"+ Constant.TIME_PERIOD_4s_6s + "=0|"+ Constant.TIME_PERIOD_7s_9s + "=0|"+ Constant.TIME_PERIOD_10s_30s + "=0|"+ Constant.TIME_PERIOD_30s_60s + "=0|"+ Constant.TIME_PERIOD_1m_3m + "=0|"+ Constant.TIME_PERIOD_3m_10m + "=0|"+ Constant.TIME_PERIOD_10m_30m + "=0|"+ Constant.TIME_PERIOD_30m + "=0|"+ Constant.STEP_PERIOD_1_3 + "=0|"+ Constant.STEP_PERIOD_4_6 + "=0|"+ Constant.STEP_PERIOD_7_9 + "=0|"+ Constant.STEP_PERIOD_10_30 + "=0|"+ Constant.STEP_PERIOD_30_60 + "=0|"+ Constant.STEP_PERIOD_60 + "=0" override fun value(): String { return this.result } /** * 合并數(shù)據(jù) */ override fun merge(other: AccumulatorV2<String, String>?) { if (other == null) return else { if (other is SessionAccmulator) { var newResult = "" val resultArray = arrayOf(Constant.SESSION_COUNT,Constant.TIME_PERIOD_1s_3s, Constant.TIME_PERIOD_4s_6s, Constant.TIME_PERIOD_7s_9s, Constant.TIME_PERIOD_10s_30s, Constant.TIME_PERIOD_30s_60s, Constant.TIME_PERIOD_1m_3m, Constant.TIME_PERIOD_3m_10m, Constant.TIME_PERIOD_10m_30m, Constant.TIME_PERIOD_30m, Constant.STEP_PERIOD_1_3, Constant.STEP_PERIOD_4_6, Constant.STEP_PERIOD_7_9, Constant.STEP_PERIOD_10_30, Constant.STEP_PERIOD_30_60, Constant.STEP_PERIOD_60) resultArray.forEach { val oldValue = other.result.getFieldFromConcatString("|", it) if (oldValue.isNotEmpty()) { val newValue = oldValue.toInt() + 1 //找到原因,一直在循環(huán)賦予值,debug30分鐘 很煩 if (newResult.isEmpty()){ newResult = result.setFieldFromConcatString("|", it, newValue.toString()) } //問題就在于這里,自定義沒有寫錯(cuò),合并錯(cuò)了 newResult = newResult.setFieldFromConcatString("|", it, newValue.toString()) } } result = newResult } } } override fun copy(): AccumulatorV2<String, String> { val sessionAccmulator = SessionAccmulator() sessionAccmulator.result = this.result return sessionAccmulator } override fun add(p0: String?) { val v1 = this.result val v2 = p0 if (v2.isNullOrEmpty()){ return }else{ var newResult = "" val oldValue = v1.getFieldFromConcatString("|", v2!!) if (oldValue.isNotEmpty()){ val newValue = oldValue.toInt() + 1 newResult = result.setFieldFromConcatString("|", v2, newValue.toString()) } result = newResult } } override fun reset() { val newResult = Constant.SESSION_COUNT + "=0|"+ Constant.TIME_PERIOD_1s_3s + "=0|"+ Constant.TIME_PERIOD_4s_6s + "=0|"+ Constant.TIME_PERIOD_7s_9s + "=0|"+ Constant.TIME_PERIOD_10s_30s + "=0|"+ Constant.TIME_PERIOD_30s_60s + "=0|"+ Constant.TIME_PERIOD_1m_3m + "=0|"+ Constant.TIME_PERIOD_3m_10m + "=0|"+ Constant.TIME_PERIOD_10m_30m + "=0|"+ Constant.TIME_PERIOD_30m + "=0|"+ Constant.STEP_PERIOD_1_3 + "=0|"+ Constant.STEP_PERIOD_4_6 + "=0|"+ Constant.STEP_PERIOD_7_9 + "=0|"+ Constant.STEP_PERIOD_10_30 + "=0|"+ Constant.STEP_PERIOD_30_60 + "=0|"+ Constant.STEP_PERIOD_60 + "=0" result = newResult } override fun isZero(): Boolean { val newResult = Constant.SESSION_COUNT + "=0|"+ Constant.TIME_PERIOD_1s_3s + "=0|"+ Constant.TIME_PERIOD_4s_6s + "=0|"+ Constant.TIME_PERIOD_7s_9s + "=0|"+ Constant.TIME_PERIOD_10s_30s + "=0|"+ Constant.TIME_PERIOD_30s_60s + "=0|"+ Constant.TIME_PERIOD_1m_3m + "=0|"+ Constant.TIME_PERIOD_3m_10m + "=0|"+ Constant.TIME_PERIOD_10m_30m + "=0|"+ Constant.TIME_PERIOD_30m + "=0|"+ Constant.STEP_PERIOD_1_3 + "=0|"+ Constant.STEP_PERIOD_4_6 + "=0|"+ Constant.STEP_PERIOD_7_9 + "=0|"+ Constant.STEP_PERIOD_10_30 + "=0|"+ Constant.STEP_PERIOD_30_60 + "=0|"+ Constant.STEP_PERIOD_60 + "=0" return this.result == newResult } }
方法介紹
value方法:獲取累加器中的值
merge方法:該方法特別重要,一定要寫對,這個(gè)方法是各個(gè)task的累加器進(jìn)行合并的方法(下面介紹執(zhí)行流程中將要用到)
iszero方法:判斷是否為初始值
reset方法:重置累加器中的值
copy方法:拷貝累加器
spark中累加器的執(zhí)行流程:
首先有幾個(gè)task,spark engine就調(diào)用copy方法拷貝幾個(gè)累加器(不注冊的),然后在各個(gè)task中進(jìn)行累加(注意在此過程中,被最初注冊的累加器的值是不變的),執(zhí)行最后將調(diào)用merge方法和各個(gè)task的結(jié)果累計(jì)器進(jìn)行合并(此時(shí)被注冊的累加器是初始值)
總結(jié)
以上就是本文關(guān)于Spark自定義累加器的使用實(shí)例詳解的全部內(nèi)容,希望對大家有所幫助。有什么問題可以隨時(shí)留言,小編會及時(shí)回復(fù)大家的。
相關(guān)文章
rsync同步時(shí)出現(xiàn)rsync: failed to set times on “xxxx”: Operation no
今天在同步數(shù)據(jù)的時(shí)候提示rsync: failed to set times on “xxxx”: Operation not permitted,一般來說要不是服務(wù)器時(shí)間不對或者權(quán)限沒有設(shè)置好2016-12-12用phpMyadmin創(chuàng)建Mysql數(shù)據(jù)庫及獨(dú)立數(shù)據(jù)庫帳號的圖文教程
在一個(gè)服務(wù)器上一般來講都不止一個(gè)站點(diǎn),更不止一個(gè)MySQL(和PHP搭配之最佳組合)數(shù)據(jù)庫。2010-03-03TeamCenter12登陸報(bào)404/503問題解決方案
這篇文章主要介紹了TeamCenter12登陸報(bào)404/503問題解決方案,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-10-10公網(wǎng)使用SSH遠(yuǎn)程登錄macOS服務(wù)器的過程(內(nèi)網(wǎng)穿透)
這篇文章主要介紹了公網(wǎng)使用SSH遠(yuǎn)程登錄macOS服務(wù)器【內(nèi)網(wǎng)穿透】,本次教程,我們將使用cpolar內(nèi)網(wǎng)穿透工具,映射ssh服務(wù)默認(rèn)端口:22端口,獲取公網(wǎng)地址,實(shí)現(xiàn)在公網(wǎng)環(huán)境下的ssh遠(yuǎn)程登錄,無需公網(wǎng)IP,也無需設(shè)置路由器,需要的朋友可以參考下2023-04-04VScode連接遠(yuǎn)程服務(wù)器踩坑實(shí)戰(zhàn)記錄(新版離線vscode-server安裝)
本文主要介紹了如何使用VScode連接遠(yuǎn)程服務(wù)器,并對離線安裝vscode-server進(jìn)行了詳細(xì)的操作步驟說明,其中包括VScode擴(kuò)展的安裝與配置,vscode-server的離線下載,文件的解壓縮和移動,以及VScode的一些更新設(shè)置,能夠幫助讀者更好地理解和掌握VScode連接遠(yuǎn)程服務(wù)器的方法2024-10-10