欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

spark?dataframe全局排序id與分組后保留最大值行

 更新時(shí)間:2023年02月09日 08:49:25   作者:算法全棧之路  
這篇文章主要為大家介紹了spark?dataframe全局排序id與分組后保留最大值行實(shí)現(xiàn)詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

正文

作為一個(gè)算法工程師,日常學(xué)習(xí)和工作中,不光要 訓(xùn)練模型關(guān)注效果 ,更多的 時(shí)間 是在 準(zhǔn)備樣本數(shù)據(jù)與分析數(shù)據(jù) 等,而這些過(guò)程 都與 大數(shù)據(jù) spark和hadoop生態(tài) 的若干工具息息相關(guān)。

今天我們就不在更新 機(jī)器學(xué)習(xí)算法模型 相關(guān)的內(nèi)容,分享兩個(gè) spark函數(shù) 吧,以前也在某種場(chǎng)景中使用過(guò)但沒(méi)有保存收藏,哎??! 事前不搜藏,臨時(shí)抱佛腳 的感覺(jué) 真是 痛苦,太耽誤干活了 。

so,把這 兩個(gè)函數(shù) 記在這里 以備不時(shí) 之需~

(1) 得到 spark dataframe 全局排序ID

這個(gè)函數(shù)的 應(yīng)用場(chǎng)景 就是:根據(jù)某一列的數(shù)值對(duì) spark 的 dataframe 進(jìn)行排序, 得到全局多分區(qū)排序的全局有序ID,新增一列保存這個(gè)rank id ,并且保留別的列的數(shù)據(jù)無(wú)變化 。

有用戶會(huì)說(shuō),這不是很容易嗎 ,直接用 orderBy 不就可以了嗎,但是難點(diǎn)是:orderBy完記錄下全局ID 并且 保持原來(lái)全部列的DF數(shù)據(jù) 。

多說(shuō)無(wú)益,遇到這個(gè)場(chǎng)景 直接copy 用起來(lái) 就知道 有多爽 了,同類問(wèn)題 我們可以 用下面 這個(gè)函數(shù) 解決 ~

scala 寫的 spark 版本代碼:

def dfZipWithIndex(
  df: DataFrame,
  offset: Int = 1,
  colName: String ="rank_id",
  inFront: Boolean = true
) : DataFrame = {
  df.sqlContext.createDataFrame(
    df.rdd.zipWithIndex.map(ln =>
      Row.fromSeq(
        (if (inFront) Seq(ln._2 + offset) else Seq())
          ++ ln._1.toSeq ++
        (if (inFront) Seq() else Seq(ln._2 + offset))
      )
    ),
    StructType(
      (if (inFront) Array(StructField(colName,LongType,false)) else Array[StructField]())
        ++ df.schema.fields ++
      (if (inFront) Array[StructField]() else Array(StructField(colName,LongType,false)))
    )
  )
}

函數(shù)調(diào)用我們可以用這行代碼調(diào)用: val ranked_df = dfZipWithIndex(raw_df.orderBy($"predict_score".desc)), 直接復(fù)制過(guò)去就可以~

python寫的 pyspark 版本代碼:

from pyspark.sql.types import LongType, StructField, StructType
def dfZipWithIndex (df, offset=1, colName="rank_id"):
    new_schema = StructType(
                    [StructField(colName,LongType(),True)]        # new added field in front
                    + df.schema.fields                            # previous schema
                )
    zipped_rdd = df.rdd.zipWithIndex()
    new_rdd = zipped_rdd.map(lambda (row,rowId): ([rowId +offset] + list(row)))
    return spark.createDataFrame(new_rdd, new_schema)

調(diào)用 同理 , 這里我就不在進(jìn)行贅述了。

(2)分組后保留最大值行

這個(gè)函數(shù)的 應(yīng)用場(chǎng)景 就是: 當(dāng)我們使用 spark 或則 sparkSQL 查找某個(gè) dataframe 數(shù)據(jù)的時(shí)候,在某一天里,任意一個(gè)用戶可能有多條記錄,我們需要 對(duì)每一個(gè)用戶,保留dataframe 中 某列值最大 的那行數(shù)據(jù)

其中的 關(guān)鍵點(diǎn) 在于:一次性求出對(duì)每個(gè)用戶分組后,求得每個(gè)用戶的多行記錄中,某個(gè)值最大的行進(jìn)行數(shù)據(jù)保留

當(dāng)然,經(jīng)過(guò) 簡(jiǎn)單修改代碼,不一定是最大,最小也是可以的,平均都o(jì)k 。

scala 寫的 spark 版本代碼:

// 得到一天內(nèi)一個(gè)用戶多個(gè)記錄里面時(shí)間最大的那行用戶的記錄
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions
val w = Window.partitionBy("user_id")
val result_df = raw_df
    .withColumn("max_time",functions.max("time").over(w))
    .where($"time" === $"max_time")
    .drop($"max_time")

python寫的 pyspark 版本代碼:

