欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java和scala實(shí)現(xiàn) Spark RDD轉(zhuǎn)換成DataFrame的兩種方法小結(jié)

 更新時(shí)間:2018年06月07日 09:04:25   作者:黑白調(diào)92  
今天小編就為大家分享一篇Java和scala實(shí)現(xiàn) Spark RDD轉(zhuǎn)換成DataFrame的兩種方法小結(jié),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧

一:準(zhǔn)備數(shù)據(jù)源

在項(xiàng)目下新建一個(gè)student.txt文件,里面的內(nèi)容為:

1,zhangsan,20 
2,lisi,21 
3,wanger,19 
4,fangliu,18 

二:實(shí)現(xiàn)

Java版:

1.首先新建一個(gè)student的Bean對(duì)象,實(shí)現(xiàn)序列化和toString()方法,具體代碼如下:

package com.cxd.sql;
import java.io.Serializable;
@SuppressWarnings("serial")
public class Student implements Serializable {
 String sid;
 String sname;
 int sage;
 public String getSid() {
  return sid;
 }
 public void setSid(String sid) {
  this.sid = sid;
 }
 public String getSname() {
  return sname;
 }
 public void setSname(String sname) {
  this.sname = sname;
 }
 public int getSage() {
  return sage;
 }
 public void setSage(int sage) {
  this.sage = sage;
 }
 @Override
 public String toString() {
  return "Student [sid=" + sid + ", sname=" + sname + ", sage=" + sage + "]";
 }
 
}
		

2.轉(zhuǎn)換,具體代碼如下

package com.cxd.sql;
import java.util.ArrayList;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
public class TxtToParquetDemo {
 public static void main(String[] args) {
  
  SparkConf conf = new SparkConf().setAppName("TxtToParquet").setMaster("local");
  SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
  reflectTransform(spark);//Java反射
  dynamicTransform(spark);//動(dòng)態(tài)轉(zhuǎn)換
 }
 
 /**
  * 通過(guò)Java反射轉(zhuǎn)換
  * @param spark
  */
 private static void reflectTransform(SparkSession spark)
 {
  JavaRDD<String> source = spark.read().textFile("stuInfo.txt").javaRDD();
  
  JavaRDD<Student> rowRDD = source.map(line -> {
   String parts[] = line.split(",");
   Student stu = new Student();
   stu.setSid(parts[0]);
   stu.setSname(parts[1]);
   stu.setSage(Integer.valueOf(parts[2]));
   return stu;
  });
  
  Dataset<Row> df = spark.createDataFrame(rowRDD, Student.class);
  df.select("sid", "sname", "sage").
  coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res");
 }
 /**
  * 動(dòng)態(tài)轉(zhuǎn)換
  * @param spark
  */
 private static void dynamicTransform(SparkSession spark)
 {
  JavaRDD<String> source = spark.read().textFile("stuInfo.txt").javaRDD();
  
  JavaRDD<Row> rowRDD = source.map( line -> {
   String[] parts = line.split(",");
   String sid = parts[0];
   String sname = parts[1];
   int sage = Integer.parseInt(parts[2]);
   
   return RowFactory.create(
     sid,
     sname,
     sage
     );
  });
  
  ArrayList<StructField> fields = new ArrayList<StructField>();
  StructField field = null;
  field = DataTypes.createStructField("sid", DataTypes.StringType, true);
  fields.add(field);
  field = DataTypes.createStructField("sname", DataTypes.StringType, true);
  fields.add(field);
  field = DataTypes.createStructField("sage", DataTypes.IntegerType, true);
  fields.add(field);
  
  StructType schema = DataTypes.createStructType(fields);
  
  Dataset<Row> df = spark.createDataFrame(rowRDD, schema);
  df.coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res1");
  
  
 }
 
}

scala版本:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.IntegerType
object RDD2Dataset {
 
 case class Student(id:Int,name:String,age:Int)
 def main(args:Array[String])
 {
 
 val spark=SparkSession.builder().master("local").appName("RDD2Dataset").getOrCreate()
 import spark.implicits._
 reflectCreate(spark)
 dynamicCreate(spark)
 }
 
 /**
	 * 通過(guò)Java反射轉(zhuǎn)換
	 * @param spark
	 */
 private def reflectCreate(spark:SparkSession):Unit={
 import spark.implicits._
 val stuRDD=spark.sparkContext.textFile("student2.txt")
 //toDF()為隱式轉(zhuǎn)換
 val stuDf=stuRDD.map(_.split(",")).map(parts⇒Student(parts(0).trim.toInt,parts(1),parts(2).trim.toInt)).toDF()
 //stuDf.select("id","name","age").write.text("result") //對(duì)寫(xiě)入文件指定列名
 stuDf.printSchema()
 stuDf.createOrReplaceTempView("student")
 val nameDf=spark.sql("select name from student where age<20")
 //nameDf.write.text("result") //將查詢結(jié)果寫(xiě)入一個(gè)文件
 nameDf.show()
 }
 
