python中如何使用分步式進(jìn)程計(jì)算詳解
前言
在python中使用多進(jìn)程和多線程都能達(dá)到同時(shí)運(yùn)行多個(gè)任務(wù),和多進(jìn)程和多線程的選擇上,應(yīng)該優(yōu)先選擇多進(jìn)程的方式,因?yàn)槎噙M(jìn)程更加穩(wěn)定,且對(duì)于進(jìn)程的操作管理也更加方便,但有一點(diǎn)是多進(jìn)程獨(dú)有的殺手锏,多進(jìn)程可以將進(jìn)程分步到多臺(tái)機(jī)器上跑,假如有很多個(gè)任務(wù),一臺(tái)機(jī)器即使開(kāi)了多進(jìn)程或者多進(jìn)程跑起來(lái)還是要耗很多時(shí)間,那么這時(shí)就要想一下可否將任務(wù)分配到多臺(tái)機(jī)器上跑,這樣可以更快的完成任務(wù)。
在分步式進(jìn)程運(yùn)算中,進(jìn)程之前的通信還是依賴于Queue,但此時(shí)的隊(duì)列不能直接使用,需要使用multiprocessing.managers.BaseManager 進(jìn)行包裝,通過(guò)回調(diào)以后才能使用,既然是分步式的調(diào)用,那么應(yīng)該有一個(gè)服務(wù)端和一個(gè)客戶端,服務(wù)端通過(guò)網(wǎng)絡(luò)協(xié)議將隊(duì)列中的信息給各個(gè)客戶端進(jìn)行調(diào)用,客戶端也可以通過(guò)隊(duì)列將結(jié)果返回,然后服務(wù)端進(jìn)行結(jié)果的收集展示,流程如下

