欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SparkStreaming-Kafka通過指定偏移量獲取數(shù)據(jù)實現(xiàn)

 更新時間:2023年06月20日 11:55:04   作者:spark打醬油  
這篇文章主要為大家介紹了SparkStreaming-Kafka通過指定偏移量獲取數(shù)據(jù),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

SparkStreaming-Kafka通過指定偏移量獲取數(shù)據(jù)

1.數(shù)據(jù)源

'310999003001', '3109990030010220140820141230292','00000000','','2017-08-20 14:09:35','0',255,'SN', 0.00,'4','','310999','310999003001','02','','','2','','','2017-08-20 14:12:30','2017-08-20 14:16:13',0,0,'2017-08-21 18:50:05','','',' '
'310999003102', '3109990031020220140820141230266','粵BT96V3','','2017-08-20 14:09:35','0',21,'NS', 0.00,'2','','310999','310999003102','02','','','2','','','2017-08-20 14:12:30','2017-08-20 14:16:13',0,0,'2017-08-21 18:50:05','','',' '

2.生產者

import java.util.Properties
import com.google.gson.{Gson, GsonBuilder}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
  * Date 2022/11/8 9:49
  */
object KafkaEventProducer {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("KafkaEventProducer").setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
    val topic = "ly_test"
    val props = new Properties()
    props.put("bootstrap.servers","node01:9092,node02:9092,node03:9092")
    props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer")
    props.put("acks","all")
//    props.put("security.protocol","SASL_PLAINTEXT")
//    props.put("sasl.mechanism","PLAIN")
//    props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username='sjf' password='123sjf';")
    val kafkaProducer = new KafkaProducer[String,String](props)
    val srcRDD: RDD[String] = sc.textFile("file:///F:\\work\\sun\\lywork\\hbaseoper\\datas\\kafkaproducerdata.txt")
    val records: Array[Array[String]] = srcRDD.filter(!_.startsWith(";")).map(_.split(",")).collect()
    //對數(shù)據(jù)進行預處理形成json形式
    for(record<-records){
      val trafficInfo = new TrafficInfo(record(0),record(2),record(4),record(6),record(13))
      // 不能用new Gson()   會出現(xiàn) \u0027
      // val trafficInfoJson: String = new Gson().toJson(trafficInfo)
      //使用Gson gson = new Gson(),進行對象轉化json格式時,單引號會被轉換成u0027代碼。使用以下方法進行替換
      val gson: Gson = new GsonBuilder().disableHtmlEscaping().create()
      val trafficInfoJson: String = gson.toJson(trafficInfo)
      kafkaProducer.send(new ProducerRecord[String,String](topic,trafficInfoJson))
      println("Message Sent:"+trafficInfoJson)
      Thread.sleep(2000)
    }
    sc.stop()
    kafkaProducer.flush()
    kafkaProducer.close()
  }
  //相機編號
  //車牌號
  //時間
  //速度
  //車道編號
  case class TrafficInfo(camer_id:String,car_id:String,event_time:String,car_speed:String,car_code:String)
}

3.消費者獲取指定偏移量

import java.text.SimpleDateFormat
import java.util
import java.util.Date
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
/**
  * Date 2022/11/5 16:38
  */
/**
  * 通過偏移量獲取數(shù)據(jù)
  */
