python 并發(fā)編程 非阻塞IO模型原理解析
非阻塞IO(non-blocking IO)
Linux下,可以通過設置socket使其變?yōu)閚on-blocking。當對一個non-blocking socket執(zhí)行讀操作時,流程是這個樣子:

從圖中可以看出,當用戶進程發(fā)出read操作時,如果kernel中的數(shù)據(jù)還沒有準備好,那么它并不會block用戶進程,而是立刻返回一個error。從用戶進程角度講 ,它發(fā)起一個read操作后,并不需要等待,而是馬上就得到了一個結果。用戶進程判斷結果是一個error時,它就知道數(shù)據(jù)還沒有準備好,于是用戶就可以在本次到下次再發(fā)起read詢問的時間間隔內(nèi)做其他事情,或者直接再次發(fā)送read操作。一旦kernel中的數(shù)據(jù)準備好了,并且又再次收到了用戶進程的system call,那么它馬上就將數(shù)據(jù)拷貝到了用戶內(nèi)存(這一階段仍然是阻塞的,這段是本地拷貝,copy data ),然后返回。
也就是說非阻塞的recvform系統(tǒng)調(diào)用調(diào)用之后,進程并沒有被阻塞,內(nèi)核馬上返回給進程,如果數(shù)據(jù)還沒準備好,
此時會返回一個error。進程在返回之后,可以干點別的事情,然后再發(fā)起recvform系統(tǒng)調(diào)用。重復上面的過程,
循環(huán)往復的進行recvform系統(tǒng)調(diào)用。這個過程通常被稱之為輪詢。輪詢檢查內(nèi)核數(shù)據(jù),直到數(shù)據(jù)準備好,再拷貝數(shù)據(jù)到進程,
進行數(shù)據(jù)處理。需要注意,拷貝數(shù)據(jù)整個過程,進程仍然是屬于阻塞的狀態(tài)。
所以,在非阻塞式IO中,用戶進程其實是需要不斷的主動詢問kernel操作系統(tǒng)內(nèi)存 數(shù)據(jù)準備好了沒有。
非阻塞IO示例
- 設置socket接口為 非阻塞IO接口
- 默認是True 為阻塞
- server.setblocking(False)
- 處理一下這個異常
BlockingIOError: [WinError 10035] 無法立即完成一個非阻止性套接字操作。
from socket import *
server = socket(AF_INET,SOCK_STREAM)
server.bind(('127.0.0.1',8000))
server.listen(5)
# 設置socket接口為 非阻塞IO接口
# 默認是True 為阻塞
server.setblocking(False)
print("starting...")
while True:
try:
conn,addr = server.accept()
print(addr)
except BlockingIOError:
print("干其他的工作")
server.close()
執(zhí)行結果,如上面的圖,一直返回error消息
starting... 干其他的工作 干其他的工作 干其他的工作 干其他的工作
服務端 可以與 多個客戶端建立連接,實現(xiàn)服務端可以不停的建立連接
from socket import *
server = socket(AF_INET,SOCK_STREAM)
server.bind(('127.0.0.1',8000))
server.listen(5)
# 設置socket接口為 非阻塞IO接口
# 默認是True 為阻塞
server.setblocking(False)
r_list = []
print("starting...")
while True:
try:
conn,addr = server.accept()
r_list.append(conn)
print(r_list)
except BlockingIOError:
pass
server.close()
起三個客戶端與服務端建立連接

