python 解決動(dòng)態(tài)的定義變量名,并給其賦值的方法(大數(shù)據(jù)處理)
最近消費(fèi)kafka數(shù)據(jù)到磁盤的時(shí)候遇到了這樣的問題:
需求:每天大概有1千萬(wàn)條數(shù)據(jù),每條數(shù)據(jù)包含19個(gè)字段信息,需要將數(shù)據(jù)寫到服務(wù)器磁盤,以第二個(gè)字段作為大類建立目錄,第7個(gè)字段作為小類配合時(shí)間戳作為文件名,臨時(shí)文件后綴tmp,當(dāng)每個(gè)文件的寫入條數(shù)(可配置,比如100條)達(dá)到要求條數(shù)時(shí),將后綴tmp改為out。
問題:大類共有30個(gè),小類不計(jì)其數(shù)而且未知,比如大類為A,小類為a,時(shí)間戳為20180606095835234,則A目錄下的文件名為20180606095835234_a.tmp,這樣一來(lái)需要在此文件寫滿100條時(shí),更新時(shí)間戳生成第二個(gè)文件名,如果此時(shí)有1000個(gè)文件都在寫則需要有1000個(gè)時(shí)間戳,和1000個(gè)計(jì)數(shù)器記錄每個(gè)文件當(dāng)前的條數(shù),如果分別定義1000個(gè)變量顯然是不劃算的,
嘗試:中間過程想到了動(dòng)態(tài)定義變量名,即
定義第七個(gè)字段:seven = data.split('|')[7]
定義文件名:filename = time_stamp + '_' + seven+'.tmp',
定義文件計(jì)數(shù)器:seven + ‘_num' = 0
定義文件時(shí)間戳:seven + '_stamp' = time.time( )
想法其實(shí)是沒問題的,但是這里用到了一個(gè)不常用的語(yǔ)法:用一個(gè)變量名和一個(gè)字符串拼接出來(lái)一個(gè)新的變量名,并繼續(xù)賦值(不知道我的表述是否清楚),試過了用local()函數(shù)、global()函數(shù)、exec()函數(shù)都沒有達(dá)到預(yù)期效果,也許是把問題想的太復(fù)雜了
解決:最后使用三個(gè)字典將這個(gè)問題完美解決,
定義一個(gè)字典用來(lái)存計(jì)數(shù)器,字典的每一個(gè)鍵對(duì)應(yīng)一個(gè)文件名,值對(duì)應(yīng)當(dāng)前計(jì)數(shù),并實(shí)時(shí)更新;
定義一個(gè)字典用來(lái)存時(shí)間戳,鍵對(duì)應(yīng)一個(gè)文件名,值對(duì)應(yīng)時(shí)間戳,達(dá)到100條就更新一次;
定義一個(gè)字典用來(lái)存大類,鍵對(duì)應(yīng)代號(hào),值對(duì)應(yīng)分類;
局部功能代碼如下:
def kafka_to_disk(): print('啟動(dòng)前檢測(cè)上次運(yùn)行時(shí)是否存在意外中斷的數(shù)據(jù)文件......') print('搜索最近一次執(zhí)行腳本產(chǎn)生的時(shí)間目錄......') # 待處理臨時(shí)文件列表 tmp_list = [] try: for category_dir in os.listdir(local_file_path): if len(os.listdir(local_file_path+os.sep+category_dir)) > 0: for file in os.listdir(local_file_path+os.sep+category_dir): if suffix in file: tmp_list.append(local_file_path+os.sep+category_dir+os.sep+file) # print('上次運(yùn)行程序產(chǎn)生的臨時(shí)文件有---{}'.format(tmp_list)) except Exception as e: pass if len(tmp_list) == 0: print('未掃描任何殘留臨時(shí)文件') else: print('開始修復(fù)殘留臨時(shí)文件......') tmp_num = 0 for tmp in tmp_list: os.rename(tmp, tmp.split('.')[0]+'.out') tmp_num += 1 print('本次啟動(dòng)共修復(fù)殘留臨時(shí)文件★★★★★-----{}個(gè)-----★★★★★'.format(tmp_num)) category_poor = { '1': 'news', '2': 'weibo', '3': 'weixin', '4': 'app', '5': 'newspaper', '6': 'luntan', '7': 'blog', '8': 'video', '9': 'shangji', '10': 'shangjia', '11': 'gtzy', '12': 'zfztb', '13': 'gyfp', '14': 'gjz', '15': 'zfxx', '16': 'ptztb', '17': 'company', '18': 'house', '19': 'hospital', '20': 'bank', '21': 'zone', '22': 'express', '23': 'zpgw', '24': 'zscq', '25': 'hotel', '26': 'cpws', '27': 'gxqy', '28': 'gpjj', '29': 'dtyy', '30': 'bdbk'} time_stamp = utils.get_time_stamp() # 初始化毫秒級(jí)時(shí)間戳 : 20180509103015125 consumer = KafkaConsumer(topic, group_id=group_id, auto_offset_reset=auto_offset_reset, bootstrap_servers=eval(bootstrap_servers)) print('連接kafka成功,數(shù)據(jù)篩選中......') file_poor = {} # 子類池用于文件計(jì)數(shù)器 time_stamp_poor = {} # 子類時(shí)間戳池,用于觸發(fā)文件切換 time_stamp = utils.get_time_stamp() # 初始化毫秒級(jí)時(shí)間戳 :20180509103015125 for message in consumer: # 提取第8個(gè)字段自動(dòng)匹配目錄進(jìn)行創(chuàng)建 if message.value.decode().split('|')[1] in category_poor: category = category_poor[message.value.decode().split('|')[1]] else: print(message.value.decode()) continue category_dir = local_file_path + os.sep + category if not os.path.exists(category_dir): os.makedirs(category_dir) # 提取第2個(gè)字段,用于生成文件名 if message.value.decode().split('|')[7] in time_stamp_poor: shot_file_name = time_stamp_poor[message.value.decode().split('|')[7]] + '_' + message.value.decode().split('|')[7] else: shot_file_name = time_stamp + '_' + message.value.decode().split('|')[7] file_name = category_dir + os.sep + shot_file_name + '.tmp' # 給每一個(gè)文件設(shè)定一個(gè)計(jì)數(shù)器 if message.value.decode().split('|')[7] not in file_poor: file_poor[message.value.decode().split('|')[7]] = 0 with open(file_name, 'a', encoding='utf-8')as f1: f1.write(message.value.decode()) file_poor[message.value.decode().split('|')[7]] += 1 # 觸發(fā)切換文件的操作,用時(shí)間戳生成第二文件名 if file_poor[message.value.decode().split('|')[7]] == strip_number: time_stamp_poor[message.value.decode().split('|')[7]] = utils.get_time_stamp() file_poor[message.value.decode().split('|')[7]] = 0
以上這篇python 解決動(dòng)態(tài)的定義變量名,并給其賦值的方法(大數(shù)據(jù)處理)就是小編分享給大家的全部?jī)?nèi)容了,希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
Python爬蟲天氣預(yù)報(bào)實(shí)例詳解(小白入門)
這篇文章主要介紹了Python爬蟲天氣預(yù)報(bào)實(shí)例詳解(小白入門),詳細(xì)介紹了整個(gè)爬蟲建立的流程,最后分享了實(shí)現(xiàn)代碼,很簡(jiǎn)潔,小編覺得還是挺不錯(cuò)的,具有一定借鑒價(jià)值,需要的朋友可以參考下2018-01-01Python2和Python3之間的str處理方式導(dǎo)致亂碼的講解
今天小編就為大家分享一篇關(guān)于Python2和Python3之間的str處理方式導(dǎo)致亂碼的講解,小編覺得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來(lái)看看吧2019-01-01解決Pytorch修改預(yù)訓(xùn)練模型時(shí)遇到key不匹配的情況
這篇文章主要介紹了解決Pytorch修改預(yù)訓(xùn)練模型時(shí)遇到key不匹配的情況,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-06-06Python中實(shí)現(xiàn)最小二乘法思路及實(shí)現(xiàn)代碼
這篇文章主要介紹了Python中實(shí)現(xiàn)最小二乘法思路及實(shí)現(xiàn)代碼,具有一定借鑒價(jià)值,需要的朋友可以參考下2018-01-01python使用多線程不斷刷新網(wǎng)頁(yè)的方法
這篇文章主要介紹了python使用多線程不斷刷新網(wǎng)頁(yè)的方法,涉及Python多線程thread及time模塊操作技巧,具有一定參考借鑒價(jià)值,需要的朋友可以參考下2015-03-03基于python實(shí)現(xiàn)開箱即用的桌面時(shí)鐘
這篇文章主要為大家詳細(xì)介紹了如何基于python實(shí)現(xiàn)開箱一個(gè)即用的桌面時(shí)鐘,文中的示例代碼講解詳細(xì),具有一定的借鑒價(jià)值,需要的小伙伴可以參考下2023-12-12