欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Python多進(jìn)程并發(fā)與多線(xiàn)程并發(fā)編程實(shí)例總結(jié)

 更新時(shí)間:2018年02月08日 13:58:29   作者:cskchenshengkun  
這篇文章主要介紹了Python多進(jìn)程并發(fā)與多線(xiàn)程并發(fā)編程,結(jié)合實(shí)例形式總結(jié)分析了Python編程中的多進(jìn)程并發(fā)與多線(xiàn)程并發(fā)相關(guān)概念、使用方法與操作注意事項(xiàng),需要的朋友可以參考下

本文實(shí)例總結(jié)了Python多進(jìn)程并發(fā)與多線(xiàn)程并發(fā)。分享給大家供大家參考,具體如下:

這里對(duì)python支持的幾種并發(fā)方式進(jìn)行簡(jiǎn)單的總結(jié)。

Python支持的并發(fā)分為多線(xiàn)程并發(fā)與多進(jìn)程并發(fā)(異步IO本文不涉及)。概念上來(lái)說(shuō),多進(jìn)程并發(fā)即運(yùn)行多個(gè)獨(dú)立的程序,優(yōu)勢(shì)在于并發(fā)處理的任務(wù)都由操作系統(tǒng)管理,不足之處在于程序與各進(jìn)程之間的通信和數(shù)據(jù)共享不方便;多線(xiàn)程并發(fā)則由程序員管理并發(fā)處理的任務(wù),這種并發(fā)方式可以方便地在線(xiàn)程間共享數(shù)據(jù)(前提是不能互斥)。Python對(duì)多線(xiàn)程和多進(jìn)程的支持都比一般編程語(yǔ)言更高級(jí),最小化了需要我們完成的工作。

一.多進(jìn)程并發(fā)

Mark Summerfield指出,對(duì)于計(jì)算密集型程序,多進(jìn)程并發(fā)優(yōu)于多線(xiàn)程并發(fā)。計(jì)算密集型程序指的程序的運(yùn)行時(shí)間大部分消耗在CPU的運(yùn)算處理過(guò)程,而硬盤(pán)和內(nèi)存的讀寫(xiě)消耗的時(shí)間很短;相對(duì)地,IO密集型程序指的則是程序的運(yùn)行時(shí)間大部分消耗在硬盤(pán)和內(nèi)存的讀寫(xiě)上,CPU的運(yùn)算時(shí)間很短。

對(duì)于多進(jìn)程并發(fā),python支持兩種實(shí)現(xiàn)方式,一種是采用進(jìn)程安全的數(shù)據(jù)結(jié)構(gòu):multiprocessing.JoinableQueue,這種數(shù)據(jù)結(jié)構(gòu)自己管理“加鎖”的過(guò)程,程序員無(wú)需擔(dān)心“死鎖”的問(wèn)題;python還提供了一種更為優(yōu)雅而高級(jí)的實(shí)現(xiàn)方式:采用進(jìn)程池。下面一一介紹。

1.隊(duì)列實(shí)現(xiàn)——使用multiprocessing.JoinableQueue

multiprocessing是python標(biāo)準(zhǔn)庫(kù)中支持多進(jìn)程并發(fā)的模塊,我們這里采用multiprocessing中的數(shù)據(jù)結(jié)構(gòu):JoinableQueue,它本質(zhì)上仍是一個(gè)FIFO的隊(duì)列,它與一般隊(duì)列(如queue中的Queue)的區(qū)別在于它是多進(jìn)程安全的,這意味著我們不用擔(dān)心它的互斥和死鎖問(wèn)題。JoinableQueue主要可以用來(lái)存放執(zhí)行的任務(wù)和收集任務(wù)的執(zhí)行結(jié)果。舉例來(lái)看(以下皆省去導(dǎo)入包的過(guò)程):

def read(q):
  while True:
    try:
      value = q.get()
      print('Get %s from queue.' % value)
      time.sleep(random.random())
    finally:
      q.task_done()
def main():
  q = multiprocessing.JoinableQueue()
  pw1 = multiprocessing.Process(target=read, args=(q,))
  pw2 = multiprocessing.Process(target=read, args=(q,))
  pw1.daemon = True
  pw2.daemon = True
  pw1.start()
  pw2.start()
  for c in [chr(ord('A')+i) for i in range(26)]:
    q.put(c)
  try:
    q.join()
  except KeyboardInterrupt:
    print("stopped by hand")
if __name__ == '__main__':
  main()

