欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Python中的線程操作模塊(oncurrent)

 更新時(shí)間:2022年05月30日 14:26:46   作者:springsnow  
這篇文章介紹了Python中的線程操作模塊(oncurrent),文中通過(guò)示例代碼介紹的非常詳細(xì)。對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下

進(jìn)程是cpu資源分配的最小單元,一個(gè)進(jìn)程中可以有多個(gè)線程。

線程是cpu計(jì)算的最小單元。

對(duì)于Python來(lái)說(shuō)他的進(jìn)程和線程和其他語(yǔ)言有差異,是有GIL鎖。

GIL鎖

GIL鎖保證一個(gè)進(jìn)程中同一時(shí)刻只有一個(gè)線程被cpu調(diào)度。

GIL鎖,全局解釋器鎖。用于限制一個(gè)進(jìn)程中同一時(shí)刻只有一個(gè)線程被cpu調(diào)度。
擴(kuò)展:默認(rèn)GIL鎖在執(zhí)行100個(gè)cpu指令(過(guò)期時(shí)間)。
查看GIL切換的指令個(gè)數(shù)

import sys
v1 = sys。getcheckinterval()
print(v1)

一、通過(guò)threading.Thread類創(chuàng)建線程

1、 創(chuàng)建線程的方式:直接使用Thread

from threading import Thread 
import time 

def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('nick',))
    t.start()
    print('主線程')

2、 創(chuàng)建線程的方式:繼承Thread

from threading import Thread
import time
class Sayhi(Thread):
    def __init__(self,name):
        super().__init__()
        self.name=name
def run(self):
        time.sleep(2)
        print('%s say hello' % self.name)


if __name__ == '__main__':
    t = Sayhi('nick')
    t.start()
    print('主線程')

二、多線程與多進(jìn)程

1、 pid的比較

from threading import Thread
from multiprocessing import Process
import os

def work():
    print('hello',os.getpid())

if __name__ == '__main__':
    # part1:在主進(jìn)程下開啟多個(gè)線程,每個(gè)線程都跟主進(jìn)程的pid一樣
    t1=Thread(target=work)
    t2=Thread(target=work)
    t1.start()
    t2.start()
    print('主線程/主進(jìn)程pid',os.getpid())

    # part2:開多個(gè)進(jìn)程,每個(gè)進(jìn)程都有不同的pid
    p1=Process(target=work)
    p2=Process(target=work)
    p1.start()
    p2.start()
    print('主線程/主進(jìn)程pid',os.getpid())

2、 開啟效率的較量

from threading import Thread
from multiprocessing import Process
import os

def work():
    print('hello')

if __name__ == '__main__':
    # 在主進(jìn)程下開啟線程
    t=thread(target=work)
    t.start()
    print('主線程/主進(jìn)程')
    '''
    打印結(jié)果:
    hello
    主線程/主進(jìn)程
    '''

    # 在主進(jìn)程下開啟子進(jìn)程
    t=Process(target=work)
    t.start()
    print('主線程/主進(jìn)程')
    '''
    打印結(jié)果:
    主線程/主進(jìn)程
    hello
    '''

3、 內(nèi)存數(shù)據(jù)的共享問(wèn)題

from  threading import Thread
from multiprocessing import Process
import os
def work():
    global n
    n=0

if __name__ == '__main__':
    # n=100
    # p=Process(target=work)
    # p.start()
    # p.join()
    # print('主',n) # 毫無(wú)疑問(wèn)子進(jìn)程p已經(jīng)將自己的全局的n改成了0,但改的僅僅是它自己的,查看父進(jìn)程的n仍然為100


    n=1
    t=Thread(target=work)
    t.start()
    t.join()
    print('主',n) # 查看結(jié)果為0,因?yàn)橥贿M(jìn)程內(nèi)的線程之間共享進(jìn)程內(nèi)的數(shù)據(jù)

三、Thread類的其他方法

Thread實(shí)例對(duì)象的方法:

  • isAlive():返回線程是否活動(dòng)的。
  • getName():返回線程名。
  • setName():設(shè)置線程名。

threading模塊提供的一些方法:

  • threading.currentThread():返回當(dāng)前的線程變量。
  • threading.enumerate():返回一個(gè)包含正在運(yùn)行的線程的list。正在運(yùn)行指線程啟動(dòng)后、結(jié)束前,不包括啟動(dòng)前和終止后的線程。
  • threading.activeCount():返回正在運(yùn)行的線程數(shù)量,與len(threading.enumerate())有相同的結(jié)果。

