欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

python線程、進(jìn)程和協(xié)程詳解

 更新時(shí)間:2016年07月19日 08:57:51   作者:feixuelove1009  
Python被人詬病最多的大概就是性能差,在這里講一下 Python 的多進(jìn)程,多線程與協(xié)程。首先聲明這不是教程,看完這篇文章,大概能夠?qū)?Python 的多進(jìn)程與多線程有一定的了解。

引言

解釋器環(huán)境:python3.5.1

我們都知道python網(wǎng)絡(luò)編程的兩大必學(xué)模塊socket和socketserver,其中的socketserver是一個(gè)支持IO多路復(fù)用和多線程、多進(jìn)程的模塊。一般我們?cè)趕ocketserver服務(wù)端代碼中都會(huì)寫這么一句:

server = socketserver.ThreadingTCPServer(settings.IP_PORT, MyServer)

ThreadingTCPServer這個(gè)類是一個(gè)支持多線程和TCP協(xié)議的socketserver,它的繼承關(guān)系是這樣的:
class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass

右邊的TCPServer實(shí)際上是它主要的功能父類,而左邊的ThreadingMixIn則是實(shí)現(xiàn)了多線程的類,它自己本身則沒有任何代碼。
MixIn在python的類命名中,很常見,一般被稱為“混入”,戲稱“亂入”,通常為了某種重要功能被子類繼承。

class ThreadingMixIn:
  
 daemon_threads = False
 
 def process_request_thread(self, request, client_address):  
  try:
   self.finish_request(request, client_address)
   self.shutdown_request(request)
  except:
   self.handle_error(request, client_address)
   self.shutdown_request(request)
 
 def process_request(self, request, client_address):
   
  t = threading.Thread(target = self.process_request_thread,
        args = (request, client_address))
  t.daemon = self.daemon_threads
  t.start()

在ThreadingMixIn類中,其實(shí)就定義了一個(gè)屬性,兩個(gè)方法。在process_request方法中實(shí)際調(diào)用的正是python內(nèi)置的多線程模塊threading。這個(gè)模塊是python中所有多線程的基礎(chǔ),socketserver本質(zhì)上也是利用了這個(gè)模塊。

一、線程

復(fù)制代碼 代碼如下:
線程,有時(shí)被稱為輕量級(jí)進(jìn)程(Lightweight Process,LWP),是程序執(zhí)行流的最小單元。一個(gè)標(biāo)準(zhǔn)的線程由線程ID,當(dāng)前指令指針(PC),寄存器集合和堆棧組成。另外,線程是進(jìn)程中的一個(gè)實(shí)體,是被系統(tǒng)獨(dú)立調(diào)度和分派的基本單位,線程自己不獨(dú)立擁有系統(tǒng)資源,但它可與同屬一個(gè)進(jìn)程的其它線程共享該進(jìn)程所擁有的全部資源。一個(gè)線程可以創(chuàng)建和撤消另一個(gè)線程,同一進(jìn)程中的多個(gè)線程之間可以并發(fā)執(zhí)行。由于線程之間的相互制約,致使線程在運(yùn)行中呈現(xiàn)出間斷性。線程也有就緒、阻塞和運(yùn)行三種基本狀態(tài)。就緒狀態(tài)是指線程具備運(yùn)行的所有條件,邏輯上可以運(yùn)行,在等待處理機(jī);運(yùn)行狀態(tài)是指線程占有處理機(jī)正在運(yùn)行;阻塞狀態(tài)是指線程在等待一個(gè)事件(如某個(gè)信號(hào)量),邏輯上不可執(zhí)行。每一個(gè)應(yīng)用程序都至少有一個(gè)進(jìn)程和一個(gè)線程。線程是程序中一個(gè)單一的順序控制流程。在單個(gè)程序中同時(shí)運(yùn)行多個(gè)線程完成不同的被劃分成一塊一塊的工作,稱為多線程。

以上那一段,可以不用看!舉個(gè)例子,廠家要生產(chǎn)某個(gè)產(chǎn)品,在它的生產(chǎn)基地建設(shè)了很多廠房,每個(gè)廠房?jī)?nèi)又有多條流水生產(chǎn)線。所有廠房配合將整個(gè)產(chǎn)品生產(chǎn)出來,某個(gè)廠房?jī)?nèi)的所有流水線將這個(gè)廠房負(fù)責(zé)的產(chǎn)品部分生產(chǎn)出來。每個(gè)廠房擁有自己的材料庫(kù),廠房?jī)?nèi)的生產(chǎn)線共享這些材料。而每一個(gè)廠家要實(shí)現(xiàn)生產(chǎn)必須擁有至少一個(gè)廠房一條生產(chǎn)線。那么這個(gè)廠家就是某個(gè)應(yīng)用程序;每個(gè)廠房就是一個(gè)進(jìn)程;每條生產(chǎn)線都是一個(gè)線程。

1.1 普通的多線程

在python中,threading模塊提供線程的功能。通過它,我們可以輕易的在進(jìn)程中創(chuàng)建多個(gè)線程。下面是個(gè)例子:

import threading
import time
 
def show(arg):
 time.sleep(1)
 print('thread'+str(arg))
 
for i in range(10):
 t = threading.Thread(target=show, args=(i,))
 t.start()
 
print('main thread stop')

上述代碼創(chuàng)建了10個(gè)“前臺(tái)”線程,然后控制器就交給了CPU,CPU根據(jù)指定算法進(jìn)行調(diào)度,分片執(zhí)行指令。

