欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Spark?SQL配置及使用教程

 更新時間:2021年12月03日 14:27:25   作者:一尺月光寒  
SparkSQL是spark的一個模塊,主入口是SparkSession,將SQL查詢與Spark程序無縫混合,這篇文章主要介紹了Spark?SQL配置及使用,需要的朋友可以參考下

XY個人記

SparkSQL是spark的一個模塊,主入口是SparkSession,將SQL查詢與Spark程序無縫混合。DataFrames和SQL提供了訪問各種數(shù)據(jù)源(通過JDBC或ODBC連接)的常用方法包括Hive,Avro,Parquet,ORC,JSON和JDBC。您甚至可以跨這些來源加入數(shù)據(jù)。以相同方式連接到任何數(shù)據(jù)源。Spark SQL還支持HiveQL語法以及Hive SerDes和UDF,允許您訪問現(xiàn)有的Hive倉庫。

Spark SQL包括基于成本的優(yōu)化器,列式存儲和代碼生成,以快速進行查詢。同時,它使用Spark引擎擴展到數(shù)千個節(jié)點和多小時查詢,該引擎提供完整的中間查詢?nèi)蒎e。不要擔心使用不同的引擎來獲取歷史數(shù)據(jù)。

SparkSQL版本:?

??? Spark2.0之前
入口:SQLContext和HiveContext
SQLContext:主要DataFrame的構(gòu)建以及DataFrame的執(zhí)行,SQLContext指的是spark中SQL模塊的程序入口
HiveContext:是SQLContext的子類,專門用于與Hive的集成,比如讀取Hive的元數(shù)據(jù),數(shù)據(jù)存儲到Hive表、Hive的窗口分析函數(shù)等

??? Spark2.0之后
入口:SparkSession(spark應(yīng)用程序的一個整體入口),合并了SQLContext和HiveContext

??? SparkSQL核心抽象:DataFrame/Dataset?? ? type DataFrame = Dataset[Row]?? ?//type 給某個數(shù)據(jù)類型起個別名

SparkSQL DSL語法?

SparkSQL除了支持直接的HQL語句的查詢外,還支持通過DSL語句/API進行數(shù) 據(jù)的操作,主要DataFrame API列表如下:

select:類似于HQL語句中的select,獲取需要的字段信息

where/filter:類似HQL語句中的where語句,根據(jù)給定條件過濾數(shù)據(jù)

sort/orderBy: 全局數(shù)據(jù)排序功能,類似Hive中的order by語句,按照給定字段進行全部 數(shù)據(jù)的排序

sortWithinPartitions:類似Hive的sort by語句,按照分區(qū)進行數(shù)據(jù)排序

groupBy:數(shù)據(jù)聚合操作

limit:獲取前N條數(shù)據(jù)記錄

SparkSQL和Hive的集成

集成步驟:
-1. namenode和datanode啟動
-2. 將hive配置文件軟連接或者復制到spark的conf目錄下面

$ ln -s /opt/modules/apache/hive-1.2.1/conf/hive-site.xml 
or
$ cp /opt/modules/apache/hive-1.2.1/conf/hive-site.xml ./

? ? ? ? -3. 根據(jù)hive-site.xml中不同配置項,采用不同策略操作
根據(jù)hive.metastore.uris參數(shù)
-a. 當hive.metastore.uris參數(shù)為空的時候(默認值)
將Hive元數(shù)據(jù)庫的驅(qū)動jar文件添加spark的classpath環(huán)境變量中即可完成SparkSQL到hive的集成
-b. 當hive.metastore.uris非空時候
-1. 啟動hive的metastore服務(wù)
./bin/hive --service metastore &
-2. 完成SparkSQL與Hive集成工作

?? ??? ?-4.啟動spark-SQL($ bin/spark-sql)時候 發(fā)現(xiàn)報錯:

java.lang.ClassNotFoundException: org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver

? ? ? ? at java.net.URLClassLoader$1.run(URLClassLoader.java:366)

? ? ? ? at java.net.URLClassLoader$1.run(URLClassLoader.java:355)

? ? ? ? at java.security.AccessController.doPrivileged(Native Method)

? ? ? ? at java.net.URLClassLoader.findClass(URLClassLoader.java:354)

? ? ? ? at java.lang.ClassLoader.loadClass(ClassLoader.java:425)

? ? ? ? at java.lang.ClassLoader.loadClass(ClassLoader.java:358)

