How to Synchronize Oracle Database Data with Flink CDC
Preface
Flink CDC is a stream-based data integration tool that aims to give users a more complete programming interface (API). It lets users describe database synchronization in YAML configuration files, and it also provides the Flink CDC Source Connector API. Flink CDC streamlines job submission and adds advanced features such as automatic schema change synchronization (Schema Evolution), Data Transformation, Full Database Synchronization, and Exactly-once semantics.
This article synchronizes data from an Oracle database with the flink-connector-oracle-cdc source connector in a DataStream job, rather than with a YAML pipeline.
1. Enable archive logging
1) In a terminal on the database server, connect to the database with the SYSDBA role:
sqlplus / as sysdba
or:
sqlplus /nolog
CONNECT sys/password AS SYSDBA;
2) Check whether archive logging is enabled:
archive log list;
(If "Database log mode" shows "No Archive Mode", archive logging is disabled.)
(If "Database log mode" shows "Archive Mode", archive logging is enabled.)
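3) Enable archive logging (the recovery-area directory must already exist and be writable by the Oracle software owner):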
alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;
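Notes:
Enabling archive logging requires a database restart.
Archive logs take up a lot of disk space, so purge expired log files regularly, but only those the CDC job has already read; otherwise the connector may be unable to resume from its recorded position.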
4) Once the database is open again, rerun archive log list; to confirm that archive logging is now enabled.
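If this check is part of an automated setup, it comes down to reading one line of output. The following is a minimal Java sketch under stated assumptions: `ArchiveLogCheck` and its methods are hypothetical names, and the input is assumed to be either the text printed by `archive log list` or the `LOG_MODE` value returned by `SELECT LOG_MODE FROM V$DATABASE` (fetched separately, for example over JDBC).

```java
import java.util.Locale;

// Hypothetical helper (not part of Flink CDC or Oracle tooling): decides whether
// archive logging is on, from ARCHIVE LOG LIST output or V$DATABASE.LOG_MODE.
class ArchiveLogCheck {

    /** Scans SQL*Plus "archive log list" output for the "Database log mode" line. */
    static boolean isArchiveLogEnabled(String archiveLogListOutput) {
        for (String line : archiveLogListOutput.split("\\R")) {
            String l = line.toLowerCase(Locale.ROOT);
            if (!l.contains("database log mode")) {
                continue;
            }
            // Test the negative form first: "No Archive Mode" also contains "Archive Mode".
            if (l.contains("no archive mode")) {
                return false;
            }
            return l.contains("archive mode");
        }
        return false; // no log-mode line found: treat as not enabled
    }

    /** V$DATABASE.LOG_MODE reads ARCHIVELOG once archiving is enabled. */
    static boolean isArchiveLogMode(String logMode) {
        return logMode != null && "ARCHIVELOG".equalsIgnoreCase(logMode.trim());
    }

    public static void main(String[] args) {
        String before = "Database log mode              No Archive Mode\n"
                + "Automatic archival             Disabled";
        String after = "Database log mode              Archive Mode\n"
                + "Automatic archival             Enabled";
        System.out.println(isArchiveLogEnabled(before)); // false
        System.out.println(isArchiveLogEnabled(after));  // true
        System.out.println(isArchiveLogMode("ARCHIVELOG")); // true
    }
}
```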
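2. Create a dedicated Flink CDC user
The user's default tablespace, LOGMINER_TBS, must be created before running the scripts below.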
2.1 For a non-CDB Oracle database, run the following SQL:
CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
GRANT CREATE SESSION TO flinkuser;
GRANT SET CONTAINER TO flinkuser;
GRANT SELECT ON V_$DATABASE to flinkuser;
GRANT FLASHBACK ANY TABLE TO flinkuser;
GRANT SELECT ANY TABLE TO flinkuser;
GRANT SELECT_CATALOG_ROLE TO flinkuser;
GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
GRANT SELECT ANY TRANSACTION TO flinkuser;
GRANT LOGMINING TO flinkuser;
GRANT ANALYZE ANY TO flinkuser;
GRANT CREATE TABLE TO flinkuser;
-- not needed if scan.incremental.snapshot.enabled=true (the default)
GRANT LOCK ANY TABLE TO flinkuser;
GRANT ALTER ANY TABLE TO flinkuser;
GRANT CREATE SEQUENCE TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;
GRANT SELECT ON V_$LOG TO flinkuser;
GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
GRANT SELECT ON V_$LOGFILE TO flinkuser;
GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;
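If you prefer to run this script from Java over JDBC rather than in SQL*Plus, keep in mind that `java.sql.Statement#execute` takes one statement at a time and Oracle's JDBC driver typically rejects a trailing semicolon (ORA-00911). Below is a deliberately naive splitter sketch; the class name is hypothetical, and it assumes the script keeps one statement per line, because it treats `--` as a comment running to the end of the line and does not understand string literals or PL/SQL blocks.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: turns a SQL*Plus-style script into statements that can be
// passed one at a time to java.sql.Statement#execute (no trailing ';').
// Naive by design: ';' or "--" inside string literals and PL/SQL blocks are not handled.
class SqlScriptSplitter {

    static List<String> split(String script) {
        StringBuilder withoutComments = new StringBuilder();
        for (String line : script.split("\\R")) {
            int comment = line.indexOf("--");
            // Drop everything after "--" on each line (single-line SQL comments).
            withoutComments.append(comment >= 0 ? line.substring(0, comment) : line).append('\n');
        }
        List<String> statements = new ArrayList<>();
        for (String part : withoutComments.toString().split(";")) {
            String stmt = part.trim();
            if (!stmt.isEmpty()) {
                statements.add(stmt);
            }
        }
        return statements;
    }

    public static void main(String[] args) {
        String script = "GRANT CREATE SESSION TO flinkuser;\n"
                + "-- not needed if scan.incremental.snapshot.enabled=true (the default)\n"
                + "GRANT LOCK ANY TABLE TO flinkuser;\n";
        for (String stmt : split(script)) {
            System.out.println(stmt); // with JDBC: statement.execute(stmt);
        }
    }
}
```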
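2.2 For an Oracle CDB database, run the following SQL. With the default COMMON_USER_PREFIX, a common user's name must start with C## (the Java code in section 4 uses c##flink_user), so rename flinkuser accordingly: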
CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE logminer_tbs QUOTA UNLIMITED ON logminer_tbs CONTAINER=ALL;
GRANT CREATE SESSION TO flinkuser CONTAINER=ALL;
GRANT SET CONTAINER TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$DATABASE to flinkuser CONTAINER=ALL;
GRANT FLASHBACK ANY TABLE TO flinkuser CONTAINER=ALL;
GRANT SELECT ANY TABLE TO flinkuser CONTAINER=ALL;
GRANT SELECT_CATALOG_ROLE TO flinkuser CONTAINER=ALL;
GRANT EXECUTE_CATALOG_ROLE TO flinkuser CONTAINER=ALL;
GRANT SELECT ANY TRANSACTION TO flinkuser CONTAINER=ALL;
GRANT LOGMINING TO flinkuser CONTAINER=ALL;
GRANT CREATE TABLE TO flinkuser CONTAINER=ALL;
-- not needed if scan.incremental.snapshot.enabled=true (the default)
GRANT LOCK ANY TABLE TO flinkuser CONTAINER=ALL;
GRANT CREATE SEQUENCE TO flinkuser CONTAINER=ALL;
GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser CONTAINER=ALL;
GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOG TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOG_HISTORY TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGFILE TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser CONTAINER=ALL;
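Apart from a few grants that appear only in the non-CDB script (ANALYZE ANY, ALTER ANY TABLE), the two scripts differ by the `CONTAINER=ALL` clause. A minimal sketch that generates either variant from one privilege list is shown below; `GrantScriptBuilder` is a hypothetical name, and the `C##` check assumes the default `COMMON_USER_PREFIX`, under which creating a common user without that prefix fails with ORA-65096.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

// Hypothetical helper: prints the GRANT part of the user-creation script for a
// non-CDB (plain statements) or a CDB (every statement gets CONTAINER=ALL).
// Assumption: COMMON_USER_PREFIX has its default value, C##.
class GrantScriptBuilder {

    static List<String> build(String user, boolean cdb, List<String> privileges) {
        // In a CDB the CDC user is a common user, which normally needs the C## prefix.
        if (cdb && !user.toUpperCase(Locale.ROOT).startsWith("C##")) {
            throw new IllegalArgumentException("CDB common user should start with C##: " + user);
        }
        String suffix = cdb ? " CONTAINER=ALL" : "";
        List<String> statements = new ArrayList<>();
        for (String privilege : privileges) {
            statements.add("GRANT " + privilege + " TO " + user + suffix + ";");
        }
        return statements;
    }

    public static void main(String[] args) {
        List<String> privileges = Arrays.asList(
                "CREATE SESSION", "SET CONTAINER", "LOGMINING", "SELECT ON V_$DATABASE");
        // Output is a SQL*Plus script; run it after the CREATE USER statement.
        build("c##flink_user", true, privileges).forEach(System.out::println);
    }
}
```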
3. Enable supplemental logging at table or database level
-- Enable supplemental logging for a specific table (schema.table):
ALTER TABLE schemaname.tablename ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
-- Enable it for all tables in the database:
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
-- Enable minimal supplemental logging at database level (required in either case):
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
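Supplemental logging has to cover the tables the job captures, so the DDL can be derived from the same `schema.table` strings later passed to `tableList(...)`. A minimal sketch follows; the class name is hypothetical, and it emits statements without a trailing semicolon so they can be passed to JDBC one at a time.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: builds supplemental-logging DDL for the tables a CDC job
// captures, from the same "schema.table" strings used in tableList(...).
class SupplementalLogSql {

    static List<String> forTables(List<String> qualifiedTables) {
        List<String> ddl = new ArrayList<>();
        // Minimal supplemental logging at database level, required in every setup.
        ddl.add("ALTER DATABASE ADD SUPPLEMENTAL LOG DATA");
        for (String table : qualifiedTables) {
            int dot = table.indexOf('.');
            if (dot <= 0 || dot == table.length() - 1) {
                throw new IllegalArgumentException("expected schema.table, got: " + table);
            }
            // Log all columns so change events carry complete column values.
            ddl.add("ALTER TABLE " + table + " ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");
        }
        return ddl;
    }

    public static void main(String[] args) {
        // No trailing ';', so each string can go straight to Statement#execute.
        forTables(Arrays.asList("c##flink_user.TEST2")).forEach(System.out::println);
    }
}
```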
4. Synchronize data with flink-connector-oracle-cdc
4.1 Add the pom dependency
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-oracle-cdc</artifactId>
    <version>2.4.0</version>
</dependency>
4.2 Java main class
package test.datastream.cdc.oracle;

import com.ververica.cdc.connectors.oracle.OracleSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.types.Row;
import test.datastream.cdc.oracle.function.CacheDataAllWindowFunction;
import test.datastream.cdc.oracle.function.CdcString2RowMap;
import test.datastream.cdc.oracle.function.DbCdcSinkFunction;

import java.util.Properties;

public class OracleCdcExample {
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        // Emit DECIMAL/NUMBER values as strings (the default "precise" mode shows up as Base64 bytes in the JSON)
        properties.setProperty("decimal.handling.mode", "string");

        SourceFunction<String> sourceFunction = OracleSource.<String>builder()
                // .startupOptions(StartupOptions.latest()) // start from the latest offset (default: initial snapshot, then redo log)
                .url("jdbc:oracle:thin:@localhost:1521:orcl")
                .port(1521)
                .database("ORCL")                    // database (SID) to monitor
                .schemaList("c##flink_user")         // schema(s) to monitor
                .tableList("c##flink_user.TEST2")    // table(s) to monitor, as schema.table
                .username("c##flink_user")           // CDC user; CDB common users carry the C## prefix
                .password("flinkpw")
                .debeziumProperties(properties)
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to a JSON String
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing stores the source's redo-log offset in Flink state so a restored job can resume
        env.enableCheckpointing(3000);

        // Source parallelism 1 keeps the change events in order
        DataStreamSource<String> source = env.addSource(sourceFunction).setParallelism(1);
        // Keep the flatMap at parallelism 1 too; otherwise events are rebalanced across subtasks
        // and windowAll may receive changes to the same row out of order
        SingleOutputStreamOperator<Row> mapStream = source.flatMap(new CdcString2RowMap()).setParallelism(1);
        // Collect the rows of each 5-second processing-time window into one batch
        SingleOutputStreamOperator<Row[]> winStream = mapStream
                .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .process(new CacheDataAllWindowFunction());
        // Batch sync to the target database
        winStream.addSink(new DbCdcSinkFunction(null));
        env.execute();
    }
}
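The job groups rows into 5-second batches before writing, so one batch can contain several changes to the same row (for example an insert followed by an update). The source of `CacheDataAllWindowFunction` and `DbCdcSinkFunction` is not shown in the article; the sketch below is a hypothetical, stdlib-only illustration of one common approach, compacting each batch to the last change per primary key. It assumes the changes arrive in order and that `ID` is the key.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not the article's CacheDataAllWindowFunction/DbCdcSinkFunction:
// before writing a batch, keep only the last change per primary key so the sink
// issues at most one statement per row.
class ChangeBatchCompactor {

    enum Kind { INSERT, UPDATE_AFTER, DELETE }

    static final class Change {
        final Kind kind;
        final Object key;     // primary-key value, e.g. the ID column
        final String payload; // stand-in for the row's other column values

        Change(Kind kind, Object key, String payload) {
            this.kind = kind;
            this.key = key;
            this.payload = payload;
        }

        @Override
        public String toString() {
            return kind + "(" + key + ")";
        }
    }

    /** Expects the batch in arrival order; later changes to a key replace earlier ones. */
    static List<Change> compact(List<Change> batch) {
        Map<Object, Change> lastByKey = new LinkedHashMap<>();
        for (Change c : batch) {
            lastByKey.remove(c.key); // re-insert so iteration order follows the latest change
            lastByKey.put(c.key, c);
        }
        return new ArrayList<>(lastByKey.values());
    }

    public static void main(String[] args) {
        List<Change> batch = new ArrayList<>();
        batch.add(new Change(Kind.INSERT, 1, "v1"));
        batch.add(new Change(Kind.UPDATE_AFTER, 1, "v2"));
        batch.add(new Change(Kind.DELETE, 2, null));
        System.out.println(compact(batch)); // [UPDATE_AFTER(1), DELETE(2)]
    }
}
```

With a compacted batch, INSERT and UPDATE_AFTER rows are best written as an upsert (for example an Oracle MERGE keyed on ID), since an insert and its later update collapse into a single UPDATE_AFTER for a row that may not exist in the target yet; DELETE rows become deletes by key.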
4.3 Convert JSON to Row
package test.datastream.cdc.oracle.function;

import cn.com.victorysoft.common.configuration.VsConfiguration; // in-house JSON/config helper (source not shown)
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import test.datastream.cdc.CdcConstants; // Debezium JSON keys and op codes (source not shown)

import java.sql.Timestamp;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 * @desc Parses the CDC JSON and converts it to a Row.
 */
public class CdcString2RowMap extends RichFlatMapFunction<String, Row> {

    // Column name -> position in the Row (hard-coded for table TEST2)
    private final Map<String, Integer> columnMap = new HashMap<>();

    @Override
    public void open(Configuration parameters) throws Exception {
        columnMap.put("ID", 0);
        columnMap.put("NAME", 1);
        columnMap.put("DESCRIPTION", 2);
        columnMap.put("AGE", 3);
        columnMap.put("CREATE_TIME", 4);
        columnMap.put("SCORE", 5);
        columnMap.put("C_1", 6);
        columnMap.put("B_1", 7);
    }

    @Override
    public void flatMap(String s, Collector<Row> collector) throws Exception {
        System.out.println("receive: " + s);
        VsConfiguration conf = VsConfiguration.from(s);
        String op = conf.getString(CdcConstants.K_OP);
        VsConfiguration before = conf.getConfiguration(CdcConstants.K_BEFORE);
        VsConfiguration after = conf.getConfiguration(CdcConstants.K_AFTER);
        Row row;
        if (CdcConstants.OP_C.equals(op)) {
            // Insert: use the "after" image
            row = convertToRow(after);
            row.setKind(RowKind.INSERT);
        } else if (CdcConstants.OP_U.equals(op)) {
            // Update: use the "after" image
            row = convertToRow(after);
            row.setKind(RowKind.UPDATE_AFTER);
        } else if (CdcConstants.OP_D.equals(op)) {
            // Delete: use the "before" image
            row = convertToRow(before);
            row.setKind(RowKind.DELETE);
        } else {
            // "r" (snapshot read): use the "after" image and treat it as an insert
            row = convertToRow(after);
            row.setKind(RowKind.INSERT);
        }
        collector.collect(row);
    }

    private Row convertToRow(VsConfiguration data) {
        Set<String> keys = data.getKeys();
        Row row = new Row(columnMap.size());
        for (String key : keys) {
            Integer index = this.columnMap.get(key);
            if (index == null) {
                continue; // unmapped column (e.g. added later): skip it instead of failing with an NPE
            }
            Object value = data.get(key);
            if ("CREATE_TIME".equals(key) && value != null) {
                // Assumes CREATE_TIME is TIMESTAMP(6): Debezium sends microseconds since the epoch
                value = long2Timestamp(((Number) value).longValue());
            }
            row.setField(index, value);
        }
        return row;
    }

    private static Timestamp long2Timestamp(long micros) {
        // Microseconds -> milliseconds (sub-millisecond digits are dropped)
        return new Timestamp(micros / 1000);
    }
}
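Two details in `CdcString2RowMap` are easier to see without the in-house `VsConfiguration`/`CdcConstants` classes: the mapping from Debezium `op` codes to row kinds, and the handling of `CREATE_TIME`. With Debezium's default `time.precision.mode`, an Oracle `TIMESTAMP(6)` column arrives as microseconds since the epoch without time-zone information (a `DATE` column would arrive as milliseconds instead). `new Timestamp(micros / 1000)` treats that number as a UTC instant, and `Timestamp` renders it in the JVM's default time zone, so on a JVM not running in UTC the wall-clock time can appear shifted. The sketch below is hypothetical (class, enum, and method names are illustrative) and reads the epoch value as UTC wall-clock time instead.

```java
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Hypothetical helpers illustrating two details of CdcString2RowMap.
class DebeziumValues {

    enum Kind { INSERT, UPDATE_AFTER, DELETE }

    /** Debezium "op" codes: c = create, r = snapshot read, u = update, d = delete. */
    static Kind kindOf(String op) {
        if (op == null) {
            throw new IllegalArgumentException("missing op");
        }
        switch (op) {
            case "c":
            case "r":
                return Kind.INSERT;
            case "u":
                return Kind.UPDATE_AFTER;
            case "d":
                return Kind.DELETE;
            default:
                throw new IllegalArgumentException("unexpected op: " + op);
        }
    }

    /**
     * Converts microseconds since the epoch (no time zone) into a wall-clock
     * LocalDateTime by reading the value as UTC. Returns null for a NULL column.
     */
    static LocalDateTime microsToLocalDateTime(Long micros) {
        if (micros == null) {
            return null;
        }
        long seconds = Math.floorDiv(micros, 1_000_000L);
        int nanos = (int) Math.floorMod(micros, 1_000_000L) * 1_000;
        return LocalDateTime.ofEpochSecond(seconds, nanos, ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        System.out.println(kindOf("r")); // INSERT
        LocalDateTime t = microsToLocalDateTime(1_700_000_000_123_456L);
        System.out.println(t); // 2023-11-14T22:13:20.123456
        // Timestamp.valueOf keeps the same wall-clock fields in any JVM time zone
        System.out.println(Timestamp.valueOf(t));
    }
}
```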