基于并發(fā)服務(wù)器幾種實(shí)現(xiàn)方法(總結(jié))
今天主題是實(shí)現(xiàn)并發(fā)服務(wù)器,實(shí)現(xiàn)方法有多種版本,先從簡(jiǎn)單的單進(jìn)程代碼實(shí)現(xiàn)到多進(jìn)程,多線程的實(shí)現(xiàn),最終引入一些高級(jí)模塊來實(shí)現(xiàn)并發(fā)TCP服務(wù)器。
說到TCP,想起吐槽大會(huì)有個(gè)段子提到三次握手,也只有程序猿(媛)能get。
UDP服務(wù)器數(shù)據(jù)傳輸不可靠,這里就忽略了。
>>:
簡(jiǎn)單的單進(jìn)程TCP服務(wù)器
假代碼:
#創(chuàng)建tcp服務(wù)器套接字
#綁定端口
#設(shè)置正常情況退出的服務(wù)器下,端口可以重用
#設(shè)置監(jiān)聽,變?yōu)橹鲃?dòng)監(jiān)聽
# 等待客戶端的鏈接,返回新的socket和地址
#關(guān)閉tcp服務(wù)器套接字
from socket import socket, AF_INET,SOCK_STREAM,SOL_SOCKET,SO_REUSEADDR #創(chuàng)建tcp服務(wù)器套接字 server_socket = socket(AF_INET,SOCK_STREAM) #綁定端口 server_socket.bind(("",9999)) #設(shè)置正常情況退出的服務(wù)器下,端口可以重用 server_socket.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) #設(shè)置監(jiān)聽,變?yōu)橹鲃?dòng)監(jiān)聽 server_socket.listen(5) while True: # 等待客戶端的鏈接,返回新的socket和地址 new_socket,new_address = server_socket.accept() #接收數(shù)據(jù),并且發(fā)送數(shù)據(jù) try: while True: recv_data = new_socket.recv(1024) #當(dāng)有客戶端關(guān)閉后,recv解除阻塞,并且返回長度為0 if len(recv_data) > 0: recv_content = recv_data.decode("gb2312") print("收到:%s的信息是:%s" % (str(new_address),recv_content)) new_socket.send("thank you!".encode("gb2312")) else: print("客戶端%s已經(jīng)關(guān)閉" % (str(new_address))) break finally: new_socket.close() print("關(guān)閉%s客戶端" % (str(new_address))) #關(guān)閉tcp服務(wù)器套接字 server_socket.close()
多進(jìn)程TCP服務(wù)器
from socket import socket, AF_INET,SOCK_STREAM,SOL_SOCKET,SO_REUSEADDR from multiprocessing import Process #在子進(jìn)程中接收消息 def recv_data(new_socket,new_address): while True: recv_data = new_socket.recv(1024) # 當(dāng)有客戶端關(guān)閉后,recv解除阻塞,并且返回長度為0 if len(recv_data) > 0: recv_content = recv_data.decode("gb2312") print("收到:%s的信息是:%s" % (str(new_address), recv_content)) new_socket.send("thank you!".encode("gb2312")) else: print("客戶端%s已經(jīng)關(guān)閉" % (str(new_address))) break #關(guān)閉與客戶端的連接 print("關(guān)閉與客戶端的連接") new_socket.close() def main(): #創(chuàng)建tcp服務(wù)器套接字 server_socket = socket(AF_INET,SOCK_STREAM) #綁定端口 server_socket.bind(("",8888)) #設(shè)置正常情況退出的服務(wù)器下,端口可以重用 server_socket.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) #設(shè)置監(jiān)聽,變?yōu)楸粍?dòng)連接 server_socket.listen(3) try: while True: # 等待客戶端的鏈接,返回新的socket和地址 new_socket,new_address = server_socket.accept() #接收數(shù)據(jù),并且發(fā)送數(shù)據(jù) Process(target=recv_data,args=(new_socket,new_address)).start() #因?yàn)橹鬟M(jìn)程和子進(jìn)程不共享數(shù)據(jù) #如果我們直接關(guān)閉new_socket,只是關(guān)閉主進(jìn)程的new_socket,而子進(jìn)程的不受影響 new_socket.close() finally: #關(guān)閉tcp服務(wù)器套接字 server_socket.close() if __name__ == "__main__": main()
多進(jìn)程TCP服務(wù)器
from socket import socket, AF_INET,SOCK_STREAM,SOL_SOCKET,SO_REUSEADDR from multiprocessing import Process #在子進(jìn)程中接收消息 def recv_data(new_socket,new_address): while True: recv_data = new_socket.recv(1024) # 當(dāng)有客戶端關(guān)閉后,recv解除阻塞,并且返回長度為0 if len(recv_data) > 0: recv_content = recv_data.decode("gb2312") print("收到:%s的信息是:%s" % (str(new_address), recv_content)) new_socket.send("thank you!".encode("gb2312")) else: print("客戶端%s已經(jīng)關(guān)閉" % (str(new_address))) break #關(guān)閉與客戶端的連接 print("關(guān)閉與客戶端的連接") new_socket.close() def main(): #創(chuàng)建tcp服務(wù)器套接字 server_socket = socket(AF_INET,SOCK_STREAM) #綁定端口 server_socket.bind(("",8888)) #設(shè)置正常情況退出的服務(wù)器下,端口可以重用 server_socket.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) #設(shè)置監(jiān)聽,變?yōu)楸粍?dòng)連接 server_socket.listen(3) try: while True: # 等待客戶端的鏈接,返回新的socket和地址 new_socket,new_address = server_socket.accept() #接收數(shù)據(jù),并且發(fā)送數(shù)據(jù) Process(target=recv_data,args=(new_socket,new_address)).start() #因?yàn)橹鬟M(jìn)程和子進(jìn)程不共享數(shù)據(jù) #如果我們直接關(guān)閉new_socket,只是關(guān)閉主進(jìn)程的new_socket,而子進(jìn)程的不受影響 new_socket.close() finally: #關(guān)閉tcp服務(wù)器套接字 server_socket.close() if __name__ == "__main__": main()
多線程TCP服務(wù)器
from socket import socket, AF_INET,SOCK_STREAM,SOL_SOCKET,SO_REUSEADDR from threading import Thread #接收消息 def recv_data(new_socket,new_address): while True: recv_data = new_socket.recv(1024) # 當(dāng)有客戶端關(guān)閉后,recv解除阻塞,并且返回長度為0 if len(recv_data) > 0: recv_content = recv_data.decode("gb2312") print("收到:%s的信息是:%s" % (str(new_address), recv_content)) new_socket.send("thank you!".encode("gb2312")) else: print("客戶端%s已經(jīng)關(guān)閉" % (str(new_address))) break def main(): #創(chuàng)建tcp服務(wù)器套接字 server_socket = socket(AF_INET,SOCK_STREAM) #綁定端口 server_socket.bind(("",9999)) #設(shè)置正常情況退出的服務(wù)器下,端口可以重用 server_socket.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) #設(shè)置監(jiān)聽,變?yōu)楸粍?dòng)連接 server_socket.listen(3) try: while True: # 等待客戶端的鏈接,返回新的socket和地址 new_socket,new_address = server_socket.accept() #接收數(shù)據(jù),并且發(fā)送數(shù)據(jù) Thread(target=recv_data,args=(new_socket,new_address)).start() finally: #關(guān)閉tcp服務(wù)器套接字 server_socket.close() if __name__ == "__main__": main()
多任務(wù)協(xié)程實(shí)現(xiàn) ——
greenlet和gevent
#coding=utf-8 from greenlet import greenlet import time def test1(): while True: print "---A--" gr2.switch() time.sleep(0.5) def test2(): while True: print "---B--" gr1.switch() time.sleep(0.5) gr1 = greenlet(test1) gr2 = greenlet(test2) #切換到gr1中運(yùn)行 gr1.switch()
import gevent #函數(shù) def f(n): for i in range(n): print("%s:%s" % (gevent.getcurrent(),i)) f1 = gevent.spawn(f,5) f2 = gevent.spawn(f,5) f3 = gevent.spawn(f,5) #讓主線程等待三個(gè)協(xié)程執(zhí)行完畢,否則沒有機(jī)會(huì)執(zhí)行 f1.join() f2.join() f3.join() #可以看到,3個(gè)greenlet是依次運(yùn)行而不是交替運(yùn)行。要讓greenlet交替運(yùn)行,可以通過gevent.sleep()交出控制權(quán)。
#coding=utf-8 import gevent def f(n): for i in range(n): print gevent.getcurrent(), i #用來模擬一個(gè)耗時(shí)操作,注意不是time模塊中的sleep gevent.sleep(1) g1 = gevent.spawn(f, 5) g2 = gevent.spawn(f, 5) g3 = gevent.spawn(f, 5) #下面三行代碼意思:主線程等待各個(gè)協(xié)成支持完,否則協(xié)成沒有機(jī)會(huì)執(zhí)行 g1.join() g2.join() g3.join()
單進(jìn)程TCP服務(wù)器 ——
非堵塞式
from socket import AF_INET,socket,SO_REUSEADDR,SOCK_STREAM,SOL_SOCKET def main(): #創(chuàng)建tcp的socket套接字 server_socket = socket(AF_INET,SOCK_STREAM) server_socket.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) #綁定端口 server_socket.bind(("",9999)) #設(shè)置非阻塞,也就是說accept方法不阻塞了, # 但是在沒有客戶端鏈接且被執(zhí)行的時(shí)候會(huì)報(bào)錯(cuò) #有客戶端鏈接的時(shí)候正常執(zhí)行 server_socket.setblocking(False) #設(shè)置監(jiān)聽 server_socket.listen(5) #客戶端列表 client_lists = [] try: #不斷調(diào)用accept while True: try: # print("accept--111") new_socket,new_address = server_socket.accept() print("accept--2222") except Exception as result: # print(result) pass else: print("新的客戶%s鏈接上" % str(new_address)) #新鏈接的new_sokect默認(rèn)也是阻塞,也設(shè)置為非阻塞后,recv為非阻塞 new_socket.setblocking(False) client_lists.append((new_socket,new_address)) # print(111) for client_sokect,client_address in client_lists: #接收數(shù)據(jù) try: recv_data = client_sokect.recv(1024) except Exception as result: # print(result) pass else: # print("正常數(shù)據(jù):%s" %recv_data) if len(recv_data) > 0 : print("收到%s:%s" % (str(client_address),recv_data)) client_sokect.send("thank you!".encode("gb2312")) else: #客戶端已經(jīng)端口,要把該客戶端從列表中異常 client_lists.remove((client_sokect,new_address)) client_sokect.close() print("%s已經(jīng)斷開" % str(new_address)) finally: #關(guān)閉套接字 server_socket.close() if __name__ == "__main__": main()
單進(jìn)程TCP服務(wù)器 ——
select版
select 原理
其他語言(c或者c++)也有使用select實(shí)現(xiàn)多任務(wù)服務(wù)器。
select 能夠完成一些套接字的檢查,從頭到尾檢查一遍后,標(biāo)記哪些套接字是否可以收數(shù)據(jù),返回的時(shí)候,就返回能接收數(shù)據(jù)的套接字,返回的是列表。select是由操作系統(tǒng)提供的,效率要高些,非常快的方式檢測(cè)哪些套接字可以接收數(shù)據(jù)。select是跨平臺(tái)的,在window也可以用。
io多路復(fù)用:沒有使用多進(jìn)程和多線程的情況下完成多個(gè)套接字的使用。
from socket import AF_INET,socket,SO_REUSEADDR,SOCK_STREAM,SOL_SOCKET from select import select import sys def main(): #創(chuàng)建tcp的socket套接字 server_socket = socket(AF_INET,SOCK_STREAM) server_socket.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) #綁定端口 server_socket.bind(("",9999)) #設(shè)置監(jiān)聽 server_socket.listen(5) #客戶端列表 socket_lists = [server_socket,sys.stdin] wirte_list = [] #是否退出 is_run = False try: while True: #檢測(cè)列表client_lists那些socket可以接收數(shù)據(jù), #檢測(cè)列表[]那些套接字(socket)可否發(fā)送數(shù)據(jù) #檢測(cè)列表[]那些套接字(socket)是否產(chǎn)生了異常 print("select--111") #這個(gè)select函數(shù)默認(rèn)是堵塞,當(dāng)有客戶端鏈接的時(shí)候解除阻塞, # 當(dāng)有數(shù)據(jù)可以接收的時(shí)候解除阻塞,當(dāng)客戶端斷開的時(shí)候解除阻塞 readable, wirteable,excep = select(socket_lists,wirte_list,[]) # print("select--2222") # print(111) for sock in wirteable: #這個(gè)會(huì)一直發(fā)送,因?yàn)樗翘幱谝呀?jīng)發(fā)的狀態(tài) sock.send("thank you!".encode("gb2312")) for sock in readable: #接收數(shù)據(jù) if sock == server_socket: print("sock == server_socket") #有新的客戶端鏈接進(jìn)來 new_socket,new_address = sock.accept() #新的socket添加到列表中,便于下次socket的時(shí)候能檢查到 socket_lists.append(new_socket) elif sock == sys.stdin: cmd = sys.stdin.readline() print(cmd) is_run = cmd else: # print("sock.recv(1024)....") #此時(shí)的套接字sock是直接可以取數(shù)據(jù)的 recv_data = sock.recv(1024) if len(recv_data) > 0: print("從[%s]:%s" % (str(new_address),recv_data)) sock.send(recv_data) #把鏈接上有消息接收的socket添加到監(jiān)聽寫的列表中 wirte_list.append(sock) else: print("客戶端已經(jīng)斷開") #客戶端已經(jīng)斷開,要移除 sock.close() socket_lists.remove(sock) #是否退出程序 if is_run: break finally: #關(guān)閉套接字 server_socket.close() if __name__ == "__main__": main()
單進(jìn)程TCP服務(wù)器 ——
epoll版
from socket import * import select def main(): #創(chuàng)建tcp服務(wù)器套接字 server_socket = socket(AF_INET,SOCK_STREAM) #設(shè)置端口可以重用 server_socket.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) #綁定端口 server_socket.bind(("",9999)) #設(shè)置監(jiān)聽 server_socket.listen(5) #用epoll設(shè)置監(jiān)聽收數(shù)據(jù) epoll = select.epoll() #把server_socket注冊(cè)到epoll的事件監(jiān)聽中,如果已經(jīng)注冊(cè)過會(huì)發(fā)生異常 epoll.register(server_socket.fileno(),select.EPOLLIN|select.EPOLLET) #裝socket列表 socket_lists = {} #裝socket對(duì)應(yīng)的地址 socket_address = {} while True: #返回套接字列表[(socket的文件描述符,select.EPOLLIN)], # 如果有新的鏈接,有數(shù)據(jù)發(fā)過來,斷開鏈接等都會(huì)解除阻塞 print("epoll.poll--111") epoll_list = epoll.poll() print("epoll.poll--222") print(epoll_list) for fd,event in epoll_list: #有新的鏈接 if fd == server_socket.fileno(): print("新的客戶fd==%s" % fd) new_sokect,new_address = server_socket.accept() #往字典添加數(shù)據(jù) socket_lists[new_sokect.fileno()] = new_sokect socket_address[new_sokect.fileno()] = new_address #注冊(cè)新的socket也注冊(cè)到epoll的事件監(jiān)聽中 epoll.register(new_sokect.fileno(), select.EPOLLIN | select.EPOLLET) elif event ==select.EPOLLIN: print("收到數(shù)據(jù)了") #根據(jù)文件操作符取出對(duì)應(yīng)socket new_sokect = socket_lists[fd] address = socket_address[fd] recv_data = new_sokect.recv(1024) if len(recv_data) > 0: print("已經(jīng)收到[%s]:%s" % (str(address),recv_data.decode("gb2312"))) else: #客戶端端口,取消監(jiān)聽 epoll.unregister(fd) #關(guān)閉鏈接 new_sokect.close() print("[%s]已經(jīng)下線" % str(address)) #關(guān)閉套接字鏈接 server_socket.close() if __name__ == "__main__": main()
單進(jìn)程TCP服務(wù)器 ——
gevent版
gevent原理
greenlet已經(jīng)實(shí)現(xiàn)了協(xié)程,但是這個(gè)還得人工切換,是不是覺得太麻煩了,莫要捉急,python還有一個(gè)比greenlet更強(qiáng)大的并且能夠自動(dòng)切換任務(wù)的模塊gevent
原理------當(dāng)一個(gè)greenlet遇到IO(指的是input output 輸入輸出,比如網(wǎng)絡(luò)、文件操作等)操作時(shí),比如訪問網(wǎng)絡(luò),就自動(dòng)切換到其他的greenlet,等到IO操作完成,再在適當(dāng)?shù)臅r(shí)候切換回來繼續(xù)執(zhí)行。
由于IO操作非常耗時(shí),經(jīng)常使程序處于等待狀態(tài),有了gevent為我們自動(dòng)切換協(xié)程,就保證總有g(shù)reenlet在運(yùn)行,而不是等待IO.
import sys import time import gevent from gevent import socket,monkey monkey.patch_all() def handle_request(conn): while True: data = conn.recv(1024) if not data: conn.close() break print("recv:", data) conn.send(data) def server(port): s = socket.socket() s.bind(('', port)) s.listen(5) while True: newSocket, addr = s.accept() gevent.spawn(handle_request, newSocket) if __name__ == '__main__': server(7788)
首先基于以上代碼模塊,撒點(diǎn)概念問題:
1.什么是協(xié)程?
協(xié)程:存在線程中,是比線程更小的執(zhí)行單元,又稱微線程,纖程。自帶cpu上下文,操作協(xié)程由程序員決定,它可以將一個(gè)線程分解為多個(gè)微線程,每個(gè)協(xié)程間共享全局空間的變量,每秒鐘切換頻率高達(dá)百萬次。
2. 什么是計(jì)算密集型和IO密集型
計(jì)算密集型:要進(jìn)行大量的計(jì)算,消耗cpu資源。如復(fù)雜計(jì)算,對(duì)視頻進(jìn)行高清解碼等,全靠cpu的運(yùn)算能力。而計(jì)算密集型任務(wù)完成多任務(wù)切換任務(wù)比較耗時(shí),cpu執(zhí)行任務(wù)效率就越低。在python中,多進(jìn)程適合計(jì)算密集型任務(wù)。
IO密集型:涉及到網(wǎng)絡(luò)、磁盤io的任務(wù)都是io密集型。cpu消耗少,計(jì)算量小,如請(qǐng)求網(wǎng)頁,讀寫文件等。在python中,使用sleep達(dá)到IO密集型任務(wù)的目的,多線程適合IO密集型任務(wù)。
各大實(shí)現(xiàn)版本對(duì)比:
select:
1)支持跨平臺(tái),最大缺陷是單個(gè)進(jìn)程打開的FD是有限的,由FD_SETSIZE設(shè)置,默認(rèn)是1024;
2)對(duì)socket掃描時(shí)是線性掃描,及采用輪詢方式,效率低;
3)需要維護(hù)一個(gè)存放大量FD的數(shù)據(jù)結(jié)構(gòu),使得用戶空間和內(nèi)核空間在傳遞該數(shù)據(jù)結(jié)構(gòu)時(shí)復(fù)制開銷大。
poll:
1)poll與select本質(zhì)上沒有區(qū)別,但poll沒有最大連接數(shù)的限制;
2)大量的fd數(shù)組被整體復(fù)制于用戶態(tài)和內(nèi)核地址空間之間,不管這樣的復(fù)制是不是有意義;
3)‘水平觸發(fā)',如果報(bào)告了fd后,沒有被處理,下次poll時(shí)還會(huì)再次報(bào)告該fd。
epoll:
1)是之前poll和select的增強(qiáng)版,epoll更靈活,沒有描述符限制,能打開的fd遠(yuǎn)大于1024(1G的內(nèi)存上能監(jiān)聽約10萬個(gè)端口);
2)‘邊緣出發(fā)',事件通知機(jī)制,效率提升,最大的特點(diǎn)在于它只管你活躍的連接,而跟連接總數(shù)無關(guān)。而epoll對(duì)文件描述符的操作模式之一ET是一種高效的工作方式,很大程度減少事件反復(fù)觸發(fā)的次數(shù),內(nèi)核不會(huì)發(fā)送更多的通知(only once)。
以上這篇基于并發(fā)服務(wù)器幾種實(shí)現(xiàn)方法(總結(jié))就是小編分享給大家的全部內(nèi)容了,希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
Python數(shù)據(jù)分析之?Matplotlib?散點(diǎn)圖繪制
這篇文章主要介紹了Python數(shù)據(jù)分析之?Matplotlib?散點(diǎn)圖繪制,散點(diǎn)圖又稱散點(diǎn)圖,是使用多個(gè)坐標(biāo)點(diǎn)的分布反映數(shù)據(jù)點(diǎn)分布規(guī)律、數(shù)據(jù)關(guān)聯(lián)關(guān)系的圖表,下文對(duì)散點(diǎn)圖的詳細(xì)介紹及繪制,需要的小伙伴可以參考以一下2022-05-05盤點(diǎn)Python中讀取和提取JSON文件的4種方法
JSON(JavaScript?Object?Notation)是一種輕量級(jí)的數(shù)據(jù)交換格式,Python中提供了多種方式來讀取和處理JSON文件,本文將詳細(xì)介紹四種常見的方法,希望對(duì)大家有所幫助2024-03-03Python實(shí)現(xiàn)字符串模糊匹配的兩種實(shí)現(xiàn)方法
本文主要介紹了Python實(shí)現(xiàn)字符串模糊匹配的兩種實(shí)現(xiàn)方法,Python中通過re.search()方法實(shí)現(xiàn),對(duì)于首位起始的內(nèi)容匹配,也可通過re.match()方法實(shí)現(xiàn),感興趣的可以了解一下2023-11-11Python網(wǎng)頁解析利器BeautifulSoup安裝使用介紹
這篇文章主要介紹了Python網(wǎng)頁解析利器BeautifulSoup安裝使用介紹,本文用一個(gè)完整示例一步一步安裝了BeautifulSoup的安裝和使用過程,需要的朋友可以參考下2015-03-03python使用opencv進(jìn)行人臉識(shí)別
本文主要介紹了python使用opencv進(jìn)行人臉識(shí)別的相關(guān)資料。具有很好的參考價(jià)值。下面跟著小編一起來看下吧2017-04-04