python程序中的線程操作 concurrent模塊使用詳解
一、concurrent模塊的介紹
concurrent.futures模塊提供了高度封裝的異步調用接口
ThreadPoolExecutor:線程池,提供異步調用
ProcessPoolExecutor:進程池,提供異步調用
ProcessPoolExecutor 和 ThreadPoolExecutor:兩者都實現(xiàn)相同的接口,該接口由抽象Executor類定義。
二、基本方法
submit(fn, *args, **kwargs) :異步提交任務
map(func, *iterables, timeout=None, chunksize=1) :取代for循環(huán)submit的操作
shutdown(wait=True) :相當于進程池的pool.close()+pool.join()操作
- wait=True,等待池內所有任務執(zhí)行完畢回收完資源后才繼續(xù)
- wait=False,立即返回,并不會等待池內的任務執(zhí)行完畢
- 但不管wait參數(shù)為何值,整個程序都會等到所有任務執(zhí)行完畢
- submit和map必須在shutdown之前
result(timeout=None) :取得結果
add_done_callback(fn) :回調函數(shù)
三、進程池和線程池
池的功能:限制進程數(shù)或線程數(shù).
什么時候限制: 當并發(fā)的任務數(shù)量遠遠大于計算機所能承受的范圍,即無法一次性開啟過多的任務數(shù)量 我就應該考慮去限制我進程數(shù)或線程數(shù),從保證服務器不崩.
3.1 進程池
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Process,current_process
import time
def task(i):
print(f'{current_process().name} 在執(zhí)行任務{i}')
time.sleep(1)
if __name__ == '__main__':
pool = ProcessPoolExecutor(4) # 進程池里又4個進程
for i in range(20): # 20個任務
pool.submit(task,i)# 進程池里當前執(zhí)行的任務i,池子里的4個進程一次一次執(zhí)行任務
3.2 線程池
from concurrent.futures import ThreadPoolExecutor
from threading import Thread,currentThread
import time
def task(i):
print(f'{currentThread().name} 在執(zhí)行任務{i}')
time.sleep(1)
if __name__ == '__main__':
pool = ThreadPoolExecutor(4) # 進程池里又4個線程
for i in range(20): # 20個任務
pool.submit(task,i)# 線程池里當前執(zhí)行的任務i,池子里的4個線程一次一次執(zhí)行任務
四、Map的用法
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import os,time,random
def task(n):
print('%s is runing' %os.getpid())
time.sleep(random.randint(1,3))
return n**2
if __name__ == '__main__':
executor=ThreadPoolExecutor(max_workers=3)
# for i in range(20):
# future=executor.submit(task,i)
executor.map(task,range(1,21)) #map取代了for+submit
五、同步和異步
理解為提交任務的兩種方式
同步: 提交了一個任務,必須等任務執(zhí)行完了(拿到返回值),才能執(zhí)行下一行代碼
異步: 提交了一個任務,不要等執(zhí)行完了,可以直接執(zhí)行下一行代碼.
同步:相當于執(zhí)行任務的串行執(zhí)行
異步
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Process,current_process
import time
n = 1
def task(i):
global n
print(f'{current_process().name} 在執(zhí)行任務{i}')
time.sleep(1)
n += i
return n
if __name__ == '__main__':
pool = ProcessPoolExecutor(4) # 進程池里又4個線程
pool_lis = []
for i in range(20): # 20個任務
future = pool.submit(task,i)# 進程池里當前執(zhí)行的任務i,池子里的4個線程一次一次執(zhí)行任務
# print(future.result()) # 這是在等待我執(zhí)行任務得到的結果,如果一直沒有結果,這里會導致我們所有任務編程了串行
# 在這里就引出了下面的pool.shutdown()方法
pool_lis.append(future)
pool.shutdown(wait=True) # 關閉了池的入口,不允許在往里面添加任務了,會等帶所有的任務執(zhí)行完,結束阻塞
for p in pool_lis:
print(p.result())
print(n)# 這里一開始肯定是拿到0的,因為我只是去告訴操作系統(tǒng)執(zhí)行子進程的任務,代碼依然會繼續(xù)往下執(zhí)行
# 可以用join去解決,等待每一個進程結束后,拿到他的結果
六、回調函數(shù)
import time
from threading import Thread,currentThread
from concurrent.futures import ThreadPoolExecutor
def task(i):
print(f'{currentThread().name} 在執(zhí)行{i}')
time.sleep(1)
return i**2
# parse 就是一個回調函數(shù)
def parse(future):
# 處理拿到的結果
print(f'{currentThread().name} 結束了當前任務')
print(future.result())
if __name__ == '__main__':
pool = ThreadPoolExecutor(4)
for i in range(20):
future = pool.submit(task,i)
'''
給當前執(zhí)行的任務綁定了一個函數(shù),在當前任務結束的時候就會觸發(fā)這個函數(shù)(稱之為回調函數(shù))
會把future對象作為參數(shù)傳給函數(shù)
注:這個稱為回調函數(shù),當前任務處理結束了,就回來調parse這個函數(shù)
'''
future.add_done_callback(parse)
# add_done_callback (parse) parse是一個回調函數(shù)
# add_done_callback () 是對象的一個綁定方法,他的參數(shù)就是一個函數(shù)
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。
相關文章
Python入門教程(十七)Python的While循環(huán)
這篇文章主要介紹了Python入門教程(十七)Python的While循環(huán),Python是一門非常強大好用的語言,也有著易上手的特性,本文為入門教程,需要的朋友可以參考下2023-04-04
淺談python中字典append 到list 后值的改變問題
今天小編就為大家分享一篇淺談python中字典append 到list 后值的改變問題,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-05-05
Python使用?TCP協(xié)議實現(xiàn)智能聊天機器人功能
TCP協(xié)議適用于對效率要求相對較低而準確性要求很高的場合,下面通過本文給大家介紹基于Python?使用?TCP?實現(xiàn)智能聊天機器人,需要的朋友可以參考下2022-05-05

