Python實(shí)現(xiàn)實(shí)時(shí)增量數(shù)據(jù)加載工具的解決方案
本次主要分享結(jié)合單例模式實(shí)際應(yīng)用案例:實(shí)現(xiàn)實(shí)時(shí)增量數(shù)據(jù)加載工具的解決方案。最關(guān)鍵的是實(shí)現(xiàn)一個(gè)可進(jìn)行添加、修改、刪除等操作的增量ID記錄表。
單例模式:提供全局訪問點(diǎn),確保類有且只有一個(gè)特定類型的對象。通常用于以下場景:日志記錄或數(shù)據(jù)庫操作等,避免對用一資源請求沖突。
創(chuàng)建增量ID記錄表
import?sqlite3
import?datetime
import?pymssql
import?pandas?as?pd
import?time
pd.set_option('expand_frame_repr',?False)導(dǎo)入所需模塊
?#?創(chuàng)建數(shù)據(jù)表 database_path?=?r'.\Database\ID_Record.db' from?sqlite3?import?connect with?connect(database_path)?as?conn: ????conn.execute( ????????'CREATE?TABLE?IF?NOT?EXISTS?Incremental_data_max_id_record(id?INTEGER?PRIMARY?KEY?AUTOINCREMENT,F_SDaqID_MAX?TEXT,record_date?datetime)')
增量最新記錄ID-F_SDaqID_MAX數(shù)據(jù)庫存儲
#數(shù)據(jù)保存到本地txt
def?text_save(filename,?record):#filename為寫入txt文件的路徑,record為要寫入F_SDaqID_MAX、record_date數(shù)據(jù)列表.
????file?=?open(filename,'a')?追加方式
????#?file?=?open(filename,?'w')??#覆蓋方式
????for?i?in?range(len(record)):
????????s?=?str(record[i]).replace('[','').replace(']','')
????????s?=?s.replace("'",'').replace(',','')?+'\n'???#去除單引號,逗號,每行末尾追加換行符
????????file.write(s)
????file.close()
增量最新記錄ID-F_SDaqID_MAX臨時(shí)文件存儲
增量ID記錄提供了兩種實(shí)現(xiàn)方案 ,一個(gè)是數(shù)據(jù)持久化存儲模式,另一個(gè)是臨時(shí)文件存儲模式。數(shù)據(jù)持久化模式顧名思義,也就是說在創(chuàng)建對象的時(shí)候,能將操作關(guān)鍵信息如增量ID-F_SDaqID_MAX記錄下來,這種flag記錄映射是常選擇的設(shè)計(jì)模式。
數(shù)據(jù)庫連接類
實(shí)現(xiàn)實(shí)時(shí)增量數(shù)據(jù)獲取需要實(shí)現(xiàn)兩個(gè)數(shù)據(jù)庫連接類:增量數(shù)據(jù)ID存儲類和增量目標(biāo)數(shù)據(jù)源類。這里利用單例模式實(shí)現(xiàn)數(shù)據(jù)庫操作類,將增量服務(wù)記錄信息按照順序存儲到數(shù)據(jù)庫或特定的日志文件中,以維護(hù)數(shù)據(jù)的一致性。
1、增量數(shù)據(jù)ID存儲sqlite連接類代碼
class?Database_sqlite(metaclass=MetaSingleton):
????database_path?=?r'.\Database\energy_rc_configure.db'
????connection?=?None
????def?connect(self):
????????if?self.connection?is?None:
????????????self.connection?=?sqlite3.connect(self.database_path,check_same_thread=False,isolation_level=None)
????????????self.cursorobj?=??self.connection.cursor()
????????return?self.cursorobj,self.connection
????#?插入最大記錄
????@staticmethod
????def?Insert_Max_ID_Record(f1,?f2):
????????cursor?=?Database_sqlite().connect()
????????print(cursor)
????????sql?=?f"""insert?into?Incremental_data_max_id_record(F_SDaqID_MAX,record_date)?values("{f1}","{f2}")"""
????????cursor[0].execute(sql)
????????#?sql?=?"insert??into?Incremental_data_max_id_record(F_SDaqID_MAX,record_date)?values(?,?)"
????????#?cursor[0].execute(sql,(f"{f1}",f"{f2}"))
????????cursor[1].commit()
????????print("插入成功!")
????????#?cursor[0].close()
????????return?
????#?取出增量數(shù)據(jù)庫中最新一次ID記錄
????@staticmethod
????def?View_Max_ID_Records():
????????cursor?=?Database_sqlite().connect()
????????sql?=?"select?max(F_SDaqID_MAX)?from?Incremental_data_max_id_record"
????????cursor[0].execute(sql)
????????results?=?cursor[0].fetchone()[0]
????????#?#單例模式不用關(guān)閉數(shù)據(jù)庫連接
????????#?cursor[0].close()
????????print("最新記錄ID",?results)
????????return?results
????#刪除數(shù)據(jù)記錄ID
????@staticmethod
????def?Del_Max_ID_Records():
????????cursor?=?Database_sqlite().connect()
????????sql?=?"delete?from?Incremental_data_max_id_record?where?record_date?=?(select?MAX(record_date)?from?Incremental_data_max_id_record)"
????????cursor[0].execute(sql)
????????#?results?=?cursor[0].fetchone()[0]
????????#?#?cursor[0].close()
????????cursor[1].commit()
????????print("刪除成功")
????????return2、增量數(shù)據(jù)源sqlserver連接類代碼
class?Database_sqlserver(metaclass=MetaSingleton):
????"""
????#實(shí)時(shí)數(shù)據(jù)庫
????"""
????connection?=?None
????#?def?connect(self):
????def?__init__(self):
????????if?self.connection?is?None:
????????????self.connection?=?pymssql.connect(host="xxxxx",user="xxxxx",password="xxxxx",database="xxxxx",charset="utf8")
????????????if?self.connection:
????????????????print("連接成功!")
????????????#?打開數(shù)據(jù)庫連接
????????????self.cursorobj?=?self.connection.cursor()
????????#?return?self.cursorobj,?self.connection
????#?獲取數(shù)據(jù)源中最大ID
????@staticmethod
????def?get_F_SDaqID_MAX():
????????#?cursor_insert?=?Database_sqlserver().connect()
????????cursor_insert?=?Database_sqlserver().cursorobj
????????sql_MAXID?=?"""select?MAX(F_SDaqID)?from?T_DaqDataForEnergy"""
????????cursor_insert.execute(sql_MAXID)??#?執(zhí)行查詢語句,選擇表中所有數(shù)據(jù)
????????F_SDaqID_MAX?=?cursor_insert.fetchone()[0]??#?獲取記錄
????????print("最大ID值:{0}".format(F_SDaqID_MAX))
????????return?F_SDaqID_MAX
????#?提取增量數(shù)據(jù)
????@staticmethod
????def?get_incremental_data(incremental_Max_ID):
????????#?開始獲取增量數(shù)據(jù)
????????sql_incremental_data?=?"""select?F_ID,F_Datetime,F_Data?from?T_DaqDataForEnergy??where?F_ID?>?{0}""".format(
????????????incremental_Max_ID)
????????#?cursor_find?=?Database_sqlserver().connect()
????????cursor_find?=?Database_sqlserver().cursorobj
????????cursor_find.execute(sql_incremental_data)??#?執(zhí)行查詢語句,選擇表中所有數(shù)據(jù)
????????Target_data_source?=?cursor_find.fetchall()??#?獲取所有數(shù)據(jù)記錄
????????#?cursor_find.close()
????????cursor_find.close()
????????df?=?pd.DataFrame(
????????????Target_data_source,
????????????columns=[
????????????????"F_ID",
????????????????"F_Datetime",
????????????????"F_Data"])
????????print("提取數(shù)據(jù)",?df)
????????return?df數(shù)據(jù)資源應(yīng)用服務(wù)設(shè)計(jì)主要考慮數(shù)據(jù)庫操作的一致性和優(yōu)化數(shù)據(jù)庫的各種操作,提高內(nèi)存或CPU利用率。
實(shí)現(xiàn)多種讀取和寫入操作,客戶端操作調(diào)用API,執(zhí)行相應(yīng)的DB操作。
注:
1、使用metaclass實(shí)現(xiàn)創(chuàng)建具有單例特征的類
Database_sqlserver(metaclass=MetaSingleton)
Database_sqlite(metaclass=MetaSingleton)
使用class定義新類時(shí),數(shù)據(jù)庫類Database_sqlserver由MetaSingleton裝飾后即指定了metaclass,那么MetaSingleton的特殊方法__call__方法將自動(dòng)執(zhí)行。
class?MetaSingleton(type):
????_instances={}
????def?__call__(cls,?*args,?**kwargs):
????????if?cls?not?in?cls._instances:
????????????cls._instances[cls]?=?super(MetaSingleton,cls).__call__(*args,**kwargs)
????????return?cls._instances[cls]
以上代碼基于元類的單例實(shí)現(xiàn),當(dāng)客戶端對數(shù)據(jù)庫執(zhí)行某些操作時(shí),會(huì)多次實(shí)例化數(shù)據(jù)庫類,但是只創(chuàng)建一個(gè)對象,所以對數(shù)據(jù)庫的調(diào)用是同步的。
2、多線程使用同一數(shù)據(jù)庫連接資源需采取一定同步機(jī)制
如果沒采用同步機(jī)制,可能出現(xiàn)一些意料之外的情況
1)with cls.lock加鎖
class?MetaSingleton(type):
????_instances={}
????lock?=?threading.Lock()
????def?__call__(cls,?*args,?**kwargs):
????????with?cls.lock:
????????????if?cls?not?in?cls._instances:
????????????????time.sleep(0.05)??#模擬耗時(shí)
????????????????cls._instances[cls]?=?super(MetaSingleton,cls).__call__(*args,**kwargs)
????????????return?cls._instances[cls]
鎖的創(chuàng)建和釋放需要消耗資源,上面代碼每次創(chuàng)建都必須獲得鎖。
3、如果我們開發(fā)的程序非單個(gè)應(yīng)用,而是集群化的,即多個(gè)客戶端共享單個(gè)數(shù)據(jù)庫,導(dǎo)致數(shù)據(jù)庫操作無法同步,而數(shù)據(jù)庫連接池是更好的選擇。大大節(jié)省了內(nèi)存,提高了服務(wù)器地服務(wù)效率,能夠支持更多的客戶服務(wù)。
數(shù)據(jù)庫連接池的解決方案是在應(yīng)用程序啟動(dòng)時(shí)建立足夠的數(shù)據(jù)庫連接,并講這些連接組成一個(gè)連接池,由應(yīng)用程序動(dòng)態(tài)地對池中的連接進(jìn)行申請、使用和釋放。對于多于連接池中連接數(shù)的并發(fā)請求,應(yīng)該在請求隊(duì)列中排隊(duì)等待。
增量數(shù)據(jù)服務(wù)客戶端
增量處理策略:第一次加載先判斷增量數(shù)據(jù)表中是否存在最新記錄,若有直接加載;否則,記錄一下最大/最新的數(shù)據(jù)記錄ID或時(shí)間點(diǎn),保存到一個(gè)增量數(shù)據(jù)庫或記錄文件中。
從第二次加載開始只加載最大/最新的ID或時(shí)間點(diǎn)以后的數(shù)據(jù)。當(dāng)加載過程全部成功完成之后并同步更新增量數(shù)據(jù)庫或記錄文件,更新這次數(shù)據(jù)記錄的最后記錄ID或時(shí)間點(diǎn)。
一般這類數(shù)據(jù)記錄表有自增長列,那么也可以使用自增長列來實(shí)現(xiàn)這個(gè)標(biāo)識特征。比如本次我用到數(shù)據(jù)表增長列F_ID。
class?IncrementalRecordServer:
????_servers?=?[]
????_instance?=?None
????def?__new__(cls,?*args,?**kwargs):
????????if?not?IncrementalRecordServer._instance:
????????????#?IncrementalRecordServer._instance?=?super().__new__(cls)
????????????IncrementalRecordServer._instance?=?super(IncrementalRecordServer,cls).__new__(cls)
????????return?IncrementalRecordServer._instance
????def?__init__(self,changeServersID=None):
????????"""
????????變量初始化過程
????????"""
????????self.F_SDaqID_MAX?=?Database_sqlserver().get_F_SDaqID_MAX()
????????self.record_date?=?datetime.datetime.now().strftime('%Y-%m-%d?%H:%M:%S')
????????self.changeServersID?=?changeServersID
????#?回調(diào)更新本地記錄,清空記錄替換,臨時(shí)記錄
????def?record(func):
????????def?Server_record(self):
????????????v?=?func(self)
????????????text_save(filename=r"F:\AutoOps_platform\Database\Server_record.txt",record=IncrementalRecordServer._servers)
????????????print("保存成功")
????????????return?v
????????return?Server_record
????#增加服務(wù)記錄
????@record
????def?addServer(self):
????????self._servers.append([int(self.F_SDaqID_MAX),self.record_date])
????????print("添加記錄")
????????Database_sqlite.Insert_Max_ID_Record(f1=self.F_SDaqID_MAX,?f2=self.record_date)
????#修改服務(wù)記錄
????@record
????def?changeServers(self):
????????#?self._servers.pop()
????????#?此處傳入手動(dòng)修改的記錄ID
????????self._servers.append([self.changeServersID,self.record_date])
????????#先刪除再插入實(shí)現(xiàn)修改
????????Database_sqlite.Del_Max_ID_Records()
????????Database_sqlite.Insert_Max_ID_Record(f1=self.changeServersID,?f2=self.record_date)
????????print("更新記錄")
????#刪除服務(wù)記錄
????@record
????def?popServers(self):
????????#?self._servers.pop()
????????print("刪除記錄")
????????Database_sqlite.Del_Max_ID_Records()
????#?最新服務(wù)記錄
????def?getServers(self):
????????#?print(self._servers[-1])
????????Max_ID_Records?=?Database_sqlite.View_Max_ID_Records()
????????print("查看記錄",Max_ID_Records)
????????return?Max_ID_Records
????#提取數(shù)據(jù)
????def?Incremental_data_client(self):
????????"""
????????#?提取數(shù)據(jù)(增量數(shù)據(jù)MAXID獲取,并提取增量數(shù)據(jù))
????????"""
????????#?實(shí)時(shí)數(shù)據(jù)庫
????????#?第一次加載先判斷是否存在最新記錄
????????if?self.getServers()?==?None:
????????????#?插入增量數(shù)據(jù)庫ID
????????????self.addServer()
????????????#?提取增量數(shù)據(jù)
????????????data?=?Database_sqlserver.get_incremental_data(self.F_SDaqID_MAX)
????????????return?data
????????#?獲取增量數(shù)據(jù)庫中已有的最新最大ID記錄
????????incremental_Max_ID?=?self.getServers()
????????#添加記錄
????????self.addServer()
????????#?提取增量數(shù)據(jù)
????????Target_data_source?=?Database_sqlserver.get_incremental_data(incremental_Max_ID)
????????return?Target_data_source
優(yōu)化策略:
1、延遲加載方式
以上增量記錄服務(wù)類IncrementalRecordServer通過覆蓋__new__方法來控制對象的創(chuàng)建,我們在創(chuàng)建對象的時(shí)候會(huì)先檢查對象是否存在。也可以通過懶加載的方式實(shí)現(xiàn),節(jié)約資源優(yōu)化如下。
class?IncrementalRecordServer:
????_servers?=?[]
????_instance?=?None
????def?__init__(self,changeServersID=None):
????????"""
????????變量初始化過程
????????"""
????????self.F_SDaqID_MAX?=?Database_sqlserver().get_F_SDaqID_MAX()
????????self.record_date?=?datetime.datetime.now().strftime('%Y-%m-%d?%H:%M:%S')
????????self.changeServersID?=?changeServersID
????????if?not?IncrementalRecordServer._instance:
????????????print("__init__對象創(chuàng)建")
????????else:
????????????print("對象已經(jīng)存在:",IncrementalRecordServer._instance)
????????????self.getInstance()
????@classmethod
????def?getInstance(cls):
????????if?not?cls._instance:
????????????cls._instance?=?IncrementalRecordServer()
????????return?cls._instance懶漢式實(shí)例化能夠確保實(shí)際需要時(shí)才創(chuàng)建對象,實(shí)例化a= IncrementalRecordServer()時(shí),調(diào)用初始化__init__方法,但是沒有新的對象創(chuàng)建。懶漢式這種方式加載類對象,也稱為延遲加載方式。
2、單例模式能有效利用空間資源,每次利用同一空間資源。
不同操作對象的內(nèi)存地址相同,且不同對象初始化將上一個(gè)對象初始化變量覆蓋,確保最新記錄實(shí)時(shí)更新。表面上以上代碼實(shí)現(xiàn)了單例模式?jīng)]問題,但多線程并發(fā)情況下,存在線程安全問題,可能同時(shí)創(chuàng)建不同的對象空間。考慮到線程安全,也可以進(jìn)一步加鎖處理.
3、適用范圍及注意事項(xiàng)
本次代碼適用于部署生產(chǎn)指定時(shí)間點(diǎn)運(yùn)行之后產(chǎn)出的增量數(shù)據(jù),長時(shí)間未啟用再啟動(dòng)需要清空歷史記錄即增量數(shù)據(jù)庫或文件ID需清空,一般實(shí)時(shí)數(shù)據(jù)增量實(shí)現(xiàn)一次加載沒有什么問題,所以這一點(diǎn)也不用很關(guān)注(文件方式代碼可自行完善);當(dāng)加載歷史數(shù)據(jù)庫或定時(shí)間隔產(chǎn)生數(shù)據(jù)量過大時(shí),需要進(jìn)一步修改代碼,需要判斷數(shù)據(jù)規(guī)模,指定起始節(jié)點(diǎn)及加載數(shù)據(jù)量,綜合因素考慮,下次分享一下億級數(shù)據(jù)量提取方案。
4、進(jìn)一步了解Python垃圾回收機(jī)制;并發(fā)情況下,通過優(yōu)化線程池來管理資源。
最后可以添加一個(gè)函數(shù)來釋放資源
def?__del__(self): ????class_name?=?self.__class__.__name__ ????print(class_name,"銷毀")
del obj 調(diào)用__del__() 銷毀對象,釋放其空間;只有Python 對象在不再引用對象時(shí)被釋放。當(dāng)程序中有其它變量引用該實(shí)例對象時(shí),即便手動(dòng)調(diào)用 __del__() 方法,該方法也不會(huì)立即執(zhí)行。這和 Python 的垃圾回收機(jī)制的實(shí)現(xiàn)有關(guān)。
結(jié)果測試
if?__name__?==?'__main__':
????for?i?in?range(6):
????????hc1?=?IncrementalRecordServer()
????????hc1.addServer()
????????print("Record_ID",hc1._servers[i])
????????#?del?hc1
????????time.sleep(60)
????#Server2-客戶端client
????#?最新服務(wù)記錄
????hc2?=?IncrementalRecordServer()
????hc2.getServers()
????#查看增量數(shù)據(jù)
????hc2.Incremental_data_client()
插入記錄
模擬每1分鐘插入一條記錄,向增量數(shù)據(jù)庫插入7條


