欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Spring Boot 整合 Apache Flink 的詳細(xì)過程

 更新時(shí)間:2025年06月06日 14:30:54   作者:嘵奇  
Apache Flink 是一個(gè)高性能的分布式流處理框架,而Spring Boot提供了快速構(gòu)建企業(yè)級應(yīng)用的能力,下面給大家介紹Spring Boot 整合 Apache Flink 教程,感興趣的朋友一起看看吧

Spring Boot 整合 Apache Flink 教程

一、背景與目標(biāo)

Apache Flink 是一個(gè)高性能的分布式流處理框架,而Spring Boot提供了快速構(gòu)建企業(yè)級應(yīng)用的能力。整合二者可實(shí)現(xiàn):

  • 利用Spring Boot的依賴注入、配置管理等功能簡化Flink作業(yè)開發(fā)
  • 構(gòu)建完整的微服務(wù)架構(gòu),將流處理嵌入Spring生態(tài)
  • 實(shí)現(xiàn)動(dòng)態(tài)作業(yè)提交與管理

二、環(huán)境準(zhǔn)備

  • JDK 17+
  • Maven 3.8+
  • Spring Boot 3.1.5
  • Flink 1.17.2

三、創(chuàng)建項(xiàng)目 & 添加依賴

1. 創(chuàng)建Spring Boot項(xiàng)目

使用Spring Initializr生成基礎(chǔ)項(xiàng)目,選擇:

  • Maven
  • Spring Web(可選,用于創(chuàng)建REST接口)

2. 添加Flink依賴

<!-- pom.xml -->
<dependencies>
    <!-- Spring Boot Starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <!-- Flink核心依賴 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.17.2</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>1.17.2</version>
        <scope>provided</scope>
    </dependency>
    <!-- 本地執(zhí)行時(shí)需添加 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime</artifactId>
        <version>1.17.2</version>
        <scope>test</scope>
    </dependency>
</dependencies>

四、基礎(chǔ)整合示例

1. 編寫Flink流處理作業(yè)

// src/main/java/com/example/demo/flink/WordCountJob.java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class WordCountJob {
    public static void execute() throws Exception {
        final StreamExecutionEnvironment env = 
            StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> text = env.fromElements(
            "Spring Boot整合Flink",
            "Flink實(shí)時(shí)流處理",
            "Spring生態(tài)集成"
        );
        DataStream<WordCount> counts = text
            .flatMap(new FlatMapFunction<String, WordCount>() {
                @Override
                public void flatMap(String value, Collector<WordCount> out) {
                    for (String word : value.split("\\s")) {
                        out.collect(new WordCount(word, 1L));
                    }
                }
            })
            .keyBy(value -> value.word)
            .sum("count");
        counts.print();
        env.execute("Spring Boot Flink Job");
    }
    public static class WordCount {
        public String word;
        public long count;
        public WordCount() {}
        public WordCount(String word, long count) {
            this.word = word;
            this.count = count;
        }
        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}

2. 在Spring Boot中啟動(dòng)作業(yè)

// src/main/java/com/example/demo/DemoApplication.java
@SpringBootApplication
public class DemoApplication implements CommandLineRunner {
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
    @Override
    public void run(String... args) throws Exception {
        WordCountJob.execute(); // 啟動(dòng)Flink作業(yè)
    }
}

五、進(jìn)階整合 - 通過REST API動(dòng)態(tài)提交作業(yè)

1. 創(chuàng)建Job提交服務(wù)

// src/main/java/com/example/demo/service/FlinkJobService.java
@Service
public class FlinkJobService {
    public String submitWordCountJob(List<String> inputLines) {
        try {
            final StreamExecutionEnvironment env = 
                StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<String> text = env.fromCollection(inputLines);
            // ...(同上WordCount邏輯)
            JobExecutionResult result = env.execute();
            return "JobID: " + result.getJobID();
        } catch (Exception e) {
            return "Job Failed: " + e.getMessage();
        }
    }
}

2. 創(chuàng)建REST控制器

// src/main/java/com/example/demo/controller/JobController.java
@RestController
@RequestMapping("/jobs")
public class JobController {
    @Autowired
    private FlinkJobService flinkJobService;
    @PostMapping("/wordcount")
    public String submitWordCount(@RequestBody List<String> inputs) {
        return flinkJobService.submitWordCountJob(inputs);
    }
}

六、關(guān)鍵配置說明

1. application.properties

# 設(shè)置Flink本地執(zhí)行環(huán)境
spring.flink.local.enabled=true
spring.flink.job.name=SpringBootFlinkJob
# 調(diào)整并行度(根據(jù)CPU核心數(shù))
spring.flink.parallelism=4

2. 解決依賴沖突

在pom.xml中排除沖突依賴:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.17.2</version>
    <exclusions>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
    </exclusions>
</dependency>

七、運(yùn)行與驗(yàn)證

啟動(dòng)Spring Boot應(yīng)用:

mvn spring-boot:run

調(diào)用API提交作業(yè):

curl -X POST -H "Content-Type: application/json" \
-d '["Hello Flink", "Spring Boot Integration"]' \
http://localhost:8080/jobs/wordcount

