欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

使用PySpark實(shí)現(xiàn)數(shù)據(jù)清洗與JSON格式轉(zhuǎn)換的實(shí)踐詳解

 更新時(shí)間:2023年12月15日 08:21:24   作者:冷月半明  
在大數(shù)據(jù)處理中,PySpark?提供了強(qiáng)大的工具來(lái)處理海量數(shù)據(jù),特別是在數(shù)據(jù)清洗和轉(zhuǎn)換方面,本文將介紹如何使用?PySpark?進(jìn)行數(shù)據(jù)清洗,并將數(shù)據(jù)格式轉(zhuǎn)換為?JSON?格式的實(shí)踐,感興趣的可以了解下

簡(jiǎn)介

PySpark 是 Apache Spark 的 Python API,可用于處理大規(guī)模數(shù)據(jù)集。它提供了豐富的功能和庫(kù),使得數(shù)據(jù)清洗和轉(zhuǎn)換變得更加高效和便捷。

代碼實(shí)踐

本文將以一個(gè)示例數(shù)據(jù)集為例,演示如何使用 PySpark 對(duì)數(shù)據(jù)進(jìn)行清洗和轉(zhuǎn)換。以下是代碼實(shí)現(xiàn)的主要步驟:

步驟 1:連接到遠(yuǎn)程 Spark 服務(wù)器

# Author: 冷月半明
# Date: 2023/12/14
# Description: This script does XYZ.

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("RemoteSparkConnection") \
    .master("yarn") \
    .config("spark.pyspark.python", "/opt/apps/anaconda3/envs/myspark/bin/python") \
    .config("spark.sql.warehouse.dir", "/hive/warehouse") \
    .config("hive.metastore.uris", "thrift://node01:9083") \
    .config("spark.sql.parquet.writeLegacyFormat", "true") \
    .enableHiveSupport() \
    .getOrCreate()

當(dāng)使用 PySpark 進(jìn)行大數(shù)據(jù)處理時(shí),首先需要建立與 Spark 集群的連接。在這段代碼中,我們使用了 SparkSession 類來(lái)創(chuàng)建一個(gè)與遠(yuǎn)程 Spark 服務(wù)器的連接,并設(shè)置了連接所需的參數(shù)。

導(dǎo)入必要的庫(kù): 我們首先導(dǎo)入了 SparkSession 類,這是 PySpark 中用于管理 Spark 應(yīng)用程序的入口點(diǎn)。

建立連接: 在接下來(lái)的代碼中,我們使用 SparkSession.builder 來(lái)創(chuàng)建一個(gè) SparkSession 對(duì)象。這個(gè)對(duì)象允許我們?cè)O(shè)置應(yīng)用程序的名稱、集群的主節(jié)點(diǎn)、配置項(xiàng)等參數(shù)。在這個(gè)例子中:

  • .appName("RemoteSparkConnection"):為我們的 Spark 應(yīng)用程序設(shè)置了一個(gè)名稱,這有助于在集群中識(shí)別應(yīng)用程序。
  • .master("yarn"):指定 Spark 應(yīng)用程序的主節(jié)點(diǎn),這里使用的是 YARN 資源管理器,用于分配和管理集群資源。
  • .config("spark.pyspark.python", "/opt/apps/anaconda3/envs/myspark/bin/python"):設(shè)置了 PySpark 使用的 Python 解釋器路徑,確保在集群中使用正確的 Python 環(huán)境。(因?yàn)樗褂玫沫h(huán)境為anaconda創(chuàng)建的虛擬環(huán)境,先進(jìn)入虛擬環(huán)境,然后使用which python查看解釋器位置)
  • .config("spark.sql.warehouse.dir", "/hive/warehouse"):指定了 Spark SQL 的倉(cāng)庫(kù)目錄,這對(duì)于數(shù)據(jù)存儲(chǔ)和管理非常重要。如果不指定的話使用sparksql獲取hive的時(shí)候可能會(huì)出現(xiàn)問(wèn)題。
  • .config("hive.metastore.uris", "thrift://node01:9083"):配置 Hive 元數(shù)據(jù)存儲(chǔ)的 URI,Hive 是 Hadoop 生態(tài)系統(tǒng)中的一部分,用于管理數(shù)據(jù)倉(cāng)庫(kù)。如果不指定的話使用sparksql獲取hive的時(shí)候可能會(huì)出只能獲取define默認(rèn)倉(cāng)庫(kù)的情況。
  • .config("spark.sql.parquet.writeLegacyFormat", "true"):設(shè)置了寫入 Parquet 格式數(shù)據(jù)時(shí)使用傳統(tǒng)格式,確保兼容性和向后兼容性。因?yàn)閟park寫入和hive不同,使用該配置可保證spark寫入hive的數(shù)據(jù),hive能正常訪問(wèn)。
  • .enableHiveSupport():?jiǎn)⒂昧藢?duì) Hive 的支持,允許在 Spark 中使用 Hive 的功能和特性。
  • .getOrCreate():最后使用 .getOrCreate() 方法創(chuàng)建或獲取 SparkSession 實(shí)例。