下面是Thread類的主要方法:

start 線程準(zhǔn)備就緒,等待CPU調(diào)度
setName 為線程設(shè)置名稱
getName 獲取線程名稱
setDaemon 設(shè)置為后臺(tái)線程或前臺(tái)線程(默認(rèn))
如果是后臺(tái)線程,主線程執(zhí)行過程中,后臺(tái)線程也在進(jìn)行,主線程執(zhí)行完畢后,后臺(tái)線程不論成功與否,均停止。如果是前臺(tái)線程,主線程執(zhí)行過程中,前臺(tái)線程也在進(jìn)行,主線程執(zhí)行完畢后,等待前臺(tái)線程也執(zhí)行完成后,程序停止。
join 逐個(gè)執(zhí)行每個(gè)線程,執(zhí)行完畢后繼續(xù)往下執(zhí)行,該方法使得多線程變得無意義。
run 線程被cpu調(diào)度后自動(dòng)執(zhí)行線程對(duì)象的run方法  

1.2 自定義線程類

對(duì)于threading模塊中的Thread類,本質(zhì)上是執(zhí)行了它的run方法。因此可以自定義線程類,讓它繼承Thread類,然后重寫run方法。

import threading
class MyThreading(threading.Thread):

 def __init__(self,func,arg):
  super(MyThreading,self).__init__()
  self.func = func
  self.arg = arg

 def run(self):
  self.func(self.arg)

def f1(args):
 print(args)

obj = MyThreading(f1, 123)
obj.start()

1.3 線程鎖

CPU執(zhí)行任務(wù)時(shí),在線程之間是進(jìn)行隨機(jī)調(diào)度的,并且每個(gè)線程可能只執(zhí)行n條代碼后就轉(zhuǎn)而執(zhí)行另外一條線程。由于在一個(gè)進(jìn)程中的多個(gè)線程之間是共享資源和數(shù)據(jù)的,這就容易造成資源搶奪或臟數(shù)據(jù),于是就有了鎖的概念,限制某一時(shí)刻只有一個(gè)線程能訪問某個(gè)指定的數(shù)據(jù)。

1.3.1 未使用鎖

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time
NUM = 0
def show():
 global NUM
 NUM += 1
 name = t.getName()
 time.sleep(1)  # 注意,這行語(yǔ)句的位置很重要,必須在NUM被修改后,否則觀察不到臟數(shù)據(jù)的現(xiàn)象。
 print(name, "執(zhí)行完畢后,NUM的值為: ", NUM)

for i in range(10):
 t = threading.Thread(target=show)
 t.start()
print('main thread stop')

上述代碼運(yùn)行后,結(jié)果如下:

main thread stop
Thread-1 執(zhí)行完畢后,NUM的值為: 10
Thread-2 執(zhí)行完畢后,NUM的值為: 10
Thread-4 執(zhí)行完畢后,NUM的值為: 10
Thread-9 執(zhí)行完畢后,NUM的值為: 10
Thread-3 執(zhí)行完畢后,NUM的值為: 10
Thread-6 執(zhí)行完畢后,NUM的值為: 10
Thread-8 執(zhí)行完畢后,NUM的值為: 10
Thread-7 執(zhí)行完畢后,NUM的值為: 10
Thread-5 執(zhí)行完畢后,NUM的值為: 10
Thread-10 執(zhí)行完畢后,NUM的值為: 10

由此可見,由于線程同時(shí)訪問一個(gè)數(shù)據(jù),產(chǎn)生了錯(cuò)誤的結(jié)果。為了解決這個(gè)問題,python在threading模塊中定義了幾種線程鎖類,分別是:

  1. Lock 普通鎖(不可嵌套)
  2. RLock 普通鎖(可嵌套)常用
  3. Semaphore 信號(hào)量
  4. event 事件
  5. condition 條件

1.3.2 普通鎖Lock和RLock

類名:Lock或RLock

普通鎖,也叫互斥鎖,是獨(dú)占的,同一時(shí)刻只有一個(gè)線程被放行。

import time
import threading

NUM = 10
def func(lock):
 global NUM
 lock.acquire() # 讓鎖開始起作用
 NUM -= 1
 time.sleep(1)
 print(NUM)
 lock.release() # 釋放鎖
 
lock = threading.Lock() # 實(shí)例化一個(gè)鎖對(duì)象
for i in range(10):
 t = threading.Thread(target=func, args=(lock,)) # 記得把鎖當(dāng)作參數(shù)傳遞給func參數(shù)
 t.start()

以上是threading模塊的Lock類,它不支持嵌套鎖。RLcok類的用法和Lock一模一樣,但它支持嵌套,因此我們一般直接使用RLcok類。

1.3.3 信號(hào)量(Semaphore)

類名:BoundedSemaphore

這種鎖允許一定數(shù)量的線程同時(shí)更改數(shù)據(jù),它不是互斥鎖。比如地鐵安檢,排隊(duì)人很多,工作人員只允許一定數(shù)量的人進(jìn)入安檢區(qū),其它的人繼續(xù)排隊(duì)。

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import time
import threading

def run(n):
 semaphore.acquire()
 print("run the thread: %s" % n)
 time.sleep(1)
 semaphore.release()

num = 0
semaphore = threading.BoundedSemaphore(5) # 最多允許5個(gè)線程同時(shí)運(yùn)行
for i in range(20):
 t = threading.Thread(target=run, args=(i,))
 t.start()

1.3.4 事件(Event)

類名:Event

事件主要提供了三個(gè)方法 set、wait、clear。

