欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Oracle同步數(shù)據(jù)到kafka的方法

 更新時(shí)間:2022年02月08日 09:30:41   作者:空白葛  
本文主要介紹如何使用kafka-connect-oracle開(kāi)源工具,將Oracle?dml產(chǎn)生的數(shù)據(jù)實(shí)時(shí)同步至kafka,供kafka消費(fèi),對(duì)Oracle同步數(shù)據(jù)到kafka的方法感興趣的朋友一起看看吧

環(huán)境準(zhǔn)備

軟件準(zhǔn)備

  • CentOS Linux 7.6.1810 (2臺(tái),A主機(jī),B主機(jī))
  • Oracle 11.2.0.4(A主機(jī)安裝)
  • Kafka 2.13-2.6.0 (B主機(jī)安裝)
  • kafka-connect-oracle-master (B主機(jī)安裝,開(kāi)源程序,用于同步Oracle數(shù)據(jù)到kafka)
  • apache-maven 3.6.3 (B主機(jī)安裝,kafka-connect-oracle-master 的打包工具)
  • jdk-8u261-linux-x64.rpm (B主機(jī)安裝)

下載地址

實(shí)施過(guò)程

Oracle主機(jī)(A)配置

Oracle實(shí)例配置項(xiàng):

  • 開(kāi)啟歸檔日志
  • 開(kāi)啟附加日志
  • 創(chuàng)建kafka-connect-oracle-master連接用戶
  • 創(chuàng)建測(cè)試數(shù)據(jù)生成用戶及測(cè)試表
--開(kāi)啟歸檔日志
sqlplus / as sysdba    
SQL>shutdown immediate
SQL>startup mount
SQL>alter database archivelog;
SQL>alter database open;
--開(kāi)啟附加日志
SQL>alter database add supplemental log data (all) columns;
--創(chuàng)建kafka-connect-oracle-master連接用戶
create role logmnr_role;
grant create session to logmnr_role;
grant  execute_catalog_role,select any transaction ,select any dictionary to logmnr_role;
create user kminer identified by kminerpass;
grant  logmnr_role to kminer;
alter user kminer quota unlimited on users;
--創(chuàng)建測(cè)試數(shù)據(jù)生成用戶及測(cè)試表
create tablespace test_date datafile '/u01/app/oracle/oradata/zzsrc/test_date01.dbf' size 100M autoextend on next 10M;
create user whtest identified by whtest default tablespace test_date;
grant connect,resource to whtest;
grant execute on dbms_scheduler to whtest;
grant execute on dbms_random to whtest;
grant   create  job  to  whtest;
create table t1 (
id int ,
name char(10),
createtime date default sysdate
);
alter table WHTEST.T1  add constraint PK_ID_T1 primary key (ID)  using index   tablespace TEST_DATE;

create table t2 (
id int ,
name char(10),
createtime date default sysdate
);
alter table WHTEST.T2  add constraint PK_ID_T2 primary key (ID)  using index   tablespace TEST_DATE;
create table t3 (
id int ,
name char(10),
createtime date default sysdate
);
alter table WHTEST.T3  add constraint PK_ID_T3 primary key (ID)  using index   tablespace TEST_DATE;
begin
dbms_scheduler.create_job(
job_name=> 't1_job',
job_type=> 'PLSQL_BLOCK',
job_action =>'declare
v_id int;
v_name char(10);
begin
  for i in 1..10 loop
    v_id := round(dbms_random.value(1,1000000000));
    v_name :=round(dbms_random.value(1,1000000000));
    insert into whtest.t1 (id,name)values(v_id,v_name);
  end loop;
end;',
enabled=>true,
repeat_interval=>'sysdate + 5/86400',
comments=>'insert into t1 every 5 sec');
end;
/ 

begin
dbms_scheduler.create_job(
job_name=> 't2_job',
job_type=> 'PLSQL_BLOCK',
job_action =>'declare
v_id int;
v_name char(10);
begin
  for i in 1..10 loop
    v_id := round(dbms_random.value(1,1000000000));
    v_name :=round(dbms_random.value(1,1000000000));
    insert into whtest.t2 (id,name)values(v_id,v_name);
  end loop;
end;',
enabled=>true,
repeat_interval=>'sysdate + 5/86400',
comments=>'insert into t1 every 5 sec');
end;
/ 

