python實(shí)現(xiàn)mysql的讀寫(xiě)分離及負(fù)載均衡
Oracle數(shù)據(jù)庫(kù)有其公司開(kāi)發(fā)的配套rac來(lái)實(shí)現(xiàn)負(fù)載均衡,目前已知的最大節(jié)點(diǎn)數(shù)能到128個(gè),但是其帶來(lái)的維護(hù)成本無(wú)疑是很高的,并且rac的穩(wěn)定性也并不是特別理想,尤其是節(jié)點(diǎn)很多的時(shí)候。
但是,相對(duì)mysql來(lái)說(shuō),rac的實(shí)用性要比mysql的配套集群軟件mysql-cluster要高很多。因?yàn)閺木W(wǎng)上了解到情況來(lái)看,很少公司在使用mysql-cluster,大多數(shù)企業(yè)都會(huì)選擇第三方代理軟件,例如MySQL Proxy、Mycat、haproxy等,但是這會(huì)引起另外一個(gè)問(wèn)題:?jiǎn)吸c(diǎn)故障(包括mysql-cluster:管理節(jié)點(diǎn))。如果要解決這個(gè)問(wèn)題,就需要給代理軟件搭建集群,在訪問(wèn)量很大的情況下,代理軟件的雙機(jī)或三機(jī)集群會(huì)成為訪問(wèn)瓶頸,繼續(xù)增加其節(jié)點(diǎn)數(shù),無(wú)疑會(huì)帶來(lái)各方面的成本。
那么,如何可以解決這個(gè)問(wèn)題呢?
解決上述問(wèn)題,最好的方式個(gè)人認(rèn)為應(yīng)該是在程序中實(shí)現(xiàn)。通過(guò)和其他mysql DBA的溝通,也證實(shí)了這個(gè)想法。但是由此帶來(lái)的疑問(wèn)也就產(chǎn)生了:會(huì)不會(huì)增加開(kāi)發(fā)成本?對(duì)現(xiàn)有的應(yīng)用系統(tǒng)做修改會(huì)不會(huì)改動(dòng)很大?會(huì)不會(huì)增加后期版本升級(jí)的難度?等等。
對(duì)于一個(gè)架構(gòu)設(shè)計(jì)良好的應(yīng)用系統(tǒng)可以很肯定的回答:不會(huì)。
那么怎么算一個(gè)架構(gòu)設(shè)計(jì)良好的應(yīng)用系統(tǒng)呢?
簡(jiǎn)單來(lái)說(shuō),就是分層合理、功能模塊之間耦合性底。以本人的經(jīng)驗(yàn)來(lái)說(shuō),系統(tǒng)設(shè)計(jì)基本上可以劃分為以下四層:
1. 實(shí)體層:主要定義一些實(shí)體類(lèi)
2. 數(shù)據(jù)層:也可以叫SQL處理層。主要負(fù)責(zé)跟數(shù)據(jù)庫(kù)交互取得數(shù)據(jù)
3. 業(yè)務(wù)處:主要是根據(jù)業(yè)務(wù)流程及功能區(qū)分模塊(或者說(shuō)定義不同的業(yè)務(wù)類(lèi))
4. 表現(xiàn)層:呈現(xiàn)最終結(jié)果給用戶(hù)
實(shí)現(xiàn)上述功能(mysql的讀寫(xiě)分離及負(fù)載均衡),在這四個(gè)層次中,僅僅涉及到數(shù)據(jù)層。
嚴(yán)格來(lái)說(shuō),對(duì)于設(shè)計(jì)良好的系統(tǒng),只涉及到一個(gè)類(lèi)的一個(gè)函數(shù):在數(shù)據(jù)層中,一般都會(huì)單獨(dú)劃分出一個(gè)連接類(lèi),并且這個(gè)連接類(lèi)中會(huì)有一個(gè)連接函數(shù),需要改動(dòng)的就是這個(gè)函數(shù):在讀取連接字符串之前加一個(gè)功能函數(shù)返回需要的主機(jī)、ip、端口號(hào)等信息(沒(méi)有開(kāi)發(fā)經(jīng)歷的同學(xué)可能理解這段話(huà)有點(diǎn)費(fèi)勁)。
流程圖如下:
代碼如下:
import mmap import json import random import mysql.connector import time ##公有變量 #dbinfos={ # "db0":{'host':'192.168.42.60','user':'root','pwd':'Abcd1234','my_user':'root','my_pwd':'Abcd.1234',"port":3306,"database":"","role":"RW","weight":10,"status":1}, # "db1":{'host':'192.168.42.61','user':'root','pwd':'Abcd1234','my_user':'root','my_pwd':'Abcd.1234',"port":3306,,"database":"":"R","weight":20,"status":1} # } dbinfos={} mmap_file = None mmap_time=None ##這個(gè)函數(shù)返回json格式的字符串,也是實(shí)現(xiàn)初始化數(shù)據(jù)庫(kù)信息的地方 ##使用json格式是為了方便數(shù)據(jù)轉(zhuǎn)換,從字符串---》二進(jìn)制--》字符串---》字典 ##如果采用其它方式共享dbinfos的方法,可以不用此方式 ##配置庫(kù)的地址 def get_json_str1(): return json.dumps(dbinfos) ##讀取配置庫(kù)中的內(nèi)容 def get_json_str(): try: global dbinfos cnx = mysql.connector.connect(user='root', password='Abcd.1234', host='192.168.42.60', database='rwlb') cursor = cnx.cursor() cmdString="select * from rwlb" cnt=-1 cursor.execute(cmdString) for (host,user,pwd,my_user,my_pwd,role,weight,status,port,db ) in cursor: cnt=cnt+1 dict_db={'host':host,'user':user,'pwd':pwd,'my_user':my_user,'my_pwd':my_pwd,"port":port,"database":db,"role":role,"weight":weight,"status":status} dbinfos["db"+str(cnt)]=dict_db cursor.close() cnx.close() return json.dumps(dbinfos) except: cursor.close() cnx.close() return "" ##判斷是否能正常連接到數(shù)據(jù)庫(kù) def check_conn_host(): try: cnx = mysql.connector.connect(user='root', password='Abcd.1234', host='192.168.42.60', database='rwlb') cursor = cnx.cursor() cmdString="select user()" cnt=-1 cursor.execute(cmdString) for user in cursor: cnt=len(user) cursor.close() cnx.close() return cnt except : return -1; ##select 屬于讀操作,其他屬于寫(xiě)操作-----這里可以劃分的更詳細(xì),比如執(zhí)行存儲(chǔ)過(guò)程等 def analyze_sql_state(sql): if "select" in sql: return "R" else: return "W" ##讀取時(shí)間信息 def read_mmap_time(): global mmap_time,mmap_file mmap_time.seek(0) ##初始時(shí)間 inittime=int(mmap_time.read().translate(None, b'\x00').decode()) ##當(dāng)前時(shí)間 endtime=int(time.time()) ##時(shí)間差 dis_time=endtime-inittime print("dis_time:"+str(dis_time)) #重新讀取數(shù)據(jù) if dis_time>10: ##當(dāng)配置庫(kù)正常的情況下才重新讀取數(shù)據(jù) print(str(check_conn_host())) if check_conn_host()>0: print("read data again") mmap_time.seek(0) mmap_file.seek(0) mmap_time.write(b'\x00') mmap_file.write(b'\x00') get_mmap_time() get_mmap_info() else: print("can not connect to host") #不重新讀取數(shù)據(jù) else: print("do not read data again") ##從內(nèi)存中讀取信息, def read_mmap_info(sql): read_mmap_time() print("The data is in memory") global mmap_file,dict_db mmap_file.seek(0) ##把二進(jìn)制轉(zhuǎn)換為字符串 info_str=mmap_file.read().translate(None, b'\x00').decode() #3把字符串轉(zhuǎn)成json格式,方便后面轉(zhuǎn)換為字典使用 infos=json.loads(info_str) host_count=len(infos) ##權(quán)重列表 listw=[] ##總的權(quán)重?cái)?shù)量 wtotal=0 ##數(shù)據(jù)庫(kù)角色 dbrole=analyze_sql_state(sql) ##根據(jù)權(quán)重初始化一個(gè)列表。這個(gè)是比較簡(jiǎn)單的算法,所以權(quán)重和控制在100以?xún)?nèi)比較好----這里可以選擇其他比較好的算法 for i in range(host_count): db="db"+str(i) if dbrole in infos[db]["role"]: if int(infos[db]["status"])==1: w=infos[db]["weight"] wtotal=wtotal+w for j in range(w): listw.append(i) if wtotal >0: ##產(chǎn)生一個(gè)隨機(jī)數(shù) rad=random.randint(0,wtotal-1) ##讀取隨機(jī)數(shù)所在的列表位置的數(shù)據(jù) dbindex=listw[rad] ##確定選擇的是哪個(gè)db db="db"+str(dbindex) ##為dict_db賦值,即選取的db的信息 dict_db=infos[db] return dict_db else : return {} ##如果內(nèi)存中沒(méi)有時(shí)間信息,則向內(nèi)存紅寫(xiě)入時(shí)間信息 def get_mmap_time(): global mmap_time ##第二個(gè)參數(shù)1024是設(shè)定的內(nèi)存大小,單位:字節(jié)。如果內(nèi)容較多,可以調(diào)大一點(diǎn) mmap_time = mmap.mmap(-1, 1024, access = mmap.ACCESS_WRITE, tagname = 'share_time') ##讀取有效比特?cái)?shù),不包括空比特 cnt=mmap_time.read_byte() if cnt==0: print("Load time to memory") mmap_time = mmap.mmap(0, 1024, access = mmap.ACCESS_WRITE, tagname = 'share_time') inittime=str(int(time.time())) mmap_time.write(inittime.encode()) ##如果內(nèi)存中沒(méi)有對(duì)應(yīng)信息,則向內(nèi)存中寫(xiě)信息以供下次調(diào)用使用 def get_mmap_info(): global mmap_file ##第二個(gè)參數(shù)1024是設(shè)定的內(nèi)存大小,單位:字節(jié)。如果內(nèi)容較多,可以調(diào)大一點(diǎn) mmap_file = mmap.mmap(-1, 1024, access = mmap.ACCESS_WRITE, tagname = 'share_mmap') ##讀取有效比特?cái)?shù),不包括空比特 cnt=mmap_file.read_byte() if cnt==0: print("Load data to memory") mmap_file = mmap.mmap(0, 1024, access = mmap.ACCESS_WRITE, tagname = 'share_mmap') mmap_file.write(get_json_str().encode()) ##測(cè)試函數(shù) def test1(): get_mmap_time() get_mmap_info() for i in range(10): sql="select * from db" #sql="update t set col1=a where b=2" dbrole=analyze_sql_state(sql) dict_db=read_mmap_info(sql) print(dict_db["host"]) def test2(): sql="select * from db" res=analyze_sql_state(sql) print("select:"+res) sql="update t set col1=a where b=2" res=analyze_sql_state(sql) print("update:"+res) sql="insert into t values(1,2)" res=analyze_sql_state(sql) print("insert:"+res) sql="delete from t where b=2" res=analyze_sql_state(sql) print("delete:"+res) ##類(lèi)似主函數(shù) if __name__=="__main__": test2()
測(cè)試結(jié)果:
從結(jié)果可以看出,只有第一次向內(nèi)存加載數(shù)據(jù),并且按照權(quán)重實(shí)現(xiàn)了負(fù)載均衡。
因?yàn)闇y(cè)試函數(shù)test1()寫(xiě)的是固定語(yǔ)句,所以讀寫(xiě)分離的結(jié)果沒(méi)有顯示出來(lái)。
另外:測(cè)試使用的數(shù)據(jù)庫(kù)表結(jié)構(gòu)及數(shù)據(jù):
desc rwlb; +---------+-------------+------+-----+---------+-------+ | Field | Type | Null | Key | Default | Extra | +---------+-------------+------+-----+---------+-------+ | host | varchar(50) | YES | | NULL | | | user | varchar(50) | YES | | NULL | | | pwd | varchar(50) | YES | | NULL | | | my_user | varchar(50) | YES | | NULL | | | my_pwd | varchar(50) | YES | | NULL | | | role | varchar(10) | YES | | NULL | | | weight | int(11) | YES | | NULL | | | status | int(11) | YES | | NULL | | | port | int(11) | YES | | NULL | | | db | varchar(50) | YES | | NULL | | +---------+-------------+------+-----+---------+-------+ select * from rwlb; +---------------+------+----------+---------+-----------+------+--------+--------+------+------+ | host | user | pwd | my_user | my_pwd | role | weight | status | port | db | +---------------+------+----------+---------+-----------+------+--------+--------+------+------+ | 192.168.42.60 | root | Abcd1234 | root | Abcd.1234 | RW | 10 | 1 | NULL | NULL | | 192.168.42.61 | root | Abcd1234 | root | Abcd.1234 | R | 20 | 1 | NULL | NULL | +---------------+------+----------+---------+-----------+------+--------+--------+------+------+
總結(jié)
以上所述是小編給大家介紹的python實(shí)現(xiàn)mysql的讀寫(xiě)分離及負(fù)載均衡,希望對(duì)大家有所幫助,如果大家有任何疑問(wèn)歡迎給我留言,小編會(huì)及時(shí)回復(fù)大家的!
- Python實(shí)現(xiàn)自定義讀寫(xiě)分離代碼實(shí)例
- Python暴力破解Mysql數(shù)據(jù)的示例
- python查詢(xún)MySQL將數(shù)據(jù)寫(xiě)入Excel
- Python操控mysql批量插入數(shù)據(jù)的實(shí)現(xiàn)方法
- python對(duì) MySQL 數(shù)據(jù)庫(kù)進(jìn)行增刪改查的腳本
- Python連接mysql數(shù)據(jù)庫(kù)及簡(jiǎn)單增刪改查操作示例代碼
- Python連接Mysql進(jìn)行增刪改查的示例代碼
- Python操作MySQL數(shù)據(jù)庫(kù)的示例代碼
- Python web框架(django,flask)實(shí)現(xiàn)mysql數(shù)據(jù)庫(kù)讀寫(xiě)分離的示例
相關(guān)文章
Python出現(xiàn)segfault錯(cuò)誤解決方法
這篇文章主要介紹了Python出現(xiàn)segfault錯(cuò)誤解決方法,分析了系統(tǒng)日志提示segfault錯(cuò)誤的原因與對(duì)應(yīng)的解決方法,需要的朋友可以參考下2016-04-04Python備份目錄及目錄下的全部?jī)?nèi)容的實(shí)現(xiàn)方法
下面小編就為大家?guī)?lái)一篇Python備份目錄及目錄下的全部?jī)?nèi)容的實(shí)現(xiàn)方法。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2016-06-06python獲取局域網(wǎng)占帶寬最大3個(gè)ip的方法
這篇文章主要介紹了python獲取局域網(wǎng)占帶寬最大3個(gè)ip的方法,涉及Python解析URL參數(shù)的相關(guān)技巧,具有一定參考借鑒價(jià)值,需要的朋友可以參考下2015-07-07Python數(shù)據(jù)可視化Pyecharts庫(kù)實(shí)現(xiàn)桑葚圖效果
這篇文章主要介紹了Python數(shù)據(jù)可視化如何使用Pyecharts庫(kù)來(lái)實(shí)現(xiàn)桑葚圖效果圖,文中給出實(shí)現(xiàn)的示例代碼,有需要的朋友可以借鑒參考想,希望能夠有所幫助2021-09-09關(guān)于如何使用python的gradio庫(kù)
這篇文章主要介紹了關(guān)于如何使用python的gradio庫(kù),Gradio是一個(gè)功能豐富的Python庫(kù),可以讓您輕松創(chuàng)建和共享自己的交互式機(jī)器學(xué)習(xí)和深度學(xué)習(xí)模型,需要的朋友可以參考下2023-04-04使用Python?Cupy模塊加速大規(guī)模數(shù)值計(jì)算實(shí)例深究
Cupy是一個(gè)基于NumPy的庫(kù),專(zhuān)門(mén)設(shè)計(jì)用于在GPU上進(jìn)行高性能計(jì)算,它提供了與NumPy相似的API,因此用戶(hù)可以很容易地將現(xiàn)有的NumPy代碼遷移到Cupy上,從而充分利用GPU的并行計(jì)算能力2023-12-12python實(shí)現(xiàn)某考試系統(tǒng)生成word試卷
這篇文章主要為大家詳細(xì)介紹了python實(shí)現(xiàn)某考試系統(tǒng)生成word試卷,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-05-05python3.x實(shí)現(xiàn)發(fā)送郵件功能
這篇文章主要為大家詳細(xì)介紹了python3.x實(shí)現(xiàn)發(fā)送郵件功能,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-05-05pytorch中的nn.ZeroPad2d()零填充函數(shù)實(shí)例詳解
這篇文章主要介紹了pytorch中的nn.ZeroPad2d()零填充函數(shù)實(shí)例詳解,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-04-04