事件機(jī)制:全局定義了一個(gè)“Flag”,如果“Flag”的值為False,那么當(dāng)程序執(zhí)行wait方法時(shí)就會(huì)阻塞,如果“Flag”值為True,那么wait方法時(shí)便不再阻塞。這種鎖,類似交通紅綠燈(默認(rèn)是紅燈),它屬于在紅燈的時(shí)候一次性阻擋所有線程,在綠燈的時(shí)候,一次性放行所有的排隊(duì)中的線程。
clear:將“Flag”設(shè)置為False
set:將“Flag”設(shè)置為True

import threading

def func(e,i):
 print(i)
 e.wait() # 檢測(cè)當(dāng)前event是什么狀態(tài),如果是紅燈,則阻塞,如果是綠燈則繼續(xù)往下執(zhí)行。默認(rèn)是紅燈。
 print(i+100)

event = threading.Event()
for i in range(10):
 t = threading.Thread(target=func, args=(event, i))
 t.start()

event.clear() # 主動(dòng)將狀態(tài)設(shè)置為紅燈
inp = input(">>>")
if inp == "1":
 event.set() # 主動(dòng)將狀態(tài)設(shè)置為綠燈

1.3.5 條件(condition)

類名:Condition

該機(jī)制會(huì)使得線程等待,只有滿足某條件時(shí),才釋放n個(gè)線程。

import threading

def condition():
 ret = False
 r = input(">>>")
 if r == "yes":
  ret = True
 return ret

def func(conn, i):
 print(i)
 conn.acquire()
 conn.wait_for(condition) # 這個(gè)方法接受一個(gè)函數(shù)的返回值
 print(i+100)
 conn.release()

c = threading.Condition()
for i in range(10):
 t = threading.Thread(target=func, args=(c, i,))
 t.start()

上面的例子,每輸入一次“yes”放行了一個(gè)線程。下面這個(gè),可以選擇一次放行幾個(gè)線程。

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import threading

def run(n):
 con.acquire()
 con.wait()
 print("run the thread: %s" %n)
 con.release()

if __name__ == '__main__':

 con = threading.Condition()
 for i in range(10):
  t = threading.Thread(target=run, args=(i,))
  t.start()

 while True:
  inp = input('>>>')
  if inp == "q":
   break
  # 下面這三行是固定語(yǔ)法
  con.acquire()
  con.notify(int(inp)) # 這個(gè)方法接收一個(gè)整數(shù),表示讓多少個(gè)線程通過
  con.release()

1.3 全局解釋器鎖(GIL)

既然介紹了多線程和線程鎖,那就不得不提及python的GIL,也就是全局解釋器鎖。在編程語(yǔ)言的世界,python因?yàn)镚IL的問題廣受詬病,因?yàn)樗诮忉屍鞯膶用嫦拗屏顺绦蛟谕粫r(shí)間只有一個(gè)線程被CPU實(shí)際執(zhí)行,而不管你的程序里實(shí)際開了多少條線程。所以我們經(jīng)常能發(fā)現(xiàn),python中的多線程編程有時(shí)候效率還不如單線程,就是因?yàn)檫@個(gè)原因。那么,對(duì)于這個(gè)GIL,一些普遍的問題如下:

每種編程語(yǔ)言都有GIL嗎?

以python官方Cpython解釋器為代表....其他語(yǔ)言好像未見。

為什么要有GIL?

作為解釋型語(yǔ)言,Python的解釋器必須做到既安全又高效。我們都知道多線程編程會(huì)遇到的問題。解釋器要留意的是避免在不同的線程操作內(nèi)部共享的數(shù)據(jù)。同時(shí)它還要保證在管理用戶線程時(shí)總是有最大化的計(jì)算資源。那么,不同線程同時(shí)訪問時(shí),數(shù)據(jù)的保護(hù)機(jī)制是怎樣的呢?答案是解釋器全局鎖GIL。GIL對(duì)諸如當(dāng)前線程狀態(tài)和為垃圾回收而用的堆分配對(duì)象這樣的東西的訪問提供著保護(hù)。

為什么不能去掉GIL?

首先,在早期的python解釋器依賴較多的全局狀態(tài),傳承下來,使得想要移除當(dāng)今的GIL變得更加困難。其次,對(duì)于程序員而言,僅僅是想要理解它的實(shí)現(xiàn)就需要對(duì)操作系統(tǒng)設(shè)計(jì)、多線程編程、C語(yǔ)言、解釋器設(shè)計(jì)和CPython解釋器的實(shí)現(xiàn)有著非常徹底的理解。

在1999年,針對(duì)Python1.5,一個(gè)“freethreading”補(bǔ)丁已經(jīng)嘗試移除GIL,用細(xì)粒度的鎖來代替。然而,GIL的移除給單線程程序的執(zhí)行速度帶來了一定的負(fù)面影響。當(dāng)用單線程執(zhí)行時(shí),速度大約降低了40%。雖然使用兩個(gè)線程時(shí)在速度上得到了提高,但這個(gè)提高并沒有隨著核數(shù)的增加而線性增長(zhǎng)。因此這個(gè)補(bǔ)丁沒有被采納。

另外,在python的不同解釋器實(shí)現(xiàn)中,如PyPy就移除了GIL,其執(zhí)行速度更快(不單單是去除GIL的原因)。然而,我們通常使用的CPython占有著統(tǒng)治地位的使用量,所以,你懂的。

