Python多進(jìn)程multiprocessing用法實(shí)例分析
本文實(shí)例講述了Python多進(jìn)程multiprocessing用法。分享給大家供大家參考,具體如下:
mutilprocess簡(jiǎn)介
像線程一樣管理進(jìn)程,這個(gè)是mutilprocess的核心,他與threading很是相像,對(duì)多核CPU的利用率會(huì)比threading好的多。
簡(jiǎn)單的創(chuàng)建進(jìn)程:
import multiprocessing
def worker(num):
"""thread worker function"""
print 'Worker:', num
return
if __name__ == '__main__':
jobs = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i,))
jobs.append(p)
p.start()
確定當(dāng)前的進(jìn)程,即是給進(jìn)程命名,方便標(biāo)識(shí)區(qū)分,跟蹤
import multiprocessing
import time
def worker():
name = multiprocessing.current_process().name
print name, 'Starting'
time.sleep(2)
print name, 'Exiting'
def my_service():
name = multiprocessing.current_process().name
print name, 'Starting'
time.sleep(3)
print name, 'Exiting'
if __name__ == '__main__':
service = multiprocessing.Process(name='my_service',
target=my_service)
worker_1 = multiprocessing.Process(name='worker 1',
target=worker)
worker_2 = multiprocessing.Process(target=worker) # default name
worker_1.start()
worker_2.start()
service.start()
守護(hù)進(jìn)程就是不阻擋主程序退出,自己干自己的 mutilprocess.setDaemon(True)就這句等待守護(hù)進(jìn)程退出,要加上join,join可以傳入浮點(diǎn)數(shù)值,等待n久就不等了
守護(hù)進(jìn)程:
import multiprocessing
import time
import sys
def daemon():
name = multiprocessing.current_process().name
print 'Starting:', name
time.sleep(2)
print 'Exiting :', name
def non_daemon():
name = multiprocessing.current_process().name
print 'Starting:', name
print 'Exiting :', name
if __name__ == '__main__':
d = multiprocessing.Process(name='daemon',
target=daemon)
d.daemon = True
n = multiprocessing.Process(name='non-daemon',
target=non_daemon)
n.daemon = False
d.start()
n.start()
d.join(1)
print 'd.is_alive()', d.is_alive()
n.join()
最好使用 poison pill,強(qiáng)制的使用terminate()注意 terminate之后要join,使其可以更新?tīng)顟B(tài)
終止進(jìn)程:
import multiprocessing import time def slow_worker(): print 'Starting worker' time.sleep(0.1) print 'Finished worker' if __name__ == '__main__': p = multiprocessing.Process(target=slow_worker) print 'BEFORE:', p, p.is_alive() p.start() print 'DURING:', p, p.is_alive() p.terminate() print 'TERMINATED:', p, p.is_alive() p.join() print 'JOINED:', p, p.is_alive()
①. == 0 未生成任何錯(cuò)誤
②. 0 進(jìn)程有一個(gè)錯(cuò)誤,并以該錯(cuò)誤碼退出
③. < 0 進(jìn)程由一個(gè)-1 * exitcode信號(hào)結(jié)束
進(jìn)程的退出狀態(tài):
import multiprocessing
import sys
import time
def exit_error():
sys.exit(1)
def exit_ok():
return
def return_value():
return 1
def raises():
raise RuntimeError('There was an error!')
def terminated():
time.sleep(3)
if __name__ == '__main__':
jobs = []
for f in [exit_error, exit_ok, return_value, raises, terminated]:
print 'Starting process for', f.func_name
j = multiprocessing.Process(target=f, name=f.func_name)
jobs.append(j)
j.start()
jobs[-1].terminate()
for j in jobs:
j.join()
print '%15s.exitcode = %s' % (j.name, j.exitcode)
方便的調(diào)試,可以用logging
日志:
import multiprocessing import logging import sys def worker(): print 'Doing some work' sys.stdout.flush() if __name__ == '__main__': multiprocessing.log_to_stderr() logger = multiprocessing.get_logger() logger.setLevel(logging.INFO) p = multiprocessing.Process(target=worker) p.start() p.join()
利用class來(lái)創(chuàng)建進(jìn)程,定制子類(lèi)
派生進(jìn)程:
import multiprocessing
class Worker(multiprocessing.Process):
def run(self):
print 'In %s' % self.name
return
if __name__ == '__main__':
jobs = []
for i in range(5):
p = Worker()
jobs.append(p)
p.start()
for j in jobs:
j.join()
python進(jìn)程間傳遞消息:
import multiprocessing
class MyFancyClass(object):
def __init__(self, name):
self.name = name
def do_something(self):
proc_name = multiprocessing.current_process().name
print 'Doing something fancy in %s for %s!' % \
(proc_name, self.name)
def worker(q):
obj = q.get()
obj.do_something()
if __name__ == '__main__':
queue = multiprocessing.Queue()
p = multiprocessing.Process(target=worker, args=(queue,))
p.start()
queue.put(MyFancyClass('Fancy Dan'))
# Wait for the worker to finish
queue.close()
queue.join_thread()
p.join()
import multiprocessing
import time
class Consumer(multiprocessing.Process):
def __init__(self, task_queue, result_queue):
multiprocessing.Process.__init__(self)
self.task_queue = task_queue
self.result_queue = result_queue
def run(self):
proc_name = self.name
while True:
next_task = self.task_queue.get()
if next_task is None:
# Poison pill means shutdown
print '%s: Exiting' % proc_name
self.task_queue.task_done()
break
print '%s: %s' % (proc_name, next_task)
answer = next_task()
self.task_queue.task_done()
self.result_queue.put(answer)
return
class Task(object):
def __init__(self, a, b):
self.a = a
self.b = b
def __call__(self):
time.sleep(0.1) # pretend to take some time to do the work
return '%s * %s = %s' % (self.a, self.b, self.a * self.b)
def __str__(self):
return '%s * %s' % (self.a, self.b)
if __name__ == '__main__':
# Establish communication queues
tasks = multiprocessing.JoinableQueue()
results = multiprocessing.Queue()
# Start consumers
num_consumers = multiprocessing.cpu_count() * 2
print 'Creating %d consumers' % num_consumers
consumers = [ Consumer(tasks, results)
for i in xrange(num_consumers) ]
for w in consumers:
w.start()
# Enqueue jobs
num_jobs = 10
for i in xrange(num_jobs):
tasks.put(Task(i, i))
# Add a poison pill for each consumer
for i in xrange(num_consumers):
tasks.put(None)
# Wait for all of the tasks to finish
tasks.join()
# Start printing results
while num_jobs:
result = results.get()
print 'Result:', result
num_jobs -= 1
Event提供一種簡(jiǎn)單的方法,可以在進(jìn)程間傳遞狀態(tài)信息。事件可以切換設(shè)置和未設(shè)置狀態(tài)。通過(guò)使用一個(gè)可選的超時(shí)值,時(shí)間對(duì)象的用戶可以等待其狀態(tài)從未設(shè)置變?yōu)樵O(shè)置。
進(jìn)程間信號(hào)傳遞:
import multiprocessing
import time
def wait_for_event(e):
"""Wait for the event to be set before doing anything"""
print 'wait_for_event: starting'
e.wait()
print 'wait_for_event: e.is_set()->', e.is_set()
def wait_for_event_timeout(e, t):
"""Wait t seconds and then timeout"""
print 'wait_for_event_timeout: starting'
e.wait(t)
print 'wait_for_event_timeout: e.is_set()->', e.is_set()
if __name__ == '__main__':
e = multiprocessing.Event()
w1 = multiprocessing.Process(name='block',
target=wait_for_event,
args=(e,))
w1.start()
w2 = multiprocessing.Process(name='nonblock',
target=wait_for_event_timeout,
args=(e, 2))
w2.start()
print 'main: waiting before calling Event.set()'
time.sleep(3)
e.set()
print 'main: event is set'
Python多進(jìn)程,一般的情況是Queue來(lái)傳遞。
Queue:
from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print q.get() # prints "[42, None, 'hello']" p.join()
多線程優(yōu)先隊(duì)列Queue:
import Queue
import threading
import time
exitFlag = 0
class myThread (threading.Thread):
def __init__(self, threadID, name, q):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.q = q
def run(self):
print "Starting " + self.name
process_data(self.name, self.q)
print "Exiting " + self.name
def process_data(threadName, q):
while not exitFlag:
queueLock.acquire()
if not workQueue.empty():
data = q.get()
queueLock.release()
print "%s processing %s" % (threadName, data)
else:
queueLock.release()
time.sleep(1)
threadList = ["Thread-1", "Thread-2", "Thread-3"]
nameList = ["One", "Two", "Three", "Four", "Five"]
queueLock = threading.Lock()
workQueue = Queue.Queue(10)
threads = []
threadID = 1
# Create new threads
for tName in threadList:
thread = myThread(threadID, tName, workQueue)
thread.start()
threads.append(thread)
threadID += 1
# Fill the queue
queueLock.acquire()
for word in nameList:
workQueue.put(word)
queueLock.release()
# Wait for queue to empty
while not workQueue.empty():
pass
# Notify threads it's time to exit
exitFlag = 1
# Wait for all threads to complete
for t in threads:
t.join()
print "Exiting Main Thread"
多進(jìn)程使用Queue通信的例子
import time
from multiprocessing import Process,Queue
MSG_QUEUE = Queue(5)
def startA(msgQueue):
while True:
if msgQueue.empty() > 0:
print ('queue is empty %d' % (msgQueue.qsize()))
else:
msg = msgQueue.get()
print( 'get msg %s' % (msg,))
time.sleep(1)
def startB(msgQueue):
while True:
msgQueue.put('hello world')
print( 'put hello world queue size is %d' % (msgQueue.qsize(),))
time.sleep(3)
if __name__ == '__main__':
processA = Process(target=startA,args=(MSG_QUEUE,))
processB = Process(target=startB,args=(MSG_QUEUE,))
processA.start()
print( 'processA start..')
主進(jìn)程定義了一個(gè)Queue類(lèi)型的變量,并作為Process的args參數(shù)傳給子進(jìn)程processA和processB,兩個(gè)進(jìn)程一個(gè)向隊(duì)列中寫(xiě)數(shù)據(jù),一個(gè)讀數(shù)據(jù)。
更多關(guān)于Python相關(guān)內(nèi)容感興趣的讀者可查看本站專(zhuān)題:《Python進(jìn)程與線程操作技巧總結(jié)》、《Python Socket編程技巧總結(jié)》、《Python數(shù)據(jù)結(jié)構(gòu)與算法教程》、《Python函數(shù)使用技巧總結(jié)》、《Python字符串操作技巧匯總》、《Python入門(mén)與進(jìn)階經(jīng)典教程》及《Python文件與目錄操作技巧匯總》
希望本文所述對(duì)大家Python程序設(shè)計(jì)有所幫助。
- Python多進(jìn)程并發(fā)(multiprocessing)用法實(shí)例詳解
- Python3多進(jìn)程 multiprocessing 模塊實(shí)例詳解
- python multiprocessing多進(jìn)程變量共享與加鎖的實(shí)現(xiàn)
- Python標(biāo)準(zhǔn)庫(kù)之多進(jìn)程(multiprocessing包)介紹
- python基于multiprocessing的多進(jìn)程創(chuàng)建方法
- python multiprocessing 多進(jìn)程并行計(jì)算的操作
- 簡(jiǎn)單學(xué)習(xí)Python多進(jìn)程Multiprocessing
- Python使用multiprocessing實(shí)現(xiàn)多進(jìn)程的詳細(xì)步驟記錄
相關(guān)文章
Python實(shí)現(xiàn)簡(jiǎn)單的"導(dǎo)彈" 自動(dòng)追蹤原理解析
這篇文章主要介紹了Python實(shí)現(xiàn)簡(jiǎn)單的"導(dǎo)彈" 自動(dòng)追蹤原理解析,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-03-03
通過(guò)實(shí)例解析python創(chuàng)建進(jìn)程常用方法
這篇文章主要介紹了通過(guò)實(shí)例解析python創(chuàng)建進(jìn)程常用方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-06-06
Scrapy中詭異xpath的匹配內(nèi)容失效問(wèn)題及解決
這篇文章主要介紹了Scrapy中詭異xpath的匹配內(nèi)容失效問(wèn)題及解決方案,具有很好的價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-12-12
python tensorflow基于cnn實(shí)現(xiàn)手寫(xiě)數(shù)字識(shí)別
這篇文章主要為大家詳細(xì)介紹了python tensorflow基于cnn實(shí)現(xiàn)手寫(xiě)數(shù)字識(shí)別,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-01-01
python爬蟲(chóng)之你好,李煥英電影票房數(shù)據(jù)分析
這篇文章主要介紹了python爬蟲(chóng)之你好,李煥英電影票房數(shù)據(jù)分析,文中有非常詳細(xì)的代碼示例,對(duì)正在學(xué)習(xí)python爬蟲(chóng)的小伙伴們有一定的幫助,需要的朋友可以參考下2021-04-04