查看控制臺輸出:

Flink> Spring : 1
Flink> Boot : 1
Flink> Integration : 1
...

八、生產(chǎn)環(huán)境注意事項(xiàng)

集群部署:將打包后的jar提交到Flink集群

flink run -c com.example.demo.DemoApplication your-application.jar

狀態(tài)管理:集成Flink State Backend(如RocksDB)

監(jiān)控集成:通過Micrometer接入Spring Boot Actuator

資源隔離:使用YarnKubernetes部署模式

九、完整項(xiàng)目結(jié)構(gòu)

src/
├── main/
│   ├── java/
│   │   ├── com/example/demo/
│   │   │   ├── DemoApplication.java
│   │   │   ├── flink/
│   │   │   │   └── WordCountJob.java
│   │   │   ├── controller/
│   │   │   ├── service/
│   ├── resources/
│   │   └── application.properties
pom.xml

通過以上步驟,即可實(shí)現(xiàn)Spring Boot與Apache Flink的深度整合。這種架構(gòu)特別適合需要將實(shí)時(shí)流處理能力嵌入微服務(wù)體系的場景,如實(shí)時(shí)風(fēng)控系統(tǒng)、IoT數(shù)據(jù)處理平臺等。后續(xù)可擴(kuò)展集成Kafka、HBase等大數(shù)據(jù)組件。

到此這篇關(guān)于Spring Boot 整合 Apache Flink 教程的文章就介紹到這了,更多相關(guān)Spring Boot 整合 Apache Flink內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

  • Mybatis使用foreach標(biāo)簽實(shí)現(xiàn)批量插入方式

    Mybatis使用foreach標(biāo)簽實(shí)現(xiàn)批量插入方式

    這篇文章主要介紹了Mybatis使用foreach標(biāo)簽實(shí)現(xiàn)批量插入方式,具有很好的參考價(jià)值,希望對大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2024-03-03
  • 詳解Lombok快速上手(安裝、使用與注解參數(shù))

    詳解Lombok快速上手(安裝、使用與注解參數(shù))

    這篇文章主要介紹了詳解Lombok快速上手(安裝、使用與注解參數(shù)) ,這里整理了一些日常編碼中能遇到的所有關(guān)于它的使用詳解,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧
    2018-12-12
  • 詳解Java分布式IP限流和防止惡意IP攻擊方案

    詳解Java分布式IP限流和防止惡意IP攻擊方案

    這篇文章主要介紹了詳解Java分布式IP限流和防止惡意IP攻擊方案,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-03-03
  • Spring?Boot中觸發(fā)異步任務(wù)的幾種實(shí)現(xiàn)方式總結(jié)

    Spring?Boot中觸發(fā)異步任務(wù)的幾種實(shí)現(xiàn)方式總結(jié)

    這篇文章主要介紹了Spring?Boot中觸發(fā)異步任務(wù)的幾種實(shí)現(xiàn)方式,包括使用@Async注解、消息隊(duì)列、CompletableFuture和Spring?Events,每種方法都有其優(yōu)缺點(diǎn),文中通過代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2025-04-04
  • Java中常用的設(shè)計(jì)模式之單例模式詳解

    Java中常用的設(shè)計(jì)模式之單例模式詳解

    這篇文章主要為大家詳細(xì)介紹了Java中常用的設(shè)計(jì)模式之單例模式,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來幫助
    2022-02-02
  • MyBatis中有關(guān)int和Integer的使用方式

    MyBatis中有關(guān)int和Integer的使用方式

    這篇文章主要介紹了MyBatis中有關(guān)int和Integer的使用方式,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-03-03
  • 將java程序打包成可執(zhí)行文件的實(shí)現(xiàn)方式

    將java程序打包成可執(zhí)行文件的實(shí)現(xiàn)方式

    本文介紹了將Java程序打包成可執(zhí)行文件的三種方法:手動(dòng)打包(將編譯后的代碼及JRE運(yùn)行環(huán)境一起打包),使用第三方打包工具(如Launch4j)和JDK自帶工具(jpackage),每種方法都有其優(yōu)缺點(diǎn),可根據(jù)實(shí)際需求選擇合適的方式
    2025-02-02
  • MyBatis攔截器分表實(shí)踐分享

    MyBatis攔截器分表實(shí)踐分享

    部門內(nèi)有一些億級別核心業(yè)務(wù)表增速非???增量日均100W,但線上業(yè)務(wù)只依賴近一周的數(shù)據(jù),隨著數(shù)據(jù)量的迅速增長,慢SQL頻發(fā),數(shù)據(jù)庫性能下降,系統(tǒng)穩(wěn)定性受到嚴(yán)重影響,本篇文章,將分享如何使用MyBatis攔截器低成本的提升數(shù)據(jù)庫穩(wěn)定性,需要的朋友可以參考下
    2024-01-01
  • Netty學(xué)習(xí)之理解selector原理示例

    Netty學(xué)習(xí)之理解selector原理示例

    這篇文章主要為大家介紹了Netty學(xué)習(xí)之理解selector原理示例使用分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪<BR>
    2023-07-07
  • 最新評論