flink?RichFunction之坑及解決
flink RichFunction之坑
flink的RichMapFunction,RichSinkFunction等,并不能百分百做到每次只open一個(gè)數(shù)據(jù)庫連接。
在有些情況下他會(huì)一直創(chuàng)建然后銷毀,創(chuàng)建銷毀。
舉例: 重點(diǎn)在第三行的注釋
val value = env.socketTextStream("192.168.13.11", 9090) val value2 = value.filter(x => { try { var a = 1 / 0 //此處若沒有異常處理,任務(wù)不會(huì)斷,但是會(huì)重復(fù)打開數(shù)據(jù)庫連接 } catch { case e: Exception => } isInter(x) }).map(fun = x => { x.toLong }) val value1 = value2.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Long](Time.seconds(1)) { override def extractTimestamp(element: Long): Long = { println(element + "***************") element } }) try { var a = 1 / 0 } catch { case e: Exception => } value1.map(new mymap) env.execute("test") } def isInter(input: String): Boolean = { val matcher = Pattern.compile("^[0-9]+$").matcher(input) matcher.find() } } class myRichMapfun6() extends RichMapFunction[ListBuffer[String], Unit] { var conn: Connection = _ var pst: PreparedStatement = _ override def open(parameters: Configuration): Unit = { conn = DriverManager.getConnection("jdbc:mysql://xxxxxxx:3306/zzt?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&useSSL=false&autoReconnect=true", "root", "bigdata@mysql") println(conn) pst = conn.prepareStatement("insert into testa (str) values (?)") } override def close(): Unit = { conn.close() pst.close() } override def map(in: ListBuffer[String]): Unit = { pst.setString(1, in.head) pst.execute() } }
所以你是不是覺得那就價(jià)格異常處理不就得了?
NO
再看:
這個(gè)時(shí)候,如果傳進(jìn)來line不是數(shù)字或者格式不對(duì),就會(huì)觸發(fā)異常,然而此時(shí)就不會(huì)像上面那樣幫你解決問題,而是一遍遍創(chuàng)建對(duì)象銷毀對(duì)象,一條消息創(chuàng)建一個(gè)連接,我就問你慌不慌,
原因
據(jù)觀察是因?yàn)?,輸入的?shù)據(jù)有問題,直接導(dǎo)致
val value1 = value2.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Long](Time.seconds(1)) { override def extractTimestamp(element: Long): Long = { println(element + "***************") element } })
這個(gè)崩潰了,不走這行代碼了,沒有獲得eventime,然后估計(jì)。。。 剩下的我也沒詳細(xì)測。。。
解決方案
先fiiter過濾任何可能導(dǎo)致異常的臟數(shù)據(jù)確保數(shù)據(jù)都沒問題就可以了。
flink中RichFunction的一點(diǎn)小作用
①傳遞參數(shù)
所有需要用戶定義的函數(shù)都可以轉(zhuǎn)換成richfunction,例如實(shí)現(xiàn)map operator中你需要實(shí)現(xiàn)一個(gè)內(nèi)部類,并實(shí)現(xiàn)它的map方法:
data.map (new MapFunction<String, Integer>() { public Integer map(String value) { return Integer.parseInt(value); } });
然后我們可以將其轉(zhuǎn)換為RichMapFunction:
data.map (new RichMapFunction<String, Integer>() { public Integer map(String value) { return Integer.parseInt(value); } });
當(dāng)然,RichFuction除了提供原來MapFuction的方法之外,還提供open, close, getRuntimeContext 和setRuntimeContext方法,這些功能可用于參數(shù)化函數(shù)(傳遞參數(shù)),創(chuàng)建和完成本地狀態(tài),訪問廣播變量以及訪問運(yùn)行時(shí)信息以及有關(guān)迭代中的信息。
下面我們來看看RichFuction中傳遞參數(shù)的例子,以下代碼是測試RichFilterFuction的例子,基于DataSet而非DataStream。
由代碼可見,可以將Configuration中的limit參數(shù)的值傳遞進(jìn)RichFuction里面,通過后面withParameters方法傳遞進(jìn)去,最后的結(jié)果是
由此可見,我從configuration中獲取了limit的值,并設(shè)定了fliter的閾值是2,從而過濾了1,2。
②傳遞廣播變量
原理和上面差不多,下面我直接把代碼貼出來:
這是目前我學(xué)習(xí)到的RichFunction的用法,和大家分享一下。
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
解決nacos修改配置信息后需要重啟服務(wù)才能生效的問題
當(dāng)配置信息發(fā)生變動(dòng)時(shí),傳統(tǒng)修改配置信息后,需要重新重啟服務(wù)器才可以生效,大量應(yīng)用配置修改時(shí),需要一個(gè)個(gè)修改配置,無法統(tǒng)一修改,且沒有辦法回溯配置版本,所以本文給大家介紹了如何解決這些問題的方法,需要的朋友可以參考下2023-10-10springboot logback如何從apollo配置中心讀取變量
這篇文章主要介紹了springboot logback如何從apollo配置中心讀取變量的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-08-08Spring?Cloud?使用?Resilience4j?實(shí)現(xiàn)服務(wù)熔斷的方法
服務(wù)熔斷是為了保護(hù)我們的服務(wù),比如當(dāng)某個(gè)服務(wù)出現(xiàn)問題的時(shí)候,控制打向它的流量,讓它有時(shí)間去恢復(fù),或者限制一段時(shí)間只能有固定數(shù)量的請求打向這個(gè)服務(wù),這篇文章主要介紹了Spring?Cloud?使用?Resilience4j?實(shí)現(xiàn)服務(wù)熔斷,需要的朋友可以參考下2022-12-12JAVA JNI原理詳細(xì)介紹及簡單實(shí)例代碼
這篇文章主要介紹了JAVA JNI原理的相關(guān)資料,這里提供簡單實(shí)例代碼,需要的朋友可以參考下2016-12-12使用Mybatis Generator結(jié)合Ant腳本快速自動(dòng)生成Model、Mapper等文件的方法
這篇文章主要介紹了使用Mybatis Generator結(jié)合Ant腳本快速自動(dòng)生成Model、Mapper等文件的方法的相關(guān)資料,需要的朋友可以參考下2016-06-06Java動(dòng)態(tài)數(shù)組Arraylist存放自定義數(shù)據(jù)類型方式
這篇文章主要介紹了Java動(dòng)態(tài)數(shù)組Arraylist存放自定義數(shù)據(jù)類型方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-10-10intellij idea修改maven配置時(shí)總是恢復(fù)默認(rèn)配置的解決方法idea版本(2020.2.x)
這篇文章主要介紹了intellij idea修改maven配置時(shí)總是恢復(fù)默認(rèn)配置的解決方法idea版本(2020.2.x),本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-08-08Java如何基于IO流實(shí)現(xiàn)同一文件讀寫操作
這篇文章主要介紹了Java如何基于IO流實(shí)現(xiàn)文件讀寫操作,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-10-10Hibernate Validation自定義注解校驗(yàn)的實(shí)現(xiàn)
這篇文章主要介紹了Hibernate Validation自定義注解校驗(yàn)的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-04-04