對(duì)于windows系統(tǒng)的多進(jìn)程并發(fā),程序文件里必須含有“入口函數(shù)”(如main函數(shù)),且結(jié)尾處必須調(diào)用入口點(diǎn)。例如以if __name__ == '__main__': main()結(jié)尾。

在這個(gè)最簡(jiǎn)單的多進(jìn)程并發(fā)例子里,我們用多進(jìn)程實(shí)現(xiàn)將26個(gè)字母打印出來(lái)。首先定義一個(gè)存放任務(wù)的JoinableQueue對(duì)象,然后實(shí)例化兩個(gè)Process對(duì)象(每個(gè)對(duì)象對(duì)應(yīng)一個(gè)子進(jìn)程),實(shí)例化Process對(duì)象需要傳送target和args參數(shù),target是實(shí)現(xiàn)每個(gè)任務(wù)工作中的具體函數(shù),args是target函數(shù)的參數(shù)。

pw1.daemon = True
pw2.daemon = True

這兩句話(huà)將子進(jìn)程設(shè)置為守護(hù)進(jìn)程——主進(jìn)程結(jié)束后隨之結(jié)束。

pw1.start()
pw2.start()

一旦運(yùn)行到這兩句話(huà),子進(jìn)程就開(kāi)始獨(dú)立于父進(jìn)程運(yùn)行了,它會(huì)在單獨(dú)的進(jìn)程里調(diào)用target引用的函數(shù)——在這里即read函數(shù),它是一個(gè)死循環(huán),將參數(shù)q中的數(shù)一一讀取并打印出來(lái)。

value = q.get()

這是多進(jìn)程并發(fā)的要點(diǎn),q是一個(gè)JoinableQueue對(duì)象,支持get方法讀取第一個(gè)元素,如果q中沒(méi)有元素,進(jìn)程就會(huì)阻塞,直至q中被存入新元素。

因此執(zhí)行完pw1.start() pw2.start()這兩句話(huà)后,子進(jìn)程雖然開(kāi)始運(yùn)行了,但很快就堵塞住。

for c in [chr(ord('A')+i) for i in range(26)]:
    q.put(c)

將26個(gè)字母依次放入JoinableQueue對(duì)象中,這時(shí)候兩個(gè)子進(jìn)程不再阻塞,開(kāi)始真正地執(zhí)行任務(wù)。兩個(gè)子進(jìn)程都用value = q.get()來(lái)讀取數(shù)據(jù),它們都在修改q對(duì)象,而我們并不用擔(dān)心同步問(wèn)題,這就是multiProcessing.Joinable數(shù)據(jù)結(jié)構(gòu)的優(yōu)勢(shì)所在——它是多進(jìn)程安全的,它會(huì)自動(dòng)處理“加鎖”的過(guò)程。

try:
    q.join()

q.join()方法會(huì)查詢(xún)q中的數(shù)據(jù)是否已讀完——這里指的就是任務(wù)是否執(zhí)行完,如果沒(méi)有,程序會(huì)阻塞住等待q中數(shù)據(jù)讀完才開(kāi)始繼續(xù)執(zhí)行(可以用Ctrl+C強(qiáng)制停止)。

對(duì)Windows系統(tǒng),調(diào)用任務(wù)管理器應(yīng)該可以看到有多個(gè)子進(jìn)程在運(yùn)行。

2.進(jìn)程池實(shí)現(xiàn)——使用concurrent.futures.ProcessPoolExecutor

Python還支持一種更為優(yōu)雅的多進(jìn)程并發(fā)方式,直接看例子:

def read(q):
    print('Get %s from queue.' % q)
    time.sleep(random.random())
def main():
  futures = set()
  with concurrent.futures.ProcessPoolExecutor() as executor:
    for q in (chr(ord('A')+i) for i in range(26)):
      future = executor.submit(read, q)
      futures.add(future)
  try:
    for future in concurrent.futures.as_completed(futures):
      err = future.exception()
      if err is not None:
        raise err
  except KeyboardInterrupt:
    print("stopped by hand")
if __name__ == '__main__':
  main()

這里我們采用concurrent.futures.ProcessPoolExecutor對(duì)象,可以把它想象成一個(gè)進(jìn)程池,子進(jìn)程往里“填”。我們通過(guò)submit方法實(shí)例一個(gè)Future對(duì)象,然后把這里Future對(duì)象都填到池——futures里,這里futures是一個(gè)set對(duì)象。只要進(jìn)程池里有future,就會(huì)開(kāi)始執(zhí)行任務(wù)。這里的read函數(shù)更為簡(jiǎn)單——只是把一個(gè)字符打印并休眠一會(huì)而已。

