Mysql遷移Postgresql的實(shí)現(xiàn)示例
原理
Mysql抽?。簃ysql命令重定向到操作系統(tǒng)文件,處理成csv文件;
PG裝載:copy方式將csv文件裝載進(jìn)PG。
環(huán)境準(zhǔn)備
操作系統(tǒng)(Centos7)
useradd pgload
passwd pgload
mkdir -p /data/etl/mysql2pg/csv
mkdir -p /data/etl/mysql2pg/tmp
mkdir -p /data/etl/mysql2pg/log
mkdir -p /data/etl/mysql2pg/shell
chown -R pgload.pgload /data/etl
su - pgload
touch /data/etl/mysql2pg/shell/dbmysql2pgmysqlcopy
chmod +x /data/etl/mysql2pg/shell/dbmysql2pgmysqlcopy
echo 'export PATH=${PATH}:/data/etl/mysql2pg/shell
# mysqlselect作為mysql抽取數(shù)據(jù)的用戶
export MYSQLID=mysqlselect:000000@10.10.10.10:3306/etl
# pgload為PG數(shù)據(jù)裝載的用戶
export PGID=pgload:000000@10.10.10.10:5432/etl' >> ~/.bash_profile
source ~/.bash_profile
Mysql客戶端安裝
由于rpm安裝方式與系統(tǒng)自帶的mariadb有沖突,所以只有卸載mariadb才能通過(guò)rpm方式進(jìn)行安裝;
所以,在此以壓縮包的方式進(jìn)行安裝。
下載mysql客戶端,我這里下載mysql5.7.34


將下載好的壓縮包進(jìn)行一次解壓得到2個(gè)文件

將 mysql-5.7.34-el7-x86_64.tar.gz 上傳至服務(wù)器
cd /usr/local rz tar -zxvf mysql-5.7.34-el7-x86_64.tar.gz mv mysql-5.7.34-el7-x86_64 mysql-client # 配置環(huán)境變量 echo 'export PATH=$PATH:/usr/local/mysql-client/bin' >> /etc/profile source /etc/profile # 測(cè)試mysql命令 mysql -uroot -h10.10.10.10 -P3306 --database etl -e "select 1 from dual;" -p
Psql客戶端安裝



