SpringBoot集成Flink-CDC實(shí)現(xiàn)對(duì)數(shù)據(jù)庫(kù)數(shù)據(jù)的監(jiān)聽(tīng)問(wèn)題
一、什么是 CDC ?
CDC 是 Change Data Capture(變更數(shù)據(jù)獲?。?nbsp;的簡(jiǎn)稱。 核心思想是,監(jiān)測(cè)并捕獲數(shù)據(jù)庫(kù)的變動(dòng)(包括數(shù)據(jù)或數(shù)據(jù)表的插入、 更新以及刪除等),將這些變更按發(fā)生的順序完整記錄下來(lái),寫入到消息中間件中以供其他服務(wù)進(jìn)行訂閱及消費(fèi)。
二、Flink-CDC 是什么?
CDC Connectors for Apache Flink是一組用于Apache Flink 的源連接器,使用變更數(shù)據(jù)捕獲 (CDC) 從不同數(shù)據(jù)庫(kù)獲取變更。用于 Apache Flink 的 CDC 連接器將 Debezium 集成為捕獲數(shù)據(jù)更改的引擎。所以它可以充分發(fā)揮 Debezium 的能力。
大概意思就是,F(xiàn)link 社區(qū)開(kāi)發(fā)了 flink-cdc-connectors 組件,這是一個(gè)可以直接從 MySQL、 PostgreSQL等數(shù)據(jù)庫(kù)直接讀取全量數(shù)據(jù)和增量變更數(shù)據(jù)的 source 組件。

Flink-CDC 開(kāi)源地址: Apache/Flink-CDC
Flink-CDC 中文文檔:Apache Flink CDC | Apache Flink CDC
三、SpringBoot 整合 Flink-CDC
3.1、如何集成到SpringBoot中?
Flink 作業(yè)通常獨(dú)立于一般的服務(wù)之外,專門編寫代碼,用 Flink 命令行工具來(lái)運(yùn)行和停止。將Flink 作業(yè)集成到 Spring Boot 應(yīng)用中并不常見(jiàn),而且一般也不建議這樣做,因?yàn)镕link作業(yè)一般運(yùn)行在大數(shù)據(jù)環(huán)境中。
然而,在特殊需求下,我們可以做一些改變使 Flink 應(yīng)用適應(yīng) Spring Boot 環(huán)境,比如在你的場(chǎng)景中使用 Flink CDC 進(jìn)行 數(shù)據(jù)變更捕獲。將 Flink 作業(yè)以本地項(xiàng)目的方式啟動(dòng),集成在 Spring Boot應(yīng)用中,可以使用到 Spring 的便利性。
- CommandLineRunner
- ApplicationRunner
3.2、集成舉例
1、CommandLineRunner
@SpringBootApplication
public class MyApp {
public static void main(String[] args) {
SpringApplication.run(MyApp.class, args);
}
@Bean
public CommandLineRunner commandLineRunner(ApplicationContext ctx) {
return args -> {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
.hostname("localhost")
.port(3306)
.username("flinkuser")
.password("flinkpw")
.databaseList("mydb") // monitor all tables under "mydb" database
.tableList("mydb.table1", "mydb.table2") // monitor only "table1" and "table2" under "mydb" database
.deserializer(new StringDebeziumDeserializationSchema()) // converts SourceRecord to String
.build();
DataStreamSource<String> mysqlSource = env.addSource(sourceFunction);
// formulate processing logic here, e.g., printing to standard output
mysqlSource.print();
// execute the Flink job within the Spring Boot application
env.execute("Flink CDC");
};
}
}2、ApplicationRunner
@SpringBootApplication
public class FlinkCDCApplication implements ApplicationRunner {
public static void main(String[] args) {
SpringApplication.run(FlinkCDCApplication.class, args);
}
@Override
public void run(ApplicationArguments args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Configure your Flink job here
DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
.hostname("localhost")
.port(3306)
.username("flinkuser")
.password("flinkpw")
.databaseList("mydb")
// set other source options ...
.deserializer(new StringDebeziumDeserializationSchema()) // Converts SourceRecord to String
.build();
DataStream<String> cdcStream = env.addSource(sourceFunction);
// Implement your processing logic here
// For example:
cdcStream.print();
// Start the Flink job within the Spring Boot application
env.execute("Flink CDC with Spring Boot");
}
}這次用例采用 ApplicationRunner,不過(guò)要改變一下,讓 Flink CDC 作為 Bean 來(lái)實(shí)現(xiàn)。
四、功能實(shí)現(xiàn)
4.1、功能邏輯

