關于python多進程中的常用方法詳解
1、進程間的通信
全局變量在多個進程中不共享資源,進程之間的數(shù)據(jù)是獨立的,默認情況下是互不影響的。
示例代碼:
from multiprocessing import Process
num = 1
def task1():
global num
num += 5
print("子進程1運行,num:", num)
def task2():
global num
num += 10
print("子進程2運行,num:", num)
if __name__ == '__main__':
print("父進程開始運行...")
p1 = Process(target=task1)
p2 = Process(target=task2)
p1.start()
p2.start()
p1.join()
p2.join()運行結果:

2、用 Queue()實現(xiàn)多進程之間的數(shù)據(jù)傳遞
Queue 是多進程安全的隊列,可以使用 Queue 實現(xiàn)多進程之間的數(shù)據(jù)傳遞。
put 方法用以插入數(shù)據(jù)到隊列中, put 方法還有兩個可選參數(shù): blocked 和 timeout。如果 blocked 為 True(默認值),并且 timeout 為正值,該方法會阻塞 timeout 指定的時間,直到該隊列有剩余的空間。如果超時,會拋出 Queue.full 異常。如果 blocked 為 False,但該 Queue 已滿,會立即拋出 Queue.full 異常。
get 方法可以從隊列讀取并且刪除一個元素。同樣, get 方法有兩個可選參數(shù): blocked和 timeout。如果 blocked 為 True(默認值),并且 timeout 為正值,那么在等待時間內(nèi)沒有取到任何元素,會拋出 Queue.Empty 異常。如果 blocked 為 False,有兩種情況存在,如果Queue 有一個值可用,則立即返回該值,否則,如果隊列為空,則立即拋出Queue.Empty 異常
示例代碼:
from multiprocessing import Queue
q = Queue(3)
q.put('msg1')
q.put('msg2')
print('消息隊列是否已滿:', q.full())
q.put('msg3')
print('消息隊列是否已滿:', q.full())
# q.put('msg4') # 以為消息隊列已經(jīng)滿了,需要直接寫入需要等待,如果超時會拋出異常
# 寫入數(shù)據(jù)時先判斷,判斷隊列是否已滿
if not q.full():
q.put('msg4')
# 同理,取消息時可以先判斷隊列是否有數(shù)據(jù)
if not q.empty():
for _ in range(q.qsize()):
print(q.get())運行結果:

示例代碼:
from multiprocessing import Process
from multiprocessing import Queue
import random
import os
# 向queue中輸入數(shù)據(jù)的函數(shù)
def inputQ(queue):
info = random.randint(1, 100)
queue.put(info)
print('進程{}往隊列中存了一個數(shù)據(jù):{}'.format(os.getpid(), info))
# 向queue中輸出數(shù)據(jù)的函數(shù)
def outputQ(queue):
info = queue.get()
print('進程{}從隊列中取出一個數(shù)據(jù):{}'.format(os.getpid(), info))
if __name__ == '__main__':
queue = Queue(5)
lst_1 = []
lst_2 = []
for i in range(3):
process = Process(target=inputQ, args=(queue,))
process.start()
lst_1.append(process)
# 輸出進程
for i in range(2):
process = Process(target=outputQ, args=(queue,))
process.start()
lst_2.append(process)
for p in lst_1:
p.join()
for p in lst_2:
p.join()運行結果:

3、Queue 隊列實現(xiàn)進程間通信
示例代碼:
import time
from multiprocessing import Queue, Process
def write_data(q):
# 將列表元素寫入到隊列中
for i in ['aa', 'bb', 'cc', 'dd']:
print('開始寫入值%s' %i)
q.put(i)
time.sleep(1)
def read_data(q):
print("開始讀取數(shù)據(jù)...")
while True:
if not q.empty():
print("讀取到數(shù)據(jù):", q.get())
time.sleep(1)
else:
break
if __name__ == '__main__':
# 創(chuàng)建隊列
q = Queue()
# 創(chuàng)建進程
qw = Process(target=write_data, args=(q, ))
qr = Process(target=read_data, args=(q, ))
# 啟動進程
qw.start()
qr.start()
qw.join()
qr.join()運行結果:

