欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Python中Async語法協(xié)程的實現(xiàn)

 更新時間:2022年06月27日 11:44:56   作者:So1n  
這篇文章主要介紹了Python中Async語法協(xié)程的實現(xiàn),文章圍繞主題展開詳細的內容介紹,具有一定的參考價值,需要的小伙伴可以參考一下

前言

在io比較多的場景中, Async語法編寫的程序會以更少的時間, 更少的資源來完成相同的任務, 這篇文章則是介紹了PythonAsync語法的協(xié)程是如何實現(xiàn)的。

1.傳統(tǒng)的Sync語法請求例子

還是一樣, 在了解Async語法的實現(xiàn)之前, 先從一個Sync的語法例子開始, 現(xiàn)在假設有一個HTTP請求, 這個程序會通過這個請求獲取對應的響應內容, 并打印出來, 代碼如下:

import socket
def request(host: str) -> None:
    """模擬請求并打印響應體"""
    url: str = f"http://{host}"
    sock: socket.SocketType = socket.socket()
    sock.connect((host, 80))
    sock.send(f"GET {url} HTTP/1.0\r\nHost: {host}\r\n\r\n".encode("ascii"))

    response_bytes: bytes = b""
    chunk: bytes = sock.recv(4096)
    while chunk:
        response_bytes += chunk
        chunk = sock.recv(4096)
    print("\n".join([i for i in response_bytes.decode().split("\r\n")]))


if __name__ == "__main__":
    request("so1n.me")

運行程序, 程序能夠正常輸出, 上部分打印了對應的HTTP響應Header, 下部分打印了HTTP響應體, , 可以看到服務端叫我們以https的形式重新請求, 輸出結果如下:

HTTP/1.1 301 Moved Permanently
Server: GitHub.com
Content-Type: text/html
Location: https://so1n.me/
X-GitHub-Request-Id: A744:3871:4136AF:48BD9F:6188DB50
Content-Length: 162
Accept-Ranges: bytes
Date: Mon, 08 Nov 2021 08:11:37 GMT
Via: 1.1 varnish
Age: 104
Connection: close
X-Served-By: cache-qpg1272-QPG
X-Cache: HIT
X-Cache-Hits: 2
X-Timer: S1636359097.026094,VS0,VE0
Vary: Accept-Encoding
X-Fastly-Request-ID: 22fa337f777553d33503cee5282598c6a293fb5e

<html>
<head><title>301 Moved Permanently</title></head>
<body>
<center><h1>301 Moved Permanently</h1></center>
<hr><center>nginx</center>
</body>
</html>

不過這里并不是想說HTTP請求是如何實現(xiàn)的, 具體我也不太了解, 在這個代碼中, socket的默認調用是阻塞的, 當線程調用connect或者recv時(send是不用等待的, 但在高并發(fā)下需要先等待drain后才可以send, 小demo不需要用到drain方法), 程序將會暫停直到操作完成。 當一次要下載很多網頁的話, 這將會如上篇文章所說的一樣, 大部分的等待時間都花在io上面, cpu卻一直空閑時, 而使用線程池雖然可以解決這個問題, 但是開銷是很大的, 同時操作系統(tǒng)往往會限制一個進程,用戶或者機器可以使用的線程數(shù), 而協(xié)程卻沒有這些限制, 占用的資源少, 也沒有系統(tǒng)限制瓶頸。

2.異步的請求

異步可以讓一個單獨的線程處理并發(fā)的操作, 不過在上面已經說過了, socket是默認阻塞的, 所以需要把socket設置為非阻塞的, socket提供了setblocking這個方法供開發(fā)者選擇是否阻塞, 在設置了非阻塞后, connectrecv方法也要進行更改。

由于沒有了阻塞, 程序在調用了connect后會馬上返回, 只不過Python的底層是C, 這段代碼在C中調用非阻塞的socket.connect后會拋出一個異常, 我們需要捕獲它, 就像這樣:

import socket
sock: socket.SocketType = socket.socket()
sock.setblocking(Flase)
try:
    sock.connect(("so1n.me", 80))
except BlockingIOError:
    pass

經過一頓操作后, 就開始申請建立連接了, 但是我們還不知道連接啥時候完成建立, 由于連接沒建立時調用send會報錯, 所以可以一直輪詢調用send直到沒報錯就認為是成功(真實代碼需要加超時):

while True:
    try: 
        sock.send(request)
        break
    except OSError as e:
        pass

但是這樣讓CPU空轉太浪費性能了, 而且期間還不能做別的事情, 就像我們點外賣后一直打電話過去問飯菜做好了沒有, 十分浪費電話費用, 要是飯菜做完了就打電話告訴我們, 那就只產生了一筆費用, 非常的省錢(正常情況下也是這樣子)。
這時就需要事件循環(huán)登場了,在類UNIX中, 有一個叫select的功能, 它可以等待事件發(fā)生后再調用監(jiān)聽的函數(shù), 不過一開始的實現(xiàn)性能不是很好, 在Linux上被epoll取代, 不過接口是類似的, 所在在Python中把這幾個不同的事件循環(huán)都封裝在selectors庫中, 同時可以通過DefaultSelector從系統(tǒng)中挑出最好的類select函數(shù)。
這里先暫時不說事件循環(huán)的原理, 事件循環(huán)最主要的是他名字的兩部分, 一個是事件, 一個是循環(huán), 在Python中, 可以通過如下方法把事件注冊到事件循環(huán)中:

def demo(): pass
selector.register(fd, EVENT_WRITE, demo)

這樣這個事件循環(huán)就會監(jiān)聽對應的文件描述符fd, 當這個文件描述符觸發(fā)寫入事件(EVENT_WRITE)時,事件循環(huán)就會告訴我們可以去調用注冊的函數(shù)demo。不過如果把上面的代碼都改為這種方法去運行的話就會發(fā)現(xiàn), 程序好像沒跑就結束了, 但程序其實是有跑的, 只不過他們是完成的了注冊, 然后就等待開發(fā)者接收事件循環(huán)的事件進行下一步的操作, 所以我們只需要在代碼的最后面寫上如下代碼:

while True:
    for key, mask in selector.select():
        key.data()

這樣程序就會一直運行, 當捕獲到事件的時候, 就會通過for循環(huán)告訴我們, 其中key.data是我們注冊的回調函數(shù), 當事件發(fā)生時, 就會通知我們, 我們可以通過拿到回調函數(shù)然后就運行, 了解完畢后, 我們可以來編寫我們的第一個并發(fā)程序, 他實現(xiàn)了一個簡單的I/O復用的小邏輯, 代碼和注釋如下:

import socket
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
# 選擇事件循環(huán)
selector: DefaultSelector = DefaultSelector()
# 用于判斷是否有事件在運行
running_cnt: int = 0
def request(host: str) -> None:
    """模擬請求并打印響應體"""
    # 告訴主函數(shù), 自己的事件還在運行
    global running_cnt
    running_cnt += 1
    
    # 初始化socket
    url: str = f"http://{host}"
    sock: socket.SocketType = socket.socket()
    sock.setblocking(False)
    try:
        sock.connect((host, 80))
    except BlockingIOError:
        pass

    response_bytes: bytes = b""

    def read_response() -> None:
        """接收響應參數(shù), 并判斷請求是否結束"""
        nonlocal response_bytes
        chunk: bytes = sock.recv(4096)
        print(f"recv {host} body success")
        if chunk:
            response_bytes += chunk
        else:
            # 沒有數(shù)據(jù)代表請求結束了, 注銷監(jiān)聽
            selector.unregister(sock.fileno())
            global running_cnt
            running_cnt -= 1
    def connected() -> None:
        """socket建立連接時的回調"""
        # 取消監(jiān)聽
        selector.unregister(sock.fileno())
        print(f"{host} connect success")
        # 發(fā)送請求, 并監(jiān)聽讀事件, 以及注冊對應的接收響應函數(shù)
        sock.send(f"GET {url} HTTP/1.0\r\nHost: {host}\r\n\r\n".encode("ascii"))
        selector.register(sock.fileno(), EVENT_READ, read_response)
    selector.register(sock.fileno(), EVENT_WRITE, connected)
