欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Python 使用生成器代替線程的方法

 更新時間:2020年08月04日 11:54:47   作者:David Beazley  
這篇文章主要介紹了Python 使用生成器代替線程的方法,文中講解非常細致,代碼幫助大家更好的理解和學(xué)習(xí),感興趣的朋友可以了解下

問題

你想使用生成器(協(xié)程)替代系統(tǒng)線程來實現(xiàn)并發(fā)。這個有時又被稱為用戶級線程或綠色線程。

解決方案

要使用生成器實現(xiàn)自己的并發(fā),你首先要對生成器函數(shù)和 yield 語句有深刻理解。 yield 語句會讓一個生成器掛起它的執(zhí)行,這樣就可以編寫一個調(diào)度器, 將生成器當(dāng)做某種“任務(wù)”并使用任務(wù)協(xié)作切換來替換它們的執(zhí)行。 要演示這種思想,考慮下面兩個使用簡單的 yield 語句的生成器函數(shù):

# Two simple generator functions
def countdown(n):
  while n > 0:
    print('T-minus', n)
    yield
    n -= 1
  print('Blastoff!')

def countup(n):
  x = 0
  while x < n:
    print('Counting up', x)
    yield
    x += 1

這些函數(shù)在內(nèi)部使用yield語句,下面是一個實現(xiàn)了簡單任務(wù)調(diào)度器的代碼:

from collections import deque

class TaskScheduler:
  def __init__(self):
    self._task_queue = deque()

  def new_task(self, task):
    '''
    Admit a newly started task to the scheduler
    '''
    self._task_queue.append(task)

  def run(self):
    '''
    Run until there are no more tasks
    '''
    while self._task_queue:
      task = self._task_queue.popleft()
      try:
        # Run until the next yield statement
        next(task)
        self._task_queue.append(task)
      except StopIteration:
        # Generator is no longer executing
        pass

# Example use
sched = TaskScheduler()
sched.new_task(countdown(10))
sched.new_task(countdown(5))
sched.new_task(countup(15))
sched.run()

TaskScheduler 類在一個循環(huán)中運行生成器集合——每個都運行到碰到y(tǒng)ield語句為止。 運行這個例子,輸出如下:

T-minus 10
T-minus 5
Counting up 0
T-minus 9
T-minus 4
Counting up 1
T-minus 8
T-minus 3
Counting up 2
T-minus 7
T-minus 2
...

到此為止,我們實際上已經(jīng)實現(xiàn)了一個“操作系統(tǒng)”的最小核心部分。 生成器函數(shù)就是任務(wù),而yield語句是任務(wù)掛起的信號。 調(diào)度器循環(huán)檢查任務(wù)列表直到?jīng)]有任務(wù)要執(zhí)行為止。

實際上,你可能想要使用生成器來實現(xiàn)簡單的并發(fā)。 那么,在實現(xiàn)actor或網(wǎng)絡(luò)服務(wù)器的時候你可以使用生成器來替代線程的使用。

下面的代碼演示了使用生成器來實現(xiàn)一個不依賴線程的actor:

from collections import deque

class ActorScheduler:
  def __init__(self):
    self._actors = {}     # Mapping of names to actors
    self._msg_queue = deque()  # Message queue

  def new_actor(self, name, actor):
    '''
    Admit a newly started actor to the scheduler and give it a name
    '''
    self._msg_queue.append((actor,None))
    self._actors[name] = actor

  def send(self, name, msg):
    '''
    Send a message to a named actor
    '''
    actor = self._actors.get(name)
    if actor:
      self._msg_queue.append((actor,msg))

  def run(self):
    '''
    Run as long as there are pending messages.
    '''
    while self._msg_queue:
      actor, msg = self._msg_queue.popleft()
      try:
         actor.send(msg)
      except StopIteration:
         pass