4、Manage()的使用
如果使用 Pool 創(chuàng)建進程,就需要使用 multiprocessing.Manager()中的 Queue()來完成進程間的通信,而不是 multiprocessing.Queue(),否則會拋出異常。
示例代碼:
import time
from multiprocessing import Manager, Pool, Queue
def write_data(q):
# 將列表元素寫入到隊列中
for i in ['aa', 'bb', 'cc', 'dd']:
print('開始寫入值%s' %i)
q.put(i)
time.sleep(1)
def read_data(q):
print("開始讀取數(shù)據(jù)...")
while True:
if not q.empty():
print("讀取到數(shù)據(jù):", q.get())
time.sleep(1)
else:
break
if __name__ == '__main__':
# 創(chuàng)建隊列
q = Manager().Queue()
# q = Queue() # 直接這樣使用Queue()會報錯
# 創(chuàng)建進程池
p = Pool(3)
# 使用apply阻塞模式創(chuàng)建進程
p.apply(write_data, (q, ))
p.apply(read_data, (q, ))
p.close()
p.join()運行結果:

示例代碼:
import os
from multiprocessing import Manager, Process
# 定義了一個foo函數(shù),接收一個字典和一個列表
def foo(dic, lst):
# 字典和列表都放進程ID
dic[os.getpid()] = os.getpid()
lst.append(os.getpid())
if __name__ == '__main__':
# 生成Manager對象
manager = Manager()
dic = manager.dict()
print(dic)
lst = manager.list(range(3))
print(lst)
# 10個進程分別join
p_list = []
for i in range(10):
p = Process(target=foo, args=(dic, lst))
p.start()
p_list.append(p)
for res in p_list:
res.join()
# 打印字典和列表
print(dic)
print(lst)運行結果:

示例代碼:
from multiprocessing import Pool, current_process, Manager
import time
def produce_data(queue):
for i in range(10):
queue.put(i)
def consume_data(queue):
while queue.qsize() > 0:
data = queue.get() # 注意:當get()拿不到數(shù)據(jù)時,會一直處于等待狀態(tài)
print(f"當前進程為:{current_process().name}, 隊列獲取數(shù)據(jù)為:{data},隊列剩余數(shù)據(jù)為:{queue.qsize()}個!")
time.sleep(0.01)
if __name__ == '__main__':
print(f"主進程{current_process().name}開始執(zhí)行!")
p = Pool(processes=6, maxtasksperchild=6)
queue = Manager().Queue(maxsize=20)
p.apply_async(produce_data, args=(queue, ))
time.sleep(1)
for i in range(5):
p.apply_async(consume_data, args=(queue, ))
p.close() # 關閉進程池,防止將任何其他任務提交到池中。需要在join之前調(diào)用,否則會報ValueError: Pool is still running錯誤
p.join() # 等待進程池中的所有進程執(zhí)行完畢
print(f"主進程{current_process().name}結束!")運行結果:

注意:
- p.close() # 關閉進程池,防止將任何其他任務提交到池中。需要在join之前調(diào)用,否則會報ValueError: Pool is still running錯誤
- p.join() # 等待進程池中的所有進程執(zhí)行完畢
- p.close()是關掉進程池子,是不再向里面添加進程了,對Pool對象調(diào)用join()方法會等待所有子進程執(zhí)行完畢,調(diào)用join()之前必須先調(diào)用close(),調(diào)用close()之后就不能繼續(xù)添加新的Process了。
5、current_process()的使用
示例代碼:
from multiprocessing import Process, current_process
import time
import random
lst = []
def task(i):
print(current_process().name, i, 'start...') # current_process().name輸出進程的名字
time.sleep(random.randint(1, 4))
lst.append(i)
print(lst)
print(current_process().name, i, 'end.....')
if __name__ == "__main__":
p_lst = []
for i in range(4):
p = Process(target=task, args=(i, ))
p_lst.append(p)
p.start()
for p in p_lst:
p.join() # 阻塞當前進程,直到子進程全部退出
print("main end.......")運行結果:

6、進程池
進程池里有固定數(shù)量的進程,每次執(zhí)行任務時都從進程池中取出一個空閑進程來執(zhí)行,如果任務數(shù)量超過進程池中進程數(shù)量,那么就等待已經(jīng)在執(zhí)行的任務結束之后,有進程空閑之后再執(zhí)行,也就是說,同一時間,只有固定數(shù)量的進程在執(zhí)行,這樣對操作系統(tǒng)得壓力也不會太大,效率也得到保證。
示例代碼:
from multiprocessing import Pool, current_process
import time
import random
lst = []
def task(i):
print(current_process().name, i, 'start...')
time.sleep(random.randint(1, 5))
lst.append(i)
print(lst)
print(current_process().name, i, 'end.....')
if __name__ == "__main__":
p = Pool(processes=3, maxtasksperchild=3)
for i in range(10):
p.apply_async(func=task, args=(i,)) # 進程池接收任務
p.close() # 關閉進程池 ==》 不接受任務
p.join() # 等待子進程執(zhí)行完畢,父進程再執(zhí)行
print("end.............")運行結果:

