Python?Asyncio?庫之同步原語常用函數詳解
前記
Asyncio的同步原語可以簡化我們編寫資源競爭的代碼和規(guī)避資源競爭導致的Bug的出現(xiàn)。 但是由于協(xié)程的特性,在大部分業(yè)務代碼中并不需要去考慮資源競爭的出現(xiàn),導致Asyncio同步原語被使用的頻率比較低,但是如果想基于Asyncio編寫框架則需要學習同步原語的使用。
0.基礎
同步原語都是適用于某些條件下對某個資源的爭奪,在代碼中大部分的資源都是屬于一個代碼塊,而Python對于代碼塊的管理的最佳實踐是使用with語法,with語法實際上是調用了一個類中的__enter__和__exit__方法,比如下面的代碼:
class Demo(object):
def __enter__(self):
return
def __exit__(self, exc_type, exc_val, exc_tb):
return
with Demo():
pass
代碼中的Demo類實現(xiàn)了__enter__和__exit__方法后,就可以被with語法調用,其中__enter__方法是進入代碼塊執(zhí)行的邏輯,__enxi__方法是用于退出代碼塊(包括異常退出)的邏輯。這兩個方法符合同步原語中對資源的爭奪和釋放,但是__enter__和__exit__兩個方法都是不支持await調用的,為了解決這個問題,Python引入了async with語法。
async with語法和with語法類似 ,我們只要編寫一個擁有__aenter__和__aexit__方法的類,那么這個類就支持asyncio with語法了,如下:
import asyncio
class Demo(object):
async def __aenter__(self):
return
async def __aexit__(self, exc_type, exc_val, exc_tb):
return
async def main():
async with Demo():
pass
asyncio.run(main())
其中,類中的__aenter__方法是進入代碼塊時執(zhí)行的方法,__aexit__是退出代碼塊時執(zhí)行的方法。
有了async with語法的加持,asyncio的同步原語使用起來會比較方便,所以asyncio中對資源爭奪的同步原語都會繼承于_ContextManagerMixin類:
class _ContextManagerMixin:
async def __aenter__(self):
await self.acquire()
# We have no use for the "as ..." clause in the with
# statement for locks.
return None
async def __aexit__(self, exc_type, exc, tb):
self.release()
并實現(xiàn)了acquire和release方法,供__aenter__和__aexit__方法調用,同時我們在使用同步原語的時候盡量用到async with語法防止忘記釋放資源的占用。
1.Lock
由于協(xié)程的特性,在編寫協(xié)程代碼時基本上可以不考慮到鎖的情況,但在一些情況下我們還是需要用到鎖,并通過鎖來維護并發(fā)時的數據安全性,如下例子:
import asyncio
share_data = {}
async def sub(i):
# 賦上相同的key和value
share_data[i] = i
await asyncio.sleep(0)
print(i, share_data[i] == i)
async def sub_add(i):
# 賦上的value值是原來的+1
share_data[i] = i + 1
await asyncio.sleep(0)
print(i, share_data[i] == i + 1)
async def main():
# 創(chuàng)建并發(fā)任務
task_list = []
for i in range(10):
task_list.append(sub(i))
task_list.append(sub_add(i))
# 并發(fā)執(zhí)行
await asyncio.gather(*task_list)
if __name__ == "__main__":
asyncio.run(main())
在這個例子中程序會并發(fā)的執(zhí)行sub和sub_add函數,他們是由不同的asyncio.Task驅動的,這意味著會出現(xiàn)這樣一個場景。 當負責執(zhí)行sub(1)函數的asyncio.Task在執(zhí)行完share_data[i]=i后就執(zhí)行await asyncio.sleep(0)從而主動讓出控制權并交還給事件循環(huán),等待事件循環(huán)的下一次調度。 不過事件循環(huán)不會空下來,而是馬上安排下一個asyncio.Task執(zhí)行,此時會先執(zhí)行到sub_add(1)函數的share_data[i] = i + 1,并同樣的在執(zhí)行到await asyncio.sleep(0)的時候把控制權交會給事件循環(huán)。 這時候控制權會由事件循環(huán)轉移給原先執(zhí)行sub(1)函數的asyncio.Task,獲取到控制權l(xiāng)后sub(1)函數的邏輯會繼續(xù)走,但由于share_data[i]的數據已經被share_data[i] = i + 1修改了,導致最后執(zhí)行print時,share_data[i]的數據已經變?yōu)榕K數據,而不是原本想要的數據了。
為了解決這個問題,我們可以使用asyncio.Lock來解決資源的沖突,如下:
import asyncio
share_data = {}
# 存放對應資源的鎖
lock_dict = {}
async def sub(i):
async with lock_dict[i]: # <-- 通過async with語句來控制鎖的粒度
share_data[i] = i
await asyncio.sleep(0)
print(i, share_data[i] == i)
async def sub_add(i):
async with lock_dict[i]:
share_data[i] = i + 1
await asyncio.sleep(0)
print(i, share_data[i] == i + 1)
async def main():
task_list = []
for i in range(10):
lock_dict[i] = asyncio.Lock()
task_list.append(sub(i))
task_list.append(sub_add(i))
await asyncio.gather(*task_list)
if __name__ == "__main__":
asyncio.run(main())
從例子可以看到asyncio.Lock的使用方法跟多線程的Lock差不多,通過async with語法來獲取和釋放鎖,它的原理也很簡單,主要做了如下幾件事:
- 1.確保某一協(xié)程獲取鎖后的執(zhí)行期間,別的協(xié)程在獲取鎖時需要一直等待,直到執(zhí)行完成并釋放鎖。
- 2.當有協(xié)程持有鎖的時候,其他協(xié)程必須等待,直到持有鎖的協(xié)程釋放了鎖。
- 2.確保所有協(xié)程能夠按照獲取的順序獲取到鎖。
這意味著需要有一個數據結構來維護當前持有鎖的協(xié)程的和下一個獲取鎖協(xié)程的關系,同時也需要一個隊列來維護多個獲取鎖的協(xié)程的喚醒順序。
asyncio.Lock跟其它asyncio功能的用法一樣,使用asyncio.Future來同步協(xié)程之間鎖的狀態(tài),使用deque維護協(xié)程間的喚醒順序,源碼如下:
class Lockl(_ContextManagerMixin, mixins._LoopBoundMixin):
def __init__(self):
self._waiters = None
self._locked = False
def locked(self):
return self._locked
async def acquire(self):
if (not self._locked and (self._waiters is None or all(w.cancelled() for w in self._waiters))):
# 目前沒有其他協(xié)程持有鎖,當前協(xié)程可以運行
self._locked = True
return True
if self._waiters is None:
self._waiters = collections.deque()
# 創(chuàng)建屬于自己的容器,并推送到`_waiters`這個雙端隊列中
fut = self._get_loop().create_future()
self._waiters.append(fut)
try:
try:
await fut
finally:
# 如果執(zhí)行完畢,需要把自己移除,防止被`wake_up_first`調用
self._waiters.remove(fut)
except exceptions.CancelledError:
# 如果是等待的過程中被取消了,需要喚醒下一個調用`acquire`
if not self._locked:
self._wake_up_first()
raise
# 持有鎖
self._locked = True
return True
def release(self):
if self._locked:
# 釋放鎖
self._locked = False
self._wake_up_first()
else:
raise RuntimeError('Lock is not acquired.')
def _wake_up_first(self):
if not self._waiters:
return
# 獲取還處于鎖狀態(tài)協(xié)程對應的容器
try:
# 獲取下一個等待獲取鎖的waiter
fut = next(iter(self._waiters))
except StopIteration:
return
# 設置容器為True,這樣對應協(xié)程就可以繼續(xù)運行了。
if not fut.done():
fut.set_result(True)
通過源碼可以知道,鎖主要提供了獲取和釋放的功能,對于獲取鎖需要區(qū)分兩種情況:
- 1:當有協(xié)程想要獲取鎖時會先判斷鎖是否被持有,如果當前鎖沒有被持有就直接返回,使協(xié)程能夠正常運行。
- 2:如果協(xié)程獲取鎖時,鎖發(fā)現(xiàn)自己已經被其他協(xié)程持有則創(chuàng)建一個屬于當前協(xié)程的
asyncio.Future,用來同步狀態(tài),并添加到deque中。
而對于釋放鎖就比較簡單,只要獲取deque中的第一個asyncio.Future,并通過fut.set_result(True)進行標記,使asyncio.Future從peding狀態(tài)變?yōu)?code>done狀態(tài),這樣一來,持有該asyncio.Future的協(xié)程就能繼續(xù)運行,從而持有鎖。
不過需要注意源碼中acquire方法中對CancelledError異常進行捕獲,再喚醒下一個鎖,這是為了解決acquire方法執(zhí)行異常導致鎖一直被卡住的場景,通常情況下這能解決大部分的問題,但是如果遇到錯誤的封裝時,我們需要親自處理異常,并執(zhí)行鎖的喚醒。比如在通過繼承asyncio.Lock編寫一個超時鎖時,最簡單的實現(xiàn)代碼如下:
import asyncio
class TimeoutLock(asyncio.Lock):
def __init__(self, timeout, *, loop=None):
self.timeout = timeout
super().__init__(loop=loop)
async def acquire(self) -> bool:
return await asyncio.wait_for(super().acquire(), self.timeout)
這份代碼非常簡單,他只需要在__init__方法傳入timeout參數,并在acuiqre方法中通過wait_for來實現(xiàn)鎖超時即可,現(xiàn)在假設wait_for方法是一個無法傳遞協(xié)程cancel的方法,且編寫的acquire沒有進行捕獲異常再釋放鎖的操作,當異常發(fā)生的時候會導致鎖一直被卡住。 為了解決這個問題,只需要對TimeoutLock的acquire方法添加異常捕獲,并在捕獲到異常時釋放鎖即可,代碼如下:
class TimeoutLock(asyncio.Lock):
def __init__(self, timeout, *, loop=None):
self.timeout = timeout
super().__init__(loop=loop)
async def acquire(self) -> bool:
try:
return await asyncio.wait_for(super().acquire(), self.timeout)
except Exception:
self._wake_up_first()
raise
2.Event
asyncio.Event也是一個簡單的同步原語,但它跟asyncio.Lock不一樣,asyncio.Lock是確保每個資源只能被一個協(xié)程操作,而asyncio.Event是確保某個資源何時可以被協(xié)程操作,可以認為asyncio.Lock鎖的是資源,asyncio.Event鎖的是協(xié)程,所以asyncio.Event并不需要acquire來鎖資源,release釋放資源,所以也用不到async with語法。
asyncio.Event的簡單使用示例如下:
import asyncio
async def sub(event: asyncio.Event) -> None:
await event.wait()
print("I'm Done")
async def main() -> None:
event = asyncio.Event()
for _ in range(10):
asyncio.create_task(sub(event))
await asyncio.sleep(1)
event.set()
asyncio.run(main())
在這個例子中會先創(chuàng)建10個asyncio.Task來執(zhí)行sub函數,但是所有sub函數都會在event.wait處等待,直到main函數中調用event.set后,所有的sub函數的event.wait會放行,使sub函數能繼續(xù)執(zhí)行。
可以看到asyncio.Event功能比較簡單,它的源碼實現(xiàn)也很簡單,源碼如下:
class Event(mixins._LoopBoundMixin):
def __init__(self):
self._waiters = collections.deque()
self._value = False
def is_set(self):
return self._value
def set(self):
if not self._value:
# 確保每次只能set一次
self._value = True
# 設置每個協(xié)程存放的容器為True,這樣對應的協(xié)程就可以運行了
for fut in self._waiters:
if not fut.done():
fut.set_result(True)
def clear(self):
# 清理上一次的set
self._value = False
async def wait(self):
if self._value:
# 如果設置了,就不需要等待了
return True
# 否則需要創(chuàng)建一個容器,并需要等待容器完成
fut = self._get_loop().create_future()
self._waiters.append(fut)
try:
await fut
return True
finally:
self._waiters.remove(fut)
通過源碼可以看到wait方法主要是創(chuàng)建了一個asyncio.Future,并把它加入到deque隊列后就一直等待著,而set方法被調用時會遍歷整個deque隊列,并把處于peding狀態(tài)的asyncio.Future設置為done,這時其他在調用event.wait方法的協(xié)程就會得到放行。
通過源碼也可以看出,asyncio.Event并沒有繼承于_ContextManagerMixin,這是因為它鎖的是協(xié)程,而不是資源。
asyncio.Event的使用頻率比asyncio.Lock多許多,不過通常都會讓asyncio.Event和其他數據結構進行封裝再使用,比如實現(xiàn)一個服務器的優(yōu)雅關閉功能,這個功能會確保服務器在等待n秒后或者所有連接都關閉后才關閉服務器,這個功能就可以使用set與asyncio.Event結合,如下:
import asyncio
class SetEvent(asyncio.Event):
def __init__(self, *, loop=None):
self._set = set()
super().__init__(loop=loop)
def add(self, value):
self._set.add(value)
self.clear()
def remove(self, value):
self._set.remove(value)
if not self._set:
self.set()
這個SetEvent結合了set和SetEvent的功能,當set有數據的時候,會通過clear方法使SetEvent變?yōu)榈却隣顟B(tài),而set沒數據的時候,會通過set方法使SetEvent變?yōu)闊o需等待的狀態(tài),所有調用wait的協(xié)程都可以放行,通過這種結合,SetEvent擁有了等待資源為空的功能。 接下來就可以用于服務器的優(yōu)雅退出功能:
async def mock_conn_io() -> None:
await asyncio.sleep(1)
def conn_handle(set_event: SetEvent):
task: asyncio.Task = asyncio.create_task(mock_conn_io())
set_event.add(task)
task.add_done_callback(lambda t: set_event.remove(t))
async def main():
set_event: SetEvent = SetEvent()
for _ in range(10):
conn_handle(set_event)
# 假設這里收到了退出信號
await asyncio.wait(set_event.wait(), timeout=9)
asyncio.run(main())
在這個演示功能中,mock_conn_io用于模擬服務器的連接正在處理中,而conn_handle用于創(chuàng)建服務器連接,main則是先創(chuàng)建10個連接,并模擬在收到退出信號后等待資源為空或者超時才退出服務。
這只是簡單的演示,實際上的優(yōu)雅關閉功能要考慮的東西不僅僅是這些。
4.Condition
condition只做簡單介紹
asyncio.Condition是同步原語中使用最少的一種,因為他使用情況很奇怪,而且大部分場景可以被其他寫法代替,比如下面這個例子:
import asyncio
async def task(condition, work_list):
await asyncio.sleep(1)
work_list.append(33)
print('Task sending notification...')
async with condition:
condition.notify()
async def main():
condition = asyncio.Condition()
work_list = list()
print('Main waiting for data...')
async with condition:
_ = asyncio.create_task(task(condition, work_list))
await condition.wait()
print(f'Got data: {work_list}')
asyncio.run(main())
# >>> Main waiting for data...
# >>> Task sending notification...
# >>> Got data: [33]
在這個例子中可以看到,notify和wait方法只能在async with condition中可以使用,如果沒有在async with condition中使用則會報錯,同時這個示例代碼有點復雜,沒辦法一看就知道執(zhí)行邏輯是什么,其實這個邏輯可以轉變成一個更簡單的寫法:
import asyncio
async def task(work_list):
await asyncio.sleep(1)
work_list.append(33)
print('Task sending notification...')
return
async def main():
work_list = list()
print('Main waiting for data...')
_task = asyncio.create_task(task(work_list))
await _task
print(f'Got data: {work_list}')
asyncio.run(main())
# >>> Main waiting for data...
# >>> Task sending notification...
# >>> Got data: [33]
通過這個代碼可以看到這個寫法更簡單一點,而且更有邏輯性,而condition的寫法卻更有點Go協(xié)程寫法/或者回調函數寫法的感覺。 所以建議在認為自己的代碼可能會用到asyncio.Conditon時需要先考慮到是否需要asyncio.Codition?是否有別的方案代替,如果沒有才考慮去使用asyncio.Conditonk。
5.Semaphore
asyncio.Semaphore--信號量是同步原語中被使用最頻繁的,大多數都是用在限流場景中,比如用在爬蟲中和客戶端網關中限制請求頻率。
asyncio.Semaphore可以認為是一個延緩觸發(fā)的asyncio.Lock,asyncio.Semaphore內部會維護一個計數器,無論何時進行獲取或釋放,它都會遞增或者遞減(但不會超過邊界值),當計數器歸零時,就會進入到鎖的邏輯,但是這個鎖邏輯會在計數器大于0的時候釋放j,它的用法如下:`
import asyncio
async def main():
semaphore = asyncio.Semaphore(10):
async with semaphore:
pass
asyncio.run(main())
示例中代碼通過async with來指明一個代碼塊(代碼用pass代替),這個代碼塊是被asyncio.Semaphore管理的,每次協(xié)程在進入代碼塊時,asyncio.Semaphore的內部計數器就會遞減一,而離開代碼塊則asyncio.Semaphore的內部計數器會遞增一。
當有一個協(xié)程進入代碼塊時asyncio.Semaphore發(fā)現(xiàn)計數器已經為0了,則會使當前協(xié)程進入等待狀態(tài),直到某個協(xié)程離開這個代碼塊時,計數器會遞增一,并喚醒等待的協(xié)程,使其能夠進入代碼塊中繼續(xù)執(zhí)行。
asyncio.Semaphore的源碼如下,需要注意的是由于asyncio.Semaphore是一個延緩的asyncio.Lock,所以當調用一次release后可能會導致被喚醒的協(xié)程和剛進入代碼塊的協(xié)程起沖突,所以在acquire方法中要通過一個while循環(huán)來解決這個問題:`
class Semaphore(_ContextManagerMixin, mixins._LoopBoundMixin):
def __init__(self, value=1):
if value < 0:
raise ValueError("Semaphore initial value must be >= 0")
self._value = value
self._waiters = collections.deque()
self._wakeup_scheduled = False
def _wake_up_next(self):
while self._waiters:
# 按照放置順序依次彈出容器
waiter = self._waiters.popleft()
if not waiter.done():
# 設置容器狀態(tài),使對應的協(xié)程可以繼續(xù)執(zhí)行
waiter.set_result(None)
# 設置標記
self._wakeup_scheduled = True
return
def locked(self):
return self._value == 0
async def acquire(self):
# 如果`self._wakeup_scheduled`為True或者value小于0
while self._wakeup_scheduled or self._value <= 0:
# 創(chuàng)建容器并等待執(zhí)行完成
fut = self._get_loop().create_future()
self._waiters.append(fut)
try:
await fut
self._wakeup_scheduled = False
except exceptions.CancelledError:
# 如果被取消了,也要喚醒下一個協(xié)程
self._wake_up_next()
raise
self._value -= 1
return True
def release(self):
# 釋放資源占用,喚醒下一個協(xié)程。
self._value += 1
self._wake_up_next()
針對asyncio.Semaphore進行修改可以實現(xiàn)很多功能,比如基于信號量可以實現(xiàn)一個簡單的協(xié)程池,這個協(xié)程池可以限制創(chuàng)建協(xié)程的量,當協(xié)程池滿的時候就無法繼續(xù)創(chuàng)建協(xié)程,只有協(xié)程中的協(xié)程執(zhí)行完畢后才能繼續(xù)創(chuàng)建(當然無法控制在協(xié)程中創(chuàng)建新的協(xié)程),代碼如下:
import asyncio
import time
from typing import Coroutine
class Pool(object):
def __init__(self, max_concurrency: int):
self._semaphore: asyncio.Semaphore = asyncio.Semaphore(max_concurrency)
async def create_task(self, coro: Coroutine) -> asyncio.Task:
await self._semaphore.acquire()
task: asyncio.Task = asyncio.create_task(coro)
task.add_done_callback(lambda t: self._semaphore.release())
return task
async def demo(cnt: int) -> None:
print(f"{int(time.time())} create {cnt} task...")
await asyncio.sleep(cnt)
async def main() -> None:
pool: Pool = Pool(3)
for i in range(10):
await pool.create_task(demo(i))
asyncio.run(main())
# >>> 1677517996 create 0 task...
# >>> 1677517996 create 1 task...
# >>> 1677517996 create 2 task...
# >>> 1677517996 create 3 task...
# >>> 1677517997 create 4 task...
# >>> 1677517998 create 5 task...
# >>> 1677517999 create 6 task...
# >>> 1677518001 create 7 task...
# >>> 1677518003 create 8 task...
# >>> 1677518005 create 9 task...以上就是Python Asyncio 庫之同步原語常用函數詳解的詳細內容,更多關于Python Asyncio 庫同步原語的資料請關注腳本之家其它相關文章!
相關文章
Python3爬蟲教程之利用Python實現(xiàn)發(fā)送天氣預報郵件
這篇文章主要給大家介紹了關于Python3爬蟲教程之利用Python實現(xiàn)發(fā)送天氣預報郵件的相關資料,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面來一起看看吧2018-12-12

