欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

python Tornado異步使用場(chǎng)景源碼解析

 更新時(shí)間:2023年09月08日 09:17:36   作者:NodeKey  
這篇文章主要為大家介紹了python Tornado異步使用場(chǎng)景源碼解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

前言

tornado在我工作前說(shuō)實(shí)話我還沒(méi)有聽(tīng)說(shuō)過(guò),今年回來(lái)后接觸的非常多。

關(guān)于想要學(xué)習(xí)異步的內(nèi)容,起因是如下場(chǎng)景:

tornado拿到請(qǐng)求后,對(duì)他的處理時(shí)間非常的長(zhǎng),一般大概有10s,晨旭讓我把他寫成異步的,如果只是對(duì)請(qǐng)求異步的是相當(dāng)好寫的。就是十分傳統(tǒng)地在收到請(qǐng)求后立即返回,然后進(jìn)行處理,處理了之后再返回給給定的callback_url。

但我突然想到,能否對(duì)這整個(gè)處理進(jìn)行異步,將其放到后臺(tái)運(yùn)行,然后繼續(xù)接收請(qǐng)求不至于在請(qǐng)求上堵塞。

最后是沒(méi)實(shí)現(xiàn)出來(lái)……坤爺讓我去花時(shí)間了解一下tornado所著名的異步。于是我才發(fā)現(xiàn),我這想法在tornado的異步中是不可行的。(而且錯(cuò)誤的地方還蠻多的……

異步使用方式

from tornado import gen
@gen.coroutine
def fetch_coroutine(url):
    http_client = AsyncHTTPClient()
    response = yield http_client.fetch(url)
    raise gen.Return(response.body)

這是最常用也是tornado推薦的方式。不過(guò)這樣寫的前提是py2,py3有些新特性,寫的方式就不太一樣了。

正如代碼中展示的,你需要給這個(gè)函數(shù)加個(gè)gen.coroutine裝飾器,然后在你想要異步處理的函數(shù)加個(gè)yield。

說(shuō)實(shí)話,這已經(jīng)有點(diǎn)跟我想的不太一樣了,callback_function呢?沒(méi)了?emmmmm好吧,這樣寫的確是沒(méi)有的。

關(guān)于協(xié)程

簡(jiǎn)單介紹以下協(xié)程,協(xié)程是線程通過(guò)自己來(lái)調(diào)度任務(wù),保存恢復(fù)上下文,而這樣做的好處就是能減少切換線程或者進(jìn)程的損耗,如果線程數(shù)量極多,但是做的事情不多,簡(jiǎn)單說(shuō)運(yùn)行類型是io密集型的話,可以考慮使用協(xié)程。
因?yàn)槿绻莄pu密集型的話,畢竟本質(zhì)上還是一個(gè)線程,所以會(huì)堵塞到其他的協(xié)程,這與我們的高效目的是相違背的。

另外正式開(kāi)始講之前,我們首先需要明確的是一個(gè)函數(shù)如果帶有yeild,那么這個(gè)函數(shù)實(shí)質(zhì)上就是一個(gè)生成器。我們直接對(duì)其調(diào)用它返回的也是一個(gè)生成器。所有會(huì)有這樣一句話:

所有生成器都是異步的

實(shí)際上也確實(shí)如此,生成器立即返回,我們想要執(zhí)行器內(nèi)容的話,可以自行調(diào)用next或者send(非第一次)

于是python的yield對(duì)于協(xié)程有了天然的支持

源碼解析

注:本文章分析的源碼是 2018.12.24 的 stable 版本

coroutine

協(xié)程,也是 tornado.gen 中的一個(gè)裝飾器

這個(gè)裝飾器其內(nèi)容只有一個(gè) return。而在 tornado.gen 中其實(shí)還有個(gè)裝飾器engine,它的replace_callback 的 默認(rèn)值為False

return _make_coroutine_wrapper(func, replace_callback=True)

以下是_make_coroutine_wrapper 的源碼,源碼十分緊湊,考慮的很全面,先說(shuō)下大多數(shù)情況