示例代碼: 【同步執(zhí)行】
import os
import time
import random
from multiprocessing import Pool
def func1(n):
print('任務{(diào)}開始執(zhí)行,進程為:{}'.format(n, os.getpid()))
time.sleep(random.randint(1, 4))
print('任務{(diào)}結束執(zhí)行,進程為:{}'.format(n, os.getpid()))
if __name__ == '__main__':
# c創(chuàng)建一個進程池,里面有三個進程
p = Pool(3)
for i in range(10):
res = p.apply(func1, args=(i,))運行結果:

示例代碼: 【異步執(zhí)行】
import os
import time
import random
from multiprocessing import Pool
def func1(n):
print('任務{(diào)}開始執(zhí)行,進程為:{}'.format(n, os.getpid()))
time.sleep(random.randint(1, 4))
print('任務{(diào)}結束執(zhí)行,進程為:{}'.format(n, os.getpid()))
if __name__ == '__main__':
# c創(chuàng)建一個進程池,里面有三個進程
p = Pool(3)
for i in range(5):
res = p.apply_async(func1, args=(i,))
p.close() # 一定要關閉
p.join() # 一定要使用join,不然進程池里的進程沒來得及執(zhí)行,主進程結束了,子進程也都跟著結束。運行結果:

7、進程共享變量
共享變量不適用于多進程,進程間的變量是互相隔離的,子進程的全局變量是完全復制一份父進程的數(shù)據(jù),對子進程的全局變量修改完全影響不到其他進程的全局變量。
示例代碼:
import time
from multiprocessing import Process
def producer(a):
a += 1
time.sleep(2)
def consumer(a):
time.sleep(3)
data = a
print(data)
if __name__ == "__main__":
a = 1
my_producer = Process(target=producer, args=(a, ))
my_consumer = Process(target=consumer, args=(a, ))
my_producer.start()
my_consumer.start()
my_producer.join()
my_consumer.join()
# 輸出結果為1運行結果:

示例代碼: 【進程之間的變量是無法共享的,即使是全局變量也是不能共享的】
from multiprocessing import Process
import os
def func():
global n
n = 10
print('子進程pid:{},n:{}'.format(os.getppid(), n))
if __name__ == '__main__':
n = 100
print('主進程pid:{},n:{}'.format(os.getppid(), n))
p = Process(target=func)
p.start()
p.join()
print('主進程中輸出n:{}'.format(n))運行結果:

8、管道Pipe(兩進程間的通信優(yōu)先考慮)
Pipe([duplex]):在進程之間創(chuàng)建一條管道,并返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的連接對象,強調(diào)一點:必須在產(chǎn)生Process對象之前產(chǎn)生管道。dumplex:默認管道是全雙工的,如果將duplex射成False,conn1只能用于接收,conn2只能用于發(fā)送。
- conn1.recv():接收conn2.send(obj)發(fā)送的對象。如果沒有消息可接收,recv方法會一直阻塞。如果連接的另外一端已經(jīng)關閉,那么recv方法會拋出EOFError。
- conn1.send(obj):通過連接發(fā)送對象。obj是與序列化兼容的任意對象
- conn1.close():關閉連接。如果conn1被垃圾回收,將自動調(diào)用此方法
- conn1.fileno():返回連接使用的整數(shù)文件描述符
- conn1.poll([timeout]):如果連接上的數(shù)據(jù)可用,返回True。timeout指定等待的最長時限。如果省略此參數(shù),方法將立即返回結果。如果將timeout射成None,操作將無限期地等待數(shù)據(jù)到達。
- conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發(fā)送的一條完整的字節(jié)消息。maxlength指定要接收的最大字節(jié)數(shù)。如果進入的消息,超過了這個最大值,將引發(fā)IOError異常,并且在連接上無法進行進一步讀取。如果連接的另外一端已經(jīng)關閉,再也不存在任何數(shù)據(jù),將引發(fā)EOFError異常。
- conn.send_bytes(buffer [, offset [, size]]):通過連接發(fā)送字節(jié)數(shù)據(jù)緩沖區(qū),buffer是支持緩沖區(qū)接口的任意對象,offset是緩沖區(qū)中的字節(jié)偏移量,而size是要發(fā)送字節(jié)數(shù)。結果數(shù)據(jù)以單條消息的形式發(fā)出,然后調(diào)用c.recv_bytes()函數(shù)進行接收
- conn1.recv_bytes_into(buffer [, offset]):接收一條完整的字節(jié)消息,并把它保存在buffer對象中,該對象支持可寫入的緩沖區(qū)接口(即bytearray對象或類似的對象)。offset指定緩沖區(qū)中放置消息處的字節(jié)位移。返回值是收到的字節(jié)數(shù)。如果消息長度大于可用的緩沖區(qū)空間,將引發(fā)BufferTooShort異常。
示例代碼:
import time
from multiprocessing import Process, Queue, Pool, Manager, Pipe
def producer(pipe):
pipe.send("a")
time.sleep(3)
print(pipe.recv())
def consumer(pipe):
time.sleep(2)
data = pipe.recv()
pipe.send("b")
print(data)
if __name__ == "__main__":
# Pipe實現(xiàn)兩進程間通信
s_pipe, r_pipe = Pipe()
pool = Pool()
pool.apply_async(producer, args=(s_pipe, ))
pool.apply_async(consumer, args=(r_pipe, ))
pool.close()
pool.join()運行結果:

