欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

高效的數據同步工具DataX的使用及實現示例

 更新時間:2022年03月21日 15:01:27   作者:愿許浪盡天涯  
這篇文章主要為大家介紹了高效的數據同步工具DataX的使用及實現示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

前言

我們公司有個項目的數據量高達五千萬,但是因為報表那塊數據不太準確,業(yè)務庫和報表庫又是跨庫操作,所以并不能使用 SQL 來進行同步。當時的打算是通過 mysqldump 或者存儲的方式來進行同步,但是嘗試后發(fā)現這些方案都不切實際:

mysqldump:不僅備份需要時間,同步也需要時間,而且在備份的過程,可能還會有數據產出(也就是說同步等于沒同步)

存儲方式:這個效率太慢了,要是數據量少還好,我們使用這個方式的時候,三個小時才同步兩千條數據…

后面在網上查看后,發(fā)現 DataX 這個工具用來同步不僅速度快,而且同步的數據量基本上也相差無幾。

一、DataX 簡介

DataX 是阿里云 DataWorks 數據集成 的開源版本,主要就是用于實現數據間的離線同步。 DataX 致力于實現包括關系型數據庫(MySQL、Oracle 等)、HDFS、Hive、ODPS、HBase、FTP 等 各種異構數據源(即不同的數據庫) 間穩(wěn)定高效的數據同步功能。

在這里插入圖片描述

為了 解決異構數據源同步問題,DataX 將復雜的網狀同步鏈路變成了星型數據鏈路,DataX 作為中間傳輸載體負責連接各種數據源;

當需要接入一個新的數據源時,只需要將此數據源對接到 DataX,便能跟已有的數據源作為無縫數據同步。

1.DataX3.0 框架設計

DataX 采用 Framework + Plugin 架構,將數據源讀取和寫入抽象稱為 Reader/Writer 插件,納入到整個同步框架中。

在這里插入圖片描述

角色作用
Reader(采集模塊)負責采集數據源的數據,將數據發(fā)送給 Framework
Writer(寫入模塊)負責不斷向 Framework 中取數據,并將數據寫入到目的端。
Framework(中間商)負責連接 ReaderWriter,作為兩者的數據傳輸通道,并處理緩沖,流控,并發(fā),數據轉換等核心技術問題。

2.DataX3.0 核心架構

DataX 完成單個數據同步的作業(yè),我們稱為 Job,DataX 接收到一個 Job 后,將啟動一個進程來完成整個作業(yè)同步過程。DataX Job 模塊是單個作業(yè)的中樞管理節(jié)點,承擔了數據清理、子任務切分、TaskGroup 管理等功能。

在這里插入圖片描述

DataX Job 啟動后,會根據不同源端的切分策略,將 Job 切分成多個小的 Task (子任務),以便于并發(fā)執(zhí)行。

接著 DataX Job 會調用 Scheduler 模塊,根據配置的并發(fā)數量,將拆分成的 Task 重新組合,組裝成 TaskGroup(任務組)

每一個 Task 都由 TaskGroup 負責啟動,Task 啟動后,會固定啟動 Reader --> Channel --> Writer 線程來完成任務同步工作。

DataX 作業(yè)運行啟動后,Job 會對 TaskGroup 進行監(jiān)控操作,等待所有 TaskGroup 完成后,Job 便會成功退出(異常退出時 值非 0)

DataX 調度過程:

首先 DataX Job 模塊會根據分庫分表切分成若干個 Task,然后根據用戶配置并發(fā)數,來計算需要分配多少個 TaskGroup;

計算過程:Task / Channel = TaskGroup,最后由 TaskGroup 根據分配好的并發(fā)數來運行 Task(任務)

二、使用 DataX 實現數據同步

準備工作:

  • JDK(1.8 以上,推薦 1.8)
  • Python(2,3 版本都可以)
  • Apache Maven 3.x(Compile DataX)(手動打包使用,使用 tar 包方式不需要安裝)
