欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Spring?Cloud?Stream實現數據流處理

 更新時間:2024年11月19日 08:24:07   作者:HBLOG  
Spring?Cloud?Stream的核心是Stream,準確來講Spring?Cloud?Stream提供了一整套數據流走向(流向)的API,?它的最終目的是使我們不關心數據的流入和寫出,而只關心對數據的業(yè)務處理,本文給大家介紹了Spring?Cloud?Stream實現數據流處理,需要的朋友可以參考下

1.什么是Spring Cloud Stream?

我看很多回答都是“為了屏蔽消息隊列的差異,使我們在使用消息隊列的時候能夠用統(tǒng)一的一套API,無需關心具體的消息隊列實現”。 這樣理解是有些不全面的,Spring Cloud Stream的核心是Stream,準確來講Spring Cloud Stream提供了一整套數據流走向(流向)的API, 它的最終目的是使我們不關心數據的流入和寫出,而只關心對數據的業(yè)務處理 我們舉一個例子:你們公司有一套系統(tǒng),這套系統(tǒng)由多個模塊組成,你負責其中一個模塊。數據會從第一個模塊流入,處理完后再交給下一個模塊。對于你負責的這個模塊來說,它的功能就是接收上一個模塊處理完成的數據,自己再加工加工,扔給下一個模塊。

我們很容易總結出每個模塊的流程:

  • 從上一個模塊拉取數據
  • 處理數據
  • 將處理完成的數據發(fā)給下一個模塊

其中流程1和3代表兩個模塊間的數據交互,這種數據交互往往會采用一些中間件(middleware)。比如模塊1和模塊2間數據可能使用的是kafka,模塊1向kafka中push數據,模塊2向kafka中poll數據。而模塊2和模塊3可能使用的是rabbitMQ。很明顯,它們的功能都是一樣的:**提供數據的流向,讓數據可以流入自己同時又可以從自己流出發(fā)給別人。**但由于中間件的不同,需要使用不同的API。 為了消除這種數據流入(輸入)和數據流出(輸出)實現上的差異性,因此便出現了Spring Cloud Stream。

2.環(huán)境準備

采用docker-compose搭建kafaka環(huán)境

version: '3'

networks:
  kafka:
    ipam:
      driver: default
      config:
        - subnet: "172.22.6.0/24"

services:
  zookepper:
    image: registry.cn-hangzhou.aliyuncs.com/zhengqing/zookeeper:latest
    container_name: zookeeper-server
    restart: unless-stopped
    volumes:
      - "/etc/localtime:/etc/localtime"
    environment:
      ALLOW_ANONYMOUS_LOGIN: yes
    ports:
      - "2181:2181"
    networks:
      kafka:
        ipv4_address: 172.22.6.11

  kafka:
    image: registry.cn-hangzhou.aliyuncs.com/zhengqing/kafka:3.4.1
    container_name: kafka
    restart: unless-stopped
    volumes:
      - "/etc/localtime:/etc/localtime"
    environment:
      ALLOW_PLAINTEXT_LISTENER: yes
      KAFKA_CFG_ZOOKEEPER_CONNECT: zookepper:2181
      KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://10.11.68.77:9092
    ports:
      - "9092:9092"
    depends_on:
      - zookepper
    networks:
      kafka:
        ipv4_address: 172.22.6.12

  kafka-map:
    image: registry.cn-hangzhou.aliyuncs.com/zhengqing/kafka-map
    container_name: kafka-map
    restart: unless-stopped
    volumes:
      - "./kafka/kafka-map/data:/usr/local/kafka-map/data"
    environment:
      DEFAULT_USERNAME: admin
      DEFAULT_PASSWORD: 123456
    ports:
      - "9080:8080"
    depends_on:                         
      - kafka
    networks:
      kafka:
        ipv4_address: 172.22.6.13

run

docker-compose -f docker-compose-kafka.yml -p kafka up -d

kafka-map

https://github.com/dushixiang/kafka-map

  • 訪問:http://127.0.0.1:9080
  • 賬號密碼:admin/123456

3.代碼工程