在Python 3.2中實(shí)現(xiàn)了一個(gè)新的GIL,并且?guī)е恍┓e極的結(jié)果。這是自1992年以來,GIL的一次最主要改變。舊的GIL通過對(duì)Python指令進(jìn)行計(jì)數(shù)來確定何時(shí)放棄GIL。在新的GIL實(shí)現(xiàn)中,用一個(gè)固定的超時(shí)時(shí)間來指示當(dāng)前的線程以放棄這個(gè)鎖。在當(dāng)前線程保持這個(gè)鎖,且當(dāng)?shù)诙€(gè)線程請(qǐng)求這個(gè)鎖的時(shí)候,當(dāng)前線程就會(huì)在5ms后被強(qiáng)制釋放掉這個(gè)鎖(這就是說,當(dāng)前線程每5ms就要檢查其是否需要釋放這個(gè)鎖)。當(dāng)任務(wù)是可行的時(shí)候,這會(huì)使得線程間的切換更加可預(yù)測(cè)。

GIL對(duì)我們有什么影響?

最大的影響是我們不能隨意使用多線程。要區(qū)分任務(wù)場(chǎng)景。

在單核cpu情況下對(duì)性能的影響可以忽略不計(jì),多線程多進(jìn)程都差不多。在多核CPU時(shí),多線程效率較低。GIL對(duì)單進(jìn)程和多進(jìn)程沒有影響。

在實(shí)際使用中有什么好的建議?

建議在IO密集型任務(wù)中使用多線程,在計(jì)算密集型任務(wù)中使用多進(jìn)程。深入研究python的協(xié)程機(jī)制,你會(huì)有驚喜的。

更多的詳細(xì)介紹和說明請(qǐng)參考下面的文獻(xiàn):

原文:
譯文:
Python 最難的問題

1.4 定時(shí)器(Timer)

定時(shí)器,指定n秒后執(zhí)行某操作。很簡(jiǎn)單但很使用的東西。

from threading import Timer
def hello():
 print("hello, world")
t = Timer(1, hello) # 表示1秒后執(zhí)行hello函數(shù)
t.start() 

1.5 隊(duì)列

通常而言,隊(duì)列是一種先進(jìn)先出的數(shù)據(jù)結(jié)構(gòu),與之對(duì)應(yīng)的是堆棧這種后進(jìn)先出的結(jié)構(gòu)。但是在python中,它內(nèi)置了一個(gè)queue模塊,它不但提供普通的隊(duì)列,還提供一些特殊的隊(duì)列。具體如下:

  1. queue.Queue :先進(jìn)先出隊(duì)列
  2. queue.LifoQueue :后進(jìn)先出隊(duì)列
  3. queue.PriorityQueue :優(yōu)先級(jí)隊(duì)列
  4. queue.deque :雙向隊(duì)列

1.5.1 Queue:先進(jìn)先出隊(duì)列

這是最常用也是最普遍的隊(duì)列,先看一個(gè)例子。

import queue
q = queue.Queue(5)
q.put(11)
q.put(22)
q.put(33)

print(q.get())
print(q.get())
print(q.get())

Queue類的參數(shù)和方法:

maxsize 隊(duì)列的最大元素個(gè)數(shù),也就是queue.Queue(5)中的5。當(dāng)隊(duì)列內(nèi)的元素達(dá)到這個(gè)值時(shí),后來的元素默認(rèn)會(huì)阻塞,等待隊(duì)列騰出位置。

def __init__(self, maxsize=0):
self.maxsize = maxsize
self._init(maxsize)

qsize() 獲取當(dāng)前隊(duì)列中元素的個(gè)數(shù),也就是隊(duì)列的大小
empty() 判斷當(dāng)前隊(duì)列是否為空,返回True或者False
full() 判斷當(dāng)前隊(duì)列是否已滿,返回True或者False
put(self, block=True, timeout=None)

往隊(duì)列里放一個(gè)元素,默認(rèn)是阻塞和無時(shí)間限制的。如果,block設(shè)置為False,則不阻塞,這時(shí),如果隊(duì)列是滿的,放不進(jìn)去,就會(huì)彈出異常。如果timeout設(shè)置為n秒,則會(huì)等待這個(gè)秒數(shù)后才put,如果put不進(jìn)去則彈出異常。

get(self, block=True, timeout=None)

從隊(duì)列里獲取一個(gè)元素。參數(shù)和put是一樣的意思。

join() 阻塞進(jìn)程,直到所有任務(wù)完成,需要配合另一個(gè)方法task_done。

def join(self):
 with self.all_tasks_done:
  while self.unfinished_tasks:
   self.all_tasks_done.wait()

task_done() 表示某個(gè)任務(wù)完成。每一條get語(yǔ)句后需要一條task_done。

import queue
q = queue.Queue(5)
q.put(11)
q.put(22)
print(q.get())
q.task_done()
print(q.get())
q.task_done()
q.join()

1.5.2 LifoQueue:后進(jìn)先出隊(duì)列

類似于“堆?!保筮M(jìn)先出。也較常用。

import queue
q = queue.LifoQueue()
q.put(123)
q.put(456)
print(q.get())

上述代碼運(yùn)行結(jié)果是:456

1.5.3 PriorityQueue:優(yōu)先級(jí)隊(duì)列

帶有權(quán)重的隊(duì)列,每個(gè)元素都是一個(gè)元組,前面的數(shù)字表示它的優(yōu)先級(jí),數(shù)字越小優(yōu)先級(jí)越高,同樣的優(yōu)先級(jí)先進(jìn)先出