主機名操作系統(tǒng)IP 地址軟件包
MySQL-1CentOS 7.4192.168.1.1jdk-8u181-linux-x64.tar.gz datax.tar.gz
MySQL-2CentOS 7.4192.168.1.2 

安裝 JDK:下載地址(需要創(chuàng)建 Oracle 賬號)

[root@MySQL-1 ~]# ls
anaconda-ks.cfg  jdk-8u181-linux-x64.tar.gz
[root@MySQL-1 ~]# tar zxf jdk-8u181-linux-x64.tar.gz 
[root@DataX ~]# ls
anaconda-ks.cfg  jdk1.8.0_181  jdk-8u181-linux-x64.tar.gz
[root@MySQL-1 ~]# mv jdk1.8.0_181 /usr/local/java
[root@MySQL-1 ~]# cat <<END >> /etc/profile
export JAVA_HOME=/usr/local/java
export PATH=$PATH:"$JAVA_HOME/bin"
END
[root@MySQL-1 ~]# source /etc/profile
[root@MySQL-1 ~]# java -version

因為 CentOS 7 上自帶 Python 2.7 的軟件包,所以不需要進行安裝。

1.Linux 上安裝 DataX 軟件

[root@MySQL-1 ~]# wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
[root@MySQL-1 ~]# tar zxf datax.tar.gz -C /usr/local/
[root@MySQL-1 ~]# rm -rf /usr/local/datax/plugin/*/._*						# 需要刪除隱藏文件 (重要)

當未刪除時,可能會輸出:

[/usr/local/datax/plugin/reader/._drdsreader/plugin.json] 不存在. 請檢查您的配置文件.

驗證:

[root@MySQL-1 ~]# cd /usr/local/datax/bin
[root@MySQL-1 ~]# python datax.py ../job/job.json							# 用來驗證是否安裝成功

輸出:

2021-12-13 19:26:28.828 [job-0] INFO  JobContainer - PerfTrace not enable!
2021-12-13 19:26:28.829 [job-0] INFO  StandAloneJobContainerCommunicator - Total 100000 records, 2600000 bytes | Speed 253.91KB/s, 10000 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.060s |  All Task WaitReaderTime 0.068s | Percentage 100.00%
2021-12-13 19:26:28.829 [job-0] INFO  JobContainer - 
任務啟動時刻                    : 2021-12-13 19:26:18
任務結束時刻                    : 2021-12-13 19:26:28
任務總計耗時                    :                 10s
任務平均流量                    :          253.91KB/s
記錄寫入速度                    :          10000rec/s
讀出記錄總數                    :              100000
讀寫失敗總數                    :                   0

2.DataX 基本使用

查看 streamreader --> streamwriter 的模板:

[root@MySQL-1 ~]# python /usr/local/datax/bin/datax.py -r streamreader -w streamwriter

輸出:

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.
Please refer to the streamreader document:
     https://github.com/alibaba/DataX/blob/master/streamreader/doc/streamreader.md 
Please refer to the streamwriter document:
     https://github.com/alibaba/DataX/blob/master/streamwriter/doc/streamwriter.md 
Please save the following configuration as a json file and  use
     python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json 
to run the job.
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader", 
                    "parameter": {
                        "column": [], 
                        "sliceRecordCount": ""
                    }
                }, 
                "writer": {
                    "name": "streamwriter", 
                    "parameter": {
                        "encoding": "", 
                        "print": true
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}

根據模板編寫 json 文件

[root@MySQL-1 ~]# cat <<END > test.json
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader", 
                    "parameter": {
                        "column": [								# 同步的列名 (* 表示所有)
			    {
			        "type":"string",
				"value":"Hello."
			    },
			    {
			        "type":"string",
				"value":"河北彭于晏"
			    },
			], 
                        "sliceRecordCount": "3"					# 打印數量
                    }
                }, 
                "writer": {
                    "name": "streamwriter", 
                    "parameter": {
                        "encoding": "utf-8",					# 編碼
                        "print": true
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "2"									# 并發(fā) (即 sliceRecordCount * channel = 結果)
            }
        }
    }
}