示例代碼:
from multiprocessing import Process, Pipe
def f(conn):
conn.send('主進程,你好呀!') # 發(fā)送數(shù)據(jù)給主進程
print('子進程收到主進程發(fā)來的數(shù)據(jù):{}'.format(conn.recv()))
conn.close() # 關閉
if __name__ == '__main__':
# Pipe是一個函數(shù),返回的是一個元組
parent_conn, child_conn = Pipe()
# 創(chuàng)建一個子進程
p = Process(target=f, args=(child_conn,))
p.start()
print("主進程收到子進程發(fā)來的數(shù)據(jù):{}".format(parent_conn.recv()))
parent_conn.send('子進程,你好?。?) # 發(fā)送數(shù)據(jù)給子進程
p.join()運行結果:

9、進程之間的同步控制
9.1 進程鎖:Lock()
當多個進程對同一資源進行IO操作時,需要對資源“上鎖”,否則會出現(xiàn)意外結果。上鎖之后,同一件就只能有一個進程運行上鎖的代碼塊。例如有一個txt文件,里面內(nèi)容是一個數(shù)字10,我們要用多進程去讀取這個文件的值,然后每讀一次,讓txt中的這個數(shù)字減1,不加鎖時代碼如下:
import time
import os
from multiprocessing import Process
from multiprocessing import Lock
def func():
if os.path.exists('num.txt'):
with open('num.txt', 'r') as rf:
num = int(rf.read())
num -= 1
time.sleep(1)
with open('num.txt', 'w') as wf:
wf.write(str(num))
else:
with open('num.txt', 'w') as wf:
wf.write('10')
if __name__ == '__main__':
print("主進程開始運行……")
p_list = []
for i in range(10):
p = Process(target=func)
p_list.append(p)
p.start()
for p in p_list:
p.join()
with open('num.txt', 'r') as f:
num = int(f.read())
print('最后結果為:{}'.format(num))
print("主進程結束運行……" )運行結果:

雖然用了10個進程讀取并修改txt文件,但最后的值卻不是1。這正是多進程共同訪問資源造成混亂造成的。要達到預期結果,就要給資源上鎖:
import time
import os
from multiprocessing import Process
from multiprocessing import Lock
def func(lock):
if os.path.exists('num.txt'):
lock.acquire()
with open('num.txt', 'r') as f:
num = int(f.read())
num -= 1
time.sleep(1)
with open('num.txt', 'w') as f:
f.write(str(num))
lock.release()
else:
with open('num.txt', 'w') as f:
f.write('10')
if __name__ == '__main__':
print("主進程開始運行……")
lock = Lock()
p_list = []
for i in range(10):
p = Process(target=func, args=(lock,))
p_list.append(p)
p.start()
for p in p_list:
p.join()
with open('num.txt', 'r') as f:
num = int(f.read())
print('最后結果為:{}'.format(num))
print("主進程結束運行……")運行結果:

果然,用了進程鎖之后獲得了預料中的結果。但是,如果你運行了上面兩塊代碼你就會發(fā)現(xiàn),加了鎖之后,程序明顯變慢了很多,因為程序成了串行的了,當然好處是數(shù)據(jù)安全有保證。
9.2 信號量:Semaphore
鎖同時只允許一個線程更改數(shù)據(jù),而信號量是同時允許一定數(shù)量的進程更改數(shù)據(jù) 。假如有一下應用場景:有10個人吃飯,但只有一張餐桌,只允許做3個人,沒上桌的人不允許吃飯,已上桌吃完飯離座之后,下面的人才能搶占桌子繼續(xù)吃飯,如果不用信號量,肯定是10人一窩蜂一起吃飯:
from multiprocessing import Process
import time
import random
def fun(i):
print('{}號顧客上座,開始吃飯'.format(i))
time.sleep(random.randint(3, 8))
print('{}號顧客吃完飯了,離座'.format(i))
if __name__ == '__main__':
for i in range(10):
p = Process(target=fun, args=(i,))
p.start()運行結果:

用了信號量,實現(xiàn)了輪流吃飯,每次只有3個人吃飯:
示例代碼:
from multiprocessing import Process
import time
import random
from multiprocessing import Semaphore
def fun(i , sem):
sem.acquire()
print('{}號顧客上座,開始吃飯'.format(i))
time.sleep(random.randint(3, 8))
print('{}號顧客吃完飯了,離座'.format(i))
sem.release()
if __name__ == '__main__':
sem = Semaphore(3)
for i in range(10):
p = Process(target=fun, args=(i,sem))
p.start()運行結果:

事實上,Semaphore的作用也類似于鎖,只不過在鎖機制上添加了一個計數(shù)器,允許多個人擁有“鑰匙”。
9.3 事件:Event
python進程的事件用于主進程控制其他子進程的執(zhí)行,Event類有如下幾個主要方法:
1)wait() 插入在進程中插入一個標記(flag)默認為 False,當 flag為False時,程序會停止運行進入阻塞狀態(tài);
2)set() 使flag為True,程序會進入非阻塞狀態(tài)
3)clear() 使flag為False,程序會停止運行,進入阻塞狀態(tài)
4)is_set() 判斷flag 是否為True,是的話返回True,不是則返回False
有如下需求:獲取當前時間的秒數(shù)的個位數(shù),如果小于5,則設置子進程阻塞,如果大于5則設置子進程非阻塞。代碼如下:
from multiprocessing import Event, Processimport timefrom datetime import datetimedef func(e): print('子進程:開始運行……') while True: print('子進程:現(xiàn)在事件秒數(shù)是{}'.format(datetime.now().second)) e.wait() # 阻塞等待信號 這里插入了一個flag 默認為 False time.sleep(1)if __name__ == '__main__': e = Event() p = Process(target=func, args=(e,)) p.start() for i in range(10): s = int(str(datetime.now().second)[-1]) # 獲取當前秒數(shù)的個位數(shù) if s < 5: print('子進程進入阻塞狀態(tài)') e.clear() # 使插入的flag為False 進程進入阻塞狀態(tài) else: print('子進程取消阻塞狀態(tài)') e.set() # 使插入的flag為True,進程進入非阻塞狀態(tài) time.sleep(1) e.set() time.sleep(3) p.terminate() print("主進程運行結束……")運行結果:

