python?tornado協(xié)程調(diào)度原理示例解析
tornado 的協(xié)程實(shí)現(xiàn)原理
本文討論 tornado 的協(xié)程實(shí)現(xiàn)原理,簡(jiǎn)單做了一份筆記。
首先看一段最常見的 tornado web 代碼:
import tornado import tornado.web import tornado.gen from tornado.gen import coroutine from tornado.httpclient import AsyncHTTPClient class GenHandler(tornado.web.RequestHandler): @coroutine def get(self): url = 'http://www.baidu.com' http_client = AsyncHTTPClient() response = yield http_client.fetch(url) yield tornado.gen.sleep(5) self.write(response.body) class MainHanler(tornado.web.RequestHandler): def get(self): self.write('root') if __name__ == "__main__": application = tornado.web.Application([ (r"/", MainHanler), (r"/gen_async/", GenHandler), ], autoreload=True) application.listen(8888) tornado.ioloop.IOLoop.current().start()
其中最后一行代碼 tornado.ioloop.IOLoop.current().start()
啟動(dòng)服務(wù)。
帶著幾個(gè)問題往下看:
- 知道 yield 可以暫存執(zhí)行狀態(tài),等「合適的時(shí)機(jī)」重新恢復(fù)執(zhí)行,那么保存的狀態(tài)到哪去了?
- 上一個(gè)問題中「合適的時(shí)機(jī)」是到底是什么時(shí)候?
- 繼續(xù)接上一個(gè)問題,具體是怎么恢復(fù)執(zhí)行的?
IOLoop 類相當(dāng)于是對(duì)多路復(fù)用的封裝,起到事件循環(huán)的作用,調(diào)度整個(gè)協(xié)程執(zhí)行過程。
查看 IOLoop 的源碼,可以看到 IOLoop 繼承自 Configurable,PollIOLoop 又繼承自 IOLoop。當(dāng) IOLoop 啟動(dòng)時(shí),會(huì)確定使用哪一種多路復(fù)用方式,epoll、kqueue 還是 select?
# IOLoop 類 # IOLoop 中的 configurable_default 方法是重寫 Configurable 的 # 這里會(huì)確定使用哪種多路復(fù)用方式 @classmethod def configurable_default(cls): if hasattr(select, "epoll"): from tornado.platform.epoll import EPollIOLoop return EPollIOLoop if hasattr(select, "kqueue"): # Python 2.6+ on BSD or Mac from tornado.platform.kqueue import KQueueIOLoop return KQueueIOLoop from tornado.platform.select import SelectIOLoop return SelectIOLoop
# PollIOLoop類 def initialize(self, impl, time_func=None, **kwargs): super(PollIOLoop, self).initialize(**kwargs) self._impl = impl if hasattr(self._impl, 'fileno'): set_close_exec(self._impl.fileno()) self.time_func = time_func or time.time self._handlers = {} self._events = {} self._callbacks = [] self._callback_lock = threading.Lock() self._timeouts = [] self._cancellations = 0 self._running = False self._stopped = False self._closing = False self._thread_ident = None self._blocking_signal_threshold = None self._timeout_counter = itertools.count() # Create a pipe that we send bogus data to when we want to wake # the I/O loop when it is idle self._waker = Waker() self.add_handler(self._waker.fileno(), lambda fd, events: self._waker.consume(), self.READ) def add_handler(self, fd, handler, events): fd, obj = self.split_fd(fd) self._handlers[fd] = (obj, stack_context.wrap(handler)) self._impl.register(fd, events | self.ERROR) def update_handler(self, fd, events): fd, obj = self.split_fd(fd) self._impl.modify(fd, events | self.ERROR) def remove_handler(self, fd): fd, obj = self.split_fd(fd) self._handlers.pop(fd, None) self._events.pop(fd, None) try: self._impl.unregister(fd) except Exception: gen_log.debug("Error deleting fd from IOLoop", exc_info=True)
PollIOLoop 中 initalize 方法中調(diào)用 add_handler 方法,注冊(cè)對(duì)應(yīng)事件的處理函數(shù),如 socket 可讀時(shí),回調(diào)哪個(gè)函數(shù)去處理。
IOLoop 和協(xié)程之間的信使:Future
class Future(object): def __init__(self): self._result = None self._exc_info = None self._callbacks = [] self.running = True def set_result(self, result): ... def set_exc_info(self, exce_info): ... def result(self): ... def exc_info(self): ... def add_done_callback(self, callback): self._callbacks.append(callback)
Future 對(duì)象起到“占位符”的作用,協(xié)程的執(zhí)行結(jié)果會(huì)通過 set_result 方式寫入其中,并調(diào)用通過 add_done_callback 設(shè)置的回調(diào)。
恢復(fù)喚醒協(xié)程的 Runner
class Runner(object): def __init__(self, gen, result_future, first_yielded): self.gen = gen self.result_future = result_future self.future = _null_future self.yield_point = None self.pending_callbacks = None self.results = None self.running = False self.finished = False self.had_exception = False self.io_loop = IOLoop.current() self.stack_context_deactivate = None # 上面一堆不需要看的初始化 if self.handle_yield(first_yielded): gen = result_future = first_yielded = None self.run() def handle_yield(self, yielded): self.future = convert_yielded(yielded) if self.future is moment: self.io_loop.add_callback(self.run) return False elif not self.future.done(): def inner(f): # Break a reference cycle to speed GC. f = None self.run() self.io_loop.add_future( self.future, inner) return False return True def run(self): if self.running or self.finished: return try: self.running = True while True: future = self.future if not future.done(): return self.future = None try: orig_stack_contexts = stack_context._state.contexts exc_info = None try: value = future.result() except Exception: self.had_exception = True exc_info = sys.exc_info() future = None yielded = self.gen.send(value) except (StopIteration, Return) as e: self.finished = True self.future = _null_future if self.pending_callbacks and not self.had_exception: raise LeakedCallbackError( "finished without waiting for callbacks %r" % self.pending_callbacks) future_set_result_unless_cancelled(self.result_future, _value_from_stopiteration(e)) self.result_future = None self._deactivate_stack_context() return except Exception: # 一些結(jié)束操作 return if not self.handle_yield(yielded): return yielded = None finally: self.running = False
協(xié)程每生成一個(gè) Future,都會(huì)生成對(duì)應(yīng)的一個(gè) Runner,并將 Future 初始化注入都其中。Runner 的 run
方法中,通過 self.gen.send(Future)
來啟動(dòng) Future,當(dāng) Future 完成時(shí),將其設(shè)置成 done,并回調(diào)其預(yù)設(shè)的 callback。
第一個(gè)問題:協(xié)程的狀態(tài)保存到哪去了
IOLoop 中通過 add_future 調(diào)用實(shí)現(xiàn)類 PollIOLoop 中的 add_callback 方法,其中通過 functools 生成偏函數(shù),放入 _callbacks 列表,等待被回調(diào)執(zhí)行。
# IOLoop 的add_future def add_future(self, future, callback): """Schedules a callback on the ``IOLoop`` when the given `.Future` is finished. The callback is invoked with one argument, the `.Future`. """ assert is_future(future) callback = stack_context.wrap(callback) future.add_done_callback( lambda future: self.add_callback(callback, future)) # PollIOLoop 的add_callback def add_callback(self, callback, *args, **kwargs): if thread.get_ident() != self._thread_ident: with self._callback_lock: if self._closing: return list_empty = not self._callbacks self._callbacks.append(functools.partial( stack_context.wrap(callback), *args, **kwargs)) if list_empty: self._waker.wake() else: if self._closing: return self._callbacks.append(functools.partial( stack_context.wrap(callback), *args, **kwargs))
第二個(gè)問題:「合適的時(shí)機(jī)」是什么?
IOLoop 實(shí)際上就是對(duì)多路復(fù)用的封裝,當(dāng)?shù)讓?epoll_wait 事件發(fā)生時(shí),即會(huì)通知 IOLoop 主線程。
這一段是 IOLoop 中等待多路復(fù)用的事件,以及處理事件。
try: # 等待事件 event_pairs = self._impl.poll(poll_timeout) except Exception as e: print("wait fail") if errno_from_exception(e) == errno.EINTR: continue else: raise if self._blocking_signal_threshold is not None: signal.setitimer(signal.ITIMER_REAL, self._blocking_signal_threshold, 0) # 處理事件 self._events.update(event_pairs) while self._events: fd, events = self._events.popitem() try: fd_obj, handler_func = self._handlers[fd] handler_func(fd_obj, events) except (OSError, IOError) as e: if errno_from_exception(e) == errno.EPIPE: pass else: self.handle_callback_exception(self._handlers.get(fd)) except Exception: self.handle_callback_exception(self._handlers.get(fd)) fd_obj = handler_func = None
第三個(gè)問題:具體是怎么恢復(fù)的。
Runner 通過不斷 check Future 的狀態(tài),最后調(diào)用 callback 來返回結(jié)果。
總結(jié)
首先 tornado 對(duì)多路復(fù)用系統(tǒng)調(diào)用做了封裝,來實(shí)現(xiàn)非阻塞 web 服務(wù)。
其次 tornado 通過 yield+Future+Runner 實(shí)現(xiàn)了生成 Future,Runner 監(jiān)控結(jié)果,回調(diào) callback 來實(shí)現(xiàn)協(xié)程的執(zhí)行。
參考:
http://www.dbjr.com.cn/python/2976505cr.htm
http://www.dbjr.com.cn/article/132918.htm
以上就是python tornado協(xié)程調(diào)度原理示例解析的詳細(xì)內(nèi)容,更多關(guān)于python tornado協(xié)程調(diào)度的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
python調(diào)用matplotlib模塊繪制柱狀圖
這篇文章主要為大家介紹了python調(diào)用matplotlib模塊繪制柱狀圖,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2019-10-10Python實(shí)現(xiàn)設(shè)置windows桌面壁紙代碼分享
這篇文章主要介紹了Python實(shí)現(xiàn)設(shè)置windows桌面壁紙,本文直接給出實(shí)現(xiàn)代碼,需要的朋友可以參考下2015-03-03利用Python快速搭建Markdown筆記發(fā)布系統(tǒng)
這篇文章主要為大家詳細(xì)介紹了使用Python生態(tài)的成熟工具,在30分鐘內(nèi)搭建一個(gè)支持Markdown渲染、分類標(biāo)簽、全文搜索的私有化知識(shí)發(fā)布系統(tǒng),感興趣的小伙伴可以參考下2025-04-04PyTorch 如何設(shè)置隨機(jī)數(shù)種子使結(jié)果可復(fù)現(xiàn)
這篇文章主要介紹了PyTorch 設(shè)置隨機(jī)數(shù)種子使結(jié)果可復(fù)現(xiàn)操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2021-05-05Python 實(shí)現(xiàn)簡(jiǎn)單的shell sed替換功能(實(shí)例講解)
下面小編就為大家?guī)硪黄狿ython 實(shí)現(xiàn)簡(jiǎn)單的shell sed替換功能(實(shí)例講解)。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2017-09-09基于python3 pyQt5 QtDesignner實(shí)現(xiàn)窗口化猜數(shù)字游戲功能
這篇文章主要介紹了基于python3 pyQt5 QtDesignner實(shí)現(xiàn)窗口化猜數(shù)字游戲功能,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2019-07-07