Python 使用生成器代替線程的方法
問(wèn)題
你想使用生成器(協(xié)程)替代系統(tǒng)線程來(lái)實(shí)現(xiàn)并發(fā)。這個(gè)有時(shí)又被稱為用戶級(jí)線程或綠色線程。
解決方案
要使用生成器實(shí)現(xiàn)自己的并發(fā),你首先要對(duì)生成器函數(shù)和 yield 語(yǔ)句有深刻理解。 yield 語(yǔ)句會(huì)讓一個(gè)生成器掛起它的執(zhí)行,這樣就可以編寫(xiě)一個(gè)調(diào)度器, 將生成器當(dāng)做某種“任務(wù)”并使用任務(wù)協(xié)作切換來(lái)替換它們的執(zhí)行。 要演示這種思想,考慮下面兩個(gè)使用簡(jiǎn)單的 yield 語(yǔ)句的生成器函數(shù):
# Two simple generator functions
def countdown(n):
while n > 0:
print('T-minus', n)
yield
n -= 1
print('Blastoff!')
def countup(n):
x = 0
while x < n:
print('Counting up', x)
yield
x += 1
這些函數(shù)在內(nèi)部使用yield語(yǔ)句,下面是一個(gè)實(shí)現(xiàn)了簡(jiǎn)單任務(wù)調(diào)度器的代碼:
from collections import deque
class TaskScheduler:
def __init__(self):
self._task_queue = deque()
def new_task(self, task):
'''
Admit a newly started task to the scheduler
'''
self._task_queue.append(task)
def run(self):
'''
Run until there are no more tasks
'''
while self._task_queue:
task = self._task_queue.popleft()
try:
# Run until the next yield statement
next(task)
self._task_queue.append(task)
except StopIteration:
# Generator is no longer executing
pass
# Example use
sched = TaskScheduler()
sched.new_task(countdown(10))
sched.new_task(countdown(5))
sched.new_task(countup(15))
sched.run()
TaskScheduler 類在一個(gè)循環(huán)中運(yùn)行生成器集合——每個(gè)都運(yùn)行到碰到y(tǒng)ield語(yǔ)句為止。 運(yùn)行這個(gè)例子,輸出如下:
T-minus 10
T-minus 5
Counting up 0
T-minus 9
T-minus 4
Counting up 1
T-minus 8
T-minus 3
Counting up 2
T-minus 7
T-minus 2
...
到此為止,我們實(shí)際上已經(jīng)實(shí)現(xiàn)了一個(gè)“操作系統(tǒng)”的最小核心部分。 生成器函數(shù)就是任務(wù),而yield語(yǔ)句是任務(wù)掛起的信號(hào)。 調(diào)度器循環(huán)檢查任務(wù)列表直到?jīng)]有任務(wù)要執(zhí)行為止。
實(shí)際上,你可能想要使用生成器來(lái)實(shí)現(xiàn)簡(jiǎn)單的并發(fā)。 那么,在實(shí)現(xiàn)actor或網(wǎng)絡(luò)服務(wù)器的時(shí)候你可以使用生成器來(lái)替代線程的使用。
下面的代碼演示了使用生成器來(lái)實(shí)現(xiàn)一個(gè)不依賴線程的actor:
from collections import deque
class ActorScheduler:
def __init__(self):
self._actors = {} # Mapping of names to actors
self._msg_queue = deque() # Message queue
def new_actor(self, name, actor):
'''
Admit a newly started actor to the scheduler and give it a name
'''
self._msg_queue.append((actor,None))
self._actors[name] = actor
def send(self, name, msg):
'''
Send a message to a named actor
'''
actor = self._actors.get(name)
if actor:
self._msg_queue.append((actor,msg))
def run(self):
'''
Run as long as there are pending messages.
'''
while self._msg_queue:
actor, msg = self._msg_queue.popleft()
try:
actor.send(msg)
except StopIteration:
pass
# Example use
if __name__ == '__main__':
def printer():
while True:
msg = yield
print('Got:', msg)
def counter(sched):
while True:
# Receive the current count
n = yield
if n == 0:
break
# Send to the printer task
sched.send('printer', n)
# Send the next count to the counter task (recursive)
sched.send('counter', n-1)
sched = ActorScheduler()
# Create the initial actors
sched.new_actor('printer', printer())
sched.new_actor('counter', counter(sched))
# Send an initial message to the counter to initiate
sched.send('counter', 10000)
sched.run()
完全弄懂這段代碼需要更深入的學(xué)習(xí),但是關(guān)鍵點(diǎn)在于收集消息的隊(duì)列。 本質(zhì)上,調(diào)度器在有需要發(fā)送的消息時(shí)會(huì)一直運(yùn)行著。 計(jì)數(shù)生成器會(huì)給自己發(fā)送消息并在一個(gè)遞歸循環(huán)中結(jié)束。
下面是一個(gè)更加高級(jí)的例子,演示了使用生成器來(lái)實(shí)現(xiàn)一個(gè)并發(fā)網(wǎng)絡(luò)應(yīng)用程序:
from collections import deque
from select import select
# This class represents a generic yield event in the scheduler
class YieldEvent:
def handle_yield(self, sched, task):
pass
def handle_resume(self, sched, task):
pass
# Task Scheduler
class Scheduler:
def __init__(self):
self._numtasks = 0 # Total num of tasks
self._ready = deque() # Tasks ready to run
self._read_waiting = {} # Tasks waiting to read
self._write_waiting = {} # Tasks waiting to write
# Poll for I/O events and restart waiting tasks
def _iopoll(self):
rset,wset,eset = select(self._read_waiting,
self._write_waiting,[])
for r in rset:
evt, task = self._read_waiting.pop(r)
evt.handle_resume(self, task)
for w in wset:
evt, task = self._write_waiting.pop(w)
evt.handle_resume(self, task)
def new(self,task):
'''
Add a newly started task to the scheduler
'''
self._ready.append((task, None))
self._numtasks += 1
def add_ready(self, task, msg=None):
'''
Append an already started task to the ready queue.
msg is what to send into the task when it resumes.
'''
self._ready.append((task, msg))
# Add a task to the reading set
def _read_wait(self, fileno, evt, task):
self._read_waiting[fileno] = (evt, task)
# Add a task to the write set
def _write_wait(self, fileno, evt, task):
self._write_waiting[fileno] = (evt, task)
def run(self):
'''
Run the task scheduler until there are no tasks
'''
while self._numtasks:
if not self._ready:
self._iopoll()
task, msg = self._ready.popleft()
try:
# Run the coroutine to the next yield
r = task.send(msg)
if isinstance(r, YieldEvent):
r.handle_yield(self, task)
else:
raise RuntimeError('unrecognized yield event')
except StopIteration:
self._numtasks -= 1
# Example implementation of coroutine-based socket I/O
class ReadSocket(YieldEvent):
def __init__(self, sock, nbytes):
self.sock = sock
self.nbytes = nbytes
def handle_yield(self, sched, task):
sched._read_wait(self.sock.fileno(), self, task)
def handle_resume(self, sched, task):
data = self.sock.recv(self.nbytes)
sched.add_ready(task, data)
class WriteSocket(YieldEvent):
def __init__(self, sock, data):
self.sock = sock
self.data = data
def handle_yield(self, sched, task):
sched._write_wait(self.sock.fileno(), self, task)
def handle_resume(self, sched, task):
nsent = self.sock.send(self.data)
sched.add_ready(task, nsent)
class AcceptSocket(YieldEvent):
def __init__(self, sock):
self.sock = sock
def handle_yield(self, sched, task):
sched._read_wait(self.sock.fileno(), self, task)
def handle_resume(self, sched, task):
r = self.sock.accept()
sched.add_ready(task, r)
# Wrapper around a socket object for use with yield
class Socket(object):
def __init__(self, sock):
self._sock = sock
def recv(self, maxbytes):
return ReadSocket(self._sock, maxbytes)
def send(self, data):
return WriteSocket(self._sock, data)
def accept(self):
return AcceptSocket(self._sock)
def __getattr__(self, name):
return getattr(self._sock, name)
if __name__ == '__main__':
from socket import socket, AF_INET, SOCK_STREAM
import time
# Example of a function involving generators. This should
# be called using line = yield from readline(sock)
def readline(sock):
chars = []
while True:
c = yield sock.recv(1)
if not c:
break
chars.append(c)
if c == b'\n':
break
return b''.join(chars)
# Echo server using generators
class EchoServer:
def __init__(self,addr,sched):
self.sched = sched
sched.new(self.server_loop(addr))
def server_loop(self,addr):
s = Socket(socket(AF_INET,SOCK_STREAM))
s.bind(addr)
s.listen(5)
while True:
c,a = yield s.accept()
print('Got connection from ', a)
self.sched.new(self.client_handler(Socket(c)))
def client_handler(self,client):
while True:
line = yield from readline(client)
if not line:
break
line = b'GOT:' + line
while line:
nsent = yield client.send(line)
line = line[nsent:]
client.close()
print('Client closed')
sched = Scheduler()
EchoServer(('',16000),sched)
sched.run()
這段代碼有點(diǎn)復(fù)雜。不過(guò),它實(shí)現(xiàn)了一個(gè)小型的操作系統(tǒng)。 有一個(gè)就緒的任務(wù)隊(duì)列,并且還有因I/O休眠的任務(wù)等待區(qū)域。 還有很多調(diào)度器負(fù)責(zé)在就緒隊(duì)列和I/O等待區(qū)域之間移動(dòng)任務(wù)。
討論
在構(gòu)建基于生成器的并發(fā)框架時(shí),通常會(huì)使用更常見(jiàn)的yield形式:
def some_generator(): ... result = yield data ...
使用這種形式的yield語(yǔ)句的函數(shù)通常被稱為“協(xié)程”。 通過(guò)調(diào)度器,yield語(yǔ)句在一個(gè)循環(huán)中被處理,如下:
f = some_generator()
# Initial result. Is None to start since nothing has been computed
result = None
while True:
try:
data = f.send(result)
result = ... do some calculation ...
except StopIteration:
break
這里的邏輯稍微有點(diǎn)復(fù)雜。不過(guò),被傳給 send() 的值定義了在yield語(yǔ)句醒來(lái)時(shí)的返回值。 因此,如果一個(gè)yield準(zhǔn)備在對(duì)之前yield數(shù)據(jù)的回應(yīng)中返回結(jié)果時(shí),會(huì)在下一次 send() 操作返回。 如果一個(gè)生成器函數(shù)剛開(kāi)始運(yùn)行,發(fā)送一個(gè)None值會(huì)讓它排在第一個(gè)yield語(yǔ)句前面。
除了發(fā)送值外,還可以在一個(gè)生成器上面執(zhí)行一個(gè) close() 方法。 它會(huì)導(dǎo)致在執(zhí)行yield語(yǔ)句時(shí)拋出一個(gè) GeneratorExit 異常,從而終止執(zhí)行。 如果進(jìn)一步設(shè)計(jì),一個(gè)生成器可以捕獲這個(gè)異常并執(zhí)行清理操作。 同樣還可以使用生成器的 throw() 方法在yield語(yǔ)句執(zhí)行時(shí)生成一個(gè)任意的執(zhí)行指令。 一個(gè)任務(wù)調(diào)度器可利用它來(lái)在運(yùn)行的生成器中處理錯(cuò)誤。
最后一個(gè)例子中使用的 yield from 語(yǔ)句被用來(lái)實(shí)現(xiàn)協(xié)程,可以被其它生成器作為子程序或過(guò)程來(lái)調(diào)用。 本質(zhì)上就是將控制權(quán)透明的傳輸給新的函數(shù)。 不像普通的生成器,一個(gè)使用 yield from 被調(diào)用的函數(shù)可以返回一個(gè)作為 yield from 語(yǔ)句結(jié)果的值。 關(guān)于 yield from 的更多信息可以在 PEP 380 中找到。
最后,如果使用生成器編程,要提醒你的是它還是有很多缺點(diǎn)的。 特別是,你得不到任何線程可以提供的好處。例如,如果你執(zhí)行CPU依賴或I/O阻塞程序, 它會(huì)將整個(gè)任務(wù)掛起直到操作完成。為了解決這個(gè)問(wèn)題, 你只能選擇將操作委派給另外一個(gè)可以獨(dú)立運(yùn)行的線程或進(jìn)程。 另外一個(gè)限制是大部分Python庫(kù)并不能很好的兼容基于生成器的線程。 如果你選擇這個(gè)方案,你會(huì)發(fā)現(xiàn)你需要自己改寫(xiě)很多標(biāo)準(zhǔn)庫(kù)函數(shù)。 作為本節(jié)提到的協(xié)程和相關(guān)技術(shù)的一個(gè)基礎(chǔ)背景,可以查看 PEP 342 和 “協(xié)程和并發(fā)的一門有趣課程”
PEP 3156 同樣有一個(gè)關(guān)于使用協(xié)程的異步I/O模型。 特別的,你不可能自己去實(shí)現(xiàn)一個(gè)底層的協(xié)程調(diào)度器。 不過(guò),關(guān)于協(xié)程的思想是很多流行庫(kù)的基礎(chǔ), 包括 gevent, greenlet, Stackless Python 以及其他類似工程。
以上就是Python 使用生成器代替線程的方法的詳細(xì)內(nèi)容,更多關(guān)于Python 生成器代替線程的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
pyspark 隨機(jī)森林的實(shí)現(xiàn)
這篇文章主要介紹了pyspark 隨機(jī)森林的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-04-04
Pandas實(shí)現(xiàn)Excel文件讀取,增刪,打開(kāi),保存操作
Pandas?是一種基于?NumPy?的開(kāi)源數(shù)據(jù)分析工具,用于處理和分析大量數(shù)據(jù)。本文將通過(guò)Pandas實(shí)現(xiàn)對(duì)Excel文件進(jìn)行讀取、增刪、打開(kāi)、保存等操作,需要的可以參考一下2023-04-04
python神經(jīng)網(wǎng)絡(luò)Densenet模型復(fù)現(xiàn)詳解
這篇文章主要為大家介紹了python神經(jīng)網(wǎng)絡(luò)Densenet模型復(fù)現(xiàn)詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-05-05
python實(shí)現(xiàn)QQ定時(shí)發(fā)送新年祝福信息
大家好,本篇文章主要講的是python實(shí)現(xiàn)QQ定時(shí)發(fā)送新年祝福信息,感興趣的同學(xué)感快來(lái)看一看吧,對(duì)你有幫助的話記得收藏一下2022-02-02