1、 代碼示例

from threading import Thread
import threading
from multiprocessing import Process
import os

def work():
    import time
    time.sleep(3)
    print(threading.current_thread().getName())


if __name__ == '__main__':
    # 在主進(jìn)程下開啟線程
    t=Thread(target=work)
    t.start()

    print(threading.current_thread().getName())
    print(threading.current_thread())
 # 主線程
    print(threading.enumerate())
 # 連同主線程在內(nèi)有兩個(gè)運(yùn)行的線程
    print(threading.active_count())
    print('主線程/主進(jìn)程')

    '''
    打印結(jié)果:
    MainThread
    <_MainThread(MainThread, started 140735268892672)>
    [<_MainThread(MainThread, started 140735268892672)>, <Thread(Thread-1, started 123145307557888)>]
    主線程/主進(jìn)程
    Thread-1
    '''

2、 join方法

from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('nick',))
    t.start()
    t.join()
    print('主線程')
    print(t.is_alive())
'''
    nick say hello
    主線程
    False
    '''

四、多線程實(shí)現(xiàn)socket

import multiprocessing
import threading

import socket
s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.bind(('127.0.0.1',8080))
s.listen(5)

def action(conn):
    while True:
        data=conn.recv(1024)
        print(data)
        conn.send(data.upper())

if __name__ == '__main__':

    while True:
        conn,addr=s.accept()

        p=threading.Thread(target=action,args=(conn,))
        p.start()

五、守護(hù)線程

無(wú)論是進(jìn)程還是線程,都遵循:守護(hù)xx會(huì)等待主xx運(yùn)行完畢后被銷毀。需要強(qiáng)調(diào)的是:運(yùn)行完畢并非終止運(yùn)行。

  • 對(duì)主進(jìn)程來(lái)說(shuō),運(yùn)行完畢指的是主進(jìn)程代碼運(yùn)行完畢
  • 對(duì)主線程來(lái)說(shuō),運(yùn)行完畢指的是主線程所在的進(jìn)程內(nèi)所有非守護(hù)線程統(tǒng)統(tǒng)運(yùn)行完畢,主線程才算運(yùn)行完畢

1、 詳細(xì)解釋

  • 主進(jìn)程在其代碼結(jié)束后就已經(jīng)算運(yùn)行完畢了(守護(hù)進(jìn)程在此時(shí)就被回收),然后主進(jìn)程會(huì)一直等非守護(hù)的子進(jìn)程都運(yùn)行完畢后回收子進(jìn)程的資源(否則會(huì)產(chǎn)生僵尸進(jìn)程),才會(huì)結(jié)束。

  • 主線程在其他非守護(hù)線程運(yùn)行完畢后才算運(yùn)行完畢(守護(hù)線程在此時(shí)就被回收)。因?yàn)橹骶€程的結(jié)束意味著進(jìn)程的結(jié)束,進(jìn)程整體的資源都將被回收,而進(jìn)程必須保證非守護(hù)線程都運(yùn)行完畢后才能結(jié)束。

2、 守護(hù)線程例

from threading import Thread
import time


def foo():
    print(123)
    time.sleep(10)
    print("end123")


def bar():
    print(456)
    time.sleep(10)
    print("end456")


t1 = Thread(target=foo)
t2 = Thread(target=bar)

t1.daemon= True #必須在t.start()之前設(shè)置
# t1.setDaemon(True)

t1.start()
t2.start()
print("main-------")
print(t1.is_alive())
# 123
# 456
# main-------
# end456

六、同步鎖

1、 多個(gè)線程搶占資源的情況

from threading import Thread
import os,time
def work():
    

global n temp=n 
time.sleep(0.1) 
n=temp-1 

if __name__ == '__main__':
    n=100


    l=[]
    for i in range(100):
        p=Thread(target=work)
        l.append(p)
        p.start()
    for p in l:
        p.join()

    print(n) #結(jié)果可能為99

2、同步鎖的引用

對(duì)公共數(shù)據(jù)的操作

import threading

R=threading.Lock()
R.acquire()
'''
對(duì)公共數(shù)據(jù)的操作
'''
R.release()

3、實(shí)例

