欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java 數據流之Broadcast State

 更新時間:2021年09月14日 10:44:56   作者:Vicky_Tang  
這篇文章主要介紹了Java 數據流之Broadcast State,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下

一、BroadcastState 的介紹

廣播狀態(tài)(Broadcast State)是 Operator State 的一種特殊類型。如果我們需要將配置 、規(guī)則等低吞吐事件流廣播到下游所有 Task 時,就可以使用 BroadcastState。下游的 Task 接收這些配置、規(guī)則并保存為 BroadcastState,所有Task 中的狀態(tài)保持一致,作用于另一個數據流的計算中。
簡單理解:一個低吞吐量流包含一組規(guī)則,我們想對來自另一個流的所有元素基于此規(guī)則進行評估。
場景:動態(tài)更新計算規(guī)則。

廣播狀態(tài)與其他操作符狀態(tài)的區(qū)別在于:

  • 它有一個 map 格式,用于定義存儲結構
  • 它僅對具有廣播流和非廣播流輸入的特定操作符可用
  • 這樣的操作符可以具有不同名稱的多個廣播狀態(tài)

二、BroadcastState 操作流程

三、案例實現

  • 從端口讀取Json數據作為事件流
  • 從Mysql讀取數據作為廣播流
  • 關聯廣播流和事件流
  • 匹配對應的用戶信息
package cn.kgc.broadcast
 
import java.sql.{Connection, DriverManager, PreparedStatement}
 
import com.alibaba.fastjson.JSON
import org.apache.flink.api.common.state.{BroadcastState, MapStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.datastream.BroadcastStream
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
 
// (001,'tom',18,'北京',15830010002)
// 定義樣例類 接受 MySQL的用戶數據
case class BaseUserInfo(id:Long,name:String,age:Int,city:String,phone:Long)
 
// user_id、user_name、user_addrss、behaviour、url
// 輸出數據類型
case class UserVisitInfo(id:Long,name:String,city:String,behaviour:String,url:String)
 
// 實現廣播ProcessFunction
class MyBroadcastFunc extends BroadcastProcessFunction[String,(Long, BaseUserInfo),UserVisitInfo]{
 
  lazy val mapStateDes = new MapStateDescriptor[Long, BaseUserInfo]("mapState",classOf[Long],classOf[BaseUserInfo])
 
  // 處理的是日志流中的每條數據
  override def processElement(value: String, ctx: BroadcastProcessFunction[String, (Long, BaseUserInfo), UserVisitInfo]#ReadOnlyContext, out: Collector[UserVisitInfo]): Unit = {
    // {"user_id":"001","ts":"2021-07-10 11:10:05","behaviour":"browse","url":"https://www.tb1.com/1.html"}
    val user_id = JSON.parseObject(value).getLong("user_id")
    val behaviour = JSON.parseObject(value).getString("behaviour")
    val url = JSON.parseObject(value).getString("url")
 
    val mapState = ctx.getBroadcastState(mapStateDes)
    val userInfo = mapState.get(user_id)
 
    out.collect(UserVisitInfo(user_id,userInfo.name,userInfo.city,behaviour,url))
 
  }
 
  // 處理的是廣播流的每個值
  override def processBroadcastElement(value: (Long, BaseUserInfo), ctx: BroadcastProcessFunction[String, (Long, BaseUserInfo), UserVisitInfo]#Context, out: Collector[UserVisitInfo]): Unit = {
    val mapState: BroadcastState[Long, BaseUserInfo] = ctx.getBroadcastState(mapStateDes)
    mapState.put(value._1,value._2)
  }
}
 
 
class UserSourceFunc extends RichParallelSourceFunction[BaseUserInfo]{
 
  var conn:Connection = _
  var statement: PreparedStatement = _
  var flag:Boolean = true
 
  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?characterEncoding=utf-8&serverTimezone=UTC","root","liu911223")
    statement = conn.prepareStatement("select * from base_user")
  }
 
  override def run(ctx: SourceFunction.SourceContext[BaseUserInfo]): Unit = {
    while (flag){
      Thread.sleep(5000)
      val resultSet = statement.executeQuery()
      while (resultSet.next()){
        val id = resultSet.getLong(1)
        val name = resultSet.getString(2)
        val age = resultSet.getInt(3)
        val city = resultSet.getString(4)
        val phone = resultSet.getLong(5)
        ctx.collect(BaseUserInfo(id,name,age,city,phone))
      }
    }
  }
 
  override def cancel(): Unit = {
    flag = false
  }
 
  override def close(): Unit = {
    if (statement != null) statement.close()
    if (conn != null) conn.close()
  }
}
object BroadcastDemo01 {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
 
