Implementing Flink WordCount with Java Lambda Expressions: A Walkthrough
This article walks through a Flink WordCount job written with Java lambda expressions and explains, with example code, where Flink needs explicit type information.
We implement the word count in Java on top of Flink's streaming API.
Code development
Environment setup
Add the Flink 1.9 dependencies to pom.xml
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.9.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.9.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.7</version>
    </dependency>
</dependencies>
Create the Flink stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Custom source
The source emits one randomly chosen line of text per second.
DataStreamSource<String> wordLineDS = env.addSource(new RichSourceFunction<String>() {
    // volatile, because cancel() is called from a different thread than run()
    private volatile boolean isCanceled = false;
    private final String[] words = {
            "important oracle jdk license update",
            "the oracle jdk license has changed for releases starting april 16 2019",
            "the new oracle technology network license agreement for oracle java se is substantially different from prior oracle jdk licenses the new license permits certain uses such as ",
            "personal use and development use at no cost but other uses authorized under prior oracle jdk licenses may no longer be available please review the terms carefully before ",
            "downloading and using this product an faq is available here ",
            "commercial license and support is available with a low cost java se subscription",
            "oracle also provides the latest openjdk release under the open source gpl license at jdk java net"
    };

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // Emit one line of text per second until the source is canceled
        while (!isCanceled) {
            int randomIndex = RandomUtils.nextInt(0, words.length);
            ctx.collect(words[randomIndex]);
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isCanceled = true;
    }
});
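The run()/cancel() pair above depends on a flag that one thread writes and another thread reads. The following plain-Java sketch (no Flink involved) mimics that loop to show why such a flag is usually declared volatile. The class name, the 10 ms interval, and the runFor helper are all hypothetical and exist only for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadLocalRandom;

// Plain-Java stand-in for a source with run() and cancel(); not a Flink class.
class CancellableSourceSketch {
    private static final String[] LINES = {
        "important oracle jdk license update",
        "commercial license and support is available"
    };

    // volatile: cancel() runs on a different thread than run(),
    // so the write must become visible to the emitting loop.
    private volatile boolean running = true;
    private final List<String> emitted = new CopyOnWriteArrayList<>();

    // Emits one random line per interval until cancel() is called.
    void run(long intervalMillis) throws InterruptedException {
        while (running) {
            emitted.add(LINES[ThreadLocalRandom.current().nextInt(LINES.length)]);
            Thread.sleep(intervalMillis);
        }
    }

    void cancel() {
        running = false;
    }

    // Runs the loop on a worker thread, cancels it after durationMillis,
    // waits for the worker to finish, and returns how many lines were emitted.
    static int runFor(long durationMillis) {
        CancellableSourceSketch source = new CancellableSourceSketch();
        Thread worker = new Thread(() -> {
            try {
                source.run(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        try {
            Thread.sleep(durationMillis);
            source.cancel();
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return source.emitted.size();
    }

    public static void main(String[] args) {
        System.out.println("lines emitted before cancel: " + runFor(200));
    }
}
```

Without volatile, the emitting loop is not guaranteed to observe the write made by cancel(), so the loop could in principle keep running after cancellation.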
Word counting
// 3. Word count
// 3.1 Split each line of text into individual words
SingleOutputStreamOperator<String> wordsDS = wordLineDS.flatMap((String line, Collector<String> ctx) -> {
    // Split on spaces and emit each word
    Arrays.stream(line.split(" ")).forEach(word -> {
        ctx.collect(word);
    });
}).returns(Types.STRING);

// 3.2 Convert each word into a (word, 1) tuple
SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS
        .map(word -> Tuple2.of(word, 1))
        .returns(Types.TUPLE(Types.STRING, Types.INT));

// 3.3 Group by word
KeyedStream<Tuple2<String, Integer>, String> keyedDS = tupleDS.keyBy(tuple -> tuple.f0);

// 3.4 Sum the counts of each word within a 3-second window
SingleOutputStreamOperator<Tuple2<String, Integer>> resultDS = keyedDS
        .timeWindow(Time.seconds(3))
        .reduce((t1, t2) -> Tuple2.of(t1.f0, t1.f1 + t2.f1));

resultDS.print();
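To make the result of steps 3.1 to 3.4 concrete, here is a plain-Java sketch (no Flink) that computes the same per-window word counts for a handful of lines. It assumes 3-second tumbling windows aligned to timestamp 0 with no offset. The timestamps and sample lines are made up, and the class and method names are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration of what the flatMap/map/keyBy/window/reduce chain computes.
class WindowedWordCountSketch {
    static final long WINDOW_MILLIS = 3_000L;

    // Start of the tumbling window that contains the timestamp
    // (assumes a non-negative timestamp and a zero window offset).
    static long windowStart(long timestampMillis) {
        return timestampMillis - (timestampMillis % WINDOW_MILLIS);
    }

    // timestamps[i] is the (hypothetical) arrival time of lines[i].
    // Returns window start -> (word -> count).
    static Map<Long, Map<String, Integer>> count(long[] timestamps, String[] lines) {
        Map<Long, Map<String, Integer>> result = new TreeMap<>();
        for (int i = 0; i < lines.length; i++) {
            Map<String, Integer> counts =
                    result.computeIfAbsent(windowStart(timestamps[i]), k -> new TreeMap<>());
            // 3.1: split the line into words
            for (String word : lines[i].split(" ")) {
                // 3.2 to 3.4: treat each word as (word, 1), group by word, add up the ones
                counts.merge(word, 1, Integer::sum);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long[] timestamps = {0L, 1_000L, 2_000L, 3_000L};
        String[] lines = {"oracle jdk license", "oracle java se", "jdk java net", "oracle jdk"};
        // prints {0={java=2, jdk=2, license=1, net=1, oracle=2, se=1}, 3000={jdk=1, oracle=1}}
        System.out.println(count(timestamps, lines));
    }
}
```

The first three lines fall into the window starting at 0 and are counted together. The fourth line arrives at 3000 ms and starts a fresh count, which is why counts in the Flink job reset every three seconds instead of growing forever.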
Reference code
import java.util.Arrays;

import org.apache.commons.lang3.RandomUtils;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WordCount {
    public static void main(String[] args) throws Exception {
        // 1. Create the Flink stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Custom source: emit one line of text per second
        DataStreamSource<String> wordLineDS = env.addSource(new RichSourceFunction<String>() {
            // volatile, because cancel() is called from a different thread than run()
            private volatile boolean isCanceled = false;
            private final String[] words = {
                    "important oracle jdk license update",
                    "the oracle jdk license has changed for releases starting april 16 2019",
                    "the new oracle technology network license agreement for oracle java se is substantially different from prior oracle jdk licenses the new license permits certain uses such as ",
                    "personal use and development use at no cost but other uses authorized under prior oracle jdk licenses may no longer be available please review the terms carefully before ",
                    "downloading and using this product an faq is available here ",
                    "commercial license and support is available with a low cost java se subscription",
                    "oracle also provides the latest openjdk release under the open source gpl license at jdk java net"
            };

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                // Emit one line of text per second until the source is canceled
                while (!isCanceled) {
                    int randomIndex = RandomUtils.nextInt(0, words.length);
                    ctx.collect(words[randomIndex]);
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                isCanceled = true;
            }
        });

        // 3. Word count
        // 3.1 Split each line of text into individual words
        SingleOutputStreamOperator<String> wordsDS = wordLineDS.flatMap((String line, Collector<String> ctx) -> {
            // Split on spaces and emit each word
            Arrays.stream(line.split(" ")).forEach(word -> {
                ctx.collect(word);
            });
        }).returns(Types.STRING);

        // 3.2 Convert each word into a (word, 1) tuple
        SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS
                .map(word -> Tuple2.of(word, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        // 3.3 Group by word
        KeyedStream<Tuple2<String, Integer>, String> keyedDS = tupleDS.keyBy(tuple -> tuple.f0);

        // 3.4 Sum the counts of each word within a 3-second window
        SingleOutputStreamOperator<Tuple2<String, Integer>> resultDS = keyedDS
                .timeWindow(Time.seconds(3))
                .reduce((t1, t2) -> Tuple2.of(t1.f0, t1.f1 + t2.f1));

        resultDS.print();

        env.execute("app");
    }
}
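Flink's support for Java lambda expressions
Flink supports lambda expressions for all operators of the Java API. However, when a lambda expression involves Java generics, the type information has to be declared explicitly.
Consider this snippet from the code above: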
SingleOutputStreamOperator<String> wordsDS = wordLineDS.flatMap((String line, Collector<String> ctx) -> {
    // Split on spaces and emit each word
    Arrays.stream(line.split(" ")).forEach(word -> {
        ctx.collect(word);
    });
}).returns(Types.STRING);
All type information is spelled out here because Flink cannot automatically infer the generic type parameter of Collector. Let's look at the source code of FlatMapFunction:
@Public
@FunctionalInterface
public interface FlatMapFunction<T, O> extends Function, Serializable {

    /**
     * The core method of the FlatMapFunction. Takes an element from the input data set and transforms
     * it into zero, one, or more elements.
     *
     * @param value The input value.
     * @param out The collector for returning result values.
     *
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
     *                   to fail and may trigger recovery.
     */
    void flatMap(T value, Collector<O> out) throws Exception;
}
The second parameter of flatMap is Collector<O>, a parameterized generic type. The Java compiler applies type erasure when it compiles this code, so the signature effectively becomes:
void flatMap(T value, Collector out)
In this situation Flink cannot infer the type information automatically. If we do not provide it explicitly, the following error is thrown:
org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface. Otherwise the type has to be specified explicitly using type information.
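The type information must therefore be specified explicitly. Otherwise the return value is treated as type Object, and Flink cannot serialize it correctly.
So we declare the parameter types of the lambda expression explicitly and use the returns method to specify the output type.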
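The missing type information can be observed with plain Java reflection. The sketch below uses FlatMapLike, a hypothetical stand-in interface with the same shape as FlatMapFunction (a List<O> plays the role of Collector<O>), and checks which type arguments are recorded on an anonymous class compared with a lambda. It only illustrates the general reflection behavior and is not Flink's actual type extraction logic.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.List;

class LambdaErasureSketch {
    // Hypothetical stand-in for FlatMapFunction<T, O>; List<O> replaces Collector<O>.
    interface FlatMapLike<T, O> {
        void flatMap(T value, List<O> out);
    }

    // Returns the type arguments recorded on the implementing class,
    // or "<none>" if only the raw interface is visible through reflection.
    static String describeTypeArguments(FlatMapLike<?, ?> function) {
        Type implemented = function.getClass().getGenericInterfaces()[0];
        if (implemented instanceof ParameterizedType) {
            return Arrays.toString(((ParameterizedType) implemented).getActualTypeArguments());
        }
        return "<none>";
    }

    // An anonymous class records FlatMapLike<String, String> in its class file.
    static FlatMapLike<String, String> anonymousClass() {
        return new FlatMapLike<String, String>() {
            @Override
            public void flatMap(String line, List<String> out) {
                out.addAll(Arrays.asList(line.split(" ")));
            }
        };
    }

    // The class generated for a lambda implements only the raw interface.
    static FlatMapLike<String, String> lambda() {
        return (line, out) -> out.addAll(Arrays.asList(line.split(" ")));
    }

    public static void main(String[] args) {
        System.out.println("anonymous class: " + describeTypeArguments(anonymousClass()));
        System.out.println("lambda:          " + describeTypeArguments(lambda()));
    }
}
```

The anonymous class reports two String type arguments, while the lambda reports none. This matches the workaround named in the error message: either use an (anonymous) class or supply the type information explicitly.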
Here is another snippet:
SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS
        .map(word -> Tuple2.of(word, 1))
        .returns(Types.TUPLE(Types.STRING, Types.INT));
Why does map also need an explicit type?
Because map returns a Tuple2 here, and Tuple2 has generic type parameters. They are likewise erased at compile time, so Flink cannot infer the element type correctly.
For more details on Java lambda expression support, see the official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/java_lambdas.html