欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Python高級(jí)編程之消息隊(duì)列(Queue)與進(jìn)程池(Pool)實(shí)例詳解

 更新時(shí)間:2019年11月01日 10:04:15   作者:HMMHMH  
這篇文章主要介紹了Python高級(jí)編程之消息隊(duì)列(Queue)與進(jìn)程池(Pool),結(jié)合實(shí)例形式詳細(xì)分析了Python消息隊(duì)列與進(jìn)程池的相關(guān)原理、使用技巧與操作注意事項(xiàng),需要的朋友可以參考下

本文實(shí)例講述了Python高級(jí)編程之消息隊(duì)列(Queue)與進(jìn)程池(Pool)。分享給大家供大家參考,具體如下:

Queue消息隊(duì)列

1.創(chuàng)建

import multiprocessing
queue = multiprocessing.Queue(隊(duì)列長(zhǎng)度)

2.方法

方法 描述
put 變量名.put(數(shù)據(jù)),放入數(shù)據(jù)(如隊(duì)列已滿,則程序進(jìn)入阻塞狀態(tài),等待隊(duì)列取出后再放入)
put_nowait 變量名.put_nowati(數(shù)據(jù)),放入數(shù)據(jù)(如隊(duì)列已滿,則不等待隊(duì)列信息取出后再放入,直接報(bào)錯(cuò))
get 變量名.get(數(shù)據(jù)),取出數(shù)據(jù)(如隊(duì)列為空,則程序進(jìn)入阻塞狀態(tài),等待隊(duì)列防如數(shù)據(jù)后再取出)
get_nowait 變量名.get_nowait(數(shù)據(jù)),取出數(shù)據(jù)(如隊(duì)列為空,則不等待隊(duì)列放入信息后取出數(shù)據(jù),直接報(bào)錯(cuò)),放入數(shù)據(jù)后立馬判斷是否為空有時(shí)為True,原因是放入值和判斷同時(shí)進(jìn)行
qsize 變量名.qsize(),消息數(shù)量
empty 變量名.empty()(返回值為True或False),判斷是否為空
full 變量名.full()(返回值為True或False),判斷是否為滿

3.進(jìn)程通信

因?yàn)檫M(jìn)程間不共享全局變量,所以使用Queue進(jìn)行數(shù)據(jù)通信,可以在父進(jìn)程中創(chuàng)建兩個(gè)字進(jìn)程,一個(gè)往Queue里寫數(shù)據(jù),一個(gè)從Queue里取出數(shù)據(jù)。
例:

import multiprocessing
import time
def write_queue(queue):
  # 循環(huán)寫入數(shù)據(jù)
  for i in range(10):
    if queue.full():
      print("隊(duì)列已滿!")
      break
    # 向隊(duì)列中放入消息
    queue.put(i)
    print(i)
    time.sleep(0.5)
def read_queue(queue):
  # 循環(huán)讀取隊(duì)列消息
  while True:
    # 隊(duì)列為空,停止讀取
    if queue.empty():
      print("---隊(duì)列已空---")
      break
    # 讀取消息并輸出
    result = queue.get()
    print(result)
if __name__ == '__main__':
  # 創(chuàng)建消息隊(duì)列
  queue = multiprocessing.Queue(3)
  # 創(chuàng)建子進(jìn)程
  p1 = multiprocessing.Process(target=write_queue, args=(queue,))
  p1.start()
  # 等待p1寫數(shù)據(jù)進(jìn)程執(zhí)行結(jié)束后,再往下執(zhí)行
  p1.join()
  p1 = multiprocessing.Process(target=read_queue, args=(queue,))
  p1.start()

執(zhí)行結(jié)果:

Pool進(jìn)程池

初始化Pool時(shí),可以指定一個(gè)最大進(jìn)程數(shù),當(dāng)有新的請(qǐng)求提交到Pool中時(shí),如果池還沒有滿,那么就會(huì)創(chuàng)建一個(gè)新的進(jìn)程用來執(zhí)行該請(qǐng)求;但如果池中的進(jìn)程數(shù)已經(jīng)達(dá)到指定的最大值,那么該請(qǐng)求就會(huì)等待,直到池中有進(jìn)程結(jié)束,才會(huì)用之前的進(jìn)程來執(zhí)行新的任務(wù)。

1.創(chuàng)建

import multiprocessing
pool = multiprocessing.Pool(最大進(jìn)程數(shù))

2.方法

方法 描述
apply() 以同步方式添加進(jìn)程
apply_async() 以異步方式添加進(jìn)程
close() 關(guān)閉Pool,使其不接受新任務(wù)(還可以使用)
terminate() 不管任務(wù)是否完成,立即終止
join() 主進(jìn)程阻塞,等待子進(jìn)程的退出,必須在close和terminate后使用

3.進(jìn)程池內(nèi)通信

創(chuàng)建進(jìn)程池內(nèi)Queue消息隊(duì)列通信

import multiprocessing 
Queue:queue = multiprocessing.Manager().Queue() 

例:

import multiprocessing 
import time

