欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

python從Hadoop?HDFS導(dǎo)出數(shù)據(jù)到關(guān)系數(shù)據(jù)庫(kù)

 更新時(shí)間:2024年11月06日 10:59:04   作者:jzy3711  
這篇文章主要為大家詳細(xì)介紹了Python如何從Hadoop?HDFS中導(dǎo)出數(shù)據(jù)并通過(guò)DataX工具導(dǎo)入到關(guān)系數(shù)據(jù)庫(kù),例如MySQL,Oracle,PostgreSQL等,感興趣的可以了解下

python從HDFS導(dǎo)出到關(guān)系數(shù)據(jù)庫(kù)(如MySQL、Oracle、PostgreSQL)

一整套從Hadoop HDFS中導(dǎo)出數(shù)據(jù)并通過(guò)DataX工具導(dǎo)入到關(guān)系數(shù)據(jù)庫(kù)的過(guò)程。

操作步驟

1. 定義參數(shù)和變量

sql=$1                  # 導(dǎo)出數(shù)據(jù)的SQL語(yǔ)句
s_tablename=$2          # 源表名
ds_name=$3              # 目標(biāo)數(shù)據(jù)庫(kù)名稱
t_tablename=$4          # 目標(biāo)表名
temptable="h2o_"`date +%s%N | md5sum | head -c 16`  # 生成一個(gè)基于時(shí)間戳的臨時(shí)表名
filename=${s_tablename}_${temptable}  # 文件名
path="hdfs://prdhdfs/tmp/hdfs_to_rdb/$filename/"   # HDFS路徑
local_path="/data02/dcadmin/scripts/dataos_scripts/data_exp"  # 本地腳本路徑
flag=$5 # 標(biāo)志,用來(lái)確定是否TRUNCATE表

2. 構(gòu)造SQL查詢并提交給Hive

echo "$sql"
sql1=`echo "$sql"|cut -d ";" -f2`  # 截取分號(hào)后的部分
sql0=`echo "$sql"|cut -d ";" -f1`  # 截取分號(hào)前的部分
sql0="$sql0;insert overwrite directory '${path}' stored as ORC $sql1"  # 構(gòu)建最終的SQL
echo "$sql0"

kinit -kt /data02/dcadmin/keytab_shengchan/dcadmin.keytab dcadmin@SC.COM
beeline -u "jdbc:hive2://prdnn1.yxbdprd.sc.ctc.com:2181, ... ,prddb1.yxbdprd.sc.ctc.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2" -e "set tez.queue.name=offline;$sql0 distribute by rand()"

3. 獲取目標(biāo)數(shù)據(jù)庫(kù)連接信息并解析結(jié)果為變量

從PostgreSQL數(shù)據(jù)庫(kù)中獲取目標(biāo)數(shù)據(jù)庫(kù)的連接信息,并解析結(jié)果為變量。

re=$(PGPASSWORD=... psql -h 10.251.110.104 -p 18921 -U dacp -d dacp  -t <<EOF
SELECT CASE WHEN ds_type = 'mysql'  THEN CONCAT ('jdbc:mysql://'     ,ds_inst_loc,'/',(ds_conf::json ->>'physicalDbName'),'?characterEncoding=UTF-8') 
            WHEN ds_type = 'oracle' THEN ds_conf::json ->> 'url' 
            WHEN ds_type = 'pg'     THEN CONCAT ('jdbc:postgresql://',ds_inst_loc,'/',(ds_conf::json ->>'physicalDbName')) END jdbc_url
      ,ds_acct
      ,ds_auth
      ,CASE WHEN ds_type = 'mysql'  THEN 'mysqlwriter' 
            WHEN ds_type = 'oracle' THEN 'oraclewriter' 
            WHEN ds_type = 'pg'     THEN 'postgresqlwriter' END ds_type
FROM dacp_dev.dacp_meta_datasource
WHERE ds_type IN ('mysql', 'oracle', 'pg')
  AND upper(trim(ds_name)) = upper(trim('$ds_name'))
EOF
)
eval $(echo $re| awk '{printf("jdbc_url=%s; ds_acct=%s; ds_auth=%s; ds_type=%s",$1,$3,$5,$7)}')

4. 獲取目標(biāo)數(shù)據(jù)庫(kù)密碼

通過(guò)執(zhí)行Java程序解密數(shù)據(jù)庫(kù)密碼:

pw=`java -Dpwd=${ds_auth} -jar $local_path/AesCipher-1.0.jar`

5. 預(yù)處理SQL語(yǔ)句

根據(jù)標(biāo)志變量flag,確定是否執(zhí)行TRUNCATE語(yǔ)句:

preSQL="select * from $t_tablename where 1=-1"
if [ "$flag" = "T" ];then
 preSQL="truncate table $t_tablename"
fi
echo "preSQL=$preSQL"

6. 數(shù)據(jù)導(dǎo)出并導(dǎo)入目標(biāo)數(shù)據(jù)庫(kù)

使用datax執(zhí)行從HDFS導(dǎo)入到關(guān)系數(shù)據(jù)庫(kù)的任務(wù):