r_list 存著所有建立的連接
有連接來,就建立連接,沒有連接來,就拋出異常
實現(xiàn)IO非阻塞 并發(fā) 多個連接
from socket import *
server = socket(AF_INET,SOCK_STREAM)
server.bind(('127.0.0.1',8000))
server.listen(5)
# 設置socket接口為 非阻塞IO接口
# 默認是True 為阻塞
server.setblocking(False)
r_list = []
print("starting...")
while True:
try:
conn,addr = server.accept()
r_list.append(conn)
print(r_list)
except BlockingIOError:
# 定義刪除連接列表
del_rlist = []
for conn in r_list:
try:
data = conn.recv(1024)
# 收空數(shù)據(jù)時候
if not data:
del_rlist.append(conn)
continue
conn.send(data.upper())
# 沒有連接,拋出異常,就結束這次循環(huán),繼續(xù)
except BlockingIOError:
continue
# 套接字出現(xiàn)異常,客戶端單方面連接斷開
except Exception:
conn.close()
del_rlist.append(conn)
break
# 結束上面循環(huán)之后,循環(huán)del_list 連接元素 刪除連接
for conn in del_rlist:
del_rlist.remove(conn)
server.close()
BUG:send也是IO阻塞接口
當send在數(shù)據(jù)量過大時候,也會阻塞。
send操作是,把應用程序把數(shù)據(jù)發(fā)送到操作系統(tǒng)緩存區(qū)里,而操作系統(tǒng)緩存區(qū)空間也是有限的。緩存區(qū)也會滿了,后面還有數(shù)據(jù)需要發(fā)送,那只能等緩存區(qū)清掉數(shù)據(jù),有空間了,才能發(fā)送數(shù)據(jù)。所以在這里緩存區(qū)滿了,就阻塞。
修改后服務端的代碼 可以自己檢測IO,遇到IO切換單個線程的其他任務,去運行,實現(xiàn)單線程并發(fā)
from socket import *
server = socket(AF_INET,SOCK_STREAM)
server.bind(('127.0.0.1',8000))
server.listen(5)
# 設置socket接口為 非阻塞IO接口
# 默認是True 為阻塞
server.setblocking(False)
r_list = []
w_list = []
print("starting...")
while True:
try:
conn,addr = server.accept()
r_list.append(conn)
print(r_list)
except BlockingIOError:
# 收消息
# 定義刪除連接列表
del_rlist = []
for conn in r_list:
try:
data = conn.recv(1024)
# 收空數(shù)據(jù)時候
if not data:
del_rlist.append(conn)
continue
'''加入元祖 元祖有兩個元素
1.存放套接字連接
2.準備要發(fā)送的的數(shù)據(jù)
'''
w_list.append((conn, data.upper()))
# 沒有連接,拋出異常,就結束這次循環(huán),繼續(xù)
except BlockingIOError:
continue
# 套接字出現(xiàn)異常,客戶端單方面連接斷開
except Exception:
conn.close()
del_rlist.append(conn)
break
# 發(fā)消息
# 用于 發(fā)成功數(shù)據(jù)后,刪除套接字連接的列表
del_wlist = []
for item in w_list:
try:
conn = item[0]
data = item[1]
conn.send(data)
# 發(fā)成功后,從列表刪除連接
del_wlist.append(item)
# send 有可能出現(xiàn)異常 沒發(fā)完情況
except BlockingIOError:
pass
# 結束上面循環(huán)之后,循環(huán)del_wlist 連接元素 刪除連接
for item in del_wlist:
del_wlist.remove(item)
# 結束上面循環(huán)之后,循環(huán)del_rlist 連接元素 刪除連接
for conn in del_rlist:
del_rlist.remove(conn)
server.close()
這就是非阻塞IO
但是非阻塞IO模型絕不被推薦。
我們不能否則其優(yōu)點:能夠在等待任務完成的時間里干其他活了(包括提交其他任務,也就是 “后臺” 可以有多個任務在“”同時“”執(zhí)行)。
干其他活時候,有可能來新的連接,新的連接來了,不能及時響應與該新的連接,建立連接。所以會導致問題:數(shù)據(jù)不會及時響應
但是也難掩其缺點:
1. 循環(huán)調(diào)用recv()將大幅度推高CPU占用率;這也是我們在代碼中留一句time.sleep(2)的原因,否則在低配主機下極容易出現(xiàn)卡機情況
2. 任務完成的響應延遲增大了,因為每過一段時間才去輪詢一次read操作,而任務可能在兩次輪詢之間的任意時間完成。
這會導致整體數(shù)據(jù)吞吐量的降低。
3.死循環(huán)While True會導致CPU的無用的耗用、占用
此外,在這個方案中recv()更多的是起到檢測“操作是否完成”的作用,實際操作系統(tǒng)提供了更為高效的檢測“操作是否完成“作用的接口,例如select()多路復用模式,可以一次檢測多個連接是否活躍
以上就是本文的全部內(nèi)容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。
相關文章
python編程簡單幾行代碼實現(xiàn)視頻轉換Gif示例
這篇文章主要為大家介紹了簡單使用幾行python代碼就可以實現(xiàn)將視頻轉換Gif的示例過程,有需要的朋友可以借鑒參考下,希望能夠有所幫助2021-10-10

