欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Python利用multiprocessing實現(xiàn)最簡單的分布式作業(yè)調(diào)度系統(tǒng)實例

 更新時間:2017年11月14日 11:44:06   作者:Parle  
這篇文章主要給大家介紹了關于Python利用multiprocessing如何實現(xiàn)最簡單的分布式作業(yè)調(diào)度系統(tǒng)的相關資料,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面來一起看看吧。

介紹

Python的multiprocessing模塊不但支持多進程,其中managers子模塊還支持把多進程分布到多臺機器上。一個服務進程可以作為調(diào)度者,將任務分布到其他多個機器的多個進程中,依靠網(wǎng)絡通信。想到這,就在想是不是可以使用此模塊來實現(xiàn)一個簡單的作業(yè)調(diào)度系統(tǒng)。在這之前,我們先來詳細了解下python中的多進程管理包multiprocessing。

multiprocessing.Process

multiprocessing包是Python中的多進程管理包。它與 threading.Thread類似,可以利用multiprocessing.Process對象來創(chuàng)建一個進程。該進程可以允許放在Python程序內(nèi)部編寫的函數(shù)中。該Process對象與Thread對象的用法相同,擁有is_alive()、join([timeout])、run()、start()、terminate()等方法。屬性有:authkey、daemon(要通過start()設置)、exitcode(進程在運行時為None、如果為–N,表示被信號N結(jié)束)、name、pid。此外multiprocessing包中也有Lock/Event/Semaphore/Condition類,用來同步進程,其用法也與threading包中的同名類一樣。multiprocessing的很大一部份與threading使用同一套API,只不過換到了多進程的情境。

這個模塊表示像線程一樣管理進程,這個是multiprocessing的核心,它與threading很相似,對多核CPU的利用率會比threading好的多。

看一下Process類的構(gòu)造方法:

__init__(self, group=None, target=None, name=None, args=(), kwargs={})

參數(shù)說明:

  • group:進程所屬組。基本不用
  • target:表示調(diào)用對象。
  • args:表示調(diào)用對象的位置參數(shù)元組。
  • name:別名
  • kwargs:表示調(diào)用對象的字典。

創(chuàng)建進程的簡單實例:

#coding=utf-8
import multiprocessing

def do(n) :
 #獲取當前線程的名字
 name = multiprocessing.current_process().name
 print name,'starting'
 print "worker ", n
 return 

if __name__ == '__main__' :
 numList = []
 for i in xrange(5) :
 p = multiprocessing.Process(target=do, args=(i,))
 numList.append(p)
 p.start()
 p.join()
 print "Process end."

執(zhí)行結(jié)果:

Process-1 starting
worker 0
Process end.
Process-2 starting
worker 1
Process end.
Process-3 starting
worker 2
Process end.
Process-4 starting
worker 3
Process end.
Process-5 starting
worker 4
Process end.

創(chuàng)建子進程時,只需要傳入一個執(zhí)行函數(shù)和函數(shù)的參數(shù),創(chuàng)建一個Process實例,并用其start()方法啟動,join()方法表示等待子進程結(jié)束以后再繼續(xù)往下運行,通常用于進程間的同步。

注意:

在Windows上要想使用進程模塊,就必須把有關進程的代碼寫在當前.py文件的if __name__ == ‘__main__' :語句的下面,才能正常使用Windows下的進程模塊。Unix/Linux下則不需要。

multiprocess.Pool

當被操作對象數(shù)目不大時,可以直接利用multiprocessing中的Process動態(tài)成生多個進程,十幾個還好,但如果是上百個,上千個目標,手動的去限制進程數(shù)量卻又太過繁瑣,此時可以發(fā)揮進程池的功效。

Pool可以提供指定數(shù)量的進程供用戶調(diào)用,當有新的請求提交到pool中時,如果池還沒有滿,那么就會創(chuàng)建一個新的進程用來執(zhí)行該請求;但如果池中的進程數(shù)已經(jīng)達到規(guī)定最大值,那么該請求就會等待,直到池中有進程結(jié)束,才會創(chuàng)建新的進程來它。

apply_async和apply

函數(shù)原型:

apply_async(func[, args=()[, kwds={}[, callback=None]]])

二者都是向進程池中添加新的進程,不同的時,apply每次添加新的進程時,主進程和新的進程會并行執(zhí)行,但是主進程會阻塞,直到新進程的函數(shù)執(zhí)行結(jié)束。 這是很低效的,所以python3.x之后不再使用

apply_async和apply功能相同,但是主進程不會阻塞。

# -*- coding:utf-8 -*-

import multiprocessing
import time

def func(msg):
 print "*msg: ", msg
 time.sleep(3)
 print "*end"