實驗目標

  • 生成UUID并將其發(fā)送到Kafka主題batch-in。
  • batch-in主題接收UUID的批量消息,移除其中的數字,并將結果發(fā)送到batch-out主題。
  • 監(jiān)聽batch-out主題并打印接收到的消息。

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springcloud-demo</artifactId>
        <groupId>com.et</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>spring-cloud-stream-kafaka</artifactId>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
    </properties>
    <dependencies>
        <!-- Spring Boot Starter Web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Spring Boot Starter Test -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

    </dependencies>

</project>

處理流

/*
 * Copyright 2023 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.et;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

import java.util.List;
import java.util.UUID;
import java.util.function.Function;
import java.util.function.Supplier;

/**
 * @author Steven Gantz
 */
@SpringBootApplication
public class CloudStreamsFunctionBatch {

   public static void main(String[] args) {
      SpringApplication.run(CloudStreamsFunctionBatch.class, args);
   }

   @Bean
   public Supplier<UUID> stringSupplier() {
      return () -> {
         var uuid = UUID.randomUUID();
         System.out.println(uuid + " -> batch-in");
         return uuid;
      };
   }

   @Bean
   public Function<List<UUID>, List<Message<String>>> digitRemovingConsumer() {
      return idBatch -> {
         System.out.println("Removed digits from batch of " + idBatch.size());
         return idBatch.stream()
            .map(UUID::toString)
            // Remove all digits from the UUID
            .map(uuid -> uuid.replaceAll("\\d",""))
            .map(noDigitString -> MessageBuilder.withPayload(noDigitString).build())
            .toList();
      };
   }

   @KafkaListener(id = "batch-out", topics = "batch-out")
   public void listen(String in) {
      System.out.println("batch-out -> " + in);
   }

}
  • 定義一個名為stringSupplier的Bean,它實現了Supplier<UUID>接口。這個方法生成一個隨機的UUID,并打印到控制臺,表示這個UUID將被發(fā)送到batch-in主題。

  • 定義一個名為digitRemovingConsumer的Bean,它實現了Function<List<UUID>, List<Message<String>>>接口。這個方法接受一個UUID的列表,打印出處理的UUID數量,然后將每個UUID轉換為字符串,移除其中的所有數字,最后將結果封裝為消息并返回。

  • 使用@KafkaListener注解定義一個Kafka監(jiān)聽器,監(jiān)聽batch-out主題。當接收到消息時,調用listen方法并打印接收到的消息內容。

配置文件

spring:
  cloud:
    function:
      definition: stringSupplier;digitRemovingConsumer
    stream:
      bindings:
        stringSupplier-out-0:
          destination: batch-in
        digitRemovingConsumer-in-0:
          destination: batch-in
          group: batch-in
          consumer:
            batch-mode: true
        digitRemovingConsumer-out-0:
          destination: batch-out
      kafka:
        binder:
          brokers: localhost:9092
        bindings:
          digitRemovingConsumer-in-0:
            consumer:
              configuration:
                # Forces consumer to wait 5 seconds before polling for messages
                fetch.max.wait.ms: 5000
                fetch.min.bytes: 1000000000
                max.poll.records: 10000000

參數解釋

spring:
  cloud:
    function:
      definition: stringSupplier;digitRemovingConsumer

spring.cloud.function.definition:定義了兩個函數,stringSupplierdigitRemovingConsumer。這兩個函數將在應用程序中被使用。

stream:
  bindings:
    stringSupplier-out-0:
      destination: batch-in

stream.bindings.stringSupplier-out-0.destination:將stringSupplier函數的輸出綁定到Kafka主題batch-in。

    digitRemovingConsumer-in-0:
      destination: batch-in
      group: batch-in
      consumer:
        batch-mode: true
  • stream.bindings.digitRemovingConsumer-in-0.destination:將digitRemovingConsumer函數的輸入綁定到Kafka主題batch-in。

  • group: batch-in:指定消費者組為batch-in,這意味著多個實例可以共享這個組來處理消息。

  • consumer.batch-mode: true:啟用批處理模式,允許消費者一次處理多條消息。

    digitRemovingConsumer-out-0:
      destination: batch-out
  • stream.bindings.digitRemovingConsumer-out-0.destination:將digitRemovingConsumer函數的輸出綁定到Kafka主題batch-out。