寫入數(shù)據(jù)的方法

def write_data(queue): 
# for循環(huán) 向消息隊(duì)列中寫入值 
for i in range(5): 
# 添加消息 
queue.put(i) 
print(i) 
time.sleep(0.2) 
print("隊(duì)列已滿~")

創(chuàng)建讀取數(shù)據(jù)的方法

def read_data(queue):
  # 循環(huán)讀取數(shù)據(jù)
  while True:
    # 判斷隊(duì)列是否為空
    if queue.qsize() == 0:
      print("隊(duì)列為空~")
      break
    # 從隊(duì)列中讀取數(shù)據(jù)
    result = queue.get()
    print(result)
if __name__ == '__main__':
  # 創(chuàng)建進(jìn)程池
  pool = multiprocessing.Pool(2)
  # 創(chuàng)建進(jìn)程池隊(duì)列
  queue = multiprocessing.Manager().Queue()
  # 在進(jìn)程池中的進(jìn)程間進(jìn)行通信
  # 使用線程池同步的方式,先寫后讀
  # pool.apply(write_data, (queue, ))
  # pool.apply(read_data, (queue, ))
  # apply_async() 返回ApplyResult 對(duì)象
  result = pool.apply_async(write_data, (queue, ))
  # ApplyResult對(duì)象的wait() 方法,表示后續(xù)進(jìn)程必須等待當(dāng)前進(jìn)程執(zhí)行完再繼續(xù)
  result.wait()
  pool.apply_async(read_data, (queue, ))
  pool.close()
  # 異步后,主線程不再等待子進(jìn)程執(zhí)行結(jié)束,再結(jié)束
  # join() 后,表示主線程會(huì)等待子進(jìn)程執(zhí)行結(jié)束后,再結(jié)束
  pool.join()

運(yùn)行結(jié)果:

 

4.案例(文件夾copy器)

代碼:

# 導(dǎo)入模塊
import os
import multiprocessing
# 拷貝文件函數(shù)
def copy_dir(file_name, source_dir, desk_dir):
  # 要拷貝的文件路徑
  source_path = source_dir+'/'+file_name
  # 目標(biāo)路徑
  desk_path = desk_dir+'/'+file_name
  # 獲取文件大小
  file_size = os.path.getsize(source_path)
  # 記錄拷貝次數(shù)
  i = 0
  # 以二進(jìn)制度讀方式打開原文件
  with open(source_path, "rb") as source_file:
    # 以二進(jìn)制寫入方式創(chuàng)建并打開目標(biāo)文件
    with open(desk_path, "wb") as desk_file:
      # 循環(huán)寫入
      while True:
        # 讀取1024字節(jié)
        file_data = source_file.read(1024)
        # 如果讀到的不為空,則將讀到的寫入目標(biāo)文件
        if file_data:
          desk_file.write(file_data)
          # 讀取次數(shù)+1
          i += 1
          # 拷貝百分比進(jìn)度等于拷貝次數(shù)*1024*100/文件大小
          n = i*102400/file_size
          if n >= 100:
            n = 100
          print(file_name, "拷貝進(jìn)度%.2f%%" % n)
        else:
          print(file_name, "拷貝成功")
          break
if __name__ == '__main__':
  # 要拷貝的文件夾
  source_dir = 'test'
  # 要拷貝到的路徑
  desk_dir = 'C:/Users/Administrator/Desktop/'+source_dir
  # 存在文件夾則不創(chuàng)建
  try:
    os.mkdir(desk_dir)
  except:
    print("目標(biāo)文件夾已存在,未創(chuàng)建")
  # 獲取文件夾內(nèi)文件目錄,存到列表里
  file_list = os.listdir(source_dir)
  print(file_list)
  # 創(chuàng)建進(jìn)程池,最多同時(shí)運(yùn)行3個(gè)子進(jìn)程
  pool = multiprocessing.Pool(3)
  for file_name in file_list:
    # 異步方式添加到進(jìn)程池內(nèi)
    pool.apply_async(copy_dir, args=(file_name, source_dir, desk_dir))
  # 關(guān)閉進(jìn)程池(停止添加,已添加的還可運(yùn)行)
  pool.close()
  # 讓主進(jìn)程阻塞,等待子進(jìn)程結(jié)束
  pool.join()

運(yùn)行結(jié)果:

更多關(guān)于Python相關(guān)內(nèi)容感興趣的讀者可查看本站專題:《Python進(jìn)程與線程操作技巧總結(jié)》、《Python數(shù)據(jù)結(jié)構(gòu)與算法教程》、《Python函數(shù)使用技巧總結(jié)》、《Python字符串操作技巧匯總》、《Python入門與進(jìn)階經(jīng)典教程》、《Python+MySQL數(shù)據(jù)庫(kù)程序設(shè)計(jì)入門教程》及《Python常見數(shù)據(jù)庫(kù)操作技巧匯總

希望本文所述對(duì)大家Python程序設(shè)計(jì)有所幫助。

相關(guān)文章

最新評(píng)論