分步式流程
服務(wù)端將任務(wù)放到 task_queue 中,然后四個(gè)客戶端通過(guò)網(wǎng)絡(luò)端口從task_queue中獲取到任務(wù),然后進(jìn)行計(jì)算,再將結(jié)果放到result_queue中,最后服務(wù)端統(tǒng)一處理結(jié)果。整體的流程比較清晰,只是需要強(qiáng)調(diào),這里的隊(duì)列不能是原始的隊(duì)列,需要使用BaseManager 進(jìn)行包裝。
先看一下服務(wù)端的代碼
#coding:gbk
import time, queue
from multiprocessing.managers import BaseManager
from multiprocessing import freeze_support
# 任務(wù)個(gè)數(shù)
task_number = 10
# 定義收發(fā)隊(duì)列
task_queue = queue.Queue(task_number)
result_queue = queue.Queue(task_number)
def gettask():
return task_queue
def getresult():
return result_queue
def test():
# windows下綁定調(diào)用接口不能使用lambda,所以只能先定義函數(shù)再綁定
BaseManager.register('get_task', callable=gettask)
BaseManager.register('get_result', callable=getresult)
# 綁定端口并設(shè)置驗(yàn)證碼,windows下需要填寫ip地址,linux下不填默認(rèn)為本地
manager = BaseManager(address=('127.0.0.1', 5002), authkey=b'123')
# 啟動(dòng)
manager.start()
try:
# 通過(guò)網(wǎng)絡(luò)獲取任務(wù)隊(duì)列和結(jié)果隊(duì)列
task = manager.get_task()
result = manager.get_result()
# 添加任務(wù)
for i in range(task_number):
print('Put task %d...' % i)
task.put(i)
# 每秒檢測(cè)一次是否所有任務(wù)都被執(zhí)行完
while not result.full():
print(task.qsize())
time.sleep(1)
for i in range(result.qsize()):
ans = result.get()
print('task %d is finish , runtime:%d s' % ans)
except:
print('Manager error')
finally:
manager.shutdown()
if __name__ == '__main__':
# windows下多進(jìn)程可能會(huì)炸,添加這句可以緩解
freeze_support()
test()
這里重點(diǎn)說(shuō)一下 BaseManager.register('get_task', callable=gettask) 這行代碼,它的意思是注冊(cè)一個(gè)get_task的操作,執(zhí)行的操作是gettask()函數(shù),上面定義了gettask()函數(shù),返回的是task_queue,這也是之前說(shuō)的不能直接使用queue.Queue,必須要使用通過(guò)BaseManager的register接口封裝過(guò)的的隊(duì)列,下面使用task = manager.get_task()來(lái)獲取到這個(gè)隊(duì)列。
manager = BaseManager(address=('127.0.0.1', 5002), authkey=b'123')
這行代碼初始了一個(gè)manager,它綁定了本機(jī)的5002端口,并且在客戶端連接的時(shí)候需要一個(gè)密碼:123。
接下來(lái)看一下客戶端代碼。
#coding:gbk
import time, sys, queue, random
from multiprocessing.managers import BaseManager
BaseManager.register('get_task')
BaseManager.register('get_result')
conn = BaseManager(address = ('127.0.0.1',5002), authkey = b'123')
try:
conn.connect()
except:
print('連接失敗')
sys.exit()
task = conn.get_task()
result = conn.get_result()
while not task.empty():
print(task.qsize())
n = task.get(timeout = 1)
print('run task %d' % n)
sleeptime = random.randint(0,3)
time.sleep(sleeptime)
rt = (n, sleeptime)
result.put(rt)
if __name__ == '__main__':
pass;
這里主要看以下的代碼
BaseManager.register('get_task')
BaseManager.register('get_result')
這兩個(gè)是注冊(cè)函數(shù),和之前的服務(wù)端所對(duì)應(yīng),之前服務(wù)端注冊(cè)了這兩個(gè)函數(shù),這里才能注冊(cè)使用,注意這里不能注冊(cè)服務(wù)端沒(méi)有注冊(cè)的函數(shù)
運(yùn)行一下,先運(yùn)行服務(wù)端,然后再啟兩個(gè)cmd運(yùn)行客戶端,也可以在局域網(wǎng)中的另外的機(jī)器上運(yùn)行,但是要修改服務(wù)端的ip地址
服務(wù)端的結(jié)果如下
Put task 0...
Put task 1...
Put task 2...
Put task 3...
Put task 4...
Put task 5...
Put task 6...
Put task 7...
Put task 8...
Put task 9...
task 0 is finish , runtime:3 s
task 1 is finish , runtime:0 s
task 2 is finish , runtime:2 s
task 4 is finish , runtime:1 s
task 3 is finish , runtime:3 s
task 6 is finish , runtime:1 s
task 7 is finish , runtime:0 s
task 5 is finish , runtime:3 s
task 8 is finish , runtime:2 s
task 9 is finish , runtime:3 s
兩個(gè)客戶端的結(jié)果分別如下
客戶端1
10
run task 0
9
run task 1
8
run task 2
6
run task 4
5
run task 5
1
run task 9
客戶端2
7
run task 3
4
run task 6
3
run task 7
2
run task 8
一起運(yùn)行的截圖如下
結(jié)果
由于隊(duì)列是線程安全的,所以這里不用加鎖,在客戶端中打印print(task.qsize()) 當(dāng)前的隊(duì)列大小,可以看到隊(duì)列的信息中同步到各個(gè)客戶端的。
最后還是要多說(shuō)一句,分步式多進(jìn)程雖然可以把任務(wù)分散到不同的機(jī)器上運(yùn)行,可以處理多任務(wù),但是如果此時(shí)服務(wù)端掛掉的話,任務(wù)就全丟掉了,所以在生產(chǎn)環(huán)境下還是考慮使用消息中間件如kafka等。
總結(jié)
以上就是這篇文章的全部?jī)?nèi)容了,希望本文的內(nèi)容對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,謝謝大家對(duì)腳本之家的支持。
相關(guān)文章
Python調(diào)用Zoomeye搜索接口的實(shí)現(xiàn)
本文主要介紹了Python調(diào)用Zoomeye搜索接口的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-01-01
python通過(guò)re正則表達(dá)式切割中英文的操作
這篇文章主要介紹了python通過(guò)re正則表達(dá)式切割中英文的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2021-03-03
python 的 scapy庫(kù),實(shí)現(xiàn)網(wǎng)卡收發(fā)包的例子
今天小編就為大家分享一篇python 的 scapy庫(kù),實(shí)現(xiàn)網(wǎng)卡收發(fā)包的例子,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2019-07-07
pytest解讀fixtures中yield與addfinalizer區(qū)別
這篇文章主要為大家介紹了pytest官方解讀fixtures中yield與addfinalizer區(qū)別,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-06-06
Python編程實(shí)現(xiàn)簡(jiǎn)易的音樂(lè)播放器基本操作
這篇文章主要來(lái)教大家利用Python編程來(lái)實(shí)現(xiàn)一個(gè)簡(jiǎn)易的音樂(lè)播放器,文中含有基本功能的操作示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助2021-10-10
python實(shí)現(xiàn)字符串連接的三種方法及其效率、適用場(chǎng)景詳解
本篇文章主要介紹了python實(shí)現(xiàn)字符串連接的三種方法及其效率、適用場(chǎng)景詳解,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下。2017-01-01

