Python中IO多路復(fù)用模塊selector的用法詳解
selector 簡介
selector 是一個實(shí)現(xiàn)了IO復(fù)用模型的python包,實(shí)現(xiàn)了IO多路復(fù)用模型的 select、poll 和 epoll 等函數(shù)。它允許程序同時監(jiān)聽多個文件描述符(例如套接字),并在其中任何一個就緒時進(jìn)行相應(yīng)的操作。這樣可以有效地管理并發(fā) I/O 操作,提高程序的性能和資源利用率。
本篇主要講解selector編程示例,以socket編程為主題,首先分析阻塞IO模型的網(wǎng)絡(luò)編程,然后對比selector實(shí)現(xiàn)的IO多路復(fù)用模型的網(wǎng)絡(luò)編程。
阻塞IO模型下的 socket 網(wǎng)絡(luò)編程
通過socket實(shí)現(xiàn)最簡單的客戶端和服務(wù)端通信的功能,阻塞IO模型的特點(diǎn)就是在文件IO或網(wǎng)絡(luò)IO時獲取數(shù)據(jù)的函數(shù)會一直阻塞,直到數(shù)據(jù)到來。服務(wù)端:server.py
import socket # 創(chuàng)建TCP套接字 server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 綁定IP地址和端口號 server_address = ('0.0.0.0', 12346) server_socket.bind(server_address) # 監(jiān)聽連接 server_socket.listen() # 接受客戶端連接請求 print("服務(wù)器已啟動,等待客戶端連接...") client_socket, client_address = server_socket.accept() print(f"與客戶端 {client_address} 建立連接") # 向客戶端發(fā)送消息 message = "歡迎連接到服務(wù)器!" client_socket.sendall(message.encode()) while True: # 從客戶端接收消息 data = client_socket.recv(1024).decode() print(f"客戶端消息:{data}") client_socket.sendall(f"服務(wù)器收到消息:{data}".encode()) if data == "close": # 關(guān)閉客戶端套接字 client_socket.close()
客戶端:client.py
import socket # 創(chuàng)建TCP套接字 client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 服務(wù)器地址和端口號 server_address = ('localhost', 12346) # 連接服務(wù)器 client_socket.connect(server_address) # 接收服務(wù)器的消息 data = client_socket.recv(1024).decode() print(f"已連接到服務(wù)器, 服務(wù)器消息:{data}") while True: # 向服務(wù)器發(fā)送消息 message = input() if message == "end": break client_socket.sendall(message.encode()) print(client_socket.recv(1024).decode()) # 關(guān)閉客戶端套接字 client_socket.close()
啟動服務(wù)端,未啟動客戶端
啟動服務(wù)端,代碼執(zhí)行到client_socket, client_address = server_socket.accept()
暫停,accept 是一個阻塞函數(shù)。
函數(shù)說明:
client_socket, client_address = server_socket.accept()
阻塞接口,等待客戶端的連接,沒有客戶端連接時阻塞等待,有客戶端連接時返回新的聊天socket,用于后續(xù)發(fā)送和接收消息
繼續(xù)啟動客戶端
啟動客戶端之后,客戶端連接服務(wù)端,服務(wù)端代碼執(zhí)行到data = client_socket.recv(1024).decode()
, recv 是一個阻塞函數(shù)。
函數(shù)說明:
data = client_socket.recv(1024).decode()
阻塞接口,等待緩沖區(qū)有消息到來。沒有消息時阻塞等待,有消息到來返回消息內(nèi)容
客戶端發(fā)送消息
客戶端發(fā)送消息,data = client_socket.recv(1024).decode()
收到消息,從網(wǎng)絡(luò)協(xié)議棧中獲取消息,并返回。繼續(xù)whie True 循環(huán)的下一輪循環(huán),阻塞在相同地方。
IO多路復(fù)用模型下的socket 網(wǎng)絡(luò)編程 selector
selector是實(shí)現(xiàn)IO多路復(fù)用模型的模塊,首先回憶一下IO多路復(fù)用。
IO多路復(fù)用是通過select
、poll
、epoll
監(jiān)聽文件句柄,當(dāng)有文件句柄處于就緒狀態(tài),就通知對應(yīng)的應(yīng)用程序處理。
服務(wù)端:server.py
import selectors import socket # 選擇一個當(dāng)前平臺最優(yōu)的IO多路復(fù)用模型 sel = selectors.DefaultSelector() def accept(server_socket): conn, addr = server_socket.accept() print(f"與客戶端 {addr} 建立連接") conn.setblocking(False) # 設(shè)定非阻塞 # 注冊conn對象到selector中,當(dāng)conn可讀時,返回conn和回調(diào)函數(shù)read sel.register(conn, selectors.EVENT_READ, read) def read(conn): data = conn.recv(1024).decode('utf-8') print(f"客戶端消息:{data}") if data == "close": sel.unregister(conn) conn.close() else: conn.sendall(f"服務(wù)器收到消息:{data}".encode()) if __name__ == "__main__": sock = socket.socket() sock.bind(("0.0.0.0", 9999)) sock.listen() sock.setblocking(False) # 設(shè)置sock非阻塞 # 將sock注冊到selector中,當(dāng)sock可讀時,返回sock和回調(diào)函數(shù)accept sel.register(sock, selectors.EVENT_READ, accept) print("創(chuàng)建事件循環(huán)") while True: events = sel.select() # 阻塞運(yùn)行,有就緒的事件返回就緒事件列表 for key, _ in events: print(key) # key.data: 注冊的回調(diào)函數(shù) key.fileobj: 注冊的文件句柄 callback = key.data # 注冊的回調(diào)函數(shù) callback(key.fileobj)
主要的函數(shù):
一、自動選擇文件IO模型selectors.DefaultSelector()
選擇一個當(dāng)前平臺最優(yōu)的IO模型,一般來說是epoll或kqueue。存在的可選項(xiàng)包括:
- SelectSelector
- PollSelector
- EpollSelector
- DevpollSelector
- KqueueSelector
DefaultSelector 是一個指向當(dāng)前平臺上可用的最高效實(shí)現(xiàn)的別名,當(dāng)選擇epoll時,可以認(rèn)為 sel = EpollSelector
。返回:一個select對象
二、文件注冊 sel.register(sock, selectors.EVENT_READ, accept)
函數(shù)原型:
register(fileobj, events, data=None)
注冊一個用于選擇的文件對象,在其上監(jiān)視 I/O 事件。
fileobj 是要監(jiān)視的文件對象。它可以是整數(shù)形式的文件描述符或者具有 fileno() 方法的對象。
events 是要監(jiān)視的事件的位掩碼。
data 是一個任意對象或變量。
返回:
這將返回一個新的 SelectorKey 實(shí)例,實(shí)例具體內(nèi)容見下一個函數(shù)的key
三、獲取就緒文件events = sel.select()
函數(shù)原型:
select(timeout=None)
可用于遍歷獲取狀態(tài)變?yōu)榫途w注冊的文件,如果設(shè)置超時時間則可能會拋出超時異常。返回:一個(key, events)的元組,
- key: 一個SelectorKey類的實(shí)例,包括
- fileobj: 已注冊的文件對象。
- fd: 下層的文件描述符
events: 必須在此文件對象上被等待的事件。
events:文件句柄可讀還是可寫的標(biāo)識。為EVENT_READ或EVENT_WRITE,或者二者的組合
client.py
import socket client = socket.socket(family=socket. AF_INET, type=socket.SOCK_STREAM) host = socket.gethostname() client.connect((host, 9999)) while True: data = input("客戶端發(fā)送數(shù)據(jù):").strip() client.send(data.encode()) if data == "end" or data == "": client.close() break print(client.recv(1024).decode("utf-8"))
服務(wù)端啟動,客戶端未啟動
if __name__ == "__main__": sock = socket.socket() sock.bind(("0.0.0.0", 9999)) sock.listen() sock.setblocking(False) # 設(shè)置sock非阻塞 # 將sock注冊到selector中,當(dāng)sock可讀時,返回sock和回調(diào)函數(shù)accept sel.register(sock, selectors.EVENT_READ, accept) print("創(chuàng)建事件循環(huán)") while True: events = sel.select() # 阻塞運(yùn)行,有就緒的事件返回就緒事件列表
服務(wù)端啟動,代碼完成的功能包括:
- 創(chuàng)建一個socket,并綁定IP,監(jiān)聽端口
- 設(shè)置socket為非阻塞,否則超時會報(bào)錯
- 將socket注冊到 selector 中,等待socket就緒,綁定就緒之后的回調(diào)函數(shù)accept
- 進(jìn)入while True循環(huán),訪問select返回的就緒列表。這個阻塞函數(shù),沒有文件讀寫就緒就會阻塞。
繼續(xù)啟動客戶端
啟動一個客戶端,客戶端連接到服務(wù)端,socket文件句柄有連接請求,select返回可讀狀態(tài)的socket。返回的events是一個列表,當(dāng)中只有一個就緒的文件句柄。
key.data拿到注冊的回調(diào)函數(shù)也就是accept函數(shù),key.fileobj拿到文件句柄的socket對象。調(diào)用accept函數(shù),傳入socket對象。
def accept(server_socket): conn, addr = server_socket.accept() print(f"與客戶端 {addr} 建立連接") conn.setblocking(False) # 設(shè)定非阻塞 # 注冊conn對象到selector中,當(dāng)conn可讀時,返回conn和回調(diào)函數(shù)read sel.register(conn, selectors.EVENT_READ, read)
accept中先通過accept接收連接,返回通信使用的文件句柄conn,然后設(shè)置conn為非阻塞,最后將conn阻塞到selector中,傳入回調(diào)函數(shù)read。等conn文件句柄可讀時,就表示有數(shù)據(jù)發(fā)送過來,就可以調(diào)用read函數(shù)讀取內(nèi)容了。
客戶端發(fā)送消息
客戶端發(fā)送消息時,selector會返回可讀狀態(tài)的conn文件句柄,從返回對象中獲取回調(diào)函數(shù),調(diào)用回調(diào)函數(shù)read,傳入文件句柄。
def read(conn): data = conn.recv(1024).decode('utf-8') print(f"客戶端消息:{data}") if data == "close": sel.unregister(conn) conn.close() else: conn.sendall(f"服務(wù)器收到消息:{data}".encode())
在read函數(shù)中,首先獲取了網(wǎng)絡(luò)協(xié)議棧中的消息內(nèi)容,然后判斷消息是否為關(guān)閉連接。如果不是則發(fā)送一條消息給對方。
整個基于IO多路復(fù)用模型的網(wǎng)絡(luò)編程流程就是這樣。
selector 原理分析
selector是操作系統(tǒng)的IO多路復(fù)用模型的一種實(shí)現(xiàn)。通過select
、poll
、epoll
監(jiān)聽文件句柄,在文件句柄可讀的狀態(tài)下,會返回就緒的文件句柄。
返回就緒狀態(tài)文件句柄
sel = selectors.DefaultSelector() while True: events = sel.select()
循環(huán)中訪問sel.select()
就是監(jiān)聽文件句柄狀態(tài)的函數(shù),一個阻塞函數(shù)。應(yīng)用程序調(diào)用該函數(shù)后會等待,直到有數(shù)據(jù)到來,數(shù)據(jù)從設(shè)備發(fā)送到內(nèi)核空間,在socket編程中就是數(shù)據(jù)流從網(wǎng)卡到內(nèi)核空間中。當(dāng)數(shù)據(jù)到達(dá)內(nèi)核空間中,該函數(shù)返回文件句柄相關(guān)的內(nèi)容。
數(shù)據(jù)拷貝
當(dāng)文件句柄就緒之后,就可以從文件句柄里讀取數(shù)據(jù)了。
在selector中相關(guān)的函數(shù)是
conn, addr = server_socket.accept()
data = conn.recv(1024).decode('utf-8')
總結(jié)
一個完整的IO多路復(fù)用模型就是由兩個部分組成,分別是
- 返回就緒狀態(tài)文件句柄
- 數(shù)據(jù)拷貝
asyncio 和 selector 的關(guān)系
selectors 則是 asyncio 的底層實(shí)現(xiàn)之一。asyncio實(shí)現(xiàn)的協(xié)程是由事件循環(huán)
+ 任務(wù)
組成的,而selector就是事件循環(huán)的重要依賴模塊。
asyncio 使用了 selectors 模塊來實(shí)現(xiàn)底層的并發(fā) I/O 操作。通過將 selectors 的功能封裝為 asyncio 提供的事件循環(huán)(Event Loop)和其他協(xié)程相關(guān)的工具。
回顧一下事件循環(huán)的機(jī)制
任務(wù)列表 = [ 任務(wù)1, 任務(wù)2, 任務(wù)3,... ]
while True:
可執(zhí)行的任務(wù)列表,已完成的任務(wù)列表 = 去任務(wù)列表中檢查所有的任務(wù),將'可執(zhí)行'和'已完成'的任務(wù)返回
for 就緒任務(wù) in 已準(zhǔn)備就緒的任務(wù)列表:
執(zhí)行已就緒的任務(wù)
for 已完成的任務(wù) in 已完成的任務(wù)列表:
在任務(wù)列表中移除 已完成的任務(wù)
如果 任務(wù)列表 中的任務(wù)都已完成,則終止循環(huán)
事件循環(huán)就是一個while True的循環(huán),循環(huán)中做的事情有三個:
- 獲取就緒狀態(tài)的任務(wù)和已完成的任務(wù)
- 執(zhí)行就緒狀態(tài)的任務(wù)
- 移除已完成的任務(wù)
那么selector的功能在事件循環(huán)中的功能就非常明顯了,就是負(fù)責(zé)返回IO相關(guān)的就緒任務(wù)。
asyncio 庫使用了底層的 selectors 模塊來監(jiān)聽和管理文件描述符的狀態(tài)變化,并在合適的時候?qū)⒖刂茩?quán)交給其他的協(xié)程。這樣可以實(shí)現(xiàn)非阻塞的 I/O 操作,并支持高并發(fā)和并行執(zhí)行。
selectors 提供了底層的 I/O 多路復(fù)用機(jī)制,而 asyncio 在其之上提供了更高級的異步編程框架。
附錄asyncio模塊事件循環(huán)核心模塊
def run_forever(self): """Run until stop() is called.""" self._check_closed() self._check_running() self._set_coroutine_origin_tracking(self._debug) old_agen_hooks = sys.get_asyncgen_hooks() try: self._thread_id = threading.get_ident() sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook, finalizer=self._asyncgen_finalizer_hook) events._set_running_loop(self) while True: self._run_once() if self._stopping: break finally: self._stopping = False self._thread_id = None events._set_running_loop(None) self._set_coroutine_origin_tracking(False) sys.set_asyncgen_hooks(*old_agen_hooks)
到此這篇關(guān)于Python中IO多路復(fù)用模塊selector的用法詳解的文章就介紹到這了,更多相關(guān)Python多路復(fù)用模塊selector內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
python實(shí)現(xiàn)pdf轉(zhuǎn)word和excel的示例代碼
本文主要介紹了python實(shí)現(xiàn)pdf轉(zhuǎn)word和excel的示例代碼,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2025-01-01Python Collections強(qiáng)大的數(shù)據(jù)結(jié)構(gòu)工具使用實(shí)例探索
這篇文章主要介紹了Python Collections強(qiáng)大的數(shù)據(jù)結(jié)構(gòu)工具的使用實(shí)例探索,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2024-01-01Python中的sorted函數(shù)應(yīng)用及文件操作詳解
這篇文章主要介紹了Python中的sorted函數(shù)應(yīng)用及文件操作詳解,python只能將字符串寫入到文本文件,要將數(shù)值數(shù)據(jù)存儲到文本本件中,必須先試用函數(shù)str()將其轉(zhuǎn)換為字符串格式,需要的朋友可以參考下2023-12-12