windows環(huán)境下flink入門(mén)實(shí)踐操作示例

前言碎語(yǔ)
為了應(yīng)對(duì)凱京科技集團(tuán)的飛速發(fā)展,凱京科技研發(fā)中心2019定下了數(shù)據(jù)中臺(tái)的目標(biāo)。數(shù)據(jù)處理我們選擇了批處理+流處理結(jié)合的大數(shù)據(jù)應(yīng)用軟件新秀Apache Flink,前幾天阿里又發(fā)出好信息稱(chēng)將開(kāi)源Blink(Flink早期分支遷出迭代優(yōu)化),所以今天來(lái)近距離認(rèn)識(shí)下Flink。博主之前沒(méi)接觸過(guò)大數(shù)據(jù)相關(guān)的東西,所以不細(xì)究其設(shè)計(jì)概念了。目標(biāo)就是跑一個(gè)最簡(jiǎn)單的流處理的例子,后面慢慢深入后在和大家分享具體的組件概念以及api設(shè)計(jì)。
Apache Flink是什么?
Apache Flink 是一個(gè)分布式大數(shù)據(jù)處理引擎,可對(duì)有限數(shù)據(jù)流和無(wú)限數(shù)據(jù)流進(jìn)行有狀態(tài)計(jì)算??刹渴鹪诟鞣N集群環(huán)境,對(duì)各種大小的數(shù)據(jù)規(guī)模進(jìn)行快速計(jì)算。上面是非常官方的描述,說(shuō)白了我們?yōu)槭裁催x擇Flink,是因?yàn)樗谏鐓^(qū)口碑非常不錯(cuò)。在國(guó)內(nèi)的話有阿里這種大數(shù)據(jù)大流量的公司一直在輸出,當(dāng)然像騰訊、華為、餓了么、滴滴等也都有使用Apache Flink。
進(jìn)入正題
本篇博文涉及到的軟件工具以及下載地址:
Apache Flink :https://flink.apache.org/downloads.html
Netcat:https://eternallybored.org/misc/netcat/
Netcat是一個(gè)有“瑞士軍刀”美譽(yù)的網(wǎng)絡(luò)工具,這里用來(lái)綁定端口等待Apache Flink的連接
第一步:?jiǎn)?dòng)FLINK
從上面的地址下載Flink后是一個(gè)壓縮包,解壓后的目錄結(jié)構(gòu)如下:
/conf/flink-conf.yaml里有一些Flink的基本配置信息,如,jobmanager、taskmanager的端口和jvm內(nèi)存(默認(rèn)1024M)大小,web控制臺(tái)的端口(默認(rèn)8081)等。我們可以不該任何配置,然后進(jìn)入到bin下,執(zhí)行start-cluster.bat。這里要注意不是并不是flink.bat。flink.bat是用來(lái)提交job的。還有要確保相關(guān)的端口沒(méi)有被占用
運(yùn)行成功后會(huì)有兩個(gè)java黑窗口(一個(gè)TaskManager、一個(gè)JobManager),如果只有一個(gè)java黑窗口,很可能是你的TaskManager因?yàn)槎丝谡加脹](méi)有啟動(dòng)起來(lái),成功后訪問(wèn):http://localhost:8081.就會(huì)看到如下的web管理控制臺(tái)了:
如果啟動(dòng)失敗的話,上面箭頭所指向的地方應(yīng)該是0.
第二步:JOB任務(wù)編寫(xiě)
1.首先需要新建一個(gè)maven工程,然后導(dǎo)入Flink的接口依賴(lài)
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>1.7.1</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.11</artifactId> <version>1.7.1</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.11</artifactId> <version>1.7.1</version> </dependency>
2.編寫(xiě)具體的job,官方提供了一個(gè)單詞統(tǒng)計(jì)的demo
package com.kl; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Collector; public class SocketWindowWordCount { public static void main(String[] args) throws Exception { // the host and the port to connect to final String hostname; final int port; try { final ParameterTool params = ParameterTool.fromArgs(args); hostname = params.has("hostname") ? params.get("hostname") : "localhost"; port = params.has("port") ? params.getInt("port"):9000; } catch (Exception e) { System.err.println("No port specified. Please run 'SocketWindowWordCount " + "--hostname--port', where hostname (localhost by default) " + "and port is the address of the text server"); System.err.println("To start a simple text server, run 'netcat -l' and " + "type the input text into the command line"); return; } // get the execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // get input data by connecting to the socket DataStreamtext = env.socketTextStream(hostname, port, "\n"); // parse the data, group it, window it, and aggregate the counts DataStreamwindowCounts = text .flatMap(new FlatMapFunction() { public void flatMap(String value, Collectorout) { for (String word : value.split("\\s")) { out.collect(new WordWithCount(word, 1L)); } }}) .keyBy("word") .timeWindow(Time.seconds(5)) .reduce(new ReduceFunction() { public WordWithCount reduce(WordWithCount a, WordWithCount b) { return new WordWithCount(a.word, a.count + b.count); }}); // print the results with a single thread, rather than in parallel windowCounts.print().setParallelism(1); env.execute("Socket Window WordCount"); } /** * Data type for words with count. */ public static class WordWithCount { public String word; public long count; public WordWithCount() {} public WordWithCount(String word, long count) { this.word = word; this.count = count; } @Override public String toString() { return word + " : " + count; } } }
上面demo實(shí)現(xiàn)了從啟動(dòng)參數(shù)中獲取ip和端口,然后連接從輸入流接收文本信息,然后統(tǒng)計(jì)文本里單詞出現(xiàn)的次數(shù)。因?yàn)橐虺煽蛇\(yùn)行的jar,所以,還需要引入maven的jar打包插件,如下:
<build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>1.2.1</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>com.kl.SocketWindowWordCount</mainClass> </transformer> </transformers> </configuration> </execution> </executions> </plugin> </plugins> </build>
mainClass標(biāo)簽中就是你的main方法所在類(lèi)全類(lèi)名。然后mvn install就可以打出一個(gè)可運(yùn)行的jar包了。
第三步:NETCAT監(jiān)聽(tīng)端口,等待連接
從上面貼的地址下載Netcat后,是一個(gè)壓縮包,有些安全軟件可能會(huì)報(bào)病毒,請(qǐng)忽略就好了。然后解壓文件目錄如下:
進(jìn)入到這個(gè)目錄,然后執(zhí)行: nc64.exe -l -p 9000。相當(dāng)于打開(kāi)了9000端口,并監(jiān)聽(tīng)了入站信息。最后實(shí)現(xiàn)的效果就是從這個(gè)窗口中輸入的數(shù)據(jù),回車(chē)后會(huì)發(fā)送Apache Flink中我們提交的job中處理輸出,所以這里的9000端口,要和我們等下啟動(dòng)job的啟動(dòng)參數(shù)端口一致。
第四步:提交JOB運(yùn)行
運(yùn)行job有兩種方式:可以通過(guò)Flink.bat運(yùn)行,也可以通過(guò)web控制臺(tái)運(yùn)行。
命令行運(yùn)行:
FLINK RUN E:\FLINKWORKINGSPCE\FLINKDEMO\TARGET\FINLK-DEMO-1.0-SNAPSHOT.JAR --PORT 9000
WEB控制臺(tái)運(yùn)行:
如上圖,點(diǎn)擊Add New后選擇你的jar包然后上傳,上傳成功就會(huì)在列表里列出來(lái)。然后選中你上傳的jar。就會(huì)出現(xiàn)如下圖的輸入框,可以輸入你的啟動(dòng)參數(shù),然后點(diǎn)擊submit提交就可以了
第五步:驗(yàn)證效果
提交后如果沒(méi)有問(wèn)題,job的詳情頁(yè)面如下:
這個(gè)時(shí)候我們從Netcat的監(jiān)聽(tīng)的黑窗口中敲入一些長(zhǎng)文本,就會(huì)在Flink的job里統(tǒng)計(jì)輸出出來(lái)如:
文末結(jié)語(yǔ)
Flink的Windows環(huán)境入門(mén)實(shí)例還算順利,這只是第一步,后面Apache Flink的生產(chǎn)落地肯定還會(huì)有更多的問(wèn)題和挑戰(zhàn)。我們會(huì)把落地過(guò)程中的問(wèn)題拿到osc來(lái)和大家一起交流,歡迎大家關(guān)注凱京科技。
以上就是windows環(huán)境下flink入門(mén)實(shí)踐操作示例的詳細(xì)內(nèi)容,更多關(guān)于windows環(huán)境flink入門(mén)實(shí)踐的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
微軟Windows 12 Build 12.0.30000版本曝光 僅限于微軟內(nèi)部測(cè)試 ???
微軟Windows 12 Build 12.0.30000版本曝光 僅限于微軟內(nèi)部測(cè)試 ?本文就為大家?guī)?lái)了詳細(xì)介紹,感興趣的朋友一起看看吧2022-02-26Windows系統(tǒng)下安裝Etcd集群及etcd-viewer
這篇文章主要為大家介紹了Windows系統(tǒng)下安裝Etcd集群及etcd-viewer的過(guò)程步驟,有需要的朋友可以借鑒參考下,希望能夠有所幫助2022-05-10Win12 重磅爆料!微軟 Windows 12 計(jì)劃 3 月份開(kāi)始開(kāi)發(fā)
Win12 重磅爆料!微軟 Windows 12 計(jì)劃 3 月份開(kāi)始開(kāi)發(fā),本文為大家?guī)?lái)了詳細(xì)介紹,感興趣的朋友一起看看吧2022-02-22基于Win11微軟 Windows Server vNext 22526 中文版多圖預(yù)覽
前幾天微軟推出了 Windows Server vNext 22526,趁著有空在虛擬機(jī)里安裝了一下,本次在分享體驗(yàn)的是繁體中文版。感興趣的朋友一起看看吧2022-01-14安全平臺(tái)無(wú)法正常運(yùn)行!微軟新更新導(dǎo)致Windows Server系統(tǒng)出現(xiàn)嚴(yán)重故障
安全平臺(tái)無(wú)法正常運(yùn)行!微軟新更新導(dǎo)致Windows Server系統(tǒng)出現(xiàn)嚴(yán)重故障,今天小編就為大家?guī)?lái)了詳細(xì)介紹,感興趣的朋友一起看看吧2021-12-02微軟 KB5007205 更新導(dǎo)致終結(jié)點(diǎn)安全平臺(tái)故障,僅影響 Windows Server 2
微軟 KB5007205 更新導(dǎo)致終結(jié)點(diǎn)安全平臺(tái)故障,僅影響 Windows Server 2022,下文為大家?guī)?lái)介紹,一起看看吧2021-11-29微軟 Windows Terminal 新設(shè)計(jì)搶先看,全面采用 Win11 風(fēng)格
微軟終端工具 Windows Terminal 正在設(shè)計(jì)開(kāi)發(fā)新外觀,將全面適配 Win11 風(fēng)格,基于全新的 WinUI 控件,下文為大家?guī)?lái)了詳細(xì)介紹,一起看看吧2021-11-15Windows 搜索工具 Everything 小插件:支持拼音首字母搜索,能與系統(tǒng)搜
今天為大家介紹Windows 搜索工具 Everything 小插件,使用也比較簡(jiǎn)單,感興趣的朋友一起看看吧2021-10-08WindowsServer2022正式版發(fā)布:鏡像發(fā)布下載
WindowsServer2022建立在 Windows Server 2019 的強(qiáng)大基礎(chǔ)之上,在三個(gè)關(guān)鍵主題上引入了許多創(chuàng)新:安全性、Azure 混合集成和管理以及應(yīng)用程序平臺(tái)。此外,可借助 “Windows2021-09-03如何使用鍵盤(pán)快捷鍵控制移動(dòng)Windows窗口
怎么適用于鍵盤(pán)快捷鍵來(lái)移動(dòng)和關(guān)閉瀏覽器窗口?下文小編就為大家?guī)?lái)了詳細(xì)介紹,感興趣的朋友一起看看吧2021-09-01