Python利用py-redis實現(xiàn)分布式鎖
前記
隨著業(yè)務(wù)的增長,后端技術(shù)架構(gòu)會慢慢的從單體服務(wù)轉(zhuǎn)向多服務(wù)或者微服務(wù)的分布式架構(gòu),此時語言級別的鎖無法管理所有資源的競爭,只能采用分布式鎖。而分布式鎖的主體思想雖然與語言級別的鎖類似,但還需要考慮到一些網(wǎng)絡(luò)因素,使其變得復(fù)雜。
1.為何需要分布式鎖
舉一個栗子,比如在做聊天服務(wù)時,需要統(tǒng)計一個聊天會話的在線人數(shù),它的簡單示例代碼如下:
class Counter(object): def __init__(self, namespace: str) -> None: self.namespace: int = namespace self.count: int = 0 def login(self) -> None: self.count += 1 def logout(self) -> None: self.count -= 1
這份代碼比較簡單,它是每個namespace
的全局計數(shù)器實現(xiàn),每有一個用戶成功登陸就會調(diào)用login
方法使計數(shù)器+1,而每次退出就會調(diào)用logout
方法使計數(shù)器-1。這個計數(shù)器看起來實現(xiàn)了需求,但是它也符合了最簡單的線程不安全模型,意味著在多線程/進程等環(huán)境下無法得出正確的結(jié)果。
這個操作之所以線程不安全,是因為self.count+=1
的這類操作不是原子性的,它在運行之前會被編譯為self.count = self.count + 1
,這是一個先更改再賦值的操作,實際在執(zhí)行的時候CPU會分為下面三個步驟去執(zhí)行:
1.將count的值從內(nèi)存讀到CPU對應(yīng)的寄存器上。
2.CPU操作寄存器上的count并進行+1操作。
3.把寄存器里的指寫回內(nèi)存中。
這樣在多線程/進程的場景下可能出現(xiàn)了CPU核心1和CPU核心2同時從內(nèi)存讀到對應(yīng)值0,并放到了自己的寄存器上面,然后再對它進行+1操作,最后又把值(此時已經(jīng)為1)寫回到內(nèi)存中,導(dǎo)致self.count的結(jié)果變?yōu)?而不是真正想要的值2了。
1.self.count-=1
同理。
2.線程是操作系統(tǒng)的最小調(diào)度單位,在多核心系統(tǒng)時,會出現(xiàn)多核心同時調(diào)用線程去進行資源爭奪。
3.Python 3.11做了優(yōu)化,可能進行了幾百次加減處理,結(jié)果也是對的。
這就是多線程環(huán)境對同一個資源競爭從而產(chǎn)生數(shù)據(jù)安全性的就問題,許多語言為了解決這個問題引入了鎖機制,并使用鎖機制保護了多線程環(huán)境下對同一個資源競爭的數(shù)據(jù)安全性。 開發(fā)者可以非常方便的通過鎖機制給一些代碼塊加上鎖從而使這些操作變成了原子性,比如對示例代碼的Counter
進行了如下修改:
from threading import Lock class Counter(object): def __init__(self, namespace: str) -> None: self.namespace: int = namespace self.count: int = 0 self._lock: Lock = Lock() def login(self) -> None: with lock(): self.count += 1 def logout(self) -> None: with lock(): self.count -= 1
代碼引入了Lock
對象,并把它套在了資源沖突的self.count+=1
和self.count-=1
上面,使得線程只有持有鎖的時候才能對self.count
進行操作,而拿不到鎖的線程則需要等待到獲取鎖才能繼續(xù)操作,這樣一來就不會產(chǎn)生多個線程同時操作一份數(shù)據(jù)而導(dǎo)致了結(jié)果不一致的問題。
不過現(xiàn)在的大多數(shù)服務(wù)不再是單體應(yīng)用,更多的是以多服務(wù),微服務(wù)的形式存在,這時上述的問題就會從不同的線程/進程爭奪一個資源變?yōu)椴煌臋C器上的服務(wù)對同一個資源進行競爭。 而語言級別的鎖只存在于進程中,無法跨進程,只能管理自己進程里面的資源競爭,無法解決跨服務(wù)資源競爭的問題,只能使用一個帶有鎖機制的中間人來協(xié)調(diào)各個服務(wù)的資源競爭的問題,而這個中間人就是分布式鎖。
2.分布式鎖的實現(xiàn)
為了解決多服務(wù),微服務(wù)的資源競爭這問題,分布式鎖誕生了,分布式鎖與語言級別的鎖一樣都是在某塊空間打上標記,然后再通過打標記是否成功來判斷是否獲取鎖,與語言級別鎖唯一不同的是分布式鎖需要通過網(wǎng)絡(luò)進行通信,而網(wǎng)絡(luò)是復(fù)雜的,這也就導(dǎo)致分布式鎖的實現(xiàn)變得復(fù)雜。
為了降低分布式鎖實現(xiàn)的復(fù)雜度,大多數(shù)分布式鎖的方案都會基于擁有存儲媒介和防止資源沖突的數(shù)據(jù)庫進行開發(fā),比如關(guān)系數(shù)據(jù)庫MySQL
,KV數(shù)據(jù)庫Redis
和Etcd
等,它們都有一套邏輯來確保數(shù)據(jù)的一致性和可用性,同時也有一套完善的傳輸協(xié)議,這樣就可以不去考慮網(wǎng)絡(luò)傳輸?shù)膯栴}和數(shù)據(jù)沖突與丟失的問題,只專注分布式鎖功能的實現(xiàn)了。 不過實際業(yè)務(wù)中需要分布式鎖有較高的性能,所以大多數(shù)會分布式鎖都會基于KV數(shù)據(jù)庫開發(fā),目前常用的分布式鎖使用的KV數(shù)據(jù)庫是Redis
。
2.1.基于Redis的分布式鎖
Redis
本質(zhì)上就是使用一個單進程對一塊內(nèi)存進行讀寫(只考慮基本的讀寫),且每個操作都是以一個協(xié)程去操作內(nèi)存,這保證了客戶端提交的每個操作都是擁有原子性的。同時Redis
還支持使用Lua腳本去編寫復(fù)雜的操作,這兩個特性組合起來意味著可以通過Redis
實現(xiàn)出一個高性能且功能復(fù)雜的分布式鎖。
在Python
的Redis
客戶端庫py-redis
中,提供了一個簡單的Redis
鎖封裝,開發(fā)者通過這個封裝可以很方便的使用基于Redis
的分布式鎖。如以上面計數(shù)器示例代碼進行修改后的代碼:
import asyncio from redis.asyncio import Redis class Counter(object): def __init__(self, namespace: str, redis: Redis) -> None: self._redis = redis self.namespace: int = namespace self.count: int = 0 async def login(self) -> None: async with self._redis.lock("demo"): self.count += 1 async def logout(self) -> None: async with self._redis.lock("demo"): self.count -= 1 async def main(): counter = Counter("demo", Redis()) await counter.login() await counter.logout() asyncio.run(main())
在這個示例代碼中可以發(fā)現(xiàn),鎖的使用方法很簡單,只要通過redis.lock
方法就可以獲取到分布式鎖的實例,而這個鎖實例的使用方法與thread.Lock
類似,不用大改代碼。
不過lock方法只是提供了一個簡單的調(diào)用,實際上它返回的是符合如下函數(shù)簽名的對象:
class Lock: async def __aenter__(self): pass async def __aexit__(self, exc_type, exc_value, traceback): pass async def acquire( self, blocking: Optional[bool] = None, blocking_timeout: Optional[float] = None, token: Optional[Union[str, bytes]] = None, ): pass async def locked(self) -> bool: pass async def owned(self) -> bool: pass def release(self) -> Awaitable[None]: pass
這個對象擁有多個方法,首先是針對async with
語法提供了__aenter__
和__aexit__
方法,它們分別在進入和離開async with
語法塊時被調(diào)用,它們的源碼如下:
class Lock: ... async def __aenter__(self): if await self.acquire(): return self raise LockError("Unable to acquire lock within the time specified") async def __aexit__(self, exc_type, exc_value, traceback): await self.release()
通過源碼可以看到它們的實現(xiàn)很簡單,__aenter__
只調(diào)用acquire
方法,如果返回True
就允許進入代碼塊,如果返回False
則拋出獲取鎖錯誤,而__aexit__
則更簡單,它只是調(diào)用release
方法執(zhí)行分布式鎖的釋放。
接下來就是acquire
和release
這兩個分別代表獲取鎖和釋放鎖的核心方法了,其中acquire
的源碼如下:
class Lock: ... async def acquire( self, blocking: Optional[bool] = None, blocking_timeout: Optional[float] = None, token: Optional[Union[str, bytes]] = None, ): ################################ # 第一部分,這里主要是初始化各種參數(shù) # ################################ sleep = self.sleep if token is None: token = uuid.uuid1().hex.encode() else: try: encoder = self.redis.connection_pool.get_encoder() except AttributeError: # Cluster encoder = self.redis.get_encoder() token = encoder.encode(token) if blocking is None: blocking = self.blocking if blocking_timeout is None: blocking_timeout = self.blocking_timeout stop_trying_at = None if blocking_timeout is not None: stop_trying_at = asyncio.get_running_loop().time() + blocking_timeout ##################################################### # 第二部分,這里通過一個循環(huán)去爭奪鎖,在爭奪成功時會返回True # ##################################################### while True: if await self.do_acquire(token): # 爭奪成功,把Token存入當前線程存儲中,并返回True self.local.token = token return True # 爭奪失敗且設(shè)置blocking為False,則返回獲取鎖失敗 if not blocking: return False # 在設(shè)置的時間內(nèi)爭奪失敗,返回獲取鎖失敗 next_try_at = asyncio.get_running_loop().time() + sleep if stop_trying_at is not None and next_try_at > stop_trying_at: return False # 每次爭奪鎖的間隔,這是在初始化Lock對象時指定的, # 建議根據(jù)業(yè)務(wù)設(shè)置比較大的sleep時間,防止獲取鎖失敗時頻繁的與Redis通信。 await asyncio.sleep(sleep) async def do_acquire(self, token: Union[str, bytes]) -> bool: if self.timeout: # convert to milliseconds timeout = int(self.timeout * 1000) else: timeout = None # 真正通過Redis設(shè)置鎖的方法, if await self.redis.set(self.name, token, nx=True, px=timeout): return True return False
通過源碼可以發(fā)現(xiàn)acquire
方法主要是做三件事:
1.初始化各種參數(shù),其中token是采用uuid1生成的,該方法雖然會泄露主機信息,但它是能確保每個客戶端生成的ID唯一且速度很快的方法,同時Redis
一般都在內(nèi)網(wǎng)運行的,只要能確保內(nèi)網(wǎng)安全,一般也沒啥事。
2.通過do_acquire
方法去獲取鎖,當獲取成功就會返回True
,獲取失敗且設(shè)置不阻塞就返回False
,而獲取失敗且設(shè)置阻塞就會通過循環(huán)去競爭鎖。
3.獲取鎖的重點(do_acquire方法),這里通過Redis
的set <key> <value> nx xxx ps xxx
的方法向Redis設(shè)置了一個K-V,并返回設(shè)置是否成功。這個命令是原子性的執(zhí)行三個操作,從而保證獲取鎖的操作要么成功要么失敗。 命令中的nx是確保Key不存在時且該命令能正常寫入才返回True
,而ps是設(shè)置了Key的過期時間,防止客戶端假死或宕機而導(dǎo)致整個鎖無法被釋放(避免死鎖的一種技術(shù)手段)。
了解完獲取鎖acquire
方法的執(zhí)行原理后再看釋放鎖release
方法的源碼,由于release
操作要多個操作,所以采用了Lua腳本,如下:
class Lock: ... # 初始化時會初始化lua腳本--`LUA_RELEASE_SCRIPT`, # 并把Redis返回的ID存放到lua_release中,減少后續(xù)調(diào)用的網(wǎng)絡(luò)傳輸 lua_release = None # lua腳本的邏輯是通過name獲取token,如果token不存在或者不是實例產(chǎn)生的則返回0, # 否則代表成功獲取到鎖,然后會執(zhí)行刪除鎖并返回1告訴客戶端說鎖釋放成功。 LUA_RELEASE_SCRIPT = """ local token = redis.call('get', KEYS[1]) if not token or token ~= ARGV[1] then return 0 end redis.call('del', KEYS[1]) return 1 """ ... def release(self) -> Awaitable[None]: # 如果沒有從本地線程存儲獲取到Token,則代表還沒有獲取到鎖,拋出對應(yīng)的異常 expected_token = self.local.token if expected_token is None: raise LockError("Cannot release an unlocked lock") # 先清楚本地線程存放的Token,再釋放鎖 self.local.token = None return self.do_release(expected_token) async def do_release(self, expected_token: bytes) -> None: # 調(diào)用lua腳本并根據(jù)響應(yīng)結(jié)果判斷釋放鎖是否成功。 if not bool( await self.lua_release( keys=[self.name], args=[expected_token], client=self.redis ) ): raise LockNotOwnedError("Cannot release a lock that's no longer owned")
release
也比較簡單,它的整個邏輯是先通過本地線程存儲中獲取token,如果該值為空,則證明有可能沒有執(zhí)行acquire
獲取鎖,需要拋出鎖已經(jīng)被釋放的異常,如果不為空則置空,再通過lua
腳本去釋放鎖,在釋放鎖時會校驗token的值防止釋放一個不是自己產(chǎn)生的鎖。
此外,Lock
中的token存放在本地線程存儲的原因是為了防止多線程調(diào)用同個Loc實例導(dǎo)致的問題,如下:
1.當A線程以timeout為30秒獲取了鎖。
2.B線程獲取了鎖,但是由于A已經(jīng)獲取了鎖了,所以通過自旋進行等待。
3.A線程執(zhí)行的邏輯超過30秒還未執(zhí)行完成,而鎖已經(jīng)過期而被釋放。
4.B線程發(fā)現(xiàn)鎖已經(jīng)被釋放,開始獲取鎖并執(zhí)行,最終在A線程執(zhí)行完成之前運行完畢,并執(zhí)行了釋放鎖的操作。
5.A線程執(zhí)行了釋放鎖的操作,發(fā)現(xiàn)鎖已經(jīng)被釋放了。
可以看到這個方法只是防止動作沒執(zhí)行完,但鎖卻過期的一種情況,它并不能真正的解決問題,如果要真正的解決這個問題,則需要引入WatchDog機制。
2.2.WatchDog實現(xiàn)
py-redis
的Lock
對象支持Timeout
參數(shù),Timeout參數(shù)的作用是標記鎖在被獲取的n秒后被自動釋放,這樣加鎖的程序即使崩潰了也能確保鎖會在一定的時間后被釋放,避免了死鎖問題。 不過需要注意的是,Timeout參數(shù)就不能設(shè)置太長,如果設(shè)置太長,且程序在獲取鎖后崩潰而無法釋放鎖時,其他等待獲取鎖的程序會花時間進行無效的等待。 然而Timeout參數(shù)設(shè)置得太短也不行,如果程序的執(zhí)行時間超過了Timeout設(shè)置的時間,那么就會出現(xiàn)程序還在運行著,但是鎖卻提前釋放了,最終就會導(dǎo)致多個程序爭奪同一個資源,也就是鎖機制無效了。
由于Timeout參數(shù)設(shè)置太短太長都有問題,這意味著Timeout參數(shù)并不能完美的解決問題,這時就需要一個更好的機制--WatchDog來完善Timeout參數(shù)的不足。 WatchDog機制會在程序獲取鎖之后啟動,在釋放鎖之前關(guān)閉,也就是跟隨程序獲取鎖的行為一起運行,然后它會在程序執(zhí)行期間按照一定的時間間隔幫鎖自動續(xù)約(也就是增加鎖的過期時間),從而防止業(yè)務(wù)代碼沒執(zhí)行完,鎖卻過期的情況。
py-redis
庫的Lock
追求的是簡單的原則,它沒有提供一套完整的WatchDog實現(xiàn),但是提供了一個續(xù)約機制,代碼如下:
class Lock: # 效果與lua_release一致 lua_extend = None # 續(xù)約的Lua代碼主要邏輯為: # 1.判斷token是否一致,不一致則不是自己獲取的鎖,返回0。 # 2.判讀鎖的過期時間,如果小于0則鎖不存在,返回0 # 3.如果當前鎖的過期時間還未變?yōu)?,則為當前的鎖續(xù)約 # 續(xù)約的方式有兩種,一種是在原來的基礎(chǔ)增加一個固定值,另一種是把過期時間設(shè)置為固定值。 # 最后再返回1,代表續(xù)約成功 LUA_EXTEND_SCRIPT = """ local token = redis.call('get', KEYS[1]) if not token or token ~= ARGV[1] then return 0 end local expiration = redis.call('pttl', KEYS[1]) if not expiration then expiration = 0 end if expiration < 0 then return 0 end local newttl = ARGV[2] if ARGV[3] == "0" then newttl = ARGV[2] + expiration end redis.call('pexpire', KEYS[1], newttl) return 1 """ def extend( self, additional_time: float, replace_ttl: bool = False ) -> Awaitable[bool]: # 判斷是否持有鎖,沒有鎖不能續(xù)約 if self.local.token is None: raise LockError("Cannot extend an unlocked lock") # 沒設(shè)置超時不能續(xù)約 if self.timeout is None: raise LockError("Cannot extend a lock with no timeout") return self.do_extend(additional_time, replace_ttl) async def do_extend(self, additional_time, replace_ttl) -> bool: # 通過Lua腳本執(zhí)行續(xù)約的邏輯 additional_time = int(additional_time * 1000) if not bool( await self.lua_extend( keys=[self.name], args=[self.local.token, additional_time, replace_ttl and "1" or "0"], client=self.redis, ) ): raise LockNotOwnedError("Cannot extend a lock that's no longer owned") return True
它的邏輯非常簡單,就是先校驗當前是不是自己持有鎖以及鎖是否還在,當所有條件都滿足時才續(xù)約,不過續(xù)約有兩種方案,一種是把key的過期時間設(shè)置為指定的時間,另一種是在剩余的過期時間基礎(chǔ)上再添加指定的時間。了解了py-redis
提供的續(xù)約機制后,我們還需要考慮WatchDog剩余的邏輯,一個是什么時候開啟/關(guān)閉WatchDog,另一個是如何制訂WatchDog的執(zhí)行周期。
前面說到WatchDog會伴隨著加鎖一直運行著,那么意味著WatchDog會在加鎖成功后開始運行,并在釋放鎖之前停止,在根據(jù)之前針對Lock的代碼分析可以判斷,WatchDog需要在acquire
后開始運行,在do_release
之前停止運行。 至于WatchDog的間隔時間,大部分框架都是采用用戶定義Timeout時間的1/3,這是考慮到網(wǎng)絡(luò)通信的不可靠以及防止發(fā)送太多請求而權(quán)衡的結(jié)果。
現(xiàn)在WatchDog的原理分析完畢,可以著手實現(xiàn)WatchDog了,WatchDog最終的代碼實現(xiàn)如下:
import asyncio from typing import Optional, Union from redis.asyncio.lock import Lock class MyLock(Lock): # 存儲循環(huán)的協(xié)程對象 _watch_dog: Optional[asyncio.Task] async def _watch(self) -> None: """ 這是一個一直在循環(huán)的方法,它每次循環(huán)都會執(zhí)行續(xù)約,然后休眠超時時間的1/3,然后再執(zhí)行下一個循環(huán)。 """ while True: await self.extend(self.timeout) await asyncio.sleep(self.timeout / 3) def _cancel_watch_dog(self) -> None: """ 取消正在運行WatchDog的協(xié)程 """ _old_watch_dog: Optional[asyncio.Future] = getattr(self, "_watch_dog", None) if _old_watch_dog and not _old_watch_dog.cancelled(): _old_watch_dog.cancel() async def acquire( self, blocking: Optional[bool] = None, blocking_timeout: Optional[float] = None, token: Optional[Union[str, bytes]] = None, ) -> bool: result = await super().acquire(blocking, blocking_timeout, token) if result: # 獲取鎖成功就開始啟用WatchDog機制 # 它先把舊的WatchDog取消,再通過`asyncio.create_task`方法使用一個單獨的協(xié)程執(zhí)行WatchDog self._cancel_watch_dog() self._watch_dog = asyncio.create_task(self._watch()) return result async def do_release(self, expected_token: bytes) -> None: # 在釋放之前取消WatchDod機制 # 即使取消TachDog成功,但是鎖釋放失敗也沒關(guān)系,因為有超時機制兜底 self._cancel_watch_dog() return await super().do_release(expected_token) def __del__(self) -> None: try: self._cancel_watch_dog() except Exception: pass
接著編寫一個Demo文件來驗證編寫的WatchDog是否有效,Demo文件代碼如下:
import asyncio import time from typing import Optional, Union from redis.asyncio.lock import Lock from redis.asyncio import Redis # WatchDog實現(xiàn)省略... async def main(): _redis = Redis() s_t = time.time() # pyredis的`lock`方法通過`lock_class`參數(shù)支持自定義Lock類 async with _redis.lock("demo", lock_class=MyLock, timeout=3): print("lock") await asyncio.sleep(5) print("ok", time.time() - s_t) if __name__ == "__main__": asyncio.run(main())
然后在終端直接運行,會看到終端有如下輸出:
lock
ok 5.005310297012329
通過輸出可以發(fā)現(xiàn),雖然鎖設(shè)定的timeout參數(shù)為3秒,但是被鎖住的代碼能夠正常的執(zhí)行了5秒,也就代表WatchDog的實現(xiàn)是成功的。
需要注意的是,協(xié)程的創(chuàng)建銷毀成本很低,所以使用一個協(xié)程執(zhí)行一個WatchDog,如果是在線程模型下,則不能使用一個單獨的線程來執(zhí)行WatchDog,這樣會導(dǎo)致頻繁的開啟和關(guān)閉線程,建議使用一個線程池來管理所有鎖的WatchDog的運行。不過WatchDog是每隔一段時間運行的,所以也可以使用時間輪+單獨的Worker來執(zhí)行WatchDog。
3.總結(jié)
到目前為止,實現(xiàn)的分布式鎖基本完備,也沒有什么缺陷,同時它的性能也是非常高的。不過由于py-redis
的鎖實現(xiàn)比較簡單,導(dǎo)致拓展性比較低,無法兼容部分場景,同時py-redis
庫并沒有打算開發(fā)出包含更多功能的分布鎖實現(xiàn)。這意味著開發(fā)者只能根據(jù)他提供的Lock對象進行重新開發(fā),并通過redis.lock
中的lock_class
參數(shù)傳遞重新開發(fā)后的鎖實現(xiàn)。
以上就是Python利用py-redis實現(xiàn)分布式鎖的詳細內(nèi)容,更多關(guān)于Python分布式鎖的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
python Selenium爬取內(nèi)容并存儲至MySQL數(shù)據(jù)庫的實現(xiàn)代碼
這篇文章主要介紹了python Selenium爬取內(nèi)容并存儲至MySQL數(shù)據(jù)庫的實現(xiàn)代碼,需要的朋友可以參考下2017-03-03Python中的數(shù)據(jù)對象持久化存儲模塊pickle的使用示例
這篇文章主要介紹了Python中的數(shù)據(jù)對象持久化存儲模塊pickle的使用示例,重點講解了pickle中模塊中對象持久化和文件讀取的相關(guān)方法,需要的朋友可以參考下2016-03-03pytorch常用函數(shù)之torch.randn()解讀
這篇文章主要介紹了pytorch常用函數(shù)之torch.randn()解讀,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-02-02Python腳本實現(xiàn)DNSPod DNS動態(tài)解析域名
這篇文章主要介紹了Python腳本實現(xiàn)DNSPod DNS動態(tài)解析域名,本文直接給出實現(xiàn)代碼,需要的朋友可以參考下2015-02-02