
How to Use Flink CDC to Synchronize Data from an Oracle Database

Updated: 2024-08-21 11:55:13   Author: shandongwill
Flink CDC is a stream-based data integration tool that provides users with a comprehensive set of programming APIs. It lets users implement database synchronization in the form of YAML configuration files, and it also offers the Flink CDC Source Connector API. This article walks through how to use Flink CDC to synchronize data from an Oracle database; interested readers, follow along.

Preface

Flink CDC is a stream-based data integration tool designed to offer users a more comprehensive set of programming interfaces (APIs). It lets users implement database synchronization in the form of YAML configuration files, and it also provides the Flink CDC Source Connector API. Flink CDC optimizes the job submission process and adds a number of advanced features, such as automatic synchronization of table schema changes (schema evolution), data transformation, full database synchronization, and exactly-once semantics.
This article uses flink-connector-oracle-cdc to synchronize data from an Oracle database.

1. Enable Archive Logging

1) From a terminal on the database server, connect to the database with the sysdba role

 sqlplus / as sysdba
or
sqlplus /nolog
CONNECT sys/password AS SYSDBA;

2) Check whether archive logging is enabled

archive log list;

(“Database log mode: No Archive Mode” means archive logging is not enabled)
(“Database log mode: Archive Mode” means archive logging is enabled)
3) Enable archive logging

alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;

Note:
Enabling archive logging requires restarting the database.
Archive logs take up a large amount of disk space, so expired log files should be purged regularly.
4) After the restart completes, run archive log list; again to confirm that archiving is now enabled
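
Archive mode can also be confirmed with a query against the standard V$DATABASE view (a quick sanity check):

-- Returns ARCHIVELOG when archive logging is enabled, NOARCHIVELOG otherwise
SELECT log_mode FROM v$database;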

2. Create a Dedicated Flink CDC User

2.1 For a non-CDB Oracle database, execute the following SQL
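
The CREATE USER statement below assumes that a tablespace named LOGMINER_TBS already exists. If it does not, a statement along the following lines creates it first (the datafile path is illustrative and depends on your installation):

CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCL/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;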

  CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
  GRANT CREATE SESSION TO flinkuser;
  GRANT SET CONTAINER TO flinkuser;
  GRANT SELECT ON V_$DATABASE to flinkuser;
  GRANT FLASHBACK ANY TABLE TO flinkuser;
  GRANT SELECT ANY TABLE TO flinkuser;
  GRANT SELECT_CATALOG_ROLE TO flinkuser;
  GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
  GRANT SELECT ANY TRANSACTION TO flinkuser;
  GRANT LOGMINING TO flinkuser;
  GRANT ANALYZE ANY TO flinkuser;
  GRANT CREATE TABLE TO flinkuser;
  -- not required if scan.incremental.snapshot.enabled=true (the default)
  GRANT LOCK ANY TABLE TO flinkuser;
  GRANT ALTER ANY TABLE TO flinkuser;
  GRANT CREATE SEQUENCE TO flinkuser;
  GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
  GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;
  GRANT SELECT ON V_$LOG TO flinkuser;
  GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
  GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
  GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
  GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
  GRANT SELECT ON V_$LOGFILE TO flinkuser;
  GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
  GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;

2.2 For an Oracle CDB database, execute the following SQL. Note that a common user in a CDB must normally be created in the root container with a C## name prefix (unless COMMON_USER_PREFIX has been changed), so the user may need to be named c##flinkuser here; the Java example later in this article connects as c##flink_user.
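
As in the non-CDB case, the statements assume the logminer_tbs tablespace exists. In a CDB it is typically created both in the root container and in each PDB; the following sketch uses illustrative datafile paths and a placeholder PDB name (ORCLPDB1):

-- In CDB$ROOT (adjust the datafile path)
CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
-- Then in each PDB
ALTER SESSION SET CONTAINER = ORCLPDB1;
CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/ORCLPDB1/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;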

  CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE logminer_tbs QUOTA UNLIMITED ON logminer_tbs CONTAINER=ALL;
  GRANT CREATE SESSION TO flinkuser CONTAINER=ALL;
  GRANT SET CONTAINER TO flinkuser CONTAINER=ALL;
  GRANT SELECT ON V_$DATABASE to flinkuser CONTAINER=ALL;
  GRANT FLASHBACK ANY TABLE TO flinkuser CONTAINER=ALL;
  GRANT SELECT ANY TABLE TO flinkuser CONTAINER=ALL;
  GRANT SELECT_CATALOG_ROLE TO flinkuser CONTAINER=ALL;
  GRANT EXECUTE_CATALOG_ROLE TO flinkuser CONTAINER=ALL;
  GRANT SELECT ANY TRANSACTION TO flinkuser CONTAINER=ALL;
  GRANT LOGMINING TO flinkuser CONTAINER=ALL;
  GRANT CREATE TABLE TO flinkuser CONTAINER=ALL;
  -- not required if scan.incremental.snapshot.enabled=true (the default)
  GRANT LOCK ANY TABLE TO flinkuser CONTAINER=ALL;
  GRANT CREATE SEQUENCE TO flinkuser CONTAINER=ALL;
  GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser CONTAINER=ALL;
  GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser CONTAINER=ALL;
  GRANT SELECT ON V_$LOG TO flinkuser CONTAINER=ALL;
  GRANT SELECT ON V_$LOG_HISTORY TO flinkuser CONTAINER=ALL;
  GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser CONTAINER=ALL;
  GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser CONTAINER=ALL;
  GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser CONTAINER=ALL;
  GRANT SELECT ON V_$LOGFILE TO flinkuser CONTAINER=ALL;
  GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser CONTAINER=ALL;
  GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser CONTAINER=ALL;
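
To double-check the setup, the system privileges actually granted to the new user can be listed (grantee names are stored in uppercase; adjust the name if the c## prefix was used):

SELECT privilege FROM dba_sys_privs WHERE grantee = 'FLINKUSER';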

3. Enable Supplemental Logging at the Table or Database Level

-- Enable supplemental logging for a specific table:
ALTER TABLE databasename.tablename ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
-- Enable it for all tables in the database:
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
-- Enable minimal supplemental logging for the database:
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
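
The current supplemental-logging status can be verified against V$DATABASE; SUPPLEMENTAL_LOG_DATA_MIN should report YES (or IMPLICIT) at minimum:

SELECT supplemental_log_data_min, supplemental_log_data_all FROM v$database;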

4. Synchronize the Database with flink-connector-oracle-cdc

4.1 Add the Maven Dependency

 <dependency>
     <groupId>com.ververica</groupId>
     <artifactId>flink-connector-oracle-cdc</artifactId>
     <version>2.4.0</version>
 </dependency>

4.2 Main Java Code

package test.datastream.cdc.oracle;
import com.ververica.cdc.connectors.oracle.OracleSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.types.Row;
import test.datastream.cdc.oracle.function.CacheDataAllWindowFunction;
import test.datastream.cdc.oracle.function.CdcString2RowMap;
import test.datastream.cdc.oracle.function.DbCdcSinkFunction;
import java.util.Properties;
public class OracleCdcExample {
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        //convert numeric (NUMBER/DECIMAL) values to strings
        properties.setProperty("decimal.handling.mode", "string");
        SourceFunction<String> sourceFunction = OracleSource.<String>builder()
//                .startupOptions(StartupOptions.latest()) // start from the latest offset
                .url("jdbc:oracle:thin:@localhost:1521:orcl")
                .port(1521)
                .database("ORCL") // monitor the ORCL database
                .schemaList("c##flink_user") // monitor the c##flink_user schema
                .tableList("c##flink_user.TEST2") // monitor the TEST2 table
                .username("c##flink_user")
                .password("flinkpw")
                .debeziumProperties(properties)
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = env.addSource(sourceFunction).setParallelism(1); // use parallelism 1 to keep message ordering
        SingleOutputStreamOperator<Row> mapStream = source.flatMap(new CdcString2RowMap());
        SingleOutputStreamOperator<Row[]> winStream = mapStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .process(new CacheDataAllWindowFunction());
        //batch-write each window's rows downstream
        winStream.addSink(new DbCdcSinkFunction(null));
        env.execute();
    }
}
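
With JsonDebeziumDeserializationSchema, every change record arrives in the flatMap as a Debezium-style JSON string. A trimmed, illustrative update event on TEST2 might look like the following (field values are made up, numeric columns arrive as strings because of decimal.handling.mode=string, and the real payload carries more source metadata):

{
  "before": {"ID": "1", "NAME": "old name", ...},
  "after": {"ID": "1", "NAME": "new name", ...},
  "source": {"schema": "C##FLINK_USER", "table": "TEST2", ...},
  "op": "u",
  "ts_ms": 1724212513000
}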

4.3 Convert the JSON to a Row

package test.datastream.cdc.oracle.function;
import cn.com.victorysoft.common.configuration.VsConfiguration;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import test.datastream.cdc.CdcConstants;
import java.sql.Timestamp;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
/**
 * @desc Parses the CDC JSON and converts it to a Row
 */
public class CdcString2RowMap extends RichFlatMapFunction<String, Row> {
    private Map<String,Integer> columnMap =new HashMap<>();
    @Override
    public void open(Configuration parameters) throws Exception {
        columnMap.put("ID",0);
        columnMap.put("NAME",1);
        columnMap.put("DESCRIPTION",2);
        columnMap.put("AGE",3);
        columnMap.put("CREATE_TIME",4);
        columnMap.put("SCORE",5);
        columnMap.put("C_1",6);
        columnMap.put("B_1",7);
    }
    @Override
    public void flatMap(String s, Collector<Row> collector) throws Exception {
        System.out.println("receive: "+s);
        VsConfiguration conf=VsConfiguration.from(s);
        String op = conf.getString(CdcConstants.K_OP);
        VsConfiguration before = conf.getConfiguration(CdcConstants.K_BEFORE);
        VsConfiguration after = conf.getConfiguration(CdcConstants.K_AFTER);
        Row row =null;
        if(CdcConstants.OP_C.equals(op)){
            //insert: use the after image
            row = convertToRow(after);
            row.setKind(RowKind.INSERT);
        }else if(CdcConstants.OP_U.equals(op)){
            //update: use the after image
            row = convertToRow(after);
            row.setKind(RowKind.UPDATE_AFTER);
        }else if(CdcConstants.OP_D.equals(op)){
            //delete: use the before image
            row = convertToRow(before);
            row.setKind(RowKind.DELETE);
        }else {
            //'r' (snapshot read): use the after image
            row = convertToRow(after);
            row.setKind(RowKind.INSERT);
        }
        collector.collect(row);
    }
    private Row convertToRow(VsConfiguration data){
        Set<String> keys = data.getKeys();
        Row row=new Row(8);
        for (String key:keys) {
            Integer index = this.columnMap.get(key);
            Object value=data.get(key);
            if(key.equals("CREATE_TIME")){
                //convert the epoch long value to java.sql.Timestamp
                value=long2Timestamp((Long)value);
            }
            row.setField(index,value);
        }
        return row;
    }
    private static java.sql.Timestamp long2Timestamp(Long time){
        //assumes Debezium delivered the value as epoch microseconds (MicroTimestamp); dividing by 1000 yields milliseconds
        Timestamp timestamp = new Timestamp(time/1000);
        System.out.println(timestamp);
        return timestamp;
    }
}
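
The pipeline above references three helper classes that the article does not include: CdcConstants, CacheDataAllWindowFunction, and DbCdcSinkFunction. The sketches below are reconstructions based purely on how the classes are used above, not the author's actual code; class internals, the sink's target table, and connection details are all assumptions.

CdcConstants only needs to mirror Debezium's change-event field names:

package test.datastream.cdc;
//Hypothetical reconstruction: keys and op codes follow the Debezium change-event format
public class CdcConstants {
    public static final String K_OP = "op";
    public static final String K_BEFORE = "before";
    public static final String K_AFTER = "after";
    public static final String OP_C = "c"; //create (insert)
    public static final String OP_U = "u"; //update
    public static final String OP_D = "d"; //delete
}

CacheDataAllWindowFunction is used with windowAll(...).process(...), so it is presumably a ProcessAllWindowFunction that buffers one window's rows into a single array:

package test.datastream.cdc.oracle.function;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.List;
//Hypothetical sketch: gathers all rows of one 5-second window into a single Row[] batch
public class CacheDataAllWindowFunction extends ProcessAllWindowFunction<Row, Row[], TimeWindow> {
    @Override
    public void process(Context context, Iterable<Row> elements, Collector<Row[]> out) {
        List<Row> buffer = new ArrayList<>();
        for (Row r : elements) {
            buffer.add(r);
        }
        if (!buffer.isEmpty()) {
            out.collect(buffer.toArray(new Row[0]));
        }
    }
}

DbCdcSinkFunction receives each Row[] batch; a minimal JDBC sketch follows (the target URL, table, and columns are placeholders, and the real class presumably drives them from the constructor argument, which the article passes as null):

package test.datastream.cdc.oracle.function;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
//Hypothetical sketch: batch-writes each window's rows to a target table over JDBC
public class DbCdcSinkFunction extends RichSinkFunction<Row[]> {
    private transient Connection conn;
    public DbCdcSinkFunction(Object config) {
        //config handling omitted; the article passes null here
    }
    @Override
    public void open(Configuration parameters) throws Exception {
        //placeholder target connection
        conn = DriverManager.getConnection("jdbc:oracle:thin:@localhost:1521:orcl", "target_user", "target_pw");
        conn.setAutoCommit(false);
    }
    @Override
    public void invoke(Row[] rows, Context context) throws Exception {
        //only inserts are sketched; updates and deletes would branch on RowKind with their own statements
        try (PreparedStatement ps = conn.prepareStatement("INSERT INTO TEST2_COPY (ID, NAME) VALUES (?, ?)")) {
            for (Row row : rows) {
                if (row.getKind() == RowKind.DELETE) {
                    continue;
                }
                ps.setObject(1, row.getField(0));
                ps.setObject(2, row.getField(1));
                ps.addBatch();
            }
            ps.executeBatch();
            conn.commit();
        }
    }
    @Override
    public void close() throws Exception {
        if (conn != null) {
            conn.close();
        }
    }
}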

This concludes the article on using Flink CDC to synchronize data from an Oracle database. For more on Flink CDC Oracle data synchronization, search 腳本之家's earlier articles or keep browsing the related articles below. We hope you will continue to support 腳本之家!
