SpringBoot操作spark處理hdfs文件的操作方法
SpringBoot操作spark處理hdfs文件
1、導(dǎo)入依賴
<!-- spark依賴--> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>3.2.2</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</artifactId> <version>3.2.2</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-mllib --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-mllib_2.12</artifactId> <version>3.2.2</version> </dependency>
2、配置spark信息
建立一個(gè)配置文件,配置spark信息
import org.apache.spark.SparkConf; import org.apache.spark.sql.SparkSession; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; //將文件交于spring管理 @Configuration public class SparkConfig { //使用yml中的配置 @Value("${spark.master}") private String sparkMaster; @Value("${spark.appName}") private String sparkAppName; @Value("${hdfs.user}") private String hdfsUser; @Value("${hdfs.path}") private String hdfsPath; @Bean public SparkConf sparkConf() { SparkConf conf = new SparkConf(); conf.setMaster(sparkMaster); conf.setAppName(sparkAppName); // 添加HDFS配置 conf.set("fs.defaultFS", hdfsPath); conf.set("spark.hadoop.hdfs.user",hdfsUser); return conf; } @Bean public SparkSession sparkSession() { return SparkSession.builder() .config(sparkConf()) .getOrCreate(); } }
3、controller和service
controller類
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import xyz.zzj.traffic_main_code.service.SparkService; @RestController @RequestMapping("/spark") public class SparkController { @Autowired private SparkService sparkService; @GetMapping("/run") public String runSparkJob() { //讀取Hadoop HDFS文件 String filePath = "hdfs://192.168.44.128:9000/subwayData.csv"; sparkService.executeHadoopSparkJob(filePath); return "Spark job executed successfully!"; } }
處理地鐵數(shù)據(jù)的service
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.types.DataTypes; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import xyz.zzj.traffic_main_code.service.SparkReadHdfs; import java.io.IOException; import java.net.URI; import static org.apache.spark.sql.functions.*; @Service public class SparkReadHdfsImpl implements SparkReadHdfs { private final SparkSession spark; @Value("${hdfs.user}") private String hdfsUser; @Value("${hdfs.path}") private String hdfsPath; @Autowired public SparkReadHdfsImpl(SparkSession spark) { this.spark = spark; } /** * 讀取HDFS上的CSV文件并上傳到HDFS * @param filePath */ @Override public void sparkSubway(String filePath) { try { // 設(shè)置Hadoop配置 JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext()); Configuration hadoopConf = jsc.hadoopConfiguration(); hadoopConf.set("fs.defaultFS", hdfsPath); hadoopConf.set("hadoop.user.name", hdfsUser); // 讀取HDFS上的文件 Dataset<Row> df = spark.read() .option("header", "true") // 指定第一行是列名 .option("inferSchema", "true") // 自動(dòng)推斷列的數(shù)據(jù)類型 .csv(filePath); // 顯示DataFrame的所有數(shù)據(jù) // df.show(Integer.MAX_VALUE, false); // 對(duì)DataFrame進(jìn)行清洗和轉(zhuǎn)換操作 // 檢查缺失值 df.select("number", "people", "dateTime").na().drop().show(); // 對(duì)數(shù)據(jù)進(jìn)行類型轉(zhuǎn)換 Dataset<Row> df2 = df.select( col("number").cast(DataTypes.IntegerType), col("people").cast(DataTypes.IntegerType), to_date(col("dateTime"), "yyyy年MM月dd日").alias("dateTime") ); // 去重 Dataset<Row> df3 = df2.dropDuplicates(); // 數(shù)據(jù)過濾,確保people列沒有負(fù)數(shù) Dataset<Row> df4 = df3.filter(col("people").geq(0)); // df4.show(); // 數(shù)據(jù)聚合,按dateTime分組,統(tǒng)計(jì)每天的總客流量 Dataset<Row> df6 = df4.groupBy("dateTime").agg(sum("people").alias("total_people")); // df6.show(); sparkForSubway(df6,"/time_subwayData.csv"); //數(shù)據(jù)聚合,獲取每天人數(shù)最多的地鐵number Dataset<Row> df7 = df4.groupBy("dateTime").agg(max("people").alias("max_people")); sparkForSubway(df7,"/everyday_max_subwayData.csv"); //數(shù)據(jù)聚合,計(jì)算每天的客流強(qiáng)度:每天總people除以632840 Dataset<Row> df8 = df4.groupBy("dateTime").agg(sum("people").divide(632.84).alias("strength")); sparkForSubway(df8,"/everyday_strength_subwayData.csv"); } catch (Exception e) { e.printStackTrace(); } } private static void sparkForSubway(Dataset<Row> df6, String hdfsPath) throws IOException { // 保存處理后的數(shù)據(jù)到HDFS df6.coalesce(1) .write().mode("overwrite") .option("header", "true") .csv("hdfs://192.168.44.128:9000/time_subwayData"); // 創(chuàng)建Hadoop配置 Configuration conf = new Configuration(); // 獲取FileSystem實(shí)例 FileSystem fs = FileSystem.get(URI.create("hdfs://192.168.44.128:9000"), conf); // 定義臨時(shí)目錄和目標(biāo)文件路徑 Path tempDir = new Path("/time_subwayData"); FileStatus[] files = fs.listStatus(tempDir); // 檢查目標(biāo)文件是否存在,如果存在則刪除 Path targetFile1 = new Path(hdfsPath); if (fs.exists(targetFile1)) { fs.delete(targetFile1, true); // true 表示遞歸刪除 } for (FileStatus file : files) { if (file.isFile() && file.getPath().getName().startsWith("part-")) { Path targetFile = new Path(hdfsPath); fs.rename(file.getPath(), targetFile); } } // 刪除臨時(shí)目錄 fs.delete(tempDir, true); } }
4、運(yùn)行
- 項(xiàng)目運(yùn)行完后,打開瀏覽器
- spark處理地鐵數(shù)據(jù)
- http://localhost:8686/spark/dispose
- 觀察spark和hdfs
- http://192.168.44.128:8099/
- http://192.168.44.128:9870/explorer.html#/
到此這篇關(guān)于SpringBoot操作spark處理hdfs文件的文章就介紹到這了,更多相關(guān)SpringBoot spark處理hdfs文件內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
如何解決創(chuàng)建maven工程時(shí),產(chǎn)生“找不到插件的錯(cuò)誤”問題
這篇文章主要介紹了如何解決創(chuàng)建maven工程時(shí),產(chǎn)生“找不到插件的錯(cuò)誤”問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-12-12SpringCloud Alibaba Nacos 整合SpringBoot A
這篇文章主要介紹了SpringCloud Alibaba Nacos 整合SpringBoot Admin實(shí)戰(zhàn),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-12-12jackson json序列化實(shí)現(xiàn)首字母大寫,第二個(gè)字母需小寫
這篇文章主要介紹了jackson json序列化實(shí)現(xiàn)首字母大寫,第二個(gè)字母需小寫方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-06-06Spring MVC中使用Google kaptcha驗(yàn)證碼的方法詳解
kaptcha 是一個(gè)非常實(shí)用的驗(yàn)證碼生成工具。有了它,你可以生成各種樣式的驗(yàn)證碼,因?yàn)樗强膳渲玫?,下面這篇文章主要給大家介紹了關(guān)于Spring MVC中使用Google kaptcha驗(yàn)證碼的方法,需要的朋友可以參考借鑒,下面來一起看看吧。2017-10-10Java中的List接口實(shí)現(xiàn)類LinkList和ArrayList詳解
這篇文章主要介紹了Java中的List接口實(shí)現(xiàn)類LinkList和ArrayList詳解,List接口繼承自Collection接口,是單列集合的一個(gè)重要分支,實(shí)現(xiàn)了List接口的對(duì)象稱為L(zhǎng)ist集合,在List集合中允許出現(xiàn)重復(fù)的元素,所有的元素是以一種線性方式進(jìn)行存儲(chǔ)的,需要的朋友可以參考下2024-01-01SpringBoot整合Dubbo+Zookeeper實(shí)現(xiàn)RPC調(diào)用
這篇文章主要給大家介紹了Spring Boot整合Dubbo+Zookeeper實(shí)現(xiàn)RPC調(diào)用的步驟詳解,文中有詳細(xì)的代碼示例,對(duì)我們的學(xué)習(xí)或工作有一定的幫助,需要的朋友可以參考下2023-07-07解決springboot讀取application.properties中文亂碼問題
初用properties,讀取java properties文件的時(shí)候如果value是中文,會(huì)出現(xiàn)亂碼的問題,所以本文小編將給大家介紹如何解決springboot讀取application.properties中文亂碼問題,需要的朋友可以參考下2023-11-11