python $local_path/datax/bin/datax.py -p "-Dpath=$path -Dwriter=$ds_type -Drdb_user=$ds_acct -Drdb_pass="$pw" -Drdb_jdbc="$jdbc_url" -Drdb_table=$t_tablename -DpreSql="$preSQL"" $local_path/hdfs_to_rdb.json

7. Python代碼詳解

此外,你還展示了大量的Python代碼用于處理數(shù)據(jù)轉(zhuǎn)換和傳輸。重點(diǎn)如下:

1. 初始化設(shè)置和依賴

加載必要的包,并初始化變量。

import time
import datetime
import os
import threadpool
import commands
import calendar
import random
import pymssql
import pymysql
import cx_Oracle
import psycopg2
import socket
from pyhdfs import HdfsClient
from hashlib import md5

# 其他初始化設(shè)置

2. 連接數(shù)據(jù)庫(kù)并執(zhí)行SQL

定義了連接數(shù)據(jù)庫(kù)并執(zhí)行SQL的函數(shù):

def connect_database_to_select(conn, sql):
    cursor = conn.cursor()
    try:
        cursor.execute(sql)
        result = cursor.fetchall()
        conn.commit()
        return result
    except Exception as e:
        print('SQL執(zhí)行錯(cuò)誤:{},執(zhí)行SQL:{}'.format(str(e), sql))
        sys.exit(2)
    finally:
        cursor.close()

def connect_database_to_commit(exe_type, conn, sql, insert_list):
    cursor = conn.cursor()
    try:
        if exe_type.lower() in ('delete', 'insert'):
            cursor.execute(sql)
            conn.commit()
        elif exe_type.lower() == 'insertmany':
            cursor.executemany(sql, insert_list)
            conn.commit()
    except Exception as e:
        print('SQL執(zhí)行錯(cuò)誤:{},執(zhí)行SQL:{}'.format(str(e), sql))
        sys.exit(2)
    finally:
        cursor.close()

3. 數(shù)據(jù)導(dǎo)出處理

執(zhí)行數(shù)據(jù)導(dǎo)出并提交給Hive:

def produce_exe_data(sql, s_tablename):
    global local_path_name
    local_path_01 = local_path_list[random.randrange(len(local_path_list))] + '/dataos_exp'
    local_path_name = "h2o_{0}_{1}".format(s_tablename, get_md5_str()).lower()
    local_path = local_path_01 + '/' + local_path_name
    if os.path.exists(local_path):
        cmd = 'rm -rf {}'.format(local_path)
        exe_system_cmd(cmd)
    os.mkdir(local_path)
    
    hdfs_path = "hdfs://prdhdfs/tmp/hdfs_to_rdb/{}".format(local_path_name)
    sql = sql.strip().strip(';')
    sql_list = sql.split(';')
    
    hive_conn = hive_connect()
    compress_sql = 'set hive.exec.compress.output=false'
    connect_database_to_commit('insert', hive_conn, compress_sql, '')
    
    for i in range(len(sql_list)):
        sql_str = sql_list[i]
        if i == len(sql_list)-1:      # 如果是最后一條SQL,則執(zhí)行insert overwrite directory
            sql_str='''INSERT OVERWRITE DIRECTORY '{0}' 
         ROW FORMAT DELIMITED FIELDS TERMINATED BY '\u0001' COLLECTION ITEMS TERMINATED BY '\n' MAP KEYS TERMINATED BY ':' 
         {1} '''.format(hdfs_path, sql_str)
        connect_database_to_commit('insert', hive_conn, sql_str, '')

    if hive_conn:
        hive_conn.close()
    
    cmd = ''' hdfs dfs -get {}/* {} '''.format(hdfs_path, local_path)
    exe_system_cmd(cmd)
    return local_path, hdfs_path

4. 多線程數(shù)據(jù)傳輸

利用多線程加速數(shù)據(jù)傳輸過(guò)程:

def thread_exe_exchange_data(g_tablename, flag, local_path, hdfs_path):
    global rdb_conn
    rdb_conn = get_rdb_database_conn()

    if flag.upper() == 'T':
        presql = 'truncate table {}'.format(g_tablename)
        connect_database_to_commit('insert', rdb_conn, presql, '')

    if ds_type.lower() == 'oracle':
        global oracle_table_field       
        oracle_table_field = get_oracle_table_fields()

        localtime = str(time.strftime("%Y%m%d", time.localtime()))
        ora_dir = "/data03/datafile/sqlldrdata/{0}/".format(localtime)
        if not os.path.exists(ora_dir):
            os.mkdir(ora_dir)
    
    file_lists = os.listdir(local_path)
    global exp_num_list
    global log_list
    global exception_list
    exp_num_list = []
    log_list = []
    exception_list = []

    thread_list = []
    for file_name in file_lists:
        thread_list.append(local_path + '/' + file_name)
    
    pool = threadpool.ThreadPool(5)
    requests = threadpool.makeRequests(exchange_data, thread_list)
    [pool.putRequest(req) for req in requests]
    pool.wait()
   
    if exception_list:
        delete_local_path(local_path, hdfs_path)
        sys.exit(2)

    print('數(shù)據(jù)導(dǎo)出完成,導(dǎo)出數(shù)據(jù)總量為:{}'.format(sum(exp_num_list)))

