欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

使用PySpark實(shí)現(xiàn)數(shù)據(jù)清洗與JSON格式轉(zhuǎn)換的實(shí)踐詳解

 更新時間:2023年12月15日 08:21:24   作者:冷月半明  
在大數(shù)據(jù)處理中,PySpark?提供了強(qiáng)大的工具來處理海量數(shù)據(jù),特別是在數(shù)據(jù)清洗和轉(zhuǎn)換方面,本文將介紹如何使用?PySpark?進(jìn)行數(shù)據(jù)清洗,并將數(shù)據(jù)格式轉(zhuǎn)換為?JSON?格式的實(shí)踐,感興趣的可以了解下

簡介

PySpark 是 Apache Spark 的 Python API,可用于處理大規(guī)模數(shù)據(jù)集。它提供了豐富的功能和庫,使得數(shù)據(jù)清洗和轉(zhuǎn)換變得更加高效和便捷。

代碼實(shí)踐

本文將以一個示例數(shù)據(jù)集為例,演示如何使用 PySpark 對數(shù)據(jù)進(jìn)行清洗和轉(zhuǎn)換。以下是代碼實(shí)現(xiàn)的主要步驟:

步驟 1:連接到遠(yuǎn)程 Spark 服務(wù)器

# Author: 冷月半明
# Date: 2023/12/14
# Description: This script does XYZ.

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("RemoteSparkConnection") \
    .master("yarn") \
    .config("spark.pyspark.python", "/opt/apps/anaconda3/envs/myspark/bin/python") \
    .config("spark.sql.warehouse.dir", "/hive/warehouse") \
    .config("hive.metastore.uris", "thrift://node01:9083") \
    .config("spark.sql.parquet.writeLegacyFormat", "true") \
    .enableHiveSupport() \
    .getOrCreate()

當(dāng)使用 PySpark 進(jìn)行大數(shù)據(jù)處理時,首先需要建立與 Spark 集群的連接。在這段代碼中,我們使用了 SparkSession 類來創(chuàng)建一個與遠(yuǎn)程 Spark 服務(wù)器的連接,并設(shè)置了連接所需的參數(shù)。

導(dǎo)入必要的庫: 我們首先導(dǎo)入了 SparkSession 類,這是 PySpark 中用于管理 Spark 應(yīng)用程序的入口點(diǎn)。

建立連接: 在接下來的代碼中,我們使用 SparkSession.builder 來創(chuàng)建一個 SparkSession 對象。這個對象允許我們設(shè)置應(yīng)用程序的名稱、集群的主節(jié)點(diǎn)、配置項(xiàng)等參數(shù)。在這個例子中:

  • .appName("RemoteSparkConnection"):為我們的 Spark 應(yīng)用程序設(shè)置了一個名稱,這有助于在集群中識別應(yīng)用程序。
  • .master("yarn"):指定 Spark 應(yīng)用程序的主節(jié)點(diǎn),這里使用的是 YARN 資源管理器,用于分配和管理集群資源。
  • .config("spark.pyspark.python", "/opt/apps/anaconda3/envs/myspark/bin/python"):設(shè)置了 PySpark 使用的 Python 解釋器路徑,確保在集群中使用正確的 Python 環(huán)境。(因?yàn)樗褂玫沫h(huán)境為anaconda創(chuàng)建的虛擬環(huán)境,先進(jìn)入虛擬環(huán)境,然后使用which python查看解釋器位置)
  • .config("spark.sql.warehouse.dir", "/hive/warehouse"):指定了 Spark SQL 的倉庫目錄,這對于數(shù)據(jù)存儲和管理非常重要。如果不指定的話使用sparksql獲取hive的時候可能會出現(xiàn)問題。
  • .config("hive.metastore.uris", "thrift://node01:9083"):配置 Hive 元數(shù)據(jù)存儲的 URI,Hive 是 Hadoop 生態(tài)系統(tǒng)中的一部分,用于管理數(shù)據(jù)倉庫。如果不指定的話使用sparksql獲取hive的時候可能會出只能獲取define默認(rèn)倉庫的情況。
  • .config("spark.sql.parquet.writeLegacyFormat", "true"):設(shè)置了寫入 Parquet 格式數(shù)據(jù)時使用傳統(tǒng)格式,確保兼容性和向后兼容性。因?yàn)閟park寫入和hive不同,使用該配置可保證spark寫入hive的數(shù)據(jù),hive能正常訪問。
  • .enableHiveSupport():啟用了對 Hive 的支持,允許在 Spark 中使用 Hive 的功能和特性。
  • .getOrCreate():最后使用 .getOrCreate() 方法創(chuàng)建或獲取 SparkSession 實(shí)例。

