簡單理解Python中的事件循環(huán)EventLoop
簡介
在 python 3中,加入了 asyncio 模塊,來實現(xiàn)協(xié)程,其中一個很重要的概念是事件循環(huán),整個異步流程都是事件循環(huán)推動的。下面自己實現(xiàn)一個相對簡單的EventLoop,了解一下事件循環(huán)是如何進行運轉(zhuǎn)的。
事件循環(huán)
下面看一下整個流程的實現(xiàn)過程
將以下代碼寫入 spider_event_loop.py 文件:
# spider_event_loop.py import time import os import socket from urllib.parse import urlparse from collections import deque from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ # selector = DefaultSelector() urls = ['https://p9-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/9bc51bc53f634bf79b5de5c8b9810817~tplv-k3u1fbpfcp-watermark.image', 'https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/2b95ac8571ba403180743495ed56e492~tplv-k3u1fbpfcp-watermark.image', 'https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/459c6c73887a4206a33b13ef23988809~tplv-k3u1fbpfcp-watermark.image', 'https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/80701ade82d345cd8aa0b08fef008fe1~tplv-k3u1fbpfcp-watermark.image', 'https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/99855ed1f79c456f9fdcd83ce5cff4f2~tplv-k3u1fbpfcp-watermark.image', 'https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/0c4bd38c2fe3434587025748d051362e~tplv-k3u1fbpfcp-watermark.image', 'https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/80cd2a1d165b43beb6234d893ce391e2~tplv-k3u1fbpfcp-watermark.image', 'https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/ea18df95f26f4314a0a36d11d4a067d0~tplv-k3u1fbpfcp-watermark.image', 'https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/095c342a796c411cad1800396746bdaf~tplv-k3u1fbpfcp-watermark.image', 'https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/d3769e4d468a4f64a1e2879f94ad742b~tplv-k3u1fbpfcp-watermark.image' ] # 事件循環(huán)實現(xiàn) class EventLoop: def __init__(self): # 整個流程是否 self._stopped = False self._ready = deque() self._selector = DefaultSelector() # 向 _ready 隊列中加入回調(diào)方法 def call_soon(self, callback, *args): # 將事件添加到隊列里 handle = Handle(callback, self, *args) self._ready.append(handle) # 套接字注冊讀事件 def add_writer(self, fd, callback, *args): # 將回調(diào)方法封裝成Handle handle = Handle(callback, self, *args) try: # 檢查有沒有注冊過 key = self._selector.get_key(fd) except KeyError: # 沒有注冊過,進行寫事件注冊 self._selector.register(fd, EVENT_WRITE, handle) else: # 注冊過,添加寫事件 mask = key.events self._selector.modify(fd, mask | EVENT_WRITE, handle) # 套接字注銷讀事件 def remove_writer(self, fd): try: # 檢查有沒有注冊過 self._selector.get_key(fd) except KeyError: # 沒有,直接返回 return # 注銷事件 self._selector.unregister(fd) # 套接字注冊寫事件 def add_reader(self, fd, callback, *args): # 將回調(diào)方法,封裝成Handle handle = Handle(callback, self, *args) try: # 檢查是否注冊過 key = self._selector.get_key(fd) except KeyError: # 沒有注冊過,進行讀事件注冊 self._selector.register(fd, EVENT_READ, handle) else: # 注冊過,添加讀事件 mask = key.events self._selector.modify(fd, mask | EVENT_READ, handle) # 套接字注銷寫事件 def remove_reader(self, fd): try: # 檢查有沒有注冊過 self._selector.get_key(fd) except KeyError: # 沒有,直接返回 return # 注銷事件 self._selector.unregister(fd) # 處理事件 def _process_events(self, event_list): for key, mask in event_list: fileobj, handle = key.fileobj, key.data if mask & EVENT_READ: self.remove_reader(fileobj) if mask & EVENT_WRITE: self.remove_writer(fileobj) # 注冊的事件發(fā)生,將注冊時的handle放入隊列中 self._ready.append(handle) # 運行事件隊列,執(zhí)行回調(diào)方法 def run_once(self): # 獲取發(fā)生事件的所有文件描述符 event_list = self._selector.select() self._process_events(event_list) while self._ready: # 將隊列中的handle取出,執(zhí)行回調(diào)方法 handle = self._ready.popleft() handle.run() # 無限循環(huán),直到stopped def run_forever(self): while True: # 執(zhí)行事件 self.run_once() if self._stopped: break # 執(zhí)行協(xié)程方法,直到事件循環(huán)執(zhí)行結束 def run_until_complete(self, future): # 在 future 中添加回調(diào)方法,在 future set_value 的時候執(zhí)行這個回調(diào)方法 future.add_done_callback(self._run_until_complete_cb) # 開始無限循環(huán) self.run_forever() # 返回 future 的值 return future.value # 結束時的回調(diào)方法 def _run_until_complete_cb(self, future): # 結束無限循環(huán) self.close() # 關閉方法 def close(self): self._stopped = True # 創(chuàng)建 Future 實例 def create_future(self): return Future(loop=self) # 創(chuàng)建 Task 實例 def create_task(self, coro): return Task(coro, loop=self)
以上類是一個簡單是事件循環(huán),事件循環(huán)基本的操作都已經(jīng)包含,注冊注銷事件,無限循環(huán)獲取文件描述符的事件,執(zhí)行Handle隊列,這里是使用的 _ready 隊列,表示已經(jīng)準備好的 Handle,由于沒有延時事件所以省略了調(diào)度隊列
# Handle 類,對回調(diào)方法做封裝 class Handle: def __init__(self, callback, loop, *args): self._callback = callback self._args = args # 執(zhí)行回調(diào)方法 def run(self): self._callback(*self._args)
Handle 類對回調(diào)方法進行封裝
# Future 類 class Future: def __init__(self, loop=None): self.value = None # 將要執(zhí)行的回調(diào)方法 self._step_func = [] # 和事件循環(huán)關聯(lián) if loop: self._loop = loop else: self._loop = get_event_loop() def add_done_callback(self, func): # 添加回調(diào)方法 self._step_func.append(func) def set_value(self, value): # 設置值 self.value = value for func in self._step_func: # 將回調(diào)方法添加到事件循環(huán)的 _ready 隊列中 # 在下次事件循環(huán)中執(zhí)行回調(diào)方法 self._loop.call_soon(func, self) # 實現(xiàn) __iter__ 方法,F(xiàn)uture 類的實例為可迭代對象 def __iter__(self): # 該語句起到暫停協(xié)程的作用,并返回實例本身 yield self # 該語句定義的返回值會賦給 yield from 語句等號前面的變量 return self.value
Future類和以前的實現(xiàn),區(qū)分不是很大,只是和 loop 做關聯(lián),將回調(diào)方法放入事件循環(huán)的隊列中,依靠事件循環(huán)執(zhí)行回調(diào)方法
# Task 類,繼承 Future 類, 也是可迭代的 class Task(Future): def __init__(self, coro, loop=None): super().__init__(loop=loop) self.coro = coro # step 方法直接放入事件循環(huán)的隊列中進行執(zhí)行 # 激活協(xié)程方法運行 self._loop.call_soon(self.step, self) def step(self, future): try: # 向協(xié)程發(fā)送數(shù)據(jù),驅(qū)動協(xié)程執(zhí)行 # 直到遇到 yield ,返回新的 Future實例 new_futrue = self.coro.send(future.value) except StopIteration as exc: # 在協(xié)程方法執(zhí)行到結束,或者 return 之后,拋出 StopIteration 異常 # 由于 Task 也是 Future, 在協(xié)程執(zhí)行完之后,最后 Task 執(zhí)行自己的回調(diào)方法 self.set_value(exc.value) return # 將 step 加入回調(diào)列表,等待下次驅(qū)動執(zhí)行 # 在 Future 執(zhí)行 set_value 時,又會執(zhí)行 step 方法,再次驅(qū)動協(xié)程執(zhí)行 new_futrue.add_done_callback(self.step)
Task 類繼承 Future 類,類的實例也是可迭代對象,在 step 方法會不斷觸發(fā)協(xié)程繼續(xù)執(zhí)行,在協(xié)程執(zhí)行結束之后,拋出 StopIteration 異常,最后 Task 類再執(zhí)行回調(diào)方法,主要做一些清理工作或者收集結果。
class AsyncSocket: def __init__(self, loop=None): # 綁定事件循環(huán) if loop: self._loop = loop else: self._loop = get_event_loop() self.sock = socket.socket() self.sock.setblocking(False) # 該方法用于向服務器發(fā)送連接請求并注冊監(jiān)聽套接字的可寫事件 def connect(self, address): # 由 loop 創(chuàng)建 Future 實例 f = self._loop.create_future() try: self.sock.connect(address) except BlockingIOError: pass # 注冊寫事件,在連接成功事件發(fā)生之后,調(diào)用回調(diào)方法 self._loop.add_writer(self.sock.fileno(), self._connect_cb, f) # 暫停執(zhí)行,等待寫事件發(fā)生 yield from f # 回調(diào)方法,連接事件發(fā)生 def _connect_cb(self, future): # 設置 Future 值,并且驅(qū)動協(xié)程繼續(xù)執(zhí)行 future.set_value(None) # 向服務器發(fā)送獲取圖片的請求 def send(self, data): self.sock.send(data) # 該方法會多次執(zhí)行,以獲取服務器返回的數(shù)據(jù)片段 def read(self): f = self._loop.create_future() # 注冊讀事件,在可讀事件發(fā)生之后,調(diào)用回調(diào)方法 self._loop.add_reader(self.sock.fileno(), self._read_cb, f, self.sock) # 暫停執(zhí)行,等待讀事件發(fā)生 yield from f # 返回最后的值 return f.value # 回調(diào)方法,讀事件發(fā)生 def _read_cb(self, future, sock): # 套接字讀取 4096 字節(jié)的數(shù)據(jù),設置 Future 值,并且驅(qū)動協(xié)程繼續(xù)執(zhí)行 future.set_value(sock.recv(4096)) # 讀取所有數(shù)據(jù) def read_all(self): data = b'' while True: # 不斷讀取 sock 的數(shù)據(jù) value = yield from self.read() if value: data += value else: return data # 關閉客戶端套接字 def close(self): self.sock.close()
AsyncSocket 類是對 Socket 做的簡單封裝,綁定事件循環(huán) Loop ,讀寫事件都是在 Loop 中注冊,由事件循環(huán)來驅(qū)動 Socket 的讀寫。
# 爬蟲類 class Crawler: def __init__(self, url): self._url = url self.url = urlparse(url) self.response = b'' def fetch(self): self.time = time.time() # AsyncSocket 類的實例對象負責完成數(shù)據(jù)獲取的工作 sock = AsyncSocket() # 向服務器發(fā)送連接請求,協(xié)程會暫停到嵌套協(xié)程中的某個 yield from 處 yield from sock.connect((self.url.netloc, 80)) data = 'GET {0} HTTP/1.1\r\nHost: {1}\r\nConnection: close\r\n\r\n \ '.format(self.url.path, self.url.netloc) # 發(fā)送請求數(shù)據(jù) sock.send(data.encode()) # 讀取全部的數(shù)據(jù) self.response = yield from sock.read_all() # 關閉 socket sock.close() # 將下載的圖片寫入文件 with open('pic/{}'.format(self.url.path[1:].split('/')[-1]), 'wb') as f: f.write(self.response.split(b'\r\n\r\n')[1]) return "URL: {0}, 耗時: {1:.3f}s".format(self._url, time.time() - self.time)
Crawler 完成圖片爬取工作,在 fetch 方法中,AsyncSocket 完成請求連接,發(fā)送數(shù)據(jù),接收數(shù)據(jù)的工作,整個流程非常的清晰,和同步阻塞模式流程基本相同,但是性能會有大量提升
# 事件循環(huán),全局變量 _event_loop = None # 獲取事件循環(huán),這里是要獲取同一個全局實例 def get_event_loop(): global _event_loop if _event_loop is None: # 生成一個新的事件循環(huán)實例 _event_loop = EventLoop() return _event_loop # 收集所有的 task def gather(tasks, loop=None): # 使用 Future 類 收集 所有 tasks 的結果 outer = Future(loop=loop) # tasks 數(shù)量 count = len(tasks) nfinished = 0 # 收集結果 results = [] # 回調(diào)方法 def _gather_cb(f): nonlocal nfinished # 完成數(shù)量 nfinished += 1 # 收集結果 results.append(f.value) if nfinished == count: # 都完成之后,outer 設置值,同時執(zhí)行回調(diào)方法 outer.set_value(results) for task in tasks: # 所有的 task 都添加回調(diào)方法 # task 中協(xié)程方法執(zhí)行完成時, 在 step 方法中會拋出 StopIteration 異常 # 這時候 task 會執(zhí)行 set_value 方法,同時會執(zhí)行 _gather_cb 回調(diào)方法 task.add_done_callback(_gather_cb) # 將 Future 實例返回 return outer
以上獲取事件循環(huán)單例,所有的事件都是在一個循環(huán)中執(zhí)行。gather 方法使多個 task 并發(fā)執(zhí)行,并且收集所有 task 的結果
def main(): os.system('mkdir -p pic') start = time.time() loop = get_event_loop() tasks = [] for url in urls: # 爬蟲實例 crawler = Crawler(url) # 將 fetch 方法封裝成 task 實例 tasks.append(Task(crawler.fetch())) # gather 方法將收集所有的 task 結果,并且返回 Futrue 實例, # Futrue 實例在 run_until_complete 方法中添加了 _run_until_complete_cb 回調(diào)方法 # 在所有的 task 執(zhí)行結束之后,F(xiàn)uture 實例執(zhí)行 set_value 方法,同時執(zhí)行回調(diào)方法 _run_until_complete_cb # 在 _run_until_complete_cb 方法中執(zhí)行了 close 方法,無限循環(huán)結束,整個流程結束 # 返回 results 的值 results = loop.run_until_complete(gather(tasks)) for res in results: print(res) if __name__ == '__main__': main()
執(zhí)行 spider_event_loop.py 文件
輸出:
$ python3 spider_event_loop.py
URL: https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/459c6c73887a4206a33b13ef23988809~tplv-k3u1fbpfcp-watermark.image, 耗時: 0.055s
URL: https://p9-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/9bc51bc53f634bf79b5de5c8b9810817~tplv-k3u1fbpfcp-watermark.image, 耗時: 0.101s
URL: https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/80cd2a1d165b43beb6234d893ce391e2~tplv-k3u1fbpfcp-watermark.image, 耗時: 0.157s
URL: https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/0c4bd38c2fe3434587025748d051362e~tplv-k3u1fbpfcp-watermark.image, 耗時: 0.158s
URL: https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/80701ade82d345cd8aa0b08fef008fe1~tplv-k3u1fbpfcp-watermark.image, 耗時: 0.162s
URL: https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/ea18df95f26f4314a0a36d11d4a067d0~tplv-k3u1fbpfcp-watermark.image, 耗時: 0.164s
URL: https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/d3769e4d468a4f64a1e2879f94ad742b~tplv-k3u1fbpfcp-watermark.image, 耗時: 0.163s
URL: https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/095c342a796c411cad1800396746bdaf~tplv-k3u1fbpfcp-watermark.image, 耗時: 0.164s
URL: https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/2b95ac8571ba403180743495ed56e492~tplv-k3u1fbpfcp-watermark.image, 耗時: 0.224s
URL: https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/99855ed1f79c456f9fdcd83ce5cff4f2~tplv-k3u1fbpfcp-watermark.image, 耗時: 0.492s
總共耗時: 0.495s
使用事件循環(huán),異步執(zhí)行爬取數(shù)據(jù),消耗時間很少,性能很高。
現(xiàn)在我們把整個流程梳理一下:
- 從main方法開始執(zhí)行
- 獲取 EventLoop 實例,loop 實例是全局變量
- 將 urls 中的 url 分別進行處理,生成 Crawler 實例
- Crawler 的 fetch 方法是協(xié)程,不會立即執(zhí)行,Task 包裝協(xié)程方法
- 在 Task 的 __init__ 方法中, 會將 Task 的 step 方法,加入到 loop 的 隊列中
- step 方法會被封裝成 Handle 方法,加入到 loop 的 _ready 的隊列中
- 在所有的 task 處理完成之后,加入到 tasks 列表中
- 接下來執(zhí)行 gather 方法,outer 是 Future 實例, 所有的 task 實例都添加 _gather_cb 回調(diào)方法
- 在 _gather_cb 方法中,會收集 task 的結果
- 接下來執(zhí)行 loop 的 run_until_complete 方法,入?yún)⑹?gather 方法返回的 outer 的 Future 實例
- run_until_complete 方法中,outer 實例加入 _run_until_complete_cb 回調(diào)方法
- 開始執(zhí)行 run_forever 方法,進入循環(huán)
- 執(zhí)行 run_once 方法,獲取所有的發(fā)生時間的文件描述符,由于現(xiàn)在還沒有注冊事件,所以事件列表為空
- 將 _ready 隊列中所有的 handle 取出,開始執(zhí)行,由于 task 的 __init__ 方法中將 step 放入了隊列中,所以這里會執(zhí)行 task 的 step 方法
- task 中的 coro 是 fetch 協(xié)程,所以在 step 方法中 coro 會執(zhí)行 send 方法,由于 future 就是 task 實例,value 還是 None,所以會激活協(xié)程方法執(zhí)行
- fetch 方法開始真正執(zhí)行,創(chuàng)建 AsyncSocket 實例 sock
- 執(zhí)行到 sock.connect((self.url.netloc, 80)) 會在 connect 方法中 創(chuàng)建新的 future 實例
- 進行 sock 請求連接,再將網(wǎng)絡套接字的文件描述符注冊寫事件,在 add_writer 方法中 _connect_cb 封裝成 handle 實例,在 事件發(fā)生之后會執(zhí)行 handle 的 run 方法
- 暫停執(zhí)行,返回 future 實例
- 由于是在 task 的 step 方法中激活的協(xié)程執(zhí)行,所以 new_future 就是返回的 future 實例
- new_future 將 step 加入到回調(diào)列表中
- 現(xiàn)在流程都暫停了,等待事件的發(fā)生
- 在事件循環(huán)中,注冊的文件描述符的寫事件發(fā)生之后,_process_events 方法循環(huán)處理所有的事件,取出事件注冊時候的 handle 方法,加入到 _ready 執(zhí)行隊列里面
- 再從 _ready 的隊列里面取出 handle 方法,此 handle 方法是注冊時候封裝的 _connect_cb 回調(diào)方法
- 執(zhí)行 _connect_cb 方法,參數(shù)是 future,也是返回的 new_future,這時候再執(zhí)行 future 的 set_value 方法
- 設置 future 的值,并且將回調(diào)隊列里的回調(diào)方法取出,加入到事件循環(huán)的 _ready 隊列里,由于 new_future 將 step 方法加入了回調(diào)隊列,所以會再次執(zhí)行 task 的 step 方法
- step 又繼續(xù)驅(qū)動協(xié)程執(zhí)行,fetch 又開始了繼續(xù)執(zhí)行
- sock 發(fā)送數(shù)據(jù),然后注冊讀事件,由于讀取數(shù)據(jù)時一次不會全部讀完,會多次注冊讀事件,讀取全部的數(shù)據(jù)
- 讀事件和寫事件是相同的,在注冊之后,就返回 future 對象,添加 step 回調(diào)方法,暫停執(zhí)行。等待事件的發(fā)生
- 在所有的數(shù)據(jù)讀取之后,fetch 方法會執(zhí)行結束,return 當前的數(shù)據(jù)
- 由于 task 實例的 step 驅(qū)動的 fetch 方法,所以 step 方法會拋出 StopIteration 異常
- task 繼承的 Future,所以可以執(zhí)行 set_value 方法,異常的值就是 fetch 返回的值
- 在 gather 方法中,task 添加了 _gather_cb 回調(diào)方法,所以在 set_value 時,會調(diào)用回調(diào)方法
- 將 task 的值收集到 results 列表中,等所有的 task 都執(zhí)行結束之后,outer 實例開始執(zhí)行 set_value
- outer 添加了 _run_until_complete_cb 回調(diào)方法,所以這里同樣會執(zhí)行回調(diào)方法,在 _run_until_complete_cb 方法中調(diào)用事件循環(huán)的 close 方法,_stopped 設置為 True
- run_forever 退出無限循環(huán),整個流程執(zhí)行結束,將 outer 的值返回
整個流程梳理完了,Task 不斷驅(qū)動協(xié)程執(zhí)行,EventLoop 監(jiān)聽事件循環(huán),又不斷驅(qū)動 Task 執(zhí)行,F(xiàn)uture 在協(xié)程的通道中傳輸數(shù)據(jù),幾個部分配合合作完成整個流程。
以上就是簡單理解Python中的事件循環(huán)EventLoop的詳細內(nèi)容,更多關于Python事件循環(huán)的資料請關注腳本之家其它相關文章!
相關文章
讓python同時兼容python2和python3的8個技巧分享
這篇文章主要介紹了讓python同時兼容python2和python3的8個技巧分享,對代碼稍微做些修改就可以很好的同時支持python2和python3的,需要的朋友可以參考下2014-07-07