完整python腳本

#!/bin/bash
sql=$1                  #導(dǎo)出數(shù)據(jù)sql
s_tablename=$2  #源表
ds_name=$3              #目標(biāo)庫(kù)
t_tablename=$4  #目標(biāo)表
temptable="h2o_"`date +%s%N | md5sum | head -c 16`      #構(gòu)造一個(gè)時(shí)間戳
filename=${s_tablename}_${temptable}    #文件名
path="hdfs://prdhdfs/tmp/hdfs_to_rdb/$filename/"
local_path="/data02/dcadmin/scripts/dataos_scripts/data_exp"
flag=$5 #t TRUNCATE 
#hadoop fs -mkdir $path
# 參數(shù)sql0為 待執(zhí)行SQL
echo "$sql"
sql1=`echo "$sql"|cut -d ";" -f2`
sql0=`echo "$sql"|cut -d ";" -f1`
sql0="$sql0;insert overwrite directory '${path}' stored as ORC $sql1"
echo "$sql0"
# 向Hive提交HQL
kinit -kt /data02/dcadmin/keytab_shengchan/dcadmin.keytab dcadmin@SC.COM
#beeline <<EOF
#!connect jdbc:hive2://devdataosambari:2181,devdataosnn1:2181,devdataosnn2:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2


#$sql0
beeline -u "jdbc:hive2://prdnn1.yxbdprd.sc.ctc.com:2181,prdnn2.yxbdprd.sc.ctc.com:2181,prdrm1.yxbdprd.sc.ctc.com:2181,prddb2.yxbdprd.sc.ctc.com:2181,prddb1.yxbdprd.sc.ctc.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2" -e "set tez.queue.name=offline;$sql0 distribute by rand()"
# 獲取目標(biāo)數(shù)據(jù)源地址
#eval $(mysql -h 10.251.88.71 -udacp -pdacp123456 dacp_dev -e "select case when ds_type = 'mysql' then concat('jdbc:mysql://', ds_inst_loc, '/',json_unquote(json_extract(ds_conf,'$.physicalDbName')),'?characterEncoding=UTF-8')
#when ds_type = 'oracle' then concat('jdbc:oracle:thin:@', ds_inst_loc, '/',json_unquote(json_extract(ds_conf,'$.physicalDbName')))
#when ds_type = 'pg' then concat('jdbc:postgresql://', ds_inst_loc, '/',json_unquote(json_extract(ds_conf,'$.physicalDbName'))) end jdbc_url,
#ds_acct, ds_auth, case when ds_type = 'mysql' then 'mysqlwriter' when ds_type = 'oracle' then 'oraclewriter' when ds_type = 'pg' then 'postgresqlwriter' end ds_type
#from dacp_meta_datasource 
#where ds_type in ('mysql','oracle','pg') 
#and ds_name = '$ds_name'" | awk 'NR== 2 {printf("jdbc_url=%s; ds_acct=%s; ds_auth=%s; ds_type=%s",$1,$2,$3,$4)}')

re=$(PGPASSWORD=jxFgCKv9GJw2ohS3 psql -h 10.251.110.104 -p 18921 -U dacp -d dacp  -t <<EOF
SELECT CASE WHEN ds_type = 'mysql'  THEN CONCAT ('jdbc:mysql://'     ,ds_inst_loc,'/',(ds_conf::json ->>'physicalDbName'),'?characterEncoding=UTF-8') 
            WHEN ds_type = 'oracle' THEN ds_conf::json ->> 'url' 
            WHEN ds_type = 'pg'     THEN CONCAT ('jdbc:postgresql://',ds_inst_loc,'/',(ds_conf::json ->>'physicalDbName')) END jdbc_url
      ,ds_acct
      ,ds_auth
      ,CASE WHEN ds_type = 'mysql'  THEN 'mysqlwriter' 
            WHEN ds_type = 'oracle' THEN 'oraclewriter' 
            WHEN ds_type = 'pg'     THEN 'postgresqlwriter' END ds_type
FROM dacp_dev.dacp_meta_datasource
WHERE ds_type IN ('mysql', 'oracle', 'pg')
  AND upper(trim(ds_name)) = upper(trim('$ds_name'))
EOF
)
eval $(echo $re| awk '{printf("jdbc_url=%s; ds_acct=%s; ds_auth=%s; ds_type=%s",$1,$3,$5,$7)}')

#eval $(java -jar /data01/etl/scripts/exec_aes.jar $ds_auth | awk ' {printf("pw=%s;",$1)}')
#pw=`java -jar $local_path/exec_aes.jar $ds_auth`
pw=`java -Dpwd=${ds_auth} -jar $local_path/AesCipher-1.0.jar`

preSQL="select * from $t_tablename where 1=-1"
if [ "$flag" = "T" ];then
 preSQL="truncate table $t_tablename"
fi
echo "preSQL=$preSQL"