不加鎖:并發(fā)執(zhí)行,速度快,數(shù)據(jù)不安全

from threading import current_thread,Thread,Lock
import os,time
def task():
    global n
    print('%s is running' %current_thread().getName())
    temp=n
    time.sleep(0.5)
    n=temp-1


if __name__ == '__main__':
    n=100
    lock=Lock()
    threads=[]
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()

    stop_time=time.time()
    print('主:%s n:%s' %(stop_time-start_time,n))

'''
Thread-1 is running
Thread-2 is running
......
Thread-100 is running
主:0.5216062068939209 n:99
'''

加鎖:未加鎖部分并發(fā)執(zhí)行,加鎖部分串行執(zhí)行,速度慢,數(shù)據(jù)安全

from threading import current_thread,Thread,Lock
import os,time
def task():
    #未加鎖的代碼并發(fā)運(yùn)行
    time.sleep(3)
    print('%s start to run' %current_thread().getName())
    global n
    #加鎖的代碼串行運(yùn)行
    lock.acquire()
    temp=n
    time.sleep(0.5)
    n=temp-1
    lock.release()

if __name__ == '__main__':
    n=100
    lock=Lock()
    threads=[]
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    stop_time=time.time()
    print('主:%s n:%s' %(stop_time-start_time,n))

'''
Thread-1 is running
Thread-2 is running
......
Thread-100 is running
主:53.294203758239746 n:0
'''

七、死鎖與遞歸鎖

所謂死鎖:是指兩個(gè)或兩個(gè)以上的進(jìn)程或線程在執(zhí)行過(guò)程中,因爭(zhēng)奪資源而造成的一種互相等待的現(xiàn)象,若無(wú)外力作用,它們都將無(wú)法推進(jìn)下去。此時(shí)稱系統(tǒng)處于死鎖狀態(tài)或系統(tǒng)產(chǎn)生了死鎖,這些永遠(yuǎn)在互相等待的進(jìn)程稱為死鎖進(jìn)程,如下就是死鎖

1、 死鎖

from threading import Lock as Lock
import time

mutexA=Lock()
mutexA.acquire()
mutexA.acquire()
print(123)

mutexA.release()
mutexA.release()

解決方法:遞歸鎖,在Python中為了支持在同一線程中多次請(qǐng)求同一資源,python提供了可重入鎖RLock。

2、 遞歸鎖(可重入鎖)RLock

這個(gè)RLock內(nèi)部維護(hù)著一個(gè)Lock和一個(gè)counter變量,counter記錄了acquire的次數(shù),從而使得資源可以被多次require。直到一個(gè)線程所有的acquire都被release,其他的線程才能獲得資源。上面的例子如果使用RLock代替Lock,則不會(huì)發(fā)生死鎖。

from threading import RLock as Lock
import time

mutexA=Lock()
mutexA.acquire()
mutexA.acquire()
print(123)
mutexA.release()
mutexA.release()

3、典型問(wèn)題:科學(xué)家吃面

遞歸鎖解決死鎖問(wèn)題

import time
from threading import Thread,RLock

fork_lock = noodle_lock = RLock()
def eat1(name):
    noodle_lock.acquire()
    print('%s 搶到了面條'%name)
    fork_lock.acquire()
    print('%s 搶到了叉子'%name)
    print('%s 吃面'%name)
    fork_lock.release()
    noodle_lock.release()

def eat2(name):
    fork_lock.acquire()
    print('%s 搶到了叉子' % name)
    time.sleep(1)
    noodle_lock.acquire()
    print('%s 搶到了面條' % name)
    print('%s 吃面' % name)
    noodle_lock.release()
    fork_lock.release()

for name in ['哪吒','nick','tank']:
    t1 = Thread(target=eat1,args=(name,))
    t2 = Thread(target=eat2,args=(name,))
    t1.start()
    t2.start()

八、線程隊(duì)列

queue隊(duì)列:使用import queue,用法與進(jìn)程Queue一樣

當(dāng)必須在多個(gè)線程之間安全地交換信息時(shí),隊(duì)列在線程編程中特別有用。

1、先進(jìn)先出:Queue

通過(guò)雙向列表實(shí)現(xiàn)的

class queue.Queue(maxsize=0)

import queue

