欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Python利用multiprocessing實(shí)現(xiàn)最簡(jiǎn)單的分布式作業(yè)調(diào)度系統(tǒng)實(shí)例

 更新時(shí)間:2017年11月14日 11:44:06   作者:Parle  
這篇文章主要給大家介紹了關(guān)于Python利用multiprocessing如何實(shí)現(xiàn)最簡(jiǎn)單的分布式作業(yè)調(diào)度系統(tǒng)的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來(lái)一起看看吧。

介紹

Python的multiprocessing模塊不但支持多進(jìn)程,其中managers子模塊還支持把多進(jìn)程分布到多臺(tái)機(jī)器上。一個(gè)服務(wù)進(jìn)程可以作為調(diào)度者,將任務(wù)分布到其他多個(gè)機(jī)器的多個(gè)進(jìn)程中,依靠網(wǎng)絡(luò)通信。想到這,就在想是不是可以使用此模塊來(lái)實(shí)現(xiàn)一個(gè)簡(jiǎn)單的作業(yè)調(diào)度系統(tǒng)。在這之前,我們先來(lái)詳細(xì)了解下python中的多進(jìn)程管理包multiprocessing。

multiprocessing.Process

multiprocessing包是Python中的多進(jìn)程管理包。它與 threading.Thread類(lèi)似,可以利用multiprocessing.Process對(duì)象來(lái)創(chuàng)建一個(gè)進(jìn)程。該進(jìn)程可以允許放在Python程序內(nèi)部編寫(xiě)的函數(shù)中。該P(yáng)rocess對(duì)象與Thread對(duì)象的用法相同,擁有is_alive()、join([timeout])、run()、start()、terminate()等方法。屬性有:authkey、daemon(要通過(guò)start()設(shè)置)、exitcode(進(jìn)程在運(yùn)行時(shí)為None、如果為–N,表示被信號(hào)N結(jié)束)、name、pid。此外multiprocessing包中也有Lock/Event/Semaphore/Condition類(lèi),用來(lái)同步進(jìn)程,其用法也與threading包中的同名類(lèi)一樣。multiprocessing的很大一部份與threading使用同一套API,只不過(guò)換到了多進(jìn)程的情境。

這個(gè)模塊表示像線程一樣管理進(jìn)程,這個(gè)是multiprocessing的核心,它與threading很相似,對(duì)多核CPU的利用率會(huì)比threading好的多。

看一下Process類(lèi)的構(gòu)造方法:

__init__(self, group=None, target=None, name=None, args=(), kwargs={})

參數(shù)說(shuō)明:

  • group:進(jìn)程所屬組。基本不用
  • target:表示調(diào)用對(duì)象。
  • args:表示調(diào)用對(duì)象的位置參數(shù)元組。
  • name:別名
  • kwargs:表示調(diào)用對(duì)象的字典。

創(chuàng)建進(jìn)程的簡(jiǎn)單實(shí)例:

#coding=utf-8
import multiprocessing

def do(n) :
 #獲取當(dāng)前線程的名字
 name = multiprocessing.current_process().name
 print name,'starting'
 print "worker ", n
 return 

if __name__ == '__main__' :
 numList = []
 for i in xrange(5) :
 p = multiprocessing.Process(target=do, args=(i,))
 numList.append(p)
 p.start()
 p.join()
 print "Process end."

執(zhí)行結(jié)果:

Process-1 starting
worker 0
Process end.
Process-2 starting
worker 1
Process end.
Process-3 starting
worker 2
Process end.
Process-4 starting
worker 3
Process end.
Process-5 starting
worker 4
Process end.

創(chuàng)建子進(jìn)程時(shí),只需要傳入一個(gè)執(zhí)行函數(shù)和函數(shù)的參數(shù),創(chuàng)建一個(gè)Process實(shí)例,并用其start()方法啟動(dòng),join()方法表示等待子進(jìn)程結(jié)束以后再繼續(xù)往下運(yùn)行,通常用于進(jìn)程間的同步。

注意:

在Windows上要想使用進(jìn)程模塊,就必須把有關(guān)進(jìn)程的代碼寫(xiě)在當(dāng)前.py文件的if __name__ == ‘__main__' :語(yǔ)句的下面,才能正常使用Windows下的進(jìn)程模塊。Unix/Linux下則不需要。

multiprocess.Pool

