欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Spark學(xué)習(xí)筆記Spark Streaming的使用

 更新時間:2019年06月14日 09:56:55   作者:EVAO_大個子  
這篇文章主要介紹了Spark學(xué)習(xí)筆記Spark Streaming的使用,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧

1. Spark Streaming

  • Spark Streaming是一個基于Spark Core之上的實(shí)時計算框架,可以從很多數(shù)據(jù)源消費(fèi)數(shù)據(jù)并對數(shù)據(jù)進(jìn)行處理
  • Spark Streaing中有一個最基本的抽象叫DStream(代理),本質(zhì)上就是一系列連續(xù)的RDD,DStream其實(shí)就是對RDD的封裝
  • DStream可以認(rèn)為是一個RDD的工廠,該DStream里面生產(chǎn)都是相同業(yè)務(wù)邏輯的RDD,只不過是RDD里面要讀取數(shù)據(jù)的不相同
  • 在一個批次的處理時間間隔里, DStream只產(chǎn)生一個RDD
  • DStream就相當(dāng)于一個"模板", 我們可以根據(jù)這個"模板"來處理一段時間間隔之內(nèi)產(chǎn)生的這個rdd,以此為依據(jù)來構(gòu)建rdd的DAG

2. 當(dāng)下比較流行的實(shí)時計算引擎

吞吐量 編程語言 處理速度 生態(tài)

Storm 較低 clojure 非???亞秒) 阿里(JStorm)

Flink 較高 scala 較快(亞秒) 國內(nèi)使用較少

Spark Streaming 非常高 scala 快(毫秒) 完善的生態(tài)圈

3. Spark Streaming處理網(wǎng)絡(luò)數(shù)據(jù)

//創(chuàng)建StreamingContext 至少要有兩個線程 一個線程用于接收數(shù)據(jù) 一個線程用于處理數(shù)據(jù)
val conf = new SparkConf().setAppName("Ops1").setMaster("local[2]")
val ssc = new StreamingContext(conf, Milliseconds(3000))
val receiverDS: ReceiverInputDStream[String] = ssc.socketTextStream("uplooking01", 44444)
val pairRetDS: DStream[(String, Int)] = receiverDS.flatMap(_.split(",")).map((_, 1)).reduceByKey(_ + _)
pairRetDS.print()
//開啟流計算
ssc.start()
//優(yōu)雅的關(guān)閉
ssc.awaitTermination()

4. Spark Streaming接收數(shù)據(jù)的兩種方式(Kafka)

Receiver

  • 偏移量是由zookeeper來維護(hù)的
  • 使用的是Kafka高級的API(消費(fèi)者的API)
  • 編程簡單
  • 效率低(為了保證數(shù)據(jù)的安全性,會開啟WAL)
  • kafka0.10的版本中已經(jīng)徹底棄用Receiver了
  • 生產(chǎn)環(huán)境一般不會使用這種方式

Direct

  • 偏移量是有我們來手動維護(hù)
  • 效率高(我們直接把spark streaming接入到kafka的分區(qū)中了)
  • 編程比較復(fù)雜
  • 生產(chǎn)環(huán)境一般使用這種方式

5. Spark Streaming整合Kafka

基于Receiver的方式整合kafka(生產(chǎn)環(huán)境不建議使用,在0.10中已經(jīng)移除了)

//創(chuàng)建StreamingContext 至少要有兩個線程 一個線程用于接收數(shù)據(jù) 一個線程用于處理數(shù)據(jù)
val conf = new SparkConf().setAppName("Ops1").setMaster("local[2]")
val ssc = new StreamingContext(conf, Milliseconds(3000))
val zkQuorum = "uplooking03:2181,uplooking04:2181,uplooking05:2181"
val groupId = "myid"
val topics = Map("hadoop" -> 3)
val receiverDS: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics)
receiverDS.flatMap(_._2.split(" ")).map((_,1)).reduceByKey(_+_).print()
ssc.start()
ssc.awaitTermination()

基于Direct的方式(生產(chǎn)環(huán)境使用)