q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
結(jié)果(先進(jìn)先出):
first
second
third
'''

2、后進(jìn)先出:LifoQueue

通過(guò)堆實(shí)現(xiàn)

class queue.LifoQueue(maxsize=0)

import queue

q=queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
結(jié)果(后進(jìn)先出):
third
second
first
'''

3、存儲(chǔ)數(shù)據(jù)時(shí)可設(shè)置優(yōu)先級(jí)的隊(duì)列:PriorityQueue

PriorityQueue類和LifoQueue類繼承Queue類然后重寫了_init、_qsize、_put、_get這四個(gè)類的私有方法.

通過(guò)list來(lái)實(shí)現(xiàn)的。

class queue.PriorityQueue(maxsize=0)

優(yōu)先隊(duì)列的構(gòu)造函數(shù)。maxsize是一個(gè)整數(shù),它設(shè)置可以放置在隊(duì)列中的項(xiàng)數(shù)的上限。一旦達(dá)到此大小,插入將阻塞,直到隊(duì)列項(xiàng)被使用。如果maxsize小于或等于0,則隊(duì)列大小為無(wú)窮大。

import queue

q=queue.PriorityQueue()
#put進(jìn)入一個(gè)元組,元組的第一個(gè)元素是優(yōu)先級(jí)(通常是數(shù)字,也可以是非數(shù)字之間的比較),數(shù)字越小優(yōu)先級(jí)越高
q.put((20,'a'))
q.put((10,'b'))
q.put((30,'c'))

print(q.get())
print(q.get())
print(q.get())
'''
結(jié)果(數(shù)字越小優(yōu)先級(jí)越高,優(yōu)先級(jí)高的優(yōu)先出隊(duì)):
(10, 'b')
(20, 'a')
(30, 'c')
'''

更多方法說(shuō)明

  • __init__(self, maxsize=0) :初始化隊(duì)列長(zhǎng)度,maxsize為0的時(shí)候長(zhǎng)度為無(wú)限
  • empty(self) :返回隊(duì)列是否為空
  • full(self) :返回隊(duì)列是否為滿
  • qsize(self) :返回隊(duì)列的大小(并不可靠)
  • get(self, block=True, timeout=None) :從隊(duì)頭獲取并刪除元素,block為true:timeout為None時(shí)候,阻塞當(dāng)前線程直到隊(duì)列中有可用元素;timeout為非負(fù)時(shí)候,等了timeout的時(shí)間還沒(méi)有可用元素時(shí)候拋出一個(gè)Empty異常;block為false:timeout為None時(shí)候,隊(duì)列為空則拋出Empty異常;timeout為非負(fù)時(shí)候,等待timeout時(shí)候后沒(méi)有可用元素則拋出Empty異常。
  • get_nowait(self) :#返回self.get(block=False)
  • put(self, item, block=True, timeout=None): 在隊(duì)尾插入一個(gè)元素,block為true:timeout為None時(shí)候,阻塞當(dāng)前線程直到隊(duì)列中有可用位置;timeout為非負(fù)時(shí)候,等了timeout時(shí)間還沒(méi)有可用位置時(shí)候拋出一個(gè)Full異常;block為false:timeout為None時(shí)候,隊(duì)列沒(méi)有位置則拋出Full異常;timeout為非負(fù)時(shí)候,等待timeout時(shí)候后還是沒(méi)有可用位置則拋出Full異常。
  • put_nowait(self, item) :返回 self.put(item, block=False)
  • join(self) :阻塞當(dāng)前線程直到隊(duì)列的任務(wù)全部完成了
  • task_done(self) :通知隊(duì)列任務(wù)的完成情況,當(dāng)完成時(shí)候喚醒被join阻塞的線程

九、Python標(biāo)準(zhǔn)模塊——concurrent.futures

官方文檔:https://docs.python.org/dev/library/concurrent.futures.html

1、介紹

concurrent.futures模塊提供了高度封裝的異步調(diào)用接口:

  • ThreadPoolExecutor:線程池,提供異步調(diào)用
  • ProcessPoolExecutor:進(jìn)程池,提供異步調(diào)用

兩者都實(shí)現(xiàn)了由抽象Executor類定義的相同接口。

ThreadPoolExecutor(線程池)與ProcessPoolExecutor(進(jìn)程池)都是concurrent.futures模塊下的,主線程(或進(jìn)程)中可以獲取某一個(gè)線程(進(jìn)程)執(zhí)行的狀態(tài)或者某一個(gè)任務(wù)執(zhí)行的狀態(tài)及返回值。

