老生常談進(jìn)程線程協(xié)程那些事兒
一、進(jìn)程與線程
1.進(jìn)程
我們電腦的應(yīng)用程序,都是進(jìn)程,假設(shè)我們用的電腦是單核的,cpu同時(shí)只能執(zhí)行一個(gè)進(jìn)程。當(dāng)程序出于I/O阻塞的時(shí)候,CPU如果和程序一起等待,那就太浪費(fèi)了,cpu會(huì)去執(zhí)行其他的程序,此時(shí)就涉及到切換,切換前要保存上一個(gè)程序運(yùn)行的狀態(tài),才能恢復(fù),所以就需要有個(gè)東西來記錄這個(gè)東西,就可以引出進(jìn)程的概念了。
進(jìn)程就是一個(gè)程序在一個(gè)數(shù)據(jù)集上的一次動(dòng)態(tài)執(zhí)行過程。進(jìn)程由程序,數(shù)據(jù)集,進(jìn)程控制塊三部分組成。程序用來描述進(jìn)程哪些功能以及如何完成;數(shù)據(jù)集是程序執(zhí)行過程中所使用的資源;進(jìn)程控制塊用來保存程序運(yùn)行的狀態(tài)
2.線程
一個(gè)進(jìn)程中可以開多個(gè)線程,為什么要有進(jìn)程,而不做成線程呢?因?yàn)橐粋€(gè)程序中,線程共享一套數(shù)據(jù),如果都做成進(jìn)程,每個(gè)進(jìn)程獨(dú)占一塊內(nèi)存,那這套數(shù)據(jù)就要復(fù)制好幾份給每個(gè)程序,不合理,所以有了線程。
線程又叫輕量級進(jìn)程,是一個(gè)基本的cpu執(zhí)行單元,也是程序執(zhí)行過程中的最小單元。一個(gè)進(jìn)程最少也會(huì)有一個(gè)主線程,在主線程中通過threading模塊,在開子線程
3.進(jìn)程線程的關(guān)系
(1)一個(gè)線程只能屬于一個(gè)進(jìn)程,而一個(gè)進(jìn)程可以有多個(gè)線程,但至少有一個(gè)線程
(2)資源分配給進(jìn)程,進(jìn)程是程序的主體,同一進(jìn)程的所有線程共享該進(jìn)程的所有資源
(3)cpu分配給線程,即真正在cpu上運(yùn)行的是線程
(4)線程是最小的執(zhí)行單元,進(jìn)程是最小的資源管理單元
4.并行和并發(fā)
并行處理是指計(jì)算機(jī)系統(tǒng)中能同時(shí)執(zhí)行兩個(gè)或多個(gè)任務(wù)的計(jì)算方法,并行處理可同時(shí)工作于同一程序的不同方面
并發(fā)處理是同一時(shí)間段內(nèi)有幾個(gè)程序都在一個(gè)cpu中處于運(yùn)行狀態(tài),但任一時(shí)刻只有一個(gè)程序在cpu上運(yùn)行。
并發(fā)的重點(diǎn)在于有處理多個(gè)任務(wù)的能力,不一定要同時(shí);而并行的重點(diǎn)在于就是有同時(shí)處理多個(gè)任務(wù)的能力。并行是并發(fā)的子集
以上所說的是相對于所有語言來說的,Python的特殊之處在于Python有一把GIL鎖,這把鎖限制了同一時(shí)間內(nèi)一個(gè)進(jìn)程只能有一個(gè)線程能使用cpu
二、threading模塊
這個(gè)模塊的功能就是創(chuàng)建新的線程,有兩種創(chuàng)建線程的方法:
1.直接創(chuàng)建
import threading import time def foo(n): print('>>>>>>>>>>>>>>>%s'%n) time.sleep(3) print('tread 1') t1=threading.Thread(target=foo,args=(2,)) #arg后面一定是元組,t1就是創(chuàng)建的子線程對象 t1.start()#把子進(jìn)程運(yùn)行起來 print('ending')
上面的代碼就是在主線程中創(chuàng)建了一個(gè)子線程
運(yùn)行結(jié)果是:先打印>>>>>>>>>>>>>2,在打印ending,然后等待3秒后打印thread 1
2.另一種方式是通過繼承類創(chuàng)建線程對象
import threading import time class MyThread(threading.Thread): def __init__(self): threading.Thread.__init__(self) def run(self): print('ok') time.sleep(2) print('end') t1=MyThread()#創(chuàng)建線程對象 t1.start()#激活線程對象 print('end again')
3.join()方法
這個(gè)方法的作用是:在子線程完成運(yùn)行之前,這個(gè)子線程的父線程將一直等待子線程運(yùn)行完再運(yùn)行
import threading import time def foo(n): print('>>>>>>>>>>>>>>>%s'%n) time.sleep(n) print('tread 1') def bar(n): print('>>>>>>>>>>>>>>>>%s'%n) time.sleep(n) print('thread 2') s=time.time() t1=threading.Thread(target=foo,args=(2,)) t1.start()#把子進(jìn)程運(yùn)行起來 t2=threading.Thread(target=bar,args=(5,)) t2.start() t1.join() #只是會(huì)阻擋主線程運(yùn)行,跟t2沒關(guān)系 t2.join() print(time.time()-s) print('ending') ''' 運(yùn)行結(jié)果: >>>>>>>>>>>>>>>2 >>>>>>>>>>>>>>>>5 tread 1 thread 2 5.001286268234253 ending '''
4.setDaemon()方法
這個(gè)方法的作用是把線程聲明為守護(hù)線程,必須在start()方法調(diào)用之前設(shè)置。
默認(rèn)情況下,主線程運(yùn)行完會(huì)檢查子線程是否完成,如果未完成,那么主線程會(huì)等待子線程完成后再退出。但是如果主線程完成后不用管子線程是否運(yùn)行完都退出,就要設(shè)置setDaemon(True)
import threading import time class MyThread(threading.Thread): def __init__(self): threading.Thread.__init__(self) def run(self): print('ok') time.sleep(2) print('end') t1=MyThread()#創(chuàng)建線程對象 t1.setDaemon(True) t1.start()#激活線程對象 print('end again') #運(yùn)行結(jié)果是馬上打印ok和 end again #然后程序終止,不會(huì)打印end
主線程默認(rèn)是非守護(hù)線程,子線程都是繼承的主線程,所以默認(rèn)也都是非守護(hù)線程
5.其他方法
isAlive(): 返回線程是否處于活動(dòng)中
getName(): 返回線程名
setName(): 設(shè)置線程名
threading.currentThread():返回當(dāng)前的線程變量
threading.enumerate():返回一個(gè)包含正在運(yùn)行的線程的列表
threading.activeCount():返回正在運(yùn)行的線程數(shù)量
三、各種鎖
1.同步鎖(用戶鎖,互斥鎖)
先來看一個(gè)例子:
需求是有一個(gè)全局變量的值是100,我們開100個(gè)線程,每個(gè)線程執(zhí)行的操作是對這個(gè)全局變量減一,最后import threading
import threading import time def sub(): global num temp=num num=temp-1 time.sleep(2) num=100 l=[]for i in range(100): t=threading.Thread(target=sub,args=()) t.start() l.append(t) for i in l: i.join() print(num)
好像一切正常,現(xiàn)在我們改動(dòng)一下,在sub函數(shù)的temp=num,和num=temp-1 中間,加一個(gè)time.sleep(0.1),會(huì)發(fā)現(xiàn)出問題了,結(jié)果變成兩秒后打印99了,改成time.sleep(0.0001)呢,結(jié)果不確定了,但都是90幾,這是怎么回事呢?
這就要說到Python里的那把GIL鎖了,我們來捋一捋:
首次定義一個(gè)全局變量num=100,然后開辟了100個(gè)子線程,但是Python的那把GIL鎖限制了同一時(shí)刻只能有一個(gè)線程使用cpu,所以這100個(gè)線程是處于搶這把鎖的狀態(tài),誰搶到了,誰就可以運(yùn)行自己的代碼。在最開始的情況下,每個(gè)線程搶到cpu,馬上執(zhí)行了對全局變量減一的操作,所以不會(huì)出現(xiàn)問題。但是我們改動(dòng)后,在全局變量減一之前,讓他睡了0.1秒,程序睡著了,cpu可不能一直等著這個(gè)線程,當(dāng)這個(gè)線程處于I/O阻塞的時(shí)候,其他線程就又可以搶cpu了,所以其他線程搶到了,開始執(zhí)行代碼,要知道0.1秒對于cpu的運(yùn)行來說已經(jīng)很長時(shí)間了,這段時(shí)間足夠讓第一個(gè)線程還沒睡醒的時(shí)候,其他線程都搶到過cpu一次了。他們拿到的num都是100,等他們醒來后,執(zhí)行的操作都是100-1,所以最后結(jié)果是99.同樣的道理,如果睡的時(shí)間短一點(diǎn),變成0.001,可能情況就是當(dāng)?shù)?1個(gè)線程第一次搶到cpu的時(shí)候,第一個(gè)線程已經(jīng)睡醒了,并修改了全局變量。所以這第91個(gè)線程拿到的全局變量就是99,然后第二個(gè)第三個(gè)線程陸續(xù)醒過來,分別修改了全局變量,所以最后結(jié)果就是一個(gè)不可知的數(shù)了。一張圖看懂這個(gè)過程
這就是線程安全問題,只要涉及到線程,都會(huì)有這個(gè)問題。解決辦法就是加鎖
我們在全局加一把鎖,用鎖把涉及到數(shù)據(jù)運(yùn)算的操作鎖起來,就把這段代碼變成串行的了,上代碼:
import threading import time def sub(): global num lock.acquire()#獲取鎖 temp=num time.sleep(0.001) num=temp-1 lock.release()#釋放鎖 time.sleep(2) num=100 l=[] lock=threading.Lock() for i in range(100): t=threading.Thread(target=sub,args=()) t.start() l.append(t) for i in l: i.join() print(num)
獲取這把鎖之后,必須釋放掉才能再次被獲取。這把鎖就叫用戶鎖
2.死鎖與遞歸鎖
死鎖就是兩個(gè)及以上進(jìn)程或線程在執(zhí)行過程中,因相互制約造成的一種互相等待的現(xiàn)象,若無外力作用,他們將永遠(yuǎn)卡在那里。舉個(gè)例子:
死鎖示例
import threading,time class MyThread(threading.Thread): def __init(self): threading.Thread.__init__(self) def run(self): self.foo() self.bar() def foo(self): LockA.acquire() print('i am %s GET LOCKA------%s'%(self.name,time.ctime())) #每個(gè)線程有個(gè)默認(rèn)的名字,self.name就獲取這個(gè)名字 LockB.acquire() print('i am %s GET LOCKB-----%s'%(self.name,time.ctime())) LockB.release() time.sleep(1) LockA.release() def bar(self):#與 LockB.acquire() print('i am %s GET LOCKB------%s'%(self.name,time.ctime())) #每個(gè)線程有個(gè)默認(rèn)的名字,self.name就獲取這個(gè)名字 LockA.acquire() print('i am %s GET LOCKA-----%s'%(self.name,time.ctime())) LockA.release() LockB.release() LockA=threading.Lock() LockB=threading.Lock() for i in range(10): t=MyThread() t.start() #運(yùn)行結(jié)果: i am Thread-1 GET LOCKA------Sun Jul 23 11:25:48 2017 i am Thread-1 GET LOCKB-----Sun Jul 23 11:25:48 2017 i am Thread-1 GET LOCKB------Sun Jul 23 11:25:49 2017 i am Thread-2 GET LOCKA------Sun Jul 23 11:25:49 2017 然后就卡住了
上面這個(gè)例子中,線程2在等待線程1釋放B鎖,線程1在等待線程2釋放A鎖,互相制約
我們在用互斥鎖的時(shí)候,一旦用的鎖多了,很容易就出現(xiàn)這種問題
在Python中,為了解決這個(gè)問題,Python提供了一個(gè)叫可重用鎖(RLock)的概念,這個(gè)鎖內(nèi)部維護(hù)著一個(gè)lock和一個(gè)counter變量,counter記錄了acquire的次數(shù),每次acquire,counter就加1,每次release,counter就減1,只有counter的值為0的時(shí)候,其他線程才能獲得資源,下面用RLock替換Lock,在運(yùn)行就不會(huì)卡住了:
遞歸鎖示例
import threading,time class MyThread(threading.Thread): def __init(self): threading.Thread.__init__(self) def run(self): self.foo() self.bar() def foo(self): RLock.acquire() print('i am %s GET LOCKA------%s'%(self.name,time.ctime())) #每個(gè)線程有個(gè)默認(rèn)的名字,self.name就獲取這個(gè)名字 RLock.acquire() print('i am %s GET LOCKB-----%s'%(self.name,time.ctime())) RLock.release() time.sleep(1) RLock.release() def bar(self):#與 RLock.acquire() print('i am %s GET LOCKB------%s'%(self.name,time.ctime())) #每個(gè)線程有個(gè)默認(rèn)的名字,self.name就獲取這個(gè)名字 RLock.acquire() print('i am %s GET LOCKA-----%s'%(self.name,time.ctime())) RLock.release() RLock.release() LockA=threading.Lock() LockB=threading.Lock() RLock=threading.RLock() for i in range(10): t=MyThread() t.start()
這把鎖又叫遞歸鎖
3.Semaphore(信號量)
這也是一把鎖,可以指定有幾個(gè)線程可以同時(shí)獲得這把鎖,最多是5個(gè)(前面說的互斥鎖只能有一個(gè)線程獲得)
import threading import time semaphore=threading.Semaphore(5) def foo(): semaphore.acquire() time.sleep(2) print('ok') semaphore.release() for i in range(10): t=threading.Thread(target=foo,args=()) t.start()
運(yùn)行結(jié)果是每隔兩秒就打印5個(gè)ok
4.Event對象
線程的運(yùn)行是獨(dú)立的,如果線程間需要通信,或者說某個(gè)線程需要根據(jù)一個(gè)線程的狀態(tài)來執(zhí)行下一步的操作,就需要用到Event對象。可以把Event對象看作是一個(gè)標(biāo)志位,默認(rèn)值為假,如果一個(gè)線程等待Event對象,而此時(shí)Event對象中的標(biāo)志位為假,那么這個(gè)線程就會(huì)一直等待,直至標(biāo)志位為真,為真以后,所有等待Event對象的線程將被喚醒
event.isSet():返回event的狀態(tài)值; event.wait():如果 event.isSet()==False將阻塞線程; event.set(): 設(shè)置event的狀態(tài)值為True,所有阻塞池的線程激活進(jìn)入就緒狀態(tài), 等待操作系統(tǒng)調(diào)度;設(shè)置對象的時(shí)候,默認(rèn)是False的 event.clear():恢復(fù)event的狀態(tài)值為False。
用一個(gè)例子來演示Event對象的用法:
import threading,time event=threading.Event() #創(chuàng)建一個(gè)event對象 def foo(): print('wait.......') event.wait() #event.wait(1)#if event 對象內(nèi)的標(biāo)志位為Flase,則阻塞 #wait()里面的參數(shù)的意思是:只等待1秒,如果1秒后還沒有把標(biāo)志位改過來,就不等了,繼續(xù)執(zhí)行下面的代碼 print('connect to redis server') print('attempt to start redis sever)') time.sleep(3) event.set() for i in range(5): t=threading.Thread(target=foo,args=()) t.start() #3秒之后,主線程結(jié)束,但子線程并不是守護(hù)線程,子線程還沒結(jié)束,所以,程序并沒有結(jié)束,應(yīng)該是在3秒之后,把標(biāo)志位設(shè)為true,即event.set()
5.隊(duì)列
官方文檔說隊(duì)列在多線程中保證數(shù)據(jù)安全是非常有用的
隊(duì)列可以理解為是一種數(shù)據(jù)結(jié)構(gòu),可以存儲(chǔ)數(shù)據(jù),讀寫數(shù)據(jù)。就類似列表里面加了一把鎖
5.1get和put方法
import queue #隊(duì)列里讀寫數(shù)據(jù)只有put和get兩個(gè)方法,列表的那些方法都沒有 q=queue.Queue()#創(chuàng)建一個(gè)隊(duì)列對象 FIFO先進(jìn)先出 #q=queue.Queue(20) #這里面可以有一個(gè)參數(shù),設(shè)置最大存的數(shù)據(jù)量,可以理解為最大有幾個(gè)格子 #如果設(shè)置參數(shù)為20,第21次put的時(shí)候,程序就會(huì)阻塞住,直到有空位置,也就是有數(shù)據(jù)被get走 q.put(11)#放值 q.put('hello') q.put(3.14) print(q.get())#取值11 print(q.get())#取值hello print(q.get())#取值3.14 print(q.get())#阻塞,等待put一個(gè)數(shù)據(jù)
get方法中有個(gè)默認(rèn)參數(shù)block=True,把這個(gè)參數(shù)改成False,取不到值的時(shí)候就會(huì)報(bào)錯(cuò)queue.Empty
這樣寫就等同于寫成q.get_nowait())
5.2join和task_done方法
import queue import threading #隊(duì)列里只有put和get兩個(gè)方法,列表的那些方法都沒有 q=queue.Queue()# def foo():#存數(shù)據(jù) # while True: q.put(111) q.put(222) q.put(333) q.join() print('ok')#有個(gè)join,程序就停在這里 def bar(): print(q.get()) q.task_done() print(q.get()) q.task_done() print(q.get()) q.task_done()#要在每個(gè)get()語句后面都加上 t1=threading.Thread(target=foo,args=()) t1.start() t2=threading.Thread(target=bar,args=()) t2.start() #t1,t2誰先誰后無所謂,因?yàn)闀?huì)阻塞住,等待信號
5.3 其他方法
q.qsize() 返回隊(duì)列的大小
q.empty() 如果隊(duì)列為空,返回True,反之False
q.full() 如果隊(duì)列滿了,返回True,反之False
q.full 與 maxsize 大小對應(yīng)
5.4其他模式
前面說的隊(duì)列都是先進(jìn)先出(FIFO)模式,另外還有先進(jìn)后出(LIFO)模式和優(yōu)先級隊(duì)列
先進(jìn)后出模式創(chuàng)建隊(duì)列的方式是:class queue.LifoQueue(maxsize)
優(yōu)先級隊(duì)列的寫法是:class queue.Priorityueue(maxsize)
q=queue.PriorityQueue()
q.put([5,100])#這個(gè)方括號只是代表一個(gè)序列類型,元組列表都行,但是都必須所有的一樣
q.put([7,200])
q.put([3,"hello"])
q.put([4,{"name":"alex"}])
中括號里面第一個(gè)位置就是優(yōu)先級
5.5 生產(chǎn)者消費(fèi)者模型
生產(chǎn)者就相當(dāng)于產(chǎn)生數(shù)據(jù)的線程,消費(fèi)者就相當(dāng)于取數(shù)據(jù)的線程。我們在編寫程序的時(shí)候,一定要考慮生產(chǎn)數(shù)據(jù)的能力和消費(fèi)數(shù)據(jù)的能力是否匹配,如果不匹配,那肯定要有一方需要等待,所以引入了生產(chǎn)者和消費(fèi)者模型。
這個(gè)模型是通過一個(gè)容器來解決生產(chǎn)者和消費(fèi)者之間的 強(qiáng)耦合問題。有了這個(gè)容器,他們不用直接通信,而是通過這個(gè)容器,這個(gè)容器就是一個(gè)阻塞隊(duì)列,相當(dāng)于一個(gè)緩沖區(qū),平衡了生產(chǎn)者和消費(fèi)者的能力。我們寫程序時(shí)用的目錄結(jié)構(gòu),不也是為了解耦和嗎
除了解決強(qiáng)耦合問題,生產(chǎn)者消費(fèi)者模型還能實(shí)現(xiàn)并發(fā)
當(dāng)生產(chǎn)者消費(fèi)者能力不匹配的時(shí)候,就考慮加限制,類似if q.qsize()<20,這種
四、多進(jìn)程
python 中有一把全局鎖(GIL)使得多線程無法使用多核,但是如果是多進(jìn)程,這把鎖就限制不了了。如何開多個(gè)進(jìn)程呢,需要導(dǎo)入一個(gè)multiprocessing模塊
import multiprocessing import time def foo(): print('ok') time.sleep(2) if __name__ == '__main__':#必須是這個(gè)格式 p=multiprocessing.Process(target=foo,args=()) p.start() print('ending')
雖然可以開多進(jìn)程,但是一定注意不能開太多,因?yàn)檫M(jìn)程間切換非常消耗系統(tǒng)資源,如果開上千個(gè)子進(jìn)程,系統(tǒng)會(huì)崩潰的,而且進(jìn)程間的通信也是個(gè)問題。所以,進(jìn)程能不用就不用,能少用就少用
1.進(jìn)程間的通信
進(jìn)程間通信有兩種方式,隊(duì)列和管道
1.1進(jìn)程間的隊(duì)列
每個(gè)進(jìn)程在內(nèi)存中都是獨(dú)立的一塊空間,不項(xiàng)線程那樣可以共享數(shù)據(jù),所以只能由父進(jìn)程通過傳參的方式把隊(duì)列傳給子進(jìn)程
import multiprocessing import threading def foo(q): q.put([12,'hello',True]) if __name__ =='__main__': q=multiprocessing.Queue()#創(chuàng)建進(jìn)程隊(duì)列 #創(chuàng)建一個(gè)子線程 p=multiprocessing.Process(target=foo,args=(q,)) #通過傳參的方式把這個(gè)隊(duì)列對象傳給父進(jìn)程 p.start() print(q.get())
1.2管道
之前學(xué)過的socket其實(shí)就是管道,客戶端 的sock和服務(wù)端的conn是管道 的兩端,在進(jìn)程中也是這個(gè)玩法,也要有管道的兩頭
from multiprocessing import Pipe,Process def foo(sk): sk.send('hello')#主進(jìn)程發(fā)消息 print(sk.recv())#主進(jìn)程收消息 sock,conn=Pipe()#創(chuàng)建了管道的兩頭 if __name__ == '__main__': p=Process(target=foo,args=(sock,)) p.start() print(conn.recv())#子進(jìn)程接收消息 conn.send('hi son')#子進(jìn)程發(fā)消息
2.進(jìn)程間的數(shù)據(jù)共享
我們已經(jīng)通過進(jìn)程隊(duì)列和管道兩種方式實(shí)現(xiàn)了進(jìn)程間的通信,但是還沒有實(shí)現(xiàn)數(shù)據(jù)共享
進(jìn)程間的數(shù)據(jù)共享需要引用一個(gè)manager對象實(shí)現(xiàn),使用的所有的數(shù)據(jù)類型都要通過manager點(diǎn)的方式去創(chuàng)建
from multiprocessing import Process from multiprocessing import Manager def foo(l,i): l.append(i*i) if __name__ == '__main__': manager = Manager() Mlist = manager.list([11,22,33])#創(chuàng)建一個(gè)共享的列表 l=[] for i in range(5): #開辟5個(gè)子進(jìn)程 p = Process(target=foo, args=(Mlist,i)) p.start() l.append(p) for i in l: i.join()#join 方法是等待進(jìn)程結(jié)束后再執(zhí)行下一個(gè) print(Mlist)
3.進(jìn)程池
進(jìn)程池的作用是維護(hù)一個(gè)最大的進(jìn)程量,如果超出設(shè)置的最大值,程序就會(huì)阻塞,知道有可用的進(jìn)程為止
from multiprocessing import Pool import time def foo(n): print(n) time.sleep(2) if __name__ == '__main__': pool_obj=Pool(5)#創(chuàng)建進(jìn)程池 #通過進(jìn)程池創(chuàng)建進(jìn)程 for i in range(5): p=pool_obj.apply_async(func=foo,args=(i,)) #p是創(chuàng)建的池對象 # pool 的使用是先close(),在join(),記住就行了 pool_obj.close() pool_obj.join() print('ending')
進(jìn)程池中有以下幾個(gè)方法:
1.apply:從進(jìn)程池里取一個(gè)進(jìn)程并執(zhí)行 2.apply_async:apply的異步版本 3.terminate:立刻關(guān)閉線程池 4.join:主進(jìn)程等待所有子進(jìn)程執(zhí)行完畢,必須在close或terminate之后 5.close:等待所有進(jìn)程結(jié)束后,才關(guān)閉線程池
五、協(xié)程
協(xié)程在手,天下我有,說走就走。知道了協(xié)程,前面說的進(jìn)程線程就都忘記吧
協(xié)程可以開很多很多,沒有上限,切換之間的消耗可以忽略不計(jì)
1.yield
先來回想一下yield這個(gè)詞,熟悉不,對,就是生成器那用的那個(gè)。yield是個(gè)挺神奇的東西,這是Python的一個(gè)特點(diǎn)。
一般的函數(shù),是遇到return就停止,然后返回return 后面的值,默認(rèn)是None,yield和return很像,但是遇到y(tǒng)ield不會(huì)立刻停止,而是暫停住,直到遇到next(),(for循環(huán)的原理也是next())才會(huì)繼續(xù)執(zhí)行。yield 前面還可以跟一個(gè)變量,通過send()函數(shù)給yield傳值,把值保存在yield前邊的變量中
import time def consumer():#有yield,是一個(gè)生成器 r="" while True: n=yield r#程序暫停,等待next()信號 # if not n: # return print('consumer <--%s..'%n) time.sleep(1) r='200 ok' def producer(c): next(c)#激活生成器c n=0 while n<5: n=n+1 print('produer-->%s..'%n) cr = c.send(n)#向生成器發(fā)送數(shù)據(jù) print('consumer return :',cr) c.close() #生產(chǎn)過程結(jié)束,關(guān)閉生成器 if __name__ == '__main__': c=consumer() producer(c)
看上面的例子,整個(gè)過程沒有鎖的出現(xiàn),還能保證數(shù)據(jù)安全,更要命的是還可以控制順序,優(yōu)雅的實(shí)現(xiàn)了并發(fā),甩多線程幾條街
線程叫微進(jìn)程,而協(xié)程又叫微線程。協(xié)程擁有自己的寄存器上下文和棧,因此能保留上一次調(diào)用的狀態(tài)。
2.greenlet模塊
這個(gè)模塊封裝了yield,使得程序切換非常方便,但是沒法實(shí)現(xiàn)傳值的功能
from greenlet import greenlet def foo(): print('ok1') gr2.switch() print('ok3') gr2.switch() def bar(): print('ok2') gr1.switch() print('ok4') gr1=greenlet(foo) gr2=greenlet(bar) gr1.switch()#啟動(dòng)
3.gevent模塊
在greenlet模塊的基礎(chǔ)上,開發(fā)出了更牛的模塊gevent
gevent為Python提供了更完善的協(xié)程支持,其基本原理是:
當(dāng)一個(gè)greenlet遇到IO操作時(shí),就會(huì)自動(dòng)切換到其他的greenlet,等IO操作完成,再切換回來,這樣就保證了總有g(shù)reenlet在運(yùn)行,而不是等待
import requests import gevent import time def foo(url): response=requests.get(url) response_str=response.text print('get data %s'%len(response_str)) s=time.time() gevent.joinall([gevent.spawn(foo,"https://itk.org/"), gevent.spawn(foo, "https://www.github.com/"), gevent.spawn(foo, "https://zhihu.com/"),]) # foo("https://itk.org/") # foo("https://www.github.com/") # foo("https://zhihu.com/") print(time.time()-s)
4.協(xié)程的優(yōu)缺點(diǎn):
優(yōu)點(diǎn):
上下文切換消耗小
方便切換控制流,簡化編程模型
高并發(fā),高擴(kuò)展性,低成本
缺點(diǎn):
無法利用多核
進(jìn)行阻塞操作時(shí)會(huì)阻塞掉整個(gè)程序
六、IO模型
我們下面會(huì)比較四種IO模型
1.blocking IO
2.nonblocking IO
3.IO multiplexing
4.asynchronous IO
我們以網(wǎng)絡(luò)傳輸數(shù)據(jù)的IO為例,它會(huì)涉及到兩個(gè)系統(tǒng)對象,一個(gè)是調(diào)用這個(gè)IO 的線程或者進(jìn)程,另一個(gè)是系統(tǒng)內(nèi)核,而當(dāng)讀取數(shù)據(jù)的時(shí)候,又會(huì)經(jīng)歷兩個(gè)階段:
等待數(shù)據(jù)準(zhǔn)備
將數(shù)據(jù)從內(nèi)核態(tài)拷貝到用戶態(tài)的進(jìn)程中(因?yàn)榫W(wǎng)絡(luò)的數(shù)據(jù)傳輸是靠物理設(shè)備實(shí)現(xiàn)的,物理設(shè)備是硬件,只能有操作系統(tǒng)的內(nèi)核態(tài)才能處理,但是讀數(shù)據(jù)是程序使用的,所以需要這一步的切換)
1.blocking IO(阻塞IO)
典型的read操作如下圖
linux下,默認(rèn)情況的socket都是blocking,回想我們之前用的socket,sock和conn是兩個(gè)連接,服務(wù)端同時(shí)只能監(jiān)聽一個(gè)連接,所以如果服務(wù)端在等待客戶端發(fā)送消息的時(shí)候,其他連接是不能連接到服務(wù)端的。
在這種模式下,等待數(shù)據(jù)和復(fù)制數(shù)據(jù)都需要等待,所以是全程阻塞的
2.nonlocking IO (非阻塞IO)
在服務(wù)端建立連接之后,加上這個(gè)命令,就變成了非阻塞IO模式
這種模式,有數(shù)據(jù)就取,沒有就報(bào)錯(cuò),可以加一個(gè)異常捕捉。在等待數(shù)據(jù)的時(shí)候不阻塞,但是在copy數(shù)據(jù)的時(shí)候還是會(huì)阻塞,
優(yōu)點(diǎn)是可以把等待連接的這段時(shí)間利用上,但是缺點(diǎn)也很明顯:有很多次系統(tǒng)調(diào)用,消耗很大;而且當(dāng)程序去做別的事的時(shí)候,數(shù)據(jù)到了,雖然不會(huì)丟失,但是程序收到的數(shù)據(jù)也不具有實(shí)時(shí)性
3.IO multiplexing(IO多路復(fù)用)
這個(gè)比較常用,我們以前用的accept(),有兩個(gè)作用:
1.監(jiān)聽,等待連接
2.建立連接
現(xiàn)在我們用select來替代accept的第一個(gè)作用,select的優(yōu)點(diǎn)在于可以監(jiān)聽很多對象,無論哪個(gè)對象活動(dòng),都能做出反應(yīng),并將活動(dòng)的對象收集到一個(gè)列表
import socket import select sock=socket.socket() sock.bind(('127.0.0.1',8080)) sock.listen(5) inp=[sock,] while True: r=select.select(inp,[],[]) print('r',r[0]) for obj in r[0]: if obj == sock: conn,addr=obj.accept()
但是建立連接的功能還是accept做,有了這個(gè),我們就可以用并發(fā)的方式實(shí)現(xiàn)tcp的聊天了
# 服務(wù)端 import socket import time import select sock=socket.socket() sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) sock.bind(('127.0.0.1',8080)) sock.listen(5) inp=[sock,]#監(jiān)聽套接字對象的列表 while True: r=select.select(inp,[],[]) print('r',r[0]) for obj in r[0]: if obj == sock: conn,addr=obj.accept() inp.append(conn) else: data=obj.recv(1024) print(data.decode('utf8')) response=input('>>>>:') obj.send(response.encode('utf8'))
只有在建立連接的時(shí)候,sock才是活動(dòng)的,列表中才會(huì)有這個(gè)對象,如果是在建立連接之后,收發(fā)消息的過程中,活動(dòng)對象就不是sock,而是conn了,所以在實(shí)際操作中要判斷列表中的對象是不是sock
在這個(gè)模型中,等待數(shù)據(jù)與copy數(shù)據(jù)的過程都是阻塞的,所以也叫全程阻塞,與阻塞IO模型相比,這個(gè)模型優(yōu)勢在于處理多個(gè)連接
IO 多路復(fù)用除了select,還有兩種方式,poll 和 epoll
在windows下只支持select,而在linux中,這三個(gè)都有。epoll是最好的,select唯一的優(yōu)點(diǎn)是多平臺(tái)都可以用,但是缺點(diǎn)也很明顯,就是效率很差。poll是epoll和select的中間過渡,與select相比,poll可以監(jiān)聽的數(shù)量沒有限制。epoll沒有最大連接上限,另外監(jiān)聽機(jī)制也完全發(fā)生變化,select的機(jī)制是輪詢(每個(gè)數(shù)據(jù)都檢查一遍,即使找到有變化的也會(huì)繼續(xù)檢查),epoll的機(jī)制是用回調(diào)函數(shù),哪個(gè)對象有變化,那個(gè)就調(diào)用這個(gè)回調(diào)函數(shù)
4. Asynchronous IO (異步IO)
這個(gè)模式是全程無阻塞,只有全程無阻塞才能叫異步,這個(gè)模式雖然看起來不錯(cuò),但是實(shí)際操作起來,如果請求量很大,效率會(huì)很低,而且操作系統(tǒng)的任務(wù)很重
七、selectors 模塊
學(xué)會(huì)了這個(gè)模塊,就不用在乎用的是select,還是poll,或者是epoll了,他們的接口都是這個(gè)模塊。我們只需要知道這個(gè)接口怎么用,它里面封裝的是什么,就不用考慮了
在這個(gè)模塊中,套接字與函數(shù)的綁定是用的一個(gè)regesier()的方法,模塊的用法很固定,服務(wù)端示例如下:
import selectors,socket sel=selectors.DefaultSelector() sock=socket.socket() sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) sock.bind(('127.0.0.1',8080)) sock.listen(5) sock.setblocking(False) def read(conn,mask): data=conn.recv(1024) print(data.decode('utf8')) res=input('>>>>>>:') conn.send(res.encode('utf8')) def accept(sock,mask): conn,addr=sock.accept() sel.register(conn,selectors.EVENT_READ,read)#conn和read函數(shù)綁定 #綁定套接字對象和函數(shù) #綁定(register)的意思就是,套接字對象conn發(fā)生變化時(shí),綁定的函數(shù)能執(zhí)行 sel.register(sock,selectors.EVENT_READ,accept)#中間那個(gè)是固定寫法 while True: events=sel.select() #監(jiān)聽套接字對象(注冊的那個(gè)) #下面幾行代碼基本上就固定寫法了 # print('events',events) for key,mask in events: callback = key.data#綁定的函數(shù), # key.fileobj就是活動(dòng)的套接字對象 # print('callback',callable) #mask是固定的 callback(key.fileobj,mask)#callback是回調(diào)函數(shù) # print('key.fileobj',key.fileobj)
以上這篇老生常談進(jìn)程線程協(xié)程那些事兒就是小編分享給大家的全部內(nèi)容了,希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
關(guān)于python函數(shù)的建立、調(diào)用、傳參、返回值詳解
這篇文章主要介紹了關(guān)于python函數(shù)的建立、調(diào)用、傳參、返回值詳解,Python?還支持自定義函數(shù),即將一段有規(guī)律的、可重復(fù)使用的代碼定義成函數(shù),從而達(dá)到一次編寫多次調(diào)用的目的,需要的朋友可以參考下2023-07-07Python學(xué)習(xí)教程之常用的內(nèi)置函數(shù)大全
python給我們提供了很多已經(jīng)定義好的函數(shù),下面這篇文章主要給大家介紹了關(guān)于Python學(xué)習(xí)教程之一些常用的內(nèi)置函數(shù),文中分享了關(guān)于數(shù)學(xué)函數(shù)、功能函數(shù)、類型轉(zhuǎn)換函數(shù)、字符串處理和序列處理函數(shù)的相關(guān)資料,需要的朋友可以參考借鑒,下面來一起看看吧。2017-07-07pandas:get_dummies()與pd.factorize()的用法及區(qū)別說明
這篇文章主要介紹了pandas:get_dummies()與pd.factorize()的用法及區(qū)別說明,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-05-05python實(shí)現(xiàn)掃描日志關(guān)鍵字的示例
下面小編就為大家分享一篇python實(shí)現(xiàn)掃描日志關(guān)鍵字的示例,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-04-04Pycharm中配置遠(yuǎn)程Docker運(yùn)行環(huán)境的教程圖解
這篇文章主要介紹了Pycharm中配置遠(yuǎn)程Docker運(yùn)行環(huán)境,本文通過圖文并茂的形式給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-06-06