詳解python之協(xié)程gevent模塊
Gevent官網(wǎng)文檔地址:http://www.gevent.org/contents.html
進(jìn)程、線程、協(xié)程區(qū)分
我們通常所說(shuō)的協(xié)程Coroutine其實(shí)是corporate routine的縮寫(xiě),直接翻譯為協(xié)同的例程,一般我們都簡(jiǎn)稱為協(xié)程。
在linux系統(tǒng)中,線程就是輕量級(jí)的進(jìn)程,而我們通常也把協(xié)程稱為輕量級(jí)的線程即微線程。
進(jìn)程和協(xié)程
下面對(duì)比一下進(jìn)程和協(xié)程的相同點(diǎn)和不同點(diǎn):
相同點(diǎn):
- 相同點(diǎn)存在于,當(dāng)我們掛起一個(gè)執(zhí)行流的時(shí),我們要保存的東西:
- 棧, 其實(shí)在你切換前你的局部變量,以及要函數(shù)的調(diào)用都需要保存,否則都無(wú)法恢復(fù)
寄存器狀態(tài),這個(gè)其實(shí)用于當(dāng)你的執(zhí)行流恢復(fù)后要做什么
而寄存器和棧的結(jié)合就可以理解為上下文,上下文切換的理解:
CPU看上去像是在并發(fā)的執(zhí)行多個(gè)進(jìn)程,這是通過(guò)處理器在進(jìn)程之間切換來(lái)實(shí)現(xiàn)的,操作系統(tǒng)實(shí)現(xiàn)這種交錯(cuò)執(zhí)行的機(jī)制稱為上下文切換
操作系統(tǒng)保持跟蹤進(jìn)程運(yùn)行所需的所有狀態(tài)信息。這種狀態(tài),就是上下文。
在任何一個(gè)時(shí)刻,操作系統(tǒng)都只能執(zhí)行一個(gè)進(jìn)程代碼,當(dāng)操作系統(tǒng)決定把控制權(quán)從當(dāng)前進(jìn)程轉(zhuǎn)移到某個(gè)新進(jìn)程時(shí),就會(huì)進(jìn)行上下文切換,即保存當(dāng)前進(jìn)程的上下文,恢復(fù)新進(jìn)程的上下文,然后將控制權(quán)傳遞到新進(jìn)程,新進(jìn)程就會(huì)從它上次停止的地方開(kāi)始。
不同點(diǎn):
- 執(zhí)行流的調(diào)度者不同,進(jìn)程是內(nèi)核調(diào)度,而協(xié)程是在用戶態(tài)調(diào)度,也就是說(shuō)進(jìn)程的上下文是在內(nèi)核態(tài)保存恢復(fù)的,而協(xié)程是在用戶態(tài)保存恢復(fù)的,很顯然用戶態(tài)的代價(jià)更低
- 進(jìn)程會(huì)被強(qiáng)占,而協(xié)程不會(huì),也就是說(shuō)協(xié)程如果不主動(dòng)讓出CPU,那么其他的協(xié)程,就沒(méi)有執(zhí)行的機(jī)會(huì)。
- 對(duì)內(nèi)存的占用不同,實(shí)際上協(xié)程可以只需要4K的棧就足夠了,而進(jìn)程占用的內(nèi)存要大的多
- 從操作系統(tǒng)的角度講,多協(xié)程的程序是單進(jìn)程,單協(xié)程
線程和協(xié)程
既然我們上面也說(shuō)了,協(xié)程也被稱為微線程,下面對(duì)比一下協(xié)程和線程:
- 線程之間需要上下文切換成本相對(duì)協(xié)程來(lái)說(shuō)是比較高的,尤其在開(kāi)啟線程較多時(shí),但協(xié)程的切換成本非常低。
- 同樣的線程的切換更多的是靠操作系統(tǒng)來(lái)控制,而協(xié)程的執(zhí)行由我們自己控制。
協(xié)程只是在單一的線程里不同的協(xié)程之間切換,其實(shí)和線程很像,線程是在一個(gè)進(jìn)程下,不同的線程之間做切換,這也可能是協(xié)程稱為微線程的原因吧。
Gevent模塊
Gevent是一種基于協(xié)程的Python網(wǎng)絡(luò)庫(kù),它用到Greenlet提供的,封裝了libevent事件循環(huán)的高層同步API。它讓開(kāi)發(fā)者在不改變編程習(xí)慣的同時(shí),用同步的方式寫(xiě)異步I/O的代碼。
簡(jiǎn)單示例:
import gevent def test1(): print 12 gevent.sleep(0) print 34 def test2(): print 56 gevent.sleep(0) print 78 gevent.joinall([ gevent.spawn(test1), gevent.spawn(test2), ])
結(jié)果:
12
56
34
78
猴子補(bǔ)丁 Monkey patching
這個(gè)補(bǔ)丁是Gevent模塊最需要注意的問(wèn)題,有了它,才會(huì)讓Gevent模塊發(fā)揮它的作用。我們往往使用Gevent是為了實(shí)現(xiàn)網(wǎng)絡(luò)通信的高并發(fā),但是,Gevent直接修改標(biāo)準(zhǔn)庫(kù)里面大部分的阻塞式系統(tǒng)調(diào)用,包括socket、ssl、threading和 select等模塊,而變?yōu)閰f(xié)作式運(yùn)行。但是我們無(wú)法保證你在復(fù)雜的生產(chǎn)環(huán)境中有哪些地方使用這些標(biāo)準(zhǔn)庫(kù)會(huì)由于打了補(bǔ)丁而出現(xiàn)奇怪的問(wèn)題。
一種方法是使用gevent下的socket模塊,我們可以通過(guò)”from gevent import socket”來(lái)導(dǎo)入。不過(guò)更常用的方法是使用猴子布?。∕onkey patching)。使用猴子補(bǔ)丁褒貶不一,但是官網(wǎng)上還是建議使用”patch_all()”,而且在程序的第一行就執(zhí)行。
from gevent import monkey; monkey.patch_socket() import gevent import socket urls = ['www.baidu.com', 'www.gevent.org', 'www.python.org'] jobs = [gevent.spawn(socket.gethostbyname, url) for url in urls] gevent.joinall(jobs, timeout=5) print [job.value for job in jobs]
上述代碼的第一行就是對(duì)socket標(biāo)準(zhǔn)庫(kù)打上猴子補(bǔ)丁,此后socket標(biāo)準(zhǔn)庫(kù)中的類和方法都會(huì)被替換成非阻塞式的,所有其他的代碼都不用修改,這樣協(xié)程的效率就真正體現(xiàn)出來(lái)了。Python中其它標(biāo)準(zhǔn)庫(kù)也存在阻塞的情況,gevent提供了”monkey.patch_all()”方法將所有標(biāo)準(zhǔn)庫(kù)都替換。
獲取協(xié)程狀態(tài)
- started屬性/ready()方法:判斷協(xié)程是否已啟動(dòng)。
- successful()方法:判斷協(xié)程是否成功運(yùn)行且沒(méi)有拋出異常。
- value屬性:獲取協(xié)程執(zhí)行完之后的返回值。
另外,greenlet協(xié)程運(yùn)行過(guò)程中發(fā)生的異常是不會(huì)被拋出到協(xié)程外的,因此需要用協(xié)程對(duì)象的”exception”屬性來(lái)獲取協(xié)程中的異常。
下面的例子很好的演示了各種方法和屬性的使用。
#!/usr/bin/env python
# _*_ coding utf-8 _*_
#Author: aaron
import gevent
def win():
return 'You win!'
def fail():
raise Exception('You failed!')
winner = gevent.spawn(win)
loser = gevent.spawn(fail)
print(winner.started) # True
print(loser.started) # True
# 在Greenlet中發(fā)生的異常,不會(huì)被拋到Greenlet外面。
# 控制臺(tái)會(huì)打出Stacktrace,但程序不會(huì)停止
try:
gevent.joinall([winner, loser])
except Exception as e:
# 這段永遠(yuǎn)不會(huì)被執(zhí)行
print('This will never be reached')
print(winner.ready()) # True
print(loser.started) # True
print(winner.value) # 'You win!'
print(loser.value) # None
print('successful ',winner.successful()) # True
print('successful ',loser.successful()) # False
# 這里可以通過(guò)raise loser.exception 或 loser.get()
# 來(lái)將協(xié)程中的異常拋出
print(loser.exception)
協(xié)程運(yùn)行超時(shí)控制
之前我們講過(guò)在”gevent.joinall()”方法中可以傳入timeout參數(shù)來(lái)設(shè)置超時(shí),我們也可以在全局范圍內(nèi)設(shè)置超時(shí)時(shí)間:
import gevent
from gevent import Timeout
timeout = Timeout(2) # 2 seconds
timeout.start()
def wait():
gevent.sleep(10)
try:
gevent.spawn(wait).join()
except Timeout:
print('Could not complete')
上例中,我們將超時(shí)設(shè)為2秒,此后所有協(xié)程的運(yùn)行,如果超過(guò)兩秒就會(huì)拋出”Timeout”異常。我們也可以將超時(shí)設(shè)置在with語(yǔ)句內(nèi),這樣該設(shè)置只在with語(yǔ)句塊中有效:
with Timeout(1): gevent.sleep(10)
此外,我們可以指定超時(shí)所拋出的異常,來(lái)替換默認(rèn)的”Timeout”異常。比如下例中超時(shí)就會(huì)拋出我們自定義的”TooLong”異常。
class TooLong(Exception): pass with Timeout(1, TooLong): gevent.sleep(10)
協(xié)程間通信
事件(Event)對(duì)象
greenlet協(xié)程間的異步通訊可以使用事件(Event)對(duì)象。該對(duì)象的”wait()”方法可以阻塞當(dāng)前協(xié)程,而”set()”方法可以喚醒之前阻塞的協(xié)程。在下面的例子中,5個(gè)waiter協(xié)程都會(huì)等待事件evt,當(dāng)setter協(xié)程在3秒后設(shè)置evt事件,所有的waiter協(xié)程即被喚醒。
#!/usr/bin/env python # _*_ coding utf-8 _*_ #Author: aaron import gevent from gevent.event import Event evt = Event() def setter(): print 'Wait for me' gevent.sleep(3) # 3秒后喚醒所有在evt上等待的協(xié)程 print "Ok, I'm done" evt.set() # 喚醒 def waiter(): print "I'll wait for you" evt.wait() # 等待 print 'Finish waiting' gevent.joinall([ gevent.spawn(setter), gevent.spawn(waiter), gevent.spawn(waiter), gevent.spawn(waiter), gevent.spawn(waiter), gevent.spawn(waiter) ])
AsyncResult事件
除了Event事件外,gevent還提供了AsyncResult事件,它可以在喚醒時(shí)傳遞消息。讓我們將上例中的setter和waiter作如下改動(dòng):
#!/usr/bin/env python
# _*_ coding utf-8 _*_
#Author: aaron
from gevent.event import AsyncResult
aevt = AsyncResult()
def setter():
print 'Wait for me'
gevent.sleep(3) # 3秒后喚醒所有在evt上等待的協(xié)程
print "Ok, I'm done"
aevt.set('Hello!') # 喚醒,并傳遞消息
def waiter():
print("I'll wait for you")
message = aevt.get() # 等待,并在喚醒時(shí)獲取消息
print 'Got wake up message: %s' % message
隊(duì)列 Queue
隊(duì)列Queue的概念相信大家都知道,我們可以用它的put和get方法來(lái)存取隊(duì)列中的元素。gevent的隊(duì)列對(duì)象可以讓greenlet協(xié)程之間安全的訪問(wèn)。運(yùn)行下面的程序,你會(huì)看到3個(gè)消費(fèi)者會(huì)分別消費(fèi)隊(duì)列中的產(chǎn)品,且消費(fèi)過(guò)的產(chǎn)品不會(huì)被另一個(gè)消費(fèi)者再取到:
#!/usr/bin/env python
# _*_ coding utf-8 _*_
#Author: aaron<br>
import gevent
from gevent.queue import Queue
products = Queue()
def consumer(name):
#while not products.empty():
while True:
try:
print('%s got product %s' % (name, products.get_nowait()))
gevent.sleep(0)
except gevent.queue.Empty:
break
print('Quit')
def producer():
for i in range(1, 10):
products.put(i)
gevent.joinall([
gevent.spawn(producer),
gevent.spawn(consumer, 'steve'),
gevent.spawn(consumer, 'john'),
gevent.spawn(consumer, 'nancy'),
])
注意:協(xié)程隊(duì)列跟線程隊(duì)列是一樣的,put和get方法都是阻塞式的,它們都有非阻塞的版本:put_nowait和get_nowait。如果調(diào)用get方法時(shí)隊(duì)列為空,則是不會(huì)拋出”gevent.queue.Empty”異常。我們只能使用get_nowait()的方式讓氣拋出異常。
信號(hào)量
信號(hào)量可以用來(lái)限制協(xié)程并發(fā)的個(gè)數(shù)。它有兩個(gè)方法,acquire和release。顧名思義,acquire就是獲取信號(hào)量,而release就是釋放。當(dāng)所有信號(hào)量都已被獲取,那剩余的協(xié)程就只能等待任一協(xié)程釋放信號(hào)量后才能得以運(yùn)行:
#!/usr/bin/env python
# _*_ coding utf-8 _*_
#Author: aaron
import gevent
from gevent.coros import BoundedSemaphore
sem = BoundedSemaphore(2)
def worker(n):
sem.acquire()
print('Worker %i acquired semaphore' % n)
gevent.sleep(0)
sem.release()
print('Worker %i released semaphore' % n)
gevent.joinall([gevent.spawn(worker, i) for i in xrange(0, 6)])
上面的例子中,我們初始化了”BoundedSemaphore”信號(hào)量,并將其個(gè)數(shù)定為2。所以同一個(gè)時(shí)間,只能有兩個(gè)worker協(xié)程被調(diào)度。程序運(yùn)行后的結(jié)果如下:
Worker 0 acquired semaphore
Worker 1 acquired semaphore
Worker 0 released semaphore
Worker 1 released semaphore
Worker 2 acquired semaphore
Worker 3 acquired semaphore
Worker 2 released semaphore
Worker 3 released semaphore
Worker 4 acquired semaphore
Worker 4 released semaphore
Worker 5 acquired semaphore
Worker 5 released semaphore
如果信號(hào)量個(gè)數(shù)為1,那就等同于同步鎖。
協(xié)程本地變量
同線程類似,協(xié)程也有本地變量,也就是只在當(dāng)前協(xié)程內(nèi)可被訪問(wèn)的變量:
#!/usr/bin/env python
# _*_ coding utf-8 _*_
#Author: aaron
import gevent
from gevent.local import local
data = local()
def f1():
data.x = 1
print data.x
def f2():
try:
print data.x
except AttributeError:
print 'x is not visible'
gevent.joinall([
gevent.spawn(f1),
gevent.spawn(f2)
])
通過(guò)將變量存放在local對(duì)象中,即可將其的作用域限制在當(dāng)前協(xié)程內(nèi),當(dāng)其他協(xié)程要訪問(wèn)該變量時(shí),就會(huì)拋出異常。不同協(xié)程間可以有重名的本地變量,而且互相不影響。因?yàn)閰f(xié)程本地變量的實(shí)現(xiàn),就是將其存放在以的”greenlet.getcurrent()”的返回為鍵值的私有的命名空間內(nèi)。
多并發(fā)socket模型
服務(wù)器端:
#!/usr/bin/env python
# _*_ coding utf-8 _*_
#Author: aaron
import socket
import gevent
from gevent import socket, monkey
monkey.patch_all()
def server(port):
s = socket.socket()
s.bind(('0.0.0.0', port))
s.listen(500)
while True:
cli, addr = s.accept()
gevent.spawn(handle_request, cli)
def handle_request(conn):
try:
while True:
data = conn.recv(1024)
print("recv:", data)
conn.send(data)
if not data:
conn.shutdown(socket.SHUT_WR)
except Exception as ex:
print(ex)
finally:
conn.close()
if __name__ == '__main__':
server(8001)
當(dāng)客戶端連接上服務(wù)器端時(shí),服務(wù)器端通過(guò)開(kāi)辟一個(gè)協(xié)程與該客戶端完成交互任務(wù),同時(shí)由于使用了Gevent協(xié)程的方式,在每個(gè)客戶端與服務(wù)器交互時(shí),并不會(huì)影響到服務(wù)器端的工作。
客戶端:
#!/usr/bin/env python
# _*_ coding utf-8 _*_
#Author: aaron
import socket
HOST = 'localhost' # The remote host
PORT = 8001 # The same port as used by the server
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((HOST, PORT))
while True:
msg = bytes(input(">>:"), encoding="utf8")
s.sendall(msg)
data = s.recv(1024)
# print(data)
print('Received', repr(data)) # repr 格式化輸出
s.close()
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
- Python greenlet和gevent使用代碼示例解析
- Python協(xié)程操作之gevent(yield阻塞,greenlet),協(xié)程實(shí)現(xiàn)多任務(wù)(有規(guī)律的交替協(xié)作執(zhí)行)用法詳解
- Python協(xié)程 yield與協(xié)程greenlet簡(jiǎn)單用法示例
- 使用Python中的greenlet包實(shí)現(xiàn)并發(fā)編程的入門教程
- Python greenlet實(shí)現(xiàn)原理和使用示例
- Python中g(shù)event模塊協(xié)程使用
- 簡(jiǎn)單了解python gevent 協(xié)程使用及作用
- Python中的協(xié)程(Coroutine)操作模塊(greenlet、gevent)
相關(guān)文章
python list count統(tǒng)計(jì)個(gè)數(shù)的實(shí)現(xiàn)
這篇文章主要介紹了python list count統(tǒng)計(jì)個(gè)數(shù)的實(shí)現(xiàn)方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-02-02
python數(shù)據(jù)爬下來(lái)保存的位置
在本篇文章里小編給大家整理的是關(guān)于python數(shù)據(jù)爬下來(lái)保存的位置,需要的朋友們可以參考下。2020-02-02
python polars數(shù)據(jù)科學(xué)庫(kù)對(duì)比Pandas優(yōu)勢(shì)分析
這篇文章主要為大家介紹了python polars數(shù)據(jù)科學(xué)庫(kù)對(duì)比Pandas優(yōu)勢(shì)分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2024-01-01
Pandas庫(kù)之DataFrame使用的學(xué)習(xí)筆記
這篇文章主要介紹了Pandas庫(kù)之DataFrame使用的學(xué)習(xí)筆記,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-06-06
Python報(bào)錯(cuò)ModuleNotFoundError: No module named&
在嘗試導(dǎo)入TensorBoard模塊時(shí),你可能會(huì)遇到ModuleNotFoundError: No module named 'tensorboard'的錯(cuò)誤,下面我們來(lái)分析這個(gè)問(wèn)題并提供解決方案,需要的朋友可以參考下2024-09-09

