欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

如何使用IDEA開發(fā)Spark SQL程序(一文搞懂)

 更新時間:2021年08月10日 11:12:11   作者:小哪吒的BD  
Spark SQL 是一個用來處理結(jié)構(gòu)化數(shù)據(jù)的spark組件。它提供了一個叫做DataFrames的可編程抽象數(shù)據(jù)模型,并且可被視為一個分布式的SQL查詢引擎。這篇文章主要介紹了如何使用IDEA開發(fā)Spark SQL程序(一文搞懂),需要的朋友可以參考下

前言

大家好,我是DJ丶小哪吒,我又來跟你們分享知識了。對軟件開發(fā)有著濃厚的興趣。喜歡與人分享知識。做博客的目的就是為了能與 他 人知識共享。由于水平有限。博客中難免會有一些錯誤。如有 紕漏 之處,歡迎大家在留言區(qū)指正。小編也會及時改正。

DJ丶小哪吒又來與各位分享知識了。今天我們不飆車,今天就靜靜的坐下來,我們來聊一聊關(guān)于sparkSQL。準(zhǔn)備好茶水,聽老朽與你娓娓道來。

Spark SQL是什么

Spark SQL 是一個用來處理結(jié)構(gòu)化數(shù)據(jù)的spark組件。它提供了一個叫做DataFrames的可編程抽象數(shù)據(jù)模型,并且可被視為一個分布式的SQL查詢引擎。

1、使用IDEA開發(fā)Spark SQL

Spark會根據(jù)文件信息嘗試著去推斷DataFrame/DataSet的Schema,當(dāng)然我們也可以手動指定,手動指定的方式有以下幾種:

  • 第1種:指定列名添加Schema
  • 第2種:通過StructType指定Schema
  • 第3種:編寫樣例類,利用反射機(jī)制推斷Schema

 1.1、指定列名添加Schema

package cn.itcast.sql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}


object CreateDFDS {
  def main(args: Array[String]): Unit = {
    //1.創(chuàng)建SparkSession
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    //2.讀取文件
    val fileRDD: RDD[String] = sc.textFile("D:\\data\\person.txt")
    val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" "))
    val rowRDD: RDD[(Int, String, Int)] = linesRDD.map(line =>(line(0).toInt,line(1),line(2).toInt))
    //3.將RDD轉(zhuǎn)成DF
    //注意:RDD中原本沒有toDF方法,新版本中要給它增加一個方法,可以使用隱式轉(zhuǎn)換
    import spark.implicits._
    val personDF: DataFrame = rowRDD.toDF("id","name","age")
    personDF.show(10)
    personDF.printSchema()
    sc.stop()
    spark.stop()
  }
}

1.2、通過StructType指定Schema

package cn.itcast.sql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SparkSession}


object CreateDFDS2 {
  def main(args: Array[String]): Unit = {
    //1.創(chuàng)建SparkSession
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    //2.讀取文件
    val fileRDD: RDD[String] = sc.textFile("D:\\data\\person.txt")
    val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" "))
    val rowRDD: RDD[Row] = linesRDD.map(line =>Row(line(0).toInt,line(1),line(2).toInt))
    //3.將RDD轉(zhuǎn)成DF
    //注意:RDD中原本沒有toDF方法,新版本中要給它增加一個方法,可以使用隱式轉(zhuǎn)換
    //import spark.implicits._
    val schema: StructType = StructType(Seq(
      StructField("id", IntegerType, true),//允許為空
      StructField("name", StringType, true),
      StructField("age", IntegerType, true))
    )
    val personDF: DataFrame = spark.createDataFrame(rowRDD,schema)
    personDF.show(10)
    personDF.printSchema()
    sc.stop()
    spark.stop()
  }
}

1.3、反射推斷Schema–掌握

package cn.itcast.sql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}


object CreateDFDS3 {
case class Person(id:Int,name:String,age:Int)
  def main(args: Array[String]): Unit = {
    //1.創(chuàng)建SparkSession
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL")
.getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    //2.讀取文件
    val fileRDD: RDD[String] = sc.textFile("D:\\data\\person.txt")
    val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" "))
    val rowRDD: RDD[Person] = linesRDD.map(line =>Person(line(0).toInt,line(1),line(2).toInt))
    //3.將RDD轉(zhuǎn)成DF
    //注意:RDD中原本沒有toDF方法,新版本中要給它增加一個方法,可以使用隱式轉(zhuǎn)換
    import spark.implicits._
    //注意:上面的rowRDD的泛型是Person,里面包含了Schema信息
    //所以SparkSQL可以通過反射自動獲取到并添加給DF
    val personDF: DataFrame = rowRDD.toDF
    personDF.show(10)
    personDF.printSchema()
    sc.stop()
    spark.stop()
  }
}

1.4、花式查詢

package cn.itcast.sql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}


