欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Spark的廣播變量和累加器使用方法代碼示例

 更新時間:2017年09月29日 10:57:24   作者:唐予之_  
這篇文章主要介紹了Spark的廣播變量和累加器使用方法代碼示例,文中介紹了廣播變量和累加器的含義,然后通過實例演示了其用法,需要的朋友可以參考下。

一、廣播變量和累加器

通常情況下,當向Spark操作(如map,reduce)傳遞一個函數(shù)時,它會在一個遠程集群節(jié)點上執(zhí)行,它會使用函數(shù)中所有變量的副本。這些變量被復(fù)制到所有的機器上,遠程機器上并沒有被更新的變量會向驅(qū)動程序回傳。在任務(wù)之間使用通用的,支持讀寫的共享變量是低效的。盡管如此,Spark提供了兩種有限類型的共享變量,廣播變量和累加器。

1.1 廣播變量:

廣播變量允許程序員將一個只讀的變量緩存在每臺機器上,而不用在任務(wù)之間傳遞變量。廣播變量可被用于有效地給每個節(jié)點一個大輸入數(shù)據(jù)集的副本。Spark還嘗試使用高效地廣播算法來分發(fā)變量,進而減少通信的開銷。

Spark的動作通過一系列的步驟執(zhí)行,這些步驟由分布式的shuffle操作分開。Spark自動地廣播每個步驟每個任務(wù)需要的通用數(shù)據(jù)。這些廣播數(shù)據(jù)被序列化地緩存,在運行任務(wù)之前被反序列化出來。這意味著當我們需要在多個階段的任務(wù)之間使用相同的數(shù)據(jù),或者以反序列化形式緩存數(shù)據(jù)是十分重要的時候,顯式地創(chuàng)建廣播變量才有用。

通過在一個變量v上調(diào)用SparkContext.broadcast(v)可以創(chuàng)建廣播變量。廣播變量是圍繞著v的封裝,可以通過value方法訪問這個變量。舉例如下:

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

在創(chuàng)建了廣播變量之后,在集群上的所有函數(shù)中應(yīng)該使用它來替代使用v.這樣v就不會不止一次地在節(jié)點之間傳輸了。另外,為了確保所有的節(jié)點獲得相同的變量,對象v在被廣播之后就不應(yīng)該再修改。

1.2 累加器:

累加器是僅僅被相關(guān)操作累加的變量,因此可以在并行中被有效地支持。它可以被用來實現(xiàn)計數(shù)器和總和。Spark原生地只支持數(shù)字類型的累加器,編程者可以添加新類型的支持。如果創(chuàng)建累加器時指定了名字,可以在Spark的UI界面看到。這有利于理解每個執(zhí)行階段的進程。(對于python還不支持)

累加器通過對一個初始化了的變量v調(diào)用SparkContext.accumulator(v)來創(chuàng)建。在集群上運行的任務(wù)可以通過add或者”+=”方法在累加器上進行累加操作。但是,它們不能讀取它的值。只有驅(qū)動程序能夠讀取它的值,通過累加器的value方法。

下面的代碼展示了如何把一個數(shù)組中的所有元素累加到累加器上:

scala> val accum = sc.accumulator(0, "My Accumulator")
accum: spark.Accumulator[Int] = 0
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
scala> accum.value
res2: Int = 10

盡管上面的例子使用了內(nèi)置支持的累加器類型Int,但是開發(fā)人員也可以通過繼承AccumulatorParam類來創(chuàng)建它們自己的累加器類型。AccumulatorParam接口有兩個方法:
zero方法為你的類型提供一個0值。
addInPlace方法將兩個值相加。
假設(shè)我們有一個代表數(shù)學(xué)vector的Vector類。我們可以向下面這樣實現(xiàn):

object VectorAccumulatorParam extends AccumulatorParam[Vector] {
 def zero(initialValue: Vector): Vector = {
  Vector.zeros(initialValue.size)
 }
 def addInPlace(v1: Vector, v2: Vector): Vector = {
  v1 += v2
 }
}
// Then, create an Accumulator of this type:
val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)

在Scala里,Spark提供更通用的累加接口來累加數(shù)據(jù),盡管結(jié)果的類型和累加的數(shù)據(jù)類型可能不一致(例如,通過收集在一起的元素來創(chuàng)建一個列表)。同時,SparkContext..accumulableCollection方法來累加通用的Scala的集合類型。

累加器僅僅在動作操作內(nèi)部被更新,Spark保證每個任務(wù)在累加器上的更新操作只被執(zhí)行一次,也就是說,重啟任務(wù)也不會更新。在轉(zhuǎn)換操作中,用戶必須意識到每個任務(wù)對累加器的更新操作可能被不只一次執(zhí)行,如果重新執(zhí)行了任務(wù)和作業(yè)的階段。

累加器并沒有改變Spark的惰性求值模型。如果它們被RDD上的操作更新,它們的值只有當RDD因為動作操作被計算時才被更新。因此,當執(zhí)行一個惰性的轉(zhuǎn)換操作,比如map時,不能保證對累加器值的更新被實際執(zhí)行了。下面的代碼片段演示了此特性:

val accum = sc.accumulator(0)
data.map { x => accum += x; f(x) }
//在這里,accum的值仍然是0,因為沒有動作操作引起map被實際的計算.

二.Java和Scala版本的實戰(zhàn)演示

2.1 Java版本:

/**
 * 實例:利用廣播進行黑名單過濾!
 * 檢查新的數(shù)據(jù) 根據(jù)是否在廣播變量-黑名單內(nèi),從而實現(xiàn)過濾數(shù)據(jù)。
 */
public class BroadcastAccumulator {
 /**
  * 創(chuàng)建一個List的廣播變量
  *
  */
 private static volatile Broadcast<List<String>> broadcastList = null;
 /**
  * 計數(shù)器!
  */
 private static volatile Accumulator<Integer> accumulator = null;
 public static void main(String[] args) {
  SparkConf conf = new SparkConf().setMaster("local[2]").
    setAppName("WordCountOnlineBroadcast");
  JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));

  /**
   * 注意:分發(fā)廣播需要一個action操作觸發(fā)。
   * 注意:廣播的是Arrays的asList 而非對象的引用。廣播Array數(shù)組的對象引用會出錯。
   * 使用broadcast廣播黑名單到每個Executor中!
   */
  broadcastList = jsc.sc().broadcast(Arrays.asList("Hadoop","Mahout","Hive"));
  /**
   * 累加器作為全局計數(shù)器!用于統(tǒng)計在線過濾了多少個黑名單!
   * 在這里實例化。
   */
  accumulator = jsc.sparkContext().accumulator(0,"OnlineBlackListCounter");

  JavaReceiverInputDStream<String> lines = jsc.socketTextStream("Master", 9999);

  /**
   * 這里省去flatmap因為名單是一個個的!
   */
  JavaPairDStream<String, Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() {
   @Override
   public Tuple2<String, Integer> call(String word) {
    return new Tuple2<String, Integer>(word, 1);
   }
  });
  JavaPairDStream<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
   @Override
   public Integer call(Integer v1, Integer v2) {
    return v1 + v2;
   }
  });
  /**
   * Funtion里面 前幾個參數(shù)是 入?yún)ⅰ?
   * 后面的出參。
   * 體現(xiàn)在call方法里面!
   *
   */
  wordsCount.foreach(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
   @Override
   public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws Exception {
    rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() {
     @Override
     public Boolean call(Tuple2<String, Integer> wordPair) throws Exception {
      if (broadcastList.value().contains(wordPair._1)) {
       /**
        * accumulator不僅僅用來計數(shù)。
        * 可以同時寫進數(shù)據(jù)庫或者緩存中。
        */
       accumulator.add(wordPair._2);
       return false;
      }else {
       return true;
      }
     };
     /**
      * 廣播和計數(shù)器的執(zhí)行,需要進行一個action操作!
      */
    }).collect();
    System.out.println("廣播器里面的值"+broadcastList.value());
    System.out.println("計時器里面的值"+accumulator.value());
    return null;
   }
  });

  jsc.start();
  jsc.awaitTermination();
  jsc.close();
 }
 }

2.2 Scala版本

package com.Streaming
import java.util
import org.apache.spark.streaming.{Duration, StreamingContext}
import org.apache.spark.{Accumulable, Accumulator, SparkContext, SparkConf}
import org.apache.spark.broadcast.Broadcast
/**
 * Created by lxh on 2016/6/30.
 */
object BroadcastAccumulatorStreaming {
 /**
 * 聲明一個廣播和累加器!
 */
 private var broadcastList:Broadcast[List[String]] = _
 private var accumulator:Accumulator[Int] = _
 def main(args: Array[String]) {
 val sparkConf = new SparkConf().setMaster("local[4]").setAppName("broadcasttest")
 val sc = new SparkContext(sparkConf)
 /**
  * duration是ms
  */
 val ssc = new StreamingContext(sc,Duration(2000))
 // broadcastList = ssc.sparkContext.broadcast(util.Arrays.asList("Hadoop","Spark"))
 broadcastList = ssc.sparkContext.broadcast(List("Hadoop","Spark"))
 accumulator= ssc.sparkContext.accumulator(0,"broadcasttest")
 /**
  * 獲取數(shù)據(jù)!
  */
 val lines = ssc.socketTextStream("localhost",9999)
 /**
  * 1.flatmap把行分割成詞。
  * 2.map把詞變成tuple(word,1)
  * 3.reducebykey累加value
  * (4.sortBykey排名)
  * 4.進行過濾。 value是否在累加器中。
  * 5.打印顯示。
  */
 val words = lines.flatMap(line => line.split(" "))
 val wordpair = words.map(word => (word,1))
 wordpair.filter(record => {broadcastList.value.contains(record._1)})
 val pair = wordpair.reduceByKey(_+_)
 /**
  * 這個pair 是PairDStream<String, Integer>
  * 查看這個id是否在黑名單中,如果是的話,累加器就+1
  */
/* pair.foreachRDD(rdd => {
  rdd.filter(record => {
  if (broadcastList.value.contains(record._1)) {
   accumulator.add(1)
   return true
  } else {
   return false
  }
  })
 })*/
 val filtedpair = pair.filter(record => {
  if (broadcastList.value.contains(record._1)) {
   accumulator.add(record._2)
   true
  } else {
   false
  }
  }).print
 println("累加器的值"+accumulator.value)
 // pair.filter(record => {broadcastList.value.contains(record._1)})
 /* val keypair = pair.map(pair => (pair._2,pair._1))*/
 /**
  * 如果DStream自己沒有某個算子操作。就通過轉(zhuǎn)化transform!
  */
 /* keypair.transform(rdd => {
  rdd.sortByKey(false)//TODO
 })*/
 pair.print()
 ssc.start()
 ssc.awaitTermination()
 }
}