q = queue.PriorityQueue()
q.put((1,"alex1"))
q.put((1,"alex2"))
q.put((1,"alex3"))
q.put((3,"alex3"))
print(q.get())

1.5.4 deque:雙向隊(duì)列

Queue和LifoQueue的“綜合體”,雙向進(jìn)出。方法較多,使用復(fù)雜,慎用!

q = queue.deque()
q.append(123)
q.append(333)
q.appendleft(456)
q.pop()
q.popleft()

1.6 生產(chǎn)者消費(fèi)者模型

利用多線程和隊(duì)列可以搭建一個(gè)生產(chǎn)者消費(fèi)者模型,用于處理大并發(fā)的服務(wù)。

在并發(fā)編程中使用生產(chǎn)者和消費(fèi)者模式能夠解決絕大多數(shù)并發(fā)問題。該模式通過平衡生產(chǎn)線程和消費(fèi)線程的工作能力來提高程序的整體處理數(shù)據(jù)的速度。

為什么要使用生產(chǎn)者和消費(fèi)者模式

在線程世界里,生產(chǎn)者就是生產(chǎn)數(shù)據(jù)的線程,消費(fèi)者就是消費(fèi)數(shù)據(jù)的線程。在多線程開發(fā)當(dāng)中,如果生產(chǎn)者處理速度很快,而消費(fèi)者處理速度很慢,那么生產(chǎn)者就必須等待消費(fèi)者處理完,才能繼續(xù)生產(chǎn)數(shù)據(jù)。同樣的道理,如果消費(fèi)者的處理能力大于生產(chǎn)者,那么消費(fèi)者就必須等待生產(chǎn)者。為了解決這個(gè)問題于是引入了生產(chǎn)者和消費(fèi)者模式。

什么是生產(chǎn)者消費(fèi)者模式

生產(chǎn)者消費(fèi)者模式是通過一個(gè)容器來解決生產(chǎn)者和消費(fèi)者的強(qiáng)耦合問題。生產(chǎn)者和消費(fèi)者彼此之間不直接通訊,而通過阻塞隊(duì)列來進(jìn)行通訊,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費(fèi)者處理,直接扔給阻塞隊(duì)列,消費(fèi)者不找生產(chǎn)者要數(shù)據(jù),而是直接從阻塞隊(duì)列里取,阻塞隊(duì)列就相當(dāng)于一個(gè)緩沖區(qū),平衡了生產(chǎn)者和消費(fèi)者的處理能力。

這個(gè)阻塞隊(duì)列就是用來給生產(chǎn)者和消費(fèi)者解耦的??v觀大多數(shù)設(shè)計(jì)模式,都會(huì)找一個(gè)第三者出來進(jìn)行解耦,如工廠模式的第三者是工廠類,模板模式的第三者是模板類。在學(xué)習(xí)一些設(shè)計(jì)模式的過程中,如果先找到這個(gè)模式的第三者,能幫助我們快速熟悉一個(gè)設(shè)計(jì)模式。

以上摘自方騰飛的《聊聊并發(fā)——生產(chǎn)者消費(fèi)者模式》

下面是一個(gè)簡(jiǎn)單的廚師做包子,顧客吃包子的例子。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author:Liu Jiang
import time
import queue
import threading

q = queue.Queue(10)
def productor(i):
 while True:
  q.put("廚師 %s 做的包子!"%i)
  time.sleep(2)

def consumer(k):
 while True:
  print("顧客 %s 吃了一個(gè) %s"%(k,q.get()))
  time.sleep(1)

for i in range(3):
 t = threading.Thread(target=productor,args=(i,))
 t.start()

for k in range(10):
 v = threading.Thread(target=consumer,args=(k,))
 v.start()

1.7 線程池

在使用多線程處理任務(wù)時(shí)也不是線程越多越好,由于在切換線程的時(shí)候,需要切換上下文環(huán)境,依然會(huì)造成cpu的大量開銷。為解決這個(gè)問題,線程池的概念被提出來了。預(yù)先創(chuàng)建好一個(gè)較為優(yōu)化的數(shù)量的線程,讓過來的任務(wù)立刻能夠使用,就形成了線程池。在python中,沒有內(nèi)置的較好的線程池模塊,需要自己實(shí)現(xiàn)或使用第三方模塊。下面是一個(gè)簡(jiǎn)單的線程池:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author:Liu Jiang

import queue
import time
import threading

class MyThreadPool:
 def __init__(self, maxsize=5):
  self.maxsize = maxsize
  self._q = queue.Queue(maxsize)
  for i in range(maxsize):
   self._q.put(threading.Thread)
   
 def get_thread(self):
  return self._q.get()
  
 def add_thread(self):
  self._q.put(threading.Thread)

def task(i, pool):
 print(i)
 time.sleep(1)
 pool.add_thread()

pool = MyThreadPool(5)
for i in range(100):
 t = pool.get_thread()
 obj = t(target=task, args=(i,pool))
 obj.start()

上面的例子是把線程類當(dāng)做元素添加到隊(duì)列內(nèi)。實(shí)現(xiàn)方法比較糙,每個(gè)線程使用后就被拋棄,一開始就將線程開到滿,因此性能較差。下面是一個(gè)相對(duì)好一點(diǎn)的例子:

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import queue
import threading
import contextlib
import time

StopEvent = object() # 創(chuàng)建空對(duì)象

