python Tornado異步使用場景源碼解析
前言
tornado在我工作前說實話我還沒有聽說過,今年回來后接觸的非常多。
關(guān)于想要學習異步的內(nèi)容,起因是如下場景:
tornado拿到請求后,對他的處理時間非常的長,一般大概有10s,晨旭讓我把他寫成異步的,如果只是對請求異步的是相當好寫的。就是十分傳統(tǒng)地在收到請求后立即返回,然后進行處理,處理了之后再返回給給定的callback_url。
但我突然想到,能否對這整個處理進行異步,將其放到后臺運行,然后繼續(xù)接收請求不至于在請求上堵塞。
最后是沒實現(xiàn)出來……坤爺讓我去花時間了解一下tornado所著名的異步。于是我才發(fā)現(xiàn),我這想法在tornado的異步中是不可行的。(而且錯誤的地方還蠻多的……
異步使用方式
from tornado import gen
@gen.coroutine
def fetch_coroutine(url):
http_client = AsyncHTTPClient()
response = yield http_client.fetch(url)
raise gen.Return(response.body)這是最常用也是tornado推薦的方式。不過這樣寫的前提是py2,py3有些新特性,寫的方式就不太一樣了。
正如代碼中展示的,你需要給這個函數(shù)加個gen.coroutine裝飾器,然后在你想要異步處理的函數(shù)加個yield。
說實話,這已經(jīng)有點跟我想的不太一樣了,callback_function呢?沒了?emmmmm好吧,這樣寫的確是沒有的。
關(guān)于協(xié)程
簡單介紹以下協(xié)程,協(xié)程是線程通過自己來調(diào)度任務,保存恢復上下文,而這樣做的好處就是能減少切換線程或者進程的損耗,如果線程數(shù)量極多,但是做的事情不多,簡單說運行類型是io密集型的話,可以考慮使用協(xié)程。
因為如果是cpu密集型的話,畢竟本質(zhì)上還是一個線程,所以會堵塞到其他的協(xié)程,這與我們的高效目的是相違背的。
另外正式開始講之前,我們首先需要明確的是一個函數(shù)如果帶有yeild,那么這個函數(shù)實質(zhì)上就是一個生成器。我們直接對其調(diào)用它返回的也是一個生成器。所有會有這樣一句話:
所有生成器都是異步的
實際上也確實如此,生成器立即返回,我們想要執(zhí)行器內(nèi)容的話,可以自行調(diào)用next或者send(非第一次)
于是python的yield對于協(xié)程有了天然的支持
源碼解析
注:本文章分析的源碼是 2018.12.24 的 stable 版本
coroutine
協(xié)程,也是 tornado.gen 中的一個裝飾器
這個裝飾器其內(nèi)容只有一個 return。而在 tornado.gen 中其實還有個裝飾器engine,它的replace_callback 的 默認值為False
return _make_coroutine_wrapper(func, replace_callback=True)
以下是_make_coroutine_wrapper 的源碼,源碼十分緊湊,考慮的很全面,先說下大多數(shù)情況
def _make_coroutine_wrapper(func, replace_callback):
wrapped = func
if hasattr(types, 'coroutine'):
func = types.coroutine(func)
@functools.wraps(wrapped)
def wrapper(*args, **kwargs):
future = _create_future()
if replace_callback and 'callback' in kwargs:
warnings.warn("callback arguments are deprecated, use the returned Future instead",
DeprecationWarning, stacklevel=2)
callback = kwargs.pop('callback')
IOLoop.current().add_future(
future, lambda future: callback(future.result()))
try:
result = func(*args, **kwargs)
except (Return, StopIteration) as e:
result = _value_from_stopiteration(e)
except Exception:
future_set_exc_info(future, sys.exc_info())
try:
return future
finally:
# Avoid circular references
future = None
else:
if isinstance(result, GeneratorType):
try:
orig_stack_contexts = stack_context._state.contexts
yielded = next(result)
if stack_context._state.contexts is not orig_stack_contexts:
yielded = _create_future()
yielded.set_exception(
stack_context.StackContextInconsistentError(
'stack_context inconsistency (probably caused '
'by yield within a "with StackContext" block)'))
except (StopIteration, Return) as e:
future_set_result_unless_cancelled(future, _value_from_stopiteration(e))
except Exception:
future_set_exc_info(future, sys.exc_info())
else:
runner = Runner(result, future, yielded)
# 這一行不太理解,注釋是說讓它保持存在
future.add_done_callback(lambda _: runner)
yielded = None
try:
return future
finally:
future = None
future_set_result_unless_cancelled(future, result)
return future
wrapper.__wrapped__ = wrapped
wrapper.__tornado_coroutine__ = True
return wrappercoroutine 裝飾的函數(shù),首先會生成一個 generator, 并對其第一次 next 調(diào)用。
這里我們可以注意到,這個yield 后的函數(shù)無論如何它都是會被立刻調(diào)用的。
所以 yield 后的函數(shù)必須也是異步或者耗時不長的,才能達到預期的異步效果,否則該阻塞的還是會阻塞。
在 next調(diào)用后,有個異常捕獲,在代碼 36L,這個異常捕獲猶為重要,因為我們知道 yield 等式,比如var = yield func(),var不會被賦值成func的返回。tornado提供了一個異常類Return 作為返回,通過在調(diào)用出捕獲Return異常,取出其返回值得以實現(xiàn)。
在第一次的next后,如果沒有其他異常,就會創(chuàng)建一個Runner類,這個Runner類的作用就是,把其他的代碼通過yield不斷暫?;謴?,放在ioloop里運行。
Future
Future可以說是個中介,它是用來溝通coroutine和ioloop的,coroutine返回的都是Future
但其實最重要的還是管理協(xié)程的暫停與恢復,一個ioloop中保存著多個后端運行類Runner類的runner方法,在ioloop中不斷暫停恢復,而每一個runner又都會綁定一個future,只有future被set_done了,才表示上一階段已經(jīng)完成并暫停了,可以繼續(xù)恢復運行。
class Future(object):
def done(self):# 協(xié)程執(zhí)行完畢并暫停,可對其恢復
return self._done
def result(self, timeout=None):
self._check_done() # 如果沒有 done 拋出異常
return self._result
def add_done_callback(self, fn): # 添加回調(diào)函數(shù)
if self._done:
from tornado.ioloop import IOLoop
IOLoop.current().add_callback(fn, self)
else:
self._callbacks.append(fn)
def set_result(self, result): # 設置result & done
self._result = result
self._set_done()
def _set_done(self): # 將所有回調(diào)函數(shù)放到 ioloop中
self._done = True
if self._callbacks:
from tornado.ioloop import IOLoop
loop = IOLoop.current()
for cb in self._callbacks:
loop.add_callback(cb, self)
self._callbacks = NoneIOLoop
IOLoop是在整個tornado的主事件循環(huán)。按我理解主要做了兩件事
- 執(zhí)行異步的callback
- io復用
并且這兩件事它是寫死的,它是順序執(zhí)行的,這就直接反駁了我最開始的想法:讓兩者并行執(zhí)行
io復用根據(jù)系統(tǒng)會自動調(diào)整的,具體的我也不再細說。
以下是精簡之后的源碼
class PollIOLoop(IOLoop):
def add_future(self, future, callback):
assert is_future(future)
callback = stack_context.wrap(callback)
future_add_done_callback(
future, lambda future: self.add_callback(callback, future))
def add_callback(self, callback, *args, **kwargs):
if self._closing:
return
self._callbacks.append(functools.partial(
stack_context.wrap(callback), *args, **kwargs)) # 將其以偏函數(shù)形式保存起來
if thread.get_ident() != self._thread_ident:
self._waker.wake()
else:
pass
def start(self):
# 這里有一堆初始化操作
while True:
ncallbacks = len(self._callbacks)
due_timeouts = []
if self._timeouts:
now = self.time()
while self._timeouts:
if self._timeouts[0].callback is None:
heapq.heappop(self._timeouts)
self._cancellations -= 1
elif self._timeouts[0].deadline <= now:
due_timeouts.append(heapq.heappop(self._timeouts))
else:
break
if (self._cancellations > 512 and
self._cancellations > (len(self._timeouts) >> 1)):
self._cancellations = 0
self._timeouts = [x for x in self._timeouts
if x.callback is not None]
heapq.heapify(self._timeouts)
for i in range(ncallbacks):
self._run_callback(self._callbacks.popleft())
for timeout in due_timeouts:
if timeout.callback is not None:
self._run_callback(timeout.callback)
if not self._running:
break
# 這里有設置poll_timeout
event_pairs = self._impl.poll(poll_timeout)
self._events.update(event_pairs)
while self._events:
fd, events = self._events.popitem()
try:
fd_obj, handler_func = self._handlers[fd]
handler_func(fd_obj, events)
except (OSError, IOError) as e:
if errno_from_exception(e) == errno.EPIPE:
pass
else:
self.handle_callback_exception(self._handlers.get(fd))
except Exception:
self.handle_callback_exception(self._handlers.get(fd))
fd_obj = handler_func = None
# 這里有一些后置處理Runner
Runner.run是后置處理的主函數(shù),我在之前也有提到過,它通過獲取future的result,再將其send以恢復協(xié)程繼續(xù)執(zhí)行。
如果不能捕獲到任何異常,就說明有新的coroutine,新的coroutine都是通過handle_yield將其放進ioloop
class Runner(object):
def __init__(self, gen, result_future, first_yielded):
self.gen = gen
self.result_future = result_future
self.future = _null_future
self.yield_point = None
self.pending_callbacks = None
self.results = None
self.running = False
self.finished = False
self.had_exception = False
self.io_loop = IOLoop.current()
self.stack_context_deactivate = None
# 上面一堆不需要看的初始化
if self.handle_yield(first_yielded):
gen = result_future = first_yielded = None
self.run()
def handle_yield(self, yielded):
self.future = convert_yielded(yielded)
if self.future is moment:
self.io_loop.add_callback(self.run)
return False
elif not self.future.done():
def inner(f):
# Break a reference cycle to speed GC.
f = None
self.run()
self.io_loop.add_future(
self.future, inner)
return False
return True
def run(self):
if self.running or self.finished:
return
try:
self.running = True
while True:
future = self.future
if not future.done():
return
self.future = None
try:
orig_stack_contexts = stack_context._state.contexts
exc_info = None
try:
value = future.result()
except Exception:
self.had_exception = True
exc_info = sys.exc_info()
future = None
yielded = self.gen.send(value)
except (StopIteration, Return) as e:
self.finished = True
self.future = _null_future
if self.pending_callbacks and not self.had_exception:
raise LeakedCallbackError(
"finished without waiting for callbacks %r" %
self.pending_callbacks)
future_set_result_unless_cancelled(self.result_future,
_value_from_stopiteration(e))
self.result_future = None
self._deactivate_stack_context()
return
except Exception:
# 一些結(jié)束操作
return
if not self.handle_yield(yielded):
return
yielded = None
finally:
self.running = False以上就是python Tornado異步使用場景源碼解析的詳細內(nèi)容,更多關(guān)于python Tornado異步的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
pandas實現(xiàn)導出數(shù)據(jù)的四種方式
這篇文章主要介紹了pandas實現(xiàn)導出數(shù)據(jù)的四種方式,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-12-12
TensorFlow2中提供的幾種處理特征列的方法小結(jié)
本文主要介紹了TensorFlow2中提供的幾種處理特征列的方法小結(jié),主要介紹了6種方式,具有一定的參考價值,感興趣的可以了解一下2023-09-09
Python根據(jù)歐拉角求旋轉(zhuǎn)矩陣的實例
今天小編就為大家分享一篇Python根據(jù)歐拉角求旋轉(zhuǎn)矩陣的實例,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-01-01

