簡單理解Python中的事件循環(huán)EventLoop
簡介
在 python 3中,加入了 asyncio 模塊,來實現(xiàn)協(xié)程,其中一個很重要的概念是事件循環(huán),整個異步流程都是事件循環(huán)推動的。下面自己實現(xiàn)一個相對簡單的EventLoop,了解一下事件循環(huán)是如何進行運轉(zhuǎn)的。
事件循環(huán)
下面看一下整個流程的實現(xiàn)過程
將以下代碼寫入 spider_event_loop.py 文件:
# spider_event_loop.py
import time
import os
import socket
from urllib.parse import urlparse
from collections import deque
from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ
# selector = DefaultSelector()
urls = ['https://p9-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/9bc51bc53f634bf79b5de5c8b9810817~tplv-k3u1fbpfcp-watermark.image',
'https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/2b95ac8571ba403180743495ed56e492~tplv-k3u1fbpfcp-watermark.image',
'https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/459c6c73887a4206a33b13ef23988809~tplv-k3u1fbpfcp-watermark.image',
'https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/80701ade82d345cd8aa0b08fef008fe1~tplv-k3u1fbpfcp-watermark.image',
'https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/99855ed1f79c456f9fdcd83ce5cff4f2~tplv-k3u1fbpfcp-watermark.image',
'https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/0c4bd38c2fe3434587025748d051362e~tplv-k3u1fbpfcp-watermark.image',
'https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/80cd2a1d165b43beb6234d893ce391e2~tplv-k3u1fbpfcp-watermark.image',
'https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/ea18df95f26f4314a0a36d11d4a067d0~tplv-k3u1fbpfcp-watermark.image',
'https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/095c342a796c411cad1800396746bdaf~tplv-k3u1fbpfcp-watermark.image',
'https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/d3769e4d468a4f64a1e2879f94ad742b~tplv-k3u1fbpfcp-watermark.image'
]
# 事件循環(huán)實現(xiàn)
class EventLoop:
def __init__(self):
# 整個流程是否
self._stopped = False
self._ready = deque()
self._selector = DefaultSelector()
# 向 _ready 隊列中加入回調(diào)方法
def call_soon(self, callback, *args):
# 將事件添加到隊列里
handle = Handle(callback, self, *args)
self._ready.append(handle)
# 套接字注冊讀事件
def add_writer(self, fd, callback, *args):
# 將回調(diào)方法封裝成Handle
handle = Handle(callback, self, *args)
try:
# 檢查有沒有注冊過
key = self._selector.get_key(fd)
except KeyError:
# 沒有注冊過,進行寫事件注冊
self._selector.register(fd, EVENT_WRITE, handle)
else:
# 注冊過,添加寫事件
mask = key.events
self._selector.modify(fd, mask | EVENT_WRITE, handle)
# 套接字注銷讀事件
def remove_writer(self, fd):
try:
# 檢查有沒有注冊過
self._selector.get_key(fd)
except KeyError:
# 沒有,直接返回
return
# 注銷事件
self._selector.unregister(fd)
# 套接字注冊寫事件
def add_reader(self, fd, callback, *args):
# 將回調(diào)方法,封裝成Handle
handle = Handle(callback, self, *args)
try:
# 檢查是否注冊過
key = self._selector.get_key(fd)
except KeyError:
# 沒有注冊過,進行讀事件注冊
self._selector.register(fd, EVENT_READ, handle)
else:
# 注冊過,添加讀事件
mask = key.events
self._selector.modify(fd, mask | EVENT_READ, handle)
# 套接字注銷寫事件
def remove_reader(self, fd):
try:
# 檢查有沒有注冊過
self._selector.get_key(fd)
except KeyError:
# 沒有,直接返回
return
# 注銷事件
self._selector.unregister(fd)
# 處理事件
def _process_events(self, event_list):
for key, mask in event_list:
fileobj, handle = key.fileobj, key.data
if mask & EVENT_READ:
self.remove_reader(fileobj)
if mask & EVENT_WRITE:
self.remove_writer(fileobj)
# 注冊的事件發(fā)生,將注冊時的handle放入隊列中
self._ready.append(handle)
# 運行事件隊列,執(zhí)行回調(diào)方法
def run_once(self):
# 獲取發(fā)生事件的所有文件描述符
event_list = self._selector.select()
self._process_events(event_list)
while self._ready:
# 將隊列中的handle取出,執(zhí)行回調(diào)方法
handle = self._ready.popleft()
handle.run()
# 無限循環(huán),直到stopped
def run_forever(self):
while True:
# 執(zhí)行事件
self.run_once()
if self._stopped:
break
# 執(zhí)行協(xié)程方法,直到事件循環(huán)執(zhí)行結(jié)束
def run_until_complete(self, future):
# 在 future 中添加回調(diào)方法,在 future set_value 的時候執(zhí)行這個回調(diào)方法
future.add_done_callback(self._run_until_complete_cb)
# 開始無限循環(huán)
self.run_forever()
# 返回 future 的值
return future.value
# 結(jié)束時的回調(diào)方法
def _run_until_complete_cb(self, future):
# 結(jié)束無限循環(huán)
self.close()
# 關(guān)閉方法
def close(self):
self._stopped = True
# 創(chuàng)建 Future 實例
def create_future(self):
return Future(loop=self)
# 創(chuàng)建 Task 實例
def create_task(self, coro):
return Task(coro, loop=self)以上類是一個簡單是事件循環(huán),事件循環(huán)基本的操作都已經(jīng)包含,注冊注銷事件,無限循環(huán)獲取文件描述符的事件,執(zhí)行Handle隊列,這里是使用的 _ready 隊列,表示已經(jīng)準備好的 Handle,由于沒有延時事件所以省略了調(diào)度隊列
# Handle 類,對回調(diào)方法做封裝
class Handle:
def __init__(self, callback, loop, *args):
self._callback = callback
self._args = args
# 執(zhí)行回調(diào)方法
def run(self):
self._callback(*self._args)Handle 類對回調(diào)方法進行封裝
# Future 類
class Future:
def __init__(self, loop=None):
self.value = None
# 將要執(zhí)行的回調(diào)方法
self._step_func = []
# 和事件循環(huán)關(guān)聯(lián)
if loop:
self._loop = loop
else:
self._loop = get_event_loop()
def add_done_callback(self, func):
# 添加回調(diào)方法
self._step_func.append(func)
def set_value(self, value):
# 設置值
self.value = value
for func in self._step_func:
# 將回調(diào)方法添加到事件循環(huán)的 _ready 隊列中
# 在下次事件循環(huán)中執(zhí)行回調(diào)方法
self._loop.call_soon(func, self)
# 實現(xiàn) __iter__ 方法,F(xiàn)uture 類的實例為可迭代對象
def __iter__(self):
# 該語句起到暫停協(xié)程的作用,并返回實例本身
yield self
# 該語句定義的返回值會賦給 yield from 語句等號前面的變量
return self.valueFuture類和以前的實現(xiàn),區(qū)分不是很大,只是和 loop 做關(guān)聯(lián),將回調(diào)方法放入事件循環(huán)的隊列中,依靠事件循環(huán)執(zhí)行回調(diào)方法
# Task 類,繼承 Future 類, 也是可迭代的
class Task(Future):
def __init__(self, coro, loop=None):
super().__init__(loop=loop)
self.coro = coro
# step 方法直接放入事件循環(huán)的隊列中進行執(zhí)行
# 激活協(xié)程方法運行
self._loop.call_soon(self.step, self)
def step(self, future):
try:
# 向協(xié)程發(fā)送數(shù)據(jù),驅(qū)動協(xié)程執(zhí)行
# 直到遇到 yield ,返回新的 Future實例
new_futrue = self.coro.send(future.value)
except StopIteration as exc:
# 在協(xié)程方法執(zhí)行到結(jié)束,或者 return 之后,拋出 StopIteration 異常
# 由于 Task 也是 Future, 在協(xié)程執(zhí)行完之后,最后 Task 執(zhí)行自己的回調(diào)方法
self.set_value(exc.value)
return
# 將 step 加入回調(diào)列表,等待下次驅(qū)動執(zhí)行
# 在 Future 執(zhí)行 set_value 時,又會執(zhí)行 step 方法,再次驅(qū)動協(xié)程執(zhí)行
new_futrue.add_done_callback(self.step)Task 類繼承 Future 類,類的實例也是可迭代對象,在 step 方法會不斷觸發(fā)協(xié)程繼續(xù)執(zhí)行,在協(xié)程執(zhí)行結(jié)束之后,拋出 StopIteration 異常,最后 Task 類再執(zhí)行回調(diào)方法,主要做一些清理工作或者收集結(jié)果。
class AsyncSocket:
def __init__(self, loop=None):
# 綁定事件循環(huán)
if loop:
self._loop = loop
else:
self._loop = get_event_loop()
self.sock = socket.socket()
self.sock.setblocking(False)
# 該方法用于向服務器發(fā)送連接請求并注冊監(jiān)聽套接字的可寫事件
def connect(self, address):
# 由 loop 創(chuàng)建 Future 實例
f = self._loop.create_future()
try:
self.sock.connect(address)
except BlockingIOError:
pass
# 注冊寫事件,在連接成功事件發(fā)生之后,調(diào)用回調(diào)方法
self._loop.add_writer(self.sock.fileno(), self._connect_cb, f)
# 暫停執(zhí)行,等待寫事件發(fā)生
yield from f
# 回調(diào)方法,連接事件發(fā)生
def _connect_cb(self, future):
# 設置 Future 值,并且驅(qū)動協(xié)程繼續(xù)執(zhí)行
future.set_value(None)
# 向服務器發(fā)送獲取圖片的請求
def send(self, data):
self.sock.send(data)
# 該方法會多次執(zhí)行,以獲取服務器返回的數(shù)據(jù)片段
def read(self):
f = self._loop.create_future()
# 注冊讀事件,在可讀事件發(fā)生之后,調(diào)用回調(diào)方法
self._loop.add_reader(self.sock.fileno(), self._read_cb, f, self.sock)
# 暫停執(zhí)行,等待讀事件發(fā)生
yield from f
# 返回最后的值
return f.value
# 回調(diào)方法,讀事件發(fā)生
def _read_cb(self, future, sock):
# 套接字讀取 4096 字節(jié)的數(shù)據(jù),設置 Future 值,并且驅(qū)動協(xié)程繼續(xù)執(zhí)行
future.set_value(sock.recv(4096))
# 讀取所有數(shù)據(jù)
def read_all(self):
data = b''
while True:
# 不斷讀取 sock 的數(shù)據(jù)
value = yield from self.read()
if value:
data += value
else:
return data
# 關(guān)閉客戶端套接字
def close(self):
self.sock.close()AsyncSocket 類是對 Socket 做的簡單封裝,綁定事件循環(huán) Loop ,讀寫事件都是在 Loop 中注冊,由事件循環(huán)來驅(qū)動 Socket 的讀寫。
# 爬蟲類
class Crawler:
def __init__(self, url):
self._url = url
self.url = urlparse(url)
self.response = b''
def fetch(self):
self.time = time.time()
# AsyncSocket 類的實例對象負責完成數(shù)據(jù)獲取的工作
sock = AsyncSocket()
# 向服務器發(fā)送連接請求,協(xié)程會暫停到嵌套協(xié)程中的某個 yield from 處
yield from sock.connect((self.url.netloc, 80))
data = 'GET {0} HTTP/1.1\r\nHost: {1}\r\nConnection: close\r\n\r\n \
'.format(self.url.path, self.url.netloc)
# 發(fā)送請求數(shù)據(jù)
sock.send(data.encode())
# 讀取全部的數(shù)據(jù)
self.response = yield from sock.read_all()
# 關(guān)閉 socket
sock.close()
# 將下載的圖片寫入文件
with open('pic/{}'.format(self.url.path[1:].split('/')[-1]), 'wb') as f:
f.write(self.response.split(b'\r\n\r\n')[1])
return "URL: {0}, 耗時: {1:.3f}s".format(self._url, time.time() - self.time)Crawler 完成圖片爬取工作,在 fetch 方法中,AsyncSocket 完成請求連接,發(fā)送數(shù)據(jù),接收數(shù)據(jù)的工作,整個流程非常的清晰,和同步阻塞模式流程基本相同,但是性能會有大量提升
# 事件循環(huán),全局變量
_event_loop = None
# 獲取事件循環(huán),這里是要獲取同一個全局實例
def get_event_loop():
global _event_loop
if _event_loop is None:
# 生成一個新的事件循環(huán)實例
_event_loop = EventLoop()
return _event_loop
# 收集所有的 task
def gather(tasks, loop=None):
# 使用 Future 類 收集 所有 tasks 的結(jié)果
outer = Future(loop=loop)
# tasks 數(shù)量
count = len(tasks)
nfinished = 0
# 收集結(jié)果
results = []
# 回調(diào)方法
def _gather_cb(f):
nonlocal nfinished
# 完成數(shù)量
nfinished += 1
# 收集結(jié)果
results.append(f.value)
if nfinished == count:
# 都完成之后,outer 設置值,同時執(zhí)行回調(diào)方法
outer.set_value(results)
for task in tasks:
# 所有的 task 都添加回調(diào)方法
# task 中協(xié)程方法執(zhí)行完成時, 在 step 方法中會拋出 StopIteration 異常
# 這時候 task 會執(zhí)行 set_value 方法,同時會執(zhí)行 _gather_cb 回調(diào)方法
task.add_done_callback(_gather_cb)
# 將 Future 實例返回
return outer以上獲取事件循環(huán)單例,所有的事件都是在一個循環(huán)中執(zhí)行。gather 方法使多個 task 并發(fā)執(zhí)行,并且收集所有 task 的結(jié)果
def main():
os.system('mkdir -p pic')
start = time.time()
loop = get_event_loop()
tasks = []
for url in urls:
# 爬蟲實例
crawler = Crawler(url)
# 將 fetch 方法封裝成 task 實例
tasks.append(Task(crawler.fetch()))
# gather 方法將收集所有的 task 結(jié)果,并且返回 Futrue 實例,
# Futrue 實例在 run_until_complete 方法中添加了 _run_until_complete_cb 回調(diào)方法
# 在所有的 task 執(zhí)行結(jié)束之后,F(xiàn)uture 實例執(zhí)行 set_value 方法,同時執(zhí)行回調(diào)方法 _run_until_complete_cb
# 在 _run_until_complete_cb 方法中執(zhí)行了 close 方法,無限循環(huán)結(jié)束,整個流程結(jié)束
# 返回 results 的值
results = loop.run_until_complete(gather(tasks))
for res in results:
print(res)
if __name__ == '__main__':
main()執(zhí)行 spider_event_loop.py 文件
輸出:
$ python3 spider_event_loop.py
URL: https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/459c6c73887a4206a33b13ef23988809~tplv-k3u1fbpfcp-watermark.image, 耗時: 0.055s
URL: https://p9-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/9bc51bc53f634bf79b5de5c8b9810817~tplv-k3u1fbpfcp-watermark.image, 耗時: 0.101s
URL: https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/80cd2a1d165b43beb6234d893ce391e2~tplv-k3u1fbpfcp-watermark.image, 耗時: 0.157s
URL: https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/0c4bd38c2fe3434587025748d051362e~tplv-k3u1fbpfcp-watermark.image, 耗時: 0.158s
URL: https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/80701ade82d345cd8aa0b08fef008fe1~tplv-k3u1fbpfcp-watermark.image, 耗時: 0.162s
URL: https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/ea18df95f26f4314a0a36d11d4a067d0~tplv-k3u1fbpfcp-watermark.image, 耗時: 0.164s
URL: https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/d3769e4d468a4f64a1e2879f94ad742b~tplv-k3u1fbpfcp-watermark.image, 耗時: 0.163s
URL: https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/095c342a796c411cad1800396746bdaf~tplv-k3u1fbpfcp-watermark.image, 耗時: 0.164s
URL: https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/2b95ac8571ba403180743495ed56e492~tplv-k3u1fbpfcp-watermark.image, 耗時: 0.224s
URL: https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/99855ed1f79c456f9fdcd83ce5cff4f2~tplv-k3u1fbpfcp-watermark.image, 耗時: 0.492s
總共耗時: 0.495s
使用事件循環(huán),異步執(zhí)行爬取數(shù)據(jù),消耗時間很少,性能很高。
現(xiàn)在我們把整個流程梳理一下:
- 從main方法開始執(zhí)行
- 獲取 EventLoop 實例,loop 實例是全局變量
- 將 urls 中的 url 分別進行處理,生成 Crawler 實例
- Crawler 的 fetch 方法是協(xié)程,不會立即執(zhí)行,Task 包裝協(xié)程方法
- 在 Task 的 __init__ 方法中, 會將 Task 的 step 方法,加入到 loop 的 隊列中
- step 方法會被封裝成 Handle 方法,加入到 loop 的 _ready 的隊列中
- 在所有的 task 處理完成之后,加入到 tasks 列表中
- 接下來執(zhí)行 gather 方法,outer 是 Future 實例, 所有的 task 實例都添加 _gather_cb 回調(diào)方法
- 在 _gather_cb 方法中,會收集 task 的結(jié)果
- 接下來執(zhí)行 loop 的 run_until_complete 方法,入?yún)⑹?gather 方法返回的 outer 的 Future 實例
- run_until_complete 方法中,outer 實例加入 _run_until_complete_cb 回調(diào)方法
- 開始執(zhí)行 run_forever 方法,進入循環(huán)
- 執(zhí)行 run_once 方法,獲取所有的發(fā)生時間的文件描述符,由于現(xiàn)在還沒有注冊事件,所以事件列表為空
- 將 _ready 隊列中所有的 handle 取出,開始執(zhí)行,由于 task 的 __init__ 方法中將 step 放入了隊列中,所以這里會執(zhí)行 task 的 step 方法
- task 中的 coro 是 fetch 協(xié)程,所以在 step 方法中 coro 會執(zhí)行 send 方法,由于 future 就是 task 實例,value 還是 None,所以會激活協(xié)程方法執(zhí)行
- fetch 方法開始真正執(zhí)行,創(chuàng)建 AsyncSocket 實例 sock
- 執(zhí)行到 sock.connect((self.url.netloc, 80)) 會在 connect 方法中 創(chuàng)建新的 future 實例
- 進行 sock 請求連接,再將網(wǎng)絡套接字的文件描述符注冊寫事件,在 add_writer 方法中 _connect_cb 封裝成 handle 實例,在 事件發(fā)生之后會執(zhí)行 handle 的 run 方法
- 暫停執(zhí)行,返回 future 實例
- 由于是在 task 的 step 方法中激活的協(xié)程執(zhí)行,所以 new_future 就是返回的 future 實例
- new_future 將 step 加入到回調(diào)列表中
- 現(xiàn)在流程都暫停了,等待事件的發(fā)生
- 在事件循環(huán)中,注冊的文件描述符的寫事件發(fā)生之后,_process_events 方法循環(huán)處理所有的事件,取出事件注冊時候的 handle 方法,加入到 _ready 執(zhí)行隊列里面
- 再從 _ready 的隊列里面取出 handle 方法,此 handle 方法是注冊時候封裝的 _connect_cb 回調(diào)方法
- 執(zhí)行 _connect_cb 方法,參數(shù)是 future,也是返回的 new_future,這時候再執(zhí)行 future 的 set_value 方法
- 設置 future 的值,并且將回調(diào)隊列里的回調(diào)方法取出,加入到事件循環(huán)的 _ready 隊列里,由于 new_future 將 step 方法加入了回調(diào)隊列,所以會再次執(zhí)行 task 的 step 方法
- step 又繼續(xù)驅(qū)動協(xié)程執(zhí)行,fetch 又開始了繼續(xù)執(zhí)行
- sock 發(fā)送數(shù)據(jù),然后注冊讀事件,由于讀取數(shù)據(jù)時一次不會全部讀完,會多次注冊讀事件,讀取全部的數(shù)據(jù)
- 讀事件和寫事件是相同的,在注冊之后,就返回 future 對象,添加 step 回調(diào)方法,暫停執(zhí)行。等待事件的發(fā)生
- 在所有的數(shù)據(jù)讀取之后,fetch 方法會執(zhí)行結(jié)束,return 當前的數(shù)據(jù)
- 由于 task 實例的 step 驅(qū)動的 fetch 方法,所以 step 方法會拋出 StopIteration 異常
- task 繼承的 Future,所以可以執(zhí)行 set_value 方法,異常的值就是 fetch 返回的值
- 在 gather 方法中,task 添加了 _gather_cb 回調(diào)方法,所以在 set_value 時,會調(diào)用回調(diào)方法
- 將 task 的值收集到 results 列表中,等所有的 task 都執(zhí)行結(jié)束之后,outer 實例開始執(zhí)行 set_value
- outer 添加了 _run_until_complete_cb 回調(diào)方法,所以這里同樣會執(zhí)行回調(diào)方法,在 _run_until_complete_cb 方法中調(diào)用事件循環(huán)的 close 方法,_stopped 設置為 True
- run_forever 退出無限循環(huán),整個流程執(zhí)行結(jié)束,將 outer 的值返回
整個流程梳理完了,Task 不斷驅(qū)動協(xié)程執(zhí)行,EventLoop 監(jiān)聽事件循環(huán),又不斷驅(qū)動 Task 執(zhí)行,F(xiàn)uture 在協(xié)程的通道中傳輸數(shù)據(jù),幾個部分配合合作完成整個流程。
以上就是簡單理解Python中的事件循環(huán)EventLoop的詳細內(nèi)容,更多關(guān)于Python事件循環(huán)的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
讓python同時兼容python2和python3的8個技巧分享
這篇文章主要介紹了讓python同時兼容python2和python3的8個技巧分享,對代碼稍微做些修改就可以很好的同時支持python2和python3的,需要的朋友可以參考下2014-07-07