# pyspark dataframe 某列值最大的元素所在的那一行 
# GroupBy 列并過(guò)濾 Pyspark 中某列值最大的行 
# 創(chuàng)建一個(gè)Window 以按A列進(jìn)行分區(qū),并使用它來(lái)計(jì)算每個(gè)組的最大值。然后過(guò)濾出行,使 B 列中的值等于最大值 
from pyspark.sql import Window
w = Window.partitionBy('user_id')
result_df = spark.sql(raw_df).withColumn('max_time', fun.max('time').over(w))\
    .where(fun.col('time') == fun.col('time'))
    .drop('max_time')

我們可以看到: 這個(gè)函數(shù)的關(guān)鍵就是運(yùn)用了 spark 的 window 函數(shù) ,靈活運(yùn)用 威力無(wú)窮 哦 !

到這里,spark利器2函數(shù)之dataframe全局排序id與分組后保留最大值行 的全文 就寫完了 ,更多關(guān)于spark dataframe全局排序的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • python如何正確使用yield

    python如何正確使用yield

    在 Python 開(kāi)發(fā)中,yield 關(guān)鍵字的使用其實(shí)較為頻繁,例如大集合的生成,簡(jiǎn)化代碼結(jié)構(gòu)、協(xié)程與并發(fā)都會(huì)用到它。但是,你是否真正了解 yield 的運(yùn)行過(guò)程呢?這篇文章,我們就來(lái)看一下 yield 的運(yùn)行流程,以及在開(kāi)發(fā)中哪些場(chǎng)景適合使用yield
    2021-05-05
  • 使用python在校內(nèi)發(fā)人人網(wǎng)狀態(tài)(人人網(wǎng)看狀態(tài))

    使用python在校內(nèi)發(fā)人人網(wǎng)狀態(tài)(人人網(wǎng)看狀態(tài))

    人人網(wǎng)怎么發(fā)狀態(tài)?下面使用python實(shí)現(xiàn)這個(gè)功能,大家參考使用吧
    2014-02-02
  • Python networkx中獲取圖的鄰接矩陣方式

    Python networkx中獲取圖的鄰接矩陣方式

    這篇文章主要介紹了Python networkx中獲取圖的鄰接矩陣方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-12-12
  • pandas缺失值np.nan, np.isnan, None, pd.isnull, pd.isna

    pandas缺失值np.nan, np.isnan, None, pd.isnull,&n

    本文主要介紹了pandas缺失值np.nan, np.isnan, None, pd.isnull, pd.isna
    2024-04-04
  • Python實(shí)現(xiàn)批量轉(zhuǎn)換文件編碼的方法

    Python實(shí)現(xiàn)批量轉(zhuǎn)換文件編碼的方法

    這篇文章主要介紹了Python實(shí)現(xiàn)批量轉(zhuǎn)換文件編碼的方法,涉及Python針對(duì)文件的遍歷及編碼轉(zhuǎn)換實(shí)現(xiàn)技巧,具有一定參考借鑒價(jià)值,需要的朋友可以參考下
    2015-07-07
  • pycharm重置設(shè)置,恢復(fù)默認(rèn)設(shè)置的方法

    pycharm重置設(shè)置,恢復(fù)默認(rèn)設(shè)置的方法

    今天小編就為大家分享一篇pycharm重置設(shè)置,恢復(fù)默認(rèn)設(shè)置的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2018-10-10
  • OpenCV實(shí)戰(zhàn)之實(shí)現(xiàn)手勢(shì)虛擬縮放效果

    OpenCV實(shí)戰(zhàn)之實(shí)現(xiàn)手勢(shì)虛擬縮放效果

    本篇將會(huì)以HandTrackingModule為模塊,實(shí)現(xiàn)通過(guò)手勢(shì)對(duì)本人的博客海報(bào)進(jìn)行縮放。文中的示例代碼講解詳細(xì),具有一定的借鑒價(jià)值,需要的可以參考一下
    2022-11-11
  • python基礎(chǔ)教程之udp端口掃描

    python基礎(chǔ)教程之udp端口掃描

    開(kāi)發(fā)一個(gè)程序,用于獲取局域網(wǎng)中開(kāi)啟snmp服務(wù)的主機(jī)ip地址列表,并寫入相應(yīng)文件以便其它程序使用。下面是實(shí)現(xiàn)方法
    2014-02-02
  • Python協(xié)程的四種實(shí)現(xiàn)方式總結(jié)

    Python協(xié)程的四種實(shí)現(xiàn)方式總結(jié)

    今天繼續(xù)給大家介紹Python關(guān)知識(shí),本文主要內(nèi)容是Python協(xié)程的四種實(shí)現(xiàn)方式。文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下
    2023-01-01
  • 使用PyV8在Python爬蟲(chóng)中執(zhí)行js代碼

    使用PyV8在Python爬蟲(chóng)中執(zhí)行js代碼

    PyV8是chrome用來(lái)執(zhí)行javascript的引擎,據(jù)說(shuō)是最快的js引擎,通過(guò)pyv8的封裝,可以在python中使用。下面這篇文章主要介紹了使用PyV8在Python爬蟲(chóng)中執(zhí)行js代碼的相關(guān)資料,需要的朋友可以參考下。
    2017-02-02

最新評(píng)論