Python實(shí)現(xiàn)實(shí)時(shí)增量數(shù)據(jù)加載工具的解決方案
本次主要分享結(jié)合單例模式實(shí)際應(yīng)用案例:實(shí)現(xiàn)實(shí)時(shí)增量數(shù)據(jù)加載工具的解決方案。最關(guān)鍵的是實(shí)現(xiàn)一個(gè)可進(jìn)行添加、修改、刪除等操作的增量ID記錄表。
單例模式:提供全局訪問(wèn)點(diǎn),確保類有且只有一個(gè)特定類型的對(duì)象。通常用于以下場(chǎng)景:日志記錄或數(shù)據(jù)庫(kù)操作等,避免對(duì)用一資源請(qǐng)求沖突。
創(chuàng)建增量ID記錄表
import?sqlite3 import?datetime import?pymssql import?pandas?as?pd import?time pd.set_option('expand_frame_repr',?False)
導(dǎo)入所需模塊
?#?創(chuàng)建數(shù)據(jù)表 database_path?=?r'.\Database\ID_Record.db' from?sqlite3?import?connect with?connect(database_path)?as?conn: ????conn.execute( ????????'CREATE?TABLE?IF?NOT?EXISTS?Incremental_data_max_id_record(id?INTEGER?PRIMARY?KEY?AUTOINCREMENT,F_SDaqID_MAX?TEXT,record_date?datetime)')
增量最新記錄ID-F_SDaqID_MAX數(shù)據(jù)庫(kù)存儲(chǔ)
#數(shù)據(jù)保存到本地txt def?text_save(filename,?record):#filename為寫入txt文件的路徑,record為要寫入F_SDaqID_MAX、record_date數(shù)據(jù)列表. ????file?=?open(filename,'a')?追加方式 ????#?file?=?open(filename,?'w')??#覆蓋方式 ????for?i?in?range(len(record)): ????????s?=?str(record[i]).replace('[','').replace(']','') ????????s?=?s.replace("'",'').replace(',','')?+'\n'???#去除單引號(hào),逗號(hào),每行末尾追加換行符 ????????file.write(s) ????file.close()
增量最新記錄ID-F_SDaqID_MAX臨時(shí)文件存儲(chǔ)
增量ID記錄提供了兩種實(shí)現(xiàn)方案 ,一個(gè)是數(shù)據(jù)持久化存儲(chǔ)模式,另一個(gè)是臨時(shí)文件存儲(chǔ)模式。數(shù)據(jù)持久化模式顧名思義,也就是說(shuō)在創(chuàng)建對(duì)象的時(shí)候,能將操作關(guān)鍵信息如增量ID-F_SDaqID_MAX記錄下來(lái),這種flag記錄映射是常選擇的設(shè)計(jì)模式。
數(shù)據(jù)庫(kù)連接類
實(shí)現(xiàn)實(shí)時(shí)增量數(shù)據(jù)獲取需要實(shí)現(xiàn)兩個(gè)數(shù)據(jù)庫(kù)連接類:增量數(shù)據(jù)ID存儲(chǔ)類和增量目標(biāo)數(shù)據(jù)源類。這里利用單例模式實(shí)現(xiàn)數(shù)據(jù)庫(kù)操作類,將增量服務(wù)記錄信息按照順序存儲(chǔ)到數(shù)據(jù)庫(kù)或特定的日志文件中,以維護(hù)數(shù)據(jù)的一致性。
1、增量數(shù)據(jù)ID存儲(chǔ)sqlite連接類代碼
class?Database_sqlite(metaclass=MetaSingleton): ????database_path?=?r'.\Database\energy_rc_configure.db' ????connection?=?None ????def?connect(self): ????????if?self.connection?is?None: ????????????self.connection?=?sqlite3.connect(self.database_path,check_same_thread=False,isolation_level=None) ????????????self.cursorobj?=??self.connection.cursor() ????????return?self.cursorobj,self.connection ????#?插入最大記錄 ????@staticmethod ????def?Insert_Max_ID_Record(f1,?f2): ????????cursor?=?Database_sqlite().connect() ????????print(cursor) ????????sql?=?f"""insert?into?Incremental_data_max_id_record(F_SDaqID_MAX,record_date)?values("{f1}","{f2}")""" ????????cursor[0].execute(sql) ????????#?sql?=?"insert??into?Incremental_data_max_id_record(F_SDaqID_MAX,record_date)?values(?,?)" ????????#?cursor[0].execute(sql,(f"{f1}",f"{f2}")) ????????cursor[1].commit() ????????print("插入成功!") ????????#?cursor[0].close() ????????return? ????#?取出增量數(shù)據(jù)庫(kù)中最新一次ID記錄 ????@staticmethod ????def?View_Max_ID_Records(): ????????cursor?=?Database_sqlite().connect() ????????sql?=?"select?max(F_SDaqID_MAX)?from?Incremental_data_max_id_record" ????????cursor[0].execute(sql) ????????results?=?cursor[0].fetchone()[0] ????????#?#單例模式不用關(guān)閉數(shù)據(jù)庫(kù)連接 ????????#?cursor[0].close() ????????print("最新記錄ID",?results) ????????return?results ????#刪除數(shù)據(jù)記錄ID ????@staticmethod ????def?Del_Max_ID_Records(): ????????cursor?=?Database_sqlite().connect() ????????sql?=?"delete?from?Incremental_data_max_id_record?where?record_date?=?(select?MAX(record_date)?from?Incremental_data_max_id_record)" ????????cursor[0].execute(sql) ????????#?results?=?cursor[0].fetchone()[0] ????????#?#?cursor[0].close() ????????cursor[1].commit() ????????print("刪除成功") ????????return
2、增量數(shù)據(jù)源sqlserver連接類代碼
class?Database_sqlserver(metaclass=MetaSingleton): ????""" ????#實(shí)時(shí)數(shù)據(jù)庫(kù) ????""" ????connection?=?None ????#?def?connect(self): ????def?__init__(self): ????????if?self.connection?is?None: ????????????self.connection?=?pymssql.connect(host="xxxxx",user="xxxxx",password="xxxxx",database="xxxxx",charset="utf8") ????????????if?self.connection: ????????????????print("連接成功!") ????????????#?打開(kāi)數(shù)據(jù)庫(kù)連接 ????????????self.cursorobj?=?self.connection.cursor() ????????#?return?self.cursorobj,?self.connection ????#?獲取數(shù)據(jù)源中最大ID ????@staticmethod ????def?get_F_SDaqID_MAX(): ????????#?cursor_insert?=?Database_sqlserver().connect() ????????cursor_insert?=?Database_sqlserver().cursorobj ????????sql_MAXID?=?"""select?MAX(F_SDaqID)?from?T_DaqDataForEnergy""" ????????cursor_insert.execute(sql_MAXID)??#?執(zhí)行查詢語(yǔ)句,選擇表中所有數(shù)據(jù) ????????F_SDaqID_MAX?=?cursor_insert.fetchone()[0]??#?獲取記錄 ????????print("最大ID值:{0}".format(F_SDaqID_MAX)) ????????return?F_SDaqID_MAX ????#?提取增量數(shù)據(jù) ????@staticmethod ????def?get_incremental_data(incremental_Max_ID): ????????#?開(kāi)始獲取增量數(shù)據(jù) ????????sql_incremental_data?=?"""select?F_ID,F_Datetime,F_Data?from?T_DaqDataForEnergy??where?F_ID?>?{0}""".format( ????????????incremental_Max_ID) ????????#?cursor_find?=?Database_sqlserver().connect() ????????cursor_find?=?Database_sqlserver().cursorobj ????????cursor_find.execute(sql_incremental_data)??#?執(zhí)行查詢語(yǔ)句,選擇表中所有數(shù)據(jù) ????????Target_data_source?=?cursor_find.fetchall()??#?獲取所有數(shù)據(jù)記錄 ????????#?cursor_find.close() ????????cursor_find.close() ????????df?=?pd.DataFrame( ????????????Target_data_source, ????????????columns=[ ????????????????"F_ID", ????????????????"F_Datetime", ????????????????"F_Data"]) ????????print("提取數(shù)據(jù)",?df) ????????return?df
數(shù)據(jù)資源應(yīng)用服務(wù)設(shè)計(jì)主要考慮數(shù)據(jù)庫(kù)操作的一致性和優(yōu)化數(shù)據(jù)庫(kù)的各種操作,提高內(nèi)存或CPU利用率。
實(shí)現(xiàn)多種讀取和寫入操作,客戶端操作調(diào)用API,執(zhí)行相應(yīng)的DB操作。
注:
1、使用metaclass實(shí)現(xiàn)創(chuàng)建具有單例特征的類
Database_sqlserver(metaclass=MetaSingleton)
Database_sqlite(metaclass=MetaSingleton)
使用class定義新類時(shí),數(shù)據(jù)庫(kù)類Database_sqlserver由MetaSingleton裝飾后即指定了metaclass,那么MetaSingleton的特殊方法__call__方法將自動(dòng)執(zhí)行。
class?MetaSingleton(type): ????_instances={} ????def?__call__(cls,?*args,?**kwargs): ????????if?cls?not?in?cls._instances: ????????????cls._instances[cls]?=?super(MetaSingleton,cls).__call__(*args,**kwargs) ????????return?cls._instances[cls]
以上代碼基于元類的單例實(shí)現(xiàn),當(dāng)客戶端對(duì)數(shù)據(jù)庫(kù)執(zhí)行某些操作時(shí),會(huì)多次實(shí)例化數(shù)據(jù)庫(kù)類,但是只創(chuàng)建一個(gè)對(duì)象,所以對(duì)數(shù)據(jù)庫(kù)的調(diào)用是同步的。
2、多線程使用同一數(shù)據(jù)庫(kù)連接資源需采取一定同步機(jī)制
如果沒(méi)采用同步機(jī)制,可能出現(xiàn)一些意料之外的情況
1)with cls.lock加鎖
class?MetaSingleton(type): ????_instances={} ????lock?=?threading.Lock() ????def?__call__(cls,?*args,?**kwargs): ????????with?cls.lock: ????????????if?cls?not?in?cls._instances: ????????????????time.sleep(0.05)??#模擬耗時(shí) ????????????????cls._instances[cls]?=?super(MetaSingleton,cls).__call__(*args,**kwargs) ????????????return?cls._instances[cls]
鎖的創(chuàng)建和釋放需要消耗資源,上面代碼每次創(chuàng)建都必須獲得鎖。
3、如果我們開(kāi)發(fā)的程序非單個(gè)應(yīng)用,而是集群化的,即多個(gè)客戶端共享單個(gè)數(shù)據(jù)庫(kù),導(dǎo)致數(shù)據(jù)庫(kù)操作無(wú)法同步,而數(shù)據(jù)庫(kù)連接池是更好的選擇。大大節(jié)省了內(nèi)存,提高了服務(wù)器地服務(wù)效率,能夠支持更多的客戶服務(wù)。
數(shù)據(jù)庫(kù)連接池的解決方案是在應(yīng)用程序啟動(dòng)時(shí)建立足夠的數(shù)據(jù)庫(kù)連接,并講這些連接組成一個(gè)連接池,由應(yīng)用程序動(dòng)態(tài)地對(duì)池中的連接進(jìn)行申請(qǐng)、使用和釋放。對(duì)于多于連接池中連接數(shù)的并發(fā)請(qǐng)求,應(yīng)該在請(qǐng)求隊(duì)列中排隊(duì)等待。
增量數(shù)據(jù)服務(wù)客戶端
增量處理策略:第一次加載先判斷增量數(shù)據(jù)表中是否存在最新記錄,若有直接加載;否則,記錄一下最大/最新的數(shù)據(jù)記錄ID或時(shí)間點(diǎn),保存到一個(gè)增量數(shù)據(jù)庫(kù)或記錄文件中。
從第二次加載開(kāi)始只加載最大/最新的ID或時(shí)間點(diǎn)以后的數(shù)據(jù)。當(dāng)加載過(guò)程全部成功完成之后并同步更新增量數(shù)據(jù)庫(kù)或記錄文件,更新這次數(shù)據(jù)記錄的最后記錄ID或時(shí)間點(diǎn)。
一般這類數(shù)據(jù)記錄表有自增長(zhǎng)列,那么也可以使用自增長(zhǎng)列來(lái)實(shí)現(xiàn)這個(gè)標(biāo)識(shí)特征。比如本次我用到數(shù)據(jù)表增長(zhǎng)列F_ID。
class?IncrementalRecordServer: ????_servers?=?[] ????_instance?=?None ????def?__new__(cls,?*args,?**kwargs): ????????if?not?IncrementalRecordServer._instance: ????????????#?IncrementalRecordServer._instance?=?super().__new__(cls) ????????????IncrementalRecordServer._instance?=?super(IncrementalRecordServer,cls).__new__(cls) ????????return?IncrementalRecordServer._instance ????def?__init__(self,changeServersID=None): ????????""" ????????變量初始化過(guò)程 ????????""" ????????self.F_SDaqID_MAX?=?Database_sqlserver().get_F_SDaqID_MAX() ????????self.record_date?=?datetime.datetime.now().strftime('%Y-%m-%d?%H:%M:%S') ????????self.changeServersID?=?changeServersID ????#?回調(diào)更新本地記錄,清空記錄替換,臨時(shí)記錄 ????def?record(func): ????????def?Server_record(self): ????????????v?=?func(self) ????????????text_save(filename=r"F:\AutoOps_platform\Database\Server_record.txt",record=IncrementalRecordServer._servers) ????????????print("保存成功") ????????????return?v ????????return?Server_record ????#增加服務(wù)記錄 ????@record ????def?addServer(self): ????????self._servers.append([int(self.F_SDaqID_MAX),self.record_date]) ????????print("添加記錄") ????????Database_sqlite.Insert_Max_ID_Record(f1=self.F_SDaqID_MAX,?f2=self.record_date) ????#修改服務(wù)記錄 ????@record ????def?changeServers(self): ????????#?self._servers.pop() ????????#?此處傳入手動(dòng)修改的記錄ID ????????self._servers.append([self.changeServersID,self.record_date]) ????????#先刪除再插入實(shí)現(xiàn)修改 ????????Database_sqlite.Del_Max_ID_Records() ????????Database_sqlite.Insert_Max_ID_Record(f1=self.changeServersID,?f2=self.record_date) ????????print("更新記錄") ????#刪除服務(wù)記錄 ????@record ????def?popServers(self): ????????#?self._servers.pop() ????????print("刪除記錄") ????????Database_sqlite.Del_Max_ID_Records() ????#?最新服務(wù)記錄 ????def?getServers(self): ????????#?print(self._servers[-1]) ????????Max_ID_Records?=?Database_sqlite.View_Max_ID_Records() ????????print("查看記錄",Max_ID_Records) ????????return?Max_ID_Records ????#提取數(shù)據(jù) ????def?Incremental_data_client(self): ????????""" ????????#?提取數(shù)據(jù)(增量數(shù)據(jù)MAXID獲取,并提取增量數(shù)據(jù)) ????????""" ????????#?實(shí)時(shí)數(shù)據(jù)庫(kù) ????????#?第一次加載先判斷是否存在最新記錄 ????????if?self.getServers()?==?None: ????????????#?插入增量數(shù)據(jù)庫(kù)ID ????????????self.addServer() ????????????#?提取增量數(shù)據(jù) ????????????data?=?Database_sqlserver.get_incremental_data(self.F_SDaqID_MAX) ????????????return?data ????????#?獲取增量數(shù)據(jù)庫(kù)中已有的最新最大ID記錄 ????????incremental_Max_ID?=?self.getServers() ????????#添加記錄 ????????self.addServer() ????????#?提取增量數(shù)據(jù) ????????Target_data_source?=?Database_sqlserver.get_incremental_data(incremental_Max_ID) ????????return?Target_data_source
優(yōu)化策略:
1、延遲加載方式
以上增量記錄服務(wù)類IncrementalRecordServer通過(guò)覆蓋__new__方法來(lái)控制對(duì)象的創(chuàng)建,我們?cè)趧?chuàng)建對(duì)象的時(shí)候會(huì)先檢查對(duì)象是否存在。也可以通過(guò)懶加載的方式實(shí)現(xiàn),節(jié)約資源優(yōu)化如下。
class?IncrementalRecordServer: ????_servers?=?[] ????_instance?=?None ????def?__init__(self,changeServersID=None): ????????""" ????????變量初始化過(guò)程 ????????""" ????????self.F_SDaqID_MAX?=?Database_sqlserver().get_F_SDaqID_MAX() ????????self.record_date?=?datetime.datetime.now().strftime('%Y-%m-%d?%H:%M:%S') ????????self.changeServersID?=?changeServersID ????????if?not?IncrementalRecordServer._instance: ????????????print("__init__對(duì)象創(chuàng)建") ????????else: ????????????print("對(duì)象已經(jīng)存在:",IncrementalRecordServer._instance) ????????????self.getInstance() ????@classmethod ????def?getInstance(cls): ????????if?not?cls._instance: ????????????cls._instance?=?IncrementalRecordServer() ????????return?cls._instance
懶漢式實(shí)例化能夠確保實(shí)際需要時(shí)才創(chuàng)建對(duì)象,實(shí)例化a= IncrementalRecordServer()時(shí),調(diào)用初始化__init__方法,但是沒(méi)有新的對(duì)象創(chuàng)建。懶漢式這種方式加載類對(duì)象,也稱為延遲加載方式。
2、單例模式能有效利用空間資源,每次利用同一空間資源。
不同操作對(duì)象的內(nèi)存地址相同,且不同對(duì)象初始化將上一個(gè)對(duì)象初始化變量覆蓋,確保最新記錄實(shí)時(shí)更新。表面上以上代碼實(shí)現(xiàn)了單例模式?jīng)]問(wèn)題,但多線程并發(fā)情況下,存在線程安全問(wèn)題,可能同時(shí)創(chuàng)建不同的對(duì)象空間??紤]到線程安全,也可以進(jìn)一步加鎖處理.
3、適用范圍及注意事項(xiàng)
本次代碼適用于部署生產(chǎn)指定時(shí)間點(diǎn)運(yùn)行之后產(chǎn)出的增量數(shù)據(jù),長(zhǎng)時(shí)間未啟用再啟動(dòng)需要清空歷史記錄即增量數(shù)據(jù)庫(kù)或文件ID需清空,一般實(shí)時(shí)數(shù)據(jù)增量實(shí)現(xiàn)一次加載沒(méi)有什么問(wèn)題,所以這一點(diǎn)也不用很關(guān)注(文件方式代碼可自行完善);當(dāng)加載歷史數(shù)據(jù)庫(kù)或定時(shí)間隔產(chǎn)生數(shù)據(jù)量過(guò)大時(shí),需要進(jìn)一步修改代碼,需要判斷數(shù)據(jù)規(guī)模,指定起始節(jié)點(diǎn)及加載數(shù)據(jù)量,綜合因素考慮,下次分享一下億級(jí)數(shù)據(jù)量提取方案。
4、進(jìn)一步了解Python垃圾回收機(jī)制;并發(fā)情況下,通過(guò)優(yōu)化線程池來(lái)管理資源。
最后可以添加一個(gè)函數(shù)來(lái)釋放資源
def?__del__(self): ????class_name?=?self.__class__.__name__ ????print(class_name,"銷毀")
del obj 調(diào)用__del__() 銷毀對(duì)象,釋放其空間;只有Python 對(duì)象在不再引用對(duì)象時(shí)被釋放。當(dāng)程序中有其它變量引用該實(shí)例對(duì)象時(shí),即便手動(dòng)調(diào)用 __del__() 方法,該方法也不會(huì)立即執(zhí)行。這和 Python 的垃圾回收機(jī)制的實(shí)現(xiàn)有關(guān)。
結(jié)果測(cè)試
if?__name__?==?'__main__': ????for?i?in?range(6): ????????hc1?=?IncrementalRecordServer() ????????hc1.addServer() ????????print("Record_ID",hc1._servers[i]) ????????#?del?hc1 ????????time.sleep(60) ????#Server2-客戶端client ????#?最新服務(wù)記錄 ????hc2?=?IncrementalRecordServer() ????hc2.getServers() ????#查看增量數(shù)據(jù) ????hc2.Incremental_data_client()
插入記錄
模擬每1分鐘插入一條記錄,向增量數(shù)據(jù)庫(kù)插入7條
if?__name__?==?'__main__': ????#?Server3-客戶端client ????#?手動(dòng)添加增量起始ID記錄 ????hc3?=?IncrementalRecordServer(changeServersID='346449980') ????hc3.changeServers()
if?__name__?==?'__main__': ????#刪除ID ????hc3?=?IncrementalRecordServer(changeServersID='346449980') ????#?hc3.changeServers() ????hc3.popServers()
以上就是Python實(shí)現(xiàn)實(shí)時(shí)增量數(shù)據(jù)加載工具的解決方案的詳細(xì)內(nèi)容,更多關(guān)于Python增量數(shù)據(jù)加載的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
TensorFlow2.0使用keras訓(xùn)練模型的實(shí)現(xiàn)
這篇文章主要介紹了TensorFlow2.0使用keras訓(xùn)練模型的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-02-02Python數(shù)據(jù)分析Matplotlib?柱狀圖繪制
本文主要介紹了Python數(shù)據(jù)分析Matplotlib柱狀圖繪制,Matplotlib提供了bar()方法繪制柱狀圖,下面具體繪制介紹需要的小伙伴可以參考以一下2022-05-05python將字符串以u(píng)tf-8格式保存在txt文件中的方法
今天小編就為大家分享一篇python將字符串以u(píng)tf-8格式保存在txt文件中的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2018-10-10python中readline判斷文件讀取結(jié)束的方法
這篇文章主要介紹了python中readline判斷文件讀取結(jié)束的方法,實(shí)例形式詳細(xì)分析了Python中readline的用法,需要的朋友可以參考下2014-11-11python中用Scrapy實(shí)現(xiàn)定時(shí)爬蟲的實(shí)例講解
在本篇文章里小編給大家整理的是一篇關(guān)于python中用Scrapy實(shí)現(xiàn)定時(shí)爬蟲的實(shí)例講解內(nèi)容,有興趣的朋友們可以學(xué)習(xí)下。2021-01-01