python 并發(fā)編程 非阻塞IO模型原理解析
非阻塞IO(non-blocking IO)
Linux下,可以通過設(shè)置socket使其變?yōu)閚on-blocking。當對一個non-blocking socket執(zhí)行讀操作時,流程是這個樣子:
從圖中可以看出,當用戶進程發(fā)出read操作時,如果kernel中的數(shù)據(jù)還沒有準備好,那么它并不會block用戶進程,而是立刻返回一個error。從用戶進程角度講 ,它發(fā)起一個read操作后,并不需要等待,而是馬上就得到了一個結(jié)果。用戶進程判斷結(jié)果是一個error時,它就知道數(shù)據(jù)還沒有準備好,于是用戶就可以在本次到下次再發(fā)起read詢問的時間間隔內(nèi)做其他事情,或者直接再次發(fā)送read操作。一旦kernel中的數(shù)據(jù)準備好了,并且又再次收到了用戶進程的system call,那么它馬上就將數(shù)據(jù)拷貝到了用戶內(nèi)存(這一階段仍然是阻塞的,這段是本地拷貝,copy data ),然后返回。
也就是說非阻塞的recvform系統(tǒng)調(diào)用調(diào)用之后,進程并沒有被阻塞,內(nèi)核馬上返回給進程,如果數(shù)據(jù)還沒準備好,
此時會返回一個error。進程在返回之后,可以干點別的事情,然后再發(fā)起recvform系統(tǒng)調(diào)用。重復(fù)上面的過程,
循環(huán)往復(fù)的進行recvform系統(tǒng)調(diào)用。這個過程通常被稱之為輪詢。輪詢檢查內(nèi)核數(shù)據(jù),直到數(shù)據(jù)準備好,再拷貝數(shù)據(jù)到進程,
進行數(shù)據(jù)處理。需要注意,拷貝數(shù)據(jù)整個過程,進程仍然是屬于阻塞的狀態(tài)。
所以,在非阻塞式IO中,用戶進程其實是需要不斷的主動詢問kernel操作系統(tǒng)內(nèi)存 數(shù)據(jù)準備好了沒有。
非阻塞IO示例
- 設(shè)置socket接口為 非阻塞IO接口
- 默認是True 為阻塞
- server.setblocking(False)
- 處理一下這個異常
BlockingIOError: [WinError 10035] 無法立即完成一個非阻止性套接字操作。
from socket import * server = socket(AF_INET,SOCK_STREAM) server.bind(('127.0.0.1',8000)) server.listen(5) # 設(shè)置socket接口為 非阻塞IO接口 # 默認是True 為阻塞 server.setblocking(False) print("starting...") while True: try: conn,addr = server.accept() print(addr) except BlockingIOError: print("干其他的工作") server.close()
執(zhí)行結(jié)果,如上面的圖,一直返回error消息
starting... 干其他的工作 干其他的工作 干其他的工作 干其他的工作
服務(wù)端 可以與 多個客戶端建立連接,實現(xiàn)服務(wù)端可以不停的建立連接
from socket import * server = socket(AF_INET,SOCK_STREAM) server.bind(('127.0.0.1',8000)) server.listen(5) # 設(shè)置socket接口為 非阻塞IO接口 # 默認是True 為阻塞 server.setblocking(False) r_list = [] print("starting...") while True: try: conn,addr = server.accept() r_list.append(conn) print(r_list) except BlockingIOError: pass server.close()
起三個客戶端與服務(wù)端建立連接
r_list 存著所有建立的連接
有連接來,就建立連接,沒有連接來,就拋出異常
實現(xiàn)IO非阻塞 并發(fā) 多個連接
from socket import * server = socket(AF_INET,SOCK_STREAM) server.bind(('127.0.0.1',8000)) server.listen(5) # 設(shè)置socket接口為 非阻塞IO接口 # 默認是True 為阻塞 server.setblocking(False) r_list = [] print("starting...") while True: try: conn,addr = server.accept() r_list.append(conn) print(r_list) except BlockingIOError: # 定義刪除連接列表 del_rlist = [] for conn in r_list: try: data = conn.recv(1024) # 收空數(shù)據(jù)時候 if not data: del_rlist.append(conn) continue conn.send(data.upper()) # 沒有連接,拋出異常,就結(jié)束這次循環(huán),繼續(xù) except BlockingIOError: continue # 套接字出現(xiàn)異常,客戶端單方面連接斷開 except Exception: conn.close() del_rlist.append(conn) break # 結(jié)束上面循環(huán)之后,循環(huán)del_list 連接元素 刪除連接 for conn in del_rlist: del_rlist.remove(conn) server.close()
BUG:send也是IO阻塞接口
當send在數(shù)據(jù)量過大時候,也會阻塞。
send操作是,把應(yīng)用程序把數(shù)據(jù)發(fā)送到操作系統(tǒng)緩存區(qū)里,而操作系統(tǒng)緩存區(qū)空間也是有限的。緩存區(qū)也會滿了,后面還有數(shù)據(jù)需要發(fā)送,那只能等緩存區(qū)清掉數(shù)據(jù),有空間了,才能發(fā)送數(shù)據(jù)。所以在這里緩存區(qū)滿了,就阻塞。
修改后服務(wù)端的代碼 可以自己檢測IO,遇到IO切換單個線程的其他任務(wù),去運行,實現(xiàn)單線程并發(fā)
from socket import * server = socket(AF_INET,SOCK_STREAM) server.bind(('127.0.0.1',8000)) server.listen(5) # 設(shè)置socket接口為 非阻塞IO接口 # 默認是True 為阻塞 server.setblocking(False) r_list = [] w_list = [] print("starting...") while True: try: conn,addr = server.accept() r_list.append(conn) print(r_list) except BlockingIOError: # 收消息 # 定義刪除連接列表 del_rlist = [] for conn in r_list: try: data = conn.recv(1024) # 收空數(shù)據(jù)時候 if not data: del_rlist.append(conn) continue '''加入元祖 元祖有兩個元素 1.存放套接字連接 2.準備要發(fā)送的的數(shù)據(jù) ''' w_list.append((conn, data.upper())) # 沒有連接,拋出異常,就結(jié)束這次循環(huán),繼續(xù) except BlockingIOError: continue # 套接字出現(xiàn)異常,客戶端單方面連接斷開 except Exception: conn.close() del_rlist.append(conn) break # 發(fā)消息 # 用于 發(fā)成功數(shù)據(jù)后,刪除套接字連接的列表 del_wlist = [] for item in w_list: try: conn = item[0] data = item[1] conn.send(data) # 發(fā)成功后,從列表刪除連接 del_wlist.append(item) # send 有可能出現(xiàn)異常 沒發(fā)完情況 except BlockingIOError: pass # 結(jié)束上面循環(huán)之后,循環(huán)del_wlist 連接元素 刪除連接 for item in del_wlist: del_wlist.remove(item) # 結(jié)束上面循環(huán)之后,循環(huán)del_rlist 連接元素 刪除連接 for conn in del_rlist: del_rlist.remove(conn) server.close()
這就是非阻塞IO
但是非阻塞IO模型絕不被推薦。
我們不能否則其優(yōu)點:能夠在等待任務(wù)完成的時間里干其他活了(包括提交其他任務(wù),也就是 “后臺” 可以有多個任務(wù)在“”同時“”執(zhí)行)。
干其他活時候,有可能來新的連接,新的連接來了,不能及時響應(yīng)與該新的連接,建立連接。所以會導(dǎo)致問題:數(shù)據(jù)不會及時響應(yīng)
但是也難掩其缺點:
1. 循環(huán)調(diào)用recv()將大幅度推高CPU占用率;這也是我們在代碼中留一句time.sleep(2)的原因,否則在低配主機下極容易出現(xiàn)卡機情況
2. 任務(wù)完成的響應(yīng)延遲增大了,因為每過一段時間才去輪詢一次read操作,而任務(wù)可能在兩次輪詢之間的任意時間完成。
這會導(dǎo)致整體數(shù)據(jù)吞吐量的降低。
3.死循環(huán)While True會導(dǎo)致CPU的無用的耗用、占用
此外,在這個方案中recv()更多的是起到檢測“操作是否完成”的作用,實際操作系統(tǒng)提供了更為高效的檢測“操作是否完成“作用的接口,例如select()多路復(fù)用模式,可以一次檢測多個連接是否活躍
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
python編程簡單幾行代碼實現(xiàn)視頻轉(zhuǎn)換Gif示例
這篇文章主要為大家介紹了簡單使用幾行python代碼就可以實現(xiàn)將視頻轉(zhuǎn)換Gif的示例過程,有需要的朋友可以借鑒參考下,希望能夠有所幫助2021-10-10