Spark JDBC操作MySQL方式詳細(xì)講解
JDBC操作MySQL
在實際的企業(yè)級開發(fā)環(huán)境中,如果數(shù)據(jù)規(guī)模特S別大,此時采用傳統(tǒng)的SQL語句去處理的話一般需要分成很多批次處理,而且很容易造成數(shù)據(jù)庫服務(wù)宕機,且實際的處理過程可能會非常復(fù)雜,通過傳統(tǒng)的Java EE等技術(shù)可能很難或者不方便實現(xiàn)處理算法,此時采用SparkSQL進(jìn)行分布式分析處理就可以非常好的解決該問題,在生產(chǎn)環(huán)境下,一般會在Spark SQL和具體要操作的DB之間加上一個緩沖層次,例如中間使用Redis或者Kafka。
Spark SQL可以通過JDBC從傳統(tǒng)的關(guān)系型數(shù)據(jù)庫中讀寫數(shù)據(jù),讀取數(shù)據(jù)后直接生成的是DataFrame,然后再加上借助于Spark SQL豐富的API來進(jìn)行各種操作。從計算數(shù)據(jù)規(guī)模的角度去講,集群并行訪問數(shù)據(jù)庫數(shù)據(jù),調(diào)用Data Frame Reader的Format(“JDBC”)的方式說明Spark SQL操作的數(shù)據(jù)來源是通過JDBC獲得,JDBC后端一般都是數(shù)據(jù)庫,例如MySQL、Oracle等。
JDBC讀取數(shù)據(jù)方式
單Partition(無并發(fā))
調(diào)用函數(shù)格式:def jdbc(url: String, table: String, properties: Properties): DataFrame
- url:代表數(shù)據(jù)庫的JDBC鏈接地址;
- table:具體要鏈接的數(shù)據(jù)庫;
這種方法是將所有的數(shù)據(jù)放在一個Partition中進(jìn)行操作(即并發(fā)度為1),意味著無論給的資源有多少,只有一個Task會執(zhí)行任務(wù),執(zhí)行效率比較慢,并且容易出現(xiàn)OOM。使用如下,在spark-shell中執(zhí)行:
/*此為代碼格式,實際中使用應(yīng)替換相應(yīng)字段中的內(nèi)容*/ val url = "jdbc:mysql://localhost:/database" val tableName = "table" // 設(shè)置連接用戶&密碼 val prop = new java.util.Properties prop.setProperty("user","username") //實際使用中替換username為相應(yīng)的用戶名 prop.setProperty("password","pwd") //實際使用中替換pwd為相應(yīng)的密碼
根據(jù)Long類型字段分區(qū)
/*此為代碼格式,實際中使用應(yīng)替換相應(yīng)字段中的內(nèi)容*/ def jdbc( url: String, table: String, columnName: String, // 根據(jù)該字段分區(qū),需要為整型,比如 id 等 lowerBound: Long, // 分區(qū)的下界 upperBound: Long, // 分區(qū)的上界 numPartitions: Int, //分區(qū)的個數(shù) connectionProperties: Properties): DataFrame
根據(jù)字段將數(shù)據(jù)進(jìn)行分區(qū),放進(jìn)不同的Partition中,執(zhí)行效率較快,但是只能根據(jù)數(shù)據(jù)字段作為分區(qū)關(guān)鍵字。使用如下:
/*此為代碼格式,實際中使用應(yīng)替換相應(yīng)字段中的內(nèi)容*/ val url = "jdbc:mysql://mysqlHost:3306/database" val tableName = "table" val columnName = "colName" val lowerBound = 1, val upperBound = 10000000, val numPartitions = 10, // 設(shè)置連接用戶&密碼 val prop = new java.util.Properties prop.setProperty("user","username") prop.setProperty("password","pwd")
將字段 colName 中發(fā) 1~10000000 條數(shù)據(jù)分區(qū)到 10 個 Partition 中。
根據(jù)任意類型字段分區(qū)
/*此為代碼格式,實際中使用應(yīng)替換相應(yīng)字段中的內(nèi)容*/ jdbc( url: String, table: String, predicates: Array[String], connectionProperties: Properties): DataFrame
以下使用時間字段進(jìn)行分區(qū):
/*此為代碼格式,實際中使用應(yīng)替換相應(yīng)字段中的內(nèi)容*/ val url = "jdbc:mysql://mysqlHost:3306/database" val tableName = "table" // 設(shè)置連接用戶&密碼 val prop = new java.util.Properties prop.setProperty("user","username") prop.setProperty("password","pwd") /** * 將 9 月 16-12 月 15 三個月的數(shù)據(jù)取出,按時間分為 6 個 partition * 為了減少事例代碼,這里的時間都是寫死的 * modified_time 為時間字段 */ val predicates = Array( "2015-09-16" -> "2015-09-30", "2015-10-01" -> "2015-10-15", "2015-10-16" -> "2015-10-31", "2015-11-01" -> "2015-11-14", "2015-11-15" -> "2015-11-30", "2015-12-01" -> "2015-12-15" ).map { case (start, end) => s"cast(modified_time as date) >= date '$start' " + s"AND cast(modified_time as date) <= date '$end'" }
這種方法可以使用任意字段進(jìn)行分區(qū),比較靈活,適用于各種場景。以MySQL 3000W數(shù)據(jù)量為例,如果單分區(qū)count,若干分鐘就會報OOM;如果分成5~20個分區(qū)后,count操作只需要2s,效率會明顯提高,這里就凸顯出JDBC高并發(fā)的優(yōu)勢。Spark高并發(fā)度可以大幅度提高讀取以及處理數(shù)據(jù)的速度,但是如果設(shè)置過高(大量的Partition同時讀取)也可能會將數(shù)據(jù)源數(shù)據(jù)庫宕掉。
JDBC讀取MySQL數(shù)據(jù)
下面來進(jìn)行實際操作,首先需要配置MySQL
- 免密登陸:
mysql -uroot
- 查看數(shù)據(jù)庫:
show databases;
- 使用MySQL數(shù)據(jù)庫:
use mysql;
修改表格的權(quán)限,目的是為了使其他主機可以遠(yuǎn)程連接 MySQL,通過此命令可以查看訪問用戶允許的主機名。
- 查看所有用戶及其host:
select host, user from user;
- 將相應(yīng)用戶數(shù)據(jù)表中的host字段改成’%':
update user set host="%" where user="root";
- 刷新修改權(quán)限
flush privileges;
通過命令修改host為%,表示任意IP地址都可以登錄。出現(xiàn)ERROR 1062 (23000): Duplicate entry '%-root' for key 'PRIMARY'
,是因為 user+host 是主鍵,不能重復(fù),可以不用理會。也可通過以下命令刪除user 為空的內(nèi)容來解決:delete from user where user='';
。
在MySQL創(chuàng)建數(shù)據(jù)庫和表格,插入數(shù)據(jù),查看:
create database test; //創(chuàng)建數(shù)據(jù)庫test use test; //進(jìn)入數(shù)據(jù)庫test create table people( name varchar(12), age int); //創(chuàng)建表格people并構(gòu)建結(jié)構(gòu) insert into people values ("Andy",30),("Justin",19),("Dela",25),("Magi",20),("Pule",21),("Mike",12); //向people表中插入數(shù)據(jù) select * from people; //輸出people表中全部數(shù)據(jù)
編寫代碼讀取MySQL表中數(shù)據(jù):
//導(dǎo)入依賴環(huán)境 import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.{DataFrame, SQLContext} import java.util.Properties val url = "jdbc:mysql://localhost/test" //MySQL地址及數(shù)據(jù)庫 val username = "root" //用戶名 val sqlContext = new SQLContext(sc) sc.setLogLevel("WARN") val uri = url + "?user=" + username + "&useUnicode=true&characterEncoding=UTF-8" //設(shè)置讀取路徑及用戶名 val properties = new Properties() //創(chuàng)建JDBC連接信息 properties.put("user","root") properties.put("driver", "com.mysql.jdbc.Driver") val df_test: DataFrame = spark.sqlContext.read.jdbc(uri, "people", properties) //讀取數(shù)據(jù) df_test.select("name","age").collect().foreach(row => { //輸出數(shù)據(jù) println("name " + row(0) + ", age" + row(1)) }) df_test.write.mode("append").jdbc(uri,"people",properties) //向people表中寫入讀出的數(shù)據(jù),相當(dāng)于people表中有兩份一樣的數(shù)據(jù)
到此這篇關(guān)于Spark JDBC操作MySQL方式詳細(xì)講解的文章就介紹到這了,更多相關(guān)Spark JDBC操作MySQL內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
GC調(diào)優(yōu)實戰(zhàn)之過早提升Premature?Promotion
這篇文章主要為大家介紹了GC調(diào)優(yōu)實戰(zhàn)之過早提升Premature?Promotion2022-01-01Eclipse的Debug調(diào)試技巧大全(總結(jié))
這篇文章主要介紹了Eclipse的Debug調(diào)試技巧大全(總結(jié)),小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-12-12解決mybatis竟然報Invalid value for getInt()的問題
使用mybatis遇到一個非常奇葩的問題,總是報Invalid value for getInt()的問題,怎么解決呢?下面小編通過場景分析給大家代來了mybatis報Invalid value for getInt()的解決方法,感興趣的朋友參考下吧2021-10-10Springboot實現(xiàn)Java郵件任務(wù)過程解析
這篇文章主要介紹了Springboot實現(xiàn)Java郵件任務(wù)過程解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2019-09-09SSH框架網(wǎng)上商城項目第13戰(zhàn)之Struts2文件上傳功能
這篇文章主要為大家詳細(xì)介紹了SSH框架網(wǎng)上商城項目第13戰(zhàn)之Struts2文件上傳功能的相關(guān)資料,感興趣的小伙伴們可以參考一下2016-06-06JAVA中堆、棧,靜態(tài)方法和非靜態(tài)方法的速度問題
這篇文章主要介紹了JAVA中堆、棧,靜態(tài)方法和非靜態(tài)方法的速度問題,堆和棧得速度性能分析多角度給大家分析,非常不錯,具有一定的參考借鑒價值,需要的朋友可以參考下2018-08-08Springboot集成mybatis實現(xiàn)多數(shù)據(jù)源配置詳解流程
在日常開發(fā)中,若遇到多個數(shù)據(jù)源的需求,怎么辦呢?通過springboot集成mybatis實現(xiàn)多數(shù)據(jù)源配置,簡單嘗試一下,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-06-06SpringBoot對靜態(tài)資源的映射規(guī)則詳解解讀
這篇文章主要介紹了SpringBoot對靜態(tài)資源的映射規(guī)則詳解解讀,在Spring Boot中,映射規(guī)則是用來定義URL與控制器方法之間的映射關(guān)系的,通過映射規(guī)則,可以將特定的URL請求映射到相應(yīng)的控制器方法上,從而實現(xiàn)請求的處理和響應(yīng)的返回,需要的朋友可以參考下2023-10-10