python $local_path/datax/bin/datax.py -p "-Dpath=$path -Dwriter=$ds_type -Drdb_user=$ds_acct -Drdb_pass=\"$pw\" -Drdb_jdbc=\"$jdbc_url\" -Drdb_table=$t_tablename -DpreSql=\"$preSQL\"" $local_path/hdfs_to_rdb.json

# -*- coding:utf-8 -*-

import time
import datetime
import os
import threadpool
import commands
import calendar
import random
import pymssql
import pymysql
import cx_Oracle
import psycopg2
import socket
from pyhdfs import HdfsClient
from hashlib import md5
import sys
reload(sys)
sys.setdefaultencoding('utf8')
sys.path.append('/data02/dcadmin/scripts/common')
from connect_postgresql import postgresql_connect
from connect_hive import hive_connect

pg_conn_str='dataos_71_pg_dev'



# 本地磁盤(pán)目錄,文件隨機(jī)選擇一個(gè)目錄
local_path_list=['/data01','/data02','/data03','/data04','/data05']


def close_datadb_conn():
    if rdb_conn:
        rdb_conn.close()


def connect_database_to_select(conn,sql):
    # print('\r\n執(zhí)行SQL:{}'.format(sql))
    cursor = conn.cursor()
    try:
        cursor.execute(sql)
        #cursor.execute(sql.decode('utf-8').encode('gbk'))
        result = cursor.fetchall()
        conn.commit()
        return result
    except Exception as e:
        print('SQL執(zhí)行錯(cuò)誤:{},執(zhí)行SQL:{}'.format(str(e),sql))
        sys.exit(2)
    finally:
        cursor.close()

def connect_database_to_commit(exe_type,conn,sql,insert_list):
    # print('\r\n執(zhí)行SQL:{}'.format(sql))
    cursor = conn.cursor()
    try:
        if exe_type.lower() in ('delete','insert'):
            cursor.execute(sql)
            conn.commit()
            print('執(zhí)行SQL:{}'.format(sql))
        elif exe_type.lower()=='insertmany':
            cursor.executemany(sql, insert_list)
            conn.commit()
    except Exception as e:
        print('SQL執(zhí)行錯(cuò)誤c:{},執(zhí)行SQL:{}'.format(str(e),sql))
        print(sql)
        sys.exit(2)
    finally:
        cursor.close()

# 執(zhí)行系統(tǒng)命令
def exe_system_cmd(cmd):
    status,output=commands.getstatusoutput(cmd)
    if status!=0:
        print('命令{}:執(zhí)行失敗,請(qǐng)檢查!'.format(cmd))
        print('失敗日志:{}'.format(output))
        sys.exit(2)
    return output


# 返回MD5串
def get_md5_str():
    # 時(shí)間戳
    ts = calendar.timegm(time.gmtime())
    md5_str=md5(str(ts).encode(encoding='utf-8')).hexdigest()
    return md5_str


# 判斷輸入?yún)?shù)
def judge_input_parameters_num():
    if len(sys.argv)!=6:
        print('參數(shù)有問(wèn)題,請(qǐng)檢查!')
        print(sys.argv)
        sys.exit(2)
    else:
        sql         =sys.argv[1]            # 導(dǎo)出數(shù)據(jù)sql
        s_tablename =sys.argv[2]            # 源表名
        ds_name     =sys.argv[3]            # 目標(biāo)庫(kù)
        g_tablename =sys.argv[4]            # 目標(biāo)表
        flag        =sys.argv[5]            # A:append,T:truncate
    return sql,s_tablename,ds_name,g_tablename,flag


# 執(zhí)行SQL語(yǔ)句,生成HDFS文件
def produce_exe_data(sql,s_tablename):
    global local_path_name
    # 1、創(chuàng)建本地文件夾
    # 隨機(jī)選擇一個(gè)磁盤(pán)目錄:'/data01','/data02','/data03','/data04','/data05'
    local_path_01 = local_path_list[random.randrange(len(local_path_list))]+'/dataos_exp'     # /data01/dataos_exp
    local_path_name="h2o_{0}_{1}".format(s_tablename,get_md5_str()).lower()
    # local_path_name='h2o_app_hub_resource_value_level_d_e6963bad13299e939a3a4cc2b2a26a47'
    local_path=local_path_01+'/'+local_path_name
    if os.path.exists(local_path):
        cmd='rm -rf {}'.format(local_path)
        exe_system_cmd(cmd)
    os.mkdir(local_path)

    # 創(chuàng)建hdfs文件夾
    hdfs_path="hdfs://prdhdfs/tmp/hdfs_to_rdb/{}".format(local_path_name)

    # 處理SQL,先去除兩邊的空格,再去除兩邊的分號(hào)
    sql=sql.strip().strip(';')
    sql_list=sql.split(';')
    # 依次執(zhí)行切分的SQL
    hive_conn=hive_connect()        # 連接生產(chǎn)HIVE
    compress_sql='set hive.exec.compress.output=false'
    print('執(zhí)行SQL:{}'.format(compress_sql))
    connect_database_to_commit('insert',hive_conn,compress_sql,'')
    for i in range(len(sql_list)):
        sql_str=sql_list[i]
        if i==len(sql_list)-1:      # 如果是最后一條SQL,則執(zhí)行insert overwrite directory
            sql_str='''INSERT OVERWRITE DIRECTORY '{0}' 
         ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\u0001' COLLECTION ITEMS TERMINATED BY '\\n' MAP KEYS TERMINATED BY ':' 
         {1} '''.format(hdfs_path,sql_str)
        print('執(zhí)行SQL:{}'.format(sql_str))
        connect_database_to_commit('insert',hive_conn,sql_str,'')
    # 關(guān)閉HIVE連接
    if hive_conn:
        hive_conn.close()
    # 將hdfs文件從hdfs_path路徑get到local_path下
    cmd=''' hdfs dfs -get {}/* {} '''.format(hdfs_path,local_path)
    exe_system_cmd(cmd)
    print('文件GET成功,當(dāng)前主機(jī):{},數(shù)據(jù)臨時(shí)文件夾:{}'.format(socket.gethostname(),local_path))
    return local_path,hdfs_path