object QueryDemo {
case class Person(id:Int,name:String,age:Int)
  def main(args: Array[String]): Unit = {
    //1.創(chuàng)建SparkSession
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL")
.getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    //2.讀取文件
    val fileRDD: RDD[String] = sc.textFile("D:\\data\\person.txt")
    val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" "))
    val rowRDD: RDD[Person] = linesRDD.map(line =>Person(line(0).toInt,line(1),line(2).toInt))
    //3.將RDD轉(zhuǎn)成DF
    //注意:RDD中原本沒有toDF方法,新版本中要給它增加一個方法,可以使用隱式轉(zhuǎn)換
    import spark.implicits._
    //注意:上面的rowRDD的泛型是Person,里面包含了Schema信息
    //所以SparkSQL可以通過反射自動獲取到并添加給DF
    val personDF: DataFrame = rowRDD.toDF
    personDF.show(10)
    personDF.printSchema()
    //=======================SQL方式查詢=======================
    //0.注冊表
    personDF.createOrReplaceTempView("t_person")
    //1.查詢所有數(shù)據(jù)
    spark.sql("select * from t_person").show()
    //2.查詢age+1
    spark.sql("select age,age+1 from t_person").show()
    //3.查詢age最大的兩人
    spark.sql("select name,age from t_person order by age desc limit 2").show()
    //4.查詢各個年齡的人數(shù)
    spark.sql("select age,count(*) from t_person group by age").show()
    //5.查詢年齡大于30的
    spark.sql("select * from t_person where age > 30").show()

    //=======================DSL方式查詢=======================
    //1.查詢所有數(shù)據(jù)
    personDF.select("name","age")
    //2.查詢age+1
    personDF.select($"name",$"age" + 1)
    //3.查詢age最大的兩人
    personDF.sort($"age".desc).show(2)
    //4.查詢各個年齡的人數(shù)
    personDF.groupBy("age").count().show()
    //5.查詢年齡大于30的
    personDF.filter($"age" > 30).show()

    sc.stop()
    spark.stop()
  }
  }

1.5、 相互轉(zhuǎn)化

RDD、DF、DS之間的相互轉(zhuǎn)換有很多(6種),但是我們實(shí)際操作就只有2類:
1)使用RDD算子操作
2)使用DSL/SQL對表操作

package cn.itcast.sql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object TransformDemo {
case class Person(id:Int,name:String,age:Int)

  def main(args: Array[String]): Unit = {
    //1.創(chuàng)建SparkSession
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    //2.讀取文件
    val fileRDD: RDD[String] = sc.textFile("D:\\data\\person.txt")
    val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" "))
    val personRDD: RDD[Person] = linesRDD.map(line =>Person(line(0).toInt,line(1),line(2).toInt))
    //3.將RDD轉(zhuǎn)成DF
    //注意:RDD中原本沒有toDF方法,新版本中要給它增加一個方法,可以使用隱式轉(zhuǎn)換
    import spark.implicits._
    //注意:上面的rowRDD的泛型是Person,里面包含了Schema信息
    //所以SparkSQL可以通過反射自動獲取到并添加給DF
    //=========================相互轉(zhuǎn)換======================
    //1.RDD-->DF
    val personDF: DataFrame = personRDD.toDF
    //2.DF-->RDD
    val rdd: RDD[Row] = personDF.rdd
    //3.RDD-->DS
    val DS: Dataset[Person] = personRDD.toDS()
    //4.DS-->RDD
    val rdd2: RDD[Person] = DS.rdd
    //5.DF-->DS
    val DS2: Dataset[Person] = personDF.as[Person]
    //6.DS-->DF
    val DF: DataFrame = DS2.toDF()

    sc.stop()
    spark.stop()
  }
  }

1.6、Spark SQL完成WordCount(案例)

1.6.1、SQL風(fēng)格

package cn.itcast.sql

import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}


object WordCount {
  def main(args: Array[String]): Unit = {
    //1.創(chuàng)建SparkSession
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    //2.讀取文件
    val fileDF: DataFrame = spark.read.text("D:\\data\\words.txt")
    val fileDS: Dataset[String] = spark.read.textFile("D:\\data\\words.txt")
    //fileDF.show()
    //fileDS.show()
    //3.對每一行按照空格進(jìn)行切分并壓平
    //fileDF.flatMap(_.split(" ")) //注意:錯誤,因?yàn)镈F沒有泛型,不知道_是String
    import spark.implicits._
    val wordDS: Dataset[String] = fileDS.flatMap(_.split(" "))//注意:正確,因?yàn)镈S有泛型,知道_是String
    //wordDS.show()
    /*
    +-----+
    |value|
    +-----+
    |hello|
    |   me|
    |hello|
    |  you|
      ...
     */
    //4.對上面的數(shù)據(jù)進(jìn)行WordCount
    wordDS.createOrReplaceTempView("t_word")
    val sql =
      """
        |select value ,count(value) as count
        |from t_word
        |group by value
        |order by count desc
      """.stripMargin
    spark.sql(sql).show()

    sc.stop()
    spark.stop()
  }
}

