Flink?側(cè)流輸出源碼示例解析
Flink 側(cè)流輸出源碼解析
Flink 的 side output 為我們提供了側(cè)流(分流)輸出的功能,根據(jù)條件可以把一條流分為多個不同的流,之后做不同的處理邏輯,下面就來看下側(cè)流輸出相關(guān)的源碼。
先來看下面的一個 Demo,一個流被分成了 3 個流,一個主流,兩個側(cè)流輸出。
SingleOutputStreamOperator<JasonLeePOJO> process = kafka_source1.process( new ProcessFunction<JasonLeePOJO, JasonLeePOJO>() { @Override public void processElement( JasonLeePOJO value, ProcessFunction<JasonLeePOJO, JasonLeePOJO>.Context ctx, Collector<JasonLeePOJO> out) throws Exception { // 這個是主流輸出 if (value.getName().equals("flink")) { out.collect(value); // 下面兩個是測流輸出 } else if (value.getName().equals("spark")) { ctx.output(test, value); // 測流 } else if (value.getName().equals("hadoop")) { ctx.output(test1, value); } } });
為了更加清楚的查看每一個算子,我禁用了 operator chain,任務(wù)的 DAG 圖如下所示:
這樣就比較清晰了,很明顯從 process 算子開始,1 個數(shù)據(jù)流分為了 3 個數(shù)據(jù)流,當然,在默認情況下沒有禁止
operator chain 所有的算子都是 chain 在一起的。
源碼解析
我們先來看第一個主流輸出也就是 out.collect(value) 的源碼,這里的 out 實際上是 TimestampedCollector 對象。
TimestampedCollector#collect
@Override public void collect(T record) { output.collect(reuse.replace(record)); }
在 collect 方法中持有一個 output 對象,用來輸出數(shù)據(jù),在這里實際上是一個 CountingOutput 它是一個包裝了 Output 的對象,主要用于更新發(fā)送數(shù)據(jù)的 metric,并輸出數(shù)據(jù)。
CountingOutput#collect
@Override public void collect(StreamRecord<OUT> record) { numRecordsOut.inc(); output.collect(record); }
在 CountingOutput 中也持有一個 output 對象,但是這里的 output 是 BroadcastingOutputCollector 對象,從名字就可以看出它是往下游廣播數(shù)據(jù)的,這里就有一個疑問?把數(shù)據(jù)廣播到下游,那豈不是下游的每個數(shù)據(jù)流都有這條數(shù)據(jù)嗎?這樣的話是怎么實現(xiàn)分流的呢?帶著這個疑問,我們來看 BroadcastingOutputCollector 的 collect 方法是怎么實現(xiàn)的。
BroadcastingOutputCollector#collect
@Override public void collect(StreamRecord<T> record) { // 這里的 outputs 數(shù)組有三個 output 分別對應(yīng)上面的三個輸出流 for (Output<StreamRecord<T>> output : outputs) { output.collect(record); } }
在 BroadcastingOutputCollector 對象里也持有一個 output 對象,其實他們都實現(xiàn)了 Output 接口,用來往下游發(fā)送數(shù)據(jù),這里的 outputs 是一個 Output 數(shù)組,代表了下游的所有 Output,因為上面有三個輸出流,所以數(shù)組里面就包含了 3 個 Output 對象。
循環(huán)的調(diào)用 output 的 collect 方法往下游發(fā)送數(shù)據(jù),因為我打斷了 operator chain,所以 process 算子和下游的 Print 算子不在同一個 operatorChain 內(nèi),那么上下游算子之間數(shù)據(jù)傳輸用的就是 RecordWriterOutput,否則用的是 CopyingChainingOutput 或者 ChainingOutput,具體使用的是哪個 Output 這里就不多介紹了,后面有時間的話會單獨介紹。
RecordWriterOutput#collect
@Override public void collect(StreamRecord<OUT> record) { // 主流是沒有 outputTag 的,只有測流有 outputTag if (this.outputTag != null) { // we are not responsible for emitting to the main output. return; } pushToRecordWriter(record); }
接著來看 RecordWriterOutput 的 collect 方法,在 collect 方法里面會先判斷 outputTag 是否為空,如果不為空不做任何處理,直接返回,否則就把數(shù)據(jù)推送到下游算子,只有側(cè)流輸出才需要定義 outputTag,主流(正常流)是沒有 outputTag 的,所以這里會走 pushToRecordWriter 方法把數(shù)據(jù)寫入到下游,也就是說雖然會以廣播的形式把數(shù)據(jù)廣播到所有下游,但其實另外兩個側(cè)流是直接返回的,只有主流才會把數(shù)據(jù)推送到下游,這也就解釋了上面的疑問。
然后再來看第二個側(cè)流輸出 ctx.output(test, value) 的源碼,這里的 ctx 實際上是 ProcessOperator#ContextImpl 對象。
ProcessOperator#ContextImpl#output
@Override public <X> void output(OutputTag<X> outputTag, X value) { if (outputTag == null) { throw new IllegalArgumentException("OutputTag must not be null."); } output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp())); }
如果 outputTag 是空,直接拋出異常,因為這個是側(cè)流,所以必須要定義 OutputTag。這里的 output 實際上是父類 AbstractStreamOperator 所持有的變量,如果 outputTag 不為空,就調(diào)用 output 的 collect 方法把數(shù)據(jù)發(fā)送到下游,這里的 output 和上面的一樣是 CountingOutput 但是 collect 方法是另外一個重載的方法。
CountingOutput#collect
@Override public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) { numRecordsOut.inc(); output.collect(outputTag, record); }
可以發(fā)現(xiàn),這個 collect 方法比上面那個多了一個 OutputTag 參數(shù),也就是使用側(cè)流輸出的時候定義的 OutputTag 對象,然后調(diào)用 output 的 collect 方法發(fā)送數(shù)據(jù),這個也和上面的一樣,同樣是 BroadcastingOutputCollector 對象的另外一個重載方法,多了一個 OutputTag 參數(shù)。
BroadcastingOutputCollector#collect
@Override public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) { for (Output<StreamRecord<T>> output : outputs) { output.collect(outputTag, record); } }
這里的邏輯和上面是一樣的,同樣的循環(huán)調(diào)用 collect 方法發(fā)送數(shù)據(jù)。
RecordWriterOutput#collect
@Override public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) { // 先要判斷兩個 OutputTag 是否一樣 if (OutputTag.isResponsibleFor(this.outputTag, outputTag)) { pushToRecordWriter(record); } }
在這個 collect 方法中會先判斷傳入的 OutputTag 對象和成員變量 this.outputTag 是不是相等,如果是的話,就發(fā)送數(shù)據(jù),否則不做任何處理,所以這里每次只會選擇一個下游側(cè)流輸出數(shù)據(jù),這樣就實現(xiàn)了所謂的分流。
OutputTag#isResponsibleFor
public static boolean isResponsibleFor( @Nullable OutputTag<?> owner, @Nonnull OutputTag<?> other) { return other.equals(owner); }
可以看到在 isResponsibleFor 方法內(nèi)是直接調(diào)用 OutputTag 的 equals 方法判斷兩個對象是否相等的。
第三個側(cè)流 test1 ctx.output(test1, value) 和第二個側(cè)流 test 是完全一樣的情況,這里就不在看代碼了。
上面是完成了分流操作,那怎么獲取到分流后結(jié)果呢(數(shù)據(jù)流)?我們可以通過 getSideOutput 方法獲取。
DataStream<JasonLeePOJO> sideOutput = process.getSideOutput(test); DataStream<JasonLeePOJO> sideOutput1 = process.getSideOutput(test1);
getSideOutput 源碼
public <X> DataStream<X> getSideOutput(OutputTag<X> sideOutputTag) { sideOutputTag = clean(requireNonNull(sideOutputTag)); // make a defensive copy sideOutputTag = new OutputTag<X>(sideOutputTag.getId(), sideOutputTag.getTypeInfo()); TypeInformation<?> type = requestedSideOutputs.get(sideOutputTag); if (type != null && !type.equals(sideOutputTag.getTypeInfo())) { throw new UnsupportedOperationException( "A side output with a matching id was " + "already requested with a different type. This is not allowed, side output " + "ids need to be unique."); } requestedSideOutputs.put(sideOutputTag, sideOutputTag.getTypeInfo()); SideOutputTransformation<X> sideOutputTransformation = new SideOutputTransformation<>(this.getTransformation(), sideOutputTag); return new DataStream<>(this.getExecutionEnvironment(), sideOutputTransformation); }
getSideOutput 方法里先是構(gòu)建了一個 SideOutputTransformation 對象,然后又構(gòu)建了 DataStream 對象,這樣我們就可以基于分流后的 DataStream 做不同的處理邏輯了,從而實現(xiàn)了把一個 DataStream 分流成多個 DataStream 功能。
總結(jié)
通過對側(cè)流輸出的源碼進行解析,在分流的時候,數(shù)據(jù)是通過廣播的方式發(fā)送到下游算子的,對于主流的數(shù)據(jù)來說,只有 OutputTag 為空的才會處理,側(cè)流因為 OutputTag 不為空,所以直接返回,不做任何處理,那對于側(cè)流的數(shù)據(jù)來說,是通過判斷兩個 OutputTag 是否相等,所以每次只會把數(shù)據(jù)發(fā)送到下游對應(yīng)的那一個側(cè)流上去,這樣即可實現(xiàn)分流邏輯。
以上就是Flink 側(cè)流輸出源碼示例解析的詳細內(nèi)容,更多關(guān)于Flink 側(cè)流輸出的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
服務(wù)器安裝Macfee(麥咖啡)殺毒軟件后可能出現(xiàn)的問題
這篇文章主要介紹了服務(wù)器安裝Macfee(麥咖啡)殺毒軟件后可能出現(xiàn)的問題,需要的朋友可以參考下2015-10-10網(wǎng)站壓力測試工具-ab工具apache?bench使用過程
apache?bench是apache自帶的壓力測試工具。ab不僅可以對apache服務(wù)器進行網(wǎng)站訪問壓力測試,也可以對或其它類型的服務(wù)器進行壓力測試。ab工具上手學習較快,可以提供需要的基本性能指標,但沒有圖形化結(jié)果,不能監(jiān)控。因此可以用作臨時緊急任務(wù)和簡單測試。2022-11-11關(guān)于HTTPS端口443的技術(shù)講解(什么是443端口)
本文將重點介紹HTTPS 443端口,它是如何工作的,它保護什么,以及為什么我們需要它,需要的朋友可以參考下2022-10-10ROS參數(shù)服務(wù)器中的理論模型與參數(shù)操作(C++)
在C++中實現(xiàn)參數(shù)服務(wù)器數(shù)據(jù)的增刪改查,均可以通過兩套API實現(xiàn)分別是ros::NodeHandle和ros::param,這篇文章主要介紹了ROS參數(shù)服務(wù)器--理論模型與參數(shù)操作(C++),需要的朋友可以參考下2023-08-08Elasticsearch6.2服務(wù)器升配后的bug(避坑指南)
這篇文章主要介紹了Elasticsearch6.2服務(wù)器升配后的bug問題及解決方法,可以幫助有其他人避坑,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2022-09-09vscode設(shè)置免密登錄遠程服務(wù)器的解決方案
當我們使用vscode的ssh連接遠程服務(wù)器后,過一段時間后,總是要求登錄服務(wù)器的密碼,遇到這樣的問題如何解決呢,下面給大家分享vscode設(shè)置免密登錄遠程服務(wù)器的解決方案,感興趣的朋友跟隨小編一起看看吧2024-05-05解決JMail無法安裝的方法(帳戶名與安全標識間無任何映射完成)
今日在遠程服務(wù)器上安裝Jmail,結(jié)果提示錯誤“帳戶名與安全標識間無任何映射完成”2010-04-04