# 獲取目標(biāo)端的連接信息
def get_rdb_conn_msg(ds_name):
    global ds_type
    global ds_acct
    global ds_auth
    global host
    global port
    global database
    global jdbc_url
    sql='''
        SELECT ds_name
              ,ds_type
              ,ds_acct
              ,ds_auth
              ,split_part(ds_inst_loc,':',1) as host
              ,case when split_part(ds_inst_loc,':',2)='' and ds_type='oracle' then '1521' else split_part(ds_inst_loc,':',2) end as port
              ,case when lower(ds_type)='oracle' then split_part(replace(replace(replace(ds_conf::json->>'url','jdbc:oracle:thin:@',''),':1521',''),'/',':'),':',2) else ds_conf::json->>'physicalDbName' end as database
              ,CASE WHEN ds_type = 'mysql'  THEN CONCAT ('jdbc:mysql://'     ,ds_inst_loc,'/',(ds_conf::json ->>'physicalDbName'),'?characterEncoding=UTF-8')
                    WHEN ds_type = 'oracle' THEN ds_conf::json ->>'url'
                    WHEN ds_type = 'pg'     THEN CONCAT ('jdbc:postgresql://',ds_inst_loc,'/',(ds_conf::json ->>'physicalDbName')) END as jdbc_url
        FROM dacp_dev.dacp_meta_datasource
        WHERE ds_type IN ('mysql', 'oracle', 'pg')
          AND upper(trim(ds_name)) = upper(trim('{}')) '''.format(ds_name)
    pg_conn=postgresql_connect(pg_conn_str)
    results=connect_database_to_select(pg_conn,sql)
    # print(results)
    if not results:
        print('未查詢到數(shù)據(jù)庫(kù)連接信息,請(qǐng)檢查,DS_NAME:{}'.format(ds_name))
        sys.exit(2)
    # 關(guān)閉數(shù)據(jù)庫(kù)連接
    if pg_conn:
        pg_conn.close()
    # 解密密碼
    cmd='''java -Dpwd='{0}' -jar /data02/dcadmin/scripts/common/AesCipher-1.0.jar'''.format(results[0][3])
    pw=exe_system_cmd(cmd).replace('\r','').replace('\n','')
    ds_type = results[0][1]ds_acct = results[0][2]
    ds_auth = pw
    host    = results[0][4]
    port    = int(results[0][5])
    database= results[0][6]
    jdbc_url= results[0][7]


# 判斷連接的數(shù)據(jù)庫(kù)類型,并返回?cái)?shù)據(jù)庫(kù)連接conn
def get_rdb_database_conn():
    dbms_conn=None
    try:
        if ds_type.upper()=='SQLSERVER':
            dbms_conn = pymssql.connect(host=host  , user=ds_acct, password=ds_auth, port=port, database=database, charset='utf8')
        elif ds_type.upper()=='MYSQL':
            dbms_conn = pymysql.connect(host=host  , user=ds_acct, passwd=ds_auth  , port=port, database=database, charset='utf8', local_infile=True)
        elif ds_type.upper()=='ORACLE':
            listener = '{0}:{1}/{2}'.format(host,port,database)
            print('listener:{}'.format(listener))
            dbms_conn = cx_Oracle.connect(ds_acct,ds_auth,listener,encoding='utf-8')
        elif ds_type.upper() in ('POSTGRESQL','PG'):
            dbms_conn = psycopg2.connect(host=host , user=ds_acct, password=ds_auth  , port=port, database=database, client_encoding='utf8')
        else:
            print("未知源端數(shù)據(jù)庫(kù)類型{}~~~~~,請(qǐng)檢查!".format(ds_type.upper()))
            sys.exit(2)
    except Exception as e:
        print('{0},{1}數(shù)據(jù)庫(kù)連接失敗,請(qǐng)檢查~~~~~~!'.format(ds_type,ds_name))
        print('報(bào)錯(cuò)日志:{}'.format(e))
        print(host)
        print(ds_acct)
        print(ds_auth)
        print(port)
        print(database)
        sys.exit(2)
    return dbms_conn


