Python全棧之協(xié)程詳解
1. 線程隊列
# ### 線程隊列 from queue import Queue """ put 存放 超出隊列長度阻塞 get 獲取 超出隊列長度阻塞 put_nowait 存放,超出隊列長度報錯 get_nowait 獲取,超出隊列長度報錯 """ # (1) Queue """先進(jìn)先出,后進(jìn)先出""" q = Queue() q.put(100) q.put(200) print(q.get()) # print(q.get()) # print(q.get()) 阻塞 # print(q.get_nowait()) # print(q.get_nowait()) 報錯 # Queue(3) => 指定隊列長度, 元素個數(shù)只能是3個; q2 = Queue(3) q2.put(1000) q2.put(2000) # q2.put(3000) # q2.put(4000) 阻塞 q2.put_nowait(6000) # q2.put_nowait(4000) 報錯 # (2) LifoQueue """先進(jìn)后出,后進(jìn)先出(棧的特點)""" from queue import LifoQueue lq = LifoQueue() lq.put(110) lq.put(120) lq.put(119) print(lq.get()) print(lq.get()) print(lq.get()) # (3) PriorityQueue """按照優(yōu)先級順序進(jìn)行排序存放(默認(rèn)從小到大)""" """在一個優(yōu)先級隊列中,要放同一類型的數(shù)據(jù),不能混合使用""" from queue import PriorityQueue pq = PriorityQueue() # 1.對數(shù)字進(jìn)行排序 pq.put(100) pq.put(19) pq.put(-90) pq.put(88) print(pq.get()) print(pq.get()) print(pq.get()) print(pq.get()) # 2.對字母進(jìn)行排序 (按照ascii編碼) pq.put("wangwen") pq.put("sunjian") pq.put('wangwei') pq.put("王文") pq.put("孫堅") pq.put('王維') print( pq.get() ) print( pq.get() ) print( pq.get() ) print( pq.get() ) print( pq.get() ) print( pq.get() ) # 3.對容器進(jìn)行排序 pq.put( (22,"wangwen") ) pq.put( (67,"wangyuhan") ) pq.put( (3,"anxiaodong") ) pq.put( (3,"liuyubo") ) print(pq.get()) print(pq.get()) print(pq.get()) print(pq.get()) # 4.注意點 pq.put(100) pq.put("nihao") pq.put( (1,2,3) )
2. 進(jìn)程池_線程池
知識點:
# 線程池 # 實例化線程池 ThreadPoolExcutor (推薦5*cpu_count) # 異步提交任務(wù) submit / map # 阻塞直到任務(wù)完成 shutdown # 獲取子線程的返回值 result # 使用回調(diào)函數(shù) add_done_callback # 線程池 是由子線程實現(xiàn)的 # 進(jìn)程池 是由主進(jìn)程實現(xiàn)的
程序?qū)崿F(xiàn):
# ### 進(jìn)程池 和 線程池 from concurrent.futures import ProcessPoolExecutor , ThreadPoolExecutor import os,time,random # 獲取的邏輯處理器 # print(os.cpu_count()) """多條進(jìn)程提前開辟,可觸發(fā)多cpu的并行效果""" ''' # (1) 進(jìn)程池 ProcessPoolExecutor def func(i): # print(i) time.sleep(random.uniform(0.1,0.8)) print(" 任務(wù)執(zhí)行中 ... start ... 進(jìn)程號{}".format(os.getpid()) , i ) print(" 任務(wù)執(zhí)行中 ... end ... 進(jìn)程號{}".format(os.getpid())) return i if __name__ == "__main__": lst = [] # (1) 創(chuàng)建進(jìn)程池對象 """默認(rèn)參數(shù)是 系統(tǒng)最大的邏輯核心數(shù) 4""" p = ProcessPoolExecutor() # (2) 異步提交任務(wù) """submit(任務(wù),參數(shù)1,參數(shù)2 ... )""" """默認(rèn)如果一個進(jìn)程短時間內(nèi)可以完成更多的任務(wù),進(jìn)程池就不會使用更多的進(jìn)程來輔助完成 , 可以節(jié)省系統(tǒng)資源的損耗;""" for i in range(10): obj = p.submit( func , i ) # print(obj) # print(obj.result()) 不要寫在這,導(dǎo)致程序同步,內(nèi)部有阻塞 lst.append(obj) # (3) 獲取當(dāng)前任務(wù)的返回值 for i in lst: print(i.result(),">===獲取返回值===?") # (4) shutdown 等待所有進(jìn)程池里的進(jìn)程執(zhí)行完畢之后,在放行 p.shutdown() print("進(jìn)程池結(jié)束 ... ") ''' # (2) ThreadPoolExecutor ''' # from threading import currentThread as ct from threading import current_thread as ct def func(i): print(" 任務(wù)執(zhí)行中 ... start ... 線程號{}".format( ct().ident ) , i ) time.sleep(1) print(" 任務(wù)執(zhí)行中 ... end ... 線程號{}".format(os.getpid())) return ct().ident # 線程號 if __name__ == "__main__": lst = [] setvar = set() """默認(rèn)參數(shù)是 系統(tǒng)最大的邏輯核心數(shù) 4 * 5 = 20""" # (1) 創(chuàng)建線程池對象 t = ThreadPoolExecutor() # 20 # print(t) # (2) 異步提交任務(wù) """默認(rèn)如果一個線程短時間內(nèi)可以完成更多的任務(wù),線程池就不會使用更多的線程來輔助完成 , 可以節(jié)省系統(tǒng)資源的損耗;""" for i in range(100): obj = t.submit(func,i) lst.append(obj) # (3) 獲取當(dāng)前任務(wù)的返回值 for i in lst: setvar.add(i.result()) # (4) shutdown 等待所有線程池里的線程執(zhí)行完畢之后,在放行 t.shutdown() print("主線程執(zhí)行結(jié)束 ... ") print(setvar , len(setvar)) ''' # (3) 線程池 map from threading import currentThread as ct from collections import Iterator,Iterable def func(i): time.sleep(random.uniform(0.1,0.7)) print("thread ... 線程號{}".format(ct().ident),i) return "*" * i if __name__ == "__main__": t = ThreadPoolExecutor() it = t.map(func,range(100)) # 返回的數(shù)據(jù)是迭代器 print(isinstance(it,Iterator)) # 協(xié)調(diào)子父線程,等待線程池中所有線程執(zhí)行完畢之后,在放行; t.shutdown() # 獲取迭代器里面的返回值 for i in it: print(i) """ # 總結(jié): 無論是進(jìn)程池還是線程池,都是由固定的進(jìn)程數(shù)或者線程數(shù)來執(zhí)行所有任務(wù) 系統(tǒng)不會額外創(chuàng)建多余的進(jìn)程或者線程來執(zhí)行任務(wù); """
3. 回調(diào)函數(shù)
知識點:
# 回調(diào)函數(shù) 就是一個參數(shù),將這個函數(shù)作為參數(shù)傳到另一個函數(shù)里面. 函數(shù)先執(zhí)行,再執(zhí)行當(dāng)參數(shù)傳遞的這個函數(shù),這個參數(shù)函數(shù)是回調(diào)函數(shù)
程序?qū)崿F(xiàn):
# ### 回調(diào)函數(shù) """ 回調(diào)函數(shù): 回頭調(diào)用一下函數(shù)獲取最后結(jié)果 微信支付寶付款成功后, 獲取付款金額 微信支付寶退款成功后, 獲取退款金額 一般用在獲取最后的狀態(tài)值時,使用回調(diào) 通過add_done_callback最后調(diào)用一下自定義的回調(diào)函數(shù); """ from concurrent.futures import ProcessPoolExecutor , ThreadPoolExecutor from threading import currentThread as ct import os,time,random """進(jìn)程任務(wù)""" def func1(i): time.sleep(random.uniform(0.1,0.9)) print(" 進(jìn)程任務(wù)執(zhí)行中 ... start ... 進(jìn)程號{}".format(os.getpid()) , i ) print(" 進(jìn)程任務(wù)執(zhí)行中 ... end ... 進(jìn)程號{}".format(os.getpid()) ) return i def call_back1(obj): print( "<==回調(diào)函數(shù)的進(jìn)程號{}==>".format(os.getpid()) ) print(obj.result()) """線程任務(wù)""" def func2(i): time.sleep(random.uniform(0.1,0.9)) print(" 線程任務(wù)執(zhí)行中 ... start ... 線程號{}".format(ct().ident) , i ) print(" 線程任務(wù)執(zhí)行中 ... end ... 線程號{}".format( ct().ident) ) return i def call_back2(obj): print( "<==回調(diào)函數(shù)的線程號{}==>".format( ct().ident) ) print(obj.result()) if __name__ == "__main__": """ # (1)進(jìn)程池 結(jié)果:(進(jìn)程池的回調(diào)函數(shù)由主進(jìn)程執(zhí)行) p = ProcessPoolExecutor() # os.cpu_count() => 4 for i in range(1,11): obj = p.submit(func1 , i ) # 使用add_done_callback在獲取最后返回值的時候,可以異步并行 obj.add_done_callback(call_back1) # 直接使用result獲取返回值的時候,會變成同步程序,速度慢; # obj.result() p.shutdown() print( "主進(jìn)程執(zhí)行結(jié)束...進(jìn)程號:" , os.getpid() ) """ print("<==============================================>") # (2)線程池 結(jié)果:(線程池的回調(diào)函數(shù)由子線程執(zhí)行) t = ThreadPoolExecutor() for i in range(1,11): obj = t.submit(func2 , i ) # 使用add_done_callback在獲取最后返回值的時候,可以異步并發(fā) obj.add_done_callback(call_back2) # 直接使用result獲取返回值的時候,會變成同步程序,速度慢; # obj.result() t.shutdown() print("主線程執(zhí)行結(jié)束 .... 線程號{}".format(ct().ident)) """ # 原型: class Ceshi(): def add_done_callback(self,func): print("系統(tǒng)執(zhí)行操作1 ... ") print("系統(tǒng)執(zhí)行操作2 ... ") # 回頭調(diào)用一下 func(self) def result(self): return 112233 def call_back(obj): print(obj.result()) obj = Ceshi() obj.add_done_callback(call_back) """
4. 協(xié)程
知識點:
#協(xié)程也叫纖程: 協(xié)程是線程的一種實現(xiàn)方式. 指的是一條線程能夠在多任務(wù)之間來回切換的一種實現(xiàn). 對于CPU、操作系統(tǒng)來說,協(xié)程并不存在. 任務(wù)之間的切換會花費時間. 目前電腦配置一般線程開到200會阻塞卡頓. #協(xié)程的實現(xiàn) 協(xié)程幫助你記住哪個任務(wù)執(zhí)行到哪個位置上了,并且實現(xiàn)安全的切換 一個任務(wù)一旦阻塞卡頓,立刻切換到另一個任務(wù)繼續(xù)執(zhí)行,保證線程總是忙碌的,更加充分的利用CPU,搶占更多的時間片 # 一個線程可以由多個協(xié)程來實現(xiàn),協(xié)程之間不會產(chǎn)生數(shù)據(jù)安全問題 #協(xié)程模塊 # greenlet gevent的底層,協(xié)程,切換的模塊 # gevent 直接用的,gevent能提供更全面的功能
程序?qū)崿F(xiàn):
# ### 協(xié)程 """ 進(jìn)程是資源分配的最小單位 線程是程序調(diào)度的最下單位 協(xié)程是線程實現(xiàn)的具體方式 總結(jié): 在進(jìn)程一定的情況下,開辟多個線程, 在線程一定的情況下,創(chuàng)建多個協(xié)程, 以便提高更大的并行并發(fā) """ # (1) 用協(xié)程改寫生產(chǎn)者消費者模型 """ def producer(): for i in range(1000): yield i def consumer(gen): for i in range(10): print( next(gen) ) gen = producer() consumer(gen) print("<==========>") consumer(gen) print("<==========>") consumer(gen) """ # (2) greenlet 協(xié)程的早期版本 from greenlet import greenlet import time """ switch 可以切換任務(wù),但是需要手動切換""" """ def eat(): print("eat1") g2.switch() time.sleep(3) print("eat2") def play(): print("play1") time.sleep(3) print("play2") g1.switch() g1 = greenlet(eat) g2 = greenlet(play) g1.switch() """ # (3) 升級到gevent版本 """自動進(jìn)行任務(wù)上的切換,但是不能識別阻塞""" """ import gevent def eat(): print("eat1") gevent.sleep(3) # time.sleep(3) print("eat2") def play(): print("play1") gevent.sleep(3) # time.sleep(3) print("play2") # 利用gevent.spawn創(chuàng)建協(xié)程對象g1 g1 = gevent.spawn(eat) # 利用gevent.spawn創(chuàng)建協(xié)程對象g2 g2 = gevent.spawn(play) # 如果不加join, 主線程直接結(jié)束任務(wù),不會默認(rèn)等待協(xié)程任務(wù). # 阻塞,必須等待g1任務(wù)完成之后在放行 g1.join() # 阻塞,必須等待g2任務(wù)完成之后在放行 g2.join() print("主線程執(zhí)行結(jié)束 .... ") """ # (4) 協(xié)程的終極版本; from gevent import monkey;monkey.patch_all() """引入猴子補丁,可以實現(xiàn)所有的阻塞全部識別""" import time import gevent def eat(): print("eat1") time.sleep(3) print("eat2") def play(): print("play1") time.sleep(3) print("play2") # 利用gevent.spawn創(chuàng)建協(xié)程對象g1 g1 = gevent.spawn(eat) # 利用gevent.spawn創(chuàng)建協(xié)程對象g2 g2 = gevent.spawn(play) # 如果不加join, 主線程直接結(jié)束任務(wù),不會默認(rèn)等待協(xié)程任務(wù). # 阻塞,必須等待g1任務(wù)完成之后在放行 g1.join() # 阻塞,必須等待g2任務(wù)完成之后在放行 g2.join() print(" 主線程執(zhí)行結(jié)束 ... ") """ # 分號,利用分號可以把多行代碼放在一行進(jìn)行編寫; a = 1 b = 2 a = 1;b = 2 """
==理解:==一個線程上有好多任務(wù),協(xié)程可以記住每個任務(wù)完成的狀態(tài),比如做飯的時候做到一半的時候停下來,去掃地,掃完地之后拐回來做飯,從做到一半的時候開始做。
小提示: 下載gevent包,會自帶greenlet
早期版本的想到在time.sleep執(zhí)行了兩次,每次執(zhí)行了一秒鐘,切換回來有執(zhí)行了一秒,這是模擬早期版本,模擬堵塞
總結(jié):
p.shutdown() 這里的shutdown類似于join 生成器在實例化對象的時候,里面的代碼是不走的,調(diào)用的時候才有,next 調(diào)用等 單線程實現(xiàn)的一種異步并發(fā)的一種結(jié)構(gòu) 協(xié)程能記住任務(wù)的狀態(tài)
本篇文章就到這里了,希望能夠給你帶來幫助,也希望您能夠多多關(guān)注腳本之家的更多內(nèi)容!
相關(guān)文章
Python 提取dict轉(zhuǎn)換為xml/json/table并輸出的實現(xiàn)代碼
這篇文章主要介紹了Python 提取dict轉(zhuǎn)換為xml/json/table并輸出的實現(xiàn)代碼,需要的朋友可以參考下2016-08-08以SortedList為例詳解Python的defaultdict對象使用自定義類型的方法
這篇文章主要介紹了以SortedList為例詳解Python的defaultdict對象使用自定義類型的方法,文章圍繞主題展開詳細(xì)的內(nèi)容介紹,具有一定的參考價值,需要的朋友可以參考一下2022-07-07python統(tǒng)計函數(shù)被調(diào)用次數(shù)的實現(xiàn)
本文主要介紹了python如何統(tǒng)計函數(shù)被調(diào)用次數(shù),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-02-02