欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Python multiprocessing 進(jìn)程間通信方式實現(xiàn)

 更新時間:2023年02月20日 09:14:03   作者:__彎弓__  
本文主要介紹了Python multiprocessing 進(jìn)程間通信方式實現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧

1、為什么要掌握進(jìn)程間通信

python的多線程代碼效率由于受制于GIL,不能利用多核CPU來加速,而多進(jìn)程方式可以繞過GIL, 發(fā)揮多CPU加速的優(yōu)勢,能夠明顯提高程序的性能

但進(jìn)程間通信卻是不得不考慮的問題。 進(jìn)程不同于線程,進(jìn)程有自己的獨立內(nèi)存空間,不能使用全局變量在進(jìn)程間傳遞數(shù)據(jù)。

在這里插入圖片描述

實際項目需求中,常常存在密集計算、或?qū)崟r性任務(wù),進(jìn)程之間有時需要傳遞大量數(shù)據(jù),如圖片、大對象等,傳遞數(shù)據(jù)如果通過文件序列化、或網(wǎng)絡(luò)接口來進(jìn)行,難以滿足實時性要求,采用redis,或者kaffka, rabbitMQ 之第3方消息隊列包,又使系統(tǒng)復(fù)雜化了。

Python multiprocessing 模塊本身就提供了消息機(jī)制、同步機(jī)制、共享內(nèi)存等各種非常高效的進(jìn)程間通信方式

了解并掌握 python 進(jìn)程間通信的各類方式的使用,以及安全機(jī)制,可以幫助大幅提升程序運行性能。

2、進(jìn)程間各類通信方式簡介

進(jìn)程間通信的主要方式總結(jié)如下

在這里插入圖片描述

關(guān)于進(jìn)程間通信的內(nèi)存安全
內(nèi)存安全意味著,多進(jìn)程間可能會因同搶,意外銷毀等原因造成共享變量異常。
Multiprocessing 模塊提供的Queue, Pipe, Lock, Event 對象,都已實現(xiàn)了進(jìn)程間通信安全機(jī)制。
采用共享內(nèi)存方式通信,需要在代碼中自已來跟蹤、銷毀這些共享內(nèi)存變量,否則可能會出同搶、未正常銷毀等。造成系統(tǒng)異常。 除非開發(fā)者很清楚共享內(nèi)存使用特點,否則不建議直接使用此共享內(nèi)存,而是通過Manager管理器來使用共享內(nèi)存。

內(nèi)存管理器Manager
Multiprocessing提供了內(nèi)存管理器Manager類,可統(tǒng)一解決進(jìn)程通信的內(nèi)存安全問題,可以將各種共享數(shù)據(jù)加入管理器,包括 list, dict, Queue, Lock, Event, Shared Memory 等,由其統(tǒng)一跟蹤與銷毀。

3、消息機(jī)制通信

1) 管道 Pipe 通信方式

類似于1上簡單的socket通道,雙端均可收發(fā)消息。
Pipe 對象的構(gòu)建方法:

parent_conn, child_conn = Pipe(duplex=True/False) 

參數(shù)說明

  • duplex=True, 管道為雙向通信
  • duplex=False, 管道為單向通信,只有child_conn可以發(fā)消息,parent_conn只能接收。

示例代碼:

from multiprocessing import Process, Pipe
   def myfunction(conn):
      conn.send(['hi!! I am Python'])
      conn.close()

if __name__ == '__main__':
      parent_conn, child_conn = Pipe()
      p = Process(target=myfunction, args=(child_conn,))
      p.start()
  	print (parent_conn.recv() )
	p.join()

2) 消息隊列Queue 通信方式

Multiprocessing 的Queue 類,是在python queue 3.0版本上修改的, 可以很容易實現(xiàn)生產(chǎn)者 – 消息者間傳遞數(shù)據(jù),而且Multiprocessing的Queue 模塊實現(xiàn)了lock安全機(jī)制。

在這里插入圖片描述

Queue模塊共提供了3種類型的隊列。

(1) FIFO queue , 先進(jìn)先出,

class queue.Queue(maxsize=0)

(2) LIFO queue, 后進(jìn)先出, 實際上就是堆棧

class queue.LifoQueue(maxsize=0)

(3) 帶優(yōu)先級隊列, 優(yōu)先級最低entry value lowest 先了列

class queue.PriorityQueue(maxsize=0)

Multiprocessing.Queue類的主要方法:

methodDescription
queue.qsize()返回隊列長度
queue.full()隊列滿,返回 True, 否則返回False
queue.empty()隊列空,返回 True, 否則返回False
queue.put(item)將數(shù)據(jù)寫入隊列
queue.get()將數(shù)據(jù)拋出隊列 ,
queue.put_nowait(item), queue.get_nowait()無等待寫入或拋出

說明:

  • put(), get() 是阻塞方法, 而put_notwait(), get_nowait()是非阻塞方法。
  • Multiprocessing 的Queue類沒有提供Task_done, join方法

Queue模塊的其它隊列類:
(1) SimpleQueue
簡潔版的FIFO隊列, 適事簡單場景使用

(2) JoinableQueue子類
Python 3.5 后新增的 Queue的子類,擁有 task_done(), join() 方法

  • task_done()表示,最近讀出的1個任務(wù)已經(jīng)完成。
  • join()阻塞隊列,直到queue中的所有任務(wù)都已完成。

producer – consumer 場景,使用Queue的示例

import multiprocessing

def producer(numbers, q):
    for x in numbers:
        if x % 2 == 0:
            if q.full():
                print("queue is full")
                break
            q.put(x)
            print(f"put {x} in queue by producer")
    return None

def consumer(q):
    while not q.empty():
        print(f"take data {q.get()} from queue by consumer")
    return None

if __name__ == "__main__":
    # 設(shè)置1個queue對象,最大長度為5
    qu = multiprocessing.Queue(maxsize=5,) 

    # 創(chuàng)建producer子進(jìn)程,把queue做為其中1個參數(shù)傳給它,該進(jìn)程負(fù)責(zé)寫
    p5 = multiprocessing.Process(
        name="producer-1",
        target=producer,
        args=([random.randint(1, 100) for i in range(0, 10)], qu)
    )
    p5.start()
    p5.join()
    #創(chuàng)建consumer子進(jìn)程,把queue做為1個參數(shù)傳給它,該進(jìn)程中隊列中讀
    p6 = multiprocessing.Process(
        name="consumer-1",
        target=consumer,
        args=(qu,)
    )
    p6.start()
    p6.join()

    print(qu.qsize())

4、同步機(jī)制通信

(1) 進(jìn)程間同步鎖 – Lock

Multiprocessing也提供了與threading 類似的同步鎖機(jī)制,確保某個時刻只有1個子進(jìn)程可以訪問某個資源或執(zhí)行某項任務(wù), 以避免同搶。

例如:多個子進(jìn)程同時訪問數(shù)據(jù)庫表時,如果沒有同步鎖,用戶A修改1條數(shù)據(jù)后,還未提交,此時,用戶B也進(jìn)行了修改,可以預(yù)見,用戶A提交的將是B個修改的數(shù)據(jù)。

添加了同步鎖,可以確保同時只有1個子進(jìn)程能夠進(jìn)行寫入數(shù)據(jù)庫與提交操作。

如下面的示例,同時只有1個進(jìn)程可以執(zhí)行打印操作。

from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()

if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()

(2) 子進(jìn)程間協(xié)調(diào)機(jī)制 – Event

Event 機(jī)制的工作原理:

1個event 對象實例管理著1個 flag標(biāo)記, 可以用set()方法將其置為true, 用clear()方法將其置為false, 使用wait()將阻塞當(dāng)前子進(jìn)程,直至flag被置為true.
這樣由1個進(jìn)程通過event flag 就可以控制、協(xié)調(diào)各子進(jìn)程運行。

Event object的使用方法:
1)主函數(shù): 創(chuàng)建1個event 對象, flag = multiprocessing.Event() , 做為參數(shù)傳給各子進(jìn)程
2) 子進(jìn)程A: 不受event影響,通過event 控制其它進(jìn)程的運行
o 先clear(),將event 置為False, 占用運行權(quán).
o 完成工作后,用set()把flag置為True。
3) 子進(jìn)程B, C: 受event 影響
o 設(shè)置 wait() 狀態(tài),暫停運行
o 直到flag重新變?yōu)門rue,恢復(fù)運行

主要方法:

  • set(), clear()設(shè)置 True/False,
  • wait() 使進(jìn)程等待,直到flag被改為true.
  • is_set(), Return True if and only if the internal flag is true.

驗證進(jìn)程間通信 – Event

import multiprocessing
import time
import random

def joo_a(q, ev):
    print("subprocess joo_a start")
    if not ev.is_set():
        ev.wait()
    q.put(random.randint(1, 100))
    print("subprocess joo_a ended")