if __name__ == "__main__":
 # 維持執(zhí)行的進程總數(shù)為processes,當一個進程執(zhí)行完畢后會添加新的進程進去
 pool = multiprocessing.Pool(processes=3)
 for i in range(10):
 msg = "hello [{}]".format(i)
 # pool.apply(func, (msg,))
 pool.apply_async(func, (msg,)) # 異步開啟進程, 非阻塞型, 能夠向池中添加進程而不等待其執(zhí)行完畢就能再次執(zhí)行循環(huán)

 print "--" * 10
 pool.close() # 關閉pool, 則不會有新的進程添加進去
 pool.join() # 必須在join之前close, 然后join等待pool中所有的線程執(zhí)行完畢
 print "All process done."

運行結(jié)果:

"D:\Program Files\Anaconda2\python.exe" E:/pycharm/test/multiprocessing/v1.py
--------------------
*msg: hello [0]
*msg: hello [1]
*msg: hello [2]
*end
*msg: hello [3]
*end
*end
*msg: hello [4]
*msg: hello [5]
*end
*msg: hello [6]
*end
*end
*msg: hello [7]
*msg: hello [8]
*end
*msg: hello [9]
*end*end

*end
All process done.

Process finished with exit code 0

獲得進程的執(zhí)行結(jié)果

# -*- coding:utf-8 -*-

import multiprocessing
import time

def func_with_return(msg):
 print "*msg: ", msg
 time.sleep(3)
 print "*end"
 return "{} return".format(msg)

if __name__ == "__main__":
 # 維持執(zhí)行的進程總數(shù)為processes,當一個進程執(zhí)行完畢后會添加新的進程進去
 pool = multiprocessing.Pool(processes=3)
 results = []
 for i in range(10):
 msg = "hello [{}]".format(i)
 res = pool.apply_async(func_with_return, (msg,)) # 異步開啟進程, 非阻塞型, 能夠向池中添加進程而不等待其執(zhí)行完畢就能再次執(zhí)行循環(huán)
 results.append(res)

 print "--" * 10
 pool.close() # 關閉pool, 則不會有新的進程添加進去
 pool.join() # 必須在join之前close, 然后join等待pool中所有的線程執(zhí)行完畢
 print "All process done."

 print "Return results: "
 for i in results:
 print i.get() # 獲得進程的執(zhí)行結(jié)果

結(jié)果:

"D:\Program Files\Anaconda2\python.exe" E:/pycharm/test/multiprocessing/v1.py
--------------------
*msg: hello [0]
*msg: hello [1]
*msg: hello [2]
*end
*end
*msg: hello [3]
*msg: hello [4]
*end
*msg: hello [5]
*end
*end
*msg: hello [6]
*msg: hello [7]
*end
*msg: hello [8]
*end
*end
*msg: hello [9]
*end
*end
All process done.
Return results: 
hello [0] return
hello [1] return
hello [2] return
hello [3] return
hello [4] return
hello [5] return
hello [6] return
hello [7] return
hello [8] return
hello [9] return

Process finished with exit code 0

map

函數(shù)原型:

map(func, iterable[, chunksize=None])

Pool類中的map方法,與內(nèi)置的map函數(shù)用法行為基本一致,它會使進程阻塞直到返回結(jié)果。

注意,雖然第二個參數(shù)是一個迭代器,但在實際使用中,必須在整個隊列都就緒后,程序才會運行子進程。

# -*- coding:utf-8 -*-

import multiprocessing
import time

def func_with_return(msg):
 print "*msg: ", msg
 time.sleep(3)
 print "*end"
 return "{} return".format(msg)

if __name__ == "__main__":
 # 維持執(zhí)行的進程總數(shù)為processes,當一個進程執(zhí)行完畢后會添加新的進程進去
 pool = multiprocessing.Pool(processes=3)
 results = []
 msgs = []
 for i in range(10):
 msg = "hello [{}]".format(i)
 msgs.append(msg)

 results = pool.map(func_with_return, msgs)

 print "--" * 10
 pool.close() # 關閉pool, 則不會有新的進程添加進去
 pool.join() # 必須在join之前close, 然后join等待pool中所有的線程執(zhí)行完畢
 print "All process done."

 print "Return results: "
 for i in results:
 print i # 獲得進程的執(zhí)行結(jié)果

執(zhí)行結(jié)果:

"D:\Program Files\Anaconda2\python.exe" E:/pycharm/test/multiprocessing/v2.py
*msg: hello [0]
*msg: hello [1]
*msg: hello [2]
*end*end

*msg: hello [3]
*msg: hello [4]
*end
*msg: hello [5]
*end*end

*msg: hello [6]
*msg: hello [7]
*end
*msg: hello [8]
*end
*end
*msg: hello [9]
*end
*end
--------------------
All process done.
Return results: 
hello [0] return
hello [1] return
hello [2] return
hello [3] return
hello [4] return
hello [5] return
hello [6] return
hello [7] return
hello [8] return
hello [9] return