def _make_coroutine_wrapper(func, replace_callback):
    wrapped = func
    if hasattr(types, 'coroutine'):
        func = types.coroutine(func)
    @functools.wraps(wrapped)
    def wrapper(*args, **kwargs):
        future = _create_future()
        if replace_callback and 'callback' in kwargs:
            warnings.warn("callback arguments are deprecated, use the returned Future instead",
                          DeprecationWarning, stacklevel=2)
            callback = kwargs.pop('callback')
            IOLoop.current().add_future(
                future, lambda future: callback(future.result()))
        try:
            result = func(*args, **kwargs)
        except (Return, StopIteration) as e:
            result = _value_from_stopiteration(e)
        except Exception:
            future_set_exc_info(future, sys.exc_info())
            try:
                return future
            finally:
                # Avoid circular references
                future = None
        else:
            if isinstance(result, GeneratorType):
                try:
                    orig_stack_contexts = stack_context._state.contexts
                    yielded = next(result)
                    if stack_context._state.contexts is not orig_stack_contexts:
                        yielded = _create_future()
                        yielded.set_exception(
                            stack_context.StackContextInconsistentError(
                                'stack_context inconsistency (probably caused '
                                'by yield within a "with StackContext" block)'))
                except (StopIteration, Return) as e:
                    future_set_result_unless_cancelled(future, _value_from_stopiteration(e))
                except Exception:
                    future_set_exc_info(future, sys.exc_info())
                else:
                    runner = Runner(result, future, yielded)
                    # 這一行不太理解,注釋是說(shuō)讓它保持存在
                    future.add_done_callback(lambda _: runner)
                yielded = None
                try:
                    return future
                finally:
                    future = None
        future_set_result_unless_cancelled(future, result)
        return future
    wrapper.__wrapped__ = wrapped
    wrapper.__tornado_coroutine__ = True
    return wrapper

coroutine 裝飾的函數(shù),首先會(huì)生成一個(gè) generator, 并對(duì)其第一次 next 調(diào)用。

這里我們可以注意到,這個(gè)yield 后的函數(shù)無(wú)論如何它都是會(huì)被立刻調(diào)用的。

所以 yield 后的函數(shù)必須也是異步或者耗時(shí)不長(zhǎng)的,才能達(dá)到預(yù)期的異步效果,否則該阻塞的還是會(huì)阻塞。

在 next調(diào)用后,有個(gè)異常捕獲,在代碼 36L,這個(gè)異常捕獲猶為重要,因?yàn)槲覀冎?nbsp;yield 等式,比如var = yield func(),var不會(huì)被賦值成func的返回。tornado提供了一個(gè)異常類Return 作為返回,通過(guò)在調(diào)用出捕獲Return異常,取出其返回值得以實(shí)現(xiàn)。

在第一次的next后,如果沒(méi)有其他異常,就會(huì)創(chuàng)建一個(gè)Runner類,這個(gè)Runner類的作用就是,把其他的代碼通過(guò)yield不斷暫?;謴?fù),放在ioloop里運(yùn)行。

Future

Future可以說(shuō)是個(gè)中介,它是用來(lái)溝通coroutineioloop的,coroutine返回的都是Future

但其實(shí)最重要的還是管理協(xié)程的暫停與恢復(fù),一個(gè)ioloop中保存著多個(gè)后端運(yùn)行類Runner類的runner方法,在ioloop中不斷暫?;謴?fù),而每一個(gè)runner又都會(huì)綁定一個(gè)future,只有futureset_done了,才表示上一階段已經(jīng)完成并暫停了,可以繼續(xù)恢復(fù)運(yùn)行。

class Future(object):
    def done(self):# 協(xié)程執(zhí)行完畢并暫停,可對(duì)其恢復(fù)
        return self._done
    def result(self, timeout=None):
        self._check_done() # 如果沒(méi)有 done 拋出異常 
        return self._result
    def add_done_callback(self, fn): # 添加回調(diào)函數(shù)
        if self._done:
            from tornado.ioloop import IOLoop
            IOLoop.current().add_callback(fn, self)
        else:
            self._callbacks.append(fn)
    def set_result(self, result): # 設(shè)置result & done
        self._result = result
        self._set_done()
    def _set_done(self): # 將所有回調(diào)函數(shù)放到 ioloop中
        self._done = True
        if self._callbacks:
            from tornado.ioloop import IOLoop
            loop = IOLoop.current()
            for cb in self._callbacks:
                loop.add_callback(cb, self)
            self._callbacks = None

IOLoop

IOLoop是在整個(gè)tornado的主事件循環(huán)。按我理解主要做了兩件事

  • 執(zhí)行異步的callback
  • io復(fù)用

