python實(shí)現(xiàn)mysql的讀寫分離及負(fù)載均衡
Oracle數(shù)據(jù)庫有其公司開發(fā)的配套rac來實(shí)現(xiàn)負(fù)載均衡,目前已知的最大節(jié)點(diǎn)數(shù)能到128個(gè),但是其帶來的維護(hù)成本無疑是很高的,并且rac的穩(wěn)定性也并不是特別理想,尤其是節(jié)點(diǎn)很多的時(shí)候。
但是,相對(duì)mysql來說,rac的實(shí)用性要比mysql的配套集群軟件mysql-cluster要高很多。因?yàn)閺木W(wǎng)上了解到情況來看,很少公司在使用mysql-cluster,大多數(shù)企業(yè)都會(huì)選擇第三方代理軟件,例如MySQL Proxy、Mycat、haproxy等,但是這會(huì)引起另外一個(gè)問題:?jiǎn)吸c(diǎn)故障(包括mysql-cluster:管理節(jié)點(diǎn))。如果要解決這個(gè)問題,就需要給代理軟件搭建集群,在訪問量很大的情況下,代理軟件的雙機(jī)或三機(jī)集群會(huì)成為訪問瓶頸,繼續(xù)增加其節(jié)點(diǎn)數(shù),無疑會(huì)帶來各方面的成本。
那么,如何可以解決這個(gè)問題呢?
解決上述問題,最好的方式個(gè)人認(rèn)為應(yīng)該是在程序中實(shí)現(xiàn)。通過和其他mysql DBA的溝通,也證實(shí)了這個(gè)想法。但是由此帶來的疑問也就產(chǎn)生了:會(huì)不會(huì)增加開發(fā)成本?對(duì)現(xiàn)有的應(yīng)用系統(tǒng)做修改會(huì)不會(huì)改動(dòng)很大?會(huì)不會(huì)增加后期版本升級(jí)的難度?等等。
對(duì)于一個(gè)架構(gòu)設(shè)計(jì)良好的應(yīng)用系統(tǒng)可以很肯定的回答:不會(huì)。
那么怎么算一個(gè)架構(gòu)設(shè)計(jì)良好的應(yīng)用系統(tǒng)呢?
簡(jiǎn)單來說,就是分層合理、功能模塊之間耦合性底。以本人的經(jīng)驗(yàn)來說,系統(tǒng)設(shè)計(jì)基本上可以劃分為以下四層:
1. 實(shí)體層:主要定義一些實(shí)體類
2. 數(shù)據(jù)層:也可以叫SQL處理層。主要負(fù)責(zé)跟數(shù)據(jù)庫交互取得數(shù)據(jù)
3. 業(yè)務(wù)處:主要是根據(jù)業(yè)務(wù)流程及功能區(qū)分模塊(或者說定義不同的業(yè)務(wù)類)
4. 表現(xiàn)層:呈現(xiàn)最終結(jié)果給用戶
實(shí)現(xiàn)上述功能(mysql的讀寫分離及負(fù)載均衡),在這四個(gè)層次中,僅僅涉及到數(shù)據(jù)層。
嚴(yán)格來說,對(duì)于設(shè)計(jì)良好的系統(tǒng),只涉及到一個(gè)類的一個(gè)函數(shù):在數(shù)據(jù)層中,一般都會(huì)單獨(dú)劃分出一個(gè)連接類,并且這個(gè)連接類中會(huì)有一個(gè)連接函數(shù),需要改動(dòng)的就是這個(gè)函數(shù):在讀取連接字符串之前加一個(gè)功能函數(shù)返回需要的主機(jī)、ip、端口號(hào)等信息(沒有開發(fā)經(jīng)歷的同學(xué)可能理解這段話有點(diǎn)費(fèi)勁)。
流程圖如下:

