SparkSQL使用快速入門(mén)
一、SparkSQL的進(jìn)化之路
1.0以前: Shark
1.1.x開(kāi)始:SparkSQL(只是測(cè)試性的) SQL
1.3.x: SparkSQL(正式版本)+Dataframe
1.5.x: SparkSQL 鎢絲計(jì)劃
1.6.x: SparkSQL+DataFrame+DataSet(測(cè)試版本)
2.x:
- SparkSQL+DataFrame+DataSet(正式版本)
- SparkSQL:還有其他的優(yōu)化
- StructuredStreaming(DataSet)
Spark on Hive和Hive on Spark
- Spark on Hive:Hive只作為儲(chǔ)存角色,Spark負(fù)責(zé)sql解析優(yōu)化,執(zhí)行。
- Hive on Spark:Hive即作為存儲(chǔ)又負(fù)責(zé)sql的解析優(yōu)化,Spark負(fù)責(zé)執(zhí)行。
二、認(rèn)識(shí)SparkSQL
2.1 什么是SparkSQL?
spark SQL是spark的一個(gè)模塊,主要用于進(jìn)行結(jié)構(gòu)化數(shù)據(jù)的處理。它提供的最核心的編程抽象就是DataFrame。
2.2 SparkSQL的作用
提供一個(gè)編程抽象(DataFrame) 并且作為分布式 SQL查詢引擎
DataFrame:它可以根據(jù)很多源進(jìn)行構(gòu)建,包括:結(jié)構(gòu)化的數(shù)據(jù)文件,hive中的表,外部的關(guān)系型數(shù)據(jù)庫(kù),以及RDD
2.3 運(yùn)行原理
將Spark SQL轉(zhuǎn)化為RDD,然后提交到集群執(zhí)行
2.4 特點(diǎn)
(1)容易整合
(2)統(tǒng)一的數(shù)據(jù)訪問(wèn)方式
(3)兼容 Hive
(4)標(biāo)準(zhǔn)的數(shù)據(jù)連接
2.5 SparkSession
SparkSession是Spark 2.0引如的新概念。SparkSession為用戶提供了統(tǒng)一的切入點(diǎn),來(lái)讓用戶學(xué)習(xí)spark的各項(xiàng)功能。
在spark的早期版本中,SparkContext是spark的主要切入點(diǎn),由于RDD是主要的API,我們通過(guò)sparkcontext來(lái)創(chuàng)建和操作RDD。對(duì)于每個(gè)其他的API,我們需要使用不同的context。例如,對(duì)于Streming,我們需要使用StreamingContext;對(duì)于sql,使用sqlContext;對(duì)于Hive,使用hiveContext。但是隨著DataSet和DataFrame的API逐漸成為標(biāo)準(zhǔn)的API,就需要為他們建立接入點(diǎn)。所以在spark2.0中,引入SparkSession作為DataSet和DataFrame API的切入點(diǎn),SparkSession封裝了SparkConf、SparkContext和SQLContext。為了向后兼容,SQLContext和HiveContext也被保存下來(lái)。
SparkSession實(shí)質(zhì)上是SQLContext和HiveContext的組合(未來(lái)可能還會(huì)加上StreamingContext),所以在SQLContext和HiveContext上可用的API在SparkSession上同樣是可以使用的。SparkSession內(nèi)部封裝了sparkContext,所以計(jì)算實(shí)際上是由sparkContext完成的。
特點(diǎn):
---- 為用戶提供一個(gè)統(tǒng)一的切入點(diǎn)使用Spark 各項(xiàng)功能
---- 允許用戶通過(guò)它調(diào)用 DataFrame 和 Dataset 相關(guān) API 來(lái)編寫(xiě)程序
---- 減少了用戶需要了解的一些概念,可以很容易的與 Spark 進(jìn)行交互
---- 與 Spark 交互之時(shí)不需要顯示的創(chuàng)建 SparkConf, SparkContext 以及 SQlContext,這些對(duì)象已經(jīng)封閉在 SparkSession 中
2.6 DataFrames
在Spark中,DataFrame是一種以RDD為基礎(chǔ)的分布式數(shù)據(jù)集,類似于傳統(tǒng)數(shù)據(jù)庫(kù)中的二維表格。DataFrame與RDD的主要區(qū)別在于,前者帶有schema元信息,即DataFrame所表示的二維表數(shù)據(jù)集的每一列都帶有名稱和類型。這使得Spark SQL得以洞察更多的結(jié)構(gòu)信息,從而對(duì)藏于DataFrame背后的數(shù)據(jù)源以及作用于DataFrame之上的變換進(jìn)行了針對(duì)性的優(yōu)化,最終達(dá)到大幅提升運(yùn)行時(shí)效率的目標(biāo)。反觀RDD,由于無(wú)從得知所存數(shù)據(jù)元素的具體內(nèi)部結(jié)構(gòu),Spark Core只能在stage層面進(jìn)行簡(jiǎn)單、通用的流水線優(yōu)化。
三、RDD轉(zhuǎn)換成為DataFrame
使用spark1.x版本的方式
測(cè)試數(shù)據(jù)目錄:spark/examples/src/main/resources(spark的安裝目錄里面)
people.txt
3.1通過(guò)case class創(chuàng)建DataFrames(反射)
//定義case class,相當(dāng)于表結(jié)構(gòu) case class People(var name:String,var age:Int) object TestDataFrame1 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("RDDToDataFrame").setMaster("local") val sc = new SparkContext(conf) val context = new SQLContext(sc) // 將本地的數(shù)據(jù)讀入 RDD, 并將 RDD 與 case class 關(guān)聯(lián) val peopleRDD = sc.textFile("E:\\666\\people.txt") .map(line => People(line.split(",")(0), line.split(",")(1).trim.toInt)) import context.implicits._ // 將RDD 轉(zhuǎn)換成 DataFrames val df = peopleRDD.toDF //將DataFrames創(chuàng)建成一個(gè)臨時(shí)的視圖 df.createOrReplaceTempView("people") //使用SQL語(yǔ)句進(jìn)行查詢 context.sql("select * from people").show() } }
運(yùn)行結(jié)果
3.2通過(guò)structType創(chuàng)建DataFrames(編程接口)
object TestDataFrame2 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("TestDataFrame2").setMaster("local") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val fileRDD = sc.textFile("E:\\666\\people.txt") // 將 RDD 數(shù)據(jù)映射成 Row,需要 import org.apache.spark.sql.Row val rowRDD: RDD[Row] = fileRDD.map(line => { val fields = line.split(",") Row(fields(0), fields(1).trim.toInt) }) // 創(chuàng)建 StructType 來(lái)定義結(jié)構(gòu) val structType: StructType = StructType( //字段名,字段類型,是否可以為空 StructField("name", StringType, true) :: StructField("age", IntegerType, true) :: Nil ) /** * rows: java.util.List[Row], * schema: StructType * */ val df: DataFrame = sqlContext.createDataFrame(rowRDD,structType) df.createOrReplaceTempView("people") sqlContext.sql("select * from people").show() } }
運(yùn)行結(jié)果
3.3通過(guò) json 文件創(chuàng)建DataFrames
object TestDataFrame3 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("TestDataFrame2").setMaster("local") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val df: DataFrame = sqlContext.read.json("E:\\666\\people.json") df.createOrReplaceTempView("people") sqlContext.sql("select * from people").show() } }
四、DataFrame的read和save和savemode
4.1 數(shù)據(jù)的讀取
object TestRead { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("TestDataFrame2").setMaster("local") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) //方式一 val df1 = sqlContext.read.json("E:\\666\\people.json") val df2 = sqlContext.read.parquet("E:\\666\\users.parquet") //方式二 val df3 = sqlContext.read.format("json").load("E:\\666\\people.json") val df4 = sqlContext.read.format("parquet").load("E:\\666\\users.parquet") //方式三,默認(rèn)是parquet格式 val df5 = sqlContext.load("E:\\666\\users.parquet") } }
4.2 數(shù)據(jù)的保存
object TestSave { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("TestDataFrame2").setMaster("local") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val df1 = sqlContext.read.json("E:\\666\\people.json") //方式一 df1.write.json("E:\\111") df1.write.parquet("E:\\222") //方式二 df1.write.format("json").save("E:\\333") df1.write.format("parquet").save("E:\\444") //方式三 df1.write.save("E:\\555") } }
4.3 數(shù)據(jù)的保存模式
使用mode
df1.write.format("parquet").mode(SaveMode.Ignore).save("E:\\444")
五、數(shù)據(jù)源
5.1 數(shù)據(jù)源只json
參考4.1
5.2 數(shù)據(jù)源之parquet
參考4.1
5.3 數(shù)據(jù)源之Mysql
object TestMysql { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("TestMysql").setMaster("local") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val url = "jdbc:mysql://192.168.123.102:3306/hivedb" val table = "dbs" val properties = new Properties() properties.setProperty("user","root") properties.setProperty("password","root") //需要傳入Mysql的URL、表明、properties(連接數(shù)據(jù)庫(kù)的用戶名密碼) val df = sqlContext.read.jdbc(url,table,properties) df.createOrReplaceTempView("dbs") sqlContext.sql("select * from dbs").show() } }
運(yùn)行結(jié)果
5.3 數(shù)據(jù)源之Hive
(1)準(zhǔn)備工作
在pom.xml文件中添加依賴
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.11</artifactId> <version>2.3.0</version> </dependency>
開(kāi)發(fā)環(huán)境則把resource文件夾下添加hive-site.xml文件,集群環(huán)境把hive的配置文件要發(fā)到$SPARK_HOME/conf目錄下
<configuration> <property> <name>javax.jdo.option.ConnectionURL</name> <value>jdbc:mysql://localhost:3306/hivedb?createDatabaseIfNotExist=true</value> <description>JDBC connect string for a JDBC metastore</description> <!-- 如果 mysql 和 hive 在同一個(gè)服務(wù)器節(jié)點(diǎn),那么請(qǐng)更改 hadoop02 為 localhost --> </property> <property> <name>javax.jdo.option.ConnectionDriverName</name> <value>com.mysql.jdbc.Driver</value> <description>Driver class name for a JDBC metastore</description> </property> <property> <name>javax.jdo.option.ConnectionUserName</name> <value>root</value> <description>username to use against metastore database</description> </property> <property> <name>javax.jdo.option.ConnectionPassword</name> <value>root</value> <description>password to use against metastore database</description> </property> <property> <name>hive.metastore.warehouse.dir</name> <value>/hive/warehouse</value> <description>hive default warehouse, if nessecory, change it</description> </property> </configuration>
(2)測(cè)試代碼
object TestHive { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local").setAppName(this.getClass.getSimpleName) val sc = new SparkContext(conf) val sqlContext = new HiveContext(sc) sqlContext.sql("select * from myhive.student").show() } }
運(yùn)行結(jié)果
六、SparkSQL 的元數(shù)據(jù)
1.1元數(shù)據(jù)的狀態(tài)
SparkSQL 的元數(shù)據(jù)的狀態(tài)有兩種:
1、in_memory,用完了元數(shù)據(jù)也就丟了
2、hive , 通過(guò)hive去保存的,也就是說(shuō),hive的元數(shù)據(jù)存在哪兒,它的元數(shù)據(jù)也就存在哪兒。
換句話說(shuō),SparkSQL的數(shù)據(jù)倉(cāng)庫(kù)在建立在Hive之上實(shí)現(xiàn)的。我們要用SparkSQL去構(gòu)建數(shù)據(jù)倉(cāng)庫(kù)的時(shí)候,必須依賴于Hive。
2.2Spark-SQL腳本
如果用戶直接運(yùn)行bin/spark-sql命令。會(huì)導(dǎo)致我們的元數(shù)據(jù)有兩種狀態(tài):
1、in-memory狀態(tài):如果SPARK-HOME/conf目錄下沒(méi)有放置hive-site.xml文件,元數(shù)據(jù)的狀態(tài)就是in-memory
2、hive狀態(tài):如果我們?cè)赟PARK-HOME/conf目錄下放置了,hive-site.xml文件,那么默認(rèn)情況下,spark-sql的元數(shù)據(jù)的狀態(tài)就是hive.
到此這篇關(guān)于SparkSQL使用快速入門(mén)的文章就介紹到這了,更多相關(guān)SparkSQL使用內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java結(jié)合WebSphere MQ實(shí)現(xiàn)接收隊(duì)列文件功能
WebSphereMQ,也稱MQSeries,以一致的、可靠的和易于管理的方式來(lái)連接應(yīng)用程序,并為跨部門(mén)、企業(yè)范圍的集成提供了可靠的基礎(chǔ)。通過(guò)為重要的消息和事務(wù)提供可靠的、一次且僅一次的傳遞,MQ可以處理復(fù)雜的通信協(xié)議,并動(dòng)態(tài)地將消息傳遞工作負(fù)載分配給可用的資源。2015-10-10Tk.mybatis零sql語(yǔ)句實(shí)現(xiàn)動(dòng)態(tài)sql查詢的方法(4種)
有時(shí)候,查詢數(shù)據(jù)需要根據(jù)條件使用動(dòng)態(tài)查詢,這時(shí)候需要使用動(dòng)態(tài)sql,本文主要介紹了Tk.mybatis零sql語(yǔ)句實(shí)現(xiàn)動(dòng)態(tài)sql查詢的方法,感興趣的可以了解一下2021-12-12java實(shí)現(xiàn)簡(jiǎn)易計(jì)算器功能
這篇文章主要為大家詳細(xì)介紹了java實(shí)現(xiàn)簡(jiǎn)易計(jì)算器功能,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-06-06Spring請(qǐng)求路徑帶參數(shù)URL使用注解的寫(xiě)法說(shuō)明
這篇文章主要介紹了Spring請(qǐng)求路徑帶參數(shù)URL使用注解的寫(xiě)法說(shuō)明,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-08-08SpringCloud Ribbon負(fù)載均衡代碼實(shí)例
這篇文章主要介紹了SpringCloud Ribbon負(fù)載均衡代碼實(shí)例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-03-03java中synchronized鎖的升級(jí)過(guò)程
這篇文章主要介紹了java中synchronized鎖的升級(jí)過(guò)程,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-09-09Java使用延時(shí)隊(duì)列搞定超時(shí)訂單處理的場(chǎng)景
這篇文章主要介紹了Java使用延時(shí)隊(duì)列搞定超時(shí)訂單處理,本文通過(guò)示例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-08-08springboot @WebFilter注解過(guò)濾器的實(shí)現(xiàn)
這篇文章主要介紹了springboot @WebFilter注解過(guò)濾器的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-03-03