欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SparkSQl簡介及運行原理

 更新時間:2021年08月10日 11:48:06   作者:山上有風景  
Spark SQL就是將SQL轉換成一個任務,提交到集群上運行,類似于Hive的執(zhí)行方式。今天通過本文給大家分享SparkSQl簡介及運行原理,感興趣的朋友跟隨小編一起看看吧

一:什么是SparkSQL?

(一)SparkSQL簡介

Spark SQL是Spark的一個模塊,用于處理結構化的數(shù)據(jù),它提供了一個數(shù)據(jù)抽象DataFrame(最核心的編程抽象就是DataFrame),并且SparkSQL作為分布式SQL查詢引擎。
Spark SQL就是將SQL轉換成一個任務,提交到集群上運行,類似于Hive的執(zhí)行方式。

(二)SparkSQL運行原理

將Spark SQL轉化為RDD,然后提交到集群執(zhí)行。

(三)SparkSQL特點

(1)容易整合,Spark SQL已經(jīng)集成在Spark中

(2)提供了統(tǒng)一的數(shù)據(jù)訪問方式:JSON、CSV、JDBC、Parquet等都是使用統(tǒng)一的方式進行訪問

(3)兼容 Hive

(4)標準的數(shù)據(jù)連接:JDBC、ODBC

二:DataFrame

(一)什么是DataFrame?

在Spark中,DataFrame是一種以RDD為基礎的分布式數(shù)據(jù)集,類似于傳統(tǒng)數(shù)據(jù)庫中的二維表格。

DataFrame是組織成命名列的數(shù)據(jù)集。

它在概念上等同于關系數(shù)據(jù)庫中的表,但在底層具有更豐富的優(yōu)化。

關系型數(shù)據(jù)庫中的表由表結構和數(shù)據(jù)組成,而DataFrame也類似,由schema(結構)和數(shù)據(jù)組成,其數(shù)據(jù)集是RDD。

DataFrame可以根據(jù)很多源進行構建,包括:結構化的數(shù)據(jù)文件,hive中的表,外部的關系型數(shù)據(jù)庫,以及RDD

補充:Spark中的RDD、DataFrame和DataSet講解

(一)Spark中的模塊

上圖展示了Spark的模塊及各模塊之間的關系:

底層是Spark-core核心模塊,Spark每個模塊都有一個核心抽象,Spark-core的核心抽象是RDD,

Spark SQL等都基于RDD封裝了自己的抽象,在Spark SQL中是DataFrame/DataSet。

相對來說RDD是更偏底層的抽象,DataFrame/DataSet是在其上做了一層封裝,做了優(yōu)化,使用起來更加方便。

從功能上來說,DataFrame/DataSet能做的事情RDD都能做,RDD能做的事情DataFrame/DataSet不一定能做。

(二)RDD和DataFrame的區(qū)別

DataFrame與RDD的主要區(qū)別在于:

DataFrame

DataFrame帶有schema元信息,即DataFrame所表示的二維表數(shù)據(jù)集的每一列都帶有名稱和類型。

使得Spark SQL得以洞察更多的結構信息,從而對藏于DataFrame背后的數(shù)據(jù)源以及作用于DataFrame之上的變換進行了針對性的優(yōu)化,最終達到大幅提升運行時效率的目標。

RDD

RDD,由于無從得知所存數(shù)據(jù)元素的具體內(nèi)部結構,Spark Core只能在stage層面進行簡單、通用的流水線優(yōu)化。

DataFrame和RDD聯(lián)系:

DataFrame底層是以RDD為基礎的分布式數(shù)據(jù)集,和RDD的主要區(qū)別的是:RDD中沒有schema信息,而DataFrame中數(shù)據(jù)每一行都包含schema

DataFrame = RDD[Row] + shcema

三:SparkSession

(一)SparkSession簡介

SparkSession是Spark 2.0引如的新概念。SparkSession為用戶提供了統(tǒng)一的切入點,來讓用戶學習spark的各項功能。