if __name__ == "__main__":
    # 同時多個請求
    request("so1n.me")
    request("github.com")
    request("google.com")
    request("baidu.com")
    # 監(jiān)聽是否有事件在運行
    while running_cnt > 0:
        # 等待事件循環(huán)通知事件是否已經完成
        for key, mask in selector.select():
            key.data()

這段代碼接近同時注冊了4個請求并注冊建立連接回調, 然后就進入事件循環(huán)邏輯, 也就是把控制權交給事件循環(huán), 直到事件循環(huán)告訴程序說收到了socket建立的通知, 程序就會取消注冊的回調然后發(fā)送請求, 并注冊一個讀的事件回調, 然后又把控制權交給事件循環(huán), 直到收到了響應的結果才進入處理響應結果函數(shù)并且只有收完所有響應結果才會退出程序。

下面是我其中的一次執(zhí)行結果

so1n.me connect success
github.com connect success
google.com connect success
recv google.com body success
recv google.com body success
baidu.com connect success
recv github.com body success
recv github.com body success
recv baidu.com body success
recv baidu.com body success
recv so1n.me body success
recv so1n.me body success

可以看到他們的執(zhí)行順序是隨機的, 不是嚴格的按照so1n.megithub.comgoogle.combaidu.com順序執(zhí)行, 同時他們執(zhí)行速度很快, 這個程序的耗時約等于響應時長最長的函數(shù)耗時。
但是可以看出, 這個程序里面出現(xiàn)了兩個回調, 回調會讓代碼變得非常的奇怪, 降低可讀性, 也容易造成回調地獄, 而且當回調發(fā)生報錯的時候, 我們是很難知道這是由于什么導致的錯誤, 因為它的上下文丟失了, 這樣子排查問題十分的困惑。 作為程序員, 一般都不止?jié)M足于速度快的代碼, 真正想要的是又快, 又能像Sync的代碼一樣簡單, 可讀性強, 也能容易排查問題的代碼, 這種組合形式的代碼的設計模式就叫協(xié)程。

協(xié)程出現(xiàn)得很早, 它不像線程一樣, 被系統(tǒng)調度, 而是能自主的暫停, 并等待事件循環(huán)通知恢復。由于協(xié)程是軟件層面實現(xiàn)的, 所以它的實現(xiàn)方式有很多種, 這里要說的是基于生成器的協(xié)程, 因為生成器跟協(xié)程一樣, 都有暫停讓步和恢復的方法(還可以通過throw來拋錯), 同時它跟Async語法的協(xié)程很像, 通過了解基于生成器的協(xié)程, 可以了解Async的協(xié)程是如何實現(xiàn)的。

3.基于生成器的協(xié)程

3.1生成器

在了解基于生成器的協(xié)程之前, 需要先了解下生成器, Python的生成器函數(shù)與普通的函數(shù)會有一些不同, 只有普通函數(shù)中帶有關鍵字yield, 那么它就是生成器函數(shù), 具體有什么不同可以通過他們的字節(jié)碼來了解:

In [1]: import dis

# 普通函數(shù)
In [2]: def aaa(): pass

In [3]: dis.dis(aaa)

  1           0 LOAD_CONST               0 (None)
              2 RETURN_VALUE

# 普通函數(shù)調用函數(shù)
In [4]: def bbb(): 
   ...:     aaa() 
   ...:


