Python實現(xiàn)cpu并行運算的兩種方式
Python一共有兩種并行方式
1. 使用multiprocessing
第一種方式用于單個節(jié)點內(nèi)部的并行,也就是說同時發(fā)起的進程數(shù)不能超過你單個機器CPU的線程數(shù)。
以下是第一種方式的并行程序:
import multiprocessing import time import os import numpy as np ncore=20 def run(core): Your code reture 0 if __name__ == '__main__': print(time.strftime('%Y-%m-%d %H:%M:%S')) param = np.arange(20) p = multiprocessing.Pool(ncore) p.map(run, param) p.close() p.join() print(time.strftime('%Y-%m-%d %H:%M:%S'))
提交腳本直接:
python your_job_name.py
2. 使用mpi4py
第二種方式用于跨節(jié)點的并行,可以發(fā)起成千上百個CPU的并行。
以下是第二中方式的并行程序:
from mpi4py import MPI import time import os import numpy as np ncore=20 def run(core): Your code reture 0 if __name__ == '__main__': print(time.strftime('%Y-%m-%d %H:%M:%S')) comm = MPI.COMM_WORLD rank = comm.Get_rank() run(rank) print(time.strftime('%Y-%m-%d %H:%M:%S'))
提交腳本需要用到mpi
mpiexec -n cpu_number python your_job_name.py
知識拓展:python多進程模式實現(xiàn)多核CPU的并行計算
Python中的多進程模式
在Python中,可以使用multiprocessing模塊來實現(xiàn)多進程。multiprocessing是Python標準庫中的一個模塊,用于管理多進程的創(chuàng)建和通信。
在multiprocessing中,可以使用Process類來創(chuàng)建進程,Process類的構(gòu)造函數(shù)可以接受一個函數(shù)作為參數(shù)。
該函數(shù)將在子進程中執(zhí)行。下面是一個簡單的示例:
import multiprocessing def worker(): print("Worker process started") if __name__ == '__main__': p = multiprocessing.Process(target=worker) p.start() p.join()
在上面的示例中,我們首先定義了一個worker函數(shù),然后使用Process類創(chuàng)建了一個進程,并將worker函數(shù)作為參數(shù)傳遞給Process類的構(gòu)造函數(shù)。
最后,我們調(diào)用Process類的start方法啟動進程,并調(diào)用Process類的join方法等待進程結(jié)束。
- 提高程序執(zhí)行效率的方法
在Python中使用多進程模式提高程序執(zhí)行效率,可以通過以下幾種方式來實現(xiàn):
- 1 多進程并發(fā)執(zhí)行任務(wù)
在多進程模式下,可以將任務(wù)分配給多個進程并行執(zhí)行,從而利用多核CPU的優(yōu)勢。
在Python中,可以使用multiprocessing模塊來實現(xiàn)多進程并發(fā)執(zhí)行任務(wù)。
下面是一個簡單的示例:
import multiprocessing def worker(name): print("Worker %s started" % name) if __name__ == '__main__': for i in range(5): p = multiprocessing.Process(target=worker, args=(i,)) p.start()
在上面的示例中,我們定義了一個worker函數(shù),該函數(shù)接受一個參數(shù)name,并在函數(shù)體中打印出Worker name started的信息。
然后我們使用for循環(huán)創(chuàng)建了5個進程,并將worker函數(shù)和對應(yīng)的參數(shù)傳遞給Process類的構(gòu)造函數(shù)。
最后,我們調(diào)用Process類的start方法啟動進程。
- 2 進程池
對于大量重復(fù)的任務(wù),可以使用進程池來維護一定數(shù)量的進程,每個進程執(zhí)行一個任務(wù)后返回結(jié)果,然后再由進程池分配下一個任務(wù)。
這樣可以避免頻繁地創(chuàng)建和銷毀進程,提高效率。在Python中,可以使用multiprocessing模塊的Pool類來實現(xiàn)進程池。
下面是一個簡單的示例:
import multiprocessing def worker(name): print("Worker %s started" % name) if __name__ == '__main__': with multiprocessing.Pool(processes=4) as pool: pool.map(worker, range(10))
在上面的示例中,我們定義了一個worker函數(shù),該函數(shù)接受一個參數(shù)name,并在函數(shù)體中打印出Worker name started的信息。
然后我們使用with語句創(chuàng)建了一個進程池,并指定進程池中的進程數(shù)量為4。
最后,我們使用Pool類的map方法將worker函數(shù)和對應(yīng)的參數(shù)傳遞給進程池,進程池會自動分配任務(wù)給不同的進程執(zhí)行。
- 3 消息隊列
在多進程模式下,不同的進程之間需要進行通信,可以利用消息隊列來實現(xiàn)進程間通信。
Python中可以使用Queue模塊來實現(xiàn)消息隊列。下面是一個簡單的示例:
import multiprocessing def producer(queue): for i in range(10): queue.put(i) def consumer(queue): while not queue.empty(): print(queue.get()) if __name__ == '__main__': queue = multiprocessing.Queue() p1 = multiprocessing.Process(target=producer, args=(queue,)) p2 = multiprocessing.Process(target=consumer, args=(queue,)) p1.start() p2.start() p1.join() p2.join()
在上面的示例中,我們定義了一個producer函數(shù)和一個consumer函數(shù),producer函數(shù)將0~9的數(shù)字放入消息隊列,consumer函數(shù)從消息隊列中取出數(shù)字并打印出來。
然后我們使用multiprocessing模塊的Queue類創(chuàng)建了一個消息隊列,并使用Process類創(chuàng)建了兩個進程分別執(zhí)行producer函數(shù)和consumer函數(shù)。
- 4 共享內(nèi)存
對于需要多個進程共享的數(shù)據(jù),可以使用共享內(nèi)存來避免數(shù)據(jù)拷貝和進程間通信的開銷。
在Python中,可以使用multiprocessing模塊的Value和Array類來實現(xiàn)共享內(nèi)存。
下面是一個簡單的示例:
import multiprocessing def worker(counter): counter.value += 1 if __name__ == '__main__': counter = multiprocessing.Value('i', 0) processes = [] for i in range(5): p = multiprocessing.Process(target=worker, args=(counter,)) processes.append(p) p.start() for p in processes: p.join() print(counter.value)
在上面的示例中,我們定義了一個worker函數(shù),該函數(shù)接受一個參數(shù)counter,每次執(zhí)行時將counter的值加1。
然后我們使用multiprocessing模塊的Value類創(chuàng)建了一個整型變量counter,并使用Process類創(chuàng)建了5個進程分別執(zhí)行worker函數(shù)。
最后,我們打印出counter的值。
- 5 異步IO
對于I/O密集型任務(wù),可以使用異步IO來提高效率。在Python中,可以使用asyncio模塊來實現(xiàn)異步IO。
下面是一個簡單的示例:
import asyncio async def worker(): await asyncio.sleep(1) print("Worker process started") loop = asyncio.get_event_loop() loop.run_until_complete(worker())
在上面的示例中,我們定義了一個worker函數(shù),該函數(shù)使用asyncio庫的異步IO特性。
在函數(shù)體中,使用asyncio.sleep函數(shù)模擬了一個長時間的I/O操作,并在操作完成后打印了一條消息。
然后我們使用asyncio庫的get_event_loop函數(shù)創(chuàng)建了一個事件循環(huán),并使用run_until_complete函數(shù)啟動worker函數(shù)。在程序執(zhí)行過程中,事件循環(huán)會負責調(diào)度和執(zhí)行異步IO操作。
- 總結(jié)
在Python中,使用多進程模式可以實現(xiàn)多核CPU的并行計算,從而提高程序的執(zhí)行效率。
在本文中,我們介紹了如何使用Python的multiprocessing模塊實現(xiàn)多進程并發(fā)執(zhí)行任務(wù)、進程池、消息隊列、共享內(nèi)存、異步IO等方式來提高程序執(zhí)行效率。
實際應(yīng)用中,需要根據(jù)具體的場景選擇合適的并行計算方式,并注意避免死鎖等常見問題。
到此這篇關(guān)于Python實現(xiàn)cpu并行運算的兩種方式的文章就介紹到這了,更多相關(guān)Python cpu并行運算內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Python實現(xiàn)腳本鎖功能(同時只能執(zhí)行一個腳本)
這篇文章主要介紹了Python實現(xiàn)腳本鎖功能(同時只能執(zhí)行一個腳本),本文給大家分享了兩種方法,大家可以根據(jù)個人所需選擇適合自己的方法2017-05-05Python中查看變量的類型內(nèi)存地址所占字節(jié)的大小
這篇文章主要介紹了Python中查看變量的類型,內(nèi)存地址,所占字節(jié)的大小,本文給大家介紹的非常詳細,具有一定的參考借鑒價值,需要的朋友可以參考下2019-06-06解決pyinstaller 打包exe文件太大,用pipenv 縮小exe的問題
這篇文章主要介紹了解決pyinstaller 打包exe文件太大,用pipenv 縮小exe的問題,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-07-07