欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Spark處理數(shù)據(jù)排序問題如何避免OOM

 更新時(shí)間:2020年05月21日 11:00:38   作者:Sheep Sun  
這篇文章主要介紹了Spark處理數(shù)據(jù)排序問題如何避免OOM,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下

錯(cuò)誤思想

舉個(gè)列子,當(dāng)我們想要比較 一個(gè) 類型為 RDD[(Long, (String, Int))] 的RDD,讓它先按Long分組,然后按int的值進(jìn)行倒序排序,最容易想到的思維就是先分組,然后把Iterable 轉(zhuǎn)換為 list,然后sortby,但是這樣卻有一個(gè)致命的缺點(diǎn),就是Iterable 在內(nèi)存中是一個(gè)指針,不占內(nèi)存,而list是一個(gè)容器,占用內(nèi)存,如果Iterable 含有元素過多,那么極易引起OOM

 val cidAndSidCountGrouped: RDD[(Long, Iterable[(String, Int)])] = cidAndSidCount.groupByKey()
    // 4. 排序, 取top10
    val result: RDD[(Long, List[(String, Int)])] = cidAndSidCountGrouped.map {
      case (cid, sidCountIt) =>
        // sidCountIt 排序, 取前10
        // Iterable轉(zhuǎn)成容器式集合的時(shí)候, 如果數(shù)據(jù)量過大, 極有可能導(dǎo)致oom
        (cid, sidCountIt.toList.sortBy(-_._2).take(5))
    }

首先,我們要知道,RDD 的排序需要 shuffle, 是采用了內(nèi)存+磁盤來完成的排序.這樣能有效避免OOM的風(fēng)險(xiǎn),但是RDD是全部排序,所以需要針對(duì)性的過濾Key值來進(jìn)行排序

方法一 利用RDD排序特點(diǎn)

 //把long(即key值)提取出來
    val cids: List[Long] = categoryCountList.map(_.cid.toLong)
    val buffer: ListBuffer[(Long, List[(String, Int)])] = ListBuffer[(Long, List[(String, Int)])]()
    //根據(jù)每個(gè)key來過濾RDD
    for (cid <- cids) {
      /*
      List((15,(632972a4-f811-4000-b920-dc12ea803a41,10)), (15,(f34878b8-1784-4d81-a4d1-0c93ce53e942,8)), (15,(5e3545a0-1521-4ad6-91fe-e792c20c46da,8)), (15,(66a421b0-839d-49ae-a386-5fa3ed75226f,8)), (15,(9fa653ec-5a22-4938-83c5-21521d083cd0,8)))
      目標(biāo):
      (9,List((199f8e1d-db1a-4174-b0c2-ef095aaef3ee,9), (329b966c-d61b-46ad-949a-7e37142d384a,8), (5e3545a0-1521-4ad6-91fe-e792c20c46da,8), (e306c00b-a6c5-44c2-9c77-15e919340324,7), (bed60a57-3f81-4616-9e8b-067445695a77,7)))
       */
      val arr: Array[(String, Int)] = cidAndSidCount.filter(cid == _._1)
        .sortBy(-_._2._2)
        .take(5)
        .map(_._2)
      buffer += ((cid, arr.toList))
    }
    buffer.foreach(println)

這樣做也有缺點(diǎn):即有多少個(gè)key,就有多少個(gè)Job,占用資源

方法二 利用TreeSet自動(dòng)排序特性

 def statCategoryTop10Session_3(sc: SparkContext,
                  categoryCountList: List[CategroyCount],
                  userVisitActionRDD: RDD[UserVisitAction]) = {
    // 1. 過濾出來 top10品類的所有點(diǎn)擊記錄
    // 1.1 先map出來top10的品類id
    val cids = categoryCountList.map(_.cid.toLong)
    val topCategoryActionRDD: RDD[UserVisitAction] = userVisitActionRDD.filter(action => cids.contains(action.click_category_id))


    // 2. 計(jì)算每個(gè)品類 下的每個(gè)session 的點(diǎn)擊量 rdd ((cid, sid) ,1)
    val cidAndSidCount: RDD[(Long, (String, Int))] = topCategoryActionRDD
      .map(action => ((action.click_category_id, action.session_id), 1))
      // 使用自定義分區(qū)器 重點(diǎn)理解分區(qū)器的原理
      .reduceByKey(new CategoryPartitioner(cids), _ + _)
      .map {
        case ((cid, sid), count) => (cid, (sid, count))
      }
    
    // 3. 排序取top10
//因?yàn)橐呀?jīng)按key分好了區(qū),所以用Mappartitions ,在每個(gè)分區(qū)中新建一個(gè)TreeSet即可
    val result: RDD[(Long, List[SessionInfo])] = cidAndSidCount.mapPartitions((it: Iterator[(Long, (String, Int))]) => {
//new 一個(gè)TreeSet,并同時(shí)指定排序規(guī)則
   var treeSet: mutable.TreeSet[CategorySession] = new mutable.TreeSet[CategorySession]()(new Ordering[CategorySession] {
          override def compare(x: CategorySession, y: CategorySession): Int = {
            if (x.clickCount >= y.clickCount) -1 else 1
          }
        })
   var id = 0l
  iter.foreach({
    case (l, session) => {
      id = l
      treeSet.add(session)
    if (treeSet.size > 10) treeSet = treeSet.take(10)
          }
        })
        Iterator(id, treeSet)
      })
  
    result.collect.foreach(println)
    
    Thread.sleep(1000000)
  }
}