def joo_b(q, ev):
    print("subprocess joo_b start")
    ev.clear()
    time.sleep(2)
    q.put(random.randint(200, 300))
    ev.set()
    print("subprocess joo_b ended")

def main_event():
    qu = multiprocessing.Queue()
    ev = multiprocessing.Event()
    sub_a = multiprocessing.Process(target=joo_a, args=(qu, ev))
    sub_b = multiprocessing.Process(target=joo_b, args=(qu, ev,))
    sub_a.start()
    sub_b.start()
    # ev.set()
    sub_a.join()
    sub_b.join()
    while not qu.empty():
        print(qu.get())

if __name__ == "__main__":
    main_event()

5、共享內(nèi)存方式通信

(1) 共享變量

子進(jìn)程之間共存內(nèi)存變量,要用 multiprocessing.Value(), Array() 來定義變量。 實際上是ctypes 類型,由multiprocessing.sharedctypes模塊提供相關(guān)功能

注意 使用 share memory 要考慮同搶等問題,釋放等問題,需要手工實現(xiàn)。因此在使用共享變量時,建議使用Manager管程來管理這些共享變量。

def  func(num):
    num.value=10.78   #子進(jìn)程改變數(shù)值的值,主進(jìn)程跟著改變
 
if  __name__=="__main__":
num = multiprocessing.Value("d", 10.0) 
# d表示數(shù)值,主進(jìn)程與子進(jìn)程可共享這個變量。

    p=multiprocessing.Process(target=func,args=(num,))
    p.start()
    p.join()
 
    print(num.value)

進(jìn)程之間共享數(shù)據(jù)(數(shù)組型):

import multiprocessing
 
def  func(num):
    num[2]=9999   #子進(jìn)程改變數(shù)組,主進(jìn)程跟著改變
 
if  __name__=="__main__":
    num=multiprocessing.Array("i",[1,2,3,4,5])   

    p=multiprocessing.Process(target=func,args=(num,))
    p.start() 
    p.join()
 
    print(num[:])

(2) 共享內(nèi)存 Shared_memory

如果進(jìn)程間需要共享對象數(shù)據(jù),或共享內(nèi)容,數(shù)據(jù)較大,multiprocessing 提供了SharedMemory類來實現(xiàn)進(jìn)程間實時通信,不需要通過發(fā)消息,讀寫磁盤文件來實現(xiàn),速度更快。
注意:直接使用SharedMemory 存在著同搶、泄露隱患,應(yīng)通過SharedMemory Manager 管程類來使用, 以確保內(nèi)存安全。

創(chuàng)建共享內(nèi)存區(qū):

multiprocessing.shared_memory.SharedMemory(name=none, create=False, size=0)

方法:
父進(jìn)程創(chuàng)建shared_memory 后,子進(jìn)程可以使用它,當(dāng)不再需要后,使用close(), 刪除使用unlink()方法
相關(guān)屬性:
獲取內(nèi)存區(qū)內(nèi)容: shm.buf
獲取內(nèi)存區(qū)名稱: shm.name
獲取內(nèi)存區(qū)字節(jié)數(shù): shm.size

示例:

>>> from multiprocessing import shared_memory
>>> shm_a = shared_memory.SharedMemory(create=True, size=10)
>>> type(shm_a.buf)
<class 'memoryview'>
>>> buffer = shm_a.buf
>>> len(buffer)
10
>>> buffer[:4] = bytearray([22, 33, 44, 55])  # Modify multiple at once
>>> buffer[4] = 100                           # Modify single byte at a time
>>> # Attach to an existing shared memory block
>>> shm_b = shared_memory.SharedMemory(shm_a.name)
>>> import array
>>> array.array('b', shm_b.buf[:5])  # Copy the data into a new array.array
array('b', [22, 33, 44, 55, 100])
>>> shm_b.buf[:5] = b'howdy'  # Modify via shm_b using bytes
>>> bytes(shm_a.buf[:5])      # Access via shm_a
b'howdy'
>>> shm_b.close()   # Close each SharedMemory instance
>>> shm_a.close()
>>> shm_a.unlink()  # Call unlink only once to release the shared memory

3) ShareableList 共享列表

sharedMemory類還提供了1個共享列表類型,這樣就更方便了,進(jìn)程間可以直接共享python強(qiáng)大的列表
構(gòu)建方法:
multiprocessing.shared_memory.ShareableList(sequence=None, *, name=None)

