Spark處理trick總結(jié)分析
前言
最近做了很多數(shù)據(jù)清洗以及摸底的工作,由于處理的數(shù)據(jù)很大,所以采用了spark進(jìn)行輔助處理,期間遇到了很多問(wèn)題,特此記錄一下,供大家學(xué)習(xí)。
由于比較熟悉python, 所以筆者采用的是pyspark,所以下面給的demo都是基于pyspark,其實(shí)其他語(yǔ)言腳本一樣,重在學(xué)習(xí)思想,具體實(shí)現(xiàn)改改對(duì)應(yīng)的API即可。
這里盡可能的把一些坑以及實(shí)現(xiàn)技巧以demo的形式直白的提供出來(lái),順序不分先后。有了這些demo,大家在實(shí)現(xiàn)自己各種各樣需求尤其是一些有難度需求的時(shí)候,就可以參考了,當(dāng)然了有時(shí)間筆者后續(xù)還會(huì)更新一些demo,感興趣的同學(xué)可以關(guān)注下。
trick
首先說(shuō)一個(gè)最基本思想:能map絕不reduce。
換句話說(shuō)當(dāng)在實(shí)現(xiàn)某一需求時(shí),要盡可能得用map類(lèi)的算子,這是相當(dāng)快的。但是聚合類(lèi)的算子通常來(lái)說(shuō)是相對(duì)較慢,如果我們最后不得不用聚合類(lèi)算子的時(shí)候,我們也要把這一步邏輯看看能不能盡可能的往后放,而把一些諸如過(guò)濾什么的邏輯往前放,這樣最后的數(shù)據(jù)量就會(huì)越來(lái)越少,再進(jìn)行聚合的時(shí)候就會(huì)快很多。如果反過(guò)來(lái),那就得不償失了,雖然最后實(shí)現(xiàn)的效果是一樣的,但是時(shí)間差卻是數(shù)量級(jí)的。
- 常用API
這里列一下我們最常用的算子
rdd = rdd.filter(lambda x: fun(x)) rdd = rdd.map(lambda x: fun(x)) rdd = rdd.flatMap(lambda x: fun(x)) rdd = rdd.reduceByKey(lambda a, b: a + b)
filter: 過(guò)濾,滿(mǎn)足條件的返回True, 需要過(guò)濾的返回False。
map: 每條樣本做一些共同的操作。
flatMap: 一條拆分成多條返回,具體的是list。
reduceByKey: 根據(jù)key進(jìn)行聚合。
- 聚合
一個(gè)最常見(jiàn)的場(chǎng)景就是需要對(duì)某一個(gè)字段進(jìn)行聚合:假設(shè)現(xiàn)在我們有一份流水表,其每一行數(shù)據(jù)就是一個(gè)用戶(hù)的一次點(diǎn)擊行為,那現(xiàn)在我們想統(tǒng)計(jì)一下每個(gè)用戶(hù)一共點(diǎn)擊了多少次,更甚至我們想拿到每個(gè)用戶(hù)點(diǎn)擊過(guò)的所有item集合。偽代碼如下:
def get_key_value(x): user = x[0] item = x[1] return (user, [item]) rdd = rdd.map(lambda x: get_key_value(x)) rdd = rdd.reduceByKey(lambda a, b: a + b)
首先我們先通過(guò)get_key_value函數(shù)將每條數(shù)據(jù)轉(zhuǎn)化成(key, value)的形式,然后通過(guò)reduceByKey聚合算子進(jìn)行聚合,它就會(huì)把相同key的數(shù)據(jù)聚合在一起,說(shuō)到這里,大家可能不覺(jué)得有什么?這算什么trick!其實(shí)筆者這里想展示的是get_key_value函數(shù)返回形式:[item] 。
為了對(duì)比,這里筆者再列一下兩者的區(qū)別:
def get_key_value(x): user = x[0] item = x[1] return (user, [item]) def get_key_value(x): user = x[0] item = x[1] return (user, item)
可以看到第一個(gè)的value是一個(gè)列表,而第二個(gè)就是單純的item,我們看reduceByKey這里我們用的具體聚合形式是相加,列表相加就是得到一個(gè)更大的列表即:
所以最后我們就拿到了:每個(gè)用戶(hù)點(diǎn)擊過(guò)的所有item集合,具體的是一個(gè)列表。
- 抽樣、分批
在日常中我們需要抽樣出一部分?jǐn)?shù)據(jù)進(jìn)行數(shù)據(jù)分析或者實(shí)驗(yàn),甚至我們需要將數(shù)據(jù)等分成多少份,一份一份用(后面會(huì)說(shuō)),這個(gè)時(shí)候怎么辦呢?
當(dāng)然了spark也有類(lèi)似sample這樣的抽樣算子
那其實(shí)我們也可以實(shí)現(xiàn),而且可以靈活控制等分等等且速度非???,如下:
def get_prefix(x, num): prefix = random.randint(1, num) return [x, num] def get_sample(x): prefix = x[1] if prefix == 1: return True else: return False rdd = rdd.map(lambda x: get_prefix(x, num)) rdd = rdd.filter(lambda x: get_sample(x))
假設(shè)我們需要抽取1/10的數(shù)據(jù)出來(lái),總的思路就是先給每個(gè)樣本打上一個(gè)[1,10]的隨機(jī)數(shù),然后只過(guò)濾出打上1的數(shù)據(jù)即可。
以此類(lèi)推,我們還可以得到3/10的數(shù)據(jù)出來(lái),那就是在過(guò)濾的時(shí)候,取出打上[1,2,3]的即可,當(dāng)然了[4,5,6]也行,只要取三個(gè)就行。
- 笛卡爾積
有的時(shí)候需要在兩個(gè)集合之間做笛卡爾積,假設(shè)這兩個(gè)集合是A和B即兩個(gè)rdd。
首先spark已經(jīng)提供了對(duì)應(yīng)的API即cartesian,具體如下:
rdd_cartesian = rdd_A.cartesian(rdd_B)
其更具體的用法和返回形式大家可以找找相關(guān)博客,很多,筆者這里不再累述。
但是其速度非常慢
尤其當(dāng)rdd_A和rdd_B比較大的時(shí)候,這個(gè)時(shí)候怎么辦呢?
這個(gè)時(shí)候我們可以借助廣播機(jī)制,其實(shí)已經(jīng)有人也用了這個(gè)trick:
http://www.dbjr.com.cn/article/203197.htm
首先說(shuō)一下spark中的廣播機(jī)制,假設(shè)一個(gè)變量被申請(qǐng)為了廣播機(jī)制,那么其實(shí)是緩存了一個(gè)只讀的變量在每臺(tái)機(jī)器上,假設(shè)當(dāng)前rdd_A比較小,rdd_B比較大,那么我可以把rdd_A轉(zhuǎn)化為廣播變量,然后用這個(gè)廣播變量和每個(gè)rdd_B中的每個(gè)元素都去做一個(gè)操作,進(jìn)而實(shí)現(xiàn)笛卡爾積的效果,好了,筆者給一下pyspark的實(shí)現(xiàn):
def ops(A, B): pass def fun(A_list, B): result = [] for cur_A in A_list: result.append(cur_A + B) return result rdd_A = sc.broadcast(rdd_A.collect()) rdd_cartesian = rdd_B.flatMap(lambda x: fun(rdd_A.value, x))
可以看到我們先把rdd_A轉(zhuǎn)化為廣播變量,然后通過(guò)flatMap,將rdd_A和所有rdd_B中的單個(gè)元素進(jìn)行操作,具體是什么操作大家可以在ops函數(shù)中自己定義自己的邏輯。
關(guān)于spark的廣播機(jī)制更多講解,大家也可以找找文檔,很多的,比如:
https://www.cnblogs.com/Lee-yl/p/9777857.html
但目前為止,其實(shí)還沒(méi)有真真結(jié)束,從上面我們可以看到,rdd_A被轉(zhuǎn)化為了廣播變量,但是其有一個(gè)重要的前提:那就是rdd_A比較小。但是當(dāng)rdd_A比較大的時(shí)候,我們?cè)谵D(zhuǎn)化的過(guò)程中,就會(huì)報(bào)內(nèi)存錯(cuò)誤,當(dāng)然了可以通過(guò)增加配置:
spark.driver.maxResultSize=10g
但是如果rdd_A還是極其大呢?換句話說(shuō)rdd_A和rdd_B都是非常大的,哪一個(gè)做廣播變量都是不合適的,怎么辦呢?
其實(shí)我們一部分一部分的做。假設(shè)我們把rdd_A拆分成10份,這樣的話,每一份的量級(jí)就降下來(lái)了,然后把每一份轉(zhuǎn)化為廣播變量且都去和rdd_B做笛卡爾積,最后再匯總一下就可以啦。
有了想法,那么怎么實(shí)現(xiàn)呢?
分批大家都會(huì)了,如上。但是這里面會(huì)有另外一個(gè)問(wèn)題,那就是這個(gè)廣播變量名會(huì)被重復(fù)利用,在進(jìn)行下一批廣播變量的時(shí)候,需要先銷(xiāo)毀,再創(chuàng)建,demo如下:
def ops(A, B): pass def fun(A_list, B): result = [] for cur_A in A_list: result.append(cur_A + B) return result def get_rdd_cartesian(rdd_A, rdd_B): rdd_cartesian = rdd_B.flatMap(lambda x: fun(rdd_A.value, x)) return rdd_cartesian for i in range(len(rdd_A_batch)) qb_rdd_temp = rdd_A_batch[i] qb_rdd_temp = sc.broadcast(qb_rdd_temp.collect()) rdd_cartesian_batch = get_rdd_cartesian(qb_rdd_temp, rdd_B) dw.saveToTable(rdd_cartesian_batch, tdw_table, "p_" + ds, overwrite=False) qb_rdd_temp.unpersist()
可以看到,最主要的就是unpersist()
- 廣播變量應(yīng)用之向量索引
說(shuō)到廣播機(jī)制,這里就再介紹一個(gè)稍微復(fù)雜的demo,乘熱打鐵。
做算法的同學(xué),可能經(jīng)常會(huì)遇到向量索引這一場(chǎng)景:即每一個(gè)item被表征成一個(gè)embedding,然后兩個(gè)item的相似度便可以基于embedding的余弦相似度進(jìn)行量化。向量索引是指假設(shè)來(lái)了一個(gè)query,候選池子里面假設(shè)有幾百萬(wàn)的doc,最終目的就是要從候選池子中挑選出與query最相似的n個(gè)topk個(gè)doc。
關(guān)于做大規(guī)模數(shù)量級(jí)的索引已經(jīng)有很多現(xiàn)成好的API可以用,最常見(jiàn)的包比如有faiss。如果還不熟悉faiss的同學(xué),可以先簡(jiǎn)單搜一下其基本用法,看看demo,很簡(jiǎn)單。
好啦,假設(shè)現(xiàn)在query的量級(jí)是10w,doc的量級(jí)是100w,面對(duì)這么大的量級(jí),我們當(dāng)然是想通過(guò)spark來(lái)并行處理,加快計(jì)算流程。那么該怎么做呢?
這時(shí)我們便可以使用spark的廣播機(jī)制進(jìn)行處理啦,而且很顯然doc應(yīng)該是廣播變量,因?yàn)槊恳粋€(gè)query都要和全部的doc做計(jì)算。
廢話不多說(shuō),直接看實(shí)現(xiàn)
首先建立doc索引:
# 獲取index embedding,并collect,方便后續(xù)建立索引 index_embedding_list = index_embedding_rdd.collect() all_ids = np.array([row[1] for row in index_embedding_list], np.str) all_vectors = np.array([str_to_vec(row[2]) for row in index_embedding_list], np.float32) del(index_embedding_list) #faiss.normalize_L2(all_vectors) print(all_ids[:2]) print(all_vectors[:2]) print("all id size: {}, all vec shape: {}".format(len(all_ids), all_vectors.shape)) # 建立index索引,并轉(zhuǎn)化為廣播變量 faiss_index = FaissIndex(all_ids, all_vectors, self.args.fast_mode, self.args.nlist, self.args.nprobe) del(all_vectors) del(all_ids) print("broadcast start") bc_faiss_index = self.sc.broadcast(faiss_index) print("broadcast done")
這里的index_embedding_rdd就是doc的embedding,可以看到先要collect,然后建立索引。
建立完索引后,就可以開(kāi)始計(jì)算了,但是這里會(huì)有一個(gè)問(wèn)題就是query的量級(jí)也是比較大的,如果一起計(jì)算可能會(huì)OM,所以我們分批次進(jìn)行即batch:
# 開(kāi)始檢索 # https://blog.csdn.net/wx1528159409/article/details/125879542 query_embedding_rdd = query_embedding_rdd.repartition(300) top_n = 5 batch_size = 1000 query_sim_rdd = query_embedding_rdd.mapPartitions( lambda iters: batch_get_nearest_ids( iters, bc_faiss_index, top_n, batch_size ) )
假設(shè)query_embedding_rdd是全部query的embedding,為了實(shí)現(xiàn)batch,我們先將query_embedding_rdd進(jìn)行分區(qū)repartition,然后每個(gè)batch進(jìn)行,可以看到核心就是batch_get_nearest_ids這個(gè)函數(shù):
def batch_get_nearest_ids(iters, bc_faiss_index, top_n, batch_size): import mkl mkl.get_max_threads() res = list() rows = list() for it in iters: rows.append(it) if len(rows) >= batch_size: batch_res = __batch_get_nearest_ids(rows, bc_faiss_index, top_n) res.extend(batch_res) rows = list() if rows: batch_res = __batch_get_nearest_ids(rows, bc_faiss_index, top_n) res.extend(batch_res) return res
從這里可以清楚的看到就是組batch,組夠一個(gè)batch后就可以給當(dāng)前這個(gè)batch內(nèi)的query進(jìn)行計(jì)算最相似的候選啦即__batch_get_nearest_ids這個(gè)核心函數(shù):
def __batch_get_nearest_ids(rows, bc_faiss_index, top_n): import mkl mkl.get_max_threads() import faiss embs = [str_to_vec(row[3]) for row in rows] vec = np.array(embs, np.float32) #faiss.normalize_L2(vec) similarities, dst_ids = bc_faiss_index.value.batch_search(vec, top_n) batch_res = list() for i in range(len(rows)): batch_res.append([str("\\t".join([rows[i][1], rows[i][2]])), "$$$".join(["\\t".join(dst.split("\\t")+[str(round(sim, 2))]) for dst, sim in zip(dst_ids[i], similarities[i])])]) return batch_res
這里就是真真的調(diào)用faiss的索引API進(jìn)行召回啦,當(dāng)然了batch_res這個(gè)就是結(jié)果,自己可以想怎么定義都行,筆者這里不僅返回了召回的item,還返回了query自身的一些信息。
- 注意點(diǎn)
在map的時(shí)候,不論是self的類(lèi)成員還是類(lèi)方法都要放到外面,不要放到類(lèi)里面,不然會(huì)報(bào)錯(cuò)
總結(jié)
總之,在用spark做任何需求之前,一定要牢記能map就map,盡量不要聚合算子,實(shí)在不行就盡可能放到最后。
以上就是Spark處理trick總結(jié)分析的詳細(xì)內(nèi)容,更多關(guān)于Spark處理trick的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
詳解Idea 2020 找不到或無(wú)法安裝官方漢化包解決方案
這篇文章主要介紹了詳解Idea 2020 找不到或無(wú)法安裝官方漢化包解決方案,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-04-04常用VsCode 快捷鍵(Window & Mac)GIF演示
本文給大家分享了23個(gè)常用VsCode 快捷鍵(Window & Mac)的GIF動(dòng)圖演示,非常的實(shí)用,有需要的小伙伴可以拿走參考2020-02-02反向傳播BP學(xué)習(xí)算法Gradient?Descent的推導(dǎo)過(guò)程
這篇文章主要為大家介紹了反向傳播BP學(xué)習(xí)算法-Gradient?Descent的推導(dǎo)過(guò)程,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-05-05大前端時(shí)代搞定PC/Mac端開(kāi)發(fā),我有絕招(收藏版)
這篇文章主要介紹了大前端時(shí)代搞定PC/Mac端開(kāi)發(fā),我有絕招,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-07-07fiddler抓包小技巧之自動(dòng)保存抓包數(shù)據(jù)的實(shí)現(xiàn)方法分析【可根據(jù)需求過(guò)濾】
這篇文章主要介紹了fiddler抓包小技巧之自動(dòng)保存抓包數(shù)據(jù)的實(shí)現(xiàn)方法,較為詳細(xì)的分析了fiddler自動(dòng)保存抓包數(shù)據(jù)及根據(jù)需求過(guò)濾相關(guān)操作技巧,需要的朋友可以參考下2020-01-01