class ThreadPool(object):

 def __init__(self, max_num, max_task_num = None):
  if max_task_num:
   self.q = queue.Queue(max_task_num)
  else:
   self.q = queue.Queue()
  self.max_num = max_num
  self.cancel = False
  self.terminal = False
  self.generate_list = []
  self.free_list = []

 def run(self, func, args, callback=None):
  """
  線程池執(zhí)行一個(gè)任務(wù)
  :param func: 任務(wù)函數(shù)
  :param args: 任務(wù)函數(shù)所需參數(shù)
  :param callback: 任務(wù)執(zhí)行失敗或成功后執(zhí)行的回調(diào)函數(shù),回調(diào)函數(shù)有兩個(gè)參數(shù)1、任務(wù)函數(shù)執(zhí)行狀態(tài);2、任務(wù)函數(shù)返回值(默認(rèn)為None,即:不執(zhí)行回調(diào)函數(shù))
  :return: 如果線程池已經(jīng)終止,則返回True否則None
  """
  if self.cancel:
   return
  if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
   self.generate_thread()
  w = (func, args, callback,)
  self.q.put(w)

 def generate_thread(self):
  """
  創(chuàng)建一個(gè)線程
  """
  t = threading.Thread(target=self.call)
  t.start()

 def call(self):
  """
  循環(huán)去獲取任務(wù)函數(shù)并執(zhí)行任務(wù)函數(shù)
  """
  current_thread = threading.currentThread
  self.generate_list.append(current_thread)

  event = self.q.get()
  while event != StopEvent:

   func, arguments, callback = event
   try:
    result = func(*arguments)
    success = True
   except Exception as e:
    success = False
    result = None

   if callback is not None:
    try:
     callback(success, result)
    except Exception as e:
     pass

   with self.worker_state(self.free_list, current_thread):
    if self.terminal:
     event = StopEvent
    else:
     event = self.q.get()
  else:

   self.generate_list.remove(current_thread)

 def close(self):
  """
  執(zhí)行完所有的任務(wù)后,所有線程停止
  """
  self.cancel = True
  full_size = len(self.generate_list)
  while full_size:
   self.q.put(StopEvent)
   full_size -= 1

 def terminate(self):
  """
  無論是否還有任務(wù),終止線程
  """
  self.terminal = True

  while self.generate_list:
   self.q.put(StopEvent)

  self.q.empty()

 @contextlib.contextmanager
 def worker_state(self, state_list, worker_thread):
  """
  用于記錄線程中正在等待的線程數(shù)
  """
  state_list.append(worker_thread)
  try:
   yield
  finally:
   state_list.remove(worker_thread)

# How to use
pool = ThreadPool(5)

def callback(status, result):
 # status, execute action status
 # result, execute action return value
 pass

def action(i):
 print(i)

for i in range(30):
 ret = pool.run(action, (i,), callback)

time.sleep(5)
print(len(pool.generate_list), len(pool.free_list))
print(len(pool.generate_list), len(pool.free_list))
# pool.close()
# pool.terminate()

二、進(jìn)程

在python中multiprocess模塊提供了Process類,實(shí)現(xiàn)進(jìn)程相關(guān)的功能。但是,由于它是基于fork機(jī)制的,因此不被windows平臺(tái)支持。想要在windows中運(yùn)行,必須使用if __name__ == '__main__:的方式,顯然這只能用于調(diào)試和學(xué)習(xí),不能用于實(shí)際環(huán)境。

(PS:在這里我必須吐槽一下python的包、模塊和類的組織結(jié)構(gòu)。在multiprocess中你既可以import大寫的Process,也可以import小寫的process,這兩者是完全不同的東西。這種情況在python中很多,新手容易傻傻分不清。)

下面是一個(gè)簡(jiǎn)單的多進(jìn)程例子,你會(huì)發(fā)現(xiàn)Process的用法和Thread的用法幾乎一模一樣。

from multiprocessing import Process

def foo(i):
 print("This is Process ", i)

if __name__ == '__main__':
 for i in range(5):
  p = Process(target=foo, args=(i,))
  p.start()

2.1 進(jìn)程的數(shù)據(jù)共享

每個(gè)進(jìn)程都有自己獨(dú)立的數(shù)據(jù)空間,不同進(jìn)程之間通常是不能共享數(shù)據(jù),創(chuàng)建一個(gè)進(jìn)程需要非常大的開銷。

from multiprocessing import Process
list_1 = []
def foo(i):
 list_1.append(i)
 print("This is Process ", i," and list_1 is ", list_1)

if __name__ == '__main__':
 for i in range(5):
  p = Process(target=foo, args=(i,))
  p.start()

 print("The end of list_1:", list_1)

運(yùn)行上面的代碼,你會(huì)發(fā)現(xiàn)列表list_1在各個(gè)進(jìn)程中只有自己的數(shù)據(jù),完全無法共享。想要進(jìn)程之間進(jìn)行資源共享可以使用queues/Array/Manager這三個(gè)multiprocess模塊提供的類。

2.1.1 使用Array共享數(shù)據(jù)

from multiprocessing import Process
from multiprocessing import Array

def Foo(i,temp):
 temp[0] += 100
 for item in temp:
  print(i,'----->',item)

if __name__ == '__main__':
 temp = Array('i', [11, 22, 33, 44])
 for i in range(2):
  p = Process(target=Foo, args=(i,temp))
  p.start()

對(duì)于Array數(shù)組類,括號(hào)內(nèi)的“i”表示它內(nèi)部的元素全部是int類型,而不是指字符i,列表內(nèi)的元素可以預(yù)先指定,也可以指定列表長(zhǎng)度。概括的來說就是Array類在實(shí)例化的時(shí)候就必須指定數(shù)組的數(shù)據(jù)類型和數(shù)組的大小,類似temp = Array('i', 5)。對(duì)于數(shù)據(jù)類型有下面的表格對(duì)應(yīng):

