Python利用multiprocessing實現(xiàn)最簡單的分布式作業(yè)調(diào)度系統(tǒng)實例
介紹
Python的multiprocessing模塊不但支持多進程,其中managers子模塊還支持把多進程分布到多臺機器上。一個服務進程可以作為調(diào)度者,將任務分布到其他多個機器的多個進程中,依靠網(wǎng)絡通信。想到這,就在想是不是可以使用此模塊來實現(xiàn)一個簡單的作業(yè)調(diào)度系統(tǒng)。在這之前,我們先來詳細了解下python中的多進程管理包multiprocessing。
multiprocessing.Process
multiprocessing包是Python中的多進程管理包。它與 threading.Thread類似,可以利用multiprocessing.Process對象來創(chuàng)建一個進程。該進程可以允許放在Python程序內(nèi)部編寫的函數(shù)中。該Process對象與Thread對象的用法相同,擁有is_alive()、join([timeout])、run()、start()、terminate()等方法。屬性有:authkey、daemon(要通過start()設置)、exitcode(進程在運行時為None、如果為–N,表示被信號N結(jié)束)、name、pid。此外multiprocessing包中也有Lock/Event/Semaphore/Condition類,用來同步進程,其用法也與threading包中的同名類一樣。multiprocessing的很大一部份與threading使用同一套API,只不過換到了多進程的情境。
這個模塊表示像線程一樣管理進程,這個是multiprocessing的核心,它與threading很相似,對多核CPU的利用率會比threading好的多。
看一下Process類的構(gòu)造方法:
__init__(self, group=None, target=None, name=None, args=(), kwargs={})
參數(shù)說明:
- group:進程所屬組。基本不用
- target:表示調(diào)用對象。
- args:表示調(diào)用對象的位置參數(shù)元組。
- name:別名
- kwargs:表示調(diào)用對象的字典。
創(chuàng)建進程的簡單實例:
#coding=utf-8 import multiprocessing def do(n) : #獲取當前線程的名字 name = multiprocessing.current_process().name print name,'starting' print "worker ", n return if __name__ == '__main__' : numList = [] for i in xrange(5) : p = multiprocessing.Process(target=do, args=(i,)) numList.append(p) p.start() p.join() print "Process end."
執(zhí)行結(jié)果:
Process-1 starting worker 0 Process end. Process-2 starting worker 1 Process end. Process-3 starting worker 2 Process end. Process-4 starting worker 3 Process end. Process-5 starting worker 4 Process end.
創(chuàng)建子進程時,只需要傳入一個執(zhí)行函數(shù)和函數(shù)的參數(shù),創(chuàng)建一個Process實例,并用其start()方法啟動,join()方法表示等待子進程結(jié)束以后再繼續(xù)往下運行,通常用于進程間的同步。
注意:
在Windows上要想使用進程模塊,就必須把有關進程的代碼寫在當前.py文件的if __name__ == ‘__main__' :
語句的下面,才能正常使用Windows下的進程模塊。Unix/Linux下則不需要。
multiprocess.Pool
當被操作對象數(shù)目不大時,可以直接利用multiprocessing中的Process動態(tài)成生多個進程,十幾個還好,但如果是上百個,上千個目標,手動的去限制進程數(shù)量卻又太過繁瑣,此時可以發(fā)揮進程池的功效。
Pool可以提供指定數(shù)量的進程供用戶調(diào)用,當有新的請求提交到pool中時,如果池還沒有滿,那么就會創(chuàng)建一個新的進程用來執(zhí)行該請求;但如果池中的進程數(shù)已經(jīng)達到規(guī)定最大值,那么該請求就會等待,直到池中有進程結(jié)束,才會創(chuàng)建新的進程來它。
apply_async和apply
函數(shù)原型:
apply_async(func[, args=()[, kwds={}[, callback=None]]])
二者都是向進程池中添加新的進程,不同的時,apply每次添加新的進程時,主進程和新的進程會并行執(zhí)行,但是主進程會阻塞,直到新進程的函數(shù)執(zhí)行結(jié)束。 這是很低效的,所以python3.x之后不再使用
apply_async和apply功能相同,但是主進程不會阻塞。
# -*- coding:utf-8 -*- import multiprocessing import time def func(msg): print "*msg: ", msg time.sleep(3) print "*end" if __name__ == "__main__": # 維持執(zhí)行的進程總數(shù)為processes,當一個進程執(zhí)行完畢后會添加新的進程進去 pool = multiprocessing.Pool(processes=3) for i in range(10): msg = "hello [{}]".format(i) # pool.apply(func, (msg,)) pool.apply_async(func, (msg,)) # 異步開啟進程, 非阻塞型, 能夠向池中添加進程而不等待其執(zhí)行完畢就能再次執(zhí)行循環(huán) print "--" * 10 pool.close() # 關閉pool, 則不會有新的進程添加進去 pool.join() # 必須在join之前close, 然后join等待pool中所有的線程執(zhí)行完畢 print "All process done."
運行結(jié)果:
"D:\Program Files\Anaconda2\python.exe" E:/pycharm/test/multiprocessing/v1.py -------------------- *msg: hello [0] *msg: hello [1] *msg: hello [2] *end *msg: hello [3] *end *end *msg: hello [4] *msg: hello [5] *end *msg: hello [6] *end *end *msg: hello [7] *msg: hello [8] *end *msg: hello [9] *end*end *end All process done. Process finished with exit code 0
獲得進程的執(zhí)行結(jié)果
# -*- coding:utf-8 -*- import multiprocessing import time def func_with_return(msg): print "*msg: ", msg time.sleep(3) print "*end" return "{} return".format(msg) if __name__ == "__main__": # 維持執(zhí)行的進程總數(shù)為processes,當一個進程執(zhí)行完畢后會添加新的進程進去 pool = multiprocessing.Pool(processes=3) results = [] for i in range(10): msg = "hello [{}]".format(i) res = pool.apply_async(func_with_return, (msg,)) # 異步開啟進程, 非阻塞型, 能夠向池中添加進程而不等待其執(zhí)行完畢就能再次執(zhí)行循環(huán) results.append(res) print "--" * 10 pool.close() # 關閉pool, 則不會有新的進程添加進去 pool.join() # 必須在join之前close, 然后join等待pool中所有的線程執(zhí)行完畢 print "All process done." print "Return results: " for i in results: print i.get() # 獲得進程的執(zhí)行結(jié)果
結(jié)果:
"D:\Program Files\Anaconda2\python.exe" E:/pycharm/test/multiprocessing/v1.py -------------------- *msg: hello [0] *msg: hello [1] *msg: hello [2] *end *end *msg: hello [3] *msg: hello [4] *end *msg: hello [5] *end *end *msg: hello [6] *msg: hello [7] *end *msg: hello [8] *end *end *msg: hello [9] *end *end All process done. Return results: hello [0] return hello [1] return hello [2] return hello [3] return hello [4] return hello [5] return hello [6] return hello [7] return hello [8] return hello [9] return Process finished with exit code 0
map
函數(shù)原型:
map(func, iterable[, chunksize=None])
Pool類中的map方法,與內(nèi)置的map函數(shù)用法行為基本一致,它會使進程阻塞直到返回結(jié)果。
注意,雖然第二個參數(shù)是一個迭代器,但在實際使用中,必須在整個隊列都就緒后,程序才會運行子進程。
# -*- coding:utf-8 -*- import multiprocessing import time def func_with_return(msg): print "*msg: ", msg time.sleep(3) print "*end" return "{} return".format(msg) if __name__ == "__main__": # 維持執(zhí)行的進程總數(shù)為processes,當一個進程執(zhí)行完畢后會添加新的進程進去 pool = multiprocessing.Pool(processes=3) results = [] msgs = [] for i in range(10): msg = "hello [{}]".format(i) msgs.append(msg) results = pool.map(func_with_return, msgs) print "--" * 10 pool.close() # 關閉pool, 則不會有新的進程添加進去 pool.join() # 必須在join之前close, 然后join等待pool中所有的線程執(zhí)行完畢 print "All process done." print "Return results: " for i in results: print i # 獲得進程的執(zhí)行結(jié)果
執(zhí)行結(jié)果:
"D:\Program Files\Anaconda2\python.exe" E:/pycharm/test/multiprocessing/v2.py *msg: hello [0] *msg: hello [1] *msg: hello [2] *end*end *msg: hello [3] *msg: hello [4] *end *msg: hello [5] *end*end *msg: hello [6] *msg: hello [7] *end *msg: hello [8] *end *end *msg: hello [9] *end *end -------------------- All process done. Return results: hello [0] return hello [1] return hello [2] return hello [3] return hello [4] return hello [5] return hello [6] return hello [7] return hello [8] return hello [9] return Process finished with exit code 0
注意:執(zhí)行結(jié)果中“—-”的位置,可以看到,map之后,主進程是阻塞的,等待map的結(jié)果返回
close()
關閉進程池(pool),使其不在接受新的任務。
terminate()
結(jié)束工作進程,不在處理未處理的任務。
join()
主進程阻塞等待子進程的退出,join方法必須在close或terminate之后使用。
進程間通信
多進程最麻煩的地方就是進程間通信,IPC比線程通信要難處理的多,所以留作單獨一篇來記錄
利用multiprocessing實現(xiàn)一個最簡單的分布式作業(yè)調(diào)度系統(tǒng)
Job
首先創(chuàng)建一個Job類,為了測試簡單,只包含一個job id屬性,將來可以封裝一些作業(yè)狀態(tài),作業(yè)命令,執(zhí)行用戶等屬性。
job.py
#!/usr/bin/env python # -*- coding: utf-8 -*- class Job: def __init__(self, job_id): self.job_id = job_id
Master
Master用來派發(fā)作業(yè)和顯示運行完成的作業(yè)信息
master.py
#!/usr/bin/env python # -*- coding: utf-8 -*- from Queue import Queue from multiprocessing.managers import BaseManager from job import Job class Master: def __init__(self): # 派發(fā)出去的作業(yè)隊列 self.dispatched_job_queue = Queue() # 完成的作業(yè)隊列 self.finished_job_queue = Queue() def get_dispatched_job_queue(self): return self.dispatched_job_queue def get_finished_job_queue(self): return self.finished_job_queue def start(self): # 把派發(fā)作業(yè)隊列和完成作業(yè)隊列注冊到網(wǎng)絡上 BaseManager.register('get_dispatched_job_queue', callable=self.get_dispatched_job_queue) BaseManager.register('get_finished_job_queue', callable=self.get_finished_job_queue) # 監(jiān)聽端口和啟動服務 manager = BaseManager(address=('0.0.0.0', 8888), authkey='jobs') manager.start() # 使用上面注冊的方法獲取隊列 dispatched_jobs = manager.get_dispatched_job_queue() finished_jobs = manager.get_finished_job_queue() # 這里一次派發(fā)10個作業(yè),等到10個作業(yè)都運行完后,繼續(xù)再派發(fā)10個作業(yè) job_id = 0 while True: for i in range(0, 10): job_id = job_id + 1 job = Job(job_id) print('Dispatch job: %s' % job.job_id) dispatched_jobs.put(job) while not dispatched_jobs.empty(): job = finished_jobs.get(60) print('Finished Job: %s' % job.job_id) manager.shutdown() if __name__ == "__main__": master = Master() master.start()
Slave
Slave用來運行master派發(fā)的作業(yè)并將結(jié)果返回
slave.py
#!/usr/bin/env python # -*- coding: utf-8 -*- import time from Queue import Queue from multiprocessing.managers import BaseManager from job import Job class Slave: def __init__(self): # 派發(fā)出去的作業(yè)隊列 self.dispatched_job_queue = Queue() # 完成的作業(yè)隊列 self.finished_job_queue = Queue() def start(self): # 把派發(fā)作業(yè)隊列和完成作業(yè)隊列注冊到網(wǎng)絡上 BaseManager.register('get_dispatched_job_queue') BaseManager.register('get_finished_job_queue') # 連接master server = '127.0.0.1' print('Connect to server %s...' % server) manager = BaseManager(address=(server, 8888), authkey='jobs') manager.connect() # 使用上面注冊的方法獲取隊列 dispatched_jobs = manager.get_dispatched_job_queue() finished_jobs = manager.get_finished_job_queue() # 運行作業(yè)并返回結(jié)果,這里只是模擬作業(yè)運行,所以返回的是接收到的作業(yè) while True: job = dispatched_jobs.get(timeout=1) print('Run job: %s ' % job.job_id) time.sleep(1) finished_jobs.put(job) if __name__ == "__main__": slave = Slave() slave.start()
測試
分別打開三個linux終端,第一個終端運行master,第二個和第三個終端用了運行slave,運行結(jié)果如下
master
$ python master.py Dispatch job: 1 Dispatch job: 2 Dispatch job: 3 Dispatch job: 4 Dispatch job: 5 Dispatch job: 6 Dispatch job: 7 Dispatch job: 8 Dispatch job: 9 Dispatch job: 10 Finished Job: 1 Finished Job: 2 Finished Job: 3 Finished Job: 4 Finished Job: 5 Finished Job: 6 Finished Job: 7 Finished Job: 8 Finished Job: 9 Dispatch job: 11 Dispatch job: 12 Dispatch job: 13 Dispatch job: 14 Dispatch job: 15 Dispatch job: 16 Dispatch job: 17 Dispatch job: 18 Dispatch job: 19 Dispatch job: 20 Finished Job: 10 Finished Job: 11 Finished Job: 12 Finished Job: 13 Finished Job: 14 Finished Job: 15 Finished Job: 16 Finished Job: 17 Finished Job: 18 Dispatch job: 21 Dispatch job: 22 Dispatch job: 23 Dispatch job: 24 Dispatch job: 25 Dispatch job: 26 Dispatch job: 27 Dispatch job: 28 Dispatch job: 29 Dispatch job: 30
slave1
$ python slave.py Connect to server 127.0.0.1... Run job: 1 Run job: 2 Run job: 3 Run job: 5 Run job: 7 Run job: 9 Run job: 11 Run job: 13 Run job: 15 Run job: 17 Run job: 19 Run job: 21 Run job: 23
slave2
$ python slave.py Connect to server 127.0.0.1... Run job: 4 Run job: 6 Run job: 8 Run job: 10 Run job: 12 Run job: 14 Run job: 16 Run job: 18 Run job: 20 Run job: 22 Run job: 24
總結(jié)
以上就是這篇文章的全部內(nèi)容了,希望本文的內(nèi)容對大家的學習或者工作具有一定的參考學習價值,如果有疑問大家可以留言交流,謝謝大家對腳本之家的支持。
相關文章
python腳本當作Linux中的服務啟動實現(xiàn)方法
今天小編就為大家分享一篇python腳本當作Linux中的服務啟動實現(xiàn)方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-06-06python爬蟲模擬瀏覽器訪問-User-Agent過程解析
這篇文章主要介紹了python爬蟲模擬瀏覽器訪問-User-Agent過程解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2019-12-12pycharm中使用request和Pytest進行接口測試的方法
這篇文章主要介紹了pycharm中使用request和Pytest進行接口測試的方法,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-07-07