總而言之,這段代碼建立了與遠(yuǎn)程 Spark 服務(wù)器的連接,并配置了各種參數(shù)以確保應(yīng)用程序能夠正確地運(yùn)行和訪問(wèn)集群資源。這是使用 PySpark 開展大數(shù)據(jù)處理工作的第一步,為后續(xù)的數(shù)據(jù)處理和分析創(chuàng)建了必要的環(huán)境和基礎(chǔ)設(shè)施。

步驟 2:加載數(shù)據(jù)

df = spark.sql("SELECT * FROM cjw_data.xiecheng;")

使用 PySpark 的 spark.sql() 函數(shù)執(zhí)行 SQL 查詢,將查詢結(jié)果加載到 DataFrame 中,為后續(xù)的數(shù)據(jù)操作和分析做好準(zhǔn)備。這種靈活性和強(qiáng)大的數(shù)據(jù)處理能力是 PySpark 在大數(shù)據(jù)處理中的關(guān)鍵優(yōu)勢(shì)之一。

步驟 3:數(shù)據(jù)清洗與 JSON 格式轉(zhuǎn)換

from pyspark.sql.functions import udf
import json

def json_clean(commentlist):
    try:
        jsonstr = str(commentlist)
        s = jsonstr.replace("'", '"')
        s = '[' + s.replace('}{', '},{') + ']'
        python_obj = json.loads(s, strict=False)
        json_str = json.dumps(python_obj)
        return json_str
    except:
        return None

json_clean_udf = udf(json_clean, StringType())

df = df.withColumn("new_commentlist", json_clean_udf(df["commentlist"]))
newdf = df.withColumn("commentlist", df["new_commentlist"])
newdf = newdf.drop("new_commentlist")

在 PySpark 中定義并應(yīng)用一個(gè)用戶自定義函數(shù)(UDF)來(lái)對(duì)數(shù)據(jù)進(jìn)行清洗和轉(zhuǎn)換。

定義數(shù)據(jù)清洗函數(shù): json_clean() 函數(shù)接收一個(gè)名為 commentlist 的參數(shù),這個(gè)函數(shù)用于將從數(shù)據(jù)庫(kù)中檢索到的評(píng)論數(shù)據(jù)進(jìn)行清洗。具體來(lái)說(shuō):

  • jsonstr = str(commentlist):將傳入的 commentlist 轉(zhuǎn)換為字符串格式。
  • s = jsonstr.replace("'", '"'):將字符串中的單引號(hào)替換為雙引號(hào),以滿足 JSON 格式的要求。
  • s = '[' + s.replace('}{', '},{') + ']':在字符串中的每個(gè)對(duì)象之間添加逗號(hào),并將整個(gè)字符串包含在一個(gè)數(shù)組中,以滿足 JSON 格式。
  • python_obj = json.loads(s, strict=False):將字符串解析為 Python 對(duì)象。
  • json_str = json.dumps(python_obj):將 Python 對(duì)象轉(zhuǎn)換回 JSON 字符串格式。
  • return json_str:返回清洗后的 JSON 字符串,如果清洗失敗則返回 None。

創(chuàng)建用戶定義函數(shù)(UDF): 使用 udf() 函數(shù)將 Python 函數(shù) json_clean() 封裝為 PySpark 的用戶定義函數(shù)(UDF),以便在 Spark 中使用。

應(yīng)用函數(shù)到 DataFrame: df.withColumn() 函數(shù)將定義的 UDF 應(yīng)用于 DataFrame 中的 commentlist 列,并將處理后的結(jié)果存儲(chǔ)到名為 new_commentlist 的新列中。

