IDEA 開發(fā)配置SparkSQL及簡單使用案例代碼
1.添加依賴
在idea項目的pom.xml中添加依賴。
<!--spark sql依賴,注意版本號--> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</artifactId> <version>3.0.0</version> </dependency>
2.案例代碼
package com.zf.bigdata.spark.sql import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} object Spark01_SparkSql_Basic { def main(args: Array[String]): Unit = { //創(chuàng)建上下文環(huán)境配置對象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSql") //創(chuàng)建 SparkSession 對象 val spark = SparkSession.builder().config(sparkConf).getOrCreate() // DataFrame val df: DataFrame = spark.read.json("datas/user.json") //df.show() // DataFrame => Sql //df.createOrReplaceTempView("user") //spark.sql("select * from user").show() //spark.sql("select age from user").show() //spark.sql("select avg(age) from user").show() //DataFrame => Dsl //如果涉及到轉換操作,轉換需要引入隱式轉換規(guī)則,否則無法轉換,比如使用$提取數(shù)據的值 //spark 不是包名,是上下文環(huán)境對象名 import spark.implicits._ //df.select("age","username").show() //df.select($"age"+1).show() //df.select('age+1).show() // DataSet //val seq = Seq(1,2,3,4) //val ds: Dataset[Int] = seq.toDS() // ds.show() // RDD <=> DataFrame val rdd = spark.sparkContext.makeRDD(List((1,"張三",10),(2,"李四",20))) val df1: DataFrame = rdd.toDF("id", "name", "age") val rdd1: RDD[Row] = df1.rdd // DataFrame <=> DataSet val ds: Dataset[User] = df1.as[User] val df2: DataFrame = ds.toDF() // RDD <=> DataSet val ds1: Dataset[User] = rdd.map { case (id, name, age) => { User(id, name = name, age = age) } }.toDS() val rdd2: RDD[User] = ds1.rdd spark.stop() } case class User(id:Int,name:String,age:Int) }
PS:下面看下在IDEA中開發(fā)Spark SQL程序
IDEA 中程序的打包和運行方式都和 SparkCore 類似,Maven 依賴中需要添加新的依賴項:
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.1.1</version> </dependency>
一、指定Schema格式
import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types.StructType import org.apache.spark.sql.types.StructField import org.apache.spark.sql.types.IntegerType import org.apache.spark.sql.types.StringType import org.apache.spark.sql.Row object Demo1 { def main(args: Array[String]): Unit = { //使用Spark Session 創(chuàng)建表 val spark = SparkSession.builder().master("local").appName("UnderstandSparkSession").getOrCreate() //從指定地址創(chuàng)建RDD val personRDD = spark.sparkContext.textFile("D:\\tmp_files\\student.txt").map(_.split("\t")) //通過StructType聲明Schema val schema = StructType( List( StructField("id", IntegerType), StructField("name", StringType), StructField("age", IntegerType))) //把RDD映射到rowRDD val rowRDD = personRDD.map(p=>Row(p(0).toInt,p(1),p(2).toInt)) val personDF = spark.createDataFrame(rowRDD, schema) //注冊表 personDF.createOrReplaceTempView("t_person") //執(zhí)行SQL val df = spark.sql("select * from t_person order by age desc limit 4") df.show() spark.stop() } }
二、使用case class
import org.apache.spark.sql.SparkSession //使用case class object Demo2 { def main(args: Array[String]): Unit = { //創(chuàng)建SparkSession val spark = SparkSession.builder().master("local").appName("CaseClassDemo").getOrCreate() //從指定的文件中讀取數(shù)據,生成對應的RDD val lineRDD = spark.sparkContext.textFile("D:\\tmp_files\\student.txt").map(_.split("\t")) //將RDD和case class 關聯(lián) val studentRDD = lineRDD.map( x => Student(x(0).toInt,x(1),x(2).toInt)) //生成 DataFrame,通過RDD 生成DF,導入隱式轉換 import spark.sqlContext.implicits._ val studentDF = studentRDD.toDF //注冊表 視圖 studentDF.createOrReplaceTempView("student") //執(zhí)行SQL spark.sql("select * from student").show() spark.stop() } } //case class 一定放在外面 case class Student(stuID:Int,stuName:String,stuAge:Int)
三、把數(shù)據保存到數(shù)據庫
import org.apache.spark.sql.types.IntegerType import org.apache.spark.sql.types.StringType import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types.StructType import org.apache.spark.sql.types.StructField import org.apache.spark.sql.Row import java.util.Properties object Demo3 { def main(args: Array[String]): Unit = { //使用Spark Session 創(chuàng)建表 val spark = SparkSession.builder().master("local").appName("UnderstandSparkSession").getOrCreate() //從指定地址創(chuàng)建RDD val personRDD = spark.sparkContext.textFile("D:\\tmp_files\\student.txt").map(_.split("\t")) //通過StructType聲明Schema val schema = StructType( List( StructField("id", IntegerType), StructField("name", StringType), StructField("age", IntegerType))) //把RDD映射到rowRDD val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1), p(2).toInt)) val personDF = spark.createDataFrame(rowRDD, schema) //注冊表 personDF.createOrReplaceTempView("person") //執(zhí)行SQL val df = spark.sql("select * from person ") //查看SqL內容 //df.show() //將結果保存到mysql中 val props = new Properties() props.setProperty("user", "root") props.setProperty("password", "123456") props.setProperty("driver", "com.mysql.jdbc.Driver") df.write.mode("overwrite").jdbc("jdbc:mysql://localhost:3306/company?serverTimezone=UTC&characterEncoding=utf-8", "student", props) spark.close() } }
以上內容轉自:
https://blog.csdn.net/weixin_43520450/article/details/106093582
作者:故明所以
到此這篇關于IDEA 開發(fā)配置SparkSQL及簡單使用案例代碼的文章就介紹到這了,更多相關IDEA 開發(fā) SparkSQL內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
MyBatis寫入Json字段以及Json字段轉對象示例詳解
這篇文章主要給大家介紹了關于MyBatis寫入Json字段以及Json字段轉對象的相關資料,文中通過實例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考借鑒價值,需要的朋友可以參考下2023-07-07MyBatis實現(xiàn)數(shù)據庫類型和Java類型的轉換
MyBatis 在處理數(shù)據庫查詢結果或傳遞參數(shù)時,需要將數(shù)據庫類型與 Java 類型之間進行轉換,本文就給大家介紹MyBatis如何實現(xiàn)數(shù)據庫類型和 Java 類型的轉換的,需要的朋友可以參考下2024-09-09Java用Arrays.fill()初始化二維數(shù)組的實現(xiàn)
這篇文章主要介紹了Java用Arrays.fill()初始化二維數(shù)組的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2021-01-01Java使用Maven BOM統(tǒng)一管理版本號的實現(xiàn)
這篇文章主要介紹了Java使用Maven BOM統(tǒng)一管理版本號的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2021-04-04java輸入多個數(shù)據(不確定),排序,并求最大值的方法
今天小編就為大家分享一篇java輸入多個數(shù)據(不確定),排序,并求最大值的方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-07-07解決IDEA2021版compiler.automake.allow.when.app.running不存在的問題
很多文章介紹IntelliJ IDEA開啟熱部署功能都會寫到在IntelliJ IDEA中的注冊表中開啟compiler.automake.allow.when.app.running選項,此選項在IntelliJ IDEA 2021.2之后的版本遷移到高級設置中,下面看下設置方法2021-09-09