1.6.2、DQL風(fēng)格

package cn.itcast.sql

import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}


object WordCount2 {
  def main(args: Array[String]): Unit = {
    //1.創(chuàng)建SparkSession
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    //2.讀取文件
    val fileDF: DataFrame = spark.read.text("D:\\data\\words.txt")
    val fileDS: Dataset[String] = spark.read.textFile("D:\\data\\words.txt")
    //fileDF.show()
    //fileDS.show()
    //3.對每一行按照空格進(jìn)行切分并壓平
    //fileDF.flatMap(_.split(" ")) //注意:錯誤,因?yàn)镈F沒有泛型,不知道_是String
    import spark.implicits._
    val wordDS: Dataset[String] = fileDS.flatMap(_.split(" "))//注意:正確,因?yàn)镈S有泛型,知道_是String
    //wordDS.show()
    /*
    +-----+
    |value|
    +-----+
    |hello|
    |   me|
    |hello|
    |  you|
      ...
     */
    //4.對上面的數(shù)據(jù)進(jìn)行WordCount
    wordDS.groupBy("value").count().orderBy($"count".desc).show()

    sc.stop()
    spark.stop()
  }
}

好了,以上內(nèi)容就到這里了。你學(xué)到了嗎。

到此這篇關(guān)于如何使用IDEA開發(fā)Spark SQL程序(一文搞懂)的文章就介紹到這了,更多相關(guān)IDEA開發(fā)Spark SQL內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • JavaBean valication驗(yàn)證實(shí)現(xiàn)方法示例

    JavaBean valication驗(yàn)證實(shí)現(xiàn)方法示例

    這篇文章主要介紹了JavaBean valication驗(yàn)證實(shí)現(xiàn)方法,結(jié)合實(shí)例形式分析了JavaBean valication驗(yàn)證相關(guān)概念、原理、用法及操作注意事項(xiàng),需要的朋友可以參考下
    2020-03-03
  • ThreadLocal?在上下文傳值場景實(shí)踐源碼

    ThreadLocal?在上下文傳值場景實(shí)踐源碼

    這篇文章主要為大家介紹了ThreadLocal在上下文傳值場景下的實(shí)踐源碼,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步
    2022-03-03
  • Java經(jīng)典面試題最全匯總208道(六)

    Java經(jīng)典面試題最全匯總208道(六)

    這篇文章主要介紹了Java經(jīng)典面試題最全匯總208道(六),本文章內(nèi)容詳細(xì),該模塊分為了六個部分,本次為第六部分,需要的朋友可以參考下
    2023-01-01
  • 淺析Java Scanner 類的用法

    淺析Java Scanner 類的用法

    這篇文章主要介紹了Java Scanner 類的用法,文中講解非常詳細(xì),代碼幫助大家更好的理解和學(xué)習(xí),感興趣的朋友可以了解下
    2020-07-07
  • 一文了解Java Log框架徹底搞懂Log4J,Log4J2,LogBack,SLF4J

    一文了解Java Log框架徹底搞懂Log4J,Log4J2,LogBack,SLF4J

    本文主要介紹了一文了解Java Log框架徹底搞懂Log4J,Log4J2,LogBack,SLF4J,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2023-03-03
  • 詳解SpringMVC的攔截器鏈實(shí)現(xiàn)及攔截器鏈配置

    詳解SpringMVC的攔截器鏈實(shí)現(xiàn)及攔截器鏈配置

    攔截器(Interceptor)是一種動態(tài)攔截方法調(diào)用的機(jī)制,在SpringMVC中動態(tài)攔截控制器方法的執(zhí)行。本文將詳細(xì)講講SpringMVC中攔截器參數(shù)及攔截器鏈配置,感興趣的可以嘗試一下
    2022-08-08
  • Mybatis 中如何判斷集合的size

    Mybatis 中如何判斷集合的size

    這篇文章主要介紹了在Mybatis中判斷集合的size操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2021-02-02
  • SpringBoot中使用Redis對接口進(jìn)行限流的實(shí)現(xiàn)

    SpringBoot中使用Redis對接口進(jìn)行限流的實(shí)現(xiàn)

    本文將結(jié)合實(shí)例代碼,介紹SpringBoot中使用Redis對接口進(jìn)行限流的實(shí)現(xiàn),具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2021-07-07
  • 使用@Cacheable緩存解決雙冒號::的問題

    使用@Cacheable緩存解決雙冒號::的問題

    這篇文章主要介紹了使用@Cacheable緩存解決雙冒號::的問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-12-12
  • Java split 分隔空值無法得到的解決方式

    Java split 分隔空值無法得到的解決方式

    這篇文章主要介紹了Java split 分隔空值無法得到的解決方式,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-10-10

最新評論