if?__name__?==?'__main__': ????#?Server3-客戶端client ????#?手動(dòng)添加增量起始ID記錄 ????hc3?=?IncrementalRecordServer(changeServersID='346449980') ????hc3.changeServers()


if?__name__?==?'__main__': ????#刪除ID ????hc3?=?IncrementalRecordServer(changeServersID='346449980') ????#?hc3.changeServers() ????hc3.popServers()

以上就是Python實(shí)現(xiàn)實(shí)時(shí)增量數(shù)據(jù)加載工具的解決方案的詳細(xì)內(nèi)容,更多關(guān)于Python增量數(shù)據(jù)加載的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
TensorFlow2.0使用keras訓(xùn)練模型的實(shí)現(xiàn)
這篇文章主要介紹了TensorFlow2.0使用keras訓(xùn)練模型的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-02-02
Python數(shù)據(jù)分析Matplotlib?柱狀圖繪制
本文主要介紹了Python數(shù)據(jù)分析Matplotlib柱狀圖繪制,Matplotlib提供了bar()方法繪制柱狀圖,下面具體繪制介紹需要的小伙伴可以參考以一下2022-05-05
python將字符串以utf-8格式保存在txt文件中的方法
今天小編就為大家分享一篇python將字符串以utf-8格式保存在txt文件中的方法,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-10-10
python中readline判斷文件讀取結(jié)束的方法
這篇文章主要介紹了python中readline判斷文件讀取結(jié)束的方法,實(shí)例形式詳細(xì)分析了Python中readline的用法,需要的朋友可以參考下2014-11-11
python中用Scrapy實(shí)現(xiàn)定時(shí)爬蟲的實(shí)例講解
在本篇文章里小編給大家整理的是一篇關(guān)于python中用Scrapy實(shí)現(xiàn)定時(shí)爬蟲的實(shí)例講解內(nèi)容,有興趣的朋友們可以學(xué)習(xí)下。2021-01-01

