SpringBoot集成Flink-CDC實(shí)現(xiàn)對(duì)數(shù)據(jù)庫(kù)數(shù)據(jù)的監(jiān)聽問題
一、什么是 CDC ?
CDC 是 Change Data Capture(變更數(shù)據(jù)獲?。?nbsp;的簡(jiǎn)稱。 核心思想是,監(jiān)測(cè)并捕獲數(shù)據(jù)庫(kù)的變動(dòng)(包括數(shù)據(jù)或數(shù)據(jù)表的插入、 更新以及刪除等),將這些變更按發(fā)生的順序完整記錄下來(lái),寫入到消息中間件中以供其他服務(wù)進(jìn)行訂閱及消費(fèi)。
二、Flink-CDC 是什么?
CDC Connectors for Apache Flink是一組用于Apache Flink 的源連接器,使用變更數(shù)據(jù)捕獲 (CDC) 從不同數(shù)據(jù)庫(kù)獲取變更。用于 Apache Flink 的 CDC 連接器將 Debezium 集成為捕獲數(shù)據(jù)更改的引擎。所以它可以充分發(fā)揮 Debezium 的能力。
大概意思就是,F(xiàn)link 社區(qū)開發(fā)了 flink-cdc-connectors 組件,這是一個(gè)可以直接從 MySQL、 PostgreSQL等數(shù)據(jù)庫(kù)直接讀取全量數(shù)據(jù)和增量變更數(shù)據(jù)的 source 組件。
Flink-CDC 開源地址: Apache/Flink-CDC
Flink-CDC 中文文檔:Apache Flink CDC | Apache Flink CDC
三、SpringBoot 整合 Flink-CDC
3.1、如何集成到SpringBoot中?
Flink 作業(yè)通常獨(dú)立于一般的服務(wù)之外,專門編寫代碼,用 Flink 命令行工具來(lái)運(yùn)行和停止。將Flink 作業(yè)集成到 Spring Boot 應(yīng)用中并不常見,而且一般也不建議這樣做,因?yàn)镕link作業(yè)一般運(yùn)行在大數(shù)據(jù)環(huán)境中。
然而,在特殊需求下,我們可以做一些改變使 Flink 應(yīng)用適應(yīng) Spring Boot 環(huán)境,比如在你的場(chǎng)景中使用 Flink CDC 進(jìn)行 數(shù)據(jù)變更捕獲。將 Flink 作業(yè)以本地項(xiàng)目的方式啟動(dòng),集成在 Spring Boot應(yīng)用中,可以使用到 Spring 的便利性。
- CommandLineRunner
- ApplicationRunner
3.2、集成舉例
1、CommandLineRunner
@SpringBootApplication public class MyApp { public static void main(String[] args) { SpringApplication.run(MyApp.class, args); } @Bean public CommandLineRunner commandLineRunner(ApplicationContext ctx) { return args -> { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder() .hostname("localhost") .port(3306) .username("flinkuser") .password("flinkpw") .databaseList("mydb") // monitor all tables under "mydb" database .tableList("mydb.table1", "mydb.table2") // monitor only "table1" and "table2" under "mydb" database .deserializer(new StringDebeziumDeserializationSchema()) // converts SourceRecord to String .build(); DataStreamSource<String> mysqlSource = env.addSource(sourceFunction); // formulate processing logic here, e.g., printing to standard output mysqlSource.print(); // execute the Flink job within the Spring Boot application env.execute("Flink CDC"); }; } }
2、ApplicationRunner
@SpringBootApplication public class FlinkCDCApplication implements ApplicationRunner { public static void main(String[] args) { SpringApplication.run(FlinkCDCApplication.class, args); } @Override public void run(ApplicationArguments args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // Configure your Flink job here DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder() .hostname("localhost") .port(3306) .username("flinkuser") .password("flinkpw") .databaseList("mydb") // set other source options ... .deserializer(new StringDebeziumDeserializationSchema()) // Converts SourceRecord to String .build(); DataStream<String> cdcStream = env.addSource(sourceFunction); // Implement your processing logic here // For example: cdcStream.print(); // Start the Flink job within the Spring Boot application env.execute("Flink CDC with Spring Boot"); } }
這次用例采用 ApplicationRunner,不過(guò)要改變一下,讓 Flink CDC 作為 Bean 來(lái)實(shí)現(xiàn)。
四、功能實(shí)現(xiàn)
4.1、功能邏輯
總體來(lái)講,不太想把 Flink CDC單獨(dú)拉出來(lái),更想讓它依托于一個(gè)服務(wù)上,徹底當(dāng)成一個(gè)組件。
其中在生產(chǎn)者中,我們將要進(jìn)行實(shí)現(xiàn):
4.2、所需環(huán)境
- MySQL 5.7 +:確保源數(shù)據(jù)庫(kù)已經(jīng)開啟 Binlog 日志功能,并且設(shè)置 Row 格式
- Spring Boot 2.7.6:還是不要輕易使用 3.0 以上為好,有好多jar沒有適配
- RabbitMQ:適配即可
- Flink CDC:特別注意版本
4.3、Flink CDC POM依賴
<flink.version>1.13.6</flink.version> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.12</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.12</artifactId> <version>${flink.version}</version> </dependency> <!--mysql -cdc--> <dependency> <groupId>com.ververica</groupId> <artifactId>flink-connector-mysql-cdc</artifactId> <version>2.0.0</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.10</version> </dependency> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.8.5</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.10</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>2.0.42</version> </dependency>
上面是一些Flink CDC必須的依賴,當(dāng)然如果需要實(shí)現(xiàn)其他數(shù)據(jù)庫(kù),可以替換其他數(shù)據(jù)庫(kù)的CDC jar。怎么安排jar包的位置和其余需要的jar,這個(gè)可自行調(diào)整。
4.4、代碼展示
核心類
- MysqlEventListener:配置類
- MysqlDeserialization:MySQL消息讀取自定義序列化
- DataChangeInfo:封裝的變更對(duì)象
- DataChangeSink:繼承一個(gè)Flink提供的抽象類,用于定義數(shù)據(jù)的輸出或“下沉”邏輯,sink 是Flink處理流的最后階段,通常用于將數(shù)據(jù)寫入外部系統(tǒng),如數(shù)據(jù)庫(kù)、文件系統(tǒng)、消息隊(duì)列等
(1)通過(guò) ApplicationRunner 接入 SpringBoot
@Component public class MysqlEventListener implements ApplicationRunner { private final DataChangeSink dataChangeSink; public MysqlEventListener(DataChangeSink dataChangeSink) { this.dataChangeSink = dataChangeSink; } @Override public void run(ApplicationArguments args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DebeziumSourceFunction<DataChangeInfo> dataChangeInfoMySqlSource = buildDataChangeSourceRemote(); DataStream<DataChangeInfo> streamSource = env .addSource(dataChangeInfoMySqlSource, "mysql-source") .setParallelism(1); streamSource.addSink(dataChangeSink); env.execute("mysql-stream-cdc"); } private DebeziumSourceFunction<DataChangeInfo> buildDataChangeSourceLocal() { return MySqlSource.<DataChangeInfo>builder() .hostname("127.0.0.1") .port(3306) .username("root") .password("0507") .databaseList("flink-cdc-producer") .tableList("flink-cdc-producer.producer_content", "flink-cdc-producer.name_content") /* * initial初始化快照,即全量導(dǎo)入后增量導(dǎo)入(檢測(cè)更新數(shù)據(jù)寫入) * latest:只進(jìn)行增量導(dǎo)入(不讀取歷史變化) * timestamp:指定時(shí)間戳進(jìn)行數(shù)據(jù)導(dǎo)入(大于等于指定時(shí)間錯(cuò)讀取數(shù)據(jù)) */ .startupOptions(StartupOptions.latest()) .deserializer(new MysqlDeserialization()) .serverTimeZone("GMT+8") .build(); } }
(2)自定義 MySQL 消息讀取序列化
public class MysqlDeserialization implements DebeziumDeserializationSchema<DataChangeInfo> { public static final String TS_MS = "ts_ms"; public static final String BIN_FILE = "file"; public static final String POS = "pos"; public static final String CREATE = "CREATE"; public static final String BEFORE = "before"; public static final String AFTER = "after"; public static final String SOURCE = "source"; public static final String UPDATE = "UPDATE"; /** * 反序列化數(shù)據(jù),轉(zhuǎn)為變更JSON對(duì)象 */ @Override public void deserialize(SourceRecord sourceRecord, Collector<DataChangeInfo> collector) { String topic = sourceRecord.topic(); String[] fields = topic.split("\\."); String database = fields[1]; String tableName = fields[2]; Struct struct = (Struct) sourceRecord.value(); final Struct source = struct.getStruct(SOURCE); DataChangeInfo dataChangeInfo = new DataChangeInfo(); dataChangeInfo.setBeforeData(getJsonObject(struct, BEFORE).toJSONString()); dataChangeInfo.setAfterData(getJsonObject(struct, AFTER).toJSONString()); //5.獲取操作類型 CREATE UPDATE DELETE Envelope.Operation operation = Envelope.operationFor(sourceRecord); // String type = operation.toString().toUpperCase(); // int eventType = type.equals(CREATE) ? 1 : UPDATE.equals(type) ? 2 : 3; dataChangeInfo.setEventType(operation.name()); dataChangeInfo.setFileName(Optional.ofNullable(source.get(BIN_FILE)).map(Object::toString).orElse("")); dataChangeInfo.setFilePos(Optional.ofNullable(source.get(POS)).map(x -> Integer.parseInt(x.toString())).orElse(0)); dataChangeInfo.setDatabase(database); dataChangeInfo.setTableName(tableName); dataChangeInfo.setChangeTime(Optional.ofNullable(struct.get(TS_MS)).map(x -> Long.parseLong(x.toString())).orElseGet(System::currentTimeMillis)); //7.輸出數(shù)據(jù) collector.collect(dataChangeInfo); } private Struct getStruct(Struct value, String fieldElement) { return value.getStruct(fieldElement); } /** * 從元數(shù)據(jù)獲取出變更之前或之后的數(shù)據(jù) */ private JSONObject getJsonObject(Struct value, String fieldElement) { Struct element = value.getStruct(fieldElement); JSONObject jsonObject = new JSONObject(); if (element != null) { Schema afterSchema = element.schema(); List<Field> fieldList = afterSchema.fields(); for (Field field : fieldList) { Object afterValue = element.get(field); jsonObject.put(field.name(), afterValue); } } return jsonObject; } @Override public TypeInformation<DataChangeInfo> getProducedType() { return TypeInformation.of(DataChangeInfo.class); } }
(3)封裝的變更對(duì)象
@Data public class DataChangeInfo implements Serializable { /** * 變更前數(shù)據(jù) */ private String beforeData; /** * 變更后數(shù)據(jù) */ private String afterData; /** * 變更類型 1新增 2修改 3刪除 */ private String eventType; /** * binlog文件名 */ private String fileName; /** * binlog當(dāng)前讀取點(diǎn)位 */ private Integer filePos; /** * 數(shù)據(jù)庫(kù)名 */ private String database; /** * 表名 */ private String tableName; /** * 變更時(shí)間 */ private Long changeTime; }
這里的 beforeData 、afterData直接存儲(chǔ) Struct 不好嗎,還得費(fèi)勁去來(lái)回轉(zhuǎn)?
我曾嘗試過(guò)使用 Struct 存放在對(duì)象中,但是無(wú)法進(jìn)行序列化。具體原因可以網(wǎng)上搜索,或者自己嘗試一下。
(4)定義 Flink 的 Sink
@Component @Slf4j public class DataChangeSink extends RichSinkFunction<DataChangeInfo> { transient RabbitTemplate rabbitTemplate; transient ConfirmService confirmService; transient TableDataConvertService tableDataConvertService; @Override public void invoke(DataChangeInfo value, Context context) { log.info("收到變更原始數(shù)據(jù):{}", value); //轉(zhuǎn)換后發(fā)送到對(duì)應(yīng)的MQ if (MIGRATION_TABLE_CACHE.containsKey(value.getTableName())) { String routingKey = MIGRATION_TABLE_CACHE.get(value.getTableName()); //可根據(jù)需要自行進(jìn)行confirmService的設(shè)計(jì) rabbitTemplate.setReturnsCallback(confirmService); rabbitTemplate.setConfirmCallback(confirmService); rabbitTemplate.convertAndSend(EXCHANGE_NAME, routingKey, tableDataConvertService.convertSqlByDataChangeInfo(value)); } } /** * 在啟動(dòng)SpringBoot項(xiàng)目是加載了Spring容器,其他地方可以使用@Autowired獲取Spring容器中的類;但是Flink啟動(dòng)的項(xiàng)目中, * 默認(rèn)啟動(dòng)了多線程執(zhí)行相關(guān)代碼,導(dǎo)致在其他線程無(wú)法獲取Spring容器,只有在Spring所在的線程才能使用@Autowired, * 故在Flink自定義的Sink的open()方法中初始化Spring容器 */ @Override public void open(Configuration parameters) throws Exception { super.open(parameters); this.rabbitTemplate = ApplicationContextUtil.getBean(RabbitTemplate.class); this.confirmService = ApplicationContextUtil.getBean(ConfirmService.class); this.tableDataConvertService = ApplicationContextUtil.getBean(TableDataConvertService.class); } }
(5)數(shù)據(jù)轉(zhuǎn)換類接口和實(shí)現(xiàn)類
public interface TableDataConvertService { String convertSqlByDataChangeInfo(DataChangeInfo dataChangeInfo); }
@Service public class TableDataConvertServiceImpl implements TableDataConvertService { @Autowired Map<String, SqlGeneratorService> sqlGeneratorServiceMap; @Override public String convertSqlByDataChangeInfo(DataChangeInfo dataChangeInfo) { SqlGeneratorService sqlGeneratorService = sqlGeneratorServiceMap.get(dataChangeInfo.getEventType()); return sqlGeneratorService.generatorSql(dataChangeInfo); } }
因?yàn)樵?nbsp;dataChangeInfo 中我們有封裝對(duì)象的類型(CREATE、DELETE、UPDATE),所以我希望通過(guò)不同類來(lái)進(jìn)行不同的工作。于是就有了下面的類結(jié)構(gòu):
根據(jù) dataChangeInfo 的類型去生成對(duì)應(yīng)的 SqlGeneratorServiceImpl。
這是策略模式還是模板方法?
策略模式(Strategy Pattern)允許在運(yùn)行時(shí)選擇算法的行為。在策略模式中,定義了一系列的算法(策略),并將每一個(gè)算法封裝起來(lái),使它們可以相互替換。策略模式允許算法獨(dú)立于使用它的客戶端進(jìn)行變化。
InsertSqlGeneratorServiceImpl、UpdateSqlGeneratorServiceImpl 和 DeleteSqlGeneratorServiceImpl 各自實(shí)現(xiàn)了 SqlGeneratorService 接口,這確實(shí)表明了一種策略。每一個(gè)實(shí)現(xiàn)類表示一個(gè)特定的SQL生成策略,并且可以相互替換,只要它們遵守同一個(gè)接口。
模板方法模式(Template Method Pattern),則側(cè)重于在抽象類中定義算法的框架,讓子類實(shí)現(xiàn)算法的某些步驟而不改變算法的結(jié)構(gòu)。AbstractSqlGenerator 作為抽象類的存在是為了被繼承,但如果它不含有模板方法(即沒有定義算法骨架的方法),那它就不符合模板方法模式。
在實(shí)際應(yīng)用中,一個(gè)設(shè)計(jì)可能同時(shí)結(jié)合了多個(gè)設(shè)計(jì)模式,或者在某些情況下,一種設(shè)計(jì)模式的實(shí)現(xiàn)可能看起來(lái)與另一種模式類似。在這種情況下,若 AbstractSqlGenerator 提供了更多的共享代碼或默認(rèn)實(shí)現(xiàn)表現(xiàn)出框架角色,那么它可能更接近模板方法。而如果 AbstractSqlGenerator 僅僅作為一種接口實(shí)現(xiàn)方式,且策略之間可以相互替換,那么這確實(shí)更符合策略模式。
值得注意的是,在 TableDataConvertServiceImpl 中,我們注入了一個(gè) Map<String, SqlGeneratorService> sqlGeneratorServiceMap,通過(guò)它來(lái)進(jìn)行具體實(shí)現(xiàn)類的獲取。那么他是個(gè)什么東西呢?作用是什么呢?為什么可以通過(guò)它來(lái)獲取呢?
@Resource、@Autowired 標(biāo)注作用于 Map 類型時(shí),如果 Map 的 key 為 String 類型,則 Spring 會(huì)將容器中所有類型符合 Map 的 value 對(duì)應(yīng)的類型的 Bean 增加進(jìn)來(lái),用 Bean 的 id 或 name 作為 Map 的 key。
那么可以看到下面第六步,在進(jìn)行DeleteSqlGeneratorServiceImpl裝配的時(shí)候進(jìn)行指定了名字@Service("DELETE"),方便通過(guò)dataChangeInfo獲取。
(6)轉(zhuǎn)換類部分代碼
public interface SqlGeneratorService { String generatorSql(DataChangeInfo dataChangeInfo); } public abstract class AbstractSqlGenerator implements SqlGeneratorService { @Override public String generatorSql(DataChangeInfo dataChangeInfo) { return null; } public String quoteIdentifier(String identifier) { // 對(duì)字段名進(jìn)行轉(zhuǎn)義處理,這里簡(jiǎn)化為對(duì)其加反引號(hào) // 實(shí)際應(yīng)該處理數(shù)據(jù)庫(kù)標(biāo)識(shí)符的特殊字符 return "`" + identifier + "`"; } }
@Service("DELETE") @Slf4j public class DeleteSqlGeneratorServiceImpl extends AbstractSqlGenerator { @Override public String generatorSql(DataChangeInfo dataChangeInfo) { String beforeData = dataChangeInfo.getBeforeData(); Map<String, Object> beforeDataMap = JSONObjectUtils.JsonToMap(beforeData); StringBuilder wherePart = new StringBuilder(); for (String key : beforeDataMap.keySet()) { Object beforeValue = beforeDataMap.get(key); if ("create_time".equals(key)){ SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); beforeValue = dateFormat.format(beforeValue); } if (wherePart.length() > 0) { // 不是第一個(gè)更改的字段,增加逗號(hào)分隔 wherePart.append(", "); } wherePart.append(quoteIdentifier(key)).append(" = ").append(formatValue(beforeValue)); } log.info("wherePart : {}", wherePart); return "DELETE FROM " + dataChangeInfo.getTableName() + " WHERE " + wherePart; } }
核心代碼如上所示,具體實(shí)現(xiàn)可自行設(shè)計(jì)。
五、源碼獲取
Github:incremental-sync-flink-cdc
到此這篇關(guān)于SpringBoot集成Flink-CDC,實(shí)現(xiàn)對(duì)數(shù)據(jù)庫(kù)數(shù)據(jù)的監(jiān)聽的文章就介紹到這了,更多相關(guān)SpringBoot集成Flink-CDC監(jiān)聽內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot中webSocket實(shí)現(xiàn)即時(shí)聊天
這篇文章主要介紹了SpringBoot中webSocket實(shí)現(xiàn)即時(shí)聊天,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-04-04Mybatis-Plus如何使用分頁(yè)實(shí)例詳解
最近在研究mybatis,然后就去找簡(jiǎn)化mybatis開發(fā)的工具,下面這篇文章主要給大家介紹了關(guān)于Mybatis-Plus如何使用分頁(yè)的相關(guān)資料,文中通過(guò)實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下2022-03-03JAVA解決在@autowired,@Resource注入為null的情況
這篇文章主要介紹了JAVA解決在@autowired,@Resource注入為null的情況,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-10-10關(guān)于Java實(shí)體類Serializable序列化接口的作用和必要性解析
序列化是將對(duì)象狀態(tài)轉(zhuǎn)化為可保持或者傳輸?shù)母袷竭^(guò)程,與序列化相反的是反序列化,完成序列化和反序列化,可以存儲(chǔ)或傳輸數(shù)據(jù),一般情況下,在定義實(shí)體類時(shí)會(huì)使用Serializable,需要的朋友可以參考下2023-05-05java中url漢字編碼互相轉(zhuǎn)換實(shí)例
這篇文章介紹了java中url漢字編碼互相轉(zhuǎn)換實(shí)例,有需要的朋友可以參考一下2013-10-10java socket實(shí)現(xiàn)聊天室 java實(shí)現(xiàn)多人聊天功能
這篇文章主要為大家詳細(xì)介紹了java socket實(shí)現(xiàn)聊天室,java實(shí)現(xiàn)多人聊天功能,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-07-07