Spring?Cloud?Stream實現(xiàn)數(shù)據(jù)流處理
1.什么是Spring Cloud Stream?
我看很多回答都是“為了屏蔽消息隊列的差異,使我們在使用消息隊列的時候能夠用統(tǒng)一的一套API,無需關心具體的消息隊列實現(xiàn)”。 這樣理解是有些不全面的,Spring Cloud Stream的核心是Stream,準確來講Spring Cloud Stream提供了一整套數(shù)據(jù)流走向(流向)的API, 它的最終目的是使我們不關心數(shù)據(jù)的流入和寫出,而只關心對數(shù)據(jù)的業(yè)務處理 我們舉一個例子:你們公司有一套系統(tǒng),這套系統(tǒng)由多個模塊組成,你負責其中一個模塊。數(shù)據(jù)會從第一個模塊流入,處理完后再交給下一個模塊。對于你負責的這個模塊來說,它的功能就是接收上一個模塊處理完成的數(shù)據(jù),自己再加工加工,扔給下一個模塊。

我們很容易總結(jié)出每個模塊的流程:
- 從上一個模塊拉取數(shù)據(jù)
- 處理數(shù)據(jù)
- 將處理完成的數(shù)據(jù)發(fā)給下一個模塊
其中流程1和3代表兩個模塊間的數(shù)據(jù)交互,這種數(shù)據(jù)交互往往會采用一些中間件(middleware)。比如模塊1和模塊2間數(shù)據(jù)可能使用的是kafka,模塊1向kafka中push數(shù)據(jù),模塊2向kafka中poll數(shù)據(jù)。而模塊2和模塊3可能使用的是rabbitMQ。很明顯,它們的功能都是一樣的:**提供數(shù)據(jù)的流向,讓數(shù)據(jù)可以流入自己同時又可以從自己流出發(fā)給別人。**但由于中間件的不同,需要使用不同的API。 為了消除這種數(shù)據(jù)流入(輸入)和數(shù)據(jù)流出(輸出)實現(xiàn)上的差異性,因此便出現(xiàn)了Spring Cloud Stream。
2.環(huán)境準備
采用docker-compose搭建kafaka環(huán)境
version: '3'
networks:
kafka:
ipam:
driver: default
config:
- subnet: "172.22.6.0/24"
services:
zookepper:
image: registry.cn-hangzhou.aliyuncs.com/zhengqing/zookeeper:latest
container_name: zookeeper-server
restart: unless-stopped
volumes:
- "/etc/localtime:/etc/localtime"
environment:
ALLOW_ANONYMOUS_LOGIN: yes
ports:
- "2181:2181"
networks:
kafka:
ipv4_address: 172.22.6.11
kafka:
image: registry.cn-hangzhou.aliyuncs.com/zhengqing/kafka:3.4.1
container_name: kafka
restart: unless-stopped
volumes:
- "/etc/localtime:/etc/localtime"
environment:
ALLOW_PLAINTEXT_LISTENER: yes
KAFKA_CFG_ZOOKEEPER_CONNECT: zookepper:2181
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://10.11.68.77:9092
ports:
- "9092:9092"
depends_on:
- zookepper
networks:
kafka:
ipv4_address: 172.22.6.12
kafka-map:
image: registry.cn-hangzhou.aliyuncs.com/zhengqing/kafka-map
container_name: kafka-map
restart: unless-stopped
volumes:
- "./kafka/kafka-map/data:/usr/local/kafka-map/data"
environment:
DEFAULT_USERNAME: admin
DEFAULT_PASSWORD: 123456
ports:
- "9080:8080"
depends_on:
- kafka
networks:
kafka:
ipv4_address: 172.22.6.13
run
docker-compose -f docker-compose-kafka.yml -p kafka up -d
kafka-map
https://github.com/dushixiang/kafka-map
- 訪問:http://127.0.0.1:9080
- 賬號密碼:admin/123456
3.代碼工程

