Python 從 Hadoop HDFS 導出數據到關系數據庫
Python 從 HDFS 導出數據到關系數據庫(如 MySQL、Oracle、PostgreSQL)
本文介紹一整套從 Hadoop HDFS 中導出數據,并通過 DataX 工具導入到關系數據庫的過程。
操作步驟
1. 定義參數和變量
# Positional arguments and derived names for one export run.
sql=$1            # SQL statement(s) selecting the data to export
s_tablename=$2    # source table name
ds_name=$3        # target datasource name
t_tablename=$4    # target table name
flag=$5           # "T" makes the load truncate the target table first

# Temporary name: md5 of the nanosecond timestamp, cut to 16 hex characters.
temptable="h2o_$(date +%s%N | md5sum | head -c 16)"
filename="${s_tablename}_${temptable}"                         # export file/directory name
path="hdfs://prdhdfs/tmp/hdfs_to_rdb/$filename/"               # HDFS output directory
local_path="/data02/dcadmin/scripts/dataos_scripts/data_exp"   # local script directory
2. 構造SQL查詢并提交給Hive
# Build the HQL: the text before the first ';' is kept as a prefix statement
# and the second ';'-separated field becomes the query whose result is
# written to $path as ORC.
echo "$sql"
sql1=$(echo "$sql" | cut -d ";" -f2)   # part after the first semicolon
sql0=$(echo "$sql" | cut -d ";" -f1)   # part before the first semicolon
sql0="$sql0;insert overwrite directory '${path}' stored as ORC $sql1"
echo "$sql0"

# Obtain a Kerberos ticket, then submit to Hive. Stop on failure so that the
# later load step never runs against a missing or stale HDFS directory.
kinit -kt /data02/dcadmin/keytab_shengchan/dcadmin.keytab dcadmin@SC.COM || exit 1
# NOTE(review): the ZooKeeper host list is abbreviated (", ... ,") in this excerpt.
beeline -u "jdbc:hive2://prdnn1.yxbdprd.sc.ctc.com:2181, ... ,prddb1.yxbdprd.sc.ctc.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2" -e "set tez.queue.name=offline;$sql0 distribute by rand()" || exit 1
3. 獲取目標數據庫連接信息并解析結果為變量
從 PostgreSQL 數據庫中獲取目標數據庫的連接信息,并解析結果為變量。
# Look up the target datasource in the metadata database and load the four
# result columns into jdbc_url, ds_acct, ds_auth and ds_type.
#
# -t -A -F '|' makes psql print one unpadded, '|'-separated row, and the
# datasource name is passed as a psql variable (:'ds_name') so it is quoted by
# psql instead of being spliced into the SQL text by the shell.
re=$(PGPASSWORD=... psql -h 10.251.110.104 -p 18921 -U dacp -d dacp -t -A -F '|' -v ds_name="$ds_name" <<'EOF'
SELECT CASE WHEN ds_type = 'mysql' THEN CONCAT ('jdbc:mysql://' ,ds_inst_loc,'/',(ds_conf::json ->>'physicalDbName'),'?characterEncoding=UTF-8')
            WHEN ds_type = 'oracle' THEN ds_conf::json ->> 'url'
            WHEN ds_type = 'pg' THEN CONCAT ('jdbc:postgresql://',ds_inst_loc,'/',(ds_conf::json ->>'physicalDbName'))
       END jdbc_url
      ,ds_acct
      ,ds_auth
      ,CASE WHEN ds_type = 'mysql' THEN 'mysqlwriter'
            WHEN ds_type = 'oracle' THEN 'oraclewriter'
            WHEN ds_type = 'pg' THEN 'postgresqlwriter'
       END ds_type
FROM dacp_dev.dacp_meta_datasource
WHERE ds_type IN ('mysql', 'oracle', 'pg')
  AND upper(trim(ds_name)) = upper(trim(:'ds_name'))
EOF
)

if [ -z "$re" ]; then
    echo "no datasource found for ds_name=$ds_name" >&2
    exit 1
fi

# Split the first row on '|' with read instead of eval, so text coming from
# the database is never executed as shell code and an empty column cannot
# shift the remaining fields.
IFS='|' read -r jdbc_url ds_acct ds_auth ds_type <<< "$re"
4. 獲取目標數據庫密碼
通過執行 Java 程序解密數據庫密碼:
# Decrypt the stored datasource password with the AES helper jar.
# The expansions are quoted so an encrypted value containing whitespace or
# glob characters reaches java as a single, unmodified argument.
pw=$(java -Dpwd="${ds_auth}" -jar "$local_path/AesCipher-1.0.jar")
5. 預處理SQL語句
根據標志變量 flag,確定是否執行 TRUNCATE 語句:
# Pick the statement run before the load: truncate the target table when
# flag is "T", otherwise a query that matches no rows.
if [ "$flag" = "T" ]; then
    preSQL="truncate table $t_tablename"