當(dāng)被操作對(duì)象數(shù)目不大時(shí),可以直接利用multiprocessing中的Process動(dòng)態(tài)成生多個(gè)進(jìn)程,十幾個(gè)還好,但如果是上百個(gè),上千個(gè)目標(biāo),手動(dòng)的去限制進(jìn)程數(shù)量卻又太過(guò)繁瑣,此時(shí)可以發(fā)揮進(jìn)程池的功效。

Pool可以提供指定數(shù)量的進(jìn)程供用戶(hù)調(diào)用,當(dāng)有新的請(qǐng)求提交到pool中時(shí),如果池還沒(méi)有滿,那么就會(huì)創(chuàng)建一個(gè)新的進(jìn)程用來(lái)執(zhí)行該請(qǐng)求;但如果池中的進(jìn)程數(shù)已經(jīng)達(dá)到規(guī)定最大值,那么該請(qǐng)求就會(huì)等待,直到池中有進(jìn)程結(jié)束,才會(huì)創(chuàng)建新的進(jìn)程來(lái)它。

apply_async和apply

函數(shù)原型:

apply_async(func[, args=()[, kwds={}[, callback=None]]])

二者都是向進(jìn)程池中添加新的進(jìn)程,不同的時(shí),apply每次添加新的進(jìn)程時(shí),主進(jìn)程和新的進(jìn)程會(huì)并行執(zhí)行,但是主進(jìn)程會(huì)阻塞,直到新進(jìn)程的函數(shù)執(zhí)行結(jié)束。 這是很低效的,所以python3.x之后不再使用

apply_async和apply功能相同,但是主進(jìn)程不會(huì)阻塞。

# -*- coding:utf-8 -*-

import multiprocessing
import time

def func(msg):
 print "*msg: ", msg
 time.sleep(3)
 print "*end"

if __name__ == "__main__":
 # 維持執(zhí)行的進(jìn)程總數(shù)為processes,當(dāng)一個(gè)進(jìn)程執(zhí)行完畢后會(huì)添加新的進(jìn)程進(jìn)去
 pool = multiprocessing.Pool(processes=3)
 for i in range(10):
 msg = "hello [{}]".format(i)
 # pool.apply(func, (msg,))
 pool.apply_async(func, (msg,)) # 異步開(kāi)啟進(jìn)程, 非阻塞型, 能夠向池中添加進(jìn)程而不等待其執(zhí)行完畢就能再次執(zhí)行循環(huán)

 print "--" * 10
 pool.close() # 關(guān)閉pool, 則不會(huì)有新的進(jìn)程添加進(jìn)去
 pool.join() # 必須在join之前close, 然后join等待pool中所有的線程執(zhí)行完畢
 print "All process done."

運(yùn)行結(jié)果:

"D:\Program Files\Anaconda2\python.exe" E:/pycharm/test/multiprocessing/v1.py
--------------------
*msg: hello [0]
*msg: hello [1]
*msg: hello [2]
*end
*msg: hello [3]
*end
*end
*msg: hello [4]
*msg: hello [5]
*end
*msg: hello [6]
*end
*end
*msg: hello [7]
*msg: hello [8]
*end
*msg: hello [9]
*end*end

*end
All process done.

Process finished with exit code 0

獲得進(jìn)程的執(zhí)行結(jié)果

# -*- coding:utf-8 -*-

import multiprocessing
import time

def func_with_return(msg):
 print "*msg: ", msg
 time.sleep(3)
 print "*end"
 return "{} return".format(msg)

if __name__ == "__main__":
 # 維持執(zhí)行的進(jìn)程總數(shù)為processes,當(dāng)一個(gè)進(jìn)程執(zhí)行完畢后會(huì)添加新的進(jìn)程進(jìn)去
 pool = multiprocessing.Pool(processes=3)
 results = []
 for i in range(10):
 msg = "hello [{}]".format(i)
 res = pool.apply_async(func_with_return, (msg,)) # 異步開(kāi)啟進(jìn)程, 非阻塞型, 能夠向池中添加進(jìn)程而不等待其執(zhí)行完畢就能再次執(zhí)行循環(huán)
 results.append(res)

 print "--" * 10
 pool.close() # 關(guān)閉pool, 則不會(huì)有新的進(jìn)程添加進(jìn)去
 pool.join() # 必須在join之前close, 然后join等待pool中所有的線程執(zhí)行完畢
 print "All process done."

 print "Return results: "
 for i in results:
 print i.get() # 獲得進(jìn)程的執(zhí)行結(jié)果

結(jié)果:

"D:\Program Files\Anaconda2\python.exe" E:/pycharm/test/multiprocessing/v1.py
--------------------
*msg: hello [0]
*msg: hello [1]
*msg: hello [2]
*end
*end
*msg: hello [3]
*msg: hello [4]
*end
*msg: hello [5]
*end
*end
*msg: hello [6]
*msg: hello [7]
*end
*msg: hello [8]
*end
*end
*msg: hello [9]
*end
*end
All process done.
Return results: 
hello [0] return
hello [1] return
hello [2] return
hello [3] return
hello [4] return
hello [5] return
hello [6] return
hello [7] return
hello [8] return
hello [9] return

Process finished with exit code 0

map

函數(shù)原型:

map(func, iterable[, chunksize=None])

Pool類(lèi)中的map方法,與內(nèi)置的map函數(shù)用法行為基本一致,它會(huì)使進(jìn)程阻塞直到返回結(jié)果。

注意,雖然第二個(gè)參數(shù)是一個(gè)迭代器,但在實(shí)際使用中,必須在整個(gè)隊(duì)列都就緒后,程序才會(huì)運(yùn)行子進(jìn)程。

# -*- coding:utf-8 -*-

import multiprocessing
import time

def func_with_return(msg):
 print "*msg: ", msg
 time.sleep(3)
 print "*end"
 return "{} return".format(msg)

if __name__ == "__main__":
 # 維持執(zhí)行的進(jìn)程總數(shù)為processes,當(dāng)一個(gè)進(jìn)程執(zhí)行完畢后會(huì)添加新的進(jìn)程進(jìn)去
 pool = multiprocessing.Pool(processes=3)
 results = []
 msgs = []
 for i in range(10):
 msg = "hello [{}]".format(i)
 msgs.append(msg)

 results = pool.map(func_with_return, msgs)

 print "--" * 10
 pool.close() # 關(guān)閉pool, 則不會(huì)有新的進(jìn)程添加進(jìn)去
 pool.join() # 必須在join之前close, 然后join等待pool中所有的線程執(zhí)行完畢
 print "All process done."

 print "Return results: "
 for i in results:
 print i # 獲得進(jìn)程的執(zhí)行結(jié)果

執(zhí)行結(jié)果:

"D:\Program Files\Anaconda2\python.exe" E:/pycharm/test/multiprocessing/v2.py
*msg: hello [0]
*msg: hello [1]
*msg: hello [2]
*end*end

*msg: hello [3]
*msg: hello [4]
*end
*msg: hello [5]
*end*end

*msg: hello [6]
*msg: hello [7]
*end
*msg: hello [8]
*end
*end
*msg: hello [9]
*end
*end
--------------------
All process done.
Return results: 
hello [0] return
hello [1] return
hello [2] return
hello [3] return
hello [4] return
hello [5] return
hello [6] return
hello [7] return
hello [8] return
hello [9] return

Process finished with exit code 0

注意:執(zhí)行結(jié)果中“—-”的位置,可以看到,map之后,主進(jìn)程是阻塞的,等待map的結(jié)果返回

close()

關(guān)閉進(jìn)程池(pool),使其不在接受新的任務(wù)。

terminate()

結(jié)束工作進(jìn)程,不在處理未處理的任務(wù)。

join()

主進(jìn)程阻塞等待子進(jìn)程的退出,join方法必須在close或terminate之后使用。

進(jìn)程間通信

多進(jìn)程最麻煩的地方就是進(jìn)程間通信,IPC比線程通信要難處理的多,所以留作單獨(dú)一篇來(lái)記錄

利用multiprocessing實(shí)現(xiàn)一個(gè)最簡(jiǎn)單的分布式作業(yè)調(diào)度系統(tǒng)

Job

首先創(chuàng)建一個(gè)Job類(lèi),為了測(cè)試簡(jiǎn)單,只包含一個(gè)job id屬性,將來(lái)可以封裝一些作業(yè)狀態(tài),作業(yè)命令,執(zhí)行用戶(hù)等屬性。

job.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

class Job:
 def __init__(self, job_id):
 self.job_id = job_id

Master

Master用來(lái)派發(fā)作業(yè)和顯示運(yùn)行完成的作業(yè)信息

master.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from Queue import Queue
from multiprocessing.managers import BaseManager
from job import Job


