Spark網(wǎng)站日志過濾分析實例講解
日志過濾
對于一個網(wǎng)站日志,首先要對它進(jìn)行過濾,刪除一些不必要的信息,我們通過scala語言來實現(xiàn),清洗代碼如下,代碼要通過別的軟件打包為jar包,此次實驗所用需要用到的代碼都被打好jar包,放到了/root/jar-files文件夾下:
package com.imooc.log import com.imooc.log.SparkStatFormatJob.SetLogger import com.imooc.log.util.AccessConvertUtil import org.apache.spark.sql.{SaveMode, SparkSession} /* 數(shù)據(jù)清洗部分 */ object SparkStatCleanJob { def main(args: Array[String]): Unit = { SetLogger val spark = SparkSession.builder() .master("local[2]") .appName("SparkStatCleanJob").getOrCreate() val accessRDD = spark.sparkContext.textFile("/root/resources/access.log") accessRDD.take(4).foreach(println) val accessDF = spark.createDataFrame(accessRDD.map(x => AccessConvertUtil.parseLog(x)),AccessConvertUtil.struct) accessDF.printSchema() //-----------------數(shù)據(jù)清洗存儲到目標(biāo)地址------------------------ // coalesce(1)輸出指定分區(qū)數(shù)的小文件 accessDF.coalesce(1).write.format("parquet").partitionBy("day").mode(SaveMode.Overwrite).save("/root/clean")//mode(SaveMode.Overwrite)覆蓋已經(jīng)存在的文件 存儲為parquet格式,按day分區(qū) //存儲為parquet格式,按day分區(qū) /** * 調(diào)優(yōu)點(diǎn): * 1) 控制文件輸出的大小: coalesce * 2) 分區(qū)字段的數(shù)據(jù)類型調(diào)整:spark.sql.sources.partitionColumnTypeInference.enabled * 3) 批量插入數(shù)據(jù)庫數(shù)據(jù),提交使用batch操作 */ spark.stop() } }
過濾好的數(shù)據(jù)將被存放在/root/clean文件夾中,這部分已被執(zhí)行好,后面直接使用就可以,其中代碼開始的SetLogger功能在自定義類com.imooc.log.SparkStatFormatJob
中,它關(guān)閉了大部分log日志輸出,這樣可以使界面變得簡潔,代碼如下:
def SetLogger() = { Logger.getLogger("org").setLevel(Level.OFF) Logger.getLogger("com").setLevel(Level.OFF) System.setProperty("spark.ui.showConsoleProgress", "false") Logger.getRootLogger().setLevel(Level.OFF); }
過濾中的AccessConvertUtil
類內(nèi)容如下所示:
object AccessConvertUtil { //定義的輸出字段 val struct = StructType( //過濾日志結(jié)構(gòu) Array( StructField("url", StringType), //課程URL StructField("cmsType", StringType), //課程類型:video / article StructField("cmsId", LongType), //課程編號 StructField("traffic", LongType), //耗費(fèi)流量 StructField("ip", StringType), //ip信息 StructField("city", StringType), //所在城市 StructField("time", StringType), //訪問時間 StructField("day", StringType) //分區(qū)字段,天 ) ) /** * 根據(jù)輸入的每一行信息轉(zhuǎn)換成輸出的樣式 * 日志樣例:2017-05-11 14:09:14 http://www.imooc.com/video/4500 304 218.75.35.226 */ def parseLog(log: String) = { try { val splits = log.split("\t") val url = splits(1) //http://www.imooc.com/video/4500 val traffic = splits(2).toLong val ip = splits(3) val domain = "http://www.imooc.com/" //主域名 val cms = url.substring(url.indexOf(domain) + domain.length) //建立一個url的子字符串,它將從domain出現(xiàn)時的位置加domain的長度的位置開始計起 val cmsTypeId = cms.split("/") var cmsType = "" var cmsId = 0L if (cmsTypeId.length > 1) { cmsType = cmsTypeId(0) cmsId = cmsTypeId(1).toLong } //以"/"分隔開后,就相當(dāng)于分開了課程格式和id,以http://www.imooc.com/video/4500為例,此時cmsType=video,cmsId=4500 val city = IpUtils.getCity(ip) //從ip表中可以知道ip對應(yīng)哪個城市 val time = splits(0) //2017-05-11 14:09:14 val day = time.split(" ")(0).replace("-", "") //day=20170511 //Row中的字段要和Struct中的字段對應(yīng) Row(url, cmsType, cmsId, traffic, ip, city, time, day) } catch { case e: Exception => Row(0) } } def main(args: Array[String]): Unit = { //示例程序: val url = "http://www.imooc.com/video/4500" val domain = "http://www.imooc.com/" //主域名 val index_0 = url.indexOf(domain) val index_1 = index_0 + domain.length val cms = url.substring(index_1) val cmsTypeId = cms.split("/") var cmsType = "" var cmsId = 0L if (cmsTypeId.length > 1) { cmsType = cmsTypeId(0) cmsId = cmsTypeId(1).toLong } println(cmsType + " " + cmsId) val time = "2017-05-11 14:09:14" val day = time.split(" ")(0).replace("-", "") println(day) } }
執(zhí)行完畢后clean文件夾下內(nèi)容如圖1所示:
日志分析
現(xiàn)在我們已經(jīng)擁有了過濾好的日志文件,可以開始編寫分析代碼,例如實現(xiàn)一個按地市統(tǒng)計主站最受歡迎的TopN課程
package com.imooc.log import com.imooc.log.SparkStatFormatJob.SetLogger import com.imooc.log.dao.StatDAO import com.imooc.log.entity.{DayCityVideoAccessStat, DayVideoAccessStat, DayVideoTrafficsStat} import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions._ import org.apache.spark.sql.{DataFrame, SparkSession} import scala.collection.mutable.ListBuffer object TopNStatJob2 { def main(args: Array[String]): Unit = { SetLogger val spark = SparkSession.builder() .config("spark.sql.sources.partitionColumnTypeInference.enabled", "false") //分區(qū)字段的數(shù)據(jù)類型調(diào)整【禁用】 .master("local[2]") .config("spark.sql.parquet.compression.codec","gzip") //修改parquet壓縮格式 .appName("SparkStatCleanJob").getOrCreate() //讀取清洗過后的數(shù)據(jù) val cleanDF = spark.read.format("parquet").load("/root/clean") //執(zhí)行業(yè)務(wù)前先清空當(dāng)天表中的數(shù)據(jù) val day = "20170511" import spark.implicits._ val commonDF = cleanDF.filter($"day" === day && $"cmsType" === "video") commonDF.cache() StatDAO.delete(day) cityAccessTopSata(spark, commonDF) //按地市統(tǒng)計主站最受歡迎的TopN課程功能 commonDF.unpersist(true) //RDD去持久化,優(yōu)化內(nèi)存空間 spark.stop() } /* * 按地市統(tǒng)計主站最受歡迎的TopN課程 */ def cityAccessTopSata(spark: SparkSession, commonDF: DataFrame): Unit = { //------------------使用DataFrame API完成統(tǒng)計操作-------------------------------------------- import spark.implicits._ val cityAccessTopNDF = commonDF .groupBy("day", "city", "cmsId").agg(count("cmsId").as("times")).orderBy($"times".desc) //聚合 cityAccessTopNDF.printSchema() cityAccessTopNDF.show(false) //-----------Window函數(shù)在Spark SQL中的使用-------------------- val cityTop3DF = cityAccessTopNDF.select( //Top3中涉及到的列 cityAccessTopNDF("day"), cityAccessTopNDF("city"), cityAccessTopNDF("cmsId"), cityAccessTopNDF("times"), row_number().over(Window.partitionBy(cityAccessTopNDF("city")) .orderBy(cityAccessTopNDF("times").desc)).as("times_rank") ).filter("times_rank <= 3").orderBy($"city".desc, $"times_rank".asc) //以city為一個partition,聚合times為times_rank,過濾出前三,降序聚合city,升序聚合times_rank cityTop3DF.show(false) //展示每個地市的Top3 //-------------------將統(tǒng)計結(jié)果寫入數(shù)據(jù)庫------------------- try { cityTop3DF.foreachPartition(partitionOfRecords => { val list = new ListBuffer[DayCityVideoAccessStat] partitionOfRecords.foreach(info => { val day = info.getAs[String]("day") val cmsId = info.getAs[Long]("cmsId") val city = info.getAs[String]("city") val times = info.getAs[Long]("times") val timesRank = info.getAs[Int]("times_rank") list.append(DayCityVideoAccessStat(day, cmsId, city, times, timesRank)) }) StatDAO.insertDayCityVideoAccessTopN(list) }) } catch { case e: Exception => e.printStackTrace() } }
其中保存統(tǒng)計時用到了StatDAO類的insertDayCityVideoAccessTopN()方法,這部分的說明如下:
def insertDayVideoTrafficsTopN(list: ListBuffer[DayVideoTrafficsStat]): Unit = { var connection: Connection = null var pstmt: PreparedStatement = null try { connection = MySQLUtils.getConnection() //JDBC連接MySQL connection.setAutoCommit(false) //設(shè)置手動提交 //向day_video_traffics_topn_stat表中插入數(shù)據(jù) val sql = "insert into day_video_traffics_topn_stat(day,cms_id,traffics) values(?,?,?)" pstmt = connection.prepareStatement(sql) for (ele <- list) { pstmt.setString(1, ele.day) pstmt.setLong(2, ele.cmsId) pstmt.setLong(3, ele.traffics) pstmt.addBatch() //優(yōu)化點(diǎn):批量插入數(shù)據(jù)庫數(shù)據(jù),提交使用batch操作 } pstmt.executeBatch() //執(zhí)行批量處理 connection.commit() //手工提交 } catch { case e: Exception => e.printStackTrace() } finally { MySQLUtils.release(connection, pstmt) //釋放連接 } }
JDBC連接MySQL和釋放連接用到了MySQLUtils中的方法
此外我們還需要在MySQL中插入表,用來寫入統(tǒng)計數(shù)據(jù),MySQL表已經(jīng)設(shè)置好。
下面將程序和所有依賴打包,用spark-submit提交:
./spark-submit --class com.imooc.log.TopNStatJob2 --master spark://localhost:9000 /root/jar-files/sql-1.0-jar-with-dependencies.jar
執(zhí)行結(jié)果:
Schema信息
TopN課程信息
各地區(qū)Top3課程信息
MySQL表中數(shù)據(jù):
到此這篇關(guān)于Spark網(wǎng)站日志過濾分析實例講解的文章就介紹到這了,更多相關(guān)Spark日志分析內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
JAVA中使用JSON進(jìn)行數(shù)據(jù)傳遞示例
本篇文章主要介紹了JAVA中使用JSON進(jìn)行數(shù)據(jù)傳遞示例,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-01-01java面試突擊之sleep和wait有什么區(qū)別詳析
按理來說sleep和wait本身就是八竿子打不著的兩個東西,但是在實際使用中大家都喜歡拿他們來做比較,或許是因為它們都可以讓線程處于阻塞狀態(tài),這篇文章主要給大家介紹了關(guān)于java面試突擊之sleep和wait有什么區(qū)別的相關(guān)資料,需要的朋友可以參考下2022-02-02Java網(wǎng)絡(luò)編程之簡單的服務(wù)端客戶端應(yīng)用實例
這篇文章主要介紹了Java網(wǎng)絡(luò)編程之簡單的服務(wù)端客戶端應(yīng)用,以實例形式較為詳細(xì)的分析了java網(wǎng)絡(luò)編程的原理與服務(wù)器端客戶端的實現(xiàn)技巧,具有一定參考借鑒價值,需要的朋友可以參考下2015-04-04Java使用組件編寫窗口實現(xiàn)網(wǎng)上文件下載
這篇文章主要為大家詳細(xì)介紹了Java使用組件編寫窗口實現(xiàn)網(wǎng)上文件下載的相關(guān)資料,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-02-02JAVA應(yīng)用系統(tǒng)工具快捷托盤實例代碼
JAVA應(yīng)用系統(tǒng)工具快捷托盤實例代碼,需要的朋友可以參考一下2013-02-02Mybatis日期格式自動轉(zhuǎn)換需要用到的兩個注解說明
這篇文章主要介紹了Mybatis日期格式自動轉(zhuǎn)換需要用到的兩個注解說明,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-08-08java多線程事務(wù)加鎖引發(fā)bug用戶重復(fù)注冊解決分析
這篇文章主要為大家介紹了java多線程事務(wù)加鎖引發(fā)bug用戶重復(fù)注冊解決分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-11-11