? ? ? ? at java.lang.Class.forName0(Native Method)

? ? ? ? at java.lang.Class.forName(Class.java:270)

? ? ? ? at org.apache.spark.util.Utils$.classForName(Utils.scala:228)

? ? ? ? at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:693)

? ? ? ? at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)

? ? ? ? at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)

? ? ? ? at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)

? ? ? ? at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Failed to load main class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.

You need to build Spark with -Phive and -Phive-thriftserver.

解決辦法:將spark源碼中sql/hive-thriftserver/target/spark-hive-thriftserver_2.11-2.0.2.jar拷貝到spark的jars目錄下

完成。(查看數(shù)據(jù)庫 spark-sql (default)> show databases; ,它操作的都是Hive)

??? 編寫兩個簡單的SQL

spark-sql (default)> select * from emp;

??? 也可以做兩張變的jion

spark-sql (default)> select a.*,b.* from emp a left join dept b on a.deptno = b.deptno;

可以對表進行一個緩存操作3

> cache table emp;    #緩存操作
> uncache table dept;    #清除緩存操作
> explain select * from emp;    #執(zhí)行計劃

我們可以看到相應(yīng)的Storage信息,執(zhí)行完清除緩存操作后下面的Stages操作消失

啟動一個Spark Shell,可以直接在shell里面編寫SQL語句

$ bin/spark-shell
#可以在shell里面寫sql
scala> spark.sql("show databases").show
scala> spark.sql("use common").show
scala> spark.sql("select * from emp a join dept b on a.deptno = b.deptno").show

?? ?? 用一個變量名稱接收DataFrame

??? 比如使用registerTempTable注冊一個臨時表。注:臨時表是所有數(shù)據(jù)庫公有的不需要指定數(shù)據(jù)庫

scala> df.registerTempTable("table_regis01")

Spark應(yīng)用依賴第三方j(luò)ar包文件解決方案?? ??? ?

在我們的4040頁面Environment節(jié)點下的Classpath Entries節(jié)點里可以看到我們服務(wù)所依賴的jar包。http://hadoop01.com:4040/environment/

??? 1.直接添加驅(qū)動jar到${SPARK_HOME}/jars

