Spark集群框架的搭建與入門(mén)
一、Spark概述
運(yùn)行結(jié)構(gòu)

Driver
運(yùn)行Spark的Applicaion中main()函數(shù),會(huì)創(chuàng)建SparkContext,SparkContext負(fù)責(zé)和Cluster-Manager進(jìn)行通信,并負(fù)責(zé)申請(qǐng)資源、任務(wù)分配和監(jiān)控等。
ClusterManager
負(fù)責(zé)申請(qǐng)和管理在WorkerNode上運(yùn)行應(yīng)用所需的資源,可以高效地在一個(gè)計(jì)算節(jié)點(diǎn)到數(shù)千個(gè)計(jì)算節(jié)點(diǎn)之間伸縮計(jì)算,目前包括Spark原生的ClusterManager、ApacheMesos和HadoopYARN。
Executor
Application運(yùn)行在WorkerNode上的一個(gè)進(jìn)程,作為工作節(jié)點(diǎn)負(fù)責(zé)運(yùn)行Task任務(wù),并且負(fù)責(zé)將數(shù)據(jù)存在內(nèi)存或者磁盤(pán)上,每個(gè) Application都有各自獨(dú)立的一批Executor,任務(wù)間相互獨(dú)立。
二、環(huán)境部署
1、Scala環(huán)境
安裝包管理
[root@hop01 opt]# tar -zxvf scala-2.12.2.tgz [root@hop01 opt]# mv scala-2.12.2 scala2.12
配置變量
[root@hop01 opt]# vim /etc/profile export SCALA_HOME=/opt/scala2.12 export PATH=$PATH:$SCALA_HOME/bin [root@hop01 opt]# source /etc/profile
版本查看
[root@hop01 opt]# scala -version
Scala環(huán)境需要部署在Spark運(yùn)行的相關(guān)服務(wù)節(jié)點(diǎn)上。
2、Spark基礎(chǔ)環(huán)境
安裝包管理
[root@hop01 opt]# tar -zxvf spark-2.1.1-bin-hadoop2.7.tgz [root@hop01 opt]# mv spark-2.1.1-bin-hadoop2.7 spark2.1
配置變量
[root@hop01 opt]# vim /etc/profile export SPARK_HOME=/opt/spark2.1 export PATH=$PATH:$SPARK_HOME/bin [root@hop01 opt]# source /etc/profile
版本查看
[root@hop01 opt]# spark-shell

3、Spark集群配置
服務(wù)節(jié)點(diǎn)
[root@hop01 opt]# cd /opt/spark2.1/conf/ [root@hop01 conf]# cp slaves.template slaves [root@hop01 conf]# vim slaves hop01 hop02 hop03
環(huán)境配置
[root@hop01 conf]# cp spark-env.sh.template spark-env.sh [root@hop01 conf]# vim spark-env.sh export JAVA_HOME=/opt/jdk1.8 export SCALA_HOME=/opt/scala2.12 export SPARK_MASTER_IP=hop01 export SPARK_LOCAL_IP=安裝節(jié)點(diǎn)IP export SPARK_WORKER_MEMORY=1g export HADOOP_CONF_DIR=/opt/hadoop2.7/etc/hadoop
注意SPARK_LOCAL_IP的配置。
4、Spark啟動(dòng)
依賴(lài)Hadoop相關(guān)環(huán)境,所以要先啟動(dòng)。
啟動(dòng):/opt/spark2.1/sbin/start-all.sh 停止:/opt/spark2.1/sbin/stop-all.sh
這里在主節(jié)點(diǎn)會(huì)啟動(dòng)兩個(gè)進(jìn)程:Master和Worker,其他節(jié)點(diǎn)只啟動(dòng)一個(gè)Worker進(jìn)程。
5、訪問(wèn)Spark集群
默認(rèn)端口是:8080。
http://hop01:8080/