在spark的早期版本中,SparkContext是spark的主要切入點,由于RDD是主要的API,我們通過sparkcontext來創(chuàng)建和操作RDD。

對于每個其他的API,我們需要使用不同的context。

例如,對于Streming,我們需要使用StreamingContext;對于sql,使用sqlContext;對于Hive,使用hiveContext。

但是隨著DataSet和DataFrame的API逐漸成為標準的API,就需要為他們建立接入點。所以在spark2.0中,引入SparkSession作為DataSet和DataFrame API的切入點。

SparkSession封裝了SparkConf、SparkContext和SQLContext。為了向后兼容,SQLContext和HiveContext也被保存下來。

(二)SparkSession實質(zhì)

SparkSession實質(zhì)上是SQLContext和HiveContext的組合(未來可能還會加上StreamingContext),所以在SQLContext和HiveContext上可用的API在SparkSession上同樣是可以使用的。

SparkSession內(nèi)部封裝了sparkContext,所以計算實際上是由sparkContext完成的。

(三)SparkSession特點

   ----為用戶提供一個統(tǒng)一的切入點使用Spark各項功能

----允許用戶通過它調(diào)用DataFrame和Dataset相關 API來編寫程序

----減少了用戶需要了解的一些概念,可以很容易的與Spark進行交互

----與Spark交互之時不需要顯示的創(chuàng)建SparkConf, SparkContext以及 SQlContext,這些對象已經(jīng)封閉在SparkSession中

四:通過RDD創(chuàng)建DataFrame

(一)通過樣本類創(chuàng)建(反射)

case class People(val name:String,val age:Int)  //可以聲明數(shù)據(jù)類型

object WordCount {
  def main(args:Array[String]):Unit={
    val conf = new SparkConf()
    //設置運行模式為本地運行,不然默認是集群模式
    //conf.setMaster("local")  //默認是集群模式
    //設置任務名
    conf.setAppName("WordCount").setMaster("local")
    conf.set("spark.default.parallelism","5")
    //設置SparkContext,是SparkCore的程序入口
    val sc = new SparkContext(conf)
    val Sqlsc = new SQLContext(sc)  //根據(jù)SparkContext生成SQLContext
    
    val array = Array("mark,14","kitty,23","dasi,45")
    val peopleRDD = sc.parallelize(array).map(line=>{    //生成RDD
      People(line.split(",")(0),line.split(",")(1).trim().toInt)
    })
    
    import Sqlsc.implicits._  //引入全部方法
    //將RDD轉換成DataFrame
    val df = peopleRDD.toDF()  
    //將DataFrame轉換成一個臨時的視圖
    df.createOrReplaceTempView("people")
    //使用SQL語句進行查詢
    Sqlsc.sql("select * from people").show()
  }
}

(二)通過SparkSession創(chuàng)建DataFrame

object WordCount {
  def main(args:Array[String]):Unit={
    val conf = new SparkConf()
    //設置運行模式為本地運行,不然默認是集群模式
    //conf.setMaster("local")  //默認是集群模式
    //設置任務名
    conf.setAppName("WordCount").setMaster("local")
    conf.set("spark.default.parallelism","5")
    //設置SparkContext,是SparkCore的程序入口
    val sc = new SparkContext(conf)
    val Sqlsc = new SQLContext(sc)  //根據(jù)SparkContext生成SQLContext
    
    val array = Array("mark,14","kitty,23","dasi,45")
    //1.需要將RDD數(shù)據(jù)映射成Row,需要引入import org.apache.spark.sql.Row
    val peopleRDD = sc.parallelize(array).map(line=>{    //生成RDD
      val fields = line.split(",")
      Row(fields(0),fields(1).trim().toInt)
    })
    
    //2.創(chuàng)建StructType定義結構
    val st:StructType = StructType(
      //字段名,字段類型,是否可以為空
      List(  //傳參是列表類型,或者使用StructField("name", StringType, true) :: StructField("age", IntegerType, true) :: Nil來構建列表
          StructField("name",StringType,true),
          StructField("age",IntegerType,true)
          )
    )
    
    //3.使用SparkSession建立DataFrame
    val df = Sqlsc.createDataFrame(peopleRDD,st)
    //將DataFrame轉換成一個臨時的視圖
    df.createOrReplaceTempView("people")
    //使用SQL語句進行查詢
    Sqlsc.sql("select * from people").show()
  }
}