總體來(lái)講,不太想把 Flink CDC單獨(dú)拉出來(lái),更想讓它依托于一個(gè)服務(wù)上,徹底當(dāng)成一個(gè)組件。
其中在生產(chǎn)者中,我們將要進(jìn)行實(shí)現(xiàn):

4.2、所需環(huán)境
- MySQL 5.7 +:確保源數(shù)據(jù)庫(kù)已經(jīng)開(kāi)啟 Binlog 日志功能,并且設(shè)置 Row 格式
- Spring Boot 2.7.6:還是不要輕易使用 3.0 以上為好,有好多jar沒(méi)有適配
- RabbitMQ:適配即可
- Flink CDC:特別注意版本
4.3、Flink CDC POM依賴
<flink.version>1.13.6</flink.version>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<!--mysql -cdc-->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.10</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.5</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.10</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.42</version>
</dependency>上面是一些Flink CDC必須的依賴,當(dāng)然如果需要實(shí)現(xiàn)其他數(shù)據(jù)庫(kù),可以替換其他數(shù)據(jù)庫(kù)的CDC jar。怎么安排jar包的位置和其余需要的jar,這個(gè)可自行調(diào)整。
4.4、代碼展示
核心類
- MysqlEventListener:配置類
- MysqlDeserialization:MySQL消息讀取自定義序列化
- DataChangeInfo:封裝的變更對(duì)象
- DataChangeSink:繼承一個(gè)Flink提供的抽象類,用于定義數(shù)據(jù)的輸出或“下沉”邏輯,sink 是Flink處理流的最后階段,通常用于將數(shù)據(jù)寫入外部系統(tǒng),如數(shù)據(jù)庫(kù)、文件系統(tǒng)、消息隊(duì)列等
(1)通過(guò) ApplicationRunner 接入 SpringBoot
@Component
public class MysqlEventListener implements ApplicationRunner {
private final DataChangeSink dataChangeSink;
public MysqlEventListener(DataChangeSink dataChangeSink) {
this.dataChangeSink = dataChangeSink;
}
@Override
public void run(ApplicationArguments args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DebeziumSourceFunction<DataChangeInfo> dataChangeInfoMySqlSource = buildDataChangeSourceRemote();
DataStream<DataChangeInfo> streamSource = env
.addSource(dataChangeInfoMySqlSource, "mysql-source")
.setParallelism(1);
streamSource.addSink(dataChangeSink);
env.execute("mysql-stream-cdc");
}
private DebeziumSourceFunction<DataChangeInfo> buildDataChangeSourceLocal() {
return MySqlSource.<DataChangeInfo>builder()
.hostname("127.0.0.1")
.port(3306)
.username("root")
.password("0507")
.databaseList("flink-cdc-producer")
.tableList("flink-cdc-producer.producer_content", "flink-cdc-producer.name_content")
/*
* initial初始化快照,即全量導(dǎo)入后增量導(dǎo)入(檢測(cè)更新數(shù)據(jù)寫入)
* latest:只進(jìn)行增量導(dǎo)入(不讀取歷史變化)
* timestamp:指定時(shí)間戳進(jìn)行數(shù)據(jù)導(dǎo)入(大于等于指定時(shí)間錯(cuò)讀取數(shù)據(jù))
*/
.startupOptions(StartupOptions.latest())
.deserializer(new MysqlDeserialization())
.serverTimeZone("GMT+8")
.build();
}
}(2)自定義 MySQL 消息讀取序列化
public class MysqlDeserialization implements DebeziumDeserializationSchema<DataChangeInfo> {
public static final String TS_MS = "ts_ms";
public static final String BIN_FILE = "file";
public static final String POS = "pos";
public static final String CREATE = "CREATE";
public static final String BEFORE = "before";
public static final String AFTER = "after";
public static final String SOURCE = "source";
public static final String UPDATE = "UPDATE";
/**
* 反序列化數(shù)據(jù),轉(zhuǎn)為變更JSON對(duì)象
*/
@Override
public void deserialize(SourceRecord sourceRecord, Collector<DataChangeInfo> collector) {
String topic = sourceRecord.topic();
String[] fields = topic.split("\\.");
String database = fields[1];
String tableName = fields[2];
Struct struct = (Struct) sourceRecord.value();
final Struct source = struct.getStruct(SOURCE);
DataChangeInfo dataChangeInfo = new DataChangeInfo();
dataChangeInfo.setBeforeData(getJsonObject(struct, BEFORE).toJSONString());
dataChangeInfo.setAfterData(getJsonObject(struct, AFTER).toJSONString());
//5.獲取操作類型 CREATE UPDATE DELETE
Envelope.Operation operation = Envelope.operationFor(sourceRecord);
// String type = operation.toString().toUpperCase();
// int eventType = type.equals(CREATE) ? 1 : UPDATE.equals(type) ? 2 : 3;
dataChangeInfo.setEventType(operation.name());
dataChangeInfo.setFileName(Optional.ofNullable(source.get(BIN_FILE)).map(Object::toString).orElse(""));
dataChangeInfo.setFilePos(Optional.ofNullable(source.get(POS)).map(x -> Integer.parseInt(x.toString())).orElse(0));
dataChangeInfo.setDatabase(database);
dataChangeInfo.setTableName(tableName);
dataChangeInfo.setChangeTime(Optional.ofNullable(struct.get(TS_MS)).map(x -> Long.parseLong(x.toString())).orElseGet(System::currentTimeMillis));
//7.輸出數(shù)據(jù)
collector.collect(dataChangeInfo);
}
private Struct getStruct(Struct value, String fieldElement) {
return value.getStruct(fieldElement);
}
/**
* 從元數(shù)據(jù)獲取出變更之前或之后的數(shù)據(jù)
*/
private JSONObject getJsonObject(Struct value, String fieldElement) {
Struct element = value.getStruct(fieldElement);
JSONObject jsonObject = new JSONObject();
if (element != null) {
Schema afterSchema = element.schema();
List<Field> fieldList = afterSchema.fields();
for (Field field : fieldList) {
Object afterValue = element.get(field);
jsonObject.put(field.name(), afterValue);
}
}
return jsonObject;
}
@Override
public TypeInformation<DataChangeInfo> getProducedType() {
return TypeInformation.of(DataChangeInfo.class);
}
}(3)封裝的變更對(duì)象
@Data
public class DataChangeInfo implements Serializable {
/**
* 變更前數(shù)據(jù)
*/
private String beforeData;
/**
* 變更后數(shù)據(jù)
*/
private String afterData;
/**
* 變更類型 1新增 2修改 3刪除
*/
private String eventType;
/**
* binlog文件名
*/
private String fileName;
/**
* binlog當(dāng)前讀取點(diǎn)位
*/
private Integer filePos;
/**
* 數(shù)據(jù)庫(kù)名
*/
private String database;
/**
* 表名
*/
private String tableName;
/**
* 變更時(shí)間
*/
private Long changeTime;
}這里的 beforeData 、afterData直接存儲(chǔ) Struct 不好嗎,還得費(fèi)勁去來(lái)回轉(zhuǎn)?
我曾嘗試過(guò)使用 Struct 存放在對(duì)象中,但是無(wú)法進(jìn)行序列化。具體原因可以網(wǎng)上搜索,或者自己嘗試一下。
(4)定義 Flink 的 Sink
@Component
@Slf4j
public class DataChangeSink extends RichSinkFunction<DataChangeInfo> {
transient RabbitTemplate rabbitTemplate;
transient ConfirmService confirmService;
transient TableDataConvertService tableDataConvertService;
@Override
public void invoke(DataChangeInfo value, Context context) {
log.info("收到變更原始數(shù)據(jù):{}", value);
//轉(zhuǎn)換后發(fā)送到對(duì)應(yīng)的MQ
if (MIGRATION_TABLE_CACHE.containsKey(value.getTableName())) {
String routingKey = MIGRATION_TABLE_CACHE.get(value.getTableName());
//可根據(jù)需要自行進(jìn)行confirmService的設(shè)計(jì)
rabbitTemplate.setReturnsCallback(confirmService);
rabbitTemplate.setConfirmCallback(confirmService);
rabbitTemplate.convertAndSend(EXCHANGE_NAME, routingKey, tableDataConvertService.convertSqlByDataChangeInfo(value));
}
}
/**
* 在啟動(dòng)SpringBoot項(xiàng)目是加載了Spring容器,其他地方可以使用@Autowired獲取Spring容器中的類;但是Flink啟動(dòng)的項(xiàng)目中,
* 默認(rèn)啟動(dòng)了多線程執(zhí)行相關(guān)代碼,導(dǎo)致在其他線程無(wú)法獲取Spring容器,只有在Spring所在的線程才能使用@Autowired,
* 故在Flink自定義的Sink的open()方法中初始化Spring容器
*/
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.rabbitTemplate = ApplicationContextUtil.getBean(RabbitTemplate.class);
this.confirmService = ApplicationContextUtil.getBean(ConfirmService.class);
this.tableDataConvertService = ApplicationContextUtil.getBean(TableDataConvertService.class);
}
}(5)數(shù)據(jù)轉(zhuǎn)換類接口和實(shí)現(xiàn)類
public interface TableDataConvertService {
String convertSqlByDataChangeInfo(DataChangeInfo dataChangeInfo);
}@Service
public class TableDataConvertServiceImpl implements TableDataConvertService {
@Autowired
Map<String, SqlGeneratorService> sqlGeneratorServiceMap;
@Override
public String convertSqlByDataChangeInfo(DataChangeInfo dataChangeInfo) {
SqlGeneratorService sqlGeneratorService = sqlGeneratorServiceMap.get(dataChangeInfo.getEventType());
return sqlGeneratorService.generatorSql(dataChangeInfo);
}
}因?yàn)樵?nbsp;dataChangeInfo 中我們有封裝對(duì)象的類型(CREATE、DELETE、UPDATE),所以我希望通過(guò)不同類來(lái)進(jìn)行不同的工作。于是就有了下面的類結(jié)構(gòu):