Process finished with exit code 0

注意:執(zhí)行結(jié)果中“—-”的位置,可以看到,map之后,主進程是阻塞的,等待map的結(jié)果返回

close()

關閉進程池(pool),使其不在接受新的任務。

terminate()

結(jié)束工作進程,不在處理未處理的任務。

join()

主進程阻塞等待子進程的退出,join方法必須在close或terminate之后使用。

進程間通信

多進程最麻煩的地方就是進程間通信,IPC比線程通信要難處理的多,所以留作單獨一篇來記錄

利用multiprocessing實現(xiàn)一個最簡單的分布式作業(yè)調(diào)度系統(tǒng)

Job

首先創(chuàng)建一個Job類,為了測試簡單,只包含一個job id屬性,將來可以封裝一些作業(yè)狀態(tài),作業(yè)命令,執(zhí)行用戶等屬性。

job.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

class Job:
 def __init__(self, job_id):
 self.job_id = job_id

Master

Master用來派發(fā)作業(yè)和顯示運行完成的作業(yè)信息

master.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from Queue import Queue
from multiprocessing.managers import BaseManager
from job import Job


class Master:

 def __init__(self):
 # 派發(fā)出去的作業(yè)隊列
 self.dispatched_job_queue = Queue()
 # 完成的作業(yè)隊列
 self.finished_job_queue = Queue()

 def get_dispatched_job_queue(self):
 return self.dispatched_job_queue

 def get_finished_job_queue(self):
 return self.finished_job_queue

 def start(self):
 # 把派發(fā)作業(yè)隊列和完成作業(yè)隊列注冊到網(wǎng)絡上
 BaseManager.register('get_dispatched_job_queue', callable=self.get_dispatched_job_queue)
 BaseManager.register('get_finished_job_queue', callable=self.get_finished_job_queue)

 # 監(jiān)聽端口和啟動服務
 manager = BaseManager(address=('0.0.0.0', 8888), authkey='jobs')
 manager.start()

 # 使用上面注冊的方法獲取隊列
 dispatched_jobs = manager.get_dispatched_job_queue()
 finished_jobs = manager.get_finished_job_queue()

 # 這里一次派發(fā)10個作業(yè),等到10個作業(yè)都運行完后,繼續(xù)再派發(fā)10個作業(yè)
 job_id = 0
 while True:
  for i in range(0, 10):
  job_id = job_id + 1
  job = Job(job_id)
  print('Dispatch job: %s' % job.job_id)
  dispatched_jobs.put(job)

  while not dispatched_jobs.empty():
  job = finished_jobs.get(60)
  print('Finished Job: %s' % job.job_id)

 manager.shutdown()

if __name__ == "__main__":
 master = Master()
 master.start()

Slave

Slave用來運行master派發(fā)的作業(yè)并將結(jié)果返回

slave.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import time
from Queue import Queue
from multiprocessing.managers import BaseManager
from job import Job


class Slave:

 def __init__(self):
 # 派發(fā)出去的作業(yè)隊列
 self.dispatched_job_queue = Queue()
 # 完成的作業(yè)隊列
 self.finished_job_queue = Queue()

 def start(self):
 # 把派發(fā)作業(yè)隊列和完成作業(yè)隊列注冊到網(wǎng)絡上
 BaseManager.register('get_dispatched_job_queue')
 BaseManager.register('get_finished_job_queue')

 # 連接master
 server = '127.0.0.1'
 print('Connect to server %s...' % server)
 manager = BaseManager(address=(server, 8888), authkey='jobs')
 manager.connect()

 # 使用上面注冊的方法獲取隊列
 dispatched_jobs = manager.get_dispatched_job_queue()
 finished_jobs = manager.get_finished_job_queue()

 # 運行作業(yè)并返回結(jié)果,這里只是模擬作業(yè)運行,所以返回的是接收到的作業(yè)
 while True:
  job = dispatched_jobs.get(timeout=1)
  print('Run job: %s ' % job.job_id)
  time.sleep(1)
  finished_jobs.put(job)

if __name__ == "__main__":
 slave = Slave()
 slave.start()

測試

分別打開三個linux終端,第一個終端運行master,第二個和第三個終端用了運行slave,運行結(jié)果如下

master