通過(guò)submit返回的是一個(gè)future對(duì)象,它是一個(gè)未來(lái)可期的對(duì)象,通過(guò)它可以獲悉線程的狀態(tài)。

比較:

  • 1、線程不是越多越好,會(huì)涉及cpu上下文的切換(會(huì)把上一次的記錄保存)。
  • 2、進(jìn)程比線程消耗資源,進(jìn)程相當(dāng)于一個(gè)工廠,工廠里有很多人,里面的人共同享受著福利資源,,一個(gè)進(jìn)程里默認(rèn)只有一個(gè)主線程,比如:開啟程序是進(jìn)程,里面執(zhí)行的是線程,線程只是一個(gè)進(jìn)程創(chuàng)建多個(gè)人同時(shí)去工作。
  • 3、線程里有GIL全局解鎖器:不允許cpu調(diào)度
  • 4、計(jì)算密度型適用于多進(jìn)程
  • 5、線程:線程是計(jì)算機(jī)中工作的最小單元
  • 6、進(jìn)程:默認(rèn)有主線程 (幫工作)可以多線程共存
  • 7、協(xié)程:一個(gè)線程,一個(gè)進(jìn)程做多個(gè)任務(wù),使用進(jìn)程中一個(gè)線程去做多個(gè)任務(wù),微線程
  • 8、GIL全局解釋器鎖:保證同一時(shí)刻只有一個(gè)線程被cpu調(diào)度

2、基本方法

  • submit(fn, *args, **kwargs):異步提交任務(wù)
  • map(func, *iterables, timeout=None, chunksize=1):取代for循環(huán)submit的操作
  • shutdown(wait=True):相當(dāng)于進(jìn)程池的pool.close()+pool.join()操作
    wait=True,等待池內(nèi)所有任務(wù)執(zhí)行完畢回收完資源后才繼續(xù) ,
    wait=False,立即返回,并不會(huì)等待池內(nèi)的任務(wù)執(zhí)行完畢 ,
    但不管wait參數(shù)為何值,整個(gè)程序都會(huì)等到所有任務(wù)執(zhí)行完畢 ,submit和map必須在shutdown之前。
  • result(timeout=None):取得結(jié)果
  • add_done_callback(fn):回調(diào)函數(shù)
  • done():判斷某一個(gè)線程是否完成
  • cancle():取消某個(gè)任務(wù)

3、ProcessPoolExecutor、ThreadPoolExecutor線程池

ThreadPoolExecutor構(gòu)造實(shí)例的時(shí)候,傳入max_workers參數(shù)來(lái)設(shè)置線程中最多能同時(shí)運(yùn)行的線程數(shù)目 。

使用submit函數(shù)來(lái)提交線程需要執(zhí)行任務(wù)(函數(shù)名和參數(shù))到線程池中,并返回該任務(wù)的句柄(類似于文件、畫圖),注意submit()不是阻塞的,而是立即返回。

通過(guò)submit函數(shù)返回的任務(wù)句柄,能夠使用done()方法判斷該任務(wù)是否結(jié)束。

使用result()方法可以獲取任務(wù)的返回值,查看內(nèi)部代碼,發(fā)現(xiàn)這個(gè)方法是阻塞的。

對(duì)于頻繁的cpu操作,由于GIL鎖的原因,多個(gè)線程只能用一個(gè)cpu,這時(shí)多進(jìn)程的執(zhí)行效率要比多線程高。

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

import os,time,random
def task(n):
    print('%s is runing' %os.getpid())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':

    executor=ProcessPoolExecutor(max_workers=3)

    futures=[]
    for i in range(11):
        future=executor.submit(task,i)
        futures.append(future)
    executor.shutdown(True)
    print('+++>')
    for future in futures:
        print(
future.result())

4、過(guò)wait()判斷線程執(zhí)行的狀態(tài):

wait方法可以讓主線程阻塞,直到滿足設(shè)定的要求。

wait(fs, timeout=None, return_when=ALL_COMPLETED),wait接受3個(gè)參數(shù),

  • s表示執(zhí)行的task序列;
  • timeout表示等待的最長(zhǎng)時(shí)間,超過(guò)這個(gè)時(shí)間即使線程未執(zhí)行完成也將返回;
  • return_when表示wait返回結(jié)果的條件,默認(rèn)為ALL_COMPLETED全部執(zhí)行完成再返回
