基于SpringBoot?使用?Flink?收發(fā)Kafka消息的示例詳解
前言
這周學(xué)習(xí)下Flink相關(guān)的知識(shí),學(xué)習(xí)到一個(gè)讀寫Kafka消息的示例, 自己動(dòng)手實(shí)踐了一下,別人示例使用的是普通的Java Main方法,沒有用到spring boot. 我們?cè)趯?shí)際工作中會(huì)使用spring boot。 因此我做了些加強(qiáng), 把流程打通了,過(guò)程記錄下來(lái)。
準(zhǔn)備工作
首先我們通過(guò)docker安裝一個(gè)kafka服務(wù),參照Kafka的官方知道文檔
https://developer.confluent.io/tutorials/kafka-console-consumer-producer-basics/kafka.html主要的是有個(gè)docker-compose.yml文件
---
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.3.0
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-kafka:7.3.0
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- "29092:29092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0docker compose up -d
就可以把kafka docker 環(huán)境搭起來(lái),
使用以下命令,創(chuàng)建一個(gè)flink.kafka.streaming.source的topic
docker exec -t broker kafka-topics --create --topic flink.kafka.streaming.source --bootstrap-server broker:9092
然后使用命令,就可以進(jìn)入到kafka機(jī)器的命令行
docker exec -it broker bash
官方文檔示例中沒有-it, 運(yùn)行后沒有進(jìn)入broker的命令行,加上來(lái)才可以。這里說(shuō)明下
Flink我們打算直接采用開發(fā)工具運(yùn)行,暫時(shí)未搭環(huán)境,以體驗(yàn)為主。
開發(fā)階段
首先需要引入的包POM文件
<properties>
<jdk.version>1.8</jdk.version>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spring-boot.version>2.7.7</spring-boot.version>
<flink.version>1.16.0</flink.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>這里我們使用Java8, 本來(lái)想使用Spring Boot 3的,但是Spring Boot 3 最低需要Java17了, 目前Flink支持Java8和Java11,所以我們使用Spring Boot 2, Java 8來(lái)開發(fā)。
spring-boot-starter 我們就一個(gè)命令行程序,所以用這個(gè)就夠了
lombok 用來(lái)定義model
flink-java, flink-clients, flink-streaming-java 是使用基本組件, 缺少flink-clients編譯階段不會(huì)報(bào)錯(cuò),運(yùn)行的時(shí)候會(huì)報(bào)java.lang.IllegalStateException: No ExecutorFactory found to execute the application.
flink-connector-kafka 是連接kafka用
我們這里把provided, 打包的時(shí)候不用打包flink相關(guān)組件,由運(yùn)行環(huán)境提供。但是IDEA運(yùn)行的時(shí)候會(huì)報(bào)java.lang.NoClassDefFoundError: org/apache/flink/streaming/util/serialization/DeserializationSchema,
在運(yùn)行的configuration上面勾選上“add dependencies with provided scope to classpath”可以解決這個(gè)問(wèn)題。
主要代碼
@Component
@Slf4j
public class KafkaRunner implements ApplicationRunner
{
@Override
public void run(ApplicationArguments args) throws Exception {
try{
/****************************************************************************
* Setup Flink environment.
****************************************************************************/
// Set up the streaming execution environment
final StreamExecutionEnvironment streamEnv
= StreamExecutionEnvironment.getExecutionEnvironment();
/****************************************************************************
* Read Kafka Topic Stream into a DataStream.
****************************************************************************/
//Set connection properties to Kafka Cluster
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:29092");
properties.setProperty("group.id", "flink.learn.realtime");
//Setup a Kafka Consumer on Flnk
FlinkKafkaConsumer<String> kafkaConsumer =
new FlinkKafkaConsumer<>
("flink.kafka.streaming.source", //topic
new SimpleStringSchema(), //Schema for data
properties); //connection properties
//Setup to receive only new messages
kafkaConsumer.setStartFromLatest();
//Create the data stream
DataStream<String> auditTrailStr = streamEnv
.addSource(kafkaConsumer);
//Convert each record to an Object
DataStream<Tuple2<String, Integer>> userCounts
= auditTrailStr
.map(new MapFunction<String,Tuple2<String,Integer>>() {
@Override
public Tuple2<String,Integer> map(String auditStr) {
System.out.println("--- Received Record : " + auditStr);
AuditTrail at = new AuditTrail(auditStr);
return new Tuple2<String,Integer>(at.getUser(),at.getDuration());
}
})
.keyBy(0) //By user name
.reduce((x,y) -> new Tuple2<String,Integer>( x.f0, x.f1 + y.f1));
//Print User and Durations.
userCounts.print();
/****************************************************************************
* Setup data source and execute the Flink pipeline
****************************************************************************/
//Start the Kafka Stream generator on a separate thread
System.out.println("Starting Kafka Data Generator...");
Thread kafkaThread = new Thread(new KafkaStreamDataGenerator());
kafkaThread.start();
// execute the streaming pipeline
streamEnv.execute("Flink Windowing Example");
}
catch(Exception e) {
e.printStackTrace();
}
}
}
簡(jiǎn)單說(shuō)明下程序
DataStream auditTrailStr = streamEnv
.addSource(kafkaConsumer);
就是接通了Kafka Source
Thread kafkaThread = new Thread(new KafkaStreamDataGenerator());
kafkaThread.start();這段代碼是另外開一個(gè)線程往kafka里面去發(fā)送文本消息
我們?cè)谶@個(gè)示例中就是一個(gè)線程發(fā),然后flink就讀出來(lái),然后統(tǒng)計(jì)出每個(gè)用戶的操作時(shí)間。
auditTrailStr.map 就是來(lái)進(jìn)行統(tǒng)計(jì)操作。
運(yùn)行效果