/*
根據(jù)傳入的key值來決定分區(qū)號(hào),讓相同key進(jìn)入相同的分區(qū),能夠避免多次shuffle
 */
class CategoryPartitioner(cids: List[Long]) extends Partitioner {
  // 用cid索引, 作為將來他的分區(qū)索引.
  private val cidWithIndex: Map[Long, Int] = cids.zipWithIndex.toMap
  
  // 返回集合的長(zhǎng)度
  override def numPartitions: Int = cids.length
  
  // 根據(jù)key返回分區(qū)的索引
  override def getPartition(key: Any): Int = {
    key match {
      // 根據(jù)品類id返回分區(qū)的索引!  0-9
      case (cid: Long, _) =>
        cidWithIndex(cid)
    }
  }
}

以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。

相關(guān)文章

  • python 中值濾波,椒鹽去噪,圖片增強(qiáng)實(shí)例

    python 中值濾波,椒鹽去噪,圖片增強(qiáng)實(shí)例

    今天小編就為大家分享一篇python 中值濾波,椒鹽去噪,圖片增強(qiáng)實(shí)例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧
    2019-12-12
  • Python讀取postgresql數(shù)據(jù)庫(kù)詳情

    Python讀取postgresql數(shù)據(jù)庫(kù)詳情

    這篇文章主要介紹了Python讀取postgresql數(shù)據(jù)庫(kù)詳情,文章圍繞主題展開詳細(xì)的內(nèi)容介紹,具有一定的參考價(jià)值,需要的小伙伴可以參考一下
    2022-09-09
  • 在Python中定義函數(shù)并調(diào)用的操作步驟

    在Python中定義函數(shù)并調(diào)用的操作步驟

    這篇文章主要介紹了在Python中如何定義函數(shù)并調(diào)用它,函數(shù)的定義和調(diào)用是Python編程中最基本也是最重要的概念之一,掌握它們對(duì)于進(jìn)行有效的Python編程至關(guān)重要,需要的朋友可以參考下
    2024-01-01
  • Python實(shí)現(xiàn)PDF頁(yè)面的刪除與添加功能

    Python實(shí)現(xiàn)PDF頁(yè)面的刪除與添加功能

    在處理PDF文檔的過程中,我們時(shí)常會(huì)需要對(duì)PDF文檔中的頁(yè)面進(jìn)行編輯操作的情況,如插入和刪除頁(yè)面,通過添加和刪除PDF頁(yè)面,我們可以增加內(nèi)容或?qū)Σ恍枰膬?nèi)容進(jìn)行刪除,本文將介紹如何使用Python代碼實(shí)現(xiàn)在PDF文檔中添加和刪除頁(yè)面
    2024-04-04
  • python選擇排序算法的實(shí)現(xiàn)代碼

    python選擇排序算法的實(shí)現(xiàn)代碼

    這篇文章主要介紹了python選擇排序算法的實(shí)現(xiàn)代碼,大家參考
    2013-11-11
  • python中如何設(shè)置list步長(zhǎng)

    python中如何設(shè)置list步長(zhǎng)

    這篇文章主要介紹了python中如何設(shè)置list步長(zhǎng),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-05-05
  • Python使用python-docx讀寫word文檔

    Python使用python-docx讀寫word文檔

    這篇文章主要為大家詳細(xì)介紹了Python使用python-docx讀寫word文檔,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2019-08-08
  • python 實(shí)現(xiàn)學(xué)生信息管理系統(tǒng)的示例

    python 實(shí)現(xiàn)學(xué)生信息管理系統(tǒng)的示例

    本篇文章主要分享python學(xué)生管理系統(tǒng)的使用,文章非常詳細(xì)地介紹了通過示例代碼實(shí)現(xiàn)的學(xué)生管理系統(tǒng),該系統(tǒng)對(duì)每個(gè)人的研究或工作都有一定的參考學(xué)習(xí)價(jià)值,希望你能在其中有所收獲。
    2020-11-11
  • 詳解Django配置優(yōu)化方法

    詳解Django配置優(yōu)化方法

    這篇文章主要介紹了詳解Django配置優(yōu)化方法,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-11-11
  • python中的try except與R語(yǔ)言中的tryCatch異常解決

    python中的try except與R語(yǔ)言中的tryCatch異常解決

    這篇文章主要為大家介紹了python中的try except與R語(yǔ)言中的tryCatch異常解決的方式及分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助
    2021-11-11

最新評(píng)論