Python中的協(xié)程(Coroutine)操作模塊(greenlet、gevent)
一、協(xié)程介紹
協(xié)程:英文名Coroutine,是單線程下的并發(fā),又稱微線程,纖程。
協(xié)程是一種用戶態(tài)的輕量級線程,即協(xié)程是由用戶程序自己控制調(diào)度的。對比操作系統(tǒng)控制線程的切換,用戶在單線程內(nèi)控制協(xié)程的切換。
協(xié)程自己本身無法實現(xiàn)并發(fā)(甚至性能會降低),協(xié)程+IO切換性能提高。
1、介紹
通常程序中子程序調(diào)用總是一個入口,一次返回,調(diào)用順序是明確的。而協(xié)程的調(diào)用和子程序不同。
協(xié)程看上去也是子程序,但執(zhí)行過程中,在子程序內(nèi)部可中斷,然后轉(zhuǎn)而執(zhí)行別的子程序,在適當(dāng)?shù)臅r候再返回來接著執(zhí)行。
注意,在一個子程序中中斷,去執(zhí)行其他子程序,不是函數(shù)調(diào)用,有點(diǎn)類似CPU的中斷。
看起來A、B的執(zhí)行有點(diǎn)像多線程,但協(xié)程的特點(diǎn)在于是一個線程執(zhí)行,那和多線程比,協(xié)程有何優(yōu)勢?
最大的優(yōu)勢就是協(xié)程極高的執(zhí)行效率。因為子程序切換不是線程切換,而是由程序自身控制,因此,沒有線程切換的開銷,和多線程比,線程數(shù)量越多,協(xié)程的性能優(yōu)勢就越明顯。
第二大優(yōu)勢就是不需要多線程的鎖機(jī)制,因為只有一個線程,也不存在同時寫變量沖突,在協(xié)程中控制共享資源不加鎖,只需要判斷狀態(tài)就好了,所以執(zhí)行效率比多線程高很多。
因為協(xié)程是一個線程執(zhí)行,那怎么利用多核CPU呢?最簡單的方法是多進(jìn)程+協(xié)程,既充分利用多核,又充分發(fā)揮協(xié)程的高效率,可獲得極高的性能。
2、舉例
Python對協(xié)程的支持是通過generator實現(xiàn)的。
在generator中,我們不但可以通過for
循環(huán)來迭代,還可以不斷調(diào)用next()
函數(shù)獲取由yield
語句返回的下一個值。
但是Python的yield
不但可以返回一個值,它還可以接收調(diào)用者發(fā)出的參數(shù)。
來看例子:
傳統(tǒng)的生產(chǎn)者-消費(fèi)者模型是一個線程寫消息,一個線程取消息,通過鎖機(jī)制控制隊列和等待,但一不小心就可能死鎖。
如果改用協(xié)程,生產(chǎn)者生產(chǎn)消息后,直接通過yield
跳轉(zhuǎn)到消費(fèi)者開始執(zhí)行,待消費(fèi)者執(zhí)行完畢后,切換回生產(chǎn)者繼續(xù)生產(chǎn),效率極高:
def consumer(): r = '' while True: n = yield r if not n: return print('[CONSUMER] Consuming %s...' % n) r = '200 OK' def produce(c): c.send(None) n = 0 while n < 5: n = n + 1 print('[PRODUCER] Producing %s...' % n) r = c.send(n) print('[PRODUCER] Consumer return: %s' % r) c.close() c = consumer() produce(c)
執(zhí)行結(jié)果:
[PRODUCER] Producing 1... [CONSUMER] Consuming 1... [PRODUCER] Consumer return: 200 OK [PRODUCER] Producing 2... [CONSUMER] Consuming 2... [PRODUCER] Consumer return: 200 OK [PRODUCER] Producing 3... [CONSUMER] Consuming 3... [PRODUCER] Consumer return: 200 OK [PRODUCER] Producing 4... [CONSUMER] Consuming 4... [PRODUCER] Consumer return: 200 OK [PRODUCER] Producing 5... [CONSUMER] Consuming 5... [PRODUCER] Consumer return: 200 OK
注意到consumer
函數(shù)是一個generator
,把一個consumer
傳入produce
后:
- 首先調(diào)用
c.send(None)
啟動生成器; - 然后,一旦生產(chǎn)了東西,通過
c.send(n)
切換到consumer
執(zhí)行; consumer
通過yield
拿到消息,處理,又通過yield
把結(jié)果傳回;produce
拿到consumer
處理的結(jié)果,繼續(xù)生產(chǎn)下一條消息;produce
決定不生產(chǎn)了,通過c.close()
關(guān)閉consumer
,整個過程結(jié)束。
整個流程無鎖,由一個線程執(zhí)行,produce
和consumer
協(xié)作完成任務(wù),所以稱為“協(xié)程”,而非線程的搶占式多任務(wù)。
最后套用Donald Knuth的一句話總結(jié)協(xié)程的特點(diǎn):“子程序就是協(xié)程的一種特例。”
3、優(yōu)點(diǎn)如下:
- 協(xié)程的切換開銷更小,屬于程序級別的切換,操作系統(tǒng)完全感知不到,因而更加輕量級
- 單線程內(nèi)就可以實現(xiàn)并發(fā)的效果,最大限度地利用cpu
4、缺點(diǎn)如下:
- 協(xié)程的本質(zhì)是單線程下,無法利用多核,可以是一個程序開啟多個進(jìn)程,每個進(jìn)程內(nèi)開啟多個線程,每個線程內(nèi)開啟協(xié)程
- 協(xié)程指的是單個線程,因而一旦協(xié)程出現(xiàn)阻塞,將會阻塞整個線程
5、總結(jié)協(xié)程特點(diǎn):
- 必須在只有一個單線程里實現(xiàn)并發(fā)
- 修改共享數(shù)據(jù)不需加鎖
- 用戶程序里自己保存多個控制流的上下文棧
- 附加:一個協(xié)程遇到IO操作自動切換到其它協(xié)程(如何實現(xiàn)檢測IO,yield、greenlet都無法實現(xiàn),就用到了gevent模塊(select機(jī)制))
二、greenlet(綠葉)模塊
如果我們在單個線程內(nèi)有20個任務(wù),要想實現(xiàn)在多個任務(wù)之間切換,使用yield生成器的方式過于麻煩(需要先得到初始化一次的生成器,然后再調(diào)用send。。。非常麻煩),而使用greenlet模塊可以非常簡單地實現(xiàn)這20個任務(wù)直接的切換。
1、安裝模塊
pip3 install greenlet
2、greenlet實現(xiàn)狀態(tài)切換
單純的切換(在沒有io的情況下或者沒有重復(fù)開辟內(nèi)存空間的操作),反而會降低程序的執(zhí)行速度。
from greenlet import greenlet def eat(name): print('%s eat 1' % name) g2.switch('nick') print('%s eat 2' % name) g2.switch() def play(name): print('%s play 1' % name) g1.switch() print('%s play 2' % name) g1 = greenlet(eat) g2 = greenlet(play) g1.switch('nick') # 可以在第一次switch時傳入?yún)?shù),以后都不需要
3、效率對比
greenlet只是提供了一種比generator更加便捷的切換方式,當(dāng)切到一個任務(wù)執(zhí)行時如果遇到io,那就原地阻塞,仍然是沒有解決遇到IO自動切換來提升效率的問題。
單線程里的這20個任務(wù)的代碼通常會既有計算操作又有阻塞操作,我們完全可以在執(zhí)行任務(wù)1時遇到阻塞,就利用阻塞的時間去執(zhí)行任務(wù)2...如此,才能提高效率,這就用到了Gevent模塊。
#順序執(zhí)行 import time def f1(): res=1 for i in range(100000000): res+=i def f2(): res=1 for i in range(100000000): res*=i start=time.time() f1() f2() stop=time.time() print('run time is %s' %(stop-start)) #10.985628366470337 #切換 from greenlet import greenlet import time def f1(): res=1 for i in range(100000000): res+=i g2.switch() def f2(): res=1 for i in range(100000000): res*=i g1.switch() start=time.time() g1=greenlet(f1) g2=greenlet(f2) g1.switch() stop=time.time() print('run time is %s' %(stop-start)) # 52.763017892837524
三、gevent模塊
Gevent 是一個第三方庫,可以輕松實現(xiàn)并發(fā)同步或異步編程,在gevent中用到的主要模式是Greenlet,它是以C擴(kuò)展模塊形式接入Python的輕量級協(xié)程。
1、安裝
pip3 install gevent
2、 用法介紹
g1=gevent.spawn(func,1,,2,3,x=4,y=5):# 創(chuàng)建一個協(xié)程對象g1,spawn括號內(nèi)第一個參數(shù)是函數(shù)名,如eat,后面可以有多個參數(shù),可以是位置實參或關(guān)鍵字實參,都是傳給函數(shù)eat的 g2=gevent.spawn(func2) g1.join():#等待g1結(jié)束 g2.join():#等待g2結(jié)束 #上述兩步合成一步: gevent.joinall([g1,g2]) g1.value :#拿到func1的返回值
1、遇到io主動切換
import gevent def eat(name): print('%s eat 1' %name) gevent.sleep(2) print('%s eat 2' %name) def play(name): print('%s play 1' %name) gevent.sleep(1) print('%s play 2' %name) g1=gevent.spawn(eat,'egon') g2=gevent.spawn(play,name='egon') g1.join() g2.join() # 或者gevent.joinall([g1,g2]) print('主')
上例gevent.sleep(2)
模擬的是gevent可以識別的io阻塞,而time.sleep(2)
或其他的阻塞,gevent是不能直接識別的需要用下面一行代碼,打補(bǔ)丁,就可以識別了。
from gevent import monkey;monkey.patch_all()
必須放到被打補(bǔ)丁者的前面,如time,socket模塊之前?;蛘呶覀兏纱嘤洃洺桑阂胓event,需要將from gevent import monkey;monkey.patch_all()
放到文件的開頭。
from gevent import monkey;monkey.patch_all() import gevent import time def eat(): print('eat food 1') time.sleep(2) print('eat food 2') def play(): print('play 1') time.sleep(1) print('play 2') g1=gevent.spawn(eat) g2=gevent.spawn(play) gevent.joinall([g1,g2]) print('主')
2、 查看threading.current_thread().getName()
我們可以用threading.current_thread().getName()
來查看每個g1和g2,查看的結(jié)果為DummyThread-n,即假線程
from gevent import monkey;monkey.patch_all() import threading import gevent import time def eat(): print(threading.current_thread().getName()) print('eat food 1') time.sleep(2) print('eat food 2') def play(): print(threading.current_thread().getName()) print('play 1') time.sleep(1) print('play 2') g1=gevent.spawn(eat) g2=gevent.spawn(play) gevent.joinall([g1,g2]) print('主')
3、Gevent之同步與異步
from gevent import spawn,joinall,monkey;monkey.patch_all() import time def task(pid): """ Some non-deterministic task """ time.sleep(0.5) print('Task %s done' % pid) def synchronous(): # 同步 for i in range(10): task(i) def asynchronous(): # 異步 g_l=[spawn(task,i) for i in range(10)] joinall(g_l) print('DONE') if __name__ == '__main__': print('Synchronous:') synchronous() print('Asynchronous:') asynchronous() # 上面程序的重要部分是將task函數(shù)封裝到Greenlet內(nèi)部線程的gevent.spawn。 # 初始化的greenlet列表存放在數(shù)組threads中,此數(shù)組被傳給gevent.joinall 函數(shù), # 后者阻塞當(dāng)前流程,并執(zhí)行所有給定的greenlet任務(wù)。執(zhí)行流程只會在 所有g(shù)reenlet執(zhí)行完后才會繼續(xù)向下走。
4、Gevent之應(yīng)用
通過gevent實現(xiàn)單線程下的socket并發(fā)
注意:from gevent import monkey;monkey.patch_all()一定要放到導(dǎo)入socket模塊之前,否則gevent無法識別socket的阻塞。
1、 服務(wù)端
from gevent import monkey;monkey.patch_all() from socket import * import gevent #如果不想用money.patch_all()打補(bǔ)丁,可以用gevent自帶的socket # from gevent import socket # s=socket.socket() def server(server_ip,port): s=socket(AF_INET,SOCK_STREAM) s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) s.bind((server_ip,port)) s.listen(5) while True: conn,addr=s.accept() gevent.spawn(talk,conn,addr) def talk(conn,addr): try: while True: res=conn.recv(1024) print('client %s:%s msg: %s' %(addr[0],addr[1],res)) conn.send(res.upper()) except Exception as e: print(e) finally: conn.close() if __name__ == '__main__': server('127.0.0.1',8080)
2、多線程并發(fā)多個客戶端
from threading import Thread from socket import * import threading def client(server_ip,port): c=socket(AF_INET,SOCK_STREAM) #套接字對象一定要加到函數(shù)內(nèi),即局部名稱空間內(nèi),放在函數(shù)外則被所有線程共享,則大家公用一個套接字對象,那么客戶端端口永遠(yuǎn)一樣了 c.connect((server_ip,port)) count=0 while True: c.send(('%s say hello %s' %(threading.current_thread().getName(),count)).encode('utf-8')) msg=c.recv(1024) print(msg.decode('utf-8')) count+=1 if __name__ == '__main__': for i in range(500): t=Thread(target=client,args=('127.0.0.1',8080)) t.start()
到此這篇關(guān)于Python協(xié)程操作模塊的文章就介紹到這了。希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
pytorch算子torch.arange在CPU?GPU?NPU中支持?jǐn)?shù)據(jù)類型格式
這篇文章主要為大家介紹了pytorch算子torch.arange在CPU?GPU?NPU支持?jǐn)?shù)據(jù)類型格式,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-09-09Python利用wxPython實現(xiàn)ocr識別圖片漢字程序
在這篇博客中,我們將介紹一個如何使用wxPython構(gòu)建的簡單OCR識別圖片漢字應(yīng)用程序,文章的示例代碼講解詳細(xì),感興趣的小伙伴可以學(xué)習(xí)一下2023-08-08python和pywin32實現(xiàn)窗口查找、遍歷和點(diǎn)擊的示例代碼
這篇文章主要介紹了python和pywin32實現(xiàn)窗口查找、遍歷和點(diǎn)擊的示例代碼,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-04-04基于python pygame實現(xiàn)的兔子吃月餅小游戲
pygame是用來開發(fā)游戲的一套基于SDL的模板,它可以是python創(chuàng)建完全界面化的游戲和多媒體程序,而且它基本上可以在任何系統(tǒng)上運(yùn)行,這篇文章主要給大家介紹了基于python pygame實現(xiàn)的兔子吃月餅小游戲的相關(guān)資料,需要的朋友可以參考下2021-09-09pyecharts如何旋轉(zhuǎn)折線圖的X軸標(biāo)簽
這篇文章主要介紹了pyecharts如何旋轉(zhuǎn)折線圖的X軸標(biāo)簽,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-11-11Python中用pycurl監(jiān)控http響應(yīng)時間腳本分享
這篇文章主要介紹了Python中用pycurl監(jiān)控http響應(yīng)時間腳本分享,本文腳本實現(xiàn)監(jiān)控http相應(yīng)碼,響應(yīng)大小,建立連接時間,準(zhǔn)備傳輸時間,傳輸?shù)谝粋€字節(jié)時間,完成時間,需要的朋友可以參考下2015-02-02