10、內(nèi)置線程池
示例代碼:
import time
import os
import random
from multiprocessing.pool import ThreadPool
def task():
print(f'開始執(zhí)行任務:{os.getpid()}')
time.sleep(random.randint(0, 5))
print(f"執(zhí)行任務結束:{os.getpid()}")
if __name__ == '__main__':
pool = ThreadPool(2)
for i in range(5):
pool.apply_async(task)
pool.close()
pool.join()運行結果:

到此這篇關于關于python多進程中的常用方法詳解的文章就介紹到這了,更多相關python多進程常用方法內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
python基于socket進行端口轉發(fā)實現(xiàn)后門隱藏的示例
今天小編就為大家分享一篇python基于socket進行端口轉發(fā)實現(xiàn)后門隱藏的示例,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-07-07
Python實現(xiàn)快速提取Word表格并轉Markdown
這篇文章主要為大家詳細介紹了一套Python零基礎可操作的代碼方案,幫助測試工程師3分鐘內(nèi)完成表格提取與轉換,直接對接自動化測試或大模型,需要的小伙伴可以參考下2025-04-04
python函數(shù)也可以是一個對象,可以存放在列表中并調(diào)用方式
這篇文章主要介紹了python函數(shù)也可以是一個對象,可以存放在列表中并調(diào)用方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-02-02
Python簡單調(diào)用MySQL存儲過程并獲得返回值的方法
這篇文章主要介紹了Python調(diào)用MySQL存儲過程并獲得返回值的方法,涉及Python操作MySQL存儲過程的使用技巧,具有一定參考借鑒價值,需要的朋友可以參考下2015-07-07
Python辦公自動化之JSOM數(shù)據(jù)處理與SQL Server數(shù)據(jù)庫操作
這篇文章主要為大家詳細介紹了Python辦公自動化中JSOM數(shù)據(jù)處理與SQL Server數(shù)據(jù)庫操作的相關知識,文中的示例代碼簡潔易懂,有需要的小伙伴可以參考下2024-01-01