else
    preSQL="select * from $t_tablename where 1=-1"
fi
echo "preSQL=$preSQL"
6. 數據導出并導入目標數據庫
使用 DataX 執行從 HDFS 導入到關系數據庫的任務:
# Run the DataX job that reads the exported files under $path and writes them
# to the target table. The inner quotes are escaped so that values containing
# spaces (for example the "truncate table ..." preSql) stay inside the single
# -p argument instead of ending the outer quoted string.
python $local_path/datax/bin/datax.py -p "-Dpath=$path -Dwriter=$ds_type -Drdb_user=$ds_acct -Drdb_pass=\"$pw\" -Drdb_jdbc=\"$jdbc_url\" -Drdb_table=$t_tablename -DpreSql=\"$preSQL\"" $local_path/hdfs_to_rdb.json
7. Python代碼詳解
此外,下面的 Python 代碼用于處理數據轉換和傳輸。重點如下:
1. 初始化設置和依賴
加載必要的包,并初始化變量。
import calendar
import commands
import datetime
import os
import random
import socket
import sys
import time
from hashlib import md5

import cx_Oracle
import psycopg2
import pymssql
import pymysql
import threadpool
from pyhdfs import HdfsClient

# Other initialization settings
2. 連接數據庫并執行SQL
定義了連接數據庫并執行SQL的函數:
def connect_database_to_select(conn, sql):
    """Run a query on ``conn`` and return all fetched rows.

    conn: an open DB-API connection (only ``cursor()`` and ``commit()`` are used).
    sql:  the statement to execute.

    On any execution error the error and the statement are printed and the
    process exits with status 2. The cursor is always closed.
    """
    cursor = conn.cursor()
    try:
        cursor.execute(sql)
        result = cursor.fetchall()
        conn.commit()
        return result
    except Exception as e:
        print('SQL執(zhí)行錯誤:{},執(zhí)行SQL:{}'.format(str(e), sql))
        sys.exit(2)
    finally:
        cursor.close()


def connect_database_to_commit(exe_type, conn, sql, insert_list):
    """Execute a statement on ``conn`` and commit it.

    exe_type:    'delete' or 'insert' runs ``sql`` once; 'insertmany' runs it
                 with ``executemany`` over ``insert_list`` (case-insensitive).
    conn:        an open DB-API connection.
    sql:         the statement to execute.
    insert_list: parameter rows, used only for 'insertmany'.

    An unsupported ``exe_type`` is reported and exits with status 2 instead of
    silently doing nothing. Execution errors also exit with status 2. The
    cursor is always closed.
    """
    cursor = conn.cursor()
    try:
        mode = exe_type.lower()
        if mode in ('delete', 'insert'):
            cursor.execute(sql)
            conn.commit()
        elif mode == 'insertmany':
            cursor.executemany(sql, insert_list)
            conn.commit()
        else:
            # SystemExit is not an Exception subclass, so it passes through
            # the handler below and only the finally clause runs.
            print('不支持的執行類型:{},執行SQL:{}'.format(exe_type, sql))
            sys.exit(2)
    except Exception as e:
        print('SQL執(zhí)行錯誤:{},執(zhí)行SQL:{}'.format(str(e), sql))
        sys.exit(2)
    finally:
        cursor.close()
