Exporting Data from Hadoop HDFS to a Relational Database with Python
Exporting from HDFS to a relational database (e.g. MySQL, Oracle, PostgreSQL) with Python
This article walks through a complete workflow for exporting data from Hadoop HDFS and loading it into a relational database via the DataX tool, followed by a pure-Python implementation of the same transfer.
Steps
1. Define parameters and variables
sql=$1 # 導(dǎo)出數(shù)據(jù)的SQL語(yǔ)句 s_tablename=$2 # 源表名 ds_name=$3 # 目標(biāo)數(shù)據(jù)庫(kù)名稱 t_tablename=$4 # 目標(biāo)表名 temptable="h2o_"`date +%s%N | md5sum | head -c 16` # 生成一個(gè)基于時(shí)間戳的臨時(shí)表名 filename=${s_tablename}_${temptable} # 文件名 path="hdfs://prdhdfs/tmp/hdfs_to_rdb/$filename/" # HDFS路徑 local_path="/data02/dcadmin/scripts/dataos_scripts/data_exp" # 本地腳本路徑 flag=$5 # 標(biāo)志,用來(lái)確定是否TRUNCATE表
2. 構(gòu)造SQL查詢并提交給Hive
echo "$sql" sql1=`echo "$sql"|cut -d ";" -f2` # 截取分號(hào)后的部分 sql0=`echo "$sql"|cut -d ";" -f1` # 截取分號(hào)前的部分 sql0="$sql0;insert overwrite directory '${path}' stored as ORC $sql1" # 構(gòu)建最終的SQL echo "$sql0" kinit -kt /data02/dcadmin/keytab_shengchan/dcadmin.keytab dcadmin@SC.COM beeline -u "jdbc:hive2://prdnn1.yxbdprd.sc.ctc.com:2181, ... ,prddb1.yxbdprd.sc.ctc.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2" -e "set tez.queue.name=offline;$sql0 distribute by rand()"
3. Look up the target database connection info and parse it into variables
The target database's connection details are read from a PostgreSQL metadata table and the result row is parsed into shell variables.
re=$(PGPASSWORD=... psql -h 10.251.110.104 -p 18921 -U dacp -d dacp -t <<EOF
SELECT CASE WHEN ds_type = 'mysql' THEN CONCAT('jdbc:mysql://', ds_inst_loc, '/', (ds_conf::json ->> 'physicalDbName'), '?characterEncoding=UTF-8')
            WHEN ds_type = 'oracle' THEN ds_conf::json ->> 'url'
            WHEN ds_type = 'pg' THEN CONCAT('jdbc:postgresql://', ds_inst_loc, '/', (ds_conf::json ->> 'physicalDbName'))
       END jdbc_url
      ,ds_acct
      ,ds_auth
      ,CASE WHEN ds_type = 'mysql' THEN 'mysqlwriter'
            WHEN ds_type = 'oracle' THEN 'oraclewriter'
            WHEN ds_type = 'pg' THEN 'postgresqlwriter'
       END ds_type
  FROM dacp_dev.dacp_meta_datasource
 WHERE ds_type IN ('mysql', 'oracle', 'pg')
   AND upper(trim(ds_name)) = upper(trim('$ds_name'))
EOF
)
eval $(echo $re | awk '{printf("jdbc_url=%s; ds_acct=%s; ds_auth=%s; ds_type=%s",$1,$3,$5,$7)}')
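psql -t prints the single result row with its four columns separated by '|', which is why the awk call picks whitespace fields 1, 3, 5 and 7 (the even-numbered fields are the separators, assuming none of the values contain spaces). A rough Python equivalent of that parsing step, assuming the same one-row output:

def parse_ds_row(row):
    # psql -t prints: jdbc_url | ds_acct | ds_auth | ds_type
    fields = [f.strip() for f in row.split('|')]
    keys = ['jdbc_url', 'ds_acct', 'ds_auth', 'ds_type']
    return dict(zip(keys, fields))

row = "jdbc:mysql://10.0.0.1:3306/demo?characterEncoding=UTF-8 | dacp | ENCRYPTED | mysqlwriter"
print(parse_ds_row(row))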
4. Obtain the target database password
The database password is decrypted by calling a Java program:
pw=`java -Dpwd=${ds_auth} -jar $local_path/AesCipher-1.0.jar`
5. 預(yù)處理SQL語(yǔ)句
根據(jù)標(biāo)志變量flag
,確定是否執(zhí)行TRUNCATE語(yǔ)句:
preSQL="select * from $t_tablename where 1=-1"
if [ "$flag" = "T" ]; then
    preSQL="truncate table $t_tablename"
fi
echo "preSQL=$preSQL"
6. 數(shù)據(jù)導(dǎo)出并導(dǎo)入目標(biāo)數(shù)據(jù)庫(kù)
使用datax
執(zhí)行從HDFS導(dǎo)入到關(guān)系數(shù)據(jù)庫(kù)的任務(wù):
python $local_path/datax/bin/datax.py -p "-Dpath=$path -Dwriter=$ds_type -Drdb_user=$ds_acct -Drdb_pass=\"$pw\" -Drdb_jdbc=\"$jdbc_url\" -Drdb_table=$t_tablename -DpreSql=\"$preSQL\"" $local_path/hdfs_to_rdb.json
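The hdfs_to_rdb.json job template itself is not shown in the article. DataX substitutes the -D parameters into ${...} placeholders in the job file, so a minimal template could look roughly like the one generated below; the reader/writer settings are an assumption for illustration (HA and authentication options omitted), not the original file:

import json

# Hypothetical sketch of a parameterized DataX job: hdfsreader -> ${writer}.
# Placeholders such as ${path} are filled in by datax.py from the -D options.
job = {
    "job": {
        "setting": {"speed": {"channel": 3}},
        "content": [{
            "reader": {
                "name": "hdfsreader",
                "parameter": {
                    "defaultFS": "hdfs://prdhdfs",
                    "path": "${path}",
                    "fileType": "orc",
                    "column": ["*"]
                }
            },
            "writer": {
                "name": "${writer}",
                "parameter": {
                    "username": "${rdb_user}",
                    "password": "${rdb_pass}",
                    "preSql": ["${preSql}"],
                    "connection": [{
                        "jdbcUrl": "${rdb_jdbc}",
                        "table": ["${rdb_table}"]
                    }]
                }
            }
        }]
    }
}

with open("hdfs_to_rdb.json", "w") as f:
    json.dump(job, f, indent=2)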
7. The Python implementation in detail
Besides the shell wrapper, the transfer can also be done entirely in Python. The key parts of that implementation are described below; the complete script follows at the end.
1. Initialization and dependencies
Load the required packages and initialize variables.
import time
import datetime
import os
import threadpool
import commands
import calendar
import random
import pymssql
import pymysql
import cx_Oracle
import psycopg2
import socket
from pyhdfs import HdfsClient
from hashlib import md5
# ... other initialization settings
2. 連接數(shù)據(jù)庫(kù)并執(zhí)行SQL
定義了連接數(shù)據(jù)庫(kù)并執(zhí)行SQL的函數(shù):
def connect_database_to_select(conn, sql):
    cursor = conn.cursor()
    try:
        cursor.execute(sql)
        result = cursor.fetchall()
        conn.commit()
        return result
    except Exception as e:
        print('SQL執行錯誤:{},執行SQL:{}'.format(str(e), sql))
        sys.exit(2)
    finally:
        cursor.close()

def connect_database_to_commit(exe_type, conn, sql, insert_list):
    cursor = conn.cursor()
    try:
        if exe_type.lower() in ('delete', 'insert'):
            cursor.execute(sql)
            conn.commit()
        elif exe_type.lower() == 'insertmany':
            cursor.executemany(sql, insert_list)
            conn.commit()
    except Exception as e:
        print('SQL執行錯誤:{},執行SQL:{}'.format(str(e), sql))
        sys.exit(2)
    finally:
        cursor.close()
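A quick, self-contained way to see how these helpers behave, using sqlite3 from the standard library as a stand-in connection (the real script passes in the Hive connection from hive_connect and the pymysql/cx_Oracle/psycopg2 connections shown later):

import sys
import sqlite3

conn = sqlite3.connect(':memory:')   # stand-in for hive_connect() / get_rdb_database_conn()
connect_database_to_commit('insert', conn, 'create table t (a int)', '')
connect_database_to_commit('insertmany', conn, 'insert into t values (?)', [(1,), (2,), (3,)])
print(connect_database_to_select(conn, 'select count(*) from t'))   # [(3,)]
conn.close()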
3. 數(shù)據(jù)導(dǎo)出處理
執(zhí)行數(shù)據(jù)導(dǎo)出并提交給Hive:
def produce_exe_data(sql, s_tablename):
    global local_path_name
    local_path_01 = local_path_list[random.randrange(len(local_path_list))] + '/dataos_exp'
    local_path_name = "h2o_{0}_{1}".format(s_tablename, get_md5_str()).lower()
    local_path = local_path_01 + '/' + local_path_name
    if os.path.exists(local_path):
        cmd = 'rm -rf {}'.format(local_path)
        exe_system_cmd(cmd)
    os.mkdir(local_path)
    hdfs_path = "hdfs://prdhdfs/tmp/hdfs_to_rdb/{}".format(local_path_name)
    sql = sql.strip().strip(';')
    sql_list = sql.split(';')
    hive_conn = hive_connect()
    compress_sql = 'set hive.exec.compress.output=false'
    connect_database_to_commit('insert', hive_conn, compress_sql, '')
    for i in range(len(sql_list)):
        sql_str = sql_list[i]
        if i == len(sql_list) - 1:
            # The last statement is wrapped in an INSERT OVERWRITE DIRECTORY
            sql_str = '''INSERT OVERWRITE DIRECTORY '{0}'
            ROW FORMAT DELIMITED
            FIELDS TERMINATED BY '\u0001'
            COLLECTION ITEMS TERMINATED BY '\n'
            MAP KEYS TERMINATED BY ':'
            {1} '''.format(hdfs_path, sql_str)
        connect_database_to_commit('insert', hive_conn, sql_str, '')
    if hive_conn:
        hive_conn.close()
    cmd = ''' hdfs dfs -get {}/* {} '''.format(hdfs_path, local_path)
    exe_system_cmd(cmd)
    return local_path, hdfs_path
4. Multithreaded data loading
Multiple threads are used to speed up the load into the target database:
def thread_exe_exchange_data(g_tablename, flag, local_path, hdfs_path):
    global rdb_conn
    rdb_conn = get_rdb_database_conn()
    if flag.upper() == 'T':
        presql = 'truncate table {}'.format(g_tablename)
        connect_database_to_commit('insert', rdb_conn, presql, '')
    if ds_type.lower() == 'oracle':
        global oracle_table_field
        oracle_table_field = get_oracle_table_fields()
        localtime = str(time.strftime("%Y%m%d", time.localtime()))
        ora_dir = "/data03/datafile/sqlldrdata/{0}/".format(localtime)
        if not os.path.exists(ora_dir):
            os.mkdir(ora_dir)
    file_lists = os.listdir(local_path)
    global exp_num_list
    global log_list
    global exception_list
    exp_num_list = []
    log_list = []
    exception_list = []
    thread_list = []
    for file_name in file_lists:
        thread_list.append(local_path + '/' + file_name)
    pool = threadpool.ThreadPool(5)
    requests = threadpool.makeRequests(exchange_data, thread_list)
    [pool.putRequest(req) for req in requests]
    pool.wait()
    if exception_list:
        delete_local_path(local_path, hdfs_path)
        sys.exit(2)
    print('數據導出完成,導出數據總量為:{}'.format(sum(exp_num_list)))
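The threadpool library used here follows a simple pattern: build a pool, wrap one task per input with makeRequests, submit them all, and wait. A self-contained sketch of that pattern with a toy worker and file list (not the actual exchange_data function), assuming the same threadpool package is installed:

import threadpool

results = []   # shared lists are fine here: each task only appends its own entry
errors = []

def load_one_file(file_path):
    try:
        # placeholder for the real per-file load (\copy, LOAD DATA, sqlldr, ...)
        results.append(len(file_path))
    except Exception as e:
        errors.append(str(e))

files = ['/tmp/demo/part-00000', '/tmp/demo/part-00001', '/tmp/demo/part-00002']
pool = threadpool.ThreadPool(5)                       # 5 worker threads
for req in threadpool.makeRequests(load_one_file, files):
    pool.putRequest(req)
pool.wait()                                           # block until every task finishes
print('loaded {} files, {} errors'.format(len(results), len(errors)))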
Complete scripts (shell wrapper and Python version)
Shell wrapper script:

#!/bin/bash
sql=$1          # SQL that selects the data to export
s_tablename=$2  # source table
ds_name=$3      # target database
t_tablename=$4  # target table
temptable="h2o_"`date +%s%N | md5sum | head -c 16`   # timestamp-based temporary name
filename=${s_tablename}_${temptable}                 # file name
path="hdfs://prdhdfs/tmp/hdfs_to_rdb/$filename/"
local_path="/data02/dcadmin/scripts/dataos_scripts/data_exp"
flag=$5         # T: TRUNCATE the target table
#hadoop fs -mkdir $path

# sql0 is the SQL to execute
echo "$sql"
sql1=`echo "$sql"|cut -d ";" -f2`
sql0=`echo "$sql"|cut -d ";" -f1`
sql0="$sql0;insert overwrite directory '${path}' stored as ORC $sql1"
echo "$sql0"

# Submit the HQL to Hive
kinit -kt /data02/dcadmin/keytab_shengchan/dcadmin.keytab dcadmin@SC.COM
#beeline <<EOF
#!connect jdbc:hive2://devdataosambari:2181,devdataosnn1:2181,devdataosnn2:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2
#$sql0
beeline -u "jdbc:hive2://prdnn1.yxbdprd.sc.ctc.com:2181,prdnn2.yxbdprd.sc.ctc.com:2181,prdrm1.yxbdprd.sc.ctc.com:2181,prddb2.yxbdprd.sc.ctc.com:2181,prddb1.yxbdprd.sc.ctc.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2" -e "set tez.queue.name=offline;$sql0 distribute by rand()"

# Look up the target data source connection info
#eval $(mysql -h 10.251.88.71 -udacp -pdacp123456 dacp_dev -e "select case when ds_type = 'mysql' then concat('jdbc:mysql://', ds_inst_loc, '/',json_unquote(json_extract(ds_conf,'$.physicalDbName')),'?characterEncoding=UTF-8')
#when ds_type = 'oracle' then concat('jdbc:oracle:thin:@', ds_inst_loc, '/',json_unquote(json_extract(ds_conf,'$.physicalDbName')))
#when ds_type = 'pg' then concat('jdbc:postgresql://', ds_inst_loc, '/',json_unquote(json_extract(ds_conf,'$.physicalDbName'))) end jdbc_url,
#ds_acct, ds_auth, case when ds_type = 'mysql' then 'mysqlwriter' when ds_type = 'oracle' then 'oraclewriter' when ds_type = 'pg' then 'postgresqlwriter' end ds_type
#from dacp_meta_datasource
#where ds_type in ('mysql','oracle','pg')
#and ds_name = '$ds_name'" | awk 'NR== 2 {printf("jdbc_url=%s; ds_acct=%s; ds_auth=%s; ds_type=%s",$1,$2,$3,$4)}')
re=$(PGPASSWORD=jxFgCKv9GJw2ohS3 psql -h 10.251.110.104 -p 18921 -U dacp -d dacp -t <<EOF
SELECT CASE WHEN ds_type = 'mysql' THEN CONCAT ('jdbc:mysql://' ,ds_inst_loc,'/',(ds_conf::json ->>'physicalDbName'),'?characterEncoding=UTF-8')
            WHEN ds_type = 'oracle' THEN ds_conf::json ->> 'url'
            WHEN ds_type = 'pg' THEN CONCAT ('jdbc:postgresql://',ds_inst_loc,'/',(ds_conf::json ->>'physicalDbName'))
       END jdbc_url
      ,ds_acct
      ,ds_auth
      ,CASE WHEN ds_type = 'mysql' THEN 'mysqlwriter'
            WHEN ds_type = 'oracle' THEN 'oraclewriter'
            WHEN ds_type = 'pg' THEN 'postgresqlwriter'
       END ds_type
  FROM dacp_dev.dacp_meta_datasource
 WHERE ds_type IN ('mysql', 'oracle', 'pg')
   AND upper(trim(ds_name)) = upper(trim('$ds_name'))
EOF
)
eval $(echo $re| awk '{printf("jdbc_url=%s; ds_acct=%s; ds_auth=%s; ds_type=%s",$1,$3,$5,$7)}')

# Decrypt the target database password
#eval $(java -jar /data01/etl/scripts/exec_aes.jar $ds_auth | awk ' {printf("pw=%s;",$1)}')
#pw=`java -jar $local_path/exec_aes.jar $ds_auth`
pw=`java -Dpwd=${ds_auth} -jar $local_path/AesCipher-1.0.jar`

preSQL="select * from $t_tablename where 1=-1"
if [ "$flag" = "T" ];then
    preSQL="truncate table $t_tablename"
fi
echo "preSQL=$preSQL"

python $local_path/datax/bin/datax.py -p "-Dpath=$path -Dwriter=$ds_type -Drdb_user=$ds_acct -Drdb_pass=\"$pw\" -Drdb_jdbc=\"$jdbc_url\" -Drdb_table=$t_tablename -DpreSql=\"$preSQL\"" $local_path/hdfs_to_rdb.json

Python script:

# -*- coding:utf-8 -*-
import time
import datetime
import os
import threadpool
import commands
import calendar
import random
import pymssql
import pymysql
import cx_Oracle
import psycopg2
import socket
from pyhdfs import HdfsClient
from hashlib import md5
import sys
reload(sys)
sys.setdefaultencoding('utf8')
sys.path.append('/data02/dcadmin/scripts/common')
from connect_postgresql import postgresql_connect
from connect_hive import hive_connect

pg_conn_str='dataos_71_pg_dev'
# Local disk directories; one is chosen at random for the data files
local_path_list=['/data01','/data02','/data03','/data04','/data05']

def close_datadb_conn():
    if rdb_conn:
        rdb_conn.close()

def connect_database_to_select(conn,sql):
    # print('\r\n執行SQL:{}'.format(sql))
    cursor = conn.cursor()
    try:
        cursor.execute(sql)
        #cursor.execute(sql.decode('utf-8').encode('gbk'))
        result = cursor.fetchall()
        conn.commit()
        return result
    except Exception as e:
        print('SQL執行錯誤:{},執行SQL:{}'.format(str(e),sql))
        sys.exit(2)
    finally:
        cursor.close()

def connect_database_to_commit(exe_type,conn,sql,insert_list):
    # print('\r\n執行SQL:{}'.format(sql))
    cursor = conn.cursor()
    try:
        if exe_type.lower() in ('delete','insert'):
            cursor.execute(sql)
            conn.commit()
            print('執行SQL:{}'.format(sql))
        elif exe_type.lower()=='insertmany':
            cursor.executemany(sql, insert_list)
            conn.commit()
    except Exception as e:
        print('SQL執行錯誤c:{},執行SQL:{}'.format(str(e),sql))
        print(sql)
        sys.exit(2)
    finally:
        cursor.close()

# Run a system command
def exe_system_cmd(cmd):
    status,output=commands.getstatusoutput(cmd)
    if status!=0:
        print('命令{}:執行失敗,請檢查!'.format(cmd))
        print('失敗日志:{}'.format(output))
        sys.exit(2)
    return output

# Return an MD5 string
def get_md5_str():
    # timestamp
    ts = calendar.timegm(time.gmtime())
    md5_str=md5(str(ts).encode(encoding='utf-8')).hexdigest()
    return md5_str

# Validate the input parameters
def judge_input_parameters_num():
    if len(sys.argv)!=6:
        print('參數有問題,請檢查!')
        print(sys.argv)
        sys.exit(2)
    else:
        sql        =sys.argv[1] # export SQL
        s_tablename=sys.argv[2] # source table name
        ds_name    =sys.argv[3] # target database
        g_tablename=sys.argv[4] # target table
        flag       =sys.argv[5] # A:append, T:truncate
    return sql,s_tablename,ds_name,g_tablename,flag

# Run the SQL and generate the HDFS files
def produce_exe_data(sql,s_tablename):
    global local_path_name
    # 1. Create the local directory
    # Randomly pick one of '/data01','/data02','/data03','/data04','/data05'
    local_path_01 = local_path_list[random.randrange(len(local_path_list))]+'/dataos_exp' # e.g. /data01/dataos_exp
    local_path_name="h2o_{0}_{1}".format(s_tablename,get_md5_str()).lower()
    # e.g. local_path_name='h2o_app_hub_resource_value_level_d_e6963bad13299e939a3a4cc2b2a26a47'
    local_path=local_path_01+'/'+local_path_name
    if os.path.exists(local_path):
        cmd='rm -rf {}'.format(local_path)
        exe_system_cmd(cmd)
    os.mkdir(local_path)
    # HDFS directory
    hdfs_path="hdfs://prdhdfs/tmp/hdfs_to_rdb/{}".format(local_path_name)
    # Trim surrounding whitespace and semicolons, then split the SQL
    sql=sql.strip().strip(';')
    sql_list=sql.split(';')
    # Execute the statements one by one
    hive_conn=hive_connect()   # connect to the production Hive
    compress_sql='set hive.exec.compress.output=false'
    print('執行SQL:{}'.format(compress_sql))
    connect_database_to_commit('insert',hive_conn,compress_sql,'')
    for i in range(len(sql_list)):
        sql_str=sql_list[i]
        if i==len(sql_list)-1:
            # The last statement is wrapped in an INSERT OVERWRITE DIRECTORY
            sql_str='''INSERT OVERWRITE DIRECTORY '{0}'
            ROW FORMAT DELIMITED
            FIELDS TERMINATED BY '\\u0001'
            COLLECTION ITEMS TERMINATED BY '\\n'
            MAP KEYS TERMINATED BY ':'
            {1} '''.format(hdfs_path,sql_str)
        print('執行SQL:{}'.format(sql_str))
        connect_database_to_commit('insert',hive_conn,sql_str,'')
    # Close the Hive connection
    if hive_conn:
        hive_conn.close()
    # Pull the files from hdfs_path down to local_path
    cmd=''' hdfs dfs -get {}/* {} '''.format(hdfs_path,local_path)
    exe_system_cmd(cmd)
    print('文件GET成功,當前主機:{},數據臨時文件夾:{}'.format(socket.gethostname(),local_path))
    return local_path,hdfs_path

# Get the target database connection info
def get_rdb_conn_msg(ds_name):
    global ds_type
    global ds_acct
    global ds_auth
    global host
    global port
    global database
    global jdbc_url
    sql='''
    SELECT ds_name
          ,ds_type
          ,ds_acct
          ,ds_auth
          ,split_part(ds_inst_loc,':',1) as host
          ,case when split_part(ds_inst_loc,':',2)='' and ds_type='oracle' then '1521'
                else split_part(ds_inst_loc,':',2) end as port
          ,case when lower(ds_type)='oracle'
                then split_part(replace(replace(replace(ds_conf::json->>'url','jdbc:oracle:thin:@',''),':1521',''),'/',':'),':',2)
                else ds_conf::json->>'physicalDbName' end as database
          ,CASE WHEN ds_type = 'mysql' THEN CONCAT ('jdbc:mysql://' ,ds_inst_loc,'/',(ds_conf::json ->>'physicalDbName'),'?characterEncoding=UTF-8')
                WHEN ds_type = 'oracle' THEN ds_conf::json ->>'url'
                WHEN ds_type = 'pg' THEN CONCAT ('jdbc:postgresql://',ds_inst_loc,'/',(ds_conf::json ->>'physicalDbName'))
           END as jdbc_url
      FROM dacp_dev.dacp_meta_datasource
     WHERE ds_type IN ('mysql', 'oracle', 'pg')
       AND upper(trim(ds_name)) = upper(trim('{}'))
    '''.format(ds_name)
    pg_conn=postgresql_connect(pg_conn_str)
    results=connect_database_to_select(pg_conn,sql)
    # print(results)
    if not results:
        print('未查詢到數據庫連接信息,請檢查,DS_NAME:{}'.format(ds_name))
        sys.exit(2)
    # Close the metadata connection
    if pg_conn:
        pg_conn.close()
    # Decrypt the password
    cmd='''java -Dpwd='{0}' -jar /data02/dcadmin/scripts/common/AesCipher-1.0.jar'''.format(results[0][3])
    pw=exe_system_cmd(cmd).replace('\r','').replace('\n','')
    ds_type = results[0][1]
    ds_acct = results[0][2]
    ds_auth = pw
    host    = results[0][4]
    port    = int(results[0][5])
    database= results[0][6]
    jdbc_url= results[0][7]

# Determine the target database type and return a connection
def get_rdb_database_conn():
    dbms_conn=None
    try:
        if ds_type.upper()=='SQLSERVER':
            dbms_conn = pymssql.connect(host=host , user=ds_acct, password=ds_auth, port=port, database=database, charset='utf8')
        elif ds_type.upper()=='MYSQL':
            dbms_conn = pymysql.connect(host=host , user=ds_acct, passwd=ds_auth , port=port, database=database, charset='utf8', local_infile=True)
        elif ds_type.upper()=='ORACLE':
            listener = '{0}:{1}/{2}'.format(host,port,database)
            print('listener:{}'.format(listener))
            dbms_conn = cx_Oracle.connect(ds_acct,ds_auth,listener,encoding='utf-8')
        elif ds_type.upper() in ('POSTGRESQL','PG'):
            dbms_conn = psycopg2.connect(host=host , user=ds_acct, password=ds_auth , port=port, database=database, client_encoding='utf8')
        else:
            print("未知源端數據庫類型{}~~~~~,請檢查!".format(ds_type.upper()))
            sys.exit(2)
    except Exception as e:
        print('{0},{1}數據庫連接失敗,請檢查~~~~~~!'.format(ds_type,ds_name))
        print('報錯日志:{}'.format(e))
        print(host)
        print(ds_acct)
        print(ds_auth)
        print(port)
        print(database)
        sys.exit(2)
    return dbms_conn

def thread_exe_exchange_data(g_tablename,flag,local_path,hdfs_path):
    global rdb_conn
    rdb_conn=get_rdb_database_conn()
    # Run the pre-processing SQL
    if flag.upper()=='T':
        presql='truncate table {}'.format(g_tablename)
        print('執行SQL:{}'.format(presql))
        connect_database_to_commit('insert',rdb_conn,presql,'')
    # Fetch the Oracle table structure
    if ds_type.lower() in ('oracle'):
        global oracle_table_field
        # Oracle table structure
        oracle_table_field=get_oracle_table_fields()
        # Directory for ctl, bad and log files
        global ora_dir
        localtime = str(time.strftime("%Y%m%d", time.localtime()))
        ora_dir = "/data03/datafile/sqlldrdata/{0}/".format(localtime)
        if not os.path.exists(ora_dir):
            os.mkdir(ora_dir)
    # File list
    file_lists=os.listdir(local_path)
    # Multithreaded load
    global exp_num_list   # rows loaded per file
    global log_list       # per-thread log messages
    global exception_list # per-thread exceptions
    exp_num_list =[]
    log_list =[]
    exception_list=[]
    thread_list=[]   # task list
    for file_name in file_lists:
        thread_list.append(local_path+'/'+file_name)
    # Create the thread pool
    pool=threadpool.ThreadPool(5)
    # Build and submit the tasks
    requests = threadpool.makeRequests(exchange_data,thread_list)
    [pool.putRequest(req) for req in requests]
    pool.wait()
    # Handle exceptions
    if exception_list:
        # The load failed: remove the temporary files
        delete_local_path(local_path,hdfs_path)
        print('導數失敗,異常日志信息如下:')
        for except_msg in exception_list:
            print(except_msg)
        sys.exit(2)
    # Print the thread logs
    # log_list.sort()
    # for log in log_list:
    #     print(log)
    # Print the result
    print('數據導出完成,導出數據總量為:{}'.format(sum(exp_num_list)))

# Fetch the Oracle table structure
def get_oracle_table_fields():
    sql = '''
    SELECT COLUMN_NAME || SUFFIX AS AA
      FROM (SELECT A.COLUMN_NAME
                  ,A.COLUMN_ID
                  ,CASE WHEN UPPER(A.DATA_TYPE) LIKE '%DATE%' THEN ' DATE "yyyy-mm-dd hh24:mi:ss"'
                        WHEN UPPER(A.DATA_TYPE) LIKE '%TIMESTAMP%' THEN ' DATE "yyyy-mm-dd hh24:mi:ss.ff"'
                        WHEN UPPER(A.DATA_TYPE) LIKE '%VARCHAR%' THEN ' CHAR(3000)'
                        ELSE '' END AS SUFFIX
              FROM ALL_TAB_COLUMNS A
             WHERE UPPER(A.OWNER||'.'||A.TABLE_NAME) = UPPER(TRIM('{0}'))
             ORDER BY A.COLUMN_ID)
    '''
    if '.' in g_tablename:
        sql=sql.format(g_tablename)
    else:
        sql=sql.format(database+'.'+g_tablename)
    oracle_table_fields=connect_database_to_select(rdb_conn,sql)
    if not oracle_table_fields:
        print('未查詢到表結構,表名:{}'.format(g_tablename))
        sys.exit(2)
    oracle_table_field = ",\n".join([str(list[0]) for list in oracle_table_fields])
    return oracle_table_field

# Load a single file
def exchange_data(file_path):
    try:
        output=''
        # Run the load
        if ds_type.lower() in ('pg','postgresql','telpg','antdb'):
            cmd='''psql "port={0} host={1} user={2} dbname={3} password={4} " -c "\copy {5} from '{6}' DELIMITER AS E'\u0001' " '''
            cmd=cmd.format(port,host,ds_acct,database,ds_auth,g_tablename,file_path)
            status,output=commands.getstatusoutput(cmd)
            if status!=0:
                exception_list.append('命令{}:執行失敗,請檢查!失敗日志:{}'.format(cmd,output))
        elif ds_type.lower() in ('mysql','teldb'):
            mysql_conn = pymysql.connect(host=host , user=ds_acct, passwd=ds_auth , port=port, database=database, charset='utf8', local_infile=True)
            mysql_cursor=mysql_conn.cursor()
            sql='SET NAMES UTF8'
            mysql_cursor.execute(sql)
            sql='''load data local infile '{}' into table {} fields terminated by X'01' lines terminated by '\\n' '''.format(file_path,g_tablename)
            #print(sql)
            output=mysql_cursor.execute(sql)
            mysql_conn.commit()
            mysql_conn.close()
            # cmd='''mysql -h {} -P {} -u {} -p{} -D {} -e "SET NAMES UTF8;load data local infile '{}' into table {} fields terminated by X'01' lines terminated by '\\n'" '''
            # cmd=cmd.format(host,port,ds_acct,ds_auth,database,file_path,g_tablename)
        elif ds_type.lower() in ('oracle'):
            tns='''\'{}/"{}"\'@{}:1521/{}'''.format(ds_acct,ds_auth,host,database)
            ora_file_name=file_path.replace(local_path+'/','')
            ora_file_path=ora_dir+'/'+local_path_name+'_'+ora_file_name
            control_file = ora_file_path+".ctl"
            log_file = ora_file_path+".log"
            bad_file = ora_file_path+".bad"
            dis_file = ora_file_path+".dis"
            content ='''
            UNRECOVERABLE LOAD DATA
            CHARACTERSET AL32UTF8
            APPEND INTO TABLE {0}
            FIELDS TERMINATED BY x'01'
            TRAILING NULLCOLS
            ({1})
            '''.format(g_tablename, oracle_table_field)
            # Remove the control file if it already exists
            if os.path.exists(control_file):
                cmd='rm -rf {}'.format(control_file)
                exe_system_cmd(cmd)
            # Create the control file
            with open(control_file, "w") as file:
                file.write(content)
            cmd='''export ORACLE_HOME=/data03/apps/db_1;export LD_LIBRARY_PATH=$ORACLE_HOME/lib:$LD_LIBRARY_PATH;cat {0} | /data03/apps/db_1/bin/sqlldr userid={1} control={2} data=\\"-\\" log={3} bad={4} discard={5} errors=0 direct=true parallel=true multithreading=true columnarrayrows=100000 STREAMSIZE=20971520 readsize=20971520 bindsize=20971520 date_cache=0 '''
            cmd=cmd.format(file_path, tns, control_file, log_file, bad_file, dis_file)
            status,output=commands.getstatusoutput(cmd)
            if status!=0:
                exception_list.append('命令{}:執行失敗,請檢查!失敗日志:{}'.format(cmd,output))
        else:
            exception_list.append('目標端數據庫類型為:{},此類型暫未支持!'.format(ds_type.lower()))
        # Count the loaded rows
        if ds_type.lower() in ('pg','postgresql','telpg','antdb'):
            file_row_num=int(output.split('COPY ')[1].strip())
            exp_num_list.append(file_row_num)
        elif ds_type.lower() in ('oracle'):
            try:
                output=output.decode('gbk')
            except:
                output=output
            file_row_num=int(output.split('邏輯記錄計數 ')[1].replace('。','').strip())
            exp_num_list.append(file_row_num)
        elif ds_type.lower() in ('mysql','teldb'):
            exp_num_list.append(output)
        # Record the log
        log_list.append(output)
    except Exception as e:
        exception_list.append(e)

def delete_local_path(local_path,hdfs_path):
    cmd='rm -rf {}'.format(local_path)
    exe_system_cmd(cmd)
    print('本地文件夾刪除成功。')
    cmd='hdfs dfs -rm -r {}'.format(hdfs_path)
    exe_system_cmd(cmd)
    print('HDFS文件夾刪除成功。')

if __name__ == '__main__':
    starttime = datetime.datetime.now()
    print('開始時間:{0}'.format(starttime.strftime('%Y-%m-%d %H:%M:%S')))
    # 1. Validate the input parameters
    sql,s_tablename,ds_name,g_tablename,flag=judge_input_parameters_num()
    # 2. Run the SQL, generate the files and return the local directory
    local_path,hdfs_path=produce_exe_data(sql,s_tablename)
    hdfs_time=datetime.datetime.now()
    #print('當前時間:{}'.format(hdfs_time))
    print("生成HDFS文件耗時:{0}秒".format((hdfs_time - starttime).seconds))
    # 3. Get the target connection info (host, port, ...)
    get_rdb_conn_msg(ds_name)
    # 4. Run the load
    thread_exe_exchange_data(g_tablename,flag,local_path,hdfs_path)
    # 5. Remove the local directory
    delete_local_path(local_path,hdfs_path)
    # 6. Close the database connection
    close_datadb_conn()
    endtime = datetime.datetime.now()
    print('結束時間:{0}'.format(endtime.strftime('%Y-%m-%d %H:%M:%S')))
    print('導數耗時:{0}秒'.format((endtime - hdfs_time).seconds))
    print("一共耗時:{0}秒".format((endtime - starttime).seconds))
Summary
The scripts implement a complete HDFS-to-relational-database migration while preserving data completeness and consistency. Data is first exported through Hive to HDFS; the shell version then hands the load to DataX, while the Python version pulls the files to local disk and loads them in parallel through each database's bulk-load path. Local staging and multithreading keep the process efficient, which makes it a good fit for large data volumes and data-warehouse migrations.
請(qǐng)務(wù)必按需調(diào)整腳本中的具體參數(shù)和配置以適應(yīng)你的環(huán)境和數(shù)據(jù)架構(gòu)。
This concludes the walkthrough of exporting data from Hadoop HDFS to a relational database with Python.
相關(guān)文章
如何用Python做一個(gè)微信機(jī)器人自動(dòng)拉群
這篇文章主要介紹了如何用Python做一個(gè)微信機(jī)器人自動(dòng)拉群,微當(dāng)群人數(shù)達(dá)到100人后,用戶無(wú)法再通過(guò)掃描群二維碼加入,只能讓用戶先添加群內(nèi)聯(lián)系人微信,再由聯(lián)系人把用戶拉進(jìn)來(lái)。這樣,聯(lián)系人員的私人微信會(huì)添加大量陌生人,給其帶來(lái)不必要的打擾,需要的朋友可以參考下2019-07-07Python 讀取用戶指令和格式化打印實(shí)現(xiàn)解析
這篇文章主要介紹了Python 讀取用戶指令和格式化打印實(shí)現(xiàn)解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-09-09Python如何利用Har文件進(jìn)行遍歷指定字典替換提交的數(shù)據(jù)詳解
這篇文章主要給大家介紹了關(guān)于Python如何利用Har文件進(jìn)行遍歷指定字典替換提交的數(shù)據(jù)的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-11-11Python提取JSON格式數(shù)據(jù)實(shí)戰(zhàn)案例
這篇文章主要給大家介紹了關(guān)于Python提取JSON格式數(shù)據(jù)的相關(guān)資料, Python提供了內(nèi)置的json模塊,用于處理JSON數(shù)據(jù),文中給出了詳細(xì)的代碼示例,需要的朋友可以參考下2023-07-07anaconda jupyter不能導(dǎo)入安裝的lightgbm解決方案
這篇文章主要介紹了anaconda jupyter不能導(dǎo)入安裝的lightgbm解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2021-03-03Python Metaclass原理與實(shí)現(xiàn)過(guò)程詳細(xì)講解
MetaClass元類,本質(zhì)也是一個(gè)類,但和普通類的用法不同,它可以對(duì)類內(nèi)部的定義(包括類屬性和類方法)進(jìn)行動(dòng)態(tài)的修改??梢赃@么說(shuō),使用元類的主要目的就是為了實(shí)現(xiàn)在創(chuàng)建類時(shí),能夠動(dòng)態(tài)地改變類中定義的屬性或者方法2022-11-11