from multiprocessing import shared_memory
>>> a = shared_memory.ShareableList(['howdy', b'HoWdY', -273.154, 100, None, True, 42])
>>> [ type(entry) for entry in a ]
[<class 'str'>, <class 'bytes'>, <class 'float'>, <class 'int'>, <class 'NoneType'>, <class 'bool'>, <class 'int'>]
>>> a[2]
-273.154
>>> a[2] = -78.5
>>> a[2]
-78.5
>>> a[2] = 'dry ice'  # Changing data types is supported as well
>>> a[2]
'dry ice'
>>> a[2] = 'larger than previously allocated storage space'
Traceback (most recent call last):
  ...
ValueError: exceeds available storage for existing str
>>> a[2]
'dry ice'
>>> len(a)
7
>>> a.index(42)
6
>>> a.count(b'howdy')
0
>>> a.count(b'HoWdY')
1
>>> a.shm.close()
>>> a.shm.unlink()
>>> del a  # Use of a ShareableList after call to unlink() is unsupported


b = shared_memory.ShareableList(range(5))         # In a first process
>>> c = shared_memory.ShareableList(name=b.shm.name)  # In a second process
>>> c
ShareableList([0, 1, 2, 3, 4], name='...')
>>> c[-1] = -999
>>> b[-1]
-999
>>> b.shm.close()
>>> c.shm.close()
>>> c.shm.unlink()

6、共享內(nèi)存管理器Manager

Multiprocessing 提供了 Manager 內(nèi)存管理器類,當(dāng)調(diào)用1個Manager實例對象的start()方法時,會創(chuàng)建1個manager進(jìn)程,其唯一目的就是管理共享內(nèi)存, 避免出現(xiàn)進(jìn)程間共享數(shù)據(jù)不同步,內(nèi)存泄露等現(xiàn)象。

其原理如下:

在這里插入圖片描述

Manager管理器相當(dāng)于提供了1個共享內(nèi)存的服務(wù),不僅可以被主進(jìn)程創(chuàng)建的多個子進(jìn)程使用,還可以被其它進(jìn)程訪問,甚至跨網(wǎng)絡(luò)訪問。本文僅聚焦于由單一主進(jìn)程創(chuàng)建的各進(jìn)程之間的通信。

1) Manager的主要數(shù)據(jù)結(jié)構(gòu)

相關(guān)類:multiprocessing.Manager
子類有:

  • multiprocessing.managers.SharedMemoryManager
  • multiprocessing.managers.BaseManager

支持共享變量類型:

  • python基本類型 int, str, list, tuple, list
  • 進(jìn)程通信對象: Queue, Lock, Event,
  • Condition, Semaphore, Barrier ctypes類型: Value, Array

2) 使用步驟

1)創(chuàng)建管理器對象

snm = Manager()
snm = SharedMemoryManager()

2)創(chuàng)建共享內(nèi)存變量
新建list, dict

sl = snm.list(), snm.dict()

新建1塊bytes共享內(nèi)存變量,需要指定大小

sx = snm.SharedMemory(size)

新建1個共享列表變量,可用列表來初始化

sl = snm.ShareableList(sequence) 如
sl = smm.ShareableList([‘howdy', b'HoWdY', -273.154, 100, True])

新建1個queue, 使用multiprocessing 的Queue類型

snm = Manager()
q = snm.Queue()

示例 :

from multiprocessing import Process, Manager

def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(10))

        p = Process(target=f, args=(d, l))
        p.start()
        p.join()

        print(d)
        print(l)

將打印

{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

3) 銷毀共享內(nèi)存變量

方法一:
調(diào)用snm.shutdown()方法,會自動調(diào)用每個內(nèi)存塊的unlink()方法釋放內(nèi)存。或者 snm.close()
方法二
使用with語句,結(jié)束后會自動釋放所有manager變量

>>> with SharedMemoryManager() as smm:
...     sl = smm.ShareableList(range(2000))
...     # Divide the work among two processes, storing partial results in sl
...     p1 = Process(target=do_work, args=(sl, 0, 1000))
...     p2 = Process(target=do_work, args=(sl, 1000, 2000))
...     p1.start()
...     p2.start()  # A multiprocessing.Pool might be more efficient
...     p1.join()
...     p2.join()   # Wait for all work to complete in both processes
...     total_result = sum(sl)  # Consolidate the partial results now in sl

4) 向管理器注冊自定義類型

managers的子類BaseManager提供register()方法,支持注冊自定義數(shù)據(jù)類型。如下例,注冊1個自定義MathsClass類,并生成實例。

from multiprocessing.managers import BaseManager

class MathsClass:
    def add(self, x, y):
        return x + y
    def mul(self, x, y):
        return x * y

class MyManager(BaseManager):
    pass