begin
dbms_scheduler.create_job(
job_name=> 't3_job',
job_type=> 'PLSQL_BLOCK',
job_action =>'declare
v_id int;
v_name char(10);
begin
  for i in 1..10 loop
    v_id := round(dbms_random.value(1,1000000000));
    v_name :=round(dbms_random.value(1,1000000000));
    insert into whtest.t3 (id,name)values(v_id,v_name);
  end loop;
end;',
enabled=>true,
repeat_interval=>'sysdate + 5/86400',
comments=>'insert into t3 every 5 sec');
end;
/ 
--JOB創(chuàng)建之后,暫時(shí)先diable,待kafka配置完成之后再enable
exec DBMS_SCHEDULER.DISABLE('T1_JOB');
exec DBMS_SCHEDULER.DISABLE('T2_JOB');
exec DBMS_SCHEDULER.DISABLE('T3_JOB');

exec DBMS_SCHEDULER.ENABLE('T1_JOB');
exec DBMS_SCHEDULER.ENABLE('T2_JOB');
exec DBMS_SCHEDULER.ENABLE('T3_JOB');

Kafka主機(jī)(B)配置

? 將下載好的Kafka 2.13-2.6.0 、kafka-connect-oracle-master、apache-maven 3.6.3、JDK 1.8.0上傳至B主機(jī)/soft目錄待使用。

主機(jī)hosts文件添加解析

[root@softdelvily ~]# cat /etc/hosts
127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.20.44 softdelvily localhost

安裝JDK

[root@softdelvily soft]# rpm -ivh jdk-8u261-linux-x64.rpm 
warning: jdk-8u261-linux-x64.rpm: Header V3 RSA/SHA256 Signature, key ID ec551f03: NOKEY
Preparing...                          ################################# [100%]
Updating / installing...
   1:jdk1.8-2000:1.8.0_261-fcs        ################################# [100%]
Unpacking JAR files...
        tools.jar...
        plugin.jar...
        javaws.jar...
        deploy.jar...
        rt.jar...
        jsse.jar...
        charsets.jar...
        localedata.jar...

配置apache-maven工具

? 將apache-maven-3.6.3-bin.tar.gz解壓至/usr/local目錄,并設(shè)置相應(yīng)的/etc/profile環(huán)境變量。

[root@softdelvily soft]# tar xvf apache-maven-3.6.3-bin.tar.gz -C /usr/local/
apache-maven-3.6.3/README.txt
apache-maven-3.6.3/LICENSE
.....
[root@softdelvily soft]# cd /usr/local/
[root@softdelvily local]# ll
total 0
drwxr-xr-x. 6 root root  99 Sep 23 09:56 apache-maven-3.6.3
drwxr-xr-x. 2 root root   6 Apr 11  2018 bin
.....
[root@softdelvily local]# vi /etc/profile
.......
##添加如下環(huán)境變量
MAVEN_HOME=/usr/local/apache-maven-3.6.3
export MAVEN_HOME
export PATH=${PATH}:${MAVEN_HOME}/bin
[root@softdelvily local]# source /etc/profile
[root@softdelvily local]# mvn -v
Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
Maven home: /usr/local/apache-maven-3.6.3
Java version: 1.8.0_262, vendor: Oracle Corporation, runtime: /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.262.b10-0.el7_8.x86_64/jre
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "3.10.0-957.el7.x86_64", arch: "amd64", family: "unix"

配置Kafka 2.13-2.6.0

? 解壓Kafka 2.13-2.6.0 至/usr/local目錄。

[root@softdelvily soft]# tar xvf kafka_2.13-2.6.0.tgz -C /usr/local/
kafka_2.13-2.6.0/
kafka_2.13-2.6.0/LICENSE
......
[root@softdelvily soft]# cd /usr/local/
[root@softdelvily local]# ll
total 0
drwxr-xr-x. 6 root root 99 Sep 23 09:56 apache-maven-3.6.3
drwxr-xr-x. 6 root root 89 Jul 29 02:23 kafka_2.13-2.6.0
.....

