Mysql遷移Postgresql的實(shí)現(xiàn)示例
原理
Mysql抽取:mysql命令重定向到操作系統(tǒng)文件,處理成csv文件;
PG裝載:copy方式將csv文件裝載進(jìn)PG。
環(huán)境準(zhǔn)備
操作系統(tǒng)(Centos7)
useradd pgload passwd pgload mkdir -p /data/etl/mysql2pg/csv mkdir -p /data/etl/mysql2pg/tmp mkdir -p /data/etl/mysql2pg/log mkdir -p /data/etl/mysql2pg/shell chown -R pgload.pgload /data/etl su - pgload touch /data/etl/mysql2pg/shell/dbmysql2pgmysqlcopy chmod +x /data/etl/mysql2pg/shell/dbmysql2pgmysqlcopy echo 'export PATH=${PATH}:/data/etl/mysql2pg/shell # mysqlselect作為mysql抽取數(shù)據(jù)的用戶 export MYSQLID=mysqlselect:000000@10.10.10.10:3306/etl # pgload為PG數(shù)據(jù)裝載的用戶 export PGID=pgload:000000@10.10.10.10:5432/etl' >> ~/.bash_profile source ~/.bash_profile
Mysql客戶端安裝
由于rpm安裝方式與系統(tǒng)自帶的mariadb有沖突,所以只有卸載mariadb才能通過rpm方式進(jìn)行安裝;
所以,在此以壓縮包的方式進(jìn)行安裝。
下載mysql客戶端,我這里下載mysql5.7.34
將下載好的壓縮包進(jìn)行一次解壓得到2個(gè)文件
將 mysql-5.7.34-el7-x86_64.tar.gz 上傳至服務(wù)器
cd /usr/local rz tar -zxvf mysql-5.7.34-el7-x86_64.tar.gz mv mysql-5.7.34-el7-x86_64 mysql-client # 配置環(huán)境變量 echo 'export PATH=$PATH:/usr/local/mysql-client/bin' >> /etc/profile source /etc/profile # 測(cè)試mysql命令 mysql -uroot -h10.10.10.10 -P3306 --database etl -e "select 1 from dual;" -p
Psql客戶端安裝
cd /opt rz rpm -ivh postgresql12-libs-12.11-1PGDG.rhel7.x86_64.rpm rpm -ivh postgresql12-12.11-1PGDG.rhel7.x86_64.rpm # 測(cè)試 psql etl -h 10.10.10.10 -p 5432 -U pgload -W
數(shù)據(jù)庫用戶
Mysql
CREATE USER 'mysqlselect'@'%' IDENTIFIED BY '000000'; GRANT SELECT ON *.* TO 'mysqlselect'@'%';
PG
--普通用戶 create role yuzhenchao with login password '000000'; create schema yuzhenchao; grant create,usage on schema yuzhenchao to yuzhenchao; grant usage on schema yuzhenchao to public; alter default privileges for role yuzhenchao revoke execute on functions from public; alter default privileges in schema yuzhenchao revoke execute on functions from public; alter default privileges in schema yuzhenchao grant select on tables to public; alter default privileges for role yuzhenchao grant select on tables to public; --集中用戶(即專門用來做數(shù)據(jù)裝載的用戶) create role pgload with login password '000000' connection limit 60; create schema pgload; grant create,usage on schema pgload to pgload; grant usage on schema pgload to public; alter default privileges for role pgload revoke execute on functions from public; alter default privileges in schema pgload revoke execute on functions from public; alter default privileges in schema pgload grant select on tables to public; alter default privileges for role pgload grant select on tables to public; --普通用戶都要?jiǎng)?chuàng)建該函數(shù) --為yuzhenchao用戶創(chuàng)建sp_exec函數(shù) create or replace function yuzhenchao.sp_exec(vsql varchar) returns void --返回空 language plpgsql security definer --定義者權(quán)限 as $function$ begin execute vsql; end; $function$ ; alter function yuzhenchao.sp_exec(varchar) owner to yuzhenchao; grant execute on function yuzhenchao.sp_exec(varchar) to yuzhenchao,pgload; create or replace function pgload.sp_exec(vsql varchar) returns void --返回空 language plpgsql security definer --定義者權(quán)限 as $function$ begin execute vsql; end; $function$ ; alter function pgload.sp_exec(varchar) owner to pgload; grant execute on function pgload.sp_exec(varchar) to pgload; --集中用戶pgload創(chuàng)建該函數(shù),新增用戶則需要增加配置重新編譯 create or replace function pgload.sp_execsql(exec_sql character varying,exec_user character varying) returns void language plpgsql security definer as $function$ /* 作者 : v-yuzhenc * 功能 : 集中處理程序,以某用戶的權(quán)限執(zhí)行某條sql語句 * exec_sql : 需要執(zhí)行的sql語句 * exec_user : 需要以哪個(gè)用戶的權(quán)限執(zhí)行該sql語句 * */ declare p_user varchar := exec_user; o_search_path varchar; begin --記錄原來的模式搜索路徑 execute 'show search_path;' into o_search_path; --臨時(shí)切換模式搜索路徑 execute 'SET search_path TO '||p_user||',public,oracle'; case p_user when 'yuzhenchao' then perform yuzhenchao.sp_exec(exec_sql); when 'pgload' then perform pgload.sp_exec(exec_sql); else raise exception '未配置該用戶:%',p_user; end case; --恢復(fù)模式搜索路徑 execute 'SET search_path TO '||o_search_path; exception when others then --恢復(fù)模式搜索路徑 execute 'SET search_path TO '||o_search_path; raise exception '%',sqlerrm; end; $function$ ; --將對(duì)應(yīng)模式的對(duì)應(yīng)模式的函數(shù)給對(duì)應(yīng)的模式的擁有者 alter function pgload.sp_execsql(varchar,varchar) owner to pgload; --將對(duì)應(yīng)模式的sp_exec函數(shù)授權(quán)給定義者和集中用戶execute權(quán)限 grant execute on function pgload.sp_execsql(varchar,varchar) to pgload;
空字符串處理成null
在pgload模式下建立函數(shù)
create or replace function replace_to_null(tablename character varying, schemaname character varying default ("current_user"())::character varying(64)) returns void language plpgsql as $function$ /* 作者 : v-yuzhenc * 功能:掃描指定表的所有varchar和text類型的字段,將字段值為''替換成null * tablename : 需要掃描的表名 * schemaname : 需要掃描的模式名 * */ declare p_tablename varchar := lower(tablename); p_schemaname varchar := lower(schemaname); p_user varchar(64) := lower(user::varchar(64));--調(diào)用者 existbj int := 0; --存在標(biāo)記 v_sql varchar; --動(dòng)態(tài)sql begin --掃描varchar和text字段 select count(1) into existbj from pg_class a inner join pg_namespace b on (a.relnamespace = b.oid) inner join pg_attribute c on (a.oid = c.attrelid) inner join pg_type d on (c.atttypid = d.oid) where c.attnum > 0 and d.typname in ('varchar','text') and a.relname = p_tablename and b.nspname = p_schemaname; --若不存在varchar或者text字段,則不做處理 if existbj = 0 then raise notice '%.%表不需要處理空字符串!',p_schemaname,p_tablename; return; end if; --拼接處理空字符串語句 select string_agg('update '||p_schemaname||'.'||p_tablename||' set '||c.attname||' = null where '||c.attname||' = '''';',chr(10)) into v_sql from pg_class a inner join pg_namespace b on (a.relnamespace = b.oid) inner join pg_attribute c on (a.oid = c.attrelid) inner join pg_type d on (c.atttypid = d.oid) where c.attnum > 0 and d.typname in ('varchar','text') and a.relname = p_tablename and b.nspname = p_schemaname; if p_user = p_schemaname then execute v_sql; execute 'analyze '||p_schemaname||'.'||p_tablename; else --通過集中處理程序執(zhí)行動(dòng)態(tài)sql perform pgload.sp_execsql(v_sql,p_schemaname); --分析表 perform pgload.sp_execsql('analyze '||p_schemaname||'.'||p_tablename,p_schemaname); end if; end; $function$ ;
導(dǎo)表腳本
dbmysql2pgmysqlcopy
#! /bin/bash showuseage() { echo "程序功能:mysql導(dǎo)出MYSQL數(shù)據(jù)庫表,copy方式導(dǎo)入PG數(shù)據(jù)庫 Useage: [dbmysql2pgmysqlcopy \${SCHEMANAME}.\${TABLENAME}] -i [:可選,源數(shù)據(jù)庫(MYSQL)帳號(hào):username:passwd@hostname:port/dbname,默認(rèn)定義在.bash_profile \${MYSQLID},不要出現(xiàn)這些字符: 冒號(hào)(:),艾特(@),空格( ),斜杠(/)] -j [:可選,目標(biāo)數(shù)據(jù)庫(PG)帳號(hào):username:passwd@hostname:port/dbname,默認(rèn)定義在.bash_profile \${PGID},不要出現(xiàn)這些字符: 冒號(hào)(:),艾特(@),空格( ),斜杠(/)] -o [:可選,指定需要導(dǎo)入到PG的schemaname,默認(rèn)為MYSQL同名的schemaname(即MYSQL的數(shù)據(jù)庫名)] -f [:可選,可指定導(dǎo)入表名,常用于不同數(shù)據(jù)庫或不同用戶同一表名沖突、源表改名不影響后續(xù)應(yīng)用、表名追加時(shí)間參數(shù)等情況,表名暫時(shí)限 定為:英文字母(不分大小寫)、數(shù)字和任意組合,禁止使用特殊字符] -8 [:可選,指定字符編碼導(dǎo)出MYSQL數(shù)據(jù),默認(rèn)utf8] -e [:可選,指定字符編碼入庫PG,默認(rèn)utf8] -u [:可選,指定表授權(quán)其他用戶,指定且多個(gè)時(shí)使用逗號(hào)分開,如:'public'、'bss,apl',不要有空格] -t [:可選,(test mod)調(diào)試模式,最多導(dǎo)出100行記錄進(jìn)行調(diào)試] -a [:可選,指定where條件內(nèi)容,如:'city_id in (0,755)'(無需轉(zhuǎn)義)] -c [:可選,不建表,直接導(dǎo)數(shù)據(jù),表結(jié)構(gòu)必須存在] -z [:可選,導(dǎo)完表后的追加操作] -I [:可選,過濾字段,建表時(shí)過濾掉過濾字段,逗號(hào)分隔,例如:serv_id,\"acc_nbr\"] -s [:可選,指定字段,建表時(shí)只導(dǎo)指定的字段,逗號(hào)分隔,例如:serv_id,\"acc_nbr\"] -d [:可選,指定字段特殊處理,原字段類型不變,字段處理后的值不能超出原來的精度,全角冒號(hào)頓號(hào)分隔,'字段名1:字段處理值1、字段名2:字段處理值2', 例如:COMPENSATETEXT:to_clob(COMPENSATETEXT)、update_time:to_date(to_char(update_time,'yyyymmdd hh24:mi:ss'),'yyyymmdd hh24:mi:ss')] -v [:可選,指定某些字段對(duì)應(yīng)PG的類型,全角冒號(hào)頓號(hào)分隔,字段名1:PG類型1、字段名2:PG類型2',例如:COMPENSATETEXT:text、update_time:date]" } # 退出之前刪除臨時(shí)文件 trap "rmtmpfile" EXIT # 進(jìn)度條程序 progress() { M=0 local MAIN_PID=$1 local MAX_SECOND=14400 local SEP_SECOND=1 if [ -n "$2" ];then SEP_SECOND=$2 fi if [ -n "$3" ];then MAX_SECOND=$3 fi local MAX_SECOND=$[${MAX_SECOND}/${SEP_SECOND}] while [ "$(ps -p ${MAIN_PID} | wc -l)" -ne "1" ] ; do M=$[$M+1] echo `date '+%Y-%m-%d %H:%M:%S'`"|WAIT|$M" if [ $M -ge ${MAX_SECOND} ];then echo `date '+%Y-%m-%d %H:%M:%S'`"|ERROR|后臺(tái)程序處理超時(shí)" kill $MAIN_PID exit 2 fi sleep ${SEP_SECOND} done } function killPid(){ #根據(jù)程序的ppid獲取程序的pid PIDS=`ps -ef|awk '{if($3=='$1'){print $2} }'`; #殺掉父程序的pid,防止子程序被殺掉后開啟新的子程序 kill -s 9 $1 #如果獲得了pid,則以已獲得的pid作為ppid繼續(xù)進(jìn)行查找 if [ -n "$PIDS" ]; then for PID in $PIDS do kill -9 $PID done fi } # 數(shù)據(jù)文件目錄 CSVDIR=/data/etl/mysql2pg/csv # 臨時(shí)文件目錄 TMPDIR=/data/etl/mysql2pg/tmp # 日志目錄 LOGDIR=/data/etl/mysql2pg/log # 刪除臨時(shí)文件 rmtmpfile() { # 刪除臨時(shí)文件 # PG裝載生成的模板SQL rm -f ${TMP_M2P_SQL} # 模板SQL生成的PSQL腳本 rm -f ${TMP_M2P_PSQL} # MYSQL抽取生成的模板SQL rm -f ${TMP_TMP_SQL} # 模板SQL生成的MYSQL抽取SQL rm -f ${TMP_TMPO_SQL} # csv文件路徑 rm -f ${CSVFILEPATH} # PGSQL執(zhí)行日志 rm -f ${PSQL_EXEC_LOG} # 關(guān)閉子進(jìn)程 killPid $$ } # 檢測(cè)參數(shù) # 沒有參數(shù)直接退出 if [ $# -eq 0 ] then showuseage exit -1 fi # 限定第一個(gè)參數(shù) PARAM1=$1 # 分析第一個(gè)參數(shù)中是否 - 開頭 if [[ ${PARAM1} =~ ^-(.*?) ]]; then #如果第一個(gè)參數(shù)第一個(gè)字符碰到-, echo "dbmysql2pgmysqlcopy的第一個(gè)參數(shù)應(yīng)為需要導(dǎo)入的MYSQL的表名!" showuseage exit -1 else PARAM=${PARAM1} fi # MYSQL連接串 MYSQLDESC=${MYSQLID} # PG連接串 PGDESC=${PGID} # 調(diào)試模式 TESTMOD="-1" # 條件 MYSQLCOND=" WHERE 1 = 1 " # PG的schema PGSCHEMA="-1" # MYSQL的schema MYSQLSCHEMA="-1" # MYSQL的tablename MYSQLTABLE="-1" # PG的tablename PGTNAME="-1" # 授權(quán)用戶 GRANTUSER="-1" # 建表標(biāo)記 默認(rèn)建表 CREATEBJ="1" # 追加操作 EXTRAOPT="-1" # MYSQL導(dǎo)出編碼 MYSQLENCODE="utf8" # PG裝載編碼 PGENCODE="utf8" # 字段忽略標(biāo)記 IGNOREBJ="-1" # 字段處理 COLUMNDEAL="-1" # 指定類型 COLUMNTYPE="-1" # 指定建為復(fù)制表 REPLICATEDBJ="-1" # 日期處理標(biāo)記 DATEFORMAT="-1" # 指定字段 SPECIALCOLUMN="-1" # 解析mysql表 PARAM=$1 ARRAY=(${PARAM//./ }) MYSQLSCHEMA=${ARRAY[0]} MYSQLTABLE=${ARRAY[1]} # 如果mysql表名被雙引號(hào)包著,則直接去掉雙引號(hào) # 如果mysql表名沒被雙引號(hào)包著,則默認(rèn)全部小寫 if [[ "$MYSQLTABLE" =~ \"(.*?)\" ]];then MYSQLTABLE=`echo ${MYSQLTABLE} | sed -e 's/^[\"]*//g' | sed -e 's/[\"]*$//g'` else MYSQLTABLE=${MYSQLTABLE,,} fi # 如果mysql模式被雙引號(hào)包著,則直接去掉雙引號(hào) # 如果mysql模式?jīng)]被雙引號(hào)包著,則默認(rèn)小寫 if [[ "$MYSQLSCHEMA" =~ \"(.*?)\" ]];then MYSQLSCHEMA=`echo ${MYSQLSCHEMA} | sed -e 's/^[\"]*//g' | sed -e 's/[\"]*$//g'` else MYSQLSCHEMA=${MYSQLSCHEMA,,} fi # 參數(shù)后移 shift while getopts :i:j:f:o:8:e:u:t:a:cz:I:s:d:v: OPTS; do case "$OPTS" in i) MYSQLDESC="$OPTARG" ;; j) PGDESC="$OPTARG" ;; o) PGSCHEMA="${OPTARG}" ;; f) PGTNAME="${OPTARG}" ;; u) GRANTUSER="${OPTARG}" ;; t) if [ $OPTARG -gt 0 -a $OPTARG -le 100 ];then TESTMOD="$OPTARG" fi ;; a) MYSQLCOND=`echo " WHERE $OPTARG" | sed "s/'/''/g"` ;; c) CREATEBJ=-1 ;; z) EXTRAOPT="$OPTARG" ;; 8) MYSQLENCODE="$OPTARG" ;; e) PGENCODE="$OPTARG" ;; I) IGNOREBJ="$OPTARG" ;; s) SPECIALCOLUMN="$OPTARG" ;; d) COLUMNDEAL="$OPTARG" ;; v) COLUMNTYPE="$OPTARG" ;; :) echo "$0 必須為 -$OPTARG 添加一個(gè)參數(shù)!" exit -1 ;; ?) showuseage exit -1 ;; esac done # 解析mysql連接串 ARRAY2=(${MYSQLDESC//@/ }) USERPWD=${ARRAY2[0]} ARR4=(${USERPWD//:/ }) MYSQLUSER=${ARR4[0]} MYSQLPWD=${ARR4[1]} HPDB=${ARRAY2[1]} ARR5=(${HPDB//:/ }) MYSQLHOST=${ARR5[0]} PDB=${ARR5[1]} ARR6=(${PDB//// }) MYSQLPORT=${ARR6[0]} MYSQLDB=${ARR6[1]} export MYSQL_PWD=$MYSQLPWD MYSQLCONN="mysql -u$MYSQLUSER -h$MYSQLHOST -P$MYSQLPORT --database $MYSQLDB" #分隔符 PGCSEP="," PGQSEP='"' PGESCAPE='\' # 獲取當(dāng)前時(shí)間戳 TIMEST=`date +%Y%m%d%H%M%S` # 日志路徑 LOG_M2P_OUT=${LOGDIR}/m2p_${MYSQLSCHEMA}_${MYSQLTABLE}_$TIMEST.out # copy語句的輸出路徑 TMP_COPY_OUT=${TMPDIR}/copy_${MYSQLSCHEMA}_${MYSQLTABLE}_$TIMEST.out # PG裝載生成的模板SQL TMP_M2P_SQL=${TMPDIR}/m2p_${MYSQLSCHEMA}_${MYSQLTABLE}_$TIMEST.sql # 模板SQL生成的PSQL腳本 TMP_M2P_PSQL=${TMPDIR}/m2p_${MYSQLSCHEMA}_${MYSQLTABLE}_$TIMEST.psql # MYSQL抽取生成的模板SQL TMP_TMP_SQL=${TMPDIR}/tmp_${MYSQLSCHEMA}_${MYSQLTABLE}_$TIMEST.sql # 模板SQL生成的MYSQL抽取SQL TMP_TMPO_SQL=${TMPDIR}/tmpo_${MYSQLSCHEMA}_${MYSQLTABLE}_$TIMEST.sql # csv文件路徑 CSVFILEPATH=${CSVDIR}/${MYSQLSCHEMA}_${MYSQLTABLE}_$TIMEST.csv # PGSQL執(zhí)行日志 PSQL_EXEC_LOG=${LOGDIR}/PG_${MYSQLSCHEMA}_${MYSQLTABLE}_$TIMEST.out # 判斷是否有表 tablebj=`${MYSQLCONN} -e "select 1 from information_schema.tables where table_name = '${MYSQLTABLE}' and table_schema = '${MYSQLSCHEMA}' union all select 1 from information_schema.views where table_name = '${MYSQLTABLE}' and table_schema = '${MYSQLSCHEMA}';" | sed '1d'` if [ -z "$tablebj" ];then echo `date '+%Y-%m-%d %H:%M:%S'`"|ERROR|表或視圖不存在" echo `date '+%Y-%m-%d %H:%M:%S'`"|ERROR|程序異常結(jié)束" exit -1 fi # 調(diào)試模式處理 if [ ! $TESTMOD = "-1" ];then MYSQLCOND="${MYSQLCOND} limit ${TESTMOD}" fi # PGSCHEMA處理 # 如果PGSCHEMA等于"-1",則默認(rèn)使用mysql同名schema if [ "$PGSCHEMA" = "-1" ];then PGSCHEMA=${MYSQLSCHEMA} fi # PGTNAME處理 # 如果PGTNAME等于"-1",則默認(rèn)與源表同名 if [ "$PGTNAME" = "-1" ];then PGTNAME=${MYSQLTABLE} fi # 如果PG表名被雙引號(hào)包著,則直接去掉雙引號(hào) # 如果PG表名沒被雙引號(hào)包著,則轉(zhuǎn)為小寫 if [[ "$PGTNAME" =~ \"(.*?)\" ]];then PGTNAME=`echo ${PGTNAME} | sed -e 's/^[\"]*//g' | sed -e 's/[\"]*$//g'` else PGTNAME=${PGTNAME,,} fi # 如果PG模式被雙引號(hào)包著,則直接去掉雙引號(hào) # 如果PG模式?jīng)]被雙引號(hào)包著,則轉(zhuǎn)為小寫 if [[ "$PGSCHEMA" =~ \"(.*?)\" ]];then PGSCHEMA=`echo ${PGSCHEMA} | sed -e 's/^[\"]*//g' | sed -e 's/[\"]*$//g'` else PGSCHEMA=${PGSCHEMA,,} fi # 解析PG連接串 ARRAY1=(${PGDESC//@/ }) USERPWD=${ARRAY1[0]} ARR1=(${USERPWD//:/ }) PGUSER=${ARR1[0]} PGPWD=${ARR1[1]} HPDB=${ARRAY1[1]} ARR2=(${HPDB//:/ }) PGHOST=${ARR2[0]} PDB=${ARR2[1]} ARR3=(${PDB//// }) PGPORT=${ARR3[0]} PGDB=${ARR3[1]} export PGPASSWORD="$PGPWD" PGCONN="psql -d $PGDB -U $PGUSER -h $PGHOST -p $PGPORT" # 格式化過濾字段 if [ "$IGNOREBJ" != "-1" ];then IGNOREBJ_ARR=(${IGNOREBJ//,/ }) for I in "${!IGNOREBJ_ARR[@]}" do TMP=${IGNOREBJ_ARR[$I]} if [[ "$TMP" =~ \"(.*?)\" ]];then TMP="'`echo ${TMP} | sed -e 's/^[\"]*//g' | sed -e 's/[\"]*$//g'`'" else TMP="'${TMP,,}'" fi if [ $I -eq 0 ];then IGNOREBJ="$TMP" else IGNOREBJ="$TMP,${IGNOREBJ}" fi done IGNOREBJ="column_name not in (${IGNOREBJ}) and " else IGNOREBJ=" " fi # 格式化指定字段 if [ "$SPECIALCOLUMN" != "-1" ];then SPECIALCOLUMN_ARR=(${SPECIALCOLUMN//,/ }) for I in "${!SPECIALCOLUMN_ARR[@]}" do TMP=${SPECIALCOLUMN_ARR[$I]} if [[ "$TMP" =~ \"(.*?)\" ]];then TMP="'`echo ${TMP} | sed -e 's/^[\"]*//g' | sed -e 's/[\"]*$//g'`'" else TMP="'${TMP,,}'" fi if [ $I -eq 0 ];then SPECIALCOLUMN="$TMP" else SPECIALCOLUMN="$TMP,${SPECIALCOLUMN}" fi done SPECIALCOLUMN="column_name in (${SPECIALCOLUMN}) and " else SPECIALCOLUMN=" " fi # 判斷PGUSER和PGSCHEMA是否一致 # 如果不一致,需要調(diào)用對(duì)方的權(quán)限執(zhí)行psql if [ "$PGUSER" = "$PGSCHEMA" ];then USERSCHEMABJ="1" else USERSCHEMABJ="-1" fi # 創(chuàng)建日志 rm -f ${LOG_M2P_OUT} touch ${LOG_M2P_OUT} # 寫入日志 echo `date '+%Y-%m-%d %H:%M:%S'`"|INFO|導(dǎo)表準(zhǔn)備開始" | tee -a ${LOG_M2P_OUT} echo `date '+%Y-%m-%d %H:%M:%S'`"|INFO|拼接數(shù)據(jù)抽取腳本開始" | tee -a ${LOG_M2P_OUT} rm -f ${TMP_TMP_SQL} touch ${TMP_TMP_SQL} rm -f ${TMP_TMPO_SQL} touch ${TMP_TMPO_SQL} #拼接導(dǎo)表語句 cat>${TMP_TMP_SQL}<<eof select 'select ' dbsql from information_schema.tables where table_name = '${MYSQLTABLE}' and table_schema = '${MYSQLSCHEMA}'; eof ${MYSQLCONN} < ${TMP_TMP_SQL} | sed '1d' >> ${TMP_TMPO_SQL} cat>${TMP_TMP_SQL}<<eof select columnname from ( select concat(' ', case when ordinal_position = 1 then '' else ',' end, '\`', lower(column_name), '\`') columnname ,ordinal_position from information_schema.columns where ${IGNOREBJ} ${SPECIALCOLUMN} table_name = '${MYSQLTABLE}' and table_schema = '${MYSQLSCHEMA}' order by ordinal_position ) a; eof ${MYSQLCONN} < ${TMP_TMP_SQL} | sed '1d' >> ${TMP_TMPO_SQL} cat>${TMP_TMP_SQL}<<eof select 'from \`${MYSQLSCHEMA}\`.\`${MYSQLTABLE}\`' from information_schema.tables where table_name = '${MYSQLTABLE}' and table_schema = '${MYSQLSCHEMA}' union all select '${MYSQLCOND}' from information_schema.tables where table_name = '${MYSQLTABLE}' and table_schema = '${MYSQLSCHEMA}'; eof ${MYSQLCONN} < ${TMP_TMP_SQL} | sed '1d' >> ${TMP_TMPO_SQL} #字段特殊處理替換 #COMPENSATETEXT:to_clob(COMPENSATETEXT)、update_time:to_date(to_char(update_time,'yyyymmdd hh24:mi:ss'),'yyyymmdd hh24:mi:ss') COLUMNNAME="" ORIGINROW="" REPLACEROW="" REPLACEROWNUM="" OLD_IFS="$IFS" if [ "$COLUMNDEAL" != "-1" ];then IFS="、" COLUMNDEAL_ARRAY=(${COLUMNDEAL}) for I in "${!COLUMNDEAL_ARRAY[@]}" do ORIGINROW="" REPLACEROW="" TMP=${COLUMNDEAL_ARRAY[$I]} IFS=":" TMP_ARRAY=(${TMP}) for J in "${!TMP_ARRAY[@]}" do TMP1=${TMP_ARRAY[$J]} if [ $J -eq 0 ];then if [[ "$TMP1" =~ \"(.*?)\" ]];then COLUMNNAME=${TMP1} ORIGINROW=" ,\`${TMP1}\`" else COLUMNNAME="\`${TMP1,,}\`" ORIGINROW=" ,\`${TMP1,,}\`" fi else REPLACEROW="${REPLACEROW}${TMP1}" if [ $J -eq $[${#TMP_ARRAY[*]}-1] ];then REPLACEROW=${REPLACEROW}' AS '${COLUMNNAME} fi fi done REPLACEROWNUM=`awk "/${ORIGINROW}/{print NR;exit;}" ${TMP_TMPO_SQL}` if [ ${REPLACEROWNUM} -eq 2 ];then REPLACEROW=' '$REPLACEROW else REPLACEROW=' ,'$REPLACEROW fi ORIGINROW=`sed -n "${REPLACEROWNUM}p" ${TMP_TMPO_SQL}` #雙引號(hào)和斜杠轉(zhuǎn)義 ORIGINROW=${ORIGINROW//\"/\\\"} REPLACEROW=${REPLACEROW//\"/\\\"} ORIGINROW=${ORIGINROW//\//\\\/} REPLACEROW=${REPLACEROW//\//\\\/} sed -i "s/$ORIGINROW/$REPLACEROW/g" ${TMP_TMPO_SQL} done fi IFS="$OLD_IFS" echo `date '+%Y-%m-%d %H:%M:%S'`"|INFO|拼接數(shù)據(jù)抽取腳本完成" | tee -a ${LOG_M2P_OUT} echo `date '+%Y-%m-%d %H:%M:%S'`"|INFO|輸出MYSQL腳本" | tee -a ${LOG_M2P_OUT} cat ${TMP_TMPO_SQL} | tee -a ${LOG_M2P_OUT} echo `date '+%Y-%m-%d %H:%M:%S'`"|INFO|拼接PG的數(shù)據(jù)裝載腳本開始" | tee -a ${LOG_M2P_OUT} rm -f ${TMP_M2P_SQL} touch ${TMP_M2P_SQL} cat>${TMP_M2P_SQL}<<EOF select case when '${USERSCHEMABJ}' = '-1' then 'select pgload.sp_execsql(\$\$' else '--自己導(dǎo)表無須調(diào)用pgload' end psql from dual union all select 'drop table if exists "${PGSCHEMA}"."m2p_${PGTNAME}";' psql from information_schema.tables where table_name = '${MYSQLTABLE}' and table_schema = '${MYSQLSCHEMA}' union all select 'create table "${PGSCHEMA}"."m2p_${PGTNAME}" (' from information_schema.tables where table_name = '${MYSQLTABLE}' and table_schema = '${MYSQLSCHEMA}'; EOF ${MYSQLCONN} < ${TMP_M2P_SQL} | sed '1d' >> ${TMP_M2P_PSQL} cat>${TMP_M2P_SQL}<<EOF select columnname from ( select concat(' ',case when ordinal_position = 1 then '' else ',' end,'"',lower(column_name),'"',' ', case when data_type = 'int' then data_type when data_type = 'varchar' then replace (column_type,'varchar(0)','varchar(1)') when data_type = 'char' then replace(replace(column_type,'char','varchar'),'varchar(0)','varchar(1)') when data_type = 'date' then 'date' when data_type = 'datetime' then replace (column_type, data_type, 'timestamp') when data_type = 'timestamp' then 'timestamp' when data_type = 'bigint' then 'bigint' when data_type = 'double' then 'double precision' when data_type = 'smallint' then 'smallint' when data_type = 'decimal' then replace (column_type,'unsigned zerofill','') when data_type = 'longtext' then 'text' when data_type = 'text' then 'text' when data_type = 'tinyint' then 'int' when data_type = 'longblob' then 'bytea' when data_type = 'blob' then 'bytea' when data_type = 'float' then 'real' when data_type = 'tinytext' then 'text' when data_type = 'mediumtext' then 'text' when data_type = 'numeric' then 'numeric' when data_type = 'time' then 'interval' else 'varchar' end ) columnname ,ordinal_position from information_schema.columns where ${IGNOREBJ} ${SPECIALCOLUMN} table_name = '${MYSQLTABLE}' and table_schema = '${MYSQLSCHEMA}' order by ordinal_position ) a; EOF ${MYSQLCONN} < ${TMP_M2P_SQL} | sed '1d' >> ${TMP_M2P_PSQL} cat>${TMP_M2P_SQL}<<EOF select PGPRI from ( select case when primarykey is not null then concat(' ,primary key (',primarykey,'));') else '); ' end PGPRI from ( select group_concat(case when column_key = 'PRI' then concat('"',lower(column_name),'"') else null end order by ORDINAL_POSITION separator ',') primarykey from information_schema.tables a, information_schema.columns b where a.table_name = b.table_name and a.table_schema = b.table_schema and a.table_name = '${MYSQLTABLE}' and a.table_schema = '${MYSQLSCHEMA}' group by table_rows ) a ) a union all select concat('comment on table "${PGSCHEMA}"."m2p_${PGTNAME}" is ''',replace(table_comment,'''',''''''),''';') from information_schema.tables where table_name = '${MYSQLTABLE}' and table_schema = '${MYSQLSCHEMA}' and table_comment != '' and table_comment is not null union all select concat('comment on column "${PGSCHEMA}"."m2p_${PGTNAME}"."',lower(column_name),'" is ''',replace(column_comment,'''',''''''),''';') from information_schema.columns where table_name = '${MYSQLTABLE}' and table_schema = '${MYSQLSCHEMA}' and column_comment != '' and column_comment is not null UNION ALL select '\$\$,\$\$${PGSCHEMA}\$\$);' from dual where '${USERSCHEMABJ}' = '-1' union all select 'select pgload.sp_execsql(\$\$grant insert on table "${PGSCHEMA}"."m2p_${PGTNAME}" to "${PGUSER}";\$\$,\$\$${PGSCHEMA}\$\$);' from dual where '${USERSCHEMABJ}' = '-1'; EOF ${MYSQLCONN} < ${TMP_M2P_SQL} | sed '1d' >> ${TMP_M2P_PSQL} cat>>${TMP_M2P_PSQL}<<eof \\copy "${PGSCHEMA}"."m2p_${PGTNAME}" FROM '${CSVFILEPATH}' WITH ( FORMAT csv,HEADER true,DELIMITER '${PGCSEP}',QUOTE '${PGQSEP}',ESCAPE '${PGESCAPE}'); eof cat>${TMP_M2P_SQL}<<EOF select 'select pgload.sp_execsql(\$\$' pgsql from dual where '${USERSCHEMABJ}' = '-1' and '${CREATEBJ}' = '-1' union all select 'insert into "${PGSCHEMA}"."${PGTNAME}" (' insertsql from information_schema.tables where table_name = '${MYSQLTABLE}' and table_schema = '${MYSQLSCHEMA}' and '${CREATEBJ}' = '-1'; EOF ${MYSQLCONN} < ${TMP_M2P_SQL} | sed '1d' >> ${TMP_M2P_PSQL} cat>${TMP_M2P_SQL}<<EOF select columnname from ( select concat(' ', case when ordinal_position = 1 then '' else ',' end, '"', lower(column_name), '"') columnname ,ordinal_position from information_schema.columns where ${IGNOREBJ} ${SPECIALCOLUMN} table_name = '${MYSQLTABLE}' and table_schema = '${MYSQLSCHEMA}' order by ordinal_position ) a where '${CREATEBJ}' = '-1'; EOF ${MYSQLCONN} < ${TMP_M2P_SQL} | sed '1d' >> ${TMP_M2P_PSQL} cat>${TMP_M2P_SQL}<<EOF select ')' insertsql from information_schema.tables where table_name = '${MYSQLTABLE}' and table_schema = '${MYSQLSCHEMA}' and '${CREATEBJ}' = '-1' union all select 'select ' insertsql from information_schema.tables where table_name = '${MYSQLTABLE}' and table_schema = '${MYSQLSCHEMA}' and '${CREATEBJ}' = '-1'; EOF ${MYSQLCONN} < ${TMP_M2P_SQL} | sed '1d' >> ${TMP_M2P_PSQL} cat>${TMP_M2P_SQL}<<EOF select columnname from ( select concat(' ', case when ordinal_position = 1 then '' else ',' end, '"', lower(column_name), '"') columnname ,ordinal_position from information_schema.columns where ${IGNOREBJ} ${SPECIALCOLUMN} table_name = '${MYSQLTABLE}' and table_schema = '${MYSQLSCHEMA}' order by ordinal_position ) a where '${CREATEBJ}' = '-1'; EOF ${MYSQLCONN} < ${TMP_M2P_SQL} | sed '1d' >> ${TMP_M2P_PSQL} cat>${TMP_M2P_SQL}<<EOF select 'from "${PGSCHEMA}"."m2p_${PGTNAME}";' from information_schema.tables where table_name = '${MYSQLTABLE}' and table_schema = '${MYSQLSCHEMA}' and '${CREATEBJ}' = '-1' union all select '\$\$,\$\$${PGSCHEMA}\$\$);' from dual where '${USERSCHEMABJ}' = '-1' and '${CREATEBJ}' = '-1' union all select 'drop table if exists "${PGSCHEMA}"."${PGTNAME}";' from dual where '${CREATEBJ}' = '1' and '${USERSCHEMABJ}' = '1' union all select 'select pgload.sp_execsql(\$\$drop table if exists "${PGSCHEMA}"."${PGTNAME}";\$\$,\$\$${PGSCHEMA}\$\$);' from dual where '${CREATEBJ}' = '1' and '${USERSCHEMABJ}' = '-1' union all select 'alter table "${PGSCHEMA}"."m2p_${PGTNAME}" rename to "${PGTNAME}";' from dual where '${CREATEBJ}' = '1' and '${USERSCHEMABJ}' = '1' union all select 'select pgload.sp_execsql(\$\$alter table "${PGSCHEMA}"."m2p_${PGTNAME}" rename to "${PGTNAME}";\$\$,\$\$${PGSCHEMA}\$\$);' from dual where '${CREATEBJ}' = '1' and '${USERSCHEMABJ}' = '-1' union all select 'drop table if exists "${PGSCHEMA}"."m2p_${PGTNAME}";' from dual where '${CREATEBJ}' = '-1' and '${USERSCHEMABJ}' = '1' union all select 'select pgload.sp_execsql(\$\$drop table if exists "${PGSCHEMA}"."m2p_${PGTNAME}";\$\$,\$\$${PGSCHEMA}\$\$);' from dual where '${CREATEBJ}' = '-1' and '${USERSCHEMABJ}' = '-1' union all select 'grant select on table "${PGSCHEMA}"."${PGTNAME}" to ${GRANTUSER};' from dual where '${GRANTUSER}' <> '-1' and '${USERSCHEMABJ}' = '1' union all select 'select pgload.sp_execsql(\$\$grant select on table "${PGSCHEMA}"."${PGTNAME}" to "${GRANTUSER}";\$\$,\$\$${PGSCHEMA}\$\$);' from dual where '${GRANTUSER}' <> '-1' and '${USERSCHEMABJ}' = '-1' union all select '${EXTRAOPT}' from dual where '${EXTRAOPT}' <> '-1' and '${USERSCHEMABJ}' = '1' union all select 'select pgload.sp_execsql(\$\$${EXTRAOPT}\$\$,\$\$${PGSCHEMA}\$\$);' from dual where '${EXTRAOPT}' <> '-1' and '${USERSCHEMABJ}' = '-1' union all select 'select pgload.replace_to_null(\$\$${PGTNAME}\$\$,\$\$${PGSCHEMA}\$\$);' from dual union all select 'analyze "${PGSCHEMA}"."${PGTNAME}";' from dual where '${USERSCHEMABJ}' = '1' union all select 'select pgload.sp_execsql(\$\$analyze "${PGSCHEMA}"."${PGTNAME}";\$\$,\$\$${PGSCHEMA}\$\$);' from dual where '${USERSCHEMABJ}' = '-1'; EOF ${MYSQLCONN} < ${TMP_M2P_SQL} | sed '1d' >> ${TMP_M2P_PSQL} #指定字段類型 OLD_IFS="$IFS" if [ "$COLUMNTYPE" != "-1" ];then IFS="、" COLUMNTYPE_ARRAY=(${COLUMNTYPE}) for I in "${!COLUMNTYPE_ARRAY[@]}" do ORIGINROW="" REPLACEROW="" TMP=${COLUMNTYPE_ARRAY[$I]} IFS=":" TMP_ARRAY=(${TMP}) for J in "${!TMP_ARRAY[@]}" do TMP1=${TMP_ARRAY[$J]} if [ $J -eq 0 ];then if [[ "$TMP1" =~ \"(.*?)\" ]];then COLUMNNAME=${TMP1} ORIGINROW=",${TMP1}" else COLUMNNAME="\"${TMP1,,}\"" ORIGINROW=",\"${TMP1,,}\"" fi else REPLACEROW="${REPLACEROW}${TMP1}" if [ $J -eq $[${#TMP_ARRAY[*]}-1] ];then REPLACEROW="${COLUMNNAME} ${REPLACEROW}" fi fi done REPLACEROWNUM=`awk "/${COLUMNNAME}/{print NR;exit;}" ${TMP_M2P_PSQL}` if [ $REPLACEROWNUM -eq 4 ];then REPLACEROW=' '$REPLACEROW else REPLACEROW=' ,'$REPLACEROW fi ORIGINROW=`sed -n "${REPLACEROWNUM}p" ${TMP_M2P_PSQL}` #雙引號(hào)和斜杠轉(zhuǎn)義 ORIGINROW=${ORIGINROW//\"/\\\"} REPLACEROW=${REPLACEROW//\"/\\\"} ORIGINROW=${ORIGINROW//\//\\\/} REPLACEROW=${REPLACEROW//\//\\\/} #echo $ORIGINROW #echo $REPLACEROW #echo $REPLACEROWNUM sed -i "s/$ORIGINROW/$REPLACEROW/g" ${TMP_M2P_PSQL} done fi IFS="$OLD_IFS" echo `date '+%Y-%m-%d %H:%M:%S'`"|INFO|拼接PG的數(shù)據(jù)裝載腳本完成" | tee -a ${LOG_M2P_OUT} echo `date '+%Y-%m-%d %H:%M:%S'`"|INFO|輸出PSQL腳本" | tee -a ${LOG_M2P_OUT} cat ${TMP_M2P_PSQL} | tee -a ${LOG_M2P_OUT} echo `date '+%Y-%m-%d %H:%M:%S'`"|INFO|導(dǎo)表準(zhǔn)備完成" | tee -a ${LOG_M2P_OUT} #開始抽取數(shù)據(jù) echo `date '+%Y-%m-%d %H:%M:%S'`"|INFO|從MYSQL抽取數(shù)據(jù)開始" | tee -a ${LOG_M2P_OUT} rm -f ${CSVFILEPATH} touch ${CSVFILEPATH} extractmysqldata(){ # ${MYSQLCONN} < ${TMP_TMPO_SQL} | sed "s/\x00//g;s/\\\n/\n/g;s/${PGQSEP}/\\""${PGESCAPE}${PGQSEP}/g;s/\t/${PGQSEP}${PGCSEP}${PGQSEP}/g;s/^/${PGQSEP}&/g;s/$/&${PGQSEP}/g;s/${PGQSEP}NULL${PGQSEP}//g;s/${PGQSEP}${PGQSEP}//g;s/\\\t/\t/g" > ${CSVFILEPATH} ${MYSQLCONN} < ${TMP_TMPO_SQL} | sed "s/\x00//g;s/\\\n/\n/g;s/${PGQSEP}/\\""${PGESCAPE}${PGQSEP}/g;s/\t/${PGQSEP}${PGCSEP}${PGQSEP}/g;s/^/${PGQSEP}&/g;s/$/&${PGQSEP}/g;s/${PGQSEP}NULL${PGQSEP}//g;s/\\\t/\t/g" > ${CSVFILEPATH} } extractmysqldata & EXTRACTMYSQLDATA_PID=$(jobs -p | tail -1) progress "${EXTRACTMYSQLDATA_PID}" & EXTRACTMYSQLDATA_PPID=$(jobs -p | tail -1) wait "${EXTRACTMYSQLDATA_PID}" echo `date '+%Y-%m-%d %H:%M:%S'`"|INFO|從MYSQL抽取數(shù)據(jù)完成" | tee -a ${LOG_M2P_OUT} echo `date '+%Y-%m-%d %H:%M:%S'`"|INFO|在PG中裝載數(shù)據(jù)開始" | tee -a ${LOG_M2P_OUT} loadmysqldata(){ #執(zhí)行psql建表腳本 ${PGCONN} >>${PSQL_EXEC_LOG} 2>&1 <<PSQL \set ECHO all \timing on \! echo `date "+%Y %m %d %H:%M:%S"` \i ${TMP_M2P_PSQL} \! echo `date "+%Y %m %d %H:%M:%S"` PSQL } loadmysqldata & LOADMYSQLDATA_PID=$(jobs -p | tail -1) progress "${LOADMYSQLDATA_PID}" & LOADMYSQLDATA_PPID=$(jobs -p | tail -1) wait "${LOADMYSQLDATA_PID}" echo `date '+%Y-%m-%d %H:%M:%S'`"|INFO|在PG中裝載數(shù)據(jù)完成" | tee -a ${LOG_M2P_OUT} echo `date '+%Y-%m-%d %H:%M:%S'`"|INFO|輸出裝載日志" | tee -a ${LOG_M2P_OUT} cat ${PSQL_EXEC_LOG} | tee -a ${LOG_M2P_OUT} echo `date '+%Y-%m-%d %H:%M:%S'`"|INFO|導(dǎo)表日志:${LOG_M2P_OUT}" | tee -a ${LOG_M2P_OUT} #獲取PG裝載的記錄數(shù) ERRORBJ=`cat ${LOG_M2P_OUT} | grep '^psql:' | grep -E 'FATAL:|ERROR:' | wc -l` if [ ${ERRORBJ} -ne 0 ];then echo `date '+%Y-%m-%d %H:%M:%S'`"|INFO|數(shù)據(jù)導(dǎo)入失敗" | tee -a ${LOG_M2P_OUT} exit -1 else PGNUM=`awk '{if($1=="COPY") print $2}' ${PSQL_EXEC_LOG}` echo `date '+%Y-%m-%d %H:%M:%S'`"|INFO|數(shù)據(jù)導(dǎo)入成功:${PGNUM}" | tee -a ${LOG_M2P_OUT} fi
測(cè)試
在mysql中建表
create table tmp ( id int primary key comment '主鍵' ,name varchar(50) comment '姓名' ); insert into tmp values (1,'張三'); insert into tmp values (2,'李四'); insert into tmp values (3,'王五'); insert into tmp values (4,'你好,'' " 我不好 1111');
導(dǎo)表測(cè)試
[root@yzcdb-2 ~]# su - pgload Last login: Tue Mar 7 08:56:24 CST 2023 on pts/0 [pgload@yzcdb-2 ~]$ dbmysql2pgmysqlcopy etl.tmp -o yuzhenchao 2023-03-07 14:02:12|INFO|導(dǎo)表準(zhǔn)備開始 2023-03-07 14:02:12|INFO|拼接數(shù)據(jù)抽取腳本開始 2023-03-07 14:02:12|INFO|拼接數(shù)據(jù)抽取腳本完成 2023-03-07 14:02:12|INFO|輸出MYSQL腳本 select `id` ,`name` from `etl`.`tmp` WHERE 1 = 1 2023-03-07 14:02:12|INFO|拼接PG的數(shù)據(jù)裝載腳本開始 2023-03-07 14:02:12|INFO|拼接PG的數(shù)據(jù)裝載腳本完成 2023-03-07 14:02:12|INFO|輸出PSQL腳本 select pgload.sp_execsql($$ drop table if exists "yuzhenchao"."m2p_tmp"; create table "yuzhenchao"."m2p_tmp" ( "id" int ,"name" varchar(50) ,primary key ("id")); comment on column "yuzhenchao"."m2p_tmp"."id" is '主鍵'; comment on column "yuzhenchao"."m2p_tmp"."name" is '姓名'; $$,$$yuzhenchao$$); select pgload.sp_execsql($$grant insert on table "yuzhenchao"."m2p_tmp" to "pgload";$$,$$yuzhenchao$$); \copy "yuzhenchao"."m2p_tmp" FROM '/data/etl/mysql2pg/csv/etl_tmp_20230307140212.csv' WITH ( FORMAT csv,HEADER true,DELIMITER ',',QUOTE '"',ESCAPE '\'); select pgload.sp_execsql($$drop table if exists "yuzhenchao"."tmp";$$,$$yuzhenchao$$); select pgload.sp_execsql($$alter table "yuzhenchao"."m2p_tmp" rename to "tmp";$$,$$yuzhenchao$$); select pgload.sp_execsql($$analyze "yuzhenchao"."tmp";$$,$$yuzhenchao$$); 2023-03-07 14:02:12|INFO|導(dǎo)表準(zhǔn)備完成 2023-03-07 14:02:12|INFO|從MYSQL抽取數(shù)據(jù)開始 2023-03-07 14:02:12|WAIT|1 2023-03-07 14:02:12|INFO|從MYSQL抽取數(shù)據(jù)完成 2023-03-07 14:02:12|INFO|在PG中裝載數(shù)據(jù)開始 2023-03-07 14:02:13|WAIT|1 2023-03-07 14:02:13|INFO|在PG中裝載數(shù)據(jù)完成 2023-03-07 14:02:13|INFO|輸出裝載日志 \timing on Timing is on. \! echo 2023 03 07 14:02:12 2023 03 07 14:02:12 \i /data/etl/mysql2pg/tmp/m2p_etl_tmp_20230307140212.psql select pgload.sp_execsql($$ drop table if exists "yuzhenchao"."m2p_tmp"; create table "yuzhenchao"."m2p_tmp" ( "id" int ,"name" varchar(50) ,primary key ("id")); comment on column "yuzhenchao"."m2p_tmp"."id" is '主鍵'; comment on column "yuzhenchao"."m2p_tmp"."name" is '姓名'; $$,$$yuzhenchao$$); psql:/data/etl/mysql2pg/tmp/m2p_etl_tmp_20230307140212.psql:9: NOTICE: table "m2p_tmp" does not exist, skipping sp_execsql ------------ (1 row) Time: 17.889 ms select pgload.sp_execsql($$grant insert on table "yuzhenchao"."m2p_tmp" to "pgload";$$,$$yuzhenchao$$); sp_execsql ------------ (1 row) Time: 1.558 ms \copy "yuzhenchao"."m2p_tmp" FROM '/data/etl/mysql2pg/csv/etl_tmp_20230307140212.csv' WITH ( FORMAT csv,HEADER true,DELIMITER ',',QUOTE '"',ESCAPE '\'); COPY 4 Time: 32.051 ms select pgload.sp_execsql($$drop table if exists "yuzhenchao"."tmp";$$,$$yuzhenchao$$); sp_execsql ------------ (1 row) Time: 3.049 ms select pgload.sp_execsql($$alter table "yuzhenchao"."m2p_tmp" rename to "tmp";$$,$$yuzhenchao$$); sp_execsql ------------ (1 row) Time: 1.687 ms select pgload.sp_execsql($$analyze "yuzhenchao"."tmp";$$,$$yuzhenchao$$); sp_execsql ------------ (1 row) Time: 1.848 ms \! echo 2023 03 07 14:02:13 2023 03 07 14:02:13 2023-03-07 14:02:13|INFO|導(dǎo)表日志:/data/etl/mysql2pg/log/m2p_etl_tmp_20230307140212.out 2023-03-07 14:02:13|INFO|數(shù)據(jù)導(dǎo)入成功:4 Killed
查看pg中的表
到此這篇關(guān)于Mysql遷移Postgresql的實(shí)現(xiàn)示例的文章就介紹到這了,更多相關(guān)Mysql遷移Postgresql內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
MySQL導(dǎo)入導(dǎo)出.sql文件及常用命令小結(jié)
在MySQL Qurey Brower中直接導(dǎo)入*.sql腳本,是不能一次執(zhí)行多條sql命令的,下面為大家介紹下MySQL導(dǎo)入導(dǎo)出.sql文件及常用命令2014-08-08mysql存儲(chǔ)過程中使用游標(biāo)的實(shí)例
使用MYSQL存儲(chǔ)過程,可以實(shí)現(xiàn)諸多的功能,下面將為您介紹一個(gè)MYSQL存儲(chǔ)過程中使用游標(biāo)的實(shí)例2014-01-01mysql和oracle默認(rèn)排序的方法 - 不指定order by
這篇文章主要介紹了mysql和oracle默認(rèn)排序的方法 - 不指定order by。具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-07-07用命令創(chuàng)建MySQL數(shù)據(jù)庫(de1)的方法
下面小編就為大家?guī)硪黄妹顒?chuàng)建MySQL數(shù)據(jù)庫(de1)的方法。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2017-03-03Mysql報(bào)錯(cuò)Duplicate?entry?'值'?for?key?'字段名&
今天在使用數(shù)據(jù)庫的過程中,發(fā)現(xiàn)一直報(bào)Duplicate?entry?'值'?for?key?'字段名'的錯(cuò)誤,所以下面這篇文章主要給大家介紹了關(guān)于Mysql報(bào)錯(cuò)Duplicate?entry?'值'?for?key?'字段名'的解決方法,需要的朋友可以參考下2023-04-04