SpringBoot集成flink全過程
SpringBoot集成flink
Flink是一個批處理和流處理結(jié)合的統(tǒng)一計算框架,其核心是一個提供了數(shù)據(jù)分發(fā)以及并行化計算的流數(shù)據(jù)處理引擎。
最大亮點是流處理,最適合的應(yīng)用場景是低時延的數(shù)據(jù)處理。
場景
高并發(fā)pipeline處理數(shù)據(jù),時延毫秒級,且兼具可靠性。
環(huán)境搭建
①、安裝flink
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/try-flink/local_installation/
②、安裝Netcat
Netcat(又稱為NC)是一個計算機網(wǎng)絡(luò)工具,它可以在兩臺計算機之間建立 TCP/IP 或 UDP 連接。
用于測試網(wǎng)絡(luò)中的端口,發(fā)送文件等操作。
進行網(wǎng)絡(luò)調(diào)試和探測,也可以進行加密連接和遠程管理等高級網(wǎng)絡(luò)操作
yum install -y nc # 安裝nc命令 nc -lk 8888 # 啟動socket端口
無界流之讀取socket文本流
一、依賴
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>springboot-demo</artifactId> <groupId>com.et</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>flink</artifactId> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-autoconfigure</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!-- 添加 Flink 依賴 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>1.17.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>1.17.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>1.17.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-base --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-base</artifactId> <version>1.17.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-files</artifactId> <version>1.17.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka</artifactId> <version>1.17.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-runtime-web</artifactId> <version>1.17.0</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>META-INF/spring.handlers</resource> </transformer> <transformer implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer"> <resource>META-INF/spring.factories</resource> </transformer> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>META-INF/spring.schemas</resource> </transformer> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" /> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>com.et.flink.job.SocketJob</mainClass> </transformer> </transformers> </configuration> </execution> </executions> </plugin> </plugins> </build> </project>
二、SoketJob
public class SocketJob{ public static void main(String[] args)throws Exception{ // 創(chuàng)建執(zhí)行環(huán)境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 指定并行度,默認電腦線程數(shù) env.setParallelism(3); // 讀取數(shù)據(jù)socket文本流 指定監(jiān)聽 IP 端口 只有在接收到數(shù)據(jù)才會執(zhí)行任務(wù) DataStreamSource<String> socketDS = env.socketTextStream("172.24.4.193", 8888); // 處理數(shù)據(jù): 切換、轉(zhuǎn)換、分組、聚合 得到統(tǒng)計結(jié)果 SingleOutputStreamOperator<Tuple2<String, Integer>> sum = socketDS .flatMap( (String value, Collector<Tuple2<String, Integer>> out) -> { String[] words = value.split(" "); for (String word : words) { out.collect(Tuple2.of(word, 1)); } } ) .setParallelism(2) // // 顯式地提供類型信息:對于flatMap傳入Lambda表達式,系統(tǒng)只能推斷出返回的是Tuple2類型,而無法得到Tuple2<String, Long>。只有顯式設(shè)置系統(tǒng)當前返回類型,才能正確解析出完整數(shù)據(jù) .returns(new TypeHint<Tuple2<String, Integer>>() { }) // .returns(Types.TUPLE(Types.STRING,Types.INT)) .keyBy(value -> value.f0) .sum(1); // 輸出 sum.print(); // 執(zhí)行 env.execute(); } }
測試:
啟動socket流:
nc -l 8888
本地執(zhí)行:直接ideal啟動main程序,在socket流中輸入
abc bcd cde bcd cde fgh cde fgh hij
集群執(zhí)行:
執(zhí)行maven打包,將打包的jar上傳到集群中
總結(jié)
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
Spring Boot Maven Plugin打包異常解決方案
這篇文章主要介紹了Spring Boot Maven Plugin打包異常解決方案,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2020-11-11SpringBoot整合sharding-jdbc實現(xiàn)自定義分庫分表的實踐
本文主要介紹了SpringBoot整合sharding-jdbc實現(xiàn)自定義分庫分表的實踐,將通過自定義算法來實現(xiàn)定制化的分庫分表來擴展相應(yīng)業(yè)務(wù),感興趣的可以了解一下2021-11-11Java中String、StringBuffer和StringBuilder的區(qū)別
這篇文章主要介紹了Java中String、StringBuffer和StringBuilder的區(qū)別,StringBuilder與StringBuffer都繼承自AbstractStringBuilder類,在AbstractStringBuilder中也是使用字符數(shù)組保存字符串char[]value但是沒有final關(guān)鍵字修飾,所以這兩個可變,需要的朋友可以參考下2024-01-01SpringBoot實現(xiàn)文件上傳下載功能小結(jié)
最近做的一個項目涉及到文件上傳與下載功能。SpringBoot后臺如何實現(xiàn)文件上傳下載呢?下面有單文件上傳和多文件上傳功能,感興趣的朋友一起看看吧2017-08-08SpringBoot + SpringSecurity 環(huán)境搭建的步驟
這篇文章主要介紹了SpringBoot + SpringSecurity 環(huán)境搭建的步驟,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-05-05maven pom中內(nèi)置變量及引用的實現(xiàn)
maven其實有很多內(nèi)置變量供開發(fā)著在開發(fā)中使用,本文主要介紹了maven pom中內(nèi)置變量及引用的實現(xiàn),具有一定的參考價值,感興趣的可以了解一下2024-01-01mybatis-plus IdWorker生成的Id和返回給前臺的不一致的解決
這篇文章主要介紹了mybatis-plus IdWorker生成的Id和返回給前臺的不一致的解決,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2021-03-03