# Example use
if __name__ == '__main__':
  def printer():
    while True:
      msg = yield
      print('Got:', msg)

  def counter(sched):
    while True:
      # Receive the current count
      n = yield
      if n == 0:
        break
      # Send to the printer task
      sched.send('printer', n)
      # Send the next count to the counter task (recursive)
      sched.send('counter', n-1)

  sched = ActorScheduler()
  # Create the initial actors
  sched.new_actor('printer', printer())
  sched.new_actor('counter', counter(sched))

  # Send an initial message to the counter to initiate
  sched.send('counter', 10000)
  sched.run()

完全弄懂這段代碼需要更深入的學(xué)習(xí),但是關(guān)鍵點在于收集消息的隊列。 本質(zhì)上,調(diào)度器在有需要發(fā)送的消息時會一直運行著。 計數(shù)生成器會給自己發(fā)送消息并在一個遞歸循環(huán)中結(jié)束。

下面是一個更加高級的例子,演示了使用生成器來實現(xiàn)一個并發(fā)網(wǎng)絡(luò)應(yīng)用程序:

from collections import deque
from select import select

# This class represents a generic yield event in the scheduler
class YieldEvent:
  def handle_yield(self, sched, task):
    pass

  def handle_resume(self, sched, task):
    pass

# Task Scheduler
class Scheduler:
  def __init__(self):
    self._numtasks = 0    # Total num of tasks
    self._ready = deque()  # Tasks ready to run
    self._read_waiting = {} # Tasks waiting to read
    self._write_waiting = {} # Tasks waiting to write

  # Poll for I/O events and restart waiting tasks
  def _iopoll(self):
    rset,wset,eset = select(self._read_waiting,
                self._write_waiting,[])
    for r in rset:
      evt, task = self._read_waiting.pop(r)
      evt.handle_resume(self, task)
    for w in wset:
      evt, task = self._write_waiting.pop(w)
      evt.handle_resume(self, task)

  def new(self,task):
    '''
    Add a newly started task to the scheduler
    '''
    self._ready.append((task, None))
    self._numtasks += 1

  def add_ready(self, task, msg=None):
    '''
    Append an already started task to the ready queue.
    msg is what to send into the task when it resumes.
    '''
    self._ready.append((task, msg))

  # Add a task to the reading set
  def _read_wait(self, fileno, evt, task):
    self._read_waiting[fileno] = (evt, task)

  # Add a task to the write set
  def _write_wait(self, fileno, evt, task):
    self._write_waiting[fileno] = (evt, task)

  def run(self):
    '''
    Run the task scheduler until there are no tasks
    '''
    while self._numtasks:
       if not self._ready:
         self._iopoll()
       task, msg = self._ready.popleft()
       try:
         # Run the coroutine to the next yield
         r = task.send(msg)
         if isinstance(r, YieldEvent):
           r.handle_yield(self, task)
         else:
           raise RuntimeError('unrecognized yield event')
       except StopIteration:
         self._numtasks -= 1

# Example implementation of coroutine-based socket I/O
class ReadSocket(YieldEvent):
  def __init__(self, sock, nbytes):
    self.sock = sock
    self.nbytes = nbytes
  def handle_yield(self, sched, task):
    sched._read_wait(self.sock.fileno(), self, task)
  def handle_resume(self, sched, task):
    data = self.sock.recv(self.nbytes)
    sched.add_ready(task, data)

class WriteSocket(YieldEvent):
  def __init__(self, sock, data):
    self.sock = sock
    self.data = data

  def handle_yield(self, sched, task):
    sched._write_wait(self.sock.fileno(), self, task)

  def handle_resume(self, sched, task):
    nsent = self.sock.send(self.data)
    sched.add_ready(task, nsent)

class AcceptSocket(YieldEvent):
  def __init__(self, sock):
    self.sock = sock

  def handle_yield(self, sched, task):
    sched._read_wait(self.sock.fileno(), self, task)

  def handle_resume(self, sched, task):
    r = self.sock.accept()
    sched.add_ready(task, r)

