Python利用多進(jìn)程將大量數(shù)據(jù)放入有限內(nèi)存的教程
簡介
這是一篇有關(guān)如何將大量的數(shù)據(jù)放入有限的內(nèi)存中的簡略教程。
與客戶工作時,有時會發(fā)現(xiàn)他們的數(shù)據(jù)庫實際上只是一個csv或Excel文件倉庫,你只能將就著用,經(jīng)常需要在不更新他們的數(shù)據(jù)倉庫的情況下完成工作。大部分情況下,如果將這些文件存儲在一個簡單的數(shù)據(jù)庫框架中或許更好,但時間可能不允許。這種方法對時間、機(jī)器硬件和所處環(huán)境都有要求。
下面介紹一個很好的例子:假設(shè)有一堆表格(沒有使用Neo4j、MongoDB或其他類型的數(shù)據(jù)庫,僅僅使用csvs、tsvs等格式存儲的表格),如果將所有表格組合在一起,得到的數(shù)據(jù)幀太大,無法放入內(nèi)存。所以第一個想法是:將其拆分成不同的部分,逐個存儲。這個方案看起來不錯,但處理起來很慢。除非我們使用多核處理器。
目標(biāo)
這里的目標(biāo)是從所有職位中(大約1萬個),找出相關(guān)的的職位。將這些職位與政府給的職位代碼組合起來。接著將組合的結(jié)果與對應(yīng)的州(行政單位)信息組合起來。然后用通過word2vec生成的屬性信息在我們的客戶的管道中增強(qiáng)已有的屬性。
這個任務(wù)要求在短時間內(nèi)完成,誰也不愿意等待。想象一下,這就像在不使用標(biāo)準(zhǔn)的關(guān)系型數(shù)據(jù)庫的情況下進(jìn)行多個表的連接。
數(shù)據(jù)
示例腳本
下面的是一個示例腳本,展示了如何使用multiprocessing來在有限的內(nèi)存空間中加速操作過程。腳本的第一部分是和特定任務(wù)相關(guān)的,可以自由跳過。請著重關(guān)注第二部分,這里側(cè)重的是multiprocessing引擎。
#import the necessary packages import pandas as pd import us import numpy as np from multiprocessing import Pool,cpu_count,Queue,Manager # the data in one particular column was number in the form that horrible excel version # of a number where '12000' is '12,000' with that beautiful useless comma in there. # did I mention I excel bothers me? # instead of converting the number right away, we only convert them when we need to def median_maker(column): return np.median([int(x.replace(',','')) for x in column]) # dictionary_of_dataframes contains a dataframe with information for each title; e.g title is 'Data Scientist' # related_title_score_df is the dataframe of information for the title; columns = ['title','score'] ### where title is a similar_title and score is how closely the two are related, e.g. 'Data Analyst', 0.871 # code_title_df contains columns ['code','title'] # oes_data_df is a HUGE dataframe with all of the Bureau of Labor Statistics(BLS) data for a given time period (YAY FREE DATA, BOO BAD CENSUS DATA!) def job_title_location_matcher(title,location): try: related_title_score_df = dictionary_of_dataframes[title] # we limit dataframe1 to only those related_titles that are above # a previously established threshold related_title_score_df = related_title_score_df[title_score_df['score']>80] #we merge the related titles with another table and its codes codes_relTitles_scores = pd.merge(code_title_df,related_title_score_df) codes_relTitles_scores = codes_relTitles_scores.drop_duplicates() # merge the two dataframes by the codes merged_df = pd.merge(codes_relTitles_scores, oes_data_df) #limit the BLS data to the state we want all_merged = merged_df[merged_df['area_title']==str(us.states.lookup(location).name)] #calculate some summary statistics for the time we want group_med_emp,group_mean,group_pct10,group_pct25,group_median,group_pct75,group_pct90 = all_merged[['tot_emp','a_mean','a_pct10','a_pct25','a_median','a_pct75','a_pct90']].apply(median_maker) row = [title,location,group_med_emp,group_mean,group_pct10,group_pct25, group_median, group_pct75, group_pct90] #convert it all to strings so we can combine them all when writing to file row_string = [str(x) for x in row] return row_string except: # if it doesnt work for a particular title/state just throw it out, there are enough to make this insignificant 'do nothing'
這里發(fā)生了神奇的事情:
#runs the function and puts the answers in the queue def worker(row, q): ans = job_title_location_matcher(row[0],row[1]) q.put(ans) # this writes to the file while there are still things that could be in the queue # this allows for multiple processes to write to the same file without blocking eachother def listener(q): f = open(filename,'wb') while 1: m = q.get() if m =='kill': break f.write(','.join(m) + 'n') f.flush() f.close() def main(): #load all your data, then throw out all unnecessary tables/columns filename = 'skill_TEST_POOL.txt' #sets up the necessary multiprocessing tasks manager = Manager() q = manager.Queue() pool = Pool(cpu_count() + 2) watcher = pool.map_async(listener,(q,)) jobs = [] #titles_states is a dataframe of millions of job titles and states they were found in for i in titles_states.iloc: job = pool.map_async(worker, (i, q)) jobs.append(job) for job in jobs: job.get() q.put('kill') pool.close() pool.join() if __name__ == "__main__": main()
由于每個數(shù)據(jù)幀的大小都不同(總共約有100Gb),所以將所有數(shù)據(jù)都放入內(nèi)存是不可能的。通過將最終的數(shù)據(jù)幀逐行寫入內(nèi)存,但從來不在內(nèi)存中存儲完整的數(shù)據(jù)幀。我們可以完成所有的計算和組合任務(wù)。這里的“標(biāo)準(zhǔn)方法”是,我們可以僅僅在“job_title_location_matcher”的末尾編寫一個“write_line”方法,但這樣每次只會處理一個實例。根據(jù)我們需要處理的職位/州的數(shù)量,這大概需要2天的時間。而通過multiprocessing,只需2個小時。
雖然讀者可能接觸不到本教程處理的任務(wù)環(huán)境,但通過multiprocessing,可以突破許多計算機(jī)硬件的限制。本例的工作環(huán)境是c3.8xl ubuntu ec2,硬件為32核60Gb內(nèi)存(雖然這個內(nèi)存很大,但還是無法一次性放入所有數(shù)據(jù))。這里的關(guān)鍵之處是我們在60Gb的內(nèi)存的機(jī)器上有效的處理了約100Gb的數(shù)據(jù),同時速度提升了約25倍。通過multiprocessing在多核機(jī)器上自動處理大規(guī)模的進(jìn)程,可以有效提高機(jī)器的利用率。也許有些讀者已經(jīng)知道了這個方法,但對于其他人,可以通過multiprocessing能帶來非常大的收益。順便說一句,這部分是skill assets in the job-market這篇博文的延續(xù)。
- Python StringIO模塊實現(xiàn)在內(nèi)存緩沖區(qū)中讀寫數(shù)據(jù)
- 解決python讀取幾千萬行的大表內(nèi)存問題
- python監(jiān)控linux內(nèi)存并寫入mongodb(推薦)
- 用Python的Flask框架結(jié)合MySQL寫一個內(nèi)存監(jiān)控程序
- 使用Python獲取CPU、內(nèi)存和硬盤等windowns系統(tǒng)信息的2個例子
- python中查看變量內(nèi)存地址的方法
- 談?wù)勅绾问謩俞尫臥ython的內(nèi)存
- Python深入學(xué)習(xí)之內(nèi)存管理
- 2款Python內(nèi)存檢測工具介紹和使用方法
- 淺談Python 對象內(nèi)存占用
- Python內(nèi)存讀寫操作示例
相關(guān)文章
Python使用pathlib庫實現(xiàn)優(yōu)雅的處理路徑
如果你需要在 Python 里進(jìn)行文件處理,那么標(biāo)準(zhǔn)庫中的os和os.path兄弟倆一定是你無法避開的兩個模塊,本文主要來和大家聊聊如何使用pathlib庫實現(xiàn)優(yōu)雅的處理路徑,感興趣的可以了解下2023-12-12python中time tzset()函數(shù)實例用法
在本篇文章里小編給大家整理的是一篇關(guān)于python中time tzset()函數(shù)實例用法內(nèi)容,有興趣的朋友們可以學(xué)習(xí)下。2021-02-02一文詳解Python中實現(xiàn)單例模式的幾種常見方式
這篇文章主要為大家介紹了Python中實現(xiàn)單例模式的幾種常見方式示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-03-03Python 利用pandas和mysql-connector獲取Excel數(shù)據(jù)寫入到MySQL數(shù)據(jù)庫
在實際應(yīng)用中,我們可能需要將Excel表格中的數(shù)據(jù)導(dǎo)入到MySQL數(shù)據(jù)庫中,以便于進(jìn)行進(jìn)一步的數(shù)據(jù)分析和處理,本文將介紹如何使用Python將Excel表格中的數(shù)據(jù)插入到MySQL數(shù)據(jù)庫中,需要的朋友可以參考下2023-10-10使用Python的Bottle框架寫一個簡單的服務(wù)接口的示例
這篇文章主要介紹了使用Python的Bottle框架寫一個簡單的服務(wù)接口的示例,基于Linux系統(tǒng)環(huán)境,需要的朋友可以參考下2015-08-08Python TCP全連接攻擊中SockStress全連接攻擊詳解
Sock Stress 全連接攻擊屬于TCP全連接攻擊,因為需要建立一次完整的TCP三次握手,該攻擊的關(guān)鍵點就在于,攻擊主機(jī)將windows窗口緩沖設(shè)置為0,實現(xiàn)的拒絕服務(wù)2022-10-10淺談python print(xx, flush = True) 全網(wǎng)最清晰的解釋
今天小編就為大家分享一篇淺談python print(xx, flush = True) 全網(wǎng)最清晰的解釋,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-02-02