Spark SQL 2.4.8 操作 Dataframe的兩種方式
一、測(cè)試數(shù)據(jù)
7369,SMITH,CLERK,7902,1980/12/17,800,20
7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
7521,WARD,SALESMAN,7698,1981/2/22,1250,500,30
7566,JONES,MANAGER,7839,1981/4/2,2975,20
7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
7698,BLAKE,MANAGER,7839,1981/5/1,2850,30
7782,CLARK,MANAGER,7839,1981/6/9,2450,10
7788,SCOTT,ANALYST,7566,1987/4/19,3000,20
7839,KING,PRESIDENT,1981/11/17,5000,10
7844,TURNER,SALESMAN,7698,1981/9/8,1500,0,30
7876,ADAMS,CLERK,7788,1987/5/23,1100,20
7900,JAMES,CLERK,7698,1981/12/3,9500,30
7902,FORD,ANALYST,7566,1981/12/3,3000,20
7934,MILLER,CLERK,7782,1982/1/23,1300,10
二、創(chuàng)建DataFrame
方式一:DSL方式操作
- 實(shí)例化SparkContext和SparkSession對(duì)象
- 利用StructType類型構(gòu)建schema,用于定義數(shù)據(jù)的結(jié)構(gòu)信息
- 通過SparkContext對(duì)象讀取文件,生成RDD
- 將RDD[String]轉(zhuǎn)換成RDD[Row]
- 通過SparkSession對(duì)象創(chuàng)建dataframe
- 完整代碼如下:
package com.scala.demo.sql import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.types.{DataType, DataTypes, StructField, StructType} object Demo01 { def main(args: Array[String]): Unit = { // 1.創(chuàng)建SparkContext和SparkSession對(duì)象 val sc = new SparkContext(new SparkConf().setAppName("Demo01").setMaster("local[2]")) val sparkSession = SparkSession.builder().getOrCreate() // 2. 使用StructType來定義Schema val mySchema = StructType(List( StructField("empno", DataTypes.IntegerType, false), StructField("ename", DataTypes.StringType, false), StructField("job", DataTypes.StringType, false), StructField("mgr", DataTypes.StringType, false), StructField("hiredate", DataTypes.StringType, false), StructField("sal", DataTypes.IntegerType, false), StructField("comm", DataTypes.StringType, false), StructField("deptno", DataTypes.IntegerType, false) )) // 3. 讀取數(shù)據(jù) val empRDD = sc.textFile("file:///D:\\TestDatas\\emp.csv") // 4. 將其映射成ROW對(duì)象 val rowRDD = empRDD.map(line => { val strings = line.split(",") Row(strings(0).toInt, strings(1), strings(2), strings(3), strings(4), strings(5).toInt,strings(6), strings(7).toInt) }) // 5. 創(chuàng)建DataFrame val dataFrame = sparkSession.createDataFrame(rowRDD, mySchema) // 6. 展示內(nèi)容 DSL dataFrame.groupBy("deptno").sum("sal").as("result").sort("sum(sal)").show() } }
結(jié)果如下:
方式二:SQL方式操作
- 實(shí)例化SparkContext和SparkSession對(duì)象
- 創(chuàng)建case class Emp樣例類,用于定義數(shù)據(jù)的結(jié)構(gòu)信息
- 通過SparkContext對(duì)象讀取文件,生成RDD[String]
- 將RDD[String]轉(zhuǎn)換成RDD[Emp]
- 引入spark隱式轉(zhuǎn)換函數(shù)(必須引入)
- 將RDD[Emp]轉(zhuǎn)換成DataFrame
- 將DataFrame注冊(cè)成一張視圖或者臨時(shí)表
- 通過調(diào)用SparkSession對(duì)象的sql函數(shù),編寫sql語句
- 停止資源
- 具體代碼如下:
package com.scala.demo.sql import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.types.{DataType, DataTypes, StructField, StructType} // 0. 數(shù)據(jù)分析 // 7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30 // 1. 定義Emp樣例類 case class Emp(empNo:Int,empName:String,job:String,mgr:String,hiredate:String,sal:Int,comm:String,deptNo:Int) object Demo02 { def main(args: Array[String]): Unit = { // 2. 讀取數(shù)據(jù)將其映射成Row對(duì)象 val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("Demo02")) val mapRdd = sc.textFile("file:///D:\\TestDatas\\emp.csv") .map(_.split(",")) val rowRDD:RDD[Emp] = mapRdd.map(line => Emp(line(0).toInt, line(1), line(2), line(3), line(4), line(5).toInt, line(6), line(7).toInt)) // 3。創(chuàng)建dataframe val spark = SparkSession.builder().getOrCreate() // 引入spark隱式轉(zhuǎn)換函數(shù) import spark.implicits._ // 將RDD轉(zhuǎn)成Dataframe val dataFrame = rowRDD.toDF // 4.2 sql語句操作 // 1、將dataframe注冊(cè)成一張臨時(shí)表 dataFrame.createOrReplaceTempView("emp") // 2. 編寫sql語句進(jìn)行操作 spark.sql("select deptNo,sum(sal) as total from emp group by deptNo order by total desc").show() // 關(guān)閉資源 spark.stop() sc.stop() } }
結(jié)果如下:
到此這篇關(guān)于Spark SQL 2.4.8 操作 Dataframe的兩種方式的文章就介紹到這了,更多相關(guān)Spark SQL 操作 Dataframe內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
萬能密碼的SQL注入漏洞其PHP環(huán)境搭建及防御手段
這篇文章主要介紹了萬能密碼的SQL注入漏洞其PHP環(huán)境搭建及防御手段,對(duì)此感興趣的小伙伴趕快收藏起來吧2021-09-09Transactional replication(事務(wù)復(fù)制)詳解之如何跳過一個(gè)事務(wù)
事務(wù)復(fù)制由 SQL Server 快照代理、日志讀取器代理和分發(fā)代理實(shí)現(xiàn)。 快照代理準(zhǔn)備快照文件(其中包含了已發(fā)布表和數(shù)據(jù)庫對(duì)象的架構(gòu)和數(shù)據(jù)),然后將這些文件存儲(chǔ)在快照文件夾中,并在分發(fā)服務(wù)器中的分發(fā)數(shù)據(jù)庫中記錄同步作業(yè)。2014-08-08SQL Server SQL高級(jí)查詢語句小結(jié)
高級(jí)查詢?cè)跀?shù)據(jù)庫中用得是最頻繁的,也是應(yīng)用最廣泛的。 學(xué)習(xí)sqlserver的朋友可以參考下。2011-07-07Spark SQL數(shù)據(jù)加載和保存實(shí)例講解
這篇文章主要以實(shí)例講解的方式為大家詳細(xì)介紹了Spark SQL數(shù)據(jù)加載和保存的相關(guān)資料,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2016-11-11Sql中將datetime轉(zhuǎn)換成字符串的方法(CONVERT)
這篇文章主要介紹了Sql中將datetime轉(zhuǎn)換成字符串的方法,需要的朋友可以參考下2014-04-04深入SQL SERVER合并相關(guān)操作Union,Except,Intersect的詳解
本篇文章是對(duì)SQL SERVER合并相關(guān)操作Union,Except,Intersect進(jìn)行了詳細(xì)的分析介紹,需要的朋友參考下2013-06-06自動(dòng)化收集SQLSERVER診斷信息的工具選擇及使用介紹
相信很多人都遇到過當(dāng)SQLSERVER出現(xiàn)問題的時(shí)候,如果想解決的話一般需要你收集一些系統(tǒng)信息和SQLSERVER診斷信息;接下來將介紹下工具的選擇及使用,感興趣的你可不要錯(cuò)過了哈,或許本文的知識(shí)點(diǎn)可以幫助到你2013-02-02sql server 還原數(shù)據(jù)庫時(shí)提示數(shù)據(jù)庫正在使用,無法進(jìn)行操作的解決方法
sql server 還原數(shù)據(jù)庫時(shí)提示:數(shù)據(jù)庫正在使用,無法進(jìn)行操作的解決方法2013-03-03SQL Server 索引結(jié)構(gòu)及其使用(一)--深入淺出理解索引結(jié)構(gòu)
深入淺出理解索引結(jié)構(gòu)2009-04-04