總而言之,這段代碼建立了與遠(yuǎn)程 Spark 服務(wù)器的連接,并配置了各種參數(shù)以確保應(yīng)用程序能夠正確地運(yùn)行和訪問集群資源。這是使用 PySpark 開展大數(shù)據(jù)處理工作的第一步,為后續(xù)的數(shù)據(jù)處理和分析創(chuàng)建了必要的環(huán)境和基礎(chǔ)設(shè)施。

步驟 2:加載數(shù)據(jù)

df = spark.sql("SELECT * FROM cjw_data.xiecheng;")

使用 PySpark 的 spark.sql() 函數(shù)執(zhí)行 SQL 查詢,將查詢結(jié)果加載到 DataFrame 中,為后續(xù)的數(shù)據(jù)操作和分析做好準(zhǔn)備。這種靈活性和強(qiáng)大的數(shù)據(jù)處理能力是 PySpark 在大數(shù)據(jù)處理中的關(guān)鍵優(yōu)勢之一。

步驟 3:數(shù)據(jù)清洗與 JSON 格式轉(zhuǎn)換

from pyspark.sql.functions import udf
import json

def json_clean(commentlist):
    try:
        jsonstr = str(commentlist)
        s = jsonstr.replace("'", '"')
        s = '[' + s.replace('}{', '},{') + ']'
        python_obj = json.loads(s, strict=False)
        json_str = json.dumps(python_obj)
        return json_str
    except:
        return None

json_clean_udf = udf(json_clean, StringType())

df = df.withColumn("new_commentlist", json_clean_udf(df["commentlist"]))
newdf = df.withColumn("commentlist", df["new_commentlist"])
newdf = newdf.drop("new_commentlist")

在 PySpark 中定義并應(yīng)用一個用戶自定義函數(shù)(UDF)來對數(shù)據(jù)進(jìn)行清洗和轉(zhuǎn)換。

定義數(shù)據(jù)清洗函數(shù): json_clean() 函數(shù)接收一個名為 commentlist 的參數(shù),這個函數(shù)用于將從數(shù)據(jù)庫中檢索到的評論數(shù)據(jù)進(jìn)行清洗。具體來說:

  • jsonstr = str(commentlist):將傳入的 commentlist 轉(zhuǎn)換為字符串格式。
  • s = jsonstr.replace("'", '"'):將字符串中的單引號替換為雙引號,以滿足 JSON 格式的要求。
  • s = '[' + s.replace('}{', '},{') + ']':在字符串中的每個對象之間添加逗號,并將整個字符串包含在一個數(shù)組中,以滿足 JSON 格式。
  • python_obj = json.loads(s, strict=False):將字符串解析為 Python 對象。
  • json_str = json.dumps(python_obj):將 Python 對象轉(zhuǎn)換回 JSON 字符串格式。
  • return json_str:返回清洗后的 JSON 字符串,如果清洗失敗則返回 None。

創(chuàng)建用戶定義函數(shù)(UDF): 使用 udf() 函數(shù)將 Python 函數(shù) json_clean() 封裝為 PySpark 的用戶定義函數(shù)(UDF),以便在 Spark 中使用。

應(yīng)用函數(shù)到 DataFrame: df.withColumn() 函數(shù)將定義的 UDF 應(yīng)用于 DataFrame 中的 commentlist 列,并將處理后的結(jié)果存儲到名為 new_commentlist 的新列中。