輸出:(要是復制我上面的話,需要把 # 帶的內容去掉)

在這里插入圖片描述

3.安裝 MySQL 數據庫

分別在兩臺主機上安裝:

[root@MySQL-1 ~]# yum -y install mariadb mariadb-server mariadb-libs mariadb-devel   
[root@MySQL-1 ~]# systemctl start mariadb												# 安裝 MariaDB 數據庫
[root@MySQL-1 ~]# mysql_secure_installation												# 初始化	
NOTE: RUNNING ALL PARTS OF THIS SCRIPT IS RECOMMENDED FOR ALL MariaDB
      SERVERS IN PRODUCTION USE!  PLEASE READ EACH STEP CAREFULLY!

Enter current password for root (enter for none):	     	# 直接回車
OK, successfully used password, moving on...
Set root password? [Y/n] y                       	 	 	# 配置 root 密碼
New password: 
Re-enter new password: 
Password updated successfully!
Reloading privilege tables..
 ... Success!
Remove anonymous users? [Y/n] y                			 	# 移除匿名用戶
 ... skipping.
Disallow root login remotely? [Y/n] n            		 	# 允許 root 遠程登錄
 ... skipping.
Remove test database and access to it? [Y/n] y 		     	# 移除測試數據庫
 ... skipping.
Reload privilege tables now? [Y/n] y             	     	# 重新加載表
 ... Success!

1)準備同步數據(要同步的兩臺主機都要有這個表)

MariaDB [(none)]> create database `course-study`;
Query OK, 1 row affected (0.00 sec)
MariaDB [(none)]> create table `course-study`.t_member(ID int,Name varchar(20),Email varchar(30));
Query OK, 0 rows affected (0.00 sec)

在這里插入圖片描述

因為是使用 DataX 程序進行同步的,所以需要在雙方的數據庫上開放權限:

grant all privileges on *.* to root@'%' identified by '123123';
flush privileges;

2)創(chuàng)建存儲過程:

DELIMITER $$
CREATE PROCEDURE test()
BEGIN
declare A int default 1;
while (A < 3000000)do
insert into `course-study`.t_member values(A,concat("LiSa",A),concat("LiSa",A,"@163.com"));
set A = A + 1;
END while;
END $$
DELIMITER ;

在這里插入圖片描述

3)調用存儲過程(在數據源配置,驗證同步使用):

call test();

4.通過 DataX 實 MySQL 數據同步

1)生成 MySQL 到 MySQL 同步的模板:

[root@MySQL-1 ~]# python /usr/local/datax/bin/datax.py -r mysqlreader -w mysqlwriter
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",							# 讀取端
                    "parameter": {
                        "column": [], 								# 需要同步的列 (* 表示所有的列)
                        "connection": [
                            {
                                "jdbcUrl": [], 						# 連接信息
                                "table": []							# 連接表
                            }
                        ], 
                        "password": "", 							# 連接用戶
                        "username": "", 							# 連接密碼
                        "where": ""									# 描述篩選條件
                    }
                }, 
                "writer": {
                    "name": "mysqlwriter",							# 寫入端
                    "parameter": {
                        "column": [], 								# 需要同步的列
                        "connection": [
                            {
                                "jdbcUrl": "", 						# 連接信息
                                "table": []							# 連接表
                            }
                        ], 
                        "password": "", 							# 連接密碼
                        "preSql": [], 								# 同步前. 要做的事
                        "session": [], 
                        "username": "",								# 連接用戶 
                        "writeMode": ""								# 操作類型
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": ""										# 指定并發(fā)數
            }
        }
    }
}

2)編寫 json 文件:

[root@MySQL-1 ~]# vim install.json
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "username": "root",
                        "password": "123123",
                        "column": ["*"],
                        "splitPk": "ID",
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://192.168.1.1:3306/course-study?useUnicode=true&characterEncoding=utf8"
                                ], 
                                "table": ["t_member"]
                            }
                        ]
                    }
                }, 
                "writer": {
                    "name": "mysqlwriter", 
                    "parameter": {
                        "column": ["*"], 
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://192.168.1.2:3306/course-study?useUnicode=true&characterEncoding=utf8",
                                "table": ["t_member"]
                            }
                        ], 
                        "password": "123123",
                        "preSql": [
                            "truncate t_member"
                        ], 
                        "session": [
                            "set session sql_mode='ANSI'"
                        ], 
                        "username": "root", 
                        "writeMode": "insert"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "5"
            }
        }
    }
}

3)驗證

[root@MySQL-1 ~]# python /usr/local/datax/bin/datax.py install.json

輸出:

2021-12-15 16:45:15.120 [job-0] INFO  JobContainer - PerfTrace not enable!
2021-12-15 16:45:15.120 [job-0] INFO  StandAloneJobContainerCommunicator - Total 2999999 records, 107666651 bytes | Speed 2.57MB/s, 74999 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 82.173s |  All Task WaitReaderTime 75.722s | Percentage 100.00%
2021-12-15 16:45:15.124 [job-0] INFO  JobContainer - 
任務啟動時刻                    : 2021-12-15 16:44:32
任務結束時刻                    : 2021-12-15 16:45:15
任務總計耗時                    :                 42s
任務平均流量                    :            2.57MB/s
記錄寫入速度                    :          74999rec/s
讀出記錄總數                    :             2999999
讀寫失敗總數                    :                   0
 

你們可以在目的數據庫進行查看,是否同步完成。

在這里插入圖片描述

上面的方式相當于是完全同步,但是當數據量較大時,同步的時候被中斷,是件很痛苦的事情;所以在有些情況下,增量同步還是蠻重要的。

5.使用 DataX 進行增量同步

使用 DataX 進行全量同步和增量同步的唯一區(qū)別就是:增量同步需要使用 where 進行條件篩選。(即,同步篩選后的 SQL)

1)編寫 json 文件:

[root@MySQL-1 ~]# vim where.json
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "username": "root",
                        "password": "123123",
                        "column": ["*"],
                        "splitPk": "ID",
                        "where": "ID <= 1888",
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://192.168.1.1:3306/course-study?useUnicode=true&characterEncoding=utf8"
                                ], 
                                "table": ["t_member"]
                            }
                        ]
                    }
                }, 
                "writer": {
                    "name": "mysqlwriter", 
                    "parameter": {
                        "column": ["*"], 
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://192.168.1.2:3306/course-study?useUnicode=true&characterEncoding=utf8",
                                "table": ["t_member"]
                            }
                        ], 
                        "password": "123123",
                        "preSql": [
                            "truncate t_member"
                        ], 
                        "session": [
                            "set session sql_mode='ANSI'"
                        ], 
                        "username": "root", 
                        "writeMode": "insert"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "5"
            }
        }
    }
}

需要注意的部分就是:where(條件篩選) 和 preSql(同步前,要做的事) 參數。

2)驗證:

[root@MySQL-1 ~]# python /usr/local/data/bin/data.py where.json

輸出:

2021-12-16 17:34:38.534 [job-0] INFO  JobContainer - PerfTrace not enable!
2021-12-16 17:34:38.534 [job-0] INFO  StandAloneJobContainerCommunicator - Total 1888 records, 49543 bytes | Speed 1.61KB/s, 62 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.002s |  All Task WaitReaderTime 100.570s | Percentage 100.00%
2021-12-16 17:34:38.537 [job-0] INFO  JobContainer - 
任務啟動時刻                    : 2021-12-16 17:34:06
任務結束時刻                    : 2021-12-16 17:34:38
任務總計耗時                    :                 32s
任務平均流量                    :            1.61KB/s
記錄寫入速度                    :             62rec/s
讀出記錄總數                    :                1888
讀寫失敗總數                    :                   0

