欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

pyspark?dataframe列的合并與拆分實例

 更新時間:2023年03月23日 08:36:10   作者:山木枝  
這篇文章主要介紹了pyspark?dataframe列的合并與拆分實例,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教

pyspark dataframe列的合并與拆分

使用Spark SQL在對數(shù)據(jù)進行處理的過程中,可能會遇到對一列數(shù)據(jù)拆分為多列,或者把多列數(shù)據(jù)合并為一列。

這里記錄一下目前想到的對DataFrame列數(shù)據(jù)進行合并和拆分的幾種方法。

from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local") \
    .appName("dataframe_split") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
 
sc = spark.sparkContext
df = spark.read.csv('hdfs://master:9000/dataset/dataframe_split.csv', inferSchema=True, header=True)
df.show(3)

原始數(shù)據(jù)如下所示

dataframe列數(shù)據(jù)的分割

from pyspark.sql.functions import split, explode, concat, concat_ws
df_split = df.withColumn("s", split(df['score'], " "))
df_split.show()

 

dataframe列數(shù)據(jù)的拆分

zipWithIndex:給每個元素生成一個索引 

排序首先基于分區(qū)索引,然后是每個分區(qū)內(nèi)的項目順序.因此,第一個分區(qū)中的第一個item索引為0,最后一個分區(qū)中的最后一個item的索引最大.當(dāng)RDD包含多個分區(qū)時此方法需要觸發(fā)spark作業(yè).

first_row = df.first()
numAttrs = len(first_row['score'].split(" "))
print("新增列的個數(shù)", numAttrs)
attrs = sc.parallelize(["score_" + str(i) for i in range(numAttrs)]).zipWithIndex().collect()
print("列名:", attrs)
for name, index in attrs:
    df_split = df_split.withColumn(name, df_split['s'].getItem(index))
df_split.show()

 

dataframe將一行分成多行

df_explode = df.withColumn("e", explode(split(df['score'], " ")))
df_explode.show()

dataframe列數(shù)據(jù)的合并

列的合并有兩個函數(shù):一個不添加分隔符concat(),一個添加分隔符concat_ws()

concat

df_concat = df_split.withColumn("score_concat", concat(df_split['score_0'], \
                                                       df_split['score_1'], df_split['score_2'], df_split['score_3']))
df_concat.show()

 

caoncat_ws

df_ws = df_split.withColumn("score_concat", concat_ws('-', df_split['score_0'], \
                                                       df_split['score_1'], df_split['score_2'], df_split['score_3']))
df_ws.show()

dataframe多行轉(zhuǎn)多列

pivot: 旋轉(zhuǎn)當(dāng)前[[dataframe]]列并執(zhí)行指定的聚合 

#DataFrame 數(shù)據(jù)格式:每個用戶對每部電影的評分 userID 用戶ID,movieID 電影ID,rating評分
df=spark.sparkContext.parallelize([[15,399,2], \
                                   [15,1401,5], \
                                   [15,1608,4], \
                                   [15,20,4], \
                                   [18,100,3], \
                                   [18,1401,3], \
                                   [18,399,1]])\
                    .toDF(["userID","movieID","rating"])
#pivot 多行轉(zhuǎn)多列
resultDF = df.groupBy("userID").pivot("movieID").sum("rating").na.fill(-1)
#結(jié)果
resultDF.show()

pyspark dataframe常用操作

總體原則

pyspark中,dataframe與sql的耗時會經(jīng)引擎優(yōu)化,效率高于rdd,因此盡可能使用dataframe或者sql。執(zhí)行效率之外,dataframe優(yōu)于rdd的另一個好處是:dataframe的各個量有語義信息,便于后期維護。比如rdd[0][1][1]這種很難維護,但是,df.info.school.grade就容易理解。

在使用dataframe過程中,應(yīng)盡量避免使用udf,因為序列化數(shù)據(jù)原本在JVM中,現(xiàn)在spark在worker上啟動一個Python進程,需要將全體數(shù)據(jù)序列化成python可解釋的格式,計算昂貴。

列相關(guān)

根據(jù)已有列生成新列

from pyspark.sql.functions import length, col, lit, size
df.withColumn("length_col", length(col("existing_str_col"))) # 將existing_str_col的長度生成新列
df.withColumn("constant_col", lit("hello")) # 生成一列常量
df.withColumn("size_col", size(col("existing_array_col"))) # 將existing_array_col的元素個數(shù)生成新列

從已有列選擇部分列

from pyspark.sql.functions import col
df = df.select(col("col_1").cast("string"), col("col_2").alias("col_2_")) # 選擇col_1列和col_2列,并將col_1列轉(zhuǎn)換為string格式,將col_2列重命名為col_2_,此時不再存在col_2

將幾列連接起來形成新列

from pyspark.sql.functions import concat_ws
?
df = df.withColumn("concat_col", concat_ws("_", df.col_1, df.col_2)) # 使用_將col_1和col_2連接起來,構(gòu)成新列concat_col

