欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

基于SpringBoot?使用?Flink?收發(fā)Kafka消息的示例詳解

 更新時間:2023年01月07日 09:12:25   作者:dk168  
這篇文章主要介紹了基于SpringBoot?使用?Flink?收發(fā)Kafka消息,本文通過示例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下

前言

這周學習下Flink相關的知識,學習到一個讀寫Kafka消息的示例, 自己動手實踐了一下,別人示例使用的是普通的Java Main方法,沒有用到spring boot. 我們在實際工作中會使用spring boot。 因此我做了些加強, 把流程打通了,過程記錄下來。

準備工作

首先我們通過docker安裝一個kafka服務,參照Kafka的官方知道文檔
https://developer.confluent.io/tutorials/kafka-console-consumer-producer-basics/kafka.html主要的是有個docker-compose.yml文件

---
version: '2'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.3.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

docker compose up -d
就可以把kafka docker 環(huán)境搭起來,
使用以下命令,創(chuàng)建一個flink.kafka.streaming.source的topic
docker exec -t broker kafka-topics --create --topic flink.kafka.streaming.source --bootstrap-server broker:9092
然后使用命令,就可以進入到kafka機器的命令行
docker exec -it broker bash
官方文檔示例中沒有-it, 運行后沒有進入broker的命令行,加上來才可以。這里說明下

Flink我們打算直接采用開發(fā)工具運行,暫時未搭環(huán)境,以體驗為主。

開發(fā)階段

首先需要引入的包POM文件

    <properties>
        <jdk.version>1.8</jdk.version>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spring-boot.version>2.7.7</spring-boot.version>
        <flink.version>1.16.0</flink.version>
    </properties>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

這里我們使用Java8, 本來想使用Spring Boot 3的,但是Spring Boot 3 最低需要Java17了, 目前Flink支持Java8和Java11,所以我們使用Spring Boot 2, Java 8來開發(fā)。

spring-boot-starter 我們就一個命令行程序,所以用這個就夠了
lombok 用來定義model
flink-java, flink-clients, flink-streaming-java 是使用基本組件, 缺少flink-clients編譯階段不會報錯,運行的時候會報java.lang.IllegalStateException: No ExecutorFactory found to execute the application.
flink-connector-kafka 是連接kafka用
我們這里把provided, 打包的時候不用打包flink相關組件,由運行環(huán)境提供。但是IDEA運行的時候會報java.lang.NoClassDefFoundError: org/apache/flink/streaming/util/serialization/DeserializationSchema,
在運行的configuration上面勾選上“add dependencies with provided scope to classpath”可以解決這個問題。

主要代碼

@Component
@Slf4j
public class KafkaRunner implements ApplicationRunner
{
    @Override
    public void run(ApplicationArguments args) throws Exception {
        try{

            /****************************************************************************
             *                 Setup Flink environment.
             ****************************************************************************/

            // Set up the streaming execution environment
            final StreamExecutionEnvironment streamEnv
                    = StreamExecutionEnvironment.getExecutionEnvironment();

            /****************************************************************************
             *                  Read Kafka Topic Stream into a DataStream.
             ****************************************************************************/

            //Set connection properties to Kafka Cluster
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "localhost:29092");
            properties.setProperty("group.id", "flink.learn.realtime");

            //Setup a Kafka Consumer on Flnk
            FlinkKafkaConsumer<String> kafkaConsumer =
                    new FlinkKafkaConsumer<>
                            ("flink.kafka.streaming.source", //topic
                                    new SimpleStringSchema(), //Schema for data
                                    properties); //connection properties

            //Setup to receive only new messages
            kafkaConsumer.setStartFromLatest();

            //Create the data stream
            DataStream<String> auditTrailStr = streamEnv
                    .addSource(kafkaConsumer);

            //Convert each record to an Object
            DataStream<Tuple2<String, Integer>> userCounts
                    = auditTrailStr
                    .map(new MapFunction<String,Tuple2<String,Integer>>() {

                        @Override
                        public Tuple2<String,Integer> map(String auditStr) {
                            System.out.println("--- Received Record : " + auditStr);
                            AuditTrail at = new AuditTrail(auditStr);
                            return new Tuple2<String,Integer>(at.getUser(),at.getDuration());
                        }
                    })

                    .keyBy(0)  //By user name
                    .reduce((x,y) -> new Tuple2<String,Integer>( x.f0, x.f1 + y.f1));

            //Print User and Durations.
            userCounts.print();

            /****************************************************************************
             *                  Setup data source and execute the Flink pipeline
             ****************************************************************************/
            //Start the Kafka Stream generator on a separate thread
            System.out.println("Starting Kafka Data Generator...");
            Thread kafkaThread = new Thread(new KafkaStreamDataGenerator());
            kafkaThread.start();

            // execute the streaming pipeline
            streamEnv.execute("Flink Windowing Example");

        }
        catch(Exception e) {
            e.printStackTrace();
        }
    }
}