??? 2. 使用參數(shù)--jars 添加本地jar包
./bin/spark-shell --jars jars/mysql-connector-java-5.1.27-bin.jar,/opt/modules/hive-1.2.1/lib/servlet-api-2.5.jar
添加多個本地jar的話,用逗號隔開
./bin/spark-shell --jars jars/mysql-connector-java-5.1.27-bin.jar,/opt/modules/hive-1.2.1/lib/*
注意:不能使用*去添加jar包,如果想要添加多個依賴jar,只能一個一個去添加

3. 使用參數(shù)--packages添加maven中的第三方j(luò)ar文件
. bin/spark-shell --packages mysql:mysql-connector-java:5.1.28?? ????
可以使用逗號隔開給定多個,格式(groupId:artifactId:version)
(底層執(zhí)行原理先從maven中央庫下載本地沒有的第三方j(luò)ar文件到本地,jar文件會先下載到本地的/home/ijeffrey/.ivy2/jars目錄下,最后通過spark.jars來控制添加classpath中)
--exclude-packages?? ?去掉不需要的包
--repositories maven源,指定URL連接?? ?

4. 使用SPARK_CLASSPATH環(huán)境變量給定jar文件路徑?? ?
編輯spark-env.sh文件
SPARK_CLASSPATH=/opt/modules/apache/spark-2.0.2/external_jars/*????????? 外部jar的路徑
5. 將第三方j(luò)ar文件打包到最終的jar文件中?? ?
在IDEA中添加依賴jar到最終的需要運行的spark應(yīng)用的jar中

SparkSQL的ThriftServer服務(wù)

??? ThriftServer底層就是Hive的HiveServer2服務(wù),下面是客戶端連接Hive Server2 方法的相關(guān)連接
https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients#HiveServer2Clients-JDBC?? ?
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+WindowingAndAnalytics??? #hiveserver2的配置
https://cwiki.apache.org/confluence/display/Hive/Setting+Up+HiveServer2???

??? 配置:
-1. ThriftServer服務(wù)運行的Spark環(huán)境必須完成SparkSQL和Hive的集成
-2. hive-site.xml中配置hiveserver2服務(wù)的相關(guān)參數(shù)

<!-- 監(jiān)聽的端口號 -->
<property>
	<name>hive.server2.thrift.bind.port</name>
	<value>10000</value>
</property>
<!-- 監(jiān)聽的主機名 -->
<property>
	<name>hive.server2.thrift.bind.host</name>
	<value>hadoop01.com</value>
</property>

? ? ? ? -3. 啟動hive的元數(shù)據(jù)服務(wù)

$ ./bin/hive --service metastore &

? ? ? ? -4. 啟動spark的thriftserver服務(wù),也是一個SparkSubmit服務(wù)

$ sbin/start-thriftserver.sh 

??? 也可以看到相應(yīng)的WEBUI界面,比之前的多了一個JDBC/ODBC Server

注意:如果需要啟動Spark ThriftServer 服務(wù),需要關(guān)閉hiveserver2 服務(wù)

SparkSQL的ThriftServer服務(wù)測試

??? -1. 查看進程是否存在
jps -ml | grep HiveThriftServer2
-2. 查看WEB界面是否正常
有JDBC/ODBC Server這個選項就是正常的
-3. 通過spark自帶的beeline命令
./bin/beeline
-4. 通過jdbc來訪問spark的ThriftServer接口

Spark中beeline的使用

$ bin/beeline    #啟動beeline
#可以使用!help查看相應(yīng)的命令
beeline> !help
#如connect
beeline> !connect
Usage: connect <url> <username> <password> [driver]
#這樣可以多個用戶連接
beeline> !connect jdbc:hive2://hadoop01.com:10000
#退出
beeline> !quit

連接成功,在4040 頁面也可以看到我們連接的hive

注:如果報錯
No known driver to handle "jdbc:hive2://hadoop01.com:10000"
說明缺少了hive的驅(qū)動jar,在我們編譯好的源碼中hive-jdbc-1.2.1.spark2.jar 找到并copy到spark的jars中

通過jdbc來訪問spark的ThriftServer接口

向我們java連接mysql一樣,我們使用scala來連接ThriftServer

package com.jeffrey
 
import java.sql.DriverManager
 
object SparkJDBCThriftServerDemo {
    def main(args: Array[String]): Unit = {
        //1 添加驅(qū)動
        val driver = "org.apache.hive.jdbc.HiveDriver"
        Class.forName(driver)
 
        //2 構(gòu)建連接對象
        val url = "jdbc:hive2://hadoop01.com:10000"
        val conn = DriverManager.getConnection(url,"ijeffrey","123456")
 
        //3 sql 語句執(zhí)行
        conn.prepareStatement("use common").execute()
 
        var pstmt = conn.prepareStatement("select empno,ename,sal from emp")
 
        var rs = pstmt.executeQuery()
 
        while (rs.next()){
            println(s"empno = ${rs.getInt("empno")}  " +
                    s"ename=${rs.getString("ename")}   " +
                    s" sal=${rs.getDouble("sal")}")
        }
 
        println("---------------------------------------------------------------------------")
 
        pstmt = conn.prepareStatement("select empno,ename,sal from emp where sal > ? and ename = ?")
        pstmt.setDouble(1,3000)
        pstmt.setString(2,"KING")
 
        rs = pstmt.executeQuery()
 
        while (rs.next()){
            println(s"empno = ${rs.getInt("empno")}  " +
                    s"ename=${rs.getString("ename")}   " +
                    s" sal=${rs.getDouble("sal")}")
        }
 
        rs.close()
        pstmt.close()
        conn.close()
    }
}

執(zhí)行結(jié)果:

SparkSQL案例

案例一:SparkSQL讀取HDFS上Json格式的文件

?? ?1. 將案例數(shù)據(jù)上傳到HDFS上
樣例數(shù)據(jù)在${SPARK_HOME}/examples/src/main/resources/*

?? ?2. 編寫SparkSQL程序
啟動一個spark-shell進行編寫

scala> val path = "/spark/data/people.json"
scala> val df = spark.read.json(path)
scala> df.registerTempTable("tmp04") //通過DataFrame注冊一個臨時表
scala> spark.sql("show tables").show  //通過SQL語句進行操作
scala> spark.sql("select * from tmp04").show
 
#saveAsTable 使用之前 先要use table
scala> spark.sql("select * from tmp04").write.saveAsTable("test01")
#overwrite 覆蓋  append 拼接  ignore 忽略
scala> spark.sql("select * from tmp01").write.mode("overwrite").saveAsTable("test01")
scala> spark.sql("select * from tmp01").write.mode("append").saveAsTable("test01")
scala> spark.sql("select * from tmp01").write.mode("ignore").saveAsTable("test01")

? ? saveAsTable("test01")默認保存到一張不存在的表中(test01不是臨時表),如果表存在的話就會報錯

??? SaveMode四種情況:
Append:拼接
Overwrite: 重寫
ErrorIfExists:如果表已經(jīng)存在,則報錯,默認就是這一種,存在即報錯
Ignore:如果表已經(jīng)存在了,則忽略這一步操作

除了spark.read.json的方式去讀取數(shù)據(jù)外,還可以使用spark.sql的方式直接讀取數(shù)據(jù)

scala> spark.sql("select * from json.`/spark/data/people.json` where age is not null").show 
+---+------+
|age|  name|
+---+------+
| 30|  Andy|
| 19|Justin|
+---+------+
# hdfs上的路徑使用`(反票號)引起來

案例二:DataFrame和Dataset和RDD之間的互相轉(zhuǎn)換

??? 在IDEA中集成Hive的話,需要將hive-site.xml文件放到resources目錄下面

package com.jeffrey.sql
 
import java.util.Properties
 
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
 
object HiveJoinMySQLDemo {
    def main(args: Array[String]): Unit = {
        System.setProperty("hadoop.home.dir","D:\\hadoop-2.7.3")
        // 1.構(gòu)建SparkSession
        val warehouseLocation = "/user/hive/warehouse"
 
        val spark = SparkSession
                .builder()
                .master("local")    //如果放到集群運行需要注釋掉
                .appName("RDD 2 DataFrame")
                .config("spark.sql.warehouse.dir",warehouseLocation)
                .enableHiveSupport()
                .getOrCreate()
 
        import spark.implicits._
        import spark.sql
 
        val url = "jdbc:mysql://hadoop01.com:3306/test"
        val table = "tb_dept"
 
        val props = new Properties()
        props.put("user","root")
        props.put("password","123456")
 
        // 1.Hive表數(shù)據(jù)導入到MySQL中    在shell中可以使用paste寫多行
        spark.read.table("common.dept")
                .write
                .mode(SaveMode.Overwrite)
                .jdbc(url,table,props)
 
        // 2.Hive和MySQL的join操作
        //2.1 讀取MySQL的數(shù)據(jù)
       val df: DataFrame = spark
                .read
                .jdbc(url,table,props)
 
        df.createOrReplaceTempView("tmp_tb_dept")
        //2.1 數(shù)據(jù)聚合
        spark.sql(
            """
              |select a.*,b.dname,b.loc
              |from common.emp a
              |join tmp_tb_dept b on a.deptno = b.deptno
            """.stripMargin).createOrReplaceTempView("tmp_emp_join_dept_result")
 
        spark.sql("select * from tmp_emp_join_dept_result").show()
 
        // 對表進行緩存的方法
        spark.read.table("tmp_emp_join_dept_result").cache()
        spark.catalog.cacheTable("tmp_emp_join_dept_result")
 
        //輸出到HDFS上
        // 方法一
        /*spark
                .read
                .table("tmp_emp_join_dept_result")
                .write.parquet("/spark/sql/hive_join_mysql")*/
 
        // 方法二
        spark
                .read
                .table("tmp_emp_join_dept_result")
                .write
                .format("parquet")
                .save(s"hdfs://hadoop01.com:8020/spark/sql/hive_join_mysql/${System.currentTimeMillis()}")
 
 
        //輸出到Hive中,并且是parquet格式 按照deptno分區(qū)
        spark
                .read
                .table("tmp_emp_join_dept_result")
                .write
                .format("parquet")
                .partitionBy("deptno")
                .mode(SaveMode.Overwrite)
                .saveAsTable("hive_emp_dept")
 
        println("------------------------------------------------------------")
 
        spark.sql("show tables").show()
 
        //清空緩存
        spark.catalog.uncacheTable("tmp_emp_join_dept_result")
 
    }
}