 /**
	 * 動(dòng)態(tài)轉(zhuǎn)換
	 * @param spark
	 */
 private def dynamicCreate(spark:SparkSession):Unit={
 val stuRDD=spark.sparkContext.textFile("student.txt")
 import spark.implicits._
 val schemaString="id,name,age"
 val fields=schemaString.split(",").map(fieldName => StructField(fieldName, StringType, nullable = true))
 val schema=StructType(fields)
 val rowRDD=stuRDD.map(_.split(",")).map(parts⇒Row(parts(0),parts(1),parts(2)))
 val stuDf=spark.createDataFrame(rowRDD, schema)
  stuDf.printSchema()
 val tmpView=stuDf.createOrReplaceTempView("student")
 val nameDf=spark.sql("select name from student where age<20")
 //nameDf.write.text("result") //將查詢結(jié)果寫(xiě)入一個(gè)文件
 nameDf.show()
 }
}

注:

1.上面代碼全都已經(jīng)測(cè)試通過(guò),測(cè)試的環(huán)境為spark2.1.0,jdk1.8。

2.此代碼不適用于spark2.0以前的版本。

以上這篇Java和scala實(shí)現(xiàn) Spark RDD轉(zhuǎn)換成DataFrame的兩種方法小結(jié)就是小編分享給大家的全部?jī)?nèi)容了,希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。

相關(guān)文章

  • Springboot事務(wù)失效的幾種情況解讀

    Springboot事務(wù)失效的幾種情況解讀

    這篇文章主要介紹了Springboot事務(wù)失效的幾種情況解讀,因?yàn)镾pring AOP默認(rèn)使用動(dòng)態(tài)代理,會(huì)給被代理的類生成一個(gè)代理類,事務(wù)相關(guān)的操作都通過(guò)代理來(lái)完成,使用內(nèi)部方法調(diào)用時(shí),使用的是實(shí)例調(diào)用,沒(méi)有通過(guò)代理類調(diào)用方法,因此事務(wù)不會(huì)檢測(cè)到失敗,需要的朋友可以參考下
    2023-10-10
  • SpringMVC視圖作用詳解

    SpringMVC視圖作用詳解

    這篇文章主要介紹了springMVC中的視圖與視圖解析器,springMVC視圖的種類很多,默認(rèn)有轉(zhuǎn)發(fā)視圖和重定向視圖,本文就每一種視圖給大家詳細(xì)介紹,需要的朋友可以參考下
    2022-11-11
  • SpringBoot如何基于POI-tl和word模板導(dǎo)出龐大的Word文件

    SpringBoot如何基于POI-tl和word模板導(dǎo)出龐大的Word文件

    這篇文章主要介紹了SpringBoot如何基于POI-tl和word模板導(dǎo)出龐大的Word文件,poi-tl是一個(gè)基于Apache?POI的Word模板引擎,也是一個(gè)免費(fèi)開(kāi)源的Java類庫(kù)
    2022-08-08
  • Java實(shí)戰(zhàn)項(xiàng)目練習(xí)之球館在線預(yù)約系統(tǒng)的實(shí)現(xiàn)

    Java實(shí)戰(zhàn)項(xiàng)目練習(xí)之球館在線預(yù)約系統(tǒng)的實(shí)現(xiàn)

    理論是遠(yuǎn)遠(yuǎn)不夠的,只有在實(shí)戰(zhàn)中才能獲得能力的提升,本篇文章手把手帶你用java+SpringBoot+maven+freemark+Mysql實(shí)現(xiàn)一個(gè)球館在線預(yù)約系統(tǒng),大家可以在過(guò)程中查缺補(bǔ)漏,提升水平
    2022-01-01
  • 注解@TableName,@TableField,pgsql的模式對(duì)應(yīng)方式

    注解@TableName,@TableField,pgsql的模式對(duì)應(yīng)方式

    這篇文章主要介紹了注解@TableName,@TableField,pgsql的模式對(duì)應(yīng)方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2024-04-04
  • Java關(guān)鍵字finally_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理

    Java關(guān)鍵字finally_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理

    java關(guān)鍵字finally不管是否出現(xiàn)異常,finally子句總是在塊完成之前執(zhí)行。下面通過(guò)實(shí)現(xiàn)代碼給大家介紹Java關(guān)鍵字finally相關(guān)知識(shí),需要的的朋友參考下吧
    2017-04-04
  • SpringCloud Zuul服務(wù)功能與使用方法解析

    SpringCloud Zuul服務(wù)功能與使用方法解析

    這篇文章主要介紹了SpringCloud Zuul服務(wù)功能與使用方法解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-05-05
  • Java開(kāi)啟線程的四種方法案例詳解

    Java開(kāi)啟線程的四種方法案例詳解

    這篇文章主要介紹了Java開(kāi)啟線程的四種方法,本文結(jié)合實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2023-02-02
  • 詳談Lock與synchronized 的區(qū)別

    詳談Lock與synchronized 的區(qū)別

    下面小編就為大家?guī)?lái)一篇詳談Lock與synchronized 的區(qū)別。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2017-05-05
  • springboot @RequiredArgsConstructor的概念與使用方式

    springboot @RequiredArgsConstructor的概念與使用方式

    這篇文章主要介紹了springboot @RequiredArgsConstructor的概念與使用方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2024-09-09

最新評(píng)論