運(yùn)行基礎(chǔ)案例:
[root@hop01 spark2.1]# cd /opt/spark2.1/ [root@hop01 spark2.1]# bin/spark-submit --class org.apache.spark.examples.SparkPi --master local examples/jars/spark-examples_2.11-2.1.1.jar 運(yùn)行結(jié)果:Pi is roughly 3.1455357276786384
三、開(kāi)發(fā)案例
1、核心依賴(lài)
依賴(lài)Spark2.1.1版本:
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.1.1</version> </dependency>
引入Scala編譯插件:
<plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.2.2</version> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> </execution> </executions> </plugin>
2、案例代碼開(kāi)發(fā)
讀取指定位置的文件,并輸出文件內(nèi)容單詞統(tǒng)計(jì)結(jié)果。
@RestController
public class WordWeb implements Serializable {
@GetMapping("/word/web")
public String getWeb (){
// 1、創(chuàng)建Spark的配置對(duì)象
SparkConf sparkConf = new SparkConf().setAppName("LocalCount")
.setMaster("local[*]");
// 2、創(chuàng)建SparkContext對(duì)象
JavaSparkContext sc = new JavaSparkContext(sparkConf);
sc.setLogLevel("WARN");
// 3、讀取測(cè)試文件
JavaRDD lineRdd = sc.textFile("/var/spark/test/word.txt");
// 4、行內(nèi)容進(jìn)行切分
JavaRDD wordsRdd = lineRdd.flatMap(new FlatMapFunction() {
@Override
public Iterator call(Object obj) throws Exception {
String value = String.valueOf(obj);
String[] words = value.split(",");
return Arrays.asList(words).iterator();
}
});
// 5、切分的單詞進(jìn)行標(biāo)注
JavaPairRDD wordAndOneRdd = wordsRdd.mapToPair(new PairFunction() {
@Override
public Tuple2 call(Object obj) throws Exception {
//將單詞進(jìn)行標(biāo)記:
return new Tuple2(String.valueOf(obj), 1);
}
});
// 6、統(tǒng)計(jì)單詞出現(xiàn)次數(shù)
JavaPairRDD wordAndCountRdd = wordAndOneRdd.reduceByKey(new Function2() {
@Override
public Object call(Object obj1, Object obj2) throws Exception {
return Integer.parseInt(obj1.toString()) + Integer.parseInt(obj2.toString());
}
});
// 7、排序
JavaPairRDD sortedRdd = wordAndCountRdd.sortByKey();
List<Tuple2> finalResult = sortedRdd.collect();
// 8、結(jié)果打印
for (Tuple2 tuple2 : finalResult) {
System.out.println(tuple2._1 + " ===> " + tuple2._2);
}
// 9、保存統(tǒng)計(jì)結(jié)果
sortedRdd.saveAsTextFile("/var/spark/output");
sc.stop();
return "success" ;
}
}
打包執(zhí)行結(jié)果:

四、源代碼地址
GitHub·地址
https://github.com/cicadasmile/big-data-parent
GitEE·地址
https://gitee.com/cicadasmile/big-data-parent
以上就是Spark集群框架的搭建與入門(mén)的詳細(xì)內(nèi)容,更多關(guān)于Spark集群框架的搭建的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
java.Net.UnknownHostException異常處理問(wèn)題解決
這篇文章主要介紹了java.Net.UnknownHostException異常處理方法,問(wèn)題原因是在系統(tǒng)的?/etc/Hostname中配置了主機(jī)名,而在/etc/hosts文件中沒(méi)有相應(yīng)的配置,本文給大家詳細(xì)講解,需要的朋友可以參考下2023-03-03
Java實(shí)現(xiàn)讀取SFTP服務(wù)器指定目錄文件的方法
SFTP是一種在安全通道上傳輸文件的協(xié)議,它是基于SSH(Secure Shell)協(xié)議的擴(kuò)展,用于在客戶(hù)端和服務(wù)器之間進(jìn)行加密的文件傳輸,這篇文章主要介紹了Java實(shí)現(xiàn)讀取SFTP服務(wù)器指定目錄文件,感興趣的朋友跟隨小編一起看看吧2023-08-08
springboot項(xiàng)目實(shí)現(xiàn)斷點(diǎn)續(xù)傳功能
這篇文章主要介紹了springboot項(xiàng)目實(shí)現(xiàn)斷點(diǎn)續(xù)傳,本文通過(guò)示例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-08-08
詳解Spring?@Lazy注解為什么能破解死循環(huán)
這篇文章主要來(lái)和大家探討一下Spring中的@Lazy注解為什么能破解死循環(huán),文中的示例代碼講解詳細(xì),具有一定的參考價(jià)值,需要的可以了解一下2023-07-07
Java利用遞歸實(shí)現(xiàn)樹(shù)形結(jié)構(gòu)的工具類(lèi)
有時(shí)候,我們的數(shù)據(jù)是帶有層級(jí)的,比如常見(jiàn)的省市區(qū)三級(jí)聯(lián)動(dòng),就是一層套著一層。而我們?cè)跀?shù)據(jù)庫(kù)存放數(shù)據(jù)的時(shí)候,往往是列表形式的,這個(gè)時(shí)候可能就需要遞歸處理為樹(shù)形結(jié)構(gòu)了。本文就為大家介紹了Java利用遞歸實(shí)現(xiàn)樹(shù)形結(jié)構(gòu)的工具類(lèi),希望對(duì)大家有所幫助2023-03-03