class Master:

 def __init__(self):
 # 派發(fā)出去的作業(yè)隊(duì)列
 self.dispatched_job_queue = Queue()
 # 完成的作業(yè)隊(duì)列
 self.finished_job_queue = Queue()

 def get_dispatched_job_queue(self):
 return self.dispatched_job_queue

 def get_finished_job_queue(self):
 return self.finished_job_queue

 def start(self):
 # 把派發(fā)作業(yè)隊(duì)列和完成作業(yè)隊(duì)列注冊(cè)到網(wǎng)絡(luò)上
 BaseManager.register('get_dispatched_job_queue', callable=self.get_dispatched_job_queue)
 BaseManager.register('get_finished_job_queue', callable=self.get_finished_job_queue)

 # 監(jiān)聽(tīng)端口和啟動(dòng)服務(wù)
 manager = BaseManager(address=('0.0.0.0', 8888), authkey='jobs')
 manager.start()

 # 使用上面注冊(cè)的方法獲取隊(duì)列
 dispatched_jobs = manager.get_dispatched_job_queue()
 finished_jobs = manager.get_finished_job_queue()

 # 這里一次派發(fā)10個(gè)作業(yè),等到10個(gè)作業(yè)都運(yùn)行完后,繼續(xù)再派發(fā)10個(gè)作業(yè)
 job_id = 0
 while True:
  for i in range(0, 10):
  job_id = job_id + 1
  job = Job(job_id)
  print('Dispatch job: %s' % job.job_id)
  dispatched_jobs.put(job)

  while not dispatched_jobs.empty():
  job = finished_jobs.get(60)
  print('Finished Job: %s' % job.job_id)

 manager.shutdown()

if __name__ == "__main__":
 master = Master()
 master.start()

Slave

Slave用來(lái)運(yùn)行master派發(fā)的作業(yè)并將結(jié)果返回

slave.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import time
from Queue import Queue
from multiprocessing.managers import BaseManager
from job import Job


class Slave:

 def __init__(self):
 # 派發(fā)出去的作業(yè)隊(duì)列
 self.dispatched_job_queue = Queue()
 # 完成的作業(yè)隊(duì)列
 self.finished_job_queue = Queue()

 def start(self):
 # 把派發(fā)作業(yè)隊(duì)列和完成作業(yè)隊(duì)列注冊(cè)到網(wǎng)絡(luò)上
 BaseManager.register('get_dispatched_job_queue')
 BaseManager.register('get_finished_job_queue')

 # 連接master
 server = '127.0.0.1'
 print('Connect to server %s...' % server)
 manager = BaseManager(address=(server, 8888), authkey='jobs')
 manager.connect()

 # 使用上面注冊(cè)的方法獲取隊(duì)列
 dispatched_jobs = manager.get_dispatched_job_queue()
 finished_jobs = manager.get_finished_job_queue()

 # 運(yùn)行作業(yè)并返回結(jié)果,這里只是模擬作業(yè)運(yùn)行,所以返回的是接收到的作業(yè)
 while True:
  job = dispatched_jobs.get(timeout=1)
  print('Run job: %s ' % job.job_id)
  time.sleep(1)
  finished_jobs.put(job)

if __name__ == "__main__":
 slave = Slave()
 slave.start()

測(cè)試

分別打開(kāi)三個(gè)linux終端,第一個(gè)終端運(yùn)行master,第二個(gè)和第三個(gè)終端用了運(yùn)行slave,運(yùn)行結(jié)果如下

master

$ python master.py 
Dispatch job: 1
Dispatch job: 2
Dispatch job: 3
Dispatch job: 4
Dispatch job: 5
Dispatch job: 6
Dispatch job: 7
Dispatch job: 8
Dispatch job: 9
Dispatch job: 10
Finished Job: 1
Finished Job: 2
Finished Job: 3
Finished Job: 4
Finished Job: 5
Finished Job: 6
Finished Job: 7
Finished Job: 8
Finished Job: 9
Dispatch job: 11
Dispatch job: 12
Dispatch job: 13
Dispatch job: 14
Dispatch job: 15
Dispatch job: 16
Dispatch job: 17
Dispatch job: 18
Dispatch job: 19
Dispatch job: 20
Finished Job: 10
Finished Job: 11
Finished Job: 12
Finished Job: 13
Finished Job: 14
Finished Job: 15
Finished Job: 16
Finished Job: 17
Finished Job: 18
Dispatch job: 21
Dispatch job: 22
Dispatch job: 23
Dispatch job: 24
Dispatch job: 25
Dispatch job: 26
Dispatch job: 27
Dispatch job: 28
Dispatch job: 29
Dispatch job: 30

slave1

$ python slave.py 
Connect to server 127.0.0.1...
Run job: 1 
Run job: 2 
Run job: 3 
Run job: 5 
Run job: 7 
Run job: 9 
Run job: 11 
Run job: 13 
Run job: 15 
Run job: 17 
Run job: 19 
Run job: 21 
Run job: 23 

