Python腳本實(shí)現(xiàn)datax全量同步mysql到hive
前言
在我們構(gòu)建離線數(shù)倉(cāng)時(shí)或者遷移數(shù)據(jù)時(shí),通常選用sqoop和datax等工具進(jìn)行操作,sqoop和datax各有優(yōu)點(diǎn),datax優(yōu)點(diǎn)也很明顯,基于內(nèi)存,所以速度上很快,那么在進(jìn)行全量同步時(shí)編寫json文件是一項(xiàng)很繁瑣的事,是否可以編寫腳本來(lái)把繁瑣事來(lái)簡(jiǎn)單化,接下來(lái)我將分享這樣一個(gè)mysql全量同步到hive自動(dòng)生成json文件的python腳本。
一、展示腳本
# coding=utf-8
import json
import getopt
import os
import sys
import pymysql
# MySQL 相關(guān)配置,需根據(jù)實(shí)際情況作出修改
mysql_host = "XXXXXX"
mysql_port = "XXXX"
mysql_user = "XXX"
mysql_passwd = "XXXXXX"
# HDFS NameNode 相關(guān)配置,需根據(jù)實(shí)際情況作出修改
hdfs_nn_host = "XXXXXX"
hdfs_nn_port = "XXXX"
# 生成配置文件的目標(biāo)路徑,可根據(jù)實(shí)際情況作出修改
output_path = "/XXX/XXX/XXX"
def get_connection():
return pymysql.connect(host=mysql_host, port=int(mysql_port), user=mysql_user, password=mysql_passwd)
def get_mysql_meta(database, table):
connection = get_connection()
cursor = connection.cursor()
sql = "SELECT COLUMN_NAME,DATA_TYPE from information_schema.COLUMNS WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION"
cursor.execute(sql, [database, table])
fetchall = cursor.fetchall()
cursor.close()
connection.close()
return fetchall
def get_mysql_columns(database, table):
return list(map(lambda x: x[0], get_mysql_meta(database, table)))
def get_hive_columns(database, table):
def type_mapping(mysql_type):
mappings = {
"bigint": "bigint",
"int": "bigint",
"smallint": "bigint",
"tinyint": "bigint",
"decimal": "string",
"double": "double",
"float": "float",
"binary": "string",
"char": "string",
"varchar": "string",
"datetime": "string",
"time": "string",
"timestamp": "string",
"date": "string",
"text": "string"
}
return mappings[mysql_type]
meta = get_mysql_meta(database, table)
return list(map(lambda x: {"name": x[0], "type": type_mapping(x[1].lower())}, meta))
def generate_json(source_database, source_table):
job = {
"job": {
"setting": {
"speed": {
"channel": 3
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": mysql_user,
"password": mysql_passwd,
"column": get_mysql_columns(source_database, source_table),
"splitPk": "",
"connection": [{
"table": [source_table],
"jdbcUrl": ["jdbc:mysql://" + mysql_host + ":" + mysql_port + "/" + source_database]
}]
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://" + hdfs_nn_host + ":" + hdfs_nn_port,
"fileType": "text",
"path": "${targetdir}",
"fileName": source_table,
"column": get_hive_columns(source_database, source_table),
"writeMode": "append",
"fieldDelimiter": "\t",
"compress": "gzip"
}
}
}]
}
}
if not os.path.exists(output_path):
os.makedirs(output_path)
with open(os.path.join(output_path, ".".join([source_database, source_table, "json"])), "w") as f:
json.dump(job, f)
def main(args):
source_database = ""
source_table = ""
options, arguments = getopt.getopt(args, '-d:-t:', ['sourcedb=', 'sourcetbl='])
for opt_name, opt_value in options:
if opt_name in ('-d', '--sourcedb'):
source_database = opt_value
if opt_name in ('-t', '--sourcetbl'):
source_table = opt_value
generate_json(source_database, source_table)
if __name__ == '__main__':
main(sys.argv[1:])
二、使用準(zhǔn)備
1、安裝python環(huán)境
這里我安裝的是python3環(huán)境
sudo yum install -y python3
2、安裝EPEL
EPEL(Extra Packages for Enterprise Linux)是一個(gè)由 Fedora Special Interest Group 維護(hù)的軟件倉(cāng)庫(kù),提供了大量在官方 RHEL 或 CentOS 軟件倉(cāng)庫(kù)中沒有的軟件包。當(dāng)你在 CentOS 或 RHEL 系統(tǒng)上需要安裝一些不在官方軟件倉(cāng)庫(kù)中的軟件時(shí),通常會(huì)先安裝epel - release
sudo yum install -y epel-release
3、安裝腳本執(zhí)行需要的第三方模塊
pip3 install pymysql pip3 install cryptography
這里可能由于斑紋問題cryptography安裝不上去更新一下pip和setuptools
pip3 install --upgrade pip pip3 install --upgrade setuptools
重新安裝cryptography
pip3 install cryptography
三、腳本使用方法
1、配置腳本
首先根據(jù)自己服務(wù)器修改腳本相關(guān)配置
2、創(chuàng)建.py文件
vim /xxx/xxx/xxx/gen_import_config.py
3、執(zhí)行腳本
python3 /腳本路徑/gen_import_config.py -d 數(shù)據(jù)庫(kù)名 -t 表名
4、測(cè)試生成json文件是否可用
datax.py -p"-Dtargetdir=/表在hdfs存放路徑" /生成的json文件路徑
執(zhí)行時(shí)首先要確保targetdir目標(biāo)地址在hdfs上存在,如果沒有需要?jiǎng)?chuàng)建后再次執(zhí)行
到此這篇關(guān)于Python腳本實(shí)現(xiàn)datax全量同步mysql到hive的文章就介紹到這了,更多相關(guān)Python datax全量同步mysql到hive內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
詳解sklearn?Preprocessing?數(shù)據(jù)預(yù)處理功能
這篇文章主要介紹了sklearn?Preprocessing?數(shù)據(jù)預(yù)處理功能,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-08-08
Python利用prettytable庫(kù)輸出好看的表格
prettytable庫(kù)就是這么一個(gè)工具,prettytable可以打印出美觀的表格,并且對(duì)中文支持相當(dāng)好。本文將介紹如何通過prettytable輸出好看的表格,需要的可以參考一下2022-01-01
使用 Python 創(chuàng)建一個(gè)基于規(guī)則的聊天機(jī)器人
這篇文章主要介紹了使用 Python 創(chuàng)建一個(gè)基于規(guī)則的聊天機(jī)器人,使用 Python 創(chuàng)建一個(gè)簡(jiǎn)單的基于規(guī)則的聊天機(jī)器人 聊天機(jī)器人本身是一種機(jī)器或軟件,它通過文本或句子模仿人類交互。 簡(jiǎn)而言之,可以使用類似于與人類對(duì)話的軟件進(jìn)行聊天。2021-10-10
Python PIL實(shí)現(xiàn)GIF壓縮工具
本文將結(jié)合wxPython的GUI框架和PIL(Python Imaging Library)的圖像處理能力編寫一個(gè)GIF壓縮工具,并提供了兩種壓縮方式,感興趣的小伙伴可以了解下2024-10-10
Python實(shí)現(xiàn)剪刀石頭布小游戲(與電腦對(duì)戰(zhàn))
這篇文章給大家分享Python基礎(chǔ)實(shí)現(xiàn)與電腦對(duì)戰(zhàn)的剪刀石頭布小游戲,練習(xí)if while輸入和輸出,代碼簡(jiǎn)單易懂,非常不錯(cuò),具有一定的參考借鑒價(jià)值,需要的朋友參考下吧2019-12-12
pycharm配置anaconda環(huán)境時(shí)找不到python.exe的兩種解決辦法
如果你在Anaconda中創(chuàng)建了虛擬環(huán)境,但是無(wú)法找到python.exe,可能是因?yàn)樘摂M環(huán)境的Python路徑?jīng)]有添加到系統(tǒng)環(huán)境變量中,這篇文章主要給大家介紹了關(guān)于pycharm配置anaconda環(huán)境時(shí)找不到python.exe的兩種解決辦法,需要的朋友可以參考下2024-07-07
Pandas數(shù)據(jù)分析之pandas文本處理
這篇文章主要介紹了Pandas數(shù)據(jù)分析之pandas文本處理,pandas對(duì)文本數(shù)據(jù)也有很多便捷處理方法,可以不用寫循環(huán),向量化操作運(yùn)算速度快,還可以進(jìn)行高級(jí)的正則表達(dá)式,各種復(fù)雜的邏輯篩選和匹配提取信息2022-08-08
Python3.x+迅雷x 自動(dòng)下載高分電影的實(shí)現(xiàn)方法
這篇文章主要介紹了Python3.x+迅雷x 自動(dòng)下載高分電影的實(shí)現(xiàn)方法,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-01-01

