從Pyspark UDF調(diào)用另一個自定義Python函數(shù)的方法步驟
PySpark,通常稱為 Apache Spark 的 Python API,是為分布式數(shù)據(jù)處理而創(chuàng)建的。它使用戶能夠高效且可擴(kuò)展地對大型數(shù)據(jù)集進(jìn)行復(fù)雜的計(jì)算和轉(zhuǎn)換。用戶定義函數(shù) (UDF),允許用戶創(chuàng)建自己獨(dú)特的函數(shù)并將其應(yīng)用到 Spark DataFrame 或 RDD,這是 PySpark 的主要功能之一 。使用 UDF,可以擴(kuò)展和定制 PySpark 的功能以滿足某些需求。在本文中,我們將學(xué)習(xí)如何從 Pyspark UDF 調(diào)用另一個自定義 Python 函數(shù)。
從 Pyspark UDF 調(diào)用另一個自定義 Python函數(shù)
Python 編碼的 PySpark UDF 提供了調(diào)用其他Python 函數(shù)的能力,無論它們是內(nèi)置函數(shù)還是來自外部庫的用戶定義函數(shù)。通過使用戶能夠利用現(xiàn)有的 Python 代碼,此功能提高了 UDF 的模塊化和可重用性。在分布式 PySpark 環(huán)境中,用戶可以輕松實(shí)現(xiàn)特定領(lǐng)域的邏輯、執(zhí)行具有挑戰(zhàn)性的計(jì)算或使用尖端算法。用戶可以通過從 PySpark UDF 調(diào)用 Python 函數(shù)來充分利用 Python 龐大的庫和功能生態(tài)系統(tǒng)的全部潛力。
從 PySpark UDF 調(diào)用另一個自定義 Python 函數(shù)的步驟
讓我們看看從 Pyspark UDF 調(diào)用另一個自定義 Python 函數(shù)的分步過程。
第1步:導(dǎo)入必要的模塊
首先,從“pyspark.sql.functions” 模塊導(dǎo)入“udf” ,該模塊提供了處理 Spark DataFrame 的工具。
from pyspark.sql.functions import udf
第 2 步:啟動 Spark 會話
接下來,通過導(dǎo)入必要的 Spark 模塊來創(chuàng)建 Spark 會話。
from pyspark.sql import SparkSession spark = SparkSession.builder.getOrCreate()
第 3 步:創(chuàng)建數(shù)據(jù)框
下一步是創(chuàng)建一個數(shù)據(jù)幀,用于在 Spark 中執(zhí)行操作。
data = [("Marry", 25), ("Sunny", 30), ("Ram", 35)] df = spark.createDataFrame(data, ["name", "age"])
第 4 步:定義自定義 Python 函數(shù)
然后定義我們希望從 PySpark UDF 調(diào)用的自定義 Python 函數(shù)。我們可以在此函數(shù)中使用我們需要的任何邏輯或計(jì)算。例如,將字符串轉(zhuǎn)換為大寫字符串的函數(shù)。
def to_uppercase(string): return string.upper()
第 5 步: 創(chuàng)建 PySpark UDF
創(chuàng)建自定義 Python 函數(shù)后,使用 “pyspark.sql.functions” 模塊中的 UDF 函數(shù)構(gòu)造 PySpark UDF。 “udf()” 函數(shù)應(yīng)接收自定義 Python 函數(shù)作為參數(shù)。自定義函數(shù)注冊為 UDF,以便它可以應(yīng)用于 DataFrame 列。
to_uppercase_udf = udf(to_uppercase)
步驟 6:將 UDF 應(yīng)用到 DataFrame
創(chuàng)建 PySpark UDF 后,使用 “withColumn()” 函數(shù)將其應(yīng)用到 DataFrame 列。在 DataFrame 中,此方法添加新列或刪除現(xiàn)有列。DataFrame 的每一行都會調(diào)用 UDF 一次,將自定義 Python 函數(shù)應(yīng)用于指定列并生成所需的結(jié)果。
df = df.withColumn("name_uppercase", to_uppercase_udf(df["name"]))
第7步: 顯示數(shù)據(jù)框
最后,我們將使用 “show()” 函數(shù)顯示數(shù)據(jù)框以查看對其所做的更改。
df.show()
按照這些說明,我們可以通過從 PySpark UDF 調(diào)用另一個自定義 Python 函數(shù)來在 PySpark DataFrame 上執(zhí)行自定義計(jì)算和轉(zhuǎn)換。
從 PySpark UDF 調(diào)用另一個自定義 Python 函數(shù)的示例
現(xiàn)在,讓我們看看從 Pyspark UDF 調(diào)用 Python 自定義函數(shù)的幾個不同示例。
示例 1:將 DataFrame 列轉(zhuǎn)換為大寫
在此示例中,我們將使用 Pyspark 創(chuàng)建一個包含人員姓名和年齡的Spark 數(shù)據(jù)框 “df” 。然后我們將定義一個自定義 Python 函數(shù)“ to_uppercase()”,它將Python 字符串作為參數(shù)并將其轉(zhuǎn)換為大寫并將結(jié)果存儲在該數(shù)據(jù)幀的新列中。然后我們使用 Pyspark 的“ udf() ”函數(shù)創(chuàng)建 Pyspark UDF。
Python3
# 導(dǎo)入模塊 from pyspark.sql import SparkSession from pyspark.sql.functions import udf # 定義自定義Python函數(shù) def to_uppercase(string): return string.upper() # 創(chuàng)建一個SparkSession spark = SparkSession.builder.getOrCreate() # 創(chuàng)建一個DataFrame data = [("Marry", 25), ("Sunny", 30), ("Ram", 35)] df = spark.createDataFrame(data, ["name", "age"]) # 創(chuàng)建 PySpark UDF to_uppercase_udf = udf(to_uppercase) # 將UDF應(yīng)用于“name”列 df = df.withColumn("name_uppercase", to_uppercase_udf(df["name"])) # 用于顯示DataFrame的函數(shù) df.show()
輸出:
示例 2: 調(diào)用組合多個 DataFrame 列的自定義 Python 函數(shù)
在此示例中,我們將創(chuàng)建一個包含 2 列的數(shù)據(jù)框 - ' first_name ' 和 ' last_name '。然后創(chuàng)建一個 Python 自定義函數(shù)“ combine_columns ”,它將“first_name”和“last_name”作為參數(shù),并返回一個列,將它們組合在一起以創(chuàng)建“ full_name”。
Python3
# 導(dǎo)入模塊 from pyspark.sql import SparkSession from pyspark.sql.functions import udf # 自定義Python函數(shù) def combine_columns(col1, col2): return col1 + " " + col2 # 創(chuàng)建SparkSession spark = SparkSession.builder.getOrCreate() # 創(chuàng)建一個DataFrame data = [("John", "Doe"), ("Ram", "Kumar"), ("Smith", "Jones")] df = spark.createDataFrame(data, ["first_name", "last_name"]) # 制作PySpark UDF combine_columns_udf = udf(combine_columns) # 將 UDF應(yīng)用于“first_name”和“l(fā)ast_name”列 df = df.withColumn("full_name", combine_columns_udf(df["first_name"], df["last_name"])) # 用于顯示DataFrame的函數(shù) df.show()
輸出:
示例 3:使用外部庫從 PySpark UDF 調(diào)用自定義 Python 函數(shù)
對于更復(fù)雜的計(jì)算,PySpark 使我們能夠在定制函數(shù)中使用外部 Python 庫。假設(shè)我們希望使用模糊匹配庫 “fuzzywuzzy” 和名為 “calculate_similarity” 的自定義 Python 方法來比較兩個文本之間的相似度。
在此示例中,我們從Python 中的 fuzzywuzzy 庫導(dǎo)入“fuzz”模塊,并使用“ fuzz.ratio() ”函數(shù)來確定兩個文本之間的相似程度。我們創(chuàng)建了獨(dú)特的 Python 方法“ calculate_similarity() ”來使用輸入字符串調(diào)用 “fuzz.ratio()” 算法。使用 “udf()” 函數(shù),我們構(gòu)建一個名為 “similarity_udf” 的 UDF 并定義輸入和輸出類型。最后,我們使用 “withColumn()” 方法將 UDF 應(yīng)用于“string1”和“string2”列,并顯示具有相似率的結(jié)果 DataFrame。
Python3
# 導(dǎo)入模塊 from pyspark.sql import SparkSession from pyspark.sql.functions import udf from pyspark.sql.types import IntegerType from fuzzywuzzy import fuzz # 創(chuàng)建SparkSession spark = SparkSession.builder.getOrCreate() # 使用列“string1”和“string2”獲取的示例DataFrame data = [("apple", "apples"), ("banana", "bananas"), ("cat", "dog")] df = spark.createDataFrame(data, ["string1", "string2"]) # 創(chuàng)建自定義Python函數(shù) def calculate_similarity(str1, str2): return fuzz.ratio(str1, str2) # 從自定義函數(shù)創(chuàng)建自定義項(xiàng) similarity_udf = udf(calculate_similarity, IntegerType()) # 應(yīng)用UDF計(jì)算相似性 df.withColumn("similarity", similarity_udf(df["string1"], df["string2"])).show()
輸出:
示例 4:應(yīng)用具有復(fù)雜邏輯的自定義 Python 函數(shù)
讓我們看一個示例,其中有一個 DataFrame,其中有一列表示句子的字符串,并且我們希望使用名為“ count_words” 的自定義 Python 函數(shù)來確定每個短語中存在多少個單詞。
在此圖中,自定義 Python 函數(shù) “count_words” 使用 “split()” 方法將輸入文本分解為單詞,并使用 “len()” 函數(shù)獲取單詞計(jì)數(shù)。使用 “udf()” 函數(shù),我們構(gòu)建一個名為 “count_udf” 的 UDF并定義輸入和輸出類型。最后,我們使用 “withColumn()” 方法將 UDF 應(yīng)用到 “sentence” 列,并顯示帶有字?jǐn)?shù)統(tǒng)計(jì)的結(jié)果 DataFrame。
Python3
# 導(dǎo)入模塊 from pyspark.sql import SparkSession from pyspark.sql.functions import udf from pyspark.sql.types import IntegerType # 創(chuàng)建 SparkSession spark = SparkSession.builder.getOrCreate() # 具有列“sentence”的示例DataFrame data = [("Hello, PySpark!",), ("PySpark is great in today's world",), ("Spark DataFrames are powerful in python to work on",)] df = spark.createDataFrame(data, ["sentence"]) # 創(chuàng)建自定義Python函數(shù) def count_words(sentence): return len(sentence.split()) # 從自定義函數(shù)創(chuàng)建自定義項(xiàng) count_udf = udf(count_words, IntegerType()) # 應(yīng)用UDF計(jì)算每句話中的單詞 df.withColumn("word_count", count_udf(df["sentence"])).show()
輸出:
以上就是從Pyspark UDF調(diào)用另一個自定義Python函數(shù)的方法步驟的詳細(xì)內(nèi)容,更多關(guān)于Pyspark UDF調(diào)用Python函數(shù)的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
python深度學(xué)習(xí)tensorflow實(shí)例數(shù)據(jù)下載與讀取
這篇文章主要為大家介紹了python深度學(xué)習(xí)tensorflow實(shí)例數(shù)據(jù)下載與讀取示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-06-06Python學(xué)習(xí)之路之pycharm的第一個項(xiàng)目搭建過程
這篇文章主要介紹了Python學(xué)習(xí)之路之pycharm的第一個項(xiàng)目搭建過程,本文通過圖文并茂的形式給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-06-06python3使用matplotlib繪制散點(diǎn)圖
這篇文章主要為大家詳細(xì)介紹了python3使用matplotlib繪制散點(diǎn)圖,具有一定的參考價值,感興趣的小伙伴們可以參考一下2019-03-03Python關(guān)于__name__屬性的含義和作用詳解
在本篇文章里小編給大家分享的是關(guān)于Python關(guān)于__name__屬性的含義和作用知識點(diǎn),需要的朋友們可以參考下。2020-02-02