更新 DataFrame: 創(chuàng)建了一個新的 DataFrame newdf,通過在原始 DataFrame df 的基礎(chǔ)上添加了經(jīng)過清洗的 commentlist 列,并刪除了原始的 new_commentlist 列。

步驟 4:保存清洗后的數(shù)據(jù)

newdf.write.mode("overwrite").saveAsTable("cjw_data.xiechengsentiment")
  • 使用 write 方法: write 方法用于將 DataFrame 中的數(shù)據(jù)寫入外部存儲,可以是文件系統(tǒng)或數(shù)據(jù)庫。
  • 指定保存模式(Mode): 在這個例子中,.mode("overwrite") 指定了保存模式為 "overwrite",即如果目標(biāo)位置已存在同名的表或數(shù)據(jù),將覆蓋(重寫)已有的內(nèi)容。
  • 保存為表(saveAsTable): saveAsTable() 方法將 DataFrame 中的數(shù)據(jù)保存為一個新的表,名為 cjw_data.xiechengsentiment。這意味著,如果該表不存在,它將會被創(chuàng)建;如果表已存在,根據(jù)指定的模式進(jìn)行重寫或追加。

步驟 5:統(tǒng)計(jì)數(shù)據(jù)

comment_count = newdf.filter(newdf.commentlist != "[]").count()
total_count = newdf.count()

print("有效長度:", comment_count)
print("總長度:", total_count)
  • 篩選非空數(shù)據(jù)行: newdf.filter(newdf.commentlist != "[]") 這部分代碼使用 filter() 方法篩選出 newdf DataFrame 中 commentlist 列不為空的行。這里使用的條件是 commentlist != "[]",即不等于空列表的行。也就是篩選出之前udf函數(shù)里返還為空的那些解析JSON異常的行。
  • 統(tǒng)計(jì)有效數(shù)據(jù)長度: 通過 count() 方法統(tǒng)計(jì)篩選后的 DataFrame 中的行數(shù),即得到了符合條件的有效數(shù)據(jù)的長度,并將結(jié)果存儲在變量 comment_count 中。
  • 統(tǒng)計(jì)總長度: newdf.count() 統(tǒng)計(jì)了整個 DataFrame 的行數(shù),即獲取了總長度,并將結(jié)果存儲在變量 total_count 中。

截圖示例

清洗后示意圖:

hive表中查看清洗后的數(shù)據(jù):

輸出的字符串中包含了轉(zhuǎn)義字符(例如 \u597d),這些字符實(shí)際上是 Unicode 字符的表示方式,而不是真正的亂碼。 Python 中的 json.dumps() 方法默認(rèn)將非 ASCII 字符串轉(zhuǎn)換為 Unicode 轉(zhuǎn)義序列。這種轉(zhuǎn)義是為了確保 JSON 字符串可以被準(zhǔn)確地傳輸和解析,但可能會在輸出時顯示為 Unicode 轉(zhuǎn)義字符。

JSON 是一種數(shù)據(jù)交換格式,它使用 Unicode 轉(zhuǎn)義序列(比如 \uXXXX)來表示非 ASCII 字符。在默認(rèn)情況下,json.dumps() 將非 ASCII 字符轉(zhuǎn)義為 Unicode 字符以確保其正確性,并且這種轉(zhuǎn)義對于網(wǎng)絡(luò)傳輸和解析是非常重要的

總結(jié)

本文介紹了使用 PySpark 對數(shù)據(jù)進(jìn)行清洗和 JSON 格式轉(zhuǎn)換的過程。通過上述步驟,我們可以連接到遠(yuǎn)程 Spark 服務(wù)器,加載數(shù)據(jù),應(yīng)用自定義函數(shù)對數(shù)據(jù)進(jìn)行清洗和格式轉(zhuǎn)換,并最終保存清洗后的數(shù)據(jù)。這個流程展示了 PySpark 在數(shù)據(jù)處理中的強(qiáng)大功能,特別是在大規(guī)模數(shù)據(jù)集的處理和轉(zhuǎn)換方面的優(yōu)勢。