object AttainDataFromOffset {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("AttainDataFromOffset").setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
    val ssc = new StreamingContext(sc,Seconds(5))
    val kafkaParams: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "node01:9092,node02:9092,node03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "ly",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
      // kafka 帶有賬號密碼sasl協(xié)議的認證
//      "security.protocol" -> "SASL_PLAINTEXT",
//      "sasl.mechanism" -> "PLAIN",
//      "sasl.jaas.config" -> "org.apache.kafka.common.security.plain.PlainLoginModule required username='sjf' password='123sjf';"
    )
    val topics = Array("ly_test")
    val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String,String](topics, kafkaParams)
    )
    val res: DStream[(String, String, Int, Long)] = stream.map(recoed => {
      val key: String = recoed.key()
      val value: String = recoed.value()
      val partionId: Int = recoed.partition()
      val offset: Long = recoed.offset()
      (key, value, partionId, offset)
      //println(key+"\t"+value+"\t"+partionId+"\t"+offset)
    })
    // 指定偏移量
    val offsetRanges = Array(
      // topic, partition, inclusive starting offset, exclusive ending offset
      OffsetRange("lawyee_test", 0, 1L, 10L)
    )
    // 獲取指定偏移量的數(shù)據(jù)
    import scala.collection.JavaConverters._
    val jkafkaParams: util.Map[String, Object] = kafkaParams.asJava
    val offsetRDD: RDD[ConsumerRecord[String, String]] = KafkaUtils.createRDD[String,String](
      sc,
      jkafkaParams,
      offsetRanges,
      LocationStrategies.PreferConsistent
    )
    val resRDD: RDD[(String, String, Int, Long,String,TimestampType)] = offsetRDD.map(recoed => {
      val key: String = recoed.key()
      val value: String = recoed.value()
      val partionId: Int = recoed.partition()
      val offset: Long = recoed.offset()
      var time: Long = recoed.timestamp()
      val timeStr = timeStampToDate(time)
      val timestampType: TimestampType = recoed.timestampType()
      (key, value, partionId, offset,timeStr,timestampType)
      //println(key+"\t"+value+"\t"+partionId+"\t"+offset)
    })
    resRDD.foreach(println(_))
    res.print()
    ssc.start()
    ssc.awaitTermination()
  }
  // 時間格式時間 轉換為字符串時間
  def dateToString(date:Date): String ={
    val simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    val strDate: String = simpleDateFormat.format(date)
    strDate
  }
  // 字符串時間轉換為時間格式時間
  def strToDate(str:String):Date = {
    val simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    val date: Date = simpleDateFormat.parse(str)
    date
  }
  // 時間戳轉化為字符串時間
  def timeStampToDate(timeStamp:Long): String ={
    val date = new Date(timeStamp)
    val strDate: String = dateToString(date)
    strDate
  }
  //字符串時間轉化為時間戳
  def dateToTimeStamp(strDate:String): Long ={
    val date: Date = strToDate(strDate)
    val timeStamp: Long = date.getTime
    timeStamp
  }
}

以上就是SparkStreaming-Kafka通過指定偏移量獲取數(shù)據(jù)的詳細內容,更多關于SparkStreaming Kafka獲取數(shù)據(jù)的資料請關注腳本之家其它相關文章!

相關文章

  • springboot cloud使用eureka整合分布式事務組件Seata 的方法

    springboot cloud使用eureka整合分布式事務組件Seata 的方法

    這篇文章主要介紹了springboot cloud使用eureka整合分布式事務組件Seata 的方法 ,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2021-05-05
  • 解決JDK版本沖突顯示問題(雙版本沖突)

    解決JDK版本沖突顯示問題(雙版本沖突)

    這篇文章主要介紹了解決JDK版本沖突顯示問題(雙版本沖突),具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2023-11-11
  • mybatis-plus(insertBatchSomeColumn批量添加方式)

    mybatis-plus(insertBatchSomeColumn批量添加方式)

    這篇文章主要介紹了mybatis-plus(insertBatchSomeColumn批量添加方式),具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2023-03-03
  • 解決SpringMVC、tomcat、Intellij idea、ajax中文亂碼問題

    解決SpringMVC、tomcat、Intellij idea、ajax中文亂碼問題

    這篇文章主要介紹了解決SpringMVC、tomcat、Intellij idea、ajax中文亂碼問題,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-09-09
  • 使用Java生成32位16進制密鑰的代碼實現(xiàn)

    使用Java生成32位16進制密鑰的代碼實現(xiàn)

    在許多加密和安全應用中,生成隨機的密鑰是至關重要的一步,密鑰通常以16進制形式表示,并且具有特定的長度,在這篇博客中,我們將探討如何使用Java生成一個32位長度的16進制密鑰,并展示詳細的代碼示例和運行結果,需要的朋友可以參考下
    2024-08-08
  • Java調用第三方接口示范的實現(xiàn)

    Java調用第三方接口示范的實現(xiàn)

    這篇文章主要介紹了Java調用第三方接口示范的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2019-09-09
  • Java等待喚醒機制原理實例解析

    Java等待喚醒機制原理實例解析

    這篇文章主要介紹了Java等待喚醒機制原理實例解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2020-01-01
  • 詳解Java 包掃描實現(xiàn)和應用(Jar篇)

    詳解Java 包掃描實現(xiàn)和應用(Jar篇)

    這篇文章主要介紹了詳解Java 包掃描實現(xiàn)和應用(Jar篇),本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-07-07
  • Java中使用ZXing和QRCode生成二維碼的示例詳解

    Java中使用ZXing和QRCode生成二維碼的示例詳解

    生成二維碼在Java中有多種方法,常用的是通過第三方庫來實現(xiàn),比較流行的庫包括?ZXing?(Zebra?Crossing)?和?QRCode,本文小編就給大家介紹了Java中使用ZXing和QRCode生成二維碼的示例,需要的朋友可以參考下
    2024-09-09
  • MyBatis的注解使用、ORM層優(yōu)化方式(懶加載和緩存)

    MyBatis的注解使用、ORM層優(yōu)化方式(懶加載和緩存)

    這篇文章主要介紹了MyBatis的注解使用、ORM層優(yōu)化方式(懶加載和緩存),具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-10-10

最新評論