并且這兩件事它是寫死的,它是順序執(zhí)行的,這就直接反駁了我最開(kāi)始的想法:讓兩者并行執(zhí)行
io復(fù)用根據(jù)系統(tǒng)會(huì)自動(dòng)調(diào)整的,具體的我也不再細(xì)說(shuō)。

以下是精簡(jiǎn)之后的源碼

class PollIOLoop(IOLoop):
    def add_future(self, future, callback):
        assert is_future(future)
        callback = stack_context.wrap(callback)
        future_add_done_callback(
            future, lambda future: self.add_callback(callback, future))
    def add_callback(self, callback, *args, **kwargs):
        if self._closing:
            return
        self._callbacks.append(functools.partial(
            stack_context.wrap(callback), *args, **kwargs)) # 將其以偏函數(shù)形式保存起來(lái)
        if thread.get_ident() != self._thread_ident:
            self._waker.wake()
        else:
            pass
    def start(self):
        # 這里有一堆初始化操作
        while True:
            ncallbacks = len(self._callbacks)
            due_timeouts = []
            if self._timeouts:
            	now = self.time()
                while self._timeouts:
                    if self._timeouts[0].callback is None:
                    	heapq.heappop(self._timeouts)
                        self._cancellations -= 1
                    elif self._timeouts[0].deadline <= now:
                        due_timeouts.append(heapq.heappop(self._timeouts))
                    else:
                        break
                if (self._cancellations > 512 and
                        self._cancellations > (len(self._timeouts) >> 1)):
                    self._cancellations = 0
                    self._timeouts = [x for x in self._timeouts
                                      if x.callback is not None]
                    heapq.heapify(self._timeouts)
            for i in range(ncallbacks):
            	self._run_callback(self._callbacks.popleft())
            for timeout in due_timeouts:
                if timeout.callback is not None:
                    self._run_callback(timeout.callback)
            if not self._running:
                break
            # 這里有設(shè)置poll_timeout
            event_pairs = self._impl.poll(poll_timeout)
            self._events.update(event_pairs)
            while self._events:
                fd, events = self._events.popitem()
                try:
                    fd_obj, handler_func = self._handlers[fd]
                    handler_func(fd_obj, events)
                except (OSError, IOError) as e:
                    if errno_from_exception(e) == errno.EPIPE:
                        pass
                    else:
                        self.handle_callback_exception(self._handlers.get(fd))
                except Exception:
                    self.handle_callback_exception(self._handlers.get(fd))
            fd_obj = handler_func = None
            # 這里有一些后置處理

Runner

Runner.run是后置處理的主函數(shù),我在之前也有提到過(guò),它通過(guò)獲取futureresult,再將其send以恢復(fù)協(xié)程繼續(xù)執(zhí)行。

如果不能捕獲到任何異常,就說(shuō)明有新的coroutine,新的coroutine都是通過(guò)handle_yield將其放進(jìn)ioloop

class Runner(object):
    def __init__(self, gen, result_future, first_yielded):
        self.gen = gen
        self.result_future = result_future
        self.future = _null_future
        self.yield_point = None
        self.pending_callbacks = None
        self.results = None
        self.running = False
        self.finished = False
        self.had_exception = False
        self.io_loop = IOLoop.current()
        self.stack_context_deactivate = None
        # 上面一堆不需要看的初始化
        if self.handle_yield(first_yielded):
            gen = result_future = first_yielded = None
            self.run()
	def handle_yield(self, yielded):
        self.future = convert_yielded(yielded)
        if self.future is moment:
            self.io_loop.add_callback(self.run)
            return False
        elif not self.future.done():
            def inner(f):
                # Break a reference cycle to speed GC.
                f = None
                self.run()
            self.io_loop.add_future(
                self.future, inner)
            return False
        return True
    def run(self):
        if self.running or self.finished:
            return
        try:
            self.running = True
            while True:
                future = self.future
                if not future.done():
                    return
                self.future = None
                try:
                    orig_stack_contexts = stack_context._state.contexts
                    exc_info = None
                    try:
                        value = future.result()
                    except Exception:
                        self.had_exception = True
                        exc_info = sys.exc_info()
                    future = None
                    yielded = self.gen.send(value)
                except (StopIteration, Return) as e:
                    self.finished = True
                    self.future = _null_future
                    if self.pending_callbacks and not self.had_exception:
                        raise LeakedCallbackError(
                            "finished without waiting for callbacks %r" %
                            self.pending_callbacks)
                    future_set_result_unless_cancelled(self.result_future,
_value_from_stopiteration(e))
                    self.result_future = None
                    self._deactivate_stack_context()
                    return
                except Exception:
                    # 一些結(jié)束操作
                    return
                if not self.handle_yield(yielded):
                    return
                yielded = None
        finally:
            self.running = False