將string列分割成list

from pyspark.sql.functions import split
?
df = df.withColumn("split_col", split(df.col, "-")) #按照-將df中的col列分割,此時split_col時一個list,后續(xù)或者配合filter(length(...))使用

統(tǒng)計列均值

from pyspark.sql.functions import mean
?
col_mean = df.select(mean(col)).collect()[0][0]

行相關(guān)

從全體行中選擇部分行(一般調(diào)試時使用)

print(df.take(5)) #交互式的pyspark shell中,等價于df.show(5)

統(tǒng)計行數(shù)量

print(df.count()) #統(tǒng)計行數(shù)量

從全體行中篩選出部分行

from pyspark.sql.functions import col
df = df.filter(col("col_1")==col("col_2")) #保留col_1等于col_2的行

刪除帶null的行

df.na.drop("all") # 只有當(dāng)所有列都為空時,刪除該行
df.na.drop("any") # 任意列為空時,刪除該行
df.na.drop("all", colsubset=["col_1","col_2"]) # 當(dāng)col_1和col_2都為空時,刪除該行

去除重復(fù)行

df = df.distinct() # 刪除所有列值相同的重復(fù)行
df = df.dropDuplicates(["date", "count"]) # 刪除date, count兩列值相同的行

一行拆分成多行

from pyspark.sql.functions import explode, split
?
df = df.withColumn("sub_str", explode(split(df["str_col"], "_"))) # 將str_col按-拆分成list,list中的每一個元素成為sub_str,與原行中的其他列一起組成新的行

填補行中的空值

df.na.fill({"col_name":fill_content}) # 用fill_content填補col_name列的空值

行前加入遞增(不一定連續(xù))唯一序號

from pyspark.sql.functions import monotonically_increasing_id
?
df = df.withColumn("id", monotonically_increasing_id())

兩個dataframe

兩個dataframe根據(jù)某列拼接

df_3 = df_1.join(df_2, df_1.col_1==df_2.col_2) # inner join, 只有當(dāng)df_1中的col_1列值等于df_2中的col_2時,才會拼接
df_4 = df_1.join(df_2, df_1.col_1==df_2.col_2, "left") # left join, 當(dāng)df_1中的col_1值不存在于df_2中時,仍會拼接,憑借值填充null

兩個dataframe合并

df3 = df1.union(df2)

聚合操作

groupBy
from pyspark.sql.functions import concat_ws, split, explode, collect_list, struct
?
concat_df = concat_df.groupBy("sample_id", "sample_date").agg(collect_list('feature').alias("feature_list")) # 將同sample_id, sample_date的行聚合成組,feature字段拼成一個list,構(gòu)成新列feature_list。agg配合groupBy使用,效果等于select。此時concat_df只有兩列:sample_id和feature_list。
concat_tuple_df = concat_df.groupBy("sample_id", "sample_date").agg(collect_list(struct("feature", "owner")).alias("tuple")) # 將同sample_id, sample_date的行聚合成組, (feature, owner)兩個字段拼成一個單位,組內(nèi)所有單位拼成一個list,構(gòu)成新列tuple

窗口函數(shù)

from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number
?
windowSpec = Window.partitionBy(df.id, df.date).orderBy(col("price").desc(), col("discount").asc()) # 相同id,date的行被聚成組,組內(nèi)按照price降序,discount升序進行排列
df = df.withColumn("rank", row_number().over(windowSpec)) #為排序之后的組進行組內(nèi)編號
df = df.filter(df.rank<=1) # 取組內(nèi)top-1行

讀寫成csv

from pyspark.sql import SparkSession
from pyspark import SparkContext
?
sc = SparkContext(appName="test_rw")
sc_session = SparkSession(sc)
df.write.mode("overwrite").options(header="true").csv(output_path)
df = sc_session.csv.read(input_path, header=True)

dataframe轉(zhuǎn)SQL

from pyspark import SparkContext
from pyspark.sql import SparkSession
?
sc = SparkContext(appName='get_sample')
sc_session = SparkSession(sc)
?
sample_df.createOrReplaceTempView("item_sample_df")
sample_df = sc_session.sql(
? ? ? ? '''
? ? ? ? ? ? select sample_id
? ? ? ? ? ? ? ? ,label
? ? ? ? ? ? ? ? ,type_ as type
? ? ? ? ? ? ? ? ,split(item_id, "_")[2] as owner
? ? ? ? ? ? ? ? ,ftime
? ? ? ? ? ? from item_sample_df
? ? ? ? ''')

自定義函數(shù)UDF(如非必要,勿用)

