Sample Code for Using Hudi in IDEA
Environment Setup
Create a Maven Project
Create a Remote Connection to the Server
Tools → Deployment → Browse Remote Host
Configure the following:
Enter the server's username and password here.
Click Test Connection; if it reports Successfully, the configuration succeeded.
Copy Hadoop's core-site.xml and hdfs-site.xml, together with log4j.properties, into the resources folder.
Set log4j.properties so that only warnings and errors are printed:
log4j.rootCategory=WARN, console
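
For reference, here is a minimal sketch of what the full file might look like, assuming the standard log4j 1.x console appender (the appender name must match the "console" referenced by the root category); adjust it to the file you actually copied:

# Minimal log4j.properties sketch (assumed log4j 1.x console appender)
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n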
Add the following to the pom.xml file:
<repositories>
    <repository>
        <id>aliyun</id>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    </repository>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
    <repository>
        <id>jboss</id>
        <url>http://repository.jboss.com/nexus/content/groups/public</url>
    </repository>
</repositories>

<properties>
    <scala.version>2.12.10</scala.version>
    <scala.binary.version>2.12</scala.binary.version>
    <spark.version>3.0.0</spark.version>
    <hadoop.version>2.7.3</hadoop.version>
    <hudi.version>0.9.0</hudi.version>
</properties>

<dependencies>
    <!-- Scala language dependency -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <!-- Spark Core dependency -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Spark SQL dependency -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Hadoop Client dependency -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <!-- hudi-spark3 -->
    <dependency>
        <groupId>org.apache.hudi</groupId>
        <artifactId>hudi-spark3-bundle_2.12</artifactId>
        <version>${hudi.version}</version>
    </dependency>
    <!-- Spark Avro dependency, needed by Hudi -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-avro_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
</dependencies>

<build>
    <outputDirectory>target/classes</outputDirectory>
    <testOutputDirectory>target/test-classes</testOutputDirectory>
    <resources>
        <resource>
            <directory>${project.basedir}/src/main/resources</directory>
        </resource>
    </resources>
    <!-- Maven compiler plugins -->
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
Comment out the following block that was generated when the project was created; otherwise dependency resolution keeps failing:
<!-- <properties>-->
<!-- <maven.compiler.source>8</maven.compiler.source>-->
<!-- <maven.compiler.target>8</maven.compiler.target>-->
<!-- </properties>-->
Code structure: the whole example lives in a single object, HudiSparkDemo.
Core Code
import org.apache.hudi.QuickstartUtils.DataGenerator
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

/**
 * Hudi data lake framework demo: performs CRUD operations on data with the Spark
 * engine, using taxi-trip data produced by the official data generator.
 *
 * Task 1: generate sample data and insert it into a Hudi table, using COW mode
 * Task 2: query the data with a Snapshot Query, using the DSL style
 * Task 3: update the data
 * Task 4: query the data with an Incremental Query, using the SQL style
 * Task 5: delete the data
 */
object HudiSparkDemo {

  /**
   * Official example: generate sample data and insert it into a Hudi table of type COW.
   */
  def insertData(spark: SparkSession, table: String, path: String): Unit = {
    import spark.implicits._

    // Step 1: generate sample trip data
    import org.apache.hudi.QuickstartUtils._
    val dataGen: DataGenerator = new DataGenerator()
    val inserts = convertToStringList(dataGen.generateInserts(100))

    import scala.collection.JavaConverters._
    val insertDF: DataFrame = spark.read.json(
      spark.sparkContext.parallelize(inserts.asScala, 2).toDS()
    )
    // insertDF.printSchema()
    // insertDF.show(10, truncate = false)

    // Step 2: insert the data into the Hudi table
    import org.apache.hudi.DataSourceWriteOptions._
    import org.apache.hudi.config.HoodieWriteConfig._
    insertDF.write
      .mode(SaveMode.Append)
      .format("hudi")
      .option("hoodie.insert.shuffle.parallelism", "2")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      // Hudi table properties
      .option(PRECOMBINE_FIELD.key(), "ts")
      .option(RECORDKEY_FIELD.key(), "uuid")
      .option(PARTITIONPATH_FIELD.key(), "partitionpath")
      .option(TBL_NAME.key(), table)
      .save(path)
  }

  /**
   * Query the table with a Snapshot Query.
   */
  def queryData(spark: SparkSession, path: String): Unit = {
    import spark.implicits._

    val tripsDF: DataFrame = spark.read.format("hudi").load(path)
    // tripsDF.printSchema()
    // tripsDF.show(10, truncate = false)

    // Query trips with a fare between 20 and 50
    tripsDF
      .filter($"fare" >= 20 && $"fare" <= 50)
      .select($"driver", $"rider", $"fare", $"begin_lat", $"begin_lon", $"partitionpath", $"_hoodie_commit_time")
      .orderBy($"fare".desc, $"_hoodie_commit_time".desc)
      .show(20, truncate = false)
  }

  def queryDataByTime(spark: SparkSession, path: String): Unit = {
    import org.apache.spark.sql.functions._

    // Option 1: filter to a point in time given as a compact timestamp string
    val df1 = spark.read
      .format("hudi")
      .option("as.of.instant", "20220610160908")
      .load(path)
      .sort(col("_hoodie_commit_time").desc)
    df1.printSchema()
    df1.show(numRows = 5, truncate = false)

    // Option 2: filter to a point in time given as a date-time string
    val df2 = spark.read
      .format("hudi")
      .option("as.of.instant", "2022-06-10 16:09:08")
      .load(path)
      .sort(col("_hoodie_commit_time").desc)
    df2.printSchema()
    df2.show(numRows = 5, truncate = false)
  }

  /**
   * Overload that takes a DataGenerator, so later updates can reuse the same generator.
   */
  def insertData(spark: SparkSession, table: String, path: String, dataGen: DataGenerator): Unit = {
    import spark.implicits._

    // Step 1: generate sample trip data
    import org.apache.hudi.QuickstartUtils._
    val inserts = convertToStringList(dataGen.generateInserts(100))

    import scala.collection.JavaConverters._
    val insertDF: DataFrame = spark.read.json(
      spark.sparkContext.parallelize(inserts.asScala, 2).toDS()
    )
    // insertDF.printSchema()
    // insertDF.show(10, truncate = false)

    // Step 2: insert the data into the Hudi table
    import org.apache.hudi.DataSourceWriteOptions._
    import org.apache.hudi.config.HoodieWriteConfig._
    insertDF.write
      // switched to Overwrite mode
      .mode(SaveMode.Overwrite)
      .format("hudi")
      .option("hoodie.insert.shuffle.parallelism", "2")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      // Hudi table properties
      .option(PRECOMBINE_FIELD.key(), "ts")
      .option(RECORDKEY_FIELD.key(), "uuid")
      .option(PARTITIONPATH_FIELD.key(), "partitionpath")
      .option(TBL_NAME.key(), table)
      .save(path)
  }

  /**
   * Generate updates for records already in the Hudi table and apply them.
   */
  def updateData(spark: SparkSession, table: String, path: String, dataGen: DataGenerator): Unit = {
    import spark.implicits._

    // Step 1: generate updated trip data
    import org.apache.hudi.QuickstartUtils._
    val updates = convertToStringList(dataGen.generateUpdates(100))

    import scala.collection.JavaConverters._
    val updateDF: DataFrame = spark.read.json(
      spark.sparkContext.parallelize(updates.asScala, 2).toDS()
    )

    // Step 2: upsert the data into the Hudi table
    import org.apache.hudi.DataSourceWriteOptions._
    import org.apache.hudi.config.HoodieWriteConfig._
    updateDF.write
      // append mode
      .mode(SaveMode.Append)
      .format("hudi")
      .option("hoodie.insert.shuffle.parallelism", "2")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      // Hudi table properties
      .option(PRECOMBINE_FIELD.key(), "ts")
      .option(RECORDKEY_FIELD.key(), "uuid")
      .option(PARTITIONPATH_FIELD.key(), "partitionpath")
      .option(TBL_NAME.key(), table)
      .save(path)
  }

  /**
   * Query the data incrementally (Incremental Query); a begin commit time must be supplied.
   */
  def incrementalQueryData(spark: SparkSession, path: String): Unit = {
    import spark.implicits._

    // Step 1: load the Hudi table and collect commit times to use as the incremental-query threshold
    import org.apache.hudi.DataSourceReadOptions._
    spark.read
      .format("hudi")
      .load(path)
      .createOrReplaceTempView("view_temp_hudi_trips")
    val commits: Array[String] = spark
      .sql(
        """
          |select
          |  distinct(_hoodie_commit_time) as commitTime
          |from
          |  view_temp_hudi_trips
          |order by
          |  commitTime DESC
          |""".stripMargin
      )
      .map(row => row.getString(0))
      .take(50)
    val beginTime = commits(commits.length - 1) // commit time we are interested in
    println(s"beginTime = ${beginTime}")

    // Step 2: read incrementally, starting from the chosen commit time
    val tripsIncrementalDF = spark.read
      .format("hudi")
      // set the query type to incremental
      .option(QUERY_TYPE.key(), QUERY_TYPE_INCREMENTAL_OPT_VAL)
      // set the commit time to start reading from
      .option(BEGIN_INSTANTTIME.key(), beginTime)
      .load(path)

    // Step 3: register the incremental data as a temp view and query trips with fare > 20
    tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
    spark
      .sql(
        """
          |select
          |  `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts
          |from
          |  hudi_trips_incremental
          |where
          |  fare > 20.0
          |""".stripMargin
      )
      .show(10, truncate = false)
  }

  /**
   * Delete rows from the Hudi table by record key (uuid); for a partitioned table,
   * the partition path must be supplied as well.
   */
  def deleteData(spark: SparkSession, table: String, path: String): Unit = {
    import spark.implicits._

    // Step 1: load the Hudi table and count the rows
    val tripsDF: DataFrame = spark.read.format("hudi").load(path)
    println(s"Raw Count = ${tripsDF.count()}")

    // Step 2: pick a couple of rows from the table and turn them into a delete dataset
    val dataframe = tripsDF.limit(2).select($"uuid", $"partitionpath")
    import org.apache.hudi.QuickstartUtils._
    val dataGenerator = new DataGenerator()
    val deletes = dataGenerator.generateDeletes(dataframe.collectAsList())

    import scala.collection.JavaConverters._
    val deleteDF = spark.read.json(spark.sparkContext.parallelize(deletes.asScala, 2).toDS())

    // Step 3: write the delete dataset back to the Hudi table with operation type DELETE
    import org.apache.hudi.DataSourceWriteOptions._
    import org.apache.hudi.config.HoodieWriteConfig._
    deleteDF.write
      .mode(SaveMode.Append)
      .format("hudi")
      .option("hoodie.insert.shuffle.parallelism", "2")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      // set the operation type to delete (the default is upsert)
      .option(OPERATION.key(), "delete")
      .option(PRECOMBINE_FIELD.key(), "ts")
      .option(RECORDKEY_FIELD.key(), "uuid")
      .option(PARTITIONPATH_FIELD.key(), "partitionpath")
      .option(TBL_NAME.key(), table)
      .save(path)

    // Step 4: reload the Hudi table and check that the count dropped by 2
    val hudiDF: DataFrame = spark.read.format("hudi").load(path)
    println(s"Delete After Count = ${hudiDF.count()}")
  }

  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "hty")

    // create the SparkSession instance and set its properties
    val spark: SparkSession = {
      SparkSession.builder()
        .appName(this.getClass.getSimpleName.stripSuffix("$"))
        .master("local[2]")
        // use Kryo serialization
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .getOrCreate()
    }

    // table name and storage path
    val tableName: String = "tbl_trips_cow"
    val tablePath: String = "/hudi_warehouse/tbl_trips_cow"

    // data generator for the sample business data
    import org.apache.hudi.QuickstartUtils._

    // Task 1: generate sample data and insert it into a Hudi table, using COW mode
    //insertData(spark, tableName, tablePath)

    // Task 2: query the data with a Snapshot Query, using the DSL style
    //queryData(spark, tablePath)
    //queryDataByTime(spark, tablePath)

    // Task 3: update the data. Step 1: generate data; step 2: generate updates of those
    // records' field values; step 3: upsert the updates into the Hudi table
    val dataGen: DataGenerator = new DataGenerator()
    //insertData(spark, tableName, tablePath, dataGen)
    //updateData(spark, tableName, tablePath, dataGen)

    // Task 4: query the data with an Incremental Query, using the SQL style
    //incrementalQueryData(spark, tablePath)

    // Task 5: delete the data
    deleteData(spark, tableName, tablePath)

    // done: release resources
    spark.stop()
  }
}
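
To verify on HDFS that each task actually produced a commit, a helper like the sketch below (not part of the original code) can be dropped into HudiSparkDemo. It assumes a COW table, whose completed commits appear as .commit files under the table's .hoodie directory:

import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical helper: list completed commits on a COW table's timeline
// by scanning for .commit files under the .hoodie folder.
def listCommits(spark: SparkSession, path: String): Unit = {
  val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
  fs.listStatus(new Path(path, ".hoodie"))
    .filter(_.getPath.getName.endsWith(".commit"))
    .map(_.getPath.getName.stripSuffix(".commit"))
    .sorted
    .foreach(println)
}

Calling listCommits(spark, tablePath) after each task prints one timestamp per commit; these are the same commit times that incrementalQueryData collects to derive its beginTime.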
Testing
After executing the insertData(spark, tableName, tablePath) method, query the table using a snapshot query:
queryData(spark, tablePath)
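
The DSL version used by queryData can also be expressed in SQL by registering a temp view first. A sketch, assuming the same spark session and tablePath as in main:

// SQL-form variant of the snapshot query (sketch; same fare range as queryData)
spark.read.format("hudi").load(tablePath).createOrReplaceTempView("trips_snapshot")
spark.sql(
  """
    |select driver, rider, fare, `_hoodie_commit_time`
    |from trips_snapshot
    |where fare between 20 and 50
    |order by fare desc
    |""".stripMargin
).show(20, truncate = false)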
Query the data incrementally (Incremental Query):
incrementalQueryData(spark, tablePath)
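
An incremental read can also be bounded on both ends to get a point-in-time view. The sketch below is not in the original code; it uses END_INSTANTTIME from Hudi 0.9.0's DataSourceReadOptions, and the end timestamp shown is a hypothetical commit time:

import org.apache.hudi.DataSourceReadOptions._

// Point-in-time query sketch: read all commits from the start of the timeline ("000")
// up to and including a chosen (hypothetical) commit time.
val pointInTimeDF = spark.read
  .format("hudi")
  .option(QUERY_TYPE.key(), QUERY_TYPE_INCREMENTAL_OPT_VAL)
  .option(BEGIN_INSTANTTIME.key(), "000")
  .option(END_INSTANTTIME.key(), "20220610160908")
  .load(tablePath)
pointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time")
spark.sql("select `_hoodie_commit_time`, fare, ts from hudi_trips_point_in_time where fare > 20.0").show()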
References
https://www.bilibili.com/video/BV1sb4y1n7hK?p=21&vd_source=e21134e00867aeadc3c6b37bb38b9eee
This concludes this introduction to using Hudi in IDEA.