Spring?Cloud?Stream實現數據流處理
1.什么是Spring Cloud Stream?
我看很多回答都是“為了屏蔽消息隊列的差異,使我們在使用消息隊列的時候能夠用統(tǒng)一的一套API,無需關心具體的消息隊列實現”。 這樣理解是有些不全面的,Spring Cloud Stream的核心是Stream,準確來講Spring Cloud Stream提供了一整套數據流走向(流向)的API, 它的最終目的是使我們不關心數據的流入和寫出,而只關心對數據的業(yè)務處理 我們舉一個例子:你們公司有一套系統(tǒng),這套系統(tǒng)由多個模塊組成,你負責其中一個模塊。數據會從第一個模塊流入,處理完后再交給下一個模塊。對于你負責的這個模塊來說,它的功能就是接收上一個模塊處理完成的數據,自己再加工加工,扔給下一個模塊。

我們很容易總結出每個模塊的流程:
- 從上一個模塊拉取數據
- 處理數據
- 將處理完成的數據發(fā)給下一個模塊
其中流程1和3代表兩個模塊間的數據交互,這種數據交互往往會采用一些中間件(middleware)。比如模塊1和模塊2間數據可能使用的是kafka,模塊1向kafka中push數據,模塊2向kafka中poll數據。而模塊2和模塊3可能使用的是rabbitMQ。很明顯,它們的功能都是一樣的:**提供數據的流向,讓數據可以流入自己同時又可以從自己流出發(fā)給別人。**但由于中間件的不同,需要使用不同的API。 為了消除這種數據流入(輸入)和數據流出(輸出)實現上的差異性,因此便出現了Spring Cloud Stream。
2.環(huán)境準備
采用docker-compose搭建kafaka環(huán)境
version: '3'
networks:
kafka:
ipam:
driver: default
config:
- subnet: "172.22.6.0/24"
services:
zookepper:
image: registry.cn-hangzhou.aliyuncs.com/zhengqing/zookeeper:latest
container_name: zookeeper-server
restart: unless-stopped
volumes:
- "/etc/localtime:/etc/localtime"
environment:
ALLOW_ANONYMOUS_LOGIN: yes
ports:
- "2181:2181"
networks:
kafka:
ipv4_address: 172.22.6.11
kafka:
image: registry.cn-hangzhou.aliyuncs.com/zhengqing/kafka:3.4.1
container_name: kafka
restart: unless-stopped
volumes:
- "/etc/localtime:/etc/localtime"
environment:
ALLOW_PLAINTEXT_LISTENER: yes
KAFKA_CFG_ZOOKEEPER_CONNECT: zookepper:2181
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://10.11.68.77:9092
ports:
- "9092:9092"
depends_on:
- zookepper
networks:
kafka:
ipv4_address: 172.22.6.12
kafka-map:
image: registry.cn-hangzhou.aliyuncs.com/zhengqing/kafka-map
container_name: kafka-map
restart: unless-stopped
volumes:
- "./kafka/kafka-map/data:/usr/local/kafka-map/data"
environment:
DEFAULT_USERNAME: admin
DEFAULT_PASSWORD: 123456
ports:
- "9080:8080"
depends_on:
- kafka
networks:
kafka:
ipv4_address: 172.22.6.13
run
docker-compose -f docker-compose-kafka.yml -p kafka up -d
kafka-map
https://github.com/dushixiang/kafka-map
- 訪問:http://127.0.0.1:9080
- 賬號密碼:admin/123456
3.代碼工程