def thread_exe_exchange_data(g_tablename,flag,local_path,hdfs_path):
    global rdb_conn
    rdb_conn=get_rdb_database_conn()
    # 執(zhí)行預(yù)處理SQL
    if flag.upper()=='T':
        presql='truncate table {}'.format(g_tablename)
        print('執(zhí)行SQL:{}'.format(presql))
        connect_database_to_commit('insert',rdb_conn,presql,'')
    # 獲取Oracle表結(jié)構(gòu)
    if ds_type.lower() in ('oracle'):
        global oracle_table_field       # oracle 表結(jié)構(gòu)
        oracle_table_field=get_oracle_table_fields()

        # 創(chuàng)建ctl,bad,log存放目錄
        global ora_dir
        localtime = str(time.strftime("%Y%m%d", time.localtime()))
        ora_dir = "/data03/datafile/sqlldrdata/{0}/".format(localtime)
        if not os.path.exists(ora_dir):
            os.mkdir(ora_dir)

    # 文件列表
    file_lists=os.listdir(local_path)

    # 多線程導(dǎo)數(shù)
    global exp_num_list     # 存儲(chǔ)導(dǎo)數(shù)數(shù)量
    global log_list         # 存儲(chǔ)多線程的日志信息
    global exception_list   # 存儲(chǔ)多線程異常信息
    exp_num_list  =[]
    log_list      =[]
    exception_list=[]

    thread_list=[]          # 存儲(chǔ)多線程任務(wù)
    for file_name in file_lists:
        thread_list.append(local_path+'/'+file_name)
    # 創(chuàng)建線程池
    pool=threadpool.ThreadPool(5)
    # 存放任務(wù)列表
    requests = threadpool.makeRequests(exchange_data,thread_list)
    [pool.putRequest(req) for req in requests]
    pool.wait()
    # 處理異常
    if exception_list:
        # 導(dǎo)數(shù)出現(xiàn)異常,刪除文件
        delete_local_path(local_path,hdfs_path)
        print('導(dǎo)數(shù)失敗,異常日志信息如下:')
        for except_msg in exception_list:
            print(except_msg)
        sys.exit(2)

    # 打印多線程日志
    # log_list.sort()
    # for log in log_list:
    #     print(log)

    # 打印導(dǎo)出結(jié)果
    print('數(shù)據(jù)導(dǎo)出完成,導(dǎo)出數(shù)據(jù)總量為:{}'.format(sum(exp_num_list)))



# 獲取Oracle表結(jié)構(gòu)
def get_oracle_table_fields():
    sql = '''
             SELECT COLUMN_NAME || SUFFIX AS AA
               FROM (SELECT A.COLUMN_NAME
                           ,A.COLUMN_ID
                           ,CASE WHEN UPPER(A.DATA_TYPE) LIKE '%DATE%'      THEN ' DATE "yyyy-mm-dd hh24:mi:ss"'
                                 WHEN UPPER(A.DATA_TYPE) LIKE '%TIMESTAMP%' THEN ' DATE "yyyy-mm-dd hh24:mi:ss.ff"'
                                 WHEN UPPER(A.DATA_TYPE) LIKE '%VARCHAR%'   THEN ' CHAR(3000)'
                                 ELSE '' END AS SUFFIX
                       FROM ALL_TAB_COLUMNS A
                      WHERE UPPER(A.OWNER||'.'||A.TABLE_NAME) = UPPER(TRIM('{0}'))
                      ORDER BY A.COLUMN_ID) '''
    if '.' in g_tablename:
        sql=sql.format(g_tablename)
    else:
        sql=sql.format(database+'.'+g_tablename)
    oracle_table_fields=connect_database_to_select(rdb_conn,sql)
    if not oracle_table_fields:
        print('未查詢到表結(jié)構(gòu),表名:{}'.format(g_tablename))
        sys.exit(2)
    oracle_table_field = ",\n".join([str(list[0]) for list in oracle_table_fields])
    return oracle_table_field
    # 執(zhí)行單個(gè)導(dǎo)出任務(wù)