3. 數據導出處理
通過 Hive 執行導出 SQL,并將生成的 HDFS 文件下載到本地:
def produce_exe_data(sql, s_tablename):
    """Run the export SQL on Hive and download the result files locally.

    sql:         one or more ';'-separated statements; the last one is wrapped
                 in INSERT OVERWRITE DIRECTORY so its result lands on HDFS.
    s_tablename: source table name, used to build the temporary folder name.

    Returns (local_path, hdfs_path). Also sets the global ``local_path_name``.
    """
    global local_path_name

    # Pick one of the local disks at random and build a fresh working folder.
    base_dir = local_path_list[random.randrange(len(local_path_list))] + '/dataos_exp'
    local_path_name = "h2o_{0}_{1}".format(s_tablename, get_md5_str()).lower()
    local_path = base_dir + '/' + local_path_name
    if os.path.exists(local_path):
        exe_system_cmd('rm -rf {}'.format(local_path))
    # makedirs also creates a missing parent folder, where mkdir would raise.
    os.makedirs(local_path)

    hdfs_path = "hdfs://prdhdfs/tmp/hdfs_to_rdb/{}".format(local_path_name)

    # Drop surrounding blanks and semicolons, then split into statements.
    statements = sql.strip().strip(';').split(';')

    hive_conn = hive_connect()
    compress_sql = 'set hive.exec.compress.output=false'
    connect_database_to_commit('insert', hive_conn, compress_sql, '')

    last_index = len(statements) - 1
    for index, sql_str in enumerate(statements):
        if index == last_index:
            # The backslashes are doubled so Hive receives the two-character
            # escapes \u0001 and \n rather than a raw control character or a
            # literal line break inside the quoted delimiter.
            sql_str = (
                "INSERT OVERWRITE DIRECTORY '{0}' "
                "ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\u0001' "
                "COLLECTION ITEMS TERMINATED BY '\\n' "
                "MAP KEYS TERMINATED BY ':' {1} "
            ).format(hdfs_path, sql_str)
        connect_database_to_commit('insert', hive_conn, sql_str, '')

    if hive_conn:
        hive_conn.close()

    # Copy the exported files from HDFS into the local working folder.
    cmd = ' hdfs dfs -get {}/* {} '.format(hdfs_path, local_path)
    exe_system_cmd(cmd)
    return local_path, hdfs_path
4. 多線程數據傳輸
利用多線程加速數據傳輸過程:
def thread_exe_exchange_data(g_tablename, flag, local_path, hdfs_path):
    """Load every exported file into the target table using a thread pool.

    g_tablename: target table name.
    flag:        'T' (case-insensitive) truncates the target table first.
    local_path:  local folder holding the exported files.
    hdfs_path:   HDFS folder of the export, removed together with local_path
                 when a worker reports an error.

    Sets the globals rdb_conn, exp_num_list, log_list and exception_list, and
    for Oracle also oracle_table_field and ora_dir. Exits with status 2 when
    any worker recorded an exception.
    """
    global rdb_conn
    global exp_num_list      # rows loaded per file
    global log_list          # per-file command output
    global exception_list    # errors collected from the workers

    rdb_conn = get_rdb_database_conn()

    if flag.upper() == 'T':
        presql = 'truncate table {}'.format(g_tablename)
        connect_database_to_commit('insert', rdb_conn, presql, '')

    if ds_type.lower() == 'oracle':
        # ora_dir must be global: the worker function reads it by name.
        global oracle_table_field
        global ora_dir
        oracle_table_field = get_oracle_table_fields()
        localtime = str(time.strftime("%Y%m%d", time.localtime()))
        ora_dir = "/data03/datafile/sqlldrdata/{0}/".format(localtime)
        if not os.path.exists(ora_dir):
            os.makedirs(ora_dir)

    exp_num_list = []
    log_list = []
    exception_list = []
    thread_list = [local_path + '/' + file_name for file_name in os.listdir(local_path)]

    # One request per file, at most five running at a time.
    pool = threadpool.ThreadPool(5)
    for req in threadpool.makeRequests(exchange_data, thread_list):
        pool.putRequest(req)
    pool.wait()

    if exception_list:
        delete_local_path(local_path, hdfs_path)
        # Report what went wrong before exiting instead of discarding it.
        print('導數(shù)失敗,異常日志信息如下:')
        for except_msg in exception_list:
            print(except_msg)
        sys.exit(2)

    print('數(shù)據(jù)導出完成,導出數(shù)據(jù)總量為:{}'.format(sum(exp_num_list)))