目標數據庫上查看:

在這里插入圖片描述

3)基于上面數據,再次進行增量同步:

主要是 where 配置:"where": "ID > 1888 AND ID <= 2888"						# 通過條件篩選來進行增量同步

同時需要將我上面的 preSql 刪除(因為我上面做的操作時 truncate 表)

在這里插入圖片描述

以上就是高效的數據同步工具DataX的使用及實現示例的詳細內容,更多關于DataX數據同步工具的資料請關注腳本之家其它相關文章!

相關文章

  • DBCC SHRINKDATABASEMS SQL數據庫日志壓縮方法

    DBCC SHRINKDATABASEMS SQL數據庫日志壓縮方法

    DBCC SHRINKDATABASEMS SQL數據庫日志壓縮方法...
    2007-07-07
  • 詳解通過SQL進行分布式死鎖的檢測與消除

    詳解通過SQL進行分布式死鎖的檢測與消除

    本文主要介紹在 GaussDB(DWS) 中,如何通過 SQL 語句,對分布式死鎖進行檢測和恢復。
    2021-05-05
  • 在PostgreSQL中使用日期類型時一些需要注意的地方

    在PostgreSQL中使用日期類型時一些需要注意的地方

    這篇文章主要介紹了在PostgreSQL中使用日期類型時一些需要注意的地方,包括時間戳和日期轉換等方面,需要的朋友可以參考下
    2015-04-04
  • Navicat12.1系列破解激活教程親測有效

    Navicat12.1系列破解激活教程親測有效

    這篇文章主要介紹了 Navicat12.1系列破解激活教程親測有效,本文給大家介紹的非常詳細,對大家的學習或工作工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-11-11
  • 免費開源數據庫:SQLite、MySQL和PostgreSQL的優(yōu)缺點

    免費開源數據庫:SQLite、MySQL和PostgreSQL的優(yōu)缺點

    對于處理大規(guī)模數據和高并發(fā)訪問的場景,MySQL和PostgreSQL更適合,SQLite在小型應用程序或嵌入式設備中是一種輕量級、簡單和易于使用的選擇,根據具體的應用需求和場景特點,選擇合適的開源關系型數據庫可以提供更好的性能、可擴展性和靈活性
    2024-02-02
  • 詳細聊聊關于sql注入的一些零散知識點

    詳細聊聊關于sql注入的一些零散知識點

    SQL注入攻擊是通過將惡意的SQL查詢或添加語句插入到應用的輸入參數中,再在后臺SQL服務器上解析執(zhí)行進行的攻擊,它目前是黑客對數據庫進行攻擊的最常用的手段之一,這篇文章主要給大家介紹了關于sql注入的一些零散知識點,需要的朋友可以參考下
    2021-10-10
  • mybatis 項目配置文件實例詳解

    mybatis 項目配置文件實例詳解

    這篇文章主要介紹了mybatis 項目配置文件實例詳解的相關資料,需要的朋友可以參考下
    2017-03-03
  • 數據庫連接池Druid與Hikari對比詳解

    數據庫連接池Druid與Hikari對比詳解

    這篇文章主要為大家介紹了數據庫連接池Druid與Hikari對比詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2023-02-02
  • Apache?Doris?Join?優(yōu)化原理詳解

    Apache?Doris?Join?優(yōu)化原理詳解

    這篇文章主要為大家介紹了Apache?Doris?Join?優(yōu)化原理詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2022-10-10
  • 大數據時代的數據庫選擇:SQL還是NoSQL?

    大數據時代的數據庫選擇:SQL還是NoSQL?

    執(zhí)行大數據項目的企業(yè)面對的關鍵決策之一是使用哪個數據庫,SQL還是NoSQL?SQL有著驕人的業(yè)績,龐大的安裝基礎;而NoSQL正在獲得可觀的收益,且有很多支持者。我們來看看兩位專家對這個問題的看法
    2014-03-03

最新評論