//創(chuàng)建StreamingContext 至少要有兩個線程 一個線程用于接收數(shù)據(jù) 一個線程用于處理數(shù)據(jù)
val conf = new SparkConf().setAppName("Ops1").setMaster("local[2]")
val ssc = new StreamingContext(conf, Milliseconds(3000))
val kafkaParams = Map("metadata.broker.list" -> "uplooking03:9092,uplooking04:9092,uplooking05:9092")
val topics = Set("hadoop")
val inputDS: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
inputDS.flatMap(_._2.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
ssc.start()
ssc.awaitTermination()

6. 實(shí)時流計算的架構(gòu)

1. 生成日志(模擬用戶訪問web應(yīng)用的日志)

public class GenerateAccessLog {
  public static void main(String[] args) throws IOException, InterruptedException {
    //準(zhǔn)備數(shù)據(jù)
    int[] ips = {123, 18, 123, 112, 181, 16, 172, 183, 190, 191, 196, 120};
    String[] requesTypes = {"GET", "POST"};
    String[] cursors = {"/vip/112", "/vip/113", "/vip/114", "/vip/115", "/vip/116", "/vip/117", "/vip/118", "/vip/119", "/vip/120", "/vip/121", "/free/210", "/free/211", "/free/212", "/free/213", "/free/214", "/company/312", "/company/313", "/company/314", "/company/315"};

    String[] courseNames = {"大數(shù)據(jù)", "python", "java", "c++", "c", "scala", "android", "spark", "hadoop", "redis"};
    String[] references = {"www.baidu.com/", "www.sougou.com/", "www.sou.com/", "www.google.com"};
    FileWriter fw = new FileWriter(args[0]);
    PrintWriter printWriter = new PrintWriter(fw);
    while (true) {
      //      Thread.sleep(1000);
      //產(chǎn)生字段
      String date = new Date().toLocaleString();
      String method = requesTypes[getRandomNum(0, requesTypes.length)];
      String url = "/cursor" + cursors[getRandomNum(0, cursors.length)];
      String HTTPVERSION = "HTTP/1.1";
      String ip = ips[getRandomNum(0, ips.length)] + "." + ips[getRandomNum(0, ips.length)] + "." + ips[getRandomNum(0, ips.length)] + "." + ips[getRandomNum(0, ips.length)];
      String reference = references[getRandomNum(0, references.length)];
      String rowLog = date + " " + method + " " + url + " " + HTTPVERSION + " " + ip + " " + reference;
      printWriter.println(rowLog);
      printWriter.flush();
    }
  }


  //[start,end)
  public static int getRandomNum(int start, int end) {
    int i = new Random().nextInt(end - start) + start;
    return i;
  }
}

2. flume使用avro采集web應(yīng)用服務(wù)器的日志數(shù)據(jù)

采集命令執(zhí)行的結(jié)果到avro中

# The configuration file needs to define the sources, 
# the channels and the sinks.
# Sources, channels and sinks are defined per agent, 
# in this case called 'agent'
f1.sources = r1
f1.channels = c1
f1.sinks = k1

#define sources
f1.sources.r1.type = exec
f1.sources.r1.command =tail -F /logs/access.log

#define channels
f1.channels.c1.type = memory
f1.channels.c1.capacity = 1000
f1.channels.c1.transactionCapacity = 100

#define sink 采集日志到uplooking03
f1.sinks.k1.type = avro
f1.sinks.k1.hostname = uplooking03
f1.sinks.k1.port = 44444

#bind sources and sink to channel 
f1.sources.r1.channels = c1
f1.sinks.k1.channel = c1
從avro采集到控制臺
# The configuration file needs to define the sources, 
# the channels and the sinks.
# Sources, channels and sinks are defined per agent, 
# in this case called 'agent'
f2.sources = r2
f2.channels = c2
f2.sinks = k2

#define sources
f2.sources.r2.type = avro
f2.sources.r2.bind = uplooking03
f2.sources.r2.port = 44444

#define channels
f2.channels.c2.type = memory
f2.channels.c2.capacity = 1000
f2.channels.c2.transactionCapacity = 100

#define sink
f2.sinks.k2.type = logger

#bind sources and sink to channel 
f2.sources.r2.channels = c2
f2.sinks.k2.channel = c2
從avro采集到kafka中
# The configuration file needs to define the sources, 
# the channels and the sinks.
# Sources, channels and sinks are defined per agent, 
# in this case called 'agent'
f2.sources = r2
f2.channels = c2
f2.sinks = k2

#define sources
f2.sources.r2.type = avro
f2.sources.r2.bind = uplooking03
f2.sources.r2.port = 44444

#define channels
f2.channels.c2.type = memory
f2.channels.c2.capacity = 1000
f2.channels.c2.transactionCapacity = 100

#define sink
f2.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
f2.sinks.k2.topic = hadoop
f2.sinks.k2.brokerList = uplooking03:9092,uplooking04:9092,uplooking05:9092
f2.sinks.k2.requiredAcks = 1

以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。

相關(guān)文章

  • mybatis-plus中l(wèi)ambdaQuery()與lambdaUpdate()比較常見的使用方法總結(jié)

    mybatis-plus中l(wèi)ambdaQuery()與lambdaUpdate()比較常見的使用方法總結(jié)

    mybatis-plus是在mybatis的基礎(chǔ)上做增強(qiáng)不做改變,簡化了CRUD操作,下面這篇文章主要給大家介紹了關(guān)于mybatis-plus中l(wèi)ambdaQuery()與lambdaUpdate()比較常見的使用方法,需要的朋友可以參考下
    2022-09-09
  • SpringBoot實(shí)現(xiàn)掃碼登錄的示例代碼

    SpringBoot實(shí)現(xiàn)掃碼登錄的示例代碼

    本文主要介紹了SpringBoot實(shí)現(xiàn)掃碼登錄的示例代碼,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2022-03-03
  • feign 調(diào)用第三方服務(wù)中部分特殊符號未轉(zhuǎn)義問題

    feign 調(diào)用第三方服務(wù)中部分特殊符號未轉(zhuǎn)義問題

    這篇文章主要介紹了feign 調(diào)用第三方服務(wù)中部分特殊符號未轉(zhuǎn)義問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-03-03
  • Java的CGLIB動態(tài)代理深入解析

    Java的CGLIB動態(tài)代理深入解析

    這篇文章主要介紹了Java的CGLIB動態(tài)代理深入解析,CGLIB是強(qiáng)大的、高性能的代碼生成庫,被廣泛應(yīng)用于AOP框架,它底層使用ASM來操作字節(jié)碼生成新的類,為對象引入間接級別,以控制對象的訪問,需要的朋友可以參考下
    2023-11-11
  • sa-token整合springboot中的代碼示例展示

    sa-token整合springboot中的代碼示例展示

    sa-token?是一個輕量級的 Java 權(quán)限認(rèn)證框架,它可以很方便地集成到 Spring Boot 項目中,以提供簡潔的認(rèn)證和授權(quán)功能,這篇文章主要介紹了sa-token整合springboot中的代碼示例展示,需要的朋友可以參考下
    2024-04-04
  • 淺談SpringMVC的執(zhí)行流程

    淺談SpringMVC的執(zhí)行流程

    下面小編就為大家?guī)硪黄獪\談SpringMVC的執(zhí)行流程。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-09-09
  • Java Vector實(shí)現(xiàn)班級信息管理系統(tǒng)

    Java Vector實(shí)現(xiàn)班級信息管理系統(tǒng)

    這篇文章主要為大家詳細(xì)介紹了Java Vector實(shí)現(xiàn)班級信息管理系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2022-02-02
  • mybatis-plus分頁查詢?nèi)N方法小結(jié)

    mybatis-plus分頁查詢?nèi)N方法小結(jié)

    本文主要介紹了mybatis-plus分頁查詢?nèi)N方法,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2023-05-05
  • Java實(shí)現(xiàn)五子棋網(wǎng)絡(luò)版

    Java實(shí)現(xiàn)五子棋網(wǎng)絡(luò)版

    這篇文章主要為大家詳細(xì)介紹了基于Java編寫的網(wǎng)絡(luò)五子棋,文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2018-03-03
  • Spring事務(wù)管理中的異?;貪L是什么

    Spring事務(wù)管理中的異?;貪L是什么

    Spring中的代碼出現(xiàn)異常時會回滾這是大家都希望的情況,這時候可以用@Transactional這個注解放在你的方法上來進(jìn)行回滾,這時候有個問題就是事務(wù)回滾是不希望你在Controller進(jìn)行處理,而是在Service層來進(jìn)行處理
    2023-02-02

最新評論