def exchange_data(file_path):
    try:
        output=''
        # 執(zhí)行導(dǎo)數(shù)任務(wù)
        if ds_type.lower() in ('pg','postgresql','telpg','antdb'):
            cmd='''psql "port={0} host={1} user={2} dbname={3} password={4} " -c "\copy {5} from '{6}' DELIMITER AS E'\u0001' " '''
            cmd=cmd.format(port,host,ds_acct,database,ds_auth,g_tablename,file_path)
            status,output=commands.getstatusoutput(cmd)
            if status!=0:
                exception_list.append('命令{}:執(zhí)行失敗,請(qǐng)檢查!失敗日志:{}'.format(cmd,output))
        elif ds_type.lower() in ('mysql','teldb'):
            mysql_conn = pymysql.connect(host=host  , user=ds_acct, passwd=ds_auth  , port=port, database=database, charset='utf8', local_infile=True)
            mysql_cursor=mysql_conn.cursor()
            sql='SET NAMES UTF8'
            mysql_cursor.execute(sql)
            sql='''load data local infile '{}' into table {} fields terminated by X'01' lines terminated by '\\n'  '''.format(file_path,g_tablename)
            #print(sql)
            output=mysql_cursor.execute(sql)
            mysql_conn.commit()
            mysql_conn.close()
            # cmd='''mysql -h {} -P {} -u {} -p{} -D {} -e "SET NAMES UTF8;load data local infile '{}' into table {} fields terminated by X'01' lines terminated by '\\n'"  '''
            # cmd=cmd.format(host,port,ds_acct,ds_auth,database,file_path,g_tablename)
        elif ds_type.lower() in ('oracle'):
            tns='''\'{}/"{}"\'@{}:1521/{}'''.format(ds_acct,ds_auth,host,database)
            ora_file_name=file_path.replace(local_path+'/','')
            ora_file_path=ora_dir+'/'+local_path_name+'_'+ora_file_name
            control_file = ora_file_path+".ctl"
            log_file     = ora_file_path+".log"
            bad_file     = ora_file_path+".bad"
            dis_file     = ora_file_path+".dis"
            content ='''
UNRECOVERABLE LOAD DATA CHARACTERSET AL32UTF8 
APPEND INTO TABLE {0} FIELDS TERMINATED BY x'01' 
TRAILING NULLCOLS ({1}) '''.format(g_tablename, oracle_table_field)
            # 如果控制文件存在,則先刪除
            if os.path.exists(control_file):
                cmd='rm -rf {}'.format(control_file)
                exe_system_cmd(cmd)
            # 再創(chuàng)建控制文件
             with open(control_file, "w") as file:
                file.write(content)
            cmd='''export ORACLE_HOME=/data03/apps/db_1;export LD_LIBRARY_PATH=$ORACLE_HOME/lib:$LD_LIBRARY_PATH;cat {0} | /data03/apps/db_1/bin/sqlldr userid={1} control={2} data=\\"-\\" log={3} bad={4} discard={5} errors=0 direct=true parallel=true multithreading=true columnarrayrows=100000 STREAMSIZE=20971520 readsize=20971520 bindsize=20971520 date_cache=0 '''
            cmd=cmd.format(file_path, tns, control_file, log_file, bad_file, dis_file)
            status,output=commands.getstatusoutput(cmd)
            if status!=0:
                exception_list.append('命令{}:執(zhí)行失敗,請(qǐng)檢查!失敗日志:{}'.format(cmd,output))
        else:
            exception_list.append('目標(biāo)端數(shù)據(jù)庫(kù)類型為:{},此類型暫未支持!'.format(db_type.lower()))

        # 計(jì)算導(dǎo)出行數(shù)
        if ds_type.lower() in ('pg','postgresql','telpg','antdb'):
            file_row_num=int(output.split('COPY ')[1].strip())
            exp_num_list.append(file_row_num)
        elif ds_type.lower() in ('oracle'):
            try:
                output=output.decode('gbk')
            except:
                output=output
            file_row_num=int(output.split('邏輯記錄計(jì)數(shù) ')[1].replace('。','').strip())
            exp_num_list.append(file_row_num)
        elif ds_type.lower() in ('mysql','teldb'):
            exp_num_list.append(output)
        # 插入日志
        log_list.append(output)
    except Exception as e:
        exception_list.append(e)




def delete_local_path(local_path,hdfs_path):
    cmd='rm -rf {}'.format(local_path)
    exe_system_cmd(cmd)
    print('本地文件夾刪除成功。')
    cmd='hdfs dfs -rm -r {}'.format(hdfs_path)
    exe_system_cmd(cmd)
    print('HDFS文件夾刪除成功。')
if __name__ == '__main__':
    starttime = datetime.datetime.now()
    print('開(kāi)始時(shí)間:{0}'.format(starttime.strftime('%Y-%m-%d %H:%M:%S')))
    # 1、判斷輸入?yún)?shù)
    sql,s_tablename,ds_name,g_tablename,flag=judge_input_parameters_num()
    # 2、執(zhí)行SQL,生產(chǎn)文件,并返回本地目錄
    local_path,hdfs_path=produce_exe_data(sql,s_tablename)
    hdfs_time=datetime.datetime.now()
    #print('當(dāng)前時(shí)間:{}'.format(hdfs_time))
    print("生成HDFS文件耗時(shí):{0}秒".format((hdfs_time - starttime).seconds))
    # 3、獲取目標(biāo)端連接信息(host,port等)
    get_rdb_conn_msg(ds_name)
    # 4、執(zhí)行導(dǎo)數(shù)任務(wù)
    thread_exe_exchange_data(g_tablename,flag,local_path,hdfs_path)
    # 5、刪除本地文件夾
    delete_local_path(local_path,hdfs_path)
    # 6、關(guān)閉數(shù)據(jù)庫(kù)連接
    close_datadb_conn()
    endtime = datetime.datetime.now()
    print('結(jié)束時(shí)間:{0}'.format(endtime.strftime('%Y-%m-%d %H:%M:%S')))
    print('導(dǎo)數(shù)耗時(shí):{0}秒'.format((endtime - hdfs_time).seconds))
    print("一共耗時(shí):{0}秒".format((endtime - starttime).seconds))

總結(jié)

