SpringBoot超詳細(xì)講解集成Flink的部署與打包方法
一、SpringBoot集成Flink
其實沒什么特別的,就把Flink依賴的包在pom引入就行了。只是FlinkTask的寫法要小調(diào)整下,把相關(guān)依賴交給spring管理就行。
然后如果放棄Flink的Dashboard端監(jiān)控task執(zhí)行相關(guān)信息,那也可以在SpringBoot的啟動類里調(diào)用就行,但是可能出現(xiàn)task的相關(guān)對象沒有注入,這種都是小問題(實際就是springboot啟動完成再調(diào)用,或者通過自動任務(wù)調(diào)用。也可以在springBoot的入口類用@ComponentScan注解掃描flinkTask所在的目錄)。
實際更瀟灑一點的做法可以將flinkTask的信息存表,通過springBoot的接口調(diào)用restfullApi接口,接口里調(diào)用task,甚至可以做task的啟停、線程監(jiān)控(接口里開線程調(diào)用task)。
二、FlinkTask寫法調(diào)整
@Component @Slf4j public class JianGongStopCarTask { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<JGParkingLotInfo> dataStream = env.addSource(new JGParkingLotInfoSource()); // 獲取到數(shù)據(jù)之后轉(zhuǎn)換格式 此處不做轉(zhuǎn)換 SingleOutputStreamOperator<JGParkingLotInfo> jgParkingLotInfoSingleOutputStreamOperator = dataStream.map(jgParkingLotInfo -> jgParkingLotInfo); jgParkingLotInfoSingleOutputStreamOperator.addSink(new SinkToMySQL()); env.execute(); } }
PS:
實際就是@Component這個注解,然后這個示例里的JGParkingLotInfoSource、SinkToMySQL類都需要加這個注解。其他就沒有什么調(diào)整了,然后如果還有其他依賴沒有,使用hutool的SpringUtil進(jìn)行g(shù)et對象就行。
三、打包插件
<plugins> <!-- 編譯插件 --> <plugin> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.8</source> <target>1.8</target> <encoding>UTF-8</encoding> </configuration> </plugin> <!-- spring boot 項目打包 <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin>--> <!-- Flink打包方式一 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>3.3.0</version> <configuration> <archive> <manifest> <mainClass>com.easylinkin.dc.olap.JianGongStopCarTask</mainClass> </manifest> </archive> <!-- 打包依賴 --> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> <!-- flink打包方式二 <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.3.0</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <createDependencyReducedPom>false</createDependencyReducedPom> <artifactSet> <excludes> <exclude>com.google.code.findbugs:jsr305</exclude> <exclude>org.slf4j:*</exclude> <exclude>log4j:*</exclude> </excludes> </artifactSet> <filters> <filter> <artifact>*:*</artifact> <excludes> <exclude>module-info.class</exclude> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>META-INF/spring.handlers</resource> <resource>reference.conf</resource> </transformer> <transformer implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer"> <resource>META-INF/spring.factories</resource> </transformer> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>META-INF/spring.schemas</resource> </transformer> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>com.easylinkin.dc.olap.JianGongStopCarTask</mainClass> </transformer> </transformers> </configuration> </execution> </executions> </plugin> --> <!-- flink打包方式三 <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-jar-plugin</artifactId> <configuration> <archive> <manifest> <mainClass>com.easylinkin.dc.olap.JianGongStopCarTask</mainClass> <useUniqueVersions>false</useUniqueVersions> <addClasspath>true</addClasspath> <classpathPrefix>./lib/</classpathPrefix> </manifest> </archive> <excludes> <exclude>module-info.class</exclude> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> <exclude>com.google.code.findbugs:jsr305</exclude> <exclude>org.slf4j:*</exclude> <exclude>log4j:*</exclude> </excludes> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-dependency-plugin</artifactId> <version>3.3.0</version> <executions> <execution> <id>copy-dependencies</id> <phase>package</phase> <goals> <goal>copy-dependencies</goal> </goals> <configuration> <outputDirectory>${project.build.directory}/lib</outputDirectory> <excludeTransitive>false</excludeTransitive> <stripVersion>false</stripVersion> </configuration> </execution> </executions> </plugin>--> </plugins>
除了打成SpringBoot用springboot的插件打包,flinkTask的打包有3種方式,方式三適合lib包提前傳到task指定的依賴存儲目錄。這樣上傳flinkTask就很小。
方式二是官方推薦FlinkTask的打包方式,地址:https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/configuration/maven/
說一千道一萬就是因為打包的META-INF下的MANIFEST.MF文件的內(nèi)容有區(qū)別。springBoot項目的這個文件有自己的JarLauncher。
FlinkTask的jar這個文件內(nèi)容
四、Flink的上傳與運行
1、上傳并命令運行
配置好flink環(huán)境,命令就是
flink run WordCount.jar
2、Flink管理大屏上傳運行
點擊“Submit”
總結(jié)
這樣處理應(yīng)該是最優(yōu)雅的了,task的寫法改動也小。
官方用的是方式二打包,實際我覺得要是依賴特多,用方式三打包,然后將依賴的jar傳到flink運行環(huán)境flinkTask指定的目錄下應(yīng)該也不錯(flinkTask的包就變小了)
flink的這個計算監(jiān)控真香
所以基本還是就當(dāng)springBoot集成flink的項目一樣開發(fā)吧,在打包的時候注意切換插件就ok了。就分享到這里,up!
到此這篇關(guān)于SpringBoot超詳細(xì)講解集成Flink的部署與打包方法的文章就介紹到這了,更多相關(guān)SpringBoot Flink內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java函數(shù)式編程(十二):監(jiān)控文件修改
這篇文章主要介紹了Java函數(shù)式編程(十二):監(jiān)控文件修改,本文是系列文章的第12篇,其它文章請參閱本文底部的相關(guān)文章,需要的朋友可以參考下2014-09-09Spring?component-scan?XML配置與@ComponentScan注解配置
這篇文章主要介紹了Spring?component-scan?XML配置與@ComponentScan注解配置,文章圍繞主題展開詳細(xì)的內(nèi)容介紹,具有一定的參考價值,需要的小伙伴可以參考一下2022-09-09JAVA實現(xiàn)遍歷文件夾下的所有文件(遞歸調(diào)用和非遞歸調(diào)用)
本篇文章主要介紹了JAVA 遍歷文件夾下的所有文件(遞歸調(diào)用和非遞歸調(diào)用) ,具有一定的參考價值,有興趣的可以了解一下。2017-01-01