slave2

$ python slave.py 
Connect to server 127.0.0.1...
Run job: 4 
Run job: 6 
Run job: 8 
Run job: 10 
Run job: 12 
Run job: 14 
Run job: 16 
Run job: 18 
Run job: 20 
Run job: 22 
Run job: 24 

總結(jié)

以上就是這篇文章的全部?jī)?nèi)容了,希望本文的內(nèi)容對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,如果有疑問(wèn)大家可以留言交流,謝謝大家對(duì)腳本之家的支持。

相關(guān)文章

  • Pycharm無(wú)法正常安裝第三方庫(kù)的幾條應(yīng)對(duì)方法匯總

    Pycharm無(wú)法正常安裝第三方庫(kù)的幾條應(yīng)對(duì)方法匯總

    在使用pycharm學(xué)習(xí)python的時(shí)候,經(jīng)常需要第三方庫(kù),沒(méi)有第三方庫(kù)程序就會(huì)報(bào)錯(cuò),下面這篇文章主要給大家介紹了關(guān)于Pycharm無(wú)法正常安裝第三方庫(kù)的幾條應(yīng)對(duì)方法,需要的朋友可以參考下
    2023-04-04
  • Python2.x與3​​.x版本有哪些區(qū)別

    Python2.x與3​​.x版本有哪些區(qū)別

    這篇文章主要介紹了Python2.x與3​​.x版本有哪些區(qū)別,文中講解非常詳細(xì),幫助大家更好的理解和學(xué)習(xí),感興趣的朋友可以了解下
    2020-07-07
  • 全面解讀Python Web開(kāi)發(fā)框架Django

    全面解讀Python Web開(kāi)發(fā)框架Django

    Django是一個(gè)開(kāi)源的Web應(yīng)用框架,由Python寫(xiě)成。采用MVC的軟件設(shè)計(jì)模式,主要目標(biāo)是使得開(kāi)發(fā)復(fù)雜的、數(shù)據(jù)庫(kù)驅(qū)動(dòng)的網(wǎng)站變得簡(jiǎn)單。Django注重組件的重用性和“可插拔性”,敏捷開(kāi)發(fā)和DRY法則(Don’t Repeat Yoursef)。
    2014-06-06
  • python docx如何修改word表格內(nèi)容

    python docx如何修改word表格內(nèi)容

    使用Python-docx庫(kù),可以方便地修改Word文檔中的表格內(nèi)容,首先需要安裝python-docx庫(kù),然后使用該庫(kù)打開(kāi)Word文檔,遍歷文檔中的表格并修改指定單元格內(nèi)容,最后另存為新文檔
    2024-09-09
  • python腳本當(dāng)作Linux中的服務(wù)啟動(dòng)實(shí)現(xiàn)方法

    python腳本當(dāng)作Linux中的服務(wù)啟動(dòng)實(shí)現(xiàn)方法

    今天小編就為大家分享一篇python腳本當(dāng)作Linux中的服務(wù)啟動(dòng)實(shí)現(xiàn)方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2019-06-06
  • Python批量獲取基金數(shù)據(jù)的方法步驟

    Python批量獲取基金數(shù)據(jù)的方法步驟

    這篇文章主要介紹了Python批量獲取基金數(shù)據(jù)的方法步驟,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2021-03-03
  • python爬蟲(chóng)模擬瀏覽器訪問(wèn)-User-Agent過(guò)程解析

    python爬蟲(chóng)模擬瀏覽器訪問(wèn)-User-Agent過(guò)程解析

    這篇文章主要介紹了python爬蟲(chóng)模擬瀏覽器訪問(wèn)-User-Agent過(guò)程解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2019-12-12
  • python 第三方庫(kù)的安裝及pip的使用詳解

    python 第三方庫(kù)的安裝及pip的使用詳解

    下面小編就為大家?guī)?lái)一篇python 第三方庫(kù)的安裝及pip的使用詳解。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2017-05-05
  • pycharm中使用request和Pytest進(jìn)行接口測(cè)試的方法

    pycharm中使用request和Pytest進(jìn)行接口測(cè)試的方法

    這篇文章主要介紹了pycharm中使用request和Pytest進(jìn)行接口測(cè)試的方法,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-07-07
  • 對(duì)Pytorch中Tensor的各種池化操作解析

    對(duì)Pytorch中Tensor的各種池化操作解析

    今天小編就為大家一篇對(duì)Pytorch中Tensor的各種池化操作解析,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2020-01-01

最新評(píng)論