可以打成jar文件放在集群上執(zhí)行

bin/spark-submit \
--class com.jeffrey.sql.HiveJoinMySQLDemo \
--master yarn \
--deploy-mode client \
/opt/datas/jar/hivejoinmysql.jar
 
 
bin/spark-submit \
--class com.jeffrey.sql.HiveJoinMySQLDemo \
--master yarn \
--deploy-mode cluster \
/opt/datas/logAnalyze.jar

以上即使Spark?SQL的基本使用。

SparkSQL的函數(shù)

HIve支持的函數(shù),SparkSQL基本都是支持的,SparkSQL支持兩種自定義函數(shù),分別是:UDF和UDAF,兩種函數(shù)都是通過SparkSession的udf屬性進行函數(shù)的注冊使用的;SparkSQL不支持UDTF函數(shù)的 自定義使用。

☆ UDF:一條數(shù)據(jù)輸入,一條數(shù)據(jù)輸出,一對一的函數(shù),即普通函數(shù)

☆ UDAF:多條數(shù)據(jù)輸入,一條數(shù)據(jù)輸出,多對一的函數(shù),即聚合函數(shù)

下一篇會寫一下SparkSQL自定義函數(shù)的案例以及其關(guān)于SparkSQL其他的案例 ^_^

到此這篇關(guān)于Spark SQL配置及使用的文章就介紹到這了,更多相關(guān)Spark SQL配置內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Spring MVC 簡單的hello world的實現(xiàn)

    Spring MVC 簡單的hello world的實現(xiàn)

    這篇文章主要介紹了Spring MVC 簡單的hello world的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2020-01-01
  • Java多線程產(chǎn)生死鎖的必要條件

    Java多線程產(chǎn)生死鎖的必要條件

    今天小編就為大家分享一篇關(guān)于Java多線程產(chǎn)生死鎖的必要條件,小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧
    2019-01-01
  • Java基礎(chǔ)-Java編程語言發(fā)展史

    Java基礎(chǔ)-Java編程語言發(fā)展史

    這篇文章主要介紹了Java基礎(chǔ)-Java編程語言發(fā)展簡史,Java源自Sun公司的一個叫Green的項目,其原先的目的是為家用電子消費產(chǎn)品開發(fā)一個分布式代碼系統(tǒng),這樣就可以將通信和控制信息發(fā)給電冰箱、電視機、烤面包機等家用電器,對它們進行控制和信息交流,需要的朋友可以參考一下
    2022-01-01
  • java web項目實現(xiàn)文件下載實例代碼

    java web項目實現(xiàn)文件下載實例代碼

    現(xiàn)在項目里面有個需求,需要把系統(tǒng)產(chǎn)生的日志文件給下載到本地 先獲取所有的日志文件列表,顯示到界面,選擇一個日志文件,把文件名傳到后臺
    2013-09-09
  • 使用Java進行FreeMarker的web模板開發(fā)的基礎(chǔ)教程

    使用Java進行FreeMarker的web模板開發(fā)的基礎(chǔ)教程

    這篇文章主要介紹了使用Java進行FreeMarker模板引擎開發(fā)的基礎(chǔ)教程,文中針對FreeMarker的網(wǎng)頁標簽用法給出了一些例子,需要的朋友可以參考下
    2016-03-03
  • Java異常處理實例教程

    Java異常處理實例教程

    這篇文章主要為大家分享一份非常詳細的Java異常處理實例教程,幫助大家更好的學習java異常處理,感興趣的小伙伴們可以參考一下
    2016-02-02
  • Spring?控制反轉(zhuǎn)和依賴注入的具體使用

    Spring?控制反轉(zhuǎn)和依賴注入的具體使用

    本文主要介紹了Spring?控制反轉(zhuǎn)和依賴注入,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2022-02-02
  • Java Arrays.sort()如何實現(xiàn)對int類型數(shù)組倒序排序

    Java Arrays.sort()如何實現(xiàn)對int類型數(shù)組倒序排序

    這篇文章主要介紹了Java Arrays.sort()如何實現(xiàn)對int類型數(shù)組倒序排序問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2023-08-08
  • Maven的概述及基本使用示例詳解

    Maven的概述及基本使用示例詳解

    MApache Maven是一個項目管理和構(gòu)建工具,它基于項目對象模型POM的概念,通過一小段描述信息來管理項目的構(gòu)建、報告和文檔,aven是專門用于管理和構(gòu)建Java項目的工具,本文給大家介紹Maven的概述及基本使用,感興趣的朋友一起看看吧
    2023-07-07
  • 好用!解決maven包沖突的插件

    好用!解決maven包沖突的插件

    今天的主要內(nèi)容是介紹一款插件,該插件的主要用途是當maven包沖突了以后,使用這款插件直接解決問題。
    2020-10-10

最新評論