欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

java 中Spark中將對象序列化存儲到hdfs

 更新時間:2017年06月09日 14:12:18   作者:小水熊  
這篇文章主要介紹了java 中Spark中將對象序列化存儲到hdfs的相關(guān)資料,需要的朋友可以參考下

java 中Spark中將對象序列化存儲到hdfs

摘要: Spark應(yīng)用中經(jīng)常會遇到這樣一個需求: 需要將JAVA對象序列化并存儲到HDFS, 尤其是利用MLlib計算出來的一些模型, 存儲到hdfs以便模型可以反復(fù)利用. 下面的例子演示了Spark環(huán)境下從Hbase讀取數(shù)據(jù), 生成一個word2vec模型, 存儲到hdfs.

廢話不多說, 直接貼代碼了. spark1.4 + hbase0.98

import org.apache.spark.storage.StorageLevel
import scala.collection.JavaConverters._
import java.io.File
import java.io.FileInputStream
import java.io.FileOutputStream
import java.io.ObjectInputStream
import java.io.ObjectOutputStream
import java.net.URI
import java.util.Date
import org.ansj.library.UserDefineLibrary
import org.ansj.splitWord.analysis.NlpAnalysis
import org.ansj.splitWord.analysis.ToAnalysis
import org.apache.hadoop.fs.FSDataInputStream
import org.apache.hadoop.fs.FSDataOutputStream
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.FileUtil
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.filter.FilterList
import org.apache.hadoop.hbase.filter.PageFilter
import org.apache.hadoop.hbase.filter.RegexStringComparator
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import com.feheadline.fespark.db.Neo4jManager
import com.feheadline.fespark.util.Env
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd._
import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
import scala.math.log
import scala.io.Source

object Word2VecDemo {

 def convertScanToString(scan: Scan) = {
  val proto = ProtobufUtil.toScan(scan)
  Base64.encodeBytes(proto.toByteArray)
 }

 def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf().setAppName("Word2Vec Demo")
  sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  sparkConf.set("spark.kryoserializer.buffer", "256m")
  sparkConf.set("spark.kryoserializer.buffer.max","2046m")
  sparkConf.set("spark.akka.frameSize", "500")
  sparkConf.set("spark.rpc.askTimeout", "30")
  

  val sc = new SparkContext(sparkConf)
  val hbaseConf = HBaseConfiguration.create()
  hbaseConf.set("hbase.zookeeper.quorum", "myzookeeper")

  hbaseConf.set(TableInputFormat.INPUT_TABLE, "crawled")

  val scan = new Scan()
  val filterList:FilterList = new FilterList(FilterList.Operator.MUST_PASS_ALL)
  
  val comp:RegexStringComparator = new RegexStringComparator(""".{1500,}""")
  
  val articleFilter:SingleColumnValueFilter = new SingleColumnValueFilter(
  "data".getBytes,
  "article".getBytes,
  CompareOp.EQUAL,
  comp
  )
  
  filterList.addFilter(articleFilter)
  filterList.addFilter(new PageFilter(100))
  
  scan.setFilter(filterList)
  scan.setCaching(50)
  scan.setCacheBlocks(false)
  hbaseConf.set(TableInputFormat.SCAN,convertScanToString(scan))

  val crawledRDD = sc.newAPIHadoopRDD(
   hbaseConf,
   classOf[TableInputFormat],
   classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
   classOf[org.apache.hadoop.hbase.client.Result]
  )
 
  val articlesRDD = crawledRDD.filter{
   case (_,result) => {
     val content = Bytes.toString(result.getValue("data".getBytes,"article".getBytes))
     content != null
   }
  }

  val wordsInDoc = articlesRDD.map{
   case (_,result) => {
     val content = Bytes.toString(result.getValue("data".getBytes,"article".getBytes))
     if(content!=null)ToAnalysis.parse(content).asScala.map(_.getName).toSeq
     else Seq("")
   }
  }
  
  val fitleredWordsInDoc = wordsInDoc.filter(_.nonEmpty)
  