In [5]: dis.dis(bbb)

  2           0 LOAD_GLOBAL              0 (aaa)
              2 CALL_FUNCTION            0
              4 POP_TOP
              6 LOAD_CONST               0 (None)
              8 RETURN_VALUE

# 普通生成器函數(shù)
In [6]: def ccc(): yield

In [7]: dis.dis(ccc)

  1           0 LOAD_CONST               0 (None)
              2 YIELD_VALUE
              4 POP_TOP
              6 LOAD_CONST               0 (None)
              8 RETURN_VALUE

上面分別是普通函數(shù), 普通函數(shù)調用函數(shù)和普通生成器函數(shù)的字節(jié)碼, 從字節(jié)碼可以看出來, 最簡單的函數(shù)只需要LOAD_CONST來加載變量None壓入自己的棧, 然后通過RETURN_VALUE來返回值, 而有函數(shù)調用的普通函數(shù)則先加載變量, 把全局變量的函數(shù)aaa加載到自己的棧里面, 然后通過CALL_FUNCTION來調用函數(shù), 最后通過POP_TOP把函數(shù)的返回值從棧里拋出來, 再把通過LOAD_CONST把None壓入自己的棧, 最后返回值。
而生成器函數(shù)則不一樣, 它會先通過LOAD_CONST來加載變量None壓入自己的棧, 然后通過YIELD_VALUE返回值, 接著通過POP_TOP彈出剛才的棧并重新把變量None壓入自己的棧, 最后通過RETURN_VALUE來返回值。從字節(jié)碼來分析可以很清楚的看到, 生成器能夠在yield區(qū)分兩個棧幀, 一個函數(shù)調用可以分為多次返回, 很符合協(xié)程多次等待的特點。

接著來看看生成器的一個使用, 這個生成器會有兩次yield調用, 并在最后返回字符串'None', 代碼如下:

In [8]: def demo(): 
   ...:     a = 1 
   ...:     b = 2 
   ...:     print('aaa', locals()) 
   ...:     yield 1 
   ...:     print('bbb', locals()) 
   ...:     yield 2 
   ...:     return 'None' 
   ...:

In [9]: demo_gen = demo()

In [10]: demo_gen.send(None)

aaa {'a': 1, 'b': 2}
Out[10]: 1

In [11]: demo_gen.send(None)

bbb {'a': 1, 'b': 2}
Out[11]: 2

In [12]: demo_gen.send(None)

---------------------------------------------------------------------------
StopIteration                             Traceback (most recent call last)
<ipython-input-12-8f8cb075d6af> in <module>
----> 1 demo_gen.send(None)
StopIteration: None

這段代碼首先通過函數(shù)調用生成一個demo_gen的生成器對象, 然后第一次send調用時返回值1, 第二次send調用時返回值2, 第三次send調用則拋出StopIteration異常, 異常提示為None, 同時可以看到第一次打印aaa和第二次打印bbb時, 他們都能打印到當前的函數(shù)局部變量, 可以發(fā)現(xiàn)在即使在不同的棧幀中, 他們讀取到當前的局部函數(shù)內的局部變量是一致的, 這意味著如果使用生成器來模擬協(xié)程時, 它還是會一直讀取到當前上下文的, 非常的完美。

此外, Python還支持通過yield from語法來返回一個生成器, 代碼如下:

In [1]: def demo_gen_1(): 
   ...:     for i in range(3): 
   ...:         yield i 
   ...:

In [2]: def demo_gen_2(): 
   ...:     yield from demo_gen_1() 
   ...:

In [3]: demo_gen_obj = demo_gen_2()

In [4]: demo_gen_obj.send(None)

Out[4]: 0

In [5]: demo_gen_obj.send(None)

Out[5]: 1

In [6]: demo_gen_obj.send(None)

Out[6]: 2

In [7]: demo_gen_obj.send(None)

---------------------------------------------------------------------------
StopIteration                             Traceback (most recent call last)
<ipython-input-7-f9922a2f64c9> in <module>
----> 1 demo_gen_obj.send(None)
StopIteration: 

