SpringBoot集成flink全過程
SpringBoot集成flink
Flink是一個批處理和流處理結(jié)合的統(tǒng)一計(jì)算框架,其核心是一個提供了數(shù)據(jù)分發(fā)以及并行化計(jì)算的流數(shù)據(jù)處理引擎。
最大亮點(diǎn)是流處理,最適合的應(yīng)用場景是低時延的數(shù)據(jù)處理。
場景
高并發(fā)pipeline處理數(shù)據(jù),時延毫秒級,且兼具可靠性。
環(huán)境搭建
①、安裝flink
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/try-flink/local_installation/
②、安裝Netcat
Netcat(又稱為NC)是一個計(jì)算機(jī)網(wǎng)絡(luò)工具,它可以在兩臺計(jì)算機(jī)之間建立 TCP/IP 或 UDP 連接。
用于測試網(wǎng)絡(luò)中的端口,發(fā)送文件等操作。
進(jìn)行網(wǎng)絡(luò)調(diào)試和探測,也可以進(jìn)行加密連接和遠(yuǎn)程管理等高級網(wǎng)絡(luò)操作
yum install -y nc # 安裝nc命令 nc -lk 8888 # 啟動socket端口
無界流之讀取socket文本流
一、依賴
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>springboot-demo</artifactId> <groupId>com.et</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>flink</artifactId> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-autoconfigure</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!-- 添加 Flink 依賴 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>1.17.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>1.17.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>1.17.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-base --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-base</artifactId> <version>1.17.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-files</artifactId> <version>1.17.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka</artifactId> <version>1.17.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-runtime-web</artifactId> <version>1.17.0</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>META-INF/spring.handlers</resource> </transformer> <transformer implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer"> <resource>META-INF/spring.factories</resource> </transformer> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>META-INF/spring.schemas</resource> </transformer> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" /> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>com.et.flink.job.SocketJob</mainClass> </transformer> </transformers> </configuration> </execution> </executions> </plugin> </plugins> </build> </project>
二、SoketJob
public class SocketJob{ public static void main(String[] args)throws Exception{ // 創(chuàng)建執(zhí)行環(huán)境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 指定并行度,默認(rèn)電腦線程數(shù) env.setParallelism(3); // 讀取數(shù)據(jù)socket文本流 指定監(jiān)聽 IP 端口 只有在接收到數(shù)據(jù)才會執(zhí)行任務(wù) DataStreamSource<String> socketDS = env.socketTextStream("172.24.4.193", 8888); // 處理數(shù)據(jù): 切換、轉(zhuǎn)換、分組、聚合 得到統(tǒng)計(jì)結(jié)果 SingleOutputStreamOperator<Tuple2<String, Integer>> sum = socketDS .flatMap( (String value, Collector<Tuple2<String, Integer>> out) -> { String[] words = value.split(" "); for (String word : words) { out.collect(Tuple2.of(word, 1)); } } ) .setParallelism(2) // // 顯式地提供類型信息:對于flatMap傳入Lambda表達(dá)式,系統(tǒng)只能推斷出返回的是Tuple2類型,而無法得到Tuple2<String, Long>。只有顯式設(shè)置系統(tǒng)當(dāng)前返回類型,才能正確解析出完整數(shù)據(jù) .returns(new TypeHint<Tuple2<String, Integer>>() { }) // .returns(Types.TUPLE(Types.STRING,Types.INT)) .keyBy(value -> value.f0) .sum(1); // 輸出 sum.print(); // 執(zhí)行 env.execute(); } }
測試:
啟動socket流:
nc -l 8888
本地執(zhí)行:直接ideal啟動main程序,在socket流中輸入
abc bcd cde bcd cde fgh cde fgh hij
集群執(zhí)行:
執(zhí)行maven打包,將打包的jar上傳到集群中
總結(jié)
以上為個人經(jīng)驗(yàn),希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
Spring Boot Maven Plugin打包異常解決方案
這篇文章主要介紹了Spring Boot Maven Plugin打包異常解決方案,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-11-11SpringBoot整合sharding-jdbc實(shí)現(xiàn)自定義分庫分表的實(shí)踐
本文主要介紹了SpringBoot整合sharding-jdbc實(shí)現(xiàn)自定義分庫分表的實(shí)踐,將通過自定義算法來實(shí)現(xiàn)定制化的分庫分表來擴(kuò)展相應(yīng)業(yè)務(wù),感興趣的可以了解一下2021-11-11Java中String、StringBuffer和StringBuilder的區(qū)別
這篇文章主要介紹了Java中String、StringBuffer和StringBuilder的區(qū)別,StringBuilder與StringBuffer都繼承自AbstractStringBuilder類,在AbstractStringBuilder中也是使用字符數(shù)組保存字符串char[]value但是沒有final關(guān)鍵字修飾,所以這兩個可變,需要的朋友可以參考下2024-01-01SpringBoot實(shí)現(xiàn)文件上傳下載功能小結(jié)
最近做的一個項(xiàng)目涉及到文件上傳與下載功能。SpringBoot后臺如何實(shí)現(xiàn)文件上傳下載呢?下面有單文件上傳和多文件上傳功能,感興趣的朋友一起看看吧2017-08-08SpringBoot + SpringSecurity 環(huán)境搭建的步驟
這篇文章主要介紹了SpringBoot + SpringSecurity 環(huán)境搭建的步驟,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-05-05maven pom中內(nèi)置變量及引用的實(shí)現(xiàn)
maven其實(shí)有很多內(nèi)置變量供開發(fā)著在開發(fā)中使用,本文主要介紹了maven pom中內(nèi)置變量及引用的實(shí)現(xiàn),具有一定的參考價值,感興趣的可以了解一下2024-01-01mybatis-plus IdWorker生成的Id和返回給前臺的不一致的解決
這篇文章主要介紹了mybatis-plus IdWorker生成的Id和返回給前臺的不一致的解決,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-03-03