以上就是使用PySpark實(shí)現(xiàn)數(shù)據(jù)清洗與JSON格式轉(zhuǎn)換的實(shí)踐詳解的詳細(xì)內(nèi)容,更多關(guān)于PySpark數(shù)據(jù)清洗與JSON格式轉(zhuǎn)換的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • python將二維數(shù)組升為一維數(shù)組或二維降為一維方法實(shí)例

    python將二維數(shù)組升為一維數(shù)組或二維降為一維方法實(shí)例

    在實(shí)際應(yīng)用的過程中,遇到列表或是數(shù)組的維數(shù)不同,需要變換的問題,如二維列表/數(shù)組變成了一維列表/數(shù)組,下面這篇文章主要給大家介紹了關(guān)于python將二維數(shù)組升為一維數(shù)組或二維降為一維的相關(guān)資料,需要的朋友可以參考下
    2022-11-11
  • Python實(shí)現(xiàn)合并兩個字典的8種方法

    Python實(shí)現(xiàn)合并兩個字典的8種方法

    Python有多種方法可以通過使用各種函數(shù)和構(gòu)造函數(shù)來合并字典,本文主要介紹了Python實(shí)現(xiàn)合并兩個字典的8種方法,具有一定的參考價值,感興趣的可以了解一下
    2024-07-07
  • Pytorch中retain_graph的坑及解決

    Pytorch中retain_graph的坑及解決

    這篇文章主要介紹了Pytorch中retain_graph的坑及解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2023-02-02
  • 使用k8s部署Django項(xiàng)目的方法步驟

    使用k8s部署Django項(xiàng)目的方法步驟

    這篇文章主要介紹了使用k8s部署Django項(xiàng)目的方法步驟,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2019-01-01
  • Python條件語句與循環(huán)語句

    Python條件語句與循環(huán)語句

    這篇文章主要介紹了Python條件語句與循環(huán)語句,條件語句就是通過指定的表達(dá)式的運(yùn)行結(jié)果來判斷當(dāng)前是執(zhí)行還是跳過某些指定的語句塊,循環(huán)語句就是對某些語句的重復(fù)執(zhí)行,這個重復(fù)執(zhí)行是通過指定表達(dá)式來控制的,下面來看具體內(nèi)容及續(xù)航管案例吧,需要的朋友可以參考一下
    2021-11-11
  • python語音信號處理詳細(xì)教程

    python語音信號處理詳細(xì)教程

    在深度學(xué)習(xí)中,語音的輸入都是需要處理的,下面這篇文章主要給大家介紹了關(guān)于python語音信號處理的相關(guān)資料,文中通過實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2023-01-01
  • 深入了解Django View(視圖系統(tǒng))

    深入了解Django View(視圖系統(tǒng))

    這篇文章主要介紹了簡單了解Django View(視圖系統(tǒng)),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2019-07-07
  • python編寫暴力破解zip文檔程序的實(shí)例講解

    python編寫暴力破解zip文檔程序的實(shí)例講解

    下面小編就為大家分享一篇python編寫暴力破解zip文檔程序的實(shí)例講解,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2018-04-04
  • Python Pandas兩個表格內(nèi)容模糊匹配的實(shí)現(xiàn)

    Python Pandas兩個表格內(nèi)容模糊匹配的實(shí)現(xiàn)

    模糊查詢大家應(yīng)該都不會陌生,下面這篇文章主要給大家介紹了關(guān)于Python Pandas兩個表格內(nèi)容模糊匹配的實(shí)現(xiàn)方法,文中通過示例代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2021-11-11
  • Python實(shí)現(xiàn)半角轉(zhuǎn)全角的方法示例

    Python實(shí)現(xiàn)半角轉(zhuǎn)全角的方法示例

    本文介紹了使用Python實(shí)現(xiàn)半角字符到全角字符的轉(zhuǎn)換,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2025-01-01

最新評論