SpringBoot操作spark處理hdfs文件的操作方法
SpringBoot操作spark處理hdfs文件
1、導(dǎo)入依賴
<!-- spark依賴--> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>3.2.2</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</artifactId> <version>3.2.2</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-mllib --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-mllib_2.12</artifactId> <version>3.2.2</version> </dependency>
2、配置spark信息
建立一個配置文件,配置spark信息
import org.apache.spark.SparkConf; import org.apache.spark.sql.SparkSession; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; //將文件交于spring管理 @Configuration public class SparkConfig { //使用yml中的配置 @Value("${spark.master}") private String sparkMaster; @Value("${spark.appName}") private String sparkAppName; @Value("${hdfs.user}") private String hdfsUser; @Value("${hdfs.path}") private String hdfsPath; @Bean public SparkConf sparkConf() { SparkConf conf = new SparkConf(); conf.setMaster(sparkMaster); conf.setAppName(sparkAppName); // 添加HDFS配置 conf.set("fs.defaultFS", hdfsPath); conf.set("spark.hadoop.hdfs.user",hdfsUser); return conf; } @Bean public SparkSession sparkSession() { return SparkSession.builder() .config(sparkConf()) .getOrCreate(); } }
3、controller和service
controller類
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import xyz.zzj.traffic_main_code.service.SparkService; @RestController @RequestMapping("/spark") public class SparkController { @Autowired private SparkService sparkService; @GetMapping("/run") public String runSparkJob() { //讀取Hadoop HDFS文件 String filePath = "hdfs://192.168.44.128:9000/subwayData.csv"; sparkService.executeHadoopSparkJob(filePath); return "Spark job executed successfully!"; } }
處理地鐵數(shù)據(jù)的service
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.types.DataTypes; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import xyz.zzj.traffic_main_code.service.SparkReadHdfs; import java.io.IOException; import java.net.URI; import static org.apache.spark.sql.functions.*; @Service public class SparkReadHdfsImpl implements SparkReadHdfs { private final SparkSession spark; @Value("${hdfs.user}") private String hdfsUser; @Value("${hdfs.path}") private String hdfsPath; @Autowired public SparkReadHdfsImpl(SparkSession spark) { this.spark = spark; } /** * 讀取HDFS上的CSV文件并上傳到HDFS * @param filePath */ @Override public void sparkSubway(String filePath) { try { // 設(shè)置Hadoop配置 JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext()); Configuration hadoopConf = jsc.hadoopConfiguration(); hadoopConf.set("fs.defaultFS", hdfsPath); hadoopConf.set("hadoop.user.name", hdfsUser); // 讀取HDFS上的文件 Dataset<Row> df = spark.read() .option("header", "true") // 指定第一行是列名 .option("inferSchema", "true") // 自動推斷列的數(shù)據(jù)類型 .csv(filePath); // 顯示DataFrame的所有數(shù)據(jù) // df.show(Integer.MAX_VALUE, false); // 對DataFrame進行清洗和轉(zhuǎn)換操作 // 檢查缺失值 df.select("number", "people", "dateTime").na().drop().show(); // 對數(shù)據(jù)進行類型轉(zhuǎn)換 Dataset<Row> df2 = df.select( col("number").cast(DataTypes.IntegerType), col("people").cast(DataTypes.IntegerType), to_date(col("dateTime"), "yyyy年MM月dd日").alias("dateTime") ); // 去重 Dataset<Row> df3 = df2.dropDuplicates(); // 數(shù)據(jù)過濾,確保people列沒有負數(shù) Dataset<Row> df4 = df3.filter(col("people").geq(0)); // df4.show(); // 數(shù)據(jù)聚合,按dateTime分組,統(tǒng)計每天的總客流量 Dataset<Row> df6 = df4.groupBy("dateTime").agg(sum("people").alias("total_people")); // df6.show(); sparkForSubway(df6,"/time_subwayData.csv"); //數(shù)據(jù)聚合,獲取每天人數(shù)最多的地鐵number Dataset<Row> df7 = df4.groupBy("dateTime").agg(max("people").alias("max_people")); sparkForSubway(df7,"/everyday_max_subwayData.csv"); //數(shù)據(jù)聚合,計算每天的客流強度:每天總people除以632840 Dataset<Row> df8 = df4.groupBy("dateTime").agg(sum("people").divide(632.84).alias("strength")); sparkForSubway(df8,"/everyday_strength_subwayData.csv"); } catch (Exception e) { e.printStackTrace(); } } private static void sparkForSubway(Dataset<Row> df6, String hdfsPath) throws IOException { // 保存處理后的數(shù)據(jù)到HDFS df6.coalesce(1) .write().mode("overwrite") .option("header", "true") .csv("hdfs://192.168.44.128:9000/time_subwayData"); // 創(chuàng)建Hadoop配置 Configuration conf = new Configuration(); // 獲取FileSystem實例 FileSystem fs = FileSystem.get(URI.create("hdfs://192.168.44.128:9000"), conf); // 定義臨時目錄和目標文件路徑 Path tempDir = new Path("/time_subwayData"); FileStatus[] files = fs.listStatus(tempDir); // 檢查目標文件是否存在,如果存在則刪除 Path targetFile1 = new Path(hdfsPath); if (fs.exists(targetFile1)) { fs.delete(targetFile1, true); // true 表示遞歸刪除 } for (FileStatus file : files) { if (file.isFile() && file.getPath().getName().startsWith("part-")) { Path targetFile = new Path(hdfsPath); fs.rename(file.getPath(), targetFile); } } // 刪除臨時目錄 fs.delete(tempDir, true); } }
4、運行
- 項目運行完后,打開瀏覽器
- spark處理地鐵數(shù)據(jù)
- http://localhost:8686/spark/dispose
- 觀察spark和hdfs
- http://192.168.44.128:8099/
- http://192.168.44.128:9870/explorer.html#/
到此這篇關(guān)于SpringBoot操作spark處理hdfs文件的文章就介紹到這了,更多相關(guān)SpringBoot spark處理hdfs文件內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
如何解決創(chuàng)建maven工程時,產(chǎn)生“找不到插件的錯誤”問題
這篇文章主要介紹了如何解決創(chuàng)建maven工程時,產(chǎn)生“找不到插件的錯誤”問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-12-12SpringCloud Alibaba Nacos 整合SpringBoot A
這篇文章主要介紹了SpringCloud Alibaba Nacos 整合SpringBoot Admin實戰(zhàn),具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-12-12jackson json序列化實現(xiàn)首字母大寫,第二個字母需小寫
這篇文章主要介紹了jackson json序列化實現(xiàn)首字母大寫,第二個字母需小寫方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-06-06Spring MVC中使用Google kaptcha驗證碼的方法詳解
kaptcha 是一個非常實用的驗證碼生成工具。有了它,你可以生成各種樣式的驗證碼,因為它是可配置的,下面這篇文章主要給大家介紹了關(guān)于Spring MVC中使用Google kaptcha驗證碼的方法,需要的朋友可以參考借鑒,下面來一起看看吧。2017-10-10Java中的List接口實現(xiàn)類LinkList和ArrayList詳解
這篇文章主要介紹了Java中的List接口實現(xiàn)類LinkList和ArrayList詳解,List接口繼承自Collection接口,是單列集合的一個重要分支,實現(xiàn)了List接口的對象稱為List集合,在List集合中允許出現(xiàn)重復(fù)的元素,所有的元素是以一種線性方式進行存儲的,需要的朋友可以參考下2024-01-01SpringBoot整合Dubbo+Zookeeper實現(xiàn)RPC調(diào)用
這篇文章主要給大家介紹了Spring Boot整合Dubbo+Zookeeper實現(xiàn)RPC調(diào)用的步驟詳解,文中有詳細的代碼示例,對我們的學(xué)習(xí)或工作有一定的幫助,需要的朋友可以參考下2023-07-07解決springboot讀取application.properties中文亂碼問題
初用properties,讀取java properties文件的時候如果value是中文,會出現(xiàn)亂碼的問題,所以本文小編將給大家介紹如何解決springboot讀取application.properties中文亂碼問題,需要的朋友可以參考下2023-11-11