import time
from concurrent.futures import (
    ThreadPoolExecutor, wait
)


def get_thread_time(times):
    time.sleep(times)
    return times


start = time.time()
executor = ThreadPoolExecutor(max_workers=4)
task_list = [executor.submit(get_thread_time, times) for times in [1, 2, 3, 4]]
i = 1
for task in task_list:
    print("task{}:{}".format(i, task))
    i += 1
print(wait(task_list, timeout=2.5))

# wait在2.5秒后返回線程的狀態(tài),result:
# task1:<Future at 0x7ff3c885f208 state=running>
# task2:<Future at 0x7ff3c885fb00 state=running>
# task3:<Future at 0x7ff3c764b2b0 state=running>
# task4:<Future at 0x7ff3c764b9b0 state=running>
# DoneAndNotDoneFutures(
# done={<Future at 0x7ff3c885f208 state=finished returned int>, <Future at 0x7ff3c885fb00 state=finished returned int>},
# not_done={<Future at 0x7ff3c764b2b0 state=running>, <Future at 0x7ff3c764b9b0 state=running>})
#
# 可以看到在timeout 2.5時(shí),task1和task2執(zhí)行完畢,task3和task4仍在執(zhí)行中

4、map的用法

map(fn, *iterables, timeout=None),第一個(gè)參數(shù)fn是線程執(zhí)行的函數(shù);第二個(gè)參數(shù)接受一個(gè)可迭代對(duì)象;第三個(gè)參數(shù)timeout跟wait()的timeout一樣,但由于map是返回線程執(zhí)行的結(jié)果,如果timeout小于線程執(zhí)行時(shí)間會(huì)拋異常TimeoutError。

map的返回是有序的,它會(huì)根據(jù)第二個(gè)參數(shù)的順序返回執(zhí)行的結(jié)果:

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

import os,time,random
def task(n):
    print('%s is runing' %os.getpid())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':

    executor=ThreadPoolExecutor(max_workers=3)

    # for i in range(11):
    #     future=executor.submit(task,i)

    executor.map(task,range(1,12))

 #map取代了for+submit

5、s_completed返回線程執(zhí)行結(jié)果

上面雖然提供了判斷任務(wù)是否結(jié)束的方法,但是不能在主線程中一直判斷,有時(shí)候我們是得知某個(gè)任務(wù)結(jié)束了,就去獲取結(jié)果,而不是一直判斷每個(gè)任務(wù)有沒(méi)有結(jié)束。這是就可以使用as_completed方法一次取出所有任務(wù)的結(jié)果。

import time
from collections import OrderedDict
from concurrent.futures import (
    ThreadPoolExecutor, as_completed
)


def get_thread_time(times):
    time.sleep(times)
    return times


start = time.time()
executor = ThreadPoolExecutor(max_workers=4)
task_list = [executor.submit(get_thread_time, times) for times in [2, 3, 1, 4]]
task_to_time = OrderedDict(zip(["task1", "task2", "task3", "task4"],[2, 3, 1, 4]))
task_map = OrderedDict(zip(task_list, ["task1", "task2", "task3", "task4"]))

for result in as_completed(task_list):
    task_name = task_map.get(result)
    print("{}:{}".format(task_name,task_to_time.get(task_name)))

# task3: 1
# task1: 2
# task2: 3
# task4: 4

task1、task2、task3、task4的等待時(shí)間分別為2s、3s、1s、4s,通過(guò)as_completed返回執(zhí)行完的線程結(jié)果,as_completed(fs, timeout=None)接受2個(gè)參數(shù),第一個(gè)是執(zhí)行的線程列表,第二個(gè)參數(shù)timeout與map的timeout一樣,當(dāng)timeout小于線程執(zhí)行時(shí)間會(huì)拋異常TimeoutError。

通過(guò)執(zhí)行結(jié)果可以看出,as_completed返回的順序是線程執(zhí)行結(jié)束的順序,最先執(zhí)行結(jié)束的線程最早返回。

6、回調(diào)函數(shù)

Future對(duì)象也可以像協(xié)程一樣,當(dāng)它設(shè)置完成結(jié)果時(shí),就可以立即進(jìn)行回調(diào)別的函數(shù)。add_done_callback(fn),則表示 Futures 完成后,會(huì)調(diào)?fn函數(shù)。

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from multiprocessing import Pool
import requests
import json
import os