以上只是一些關鍵代碼

4.測試

啟動弄Spring Boot應用,可以看到控制臺輸出日志如下:

291ea6cc-1e5e-4dfb-92b6-5d5ea43d4277 -> batch-in
c746ba4e-835e-4f66-91c5-7a5cf8b01068 -> batch-in
a661145b-2dd9-4927-8806-919ad258ade5 -> batch-in
db150918-0f0b-49f6-b7bb-77b0f580de4c -> batch-in
b0d4917b-6777-4d96-a6d0-bb96715b5b20 -> batch-in
Removed digits from batch of 5
batch-out -> eacc-ee-dfb-b-dead
batch-out -> cbae-e-f-c-acfb
batch-out -> ab-dd---adade
batch-out -> db-fb-f-bbb-bfdec
batch-out -> bdb--d-ad-bbbb

以上就是Spring Cloud Stream實現數據流處理的詳細內容,更多關于Spring Cloud Stream數據流處理的資料請關注腳本之家其它相關文章!

相關文章

  • 史上最全圖文講解Java泛型

    史上最全圖文講解Java泛型

    泛型在java中有很重要的地位,在面向對象編程及各種設計模式中有非常廣泛的應用,下面這篇文章主要給大家介紹了Java泛型的相關資料,需要的朋友可以參考下
    2022-02-02
  • java map中相同的key保存多個value值方式

    java map中相同的key保存多個value值方式

    這篇文章主要介紹了java map中相同的key保存多個value值方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-08-08
  • Json轉list二層解析轉換代碼實例

    Json轉list二層解析轉換代碼實例

    這篇文章主要介紹了Json轉list二層解析轉換代碼實例,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2020-12-12
  • Spring?Boot異步線程間數據傳遞的四種方式

    Spring?Boot異步線程間數據傳遞的四種方式

    這篇文章主要為大家介紹了Spring?Boot異步線程間數據傳遞的四種方式詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2023-01-01
  • JAVA泛型的繼承和實現、擦除原理解析

    JAVA泛型的繼承和實現、擦除原理解析

    這篇文章主要介紹了JAVA泛型的繼承和實現、擦除原理解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2019-11-11
  • 如何在Java中調用python文件執(zhí)行詳解

    如何在Java中調用python文件執(zhí)行詳解

    豐富的第三方庫使得python非常適合用于進行數據分析,最近在項目中就涉及到java調用python實現的算法,下面這篇文章主要給大家介紹了關于如何在Java中調用python文件執(zhí)行的相關資料,需要的朋友可以參考下
    2022-05-05
  • Spring循環(huán)依賴的解決方法詳解

    Spring循環(huán)依賴的解決方法詳解

    Spring的解決循環(huán)依賴是有前置條件的,要解決循環(huán)依賴我們首先要了解Spring Bean對象的創(chuàng)建過程和依賴注入的方式。依賴注入方式,我之前的博客有所分享,大家可以在看本篇文章之前進行一下小小的回顧
    2022-08-08
  • Java操作mongodb增刪改查的基本操作實戰(zhàn)指南

    Java操作mongodb增刪改查的基本操作實戰(zhàn)指南

    MongoDB是一個基于分布式文件存儲的數據庫,由c++語言編寫,旨在為WEB應用提供可擴展的高性能數據存儲解決方案,下面這篇文章主要給大家介紹了關于Java操作mongodb增刪改查的基本操作實戰(zhàn)指南,需要的朋友可以參考下
    2023-05-05
  • java編程小白進階包的作用詳解

    java編程小白進階包的作用詳解

    這篇文章主要為大家介紹了java編程中包的作用詳解,文中通過示例分析方便大家更容易理解包的作用,有需要的朋友可以借鑒參考下,希望能夠有所幫助
    2021-10-10
  • springboot多個service互相調用的事務處理方式

    springboot多個service互相調用的事務處理方式

    這篇文章主要介紹了springboot多個service互相調用的事務處理方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-02-02

最新評論