? 開(kāi)啟kafka,并創(chuàng)建對(duì)應(yīng)同步數(shù)據(jù)庫(kù)過(guò)的topic

--1、session1 啟動(dòng)ZK
[root@softdelvily kafka_2.13-2.6.0]# cd /usr/local/kafka_2.13-2.6.0/bin/
[root@softdelvily bin]# ./zookeeper-server-start.sh ../config/zookeeper.properties
[2020-09-23 10:06:49,158] INFO Reading configuration from: ../config/zookeeper.properties 
.......
[2020-09-23 10:06:49,311] INFO Using checkIntervalMs=60000 maxPerMinute=10000 (org.apache.zookeeper.server.ContainerManager)
--2、session2 啟動(dòng)kafka
[root@softdelvily kafka_2.13-2.6.0]# cd /usr/local/kafka_2.13-2.6.0/bin/
[root@softdelvily bin]# ./kafka-server-start.sh ../config/server.properties
--3、session3 創(chuàng)建cdczztar
[root@softdelvily kafka_2.13-2.6.0]# cd /usr/local/kafka_2.13-2.6.0/bin/
[root@softdelvily bin]# ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic cdczztar
Created topic cdczztar.
[root@softdelvily bin]# ./kafka-topics.sh --zookeeper localhost:2181 --list
__consumer_offsets
cdczztar

配置kafka-connect-oracle-maste

解壓kafka-connect-oracle-master至/soft目錄,并配置相應(yīng)config文件,然后使用maven工具編譯程序。

--解壓zip包
[root@softdelvily soft]# unzip kafka-connect-oracle-master.zip 
[root@softdelvily soft]# ll
total 201180
-rw-r--r--. 1 root root   9506321 Sep 22 16:05 apache-maven-3.6.3-bin.tar.gz
-rw-r--r--. 1 root root 127431820 Sep  8 10:43 jdk-8u261-linux-x64.rpm
-rw-r--r--. 1 root root  65537909 Sep 22 15:59 kafka_2.13-2.6.0.tgz
drwxr-xr-x. 5 root root       107 Sep  8 15:48 kafka-connect-oracle-master
-rw-r--r--. 1 root root   3522729 Sep 22 14:14 kafka-connect-oracle-master.zip
[root@softdelvily soft]# cd kafka-connect-oracle-master/config/
[root@softdelvily config]# ll
total 4
-rw-r--r--. 1 root root 1135 Sep  8 15:48 OracleSourceConnector.properties
--調(diào)整properties配置文件
--需要調(diào)整項(xiàng)db.name.alias、topic、db.name、db.hostname、db.user、db.user.password、table.whitelist、table.blacklist信息,具體說(shuō)明參考README.md
[root@softdelvily config]# vi OracleSourceConnector.properties 
name=oracle-logminer-connector
connector.class=com.ecer.kafka.connect.oracle.OracleSourceConnector
db.name.alias=zztar
tasks.max=1
topic=cdczztar
db.name=zztar
db.hostname=192.168.xx.xx
db.port=1521
db.user=kminer
db.user.password=kminerpass
db.fetch.size=1
table.whitelist=WHTEST.T1,WHTEST.T2
table.blacklist=WHTEST.T3
parse.dml.data=true
reset.offset=true
start.scn=
multitenant=false
--編譯程序
[root@softdelvily ~]# cd /soft/kafka-connect-oracle-master
[root@softdelvily kafka-connect-oracle-master]# mvn clean package
[INFO] Scanning for projects...
.......
[INFO] Building jar: /soft/kafka-connect-oracle-master/target/kafka-connect-oracle-1.0.68.jar
with assembly file: /soft/kafka-connect-oracle-master/target/kafka-connect-oracle-1.0.68.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  94.03 s
[INFO] Finished at: 2020-09-23T10:25:52+08:00
[INFO] ------------------------------------------------------------------------

? 將如下文件復(fù)制到kafka工作目錄。

  • 復(fù)制kafka-connect-oracle-1.058.jar 和 lib/ojdbc7.jar 到$KAFKA_HOME/lib
  • 復(fù)制config/OracleSourceConnector.properties 文件到$KAFKA_HOME/config