通過yield from就可以很方便的支持生成器調用, 假如把每個生成器函數(shù)都當做一個協(xié)程, 那通過yield from就可以很方便的實現(xiàn)協(xié)程間的調用, 此外生成器的拋出異常后的提醒非常人性化, 也支持throw來拋出異常, 這樣我們就可以實現(xiàn)在協(xié)程運行時設置異常, 比如Cancel,演示代碼如下:

In [1]: def demo_exc(): 
   ...:     yield 1 
   ...:     raise RuntimeError() 
   ...:

In [2]: def demo_exc_1(): 
   ...:     for i in range(3): 
   ...:         yield i 
   ...:

In [3]: demo_exc_gen = demo_exc()

In [4]: demo_exc_gen.send(None)

Out[4]: 1

In [5]: demo_exc_gen.send(None)

---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-5-09fbb75fdf7d> in <module>
----> 1 demo_exc_gen.send(None)

<ipython-input-1-69afbc1f9c19> in demo_exc()
      1 def demo_exc():
      2     yield 1
----> 3     raise RuntimeError()
      4 

RuntimeError: 

In [6]: demo_exc_gen_1 = demo_exc_1()

In [7]: demo_exc_gen_1.send(None)
                                                                                                                                        Out[7]: 0
In [8]: demo_exc_gen_1.send(None)
                                                                                                                                        Out[8]: 1
In [9]: demo_exc_gen_1.throw(RuntimeError)
                                                                                                                                        ---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-9-1a1cc55d71f4> in <module>
----> 1 demo_exc_gen_1.throw(RuntimeError)

<ipython-input-2-2617b2366dce> in demo_exc_1()
      1 def demo_exc_1():
      2     for i in range(3):
----> 3         yield i
      4 
RuntimeError: 

從中可以看到在運行中拋出異常時, 會有一個非常清楚的拋錯, 可以明顯看出錯誤堆棧, 同時throw指定異常后, 會在下一處yield拋出異常(所以協(xié)程調用Cancel后不會馬上取消, 而是下一次調用的時候才被取消)。

3.2用生成器實現(xiàn)協(xié)程

我們已經簡單的了解到了生成器是非常的貼合協(xié)程的編程模型, 同時也知道哪些生成器API是我們需要的API, 接下來可以模仿Asyncio的接口來實現(xiàn)一個簡單的協(xié)程。

首先是在Asyncio中有一個封裝叫Feature, 它用來表示協(xié)程正在等待將來時的結果, 以下是我根據(jù)asyncio.Feature封裝的一個簡單的Feature, 它的API沒有asyncio.Feature全, 代碼和注釋如下:

class Status:
    """用于判斷Future狀態(tài)"""
    pending: int = 1
    finished: int = 2
    cancelled: int = 3
class Future(object):

    def __init__(self) -> None:
        """初始化時, Feature處理pending狀態(tài), 等待set result"""
        self.status: int = Status.pending
        self._result: Any = None
        self._exception: Optional[Exception] = None
        self._callbacks: List[Callable[['Future'], None]] = []

    def add_done_callback(self, fn: [['Future'], None]Callable) -> None:
        """添加完成時的回調"""
        self._callbacks.append(fn)

    def cancel(self):
        """取消當前的Feature"""
        if self.status != Status.pending:
            return False
        self.status = Status.cancelled
        for fn in self._callbacks:
            fn(self)
        return True

    def set_exception(self, exc: Exception) -> None:
        """設置異常"""
        if self.status != Status.pending:
            raise RuntimeError("Can not set exc")
        self._exception = exc
        self.status = Status.finished

    def set_result(self, result: Any) -> None:
        """設置結果"""
        if self.status != Status.pending:
            raise RuntimeError("Can not set result")
        self.status = Status.finished
        self._result = result
        for fn in self._callbacks:
            fn(self)

    def result(self):
        """獲取結果"""
        if self.status == Status.cancelled:
            raise asyncio.CancelledError
        elif self.status != Status.finished:
            raise RuntimeError("Result is not read")
        elif self._exception is not None:
            raise self._exception
        return self._result
    def __iter__(self):
        """通過生成器來模擬協(xié)程, 當收到結果通知時, 會返回結果"""
        if self.status == Status.pending:
            yield self
        return self.result()