  val word2vec = new Word2Vec()
  val model = word2vec.fit(fitleredWordsInDoc)
  
  //---------------------------------------重點看這里-------------------------------------------------------------
  //將上面的模型存儲到hdfs
  val hadoopConf = sc.hadoopConfiguration
  hadoopConf.set("fs.defaultFS", "hdfs://myhadoop:9000/")
  val fileSystem = FileSystem.get(hadoopConf)
  val path = new Path("/user/hadoop/data/mllib/word2vec-object")
  val oos = new ObjectOutputStream(new FSDataOutputStream(fileSystem.create(path)))
  oos.writeObject(model)
  oos.close
  
  //這里示例另外一個程序直接從hdfs讀取序列化對象使用模型
  val ois = new ObjectInputStream(new FSDataInputStream(fileSystem.open(path)))
  val sample_model = ois.readObject.asInstanceOf[Word2VecModel]
  
  /*
  * //你還可以將序列化文件從hdfs放到本地, scala程序使用模型
  * import java.io._
  * import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
  * val ois = new ObjectInputStream(new FileInputStream("/home/cherokee/tmp/word2vec-object"))
  * val sample_model = ois.readObject.asInstanceOf[Word2VecModel]
  * ois.close
  */
  //--------------------------------------------------------------------------------------------------------------
 }
}


感謝閱讀,希望能幫助到大家,謝謝大家對本站的支持!

相關(guān)文章

  • 基于Java實現(xiàn)掃碼登錄的示例代碼

    基于Java實現(xiàn)掃碼登錄的示例代碼

    相信大家對二維碼都不陌生,生活中到處充斥著掃碼登錄的場景,如登錄網(wǎng)頁版微信、支付寶等。本文將利用Java實現(xiàn)一個簡易版掃碼登錄的 Demo,需要的可以參考一下
    2022-04-04
  • 不同方式遍歷Map集合(全)

    不同方式遍歷Map集合(全)

    大家都知道Map是一種以鍵值對的形式存在的集合,其中每個鍵映射到一個值,下面把Map遍歷集合總結(jié)了一下給大家分享下,需要的朋友可以參考下
    2015-07-07
  • 一文教你如何使用原生的Feign

    一文教你如何使用原生的Feign

    Feign使得 Java HTTP 客戶端編寫更方便,Feign 靈感來源于Retrofit、JAXRS-2.0和WebSocket,這篇文章主要給大家介紹了如何使用原生的Feign的相關(guān)資料,需要的朋友可以參考下
    2021-10-10
  • SpringBoot設(shè)置編碼UTF-8的兩種方法

    SpringBoot設(shè)置編碼UTF-8的兩種方法

    本文通過兩種方式給大家介紹SpringBoot 設(shè)置編碼UTF-8 ,每種方式通過實例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2021-11-11
  • Mybatis-Plus多表關(guān)聯(lián)查詢的使用案例解析

    Mybatis-Plus多表關(guān)聯(lián)查詢的使用案例解析

    這篇文章主要介紹了Mybatis-Plus多表關(guān)聯(lián)查詢的使用,本文通過實例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2023-05-05
  • Java消息隊列RabbitMQ之消息回調(diào)詳解

    Java消息隊列RabbitMQ之消息回調(diào)詳解

    這篇文章主要介紹了Java消息隊列RabbitMQ之消息回調(diào)詳解,消息回調(diào),其實就是消息確認(rèn)(生產(chǎn)者推送消息成功,消費者接收消息成功)  , 對于程序來說,發(fā)送者沒法確認(rèn)是否發(fā)送成功,需要的朋友可以參考下
    2023-07-07
  • Spring?Boot?整合持久層之MyBatis

    Spring?Boot?整合持久層之MyBatis

    在實際開發(fā)中不僅僅是要展示數(shù)據(jù),還要構(gòu)成數(shù)據(jù)模型添加數(shù)據(jù),這篇文章主要介紹了SpringBoot集成Mybatis操作數(shù)據(jù)庫,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2022-08-08
  • 最新評論