Python利用multiprocessing實(shí)現(xiàn)最簡(jiǎn)單的分布式作業(yè)調(diào)度系統(tǒng)實(shí)例
介紹
Python的multiprocessing模塊不但支持多進(jìn)程,其中managers子模塊還支持把多進(jìn)程分布到多臺(tái)機(jī)器上。一個(gè)服務(wù)進(jìn)程可以作為調(diào)度者,將任務(wù)分布到其他多個(gè)機(jī)器的多個(gè)進(jìn)程中,依靠網(wǎng)絡(luò)通信。想到這,就在想是不是可以使用此模塊來(lái)實(shí)現(xiàn)一個(gè)簡(jiǎn)單的作業(yè)調(diào)度系統(tǒng)。在這之前,我們先來(lái)詳細(xì)了解下python中的多進(jìn)程管理包multiprocessing。
multiprocessing.Process
multiprocessing包是Python中的多進(jìn)程管理包。它與 threading.Thread類(lèi)似,可以利用multiprocessing.Process對(duì)象來(lái)創(chuàng)建一個(gè)進(jìn)程。該進(jìn)程可以允許放在Python程序內(nèi)部編寫(xiě)的函數(shù)中。該P(yáng)rocess對(duì)象與Thread對(duì)象的用法相同,擁有is_alive()、join([timeout])、run()、start()、terminate()等方法。屬性有:authkey、daemon(要通過(guò)start()設(shè)置)、exitcode(進(jìn)程在運(yùn)行時(shí)為None、如果為–N,表示被信號(hào)N結(jié)束)、name、pid。此外multiprocessing包中也有Lock/Event/Semaphore/Condition類(lèi),用來(lái)同步進(jìn)程,其用法也與threading包中的同名類(lèi)一樣。multiprocessing的很大一部份與threading使用同一套API,只不過(guò)換到了多進(jìn)程的情境。
這個(gè)模塊表示像線程一樣管理進(jìn)程,這個(gè)是multiprocessing的核心,它與threading很相似,對(duì)多核CPU的利用率會(huì)比threading好的多。
看一下Process類(lèi)的構(gòu)造方法:
__init__(self, group=None, target=None, name=None, args=(), kwargs={})
參數(shù)說(shuō)明:
- group:進(jìn)程所屬組。基本不用
- target:表示調(diào)用對(duì)象。
- args:表示調(diào)用對(duì)象的位置參數(shù)元組。
- name:別名
- kwargs:表示調(diào)用對(duì)象的字典。
創(chuàng)建進(jìn)程的簡(jiǎn)單實(shí)例:
#coding=utf-8 import multiprocessing def do(n) : #獲取當(dāng)前線程的名字 name = multiprocessing.current_process().name print name,'starting' print "worker ", n return if __name__ == '__main__' : numList = [] for i in xrange(5) : p = multiprocessing.Process(target=do, args=(i,)) numList.append(p) p.start() p.join() print "Process end."
執(zhí)行結(jié)果:
Process-1 starting worker 0 Process end. Process-2 starting worker 1 Process end. Process-3 starting worker 2 Process end. Process-4 starting worker 3 Process end. Process-5 starting worker 4 Process end.
創(chuàng)建子進(jìn)程時(shí),只需要傳入一個(gè)執(zhí)行函數(shù)和函數(shù)的參數(shù),創(chuàng)建一個(gè)Process實(shí)例,并用其start()方法啟動(dòng),join()方法表示等待子進(jìn)程結(jié)束以后再繼續(xù)往下運(yùn)行,通常用于進(jìn)程間的同步。
注意:
在Windows上要想使用進(jìn)程模塊,就必須把有關(guān)進(jìn)程的代碼寫(xiě)在當(dāng)前.py文件的if __name__ == ‘__main__' :
語(yǔ)句的下面,才能正常使用Windows下的進(jìn)程模塊。Unix/Linux下則不需要。
multiprocess.Pool
當(dāng)被操作對(duì)象數(shù)目不大時(shí),可以直接利用multiprocessing中的Process動(dòng)態(tài)成生多個(gè)進(jìn)程,十幾個(gè)還好,但如果是上百個(gè),上千個(gè)目標(biāo),手動(dòng)的去限制進(jìn)程數(shù)量卻又太過(guò)繁瑣,此時(shí)可以發(fā)揮進(jìn)程池的功效。
Pool可以提供指定數(shù)量的進(jìn)程供用戶(hù)調(diào)用,當(dāng)有新的請(qǐng)求提交到pool中時(shí),如果池還沒(méi)有滿,那么就會(huì)創(chuàng)建一個(gè)新的進(jìn)程用來(lái)執(zhí)行該請(qǐng)求;但如果池中的進(jìn)程數(shù)已經(jīng)達(dá)到規(guī)定最大值,那么該請(qǐng)求就會(huì)等待,直到池中有進(jìn)程結(jié)束,才會(huì)創(chuàng)建新的進(jìn)程來(lái)它。
apply_async和apply
函數(shù)原型:
apply_async(func[, args=()[, kwds={}[, callback=None]]])
二者都是向進(jìn)程池中添加新的進(jìn)程,不同的時(shí),apply每次添加新的進(jìn)程時(shí),主進(jìn)程和新的進(jìn)程會(huì)并行執(zhí)行,但是主進(jìn)程會(huì)阻塞,直到新進(jìn)程的函數(shù)執(zhí)行結(jié)束。 這是很低效的,所以python3.x之后不再使用
apply_async和apply功能相同,但是主進(jìn)程不會(huì)阻塞。
# -*- coding:utf-8 -*- import multiprocessing import time def func(msg): print "*msg: ", msg time.sleep(3) print "*end" if __name__ == "__main__": # 維持執(zhí)行的進(jìn)程總數(shù)為processes,當(dāng)一個(gè)進(jìn)程執(zhí)行完畢后會(huì)添加新的進(jìn)程進(jìn)去 pool = multiprocessing.Pool(processes=3) for i in range(10): msg = "hello [{}]".format(i) # pool.apply(func, (msg,)) pool.apply_async(func, (msg,)) # 異步開(kāi)啟進(jìn)程, 非阻塞型, 能夠向池中添加進(jìn)程而不等待其執(zhí)行完畢就能再次執(zhí)行循環(huán) print "--" * 10 pool.close() # 關(guān)閉pool, 則不會(huì)有新的進(jìn)程添加進(jìn)去 pool.join() # 必須在join之前close, 然后join等待pool中所有的線程執(zhí)行完畢 print "All process done."
運(yùn)行結(jié)果:
"D:\Program Files\Anaconda2\python.exe" E:/pycharm/test/multiprocessing/v1.py -------------------- *msg: hello [0] *msg: hello [1] *msg: hello [2] *end *msg: hello [3] *end *end *msg: hello [4] *msg: hello [5] *end *msg: hello [6] *end *end *msg: hello [7] *msg: hello [8] *end *msg: hello [9] *end*end *end All process done. Process finished with exit code 0
獲得進(jìn)程的執(zhí)行結(jié)果
# -*- coding:utf-8 -*- import multiprocessing import time def func_with_return(msg): print "*msg: ", msg time.sleep(3) print "*end" return "{} return".format(msg) if __name__ == "__main__": # 維持執(zhí)行的進(jìn)程總數(shù)為processes,當(dāng)一個(gè)進(jìn)程執(zhí)行完畢后會(huì)添加新的進(jìn)程進(jìn)去 pool = multiprocessing.Pool(processes=3) results = [] for i in range(10): msg = "hello [{}]".format(i) res = pool.apply_async(func_with_return, (msg,)) # 異步開(kāi)啟進(jìn)程, 非阻塞型, 能夠向池中添加進(jìn)程而不等待其執(zhí)行完畢就能再次執(zhí)行循環(huán) results.append(res) print "--" * 10 pool.close() # 關(guān)閉pool, 則不會(huì)有新的進(jìn)程添加進(jìn)去 pool.join() # 必須在join之前close, 然后join等待pool中所有的線程執(zhí)行完畢 print "All process done." print "Return results: " for i in results: print i.get() # 獲得進(jìn)程的執(zhí)行結(jié)果
結(jié)果:
"D:\Program Files\Anaconda2\python.exe" E:/pycharm/test/multiprocessing/v1.py -------------------- *msg: hello [0] *msg: hello [1] *msg: hello [2] *end *end *msg: hello [3] *msg: hello [4] *end *msg: hello [5] *end *end *msg: hello [6] *msg: hello [7] *end *msg: hello [8] *end *end *msg: hello [9] *end *end All process done. Return results: hello [0] return hello [1] return hello [2] return hello [3] return hello [4] return hello [5] return hello [6] return hello [7] return hello [8] return hello [9] return Process finished with exit code 0
map
函數(shù)原型:
map(func, iterable[, chunksize=None])
Pool類(lèi)中的map方法,與內(nèi)置的map函數(shù)用法行為基本一致,它會(huì)使進(jìn)程阻塞直到返回結(jié)果。
注意,雖然第二個(gè)參數(shù)是一個(gè)迭代器,但在實(shí)際使用中,必須在整個(gè)隊(duì)列都就緒后,程序才會(huì)運(yùn)行子進(jìn)程。
# -*- coding:utf-8 -*- import multiprocessing import time def func_with_return(msg): print "*msg: ", msg time.sleep(3) print "*end" return "{} return".format(msg) if __name__ == "__main__": # 維持執(zhí)行的進(jìn)程總數(shù)為processes,當(dāng)一個(gè)進(jìn)程執(zhí)行完畢后會(huì)添加新的進(jìn)程進(jìn)去 pool = multiprocessing.Pool(processes=3) results = [] msgs = [] for i in range(10): msg = "hello [{}]".format(i) msgs.append(msg) results = pool.map(func_with_return, msgs) print "--" * 10 pool.close() # 關(guān)閉pool, 則不會(huì)有新的進(jìn)程添加進(jìn)去 pool.join() # 必須在join之前close, 然后join等待pool中所有的線程執(zhí)行完畢 print "All process done." print "Return results: " for i in results: print i # 獲得進(jìn)程的執(zhí)行結(jié)果
執(zhí)行結(jié)果:
"D:\Program Files\Anaconda2\python.exe" E:/pycharm/test/multiprocessing/v2.py *msg: hello [0] *msg: hello [1] *msg: hello [2] *end*end *msg: hello [3] *msg: hello [4] *end *msg: hello [5] *end*end *msg: hello [6] *msg: hello [7] *end *msg: hello [8] *end *end *msg: hello [9] *end *end -------------------- All process done. Return results: hello [0] return hello [1] return hello [2] return hello [3] return hello [4] return hello [5] return hello [6] return hello [7] return hello [8] return hello [9] return Process finished with exit code 0
注意:執(zhí)行結(jié)果中“—-”的位置,可以看到,map之后,主進(jìn)程是阻塞的,等待map的結(jié)果返回
close()
關(guān)閉進(jìn)程池(pool),使其不在接受新的任務(wù)。
terminate()
結(jié)束工作進(jìn)程,不在處理未處理的任務(wù)。
join()
主進(jìn)程阻塞等待子進(jìn)程的退出,join方法必須在close或terminate之后使用。
進(jìn)程間通信
多進(jìn)程最麻煩的地方就是進(jìn)程間通信,IPC比線程通信要難處理的多,所以留作單獨(dú)一篇來(lái)記錄
利用multiprocessing實(shí)現(xiàn)一個(gè)最簡(jiǎn)單的分布式作業(yè)調(diào)度系統(tǒng)
Job
首先創(chuàng)建一個(gè)Job類(lèi),為了測(cè)試簡(jiǎn)單,只包含一個(gè)job id屬性,將來(lái)可以封裝一些作業(yè)狀態(tài),作業(yè)命令,執(zhí)行用戶(hù)等屬性。
job.py
#!/usr/bin/env python # -*- coding: utf-8 -*- class Job: def __init__(self, job_id): self.job_id = job_id
Master
Master用來(lái)派發(fā)作業(yè)和顯示運(yùn)行完成的作業(yè)信息
master.py
#!/usr/bin/env python # -*- coding: utf-8 -*- from Queue import Queue from multiprocessing.managers import BaseManager from job import Job class Master: def __init__(self): # 派發(fā)出去的作業(yè)隊(duì)列 self.dispatched_job_queue = Queue() # 完成的作業(yè)隊(duì)列 self.finished_job_queue = Queue() def get_dispatched_job_queue(self): return self.dispatched_job_queue def get_finished_job_queue(self): return self.finished_job_queue def start(self): # 把派發(fā)作業(yè)隊(duì)列和完成作業(yè)隊(duì)列注冊(cè)到網(wǎng)絡(luò)上 BaseManager.register('get_dispatched_job_queue', callable=self.get_dispatched_job_queue) BaseManager.register('get_finished_job_queue', callable=self.get_finished_job_queue) # 監(jiān)聽(tīng)端口和啟動(dòng)服務(wù) manager = BaseManager(address=('0.0.0.0', 8888), authkey='jobs') manager.start() # 使用上面注冊(cè)的方法獲取隊(duì)列 dispatched_jobs = manager.get_dispatched_job_queue() finished_jobs = manager.get_finished_job_queue() # 這里一次派發(fā)10個(gè)作業(yè),等到10個(gè)作業(yè)都運(yùn)行完后,繼續(xù)再派發(fā)10個(gè)作業(yè) job_id = 0 while True: for i in range(0, 10): job_id = job_id + 1 job = Job(job_id) print('Dispatch job: %s' % job.job_id) dispatched_jobs.put(job) while not dispatched_jobs.empty(): job = finished_jobs.get(60) print('Finished Job: %s' % job.job_id) manager.shutdown() if __name__ == "__main__": master = Master() master.start()
Slave
Slave用來(lái)運(yùn)行master派發(fā)的作業(yè)并將結(jié)果返回
slave.py
#!/usr/bin/env python # -*- coding: utf-8 -*- import time from Queue import Queue from multiprocessing.managers import BaseManager from job import Job class Slave: def __init__(self): # 派發(fā)出去的作業(yè)隊(duì)列 self.dispatched_job_queue = Queue() # 完成的作業(yè)隊(duì)列 self.finished_job_queue = Queue() def start(self): # 把派發(fā)作業(yè)隊(duì)列和完成作業(yè)隊(duì)列注冊(cè)到網(wǎng)絡(luò)上 BaseManager.register('get_dispatched_job_queue') BaseManager.register('get_finished_job_queue') # 連接master server = '127.0.0.1' print('Connect to server %s...' % server) manager = BaseManager(address=(server, 8888), authkey='jobs') manager.connect() # 使用上面注冊(cè)的方法獲取隊(duì)列 dispatched_jobs = manager.get_dispatched_job_queue() finished_jobs = manager.get_finished_job_queue() # 運(yùn)行作業(yè)并返回結(jié)果,這里只是模擬作業(yè)運(yùn)行,所以返回的是接收到的作業(yè) while True: job = dispatched_jobs.get(timeout=1) print('Run job: %s ' % job.job_id) time.sleep(1) finished_jobs.put(job) if __name__ == "__main__": slave = Slave() slave.start()
測(cè)試
分別打開(kāi)三個(gè)linux終端,第一個(gè)終端運(yùn)行master,第二個(gè)和第三個(gè)終端用了運(yùn)行slave,運(yùn)行結(jié)果如下
master
$ python master.py Dispatch job: 1 Dispatch job: 2 Dispatch job: 3 Dispatch job: 4 Dispatch job: 5 Dispatch job: 6 Dispatch job: 7 Dispatch job: 8 Dispatch job: 9 Dispatch job: 10 Finished Job: 1 Finished Job: 2 Finished Job: 3 Finished Job: 4 Finished Job: 5 Finished Job: 6 Finished Job: 7 Finished Job: 8 Finished Job: 9 Dispatch job: 11 Dispatch job: 12 Dispatch job: 13 Dispatch job: 14 Dispatch job: 15 Dispatch job: 16 Dispatch job: 17 Dispatch job: 18 Dispatch job: 19 Dispatch job: 20 Finished Job: 10 Finished Job: 11 Finished Job: 12 Finished Job: 13 Finished Job: 14 Finished Job: 15 Finished Job: 16 Finished Job: 17 Finished Job: 18 Dispatch job: 21 Dispatch job: 22 Dispatch job: 23 Dispatch job: 24 Dispatch job: 25 Dispatch job: 26 Dispatch job: 27 Dispatch job: 28 Dispatch job: 29 Dispatch job: 30
slave1
$ python slave.py Connect to server 127.0.0.1... Run job: 1 Run job: 2 Run job: 3 Run job: 5 Run job: 7 Run job: 9 Run job: 11 Run job: 13 Run job: 15 Run job: 17 Run job: 19 Run job: 21 Run job: 23
slave2
$ python slave.py Connect to server 127.0.0.1... Run job: 4 Run job: 6 Run job: 8 Run job: 10 Run job: 12 Run job: 14 Run job: 16 Run job: 18 Run job: 20 Run job: 22 Run job: 24
總結(jié)
以上就是這篇文章的全部?jī)?nèi)容了,希望本文的內(nèi)容對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,如果有疑問(wèn)大家可以留言交流,謝謝大家對(duì)腳本之家的支持。
相關(guān)文章
Pycharm無(wú)法正常安裝第三方庫(kù)的幾條應(yīng)對(duì)方法匯總
在使用pycharm學(xué)習(xí)python的時(shí)候,經(jīng)常需要第三方庫(kù),沒(méi)有第三方庫(kù)程序就會(huì)報(bào)錯(cuò),下面這篇文章主要給大家介紹了關(guān)于Pycharm無(wú)法正常安裝第三方庫(kù)的幾條應(yīng)對(duì)方法,需要的朋友可以參考下2023-04-04全面解讀Python Web開(kāi)發(fā)框架Django
Django是一個(gè)開(kāi)源的Web應(yīng)用框架,由Python寫(xiě)成。采用MVC的軟件設(shè)計(jì)模式,主要目標(biāo)是使得開(kāi)發(fā)復(fù)雜的、數(shù)據(jù)庫(kù)驅(qū)動(dòng)的網(wǎng)站變得簡(jiǎn)單。Django注重組件的重用性和“可插拔性”,敏捷開(kāi)發(fā)和DRY法則(Don’t Repeat Yoursef)。2014-06-06python腳本當(dāng)作Linux中的服務(wù)啟動(dòng)實(shí)現(xiàn)方法
今天小編就為大家分享一篇python腳本當(dāng)作Linux中的服務(wù)啟動(dòng)實(shí)現(xiàn)方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2019-06-06python爬蟲(chóng)模擬瀏覽器訪問(wèn)-User-Agent過(guò)程解析
這篇文章主要介紹了python爬蟲(chóng)模擬瀏覽器訪問(wèn)-User-Agent過(guò)程解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-12-12pycharm中使用request和Pytest進(jìn)行接口測(cè)試的方法
這篇文章主要介紹了pycharm中使用request和Pytest進(jìn)行接口測(cè)試的方法,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-07-07