在理解Future時, 可以把它假想為一個狀態(tài)機, 在啟動初始化的時候是peding狀態(tài), 在運行的時候我們可以切換它的狀態(tài), 并且通過__iter__方法來支持調用者使用yield from Future()來等待Future本身, 直到收到了事件通知時, 可以得到結果。

但是可以發(fā)現(xiàn)這個Future是無法自我驅動, 調用了__iter__的程序不知道何時被調用了set_result, 在Asyncio中是通過一個叫Task的類來驅動Future, 它將一個協(xié)程的執(zhí)行過程安排好, 并負責在事件循環(huán)中執(zhí)行該協(xié)程。它主要有兩個方法:

  • 1.初始化時, 會先通過send方法激活生成器
  • 2.后續(xù)被調度后馬上安排下一次等待, 除非拋出StopIteration異常

還有一個支持取消運行托管協(xié)程的方法(在原代碼中, Task是繼承于Future, 所以Future有的它都有), 經過簡化后的代碼如下:

class Task:
    def __init__(self, coro: Generator) -> None:
        # 初始化狀態(tài)
        self.cancelled: bool = False
        self.coro: Generator = coro
        # 預激一個普通的future
        f: Future = Future()
        f.set_result(None)
        self.step(f)

    def cancel(self) -> None:
        """用于取消托管的coro"""
        self.coro.throw(asyncio.CancelledError)

    def step(self, f: Future) -> None:
        """用于調用coro的下一步, 從第一次激活開始, 每次都添加完成時的回調, 直到遇到取消或者StopIteration異常"""
        try:
            _future = self.coro.send(f.result())
        except asyncio.CancelledError:
            self.cancelled = True
            return
        except StopIteration:
            return
        _future.add_done_callback(self.step)

這樣FutureTask就封裝好了, 可以簡單的試一試效果如何:

In [2]:def wait_future(f: Future, flag_int: int) -> Generator[Future, None, None]: 
   ...:    result = yield from f 
   ...:    print(flag_int, result) 
   ...:
   ...:future: Future = Future() 
   ...:for i in range(3): 
   ...:    coro = wait_future(future, i) 
   ...:    # 托管wait_future這個協(xié)程, 里面的Future也會通過yield from被托管 
   ...:    Task(coro) 
   ...:
   ...:print('ready') 
   ...:future.set_result('ok') 
   ...:
   ...:future = Future() 
   ...:Task(wait_future(future, 3)).cancel() 
   ...: 
                                                                                                                                        ready
0 ok
1 ok
2 ok
---------------------------------------------------------------------------
CancelledError                            Traceback (most recent call last)
<ipython-input-2-2d1b04db2604> in <module>
     12 
     13 future = Future()
---> 14 Task(wait_future(future, 3)).cancel()

<ipython-input-1-ec3831082a88> in cancel(self)
     81 
     82     def cancel(self) -> None:
---> 83         self.coro.throw(asyncio.CancelledError)
     84 
     85     def step(self, f: Future) -> None:

<ipython-input-2-2d1b04db2604> in wait_future(f, flag_int)
      1 def wait_future(f: Future, flag_int: int) -> Generator[Future, None, None]:
----> 2     result = yield from f
      3     print(flag_int, result)
      4 
      5 future: Future = Future()

<ipython-input-1-ec3831082a88> in __iter__(self)
     68         """通過生成器來模擬協(xié)程, 當收到結果通知時, 會返回結果"""
     69         if self.status == Status.pending:
---> 70             yield self
     71         return self.result()
     72 

