基于并發(fā)服務(wù)器幾種實(shí)現(xiàn)方法(總結(jié))
今天主題是實(shí)現(xiàn)并發(fā)服務(wù)器,實(shí)現(xiàn)方法有多種版本,先從簡單的單進(jìn)程代碼實(shí)現(xiàn)到多進(jìn)程,多線程的實(shí)現(xiàn),最終引入一些高級模塊來實(shí)現(xiàn)并發(fā)TCP服務(wù)器。
說到TCP,想起吐槽大會有個(gè)段子提到三次握手,也只有程序猿(媛)能get。
UDP服務(wù)器數(shù)據(jù)傳輸不可靠,這里就忽略了。
>>:
簡單的單進(jìn)程TCP服務(wù)器
假代碼:
#創(chuàng)建tcp服務(wù)器套接字
#綁定端口
#設(shè)置正常情況退出的服務(wù)器下,端口可以重用
#設(shè)置監(jiān)聽,變?yōu)橹鲃颖O(jiān)聽
# 等待客戶端的鏈接,返回新的socket和地址
#關(guān)閉tcp服務(wù)器套接字
from socket import socket, AF_INET,SOCK_STREAM,SOL_SOCKET,SO_REUSEADDR
#創(chuàng)建tcp服務(wù)器套接字
server_socket = socket(AF_INET,SOCK_STREAM)
#綁定端口
server_socket.bind(("",9999))
#設(shè)置正常情況退出的服務(wù)器下,端口可以重用
server_socket.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
#設(shè)置監(jiān)聽,變?yōu)橹鲃颖O(jiān)聽
server_socket.listen(5)
while True:
# 等待客戶端的鏈接,返回新的socket和地址
new_socket,new_address = server_socket.accept()
#接收數(shù)據(jù),并且發(fā)送數(shù)據(jù)
try:
while True:
recv_data = new_socket.recv(1024)
#當(dāng)有客戶端關(guān)閉后,recv解除阻塞,并且返回長度為0
if len(recv_data) > 0:
recv_content = recv_data.decode("gb2312")
print("收到:%s的信息是:%s" % (str(new_address),recv_content))
new_socket.send("thank you!".encode("gb2312"))
else:
print("客戶端%s已經(jīng)關(guān)閉" % (str(new_address)))
break
finally:
new_socket.close()
print("關(guān)閉%s客戶端" % (str(new_address)))
#關(guān)閉tcp服務(wù)器套接字
server_socket.close()
多進(jìn)程TCP服務(wù)器
from socket import socket, AF_INET,SOCK_STREAM,SOL_SOCKET,SO_REUSEADDR
from multiprocessing import Process
#在子進(jìn)程中接收消息
def recv_data(new_socket,new_address):
while True:
recv_data = new_socket.recv(1024)
# 當(dāng)有客戶端關(guān)閉后,recv解除阻塞,并且返回長度為0
if len(recv_data) > 0:
recv_content = recv_data.decode("gb2312")
print("收到:%s的信息是:%s" % (str(new_address), recv_content))
new_socket.send("thank you!".encode("gb2312"))
else:
print("客戶端%s已經(jīng)關(guān)閉" % (str(new_address)))
break
#關(guān)閉與客戶端的連接
print("關(guān)閉與客戶端的連接")
new_socket.close()
def main():
#創(chuàng)建tcp服務(wù)器套接字
server_socket = socket(AF_INET,SOCK_STREAM)
#綁定端口
server_socket.bind(("",8888))
#設(shè)置正常情況退出的服務(wù)器下,端口可以重用
server_socket.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
#設(shè)置監(jiān)聽,變?yōu)楸粍舆B接
server_socket.listen(3)
try:
while True:
# 等待客戶端的鏈接,返回新的socket和地址
new_socket,new_address = server_socket.accept()
#接收數(shù)據(jù),并且發(fā)送數(shù)據(jù)
Process(target=recv_data,args=(new_socket,new_address)).start()
#因?yàn)橹鬟M(jìn)程和子進(jìn)程不共享數(shù)據(jù)
#如果我們直接關(guān)閉new_socket,只是關(guān)閉主進(jìn)程的new_socket,而子進(jìn)程的不受影響
new_socket.close()
finally:
#關(guān)閉tcp服務(wù)器套接字
server_socket.close()
if __name__ == "__main__":
main()
多進(jìn)程TCP服務(wù)器
from socket import socket, AF_INET,SOCK_STREAM,SOL_SOCKET,SO_REUSEADDR
from multiprocessing import Process
#在子進(jìn)程中接收消息
def recv_data(new_socket,new_address):
while True:
recv_data = new_socket.recv(1024)
# 當(dāng)有客戶端關(guān)閉后,recv解除阻塞,并且返回長度為0
if len(recv_data) > 0:
recv_content = recv_data.decode("gb2312")
print("收到:%s的信息是:%s" % (str(new_address), recv_content))
new_socket.send("thank you!".encode("gb2312"))
else:
print("客戶端%s已經(jīng)關(guān)閉" % (str(new_address)))
break
#關(guān)閉與客戶端的連接
print("關(guān)閉與客戶端的連接")
new_socket.close()
def main():
#創(chuàng)建tcp服務(wù)器套接字
server_socket = socket(AF_INET,SOCK_STREAM)
#綁定端口
server_socket.bind(("",8888))
#設(shè)置正常情況退出的服務(wù)器下,端口可以重用
server_socket.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
#設(shè)置監(jiān)聽,變?yōu)楸粍舆B接
server_socket.listen(3)
try:
while True:
# 等待客戶端的鏈接,返回新的socket和地址
new_socket,new_address = server_socket.accept()
#接收數(shù)據(jù),并且發(fā)送數(shù)據(jù)
Process(target=recv_data,args=(new_socket,new_address)).start()
#因?yàn)橹鬟M(jìn)程和子進(jìn)程不共享數(shù)據(jù)
#如果我們直接關(guān)閉new_socket,只是關(guān)閉主進(jìn)程的new_socket,而子進(jìn)程的不受影響
new_socket.close()
finally:
#關(guān)閉tcp服務(wù)器套接字
server_socket.close()
if __name__ == "__main__":
main()
多線程TCP服務(wù)器
from socket import socket, AF_INET,SOCK_STREAM,SOL_SOCKET,SO_REUSEADDR
from threading import Thread
#接收消息
def recv_data(new_socket,new_address):
while True:
recv_data = new_socket.recv(1024)
# 當(dāng)有客戶端關(guān)閉后,recv解除阻塞,并且返回長度為0
if len(recv_data) > 0:
recv_content = recv_data.decode("gb2312")
print("收到:%s的信息是:%s" % (str(new_address), recv_content))
new_socket.send("thank you!".encode("gb2312"))
else:
print("客戶端%s已經(jīng)關(guān)閉" % (str(new_address)))
break
def main():
#創(chuàng)建tcp服務(wù)器套接字
server_socket = socket(AF_INET,SOCK_STREAM)
#綁定端口
server_socket.bind(("",9999))
#設(shè)置正常情況退出的服務(wù)器下,端口可以重用
server_socket.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
#設(shè)置監(jiān)聽,變?yōu)楸粍舆B接
server_socket.listen(3)
try:
while True:
# 等待客戶端的鏈接,返回新的socket和地址
new_socket,new_address = server_socket.accept()
#接收數(shù)據(jù),并且發(fā)送數(shù)據(jù)
Thread(target=recv_data,args=(new_socket,new_address)).start()
finally:
#關(guān)閉tcp服務(wù)器套接字
server_socket.close()
if __name__ == "__main__":
main()
多任務(wù)協(xié)程實(shí)現(xiàn) ——
greenlet和gevent
#coding=utf-8 from greenlet import greenlet import time def test1(): while True: print "---A--" gr2.switch() time.sleep(0.5) def test2(): while True: print "---B--" gr1.switch() time.sleep(0.5) gr1 = greenlet(test1) gr2 = greenlet(test2) #切換到gr1中運(yùn)行 gr1.switch()
import gevent
#函數(shù)
def f(n):
for i in range(n):
print("%s:%s" % (gevent.getcurrent(),i))
f1 = gevent.spawn(f,5)
f2 = gevent.spawn(f,5)
f3 = gevent.spawn(f,5)
#讓主線程等待三個(gè)協(xié)程執(zhí)行完畢,否則沒有機(jī)會執(zhí)行
f1.join()
f2.join()
f3.join()
#可以看到,3個(gè)greenlet是依次運(yùn)行而不是交替運(yùn)行。要讓greenlet交替運(yùn)行,可以通過gevent.sleep()交出控制權(quán)。
#coding=utf-8 import gevent def f(n): for i in range(n): print gevent.getcurrent(), i #用來模擬一個(gè)耗時(shí)操作,注意不是time模塊中的sleep gevent.sleep(1) g1 = gevent.spawn(f, 5) g2 = gevent.spawn(f, 5) g3 = gevent.spawn(f, 5) #下面三行代碼意思:主線程等待各個(gè)協(xié)成支持完,否則協(xié)成沒有機(jī)會執(zhí)行 g1.join() g2.join() g3.join()
單進(jìn)程TCP服務(wù)器 ——
非堵塞式
from socket import AF_INET,socket,SO_REUSEADDR,SOCK_STREAM,SOL_SOCKET
def main():
#創(chuàng)建tcp的socket套接字
server_socket = socket(AF_INET,SOCK_STREAM)
server_socket.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
#綁定端口
server_socket.bind(("",9999))
#設(shè)置非阻塞,也就是說accept方法不阻塞了,
# 但是在沒有客戶端鏈接且被執(zhí)行的時(shí)候會報(bào)錯
#有客戶端鏈接的時(shí)候正常執(zhí)行
server_socket.setblocking(False)
#設(shè)置監(jiān)聽
server_socket.listen(5)
#客戶端列表
client_lists = []
try:
#不斷調(diào)用accept
while True:
try:
# print("accept--111")
new_socket,new_address = server_socket.accept()
print("accept--2222")
except Exception as result:
# print(result)
pass
else:
print("新的客戶%s鏈接上" % str(new_address))
#新鏈接的new_sokect默認(rèn)也是阻塞,也設(shè)置為非阻塞后,recv為非阻塞
new_socket.setblocking(False)
client_lists.append((new_socket,new_address))
# print(111)
for client_sokect,client_address in client_lists:
#接收數(shù)據(jù)
try:
recv_data = client_sokect.recv(1024)
except Exception as result:
# print(result)
pass
else:
# print("正常數(shù)據(jù):%s" %recv_data)
if len(recv_data) > 0 :
print("收到%s:%s" % (str(client_address),recv_data))
client_sokect.send("thank you!".encode("gb2312"))
else:
#客戶端已經(jīng)端口,要把該客戶端從列表中異常
client_lists.remove((client_sokect,new_address))
client_sokect.close()
print("%s已經(jīng)斷開" % str(new_address))
finally:
#關(guān)閉套接字
server_socket.close()
if __name__ == "__main__":
main()
單進(jìn)程TCP服務(wù)器 ——
select版
select 原理
其他語言(c或者c++)也有使用select實(shí)現(xiàn)多任務(wù)服務(wù)器。
select 能夠完成一些套接字的檢查,從頭到尾檢查一遍后,標(biāo)記哪些套接字是否可以收數(shù)據(jù),返回的時(shí)候,就返回能接收數(shù)據(jù)的套接字,返回的是列表。select是由操作系統(tǒng)提供的,效率要高些,非??斓姆绞綑z測哪些套接字可以接收數(shù)據(jù)。select是跨平臺的,在window也可以用。
io多路復(fù)用:沒有使用多進(jìn)程和多線程的情況下完成多個(gè)套接字的使用。
from socket import AF_INET,socket,SO_REUSEADDR,SOCK_STREAM,SOL_SOCKET
from select import select
import sys
def main():
#創(chuàng)建tcp的socket套接字
server_socket = socket(AF_INET,SOCK_STREAM)
server_socket.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
#綁定端口
server_socket.bind(("",9999))
#設(shè)置監(jiān)聽
server_socket.listen(5)
#客戶端列表
socket_lists = [server_socket,sys.stdin]
wirte_list = []
#是否退出
is_run = False
try:
while True:
#檢測列表client_lists那些socket可以接收數(shù)據(jù),
#檢測列表[]那些套接字(socket)可否發(fā)送數(shù)據(jù)
#檢測列表[]那些套接字(socket)是否產(chǎn)生了異常
print("select--111")
#這個(gè)select函數(shù)默認(rèn)是堵塞,當(dāng)有客戶端鏈接的時(shí)候解除阻塞,
# 當(dāng)有數(shù)據(jù)可以接收的時(shí)候解除阻塞,當(dāng)客戶端斷開的時(shí)候解除阻塞
readable, wirteable,excep = select(socket_lists,wirte_list,[])
# print("select--2222")
# print(111)
for sock in wirteable:
#這個(gè)會一直發(fā)送,因?yàn)樗翘幱谝呀?jīng)發(fā)的狀態(tài)
sock.send("thank you!".encode("gb2312"))
for sock in readable:
#接收數(shù)據(jù)
if sock == server_socket:
print("sock == server_socket")
#有新的客戶端鏈接進(jìn)來
new_socket,new_address = sock.accept()
#新的socket添加到列表中,便于下次socket的時(shí)候能檢查到
socket_lists.append(new_socket)
elif sock == sys.stdin:
cmd = sys.stdin.readline()
print(cmd)
is_run = cmd
else:
# print("sock.recv(1024)....")
#此時(shí)的套接字sock是直接可以取數(shù)據(jù)的
recv_data = sock.recv(1024)
if len(recv_data) > 0:
print("從[%s]:%s" % (str(new_address),recv_data))
sock.send(recv_data)
#把鏈接上有消息接收的socket添加到監(jiān)聽寫的列表中
wirte_list.append(sock)
else:
print("客戶端已經(jīng)斷開")
#客戶端已經(jīng)斷開,要移除
sock.close()
socket_lists.remove(sock)
#是否退出程序
if is_run:
break
finally:
#關(guān)閉套接字
server_socket.close()
if __name__ == "__main__":
main()
單進(jìn)程TCP服務(wù)器 ——
epoll版
from socket import *
import select
def main():
#創(chuàng)建tcp服務(wù)器套接字
server_socket = socket(AF_INET,SOCK_STREAM)
#設(shè)置端口可以重用
server_socket.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
#綁定端口
server_socket.bind(("",9999))
#設(shè)置監(jiān)聽
server_socket.listen(5)
#用epoll設(shè)置監(jiān)聽收數(shù)據(jù)
epoll = select.epoll()
#把server_socket注冊到epoll的事件監(jiān)聽中,如果已經(jīng)注冊過會發(fā)生異常
epoll.register(server_socket.fileno(),select.EPOLLIN|select.EPOLLET)
#裝socket列表
socket_lists = {}
#裝socket對應(yīng)的地址
socket_address = {}
while True:
#返回套接字列表[(socket的文件描述符,select.EPOLLIN)],
# 如果有新的鏈接,有數(shù)據(jù)發(fā)過來,斷開鏈接等都會解除阻塞
print("epoll.poll--111")
epoll_list = epoll.poll()
print("epoll.poll--222")
print(epoll_list)
for fd,event in epoll_list:
#有新的鏈接
if fd == server_socket.fileno():
print("新的客戶fd==%s" % fd)
new_sokect,new_address = server_socket.accept()
#往字典添加數(shù)據(jù)
socket_lists[new_sokect.fileno()] = new_sokect
socket_address[new_sokect.fileno()] = new_address
#注冊新的socket也注冊到epoll的事件監(jiān)聽中
epoll.register(new_sokect.fileno(), select.EPOLLIN | select.EPOLLET)
elif event ==select.EPOLLIN:
print("收到數(shù)據(jù)了")
#根據(jù)文件操作符取出對應(yīng)socket
new_sokect = socket_lists[fd]
address = socket_address[fd]
recv_data = new_sokect.recv(1024)
if len(recv_data) > 0:
print("已經(jīng)收到[%s]:%s" % (str(address),recv_data.decode("gb2312")))
else:
#客戶端端口,取消監(jiān)聽
epoll.unregister(fd)
#關(guān)閉鏈接
new_sokect.close()
print("[%s]已經(jīng)下線" % str(address))
#關(guān)閉套接字鏈接
server_socket.close()
if __name__ == "__main__":
main()
單進(jìn)程TCP服務(wù)器 ——
gevent版
gevent原理
greenlet已經(jīng)實(shí)現(xiàn)了協(xié)程,但是這個(gè)還得人工切換,是不是覺得太麻煩了,莫要捉急,python還有一個(gè)比greenlet更強(qiáng)大的并且能夠自動切換任務(wù)的模塊gevent
原理------當(dāng)一個(gè)greenlet遇到IO(指的是input output 輸入輸出,比如網(wǎng)絡(luò)、文件操作等)操作時(shí),比如訪問網(wǎng)絡(luò),就自動切換到其他的greenlet,等到IO操作完成,再在適當(dāng)?shù)臅r(shí)候切換回來繼續(xù)執(zhí)行。
由于IO操作非常耗時(shí),經(jīng)常使程序處于等待狀態(tài),有了gevent為我們自動切換協(xié)程,就保證總有g(shù)reenlet在運(yùn)行,而不是等待IO.
import sys
import time
import gevent
from gevent import socket,monkey
monkey.patch_all()
def handle_request(conn):
while True:
data = conn.recv(1024)
if not data:
conn.close()
break
print("recv:", data)
conn.send(data)
def server(port):
s = socket.socket()
s.bind(('', port))
s.listen(5)
while True:
newSocket, addr = s.accept()
gevent.spawn(handle_request, newSocket)
if __name__ == '__main__':
server(7788)
首先基于以上代碼模塊,撒點(diǎn)概念問題:
1.什么是協(xié)程?
協(xié)程:存在線程中,是比線程更小的執(zhí)行單元,又稱微線程,纖程。自帶cpu上下文,操作協(xié)程由程序員決定,它可以將一個(gè)線程分解為多個(gè)微線程,每個(gè)協(xié)程間共享全局空間的變量,每秒鐘切換頻率高達(dá)百萬次。
2. 什么是計(jì)算密集型和IO密集型
計(jì)算密集型:要進(jìn)行大量的計(jì)算,消耗cpu資源。如復(fù)雜計(jì)算,對視頻進(jìn)行高清解碼等,全靠cpu的運(yùn)算能力。而計(jì)算密集型任務(wù)完成多任務(wù)切換任務(wù)比較耗時(shí),cpu執(zhí)行任務(wù)效率就越低。在python中,多進(jìn)程適合計(jì)算密集型任務(wù)。
IO密集型:涉及到網(wǎng)絡(luò)、磁盤io的任務(wù)都是io密集型。cpu消耗少,計(jì)算量小,如請求網(wǎng)頁,讀寫文件等。在python中,使用sleep達(dá)到IO密集型任務(wù)的目的,多線程適合IO密集型任務(wù)。
各大實(shí)現(xiàn)版本對比:
select:
1)支持跨平臺,最大缺陷是單個(gè)進(jìn)程打開的FD是有限的,由FD_SETSIZE設(shè)置,默認(rèn)是1024;
2)對socket掃描時(shí)是線性掃描,及采用輪詢方式,效率低;
3)需要維護(hù)一個(gè)存放大量FD的數(shù)據(jù)結(jié)構(gòu),使得用戶空間和內(nèi)核空間在傳遞該數(shù)據(jù)結(jié)構(gòu)時(shí)復(fù)制開銷大。
poll:
1)poll與select本質(zhì)上沒有區(qū)別,但poll沒有最大連接數(shù)的限制;
2)大量的fd數(shù)組被整體復(fù)制于用戶態(tài)和內(nèi)核地址空間之間,不管這樣的復(fù)制是不是有意義;
3)‘水平觸發(fā)',如果報(bào)告了fd后,沒有被處理,下次poll時(shí)還會再次報(bào)告該fd。
epoll:
1)是之前poll和select的增強(qiáng)版,epoll更靈活,沒有描述符限制,能打開的fd遠(yuǎn)大于1024(1G的內(nèi)存上能監(jiān)聽約10萬個(gè)端口);
2)‘邊緣出發(fā)',事件通知機(jī)制,效率提升,最大的特點(diǎn)在于它只管你活躍的連接,而跟連接總數(shù)無關(guān)。而epoll對文件描述符的操作模式之一ET是一種高效的工作方式,很大程度減少事件反復(fù)觸發(fā)的次數(shù),內(nèi)核不會發(fā)送更多的通知(only once)。
以上這篇基于并發(fā)服務(wù)器幾種實(shí)現(xiàn)方法(總結(jié))就是小編分享給大家的全部內(nèi)容了,希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
Python數(shù)據(jù)分析之?Matplotlib?散點(diǎn)圖繪制
這篇文章主要介紹了Python數(shù)據(jù)分析之?Matplotlib?散點(diǎn)圖繪制,散點(diǎn)圖又稱散點(diǎn)圖,是使用多個(gè)坐標(biāo)點(diǎn)的分布反映數(shù)據(jù)點(diǎn)分布規(guī)律、數(shù)據(jù)關(guān)聯(lián)關(guān)系的圖表,下文對散點(diǎn)圖的詳細(xì)介紹及繪制,需要的小伙伴可以參考以一下2022-05-05
盤點(diǎn)Python中讀取和提取JSON文件的4種方法
JSON(JavaScript?Object?Notation)是一種輕量級的數(shù)據(jù)交換格式,Python中提供了多種方式來讀取和處理JSON文件,本文將詳細(xì)介紹四種常見的方法,希望對大家有所幫助2024-03-03
Python實(shí)現(xiàn)字符串模糊匹配的兩種實(shí)現(xiàn)方法
本文主要介紹了Python實(shí)現(xiàn)字符串模糊匹配的兩種實(shí)現(xiàn)方法,Python中通過re.search()方法實(shí)現(xiàn),對于首位起始的內(nèi)容匹配,也可通過re.match()方法實(shí)現(xiàn),感興趣的可以了解一下2023-11-11
Python網(wǎng)頁解析利器BeautifulSoup安裝使用介紹
這篇文章主要介紹了Python網(wǎng)頁解析利器BeautifulSoup安裝使用介紹,本文用一個(gè)完整示例一步一步安裝了BeautifulSoup的安裝和使用過程,需要的朋友可以參考下2015-03-03

