python Tornado異步使用場(chǎng)景源碼解析
前言
tornado在我工作前說(shuō)實(shí)話我還沒(méi)有聽(tīng)說(shuō)過(guò),今年回來(lái)后接觸的非常多。
關(guān)于想要學(xué)習(xí)異步的內(nèi)容,起因是如下場(chǎng)景:
tornado拿到請(qǐng)求后,對(duì)他的處理時(shí)間非常的長(zhǎng),一般大概有10s,晨旭讓我把他寫成異步的,如果只是對(duì)請(qǐng)求異步的是相當(dāng)好寫的。就是十分傳統(tǒng)地在收到請(qǐng)求后立即返回,然后進(jìn)行處理,處理了之后再返回給給定的callback_url。
但我突然想到,能否對(duì)這整個(gè)處理進(jìn)行異步,將其放到后臺(tái)運(yùn)行,然后繼續(xù)接收請(qǐng)求不至于在請(qǐng)求上堵塞。
最后是沒(méi)實(shí)現(xiàn)出來(lái)……坤爺讓我去花時(shí)間了解一下tornado所著名的異步。于是我才發(fā)現(xiàn),我這想法在tornado的異步中是不可行的。(而且錯(cuò)誤的地方還蠻多的……
異步使用方式
from tornado import gen
@gen.coroutine
def fetch_coroutine(url):
http_client = AsyncHTTPClient()
response = yield http_client.fetch(url)
raise gen.Return(response.body)這是最常用也是tornado推薦的方式。不過(guò)這樣寫的前提是py2,py3有些新特性,寫的方式就不太一樣了。
正如代碼中展示的,你需要給這個(gè)函數(shù)加個(gè)gen.coroutine裝飾器,然后在你想要異步處理的函數(shù)加個(gè)yield。
說(shuō)實(shí)話,這已經(jīng)有點(diǎn)跟我想的不太一樣了,callback_function呢?沒(méi)了?emmmmm好吧,這樣寫的確是沒(méi)有的。
關(guān)于協(xié)程
簡(jiǎn)單介紹以下協(xié)程,協(xié)程是線程通過(guò)自己來(lái)調(diào)度任務(wù),保存恢復(fù)上下文,而這樣做的好處就是能減少切換線程或者進(jìn)程的損耗,如果線程數(shù)量極多,但是做的事情不多,簡(jiǎn)單說(shuō)運(yùn)行類型是io密集型的話,可以考慮使用協(xié)程。
因?yàn)槿绻莄pu密集型的話,畢竟本質(zhì)上還是一個(gè)線程,所以會(huì)堵塞到其他的協(xié)程,這與我們的高效目的是相違背的。
另外正式開(kāi)始講之前,我們首先需要明確的是一個(gè)函數(shù)如果帶有yeild,那么這個(gè)函數(shù)實(shí)質(zhì)上就是一個(gè)生成器。我們直接對(duì)其調(diào)用它返回的也是一個(gè)生成器。所有會(huì)有這樣一句話:
所有生成器都是異步的
實(shí)際上也確實(shí)如此,生成器立即返回,我們想要執(zhí)行器內(nèi)容的話,可以自行調(diào)用next或者send(非第一次)
于是python的yield對(duì)于協(xié)程有了天然的支持
源碼解析
注:本文章分析的源碼是 2018.12.24 的 stable 版本
coroutine
協(xié)程,也是 tornado.gen 中的一個(gè)裝飾器
這個(gè)裝飾器其內(nèi)容只有一個(gè) return。而在 tornado.gen 中其實(shí)還有個(gè)裝飾器engine,它的replace_callback 的 默認(rèn)值為False
return _make_coroutine_wrapper(func, replace_callback=True)
以下是_make_coroutine_wrapper 的源碼,源碼十分緊湊,考慮的很全面,先說(shuō)下大多數(shù)情況
def _make_coroutine_wrapper(func, replace_callback):
wrapped = func
if hasattr(types, 'coroutine'):
func = types.coroutine(func)
@functools.wraps(wrapped)
def wrapper(*args, **kwargs):
future = _create_future()
if replace_callback and 'callback' in kwargs:
warnings.warn("callback arguments are deprecated, use the returned Future instead",
DeprecationWarning, stacklevel=2)
callback = kwargs.pop('callback')
IOLoop.current().add_future(
future, lambda future: callback(future.result()))
try:
result = func(*args, **kwargs)
except (Return, StopIteration) as e:
result = _value_from_stopiteration(e)
except Exception:
future_set_exc_info(future, sys.exc_info())
try:
return future
finally:
# Avoid circular references
future = None
else:
if isinstance(result, GeneratorType):
try:
orig_stack_contexts = stack_context._state.contexts
yielded = next(result)
if stack_context._state.contexts is not orig_stack_contexts:
yielded = _create_future()
yielded.set_exception(
stack_context.StackContextInconsistentError(
'stack_context inconsistency (probably caused '
'by yield within a "with StackContext" block)'))
except (StopIteration, Return) as e:
future_set_result_unless_cancelled(future, _value_from_stopiteration(e))
except Exception:
future_set_exc_info(future, sys.exc_info())
else:
runner = Runner(result, future, yielded)
# 這一行不太理解,注釋是說(shuō)讓它保持存在
future.add_done_callback(lambda _: runner)
yielded = None
try:
return future
finally:
future = None
future_set_result_unless_cancelled(future, result)
return future
wrapper.__wrapped__ = wrapped
wrapper.__tornado_coroutine__ = True
return wrappercoroutine 裝飾的函數(shù),首先會(huì)生成一個(gè) generator, 并對(duì)其第一次 next 調(diào)用。
這里我們可以注意到,這個(gè)yield 后的函數(shù)無(wú)論如何它都是會(huì)被立刻調(diào)用的。
所以 yield 后的函數(shù)必須也是異步或者耗時(shí)不長(zhǎng)的,才能達(dá)到預(yù)期的異步效果,否則該阻塞的還是會(huì)阻塞。
在 next調(diào)用后,有個(gè)異常捕獲,在代碼 36L,這個(gè)異常捕獲猶為重要,因?yàn)槲覀冎?nbsp;yield 等式,比如var = yield func(),var不會(huì)被賦值成func的返回。tornado提供了一個(gè)異常類Return 作為返回,通過(guò)在調(diào)用出捕獲Return異常,取出其返回值得以實(shí)現(xiàn)。
在第一次的next后,如果沒(méi)有其他異常,就會(huì)創(chuàng)建一個(gè)Runner類,這個(gè)Runner類的作用就是,把其他的代碼通過(guò)yield不斷暫?;謴?fù),放在ioloop里運(yùn)行。
Future
Future可以說(shuō)是個(gè)中介,它是用來(lái)溝通coroutine和ioloop的,coroutine返回的都是Future
但其實(shí)最重要的還是管理協(xié)程的暫停與恢復(fù),一個(gè)ioloop中保存著多個(gè)后端運(yùn)行類Runner類的runner方法,在ioloop中不斷暫?;謴?fù),而每一個(gè)runner又都會(huì)綁定一個(gè)future,只有future被set_done了,才表示上一階段已經(jīng)完成并暫停了,可以繼續(xù)恢復(fù)運(yùn)行。
class Future(object):
def done(self):# 協(xié)程執(zhí)行完畢并暫停,可對(duì)其恢復(fù)
return self._done
def result(self, timeout=None):
self._check_done() # 如果沒(méi)有 done 拋出異常
return self._result
def add_done_callback(self, fn): # 添加回調(diào)函數(shù)
if self._done:
from tornado.ioloop import IOLoop
IOLoop.current().add_callback(fn, self)
else:
self._callbacks.append(fn)
def set_result(self, result): # 設(shè)置result & done
self._result = result
self._set_done()
def _set_done(self): # 將所有回調(diào)函數(shù)放到 ioloop中
self._done = True
if self._callbacks:
from tornado.ioloop import IOLoop
loop = IOLoop.current()
for cb in self._callbacks:
loop.add_callback(cb, self)
self._callbacks = NoneIOLoop
IOLoop是在整個(gè)tornado的主事件循環(huán)。按我理解主要做了兩件事
- 執(zhí)行異步的callback
- io復(fù)用
并且這兩件事它是寫死的,它是順序執(zhí)行的,這就直接反駁了我最開(kāi)始的想法:讓兩者并行執(zhí)行
io復(fù)用根據(jù)系統(tǒng)會(huì)自動(dòng)調(diào)整的,具體的我也不再細(xì)說(shuō)。
以下是精簡(jiǎn)之后的源碼
class PollIOLoop(IOLoop):
def add_future(self, future, callback):
assert is_future(future)
callback = stack_context.wrap(callback)
future_add_done_callback(
future, lambda future: self.add_callback(callback, future))
def add_callback(self, callback, *args, **kwargs):
if self._closing:
return
self._callbacks.append(functools.partial(
stack_context.wrap(callback), *args, **kwargs)) # 將其以偏函數(shù)形式保存起來(lái)
if thread.get_ident() != self._thread_ident:
self._waker.wake()
else:
pass
def start(self):
# 這里有一堆初始化操作
while True:
ncallbacks = len(self._callbacks)
due_timeouts = []
if self._timeouts:
now = self.time()
while self._timeouts:
if self._timeouts[0].callback is None:
heapq.heappop(self._timeouts)
self._cancellations -= 1
elif self._timeouts[0].deadline <= now:
due_timeouts.append(heapq.heappop(self._timeouts))
else:
break
if (self._cancellations > 512 and
self._cancellations > (len(self._timeouts) >> 1)):
self._cancellations = 0
self._timeouts = [x for x in self._timeouts
if x.callback is not None]
heapq.heapify(self._timeouts)
for i in range(ncallbacks):
self._run_callback(self._callbacks.popleft())
for timeout in due_timeouts:
if timeout.callback is not None:
self._run_callback(timeout.callback)
if not self._running:
break
# 這里有設(shè)置poll_timeout
event_pairs = self._impl.poll(poll_timeout)
self._events.update(event_pairs)
while self._events:
fd, events = self._events.popitem()
try:
fd_obj, handler_func = self._handlers[fd]
handler_func(fd_obj, events)
except (OSError, IOError) as e:
if errno_from_exception(e) == errno.EPIPE:
pass
else:
self.handle_callback_exception(self._handlers.get(fd))
except Exception:
self.handle_callback_exception(self._handlers.get(fd))
fd_obj = handler_func = None
# 這里有一些后置處理Runner
Runner.run是后置處理的主函數(shù),我在之前也有提到過(guò),它通過(guò)獲取future的result,再將其send以恢復(fù)協(xié)程繼續(xù)執(zhí)行。
如果不能捕獲到任何異常,就說(shuō)明有新的coroutine,新的coroutine都是通過(guò)handle_yield將其放進(jìn)ioloop
class Runner(object):
def __init__(self, gen, result_future, first_yielded):
self.gen = gen
self.result_future = result_future
self.future = _null_future
self.yield_point = None
self.pending_callbacks = None
self.results = None
self.running = False
self.finished = False
self.had_exception = False
self.io_loop = IOLoop.current()
self.stack_context_deactivate = None
# 上面一堆不需要看的初始化
if self.handle_yield(first_yielded):
gen = result_future = first_yielded = None
self.run()
def handle_yield(self, yielded):
self.future = convert_yielded(yielded)
if self.future is moment:
self.io_loop.add_callback(self.run)
return False
elif not self.future.done():
def inner(f):
# Break a reference cycle to speed GC.
f = None
self.run()
self.io_loop.add_future(
self.future, inner)
return False
return True
def run(self):
if self.running or self.finished:
return
try:
self.running = True
while True:
future = self.future
if not future.done():
return
self.future = None
try:
orig_stack_contexts = stack_context._state.contexts
exc_info = None
try:
value = future.result()
except Exception:
self.had_exception = True
exc_info = sys.exc_info()
future = None
yielded = self.gen.send(value)
except (StopIteration, Return) as e:
self.finished = True
self.future = _null_future
if self.pending_callbacks and not self.had_exception:
raise LeakedCallbackError(
"finished without waiting for callbacks %r" %
self.pending_callbacks)
future_set_result_unless_cancelled(self.result_future,
_value_from_stopiteration(e))
self.result_future = None
self._deactivate_stack_context()
return
except Exception:
# 一些結(jié)束操作
return
if not self.handle_yield(yielded):
return
yielded = None
finally:
self.running = False以上就是python Tornado異步使用場(chǎng)景源碼解析的詳細(xì)內(nèi)容,更多關(guān)于python Tornado異步的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
pandas實(shí)現(xiàn)導(dǎo)出數(shù)據(jù)的四種方式
這篇文章主要介紹了pandas實(shí)現(xiàn)導(dǎo)出數(shù)據(jù)的四種方式,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-12-12
TensorFlow2中提供的幾種處理特征列的方法小結(jié)
本文主要介紹了TensorFlow2中提供的幾種處理特征列的方法小結(jié),主要介紹了6種方式,具有一定的參考價(jià)值,感興趣的可以了解一下2023-09-09
Python根據(jù)歐拉角求旋轉(zhuǎn)矩陣的實(shí)例
今天小編就為大家分享一篇Python根據(jù)歐拉角求旋轉(zhuǎn)矩陣的實(shí)例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2019-01-01
python下如何讓web元素的生成更簡(jiǎn)單的分析
做web不簡(jiǎn)單,特別是當(dāng)你需要使用一些web效果的時(shí)候, 比如顯示個(gè)圓角矩形,提示框之類的,也許你認(rèn)為很簡(jiǎn)單,好讓我們分析一下:2008-07-07
Python特征降維知識(shí)點(diǎn)總結(jié)
在本篇文章里小編給大家整理了一篇關(guān)于Python特征降維知識(shí)點(diǎn)總結(jié)內(nèi)容,有需要的朋友們可以學(xué)習(xí)參考下。2021-08-08

