Apache?Hudi異步Clustering部署操作的掌握
1. 摘要
在之前的一篇博客中,我們介紹了Clustering(聚簇)的表服務(wù)來重新組織數(shù)據(jù)來提供更好的查詢性能,而不用降低攝取速度,并且我們已經(jīng)知道如何部署同步Clustering,本篇博客中,我們將討論近期社區(qū)做的一些改進(jìn)以及如何通過HoodieClusteringJob
和DeltaStreamer
工具來部署異步Clustering。
2. 介紹
通常講,Clustering
根據(jù)可配置的策略創(chuàng)建一個計(jì)劃,根據(jù)特定規(guī)則對符合條件的文件進(jìn)行分組,然后執(zhí)行該計(jì)劃。Hudi支持并發(fā)寫入,并在多個表服務(wù)之間提供快照隔離,從而允許寫入程序在后臺運(yùn)行Clustering
時繼續(xù)攝取。有關(guān)Clustering
的體系結(jié)構(gòu)的更詳細(xì)概述請查看上一篇博文。
3. Clustering策略
如前所述Clustering
計(jì)劃和執(zhí)行取決于可插拔的配置策略。這些策略大致可分為三類:計(jì)劃策略、執(zhí)行策略和更新策略。
3.1 計(jì)劃策略
該策略在創(chuàng)建Clustering計(jì)劃時發(fā)揮作用。它有助于決定應(yīng)該對哪些文件組進(jìn)行Clustering。讓我們看一下Hudi提供的不同計(jì)劃策略。請注意,使用此配置可以輕松地插拔這些策略。
- SparkSizeBasedClusteringPlanStrategy:根據(jù)基本文件的小文件限制選擇文件切片并創(chuàng)建
Clustering
組,最大大小為每個組允許的最大文件大小。可以使用此配置指定最大大小。此策略對于將中等大小的文件合并成大文件非常有用,以減少跨冷分區(qū)分布的大量文件。 - SparkRecentDaysClusteringPlanStrategy:根據(jù)以前的
N
天分區(qū)創(chuàng)建一個計(jì)劃,將這些分區(qū)中的小文件片進(jìn)行Clustering
,這是默認(rèn)策略,當(dāng)工作負(fù)載是可預(yù)測的并且數(shù)據(jù)是按時間劃分時,它可能很有用。 - SparkSelectedPartitionsClusteringPlanStrategy:如果只想對某個范圍內(nèi)的特定分區(qū)進(jìn)行
Clustering
,那么無論這些分區(qū)是新分區(qū)還是舊分區(qū),此策略都很有用,要使用此策略,還需要在下面設(shè)置兩個配置(包括開始和結(jié)束分區(qū)):
hoodie.clustering.plan.strategy.cluster.begin.partition hoodie.clustering.plan.strategy.cluster.end.partition
注意:所有策略都是分區(qū)感知的,后兩種策略仍然受到第一種策略的大小限制的約束。
3.2 執(zhí)行策略
在計(jì)劃階段構(gòu)建Clustering
組后,Hudi主要根據(jù)排序列和大小為每個組應(yīng)用執(zhí)行策略,可以使用此配置指定策略。
SparkSortAndSizeExecutionStrategy
是默認(rèn)策略。使用此配置進(jìn)行Clustering
時,用戶可以指定數(shù)據(jù)排序列。除此之外我們還可以為Clustering
產(chǎn)生的Parquet文件設(shè)置最大文件大小。該策略使用bulk_insert
將數(shù)據(jù)寫入新文件,在這種情況下,Hudi隱式使用一個分區(qū)器,該分區(qū)器根據(jù)指定列進(jìn)行排序。通過這種策略改變數(shù)據(jù)布局,不僅提高了查詢性能,而且自動平衡了重寫開銷。
現(xiàn)在該策略可以作為單個Spark作業(yè)或多個作業(yè)執(zhí)行,具體取決于在計(jì)劃階段創(chuàng)建的Clustering
組的數(shù)量。默認(rèn)情況下Hudi將提交多個Spark作業(yè)并合并結(jié)果。如果要強(qiáng)制Hudi使用單Spark作業(yè),請將執(zhí)行策略類配置設(shè)置為SingleSparkJobExecutionStrategy
。
3.3 更新策略
目前只能為未接收任何并發(fā)更新的表/分區(qū)調(diào)度Clustering
。默認(rèn)情況下更新策略的配置設(shè)置為SparkRejectUpdateStrategy
。如果某個文件組在Clustering
期間有更新,則它將拒絕更新并引發(fā)異常。然而在某些用例中,更新是非常稀疏的,并且不涉及大多數(shù)文件組。簡單拒絕更新的默認(rèn)策略似乎不公平。在這種用例中用戶可以將配置設(shè)置為SparkAllowUpdateStregy
。
我們討論了關(guān)鍵策略配置,下面列出了與Clustering
相關(guān)的所有其他配置。在此列表中一些非常有用的配置包括:
配置項(xiàng) | 解釋 | 默認(rèn)值 |
---|---|---|
hoodie.clustering.async.enabled | 啟用在表上的異步運(yùn)行Clustering服務(wù)。 | false |
hoodie.clustering.async.max.commits | 通過指定應(yīng)觸發(fā)多少次提交來控制異步Clustering的頻率。 | 4 |
hoodie.clustering.preserve.commit.metadata | 重寫數(shù)據(jù)時保留現(xiàn)有的_hoodie_commit_time。這意味著用戶可以在Clustering數(shù)據(jù)上運(yùn)行增量查詢,而不會產(chǎn)生任何副作用。 | false |
4. 異步Clustering
之前我們已經(jīng)了解了用戶如何設(shè)置同步Clustering。此外用戶可以利用HoodiecClusteringJob設(shè)置兩步異步Clustering。
4.1 HoodieClusteringJob
隨著Hudi版本0.9.0的發(fā)布,我們可以在同一步驟中調(diào)度和執(zhí)行Clustering
。我們只需要指定-mode
或-m
選項(xiàng)。有如下三種模式:
schedule
(調(diào)度):制定一個Clustering計(jì)劃。這提供了一個可以在執(zhí)行模式下傳遞的instant
。
execute
(執(zhí)行):在給定的instant
執(zhí)行Clustering計(jì)劃,這意味著這里需要instant
。
scheduleAndExecute
(調(diào)度并執(zhí)行):首先制定Clustering計(jì)劃并立即執(zhí)行該計(jì)劃。
請注意要在原始寫入程序仍在運(yùn)行時運(yùn)行作業(yè)請啟用多寫入:
hoodie.write.concurrency.mode=optimistic_concurrency_control hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider
使用spark submit
命令提交HoodieClusteringJob
示例如下:
spark-submit \ --class org.apache.hudi.utilities.HoodieClusteringJob \ /path/to/hudi-utilities-bundle/target/hudi-utilities-bundle_2.12-0.9.0-SNAPSHOT.jar \ --props /path/to/config/clusteringjob.properties \ --mode scheduleAndExecute \ --base-path /path/to/hudi_table/basePath \ --table-name hudi_table_schedule_clustering \ --spark-memory 1g
clusteringjob.properties
配置文件示例如下
hoodie.clustering.async.enabled=true hoodie.clustering.async.max.commits=4 hoodie.clustering.plan.strategy.target.file.max.bytes=1073741824 hoodie.clustering.plan.strategy.small.file.limit=629145600 hoodie.clustering.execution.strategy.class=org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy hoodie.clustering.plan.strategy.sort.columns=column1,column2
4.2 HoodieDeltaStreamer
接著看下如何使用HudiDeltaStreamer
。現(xiàn)在我們可以使用DeltaStreamer
觸發(fā)異步Clustering。只需將hoodie.clustering.async.enabled為true
,并在屬性文件中指定其他Clustering配置,在啟動Deltastreamer
時可以將其位置設(shè)為-props
(與HoodieClusteringJob
配置類似)。
使用spark submit
命令提交HoodieDeltaStreamer
示例如下:
spark-submit \ --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \ /path/to/hudi-utilities-bundle/target/hudi-utilities-bundle_2.12-0.9.0-SNAPSHOT.jar \ --props /path/to/config/clustering_kafka.properties \ --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \ --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \ --source-ordering-field impresssiontime \ --table-type COPY_ON_WRITE \ --target-base-path /path/to/hudi_table/basePath \ --target-table impressions_cow_cluster \ --op INSERT \ --hoodie-conf hoodie.clustering.async.enabled=true \ --continuous
4.3 Spark Structured Streaming
我們還可以使用Spark結(jié)構(gòu)化流啟用異步Clustering,如下所示。
val commonOpts = Map( "hoodie.insert.shuffle.parallelism" -> "4", "hoodie.upsert.shuffle.parallelism" -> "4", DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key", DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition", DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp", HoodieWriteConfig.TBL_NAME.key -> "hoodie_test" ) def getAsyncClusteringOpts(isAsyncClustering: String, clusteringNumCommit: String, executionStrategy: String):Map[String, String] = { commonOpts + (DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE.key -> isAsyncClustering, HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMITS.key -> clusteringNumCommit, HoodieClusteringConfig.EXECUTION_STRATEGY_CLASS_NAME.key -> executionStrategy ) } def initStreamingWriteFuture(hudiOptions: Map[String, String]): Future[Unit] = { val streamingInput = // define the source of streaming Future { println("streaming starting") streamingInput .writeStream .format("org.apache.hudi") .options(hudiOptions) .option("checkpointLocation", basePath + "/checkpoint") .mode(Append) .start() .awaitTermination(10000) println("streaming ends") } } def structuredStreamingWithClustering(): Unit = { val df = //generate data frame val hudiOptions = getClusteringOpts("true", "1", "org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy") val f1 = initStreamingWriteFuture(hudiOptions) Await.result(f1, Duration.Inf) }
5. 總結(jié)和未來工作
在這篇文章中,我們討論了不同的Clustering策略以及如何設(shè)置異步Clustering。未來的工作包括:
Clustering支持更新。
支持Clustering的CLI工具。
另外Flink支持Clustering已經(jīng)有相應(yīng)Pull Request,有興趣的小伙伴可以關(guān)注該P(yáng)R。
以上就是Apache Hudi異步Clustering部署操作的掌握的詳細(xì)內(nèi)容,更多關(guān)于Apache Hudi異步Clustering部署的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Springboot 整合通用mapper和pagehelper展示分頁數(shù)據(jù)的問題(附github源碼)
這篇文章主要介紹了Springboot 整合通用mapper和pagehelper展示分頁數(shù)據(jù)(附github源碼),本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-09-09java 解決異常 2 字節(jié)的 UTF-8 序列的字節(jié)2 無效的問題
這篇文章主要介紹了java 解決異常 2 字節(jié)的 UTF-8 序列的字節(jié) 2 無效的問題的相關(guān)資料,需要的朋友可以參考下2016-12-12詳解Spring?Security?捕獲?filter?層面異常返回我們自定義的內(nèi)容
Spring?的異常會轉(zhuǎn)發(fā)到?BasicErrorController?中進(jìn)行異常寫入,然后才會返回客戶端。所以,我們可以在?BasicErrorController?對?filter異常進(jìn)行捕獲并處理,下面通過本文給大家介紹Spring?Security?捕獲?filter?層面異常,返回我們自定義的內(nèi)容,感興趣的朋友一起看看吧2022-05-05Java注解機(jī)制之Spring自動裝配實(shí)現(xiàn)原理詳解
這篇文章主要為大家詳細(xì)介紹了Java注解機(jī)制之Spring自動裝配實(shí)現(xiàn)原理,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-10-10使用WebUploader實(shí)現(xiàn)分片斷點(diǎn)上傳文件功能(二)
這篇文章主要為大家詳細(xì)介紹了使用WebUploader實(shí)現(xiàn)分片斷點(diǎn)上傳文件功能,具有一定的參考價值,感興趣的小伙伴們可以參考一下2018-01-01SpringBoot實(shí)現(xiàn)阿里云快遞物流查詢的示例代碼
本文將基于springboot實(shí)現(xiàn)快遞物流查詢,物流信息的獲取通過阿里云第三方實(shí)現(xiàn),具有一定的參考價值,感興趣的可以了解一下2021-10-10使用spring+maven不同環(huán)境讀取配置方式
這篇文章主要介紹了使用spring+maven不同環(huán)境讀取配置方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-08-08