    // 定義為KV,一方面是為了廣播的時候定義為map,另一方面是為了做關聯操作
    val userBaseDS: DataStream[(Long, BaseUserInfo)] = env.addSource(new UserSourceFunc)
      .map(user => (user.id, user))
    val mapStateDes = new MapStateDescriptor[Long, BaseUserInfo]("mapState",classOf[Long],classOf[BaseUserInfo])
    val broadCastStream: BroadcastStream[(Long, BaseUserInfo)] = userBaseDS.broadcast(mapStateDes)
 
    // 日志JSON數據
    val dataInfoDS: DataStream[String] = env.socketTextStream("master",1314)
 
    dataInfoDS.connect(broadCastStream)
      .process(new MyBroadcastFunc)
      .print()
 
    env.execute()
  }
}

到此這篇關于Java 數據流之Broadcast State的文章就介紹到這了,更多相關Java Broadcast State內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • Java結構型模式之門面模式詳解

    Java結構型模式之門面模式詳解

    門面模式又叫外觀模式(Facade Pattern),主要用于隱藏系統(tǒng)的復雜性,并向客戶端提供了一個客戶端可以訪問系統(tǒng)的接口,本文通過實例代碼給大家介紹下java門面模式的相關知識,感興趣的朋友一起看看吧
    2023-02-02
  • SpringMvc/SpringBoot HTTP通信加解密的實現

    SpringMvc/SpringBoot HTTP通信加解密的實現

    這篇文章主要介紹了SpringMvc/SpringBoot HTTP通信加解密的實現,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2019-08-08
  • Java 8中default方法能做什么?不能做什么?

    Java 8中default方法能做什么?不能做什么?

    這篇文章主要給大家介紹了關于Java 8中default方法能做什么?不能做什么?文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧。
    2018-04-04
  • 使用Springboot實現OAuth服務的示例詳解

    使用Springboot實現OAuth服務的示例詳解

    OAuth(Open Authorization)是一個開放標準,用于授權第三方應用程序訪問用戶資源,而不需要共享用戶憑證。本文主要介紹了如何使用Springboot實現一個OAuth服務,需要的可以參考一下
    2023-05-05
  • Java8 將一個List<T>轉為Map<String,T>的操作

    Java8 將一個List<T>轉為Map<String,T>的操作

    這篇文章主要介紹了Java8 將一個List<T>轉為Map<String, T>的操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2021-02-02
  • java輸入多個數據(不確定),排序,并求最大值的方法

    java輸入多個數據(不確定),排序,并求最大值的方法

    今天小編就為大家分享一篇java輸入多個數據(不確定),排序,并求最大值的方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2018-07-07
  • JAVA抽象類及接口使用方法解析

    JAVA抽象類及接口使用方法解析

    這篇文章主要介紹了JAVA抽象類及接口使用方法解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2020-08-08
  • Java深入講解SPI的使用

    Java深入講解SPI的使用

    SPI英文全稱為Service Provider Interface,顧名思義,服務提供者接口,它是jdk提供給“服務提供廠商”或者“插件開發(fā)者”使用的接口
    2022-06-06
  • SpringBoot中使用@Value注解注入詳解

    SpringBoot中使用@Value注解注入詳解

    這篇文章主要介紹了SpringBoot中的@Value注入詳解,在SpringBoot中,@Value注解可以注入一些字段的普通屬性,并且會自動進行類型轉換,本文對這些類型進行總結,需要的朋友可以參考下
    2023-08-08
  • Maven打包報錯:[WARNING] The POM for xxx is missing, no dependency inform

    Maven打包報錯:[WARNING] The POM for xxx 

    本文主要介紹了Maven打包報錯:[WARNING] The POM for xxx is missing, no dependency inform,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2023-06-06

最新評論