[root@softdelvily config]# cd /soft/kafka-connect-oracle-master/target/
[root@softdelvily target]# cp kafka-connect-oracle-1.0.68.jar /usr/local/kafka_2.13-2.6.0/libs/
[root@softdelvily lib]# cd /soft/kafka-connect-oracle-master/lib
[root@softdelvily lib]# cp ojdbc7.jar /usr/local/kafka_2.13-2.6.0/libs/
[root@softdelvily lib]# cd /soft/kafka-connect-oracle-master/config/
[root@softdelvily config]# cp OracleSourceConnector.properties /usr/local/kafka_2.13-2.6.0/config/

啟動(dòng)kafka-connect-oracle

[root@softdelvily kafka_2.13-2.6.0]# cd /usr/local/kafka_2.13-2.6.0/bin/
[root@softdelvily bin]# ./connect-standalone.sh ../config/connect-standalone.properties ../config/OracleSourceConnector.properties
......
(com.ecer.kafka.connect.oracle.OracleSourceTask:187)
[2020-09-23 10:40:31,375] INFO Log Miner will start at new position SCN : 2847346 with fetch size : 1 (com.ecer.kafka.connect.oracle.OracleSourceTask:188)

啟動(dòng)kafka消費(fèi)者

[root@softdelvily kafka_2.13-2.6.0]# cd /usr/local/kafka_2.13-2.6.0/bin/
[root@softdelvily bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic cdczztar

啟動(dòng)數(shù)據(jù)庫(kù)JOB

[oracle@oracle01 ~]$ sqlplus / as sysdba

SQL*Plus: Release 11.2.0.4.0 Production on Wed Sep 23 10:45:16 2020

Copyright (c) 1982, 2011, Oracle.  All rights reserved.

set pagesize 999

Connected to:
Oracle Database 11g Enterprise Edition Release 11.2.0.4.0 - 64bit Production
With the Partitioning, OLAP, Data Mining and Real Application Testing options

SQL> SQL> conn whtest/whtest
Connected.
SQL> exec DBMS_SCHEDULER.ENABLE('T1_JOB');

PL/SQL procedure successfully completed.

kafka消費(fèi)者界面

出現(xiàn)類似記錄,表明同步成功,數(shù)據(jù)以key:value的形式輸出。

{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"SCN"},{"type":"string","optional":false,"field":"SEG_OWNER"},{"type":"string","optional":false,"field":"TABLE_NAME"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"TIMESTAMP"},{"type":"string","optional":false,"field":"SQL_REDO"},{"type":"string","optional":false,"field":"OPERATION"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"ID"},{"type":"string","optional":true,"field":"NAME"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"CREATETIME"}],"optional":true,"name":"value","field":"data"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"ID"},{"type":"string","optional":true,"field":"NAME"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"CREATETIME"}],"optional":true,"name":"value","field":"before"}],"optional":false,"name":"zztar.whtest.t1.row"},"payload":{"SCN":2847668,"SEG_OWNER":"WHTEST","TABLE_NAME":"T1","TIMESTAMP":1600829206000,"SQL_REDO":"insert into \"WHTEST\".\"T1\"(\"ID\",\"NAME\",\"CREATETIME\") values (557005146,'533888119 ',TIMESTAMP ' 2020-09-23 10:46:46')","OPERATION":"INSERT","data":{"ID":5.57005146E8,"NAME":"533888119","CREATETIME":1600829206000},"before":null}}

