Java 數據流之Broadcast State
一、BroadcastState 的介紹
廣播狀態(tài)(Broadcast State)是 Operator State 的一種特殊類型。如果我們需要將配置 、規(guī)則等低吞吐事件流廣播到下游所有 Task 時,就可以使用 BroadcastState。下游的 Task 接收這些配置、規(guī)則并保存為 BroadcastState,所有Task 中的狀態(tài)保持一致,作用于另一個數據流的計算中。
簡單理解:一個低吞吐量流包含一組規(guī)則,我們想對來自另一個流的所有元素基于此規(guī)則進行評估。
場景:動態(tài)更新計算規(guī)則。
廣播狀態(tài)與其他操作符狀態(tài)的區(qū)別在于:
- 它有一個 map 格式,用于定義存儲結構
- 它僅對具有廣播流和非廣播流輸入的特定操作符可用
- 這樣的操作符可以具有不同名稱的多個廣播狀態(tài)
二、BroadcastState 操作流程
三、案例實現
- 從端口讀取Json數據作為事件流
- 從Mysql讀取數據作為廣播流
- 關聯廣播流和事件流
- 匹配對應的用戶信息
package cn.kgc.broadcast import java.sql.{Connection, DriverManager, PreparedStatement} import com.alibaba.fastjson.JSON import org.apache.flink.api.common.state.{BroadcastState, MapStateDescriptor} import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.datastream.BroadcastStream import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction} import org.apache.flink.streaming.api.scala._ import org.apache.flink.util.Collector // (001,'tom',18,'北京',15830010002) // 定義樣例類 接受 MySQL的用戶數據 case class BaseUserInfo(id:Long,name:String,age:Int,city:String,phone:Long) // user_id、user_name、user_addrss、behaviour、url // 輸出數據類型 case class UserVisitInfo(id:Long,name:String,city:String,behaviour:String,url:String) // 實現廣播ProcessFunction class MyBroadcastFunc extends BroadcastProcessFunction[String,(Long, BaseUserInfo),UserVisitInfo]{ lazy val mapStateDes = new MapStateDescriptor[Long, BaseUserInfo]("mapState",classOf[Long],classOf[BaseUserInfo]) // 處理的是日志流中的每條數據 override def processElement(value: String, ctx: BroadcastProcessFunction[String, (Long, BaseUserInfo), UserVisitInfo]#ReadOnlyContext, out: Collector[UserVisitInfo]): Unit = { // {"user_id":"001","ts":"2021-07-10 11:10:05","behaviour":"browse","url":"https://www.tb1.com/1.html"} val user_id = JSON.parseObject(value).getLong("user_id") val behaviour = JSON.parseObject(value).getString("behaviour") val url = JSON.parseObject(value).getString("url") val mapState = ctx.getBroadcastState(mapStateDes) val userInfo = mapState.get(user_id) out.collect(UserVisitInfo(user_id,userInfo.name,userInfo.city,behaviour,url)) } // 處理的是廣播流的每個值 override def processBroadcastElement(value: (Long, BaseUserInfo), ctx: BroadcastProcessFunction[String, (Long, BaseUserInfo), UserVisitInfo]#Context, out: Collector[UserVisitInfo]): Unit = { val mapState: BroadcastState[Long, BaseUserInfo] = ctx.getBroadcastState(mapStateDes) mapState.put(value._1,value._2) } } class UserSourceFunc extends RichParallelSourceFunction[BaseUserInfo]{ var conn:Connection = _ var statement: PreparedStatement = _ var flag:Boolean = true override def open(parameters: Configuration): Unit = { conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?characterEncoding=utf-8&serverTimezone=UTC","root","liu911223") statement = conn.prepareStatement("select * from base_user") } override def run(ctx: SourceFunction.SourceContext[BaseUserInfo]): Unit = { while (flag){ Thread.sleep(5000) val resultSet = statement.executeQuery() while (resultSet.next()){ val id = resultSet.getLong(1) val name = resultSet.getString(2) val age = resultSet.getInt(3) val city = resultSet.getString(4) val phone = resultSet.getLong(5) ctx.collect(BaseUserInfo(id,name,age,city,phone)) } } } override def cancel(): Unit = { flag = false } override def close(): Unit = { if (statement != null) statement.close() if (conn != null) conn.close() } } object BroadcastDemo01 { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) // 定義為KV,一方面是為了廣播的時候定義為map,另一方面是為了做關聯操作 val userBaseDS: DataStream[(Long, BaseUserInfo)] = env.addSource(new UserSourceFunc) .map(user => (user.id, user)) val mapStateDes = new MapStateDescriptor[Long, BaseUserInfo]("mapState",classOf[Long],classOf[BaseUserInfo]) val broadCastStream: BroadcastStream[(Long, BaseUserInfo)] = userBaseDS.broadcast(mapStateDes) // 日志JSON數據 val dataInfoDS: DataStream[String] = env.socketTextStream("master",1314) dataInfoDS.connect(broadCastStream) .process(new MyBroadcastFunc) .print() env.execute() } }
到此這篇關于Java 數據流之Broadcast State的文章就介紹到這了,更多相關Java Broadcast State內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
SpringMvc/SpringBoot HTTP通信加解密的實現
這篇文章主要介紹了SpringMvc/SpringBoot HTTP通信加解密的實現,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2019-08-08Java8 將一個List<T>轉為Map<String,T>的操作
這篇文章主要介紹了Java8 將一個List<T>轉為Map<String, T>的操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-02-02Maven打包報錯:[WARNING] The POM for xxx 
本文主要介紹了Maven打包報錯:[WARNING] The POM for xxx is missing, no dependency inform,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2023-06-06