'c': ctypes.c_char, 'u': ctypes.c_wchar,
'b': ctypes.c_byte, 'B': ctypes.c_ubyte,
'h': ctypes.c_short, 'H': ctypes.c_ushort,
'i': ctypes.c_int, 'I': ctypes.c_uint,
'l': ctypes.c_long, 'L': ctypes.c_ulong,
'f': ctypes.c_float, 'd': ctypes.c_double

2.1.2 使用Manager共享數(shù)據(jù)

from multiprocessing import Process,Manager

def Foo(i,dic):
 dic[i] = 100+i
 print(dic.values())

if __name__ == '__main__':
 manage = Manager()
 dic = manage.dict()
 for i in range(10):
  p = Process(target=Foo, args=(i,dic))
  p.start()
  p.join()

Manager比Array要好用一點(diǎn),因?yàn)樗梢酝瑫r(shí)保存多種類型的數(shù)據(jù)格式。

2.1.3 使用queues的Queue類共享數(shù)據(jù)

import multiprocessing
from multiprocessing import Process
from multiprocessing import queues

def foo(i,arg):
 arg.put(i)
 print('The Process is ', i, "and the queue's size is ", arg.qsize())

if __name__ == "__main__":
 li = queues.Queue(20, ctx=multiprocessing)
 for i in range(10):
  p = Process(target=foo, args=(i,li,))
  p.start()

這里就有點(diǎn)類似上面的隊(duì)列了。從運(yùn)行結(jié)果里,你還能發(fā)現(xiàn)數(shù)據(jù)共享中存在的臟數(shù)據(jù)問題。另外,比較悲催的是multiprocessing里還有一個(gè)Queue,一樣能實(shí)現(xiàn)這個(gè)功能。

2.2 進(jìn)程鎖

為了防止和多線程一樣的出現(xiàn)數(shù)據(jù)搶奪和臟數(shù)據(jù)的問題,同樣需要設(shè)置進(jìn)程鎖。與threading類似,在multiprocessing里也有同名的鎖類RLock, Lock, Event, Condition, Semaphore,連用法都是一樣樣的?。ㄟ@個(gè)我喜歡)

from multiprocessing import Process
from multiprocessing import queues
from multiprocessing import Array
from multiprocessing import RLock, Lock, Event, Condition, Semaphore
import multiprocessing
import time

def foo(i,lis,lc):
 lc.acquire()
 lis[0] = lis[0] - 1
 time.sleep(1)
 print('say hi',lis[0])
 lc.release()

if __name__ == "__main__":
 # li = []
 li = Array('i', 1)
 li[0] = 10
 lock = RLock()
 for i in range(10):
  p = Process(target=foo,args=(i,li,lock))
  p.start()

2.3 進(jìn)程池

既然有線程池,那必然也有進(jìn)程池。但是,python給我們內(nèi)置了一個(gè)進(jìn)程池,不需要像線程池那樣需要自定義,你只需要簡(jiǎn)單的from multiprocessing import Pool。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from multiprocessing import Pool
import time

def f1(args):
 time.sleep(1)
 print(args)

if __name__ == '__main__':
 p = Pool(5)
 for i in range(30):
  p.apply_async(func=f1, args= (i,))
 p.close()   # 等子進(jìn)程執(zhí)行完畢后關(guān)閉線程池
 # time.sleep(2)
 # p.terminate()  # 立刻關(guān)閉線程池
 p.join()

進(jìn)程池內(nèi)部維護(hù)一個(gè)進(jìn)程序列,當(dāng)使用時(shí),去進(jìn)程池中獲取一個(gè)進(jìn)程,如果進(jìn)程池序列中沒有可供使用的進(jìn)程,那么程序就會(huì)等待,直到進(jìn)程池中有可用進(jìn)程為止。

進(jìn)程池中有以下幾個(gè)主要方法:

  1. apply:從進(jìn)程池里取一個(gè)進(jìn)程并執(zhí)行
  2. apply_async:apply的異步版本
  3. terminate:立刻關(guān)閉線程池
  4. join:主進(jìn)程等待所有子進(jìn)程執(zhí)行完畢,必須在close或terminate之后
  5. close:等待所有進(jìn)程結(jié)束后,才關(guān)閉線程池

三、協(xié)程

線程和進(jìn)程的操作是由程序觸發(fā)系統(tǒng)接口,最后的執(zhí)行者是系統(tǒng),它本質(zhì)上是操作系統(tǒng)提供的功能。而協(xié)程的操作則是程序員指定的,在python中通過yield,人為的實(shí)現(xiàn)并發(fā)處理。

協(xié)程存在的意義:對(duì)于多線程應(yīng)用,CPU通過切片的方式來切換線程間的執(zhí)行,線程切換時(shí)需要耗時(shí)。協(xié)程,則只使用一個(gè)線程,分解一個(gè)線程成為多個(gè)“微線程”,在一個(gè)線程中規(guī)定某個(gè)代碼塊的執(zhí)行順序。

協(xié)程的適用場(chǎng)景:當(dāng)程序中存在大量不需要CPU的操作時(shí)(IO)。

在不需要自己“造輪子”的年代,同樣有第三方模塊為我們提供了高效的協(xié)程,這里介紹一下greenlet和gevent。本質(zhì)上,gevent是對(duì)greenlet的高級(jí)封裝,因此一般用它就行,這是一個(gè)相當(dāng)高效的模塊。

