欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot操作spark處理hdfs文件的操作方法

 更新時(shí)間:2025年01月09日 11:56:45   作者:念言-ny  
本文介紹了如何使用Spring Boot操作Spark處理HDFS文件,包括導(dǎo)入依賴、配置Spark信息、編寫Controller和Service處理地鐵數(shù)據(jù)、運(yùn)行項(xiàng)目以及觀察Spark和HDFS的狀態(tài),感興趣的朋友跟隨小編一起看看吧

SpringBoot操作spark處理hdfs文件

1、導(dǎo)入依賴

<!--        spark依賴-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.2.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-mllib -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.12</artifactId>
            <version>3.2.2</version>
        </dependency>

2、配置spark信息

建立一個(gè)配置文件,配置spark信息

import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
//將文件交于spring管理
@Configuration
public class SparkConfig {
    //使用yml中的配置
    @Value("${spark.master}")
    private String sparkMaster;
    @Value("${spark.appName}")
    private String sparkAppName;
    @Value("${hdfs.user}")
    private String hdfsUser;
    @Value("${hdfs.path}")
    private String hdfsPath;
    @Bean
    public SparkConf sparkConf() {
        SparkConf conf = new SparkConf();
        conf.setMaster(sparkMaster);
        conf.setAppName(sparkAppName);
        // 添加HDFS配置
        conf.set("fs.defaultFS", hdfsPath);
        conf.set("spark.hadoop.hdfs.user",hdfsUser);
        return conf;
    }
    @Bean
    public SparkSession sparkSession() {
        return SparkSession.builder()
                .config(sparkConf())
                .getOrCreate();
    }
}

3、controller和service

controller類

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import xyz.zzj.traffic_main_code.service.SparkService;
@RestController
@RequestMapping("/spark")
public class SparkController {
    @Autowired
    private SparkService sparkService;
    @GetMapping("/run")
    public String runSparkJob() {
        //讀取Hadoop HDFS文件
        String filePath = "hdfs://192.168.44.128:9000/subwayData.csv";
        sparkService.executeHadoopSparkJob(filePath);
        return "Spark job executed successfully!";
    }
}

處理地鐵數(shù)據(jù)的service

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import xyz.zzj.traffic_main_code.service.SparkReadHdfs;
import java.io.IOException;
import java.net.URI;
import static org.apache.spark.sql.functions.*;
@Service
public class SparkReadHdfsImpl implements SparkReadHdfs {
    private final SparkSession spark;
    @Value("${hdfs.user}")
    private String hdfsUser;
    @Value("${hdfs.path}")
    private String hdfsPath;
    @Autowired
    public SparkReadHdfsImpl(SparkSession spark) {
        this.spark = spark;
    }
    /**
     * 讀取HDFS上的CSV文件并上傳到HDFS
     * @param filePath
     */
    @Override
    public void sparkSubway(String filePath) {
        try {
            // 設(shè)置Hadoop配置
            JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
            Configuration hadoopConf = jsc.hadoopConfiguration();
            hadoopConf.set("fs.defaultFS", hdfsPath);
            hadoopConf.set("hadoop.user.name", hdfsUser);
            // 讀取HDFS上的文件
            Dataset<Row> df = spark.read()
                    .option("header", "true") // 指定第一行是列名
                    .option("inferSchema", "true") // 自動(dòng)推斷列的數(shù)據(jù)類型
                    .csv(filePath);
            // 顯示DataFrame的所有數(shù)據(jù)
//            df.show(Integer.MAX_VALUE, false);
            // 對(duì)DataFrame進(jìn)行清洗和轉(zhuǎn)換操作
            // 檢查缺失值
            df.select("number", "people", "dateTime").na().drop().show();
            // 對(duì)數(shù)據(jù)進(jìn)行類型轉(zhuǎn)換
            Dataset<Row> df2 = df.select(
                    col("number").cast(DataTypes.IntegerType),
                    col("people").cast(DataTypes.IntegerType),
                    to_date(col("dateTime"), "yyyy年MM月dd日").alias("dateTime")
            );
            // 去重
            Dataset<Row> df3 = df2.dropDuplicates();
            // 數(shù)據(jù)過濾,確保people列沒有負(fù)數(shù)
            Dataset<Row> df4 = df3.filter(col("people").geq(0));
//            df4.show();
            // 數(shù)據(jù)聚合,按dateTime分組,統(tǒng)計(jì)每天的總客流量
            Dataset<Row> df6 = df4.groupBy("dateTime").agg(sum("people").alias("total_people"));
//            df6.show();
            sparkForSubway(df6,"/time_subwayData.csv");
            //數(shù)據(jù)聚合,獲取每天人數(shù)最多的地鐵number
            Dataset<Row> df7 = df4.groupBy("dateTime").agg(max("people").alias("max_people"));
            sparkForSubway(df7,"/everyday_max_subwayData.csv");
            //數(shù)據(jù)聚合,計(jì)算每天的客流強(qiáng)度:每天總people除以632840
            Dataset<Row> df8 = df4.groupBy("dateTime").agg(sum("people").divide(632.84).alias("strength"));
            sparkForSubway(df8,"/everyday_strength_subwayData.csv");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    private static void sparkForSubway(Dataset<Row> df6, String hdfsPath) throws IOException {
        // 保存處理后的數(shù)據(jù)到HDFS
        df6.coalesce(1)
                .write().mode("overwrite")
                .option("header", "true")
                .csv("hdfs://192.168.44.128:9000/time_subwayData");
        // 創(chuàng)建Hadoop配置
        Configuration conf = new Configuration();
        // 獲取FileSystem實(shí)例
        FileSystem fs = FileSystem.get(URI.create("hdfs://192.168.44.128:9000"), conf);
        // 定義臨時(shí)目錄和目標(biāo)文件路徑
        Path tempDir = new Path("/time_subwayData");
        FileStatus[] files = fs.listStatus(tempDir);
        // 檢查目標(biāo)文件是否存在,如果存在則刪除
        Path targetFile1 = new Path(hdfsPath);
        if (fs.exists(targetFile1)) {
            fs.delete(targetFile1, true); // true 表示遞歸刪除
        }
        for (FileStatus file : files) {
            if (file.isFile() && file.getPath().getName().startsWith("part-")) {
                Path targetFile = new Path(hdfsPath);
                fs.rename(file.getPath(), targetFile);
            }
        }
        // 刪除臨時(shí)目錄
        fs.delete(tempDir, true);
    }
}

4、運(yùn)行

  • 項(xiàng)目運(yùn)行完后,打開瀏覽器
    • spark處理地鐵數(shù)據(jù)
  • http://localhost:8686/spark/dispose
  • 觀察spark和hdfs
    • http://192.168.44.128:8099/
    • http://192.168.44.128:9870/explorer.html#/

到此這篇關(guān)于SpringBoot操作spark處理hdfs文件的文章就介紹到這了,更多相關(guān)SpringBoot spark處理hdfs文件內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Android入門簡(jiǎn)單實(shí)例

    Android入門簡(jiǎn)單實(shí)例

    這篇文章主要介紹了Android入門簡(jiǎn)單實(shí)例,對(duì)于初學(xué)Android的朋友有一定的借鑒價(jià)值,需要的朋友可以參考下
    2014-08-08
  • 如何解決創(chuàng)建maven工程時(shí),產(chǎn)生“找不到插件的錯(cuò)誤”問題

    如何解決創(chuàng)建maven工程時(shí),產(chǎn)生“找不到插件的錯(cuò)誤”問題

    這篇文章主要介紹了如何解決創(chuàng)建maven工程時(shí),產(chǎn)生“找不到插件的錯(cuò)誤”問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-12-12
  • SpringCloud Alibaba Nacos 整合SpringBoot Admin實(shí)戰(zhàn)

    SpringCloud Alibaba Nacos 整合SpringBoot A

    這篇文章主要介紹了SpringCloud Alibaba Nacos 整合SpringBoot Admin實(shí)戰(zhàn),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-12-12
  • jackson json序列化實(shí)現(xiàn)首字母大寫,第二個(gè)字母需小寫

    jackson json序列化實(shí)現(xiàn)首字母大寫,第二個(gè)字母需小寫

    這篇文章主要介紹了jackson json序列化實(shí)現(xiàn)首字母大寫,第二個(gè)字母需小寫方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-06-06
  • Spring MVC中使用Google kaptcha驗(yàn)證碼的方法詳解

    Spring MVC中使用Google kaptcha驗(yàn)證碼的方法詳解

    kaptcha 是一個(gè)非常實(shí)用的驗(yàn)證碼生成工具。有了它,你可以生成各種樣式的驗(yàn)證碼,因?yàn)樗强膳渲玫?,下面這篇文章主要給大家介紹了關(guān)于Spring MVC中使用Google kaptcha驗(yàn)證碼的方法,需要的朋友可以參考借鑒,下面來一起看看吧。
    2017-10-10
  • Spring簡(jiǎn)明分析Bean作用域

