Python實(shí)現(xiàn)將MongoDB中的數(shù)據(jù)導(dǎo)入到MySQL
本文主要介紹了一個(gè)將 MongoDB 中的數(shù)據(jù)導(dǎo)入到 MySQL 中的 Python 工具類 MongoToMysql。該工具類實(shí)現(xiàn)了獲取 MongoDB 數(shù)據(jù)類型、創(chuàng)建 MySQL 表結(jié)構(gòu)以及將數(shù)據(jù)從 MongoDB 推送到 MySQL 等功能。
通過(guò)該工具類,用戶可以輕松地將 MongoDB 中的數(shù)據(jù)導(dǎo)入到 MySQL 中,實(shí)現(xiàn)數(shù)據(jù)的轉(zhuǎn)移和使用。
使用該工具類,用戶需要傳入相應(yīng)的參數(shù),包括 MongoDB 的連接信息,MySQL 的連接信息,以及表名、是否設(shè)置最大長(zhǎng)度、批處理大小和表描述等信息。具體使用可以參考代碼中的注釋。
實(shí)現(xiàn)代碼
import pymysql
from loguru import logger
class MongoToMysql:
def __init__(self, mongo_host, mongo_port, mongo_db, mongo_collection, mysql_host, mysql_port, mysql_user,
mysql_password, mysql_db,table_name=None,set_max_length=False,batch_size=10000,table_description=''):
self.mongo_host = mongo_host
self.mongo_port = mongo_port
self.mongo_db = mongo_db
self.mongo_collection = mongo_collection
self.mysql_host = mysql_host
self.mysql_port = mysql_port
self.mysql_user = mysql_user
self.mysql_password = mysql_password
self.mysql_db = mysql_db
self.table_name = table_name
self.set_max_length = set_max_length
self.batch_size = batch_size
self.table_description = table_description
self.data_types = self.get_mongo_data_types()
self.create_mysql_table(self.data_types,set_max_length= self.set_max_length,table_description=self.table_description)
self.push_data_to_mysql(self.batch_size)
def get_mongo_data_types(self):
logger.debug('正在獲取mongo中字段的類型!')
client = pymongo.MongoClient(host=self.mongo_host, port=self.mongo_port)
db = client[self.mongo_db]
collection = db[self.mongo_collection]
data_types = {}
for field in collection.find_one().keys():
data_types[field] = type(collection.find_one()[field]).__name__
return data_types
def check_mysql_table_exists(self):
logger.debug('檢查是否存在該表,有則刪之!')
conn = pymysql.connect(host=self.mysql_host, port=self.mysql_port, user=self.mysql_user,
password=self.mysql_password, db=self.mysql_db)
cursor = conn.cursor()
sql = f"DROP TABLE IF EXISTS {self.mongo_collection}"
cursor.execute(sql)
conn.commit()
conn.close()
def get_max_length(self, field):
logger.debug(f'正在獲取字段 {field} 最大長(zhǎng)度......')
client = pymongo.MongoClient(host=self.mongo_host, port=self.mongo_port)
db = client[self.mongo_db]
collection = db[self.mongo_collection]
max_length = 0
for item in collection.find({},{field:1,'_id':0}):
value = item.get(field)
if isinstance(value, str) and len(value) > max_length:
max_length = len(value)
return max_length
def create_mysql_table(self, data_types,set_max_length,table_description):
logger.debug('正在mysql中創(chuàng)建表結(jié)構(gòu)!')
self.check_mysql_table_exists()
conn = pymysql.connect(host=self.mysql_host, port=self.mysql_port, user=self.mysql_user,
password=self.mysql_password, db=self.mysql_db)
cursor = conn.cursor()
if self.table_name:
table_name = self.table_name
else:
table_name = self.mongo_collection
fields = []
for field, data_type in data_types.items():
if data_type == 'int':
fields.append(f"{field} INT")
elif data_type == 'float':
fields.append(f"{field} FLOAT")
elif data_type == 'bool':
fields.append(f"{field} BOOLEAN")
else:
if set_max_length:
fields.append(f"{field} TEXT)")
else:
max_length = self.get_max_length(field)
fields.append(f"{field} VARCHAR({max_length + 200})")
fields_str = ','.join(fields)
sql = f"CREATE TABLE {table_name} (id INT PRIMARY KEY AUTO_INCREMENT,{fields_str}) COMMENT='{table_description}'"
cursor.execute(sql)
conn.commit()
conn.close()
def push_data_to_mysql(self, batch_size=10000):
logger.debug('--- 正在準(zhǔn)備從mongo中每次推送10000條數(shù)據(jù)到mysql ----')
client = pymongo.MongoClient(host=self.mongo_host, port=self.mongo_port)
db = client[self.mongo_db]
collection = db[self.mongo_collection]
conn = pymysql.connect(host=self.mysql_host, port=self.mysql_port, user=self.mysql_user,
password=self.mysql_password, db=self.mysql_db)
cursor = conn.cursor()
if self.table_name:
table_name = self.table_name
else:
table_name = self.mongo_collection
# table_name = self.mongo_collection
data = []
count = 0
for item in collection.find():
count += 1
row = []
for field, data_type in self.data_types.items():
value = item.get(field)
if value is None:
row.append(None)
elif data_type == 'int':
row.append(str(item.get(field, 0)))
elif data_type == 'float':
row.append(str(item.get(field, 0.0)))
elif data_type == 'bool':
row.append(str(item.get(field, False)))
else:
row.append(str(item.get(field, '')))
data.append(row)
if count % batch_size == 0:
placeholders = ','.join(['%s'] * len(data[0]))
sql = f"INSERT INTO {table_name} VALUES (NULL,{placeholders})"
cursor.executemany(sql, data)
conn.commit()
data = []
logger.debug(f'--- 已完成推送:{count} 條數(shù)據(jù)! ----')
if data:
placeholders = ','.join(['%s'] * len(data[0]))
sql = f"INSERT INTO {table_name} VALUES (NULL,{placeholders})"
cursor.executemany(sql, data)
conn.commit()
logger.debug(f'--- 已完成推送:{count} 條數(shù)據(jù)! ----')
conn.close()
if __name__ == '__main__':
"""MySQL"""
mongo_host = '127.0.0.1'
mongo_port = 27017
mongo_db = 'db_name'
mongo_collection = 'collection_name'
"""MongoDB"""
mysql_host = '127.0.0.1'
mysql_port = 3306
mysql_user = 'root'
mysql_password = '123456'
mysql_db = 'mysql_db'
table_description = '' # 表描述
mongo_to_mysql = MongoToMysql(mongo_host, mongo_port, mongo_db, mongo_collection, mysql_host, mysql_port,
mysql_user, mysql_password, mysql_db,table_description=table_description)
#
# table_name = None # 默認(rèn)為None 則MySQL中的表名跟Mongo保持一致
# set_max_length = False # 默認(rèn)為False 根據(jù)mongo中字段最大長(zhǎng)度 加200 來(lái)設(shè)置字段的VARCHART長(zhǎng)度 , 否則定義TEXT類型
# batch_size = 10000 # 控制每次插入數(shù)據(jù)量的大小
# table_description = '' # 表描述
# mongo_to_mysql = MongoToMysql(mongo_host, mongo_port, mongo_db, mongo_collection, mysql_host, mysql_port,
# mysql_user, mysql_password, mysql_db,table_name,set_max_length,batch_size,table_description)
代碼使用了 PyMongo、PyMySQL 和 Loguru 等 Python 庫(kù),并封裝了一個(gè) MongoToMysql 類。在類的初始化時(shí),會(huì)自動(dòng)獲取 MongoDB 中字段的類型,并根據(jù)字段類型創(chuàng)建 MySQL 表結(jié)構(gòu)。在將數(shù)據(jù)從 MongoDB 推送到 MySQL 時(shí),還可以控制每次插入數(shù)據(jù)量的大小,以避免一次性插入大量數(shù)據(jù)造成系統(tǒng)崩潰或性能下降。
需要注意的是,在創(chuàng)建 MySQL 表結(jié)構(gòu)時(shí),如果用戶選擇了設(shè)置最大長(zhǎng)度,則會(huì)創(chuàng)建 TEXT 類型的字段,否則會(huì)根據(jù) MongoDB 中字段的最大長(zhǎng)度加上200來(lái)設(shè)置 VARCHAR 類型的字段長(zhǎng)度。
總之,本文介紹的 MongoToMysql 工具類非常方便實(shí)用,對(duì)于需要將 MongoDB 數(shù)據(jù)遷移到 MySQL 的用戶來(lái)說(shuō),是一種很好的解決方案。
到此這篇關(guān)于Python實(shí)現(xiàn)將MongoDB中的數(shù)據(jù)導(dǎo)入到MySQL的文章就介紹到這了,更多相關(guān)Python MongoDB數(shù)據(jù)導(dǎo)入到MySQL內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
python數(shù)據(jù)擬合之scipy.optimize.curve_fit解讀
這篇文章主要介紹了python數(shù)據(jù)擬合之scipy.optimize.curve_fit解讀,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-12-12
不知道這5種下劃線的含義,你就不算真的會(huì)Python!
Python是一種高級(jí)程序語(yǔ)言,其核心設(shè)計(jì)哲學(xué)是代碼可讀性和語(yǔ)法,能夠讓程序員用很少的代碼來(lái)表達(dá)自己的想法。這篇文章主要介紹了不知道這5種下劃線的含義,你就不算真的會(huì)Python!對(duì)此標(biāo)題感興趣的朋友一起閱讀本文吧2018-10-10
python?matplotlib庫(kù)繪圖實(shí)戰(zhàn)之繪制散點(diǎn)圖
Python有著強(qiáng)大的繪圖庫(kù) matplotlib,該庫(kù)集成了大量的繪制函數(shù),可以滿足我們平時(shí)絕大多數(shù)的繪圖要求,這篇文章主要給大家介紹了關(guān)于python?matplotlib庫(kù)繪圖實(shí)戰(zhàn)之繪制散點(diǎn)圖的相關(guān)資料,需要的朋友可以參考下2022-07-07
Pycharm如何對(duì)python文件進(jìn)行打包
這篇文章主要介紹了Pycharm如何對(duì)python文件進(jìn)行打包,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-02-02
在pycharm中創(chuàng)建django項(xiàng)目的示例代碼
這篇文章主要介紹了在pycharm中創(chuàng)建django項(xiàng)目的示例代碼,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-05-05
python使用代理IP爬取貓眼電影專業(yè)評(píng)分?jǐn)?shù)據(jù)
在編寫(xiě)爬蟲(chóng)程序的過(guò)程中,IP封鎖無(wú)疑是一個(gè)常見(jiàn)且棘手的問(wèn)題,盡管網(wǎng)絡(luò)上存在大量的免費(fèi)IP代理網(wǎng)站,但其質(zhì)量往往參差不齊,令人堪憂,本篇文章中介紹一下如何使用Python的Requests庫(kù)和BeautifulSoup庫(kù)來(lái)抓取貓眼電影網(wǎng)站上的專業(yè)評(píng)分?jǐn)?shù)據(jù),需要的朋友可以參考下2024-03-03
Python+Opencv身份證號(hào)碼區(qū)域提取及識(shí)別實(shí)現(xiàn)
這篇文章主要介紹了Python+Opencv身份證號(hào)碼區(qū)域提取及識(shí)別實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-08-08
使用python+Pyqt5實(shí)現(xiàn)串口調(diào)試助手
這篇文章主要介紹了使用python+Pyqt5實(shí)現(xiàn)串口調(diào)試助手,串口通訊程序首先要對(duì)串口進(jìn)行設(shè)置,如波特率、數(shù)據(jù)位、停止位、校驗(yàn)位等,需要的朋友可以參考下2022-04-04