(三)通過 json 文件創(chuàng)建DataFrames

[{"name":"dafa","age":12},{"name":"safaw","age":17},{"name":"ge","age":34}]
def main(args:Array[String]):Unit={
    val conf = new SparkConf()
    //設置運行模式為本地運行,不然默認是集群模式
    //conf.setMaster("local")  //默認是集群模式
    //設置任務名
    conf.setAppName("WordCount").setMaster("local")
    //設置SparkContext,是SparkCore的程序入口
    val sc = new SparkContext(conf)
    val Sqlsc = new SQLContext(sc)  //根據(jù)SparkContext生成SQLContext
    
    //通過json數(shù)據(jù)直接創(chuàng)建DataFrame
    val df = Sqlsc.read.json("E:\\1.json")
    
    //將DataFrame轉換成一個臨時的視圖
    df.createOrReplaceTempView("people1")
    //使用SQL語句進行查詢
    Sqlsc.sql("select * from people1").show()
  }

五:臨時視圖

(一)什么是視圖

視圖是一個虛表,跟Mysql里的概念是一樣的,視圖基于實際的表而存在,其實質(zhì)是一系列的查詢語句

(二)類型

局部視圖(Temoporary View):只在當前會話中有效,如果創(chuàng)建它的會話終止,則視圖也會消失。

全局視圖(Global Temporary View): 在全局范圍內(nèi)有效,不同的Session中都可以訪問,生命周期是Spark的Application運行周期,全局視圖會綁定到系統(tǒng)保留的數(shù)據(jù)庫global_temp中,因此使用它的時候必須加上相應前綴。

(三)創(chuàng)建視圖

創(chuàng)建局部視圖:df.createOrReplaceTempView("emp")
創(chuàng)建全局視圖:df.createOrReplaceGlobalTempView("empG")

(四)視圖查詢

spark.sql("select * from emp").show
spark.sql("select * from global_temp.empG").show  //查詢?nèi)忠晥D,需要添加前綴

(五)會話周期

spark.newSession.sql("select * from emp").show -----> 報錯,Table or View Not Found
spark.newSession.sql("select * from global_temp.empG").show ---->可以正常查詢

六:DataFrame的read和save和savemode

(一)數(shù)據(jù)讀取

val conf = new SparkConf().setAppName("TestDataFrame2").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    //方式一
    val df1 = sqlContext.read.json("E:\\666\\people.json")
    val df2 = sqlContext.read.parquet("E:\\666\\users.parquet")
    //方式二
    val df3 = sqlContext.read.format("json").load("E:\\666\\people.json")
    val df4 = sqlContext.read.format("parquet").load("E:\\666\\users.parquet")
    //方式三,默認是parquet格式
    val df5 = sqlContext.load("E:\\666\\users.parquet")
   //方式四,使用MySQL進行數(shù)據(jù)源讀取
    val url = "jdbc:mysql://192.168.123.102:3306/hivedb"
    val table = "dbs"
    val properties = new Properties()
    properties.setProperty("user","root")
    properties.setProperty("password","root")
    //需要傳入Mysql的URL、表明、properties(連接數(shù)據(jù)庫的用戶名密碼)
    val df = sqlContext.read.jdbc(url,table,properties)
    df.createOrReplaceTempView("dbs")
    sqlContext.sql("select * from dbs").show()

使用Hive作為數(shù)據(jù)源:需要在pom.xml文件中添加依賴

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>

開發(fā)環(huán)境則把resource文件夾下添加hive-site.xml文件,集群環(huán)境把hive的配置文件要發(fā)到$SPARK_HOME/conf目錄下