在使用它們之前,需要先安裝,可以通過源碼,也可以通過pip。

3.1 greenlet

from greenlet import greenlet

def test1():
 print(12)
 gr2.switch()
 print(34)
 gr2.switch()

def test2():
 print(56)
 gr1.switch()
 print(78)

gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()

實(shí)際上,greenlet就是通過switch方法在不同的任務(wù)之間進(jìn)行切換。

3.2 gevent

from gevent import monkey; monkey.patch_all()
import gevent
import requests

def f(url):
 print('GET: %s' % url)
 resp = requests.get(url)
 data = resp.text
 print('%d bytes received from %s.' % (len(data), url))

gevent.joinall([
  gevent.spawn(f, 'https://www.python.org/'),
  gevent.spawn(f, 'https://www.yahoo.com/'),
  gevent.spawn(f, 'https://github.com/'),
])

通過joinall將任務(wù)f和它的參數(shù)進(jìn)行統(tǒng)一調(diào)度,實(shí)現(xiàn)單線程中的協(xié)程。代碼封裝層次很高,實(shí)際使用只需要了解它的幾個(gè)主要方法即可。

相關(guān)文章

  • Python Flask上下文管理機(jī)制實(shí)例解析

    Python Flask上下文管理機(jī)制實(shí)例解析

    這篇文章主要介紹了Python Flask上下文管理機(jī)制實(shí)例解析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-03-03
  • python原始套接字編程示例分享

    python原始套接字編程示例分享

    在實(shí)驗(yàn)中需要自己構(gòu)造單獨(dú)的HTTP數(shù)據(jù)報(bào)文,而使用SOCK_STREAM進(jìn)行發(fā)送數(shù)據(jù)包,需要進(jìn)行完整的TCP交互。因此想使用原始套接字進(jìn)行編程,直接構(gòu)造數(shù)據(jù)包,并在IP層進(jìn)行發(fā)送,即采用SOCK_RAW進(jìn)行數(shù)據(jù)發(fā)送。使用SOCK_RAW的優(yōu)勢(shì)是,可以對(duì)數(shù)據(jù)包進(jìn)行完整的修改,可以處理IP層上的所有數(shù)據(jù)包,對(duì)各字段進(jìn)行修改,而不受UDP和TCP的限制。
    2014-02-02
  • Python?中?yeild?的用法詳解

    Python?中?yeild?的用法詳解

    yield?是?Python?中的關(guān)鍵字,用于生成器函數(shù)中,可以將函數(shù)變成一個(gè)迭代器,實(shí)現(xiàn)惰性計(jì)算,節(jié)省內(nèi)存空間。本文將介紹?yield?的基本用法和實(shí)現(xiàn)原理,以及與?yield?相關(guān)的注意事項(xiàng)和常見問題。
    2023-06-06
  • Python?if?else語(yǔ)句對(duì)縮進(jìn)的要求

    Python?if?else語(yǔ)句對(duì)縮進(jìn)的要求

    這篇文章主要介紹了Python?if?else語(yǔ)句對(duì)縮進(jìn)的要求,前面的一篇文章展示了選擇結(jié)構(gòu)的三種基本形式,并給出了實(shí)例演示,這篇文章基于上一篇內(nèi)容繼續(xù)對(duì)Python?if?else語(yǔ)句對(duì)縮進(jìn)進(jìn)行描述,需要的小伙伴可以參考一下
    2022-03-03
  • python生成器推導(dǎo)式用法簡(jiǎn)單示例

    python生成器推導(dǎo)式用法簡(jiǎn)單示例

    這篇文章主要介紹了python生成器推導(dǎo)式用法,結(jié)合簡(jiǎn)單實(shí)例形式分析了Python生成器推導(dǎo)式的原理、使用方法及相關(guān)操作注意事項(xiàng),需要的朋友可以參考下
    2019-10-10
  • python+selenium+autoit實(shí)現(xiàn)文件上傳功能

    python+selenium+autoit實(shí)現(xiàn)文件上傳功能

    這篇文章主要介紹了python+selenium+autoit實(shí)現(xiàn)文件上傳功能,需要的朋友可以參考下
    2017-08-08
  • Windows+Anaconda3+PyTorch+PyCharm的安裝教程圖文詳解

    Windows+Anaconda3+PyTorch+PyCharm的安裝教程圖文詳解

    這篇文章主要介紹了Windows+Anaconda3+PyTorch+PyCharm的安裝教程,本文通過圖文并茂的形式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-04-04
  • 解決Python報(bào)錯(cuò):ValueError:operands?could?not?be?broadcast?together?with?shapes

    解決Python報(bào)錯(cuò):ValueError:operands?could?not?be?broadcast?t

    這篇文章主要給大家介紹了關(guān)于解決Python報(bào)錯(cuò):ValueError:operands?could?not?be?broadcast?together?with?shapes的相關(guān)資料,文中通過實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2023-02-02
  • Python異常學(xué)習(xí)筆記

    Python異常學(xué)習(xí)筆記

    這篇文章主要介紹了Python異常學(xué)習(xí)筆記,本文著重講解了如何自定義一個(gè)異常,需要的朋友可以參考下
    2015-02-02
  • Python中reset_index()函數(shù)的使用

    Python中reset_index()函數(shù)的使用

    本文主要介紹了Python中reset_index()函數(shù)的使用,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2023-05-05

最新評(píng)論