深入了解Python并發(fā)編程
并發(fā)方式
線程([Thread])
多線程幾乎是每一個(gè)程序猿在使用每一種語(yǔ)言時(shí)都會(huì)首先想到用于解決并發(fā)的工具(JS程序員請(qǐng)回避),使用多線程可以有效的利用CPU資源(Python例外)。然而多線程所帶來(lái)的程序的復(fù)雜度也不可避免,尤其是對(duì)競(jìng)爭(zhēng)資源的同步問題。
然而在python中由于使用了全局解釋鎖(GIL)的原因,代碼并不能同時(shí)在多核上并發(fā)的運(yùn)行,也就是說(shuō),Python的多線程不能并發(fā),很多人會(huì)發(fā)現(xiàn)使用多線程來(lái)改進(jìn)自己的Python代碼后,程序的運(yùn)行效率卻下降了,這是多么蛋疼的一件事呀!實(shí)際上使用多線程的編程模型是很困難的,程序員很容易犯錯(cuò),這并不是程序員的錯(cuò)誤,因?yàn)椴⑿兴季S是反人類的,我們大多數(shù)人的思維是串行(精神分裂不討論),而且馮諾依曼設(shè)計(jì)的計(jì)算機(jī)架構(gòu)也是以順序執(zhí)行為基礎(chǔ)的。所以如果你總是不能把你的多線程程序搞定,恭喜你,你是個(gè)思維正常的程序猿:)
Python提供兩組線程的接口,一組是thread模塊,提供基礎(chǔ)的,低等級(jí)(Low Level)接口,使用Function作為線程的運(yùn)行體。還有一組是threading模塊,提供更容易使用的基于對(duì)象的接口(類似于Java),可以繼承Thread對(duì)象來(lái)實(shí)現(xiàn)線程,還提供了其它一些線程相關(guān)的對(duì)象,例如Timer,Lock
使用thread模塊的例子
import thread def worker(): """thread worker function""" print 'Worker' thread.start_new_thread(worker)
使用threading模塊的例子
import threading def worker(): """thread worker function""" print 'Worker' t = threading.Thread(target=worker) t.start()
或者Java Style
import threading class worker(threading.Thread): def __init__(self): pass def run(): """thread worker function""" print 'Worker' t = worker() t.start()
進(jìn)程 (Process)
由于前文提到的全局解釋鎖的問題,Python下比較好的并行方式是使用多進(jìn)程,這樣可以非常有效的使用CPU資源,并實(shí)現(xiàn)真正意義上的并發(fā)。當(dāng)然,進(jìn)程的開銷比線程要大,也就是說(shuō)如果你要?jiǎng)?chuàng)建數(shù)量驚人的并發(fā)進(jìn)程的話,需要考慮一下你的機(jī)器是不是有一顆強(qiáng)大的心。
Python的mutliprocess模塊和threading具有類似的接口。
from multiprocessing import Process def worker(): """thread worker function""" print 'Worker' p = Process(target=worker) p.start() p.join()
由于線程共享相同的地址空間和內(nèi)存,所以線程之間的通信是非常容易的,然而進(jìn)程之間的通信就要復(fù)雜一些了。常見的進(jìn)程間通信有,管道,消息隊(duì)列,Socket接口(TCP/IP)等等。
Python的mutliprocess模塊提供了封裝好的管道和隊(duì)列,可以方便的在進(jìn)程間傳遞消息。
Python進(jìn)程間的同步使用鎖,這一點(diǎn)喝線程是一樣的。
另外,Python還提供了進(jìn)程池Pool對(duì)象,可以方便的管理和控制線程。
遠(yuǎn)程分布式主機(jī) (Distributed Node)
隨著大數(shù)據(jù)時(shí)代的到臨,摩爾定理在單機(jī)上似乎已經(jīng)失去了效果,數(shù)據(jù)的計(jì)算和處理需要分布式的計(jì)算機(jī)網(wǎng)絡(luò)來(lái)運(yùn)行,程序并行的運(yùn)行在多個(gè)主機(jī)節(jié)點(diǎn)上,已經(jīng)是現(xiàn)在的軟件架構(gòu)所必需考慮的問題。
遠(yuǎn)程主機(jī)間的進(jìn)程間通信有幾種常見的方式
- TCP/IP
TCP/IP是所有遠(yuǎn)程通信的基礎(chǔ),然而API比較低級(jí)別,使用起來(lái)比較繁瑣,所以一般不會(huì)考慮
- 遠(yuǎn)程方法調(diào)用 Remote Function Call
[RPC]
- 遠(yuǎn)程對(duì)象 Remote Object
遠(yuǎn)程對(duì)象是更高級(jí)別的封裝,程序可以想操作本地對(duì)象一樣去操作一個(gè)遠(yuǎn)程對(duì)象在本地的代理。遠(yuǎn)程對(duì)象最廣為使用的規(guī)范CORBA,CORBA最大的好處是可以在不同語(yǔ)言和平臺(tái)中進(jìn)行通信。當(dāng)讓不用的語(yǔ)言和平臺(tái)還有一些各自的遠(yuǎn)程對(duì)象實(shí)現(xiàn),例如Java的RMI,MS的DCOM
Python的開源實(shí)現(xiàn),有許多對(duì)遠(yuǎn)程對(duì)象的支持
- Dopy]
- Fnorb (CORBA)
- ICE
- omniORB (CORBA)
- Pyro
- YAMI
- 消息隊(duì)列 Message Queue
比起RPC或者遠(yuǎn)程對(duì)象,消息是一種更為靈活的通信手段,常見的支持Python接口的消息機(jī)制有
- RabbitMQ
- ZeroMQ
- Kafka
- AWS SQS + BOTO
在遠(yuǎn)程主機(jī)上執(zhí)行并發(fā)和本地的多進(jìn)程并沒有非常大的差異,都需要解決進(jìn)程間通信的問題。當(dāng)然對(duì)遠(yuǎn)程進(jìn)程的管理和協(xié)調(diào)比起本地要復(fù)雜。
Python下有許多開源的框架來(lái)支持分布式的并發(fā),提供有效的管理手段包括:
- Celery
Celery是一個(gè)非常成熟的Python分布式框架,可以在分布式的系統(tǒng)中,異步的執(zhí)行任務(wù),并提供有效的管理和調(diào)度功能。
- SCOOP
SCOOP (Scalable COncurrent Operations in Python)提供簡(jiǎn)單易用的分布式調(diào)用接口,使用Future接口來(lái)進(jìn)行并發(fā)。
- Dispy
相比起Celery和SCOOP,Dispy提供更為輕量級(jí)的分布式并行服務(wù)
- PP
PP (Parallel Python)是另外一個(gè)輕量級(jí)的Python并行服務(wù)
- Asyncoro
Asyncoro是另一個(gè)利用Generator實(shí)現(xiàn)分布式并發(fā)的Python框架,
當(dāng)然還有許多其它的系統(tǒng),我沒有一一列出
另外,許多的分布式系統(tǒng)多提供了對(duì)Python接口的支持,例如Spark
偽線程 (Pseudo-Thread)
還有一種并發(fā)手段并不常見,我們可以稱之為偽線程,就是看上去像是線程,使用的接口類似線程接口,但是實(shí)際使用非線程的方式,對(duì)應(yīng)的線程開銷也不存的。
- greenlet
greenlet提供輕量級(jí)的coroutines來(lái)支持進(jìn)程內(nèi)的并發(fā)。
greenlet是Stackless的一個(gè)副產(chǎn)品,使用tasklet來(lái)支持一中被稱之為微線程(mirco-thread)的技術(shù),這里是一個(gè)使用greenlet的偽線程的例子
from greenlet import greenlet def test1(): print 12 gr2.switch() print 34 def test2(): print 56 gr1.switch() print 78 gr1 = greenlet(test1) gr2 = greenlet(test2) gr1.switch()
運(yùn)行以上程序得到如下結(jié)果:
12
56
34
偽線程gr1 switch會(huì)打印12,然后調(diào)用gr2 switch得到56,然后switch回到gr1,打印34,然后偽線程gr1結(jié)束,程序退出,所以78永遠(yuǎn)不會(huì)被打印。通過(guò)這個(gè)例子我們可以看出,使用偽線程,我們可以有效的控制程序的執(zhí)行流程,但是偽線程并不存在真正意義上的并發(fā)。
eventlet,gevent和concurence都是基于greenlet提供并發(fā)的。
eventlet是一個(gè)提供網(wǎng)絡(luò)調(diào)用并發(fā)的Python庫(kù),使用者可以以非阻塞的方式來(lái)調(diào)用阻塞的IO操作。
import eventlet from eventlet.green import urllib2 urls = ['http://www.google.com', 'http://www.example.com', 'http://www.python.org'] def fetch(url): return urllib2.urlopen(url).read() pool = eventlet.GreenPool() for body in pool.imap(fetch, urls): print("got body", len(body))
執(zhí)行結(jié)果如下
('got body', 17629)
('got body', 1270)
('got body', 46949)
eventlet為了支持generator的操作對(duì)urllib2做了修改,接口和urllib2是一致的。這里的GreenPool和Python的Pool接口一致。
- gevent
gevent和eventlet類似,
import gevent from gevent import socket urls = ['www.google.com', 'www.example.com', 'www.python.org'] jobs = [gevent.spawn(socket.gethostbyname, url) for url in urls] gevent.joinall(jobs, timeout=2) print [job.value for job in jobs]
執(zhí)行結(jié)果如下:
['206.169.145.226', '93.184.216.34', '23.235.39.223']
concurence是另外一個(gè)利用greenlet提供網(wǎng)絡(luò)并發(fā)的開源庫(kù),我沒有用過(guò),大家可以自己嘗試一下。
實(shí)戰(zhàn)運(yùn)用
通常需要用到并發(fā)的場(chǎng)合有兩種,一種是計(jì)算密集型,也就是說(shuō)你的程序需要大量的CPU資源;另一種是IO密集型,程序可能有大量的讀寫操作,包括讀寫文件,收發(fā)網(wǎng)絡(luò)請(qǐng)求等等。
計(jì)算密集型
對(duì)應(yīng)計(jì)算密集型的應(yīng)用,我們選用著名的蒙特卡洛算法來(lái)計(jì)算PI值?;驹砣缦?/p>
蒙特卡洛算法利用統(tǒng)計(jì)學(xué)原理來(lái)模擬計(jì)算圓周率,在一個(gè)正方形中,一個(gè)隨機(jī)的點(diǎn)落在1/4圓的區(qū)域(紅色點(diǎn))的概率與其面積成正比。也就該概率 p = Pi * R*R /4 : R* R , 其中R是正方形的邊長(zhǎng),圓的半徑。也就是說(shuō)該概率是圓周率的1/4, 利用這個(gè)結(jié)論,只要我們模擬出點(diǎn)落在四分之一圓上的概率就可以知道圓周率了,為了得到這個(gè)概率,我們可以通過(guò)大量的實(shí)驗(yàn),也就是生成大量的點(diǎn),看看這個(gè)點(diǎn)在哪個(gè)區(qū)域,然后統(tǒng)計(jì)出結(jié)果。
基本算法如下:
from math import hypot from random import random def test(tries): return sum(hypot(random(), random()) < 1 for _ in range(tries))
這里test方法做了n(tries)次試驗(yàn),返回落在四分之一圓中的點(diǎn)的個(gè)數(shù)。判斷方法是檢查該點(diǎn)到圓心的距離,如果小于R則是在圓上。
通過(guò)大量的并發(fā),我們可以快速的運(yùn)行多次試驗(yàn),試驗(yàn)的次數(shù)越多,結(jié)果越接近真實(shí)的圓周率。
這里給出不同并發(fā)方法的程序代碼
- 非并發(fā)
我們先在單線程,但進(jìn)程運(yùn)行,看看性能如何
from math import hypot from random import random import eventlet import time def test(tries): return sum(hypot(random(), random()) < 1 for _ in range(tries)) def calcPi(nbFutures, tries): ts = time.time() result = map(test, [tries] * nbFutures) ret = 4. * sum(result) / float(nbFutures * tries) span = time.time() - ts print "time spend ", span return ret print calcPi(3000,4000)
- 多線程 thread
為了使用線程池,我們用multiprocessing的dummy包,它是對(duì)多線程的一個(gè)封裝。注意這里代碼雖然一個(gè)字的沒有提到線程,但它千真萬(wàn)確是多線程。
通過(guò)測(cè)試我們開(jing)心(ya)的發(fā)現(xiàn),果然不出所料,當(dāng)線程池為1是,它的運(yùn)行結(jié)果和沒有并發(fā)時(shí)一樣,當(dāng)我們把線程池?cái)?shù)字設(shè)置為5時(shí),耗時(shí)幾乎是沒有并發(fā)的2倍,我的測(cè)試數(shù)據(jù)從5秒到9秒。所以對(duì)于計(jì)算密集型的任務(wù),還是放棄多線程吧。
from multiprocessing.dummy import Pool from math import hypot from random import random import time def test(tries): return sum(hypot(random(), random()) < 1 for _ in range(tries)) def calcPi(nbFutures, tries): ts = time.time() p = Pool(1) result = p.map(test, [tries] * nbFutures) ret = 4. * sum(result) / float(nbFutures * tries) span = time.time() - ts print "time spend ", span return ret if __name__ == '__main__': p = Pool() print("pi = {}".format(calcPi(3000, 4000)))
- 多進(jìn)程 multiprocess
理論上對(duì)于計(jì)算密集型的任務(wù),使用多進(jìn)程并發(fā)比較合適,在以下的例子中,進(jìn)程池的規(guī)模設(shè)置為5,修改進(jìn)程池的大小可以看到對(duì)結(jié)果的影響,當(dāng)進(jìn)程池設(shè)置為1時(shí),和多線程的結(jié)果所需的時(shí)間類似,因?yàn)檫@時(shí)候并不存在并發(fā);當(dāng)設(shè)置為2時(shí),響應(yīng)時(shí)間有了明顯的改進(jìn),是之前沒有并發(fā)的一半;然而繼續(xù)擴(kuò)大進(jìn)程池對(duì)性能影響并不大,甚至有所下降,也許我的Apple Air的CPU只有兩個(gè)核?
當(dāng)心,如果你設(shè)置一個(gè)非常大的進(jìn)程池,你會(huì)遇到 Resource temporarily unavailable的錯(cuò)誤,系統(tǒng)并不能支持創(chuàng)建太多的進(jìn)程,畢竟資源是有限的。
from multiprocessing import Pool from math import hypot from random import random import time def test(tries): return sum(hypot(random(), random()) < 1 for _ in range(tries)) def calcPi(nbFutures, tries): ts = time.time() p = Pool(5) result = p.map(test, [tries] * nbFutures) ret = 4. * sum(result) / float(nbFutures * tries) span = time.time() - ts print "time spend ", span return ret if __name__ == '__main__': print("pi = {}".format(calcPi(3000, 4000)))
- gevent (偽線程)
不論是gevent還是eventlet,因?yàn)椴淮嬖趯?shí)際的并發(fā),響應(yīng)時(shí)間和沒有并發(fā)區(qū)別不大,這個(gè)和測(cè)試結(jié)果一致。
import gevent from math import hypot from random import random import time def test(tries): return sum(hypot(random(), random()) < 1 for _ in range(tries)) def calcPi(nbFutures, tries): ts = time.time() jobs = [gevent.spawn(test, t) for t in [tries] * nbFutures] gevent.joinall(jobs, timeout=2) ret = 4. * sum([job.value for job in jobs]) / float(nbFutures * tries) span = time.time() - ts print "time spend ", span return ret print calcPi(3000,4000)
- eventlet (偽線程)
from math import hypot from random import random import eventlet import time def test(tries): return sum(hypot(random(), random()) < 1 for _ in range(tries)) def calcPi(nbFutures, tries): ts = time.time() pool = eventlet.GreenPool() result = pool.imap(test, [tries] * nbFutures) ret = 4. * sum(result) / float(nbFutures * tries) span = time.time() - ts print "time spend ", span return ret print calcPi(3000,4000)
- SCOOP
SCOOP中的Future接口符合PEP-3148的定義,也就是在Python3中提供的Future接口。
在缺省的SCOOP配置環(huán)境下(單機(jī),4個(gè)Worker),并發(fā)的性能有提高,但是不如兩個(gè)進(jìn)程池配置的多進(jìn)程。
from math import hypot from random import random from scoop import futures import time def test(tries): return sum(hypot(random(), random()) < 1 for _ in range(tries)) def calcPi(nbFutures, tries): ts = time.time() expr = futures.map(test, [tries] * nbFutures) ret = 4. * sum(expr) / float(nbFutures * tries) span = time.time() - ts print "time spend ", span return ret if __name__ == "__main__": print("pi = {}".format(calcPi(3000, 4000)))
- Celery
任務(wù)代碼
from celery import Celery from math import hypot from random import random app = Celery('tasks', backend='amqp', broker='amqp://guest@localhost//') app.conf.CELERY_RESULT_BACKEND = 'db+sqlite:///results.sqlite' @app.task def test(tries): return sum(hypot(random(), random()) < 1 for _ in range(tries))
客戶端代碼
from celery import group from tasks import test import time def calcPi(nbFutures, tries): ts = time.time() result = group(test.s(tries) for i in xrange(nbFutures))().get() ret = 4. * sum(result) / float(nbFutures * tries) span = time.time() - ts print "time spend ", span return ret print calcPi(3000, 4000)
使用Celery做并發(fā)的測(cè)試結(jié)果出乎意料(環(huán)境是單機(jī),4frefork的并發(fā),消息broker是rabbitMQ),是所有測(cè)試用例里最糟糕的,響應(yīng)時(shí)間是沒有并發(fā)的5~6倍。這也許是因?yàn)榭刂茀f(xié)調(diào)的開銷太大。對(duì)于這樣的計(jì)算任務(wù),Celery也許不是一個(gè)好的選擇。
- asyncoro
Asyncoro的測(cè)試結(jié)果和非并發(fā)保持一致。
import asyncoro from math import hypot from random import random import time def test(tries): yield sum(hypot(random(), random()) < 1 for _ in range(tries)) def calcPi(nbFutures, tries): ts = time.time() coros = [ asyncoro.Coro(test,t) for t in [tries] * nbFutures] ret = 4. * sum([job.value() for job in coros]) / float(nbFutures * tries) span = time.time() - ts print "time spend ", span return ret print calcPi(3000,4000)
IO密集型
IO密集型的任務(wù)是另一種常見的用例,例如網(wǎng)絡(luò)WEB服務(wù)器就是一個(gè)例子,每秒鐘能處理多少個(gè)請(qǐng)求時(shí)WEB服務(wù)器的重要指標(biāo)。
我們就以網(wǎng)頁(yè)讀取作為最簡(jiǎn)單的例子
from math import hypot import time import urllib2 urls = ['http://www.google.com', 'http://www.example.com', 'http://www.python.org'] def test(url): return urllib2.urlopen(url).read() def testIO(nbFutures): ts = time.time() map(test, urls * nbFutures) span = time.time() - ts print "time spend ", span testIO(10)
在不同并發(fā)庫(kù)下的代碼,由于比較類似,我就不一一列出。大家可以參考計(jì)算密集型中代碼做參考。
通過(guò)測(cè)試我們可以發(fā)現(xiàn),對(duì)于IO密集型的任務(wù),使用多線程,或者是多進(jìn)程都可以有效的提高程序的效率,而使用偽線程性能提升非常顯著,eventlet比沒有并發(fā)的情況下,響應(yīng)時(shí)間從9秒提高到0.03秒。同時(shí)eventlet/gevent提供了非阻塞的異步調(diào)用模式,非常方便。這里推薦使用線程或者偽線程,因?yàn)樵陧憫?yīng)時(shí)間類似的情況下,線程和偽線程消耗的資源更少。
總結(jié)
Python提供了不同的并發(fā)方式,對(duì)應(yīng)于不同的場(chǎng)景,我們需要選擇不同的方式進(jìn)行并發(fā)。選擇合適的方式,不但要對(duì)該方法的原理有所了解,還應(yīng)該做一些測(cè)試和試驗(yàn),數(shù)據(jù)才是你做選擇的最好參考。
以上就是深入了解Python并發(fā)編程的詳細(xì)內(nèi)容,更多關(guān)于Python并發(fā)編程的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Python實(shí)現(xiàn)完全數(shù)的示例詳解
完全數(shù),又稱完美數(shù),定義為:這個(gè)數(shù)的所有因數(shù)(不包括這個(gè)數(shù)本身)加起來(lái)剛好等于這個(gè)數(shù)。本文就來(lái)用Python實(shí)現(xiàn)計(jì)算完全數(shù),需要的可以參考一下2023-01-01Python腳本實(shí)現(xiàn)監(jiān)聽服務(wù)器的思路代碼詳解
這篇文章主要介紹了Python腳本實(shí)現(xiàn)監(jiān)聽服務(wù)器的思路,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-05-05python利用pandas分析學(xué)生期末成績(jī)實(shí)例代碼
pandas是數(shù)據(jù)分析師最常用的工具之一,這篇文章主要給大家介紹了關(guān)于python如何利用pandas分析學(xué)生期末成績(jī)的相關(guān)資料,文中給出了詳細(xì)的實(shí)現(xiàn)方法,需要的朋友可以參考下2021-07-07python實(shí)現(xiàn)Dijkstra靜態(tài)尋路算法
這篇文章主要介紹了python實(shí)現(xiàn)Dijkstra靜態(tài)尋路算法,常用于路由算法或者作為其他圖算法的一個(gè)子模塊,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2019-01-0115款Python編輯器的優(yōu)缺點(diǎn),別再問我“選什么編輯器”啦
這篇文章主要介紹了15款Python編輯器的優(yōu)缺點(diǎn),別再問我“選什么編輯器”啦,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2020-10-10