CancelledError: 

這段程序會先初始化Future, 并把Future傳給wait_future并生成生成器, 再交由給Task托管, 預激, 由于Future是在生成器函數(shù)wait_future中通過yield from與函數(shù)綁定的, 真正被預激的其實是Future__iter__方法中的yield self, 此時代碼邏輯會暫停在yield self并返回。
在全部預激后, 通過調用Futureset_result方法, 使Future變?yōu)榻Y束狀態(tài), 由于set_result會執(zhí)行注冊的回調, 這時它就會執(zhí)行托管它的Taskstep方法中的send方法, 代碼邏輯回到Future__iter__方法中的yield self, 并繼續(xù)往下走, 然后遇到return返回結果, 并繼續(xù)走下去, 從輸出可以發(fā)現(xiàn)程序封裝完成且打印了ready后, 會依次打印對應的返回結果, 而在最后一個的測試cancel方法中可以看到,Future拋出異常了, 同時這些異常很容易看懂, 能夠追隨到調用的地方。

現(xiàn)在FutureTask正常運行了, 可以跟我們一開始執(zhí)行的程序進行整合, 代碼如下:

class HttpRequest(object):
    def __init__(self, host: str):
        """初始化變量和sock"""
        self._host: str = host
        global running_cnt
        running_cnt += 1
        self.url: str = f"http://{host}"
        self.sock: socket.SocketType = socket.socket()
        self.sock.setblocking(False)
        try:
            self.sock.connect((host, 80))
        except BlockingIOError:
            pass
    def read(self) -> Generator[Future, None, bytes]:
        """從socket獲取響應數(shù)據(jù), 并set到Future中, 并通過Future.__iter__方法或得到數(shù)據(jù)并通過變量chunk_future返回"""
        f: Future = Future()
        selector.register(self.sock.fileno(), EVENT_READ, lambda: f.set_result(self.sock.recv(4096)))
        chunk_future = yield from f
        selector.unregister(self.sock.fileno())
        return chunk_future  # type: ignore

    def read_response(self) -> Generator[Future, None, bytes]:
        """接收響應參數(shù), 并判斷請求是否結束"""
        response_bytes: bytes = b""
        chunk = yield from self.read()
        while chunk:
            response_bytes += chunk
            chunk = yield from self.read()
        return response_bytes

    def connected(self) -> Generator[Future, None, None]:
        """socket建立連接時的回調"""
        # 取消監(jiān)聽
        f: Future = Future()
        selector.register(self.sock.fileno(), EVENT_WRITE, lambda: f.set_result(None))
        yield f
        selector.unregister(self.sock.fileno())
        print(f"{self._host} connect success")
    def request(self) -> Generator[Future, None, None]:
        # 發(fā)送請求, 并監(jiān)聽讀事件, 以及注冊對應的接收響應函數(shù)
        yield from self.connected()
        self.sock.send(f"GET {self.url} HTTP/1.0\r\nHost: {self._host}\r\n\r\n".encode("ascii"))
        response = yield from self.read_response()
        print(f"request {self._host} success, length:{len(response)}")
        global running_cnt
        running_cnt -= 1
if __name__ == "__main__":
    # 同時多個請求
    Task(HttpRequest("so1n.me").request())
    Task(HttpRequest("github.com").request())
    Task(HttpRequest("google.com").request())
    Task(HttpRequest("baidu.com").request())
    # 監(jiān)聽是否有事件在運行
    while running_cnt > 0:
        # 等待事件循環(huán)通知事件是否已經完成
        for key, mask in selector.select():
            key.data()

