Spark-Sql入門程序示例詳解
SparkSQL運(yùn)行架構(gòu)
Spark SQL對(duì)SQL語(yǔ)句的處理,首先會(huì)將SQL語(yǔ)句進(jìn)行解析(Parse),然后形成一個(gè)Tree,在后續(xù)的如綁定、優(yōu)化等處理過(guò)程都是對(duì)Tree的操作,而操作的方法是采用Rule,通過(guò)模式匹配,對(duì)不同類型的節(jié)點(diǎn)采用不同的操作。
spark-sql是用來(lái)處理結(jié)構(gòu)化數(shù)據(jù)的模塊,是入門spark的首要模塊。
技術(shù)的學(xué)習(xí)無(wú)非就是去了解它的API,但是Spark有點(diǎn)難,因?yàn)樗睦雍途W(wǎng)上能搜到的基本都是Scala寫的。我們這里使用Java。
入門例子
數(shù)據(jù)處理的第一個(gè)例子通常都是word count,就是統(tǒng)計(jì)一個(gè)文件里每個(gè)單詞出現(xiàn)了幾次。我們也來(lái)試一下。
> 這個(gè)例子網(wǎng)上有很多,即使是通過(guò)spark實(shí)現(xiàn)的也不少;這里面大部分都是使用Scala寫的,我沒有試過(guò);少部分是通過(guò)Java寫的;
Java里面的例子有一些是使用RDD實(shí)現(xiàn)的,只有極個(gè)別是通過(guò)DataSet來(lái)做的。但即使這一小撮例子,我也跑不通。
所以我自己來(lái)嘗試完成這個(gè)例子,看到別人用Scala寫三五行就完成了,而我嘗試了一整天幾無(wú)進(jìn)展。在網(wǎng)上東拼西湊熟悉Spark的Java?
還是以我們前面的例子來(lái)改:
String logFile = "words";SparkSession spark = SparkSession.builder().appName("Simple Application").master("local").getOrCreate();Dataset<String> logData = spark.read().textFile(logFile).cache();System.out.println("行數(shù):" + logData.count());這里我不再使用之前的README文件,自己創(chuàng)建了一個(gè)words文件,內(nèi)容隨意寫了一堆單詞。
執(zhí)行程序,可以正常打印出來(lái):
接下來(lái)我們需要把句子分割成一個(gè)個(gè)單詞合在一起,然后統(tǒng)計(jì)每個(gè)單詞出現(xiàn)的次數(shù)。
> 可能有人會(huì)說(shuō),這個(gè)簡(jiǎn)單,我用Java8的流一下就處理好了:
把行集合通過(guò)flatMap處理,每一行通過(guò)split(" ")分割成一個(gè)獨(dú)立的單詞集合,再把結(jié)果通過(guò)自身groupBy一下就拿到終止數(shù)據(jù)結(jié)構(gòu)Map了。
最后把map的key和value的大小拿到就好了。
的確,使用Java就是這樣實(shí)現(xiàn)。但是Spark提供了一套和Java的流API名字和效果類似的工具,區(qū)別是Spark的是分布式API
我們通過(guò)Spark的flatMap先來(lái)處理一下:
Dataset<String> words = logData.flatMap((FlatMapFunction<String, String>) k -> Arrays.asList(k.split("\\s")).iterator(), Encoders.STRING()); System.out.println("單詞數(shù):" + words.count()); words.foreach(k -> { System.out.println("W:" + k); });
不同于Java的流,spark這個(gè)flatMap的返回值是可以直接訪問結(jié)果的:
> 可能有人留意到spark中函數(shù)式方法的參數(shù)定義和Java差距較大。他們的參數(shù)不太一樣,還多了個(gè)編碼器。目前來(lái)講我還不清楚為啥這樣定義,不過(guò)印象中編碼器也是spark3的重要優(yōu)化內(nèi)容。
再Java中使用Scala的方法總是有些怪異,Lambda表達(dá)式前面總是需要強(qiáng)制類型轉(zhuǎn)換,只是為了指明參數(shù)類型,否則需要new一個(gè)匿名類。
這個(gè)也花了我不少時(shí)間,后來(lái)找到一個(gè)網(wǎng)頁(yè)org.apache.spark.sql.Dataset.flatMap java code examples | Tabnine
再往后我迷茫了:
KeyValueGroupedDataset<String, String> group = words.groupByKey((Function1<String, String>) k -> k, Encoders.STRING());
這樣我已經(jīng)group好了,但是返回的不是DataSet,我也不知道這個(gè)返回有啥用,怎么拿到里面的內(nèi)容呢?我費(fèi)了好大勁沒搞定。
比如我發(fā)現(xiàn)count方法會(huì)返回一個(gè)DataSet:
看起來(lái)正是我想要的,但是當(dāng)我想把它輸出竟然執(zhí)行報(bào)錯(cuò):
ount.foreach(t -> { System.out.println(t); });
別說(shuō)foreach了,就算想看看里面的數(shù)量(就像一開始我們查看了文件有幾行那樣)都會(huì)報(bào)錯(cuò),錯(cuò)誤內(nèi)容一樣
count.count();
查了很多資料,大意是說(shuō)spark的計(jì)算方法都是分布式的,各個(gè)任務(wù)之間需要通信,通信時(shí)需要序列化來(lái)傳遞信息。所以上面我們能看文件行數(shù)因?yàn)轭愋褪荢tring,有序列化標(biāo)志;現(xiàn)在生成的是元組,不能序列化。我嘗試了各種方法,甚至自己創(chuàng)建新類模擬了計(jì)算過(guò)程還是不行
查了好久資料,比如Job aborted due to stage failure: Task not serializable: | Databricks Spark Knowledge Base (gitbooks.io)依然沒有解決。偶然的機(jī)會(huì)找到一個(gè)令人激動(dòng)的網(wǎng)站Spark Groupby Example with DataFrame — SparkByExamples終于解決了我的問題。
使用DataFrame
DataFrame雖然是spark提供的重要工具,但是再Java上并沒有對(duì)應(yīng)的類,只是把DataSet的泛型對(duì)象改成Row而已。注意這個(gè)Row沒有泛型定義,所以里面有哪些列不知道
可以從一開始就把DataSet轉(zhuǎn)成DataFrame:
但是可以看到要從Row里面拿數(shù)據(jù)比較麻煩。所以目前我只在需要序列化的地方轉(zhuǎn):
到此這篇關(guān)于Spark-Sql入門程序的文章就介紹到這了,更多相關(guān)Spark-Sql入門內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Springboot自動(dòng)配置與@Configuration配置類詳解
這篇文章主要介紹了SpringBoot中的@Configuration與自動(dòng)配置,在進(jìn)行項(xiàng)目編寫前,我們還需要知道一個(gè)東西,就是SpringBoot對(duì)我們的SpringMVC還做了哪些配置,包括如何擴(kuò)展,如何定制,只有把這些都搞清楚了,我們?cè)谥笫褂貌艜?huì)更加得心應(yīng)手2022-07-07SpringBoot3集成SLF4J+logback進(jìn)行日志記錄的實(shí)現(xiàn)
本文主要介紹了SpringBoot3集成SLF4J+logback進(jìn)行日志記錄的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-01-01使用SkyWalking監(jiān)控Java服務(wù)的過(guò)程
這篇文章主要介紹了使用SkyWalking監(jiān)控Java服務(wù),介紹一個(gè)對(duì)源碼0入侵的Java服務(wù)監(jiān)控方式,SkyWalking Agent,只需要啟動(dòng)Java程序的時(shí)候加幾個(gè)參數(shù),就能對(duì)Java服務(wù)進(jìn)行可視化監(jiān)控,需要的朋友可以參考下2023-08-08Java畢業(yè)設(shè)計(jì)實(shí)戰(zhàn)之醫(yī)院心理咨詢問診系統(tǒng)的實(shí)現(xiàn)
這是一個(gè)使用了java+Spring+Maven+mybatis+Vue+mysql開發(fā)的醫(yī)院心理咨詢問診系統(tǒng),是一個(gè)畢業(yè)設(shè)計(jì)的實(shí)戰(zhàn)練習(xí),具有心理咨詢問診該有的所有功能,感興趣的朋友快來(lái)看看吧2022-01-01java 計(jì)算中位數(shù)的實(shí)現(xiàn)方法
這篇文章主要介紹了java 計(jì)算中位數(shù)的實(shí)現(xiàn)方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-08-08