到此這篇關(guān)于Oracle同步數(shù)據(jù)到kafka的文章就介紹到這了,更多相關(guān)Oracle同步數(shù)據(jù)到kafka內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Oracle 11G密碼180天過(guò)期后的修改方法

    Oracle 11G密碼180天過(guò)期后的修改方法

    在Oracle 11G 創(chuàng)建用戶時(shí)缺省密碼過(guò)期限制是180天, 如果超過(guò)180天用戶密碼未做修改則該用戶無(wú)法登錄,下面與大家分享下修改方法
    2014-07-07
  • AWR 深入分析( Automatic Workload Repository )

    AWR 深入分析( Automatic Workload Repository )

    本篇文章,小編為大家介紹一下關(guān)于AWR 深入分析( Automatic Workload Repository )有需要的朋友可以參考一下
    2013-04-04
  • Oracle數(shù)據(jù)泵實(shí)現(xiàn)不同用戶導(dǎo)入導(dǎo)出表級(jí)

    Oracle數(shù)據(jù)泵實(shí)現(xiàn)不同用戶導(dǎo)入導(dǎo)出表級(jí)

    這篇文章主要介紹了Oracle數(shù)據(jù)泵實(shí)現(xiàn)不同用戶導(dǎo)入導(dǎo)出表級(jí),文章圍繞主題展開(kāi)詳細(xì)的內(nèi)容介紹,具有一定的參考價(jià)值,需要的朋友可以參考一下
    2022-07-07
  • 使用Oracle跟蹤文件的問(wèn)題詳解

    使用Oracle跟蹤文件的問(wèn)題詳解

    從跟蹤文件的產(chǎn)生的來(lái)源來(lái)看,跟蹤文件又可以分為兩類:一類是數(shù)據(jù)庫(kù)的操作人員有意生成的;另一類則是由于出現(xiàn)了異常錯(cuò)誤,由數(shù)據(jù)庫(kù)自動(dòng)生成的,本文給大家介紹使用Oracle的跟蹤文件的方法,需要的朋友參考下吧
    2021-06-06
  • 查看Oracle中是否有鎖表的sql

    查看Oracle中是否有鎖表的sql

    查看Oracle中是否有鎖表的sql,具體是那個(gè)用戶那個(gè)進(jìn)程造成死鎖,鎖的級(jí)別等等,感興趣的朋友可以參考下
    2013-09-09
  • 如何解決Oracle數(shù)據(jù)表入庫(kù)中文亂碼問(wèn)題

    如何解決Oracle數(shù)據(jù)表入庫(kù)中文亂碼問(wèn)題

    Oracle數(shù)據(jù)庫(kù)在處理中文數(shù)據(jù)時(shí),經(jīng)常會(huì)遇到亂碼問(wèn)題,導(dǎo)致數(shù)據(jù)無(wú)法正常顯示和處理,這是因?yàn)镺racle數(shù)據(jù)庫(kù)默認(rèn)的字符集為US7ASCII,無(wú)法識(shí)別中文字符,通過(guò)修改數(shù)據(jù)庫(kù),客戶端和應(yīng)用程序字符集,將數(shù)據(jù)轉(zhuǎn)換為正確的字符集,可以避免亂碼問(wèn)題
    2024-02-02
  • 解決plsql因事務(wù)未提交造成的鎖表問(wèn)題

    解決plsql因事務(wù)未提交造成的鎖表問(wèn)題

    這篇文章主要介紹了plsql因事務(wù)未提交造成的鎖表的解決辦法,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2022-01-01
  • 淺談oracle中單引號(hào)轉(zhuǎn)義

    淺談oracle中單引號(hào)轉(zhuǎn)義

    這篇文章主要介紹了淺談oracle中單引號(hào)轉(zhuǎn)義的香瓜內(nèi)容,涉及單引號(hào)的作用以及其具體用法,具有一定參考價(jià)值,需要的朋友可以參考下。
    2017-09-09
  • Oracle用戶密碼過(guò)期報(bào)錯(cuò)的解決辦法

    Oracle用戶密碼過(guò)期報(bào)錯(cuò)的解決辦法

    Oracle數(shù)據(jù)庫(kù) 11g默認(rèn)密碼過(guò)期時(shí)間為180天過(guò)期,針對(duì)密碼過(guò)期企業(yè)一般是采用修改密碼的方式,個(gè)人電腦上則可以將密碼過(guò)期時(shí)間修改為永久,本文給大家介紹了Oracle用戶密碼過(guò)期報(bào)錯(cuò)的解決辦法,需要的朋友可以參考下
    2024-03-03
  • Oracle開(kāi)發(fā)之分析函數(shù)(Top/Bottom N、First/Last、NTile)

    Oracle開(kāi)發(fā)之分析函數(shù)(Top/Bottom N、First/Last、NTile)

    本文主要是對(duì)Oracle分析函數(shù)查找前幾名、后幾名、最多、最少以及按層次查詢的介紹,需要的朋友可以參考下。
    2016-05-05

最新評(píng)論