    Spring簡(jiǎn)明分析Bean作用域

    scope用來聲明容器中的對(duì)象所應(yīng)該處的限定場(chǎng)景或者說該對(duì)象的存活時(shí)間,即容器在對(duì)象進(jìn)入其 相應(yīng)的scope之前,生成并裝配這些對(duì)象,在該對(duì)象不再處于這些scope的限定之后,容器通常會(huì)銷毀這些對(duì)象,這篇文章主要介紹了Spring中的Bean作用域,需要的朋友可以參考下
    2022-07-07
  • Java中的List接口實(shí)現(xiàn)類LinkList和ArrayList詳解

    Java中的List接口實(shí)現(xiàn)類LinkList和ArrayList詳解

    這篇文章主要介紹了Java中的List接口實(shí)現(xiàn)類LinkList和ArrayList詳解,List接口繼承自Collection接口,是單列集合的一個(gè)重要分支,實(shí)現(xiàn)了List接口的對(duì)象稱為L(zhǎng)ist集合,在List集合中允許出現(xiàn)重復(fù)的元素,所有的元素是以一種線性方式進(jìn)行存儲(chǔ)的,需要的朋友可以參考下
    2024-01-01
  • SpringBoot整合Dubbo+Zookeeper實(shí)現(xiàn)RPC調(diào)用

    SpringBoot整合Dubbo+Zookeeper實(shí)現(xiàn)RPC調(diào)用

    這篇文章主要給大家介紹了Spring Boot整合Dubbo+Zookeeper實(shí)現(xiàn)RPC調(diào)用的步驟詳解,文中有詳細(xì)的代碼示例,對(duì)我們的學(xué)習(xí)或工作有一定的幫助,需要的朋友可以參考下
    2023-07-07
  • idea如何自動(dòng)添加版權(quán)許可證信息

    idea如何自動(dòng)添加版權(quán)許可證信息

    這篇文章主要介紹了idea如何自動(dòng)添加版權(quán)許可證信息問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-07-07
  • 解決springboot讀取application.properties中文亂碼問題

    解決springboot讀取application.properties中文亂碼問題

    初用properties,讀取java properties文件的時(shí)候如果value是中文,會(huì)出現(xiàn)亂碼的問題,所以本文小編將給大家介紹如何解決springboot讀取application.properties中文亂碼問題,需要的朋友可以參考下
    2023-11-11

最新評(píng)論