Python實(shí)現(xiàn)將MySQL中所有表的數(shù)據(jù)都導(dǎo)出為CSV文件并壓縮
Python將MySQL數(shù)據(jù)庫(kù)中所有表的數(shù)據(jù)都導(dǎo)出為CSV文件到一個(gè)目錄,并壓縮為zip文件到另一個(gè)目錄下,然后解壓縮這個(gè)目錄中的所有zip文件到第三個(gè)目錄下。不使用Pandas庫(kù),需要考慮SQL結(jié)果集是大數(shù)據(jù)量分批數(shù)據(jù)導(dǎo)出的情況,通過(guò)多線(xiàn)程和異步操作來(lái)提高程序性能,程序需要異常處理和輸出,輸出出錯(cuò)時(shí)的錯(cuò)誤信息,每次每個(gè)查詢(xún)導(dǎo)出數(shù)據(jù)的運(yùn)行狀態(tài)和表數(shù)據(jù)行數(shù)以及運(yùn)行時(shí)間戳,導(dǎo)出時(shí)間,輸出每個(gè)文件記錄數(shù)量的日志。
該腳本已在考慮大數(shù)據(jù)量、異常處理和性能優(yōu)化的基礎(chǔ)上進(jìn)行了全面設(shè)計(jì),能夠處理大多數(shù)常見(jiàn)場(chǎng)景。根據(jù)具體需求可進(jìn)一步調(diào)整批量大?。╞atch_size)和線(xiàn)程數(shù)(max_workers)以獲得最佳性能。
import os import csv import zipfile import logging import mysql.connector from datetime import datetime import time import concurrent.futures import glob # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('data_export.log'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) def export_table_to_csv(table_name, csv_path, db_config, batch_size=1000): """導(dǎo)出單個(gè)表的數(shù)據(jù)到CSV文件,分批處理""" conn = None cursor = None total_rows = 0 try: conn = mysql.connector.connect(**db_config) cursor = conn.cursor() # 獲取數(shù)據(jù)并寫(xiě)入CSV with open(csv_path, 'w', newline='', encoding='utf-8') as csvfile: writer = csv.writer(csvfile) # 執(zhí)行查詢(xún)并獲取列名 cursor.execute(f"SELECT * FROM `{table_name}`") columns = [col[0] for col in cursor.description] writer.writerow(columns) # 分批獲取數(shù)據(jù) while True: rows = cursor.fetchmany(batch_size) if not rows: break writer.writerows(rows) total_rows += len(rows) logger.debug(f"{table_name} 已導(dǎo)出 {total_rows} 行") logger.info(f"{table_name} CSV導(dǎo)出完成,總行數(shù):{total_rows}") return total_rows except Exception as e: logger.error(f"導(dǎo)出表 {table_name} 失敗: {str(e)}", exc_info=True) raise finally: if cursor: cursor.close() if conn and conn.is_connected(): conn.close() def compress_to_zip(source_path, zip_path): """壓縮文件為ZIP格式""" try: with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf: zipf.write(source_path, arcname=os.path.basename(source_path)) logger.info(f"成功壓縮 {source_path} 到 {zip_path}") except Exception as e: logger.error(f"壓縮 {source_path} 失敗: {str(e)}", exc_info=True) raise def process_table(table_name, db_config, csv_dir, zip_dir): """處理單個(gè)表的導(dǎo)出和壓縮""" start_time = time.time() logger.info(f"開(kāi)始處理表: {table_name}") status = "成功" rows_exported = 0 try: # 定義文件路徑 csv_filename = f"{table_name}.csv" zip_filename = f"{table_name}.zip" csv_path = os.path.join(csv_dir, csv_filename) zip_path = os.path.join(zip_dir, zip_filename) # 導(dǎo)出CSV rows_exported = export_table_to_csv(table_name, csv_path, db_config) # 壓縮文件 compress_to_zip(csv_path, zip_path) except Exception as e: status = f"失敗: {str(e)}" # 清理可能存在的中間文件 for path in [csv_path, zip_path]: if path and os.path.exists(path): try: os.remove(path) logger.warning(f"已清理文件: {path}") except Exception as clean_error: logger.error(f"清理文件失敗: {clean_error}") finally: duration = time.time() - start_time log_message = ( f"表處理完成 - 表名: {table_name}, " f"狀態(tài): {status}, " f"導(dǎo)出行數(shù): {rows_exported}, " f"耗時(shí): {duration:.2f}秒" ) logger.info(log_message) def unzip_files(zip_dir, unzip_dir): """解壓指定目錄中的所有ZIP文件""" zip_files = glob.glob(os.path.join(zip_dir, '*.zip')) if not zip_files: logger.warning("未找到ZIP文件,跳過(guò)解壓") return with concurrent.futures.ThreadPoolExecutor() as executor: futures = [] for zip_path in zip_files: futures.append(executor.submit( lambda: extract_zip(zip_path, unzip_dir) )) for future in concurrent.futures.as_completed(futures): try: future.result() except Exception as e: logger.error(f"解壓過(guò)程中發(fā)生錯(cuò)誤: {str(e)}") def extract_zip(zip_path, unzip_dir): """解壓?jiǎn)蝹€(gè)ZIP文件""" try: start_time = time.time() with zipfile.ZipFile(zip_path, 'r') as zip_ref: zip_ref.extractall(unzip_dir) duration = time.time() - start_time logger.info(f"解壓完成: {zip_path} => {unzip_dir} (耗時(shí): {duration:.2f}秒)") except Exception as e: logger.error(f"解壓 {zip_path} 失敗: {str(e)}", exc_info=True) raise def main(): # 配置參數(shù) db_config = { 'host': 'localhost', 'user': 'your_username', 'password': 'your_password', 'database': 'your_database' } # 目錄配置 base_dir = os.path.dirname(os.path.abspath(__file__)) csv_dir = os.path.join(base_dir, 'csv_exports') zip_dir = os.path.join(base_dir, 'zip_archives') unzip_dir = os.path.join(base_dir, 'unzipped_files') # 創(chuàng)建目錄 for dir_path in [csv_dir, zip_dir, unzip_dir]: os.makedirs(dir_path, exist_ok=True) logger.info(f"目錄已準(zhǔn)備: {dir_path}") # 獲取所有表名 try: conn = mysql.connector.connect(**db_config) cursor = conn.cursor() cursor.execute("SHOW TABLES") tables = [table[0] for table in cursor.fetchall()] logger.info(f"發(fā)現(xiàn) {len(tables)} 個(gè)需要處理的表") except Exception as e: logger.error(f"獲取數(shù)據(jù)庫(kù)表失敗: {str(e)}", exc_info=True) return finally: if 'cursor' in locals(): cursor.close() if 'conn' in locals() and conn.is_connected(): conn.close() # 處理所有表(多線(xiàn)程導(dǎo)出和壓縮) with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: futures = [] for table in tables: futures.append(executor.submit( process_table, table, db_config, csv_dir, zip_dir )) # 處理任務(wù)結(jié)果 for future in concurrent.futures.as_completed(futures): try: future.result() except Exception as e: logger.error(f"表處理異常: {str(e)}") # 解壓所有ZIP文件(多線(xiàn)程解壓) logger.info("開(kāi)始解壓所有ZIP文件") unzip_files(zip_dir, unzip_dir) logger.info("全部處理流程完成") if __name__ == "__main__": main()
關(guān)鍵特性說(shuō)明:
1.分批處理大數(shù)據(jù):
- 使用fetchmany(batch_size)分批獲取數(shù)據(jù)(默認(rèn)每批1000行)
- 流式處理減少內(nèi)存占用
2.多線(xiàn)程處理:
- 使用ThreadPoolExecutor并行處理不同表的導(dǎo)出和壓縮
- 獨(dú)立的數(shù)據(jù)庫(kù)連接池(每個(gè)線(xiàn)程有自己的連接)
- 并行解壓處理
3.異常處理:
- 全面的try-except塊覆蓋所有關(guān)鍵操作
- 自動(dòng)清理失敗時(shí)產(chǎn)生的中間文件
- 詳細(xì)的錯(cuò)誤日志記錄(包含堆棧跟蹤)
4.日志記錄:
- 同時(shí)輸出到文件和終端
- 記錄時(shí)間戳、操作類(lèi)型、狀態(tài)、耗時(shí)等關(guān)鍵信息
- 包含每個(gè)表的處理結(jié)果統(tǒng)計(jì)
5.文件管理:
- 自動(dòng)創(chuàng)建所需目錄
- 使用ZIP_DEFLATED進(jìn)行高效壓縮
- 安全的文件路徑處理
6.性能優(yōu)化:
- 使用服務(wù)器端游標(biāo)避免內(nèi)存過(guò)載
- 可配置的批量大小和線(xiàn)程數(shù)
- 異步I/O操作
使用說(shuō)明:
安裝依賴(lài):
pip install mysql-connector-python
修改配置:
更新db_config中的數(shù)據(jù)庫(kù)連接信息
根據(jù)需要調(diào)整目錄路徑(csv_dir, zip_dir, unzip_dir)
運(yùn)行腳本:
python script.py
查看日志:
實(shí)時(shí)終端輸出
詳細(xì)日志文件data_export.log
擴(kuò)展建議:
通過(guò)命令行參數(shù)接受數(shù)據(jù)庫(kù)配置和目錄路徑
添加郵件通知功能(處理完成或失敗時(shí)通知)
實(shí)現(xiàn)斷點(diǎn)續(xù)傳功能
添加文件校驗(yàn)(MD5校驗(yàn)和)
支持配置文件(YAML/JSON格式)
添加進(jìn)度條顯示
到此這篇關(guān)于Python實(shí)現(xiàn)將MySQL中所有表的數(shù)據(jù)都導(dǎo)出為CSV文件并壓縮的文章就介紹到這了,更多相關(guān)Python MySQL數(shù)據(jù)導(dǎo)出為CSV內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- 使用python將csv數(shù)據(jù)導(dǎo)入mysql數(shù)據(jù)庫(kù)
- 使用python的pandas庫(kù)讀取csv文件保存至mysql數(shù)據(jù)庫(kù)
- Python之csv文件從MySQL數(shù)據(jù)庫(kù)導(dǎo)入導(dǎo)出的方法
- python 從csv讀數(shù)據(jù)到mysql的實(shí)例
- Python實(shí)現(xiàn)將MySQL數(shù)據(jù)庫(kù)表中的數(shù)據(jù)導(dǎo)出生成csv格式文件的方法
- 利用Python批量導(dǎo)出mysql數(shù)據(jù)庫(kù)表結(jié)構(gòu)的操作實(shí)例
相關(guān)文章
python實(shí)現(xiàn)動(dòng)態(tài)創(chuàng)建類(lèi)的方法分析
這篇文章主要介紹了python實(shí)現(xiàn)動(dòng)態(tài)創(chuàng)建類(lèi)的方法,結(jié)合實(shí)例形式分析了Python動(dòng)態(tài)創(chuàng)建類(lèi)的原理、實(shí)現(xiàn)方法及相關(guān)操作技巧,需要的朋友可以參考下2019-06-06python多線(xiàn)程并發(fā)讓兩個(gè)LED同時(shí)亮的方法
今天小編就為大家分享一篇python多線(xiàn)程并發(fā)讓兩個(gè)LED同時(shí)亮的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2019-02-02Python任意字符串轉(zhuǎn)16, 32, 64進(jìn)制的方法
今天小編就為大家分享一篇Python任意字符串轉(zhuǎn)16, 32, 64進(jìn)制的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2019-06-06將自己的數(shù)據(jù)集制作成TFRecord格式教程
今天小編就為大家分享一篇將自己的數(shù)據(jù)集制作成TFRecord格式教程,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-02-02Python 通過(guò)requests實(shí)現(xiàn)騰訊新聞抓取爬蟲(chóng)的方法
今天小編就為大家分享一篇Python 通過(guò)requests實(shí)現(xiàn)騰訊新聞抓取爬蟲(chóng)的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2019-02-02python使用redis實(shí)現(xiàn)消息隊(duì)列(異步)的實(shí)現(xiàn)完整例程
本文主要介紹了python使用redis實(shí)現(xiàn)消息隊(duì)列(異步)的實(shí)現(xiàn)完整例程,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-01-01Python安裝第三方庫(kù)及常見(jiàn)問(wèn)題處理方法匯總
本文給大家匯總介紹了Python安裝第三方庫(kù)及常見(jiàn)問(wèn)題處理方法,非常的簡(jiǎn)單使用,有需要的小伙伴可以參考下2016-09-09Python實(shí)現(xiàn)爬取馬云的微博功能示例
這篇文章主要介紹了Python實(shí)現(xiàn)爬取馬云的微博功能,結(jié)合實(shí)例形式較為詳細(xì)的分析了Python模擬ajax請(qǐng)求爬取馬云微博的相關(guān)操作技巧與注意事項(xiàng),需要的朋友可以參考下2019-02-02