MyManager.register('Maths', MathsClass)

if __name__ == '__main__':
    with MyManager() as manager:
        maths = manager.Maths()
        print(maths.add(4, 3))         # prints 7
        print(maths.mul(7, 8))  

7、總結(jié)

Python多進(jìn)程(multiprocessing) 編程是繞開GIL提升程序性能的重要方式,進(jìn)程間通信方式包括消息機(jī)制(pipe, queue)、同步機(jī)制( Lock, Event) 、Shared Memory(Value, Array, Shared_Memory, etc)等。

直接使用Shared Memory共享內(nèi)存是不安全的,Multiprocessing.Manager模塊提供了安全管理共享內(nèi)存變量的管理器功能。

在實際編程時,根據(jù)主進(jìn)程與子進(jìn)程,子進(jìn)程之間所要交換數(shù)據(jù)的類型、大小,頻度、實時性等需求,來選擇適合的通信方式。

到此這篇關(guān)于Python multiprocessing 進(jìn)程間通信方式實現(xiàn)的文章就介紹到這了,更多相關(guān)Python multiprocessing 進(jìn)程間通信內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Windows環(huán)境中Python應(yīng)用服務(wù)自啟動及其監(jiān)控問題

    Windows環(huán)境中Python應(yīng)用服務(wù)自啟動及其監(jiān)控問題

    這篇文章主要介紹了Windows環(huán)境中Python應(yīng)用服務(wù)自啟動及其監(jiān)控問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2024-06-06
  • python 函數(shù)傳參之傳值還是傳引用的分析

    python 函數(shù)傳參之傳值還是傳引用的分析

    這篇文章主要介紹了python 函數(shù)傳參之傳值還是傳引用的分析,需要的朋友可以參考下
    2017-09-09
  • Python 繪圖和可視化詳細(xì)介紹

    Python 繪圖和可視化詳細(xì)介紹

    這篇文章主要介紹了Python 繪圖和可視化詳細(xì)介紹的相關(guān)資料,需要的朋友可以參考下
    2017-02-02
  • Python 完美解決 Import “模塊“ could not be resolved ...的問題

    Python 完美解決 Import “模塊“ could not&n

    這篇文章主要介紹了Python 完美解決 Import “模塊“ could not be resolved ...,本文給大家分享問題原因及解決方法,需要的朋友可以參考下
    2022-11-11
  • python實現(xiàn)線程池的方法

    python實現(xiàn)線程池的方法

    這篇文章主要介紹了python實現(xiàn)線程池的方法,實例分析了Python線程池的原理與相關(guān)實現(xiàn)技巧,需要的朋友可以參考下
    2015-06-06
  • Scrapy框架實現(xiàn)的登錄網(wǎng)站操作示例

    Scrapy框架實現(xiàn)的登錄網(wǎng)站操作示例

    這篇文章主要介紹了Scrapy框架實現(xiàn)的登錄網(wǎng)站操作,結(jié)合實例形式分析了Scrapy登錄網(wǎng)站cookies方式、post請求方式登錄網(wǎng)站相關(guān)實現(xiàn)技巧,需要的朋友可以參考下
    2020-02-02
  • Python簡單實現(xiàn)網(wǎng)頁內(nèi)容抓取功能示例

    Python簡單實現(xiàn)網(wǎng)頁內(nèi)容抓取功能示例

    這篇文章主要介紹了Python簡單實現(xiàn)網(wǎng)頁內(nèi)容抓取功能,結(jié)合實例形式分析了Python基于urllib模塊的網(wǎng)頁請求、內(nèi)容讀取等相關(guān)操作技巧,需要的朋友可以參考下
    2018-06-06
  • Python3的進(jìn)程和線程你了解嗎

    Python3的進(jìn)程和線程你了解嗎

    這篇文章主要為大家詳細(xì)介紹了Python3進(jìn)程和線程,文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來幫助
    2022-03-03
  • python二維圖制作的實例代碼

    python二維圖制作的實例代碼

    這篇文章主要給大家介紹了關(guān)于python二維圖制作的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-12-12
  • Python SQLAlchemy簡介及基本用法

    Python SQLAlchemy簡介及基本用法

    SQLAlchemy是一個基于Python實現(xiàn)的ORM對象關(guān)系映射框架,該框架建立在DB API之上,使用關(guān)系對象映射進(jìn)行數(shù)據(jù)庫操作,這篇文章主要介紹了SQLAlchemy簡介以及基本使用,需要的朋友可以參考下
    2023-08-08

最新評論