try:
    for future in concurrent.futures.as_completed(futures):

這是等待所有子進(jìn)程都執(zhí)行完畢。子進(jìn)程執(zhí)行過(guò)程中可能拋出異常,err = future.exception()可以收集這些異常,便于后期處理。

可以看出用Future對(duì)象處理多進(jìn)程并發(fā)更為簡(jiǎn)潔,無(wú)論是target函數(shù)的編寫(xiě)、子進(jìn)程的啟動(dòng)等等,future對(duì)象還可以向使用者匯報(bào)其狀態(tài),也可以匯報(bào)執(zhí)行結(jié)果或執(zhí)行時(shí)的異常。

二.多線(xiàn)程并發(fā)

對(duì)于IO密集型程序,多線(xiàn)程并發(fā)可能要優(yōu)于多進(jìn)程并發(fā)。因?yàn)閷?duì)于網(wǎng)絡(luò)通信等IO密集型任務(wù)來(lái)說(shuō),決定程序效率的主要是網(wǎng)絡(luò)延遲,這時(shí)候是使用進(jìn)程還是線(xiàn)程就沒(méi)有太大關(guān)系了。

1.隊(duì)列實(shí)現(xiàn)——使用queue.Queue

程序與多進(jìn)程基本一致,只是這里我們不必使用multiProcessing.JoinableQueue對(duì)象了,一般的隊(duì)列(來(lái)自queue.Queue)就可以滿(mǎn)足要求:

def read(q):
  while True:
    try:
      value = q.get()
      print('Get %s from queue.' % value)
      time.sleep(random.random())
    finally:
      q.task_done()
def main():
  q = queue.Queue()
  pw1 = threading.Thread(target=read, args=(q,))
  pw2 = threading.Thread(target=read, args=(q,))
  pw1.daemon = True
  pw2.daemon = True
  pw1.start()
  pw2.start()
  for c in [chr(ord('A')+i) for i in range(26)]:
    q.put(c)
  try:
    q.join()
  except KeyboardInterrupt:
    print("stopped by hand")
if __name__ == '__main__':
  main()

并且這里我們實(shí)例化的是Thread對(duì)象,而不是Process對(duì)象,程序的其余部分看起來(lái)與多進(jìn)程并沒(méi)有什么兩樣。

2. 線(xiàn)程池實(shí)現(xiàn)——使用concurrent.futures.ThreadPoolExecutor

直接看例子:

def read(q):
    print('Get %s from queue.' % q)
    time.sleep(random.random())
def main():
  futures = set()
  with concurrent.futures.ThreadPoolExecutor(multiprocessing.cpu_count()*4) as executor:
    for q in (chr(ord('A')+i) for i in range(26)):
      future = executor.submit(read, q)
      futures.add(future)
  try:
    for future in concurrent.futures.as_completed(futures):
      err = future.exception()
      if err is not None:
        raise err
  except KeyboardInterrupt:
    print("stopped by hand")
if __name__ == '__main__':
  main()

用ThreadPoolExecutor與用ProcessPoolExecutor看起來(lái)沒(méi)什么區(qū)別,只是改了一下簽名而已。

不難看出,不管是使用隊(duì)列還是使用進(jìn)/線(xiàn)程池,從多進(jìn)程轉(zhuǎn)化到多線(xiàn)程是十分容易的——僅僅是修改了幾個(gè)簽名而已。當(dāng)然內(nèi)部機(jī)制完全不同,只是python的封裝非常好,使我們可以不用關(guān)心這些細(xì)節(jié),這正是python優(yōu)雅之處。

更多關(guān)于Python相關(guān)內(nèi)容感興趣的讀者可查看本站專(zhuān)題:《Python進(jìn)程與線(xiàn)程操作技巧總結(jié)》、《Python Socket編程技巧總結(jié)》、《Python數(shù)據(jù)結(jié)構(gòu)與算法教程》、《Python函數(shù)使用技巧總結(jié)》、《Python字符串操作技巧匯總》、《Python入門(mén)與進(jìn)階經(jīng)典教程》及《Python文件與目錄操作技巧匯總

希望本文所述對(duì)大家Python程序設(shè)計(jì)有所幫助。

相關(guān)文章

最新評(píng)論