$ python master.py 
Dispatch job: 1
Dispatch job: 2
Dispatch job: 3
Dispatch job: 4
Dispatch job: 5
Dispatch job: 6
Dispatch job: 7
Dispatch job: 8
Dispatch job: 9
Dispatch job: 10
Finished Job: 1
Finished Job: 2
Finished Job: 3
Finished Job: 4
Finished Job: 5
Finished Job: 6
Finished Job: 7
Finished Job: 8
Finished Job: 9
Dispatch job: 11
Dispatch job: 12
Dispatch job: 13
Dispatch job: 14
Dispatch job: 15
Dispatch job: 16
Dispatch job: 17
Dispatch job: 18
Dispatch job: 19
Dispatch job: 20
Finished Job: 10
Finished Job: 11
Finished Job: 12
Finished Job: 13
Finished Job: 14
Finished Job: 15
Finished Job: 16
Finished Job: 17
Finished Job: 18
Dispatch job: 21
Dispatch job: 22
Dispatch job: 23
Dispatch job: 24
Dispatch job: 25
Dispatch job: 26
Dispatch job: 27
Dispatch job: 28
Dispatch job: 29
Dispatch job: 30

slave1

$ python slave.py 
Connect to server 127.0.0.1...
Run job: 1 
Run job: 2 
Run job: 3 
Run job: 5 
Run job: 7 
Run job: 9 
Run job: 11 
Run job: 13 
Run job: 15 
Run job: 17 
Run job: 19 
Run job: 21 
Run job: 23 

slave2

$ python slave.py 
Connect to server 127.0.0.1...
Run job: 4 
Run job: 6 
Run job: 8 
Run job: 10 
Run job: 12 
Run job: 14 
Run job: 16 
Run job: 18 
Run job: 20 
Run job: 22 
Run job: 24 

總結(jié)

以上就是這篇文章的全部內(nèi)容了,希望本文的內(nèi)容對大家的學習或者工作具有一定的參考學習價值,如果有疑問大家可以留言交流,謝謝大家對腳本之家的支持。

相關文章

  • Pycharm無法正常安裝第三方庫的幾條應對方法匯總

    Pycharm無法正常安裝第三方庫的幾條應對方法匯總

    在使用pycharm學習python的時候,經(jīng)常需要第三方庫,沒有第三方庫程序就會報錯,下面這篇文章主要給大家介紹了關于Pycharm無法正常安裝第三方庫的幾條應對方法,需要的朋友可以參考下
    2023-04-04
  • Python2.x與3​​.x版本有哪些區(qū)別

    Python2.x與3​​.x版本有哪些區(qū)別

    這篇文章主要介紹了Python2.x與3​​.x版本有哪些區(qū)別,文中講解非常詳細,幫助大家更好的理解和學習,感興趣的朋友可以了解下
    2020-07-07
  • 全面解讀Python Web開發(fā)框架Django

    全面解讀Python Web開發(fā)框架Django

    Django是一個開源的Web應用框架,由Python寫成。采用MVC的軟件設計模式,主要目標是使得開發(fā)復雜的、數(shù)據(jù)庫驅(qū)動的網(wǎng)站變得簡單。Django注重組件的重用性和“可插拔性”,敏捷開發(fā)和DRY法則(Don’t Repeat Yoursef)。
    2014-06-06
  • python docx如何修改word表格內(nèi)容

    python docx如何修改word表格內(nèi)容

    使用Python-docx庫,可以方便地修改Word文檔中的表格內(nèi)容,首先需要安裝python-docx庫,然后使用該庫打開Word文檔,遍歷文檔中的表格并修改指定單元格內(nèi)容,最后另存為新文檔
    2024-09-09
  • python腳本當作Linux中的服務啟動實現(xiàn)方法

    python腳本當作Linux中的服務啟動實現(xiàn)方法

    今天小編就為大家分享一篇python腳本當作Linux中的服務啟動實現(xiàn)方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2019-06-06
  • Python批量獲取基金數(shù)據(jù)的方法步驟

    Python批量獲取基金數(shù)據(jù)的方法步驟

    這篇文章主要介紹了Python批量獲取基金數(shù)據(jù)的方法步驟,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2021-03-03
  • python爬蟲模擬瀏覽器訪問-User-Agent過程解析

    python爬蟲模擬瀏覽器訪問-User-Agent過程解析

    這篇文章主要介紹了python爬蟲模擬瀏覽器訪問-User-Agent過程解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2019-12-12
  • python 第三方庫的安裝及pip的使用詳解

    python 第三方庫的安裝及pip的使用詳解

    下面小編就為大家?guī)硪黄猵ython 第三方庫的安裝及pip的使用詳解。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-05-05
  • pycharm中使用request和Pytest進行接口測試的方法

    pycharm中使用request和Pytest進行接口測試的方法

    這篇文章主要介紹了pycharm中使用request和Pytest進行接口測試的方法,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-07-07
  • 對Pytorch中Tensor的各種池化操作解析

    對Pytorch中Tensor的各種池化操作解析

    今天小編就為大家一篇對Pytorch中Tensor的各種池化操作解析,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-01-01

最新評論