Python大批量寫入數據(百萬級別)的方法
背景
現有一個百萬行數據的csv格式文件,需要在兩分鐘之內存入數據庫。
方案
方案一:多線程+協(xié)程+異步MySql方案二:多線程+MySql批量插入
代碼
1,先通過pandas讀取所有csv數據存入列表。
2,設置N個線程,將一百萬數據均分為N份,以start,end傳遞給線程以切片的方法讀取區(qū)間數據(建議為16個線程)
3,方案二 線程內以 executemany 方法批量插入所有數據。
4,方案一 線程內使用異步事件循環(huán)遍歷所有數據異步插入。
5,方案一純屬沒事找事型。
方案二
import threading import pandas as pd import asyncio import time import aiomysql import pymysql data=[] error_data=[] def run(start,end): global data global error_data print("start"+threading.current_thread().name) print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))) mysdb = getDb("*", *, "*", "*", "*") cursor = mysdb.cursor() sql = """insert into *_*_* values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)""" cursor.executemany(sql,data[start:end]) mysdb.commit() mysdb.close() print("end" + threading.current_thread().name) print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))) def csv_file_read_use_pd(csvFile): csv_result = pd.read_csv(csvFile,encoding="utf-16",sep='\t') csv_result = csv_result.fillna(value="None") result = csv_result.values.tolist() return result class MyDataBase: def __init__(self,host=None,port=None,username=None,password=None,database=None): self.db = pymysql.connect(host=host,port=port,user=username,password=password,database=database) def close(self): self.db.close() def getDb(host,port,username,password,database): MyDb = MyDataBase(host, port, username, password,database) return MyDb.db def main(csvFile): global data #獲取全局對象 csv全量數據 #讀取所有的數據 將所有數據均分成 thread_lens 份 分發(fā)給 thread_lens 個線程去執(zhí)行 thread_lens=20 csv_result=csv_file_read_use_pd(csvFile) day = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) for item in csv_result: item.insert(0,day) data=csv_result thread_exe_count_list=[] #線程需要執(zhí)行的區(qū)間 csv_lens=len(csv_result) avg = csv_lens // thread_lens remainder=csv_lens % thread_lens # 0,27517 27517,55,034 nowIndex=0 for i in range(thread_lens): temp=[nowIndex,nowIndex+avg] nowIndex=nowIndex+avg thread_exe_count_list.append(temp) thread_exe_count_list[-1:][0][1]+=remainder #余數分給最后一個線程 # print(thread_exe_count_list) #th(thread_exe_count_list[0][0],thread_exe_count_list[0][1]) for i in range(thread_lens): sub_thread = threading.Thread(target=run,args=(thread_exe_count_list[i][0],thread_exe_count_list[i][1],)) sub_thread.start() sub_thread.join() time.sleep(3) if __name__=="__main__": #csv_file_read_use_pd("分公司箱型箱量.csv") main("分公司箱型箱量.csv")
方案一
import threading import pandas as pd import asyncio import time import aiomysql data=[] error_data=[] async def async_basic(loop,start,end): global data global error_data print("start"+threading.current_thread().name) print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))) conn = await aiomysql.connect( host="*", port=*, user="*", password="*", db="*", loop=loop ) day = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) sql = """insert into **** values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)""" async with conn.cursor() as cursor: for item in data[start:end]: params=[day] params.extend(item) try: x=await cursor.execute(sql,params) if x==0: error_data.append(item) print(threading.current_thread().name+" result "+str(x)) except Exception as e: print(e) error_data.append(item) time.sleep(10) pass await conn.close() #await conn.commit() #關閉連接池 # pool.close() # await pool.wait_closed() print("end" + threading.current_thread().name) print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))) def csv_file_read_use_pd(csvFile): csv_result = pd.read_csv(csvFile,encoding="utf-16",sep='\t') csv_result = csv_result.fillna(value="None") result = csv_result.values.tolist() return result def th(start,end): loop = asyncio.new_event_loop() loop.run_until_complete(async_basic(loop,start,end)) def main(csvFile): global data #獲取全局對象 csv全量數據 #讀取所有的數據 將所有數據均分成 thread_lens 份 分發(fā)給 thread_lens 個線程去執(zhí)行 thread_lens=20 csv_result=csv_file_read_use_pd(csvFile) data=csv_result thread_exe_count_list=[] #線程需要執(zhí)行的區(qū)間 csv_lens=len(csv_result) avg = csv_lens // thread_lens remainder=csv_lens % thread_lens # 0,27517 27517,55,034 nowIndex=0 for i in range(thread_lens): temp=[nowIndex,nowIndex+avg] nowIndex=nowIndex+avg thread_exe_count_list.append(temp) thread_exe_count_list[-1:][0][1]+=remainder #余數分給最后一個線程 print(thread_exe_count_list) #th(thread_exe_count_list[0][0],thread_exe_count_list[0][1]) for i in range(thread_lens): sub_thread = threading.Thread(target=th,args=(thread_exe_count_list[i][0],thread_exe_count_list[i][1],)) sub_thread.start() time.sleep(3) if __name__=="__main__": #csv_file_read_use_pd("分公司箱型箱量.csv") main("分公司箱型箱量.csv")
總結
到此這篇關于Python大批量寫入數據的文章就介紹到這了,更多相關Python大批量寫入數據內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Python網絡通訊之TCP協(xié)議實現服務器和客戶端實例
這篇文章主要介紹了Python網絡通訊之TCP協(xié)議實現服務器和客戶端實例, socket編程一種獨立于協(xié)議的網絡編程接口,應用程序可以通過它發(fā)送或接收數據,可對其進行像對文件一樣的打開、讀寫和關閉等操作,需要的朋友可以參考下2023-08-08python中?OpenCV和Pillow處理圖像操作及時間對比
這篇文章主要介紹了python中OpenCV和Pillow處理圖像操作及時間對比,文章圍繞主題展開詳細的內容介紹,具有一定的參考價值,需要的小伙伴可以參考一下2022-09-09Python使用Matplotlib進行圖案填充和邊緣顏色分離的三種方法
Matplotlib是Python中功能強大的繪圖庫,允許廣泛的自定義選項,一個常見的要求是分離出圖中的圖案填充和邊緣顏色,默認情況下,Matplotlib中的填充顏色與邊緣顏色相關聯,但有一些方法可以獨立自定義這些顏色,本文將深入研究如何實現這一點的技術細節(jié),并提供分步說明和示例2025-01-01