使用PySpark實(shí)現(xiàn)數(shù)據(jù)清洗與JSON格式轉(zhuǎn)換的實(shí)踐詳解
簡(jiǎn)介
PySpark 是 Apache Spark 的 Python API,可用于處理大規(guī)模數(shù)據(jù)集。它提供了豐富的功能和庫(kù),使得數(shù)據(jù)清洗和轉(zhuǎn)換變得更加高效和便捷。
代碼實(shí)踐
本文將以一個(gè)示例數(shù)據(jù)集為例,演示如何使用 PySpark 對(duì)數(shù)據(jù)進(jìn)行清洗和轉(zhuǎn)換。以下是代碼實(shí)現(xiàn)的主要步驟:
步驟 1:連接到遠(yuǎn)程 Spark 服務(wù)器
# Author: 冷月半明 # Date: 2023/12/14 # Description: This script does XYZ. from pyspark.sql import SparkSession spark = SparkSession.builder \ .appName("RemoteSparkConnection") \ .master("yarn") \ .config("spark.pyspark.python", "/opt/apps/anaconda3/envs/myspark/bin/python") \ .config("spark.sql.warehouse.dir", "/hive/warehouse") \ .config("hive.metastore.uris", "thrift://node01:9083") \ .config("spark.sql.parquet.writeLegacyFormat", "true") \ .enableHiveSupport() \ .getOrCreate()
當(dāng)使用 PySpark 進(jìn)行大數(shù)據(jù)處理時(shí),首先需要建立與 Spark 集群的連接。在這段代碼中,我們使用了 SparkSession
類來(lái)創(chuàng)建一個(gè)與遠(yuǎn)程 Spark 服務(wù)器的連接,并設(shè)置了連接所需的參數(shù)。
導(dǎo)入必要的庫(kù): 我們首先導(dǎo)入了 SparkSession
類,這是 PySpark 中用于管理 Spark 應(yīng)用程序的入口點(diǎn)。
建立連接: 在接下來(lái)的代碼中,我們使用 SparkSession.builder
來(lái)創(chuàng)建一個(gè) SparkSession
對(duì)象。這個(gè)對(duì)象允許我們?cè)O(shè)置應(yīng)用程序的名稱、集群的主節(jié)點(diǎn)、配置項(xiàng)等參數(shù)。在這個(gè)例子中:
.appName("RemoteSparkConnection")
:為我們的 Spark 應(yīng)用程序設(shè)置了一個(gè)名稱,這有助于在集群中識(shí)別應(yīng)用程序。.master("yarn")
:指定 Spark 應(yīng)用程序的主節(jié)點(diǎn),這里使用的是 YARN 資源管理器,用于分配和管理集群資源。.config("spark.pyspark.python", "/opt/apps/anaconda3/envs/myspark/bin/python")
:設(shè)置了 PySpark 使用的 Python 解釋器路徑,確保在集群中使用正確的 Python 環(huán)境。(因?yàn)樗褂玫沫h(huán)境為anaconda創(chuàng)建的虛擬環(huán)境,先進(jìn)入虛擬環(huán)境,然后使用which python查看解釋器位置).config("spark.sql.warehouse.dir", "/hive/warehouse")
:指定了 Spark SQL 的倉(cāng)庫(kù)目錄,這對(duì)于數(shù)據(jù)存儲(chǔ)和管理非常重要。如果不指定的話使用sparksql獲取hive的時(shí)候可能會(huì)出現(xiàn)問(wèn)題。.config("hive.metastore.uris", "thrift://node01:9083")
:配置 Hive 元數(shù)據(jù)存儲(chǔ)的 URI,Hive 是 Hadoop 生態(tài)系統(tǒng)中的一部分,用于管理數(shù)據(jù)倉(cāng)庫(kù)。如果不指定的話使用sparksql獲取hive的時(shí)候可能會(huì)出只能獲取define默認(rèn)倉(cāng)庫(kù)的情況。.config("spark.sql.parquet.writeLegacyFormat", "true")
:設(shè)置了寫入 Parquet 格式數(shù)據(jù)時(shí)使用傳統(tǒng)格式,確保兼容性和向后兼容性。因?yàn)閟park寫入和hive不同,使用該配置可保證spark寫入hive的數(shù)據(jù),hive能正常訪問(wèn)。.enableHiveSupport()
:?jiǎn)⒂昧藢?duì) Hive 的支持,允許在 Spark 中使用 Hive 的功能和特性。.getOrCreate()
:最后使用.getOrCreate()
方法創(chuàng)建或獲取 SparkSession 實(shí)例。
總而言之,這段代碼建立了與遠(yuǎn)程 Spark 服務(wù)器的連接,并配置了各種參數(shù)以確保應(yīng)用程序能夠正確地運(yùn)行和訪問(wèn)集群資源。這是使用 PySpark 開展大數(shù)據(jù)處理工作的第一步,為后續(xù)的數(shù)據(jù)處理和分析創(chuàng)建了必要的環(huán)境和基礎(chǔ)設(shè)施。
步驟 2:加載數(shù)據(jù)
df = spark.sql("SELECT * FROM cjw_data.xiecheng;")
使用 PySpark 的 spark.sql()
函數(shù)執(zhí)行 SQL 查詢,將查詢結(jié)果加載到 DataFrame 中,為后續(xù)的數(shù)據(jù)操作和分析做好準(zhǔn)備。這種靈活性和強(qiáng)大的數(shù)據(jù)處理能力是 PySpark 在大數(shù)據(jù)處理中的關(guān)鍵優(yōu)勢(shì)之一。
步驟 3:數(shù)據(jù)清洗與 JSON 格式轉(zhuǎn)換
from pyspark.sql.functions import udf import json def json_clean(commentlist): try: jsonstr = str(commentlist) s = jsonstr.replace("'", '"') s = '[' + s.replace('}{', '},{') + ']' python_obj = json.loads(s, strict=False) json_str = json.dumps(python_obj) return json_str except: return None json_clean_udf = udf(json_clean, StringType()) df = df.withColumn("new_commentlist", json_clean_udf(df["commentlist"])) newdf = df.withColumn("commentlist", df["new_commentlist"]) newdf = newdf.drop("new_commentlist")
在 PySpark 中定義并應(yīng)用一個(gè)用戶自定義函數(shù)(UDF)來(lái)對(duì)數(shù)據(jù)進(jìn)行清洗和轉(zhuǎn)換。
定義數(shù)據(jù)清洗函數(shù): json_clean()
函數(shù)接收一個(gè)名為 commentlist
的參數(shù),這個(gè)函數(shù)用于將從數(shù)據(jù)庫(kù)中檢索到的評(píng)論數(shù)據(jù)進(jìn)行清洗。具體來(lái)說(shuō):
jsonstr = str(commentlist)
:將傳入的commentlist
轉(zhuǎn)換為字符串格式。s = jsonstr.replace("'", '"')
:將字符串中的單引號(hào)替換為雙引號(hào),以滿足 JSON 格式的要求。s = '[' + s.replace('}{', '},{') + ']'
:在字符串中的每個(gè)對(duì)象之間添加逗號(hào),并將整個(gè)字符串包含在一個(gè)數(shù)組中,以滿足 JSON 格式。python_obj = json.loads(s, strict=False)
:將字符串解析為 Python 對(duì)象。json_str = json.dumps(python_obj)
:將 Python 對(duì)象轉(zhuǎn)換回 JSON 字符串格式。return json_str
:返回清洗后的 JSON 字符串,如果清洗失敗則返回None
。
創(chuàng)建用戶定義函數(shù)(UDF): 使用 udf()
函數(shù)將 Python 函數(shù) json_clean()
封裝為 PySpark 的用戶定義函數(shù)(UDF),以便在 Spark 中使用。
應(yīng)用函數(shù)到 DataFrame: df.withColumn()
函數(shù)將定義的 UDF 應(yīng)用于 DataFrame 中的 commentlist
列,并將處理后的結(jié)果存儲(chǔ)到名為 new_commentlist
的新列中。
更新 DataFrame: 創(chuàng)建了一個(gè)新的 DataFrame newdf
,通過(guò)在原始 DataFrame df
的基礎(chǔ)上添加了經(jīng)過(guò)清洗的 commentlist
列,并刪除了原始的 new_commentlist
列。
步驟 4:保存清洗后的數(shù)據(jù)
newdf.write.mode("overwrite").saveAsTable("cjw_data.xiechengsentiment")
- 使用
write
方法:write
方法用于將 DataFrame 中的數(shù)據(jù)寫入外部存儲(chǔ),可以是文件系統(tǒng)或數(shù)據(jù)庫(kù)。 - 指定保存模式(Mode): 在這個(gè)例子中,
.mode("overwrite")
指定了保存模式為 "overwrite",即如果目標(biāo)位置已存在同名的表或數(shù)據(jù),將覆蓋(重寫)已有的內(nèi)容。 - 保存為表(
saveAsTable
):saveAsTable()
方法將 DataFrame 中的數(shù)據(jù)保存為一個(gè)新的表,名為cjw_data.xiechengsentiment
。這意味著,如果該表不存在,它將會(huì)被創(chuàng)建;如果表已存在,根據(jù)指定的模式進(jìn)行重寫或追加。
步驟 5:統(tǒng)計(jì)數(shù)據(jù)
comment_count = newdf.filter(newdf.commentlist != "[]").count() total_count = newdf.count() print("有效長(zhǎng)度:", comment_count) print("總長(zhǎng)度:", total_count)
- 篩選非空數(shù)據(jù)行:
newdf.filter(newdf.commentlist != "[]")
這部分代碼使用filter()
方法篩選出newdf
DataFrame 中commentlist
列不為空的行。這里使用的條件是commentlist != "[]"
,即不等于空列表的行。也就是篩選出之前udf函數(shù)里返還為空的那些解析JSON異常的行。 - 統(tǒng)計(jì)有效數(shù)據(jù)長(zhǎng)度: 通過(guò)
count()
方法統(tǒng)計(jì)篩選后的 DataFrame 中的行數(shù),即得到了符合條件的有效數(shù)據(jù)的長(zhǎng)度,并將結(jié)果存儲(chǔ)在變量comment_count
中。 - 統(tǒng)計(jì)總長(zhǎng)度:
newdf.count()
統(tǒng)計(jì)了整個(gè) DataFrame 的行數(shù),即獲取了總長(zhǎng)度,并將結(jié)果存儲(chǔ)在變量total_count
中。
截圖示例
清洗后示意圖:
hive表中查看清洗后的數(shù)據(jù):
輸出的字符串中包含了轉(zhuǎn)義字符(例如 \u597d),這些字符實(shí)際上是 Unicode 字符的表示方式,而不是真正的亂碼。 Python 中的 json.dumps() 方法默認(rèn)將非 ASCII 字符串轉(zhuǎn)換為 Unicode 轉(zhuǎn)義序列。這種轉(zhuǎn)義是為了確保 JSON 字符串可以被準(zhǔn)確地傳輸和解析,但可能會(huì)在輸出時(shí)顯示為 Unicode 轉(zhuǎn)義字符。
JSON 是一種數(shù)據(jù)交換格式,它使用 Unicode 轉(zhuǎn)義序列(比如 \uXXXX)來(lái)表示非 ASCII 字符。在默認(rèn)情況下,json.dumps() 將非 ASCII 字符轉(zhuǎn)義為 Unicode 字符以確保其正確性,并且這種轉(zhuǎn)義對(duì)于網(wǎng)絡(luò)傳輸和解析是非常重要的
總結(jié)
本文介紹了使用 PySpark 對(duì)數(shù)據(jù)進(jìn)行清洗和 JSON 格式轉(zhuǎn)換的過(guò)程。通過(guò)上述步驟,我們可以連接到遠(yuǎn)程 Spark 服務(wù)器,加載數(shù)據(jù),應(yīng)用自定義函數(shù)對(duì)數(shù)據(jù)進(jìn)行清洗和格式轉(zhuǎn)換,并最終保存清洗后的數(shù)據(jù)。這個(gè)流程展示了 PySpark 在數(shù)據(jù)處理中的強(qiáng)大功能,特別是在大規(guī)模數(shù)據(jù)集的處理和轉(zhuǎn)換方面的優(yōu)勢(shì)。
以上就是使用PySpark實(shí)現(xiàn)數(shù)據(jù)清洗與JSON格式轉(zhuǎn)換的實(shí)踐詳解的詳細(xì)內(nèi)容,更多關(guān)于PySpark數(shù)據(jù)清洗與JSON格式轉(zhuǎn)換的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
python將二維數(shù)組升為一維數(shù)組或二維降為一維方法實(shí)例
在實(shí)際應(yīng)用的過(guò)程中,遇到列表或是數(shù)組的維數(shù)不同,需要變換的問(wèn)題,如二維列表/數(shù)組變成了一維列表/數(shù)組,下面這篇文章主要給大家介紹了關(guān)于python將二維數(shù)組升為一維數(shù)組或二維降為一維的相關(guān)資料,需要的朋友可以參考下2022-11-11Python實(shí)現(xiàn)合并兩個(gè)字典的8種方法
Python有多種方法可以通過(guò)使用各種函數(shù)和構(gòu)造函數(shù)來(lái)合并字典,本文主要介紹了Python實(shí)現(xiàn)合并兩個(gè)字典的8種方法,具有一定的參考價(jià)值,感興趣的可以了解一下2024-07-07Python Pandas兩個(gè)表格內(nèi)容模糊匹配的實(shí)現(xiàn)
模糊查詢大家應(yīng)該都不會(huì)陌生,下面這篇文章主要給大家介紹了關(guān)于Python Pandas兩個(gè)表格內(nèi)容模糊匹配的實(shí)現(xiàn)方法,文中通過(guò)示例代碼介紹的非常詳細(xì),需要的朋友可以參考下2021-11-11Python實(shí)現(xiàn)半角轉(zhuǎn)全角的方法示例
本文介紹了使用Python實(shí)現(xiàn)半角字符到全角字符的轉(zhuǎn)換,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2025-01-01