# Wrapper around a socket object for use with yield
class Socket(object):
  def __init__(self, sock):
    self._sock = sock

  def recv(self, maxbytes):
    return ReadSocket(self._sock, maxbytes)

  def send(self, data):
    return WriteSocket(self._sock, data)

  def accept(self):
    return AcceptSocket(self._sock)

  def __getattr__(self, name):
    return getattr(self._sock, name)

if __name__ == '__main__':
  from socket import socket, AF_INET, SOCK_STREAM
  import time

  # Example of a function involving generators. This should
  # be called using line = yield from readline(sock)
  def readline(sock):
    chars = []
    while True:
      c = yield sock.recv(1)
      if not c:
        break
      chars.append(c)
      if c == b'\n':
        break
    return b''.join(chars)

  # Echo server using generators
  class EchoServer:
    def __init__(self,addr,sched):
      self.sched = sched
      sched.new(self.server_loop(addr))

    def server_loop(self,addr):
      s = Socket(socket(AF_INET,SOCK_STREAM))

      s.bind(addr)
      s.listen(5)
      while True:
        c,a = yield s.accept()
        print('Got connection from ', a)
        self.sched.new(self.client_handler(Socket(c)))

    def client_handler(self,client):
      while True:
        line = yield from readline(client)
        if not line:
          break
        line = b'GOT:' + line
        while line:
          nsent = yield client.send(line)
          line = line[nsent:]
      client.close()
      print('Client closed')

  sched = Scheduler()
  EchoServer(('',16000),sched)
  sched.run()

這段代碼有點復(fù)雜。不過,它實現(xiàn)了一個小型的操作系統(tǒng)。 有一個就緒的任務(wù)隊列,并且還有因I/O休眠的任務(wù)等待區(qū)域。 還有很多調(diào)度器負責(zé)在就緒隊列和I/O等待區(qū)域之間移動任務(wù)。

討論

在構(gòu)建基于生成器的并發(fā)框架時,通常會使用更常見的yield形式:

def some_generator():
  ...
  result = yield data
  ...

使用這種形式的yield語句的函數(shù)通常被稱為“協(xié)程”。 通過調(diào)度器,yield語句在一個循環(huán)中被處理,如下:

f = some_generator()

# Initial result. Is None to start since nothing has been computed
result = None
while True:
  try:
    data = f.send(result)
    result = ... do some calculation ...
  except StopIteration:
    break

這里的邏輯稍微有點復(fù)雜。不過,被傳給 send() 的值定義了在yield語句醒來時的返回值。 因此,如果一個yield準(zhǔn)備在對之前yield數(shù)據(jù)的回應(yīng)中返回結(jié)果時,會在下一次 send() 操作返回。 如果一個生成器函數(shù)剛開始運行,發(fā)送一個None值會讓它排在第一個yield語句前面。

除了發(fā)送值外,還可以在一個生成器上面執(zhí)行一個 close() 方法。 它會導(dǎo)致在執(zhí)行yield語句時拋出一個 GeneratorExit 異常,從而終止執(zhí)行。 如果進一步設(shè)計,一個生成器可以捕獲這個異常并執(zhí)行清理操作。 同樣還可以使用生成器的 throw() 方法在yield語句執(zhí)行時生成一個任意的執(zhí)行指令。 一個任務(wù)調(diào)度器可利用它來在運行的生成器中處理錯誤。

最后一個例子中使用的 yield from 語句被用來實現(xiàn)協(xié)程,可以被其它生成器作為子程序或過程來調(diào)用。 本質(zhì)上就是將控制權(quán)透明的傳輸給新的函數(shù)。 不像普通的生成器,一個使用 yield from 被調(diào)用的函數(shù)可以返回一個作為 yield from 語句結(jié)果的值。 關(guān)于 yield from 的更多信息可以在 PEP 380 中找到。