實驗目標
- 生成UUID并將其發(fā)送到Kafka主題
batch-in。 - 從
batch-in主題接收UUID的批量消息,移除其中的數字,并將結果發(fā)送到batch-out主題。 - 監(jiān)聽
batch-out主題并打印接收到的消息。
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>springcloud-demo</artifactId>
<groupId>com.et</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-kafaka</artifactId>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
</properties>
<dependencies>
<!-- Spring Boot Starter Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring Boot Starter Test -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
</project>
處理流
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.et;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import java.util.List;
import java.util.UUID;
import java.util.function.Function;
import java.util.function.Supplier;
/**
* @author Steven Gantz
*/
@SpringBootApplication
public class CloudStreamsFunctionBatch {
public static void main(String[] args) {
SpringApplication.run(CloudStreamsFunctionBatch.class, args);
}
@Bean
public Supplier<UUID> stringSupplier() {
return () -> {
var uuid = UUID.randomUUID();
System.out.println(uuid + " -> batch-in");
return uuid;
};
}
@Bean
public Function<List<UUID>, List<Message<String>>> digitRemovingConsumer() {
return idBatch -> {
System.out.println("Removed digits from batch of " + idBatch.size());
return idBatch.stream()
.map(UUID::toString)
// Remove all digits from the UUID
.map(uuid -> uuid.replaceAll("\\d",""))
.map(noDigitString -> MessageBuilder.withPayload(noDigitString).build())
.toList();
};
}
@KafkaListener(id = "batch-out", topics = "batch-out")
public void listen(String in) {
System.out.println("batch-out -> " + in);
}
}
定義一個名為
stringSupplier的Bean,它實現了Supplier<UUID>接口。這個方法生成一個隨機的UUID,并打印到控制臺,表示這個UUID將被發(fā)送到batch-in主題。定義一個名為
digitRemovingConsumer的Bean,它實現了Function<List<UUID>, List<Message<String>>>接口。這個方法接受一個UUID的列表,打印出處理的UUID數量,然后將每個UUID轉換為字符串,移除其中的所有數字,最后將結果封裝為消息并返回。使用
@KafkaListener注解定義一個Kafka監(jiān)聽器,監(jiān)聽batch-out主題。當接收到消息時,調用listen方法并打印接收到的消息內容。
配置文件
spring:
cloud:
function:
definition: stringSupplier;digitRemovingConsumer
stream:
bindings:
stringSupplier-out-0:
destination: batch-in
digitRemovingConsumer-in-0:
destination: batch-in
group: batch-in
consumer:
batch-mode: true
digitRemovingConsumer-out-0:
destination: batch-out
kafka:
binder:
brokers: localhost:9092
bindings:
digitRemovingConsumer-in-0:
consumer:
configuration:
# Forces consumer to wait 5 seconds before polling for messages
fetch.max.wait.ms: 5000
fetch.min.bytes: 1000000000
max.poll.records: 10000000
參數解釋
spring:
cloud:
function:
definition: stringSupplier;digitRemovingConsumer
spring.cloud.function.definition:定義了兩個函數,stringSupplier和digitRemovingConsumer。這兩個函數將在應用程序中被使用。
stream:
bindings:
stringSupplier-out-0:
destination: batch-in
stream.bindings.stringSupplier-out-0.destination:將stringSupplier函數的輸出綁定到Kafka主題batch-in。
digitRemovingConsumer-in-0:
destination: batch-in
group: batch-in
consumer:
batch-mode: true
stream.bindings.digitRemovingConsumer-in-0.destination:將digitRemovingConsumer函數的輸入綁定到Kafka主題batch-in。group: batch-in:指定消費者組為batch-in,這意味著多個實例可以共享這個組來處理消息。consumer.batch-mode: true:啟用批處理模式,允許消費者一次處理多條消息。
digitRemovingConsumer-out-0:
destination: batch-out
stream.bindings.digitRemovingConsumer-out-0.destination:將digitRemovingConsumer函數的輸出綁定到Kafka主題batch-out。
以上只是一些關鍵代碼
4.測試
啟動弄Spring Boot應用,可以看到控制臺輸出日志如下:
291ea6cc-1e5e-4dfb-92b6-5d5ea43d4277 -> batch-in c746ba4e-835e-4f66-91c5-7a5cf8b01068 -> batch-in a661145b-2dd9-4927-8806-919ad258ade5 -> batch-in db150918-0f0b-49f6-b7bb-77b0f580de4c -> batch-in b0d4917b-6777-4d96-a6d0-bb96715b5b20 -> batch-in Removed digits from batch of 5 batch-out -> eacc-ee-dfb-b-dead batch-out -> cbae-e-f-c-acfb batch-out -> ab-dd---adade batch-out -> db-fb-f-bbb-bfdec batch-out -> bdb--d-ad-bbbb
以上就是Spring Cloud Stream實現數據流處理的詳細內容,更多關于Spring Cloud Stream數據流處理的資料請關注腳本之家其它相關文章!
相關文章
Java操作mongodb增刪改查的基本操作實戰(zhàn)指南
MongoDB是一個基于分布式文件存儲的數據庫,由c++語言編寫,旨在為WEB應用提供可擴展的高性能數據存儲解決方案,下面這篇文章主要給大家介紹了關于Java操作mongodb增刪改查的基本操作實戰(zhàn)指南,需要的朋友可以參考下2023-05-05
springboot多個service互相調用的事務處理方式
這篇文章主要介紹了springboot多個service互相調用的事務處理方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-02-02