總結(jié)

以上就是本文關(guān)于Spark的廣播變量和累加器使用方法代碼示例的全部內(nèi)容,希望對大家有所幫助。感興趣的朋友可以參閱:詳解Java編寫并運行spark應(yīng)用程序的方法  、 Spark入門簡介等,有什么問題可以隨時留言,小編會及時回復(fù)大家。感謝朋友們對腳本之家網(wǎng)站的支持。

相關(guān)文章

  • Mac下開啟與關(guān)閉端口轉(zhuǎn)發(fā)的腳本配置方法

    Mac下開啟與關(guān)閉端口轉(zhuǎn)發(fā)的腳本配置方法

    這篇文章主要介紹了Mac下開啟與關(guān)閉端口轉(zhuǎn)發(fā)的腳本配置方法,非常不錯,具有參考借鑒價值,需要的朋友可以參考下
    2018-04-04
  • Linux Makefile與Shell的問題

    Linux Makefile與Shell的問題

    大概只要知道Makefile的人,都知道Makefile可以調(diào)用Shell腳本。但是在實際使用時,并不那么簡單,一些模棱兩可的地方可能會讓你抓狂。你若不信,可以先看幾個例子,想象一下這些這些例子會打印什么內(nèi)容,記下你想象的結(jié)果,然后在計算機上運行這些例子,對照看一下
    2016-03-03
  • 基于Xen的VPS 配置squid服務(wù)器

    基于Xen的VPS 配置squid服務(wù)器

    前面總結(jié)了 基于Xen的VPS的web服務(wù)器的配置:ubuntu+nginx+php,下面記錄下squid服務(wù)器的配置。
    2010-07-07
  • 寶塔面板中mongodb的配置教程分享

    寶塔面板中mongodb的配置教程分享

    MongoDB是一個基于分布式文件存儲的數(shù)據(jù)庫,由C++語言編寫。旨在為WEB應(yīng)用提供可擴展的高性能數(shù)據(jù)存儲解決方案,下面我們來講講如何在寶塔面板中配置mongodb吧
    2023-08-08
  • Mac安裝Homebrew的那些事兒

    Mac安裝Homebrew的那些事兒

    Homebrew是Mac Os的包管理工具,相當于Redhat Linux(Centos/RHEL/Fedora)的yum或者Debian Linux(Debian/Ubuntu)的apt-get。這篇文章主要介紹了Mac安裝Homebrew的那些事兒 ,需要的朋友可以參考下
    2019-08-08
  • 運維管理器Fabric使用方法

    運維管理器Fabric使用方法

    Fabric是基于Python2.5版本以上實現(xiàn)的SSH命令行工具,簡化了SSH的應(yīng)用程序部署及系統(tǒng)管理任務(wù),它提供了系統(tǒng)基礎(chǔ)的操作組件,可以實現(xiàn)本地或遠程shell命令,包括命令執(zhí)行,文件上傳,下載及完整執(zhí)行日志輸出等功能
    2016-08-08
  • Centos7使用docker搭建gitlab服務(wù)器

    Centos7使用docker搭建gitlab服務(wù)器

    這篇文章主要為大家詳細介紹了Centos7使用docker搭建gitlab服務(wù)器,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2018-04-04
  • 外貿(mào)網(wǎng)站屏蔽中國IP訪問的多種方法

    外貿(mào)網(wǎng)站屏蔽中國IP訪問的多種方法

    這篇文章主要介紹了外貿(mào)網(wǎng)站屏蔽中國IP訪問的多種方法,本文通過nginx、iptables、javascript三種方法實現(xiàn),需要的朋友可以參考下
    2014-12-12
  • 服務(wù)器添加git鉤子的步驟

    服務(wù)器添加git鉤子的步驟

    這篇文章主要介紹了服務(wù)器添加git鉤子的步驟,配置碼云和服務(wù)器ssh
    -服務(wù)器要開放22端口,對服務(wù)器git鉤子相關(guān)知識感興趣的朋友一起看看吧
    2022-10-10
  • vscode內(nèi)網(wǎng)訪問服務(wù)器的方法

    vscode內(nèi)網(wǎng)訪問服務(wù)器的方法

    這篇文章主要介紹了vscode內(nèi)網(wǎng)訪問服務(wù)器的相關(guān)知識,本文給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2022-06-06

最新評論