代碼如下:
import mmap
import json
import random
import mysql.connector
import time
##公有變量
#dbinfos={
# "db0":{'host':'192.168.42.60','user':'root','pwd':'Abcd1234','my_user':'root','my_pwd':'Abcd.1234',"port":3306,"database":"","role":"RW","weight":10,"status":1},
# "db1":{'host':'192.168.42.61','user':'root','pwd':'Abcd1234','my_user':'root','my_pwd':'Abcd.1234',"port":3306,,"database":"":"R","weight":20,"status":1}
# }
dbinfos={}
mmap_file = None
mmap_time=None
##這個(gè)函數(shù)返回json格式的字符串,也是實(shí)現(xiàn)初始化數(shù)據(jù)庫信息的地方
##使用json格式是為了方便數(shù)據(jù)轉(zhuǎn)換,從字符串---》二進(jìn)制--》字符串---》字典
##如果采用其它方式共享dbinfos的方法,可以不用此方式
##配置庫的地址
def get_json_str1():
return json.dumps(dbinfos)
##讀取配置庫中的內(nèi)容
def get_json_str():
try:
global dbinfos
cnx = mysql.connector.connect(user='root', password='Abcd.1234',
host='192.168.42.60',
database='rwlb')
cursor = cnx.cursor()
cmdString="select * from rwlb"
cnt=-1
cursor.execute(cmdString)
for (host,user,pwd,my_user,my_pwd,role,weight,status,port,db ) in cursor:
cnt=cnt+1
dict_db={'host':host,'user':user,'pwd':pwd,'my_user':my_user,'my_pwd':my_pwd,"port":port,"database":db,"role":role,"weight":weight,"status":status}
dbinfos["db"+str(cnt)]=dict_db
cursor.close()
cnx.close()
return json.dumps(dbinfos)
except:
cursor.close()
cnx.close()
return ""
##判斷是否能正常連接到數(shù)據(jù)庫
def check_conn_host():
try:
cnx = mysql.connector.connect(user='root', password='Abcd.1234',
host='192.168.42.60',
database='rwlb')
cursor = cnx.cursor()
cmdString="select user()"
cnt=-1
cursor.execute(cmdString)
for user in cursor:
cnt=len(user)
cursor.close()
cnx.close()
return cnt
except :
return -1;
##select 屬于讀操作,其他屬于寫操作-----這里可以劃分的更詳細(xì),比如執(zhí)行存儲(chǔ)過程等
def analyze_sql_state(sql):
if "select" in sql:
return "R"
else:
return "W"
##讀取時(shí)間信息
def read_mmap_time():
global mmap_time,mmap_file
mmap_time.seek(0)
##初始時(shí)間
inittime=int(mmap_time.read().translate(None, b'\x00').decode())
##當(dāng)前時(shí)間
endtime=int(time.time())
##時(shí)間差
dis_time=endtime-inittime
print("dis_time:"+str(dis_time))
#重新讀取數(shù)據(jù)
if dis_time>10:
##當(dāng)配置庫正常的情況下才重新讀取數(shù)據(jù)
print(str(check_conn_host()))
if check_conn_host()>0:
print("read data again")
mmap_time.seek(0)
mmap_file.seek(0)
mmap_time.write(b'\x00')
mmap_file.write(b'\x00')
get_mmap_time()
get_mmap_info()
else:
print("can not connect to host")
#不重新讀取數(shù)據(jù)
else:
print("do not read data again")
##從內(nèi)存中讀取信息,
def read_mmap_info(sql):
read_mmap_time()
print("The data is in memory")
global mmap_file,dict_db
mmap_file.seek(0)
##把二進(jìn)制轉(zhuǎn)換為字符串
info_str=mmap_file.read().translate(None, b'\x00').decode()
#3把字符串轉(zhuǎn)成json格式,方便后面轉(zhuǎn)換為字典使用
infos=json.loads(info_str)
host_count=len(infos)
##權(quán)重列表
listw=[]
##總的權(quán)重?cái)?shù)量
wtotal=0
##數(shù)據(jù)庫角色
dbrole=analyze_sql_state(sql)
##根據(jù)權(quán)重初始化一個(gè)列表。這個(gè)是比較簡(jiǎn)單的算法,所以權(quán)重和控制在100以內(nèi)比較好----這里可以選擇其他比較好的算法
for i in range(host_count):
db="db"+str(i)
if dbrole in infos[db]["role"]:
if int(infos[db]["status"])==1:
w=infos[db]["weight"]
wtotal=wtotal+w
for j in range(w):
listw.append(i)
if wtotal >0:
##產(chǎn)生一個(gè)隨機(jī)數(shù)
rad=random.randint(0,wtotal-1)
##讀取隨機(jī)數(shù)所在的列表位置的數(shù)據(jù)
dbindex=listw[rad]
##確定選擇的是哪個(gè)db
db="db"+str(dbindex)
##為dict_db賦值,即選取的db的信息
dict_db=infos[db]
return dict_db
else :
return {}
##如果內(nèi)存中沒有時(shí)間信息,則向內(nèi)存紅寫入時(shí)間信息
def get_mmap_time():
global mmap_time
##第二個(gè)參數(shù)1024是設(shè)定的內(nèi)存大小,單位:字節(jié)。如果內(nèi)容較多,可以調(diào)大一點(diǎn)
mmap_time = mmap.mmap(-1, 1024, access = mmap.ACCESS_WRITE, tagname = 'share_time')
##讀取有效比特?cái)?shù),不包括空比特
cnt=mmap_time.read_byte()
if cnt==0:
print("Load time to memory")
mmap_time = mmap.mmap(0, 1024, access = mmap.ACCESS_WRITE, tagname = 'share_time')
inittime=str(int(time.time()))
mmap_time.write(inittime.encode())
##如果內(nèi)存中沒有對(duì)應(yīng)信息,則向內(nèi)存中寫信息以供下次調(diào)用使用
def get_mmap_info():
global mmap_file
##第二個(gè)參數(shù)1024是設(shè)定的內(nèi)存大小,單位:字節(jié)。如果內(nèi)容較多,可以調(diào)大一點(diǎn)
mmap_file = mmap.mmap(-1, 1024, access = mmap.ACCESS_WRITE, tagname = 'share_mmap')
##讀取有效比特?cái)?shù),不包括空比特
cnt=mmap_file.read_byte()
if cnt==0:
print("Load data to memory")
mmap_file = mmap.mmap(0, 1024, access = mmap.ACCESS_WRITE, tagname = 'share_mmap')
mmap_file.write(get_json_str().encode())
##測(cè)試函數(shù)
def test1():
get_mmap_time()
get_mmap_info()
for i in range(10):
sql="select * from db"
#sql="update t set col1=a where b=2"
dbrole=analyze_sql_state(sql)
dict_db=read_mmap_info(sql)
print(dict_db["host"])
def test2():
sql="select * from db"
res=analyze_sql_state(sql)
print("select:"+res)
sql="update t set col1=a where b=2"
res=analyze_sql_state(sql)
print("update:"+res)
sql="insert into t values(1,2)"
res=analyze_sql_state(sql)
print("insert:"+res)
sql="delete from t where b=2"
res=analyze_sql_state(sql)
print("delete:"+res)
##類似主函數(shù)
if __name__=="__main__":
test2()
測(cè)試結(jié)果:

從結(jié)果可以看出,只有第一次向內(nèi)存加載數(shù)據(jù),并且按照權(quán)重實(shí)現(xiàn)了負(fù)載均衡。
因?yàn)闇y(cè)試函數(shù)test1()寫的是固定語句,所以讀寫分離的結(jié)果沒有顯示出來。
另外:測(cè)試使用的數(shù)據(jù)庫表結(jié)構(gòu)及數(shù)據(jù):
desc rwlb; +---------+-------------+------+-----+---------+-------+ | Field | Type | Null | Key | Default | Extra | +---------+-------------+------+-----+---------+-------+ | host | varchar(50) | YES | | NULL | | | user | varchar(50) | YES | | NULL | | | pwd | varchar(50) | YES | | NULL | | | my_user | varchar(50) | YES | | NULL | | | my_pwd | varchar(50) | YES | | NULL | | | role | varchar(10) | YES | | NULL | | | weight | int(11) | YES | | NULL | | | status | int(11) | YES | | NULL | | | port | int(11) | YES | | NULL | | | db | varchar(50) | YES | | NULL | | +---------+-------------+------+-----+---------+-------+ select * from rwlb; +---------------+------+----------+---------+-----------+------+--------+--------+------+------+ | host | user | pwd | my_user | my_pwd | role | weight | status | port | db | +---------------+------+----------+---------+-----------+------+--------+--------+------+------+ | 192.168.42.60 | root | Abcd1234 | root | Abcd.1234 | RW | 10 | 1 | NULL | NULL | | 192.168.42.61 | root | Abcd1234 | root | Abcd.1234 | R | 20 | 1 | NULL | NULL | +---------------+------+----------+---------+-----------+------+--------+--------+------+------+
總結(jié)
以上所述是小編給大家介紹的python實(shí)現(xiàn)mysql的讀寫分離及負(fù)載均衡,希望對(duì)大家有所幫助,如果大家有任何疑問歡迎給我留言,小編會(huì)及時(shí)回復(fù)大家的!
- Python實(shí)現(xiàn)自定義讀寫分離代碼實(shí)例
- Python暴力破解Mysql數(shù)據(jù)的示例
- python查詢MySQL將數(shù)據(jù)寫入Excel
- Python操控mysql批量插入數(shù)據(jù)的實(shí)現(xiàn)方法
- python對(duì) MySQL 數(shù)據(jù)庫進(jìn)行增刪改查的腳本
- Python連接mysql數(shù)據(jù)庫及簡(jiǎn)單增刪改查操作示例代碼
- Python連接Mysql進(jìn)行增刪改查的示例代碼
- Python操作MySQL數(shù)據(jù)庫的示例代碼
- Python web框架(django,flask)實(shí)現(xiàn)mysql數(shù)據(jù)庫讀寫分離的示例
相關(guān)文章
Python出現(xiàn)segfault錯(cuò)誤解決方法
這篇文章主要介紹了Python出現(xiàn)segfault錯(cuò)誤解決方法,分析了系統(tǒng)日志提示segfault錯(cuò)誤的原因與對(duì)應(yīng)的解決方法,需要的朋友可以參考下2016-04-04
Python備份目錄及目錄下的全部內(nèi)容的實(shí)現(xiàn)方法
下面小編就為大家?guī)硪黄狿ython備份目錄及目錄下的全部內(nèi)容的實(shí)現(xiàn)方法。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2016-06-06
python獲取局域網(wǎng)占帶寬最大3個(gè)ip的方法
這篇文章主要介紹了python獲取局域網(wǎng)占帶寬最大3個(gè)ip的方法,涉及Python解析URL參數(shù)的相關(guān)技巧,具有一定參考借鑒價(jià)值,需要的朋友可以參考下2015-07-07
Python數(shù)據(jù)可視化Pyecharts庫實(shí)現(xiàn)桑葚圖效果
這篇文章主要介紹了Python數(shù)據(jù)可視化如何使用Pyecharts庫來實(shí)現(xiàn)桑葚圖效果圖,文中給出實(shí)現(xiàn)的示例代碼,有需要的朋友可以借鑒參考想,希望能夠有所幫助2021-09-09
使用Python?Cupy模塊加速大規(guī)模數(shù)值計(jì)算實(shí)例深究
Cupy是一個(gè)基于NumPy的庫,專門設(shè)計(jì)用于在GPU上進(jìn)行高性能計(jì)算,它提供了與NumPy相似的API,因此用戶可以很容易地將現(xiàn)有的NumPy代碼遷移到Cupy上,從而充分利用GPU的并行計(jì)算能力2023-12-12
python實(shí)現(xiàn)某考試系統(tǒng)生成word試卷
這篇文章主要為大家詳細(xì)介紹了python實(shí)現(xiàn)某考試系統(tǒng)生成word試卷,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-05-05
python3.x實(shí)現(xiàn)發(fā)送郵件功能
這篇文章主要為大家詳細(xì)介紹了python3.x實(shí)現(xiàn)發(fā)送郵件功能,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-05-05
pytorch中的nn.ZeroPad2d()零填充函數(shù)實(shí)例詳解
這篇文章主要介紹了pytorch中的nn.ZeroPad2d()零填充函數(shù)實(shí)例詳解,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-04-04

