python Tornado異步使用場(chǎng)景源碼解析
前言
tornado在我工作前說(shuō)實(shí)話我還沒(méi)有聽(tīng)說(shuō)過(guò),今年回來(lái)后接觸的非常多。
關(guān)于想要學(xué)習(xí)異步的內(nèi)容,起因是如下場(chǎng)景:
tornado拿到請(qǐng)求后,對(duì)他的處理時(shí)間非常的長(zhǎng),一般大概有10s,晨旭讓我把他寫成異步的,如果只是對(duì)請(qǐng)求異步的是相當(dāng)好寫的。就是十分傳統(tǒng)地在收到請(qǐng)求后立即返回,然后進(jìn)行處理,處理了之后再返回給給定的callback_url
。
但我突然想到,能否對(duì)這整個(gè)處理進(jìn)行異步,將其放到后臺(tái)運(yùn)行,然后繼續(xù)接收請(qǐng)求不至于在請(qǐng)求上堵塞。
最后是沒(méi)實(shí)現(xiàn)出來(lái)……坤爺讓我去花時(shí)間了解一下tornado所著名的異步。于是我才發(fā)現(xiàn),我這想法在tornado的異步中是不可行的。(而且錯(cuò)誤的地方還蠻多的……
異步使用方式
from tornado import gen @gen.coroutine def fetch_coroutine(url): http_client = AsyncHTTPClient() response = yield http_client.fetch(url) raise gen.Return(response.body)
這是最常用也是tornado推薦的方式。不過(guò)這樣寫的前提是py2,py3有些新特性,寫的方式就不太一樣了。
正如代碼中展示的,你需要給這個(gè)函數(shù)加個(gè)gen.coroutine
裝飾器,然后在你想要異步處理的函數(shù)加個(gè)yield
。
說(shuō)實(shí)話,這已經(jīng)有點(diǎn)跟我想的不太一樣了,callback_function
呢?沒(méi)了?emmmmm好吧,這樣寫的確是沒(méi)有的。
關(guān)于協(xié)程
簡(jiǎn)單介紹以下協(xié)程,協(xié)程是線程通過(guò)自己來(lái)調(diào)度任務(wù),保存恢復(fù)上下文,而這樣做的好處就是能減少切換線程或者進(jìn)程的損耗,如果線程數(shù)量極多,但是做的事情不多,簡(jiǎn)單說(shuō)運(yùn)行類型是io密集型的話,可以考慮使用協(xié)程。
因?yàn)槿绻莄pu密集型的話,畢竟本質(zhì)上還是一個(gè)線程,所以會(huì)堵塞到其他的協(xié)程,這與我們的高效目的是相違背的。
另外正式開(kāi)始講之前,我們首先需要明確的是一個(gè)函數(shù)如果帶有yeild
,那么這個(gè)函數(shù)實(shí)質(zhì)上就是一個(gè)生成器。我們直接對(duì)其調(diào)用它返回的也是一個(gè)生成器。所有會(huì)有這樣一句話:
所有生成器都是異步的
實(shí)際上也確實(shí)如此,生成器立即返回,我們想要執(zhí)行器內(nèi)容的話,可以自行調(diào)用next
或者send
(非第一次)
于是python的yield對(duì)于協(xié)程有了天然的支持
源碼解析
注:本文章分析的源碼是 2018.12.24 的 stable 版本
coroutine
協(xié)程,也是 tornado.gen
中的一個(gè)裝飾器
這個(gè)裝飾器其內(nèi)容只有一個(gè) return。而在 tornado.gen
中其實(shí)還有個(gè)裝飾器engine
,它的replace_callback
的 默認(rèn)值為False
return _make_coroutine_wrapper(func, replace_callback=True)
以下是_make_coroutine_wrapper
的源碼,源碼十分緊湊,考慮的很全面,先說(shuō)下大多數(shù)情況
def _make_coroutine_wrapper(func, replace_callback): wrapped = func if hasattr(types, 'coroutine'): func = types.coroutine(func) @functools.wraps(wrapped) def wrapper(*args, **kwargs): future = _create_future() if replace_callback and 'callback' in kwargs: warnings.warn("callback arguments are deprecated, use the returned Future instead", DeprecationWarning, stacklevel=2) callback = kwargs.pop('callback') IOLoop.current().add_future( future, lambda future: callback(future.result())) try: result = func(*args, **kwargs) except (Return, StopIteration) as e: result = _value_from_stopiteration(e) except Exception: future_set_exc_info(future, sys.exc_info()) try: return future finally: # Avoid circular references future = None else: if isinstance(result, GeneratorType): try: orig_stack_contexts = stack_context._state.contexts yielded = next(result) if stack_context._state.contexts is not orig_stack_contexts: yielded = _create_future() yielded.set_exception( stack_context.StackContextInconsistentError( 'stack_context inconsistency (probably caused ' 'by yield within a "with StackContext" block)')) except (StopIteration, Return) as e: future_set_result_unless_cancelled(future, _value_from_stopiteration(e)) except Exception: future_set_exc_info(future, sys.exc_info()) else: runner = Runner(result, future, yielded) # 這一行不太理解,注釋是說(shuō)讓它保持存在 future.add_done_callback(lambda _: runner) yielded = None try: return future finally: future = None future_set_result_unless_cancelled(future, result) return future wrapper.__wrapped__ = wrapped wrapper.__tornado_coroutine__ = True return wrapper
coroutine 裝飾的函數(shù),首先會(huì)生成一個(gè) generator, 并對(duì)其第一次 next 調(diào)用。
這里我們可以注意到,這個(gè)yield
后的函數(shù)無(wú)論如何它都是會(huì)被立刻調(diào)用的。
所以 yield 后的函數(shù)必須也是異步或者耗時(shí)不長(zhǎng)的,才能達(dá)到預(yù)期的異步效果,否則該阻塞的還是會(huì)阻塞。
在 next
調(diào)用后,有個(gè)異常捕獲,在代碼 36L,這個(gè)異常捕獲猶為重要,因?yàn)槲覀冎?nbsp;yield
等式,比如var = yield func()
,var
不會(huì)被賦值成func
的返回。tornado
提供了一個(gè)異常類Return
作為返回,通過(guò)在調(diào)用出捕獲Return
異常,取出其返回值得以實(shí)現(xiàn)。
在第一次的next
后,如果沒(méi)有其他異常,就會(huì)創(chuàng)建一個(gè)Runner
類,這個(gè)Runner
類的作用就是,把其他的代碼通過(guò)yield
不斷暫?;謴?fù),放在ioloop里運(yùn)行。
Future
Future
可以說(shuō)是個(gè)中介,它是用來(lái)溝通coroutine
和ioloop
的,coroutine
返回的都是Future
但其實(shí)最重要的還是管理協(xié)程的暫停與恢復(fù),一個(gè)ioloop中保存著多個(gè)后端運(yùn)行類Runner
類的runner
方法,在ioloop中不斷暫?;謴?fù),而每一個(gè)runner
又都會(huì)綁定一個(gè)future
,只有future
被set_done
了,才表示上一階段已經(jīng)完成并暫停了,可以繼續(xù)恢復(fù)運(yùn)行。
class Future(object): def done(self):# 協(xié)程執(zhí)行完畢并暫停,可對(duì)其恢復(fù) return self._done def result(self, timeout=None): self._check_done() # 如果沒(méi)有 done 拋出異常 return self._result def add_done_callback(self, fn): # 添加回調(diào)函數(shù) if self._done: from tornado.ioloop import IOLoop IOLoop.current().add_callback(fn, self) else: self._callbacks.append(fn) def set_result(self, result): # 設(shè)置result & done self._result = result self._set_done() def _set_done(self): # 將所有回調(diào)函數(shù)放到 ioloop中 self._done = True if self._callbacks: from tornado.ioloop import IOLoop loop = IOLoop.current() for cb in self._callbacks: loop.add_callback(cb, self) self._callbacks = None
IOLoop
IOLoop是在整個(gè)tornado的主事件循環(huán)。按我理解主要做了兩件事
- 執(zhí)行異步的callback
- io復(fù)用
并且這兩件事它是寫死的,它是順序執(zhí)行的,這就直接反駁了我最開(kāi)始的想法:讓兩者并行執(zhí)行
io復(fù)用根據(jù)系統(tǒng)會(huì)自動(dòng)調(diào)整的,具體的我也不再細(xì)說(shuō)。
以下是精簡(jiǎn)之后的源碼
class PollIOLoop(IOLoop): def add_future(self, future, callback): assert is_future(future) callback = stack_context.wrap(callback) future_add_done_callback( future, lambda future: self.add_callback(callback, future)) def add_callback(self, callback, *args, **kwargs): if self._closing: return self._callbacks.append(functools.partial( stack_context.wrap(callback), *args, **kwargs)) # 將其以偏函數(shù)形式保存起來(lái) if thread.get_ident() != self._thread_ident: self._waker.wake() else: pass def start(self): # 這里有一堆初始化操作 while True: ncallbacks = len(self._callbacks) due_timeouts = [] if self._timeouts: now = self.time() while self._timeouts: if self._timeouts[0].callback is None: heapq.heappop(self._timeouts) self._cancellations -= 1 elif self._timeouts[0].deadline <= now: due_timeouts.append(heapq.heappop(self._timeouts)) else: break if (self._cancellations > 512 and self._cancellations > (len(self._timeouts) >> 1)): self._cancellations = 0 self._timeouts = [x for x in self._timeouts if x.callback is not None] heapq.heapify(self._timeouts) for i in range(ncallbacks): self._run_callback(self._callbacks.popleft()) for timeout in due_timeouts: if timeout.callback is not None: self._run_callback(timeout.callback) if not self._running: break # 這里有設(shè)置poll_timeout event_pairs = self._impl.poll(poll_timeout) self._events.update(event_pairs) while self._events: fd, events = self._events.popitem() try: fd_obj, handler_func = self._handlers[fd] handler_func(fd_obj, events) except (OSError, IOError) as e: if errno_from_exception(e) == errno.EPIPE: pass else: self.handle_callback_exception(self._handlers.get(fd)) except Exception: self.handle_callback_exception(self._handlers.get(fd)) fd_obj = handler_func = None # 這里有一些后置處理
Runner
Runner.run
是后置處理的主函數(shù),我在之前也有提到過(guò),它通過(guò)獲取future
的result
,再將其send
以恢復(fù)協(xié)程繼續(xù)執(zhí)行。
如果不能捕獲到任何異常,就說(shuō)明有新的coroutine
,新的coroutine
都是通過(guò)handle_yield
將其放進(jìn)ioloop
class Runner(object): def __init__(self, gen, result_future, first_yielded): self.gen = gen self.result_future = result_future self.future = _null_future self.yield_point = None self.pending_callbacks = None self.results = None self.running = False self.finished = False self.had_exception = False self.io_loop = IOLoop.current() self.stack_context_deactivate = None # 上面一堆不需要看的初始化 if self.handle_yield(first_yielded): gen = result_future = first_yielded = None self.run() def handle_yield(self, yielded): self.future = convert_yielded(yielded) if self.future is moment: self.io_loop.add_callback(self.run) return False elif not self.future.done(): def inner(f): # Break a reference cycle to speed GC. f = None self.run() self.io_loop.add_future( self.future, inner) return False return True def run(self): if self.running or self.finished: return try: self.running = True while True: future = self.future if not future.done(): return self.future = None try: orig_stack_contexts = stack_context._state.contexts exc_info = None try: value = future.result() except Exception: self.had_exception = True exc_info = sys.exc_info() future = None yielded = self.gen.send(value) except (StopIteration, Return) as e: self.finished = True self.future = _null_future if self.pending_callbacks and not self.had_exception: raise LeakedCallbackError( "finished without waiting for callbacks %r" % self.pending_callbacks) future_set_result_unless_cancelled(self.result_future, _value_from_stopiteration(e)) self.result_future = None self._deactivate_stack_context() return except Exception: # 一些結(jié)束操作 return if not self.handle_yield(yielded): return yielded = None finally: self.running = False
以上就是python Tornado異步使用場(chǎng)景源碼解析的詳細(xì)內(nèi)容,更多關(guān)于python Tornado異步的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
pandas實(shí)現(xiàn)導(dǎo)出數(shù)據(jù)的四種方式
這篇文章主要介紹了pandas實(shí)現(xiàn)導(dǎo)出數(shù)據(jù)的四種方式,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-12-12TensorFlow2中提供的幾種處理特征列的方法小結(jié)
本文主要介紹了TensorFlow2中提供的幾種處理特征列的方法小結(jié),主要介紹了6種方式,具有一定的參考價(jià)值,感興趣的可以了解一下2023-09-09Python根據(jù)歐拉角求旋轉(zhuǎn)矩陣的實(shí)例
今天小編就為大家分享一篇Python根據(jù)歐拉角求旋轉(zhuǎn)矩陣的實(shí)例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2019-01-01python下如何讓web元素的生成更簡(jiǎn)單的分析
做web不簡(jiǎn)單,特別是當(dāng)你需要使用一些web效果的時(shí)候, 比如顯示個(gè)圓角矩形,提示框之類的,也許你認(rèn)為很簡(jiǎn)單,好讓我們分析一下:2008-07-07Python特征降維知識(shí)點(diǎn)總結(jié)
在本篇文章里小編給大家整理了一篇關(guān)于Python特征降維知識(shí)點(diǎn)總結(jié)內(nèi)容,有需要的朋友們可以學(xué)習(xí)參考下。2021-08-08