這段代碼通過Future和生成器方法盡量的解耦回調函數(shù), 如果忽略了HttpRequest中的connectedread方法則可以發(fā)現(xiàn)整段代碼跟同步的代碼基本上是一樣的, 只是通過yieldyield from交出控制權和通過事件循環(huán)恢復控制權。 同時通過上面的異常例子可以發(fā)現(xiàn)異常排查非常的方便, 這樣一來就沒有了回調的各種糟糕的事情, 開發(fā)者只需要按照同步的思路進行開發(fā)即可, 不過我們的事件循環(huán)是一個非常簡單的事件循環(huán)例子, 同時對于socket相關都沒有進行封裝, 也缺失一些常用的API, 而這些都會被Python官方封裝到Asyncio這個庫中, 通過該庫, 我們可以近乎完美的編寫Async語法的代碼。

NOTE: 由于生成器協(xié)程中無法通過yield from語法使用生成器, 所以Python在3.5之后使用了Await的原生協(xié)程。

到此這篇關于Python中Async語法協(xié)程的實現(xiàn)的文章就介紹到這了,更多相關Python協(xié)程內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • Python入門篇之面向對象

    Python入門篇之面向對象

    面向對象設計(OOD)不會特別要求面向對象編程語言。事實上OOD可以由純結構化語言來實現(xiàn),比如C,但如果想要構造具備對象性質和特點的數(shù)據(jù)類型,就需要在程序上作更多的努力。當一門語言內建OO特性,OO編程開發(fā)就會更加方便高效。
    2014-10-10
  • 利用python操作SQLite數(shù)據(jù)庫及文件操作詳解

    利用python操作SQLite數(shù)據(jù)庫及文件操作詳解

    這篇文章主要給大家介紹了關于利用python操作SQLite數(shù)據(jù)庫及文件操作的相關資料,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧。
    2017-09-09
  • python字典序問題實例

    python字典序問題實例

    這篇文章主要介紹了python字典序問題,是字符串操作一個比較典型的應用,需要的朋友可以參考下
    2014-09-09
  • python動態(tài)進度條的實現(xiàn)代碼

    python動態(tài)進度條的實現(xiàn)代碼

    有時候我們需要使用print打印工作進度,正常使用print函數(shù)會導致刷屏的現(xiàn)象,本文通過實例代碼給大家介紹python動態(tài)進度條的實現(xiàn)方法,感興趣的朋友跟隨小編一起看看吧
    2019-07-07
  • 帶你徹底搞懂python操作mysql數(shù)據(jù)庫(cursor游標講解)

    帶你徹底搞懂python操作mysql數(shù)據(jù)庫(cursor游標講解)

    這篇文章主要介紹了帶你徹底搞懂python操作mysql數(shù)據(jù)庫(cursor游標講解),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2020-01-01
  • Pycharm安裝第三方庫的超詳細步驟

    Pycharm安裝第三方庫的超詳細步驟

    使用python時,為了提高效率,安裝添加第三方庫是必不可少的,下面這篇文章主要給大家介紹了關于Pycharm安裝第三方庫的相關資料,需要的朋友可以參考下
    2022-04-04
  • python實現(xiàn)簡單登陸流程的方法

    python實現(xiàn)簡單登陸流程的方法

    下面小編就為大家分享一篇python實現(xiàn)簡單登陸流程的方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2018-04-04
  • pytorch加載語音類自定義數(shù)據(jù)集的方法教程

    pytorch加載語音類自定義數(shù)據(jù)集的方法教程

    這篇文章主要給大家介紹了關于pytorch加載語音類自定義數(shù)據(jù)集的相關資料,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2020-11-11
  • 使用python如何提取JSON數(shù)據(jù)指定內容

    使用python如何提取JSON數(shù)據(jù)指定內容

    這篇文章主要介紹了使用python如何提取JSON數(shù)據(jù)指定內容,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-07-07
  • 使用Python實現(xiàn)圖像標記點的坐標輸出功能

    使用Python實現(xiàn)圖像標記點的坐標輸出功能

    這篇文章主要介紹了使用Python實現(xiàn)圖像標記點的坐標輸出功能,非常不錯,具有一定的參考借鑒價值,需要的朋友參考下吧
    2019-08-08

最新評論