以上就是python Tornado異步使用場(chǎng)景源碼解析的詳細(xì)內(nèi)容,更多關(guān)于python Tornado異步的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Python生成隨機(jī)密碼

    Python生成隨機(jī)密碼

    這篇文章主要介紹了Python生成隨機(jī)密碼的代碼分享,由于是新手,僅僅是簡(jiǎn)單的實(shí)現(xiàn),未作任何其他處理,小伙伴們自己參考下吧。
    2015-03-03
  • pandas實(shí)現(xiàn)導(dǎo)出數(shù)據(jù)的四種方式

    pandas實(shí)現(xiàn)導(dǎo)出數(shù)據(jù)的四種方式

    這篇文章主要介紹了pandas實(shí)現(xiàn)導(dǎo)出數(shù)據(jù)的四種方式,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2020-12-12
  • 基于Python制作三款起床鬧鐘的示例代碼

    基于Python制作三款起床鬧鐘的示例代碼

    每天上班最痛苦的事情就是早起早起早起!這是大部分上班族的痛苦,但是不上班又是不可能的啦,因?yàn)槎际菫榱烁沐X。本文用Python制作了三款有趣的鬧鐘,感興趣的可以學(xué)習(xí)一下
    2022-05-05
  • TensorFlow2中提供的幾種處理特征列的方法小結(jié)

    TensorFlow2中提供的幾種處理特征列的方法小結(jié)

    本文主要介紹了TensorFlow2中提供的幾種處理特征列的方法小結(jié),主要介紹了6種方式,具有一定的參考價(jià)值,感興趣的可以了解一下
    2023-09-09
  • Python轉(zhuǎn)換時(shí)間的圖文方法

    Python轉(zhuǎn)換時(shí)間的圖文方法

    在本篇文章里小編給大家整理的是關(guān)于Python轉(zhuǎn)換時(shí)間的方法以及具體步驟流程,需要的朋友們參考下。
    2019-07-07
  • Python根據(jù)歐拉角求旋轉(zhuǎn)矩陣的實(shí)例

    Python根據(jù)歐拉角求旋轉(zhuǎn)矩陣的實(shí)例

    今天小編就為大家分享一篇Python根據(jù)歐拉角求旋轉(zhuǎn)矩陣的實(shí)例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2019-01-01
  • python下如何讓web元素的生成更簡(jiǎn)單的分析

    python下如何讓web元素的生成更簡(jiǎn)單的分析

    做web不簡(jiǎn)單,特別是當(dāng)你需要使用一些web效果的時(shí)候, 比如顯示個(gè)圓角矩形,提示框之類的,也許你認(rèn)為很簡(jiǎn)單,好讓我們分析一下:
    2008-07-07
  • python的re模塊應(yīng)用實(shí)例

    python的re模塊應(yīng)用實(shí)例

    這篇文章主要介紹了python的re模塊應(yīng)用實(shí)例,包括了常見(jiàn)的正則匹配技巧,需要的朋友可以參考下
    2014-09-09
  • Python特征降維知識(shí)點(diǎn)總結(jié)

    Python特征降維知識(shí)點(diǎn)總結(jié)

    在本篇文章里小編給大家整理了一篇關(guān)于Python特征降維知識(shí)點(diǎn)總結(jié)內(nèi)容,有需要的朋友們可以學(xué)習(xí)參考下。
    2021-08-08
  • 關(guān)于Python中的排列組合生成器詳解

    關(guān)于Python中的排列組合生成器詳解

    這篇文章主要介紹了關(guān)于Python中的排列組合生成器詳解,在Python的內(nèi)置模塊?functools中,提供了高階類?product()?,用于實(shí)現(xiàn)多個(gè)可迭代對(duì)象中元素的組合,返回可迭代對(duì)象中元素組合的笛卡爾積,效果相當(dāng)于嵌套的循環(huán),需要的朋友可以參考下
    2023-07-07

最新評(píng)論