from pysprak.sql.functions import udf, col
from pyspark.sql.types import StringType, ArrayType, StructField, StructType
?
?
def simple_func(v1, v2):
? ? pass
? ? # return str
?
simple_udf = udf(my_func, StringType())
?
df = df.withColumn("new", simple_udf(df["col_1"], df["col_2"]))
?
?
?
# 復(fù)雜type
?
def get_entity_func():
? ? pass
? ? # return str_list_1, str_list_2
?
entity_schema = StructType([
? ? ? ? ? ? ? ? ? ? StructField("location", ArrayType(StringType()), True),
? ? ? ? ? ? ? ? ? ? StructField("nondigit", ArrayType(StringType()), True)
? ? ? ? ? ? ? ? ])
?
get_entity_udf = udf(get_entity_func, entity_schema)

dataframe與rdd互相轉(zhuǎn)換

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType
?
?
sc = SparkContext(appName="rdd2df")
sc_session = SparkSession(sc)
?
rdd = df.rdd # df轉(zhuǎn)rdd, 注意每列仍帶header,要map(lambda line: [line.id, line.price])才可以轉(zhuǎn)換成不帶header
?
schema = StructType([
? ? ? ? ? ? ? ? ? ? StructField("id", StringType(), True),
? ? ? ? ? ? ? ? ? ? StructField("price", FloatType(), True)
? ? ? ? ? ? ? ? ? ? ])
df = sc_session.createDataFrame(rdd, schema) # rdd轉(zhuǎn)df

總結(jié)

以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。

相關(guān)文章

  • Mac PyCharm中的.gitignore 安裝設(shè)置教程

    Mac PyCharm中的.gitignore 安裝設(shè)置教程

    這篇文章主要介紹了Mac PyCharm中的.gitignore 安裝設(shè)置教程,本文通過圖文并茂的形式給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-04-04
  • PyCharm 2020.2下配置Anaconda環(huán)境的方法步驟

    PyCharm 2020.2下配置Anaconda環(huán)境的方法步驟

    這篇文章主要介紹了PyCharm 2020.2下配置Anaconda環(huán)境的方法步驟,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-09-09
  • PyTorch詳解經(jīng)典網(wǎng)絡(luò)種含并行連結(jié)的網(wǎng)絡(luò)GoogLeNet實現(xiàn)流程

    PyTorch詳解經(jīng)典網(wǎng)絡(luò)種含并行連結(jié)的網(wǎng)絡(luò)GoogLeNet實現(xiàn)流程

    今天小編就為大家分享一篇Pytorch實現(xiàn)GoogLeNet的方法,GoogLeNet提出了一個名為“Inception”的深度卷積神經(jīng)網(wǎng)結(jié)構(gòu),其目標(biāo)是將分類、識別ILSVRC14數(shù)據(jù)集的技術(shù)水平提高一個層次。這一結(jié)構(gòu)的主要特征是對網(wǎng)絡(luò)內(nèi)部計算資源的利用進行了優(yōu)化
    2022-05-05
  • python使用IPython調(diào)試debug程序

    python使用IPython調(diào)試debug程序

    這篇文章主要為大家介紹了python使用IPython調(diào)試debug程序詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2022-05-05
  • python實現(xiàn)在pandas.DataFrame添加一行

    python實現(xiàn)在pandas.DataFrame添加一行

    下面小編就為大家分享一篇python實現(xiàn)在pandas.DataFrame添加一行,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2018-04-04
  • 淺談基于Pytest框架的自動化測試開發(fā)實踐

    淺談基于Pytest框架的自動化測試開發(fā)實踐

    Pytest是Python的一種易用、高效和靈活的單元測試框架,本文主要介紹了基于Pytest框架的自動化測試開發(fā)實踐,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2021-12-12
  • 利用Python如何生成便簽圖片詳解

    利用Python如何生成便簽圖片詳解

    python現(xiàn)在火熱的程度相信不用過多介紹了,下面這篇文章主要給大家介紹了關(guān)于利用Python如何生成便簽圖片的相關(guān)資料,文中通過示例代碼介紹的非常詳細,對大家學(xué)習(xí)或者使用python具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2018-07-07
  • PHP魔術(shù)方法__ISSET、__UNSET使用實例

    PHP魔術(shù)方法__ISSET、__UNSET使用實例

    這篇文章主要介紹了PHP魔術(shù)方法__ISSET、__UNSET使用實例,本文直接給出代碼示例,需要的朋友可以參考下
    2014-11-11
  • 深入解析NumPy中的Broadcasting廣播機制

    深入解析NumPy中的Broadcasting廣播機制

    在吳恩達老師的深度學(xué)習(xí)專項課程中,老師有提到NumPy中的廣播機制,同時那一周的測驗也有涉及到廣播機制的題目。那么,到底什么是NumPy中的廣播機制?本文就來介紹一下
    2021-05-05
  • Python3 ffmpeg視頻轉(zhuǎn)換工具使用方法解析

    Python3 ffmpeg視頻轉(zhuǎn)換工具使用方法解析

    這篇文章主要介紹了Python3 ffmpeg視頻轉(zhuǎn)換工具使用方法解析,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2020-08-08

最新評論