def get_page(url):
    print('<進(jìn)程%s> get %s' %(os.getpid(),url))
    respone=requests.get(url)
    if respone.status_code == 200:
        return {'url':url,'text':respone.text}

def parse_page(res):
    res=res.result()
    print('<進(jìn)程%s> parse %s' %(os.getpid(),res['url']))
    parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))
    with open('db.txt','a') as f:
        f.write(parse_res)


if __name__ == '__main__':
    urls=[
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/'
    ]

    # p=Pool(3)
    # for url in urls:
    #     p.apply_async(get_page,args=(url,),callback=pasrse_page)
    # p.close()
    # p.join()

    p=ProcessPoolExecutor(3)
    for url in urls:
        p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一個(gè)future對(duì)象obj,需要用obj.result()拿到結(jié)果

到此這篇關(guān)于Python線程操作模塊(oncurrent)的文章就介紹到這了。希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。

相關(guān)文章

  • Python使用win32com.client的方法示例

    Python使用win32com.client的方法示例

    本文主要介紹了Python使用win32com.client的方法示例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2023-02-02
  • python兩種遍歷字典(dict)的方法比較

    python兩種遍歷字典(dict)的方法比較

    這篇文章主要介紹了python兩種遍歷字典(dict)的方法比較,同時(shí)介紹了dict遍歷中帶括號(hào)與不帶括號(hào)的性能問(wèn)題,需要的朋友可以參考下
    2014-05-05
  • python自動(dòng)化之如何利用allure生成測(cè)試報(bào)告

    python自動(dòng)化之如何利用allure生成測(cè)試報(bào)告

    這篇文章主要給大家介紹了關(guān)于python自動(dòng)化之如何利用allure生成測(cè)試報(bào)告的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2021-05-05
  • python實(shí)現(xiàn)簡(jiǎn)單的聊天小程序

    python實(shí)現(xiàn)簡(jiǎn)單的聊天小程序

    這篇文章主要為大家詳細(xì)介紹了python實(shí)現(xiàn)簡(jiǎn)單的聊天小程序,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2021-07-07
  • Python 字符串去除空格的五種方法

    Python 字符串去除空格的五種方法

    這篇文章主要介紹了Python 字符串去除空格的五種方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2021-03-03
  • 深入了解python中的常見錯(cuò)誤類型與解決

    深入了解python中的常見錯(cuò)誤類型與解決

    在Python編程過(guò)程中,經(jīng)常會(huì)遇到各種錯(cuò)誤,了解這些錯(cuò)誤的類型以及如何處理它們是成為一位優(yōu)秀的Python開發(fā)者所必備的技能之一,下面就跟隨小編一起學(xué)習(xí)一下python中的常見錯(cuò)誤類型吧
    2023-11-11
  • Python?pandas.replace的用法詳解

    Python?pandas.replace的用法詳解

    在處理數(shù)據(jù)的時(shí)候,很多時(shí)候會(huì)遇到批量替換的情況,如果一個(gè)一個(gè)去修改效率過(guò)低,也容易出錯(cuò),replace()是很好的方法,下面這篇文章主要給大家介紹了關(guān)于Python?pandas.replace用法的相關(guān)資料,需要的朋友可以參考下
    2022-06-06
  • No module named 'plotly.graph_objects'報(bào)錯(cuò)解決

    No module named 'plotly.graph_objects&ap

    這篇文章主要為大家介紹了No module named 'plotly.graph_objects'報(bào)錯(cuò)解決,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-12-12
  • pd.drop_duplicates刪除重復(fù)行的方法實(shí)現(xiàn)

    pd.drop_duplicates刪除重復(fù)行的方法實(shí)現(xiàn)

    drop_duplicates 方法實(shí)現(xiàn)對(duì)數(shù)據(jù)框 DataFrame 去除特定列的重復(fù)行,本文主要介紹了pd.drop_duplicates刪除重復(fù)行的方法實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2022-06-06
  • pycharm設(shè)置當(dāng)前工作目錄的操作(working directory)

    pycharm設(shè)置當(dāng)前工作目錄的操作(working directory)

    今天小編就為大家分享一篇pycharm設(shè)置當(dāng)前工作目錄的操作(working directory),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2020-02-02

最新評(píng)論