最后,如果使用生成器編程,要提醒你的是它還是有很多缺點的。 特別是,你得不到任何線程可以提供的好處。例如,如果你執(zhí)行CPU依賴或I/O阻塞程序, 它會將整個任務(wù)掛起直到操作完成。為了解決這個問題, 你只能選擇將操作委派給另外一個可以獨立運行的線程或進程。 另外一個限制是大部分Python庫并不能很好的兼容基于生成器的線程。 如果你選擇這個方案,你會發(fā)現(xiàn)你需要自己改寫很多標(biāo)準(zhǔn)庫函數(shù)。 作為本節(jié)提到的協(xié)程和相關(guān)技術(shù)的一個基礎(chǔ)背景,可以查看 PEP 342 和 “協(xié)程和并發(fā)的一門有趣課程

PEP 3156 同樣有一個關(guān)于使用協(xié)程的異步I/O模型。 特別的,你不可能自己去實現(xiàn)一個底層的協(xié)程調(diào)度器。 不過,關(guān)于協(xié)程的思想是很多流行庫的基礎(chǔ), 包括 gevent, greenlet, Stackless Python 以及其他類似工程。

以上就是Python 使用生成器代替線程的方法的詳細內(nèi)容,更多關(guān)于Python 生成器代替線程的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • 淺談Django的緩存機制

    淺談Django的緩存機制

    這篇文章主要介紹了淺談Django的緩存機制,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2018-08-08
  • pyspark 隨機森林的實現(xiàn)

    pyspark 隨機森林的實現(xiàn)

    這篇文章主要介紹了pyspark 隨機森林的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-04-04
  • 詳解Python中xlwt庫的基本操作

    詳解Python中xlwt庫的基本操作

    xlwt 是一個用于在Python中操作Excel文件的庫,它允許用戶創(chuàng)建、修改和寫入Excel文件,本文主要為大家介紹了xlwt庫的一些基本操作,需要的可以參考一下
    2023-11-11
  • Python項目跨域問題解決方案

    Python項目跨域問題解決方案

    這篇文章主要介紹了Python項目跨域問題解決方案,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2020-06-06
  • Pandas實現(xiàn)Excel文件讀取,增刪,打開,保存操作

    Pandas實現(xiàn)Excel文件讀取,增刪,打開,保存操作

    Pandas?是一種基于?NumPy?的開源數(shù)據(jù)分析工具,用于處理和分析大量數(shù)據(jù)。本文將通過Pandas實現(xiàn)對Excel文件進行讀取、增刪、打開、保存等操作,需要的可以參考一下
    2023-04-04
  • python神經(jīng)網(wǎng)絡(luò)Densenet模型復(fù)現(xiàn)詳解

    python神經(jīng)網(wǎng)絡(luò)Densenet模型復(fù)現(xiàn)詳解

    這篇文章主要為大家介紹了python神經(jīng)網(wǎng)絡(luò)Densenet模型復(fù)現(xiàn)詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2022-05-05
  • 5行Python代碼實現(xiàn)一鍵批量扣圖

    5行Python代碼實現(xiàn)一鍵批量扣圖

    在日常生活或者工作中,經(jīng)常會遇到想將某張照片中的人物摳出來,本文就介紹了Python代碼實現(xiàn)一鍵批量扣圖,感興趣的可以了解一下
    2021-06-06
  • 使用pytorch進行圖像的順序讀取方法

    使用pytorch進行圖像的順序讀取方法

    今天小編就為大家分享一篇使用pytorch進行圖像的順序讀取方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2018-07-07
  • python實現(xiàn)QQ定時發(fā)送新年祝福信息

    python實現(xiàn)QQ定時發(fā)送新年祝福信息

    大家好,本篇文章主要講的是python實現(xiàn)QQ定時發(fā)送新年祝福信息,感興趣的同學(xué)感快來看一看吧,對你有幫助的話記得收藏一下
    2022-02-02
  • Python splitlines使用技巧

    Python splitlines使用技巧

    Python中的splitlines用來分割行。當(dāng)傳入的參數(shù)為True時,表示保留換行符 \n。通過下面的例子就很明白了
    2008-09-09

最新評論