完整腳本(Shell 版本與 Python 版本)
#!/bin/bash sql=$1 #導出數(shù)據(jù)sql s_tablename=$2 #源表 ds_name=$3 #目標庫 t_tablename=$4 #目標表 temptable="h2o_"`date +%s%N | md5sum | head -c 16` #構造一個時間戳 filename=${s_tablename}_${temptable} #文件名 path="hdfs://prdhdfs/tmp/hdfs_to_rdb/$filename/" local_path="/data02/dcadmin/scripts/dataos_scripts/data_exp" flag=$5 #t TRUNCATE #hadoop fs -mkdir $path # 參數(shù)sql0為 待執(zhí)行SQL echo "$sql" sql1=`echo "$sql"|cut -d ";" -f2` sql0=`echo "$sql"|cut -d ";" -f1` sql0="$sql0;insert overwrite directory '${path}' stored as ORC $sql1" echo "$sql0" # 向Hive提交HQL kinit -kt /data02/dcadmin/keytab_shengchan/dcadmin.keytab dcadmin@SC.COM #beeline <<EOF #!connect jdbc:hive2://devdataosambari:2181,devdataosnn1:2181,devdataosnn2:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2 #$sql0 beeline -u "jdbc:hive2://prdnn1.yxbdprd.sc.ctc.com:2181,prdnn2.yxbdprd.sc.ctc.com:2181,prdrm1.yxbdprd.sc.ctc.com:2181,prddb2.yxbdprd.sc.ctc.com:2181,prddb1.yxbdprd.sc.ctc.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2" -e "set tez.queue.name=offline;$sql0 distribute by rand()" # 獲取目標數(shù)據(jù)源地址 #eval $(mysql -h 10.251.88.71 -udacp -pdacp123456 dacp_dev -e "select case when ds_type = 'mysql' then concat('jdbc:mysql://', ds_inst_loc, '/',json_unquote(json_extract(ds_conf,'$.physicalDbName')),'?characterEncoding=UTF-8') #when ds_type = 'oracle' then concat('jdbc:oracle:thin:@', ds_inst_loc, '/',json_unquote(json_extract(ds_conf,'$.physicalDbName'))) #when ds_type = 'pg' then concat('jdbc:postgresql://', ds_inst_loc, '/',json_unquote(json_extract(ds_conf,'$.physicalDbName'))) end jdbc_url, #ds_acct, ds_auth, case when ds_type = 'mysql' then 'mysqlwriter' when ds_type = 'oracle' then 'oraclewriter' when ds_type = 'pg' then 'postgresqlwriter' end ds_type #from dacp_meta_datasource #where ds_type in ('mysql','oracle','pg') #and ds_name = '$ds_name'" | awk 'NR== 2 {printf("jdbc_url=%s; ds_acct=%s; ds_auth=%s; ds_type=%s",$1,$2,$3,$4)}') re=$(PGPASSWORD=jxFgCKv9GJw2ohS3 
psql -h 10.251.110.104 -p 18921 -U dacp -d dacp -t <<EOF SELECT CASE WHEN ds_type = 'mysql' THEN CONCAT ('jdbc:mysql://' ,ds_inst_loc,'/',(ds_conf::json ->>'physicalDbName'),'?characterEncoding=UTF-8') WHEN ds_type = 'oracle' THEN ds_conf::json ->> 'url' WHEN ds_type = 'pg' THEN CONCAT ('jdbc:postgresql://',ds_inst_loc,'/',(ds_conf::json ->>'physicalDbName')) END jdbc_url ,ds_acct ,ds_auth ,CASE WHEN ds_type = 'mysql' THEN 'mysqlwriter' WHEN ds_type = 'oracle' THEN 'oraclewriter' WHEN ds_type = 'pg' THEN 'postgresqlwriter' END ds_type FROM dacp_dev.dacp_meta_datasource WHERE ds_type IN ('mysql', 'oracle', 'pg') AND upper(trim(ds_name)) = upper(trim('$ds_name')) EOF ) eval $(echo $re| awk '{printf("jdbc_url=%s; ds_acct=%s; ds_auth=%s; ds_type=%s",$1,$3,$5,$7)}') #eval $(java -jar /data01/etl/scripts/exec_aes.jar $ds_auth | awk ' {printf("pw=%s;",$1)}') #pw=`java -jar $local_path/exec_aes.jar $ds_auth` pw=`java -Dpwd=${ds_auth} -jar $local_path/AesCipher-1.0.jar` preSQL="select * from $t_tablename where 1=-1" if [ "$flag" = "T" ];then preSQL="truncate table $t_tablename" fi echo "preSQL=$preSQL" python $local_path/datax/bin/datax.py -p "-Dpath=$path -Dwriter=$ds_type -Drdb_user=$ds_acct -Drdb_pass=\"$pw\" -Drdb_jdbc=\"$jdbc_url\" -Drdb_table=$t_tablename -DpreSql=\"$preSQL\"" $local_path/hdfs_to_rdb.json # -*- coding:utf-8 -*- import time import datetime import os import threadpool import commands import calendar import random import pymssql import pymysql import cx_Oracle import psycopg2 import socket from pyhdfs import HdfsClient from hashlib import md5 import sys reload(sys) sys.setdefaultencoding('utf8') sys.path.append('/data02/dcadmin/scripts/common') from connect_postgresql import postgresql_connect from connect_hive import hive_connect pg_conn_str='dataos_71_pg_dev' # 本地磁盤目錄,文件隨機選擇一個目錄 local_path_list=['/data01','/data02','/data03','/data04','/data05'] def close_datadb_conn(): if rdb_conn: rdb_conn.close() def connect_database_to_select(conn,sql): # 
print('\r\n執(zhí)行SQL:{}'.format(sql)) cursor = conn.cursor() try: cursor.execute(sql) #cursor.execute(sql.decode('utf-8').encode('gbk')) result = cursor.fetchall() conn.commit() return result except Exception as e: print('SQL執(zhí)行錯誤:{},執(zhí)行SQL:{}'.format(str(e),sql)) sys.exit(2) finally: cursor.close() def connect_database_to_commit(exe_type,conn,sql,insert_list): # print('\r\n執(zhí)行SQL:{}'.format(sql)) cursor = conn.cursor() try: if exe_type.lower() in ('delete','insert'): cursor.execute(sql) conn.commit() print('執(zhí)行SQL:{}'.format(sql)) elif exe_type.lower()=='insertmany': cursor.executemany(sql, insert_list) conn.commit() except Exception as e: print('SQL執(zhí)行錯誤c:{},執(zhí)行SQL:{}'.format(str(e),sql)) print(sql) sys.exit(2) finally: cursor.close() # 執(zhí)行系統(tǒng)命令 def exe_system_cmd(cmd): status,output=commands.getstatusoutput(cmd) if status!=0: print('命令{}:執(zhí)行失敗,請檢查!'.format(cmd)) print('失敗日志:{}'.format(output)) sys.exit(2) return output # 返回MD5串 def get_md5_str(): # 時間戳 ts = calendar.timegm(time.gmtime()) md5_str=md5(str(ts).encode(encoding='utf-8')).hexdigest() return md5_str # 判斷輸入?yún)?shù) def judge_input_parameters_num(): if len(sys.argv)!=6: print('參數(shù)有問題,請檢查!') print(sys.argv) sys.exit(2) else: sql =sys.argv[1] # 導出數(shù)據(jù)sql s_tablename =sys.argv[2] # 源表名 ds_name =sys.argv[3] # 目標庫 g_tablename =sys.argv[4] # 目標表 flag =sys.argv[5] # A:append,T:truncate return sql,s_tablename,ds_name,g_tablename,flag # 執(zhí)行SQL語句,生成HDFS文件 def produce_exe_data(sql,s_tablename): global local_path_name # 1、創(chuàng)建本地文件夾 # 隨機選擇一個磁盤目錄:'/data01','/data02','/data03','/data04','/data05' local_path_01 = local_path_list[random.randrange(len(local_path_list))]+'/dataos_exp' # /data01/dataos_exp local_path_name="h2o_{0}_{1}".format(s_tablename,get_md5_str()).lower() # local_path_name='h2o_app_hub_resource_value_level_d_e6963bad13299e939a3a4cc2b2a26a47' local_path=local_path_01+'/'+local_path_name if os.path.exists(local_path): cmd='rm -rf 
{}'.format(local_path) exe_system_cmd(cmd) os.mkdir(local_path) # 創(chuàng)建hdfs文件夾 hdfs_path="hdfs://prdhdfs/tmp/hdfs_to_rdb/{}".format(local_path_name) # 處理SQL,先去除兩邊的空格,再去除兩邊的分號 sql=sql.strip().strip(';') sql_list=sql.split(';') # 依次執(zhí)行切分的SQL hive_conn=hive_connect() # 連接生產(chǎn)HIVE compress_sql='set hive.exec.compress.output=false' print('執(zhí)行SQL:{}'.format(compress_sql)) connect_database_to_commit('insert',hive_conn,compress_sql,'') for i in range(len(sql_list)): sql_str=sql_list[i] if i==len(sql_list)-1: # 如果是最后一條SQL,則執(zhí)行insert overwrite directory sql_str='''INSERT OVERWRITE DIRECTORY '{0}' ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\u0001' COLLECTION ITEMS TERMINATED BY '\\n' MAP KEYS TERMINATED BY ':' {1} '''.format(hdfs_path,sql_str) print('執(zhí)行SQL:{}'.format(sql_str)) connect_database_to_commit('insert',hive_conn,sql_str,'') # 關閉HIVE連接 if hive_conn: hive_conn.close() # 將hdfs文件從hdfs_path路徑get到local_path下 cmd=''' hdfs dfs -get {}/* {} '''.format(hdfs_path,local_path) exe_system_cmd(cmd) print('文件GET成功,當前主機:{},數(shù)據(jù)臨時文件夾:{}'.format(socket.gethostname(),local_path)) return local_path,hdfs_path # 獲取目標端的連接信息 def get_rdb_conn_msg(ds_name): global ds_type global ds_acct global ds_auth global host global port global database global jdbc_url sql=''' SELECT ds_name ,ds_type ,ds_acct ,ds_auth ,split_part(ds_inst_loc,':',1) as host ,case when split_part(ds_inst_loc,':',2)='' and ds_type='oracle' then '1521' else split_part(ds_inst_loc,':',2) end as port ,case when lower(ds_type)='oracle' then split_part(replace(replace(replace(ds_conf::json->>'url','jdbc:oracle:thin:@',''),':1521',''),'/',':'),':',2) else ds_conf::json->>'physicalDbName' end as database ,CASE WHEN ds_type = 'mysql' THEN CONCAT ('jdbc:mysql://' ,ds_inst_loc,'/',(ds_conf::json ->>'physicalDbName'),'?characterEncoding=UTF-8') WHEN ds_type = 'oracle' THEN ds_conf::json ->>'url' WHEN ds_type = 'pg' THEN CONCAT ('jdbc:postgresql://',ds_inst_loc,'/',(ds_conf::json ->>'physicalDbName')) 
END as jdbc_url FROM dacp_dev.dacp_meta_datasource WHERE ds_type IN ('mysql', 'oracle', 'pg') AND upper(trim(ds_name)) = upper(trim('{}')) '''.format(ds_name) pg_conn=postgresql_connect(pg_conn_str) results=connect_database_to_select(pg_conn,sql) # print(results) if not results: print('未查詢到數(shù)據(jù)庫連接信息,請檢查,DS_NAME:{}'.format(ds_name)) sys.exit(2) # 關閉數(shù)據(jù)庫連接 if pg_conn: pg_conn.close() # 解密密碼 cmd='''java -Dpwd='{0}' -jar /data02/dcadmin/scripts/common/AesCipher-1.0.jar'''.format(results[0][3]) pw=exe_system_cmd(cmd).replace('\r','').replace('\n','') ds_type = results[0][1]ds_acct = results[0][2] ds_auth = pw host = results[0][4] port = int(results[0][5]) database= results[0][6] jdbc_url= results[0][7] # 判斷連接的數(shù)據(jù)庫類型,并返回數(shù)據(jù)庫連接conn def get_rdb_database_conn(): dbms_conn=None try: if ds_type.upper()=='SQLSERVER': dbms_conn = pymssql.connect(host=host , user=ds_acct, password=ds_auth, port=port, database=database, charset='utf8') elif ds_type.upper()=='MYSQL': dbms_conn = pymysql.connect(host=host , user=ds_acct, passwd=ds_auth , port=port, database=database, charset='utf8', local_infile=True) elif ds_type.upper()=='ORACLE': listener = '{0}:{1}/{2}'.format(host,port,database) print('listener:{}'.format(listener)) dbms_conn = cx_Oracle.connect(ds_acct,ds_auth,listener,encoding='utf-8') elif ds_type.upper() in ('POSTGRESQL','PG'): dbms_conn = psycopg2.connect(host=host , user=ds_acct, password=ds_auth , port=port, database=database, client_encoding='utf8') else: print("未知源端數(shù)據(jù)庫類型{}~~~~~,請檢查!".format(ds_type.upper())) sys.exit(2) except Exception as e: print('{0},{1}數(shù)據(jù)庫連接失敗,請檢查~~~~~~!'.format(ds_type,ds_name)) print('報錯日志:{}'.format(e)) print(host) print(ds_acct) print(ds_auth) print(port) print(database) sys.exit(2) return dbms_conn def thread_exe_exchange_data(g_tablename,flag,local_path,hdfs_path): global rdb_conn rdb_conn=get_rdb_database_conn() # 執(zhí)行預處理SQL if flag.upper()=='T': presql='truncate table 
{}'.format(g_tablename) print('執(zhí)行SQL:{}'.format(presql)) connect_database_to_commit('insert',rdb_conn,presql,'') # 獲取Oracle表結構 if ds_type.lower() in ('oracle'): global oracle_table_field # oracle 表結構 oracle_table_field=get_oracle_table_fields() # 創(chuàng)建ctl,bad,log存放目錄 global ora_dir localtime = str(time.strftime("%Y%m%d", time.localtime())) ora_dir = "/data03/datafile/sqlldrdata/{0}/".format(localtime) if not os.path.exists(ora_dir): os.mkdir(ora_dir) # 文件列表 file_lists=os.listdir(local_path) # 多線程導數(shù) global exp_num_list # 存儲導數(shù)數(shù)量 global log_list # 存儲多線程的日志信息 global exception_list # 存儲多線程異常信息 exp_num_list =[] log_list =[] exception_list=[] thread_list=[] # 存儲多線程任務 for file_name in file_lists: thread_list.append(local_path+'/'+file_name) # 創(chuàng)建線程池 pool=threadpool.ThreadPool(5) # 存放任務列表 requests = threadpool.makeRequests(exchange_data,thread_list) [pool.putRequest(req) for req in requests] pool.wait() # 處理異常 if exception_list: # 導數(shù)出現(xiàn)異常,刪除文件 delete_local_path(local_path,hdfs_path) print('導數(shù)失敗,異常日志信息如下:') for except_msg in exception_list: print(except_msg) sys.exit(2) # 打印多線程日志 # log_list.sort() # for log in log_list: # print(log) # 打印導出結果 print('數(shù)據(jù)導出完成,導出數(shù)據(jù)總量為:{}'.format(sum(exp_num_list))) # 獲取Oracle表結構 def get_oracle_table_fields(): sql = ''' SELECT COLUMN_NAME || SUFFIX AS AA FROM (SELECT A.COLUMN_NAME ,A.COLUMN_ID ,CASE WHEN UPPER(A.DATA_TYPE) LIKE '%DATE%' THEN ' DATE "yyyy-mm-dd hh24:mi:ss"' WHEN UPPER(A.DATA_TYPE) LIKE '%TIMESTAMP%' THEN ' DATE "yyyy-mm-dd hh24:mi:ss.ff"' WHEN UPPER(A.DATA_TYPE) LIKE '%VARCHAR%' THEN ' CHAR(3000)' ELSE '' END AS SUFFIX FROM ALL_TAB_COLUMNS A WHERE UPPER(A.OWNER||'.'||A.TABLE_NAME) = UPPER(TRIM('{0}')) ORDER BY A.COLUMN_ID) ''' if '.' 
in g_tablename: sql=sql.format(g_tablename) else: sql=sql.format(database+'.'+g_tablename) oracle_table_fields=connect_database_to_select(rdb_conn,sql) if not oracle_table_fields: print('未查詢到表結構,表名:{}'.format(g_tablename)) sys.exit(2) oracle_table_field = ",\n".join([str(list[0]) for list in oracle_table_fields]) return oracle_table_field # 執(zhí)行單個導出任務 def exchange_data(file_path): try: output='' # 執(zhí)行導數(shù)任務 if ds_type.lower() in ('pg','postgresql','telpg','antdb'): cmd='''psql "port={0} host={1} user={2} dbname={3} password={4} " -c "\copy {5} from '{6}' DELIMITER AS E'\u0001' " ''' cmd=cmd.format(port,host,ds_acct,database,ds_auth,g_tablename,file_path) status,output=commands.getstatusoutput(cmd) if status!=0: exception_list.append('命令{}:執(zhí)行失敗,請檢查!失敗日志:{}'.format(cmd,output)) elif ds_type.lower() in ('mysql','teldb'): mysql_conn = pymysql.connect(host=host , user=ds_acct, passwd=ds_auth , port=port, database=database, charset='utf8', local_infile=True) mysql_cursor=mysql_conn.cursor() sql='SET NAMES UTF8' mysql_cursor.execute(sql) sql='''load data local infile '{}' into table {} fields terminated by X'01' lines terminated by '\\n' '''.format(file_path,g_tablename) #print(sql) output=mysql_cursor.execute(sql) mysql_conn.commit() mysql_conn.close() # cmd='''mysql -h {} -P {} -u {} -p{} -D {} -e "SET NAMES UTF8;load data local infile '{}' into table {} fields terminated by X'01' lines terminated by '\\n'" ''' # cmd=cmd.format(host,port,ds_acct,ds_auth,database,file_path,g_tablename) elif ds_type.lower() in ('oracle'): tns='''\'{}/"{}"\'@{}:1521/{}'''.format(ds_acct,ds_auth,host,database) ora_file_name=file_path.replace(local_path+'/','') ora_file_path=ora_dir+'/'+local_path_name+'_'+ora_file_name control_file = ora_file_path+".ctl" log_file = ora_file_path+".log" bad_file = ora_file_path+".bad" dis_file = ora_file_path+".dis" content =''' UNRECOVERABLE LOAD DATA CHARACTERSET AL32UTF8 APPEND INTO TABLE {0} FIELDS TERMINATED BY x'01' TRAILING NULLCOLS 
({1}) '''.format(g_tablename, oracle_table_field) # 如果控制文件存在,則先刪除 if os.path.exists(control_file): cmd='rm -rf {}'.format(control_file) exe_system_cmd(cmd) # 再創(chuàng)建控制文件 with open(control_file, "w") as file: file.write(content) cmd='''export ORACLE_HOME=/data03/apps/db_1;export LD_LIBRARY_PATH=$ORACLE_HOME/lib:$LD_LIBRARY_PATH;cat {0} | /data03/apps/db_1/bin/sqlldr userid={1} control={2} data=\\"-\\" log={3} bad={4} discard={5} errors=0 direct=true parallel=true multithreading=true columnarrayrows=100000 STREAMSIZE=20971520 readsize=20971520 bindsize=20971520 date_cache=0 ''' cmd=cmd.format(file_path, tns, control_file, log_file, bad_file, dis_file) status,output=commands.getstatusoutput(cmd) if status!=0: exception_list.append('命令{}:執(zhí)行失敗,請檢查!失敗日志:{}'.format(cmd,output)) else: exception_list.append('目標端數(shù)據(jù)庫類型為:{},此類型暫未支持!'.format(db_type.lower())) # 計算導出行數(shù) if ds_type.lower() in ('pg','postgresql','telpg','antdb'): file_row_num=int(output.split('COPY ')[1].strip()) exp_num_list.append(file_row_num) elif ds_type.lower() in ('oracle'): try: output=output.decode('gbk') except: output=output file_row_num=int(output.split('邏輯記錄計數(shù) ')[1].replace('。','').strip()) exp_num_list.append(file_row_num) elif ds_type.lower() in ('mysql','teldb'): exp_num_list.append(output) # 插入日志 log_list.append(output) except Exception as e: exception_list.append(e) def delete_local_path(local_path,hdfs_path): cmd='rm -rf {}'.format(local_path) exe_system_cmd(cmd) print('本地文件夾刪除成功。') cmd='hdfs dfs -rm -r {}'.format(hdfs_path) exe_system_cmd(cmd) print('HDFS文件夾刪除成功。') if __name__ == '__main__': starttime = datetime.datetime.now() print('開始時間:{0}'.format(starttime.strftime('%Y-%m-%d %H:%M:%S'))) # 1、判斷輸入?yún)?shù) sql,s_tablename,ds_name,g_tablename,flag=judge_input_parameters_num() # 2、執(zhí)行SQL,生產(chǎn)文件,并返回本地目錄 local_path,hdfs_path=produce_exe_data(sql,s_tablename) hdfs_time=datetime.datetime.now() #print('當前時間:{}'.format(hdfs_time)) 
print("生成HDFS文件耗時:{0}秒".format((hdfs_time - starttime).seconds)) # 3、獲取目標端連接信息(host,port等) get_rdb_conn_msg(ds_name) # 4、執(zhí)行導數(shù)任務 thread_exe_exchange_data(g_tablename,flag,local_path,hdfs_path) # 5、刪除本地文件夾 delete_local_path(local_path,hdfs_path) # 6、關閉數(shù)據(jù)庫連接 close_datadb_conn() endtime = datetime.datetime.now() print('結束時間:{0}'.format(endtime.strftime('%Y-%m-%d %H:%M:%S'))) print('導數(shù)耗時:{0}秒'.format((endtime - hdfs_time).seconds)) print("一共耗時:{0}秒".format((endtime - starttime).seconds))
總結
整個腳本有效地實現了從HDFS到關系數據庫的數據遷移,確保數據的完整性和一致性。首先通過Hive導出數據,再利用多線程和DataX工具導入到目標數據庫。本地化和多線程處理使過程更高效,適合大數據處理和數據倉庫遷移。
請務必按需調整腳本中的具體參數和配置,以適應你的環境和數據架構。
以上就是 Python 從 Hadoop HDFS 導出數據到關系數據庫的詳細內容,更多關于 Python HDFS 導出數據到數據庫的資料請關注腳本之家其它相關文章!
相關文章
Python如何利用Har文件進行遍歷指定字典替換提交的數據詳解
這篇文章主要給大家介紹了關于Python如何利用Har文件進行遍歷指定字典替換提交的數據的相關資料,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧 2020-11-11
Python提取JSON格式數據實戰案例
這篇文章主要給大家介紹了關于Python提取JSON格式數據的相關資料,Python提供了內置的json模塊,用于處理JSON數據,文中給出了詳細的代碼示例,需要的朋友可以參考下 2023-07-07
anaconda jupyter不能導入安裝的lightgbm解決方案
這篇文章主要介紹了anaconda jupyter不能導入安裝的lightgbm解決方案,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧 2021-03-03
Python Metaclass原理與實現過程詳細講解
MetaClass元類,本質也是一個類,但和普通類的用法不同,它可以對類內部的定義(包括類屬性和類方法)進行動態的修改。可以這麼說,使用元類的主要目的就是為了實現在創建類時,能夠動態地改變類中定義的屬性或者方法 2022-11-11