更新 DataFrame: 創(chuàng)建了一個(gè)新的 DataFrame newdf,通過(guò)在原始 DataFrame df 的基礎(chǔ)上添加了經(jīng)過(guò)清洗的 commentlist 列,并刪除了原始的 new_commentlist 列。

步驟 4:保存清洗后的數(shù)據(jù)

newdf.write.mode("overwrite").saveAsTable("cjw_data.xiechengsentiment")
  • 使用 write 方法: write 方法用于將 DataFrame 中的數(shù)據(jù)寫入外部存儲(chǔ),可以是文件系統(tǒng)或數(shù)據(jù)庫(kù)。
  • 指定保存模式(Mode): 在這個(gè)例子中,.mode("overwrite") 指定了保存模式為 "overwrite",即如果目標(biāo)位置已存在同名的表或數(shù)據(jù),將覆蓋(重寫)已有的內(nèi)容。
  • 保存為表(saveAsTable): saveAsTable() 方法將 DataFrame 中的數(shù)據(jù)保存為一個(gè)新的表,名為 cjw_data.xiechengsentiment。這意味著,如果該表不存在,它將會(huì)被創(chuàng)建;如果表已存在,根據(jù)指定的模式進(jìn)行重寫或追加。

步驟 5:統(tǒng)計(jì)數(shù)據(jù)

comment_count = newdf.filter(newdf.commentlist != "[]").count()
total_count = newdf.count()

print("有效長(zhǎng)度:", comment_count)
print("總長(zhǎng)度:", total_count)
  • 篩選非空數(shù)據(jù)行: newdf.filter(newdf.commentlist != "[]") 這部分代碼使用 filter() 方法篩選出 newdf DataFrame 中 commentlist 列不為空的行。這里使用的條件是 commentlist != "[]",即不等于空列表的行。也就是篩選出之前udf函數(shù)里返還為空的那些解析JSON異常的行。
  • 統(tǒng)計(jì)有效數(shù)據(jù)長(zhǎng)度: 通過(guò) count() 方法統(tǒng)計(jì)篩選后的 DataFrame 中的行數(shù),即得到了符合條件的有效數(shù)據(jù)的長(zhǎng)度,并將結(jié)果存儲(chǔ)在變量 comment_count 中。
  • 統(tǒng)計(jì)總長(zhǎng)度: newdf.count() 統(tǒng)計(jì)了整個(gè) DataFrame 的行數(shù),即獲取了總長(zhǎng)度,并將結(jié)果存儲(chǔ)在變量 total_count 中。

截圖示例

清洗后示意圖:

hive表中查看清洗后的數(shù)據(jù):

輸出的字符串中包含了轉(zhuǎn)義字符(例如 \u597d),這些字符實(shí)際上是 Unicode 字符的表示方式,而不是真正的亂碼。 Python 中的 json.dumps() 方法默認(rèn)將非 ASCII 字符串轉(zhuǎn)換為 Unicode 轉(zhuǎn)義序列。這種轉(zhuǎn)義是為了確保 JSON 字符串可以被準(zhǔn)確地傳輸和解析,但可能會(huì)在輸出時(shí)顯示為 Unicode 轉(zhuǎn)義字符。

JSON 是一種數(shù)據(jù)交換格式,它使用 Unicode 轉(zhuǎn)義序列(比如 \uXXXX)來(lái)表示非 ASCII 字符。在默認(rèn)情況下,json.dumps() 將非 ASCII 字符轉(zhuǎn)義為 Unicode 字符以確保其正確性,并且這種轉(zhuǎn)義對(duì)于網(wǎng)絡(luò)傳輸和解析是非常重要的

總結(jié)

本文介紹了使用 PySpark 對(duì)數(shù)據(jù)進(jìn)行清洗和 JSON 格式轉(zhuǎn)換的過(guò)程。通過(guò)上述步驟,我們可以連接到遠(yuǎn)程 Spark 服務(wù)器,加載數(shù)據(jù),應(yīng)用自定義函數(shù)對(duì)數(shù)據(jù)進(jìn)行清洗和格式轉(zhuǎn)換,并最終保存清洗后的數(shù)據(jù)。這個(gè)流程展示了 PySpark 在數(shù)據(jù)處理中的強(qiáng)大功能,特別是在大規(guī)模數(shù)據(jù)集的處理和轉(zhuǎn)換方面的優(yōu)勢(shì)。

