pyspark對(duì)Mysql數(shù)據(jù)庫(kù)進(jìn)行讀寫(xiě)的實(shí)現(xiàn)
pyspark是Spark對(duì)Python的api接口,可以在Python環(huán)境中通過(guò)調(diào)用pyspark模塊來(lái)操作spark,完成大數(shù)據(jù)框架下的數(shù)據(jù)分析與挖掘。其中,數(shù)據(jù)的讀寫(xiě)是基礎(chǔ)操作,pyspark的子模塊pyspark.sql 可以完成大部分類(lèi)型的數(shù)據(jù)讀寫(xiě)。文本介紹在pyspark中讀寫(xiě)Mysql數(shù)據(jù)庫(kù)。
1 軟件版本
在Python中使用Spark,需要安裝配置Spark,這里跳過(guò)配置的過(guò)程,給出運(yùn)行環(huán)境和相關(guān)程序版本信息。
- win10 64bit
- java 13.0.1
- spark 3.0
- python 3.8
- pyspark 3.0
- pycharm 2019.3.4
2 環(huán)境配置
pyspark連接Mysql是通過(guò)java實(shí)現(xiàn)的,所以需要下載連接Mysql的jar包。

選擇下載Connector/J,然后選擇操作系統(tǒng)為Platform Independent,下載壓縮包到本地。

然后解壓文件,將其中的jar包mysql-connector-java-8.0.19.jar放入spark的安裝目錄下,例如D:\spark\spark-3.0.0-preview2-bin-hadoop2.7\jars。

環(huán)境配置完成!
3 讀取Mysql
腳本如下:
from pyspark.sql import SQLContext, SparkSession
if __name__ == '__main__':
# spark 初始化
spark = SparkSession. \
Builder(). \
appName('sql'). \
master('local'). \
getOrCreate()
# mysql 配置(需要修改)
prop = {'user': 'xxx',
'password': 'xxx',
'driver': 'com.mysql.cj.jdbc.Driver'}
# database 地址(需要修改)
url = 'jdbc:mysql://host:port/database'
# 讀取表
data = spark.read.jdbc(url=url, table='tb_newCity', properties=prop)
# 打印data數(shù)據(jù)類(lèi)型
print(type(data))
# 展示數(shù)據(jù)
data.show()
# 關(guān)閉spark會(huì)話(huà)
spark.stop()
- 注意點(diǎn):
prop參數(shù)需要根據(jù)實(shí)際情況修改,文中用戶(hù)名和密碼用xxx代替了,driver參數(shù)也可以不需要;url參數(shù)需要根據(jù)實(shí)際情況修改,格式為jdbc:mysql://主機(jī):端口/數(shù)據(jù)庫(kù);- 通過(guò)調(diào)用方法
read.jdbc進(jìn)行讀取,返回的數(shù)據(jù)類(lèi)型為spark DataFrame;
運(yùn)行腳本,輸出如下:

4 寫(xiě)入Mysql
腳本如下:
import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row
if __name__ == '__main__':
# spark 初始化
sc = SparkContext(master='local', appName='sql')
spark = SQLContext(sc)
# mysql 配置(需要修改)
prop = {'user': 'xxx',
'password': 'xxx',
'driver': 'com.mysql.cj.jdbc.Driver'}
# database 地址(需要修改)
url = 'jdbc:mysql://host:port/database'
# 創(chuàng)建spark DataFrame
# 方式1:list轉(zhuǎn)spark DataFrame
l = [(1, 12), (2, 22)]
# 創(chuàng)建并指定列名
list_df = spark.createDataFrame(l, schema=['id', 'value'])
# 方式2:rdd轉(zhuǎn)spark DataFrame
rdd = sc.parallelize(l) # rdd
col_names = Row('id', 'value') # 列名
tmp = rdd.map(lambda x: col_names(*x)) # 設(shè)置列名
rdd_df = spark.createDataFrame(tmp)
# 方式3:pandas dataFrame 轉(zhuǎn)spark DataFrame
df = pd.DataFrame({'id': [1, 2], 'value': [12, 22]})
pd_df = spark.createDataFrame(df)
# 寫(xiě)入數(shù)據(jù)庫(kù)
pd_df.write.jdbc(url=url, table='new', mode='append', properties=prop)
# 關(guān)閉spark會(huì)話(huà)
sc.stop()
注意點(diǎn):
prop和url參數(shù)同樣需要根據(jù)實(shí)際情況修改;
寫(xiě)入數(shù)據(jù)庫(kù)要求的對(duì)象類(lèi)型是spark DataFrame,提供了三種常見(jiàn)數(shù)據(jù)類(lèi)型轉(zhuǎn)spark DataFrame的方法;
通過(guò)調(diào)用write.jdbc方法進(jìn)行寫(xiě)入,其中的model參數(shù)控制寫(xiě)入數(shù)據(jù)的行為。
| model | 參數(shù)解釋 |
|---|---|
| error | 默認(rèn)值,原表存在則報(bào)錯(cuò) |
| ignore | 原表存在,不報(bào)錯(cuò)且不寫(xiě)入數(shù)據(jù) |
| append | 新數(shù)據(jù)在原表行末追加 |
| overwrite | 覆蓋原表 |
5 常見(jiàn)報(bào)錯(cuò)
Access denied for user …

原因:mysql配置參數(shù)出錯(cuò)
解決辦法:檢查user,password拼寫(xiě),檢查賬號(hào)密碼是否正確,用其他工具測(cè)試mysql是否能正常連接,做對(duì)比檢查。
No suitable driver

原因:沒(méi)有配置運(yùn)行環(huán)境
解決辦法:下載jar包進(jìn)行配置,具體過(guò)程參考本文的2 環(huán)境配置。
到此這篇關(guān)于pyspark對(duì)Mysql數(shù)據(jù)庫(kù)進(jìn)行讀寫(xiě)的實(shí)現(xiàn)的文章就介紹到這了,更多相關(guān)pyspark Mysql讀寫(xiě)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Python新手學(xué)習(xí)函數(shù)默認(rèn)參數(shù)設(shè)置
在本篇文章里小編給大家分享的是關(guān)于Python新手學(xué)習(xí)函數(shù)默認(rèn)參數(shù)設(shè)置的相關(guān)知識(shí)點(diǎn),需要的朋友們可以參考下。2020-06-06
Python常問(wèn)的100個(gè)面試問(wèn)題匯總(上篇)
這篇文章主要介紹了Python常問(wèn)的100個(gè)面試問(wèn)題匯總(上篇),文章內(nèi)容詳細(xì),簡(jiǎn)單易懂,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2023-01-01
每個(gè) Python 開(kāi)發(fā)者都應(yīng)該知道的7種好用工具(效率翻倍)
Python 從一種小的開(kāi)源語(yǔ)言開(kāi)始,到現(xiàn)在,它已經(jīng)成為開(kāi)發(fā)者很受歡迎的編程語(yǔ)言之一。這篇文章主要介紹了每個(gè) Python 開(kāi)發(fā)者都應(yīng)該知道的7種好用工具(效率翻倍),需要的朋友可以參考下2021-03-03
解決90%的常見(jiàn)問(wèn)題的8個(gè)python NumPy函數(shù)
這篇文章主要為大家介紹了解決90%的常見(jiàn)問(wèn)題的8個(gè)python NumPy函數(shù)示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-06-06
python中字典dict排序sorted的實(shí)現(xiàn)
本文主要介紹了python中字典dict排序sorted的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-05-05
python3實(shí)現(xiàn)多線(xiàn)程聊天室
這篇文章主要為大家詳細(xì)介紹了python3實(shí)現(xiàn)多線(xiàn)程聊天室,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-12-12

