
Spark Website Log Filtering and Analysis: A Worked Example

Updated: 2023-02-01 11:27:55   Author: CarveStone
This article walks through an example of filtering and analyzing website logs with Spark. The sample code is explained in detail and should be a useful reference for study or work; follow along below if you need it.

Log Filtering

For a website log, the first step is to filter it and strip out unnecessary information. The cleaning is implemented in Scala; the code has to be packaged into a jar with a separate tool. All the code needed for this experiment has already been packaged into jars and placed in the /root/jar-files directory:

package com.imooc.log
import com.imooc.log.SparkStatFormatJob.SetLogger
import com.imooc.log.util.AccessConvertUtil
import org.apache.spark.sql.{SaveMode, SparkSession}
/*
 * Data cleaning job
 */
object SparkStatCleanJob {
  def main(args: Array[String]): Unit = {
    SetLogger
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("SparkStatCleanJob").getOrCreate()
    val accessRDD = spark.sparkContext.textFile("/root/resources/access.log")
    accessRDD.take(4).foreach(println)
    val accessDF = spark.createDataFrame(accessRDD.map(x => AccessConvertUtil.parseLog(x)),AccessConvertUtil.struct)
    accessDF.printSchema()
    //----------------- Write the cleaned data to the target location ------------------------
    // coalesce(1) keeps the output to a small, fixed number of files per partition
    accessDF.coalesce(1).write.format("parquet")
      .partitionBy("day")                  // store as parquet, partitioned by day
      .mode(SaveMode.Overwrite)            // overwrite any existing output
      .save("/root/clean")
    /**
      * Tuning points:
      *     1) control the size of the output files: coalesce
      *     2) adjust partition-column type inference: spark.sql.sources.partitionColumnTypeInference.enabled
      *     3) insert into the database in batches, committing with batch operations
      */
    spark.stop()
  }
}
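The cleaning step has already been run in the environment used here, but if you want to rerun it yourself, a spark-submit invocation along the following lines should work (the class name comes from the object above; the jar path and master URL simply mirror the analysis job submitted at the end of this article and are assumptions about your setup):

./spark-submit --class com.imooc.log.SparkStatCleanJob --master spark://localhost:9000 /root/jar-files/sql-1.0-jar-with-dependencies.jar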

The filtered data is stored in the /root/clean folder. This step has already been executed, so later steps can use the output directly. The SetLogger call at the start of the code is defined in the custom class com.imooc.log.SparkStatFormatJob; it turns off most of the log output so the console stays readable. Its code is as follows:

// requires: import org.apache.log4j.{Level, Logger}
def SetLogger() = {
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("com").setLevel(Level.OFF)
    System.setProperty("spark.ui.showConsoleProgress", "false")
    Logger.getRootLogger().setLevel(Level.OFF)
  }

The AccessConvertUtil class used during filtering is shown below:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

object AccessConvertUtil {
  //output schema of the cleaned records
  val struct = StructType(                  //schema of the filtered log
    Array(
      StructField("url", StringType),       //course URL
      StructField("cmsType", StringType),   //course type: video / article
      StructField("cmsId", LongType),       //course id
      StructField("traffic", LongType),     //traffic consumed
      StructField("ip", StringType),        //IP address
      StructField("city", StringType),      //city
      StructField("time", StringType),      //access time
      StructField("day", StringType)        //partition column: day
    )
  )
  /**
    * Converts one input line into the output format.
    * Sample log line: 2017-05-11 14:09:14     http://www.imooc.com/video/4500    304    218.75.35.226
    */
  def parseLog(log: String) = {
    try {
      val splits = log.split("\t")
      val url = splits(1)                    //e.g. http://www.imooc.com/video/4500
      val traffic = splits(2).toLong
      val ip = splits(3)
      val domain = "http://www.imooc.com/"   //main domain
      val cms = url.substring(url.indexOf(domain) + domain.length)    //substring of the url starting right after the domain
      val cmsTypeId = cms.split("/")
      var cmsType = ""
      var cmsId = 0L
      if (cmsTypeId.length > 1) {
        cmsType = cmsTypeId(0)
        cmsId = cmsTypeId(1).toLong
      }      //splitting on "/" separates the course type from the id; for http://www.imooc.com/video/4500, cmsType=video and cmsId=4500
      val city = IpUtils.getCity(ip)         //the IP table tells us which city the IP belongs to
      val time = splits(0)                   //e.g. 2017-05-11 14:09:14
      val day = time.split(" ")(0).replace("-", "")    //day=20170511
      //the fields in the Row must line up with the fields in struct
      Row(url, cmsType, cmsId, traffic, ip, city, time, day)
    } catch {
      case e: Exception => Row(0)            //on parse failure, return a stub row
    }
  }
  def main(args: Array[String]): Unit = {
    //worked example of the parsing logic:
    val url = "http://www.imooc.com/video/4500"
    val domain = "http://www.imooc.com/" //main domain
    val index_0 = url.indexOf(domain)
    val index_1 = index_0 + domain.length
    val cms = url.substring(index_1)
    val cmsTypeId = cms.split("/")
    var cmsType = ""
    var cmsId = 0L
    if (cmsTypeId.length > 1) {
      cmsType = cmsTypeId(0)
      cmsId = cmsTypeId(1).toLong
    }
    println(cmsType + "   " + cmsId)
    val time = "2017-05-11 14:09:14"
    val day = time.split(" ")(0).replace("-", "")
    println(day)
  }
}
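The IpUtils.getCity() helper referenced in parseLog() is not listed in the original article; it resolves an IP address to a city or region. A purely illustrative stub is sketched below (the hard-coded lookup table is an assumption for demonstration; a real implementation would query an offline IP database):

object IpUtils {
  // Illustrative only: a real implementation would look the IP up in an offline IP database.
  private val demoTable = Map("218.75.35.226" -> "Zhejiang")

  def getCity(ip: String): String = demoTable.getOrElse(ip, "unknown")
}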

After the job finishes, the contents of the clean folder are as shown in Figure 1:
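A quick way to check the cleaned output yourself is to read it back, for example from spark-shell (a minimal sketch, assuming the same output path as above):

val cleanDF = spark.read.format("parquet").load("/root/clean")
cleanDF.printSchema()                  // should match the struct defined in AccessConvertUtil
cleanDF.groupBy("day").count().show()  // one partition per day, e.g. day=20170511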

Log Analysis

Now that we have the filtered log files, we can start writing the analysis code, for example to compute the TopN most popular courses on the main site by city.

package com.imooc.log
import com.imooc.log.SparkStatFormatJob.SetLogger
import com.imooc.log.dao.StatDAO
import com.imooc.log.entity.{DayCityVideoAccessStat, DayVideoAccessStat, DayVideoTrafficsStat}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.collection.mutable.ListBuffer
object TopNStatJob2 {
  def main(args: Array[String]): Unit = {
    SetLogger
    val spark = SparkSession.builder()
      .config("spark.sql.sources.partitionColumnTypeInference.enabled", "false") //分區(qū)字段的數(shù)據(jù)類型調(diào)整【禁用】
      .master("local[2]")
      .config("spark.sql.parquet.compression.codec","gzip")   //修改parquet壓縮格式
      .appName("SparkStatCleanJob").getOrCreate()
    //讀取清洗過后的數(shù)據(jù)
    val cleanDF = spark.read.format("parquet").load("/root/clean")
    //執(zhí)行業(yè)務(wù)前先清空當(dāng)天表中的數(shù)據(jù)
    val day = "20170511"
    import spark.implicits._
    val commonDF = cleanDF.filter($"day" === day && $"cmsType" === "video")
    commonDF.cache()
    StatDAO.delete(day)
    cityAccessTopSata(spark, commonDF)     //TopN most popular courses on the main site, by city
    commonDF.unpersist(true)     //unpersist the cached DataFrame to free memory
    spark.stop()
  }
/*
 * TopN most popular courses on the main site, by city
*/
def cityAccessTopSata(spark: SparkSession, commonDF: DataFrame): Unit = {
    //------------------ Aggregation with the DataFrame API --------------------------------------------
    import spark.implicits._
    val cityAccessTopNDF = commonDF
      .groupBy("day", "city", "cmsId").agg(count("cmsId").as("times")).orderBy($"times".desc)     //aggregate access counts
    cityAccessTopNDF.printSchema()
    cityAccessTopNDF.show(false)
    //----------- Window functions in Spark SQL --------------------
    val cityTop3DF = cityAccessTopNDF.select(       //columns involved in the Top3
      cityAccessTopNDF("day"),
      cityAccessTopNDF("city"),
      cityAccessTopNDF("cmsId"),
      cityAccessTopNDF("times"),
      row_number().over(Window.partitionBy(cityAccessTopNDF("city"))
        .orderBy(cityAccessTopNDF("times").desc)).as("times_rank")
    ).filter("times_rank <= 3").orderBy($"city".desc, $"times_rank".asc)         //partition by city, rank rows by times as times_rank, keep the top 3, then sort by city desc and times_rank asc
    cityTop3DF.show(false) //show the Top3 for each city
    //------------------- Write the results to the database -------------------
    try {
      cityTop3DF.foreachPartition(partitionOfRecords => {
        val list = new ListBuffer[DayCityVideoAccessStat]
        partitionOfRecords.foreach(info => {        
          val day = info.getAs[String]("day")
          val cmsId = info.getAs[Long]("cmsId")
          val city = info.getAs[String]("city")
          val times = info.getAs[Long]("times")
          val timesRank = info.getAs[Int]("times_rank")
          list.append(DayCityVideoAccessStat(day, cmsId, city, times, timesRank))
        })
        StatDAO.insertDayCityVideoAccessTopN(list)
      })
    } catch {
      case e: Exception => e.printStackTrace()
    }
  }
}
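The DayCityVideoAccessStat entity and the StatDAO.delete() call used above are not listed in the original article. Minimal sketches of both are given below; the field order matches how they are used in the job, while the table name in delete() is an assumption mirroring the insert methods that follow (requires import java.sql.{Connection, PreparedStatement}):

case class DayCityVideoAccessStat(day: String, cmsId: Long, city: String, times: Long, timesRank: Int)

object StatDAO {
  // Sketch only: clears the given day's rows before the new results are inserted.
  def delete(day: String): Unit = {
    var connection: Connection = null
    var pstmt: PreparedStatement = null
    try {
      connection = MySQLUtils.getConnection()
      pstmt = connection.prepareStatement("delete from day_video_city_access_topn_stat where day = ?")
      pstmt.setString(1, day)
      pstmt.executeUpdate()
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      MySQLUtils.release(connection, pstmt)
    }
  }
}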

Saving the statistics uses the insertDayCityVideoAccessTopN() method of the StatDAO class. The original code illustrates the pattern with the closely analogous insertDayVideoTrafficsTopN() method, shown below:

def insertDayVideoTrafficsTopN(list: ListBuffer[DayVideoTrafficsStat]): Unit = {
    var connection: Connection = null
    var pstmt: PreparedStatement = null
    try {
      connection = MySQLUtils.getConnection()      //get a JDBC connection to MySQL
      connection.setAutoCommit(false) //switch to manual commit
      //insert rows into the day_video_traffics_topn_stat table
      val sql = "insert into day_video_traffics_topn_stat(day,cms_id,traffics) values(?,?,?)"
      pstmt = connection.prepareStatement(sql)
      for (ele <- list) {
        pstmt.setString(1, ele.day)
        pstmt.setLong(2, ele.cmsId)
        pstmt.setLong(3, ele.traffics)
        pstmt.addBatch() //tuning point: insert rows in batches rather than one at a time
      }
      pstmt.executeBatch() //execute the batch
      connection.commit() //commit manually
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      MySQLUtils.release(connection, pstmt)          //release the connection
    }
  }
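The insertDayCityVideoAccessTopN() method actually called by cityAccessTopSata() follows exactly the same pattern; a sketch is given below, assuming a day_video_city_access_topn_stat table whose columns mirror the fields of DayCityVideoAccessStat:

def insertDayCityVideoAccessTopN(list: ListBuffer[DayCityVideoAccessStat]): Unit = {
    var connection: Connection = null
    var pstmt: PreparedStatement = null
    try {
      connection = MySQLUtils.getConnection()
      connection.setAutoCommit(false) //manual commit for batching
      //table and column names are assumptions mirroring the traffics method above
      val sql = "insert into day_video_city_access_topn_stat(day,cms_id,city,times,times_rank) values(?,?,?,?,?)"
      pstmt = connection.prepareStatement(sql)
      for (ele <- list) {
        pstmt.setString(1, ele.day)
        pstmt.setLong(2, ele.cmsId)
        pstmt.setString(3, ele.city)
        pstmt.setLong(4, ele.times)
        pstmt.setInt(5, ele.timesRank)
        pstmt.addBatch()
      }
      pstmt.executeBatch()
      connection.commit()
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      MySQLUtils.release(connection, pstmt)
    }
  }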

The JDBC connection to MySQL and its release use helper methods from MySQLUtils.
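MySQLUtils itself is not listed in the article; a minimal sketch is shown below (the JDBC URL, database name, user, and password are placeholders for your own environment):

import java.sql.{Connection, DriverManager, PreparedStatement}

object MySQLUtils {
  // Placeholder connection settings; replace with your own database, user, and password.
  def getConnection(): Connection =
    DriverManager.getConnection("jdbc:mysql://localhost:3306/imooc_project?useUnicode=true&characterEncoding=UTF-8", "root", "root")

  // Close the statement and the connection, ignoring nulls.
  def release(connection: Connection, pstmt: PreparedStatement): Unit = {
    try {
      if (pstmt != null) pstmt.close()
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (connection != null) connection.close()
    }
  }
}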

In addition, tables need to be created in MySQL to receive the statistics; in this environment the MySQL tables have already been set up.
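For reference, a possible DDL sketch for the traffics TopN table targeted by the insert statement above is shown below. The column names come from that statement; the id column and the exact column types are assumptions, and the per-city table follows the same pattern with additional city and times_rank columns.

create table day_video_traffics_topn_stat (
  id int unsigned auto_increment primary key,
  day varchar(8) not null,
  cms_id bigint(10) not null,
  traffics bigint(20) not null
) engine=InnoDB default charset=utf8;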

Finally, package the program with all of its dependencies and submit it with spark-submit:

./spark-submit --class com.imooc.log.TopNStatJob2 --master spark://localhost:9000 /root/jar-files/sql-1.0-jar-with-dependencies.jar

Execution results (shown as screenshots in the original article): the printed schema, the TopN course list, the Top 3 courses for each city, and the resulting rows in the MySQL table.

This concludes the worked example of filtering and analyzing website logs with Spark.