以上就是使用PySpark實(shí)現(xiàn)數(shù)據(jù)清洗與JSON格式轉(zhuǎn)換的實(shí)踐詳解的詳細(xì)內(nèi)容,更多關(guān)于PySpark數(shù)據(jù)清洗與JSON格式轉(zhuǎn)換的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • python將二維數(shù)組升為一維數(shù)組或二維降為一維方法實(shí)例

    python將二維數(shù)組升為一維數(shù)組或二維降為一維方法實(shí)例

    在實(shí)際應(yīng)用的過(guò)程中,遇到列表或是數(shù)組的維數(shù)不同,需要變換的問(wèn)題,如二維列表/數(shù)組變成了一維列表/數(shù)組,下面這篇文章主要給大家介紹了關(guān)于python將二維數(shù)組升為一維數(shù)組或二維降為一維的相關(guān)資料,需要的朋友可以參考下
    2022-11-11
  • Python實(shí)現(xiàn)合并兩個(gè)字典的8種方法

    Python實(shí)現(xiàn)合并兩個(gè)字典的8種方法

    Python有多種方法可以通過(guò)使用各種函數(shù)和構(gòu)造函數(shù)來(lái)合并字典,本文主要介紹了Python實(shí)現(xiàn)合并兩個(gè)字典的8種方法,具有一定的參考價(jià)值,感興趣的可以了解一下
    2024-07-07
  • Pytorch中retain_graph的坑及解決

    Pytorch中retain_graph的坑及解決

    這篇文章主要介紹了Pytorch中retain_graph的坑及解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-02-02
  • 使用k8s部署Django項(xiàng)目的方法步驟

    使用k8s部署Django項(xiàng)目的方法步驟

    這篇文章主要介紹了使用k8s部署Django項(xiàng)目的方法步驟,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2019-01-01
  • Python條件語(yǔ)句與循環(huán)語(yǔ)句

    Python條件語(yǔ)句與循環(huán)語(yǔ)句

    這篇文章主要介紹了Python條件語(yǔ)句與循環(huán)語(yǔ)句,條件語(yǔ)句就是通過(guò)指定的表達(dá)式的運(yùn)行結(jié)果來(lái)判斷當(dāng)前是執(zhí)行還是跳過(guò)某些指定的語(yǔ)句塊,循環(huán)語(yǔ)句就是對(duì)某些語(yǔ)句的重復(fù)執(zhí)行,這個(gè)重復(fù)執(zhí)行是通過(guò)指定表達(dá)式來(lái)控制的,下面來(lái)看具體內(nèi)容及續(xù)航管案例吧,需要的朋友可以參考一下
    2021-11-11
  • python語(yǔ)音信號(hào)處理詳細(xì)教程

    python語(yǔ)音信號(hào)處理詳細(xì)教程

    在深度學(xué)習(xí)中,語(yǔ)音的輸入都是需要處理的,下面這篇文章主要給大家介紹了關(guān)于python語(yǔ)音信號(hào)處理的相關(guān)資料,文中通過(guò)實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2023-01-01
  • 深入了解Django View(視圖系統(tǒng))

    深入了解Django View(視圖系統(tǒng))

    這篇文章主要介紹了簡(jiǎn)單了解Django View(視圖系統(tǒng)),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2019-07-07
  • python編寫暴力破解zip文檔程序的實(shí)例講解

    python編寫暴力破解zip文檔程序的實(shí)例講解

    下面小編就為大家分享一篇python編寫暴力破解zip文檔程序的實(shí)例講解,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2018-04-04
  • Python Pandas兩個(gè)表格內(nèi)容模糊匹配的實(shí)現(xiàn)

    Python Pandas兩個(gè)表格內(nèi)容模糊匹配的實(shí)現(xiàn)

    模糊查詢大家應(yīng)該都不會(huì)陌生,下面這篇文章主要給大家介紹了關(guān)于Python Pandas兩個(gè)表格內(nèi)容模糊匹配的實(shí)現(xiàn)方法,文中通過(guò)示例代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2021-11-11
  • Python實(shí)現(xiàn)半角轉(zhuǎn)全角的方法示例

    Python實(shí)現(xiàn)半角轉(zhuǎn)全角的方法示例

    本文介紹了使用Python實(shí)現(xiàn)半角字符到全角字符的轉(zhuǎn)換,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2025-01-01

最新評(píng)論