欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Spark中的數(shù)據(jù)讀取保存和累加器實(shí)例詳解

 更新時間:2022年11月02日 11:13:12   作者:欣xy  
這篇文章主要為大家介紹了Spark中的數(shù)據(jù)讀取保存和累加器實(shí)例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

數(shù)據(jù)讀取與保存

Text文件

對于 Text文件的讀取和保存 ,其語法和實(shí)現(xiàn)是最簡單的,因此我只是簡單敘述一下這部分相關(guān)知識點(diǎn),大家可以結(jié)合demo具體分析記憶。

1)基本語法

(1)數(shù)據(jù)讀?。簍extFile(String)

(2)數(shù)據(jù)保存:saveAsTextFile(String)

2)實(shí)現(xiàn)代碼demo如下:

object Operate_Text {
    def main(args: Array[String]): Unit = {
        //1.創(chuàng)建SparkConf并設(shè)置App名稱
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]")
        //2.創(chuàng)建SparkContext,該對象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)
        //3.1 讀取輸入文件
        val inputRDD: RDD[String] = sc.textFile("input/demo.txt")
        //3.2 保存數(shù)據(jù)
        inputRDD.saveAsTextFile("textFile")
        //4.關(guān)閉連接
        sc.stop()
    }
}

Sequence文件

SequenceFile文件 是Hadoop中用來存儲二進(jìn)制形式的 key-value對 的一種平面文件(Flat File)。在SparkContext中,可以通過調(diào)用 sequenceFile[ keyClass,valueClass ] (path) 來調(diào)用。

1)基本語法

  • (1)數(shù)據(jù)讀?。簊equenceFile[ keyClass, valueClass ] (path)
  • (2)數(shù)據(jù)保存:saveAsSequenceFile(String)

2)實(shí)現(xiàn)代碼demo如下:

object Operate_Sequence {
    def main(args: Array[String]): Unit = {
        //1.創(chuàng)建SparkConf并設(shè)置App名稱
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]")
        //2.創(chuàng)建SparkContext,該對象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)
        //3.1 創(chuàng)建rdd
        val dataRDD: RDD[(Int, Int)] = sc.makeRDD(Array((1,2,3),(4,5,6),(7,8,9)))
        //3.2 保存數(shù)據(jù)為SequenceFile
        dataRDD.saveAsSequenceFile("seqFile")
        //3.3 讀取SequenceFile文件
        sc.sequenceFile[Int,Int]("seqFile").collect().foreach(println)
        //4.關(guān)閉連接
        sc.stop()
    }
}

Object對象文件

對象文件是將對象序列化后保存的文件,采用Hadoop的序列化機(jī)制??梢酝ㄟ^ objectFile[ k , v ] (path) 函數(shù)接收一個路徑,讀取對象文件,返回對應(yīng)的RDD,也可以通過調(diào)用 saveAsObjectFile() 實(shí)現(xiàn)對對象文件的輸出。因?yàn)橐蛄谢砸付愋汀?/p>

1)基本語法

  • (1)數(shù)據(jù)讀取:objectFile[ k , v ] (path)
  • (2)數(shù)據(jù)保存:saveAsObjectFile(String)

2)實(shí)現(xiàn)代碼demo如下:

object Operate_Object {
    def main(args: Array[String]): Unit = {
        //1.創(chuàng)建SparkConf并設(shè)置App名稱
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]")
        //2.創(chuàng)建SparkContext,該對象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)
        //3.1 創(chuàng)建RDD
        val dataRDD: RDD[Int] = sc.makeRDD(Array(1,2,3,4,5,6),2)
        //3.2 保存數(shù)據(jù)
        dataRDD.saveAsObjectFile("objFile")
        //3.3 讀取數(shù)據(jù)
        sc.objectFile[Int]("objFile").collect().foreach(println)
        //4.關(guān)閉連接
        sc.stop()
    }
}

累加器

累加器概念

累加器,是一種變量---分布式共享只寫變量。僅支持“add”,支持并發(fā),但Executor和Executor之間不能讀數(shù)據(jù),可實(shí)現(xiàn)所有分片處理時更新共享變量的功能。

累加器用來把Executor端變量信息聚合到Driver端。在Driver中定義的一個變量,在Executor端的每個task都會得到這個變量的一份新的副本,每個task更新這些副本的值后,傳回Driver端進(jìn)行合并計(jì)算。

系統(tǒng)累加器

1)累加器定義(SparkContext.accumulator(initialValue)方法)

val sum: LongAccumulator = sc.longAccumulator("sum")

2)累加器添加數(shù)據(jù)(累加器.add方法)

sum.add(count)

3)累加器獲取數(shù)據(jù)(累加器.value)

sum.value

注意:Executor端的任務(wù)不能讀取累加器的值(例如:在Executor端調(diào)用sum.value,獲取的值不是累加器最終的值)。因此我們說,累加器是一個分布式共享只寫變量。

4)累加器要放在行動算子中

因?yàn)檗D(zhuǎn)換算子執(zhí)行的次數(shù)取決于job的數(shù)量,如果一個 spark應(yīng)用 有多個行動算子,那么轉(zhuǎn)換算子中的累加器可能會發(fā)生不止一次更新,導(dǎo)致結(jié)果錯誤。所以,如果想要一個無論在失敗還是重復(fù)計(jì)算時都絕對可靠的累加器,必須把它放在foreach()這樣的行動算子中。

5) 代碼實(shí)現(xiàn):

object accumulator_system {
package com.atguigu.cache
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}
object accumulator_system {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val dataRDD: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4)))
    //需求:統(tǒng)計(jì)a出現(xiàn)的所有次數(shù) ("a",10)
    //普通算子實(shí)現(xiàn) reduceByKey 代碼會走shuffle 效率低
    val rdd: RDD[(String, Int)] = dataRDD.reduceByKey(_ + _)
    //累加器實(shí)現(xiàn)
    //1 聲明累加器
    val accSum: LongAccumulator = sc.longAccumulator("sum")
    dataRDD.foreach{
      case (a,count) => {
        //2 使用累加器累加  累加器.add()
        accSum.add(count)
        // 4 不在executor端獲取累加器的值,因?yàn)榈玫降闹挡粶?zhǔn)確,所以累加器叫分布式共享只寫變量
        //println("sum = " + accSum.value)
      }
    }
    //3 獲取累加器的值 累加器.value
    println(("a",accSum.value))
    sc.stop()
  }
}

以上就是Spark中的數(shù)據(jù)讀取保存和累加器實(shí)例詳解的詳細(xì)內(nèi)容,更多關(guān)于Spark數(shù)據(jù)讀取保存累加器的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

最新評論