根據(jù) dataChangeInfo 的類型去生成對(duì)應(yīng)的 SqlGeneratorServiceImpl。
這是策略模式還是模板方法?
策略模式(Strategy Pattern)允許在運(yùn)行時(shí)選擇算法的行為。在策略模式中,定義了一系列的算法(策略),并將每一個(gè)算法封裝起來(lái),使它們可以相互替換。策略模式允許算法獨(dú)立于使用它的客戶端進(jìn)行變化。
InsertSqlGeneratorServiceImpl、UpdateSqlGeneratorServiceImpl 和 DeleteSqlGeneratorServiceImpl 各自實(shí)現(xiàn)了 SqlGeneratorService 接口,這確實(shí)表明了一種策略。每一個(gè)實(shí)現(xiàn)類表示一個(gè)特定的SQL生成策略,并且可以相互替換,只要它們遵守同一個(gè)接口。
模板方法模式(Template Method Pattern),則側(cè)重于在抽象類中定義算法的框架,讓子類實(shí)現(xiàn)算法的某些步驟而不改變算法的結(jié)構(gòu)。AbstractSqlGenerator 作為抽象類的存在是為了被繼承,但如果它不含有模板方法(即沒(méi)有定義算法骨架的方法),那它就不符合模板方法模式。
在實(shí)際應(yīng)用中,一個(gè)設(shè)計(jì)可能同時(shí)結(jié)合了多個(gè)設(shè)計(jì)模式,或者在某些情況下,一種設(shè)計(jì)模式的實(shí)現(xiàn)可能看起來(lái)與另一種模式類似。在這種情況下,若 AbstractSqlGenerator 提供了更多的共享代碼或默認(rèn)實(shí)現(xiàn)表現(xiàn)出框架角色,那么它可能更接近模板方法。而如果 AbstractSqlGenerator 僅僅作為一種接口實(shí)現(xiàn)方式,且策略之間可以相互替換,那么這確實(shí)更符合策略模式。
值得注意的是,在 TableDataConvertServiceImpl 中,我們注入了一個(gè) Map<String, SqlGeneratorService> sqlGeneratorServiceMap,通過(guò)它來(lái)進(jìn)行具體實(shí)現(xiàn)類的獲取。那么他是個(gè)什么東西呢?作用是什么呢?為什么可以通過(guò)它來(lái)獲取呢?
@Resource、@Autowired 標(biāo)注作用于 Map 類型時(shí),如果 Map 的 key 為 String 類型,則 Spring 會(huì)將容器中所有類型符合 Map 的 value 對(duì)應(yīng)的類型的 Bean 增加進(jìn)來(lái),用 Bean 的 id 或 name 作為 Map 的 key。
那么可以看到下面第六步,在進(jìn)行DeleteSqlGeneratorServiceImpl裝配的時(shí)候進(jìn)行指定了名字@Service("DELETE"),方便通過(guò)dataChangeInfo獲取。
(6)轉(zhuǎn)換類部分代碼
public interface SqlGeneratorService {
String generatorSql(DataChangeInfo dataChangeInfo);
}
public abstract class AbstractSqlGenerator implements SqlGeneratorService {
@Override
public String generatorSql(DataChangeInfo dataChangeInfo) {
return null;
}
public String quoteIdentifier(String identifier) {
// 對(duì)字段名進(jìn)行轉(zhuǎn)義處理,這里簡(jiǎn)化為對(duì)其加反引號(hào)
// 實(shí)際應(yīng)該處理數(shù)據(jù)庫(kù)標(biāo)識(shí)符的特殊字符
return "`" + identifier + "`";
}
}@Service("DELETE")
@Slf4j
public class DeleteSqlGeneratorServiceImpl extends AbstractSqlGenerator {
@Override
public String generatorSql(DataChangeInfo dataChangeInfo) {
String beforeData = dataChangeInfo.getBeforeData();
Map<String, Object> beforeDataMap = JSONObjectUtils.JsonToMap(beforeData);
StringBuilder wherePart = new StringBuilder();
for (String key : beforeDataMap.keySet()) {
Object beforeValue = beforeDataMap.get(key);
if ("create_time".equals(key)){
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
beforeValue = dateFormat.format(beforeValue);
}
if (wherePart.length() > 0) {
// 不是第一個(gè)更改的字段,增加逗號(hào)分隔
wherePart.append(", ");
}
wherePart.append(quoteIdentifier(key)).append(" = ").append(formatValue(beforeValue));
}
log.info("wherePart : {}", wherePart);
return "DELETE FROM " + dataChangeInfo.getTableName() + " WHERE " + wherePart;
}
}核心代碼如上所示,具體實(shí)現(xiàn)可自行設(shè)計(jì)。
五、源碼獲取
Github:incremental-sync-flink-cdc
到此這篇關(guān)于SpringBoot集成Flink-CDC,實(shí)現(xiàn)對(duì)數(shù)據(jù)庫(kù)數(shù)據(jù)的監(jiān)聽(tīng)的文章就介紹到這了,更多相關(guān)SpringBoot集成Flink-CDC監(jiān)聽(tīng)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot中webSocket實(shí)現(xiàn)即時(shí)聊天
這篇文章主要介紹了SpringBoot中webSocket實(shí)現(xiàn)即時(shí)聊天,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-04-04
Mybatis-Plus如何使用分頁(yè)實(shí)例詳解
最近在研究mybatis,然后就去找簡(jiǎn)化mybatis開(kāi)發(fā)的工具,下面這篇文章主要給大家介紹了關(guān)于Mybatis-Plus如何使用分頁(yè)的相關(guān)資料,文中通過(guò)實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下2022-03-03
JAVA解決在@autowired,@Resource注入為null的情況
這篇文章主要介紹了JAVA解決在@autowired,@Resource注入為null的情況,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-10-10
關(guān)于Java實(shí)體類Serializable序列化接口的作用和必要性解析
序列化是將對(duì)象狀態(tài)轉(zhuǎn)化為可保持或者傳輸?shù)母袷竭^(guò)程,與序列化相反的是反序列化,完成序列化和反序列化,可以存儲(chǔ)或傳輸數(shù)據(jù),一般情況下,在定義實(shí)體類時(shí)會(huì)使用Serializable,需要的朋友可以參考下2023-05-05
java中url漢字編碼互相轉(zhuǎn)換實(shí)例
這篇文章介紹了java中url漢字編碼互相轉(zhuǎn)換實(shí)例,有需要的朋友可以參考一下2013-10-10
java socket實(shí)現(xiàn)聊天室 java實(shí)現(xiàn)多人聊天功能
這篇文章主要為大家詳細(xì)介紹了java socket實(shí)現(xiàn)聊天室,java實(shí)現(xiàn)多人聊天功能,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-07-07

