如何使用IDEA開(kāi)發(fā)Spark SQL程序(一文搞懂)
前言
大家好,我是DJ丶小哪吒,我又來(lái)跟你們分享知識(shí)了。對(duì)軟件開(kāi)發(fā)有著濃厚的興趣。喜歡與人分享知識(shí)。做博客的目的就是為了能與 他 人知識(shí)共享。由于水平有限。博客中難免會(huì)有一些錯(cuò)誤。如有 紕漏 之處,歡迎大家在留言區(qū)指正。小編也會(huì)及時(shí)改正。
DJ丶小哪吒又來(lái)與各位分享知識(shí)了。今天我們不飆車,今天就靜靜的坐下來(lái),我們來(lái)聊一聊關(guān)于sparkSQL。準(zhǔn)備好茶水,聽(tīng)老朽與你娓娓道來(lái)。
Spark SQL是什么
Spark SQL 是一個(gè)用來(lái)處理結(jié)構(gòu)化數(shù)據(jù)的spark組件。它提供了一個(gè)叫做DataFrames的可編程抽象數(shù)據(jù)模型,并且可被視為一個(gè)分布式的SQL查詢引擎。
1、使用IDEA開(kāi)發(fā)Spark SQL
Spark會(huì)根據(jù)文件信息嘗試著去推斷DataFrame/DataSet的Schema,當(dāng)然我們也可以手動(dòng)指定,手動(dòng)指定的方式有以下幾種:
- 第1種:指定列名添加Schema
- 第2種:通過(guò)StructType指定Schema
- 第3種:編寫樣例類,利用反射機(jī)制推斷Schema
1.1、指定列名添加Schema
package cn.itcast.sql
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
object CreateDFDS {
def main(args: Array[String]): Unit = {
//1.創(chuàng)建SparkSession
val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
//2.讀取文件
val fileRDD: RDD[String] = sc.textFile("D:\\data\\person.txt")
val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" "))
val rowRDD: RDD[(Int, String, Int)] = linesRDD.map(line =>(line(0).toInt,line(1),line(2).toInt))
//3.將RDD轉(zhuǎn)成DF
//注意:RDD中原本沒(méi)有toDF方法,新版本中要給它增加一個(gè)方法,可以使用隱式轉(zhuǎn)換
import spark.implicits._
val personDF: DataFrame = rowRDD.toDF("id","name","age")
personDF.show(10)
personDF.printSchema()
sc.stop()
spark.stop()
}
}
1.2、通過(guò)StructType指定Schema
package cn.itcast.sql
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
object CreateDFDS2 {
def main(args: Array[String]): Unit = {
//1.創(chuàng)建SparkSession
val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
//2.讀取文件
val fileRDD: RDD[String] = sc.textFile("D:\\data\\person.txt")
val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" "))
val rowRDD: RDD[Row] = linesRDD.map(line =>Row(line(0).toInt,line(1),line(2).toInt))
//3.將RDD轉(zhuǎn)成DF
//注意:RDD中原本沒(méi)有toDF方法,新版本中要給它增加一個(gè)方法,可以使用隱式轉(zhuǎn)換
//import spark.implicits._
val schema: StructType = StructType(Seq(
StructField("id", IntegerType, true),//允許為空
StructField("name", StringType, true),
StructField("age", IntegerType, true))
)
val personDF: DataFrame = spark.createDataFrame(rowRDD,schema)
personDF.show(10)
personDF.printSchema()
sc.stop()
spark.stop()
}
}
1.3、反射推斷Schema–掌握
package cn.itcast.sql
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
object CreateDFDS3 {
case class Person(id:Int,name:String,age:Int)
def main(args: Array[String]): Unit = {
//1.創(chuàng)建SparkSession
val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL")
.getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
//2.讀取文件
val fileRDD: RDD[String] = sc.textFile("D:\\data\\person.txt")
val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" "))
val rowRDD: RDD[Person] = linesRDD.map(line =>Person(line(0).toInt,line(1),line(2).toInt))
//3.將RDD轉(zhuǎn)成DF
//注意:RDD中原本沒(méi)有toDF方法,新版本中要給它增加一個(gè)方法,可以使用隱式轉(zhuǎn)換
import spark.implicits._
//注意:上面的rowRDD的泛型是Person,里面包含了Schema信息
//所以SparkSQL可以通過(guò)反射自動(dòng)獲取到并添加給DF
val personDF: DataFrame = rowRDD.toDF
personDF.show(10)
personDF.printSchema()
sc.stop()
spark.stop()
}
}
1.4、花式查詢
package cn.itcast.sql
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
object QueryDemo {
case class Person(id:Int,name:String,age:Int)
def main(args: Array[String]): Unit = {
//1.創(chuàng)建SparkSession
val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL")
.getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
//2.讀取文件
val fileRDD: RDD[String] = sc.textFile("D:\\data\\person.txt")
val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" "))
val rowRDD: RDD[Person] = linesRDD.map(line =>Person(line(0).toInt,line(1),line(2).toInt))
//3.將RDD轉(zhuǎn)成DF
//注意:RDD中原本沒(méi)有toDF方法,新版本中要給它增加一個(gè)方法,可以使用隱式轉(zhuǎn)換
import spark.implicits._
//注意:上面的rowRDD的泛型是Person,里面包含了Schema信息
//所以SparkSQL可以通過(guò)反射自動(dòng)獲取到并添加給DF
val personDF: DataFrame = rowRDD.toDF
personDF.show(10)
personDF.printSchema()
//=======================SQL方式查詢=======================
//0.注冊(cè)表
personDF.createOrReplaceTempView("t_person")
//1.查詢所有數(shù)據(jù)
spark.sql("select * from t_person").show()
//2.查詢age+1
spark.sql("select age,age+1 from t_person").show()
//3.查詢age最大的兩人
spark.sql("select name,age from t_person order by age desc limit 2").show()
//4.查詢各個(gè)年齡的人數(shù)
spark.sql("select age,count(*) from t_person group by age").show()
//5.查詢年齡大于30的
spark.sql("select * from t_person where age > 30").show()
//=======================DSL方式查詢=======================
//1.查詢所有數(shù)據(jù)
personDF.select("name","age")
//2.查詢age+1
personDF.select($"name",$"age" + 1)
//3.查詢age最大的兩人
personDF.sort($"age".desc).show(2)
//4.查詢各個(gè)年齡的人數(shù)
personDF.groupBy("age").count().show()
//5.查詢年齡大于30的
personDF.filter($"age" > 30).show()
sc.stop()
spark.stop()
}
}
1.5、 相互轉(zhuǎn)化
RDD、DF、DS之間的相互轉(zhuǎn)換有很多(6種),但是我們實(shí)際操作就只有2類:
1)使用RDD算子操作
2)使用DSL/SQL對(duì)表操作
package cn.itcast.sql
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
object TransformDemo {
case class Person(id:Int,name:String,age:Int)
def main(args: Array[String]): Unit = {
//1.創(chuàng)建SparkSession
val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
//2.讀取文件
val fileRDD: RDD[String] = sc.textFile("D:\\data\\person.txt")
val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" "))
val personRDD: RDD[Person] = linesRDD.map(line =>Person(line(0).toInt,line(1),line(2).toInt))
//3.將RDD轉(zhuǎn)成DF
//注意:RDD中原本沒(méi)有toDF方法,新版本中要給它增加一個(gè)方法,可以使用隱式轉(zhuǎn)換
import spark.implicits._
//注意:上面的rowRDD的泛型是Person,里面包含了Schema信息
//所以SparkSQL可以通過(guò)反射自動(dòng)獲取到并添加給DF
//=========================相互轉(zhuǎn)換======================
//1.RDD-->DF
val personDF: DataFrame = personRDD.toDF
//2.DF-->RDD
val rdd: RDD[Row] = personDF.rdd
//3.RDD-->DS
val DS: Dataset[Person] = personRDD.toDS()
//4.DS-->RDD
val rdd2: RDD[Person] = DS.rdd
//5.DF-->DS
val DS2: Dataset[Person] = personDF.as[Person]
//6.DS-->DF
val DF: DataFrame = DS2.toDF()
sc.stop()
spark.stop()
}
}
1.6、Spark SQL完成WordCount(案例)
1.6.1、SQL風(fēng)格
package cn.itcast.sql
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
object WordCount {
def main(args: Array[String]): Unit = {
//1.創(chuàng)建SparkSession
val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
//2.讀取文件
val fileDF: DataFrame = spark.read.text("D:\\data\\words.txt")
val fileDS: Dataset[String] = spark.read.textFile("D:\\data\\words.txt")
//fileDF.show()
//fileDS.show()
//3.對(duì)每一行按照空格進(jìn)行切分并壓平
//fileDF.flatMap(_.split(" ")) //注意:錯(cuò)誤,因?yàn)镈F沒(méi)有泛型,不知道_是String
import spark.implicits._
val wordDS: Dataset[String] = fileDS.flatMap(_.split(" "))//注意:正確,因?yàn)镈S有泛型,知道_是String
//wordDS.show()
/*
+-----+
|value|
+-----+
|hello|
| me|
|hello|
| you|
...
*/
//4.對(duì)上面的數(shù)據(jù)進(jìn)行WordCount
wordDS.createOrReplaceTempView("t_word")
val sql =
"""
|select value ,count(value) as count
|from t_word
|group by value
|order by count desc
""".stripMargin
spark.sql(sql).show()
sc.stop()
spark.stop()
}
}
1.6.2、DQL風(fēng)格
package cn.itcast.sql
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
object WordCount2 {
def main(args: Array[String]): Unit = {
//1.創(chuàng)建SparkSession
val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
//2.讀取文件
val fileDF: DataFrame = spark.read.text("D:\\data\\words.txt")
val fileDS: Dataset[String] = spark.read.textFile("D:\\data\\words.txt")
//fileDF.show()
//fileDS.show()
//3.對(duì)每一行按照空格進(jìn)行切分并壓平
//fileDF.flatMap(_.split(" ")) //注意:錯(cuò)誤,因?yàn)镈F沒(méi)有泛型,不知道_是String
import spark.implicits._
val wordDS: Dataset[String] = fileDS.flatMap(_.split(" "))//注意:正確,因?yàn)镈S有泛型,知道_是String
//wordDS.show()
/*
+-----+
|value|
+-----+
|hello|
| me|
|hello|
| you|
...
*/
//4.對(duì)上面的數(shù)據(jù)進(jìn)行WordCount
wordDS.groupBy("value").count().orderBy($"count".desc).show()
sc.stop()
spark.stop()
}
}
好了,以上內(nèi)容就到這里了。你學(xué)到了嗎。
到此這篇關(guān)于如何使用IDEA開(kāi)發(fā)Spark SQL程序(一文搞懂)的文章就介紹到這了,更多相關(guān)IDEA開(kāi)發(fā)Spark SQL內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
JavaBean valication驗(yàn)證實(shí)現(xiàn)方法示例
這篇文章主要介紹了JavaBean valication驗(yàn)證實(shí)現(xiàn)方法,結(jié)合實(shí)例形式分析了JavaBean valication驗(yàn)證相關(guān)概念、原理、用法及操作注意事項(xiàng),需要的朋友可以參考下2020-03-03
ThreadLocal?在上下文傳值場(chǎng)景實(shí)踐源碼
這篇文章主要為大家介紹了ThreadLocal在上下文傳值場(chǎng)景下的實(shí)踐源碼,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步2022-03-03
一文了解Java Log框架徹底搞懂Log4J,Log4J2,LogBack,SLF4J
本文主要介紹了一文了解Java Log框架徹底搞懂Log4J,Log4J2,LogBack,SLF4J,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-03-03
詳解SpringMVC的攔截器鏈實(shí)現(xiàn)及攔截器鏈配置
攔截器(Interceptor)是一種動(dòng)態(tài)攔截方法調(diào)用的機(jī)制,在SpringMVC中動(dòng)態(tài)攔截控制器方法的執(zhí)行。本文將詳細(xì)講講SpringMVC中攔截器參數(shù)及攔截器鏈配置,感興趣的可以嘗試一下2022-08-08
SpringBoot中使用Redis對(duì)接口進(jìn)行限流的實(shí)現(xiàn)
本文將結(jié)合實(shí)例代碼,介紹SpringBoot中使用Redis對(duì)接口進(jìn)行限流的實(shí)現(xiàn),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-07-07
使用@Cacheable緩存解決雙冒號(hào)::的問(wèn)題
這篇文章主要介紹了使用@Cacheable緩存解決雙冒號(hào)::的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-12-12

