Python實現將MySQL中所有表的數據都導出為CSV文件并壓縮
Python將MySQL數據庫中所有表的數據都導出為CSV文件到一個目錄,并壓縮為zip文件到另一個目錄下,然后解壓縮這個目錄中的所有zip文件到第三個目錄下。不使用Pandas庫,需要考慮SQL結果集是大數據量分批數據導出的情況,通過多線程和異步操作來提高程序性能,程序需要異常處理和輸出,輸出出錯時的錯誤信息,每次每個查詢導出數據的運行狀態(tài)和表數據行數以及運行時間戳,導出時間,輸出每個文件記錄數量的日志。
該腳本已在考慮大數據量、異常處理和性能優(yōu)化的基礎上進行了全面設計,能夠處理大多數常見場景。根據具體需求可進一步調整批量大?。╞atch_size)和線程數(max_workers)以獲得最佳性能。
import os import csv import zipfile import logging import mysql.connector from datetime import datetime import time import concurrent.futures import glob # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('data_export.log'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) def export_table_to_csv(table_name, csv_path, db_config, batch_size=1000): """導出單個表的數據到CSV文件,分批處理""" conn = None cursor = None total_rows = 0 try: conn = mysql.connector.connect(**db_config) cursor = conn.cursor() # 獲取數據并寫入CSV with open(csv_path, 'w', newline='', encoding='utf-8') as csvfile: writer = csv.writer(csvfile) # 執(zhí)行查詢并獲取列名 cursor.execute(f"SELECT * FROM `{table_name}`") columns = [col[0] for col in cursor.description] writer.writerow(columns) # 分批獲取數據 while True: rows = cursor.fetchmany(batch_size) if not rows: break writer.writerows(rows) total_rows += len(rows) logger.debug(f"{table_name} 已導出 {total_rows} 行") logger.info(f"{table_name} CSV導出完成,總行數:{total_rows}") return total_rows except Exception as e: logger.error(f"導出表 {table_name} 失敗: {str(e)}", exc_info=True) raise finally: if cursor: cursor.close() if conn and conn.is_connected(): conn.close() def compress_to_zip(source_path, zip_path): """壓縮文件為ZIP格式""" try: with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf: zipf.write(source_path, arcname=os.path.basename(source_path)) logger.info(f"成功壓縮 {source_path} 到 {zip_path}") except Exception as e: logger.error(f"壓縮 {source_path} 失敗: {str(e)}", exc_info=True) raise def process_table(table_name, db_config, csv_dir, zip_dir): """處理單個表的導出和壓縮""" start_time = time.time() logger.info(f"開始處理表: {table_name}") status = "成功" rows_exported = 0 try: # 定義文件路徑 csv_filename = f"{table_name}.csv" zip_filename = f"{table_name}.zip" csv_path = os.path.join(csv_dir, csv_filename) zip_path = os.path.join(zip_dir, zip_filename) # 導出CSV rows_exported = export_table_to_csv(table_name, csv_path, db_config) # 壓縮文件 compress_to_zip(csv_path, zip_path) except Exception as e: status = f"失敗: {str(e)}" # 清理可能存在的中間文件 for path in [csv_path, zip_path]: if path and os.path.exists(path): try: os.remove(path) logger.warning(f"已清理文件: {path}") except Exception as clean_error: logger.error(f"清理文件失敗: {clean_error}") finally: duration = time.time() - start_time log_message = ( f"表處理完成 - 表名: {table_name}, " f"狀態(tài): {status}, " f"導出行數: {rows_exported}, " f"耗時: {duration:.2f}秒" ) logger.info(log_message) def unzip_files(zip_dir, unzip_dir): """解壓指定目錄中的所有ZIP文件""" zip_files = glob.glob(os.path.join(zip_dir, '*.zip')) if not zip_files: logger.warning("未找到ZIP文件,跳過解壓") return with concurrent.futures.ThreadPoolExecutor() as executor: futures = [] for zip_path in zip_files: futures.append(executor.submit( lambda: extract_zip(zip_path, unzip_dir) )) for future in concurrent.futures.as_completed(futures): try: future.result() except Exception as e: logger.error(f"解壓過程中發(fā)生錯誤: {str(e)}") def extract_zip(zip_path, unzip_dir): """解壓單個ZIP文件""" try: start_time = time.time() with zipfile.ZipFile(zip_path, 'r') as zip_ref: zip_ref.extractall(unzip_dir) duration = time.time() - start_time logger.info(f"解壓完成: {zip_path} => {unzip_dir} (耗時: {duration:.2f}秒)") except Exception as e: logger.error(f"解壓 {zip_path} 失敗: {str(e)}", exc_info=True) raise def main(): # 配置參數 db_config = { 'host': 'localhost', 'user': 'your_username', 'password': 'your_password', 'database': 'your_database' } # 目錄配置 base_dir = os.path.dirname(os.path.abspath(__file__)) csv_dir = os.path.join(base_dir, 'csv_exports') zip_dir = os.path.join(base_dir, 'zip_archives') unzip_dir = os.path.join(base_dir, 'unzipped_files') # 創(chuàng)建目錄 for dir_path in [csv_dir, zip_dir, unzip_dir]: os.makedirs(dir_path, exist_ok=True) logger.info(f"目錄已準備: {dir_path}") # 獲取所有表名 try: conn = mysql.connector.connect(**db_config) cursor = conn.cursor() cursor.execute("SHOW TABLES") tables = [table[0] for table in cursor.fetchall()] logger.info(f"發(fā)現 {len(tables)} 個需要處理的表") except Exception as e: logger.error(f"獲取數據庫表失敗: {str(e)}", exc_info=True) return finally: if 'cursor' in locals(): cursor.close() if 'conn' in locals() and conn.is_connected(): conn.close() # 處理所有表(多線程導出和壓縮) with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: futures = [] for table in tables: futures.append(executor.submit( process_table, table, db_config, csv_dir, zip_dir )) # 處理任務結果 for future in concurrent.futures.as_completed(futures): try: future.result() except Exception as e: logger.error(f"表處理異常: {str(e)}") # 解壓所有ZIP文件(多線程解壓) logger.info("開始解壓所有ZIP文件") unzip_files(zip_dir, unzip_dir) logger.info("全部處理流程完成") if __name__ == "__main__": main()
關鍵特性說明:
1.分批處理大數據:
- 使用fetchmany(batch_size)分批獲取數據(默認每批1000行)
- 流式處理減少內存占用
2.多線程處理:
- 使用ThreadPoolExecutor并行處理不同表的導出和壓縮
- 獨立的數據庫連接池(每個線程有自己的連接)
- 并行解壓處理
3.異常處理:
- 全面的try-except塊覆蓋所有關鍵操作
- 自動清理失敗時產生的中間文件
- 詳細的錯誤日志記錄(包含堆棧跟蹤)
4.日志記錄:
- 同時輸出到文件和終端
- 記錄時間戳、操作類型、狀態(tài)、耗時等關鍵信息
- 包含每個表的處理結果統(tǒng)計
5.文件管理:
- 自動創(chuàng)建所需目錄
- 使用ZIP_DEFLATED進行高效壓縮
- 安全的文件路徑處理
6.性能優(yōu)化:
- 使用服務器端游標避免內存過載
- 可配置的批量大小和線程數
- 異步I/O操作
使用說明:
安裝依賴:
pip install mysql-connector-python
修改配置:
更新db_config中的數據庫連接信息
根據需要調整目錄路徑(csv_dir, zip_dir, unzip_dir)
運行腳本:
python script.py
查看日志:
實時終端輸出
詳細日志文件data_export.log
擴展建議:
通過命令行參數接受數據庫配置和目錄路徑
添加郵件通知功能(處理完成或失敗時通知)
實現斷點續(xù)傳功能
添加文件校驗(MD5校驗和)
支持配置文件(YAML/JSON格式)
添加進度條顯示
到此這篇關于Python實現將MySQL中所有表的數據都導出為CSV文件并壓縮的文章就介紹到這了,更多相關Python MySQL數據導出為CSV內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
python實現動態(tài)創(chuàng)建類的方法分析
這篇文章主要介紹了python實現動態(tài)創(chuàng)建類的方法,結合實例形式分析了Python動態(tài)創(chuàng)建類的原理、實現方法及相關操作技巧,需要的朋友可以參考下2019-06-06Python 通過requests實現騰訊新聞抓取爬蟲的方法
今天小編就為大家分享一篇Python 通過requests實現騰訊新聞抓取爬蟲的方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-02-02python使用redis實現消息隊列(異步)的實現完整例程
本文主要介紹了python使用redis實現消息隊列(異步)的實現完整例程,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2023-01-01