簡單說明下程序
DataStream auditTrailStr = streamEnv
.addSource(kafkaConsumer);
就是接通了Kafka Source

        Thread kafkaThread = new Thread(new KafkaStreamDataGenerator());
        kafkaThread.start();

這段代碼是另外開一個線程往kafka里面去發(fā)送文本消息
我們在這個示例中就是一個線程發(fā),然后flink就讀出來,然后統(tǒng)計出每個用戶的操作時間。
auditTrailStr.map 就是來進行統(tǒng)計操作。

運行效果

可以看到Kafka一邊發(fā)送,然后我們就一邊讀出來,然后就統(tǒng)計出了每個用戶的時間。

總結

本文只是簡單的打通了幾個環(huán)節(jié),對于flink的知識沒有涉及太多,算是一個環(huán)境入門。后面學習更多的以后我們再深入些來記錄flink. 示例代碼會放到 https://github.com/dengkun39/redisdemo.git spring-boot-flink 文件夾。

相關文章

  • Java中DateTimeFormatter的使用方法和案例

    Java中DateTimeFormatter的使用方法和案例

    在Java中,DateTimeFormatter類用于格式化和解析日期時間對象,它是日期時間格式化的強大而靈活的工具,本文將和大家一起探討Java中DateTimeFormatter的使用方法和案例,需要的朋友可以參考下
    2023-10-10
  • MyBatis學習筆記(二)之關聯(lián)關系

    MyBatis學習筆記(二)之關聯(lián)關系

    這篇文章主要介紹了MyBatis學習筆記(二)之關聯(lián)關系 的相關資料,需要的朋友可以參考下
    2016-02-02
  • 淺談 JDBC 元數(shù)據(jù)

    淺談 JDBC 元數(shù)據(jù)

    這篇文章主要介紹了JDBC元數(shù)據(jù)的相關內(nèi)容,涉及一些獲取數(shù)據(jù)源各種信息的方法,具有一定參考價值,需要的朋友可以了解下。
    2017-09-09
  • Java實現(xiàn)解析.xlsb文件的示例代碼

    Java實現(xiàn)解析.xlsb文件的示例代碼

    這篇文章主要為大家詳細介紹了Java實現(xiàn)解析.xlsb文件的相關方法,文中的示例代碼講解詳細,具有一定的借鑒價值,感興趣的可以了解一下
    2023-01-01
  • java實現(xiàn)文件重命名的方法

    java實現(xiàn)文件重命名的方法

    這篇文章主要介紹了java實現(xiàn)文件重命名的方法,涉及java針對文件的重命名操作技巧,具有一定參考借鑒價值,需要的朋友可以參考下
    2015-07-07
  • Java編程發(fā)展歷史(動力節(jié)點Java學院整理)

    Java編程發(fā)展歷史(動力節(jié)點Java學院整理)

    Java的歷史可以追溯到1991年4月,Sun公司的James Gosling領導的綠色計劃(Green Project)開始著力發(fā)展一種分布式系統(tǒng)結構,使其能夠在各種消費性電子產(chǎn)品上運行,他們使用了C/C++/Oak語言。由于多種原因,綠色計劃逐漸陷于停滯狀態(tài)
    2017-03-03
  • Springboot中使用Redis實現(xiàn)分布式鎖的示例代碼

    Springboot中使用Redis實現(xiàn)分布式鎖的示例代碼

    在分布式系統(tǒng)中,為了保證數(shù)據(jù)的一致性和任務的互斥執(zhí)行,分布式鎖是一種常見的解決方案,本文主要介紹了Springboot中使用Redis實現(xiàn)分布式鎖的示例代碼,具有一定的參考價值,感興趣的可以了解一下
    2024-05-05
  • Java如何通過File類方法刪除指定文件夾中的全部文件

    Java如何通過File類方法刪除指定文件夾中的全部文件

    這篇文章主要給大家介紹了關于Java如何通過File類方法刪除指定文件夾中的全部文件的相關資料,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2021-01-01
  • Java中Json解析的方法分析

    Java中Json解析的方法分析

    這篇文章主要介紹了Java中Json解析的方法,結合實例形式分析了java針對json格式數(shù)據(jù)的解析實現(xiàn)步驟與相關操作技巧,需要的朋友可以參考下
    2017-05-05
  • Lucene實現(xiàn)多種高級搜索形式

    Lucene實現(xiàn)多種高級搜索形式

    這篇文章主要介紹了Lucene實現(xiàn)多種高級搜索形式的相關資料,需要的朋友可以參考下
    2017-04-04

最新評論