整個(gè)腳本有效地實(shí)現(xiàn)了從HDFS到關(guān)系數(shù)據(jù)庫(kù)的數(shù)據(jù)遷移,確保數(shù)據(jù)的完整性和一致性。首先通過(guò)Hive導(dǎo)出數(shù)據(jù),再利用多線程和DataX工具導(dǎo)入到目標(biāo)數(shù)據(jù)庫(kù)。本地化和多線程處理使過(guò)程更高效,適合大數(shù)據(jù)處理和數(shù)據(jù)倉(cāng)庫(kù)遷移。

請(qǐng)務(wù)必按需調(diào)整腳本中的具體參數(shù)和配置以適應(yīng)你的環(huán)境和數(shù)據(jù)架構(gòu)。

以上就是python從Hadoop HDFS導(dǎo)出數(shù)據(jù)到關(guān)系數(shù)據(jù)庫(kù)的詳細(xì)內(nèi)容,更多關(guān)于python HDFS導(dǎo)出數(shù)據(jù)到數(shù)據(jù)庫(kù)的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • 如何用Python做一個(gè)微信機(jī)器人自動(dòng)拉群

    如何用Python做一個(gè)微信機(jī)器人自動(dòng)拉群

    這篇文章主要介紹了如何用Python做一個(gè)微信機(jī)器人自動(dòng)拉群,微當(dāng)群人數(shù)達(dá)到100人后,用戶無(wú)法再通過(guò)掃描群二維碼加入,只能讓用戶先添加群內(nèi)聯(lián)系人微信,再由聯(lián)系人把用戶拉進(jìn)來(lái)。這樣,聯(lián)系人員的私人微信會(huì)添加大量陌生人,給其帶來(lái)不必要的打擾,需要的朋友可以參考下
    2019-07-07
  • python模擬表單提交登錄圖書(shū)館

    python模擬表單提交登錄圖書(shū)館

    這篇文章主要為大家詳細(xì)介紹了python模擬表單提交登錄圖書(shū)館的實(shí)現(xiàn)方法,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2018-04-04
  • Python 讀取用戶指令和格式化打印實(shí)現(xiàn)解析

    Python 讀取用戶指令和格式化打印實(shí)現(xiàn)解析

    這篇文章主要介紹了Python 讀取用戶指令和格式化打印實(shí)現(xiàn)解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2019-09-09
  • django解決跨域請(qǐng)求的問(wèn)題詳解

    django解決跨域請(qǐng)求的問(wèn)題詳解

    這篇文章主要介紹了django解決跨域請(qǐng)求的問(wèn)題詳解,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2019-01-01
  • python如何實(shí)現(xiàn)控制電腦音量

    python如何實(shí)現(xiàn)控制電腦音量

    這篇文章主要介紹了python如何實(shí)現(xiàn)控制電腦音量問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-09-09
  • 基于Python實(shí)現(xiàn)IP代理池

    基于Python實(shí)現(xiàn)IP代理池

    在網(wǎng)絡(luò)爬蟲(chóng)或數(shù)據(jù)采集領(lǐng)域,IP代理池是一種常用的工具,本文將詳細(xì)介紹如何使用Python實(shí)現(xiàn)一個(gè)簡(jiǎn)單的IP代理池,有需要的可以參考一下
    2024-11-11
  • Python如何利用Har文件進(jìn)行遍歷指定字典替換提交的數(shù)據(jù)詳解

    Python如何利用Har文件進(jìn)行遍歷指定字典替換提交的數(shù)據(jù)詳解

    這篇文章主要給大家介紹了關(guān)于Python如何利用Har文件進(jìn)行遍歷指定字典替換提交的數(shù)據(jù)的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2020-11-11
  • Python提取JSON格式數(shù)據(jù)實(shí)戰(zhàn)案例

    Python提取JSON格式數(shù)據(jù)實(shí)戰(zhàn)案例

    這篇文章主要給大家介紹了關(guān)于Python提取JSON格式數(shù)據(jù)的相關(guān)資料, Python提供了內(nèi)置的json模塊,用于處理JSON數(shù)據(jù),文中給出了詳細(xì)的代碼示例,需要的朋友可以參考下
    2023-07-07
  • anaconda jupyter不能導(dǎo)入安裝的lightgbm解決方案

    anaconda jupyter不能導(dǎo)入安裝的lightgbm解決方案

    這篇文章主要介紹了anaconda jupyter不能導(dǎo)入安裝的lightgbm解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2021-03-03
  • Python Metaclass原理與實(shí)現(xiàn)過(guò)程詳細(xì)講解

    Python Metaclass原理與實(shí)現(xiàn)過(guò)程詳細(xì)講解

    MetaClass元類,本質(zhì)也是一個(gè)類,但和普通類的用法不同,它可以對(duì)類內(nèi)部的定義(包括類屬性和類方法)進(jìn)行動(dòng)態(tài)的修改??梢赃@么說(shuō),使用元類的主要目的就是為了實(shí)現(xiàn)在創(chuàng)建類時(shí),能夠動(dòng)態(tài)地改變類中定義的屬性或者方法
    2022-11-11

最新評(píng)論