可以看到Kafka一邊發(fā)送,然后我們就一邊讀出來(lái),然后就統(tǒng)計(jì)出了每個(gè)用戶的時(shí)間。
總結(jié)
本文只是簡(jiǎn)單的打通了幾個(gè)環(huán)節(jié),對(duì)于flink的知識(shí)沒有涉及太多,算是一個(gè)環(huán)境入門。后面學(xué)習(xí)更多的以后我們?cè)偕钊胄﹣?lái)記錄flink. 示例代碼會(huì)放到 https://github.com/dengkun39/redisdemo.git spring-boot-flink 文件夾。
相關(guān)文章
Java中DateTimeFormatter的使用方法和案例
在Java中,DateTimeFormatter類用于格式化和解析日期時(shí)間對(duì)象,它是日期時(shí)間格式化的強(qiáng)大而靈活的工具,本文將和大家一起探討Java中DateTimeFormatter的使用方法和案例,需要的朋友可以參考下2023-10-10
MyBatis學(xué)習(xí)筆記(二)之關(guān)聯(lián)關(guān)系
這篇文章主要介紹了MyBatis學(xué)習(xí)筆記(二)之關(guān)聯(lián)關(guān)系 的相關(guān)資料,需要的朋友可以參考下2016-02-02
Java實(shí)現(xiàn)解析.xlsb文件的示例代碼
這篇文章主要為大家詳細(xì)介紹了Java實(shí)現(xiàn)解析.xlsb文件的相關(guān)方法,文中的示例代碼講解詳細(xì),具有一定的借鑒價(jià)值,感興趣的可以了解一下2023-01-01
Java編程發(fā)展歷史(動(dòng)力節(jié)點(diǎn)Java學(xué)院整理)
Java的歷史可以追溯到1991年4月,Sun公司的James Gosling領(lǐng)導(dǎo)的綠色計(jì)劃(Green Project)開始著力發(fā)展一種分布式系統(tǒng)結(jié)構(gòu),使其能夠在各種消費(fèi)性電子產(chǎn)品上運(yùn)行,他們使用了C/C++/Oak語(yǔ)言。由于多種原因,綠色計(jì)劃逐漸陷于停滯狀態(tài)2017-03-03
Springboot中使用Redis實(shí)現(xiàn)分布式鎖的示例代碼
在分布式系統(tǒng)中,為了保證數(shù)據(jù)的一致性和任務(wù)的互斥執(zhí)行,分布式鎖是一種常見的解決方案,本文主要介紹了Springboot中使用Redis實(shí)現(xiàn)分布式鎖的示例代碼,具有一定的參考價(jià)值,感興趣的可以了解一下2024-05-05
Java如何通過(guò)File類方法刪除指定文件夾中的全部文件
這篇文章主要給大家介紹了關(guān)于Java如何通過(guò)File類方法刪除指定文件夾中的全部文件的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-01-01
Lucene實(shí)現(xiàn)多種高級(jí)搜索形式
這篇文章主要介紹了Lucene實(shí)現(xiàn)多種高級(jí)搜索形式的相關(guān)資料,需要的朋友可以參考下2017-04-04