實驗目標
- 生成UUID并將其發(fā)送到Kafka主題
batch-in。 - 從
batch-in主題接收UUID的批量消息,移除其中的數(shù)字,并將結(jié)果發(fā)送到batch-out主題。 - 監(jiān)聽
batch-out主題并打印接收到的消息。
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>springcloud-demo</artifactId>
<groupId>com.et</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-kafaka</artifactId>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
</properties>
<dependencies>
<!-- Spring Boot Starter Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring Boot Starter Test -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
</project>
處理流
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.et;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import java.util.List;
import java.util.UUID;
import java.util.function.Function;
import java.util.function.Supplier;
/**
* @author Steven Gantz
*/
@SpringBootApplication
public class CloudStreamsFunctionBatch {
public static void main(String[] args) {
SpringApplication.run(CloudStreamsFunctionBatch.class, args);
}
@Bean
public Supplier<UUID> stringSupplier() {
return () -> {
var uuid = UUID.randomUUID();
System.out.println(uuid + " -> batch-in");
return uuid;
};
}
@Bean
public Function<List<UUID>, List<Message<String>>> digitRemovingConsumer() {
return idBatch -> {
System.out.println("Removed digits from batch of " + idBatch.size());
return idBatch.stream()
.map(UUID::toString)
// Remove all digits from the UUID
.map(uuid -> uuid.replaceAll("\\d",""))
.map(noDigitString -> MessageBuilder.withPayload(noDigitString).build())
.toList();
};
}
@KafkaListener(id = "batch-out", topics = "batch-out")
public void listen(String in) {
System.out.println("batch-out -> " + in);
}
}
定義一個名為
stringSupplier的Bean,它實現(xiàn)了Supplier<UUID>接口。這個方法生成一個隨機的UUID,并打印到控制臺,表示這個UUID將被發(fā)送到batch-in主題。定義一個名為
digitRemovingConsumer的Bean,它實現(xiàn)了Function<List<UUID>, List<Message<String>>>接口。這個方法接受一個UUID的列表,打印出處理的UUID數(shù)量,然后將每個UUID轉(zhuǎn)換為字符串,移除其中的所有數(shù)字,最后將結(jié)果封裝為消息并返回。使用
@KafkaListener注解定義一個Kafka監(jiān)聽器,監(jiān)聽batch-out主題。當接收到消息時,調(diào)用listen方法并打印接收到的消息內(nèi)容。
配置文件
spring:
cloud:
function:
definition: stringSupplier;digitRemovingConsumer
stream:
bindings:
stringSupplier-out-0:
destination: batch-in
digitRemovingConsumer-in-0:
destination: batch-in
group: batch-in
consumer:
batch-mode: true
digitRemovingConsumer-out-0:
destination: batch-out
kafka:
binder:
brokers: localhost:9092
bindings:
digitRemovingConsumer-in-0:
consumer:
configuration:
# Forces consumer to wait 5 seconds before polling for messages
fetch.max.wait.ms: 5000
fetch.min.bytes: 1000000000
max.poll.records: 10000000
參數(shù)解釋
spring:
cloud:
function:
definition: stringSupplier;digitRemovingConsumer
spring.cloud.function.definition:定義了兩個函數(shù),stringSupplier和digitRemovingConsumer。這兩個函數(shù)將在應用程序中被使用。
stream:
bindings:
stringSupplier-out-0:
destination: batch-in
stream.bindings.stringSupplier-out-0.destination:將stringSupplier函數(shù)的輸出綁定到Kafka主題batch-in。
digitRemovingConsumer-in-0:
destination: batch-in
group: batch-in
consumer:
batch-mode: true
stream.bindings.digitRemovingConsumer-in-0.destination:將digitRemovingConsumer函數(shù)的輸入綁定到Kafka主題batch-in。group: batch-in:指定消費者組為batch-in,這意味著多個實例可以共享這個組來處理消息。consumer.batch-mode: true:啟用批處理模式,允許消費者一次處理多條消息。
digitRemovingConsumer-out-0:
destination: batch-out
stream.bindings.digitRemovingConsumer-out-0.destination:將digitRemovingConsumer函數(shù)的輸出綁定到Kafka主題batch-out。
以上只是一些關鍵代碼
4.測試
啟動弄Spring Boot應用,可以看到控制臺輸出日志如下:
291ea6cc-1e5e-4dfb-92b6-5d5ea43d4277 -> batch-in c746ba4e-835e-4f66-91c5-7a5cf8b01068 -> batch-in a661145b-2dd9-4927-8806-919ad258ade5 -> batch-in db150918-0f0b-49f6-b7bb-77b0f580de4c -> batch-in b0d4917b-6777-4d96-a6d0-bb96715b5b20 -> batch-in Removed digits from batch of 5 batch-out -> eacc-ee-dfb-b-dead batch-out -> cbae-e-f-c-acfb batch-out -> ab-dd---adade batch-out -> db-fb-f-bbb-bfdec batch-out -> bdb--d-ad-bbbb
以上就是Spring Cloud Stream實現(xiàn)數(shù)據(jù)流處理的詳細內(nèi)容,更多關于Spring Cloud Stream數(shù)據(jù)流處理的資料請關注腳本之家其它相關文章!
相關文章
Json轉(zhuǎn)list二層解析轉(zhuǎn)換代碼實例
這篇文章主要介紹了Json轉(zhuǎn)list二層解析轉(zhuǎn)換代碼實例,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2020-12-12
Spring?Boot異步線程間數(shù)據(jù)傳遞的四種方式
這篇文章主要為大家介紹了Spring?Boot異步線程間數(shù)據(jù)傳遞的四種方式詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-01-01
如何在Java中調(diào)用python文件執(zhí)行詳解
豐富的第三方庫使得python非常適合用于進行數(shù)據(jù)分析,最近在項目中就涉及到java調(diào)用python實現(xiàn)的算法,下面這篇文章主要給大家介紹了關于如何在Java中調(diào)用python文件執(zhí)行的相關資料,需要的朋友可以參考下2022-05-05
Java操作mongodb增刪改查的基本操作實戰(zhàn)指南
MongoDB是一個基于分布式文件存儲的數(shù)據(jù)庫,由c++語言編寫,旨在為WEB應用提供可擴展的高性能數(shù)據(jù)存儲解決方案,下面這篇文章主要給大家介紹了關于Java操作mongodb增刪改查的基本操作實戰(zhàn)指南,需要的朋友可以參考下2023-05-05
springboot多個service互相調(diào)用的事務處理方式
這篇文章主要介紹了springboot多個service互相調(diào)用的事務處理方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-02-02

