如何使用IDEA開發(fā)Spark SQL程序(一文搞懂)
前言
大家好,我是DJ丶小哪吒,我又來跟你們分享知識了。對軟件開發(fā)有著濃厚的興趣。喜歡與人分享知識。做博客的目的就是為了能與 他 人知識共享。由于水平有限。博客中難免會有一些錯誤。如有 紕漏 之處,歡迎大家在留言區(qū)指正。小編也會及時改正。
DJ丶小哪吒又來與各位分享知識了。今天我們不飆車,今天就靜靜的坐下來,我們來聊一聊關(guān)于sparkSQL。準(zhǔn)備好茶水,聽老朽與你娓娓道來。
Spark SQL是什么
Spark SQL 是一個用來處理結(jié)構(gòu)化數(shù)據(jù)的spark組件。它提供了一個叫做DataFrames的可編程抽象數(shù)據(jù)模型,并且可被視為一個分布式的SQL查詢引擎。
1、使用IDEA開發(fā)Spark SQL
Spark會根據(jù)文件信息嘗試著去推斷DataFrame/DataSet的Schema,當(dāng)然我們也可以手動指定,手動指定的方式有以下幾種:
- 第1種:指定列名添加Schema
- 第2種:通過StructType指定Schema
- 第3種:編寫樣例類,利用反射機(jī)制推斷Schema
1.1、指定列名添加Schema
package cn.itcast.sql import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, SparkSession} object CreateDFDS { def main(args: Array[String]): Unit = { //1.創(chuàng)建SparkSession val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate() val sc: SparkContext = spark.sparkContext sc.setLogLevel("WARN") //2.讀取文件 val fileRDD: RDD[String] = sc.textFile("D:\\data\\person.txt") val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" ")) val rowRDD: RDD[(Int, String, Int)] = linesRDD.map(line =>(line(0).toInt,line(1),line(2).toInt)) //3.將RDD轉(zhuǎn)成DF //注意:RDD中原本沒有toDF方法,新版本中要給它增加一個方法,可以使用隱式轉(zhuǎn)換 import spark.implicits._ val personDF: DataFrame = rowRDD.toDF("id","name","age") personDF.show(10) personDF.printSchema() sc.stop() spark.stop() } }
1.2、通過StructType指定Schema
package cn.itcast.sql import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.types._ import org.apache.spark.sql.{DataFrame, Row, SparkSession} object CreateDFDS2 { def main(args: Array[String]): Unit = { //1.創(chuàng)建SparkSession val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate() val sc: SparkContext = spark.sparkContext sc.setLogLevel("WARN") //2.讀取文件 val fileRDD: RDD[String] = sc.textFile("D:\\data\\person.txt") val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" ")) val rowRDD: RDD[Row] = linesRDD.map(line =>Row(line(0).toInt,line(1),line(2).toInt)) //3.將RDD轉(zhuǎn)成DF //注意:RDD中原本沒有toDF方法,新版本中要給它增加一個方法,可以使用隱式轉(zhuǎn)換 //import spark.implicits._ val schema: StructType = StructType(Seq( StructField("id", IntegerType, true),//允許為空 StructField("name", StringType, true), StructField("age", IntegerType, true)) ) val personDF: DataFrame = spark.createDataFrame(rowRDD,schema) personDF.show(10) personDF.printSchema() sc.stop() spark.stop() } }
1.3、反射推斷Schema–掌握
package cn.itcast.sql import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, SparkSession} object CreateDFDS3 { case class Person(id:Int,name:String,age:Int) def main(args: Array[String]): Unit = { //1.創(chuàng)建SparkSession val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL") .getOrCreate() val sc: SparkContext = spark.sparkContext sc.setLogLevel("WARN") //2.讀取文件 val fileRDD: RDD[String] = sc.textFile("D:\\data\\person.txt") val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" ")) val rowRDD: RDD[Person] = linesRDD.map(line =>Person(line(0).toInt,line(1),line(2).toInt)) //3.將RDD轉(zhuǎn)成DF //注意:RDD中原本沒有toDF方法,新版本中要給它增加一個方法,可以使用隱式轉(zhuǎn)換 import spark.implicits._ //注意:上面的rowRDD的泛型是Person,里面包含了Schema信息 //所以SparkSQL可以通過反射自動獲取到并添加給DF val personDF: DataFrame = rowRDD.toDF personDF.show(10) personDF.printSchema() sc.stop() spark.stop() } }
1.4、花式查詢
package cn.itcast.sql import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, SparkSession} object QueryDemo { case class Person(id:Int,name:String,age:Int) def main(args: Array[String]): Unit = { //1.創(chuàng)建SparkSession val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL") .getOrCreate() val sc: SparkContext = spark.sparkContext sc.setLogLevel("WARN") //2.讀取文件 val fileRDD: RDD[String] = sc.textFile("D:\\data\\person.txt") val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" ")) val rowRDD: RDD[Person] = linesRDD.map(line =>Person(line(0).toInt,line(1),line(2).toInt)) //3.將RDD轉(zhuǎn)成DF //注意:RDD中原本沒有toDF方法,新版本中要給它增加一個方法,可以使用隱式轉(zhuǎn)換 import spark.implicits._ //注意:上面的rowRDD的泛型是Person,里面包含了Schema信息 //所以SparkSQL可以通過反射自動獲取到并添加給DF val personDF: DataFrame = rowRDD.toDF personDF.show(10) personDF.printSchema() //=======================SQL方式查詢======================= //0.注冊表 personDF.createOrReplaceTempView("t_person") //1.查詢所有數(shù)據(jù) spark.sql("select * from t_person").show() //2.查詢age+1 spark.sql("select age,age+1 from t_person").show() //3.查詢age最大的兩人 spark.sql("select name,age from t_person order by age desc limit 2").show() //4.查詢各個年齡的人數(shù) spark.sql("select age,count(*) from t_person group by age").show() //5.查詢年齡大于30的 spark.sql("select * from t_person where age > 30").show() //=======================DSL方式查詢======================= //1.查詢所有數(shù)據(jù) personDF.select("name","age") //2.查詢age+1 personDF.select($"name",$"age" + 1) //3.查詢age最大的兩人 personDF.sort($"age".desc).show(2) //4.查詢各個年齡的人數(shù) personDF.groupBy("age").count().show() //5.查詢年齡大于30的 personDF.filter($"age" > 30).show() sc.stop() spark.stop() } }
1.5、 相互轉(zhuǎn)化
RDD、DF、DS之間的相互轉(zhuǎn)換有很多(6種),但是我們實(shí)際操作就只有2類:
1)使用RDD算子操作
2)使用DSL/SQL對表操作
package cn.itcast.sql import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} object TransformDemo { case class Person(id:Int,name:String,age:Int) def main(args: Array[String]): Unit = { //1.創(chuàng)建SparkSession val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate() val sc: SparkContext = spark.sparkContext sc.setLogLevel("WARN") //2.讀取文件 val fileRDD: RDD[String] = sc.textFile("D:\\data\\person.txt") val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" ")) val personRDD: RDD[Person] = linesRDD.map(line =>Person(line(0).toInt,line(1),line(2).toInt)) //3.將RDD轉(zhuǎn)成DF //注意:RDD中原本沒有toDF方法,新版本中要給它增加一個方法,可以使用隱式轉(zhuǎn)換 import spark.implicits._ //注意:上面的rowRDD的泛型是Person,里面包含了Schema信息 //所以SparkSQL可以通過反射自動獲取到并添加給DF //=========================相互轉(zhuǎn)換====================== //1.RDD-->DF val personDF: DataFrame = personRDD.toDF //2.DF-->RDD val rdd: RDD[Row] = personDF.rdd //3.RDD-->DS val DS: Dataset[Person] = personRDD.toDS() //4.DS-->RDD val rdd2: RDD[Person] = DS.rdd //5.DF-->DS val DS2: Dataset[Person] = personDF.as[Person] //6.DS-->DF val DF: DataFrame = DS2.toDF() sc.stop() spark.stop() } }
1.6、Spark SQL完成WordCount(案例)
1.6.1、SQL風(fēng)格
package cn.itcast.sql import org.apache.spark.SparkContext import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} object WordCount { def main(args: Array[String]): Unit = { //1.創(chuàng)建SparkSession val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate() val sc: SparkContext = spark.sparkContext sc.setLogLevel("WARN") //2.讀取文件 val fileDF: DataFrame = spark.read.text("D:\\data\\words.txt") val fileDS: Dataset[String] = spark.read.textFile("D:\\data\\words.txt") //fileDF.show() //fileDS.show() //3.對每一行按照空格進(jìn)行切分并壓平 //fileDF.flatMap(_.split(" ")) //注意:錯誤,因?yàn)镈F沒有泛型,不知道_是String import spark.implicits._ val wordDS: Dataset[String] = fileDS.flatMap(_.split(" "))//注意:正確,因?yàn)镈S有泛型,知道_是String //wordDS.show() /* +-----+ |value| +-----+ |hello| | me| |hello| | you| ... */ //4.對上面的數(shù)據(jù)進(jìn)行WordCount wordDS.createOrReplaceTempView("t_word") val sql = """ |select value ,count(value) as count |from t_word |group by value |order by count desc """.stripMargin spark.sql(sql).show() sc.stop() spark.stop() } }
1.6.2、DQL風(fēng)格
package cn.itcast.sql import org.apache.spark.SparkContext import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} object WordCount2 { def main(args: Array[String]): Unit = { //1.創(chuàng)建SparkSession val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate() val sc: SparkContext = spark.sparkContext sc.setLogLevel("WARN") //2.讀取文件 val fileDF: DataFrame = spark.read.text("D:\\data\\words.txt") val fileDS: Dataset[String] = spark.read.textFile("D:\\data\\words.txt") //fileDF.show() //fileDS.show() //3.對每一行按照空格進(jìn)行切分并壓平 //fileDF.flatMap(_.split(" ")) //注意:錯誤,因?yàn)镈F沒有泛型,不知道_是String import spark.implicits._ val wordDS: Dataset[String] = fileDS.flatMap(_.split(" "))//注意:正確,因?yàn)镈S有泛型,知道_是String //wordDS.show() /* +-----+ |value| +-----+ |hello| | me| |hello| | you| ... */ //4.對上面的數(shù)據(jù)進(jìn)行WordCount wordDS.groupBy("value").count().orderBy($"count".desc).show() sc.stop() spark.stop() } }
好了,以上內(nèi)容就到這里了。你學(xué)到了嗎。
到此這篇關(guān)于如何使用IDEA開發(fā)Spark SQL程序(一文搞懂)的文章就介紹到這了,更多相關(guān)IDEA開發(fā)Spark SQL內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
JavaBean valication驗(yàn)證實(shí)現(xiàn)方法示例
這篇文章主要介紹了JavaBean valication驗(yàn)證實(shí)現(xiàn)方法,結(jié)合實(shí)例形式分析了JavaBean valication驗(yàn)證相關(guān)概念、原理、用法及操作注意事項(xiàng),需要的朋友可以參考下2020-03-03一文了解Java Log框架徹底搞懂Log4J,Log4J2,LogBack,SLF4J
本文主要介紹了一文了解Java Log框架徹底搞懂Log4J,Log4J2,LogBack,SLF4J,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-03-03詳解SpringMVC的攔截器鏈實(shí)現(xiàn)及攔截器鏈配置
攔截器(Interceptor)是一種動態(tài)攔截方法調(diào)用的機(jī)制,在SpringMVC中動態(tài)攔截控制器方法的執(zhí)行。本文將詳細(xì)講講SpringMVC中攔截器參數(shù)及攔截器鏈配置,感興趣的可以嘗試一下2022-08-08SpringBoot中使用Redis對接口進(jìn)行限流的實(shí)現(xiàn)
本文將結(jié)合實(shí)例代碼,介紹SpringBoot中使用Redis對接口進(jìn)行限流的實(shí)現(xiàn),具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-07-07