cd /opt rz rpm -ivh postgresql12-libs-12.11-1PGDG.rhel7.x86_64.rpm rpm -ivh postgresql12-12.11-1PGDG.rhel7.x86_64.rpm # 測(cè)試 psql etl -h 10.10.10.10 -p 5432 -U pgload -W
數(shù)據(jù)庫(kù)用戶
Mysql
CREATE USER 'mysqlselect'@'%' IDENTIFIED BY '000000'; GRANT SELECT ON *.* TO 'mysqlselect'@'%';
PG
--普通用戶
create role yuzhenchao with login password '000000';
create schema yuzhenchao;
grant create,usage on schema yuzhenchao to yuzhenchao;
grant usage on schema yuzhenchao to public;
alter default privileges for role yuzhenchao revoke execute on functions from public;
alter default privileges in schema yuzhenchao revoke execute on functions from public;
alter default privileges in schema yuzhenchao grant select on tables to public;
alter default privileges for role yuzhenchao grant select on tables to public;
--集中用戶(即專門用來(lái)做數(shù)據(jù)裝載的用戶)
create role pgload with login password '000000' connection limit 60;
create schema pgload;
grant create,usage on schema pgload to pgload;
grant usage on schema pgload to public;
alter default privileges for role pgload revoke execute on functions from public;
alter default privileges in schema pgload revoke execute on functions from public;
alter default privileges in schema pgload grant select on tables to public;
alter default privileges for role pgload grant select on tables to public;
--普通用戶都要?jiǎng)?chuàng)建該函數(shù)
--為yuzhenchao用戶創(chuàng)建sp_exec函數(shù)
create or replace function yuzhenchao.sp_exec(vsql varchar)
returns void --返回空
language plpgsql
security definer --定義者權(quán)限
as $function$
begin
execute vsql;
end;
$function$
;
alter function yuzhenchao.sp_exec(varchar) owner to yuzhenchao;
grant execute on function yuzhenchao.sp_exec(varchar) to yuzhenchao,pgload;
create or replace function pgload.sp_exec(vsql varchar)
returns void --返回空
language plpgsql
security definer --定義者權(quán)限
as $function$
begin
execute vsql;
end;
$function$
;
alter function pgload.sp_exec(varchar) owner to pgload;
grant execute on function pgload.sp_exec(varchar) to pgload;
--集中用戶pgload創(chuàng)建該函數(shù),新增用戶則需要增加配置重新編譯
create or replace function pgload.sp_execsql(exec_sql character varying,exec_user character varying)
returns void
language plpgsql
security definer
as $function$
/* 作者 : v-yuzhenc
* 功能 : 集中處理程序,以某用戶的權(quán)限執(zhí)行某條sql語(yǔ)句
* exec_sql : 需要執(zhí)行的sql語(yǔ)句
* exec_user : 需要以哪個(gè)用戶的權(quán)限執(zhí)行該sql語(yǔ)句
* */
declare
p_user varchar := exec_user;
o_search_path varchar;
begin
--記錄原來(lái)的模式搜索路徑
execute 'show search_path;' into o_search_path;
--臨時(shí)切換模式搜索路徑
execute 'SET search_path TO '||p_user||',public,oracle';
case p_user
when 'yuzhenchao' then perform yuzhenchao.sp_exec(exec_sql);
when 'pgload' then perform pgload.sp_exec(exec_sql);
else raise exception '未配置該用戶:%',p_user;
end case;
--恢復(fù)模式搜索路徑
execute 'SET search_path TO '||o_search_path;
exception when others then
--恢復(fù)模式搜索路徑
execute 'SET search_path TO '||o_search_path;
raise exception '%',sqlerrm;
end;
$function$
;
--將對(duì)應(yīng)模式的對(duì)應(yīng)模式的函數(shù)給對(duì)應(yīng)的模式的擁有者
alter function pgload.sp_execsql(varchar,varchar) owner to pgload;
--將對(duì)應(yīng)模式的sp_exec函數(shù)授權(quán)給定義者和集中用戶execute權(quán)限
grant execute on function pgload.sp_execsql(varchar,varchar) to pgload;
空字符串處理成null
在pgload模式下建立函數(shù)
create or replace function replace_to_null(tablename character varying, schemaname character varying default ("current_user"())::character varying(64))
returns void
language plpgsql
as $function$
/* 作者 : v-yuzhenc
* 功能:掃描指定表的所有varchar和text類型的字段,將字段值為''替換成null
* tablename : 需要掃描的表名
* schemaname : 需要掃描的模式名
* */
declare
p_tablename varchar := lower(tablename);
p_schemaname varchar := lower(schemaname);
p_user varchar(64) := lower(user::varchar(64));--調(diào)用者
existbj int := 0; --存在標(biāo)記
v_sql varchar; --動(dòng)態(tài)sql
begin
--掃描varchar和text字段
select count(1)
into existbj
from pg_class a
inner join pg_namespace b
on (a.relnamespace = b.oid)
inner join pg_attribute c
on (a.oid = c.attrelid)
inner join pg_type d
on (c.atttypid = d.oid)
where c.attnum > 0
and d.typname in ('varchar','text')
and a.relname = p_tablename
and b.nspname = p_schemaname;
--若不存在varchar或者text字段,則不做處理
if existbj = 0 then
raise notice '%.%表不需要處理空字符串!',p_schemaname,p_tablename;
return;
end if;
--拼接處理空字符串語(yǔ)句
select
string_agg('update '||p_schemaname||'.'||p_tablename||'
set '||c.attname||' = null where '||c.attname||' = '''';',chr(10))
into v_sql
from pg_class a
inner join pg_namespace b
on (a.relnamespace = b.oid)
inner join pg_attribute c
on (a.oid = c.attrelid)
inner join pg_type d
on (c.atttypid = d.oid)
where c.attnum > 0
and d.typname in ('varchar','text')
and a.relname = p_tablename
and b.nspname = p_schemaname;
if p_user = p_schemaname then
execute v_sql;
execute 'analyze '||p_schemaname||'.'||p_tablename;
else
--通過(guò)集中處理程序執(zhí)行動(dòng)態(tài)sql
perform pgload.sp_execsql(v_sql,p_schemaname);
--分析表
perform pgload.sp_execsql('analyze '||p_schemaname||'.'||p_tablename,p_schemaname);
end if;
end;
$function$
;
導(dǎo)表腳本
dbmysql2pgmysqlcopy
#! /bin/bash
showuseage() {
echo "程序功能:mysql導(dǎo)出MYSQL數(shù)據(jù)庫(kù)表,copy方式導(dǎo)入PG數(shù)據(jù)庫(kù)
Useage: [dbmysql2pgmysqlcopy \${SCHEMANAME}.\${TABLENAME}]
-i [:可選,源數(shù)據(jù)庫(kù)(MYSQL)帳號(hào):username:passwd@hostname:port/dbname,默認(rèn)定義在.bash_profile \${MYSQLID},不要出現(xiàn)這些字符:
冒號(hào)(:),艾特(@),空格( ),斜杠(/)]
-j [:可選,目標(biāo)數(shù)據(jù)庫(kù)(PG)帳號(hào):username:passwd@hostname:port/dbname,默認(rèn)定義在.bash_profile \${PGID},不要出現(xiàn)這些字符:
冒號(hào)(:),艾特(@),空格( ),斜杠(/)]
-o [:可選,指定需要導(dǎo)入到PG的schemaname,默認(rèn)為MYSQL同名的schemaname(即MYSQL的數(shù)據(jù)庫(kù)名)]
-f [:可選,可指定導(dǎo)入表名,常用于不同數(shù)據(jù)庫(kù)或不同用戶同一表名沖突、源表改名不影響后續(xù)應(yīng)用、表名追加時(shí)間參數(shù)等情況,表名暫時(shí)限
定為:英文字母(不分大小寫)、數(shù)字和任意組合,禁止使用特殊字符]
-8 [:可選,指定字符編碼導(dǎo)出MYSQL數(shù)據(jù),默認(rèn)utf8]
-e [:可選,指定字符編碼入庫(kù)PG,默認(rèn)utf8]
-u [:可選,指定表授權(quán)其他用戶,指定且多個(gè)時(shí)使用逗號(hào)分開(kāi),如:'public'、'bss,apl',不要有空格]
-t [:可選,(test mod)調(diào)試模式,最多導(dǎo)出100行記錄進(jìn)行調(diào)試]
-a [:可選,指定where條件內(nèi)容,如:'city_id in (0,755)'(無(wú)需轉(zhuǎn)義)]
-c [:可選,不建表,直接導(dǎo)數(shù)據(jù),表結(jié)構(gòu)必須存在]
-z [:可選,導(dǎo)完表后的追加操作]
-I [:可選,過(guò)濾字段,建表時(shí)過(guò)濾掉過(guò)濾字段,逗號(hào)分隔,例如:serv_id,\"acc_nbr\"]
-s [:可選,指定字段,建表時(shí)只導(dǎo)指定的字段,逗號(hào)分隔,例如:serv_id,\"acc_nbr\"]
-d [:可選,指定字段特殊處理,原字段類型不變,字段處理后的值不能超出原來(lái)的精度,全角冒號(hào)頓號(hào)分隔,'字段名1:字段處理值1、字段名2:字段處理值2',
例如:COMPENSATETEXT:to_clob(COMPENSATETEXT)、update_time:to_date(to_char(update_time,'yyyymmdd hh24:mi:ss'),'yyyymmdd hh24:mi:ss')]
-v [:可選,指定某些字段對(duì)應(yīng)PG的類型,全角冒號(hào)頓號(hào)分隔,字段名1:PG類型1、字段名2:PG類型2',例如:COMPENSATETEXT:text、update_time:date]"
}
# 退出之前刪除臨時(shí)文件
trap "rmtmpfile" EXIT
# 進(jìn)度條程序
progress() {
M=0
local MAIN_PID=$1
local MAX_SECOND=14400
local SEP_SECOND=1
if [ -n "$2" ];then
SEP_SECOND=$2
fi
if [ -n "$3" ];then
MAX_SECOND=$3
fi
local MAX_SECOND=$[${MAX_SECOND}/${SEP_SECOND}]
while [ "$(ps -p ${MAIN_PID} | wc -l)" -ne "1" ] ; do
M=$[$M+1]
echo `date '+%Y-%m-%d %H:%M:%S'`"|WAIT|$M"
if [ $M -ge ${MAX_SECOND} ];then
echo `date '+%Y-%m-%d %H:%M:%S'`"|ERROR|后臺(tái)程序處理超時(shí)"
kill $MAIN_PID
exit 2
fi
sleep ${SEP_SECOND}
done
}
function killPid(){
#根據(jù)程序的ppid獲取程序的pid
PIDS=`ps -ef|awk '{if($3=='$1'){print $2} }'`;
#殺掉父程序的pid,防止子程序被殺掉后開(kāi)啟新的子程序
kill -s 9 $1
#如果獲得了pid,則以已獲得的pid作為ppid繼續(xù)進(jìn)行查找
if [ -n "$PIDS" ]; then
for PID in $PIDS
do
kill -9 $PID
done
fi
}
# 數(shù)據(jù)文件目錄
CSVDIR=/data/etl/mysql2pg/csv
# 臨時(shí)文件目錄
TMPDIR=/data/etl/mysql2pg/tmp
# 日志目錄
LOGDIR=/data/etl/mysql2pg/log
# 刪除臨時(shí)文件
rmtmpfile() {
# 刪除臨時(shí)文件
# PG裝載生成的模板SQL
rm -f ${TMP_M2P_SQL}
# 模板SQL生成的PSQL腳本
rm -f ${TMP_M2P_PSQL}
# MYSQL抽取生成的模板SQL
rm -f ${TMP_TMP_SQL}
# 模板SQL生成的MYSQL抽取SQL
rm -f ${TMP_TMPO_SQL}
# csv文件路徑
rm -f ${CSVFILEPATH}
# PGSQL執(zhí)行日志
rm -f ${PSQL_EXEC_LOG}
# 關(guān)閉子進(jìn)程
killPid $$
}
# 檢測(cè)參數(shù)
# 沒(méi)有參數(shù)直接退出
if [ $# -eq 0 ]
then
showuseage
exit -1
fi
# 限定第一個(gè)參數(shù)
PARAM1=$1
# 分析第一個(gè)參數(shù)中是否 - 開(kāi)頭
if [[ ${PARAM1} =~ ^-(.*?) ]]; then
#如果第一個(gè)參數(shù)第一個(gè)字符碰到-,
echo "dbmysql2pgmysqlcopy的第一個(gè)參數(shù)應(yīng)為需要導(dǎo)入的MYSQL的表名!"
showuseage
exit -1
else
PARAM=${PARAM1}
fi
# MYSQL連接串
MYSQLDESC=${MYSQLID}
# PG連接串
PGDESC=${PGID}
# 調(diào)試模式
TESTMOD="-1"
# 條件
MYSQLCOND=" WHERE 1 = 1 "
# PG的schema
PGSCHEMA="-1"
# MYSQL的schema
MYSQLSCHEMA="-1"
# MYSQL的tablename
MYSQLTABLE="-1"
# PG的tablename
PGTNAME="-1"
# 授權(quán)用戶
GRANTUSER="-1"
# 建表標(biāo)記 默認(rèn)建表
CREATEBJ="1"
# 追加操作
EXTRAOPT="-1"
# MYSQL導(dǎo)出編碼
MYSQLENCODE="utf8"
# PG裝載編碼
PGENCODE="utf8"
# 字段忽略標(biāo)記
IGNOREBJ="-1"
# 字段處理
COLUMNDEAL="-1"
# 指定類型
COLUMNTYPE="-1"
# 指定建為復(fù)制表
REPLICATEDBJ="-1"
# 日期處理標(biāo)記
DATEFORMAT="-1"
# 指定字段
SPECIALCOLUMN="-1"
# 解析mysql表
PARAM=$1
ARRAY=(${PARAM//./ })
MYSQLSCHEMA=${ARRAY[0]}
MYSQLTABLE=${ARRAY[1]}
# 如果mysql表名被雙引號(hào)包著,則直接去掉雙引號(hào)
# 如果mysql表名沒(méi)被雙引號(hào)包著,則默認(rèn)全部小寫
if [[ "$MYSQLTABLE" =~ \"(.*?)\" ]];then
MYSQLTABLE=`echo ${MYSQLTABLE} | sed -e 's/^[\"]*//g' | sed -e 's/[\"]*$//g'`
else
MYSQLTABLE=${MYSQLTABLE,,}
fi
# 如果mysql模式被雙引號(hào)包著,則直接去掉雙引號(hào)
# 如果mysql模式?jīng)]被雙引號(hào)包著,則默認(rèn)小寫
if [[ "$MYSQLSCHEMA" =~ \"(.*?)\" ]];then
MYSQLSCHEMA=`echo ${MYSQLSCHEMA} | sed -e 's/^[\"]*//g' | sed -e 's/[\"]*$//g'`
else
MYSQLSCHEMA=${MYSQLSCHEMA,,}
fi
# 參數(shù)后移
shift
while getopts :i:j:f:o:8:e:u:t:a:cz:I:s:d:v: OPTS; do
case "$OPTS" in
i)
MYSQLDESC="$OPTARG"
;;
j)
PGDESC="$OPTARG"
;;
o)
PGSCHEMA="${OPTARG}"
;;
f)
PGTNAME="${OPTARG}"
;;
u)
GRANTUSER="${OPTARG}"
;;
t)
if [ $OPTARG -gt 0 -a $OPTARG -le 100 ];then
TESTMOD="$OPTARG"
fi
;;
a)
MYSQLCOND=`echo " WHERE $OPTARG" | sed "s/'/''/g"`
;;
c)
CREATEBJ=-1
;;
z)
EXTRAOPT="$OPTARG"
;;
8)
MYSQLENCODE="$OPTARG"
;;
e)
PGENCODE="$OPTARG"
;;
I)
IGNOREBJ="$OPTARG"
;;
s)
SPECIALCOLUMN="$OPTARG"
;;
d)
COLUMNDEAL="$OPTARG"
;;
v)
COLUMNTYPE="$OPTARG"
;;
:)
echo "$0 必須為 -$OPTARG 添加一個(gè)參數(shù)!"
exit -1
;;
?)
showuseage
exit -1
;;
esac
done
# 解析mysql連接串
ARRAY2=(${MYSQLDESC//@/ })
USERPWD=${ARRAY2[0]}
ARR4=(${USERPWD//:/ })
MYSQLUSER=${ARR4[0]}
MYSQLPWD=${ARR4[1]}
HPDB=${ARRAY2[1]}
ARR5=(${HPDB//:/ })
MYSQLHOST=${ARR5[0]}
PDB=${ARR5[1]}
ARR6=(${PDB//// })
MYSQLPORT=${ARR6[0]}
MYSQLDB=${ARR6[1]}
export MYSQL_PWD=$MYSQLPWD
MYSQLCONN="mysql -u$MYSQLUSER -h$MYSQLHOST -P$MYSQLPORT --database $MYSQLDB"
#分隔符
PGCSEP=","
PGQSEP='"'
PGESCAPE='\'
# 獲取當(dāng)前時(shí)間戳
TIMEST=`date +%Y%m%d%H%M%S`
# 日志路徑
LOG_M2P_OUT=${LOGDIR}/m2p_${MYSQLSCHEMA}_${MYSQLTABLE}_$TIMEST.out
# copy語(yǔ)句的輸出路徑
TMP_COPY_OUT=${TMPDIR}/copy_${MYSQLSCHEMA}_${MYSQLTABLE}_$TIMEST.out
# PG裝載生成的模板SQL
TMP_M2P_SQL=${TMPDIR}/m2p_${MYSQLSCHEMA}_${MYSQLTABLE}_$TIMEST.sql
# 模板SQL生成的PSQL腳本
TMP_M2P_PSQL=${TMPDIR}/m2p_${MYSQLSCHEMA}_${MYSQLTABLE}_$TIMEST.psql
# MYSQL抽取生成的模板SQL
TMP_TMP_SQL=${TMPDIR}/tmp_${MYSQLSCHEMA}_${MYSQLTABLE}_$TIMEST.sql
# 模板SQL生成的MYSQL抽取SQL
TMP_TMPO_SQL=${TMPDIR}/tmpo_${MYSQLSCHEMA}_${MYSQLTABLE}_$TIMEST.sql
# csv文件路徑
CSVFILEPATH=${CSVDIR}/${MYSQLSCHEMA}_${MYSQLTABLE}_$TIMEST.csv
# PGSQL執(zhí)行日志
PSQL_EXEC_LOG=${LOGDIR}/PG_${MYSQLSCHEMA}_${MYSQLTABLE}_$TIMEST.out
# 判斷是否有表
tablebj=`${MYSQLCONN} -e "select 1 from information_schema.tables where table_name = '${MYSQLTABLE}' and table_schema = '${MYSQLSCHEMA}' union all select 1 from information_schema.views where table_name = '${MYSQLTABLE}' and table_schema = '${MYSQLSCHEMA}';" | sed '1d'`
if [ -z "$tablebj" ];then
echo `date '+%Y-%m-%d %H:%M:%S'`"|ERROR|表或視圖不存在"
echo `date '+%Y-%m-%d %H:%M:%S'`"|ERROR|程序異常結(jié)束"
exit -1
fi
# 調(diào)試模式處理
if [ ! $TESTMOD = "-1" ];then
MYSQLCOND="${MYSQLCOND} limit ${TESTMOD}"
fi
# PGSCHEMA處理
# 如果PGSCHEMA等于"-1",則默認(rèn)使用mysql同名schema
if [ "$PGSCHEMA" = "-1" ];then
PGSCHEMA=${MYSQLSCHEMA}
fi
# PGTNAME處理
# 如果PGTNAME等于"-1",則默認(rèn)與源表同名
if [ "$PGTNAME" = "-1" ];then
PGTNAME=${MYSQLTABLE}
fi
# 如果PG表名被雙引號(hào)包著,則直接去掉雙引號(hào)
# 如果PG表名沒(méi)被雙引號(hào)包著,則轉(zhuǎn)為小寫
if [[ "$PGTNAME" =~ \"(.*?)\" ]];then
PGTNAME=`echo ${PGTNAME} | sed -e 's/^[\"]*//g' | sed -e 's/[\"]*$//g'`
else
PGTNAME=${PGTNAME,,}
fi
# 如果PG模式被雙引號(hào)包著,則直接去掉雙引號(hào)
# 如果PG模式?jīng)]被雙引號(hào)包著,則轉(zhuǎn)為小寫
if [[ "$PGSCHEMA" =~ \"(.*?)\" ]];then
PGSCHEMA=`echo ${PGSCHEMA} | sed -e 's/^[\"]*//g' | sed -e 's/[\"]*$//g'`
else
PGSCHEMA=${PGSCHEMA,,}
fi
# 解析PG連接串
ARRAY1=(${PGDESC//@/ })
USERPWD=${ARRAY1[0]}
ARR1=(${USERPWD//:/ })
PGUSER=${ARR1[0]}
PGPWD=${ARR1[1]}
HPDB=${ARRAY1[1]}
ARR2=(${HPDB//:/ })
PGHOST=${ARR2[0]}
PDB=${ARR2[1]}
ARR3=(${PDB//// })
PGPORT=${ARR3[0]}
PGDB=${ARR3[1]}
export PGPASSWORD="$PGPWD"
PGCONN="psql -d $PGDB -U $PGUSER -h $PGHOST -p $PGPORT"
# 格式化過(guò)濾字段
if [ "$IGNOREBJ" != "-1" ];then
IGNOREBJ_ARR=(${IGNOREBJ//,/ })
for I in "${!IGNOREBJ_ARR[@]}"
do
TMP=${IGNOREBJ_ARR[$I]}
if [[ "$TMP" =~ \"(.*?)\" ]];then
TMP="'`echo ${TMP} | sed -e 's/^[\"]*//g' | sed -e 's/[\"]*$//g'`'"
else
TMP="'${TMP,,}'"
fi
if [ $I -eq 0 ];then
IGNOREBJ="$TMP"
else
IGNOREBJ="$TMP,${IGNOREBJ}"
fi
done
IGNOREBJ="column_name not in (${IGNOREBJ}) and "
else
IGNOREBJ=" "
fi
# 格式化指定字段
if [ "$SPECIALCOLUMN" != "-1" ];then
SPECIALCOLUMN_ARR=(${SPECIALCOLUMN//,/ })
for I in "${!SPECIALCOLUMN_ARR[@]}"
do
TMP=${SPECIALCOLUMN_ARR[$I]}
if [[ "$TMP" =~ \"(.*?)\" ]];then
TMP="'`echo ${TMP} | sed -e 's/^[\"]*//g' | sed -e 's/[\"]*$//g'`'"
else
TMP="'${TMP,,}'"
fi
if [ $I -eq 0 ];then
SPECIALCOLUMN="$TMP"
else
SPECIALCOLUMN="$TMP,${SPECIALCOLUMN}"
fi
done
SPECIALCOLUMN="column_name in (${SPECIALCOLUMN}) and "
else
SPECIALCOLUMN=" "
fi
# 判斷PGUSER和PGSCHEMA是否一致
# 如果不一致,需要調(diào)用對(duì)方的權(quán)限執(zhí)行psql
if [ "$PGUSER" = "$PGSCHEMA" ];then
USERSCHEMABJ="1"
else
USERSCHEMABJ="-1"
fi
# 創(chuàng)建日志
rm -f ${LOG_M2P_OUT}
touch ${LOG_M2P_OUT}
# 寫入日志
echo `date '+%Y-%m-%d %H:%M:%S'`"|INFO|導(dǎo)表準(zhǔn)備開(kāi)始" | tee -a ${LOG_M2P_OUT}
echo `date '+%Y-%m-%d %H:%M:%S'`"|INFO|拼接數(shù)據(jù)抽取腳本開(kāi)始" | tee -a ${LOG_M2P_OUT}
rm -f ${TMP_TMP_SQL}
touch ${TMP_TMP_SQL}
rm -f ${TMP_TMPO_SQL}
touch ${TMP_TMPO_SQL}
#拼接導(dǎo)表語(yǔ)句
cat>${TMP_TMP_SQL}<<eof
select
'select ' dbsql
from
information_schema.tables
where
table_name = '${MYSQLTABLE}'
and table_schema = '${MYSQLSCHEMA}';
eof
${MYSQLCONN} < ${TMP_TMP_SQL} | sed '1d' >> ${TMP_TMPO_SQL}
cat>${TMP_TMP_SQL}<<eof
select
columnname
from
(
select
concat(' ', case when ordinal_position = 1 then '' else ',' end, '\`', lower(column_name), '\`') columnname
,ordinal_position
from
information_schema.columns
where ${IGNOREBJ} ${SPECIALCOLUMN}
table_name = '${MYSQLTABLE}'
and table_schema = '${MYSQLSCHEMA}'
order by
ordinal_position
) a;
eof
${MYSQLCONN} < ${TMP_TMP_SQL} | sed '1d' >> ${TMP_TMPO_SQL}
cat>${TMP_TMP_SQL}<<eof
select
'from \`${MYSQLSCHEMA}\`.\`${MYSQLTABLE}\`'
from
information_schema.tables
where
table_name = '${MYSQLTABLE}'
and table_schema = '${MYSQLSCHEMA}'
union all
select
'${MYSQLCOND}'
from
information_schema.tables
where
table_name = '${MYSQLTABLE}'
and table_schema = '${MYSQLSCHEMA}';
eof
${MYSQLCONN} < ${TMP_TMP_SQL} | sed '1d' >> ${TMP_TMPO_SQL}
#字段特殊處理替換
#COMPENSATETEXT:to_clob(COMPENSATETEXT)、update_time:to_date(to_char(update_time,'yyyymmdd hh24:mi:ss'),'yyyymmdd hh24:mi:ss')
COLUMNNAME=""
ORIGINROW=""
REPLACEROW=""
REPLACEROWNUM=""
OLD_IFS="$IFS"
if [ "$COLUMNDEAL" != "-1" ];then
IFS="、"
COLUMNDEAL_ARRAY=(${COLUMNDEAL})
for I in "${!COLUMNDEAL_ARRAY[@]}"
do
ORIGINROW=""
REPLACEROW=""
TMP=${COLUMNDEAL_ARRAY[$I]}
IFS=":"
TMP_ARRAY=(${TMP})
for J in "${!TMP_ARRAY[@]}"
do
TMP1=${TMP_ARRAY[$J]}
if [ $J -eq 0 ];then
if [[ "$TMP1" =~ \"(.*?)\" ]];then
COLUMNNAME=${TMP1}
ORIGINROW=" ,\`${TMP1}\`"
else
COLUMNNAME="\`${TMP1,,}\`"
ORIGINROW=" ,\`${TMP1,,}\`"
fi
else
REPLACEROW="${REPLACEROW}${TMP1}"
if [ $J -eq $[${#TMP_ARRAY[*]}-1] ];then
REPLACEROW=${REPLACEROW}' AS '${COLUMNNAME}
fi
fi
done
REPLACEROWNUM=`awk "/${ORIGINROW}/{print NR;exit;}" ${TMP_TMPO_SQL}`
if [ ${REPLACEROWNUM} -eq 2 ];then
REPLACEROW=' '$REPLACEROW
else
REPLACEROW=' ,'$REPLACEROW
fi
ORIGINROW=`sed -n "${REPLACEROWNUM}p" ${TMP_TMPO_SQL}`
#雙引號(hào)和斜杠轉(zhuǎn)義
ORIGINROW=${ORIGINROW//\"/\\\"}
REPLACEROW=${REPLACEROW//\"/\\\"}
ORIGINROW=${ORIGINROW//\//\\\/}
REPLACEROW=${REPLACEROW//\//\\\/}
sed -i "s/$ORIGINROW/$REPLACEROW/g" ${TMP_TMPO_SQL}
done
fi
IFS="$OLD_IFS"
echo `date '+%Y-%m-%d %H:%M:%S'`"|INFO|拼接數(shù)據(jù)抽取腳本完成" | tee -a ${LOG_M2P_OUT}
echo `date '+%Y-%m-%d %H:%M:%S'`"|INFO|輸出MYSQL腳本" | tee -a ${LOG_M2P_OUT}
cat ${TMP_TMPO_SQL} | tee -a ${LOG_M2P_OUT}
echo `date '+%Y-%m-%d %H:%M:%S'`"|INFO|拼接PG的數(shù)據(jù)裝載腳本開(kāi)始" | tee -a ${LOG_M2P_OUT}
rm -f ${TMP_M2P_SQL}
touch ${TMP_M2P_SQL}
cat>${TMP_M2P_SQL}<<EOF
select
case when '${USERSCHEMABJ}' = '-1' then 'select pgload.sp_execsql(\$\$' else '--自己導(dǎo)表無(wú)須調(diào)用pgload' end psql
from dual
union all
select 'drop table if exists "${PGSCHEMA}"."m2p_${PGTNAME}";' psql
from information_schema.tables
where table_name = '${MYSQLTABLE}'
and table_schema = '${MYSQLSCHEMA}'
union all
select 'create table "${PGSCHEMA}"."m2p_${PGTNAME}" ('
from information_schema.tables
where table_name = '${MYSQLTABLE}'
and table_schema = '${MYSQLSCHEMA}';
EOF
${MYSQLCONN} < ${TMP_M2P_SQL} | sed '1d' >> ${TMP_M2P_PSQL}
cat>${TMP_M2P_SQL}<<EOF
select columnname
from (
select concat(' ',case when ordinal_position = 1 then '' else ',' end,'"',lower(column_name),'"',' ',
case when data_type = 'int' then data_type
when data_type = 'varchar' then replace (column_type,'varchar(0)','varchar(1)')
when data_type = 'char' then replace(replace(column_type,'char','varchar'),'varchar(0)','varchar(1)')
when data_type = 'date' then 'date'
when data_type = 'datetime' then replace (column_type, data_type, 'timestamp')
when data_type = 'timestamp' then 'timestamp'
when data_type = 'bigint' then 'bigint'
when data_type = 'double' then 'double precision'
when data_type = 'smallint' then 'smallint'
when data_type = 'decimal' then replace (column_type,'unsigned zerofill','')
when data_type = 'longtext' then 'text'
when data_type = 'text' then 'text'
when data_type = 'tinyint' then 'int'
when data_type = 'longblob' then 'bytea'
when data_type = 'blob' then 'bytea'
when data_type = 'float' then 'real'
when data_type = 'tinytext' then 'text'
when data_type = 'mediumtext' then 'text'
when data_type = 'numeric' then 'numeric'
when data_type = 'time' then 'interval'
else 'varchar'
end
) columnname
,ordinal_position
from information_schema.columns
where ${IGNOREBJ} ${SPECIALCOLUMN} table_name = '${MYSQLTABLE}'
and table_schema = '${MYSQLSCHEMA}'
order by ordinal_position
) a;
EOF
${MYSQLCONN} < ${TMP_M2P_SQL} | sed '1d' >> ${TMP_M2P_PSQL}
cat>${TMP_M2P_SQL}<<EOF
select PGPRI from (
select
case when primarykey is not null then concat(' ,primary key (',primarykey,'));') else '); ' end PGPRI
from (
select
group_concat(case when column_key = 'PRI' then concat('"',lower(column_name),'"') else null end order by ORDINAL_POSITION separator ',') primarykey
from information_schema.tables a, information_schema.columns b
where a.table_name = b.table_name
and a.table_schema = b.table_schema
and a.table_name = '${MYSQLTABLE}'
and a.table_schema = '${MYSQLSCHEMA}'
group by table_rows
) a
) a
union all
select
concat('comment on table "${PGSCHEMA}"."m2p_${PGTNAME}" is ''',replace(table_comment,'''',''''''),''';')
from information_schema.tables
where table_name = '${MYSQLTABLE}'
and table_schema = '${MYSQLSCHEMA}'
and table_comment != ''
and table_comment is not null
union all
select
concat('comment on column "${PGSCHEMA}"."m2p_${PGTNAME}"."',lower(column_name),'" is ''',replace(column_comment,'''',''''''),''';')
from information_schema.columns
where table_name = '${MYSQLTABLE}'
and table_schema = '${MYSQLSCHEMA}'
and column_comment != ''
and column_comment is not null
UNION ALL
select
'\$\$,\$\$${PGSCHEMA}\$\$);'
from dual
where
'${USERSCHEMABJ}' = '-1'
union all
select
'select pgload.sp_execsql(\$\$grant insert on table "${PGSCHEMA}"."m2p_${PGTNAME}" to "${PGUSER}";\$\$,\$\$${PGSCHEMA}\$\$);'
from dual
where
'${USERSCHEMABJ}' = '-1';
EOF
${MYSQLCONN} < ${TMP_M2P_SQL} | sed '1d' >> ${TMP_M2P_PSQL}
cat>>${TMP_M2P_PSQL}<<eof
\\copy "${PGSCHEMA}"."m2p_${PGTNAME}" FROM '${CSVFILEPATH}' WITH ( FORMAT csv,HEADER true,DELIMITER '${PGCSEP}',QUOTE '${PGQSEP}',ESCAPE '${PGESCAPE}');
eof
cat>${TMP_M2P_SQL}<<EOF
select
'select pgload.sp_execsql(\$\$' pgsql
from
dual
where
'${USERSCHEMABJ}' = '-1'
and '${CREATEBJ}' = '-1'
union all
select
'insert into "${PGSCHEMA}"."${PGTNAME}" (' insertsql
from
information_schema.tables
where
table_name = '${MYSQLTABLE}'
and table_schema = '${MYSQLSCHEMA}'
and '${CREATEBJ}' = '-1';
EOF
${MYSQLCONN} < ${TMP_M2P_SQL} | sed '1d' >> ${TMP_M2P_PSQL}
cat>${TMP_M2P_SQL}<<EOF
select
columnname
from
(
select
concat(' ', case when ordinal_position = 1 then '' else ',' end, '"', lower(column_name), '"') columnname
,ordinal_position
from
information_schema.columns
where ${IGNOREBJ} ${SPECIALCOLUMN}
table_name = '${MYSQLTABLE}'
and table_schema = '${MYSQLSCHEMA}'
order by
ordinal_position
) a
where '${CREATEBJ}' = '-1';
EOF
${MYSQLCONN} < ${TMP_M2P_SQL} | sed '1d' >> ${TMP_M2P_PSQL}
cat>${TMP_M2P_SQL}<<EOF
select
')' insertsql
from
information_schema.tables
where
table_name = '${MYSQLTABLE}'
and table_schema = '${MYSQLSCHEMA}'
and '${CREATEBJ}' = '-1'
union all
select
'select ' insertsql
from
information_schema.tables
where
table_name = '${MYSQLTABLE}'
and table_schema = '${MYSQLSCHEMA}'
and '${CREATEBJ}' = '-1';
EOF
${MYSQLCONN} < ${TMP_M2P_SQL} | sed '1d' >> ${TMP_M2P_PSQL}
cat>${TMP_M2P_SQL}<<EOF
select
columnname
from
(
select
concat(' ', case when ordinal_position = 1 then '' else ',' end, '"', lower(column_name), '"') columnname
,ordinal_position
from
information_schema.columns
where ${IGNOREBJ} ${SPECIALCOLUMN}
table_name = '${MYSQLTABLE}'
and table_schema = '${MYSQLSCHEMA}'
order by
ordinal_position
) a
where '${CREATEBJ}' = '-1';
EOF
${MYSQLCONN} < ${TMP_M2P_SQL} | sed '1d' >> ${TMP_M2P_PSQL}
cat>${TMP_M2P_SQL}<<EOF
select
'from "${PGSCHEMA}"."m2p_${PGTNAME}";'
from
information_schema.tables
where
table_name = '${MYSQLTABLE}'
and table_schema = '${MYSQLSCHEMA}'
and '${CREATEBJ}' = '-1'
union all
select
'\$\$,\$\$${PGSCHEMA}\$\$);'
from
dual
where
'${USERSCHEMABJ}' = '-1'
and '${CREATEBJ}' = '-1'
union all
select
'drop table if exists "${PGSCHEMA}"."${PGTNAME}";'
from dual
where '${CREATEBJ}' = '1'
and '${USERSCHEMABJ}' = '1'
union all
select
'select pgload.sp_execsql(\$\$drop table if exists "${PGSCHEMA}"."${PGTNAME}";\$\$,\$\$${PGSCHEMA}\$\$);'
from dual
where '${CREATEBJ}' = '1'
and '${USERSCHEMABJ}' = '-1'
union all
select
'alter table "${PGSCHEMA}"."m2p_${PGTNAME}" rename to "${PGTNAME}";'
from dual
where '${CREATEBJ}' = '1'
and '${USERSCHEMABJ}' = '1'
union all
select
'select pgload.sp_execsql(\$\$alter table "${PGSCHEMA}"."m2p_${PGTNAME}" rename to "${PGTNAME}";\$\$,\$\$${PGSCHEMA}\$\$);'
from dual
where '${CREATEBJ}' = '1'
and '${USERSCHEMABJ}' = '-1'
union all
select
'drop table if exists "${PGSCHEMA}"."m2p_${PGTNAME}";'
from dual
where '${CREATEBJ}' = '-1'
and '${USERSCHEMABJ}' = '1'
union all
select
'select pgload.sp_execsql(\$\$drop table if exists "${PGSCHEMA}"."m2p_${PGTNAME}";\$\$,\$\$${PGSCHEMA}\$\$);'
from dual
where '${CREATEBJ}' = '-1'
and '${USERSCHEMABJ}' = '-1'
union all
select
'grant select on table "${PGSCHEMA}"."${PGTNAME}" to ${GRANTUSER};'
from dual
where '${GRANTUSER}' <> '-1'
and '${USERSCHEMABJ}' = '1'
union all
select
'select pgload.sp_execsql(\$\$grant select on table "${PGSCHEMA}"."${PGTNAME}" to "${GRANTUSER}";\$\$,\$\$${PGSCHEMA}\$\$);'
from dual
where '${GRANTUSER}' <> '-1'
and '${USERSCHEMABJ}' = '-1'
union all
select
'${EXTRAOPT}'
from dual
where '${EXTRAOPT}' <> '-1'
and '${USERSCHEMABJ}' = '1'
union all
select
'select pgload.sp_execsql(\$\$${EXTRAOPT}\$\$,\$\$${PGSCHEMA}\$\$);'
from dual
where '${EXTRAOPT}' <> '-1'
and '${USERSCHEMABJ}' = '-1'
union all
select
'select pgload.replace_to_null(\$\$${PGTNAME}\$\$,\$\$${PGSCHEMA}\$\$);'
from dual
union all
select
'analyze "${PGSCHEMA}"."${PGTNAME}";'
from dual
where '${USERSCHEMABJ}' = '1'
union all
select
'select pgload.sp_execsql(\$\$analyze "${PGSCHEMA}"."${PGTNAME}";\$\$,\$\$${PGSCHEMA}\$\$);'
from dual
where '${USERSCHEMABJ}' = '-1';
EOF
${MYSQLCONN} < ${TMP_M2P_SQL} | sed '1d' >> ${TMP_M2P_PSQL}
#指定字段類型
OLD_IFS="$IFS"
if [ "$COLUMNTYPE" != "-1" ];then
IFS="、"
COLUMNTYPE_ARRAY=(${COLUMNTYPE})
for I in "${!COLUMNTYPE_ARRAY[@]}"
do
ORIGINROW=""
REPLACEROW=""
TMP=${COLUMNTYPE_ARRAY[$I]}
IFS=":"
TMP_ARRAY=(${TMP})
for J in "${!TMP_ARRAY[@]}"
do
TMP1=${TMP_ARRAY[$J]}
if [ $J -eq 0 ];then
if [[ "$TMP1" =~ \"(.*?)\" ]];then
COLUMNNAME=${TMP1}
ORIGINROW=",${TMP1}"
else
COLUMNNAME="\"${TMP1,,}\""
ORIGINROW=",\"${TMP1,,}\""
fi
else
REPLACEROW="${REPLACEROW}${TMP1}"
if [ $J -eq $[${#TMP_ARRAY[*]}-1] ];then
REPLACEROW="${COLUMNNAME} ${REPLACEROW}"
fi
fi
done
REPLACEROWNUM=`awk "/${COLUMNNAME}/{print NR;exit;}" ${TMP_M2P_PSQL}`
if [ $REPLACEROWNUM -eq 4 ];then
REPLACEROW=' '$REPLACEROW
else
REPLACEROW=' ,'$REPLACEROW
fi
ORIGINROW=`sed -n "${REPLACEROWNUM}p" ${TMP_M2P_PSQL}`
#雙引號(hào)和斜杠轉(zhuǎn)義
ORIGINROW=${ORIGINROW//\"/\\\"}
REPLACEROW=${REPLACEROW//\"/\\\"}
ORIGINROW=${ORIGINROW//\//\\\/}
REPLACEROW=${REPLACEROW//\//\\\/}
#echo $ORIGINROW
#echo $REPLACEROW
#echo $REPLACEROWNUM
sed -i "s/$ORIGINROW/$REPLACEROW/g" ${TMP_M2P_PSQL}
done
fi
IFS="$OLD_IFS"
echo `date '+%Y-%m-%d %H:%M:%S'`"|INFO|拼接PG的數(shù)據(jù)裝載腳本完成" | tee -a ${LOG_M2P_OUT}
echo `date '+%Y-%m-%d %H:%M:%S'`"|INFO|輸出PSQL腳本" | tee -a ${LOG_M2P_OUT}
cat ${TMP_M2P_PSQL} | tee -a ${LOG_M2P_OUT}
echo `date '+%Y-%m-%d %H:%M:%S'`"|INFO|導(dǎo)表準(zhǔn)備完成" | tee -a ${LOG_M2P_OUT}
#開(kāi)始抽取數(shù)據(jù)
echo `date '+%Y-%m-%d %H:%M:%S'`"|INFO|從MYSQL抽取數(shù)據(jù)開(kāi)始" | tee -a ${LOG_M2P_OUT}
rm -f ${CSVFILEPATH}
touch ${CSVFILEPATH}
extractmysqldata(){
# ${MYSQLCONN} < ${TMP_TMPO_SQL} | sed "s/\x00//g;s/\\\n/\n/g;s/${PGQSEP}/\\""${PGESCAPE}${PGQSEP}/g;s/\t/${PGQSEP}${PGCSEP}${PGQSEP}/g;s/^/${PGQSEP}&/g;s/$/&${PGQSEP}/g;s/${PGQSEP}NULL${PGQSEP}//g;s/${PGQSEP}${PGQSEP}//g;s/\\\t/\t/g" > ${CSVFILEPATH}
${MYSQLCONN} < ${TMP_TMPO_SQL} | sed "s/\x00//g;s/\\\n/\n/g;s/${PGQSEP}/\\""${PGESCAPE}${PGQSEP}/g;s/\t/${PGQSEP}${PGCSEP}${PGQSEP}/g;s/^/${PGQSEP}&/g;s/$/&${PGQSEP}/g;s/${PGQSEP}NULL${PGQSEP}//g;s/\\\t/\t/g" > ${CSVFILEPATH}
}
extractmysqldata &
EXTRACTMYSQLDATA_PID=$(jobs -p | tail -1)
progress "${EXTRACTMYSQLDATA_PID}" &
EXTRACTMYSQLDATA_PPID=$(jobs -p | tail -1)
wait "${EXTRACTMYSQLDATA_PID}"
echo `date '+%Y-%m-%d %H:%M:%S'`"|INFO|從MYSQL抽取數(shù)據(jù)完成" | tee -a ${LOG_M2P_OUT}
echo `date '+%Y-%m-%d %H:%M:%S'`"|INFO|在PG中裝載數(shù)據(jù)開(kāi)始" | tee -a ${LOG_M2P_OUT}
loadmysqldata(){
#執(zhí)行psql建表腳本
${PGCONN} >>${PSQL_EXEC_LOG} 2>&1 <<PSQL
\set ECHO all
\timing on
\! echo `date "+%Y %m %d %H:%M:%S"`
\i ${TMP_M2P_PSQL}
\! echo `date "+%Y %m %d %H:%M:%S"`
PSQL
}
loadmysqldata &
LOADMYSQLDATA_PID=$(jobs -p | tail -1)
progress "${LOADMYSQLDATA_PID}" &
LOADMYSQLDATA_PPID=$(jobs -p | tail -1)
wait "${LOADMYSQLDATA_PID}"
echo `date '+%Y-%m-%d %H:%M:%S'`"|INFO|在PG中裝載數(shù)據(jù)完成" | tee -a ${LOG_M2P_OUT}
echo `date '+%Y-%m-%d %H:%M:%S'`"|INFO|輸出裝載日志" | tee -a ${LOG_M2P_OUT}
cat ${PSQL_EXEC_LOG} | tee -a ${LOG_M2P_OUT}
echo `date '+%Y-%m-%d %H:%M:%S'`"|INFO|導(dǎo)表日志:${LOG_M2P_OUT}" | tee -a ${LOG_M2P_OUT}
#獲取PG裝載的記錄數(shù)
ERRORBJ=`cat ${LOG_M2P_OUT} | grep '^psql:' | grep -E 'FATAL:|ERROR:' | wc -l`
if [ ${ERRORBJ} -ne 0 ];then
echo `date '+%Y-%m-%d %H:%M:%S'`"|INFO|數(shù)據(jù)導(dǎo)入失敗" | tee -a ${LOG_M2P_OUT}
exit -1
else
PGNUM=`awk '{if($1=="COPY") print $2}' ${PSQL_EXEC_LOG}`
echo `date '+%Y-%m-%d %H:%M:%S'`"|INFO|數(shù)據(jù)導(dǎo)入成功:${PGNUM}" | tee -a ${LOG_M2P_OUT}
fi
測(cè)試
在mysql中建表
create table tmp (
id int primary key comment '主鍵'
,name varchar(50) comment '姓名'
);
insert into tmp values (1,'張三');
insert into tmp values (2,'李四');
insert into tmp values (3,'王五');
insert into tmp values (4,'你好,'' "
我不好 1111');

導(dǎo)表測(cè)試
[root@yzcdb-2 ~]# su - pgload
Last login: Tue Mar 7 08:56:24 CST 2023 on pts/0
[pgload@yzcdb-2 ~]$ dbmysql2pgmysqlcopy etl.tmp -o yuzhenchao
2023-03-07 14:02:12|INFO|導(dǎo)表準(zhǔn)備開(kāi)始
2023-03-07 14:02:12|INFO|拼接數(shù)據(jù)抽取腳本開(kāi)始
2023-03-07 14:02:12|INFO|拼接數(shù)據(jù)抽取腳本完成
2023-03-07 14:02:12|INFO|輸出MYSQL腳本
select
`id`
,`name`
from `etl`.`tmp`
WHERE 1 = 1
2023-03-07 14:02:12|INFO|拼接PG的數(shù)據(jù)裝載腳本開(kāi)始
2023-03-07 14:02:12|INFO|拼接PG的數(shù)據(jù)裝載腳本完成
2023-03-07 14:02:12|INFO|輸出PSQL腳本
select pgload.sp_execsql($$
drop table if exists "yuzhenchao"."m2p_tmp";
create table "yuzhenchao"."m2p_tmp" (
"id" int
,"name" varchar(50)
,primary key ("id"));
comment on column "yuzhenchao"."m2p_tmp"."id" is '主鍵';
comment on column "yuzhenchao"."m2p_tmp"."name" is '姓名';
$$,$$yuzhenchao$$);
select pgload.sp_execsql($$grant insert on table "yuzhenchao"."m2p_tmp" to "pgload";$$,$$yuzhenchao$$);
\copy "yuzhenchao"."m2p_tmp" FROM '/data/etl/mysql2pg/csv/etl_tmp_20230307140212.csv' WITH ( FORMAT csv,HEADER true,DELIMITER ',',QUOTE '"',ESCAPE '\');
select pgload.sp_execsql($$drop table if exists "yuzhenchao"."tmp";$$,$$yuzhenchao$$);
select pgload.sp_execsql($$alter table "yuzhenchao"."m2p_tmp" rename to "tmp";$$,$$yuzhenchao$$);
select pgload.sp_execsql($$analyze "yuzhenchao"."tmp";$$,$$yuzhenchao$$);
2023-03-07 14:02:12|INFO|導(dǎo)表準(zhǔn)備完成
2023-03-07 14:02:12|INFO|從MYSQL抽取數(shù)據(jù)開(kāi)始
2023-03-07 14:02:12|WAIT|1
2023-03-07 14:02:12|INFO|從MYSQL抽取數(shù)據(jù)完成
2023-03-07 14:02:12|INFO|在PG中裝載數(shù)據(jù)開(kāi)始
2023-03-07 14:02:13|WAIT|1
2023-03-07 14:02:13|INFO|在PG中裝載數(shù)據(jù)完成
2023-03-07 14:02:13|INFO|輸出裝載日志
\timing on
Timing is on.
\! echo 2023 03 07 14:02:12
2023 03 07 14:02:12
\i /data/etl/mysql2pg/tmp/m2p_etl_tmp_20230307140212.psql
select pgload.sp_execsql($$
drop table if exists "yuzhenchao"."m2p_tmp";
create table "yuzhenchao"."m2p_tmp" (
"id" int
,"name" varchar(50)
,primary key ("id"));
comment on column "yuzhenchao"."m2p_tmp"."id" is '主鍵';
comment on column "yuzhenchao"."m2p_tmp"."name" is '姓名';
$$,$$yuzhenchao$$);
psql:/data/etl/mysql2pg/tmp/m2p_etl_tmp_20230307140212.psql:9: NOTICE: table "m2p_tmp" does not exist, skipping
sp_execsql
------------
(1 row)
Time: 17.889 ms
select pgload.sp_execsql($$grant insert on table "yuzhenchao"."m2p_tmp" to "pgload";$$,$$yuzhenchao$$);
sp_execsql
------------
(1 row)
Time: 1.558 ms
\copy "yuzhenchao"."m2p_tmp" FROM '/data/etl/mysql2pg/csv/etl_tmp_20230307140212.csv' WITH ( FORMAT csv,HEADER true,DELIMITER ',',QUOTE '"',ESCAPE '\');
COPY 4
Time: 32.051 ms
select pgload.sp_execsql($$drop table if exists "yuzhenchao"."tmp";$$,$$yuzhenchao$$);
sp_execsql
------------
(1 row)
Time: 3.049 ms
select pgload.sp_execsql($$alter table "yuzhenchao"."m2p_tmp" rename to "tmp";$$,$$yuzhenchao$$);
sp_execsql
------------
(1 row)
Time: 1.687 ms
select pgload.sp_execsql($$analyze "yuzhenchao"."tmp";$$,$$yuzhenchao$$);
sp_execsql
------------
(1 row)
Time: 1.848 ms
\! echo 2023 03 07 14:02:13
2023 03 07 14:02:13
2023-03-07 14:02:13|INFO|導(dǎo)表日志:/data/etl/mysql2pg/log/m2p_etl_tmp_20230307140212.out
2023-03-07 14:02:13|INFO|數(shù)據(jù)導(dǎo)入成功:4
Killed
查看pg中的表

到此這篇關(guān)于Mysql遷移Postgresql的實(shí)現(xiàn)示例的文章就介紹到這了,更多相關(guān)Mysql遷移Postgresql內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
MySQL導(dǎo)入導(dǎo)出.sql文件及常用命令小結(jié)
在MySQL Qurey Brower中直接導(dǎo)入*.sql腳本,是不能一次執(zhí)行多條sql命令的,下面為大家介紹下MySQL導(dǎo)入導(dǎo)出.sql文件及常用命令2014-08-08
mysql存儲(chǔ)過(guò)程中使用游標(biāo)的實(shí)例
使用MYSQL存儲(chǔ)過(guò)程,可以實(shí)現(xiàn)諸多的功能,下面將為您介紹一個(gè)MYSQL存儲(chǔ)過(guò)程中使用游標(biāo)的實(shí)例2014-01-01
mysql和oracle默認(rèn)排序的方法 - 不指定order by
這篇文章主要介紹了mysql和oracle默認(rèn)排序的方法 - 不指定order by。具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-07-07
用命令創(chuàng)建MySQL數(shù)據(jù)庫(kù)(de1)的方法
下面小編就為大家?guī)?lái)一篇用命令創(chuàng)建MySQL數(shù)據(jù)庫(kù)(de1)的方法。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2017-03-03
Mysql報(bào)錯(cuò)Duplicate?entry?'值'?for?key?'字段名&
今天在使用數(shù)據(jù)庫(kù)的過(guò)程中,發(fā)現(xiàn)一直報(bào)Duplicate?entry?'值'?for?key?'字段名'的錯(cuò)誤,所以下面這篇文章主要給大家介紹了關(guān)于Mysql報(bào)錯(cuò)Duplicate?entry?'值'?for?key?'字段名'的解決方法,需要的朋友可以參考下2023-04-04