<configuration>
        <property>
                <name>javax.jdo.option.ConnectionURL</name>
                <value>jdbc:mysql://localhost:3306/hivedb?createDatabaseIfNotExist=true</value>
                <description>JDBC connect string for a JDBC metastore</description>
                <!-- 如果 mysql 和 hive 在同一個服務器節(jié)點,那么請更改 hadoop02 為 localhost -->
        </property>
        <property>
                <name>javax.jdo.option.ConnectionDriverName</name>
                <value>com.mysql.jdbc.Driver</value>
                <description>Driver class name for a JDBC metastore</description>
        </property>
        <property>
                <name>javax.jdo.option.ConnectionUserName</name>
                <value>root</value>
                <description>username to use against metastore database</description>
        </property>
        <property>
                <name>javax.jdo.option.ConnectionPassword</name>
                <value>root</value>
        <description>password to use against metastore database</description>
        </property>
    <property>
                <name>hive.metastore.warehouse.dir</name>
                <value>/hive/warehouse</value>
                <description>hive default warehouse, if nessecory, change it</description>
        </property>  
</configuration>

hive-site.xml配置文件
val conf = new SparkConf().setMaster("local").setAppName(this.getClass.getSimpleName)
    val sc = new SparkContext(conf)
    val sqlContext = new HiveContext(sc)
    sqlContext.sql("select * from myhive.student").show() 

(二)數(shù)據(jù)保存

val conf = new SparkConf().setAppName("TestDataFrame2").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val df1 = sqlContext.read.json("E:\\666\\people.json")
    //方式一
    df1.write.json("E:\\111")
    df1.write.parquet("E:\\222")
    //方式二
    df1.write.format("json").save("E:\\333")
    df1.write.format("parquet").save("E:\\444")
    //方式三
    df1.write.save("E:\\555")

(三)數(shù)據(jù)保存模式

df1.write.format("parquet").mode(SaveMode.Ignore).save("E:\\444")

七:數(shù)據(jù)集DataSet

Dataset也是一個分布式數(shù)據(jù)容器,簡單來說是類似二維表,Dataset里頭存有schema數(shù)據(jù)結構信息和原生數(shù)據(jù),Dataset的底層封裝的是RDD,當RDD的泛型是Row類型的時候,我們也可以稱它為DataFrame。即Dataset<Row> = DataFrame。DataFrame是特殊的Dataset。

Spark整合了Dataset和DataFrame,前者是有明確類型的數(shù)據(jù)集,后者是無明確類型的數(shù)據(jù)集。根據(jù)官方的文檔:

Dataset是一種強類型集合,與領域對象相關,可以使用函數(shù)或者關系進行分布式的操作。
每個Dataset也有一個無類型的視圖,叫做DataFrame,也就是關于Row的Dataset。
簡單來說,Dataset一般都是Dataset[T]形式,這里的T是指數(shù)據(jù)的類型,如上圖中的Person,而DataFrame就是一個Dataset[Row]。

Datasets是懶加載的,即只有actions被調(diào)用的時候才會觸發(fā)計算。在內(nèi)部,Dataset代表一個邏輯計劃,用來描述產(chǎn)生數(shù)據(jù)需要的計算。當一個action被調(diào)用的時候,Spark的query優(yōu)化器會優(yōu)化這個邏輯計劃并以分布式的方式在物理上進行實際的計算操作。

(一)創(chuàng)建和使用DataSet---使用序列

(1,"Tom")  (2,"Mary")

測試數(shù)據(jù)

(1)定義case class
             case class MyData(a:Int,b:String)
(2)使用序列創(chuàng)建DataSet
             val DS = Seq(MyData(1,"Tom"),MyData(2,"Mary")).toDS

(二)創(chuàng)建和使用DataSet---通過case class作為編碼器,將DataFrame轉換成DataSet

(1)定義case class
             case class Person(name:String,age:BigInt)
(2)讀入JSON的數(shù)據(jù)
             val df = spark.read.json("/root/temp/people.json")
(3)將DataFrame轉換成DataSet
             val PersonDS =df.as[Person]

(三)創(chuàng)建和使用DataSet---讀取HDFS數(shù)據(jù)文件

(1)讀取HDFS的文件,直接創(chuàng)建DataSet
             val lineDS = spark.read.text("hdfs://bigdata111:9000/input/data.txt").as[String]
(2)分詞操作,查詢長度大于3的單詞
             val words = lineDS.flatMap(_.split(" ")).filter(_.length > 3)
             words.show
             words.collect

到此這篇關于SparkSQl簡介及運行原理的文章就介紹到這了,更多相關SparkSQl使用內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • Java構造代碼塊,靜態(tài)代碼塊原理與用法實例分析

    Java構造代碼塊,靜態(tài)代碼塊原理與用法實例分析

    這篇文章主要介紹了Java構造代碼塊,靜態(tài)代碼塊,結合實例形式分析了Java構造代碼塊,靜態(tài)代碼塊的功能、原理、用法及操作注意事項,需要的朋友可以參考下
    2020-04-04
  • j2Cache線上異常排查問題解決記錄分析

    j2Cache線上異常排查問題解決記錄分析

    這篇文章主要為大家介紹了關于j2Cache線上異常排查的問題解決記錄分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步
    2022-02-02
  • MyBatis中SqlSession生命周期的使用

    MyBatis中SqlSession生命周期的使用

    SqlSession是MyBatis的核心接口之一,本文主要介紹了MyBatis中SqlSession生命周期的使用,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2024-09-09
  • java使用URLDecoder和URLEncoder對中文字符進行編碼和解碼

    java使用URLDecoder和URLEncoder對中文字符進行編碼和解碼

    這篇文章主要介紹了java 使用 URLDecoder 和 URLEncoder 對中文字符進行編碼和解碼,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2020-07-07
  • SpringBoot 內(nèi)嵌 camunda的配置方法

    SpringBoot 內(nèi)嵌 camunda的配置方法

    Camunda是一個基于Java的框架,支持用于工作流和流程自動化的BPMN、用于案例管理的CMMN和用于業(yè)務決策管理的DMN,這篇文章主要介紹了SpringBoot 內(nèi)嵌 camunda,需要的朋友可以參考下
    2024-06-06
  • 數(shù)據(jù)庫連接超時java處理的兩種方式

    數(shù)據(jù)庫連接超時java處理的兩種方式

    這篇文章主要介紹了數(shù)據(jù)庫連接超時java處理的兩種方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2023-04-04
  • 基于@Table注解無法使用及報紅的解決

    基于@Table注解無法使用及報紅的解決

    這篇文章主要介紹了基于@Table注解無法使用及報紅的解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-01-01
  • spring mvc rest 接口選擇性加密解密詳情

    spring mvc rest 接口選擇性加密解密詳情

    這篇文章主要介紹了spring mvc rest 接口選擇性加密解密詳情,spring mvc rest接口以前是采用https加密的,但是現(xiàn)在需要更加安全的加密。而且不是對所有的接口進行加密,是對部分接口進行加密,接口返回值進行解密
    2022-07-07
  • TKmybatis的框架介紹和原理解析

    TKmybatis的框架介紹和原理解析

    tkmybatis是在mybatis框架的基礎上提供了很多工具,讓開發(fā)更加高效,下面來看看這個框架的基本使用,后面會對相關源碼進行分析,感興趣的同學可以看一下,挺不錯的一個工具
    2020-12-12
  • Java運行時jar終端輸出的中文日志亂碼兩種解決方式

    Java運行時jar終端輸出的中文日志亂碼兩種解決方式

    jar包啟動,今天java開發(fā)過來找,說jar包啟動日志是亂碼,這篇文章主要給大家介紹了關于Java運行時jar終端輸出的中文日志亂碼的兩種解決方式,文中通過圖文介紹的非常詳細,需要的朋友可以參考下
    2024-01-01

最新評論