欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Python并發(fā)多線程的具體操作步驟

 更新時間:2024年02月07日 09:11:09   作者:輝輝輝輝輝輝輝輝輝輝輝  
并發(fā)指的是任務數(shù)多余cpu核數(shù),通過操作系統(tǒng)的各種任務調(diào)度算法,實現(xiàn)用多任務一起執(zhí)行,下面這篇文章主要給大家介紹了關于Python并發(fā)多線程的具體操作步驟的相關資料,需要的朋友可以參考下

一、threading模塊介紹

multiprocess模塊的完全模仿了threading模塊的接口,二者在使用層面,有很大的相似性

二、開啟線程的兩種方式

方式一

#方式一
from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('egon',))
    t.start()
    print('主線程')

方式二

# 方式二
from threading import Thread
import time
class Sayhi(Thread):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        time.sleep(2)
        print('%s say hello' % self.name)

if __name__ == '__main__':
    t = Sayhi('ly')
    t.start()
    print('主線程')

三、在一個進程下開啟多個線程與在一個進程下開啟多個子進程的區(qū)別

1 誰的開啟速度快

from threading import Thread
from multiprocessing import Process
import os

def work():
    print('hello')

if __name__ == '__main__':
    # 在主進程下開啟線程
    t=Thread(target=work)
    t.start()
    print('主線程/主進程')
    '''
    打印結果:
    hello
    主線程/主進程
    '''

    # 在主進程下開啟子進程
    t=Process(target=work)
    t.start()
    print('主線程/主進程')
    '''
    打印結果:
    主線程/主進程
    hello
    '''

2 瞅一瞅pid

from threading import Thread
from multiprocessing import Process
import os

def work():
    print('hello',os.getpid())

if __name__ == '__main__':
    # part1:在主進程下開啟多個線程,每個線程都跟主進程的pid一樣
    t1=Thread(target=work)
    t2=Thread(target=work)
    t1.start()
    t2.start()
    print('主線程/主進程pid',os.getpid())

    # part2:開多個進程,每個進程都有不同的pid
    p1=Process(target=work)
    p2=Process(target=work)
    p1.start()
    p2.start()
    print('主線程/主進程pid',os.getpid())

3 同一進程內(nèi)的線程共享該進程的數(shù)據(jù)?

from  threading import Thread
from multiprocessing import Process
import os
def work():
    global n
    n=0

if __name__ == '__main__':
    # n=100
    # p=Process(target=work)
    # p.start()
    # p.join()
    # print('主',n) #毫無疑問子進程p已經(jīng)將自己的全局的n改成了0,但改的僅僅是它自己的,查看父進程的n仍然為100

    n=1
    t=Thread(target=work)
    t.start()
    t.join()
    print('主',n) #查看結果為0,因為同一進程內(nèi)的線程之間共享進程內(nèi)的數(shù)據(jù)

四、練習

練習一:

多線程并發(fā)的socket服務端

# -*- coding: UTF-8 -*-
#!/usr/bin/env python3

import multiprocessing
import threading

import socket
s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.bind(('127.0.0.1',8080))
s.listen(5)

def action(conn):
    while True:
        data=conn.recv(1024)
        print(data)
        conn.send(data.upper())

if __name__ == '__main__':

    while True:
        conn,addr=s.accept()

        p=threading.Thread(target=action,args=(conn,))
        p.start()

客戶端

# -*- coding: UTF-8 -*-
#!/usr/bin/env python3

import socket

s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.connect(('127.0.0.1',8080))

while True:
    msg=input('>>: ').strip()
    if not msg:continue

    s.send(msg.encode('utf-8'))
    data=s.recv(1024)
    print(data)

練習二:三個任務,一個接收用戶輸入,一個將用戶輸入的內(nèi)容格式化成大寫,一個將格式化后的結果存入文件

from threading import Thread
msg_l=[]
format_l=[]
def talk():
    while True:
        msg=input('>>: ').strip()
        if not msg:continue
        msg_l.append(msg)

def format_msg():
    while True:
        if msg_l:
            res=msg_l.pop()
            format_l.append(res.upper())

def save():
    while True:
        if format_l:
            with open('db.txt','a',encoding='utf-8') as f:
                res=format_l.pop()
                f.write('%s\n' %res)

if __name__ == '__main__':
    t1=Thread(target=talk)
    t2=Thread(target=format_msg)
    t3=Thread(target=save)
    t1.start()
    t2.start()
    t3.start()

五、線程相關的其他方法

Thread實例對象的方法
  # isAlive(): 返回線程是否活動的。
  # getName(): 返回線程名。
  # setName(): 設置線程名。

threading模塊提供的一些方法:
  # threading.currentThread(): 返回當前的線程變量。
  # threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啟動后、結束前,不包括啟動前和終止后的線程。
  # threading.activeCount(): 返回正在運行的線程數(shù)量,與len(threading.enumerate())有相同的結果。
from threading import Thread
import threading
from multiprocessing import Process
import os

def work():
    import time
    time.sleep(3)
    print(threading.current_thread().getName())

if __name__ == '__main__':
    #在主進程下開啟線程
    t=Thread(target=work)
    t.start()

    print(threading.current_thread().getName())
    print(threading.current_thread()) #主線程
    print(threading.enumerate()) #連同主線程在內(nèi)有兩個運行的線程
    print(threading.active_count())
    print('主線程/主進程')

    '''
    打印結果:
    MainThread
    <_MainThread(MainThread, started 140735268892672)>
    [<_MainThread(MainThread, started 140735268892672)>, <Thread(Thread-1, started 123145307557888)>]
    主線程/主進程
    Thread-1
    '''

主線程等待子線程結束

from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('ly',))
    t.start()
    t.join()
    print('主線程')
    print(t.is_alive())
    '''
    ly say hello
    主線程
    False
    '''

六、守護進程

無論是進程還是線程,都遵循:守護xxx會等待主xxx運行完畢后被銷毀

需要強調(diào)的是:運行完畢并非終止運行

1.對主進程來說,運行完畢指的是主進程代碼運行完畢
2.對主線程來說,運行完畢指的是主線程所在的進程內(nèi)所有非守護線程統(tǒng)統(tǒng)運行完畢,主線程才算運行完畢

詳細解釋:

#1 主進程在其代碼結束后就已經(jīng)算運行完畢了(守護進程在此時就被回收),然后主進程會一直等非守護的子進程都運行完畢后回收子進程的資源(否則會產(chǎn)生僵尸進程),才會結束,

#2 主線程在其他非守護線程運行完畢后才算運行完畢(守護線程在此時就被回收)。因為主線程的結束意味著進程的結束,進程整體的資源都將被回收,而進程必須保證非守護線程都運行完畢后才能結束。
from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('ly',))
    t.setDaemon(True) #必須在t.start()之前設置
    t.start()

    print('主線程')
    print(t.is_alive())
    '''
    主線程
    True
    '''

迷惑人的例子

from threading import Thread
import time
def foo():
    print(123)
    time.sleep(1)
    print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end456")

t1=Thread(target=foo)
t2=Thread(target=bar)

t1.daemon=True
t1.start()
t2.start()
print("main-------")

七、Python GIL(Global Interpreter Lock)

1、介紹

'''
定義:
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple 
native threads from executing Python bytecodes at once. This lock is necessary mainly 

because CPython's memory management is not thread-safe. (However, since the GIL 
exists, other features have grown to depend on the guarantees that it enforces.)
'''

結論:在Cpython解釋器中,同一個進程下開啟的多線程,同一時刻只能有一個線程執(zhí)行,無法利用多核優(yōu)勢

首先需要明確的一點是 GIL 并不是 Python 的特性,它是在實現(xiàn)Python解析器(CPython)時所引入的一個概念。就好比 C++ 是一套語言(語法)標準,但是可以用不同的編譯器來編譯成可執(zhí)行代碼。有名的編譯器,例如:GCC,INTEL C++,Visual C++等。Python也一樣,同樣一段代碼可以通過CPython,PyPy,Psyco等不同的Python執(zhí)行環(huán)境來執(zhí)行。像其中的JPython就沒有 GIL 。然而因為 CPython 是大部分環(huán)境下默認的Python執(zhí)行環(huán)境。所以在很多人的概念里 CPython 就是 Python ,也就想當然的把 GIL歸結為Python語言的缺陷。所以這里要先明確一點: GIL 并不是 Python 的特性,Python完全可以不依賴于GIL。

2、GIL介紹

GIL本質(zhì)就是一把互斥鎖,既然是互斥鎖,所有互斥鎖的本質(zhì)都一樣,都是將并發(fā)運行變成串行,以此來控制同一時間內(nèi)共享數(shù)據(jù)只能被一個任務所修改,進而保證數(shù)據(jù)安全。

可以肯定的一點是:保護不同的數(shù)據(jù)的安全,就應該加不同的鎖。

要想了解GIL,首先確定一點:每次執(zhí)行python程序,都會產(chǎn)生一個獨立的進程。例如python test.py,python aaa.py,python bbb.py會產(chǎn)生3個不同的python進程

'''
# 驗證python test.py只會產(chǎn)生一個進程
# test.py內(nèi)容
import os,time
print(os.getpid())
time.sleep(1000)
'''
python3 test.py 
# 在windows下
tasklist |findstr python

# 在linux下
ps aux |grep python

在一個python的進程內(nèi),不僅有test.py的主線程或者由該主線程開啟的其他線程,還有解釋器開啟的垃圾回收等解釋器級別的線程,總之,所有線程都運行在這一個進程內(nèi),毫無疑問

#1 所有數(shù)據(jù)都是共享的,這其中,代碼作為一種數(shù)據(jù)也是被所有線程共享的(test.py的所有代碼以及Cpython解釋器的所有代碼)
例如:test.py定義一個函數(shù)work(代碼內(nèi)容如下圖),在進程內(nèi)所有線程都能訪問到work的代碼,于是我們可以開啟三個線程然后target都指向該代碼,能訪問到意味著就是可以執(zhí)行。

#2 所有線程的任務,都需要將任務的代碼當做參數(shù)傳給解釋器的代碼去執(zhí)行,即所有的線程要想運行自己的任務,首先需要解決的是能夠訪問到解釋器的代碼。

綜上:

如果多個線程的target=work,那么執(zhí)行流程是,多個線程先訪問到解釋器的代碼,即拿到執(zhí)行權限,然后將target的代碼交給解釋器的代碼去執(zhí)行

解釋器的代碼是所有線程共享的,所以垃圾回收線程也可能訪問到解釋器的代碼而去執(zhí)行,這就導致了一個問題:對于同一個數(shù)據(jù)100,可能線程1執(zhí)行x=100的同時,而垃圾回收執(zhí)行的是回收100的操作,解決這種問題沒有什么高明的方法,就是加鎖處理,如下圖的GIL,保證python解釋器同一時間只能執(zhí)行一個任務的代碼

3、GIL與Lock

GIL保護的是解釋器級的數(shù)據(jù),保護用戶自己的數(shù)據(jù)則需要自己加鎖處理,如下圖

4、GIL與多線程

有了GIL的存在,同一時刻同一進程中只有一個線程被執(zhí)行

聽到這里,有的同學立馬質(zhì)問:進程可以利用多核,但是開銷大,而python的多線程開銷小,但卻無法利用多核優(yōu)勢,也就是說python沒用了,php才是最牛逼的語言?

要解決這個問題,我們需要在幾個點上達成一致:

#1. cpu到底是用來做計算的,還是用來做I/O的?

#2. 多cpu,意味著可以有多個核并行完成計算,所以多核提升的是計算性能#3. 每個cpu一旦遇到I/O阻塞,仍然需要等待,所以多核對I/O操作沒什么用處

一個工人相當于cpu,此時計算相當于工人在干活,I/O阻塞相當于為工人干活提供所需原材料的過程,工人干活的過程中如果沒有原材料了,則工人干活的過程需要停止,直到等待原材料的到來。

如果你的工廠干的大多數(shù)任務都要有準備原材料的過程(I/O密集型),那么你有再多的工人,意義也不大,還不如一個人,在等材料的過程中讓工人去干別的活,反過來講,如果你的工廠原材料都齊全,那當然是工人越多,效率越高

結論:

對計算來說,cpu越多越好,但是對于I/O來說,再多的cpu也沒用

當然對運行一個程序來說,隨著cpu的增多執(zhí)行效率肯定會有所提高(不管提高幅度多大,總會有所提高),這是因為一個程序基本上不會是純計算或者純I/O,所以我們只能相對的去看一個程序到底是計算密集型還是I/O密集型,從而進一步分析python的多線程到底有無用武之地

# 分析:我們有四個任務需要處理,處理方式肯定是要玩出并發(fā)的效果,解決方案可以是:
方案一:開啟四個進程
方案二:一個進程下,開啟四個線程

# 單核情況下,分析結果:如果四個任務是計算密集型,沒有多核來并行計算,方案一徒增了創(chuàng)建進程的開銷,方案二勝
如果四個任務是I/O密集型,方案一創(chuàng)建進程的開銷大,且進程的切換速度遠不如線程,方案二勝

# 多核情況下,分析結果:如果四個任務是計算密集型,多核意味著并行計算,在python中一個進程中同一時刻只有一個線程執(zhí)行用不上多核,方案一勝
如果四個任務是I/O密集型,再多的核也解決不了I/O問題,方案二勝

# 結論:現(xiàn)在的計算機基本上都是多核,python對于計算密集型的任務開多線程的效率并不能帶來多大性能上的提升,甚至不如串行(沒有大量切換),但是,對于IO密集型的任務效率還是有顯著提升的。

5、多線程性能測試

計算密集型:多進程效率高

from multiprocessing import Process
from threading import Thread
import os,time
def work():
    res=0
    for i in range(100000000):
        res*=i

if __name__ == '__main__':
    l=[]
    print(os.cpu_count()) # 本機為4核
    start=time.time()
    for i in range(4):
        p=Process(target=work) # 耗時5s多
        p=Thread(target=work)  # 耗時18s多
        l.append(p)
        p.start()
    for p in l:
        p.join()
    stop=time.time()
    print('run time is %s' %(stop-start))

I/O密集型:多線程效率高

from multiprocessing import Process
from threading import Thread
import threading
import os,time
def work():
    time.sleep(2)
    print('===>')

if __name__ == '__main__':
    l=[]
    print(os.cpu_count()) # 本機為4核
    start=time.time()
    for i in range(400):
        # p=Process(target=work) # 耗時12s多,大部分時間耗費在創(chuàng)建進程上
        p=Thread(target=work)    # 耗時2s多
        l.append(p)
        p.start()
    for p in l:
        p.join()
    stop=time.time()
    print('run time is %s' %(stop-start))

應用:

多線程用于IO密集型,如:socket,爬蟲,web 多進程用于計算密集型,如:金融分析

6、CPU 和 GIL 必須都具備才可以執(zhí)行代碼

拿到 CPU 權限 -> 拿到 GIL 解釋器鎖 -> 執(zhí)行代碼

在 Python 3.2 之后 GIL 有了新的實現(xiàn),目的是為了解決 That GIL Thrashing 問題,這是Antoine Pitrou 的功勞

7、GIL 解釋器鎖會在兩種情況下釋放

1.主動釋放

遇到 IO 操作或者分配的 CPU 時間片到時間了。

注意,GIL存在的意義在于維護線程安全,x=10涉及到IO操作,如果也被當成普通的IO操作,主動交出GIL,那么一定會出現(xiàn)數(shù)據(jù)不安全問題,所以x=10一定是被區(qū)分對待了。

至于x=10如何實現(xiàn)的被區(qū)分對待,這其實很好理解,任何的io操作都是向操作系統(tǒng)發(fā)送系統(tǒng)調(diào)用,即調(diào)用操作系統(tǒng)的某一接口實現(xiàn)的,比如變量賦值操作肯定是調(diào)用了一種接口,文件讀寫操作肯定也是調(diào)用了一種接口,網(wǎng)絡io也是調(diào)用了某一種接口,這就給區(qū)分對待提供了實現(xiàn)的依據(jù),即變量賦值操作并不屬于主動釋放的范疇,這樣GIL在線程安全方面才會有所作為。

2.被動釋放

python3.2之后定義了一個全局變量

/ Python/ceval.c /*
...
static volatile int gil_drop_request = 0;

注意當只有一個線程時,該線程會一直運行,不會釋放GIL,當有多個線程時

例如:thead1,thread2

如果thread1一直沒有主動釋放掉GIL,那肯定不會讓他一直運行下去啊,實際上在thread1運行的過程時,thread2就會執(zhí)行一個cv_wait(gil,TIMEOUT)的函數(shù),(默認TIMEOUT值為5milliseconds,但是可以修改),一旦到了時間,就會將全局變量

gil_drop_request = 1;線程thread1就會被強制釋放GIL,然后線程thread2開始運行并返回一個ack給線程thread1,線程thread1開始調(diào)用cv_wait(gil,TIMEOUT)

八、同步鎖

三個需要注意的點:
#1.線程搶的是GIL鎖,GIL鎖相當于執(zhí)行權限,拿到執(zhí)行權限后才能拿到互斥鎖Lock,其他線程也可以搶到GIL,但如果發(fā)現(xiàn)Lock仍然沒有被釋放則阻塞,即便是拿到執(zhí)行權限GIL也要立刻交出來#2.join是等待所有,即整體串行,而鎖只是鎖住修改共享數(shù)據(jù)的部分,即部分串行,要想保證數(shù)據(jù)安全的根本原理在于讓并發(fā)變成串行,join與互斥鎖都可以實現(xiàn),毫無疑問,互斥鎖的部分串行效率要更高#3. 一定要看本小節(jié)最后的GIL與互斥鎖的經(jīng)典分析

GIL VS Lock

首先我們需要達成共識:鎖的目的是為了保護共享的數(shù)據(jù),同一時間只能有一個線程來修改共享的數(shù)據(jù)

然后,我們可以得出結論:保護不同的數(shù)據(jù)就應該加不同的鎖。

最后,問題就很明朗了,GIL 與Lock是兩把鎖,保護的數(shù)據(jù)不一樣,前者是解釋器級別的(當然保護的就是解釋器級別的數(shù)據(jù),比如垃圾回收的數(shù)據(jù)),后者是保護用戶自己開發(fā)的應用程序的數(shù)據(jù),很明顯GIL不負責這件事,只能用戶自定義加鎖處理,即Lock

過程分析:所有線程搶的是GIL鎖,或者說所有線程搶的是執(zhí)行權限

線程1搶到GIL鎖,拿到執(zhí)行權限,開始執(zhí)行,然后加了一把Lock,還沒有執(zhí)行完畢,即線程1還未釋放Lock,有可能線程2搶到GIL鎖,開始執(zhí)行,執(zhí)行過程中發(fā)現(xiàn)Lock還沒有被線程1釋放,于是線程2進入阻塞,被奪走執(zhí)行權限,有可能線程1拿到GIL,然后正常執(zhí)行到釋放Lock。。。這就導致了串行運行的效果

既然是串行,那我們執(zhí)行

t1.start()

t1.join

t2.start()

t2.join()

這也是串行執(zhí)行啊,為何還要加Lock呢,需知join是等待t1所有的代碼執(zhí)行完,相當于鎖住了t1的所有代碼,而Lock只是鎖住一部分操作共享數(shù)據(jù)的代碼。

詳細

因為Python解釋器幫你自動定期進行內(nèi)存回收,你可以理解為python解釋器里有一個獨立的線程,每過一段時間它起wake up做一次全局輪詢看看哪些內(nèi)存數(shù)據(jù)是可以被清空的,此時你自己的程序里的線程和py解釋器自己的線程是并發(fā)運行的,假設你的線程刪除了一個變量,py解釋器的垃圾回收線程在清空這個變量的過程中的clearing時刻,可能一個其它線程正好又重新給這個還沒來及得清空的內(nèi)存空間賦值了,結果就有可能新賦值的數(shù)據(jù)被刪除了,為了解決類似的問題,python解釋器簡單粗暴的加了鎖,即當一個線程運行時,其它人都不能動,這樣就解決了上述的問題,  這可以說是Python早期版本的遺留問題。
from threading import Thread
import os,time
def work():
    global n
    temp=n
    time.sleep(0.1)
    n=temp-1
if __name__ == '__main__':
    n=100
    l=[]
    for i in range(100):
        p=Thread(target=work)
        l.append(p)
        p.start()
    for p in l:
        p.join()

    print(n) #結果可能為99

鎖通常被用來實現(xiàn)對共享資源的同步訪問。為每一個共享資源創(chuàng)建一個Lock對象,當你需要訪問該資源時,調(diào)用acquire方法來獲取鎖對象(如果其它線程已經(jīng)獲得了該鎖,則當前線程需等待其被釋放),待資源訪問完后,再調(diào)用release方法釋放鎖:

import threading

R=threading.Lock()

R.acquire()
'''
對公共數(shù)據(jù)的操作
'''
R.release()
from threading import Thread,Lock
import os,time
def work():
    global n
    lock.acquire()
    temp=n
    time.sleep(0.1)
    n=temp-1
    lock.release()
if __name__ == '__main__':
    lock=Lock()
    n=100
    l=[]
    for i in range(100):
        p=Thread(target=work)
        l.append(p)
        p.start()
    for p in l:
        p.join()

    print(n) #結果肯定為0,由原來的并發(fā)執(zhí)行變成串行,犧牲了執(zhí)行效率保證了數(shù)據(jù)安全

GIL鎖與互斥鎖綜合分析(重點?。。。?/p>

分析:

#1. 100個線程去搶GIL鎖,即搶執(zhí)行權限

#2. 肯定有一個線程先搶到GIL(暫且稱為線程1),然后開始執(zhí)行,一旦執(zhí)行就會拿到lock.acquire()

#3. 極有可能線程1還未運行完畢,就有另外一個線程2搶到GIL,然后開始運行,但線程2發(fā)現(xiàn)互斥鎖lock還未被線程1釋放,于是阻 塞,被迫交出執(zhí)行權限,即釋放GIL

#4. 直到線程1重新?lián)尩紾IL,開始從上次暫停的位置繼續(xù)執(zhí)行,直到正常釋放互斥鎖lock,然后其他的線程再重復2 3 4的過程

互斥鎖與join的區(qū)別(重點?。。。?/strong>

# 不加鎖:并發(fā)執(zhí)行,速度快,數(shù)據(jù)不安全
from threading import current_thread,Thread,Lock
import os,time
def task():
    global n
    print('%s is running' %current_thread().getName())
    temp=n
    time.sleep(0.5)
    n=temp-1

if __name__ == '__main__':
    n=100
    lock=Lock()
    threads=[]
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()

    stop_time=time.time()
    print('主:%s n:%s' %(stop_time-start_time,n))

'''
Thread-1 is running
Thread-2 is running
......
Thread-100 is running
主:0.5216062068939209 n:99
'''

# 不加鎖:未加鎖部分并發(fā)執(zhí)行,加鎖部分串行執(zhí)行,速度慢,數(shù)據(jù)安全
from threading import current_thread,Thread,Lock
import os,time
def task():
    # 未加鎖的代碼并發(fā)運行
    time.sleep(3)
    print('%s start to run' %current_thread().getName())
    global n
    # 加鎖的代碼串行運行
    lock.acquire()
    temp=n
    time.sleep(0.5)
    n=temp-1
    lock.release()

if __name__ == '__main__':
    n=100
    lock=Lock()
    threads=[]
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    stop_time=time.time()
    print('主:%s n:%s' %(stop_time-start_time,n))

'''
Thread-1 is running
Thread-2 is running
......
Thread-100 is running
主:53.294203758239746 n:0
'''

# 有的同學可能有疑問:既然加鎖會讓運行變成串行,那么我在start之后立即使用join,就不用加鎖了啊,也是串行的效果啊
# 沒錯:在start之后立刻使用jion,肯定會將100個任務的執(zhí)行變成串行,毫無疑問,最終n的結果也肯定是0,是安全的,但問題是
# start后立即join:任務內(nèi)的所有代碼都是串行執(zhí)行的,而加鎖,只是加鎖的部分即修改共享數(shù)據(jù)的部分是串行的
# 單從保證數(shù)據(jù)安全方面,二者都可以實現(xiàn),但很明顯是加鎖的效率更高.
from threading import current_thread,Thread,Lock
import os,time
def task():
    time.sleep(3)
    print('%s start to run' %current_thread().getName())
    global n
    temp=n
    time.sleep(0.5)
    n=temp-1

if __name__ == '__main__':
    n=100
    lock=Lock()
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        t.start()
        t.join()
    stop_time=time.time()
    print('主:%s n:%s' %(stop_time-start_time,n))

'''
Thread-1 start to run
Thread-2 start to run
......
Thread-100 start to run
主:350.6937336921692 n:0 #耗時是多么的恐怖
'''

九、死鎖現(xiàn)象與遞歸鎖

進程也有死鎖與遞歸鎖,在進程那里忘記說了,放到這里一切說了額

所謂死鎖: 是指兩個或兩個以上的進程或線程在執(zhí)行過程中,因爭奪資源而造成的一種互相等待的現(xiàn)象,若無外力作用,它們都將無法推進下去。此時稱系統(tǒng)處于死鎖狀態(tài)或系統(tǒng)產(chǎn)生了死鎖,這些永遠在互相等待的進程稱為死鎖進程,如下就是死鎖

from threading import Thread,Lock
import time
mutexA=Lock()
mutexB=Lock()

class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()
    def func1(self):
        mutexA.acquire()
        print('\033[41m%s 拿到A鎖\033[0m' %self.name)

        mutexB.acquire()
        print('\033[42m%s 拿到B鎖\033[0m' %self.name)
        mutexB.release()

        mutexA.release()

    def func2(self):
        mutexB.acquire()
        print('\033[43m%s 拿到B鎖\033[0m' %self.name)
        time.sleep(2)

        mutexA.acquire()
        print('\033[44m%s 拿到A鎖\033[0m' %self.name)
        mutexA.release()

        mutexB.release()

if __name__ == '__main__':
    for i in range(10):
        t=MyThread()
        t.start()

'''
Thread-1 拿到A鎖
Thread-1 拿到B鎖
Thread-1 拿到B鎖
Thread-2 拿到A鎖
然后就卡住,死鎖了
'''

解決方法,遞歸鎖,在Python中為了支持在同一線程中多次請求同一資源,python提供了可重入鎖RLock。

這個RLock內(nèi)部維護著一個Lock和一個counter變量,counter記錄了acquire的次數(shù),從而使得資源可以被多次require。直到一個線程所有的acquire都被release,其他的線程才能獲得資源。上面的例子如果使用RLock代替Lock,則不會發(fā)生死鎖:

mutexA=mutexB=threading.RLock() #一個線程拿到鎖,counter加1,該線程內(nèi)又碰到加鎖的情況,則counter繼續(xù)加1,這期間所有其他線程都只能等待,等待該線程釋放所有鎖,即counter遞減到0為止

十、信號量Semaphore

同進程的一樣,Semaphore管理一個內(nèi)置的計數(shù)器, 每當調(diào)用acquire()時內(nèi)置計數(shù)器-1; 調(diào)用release() 時內(nèi)置計數(shù)器+1; 計數(shù)器不能小于0;當計數(shù)器為0時,acquire()將阻塞線程直到其他線程調(diào)用release()。

實例:(同時只有5個線程可以獲得semaphore,即可以限制最大連接數(shù)為5):

from threading import Thread,Semaphore
import threading
import time
# def func():
#     if sm.acquire():
#         print (threading.currentThread().getName() + ' get semaphore')
#         time.sleep(2)
#         sm.release()
def func():
    sm.acquire()
    print('%s get sm' %threading.current_thread().getName())
    time.sleep(3)
    sm.release()
if __name__ == '__main__':
    sm=Semaphore(5)
    for i in range(23):
        t=Thread(target=func)
        t.start()

與進程池是完全不同的概念,進程池Pool(4),最大只能產(chǎn)生4個進程,而且從頭到尾都只是這四個進程,不會產(chǎn)生新的,而信號量是產(chǎn)生一堆線程/進程

十一、Event

同進程的一樣

線程的一個關鍵特性是每個線程都是獨立運行且狀態(tài)不可預測。如果程序中的其 他線程需要通過判斷某個線程的狀態(tài)來確定自己下一步的操作,這時線程同步問題就會變得非常棘手。為了解決這些問題,我們需要使用threading庫中的Event對象。 對象包含一個可由線程設置的信號標志,它允許線程等待某些事件的發(fā)生。在 初始情況下,Event對象中的信號標志被設置為假。如果有線程等待一個Event對象, 而這個Event對象的標志為假,那么這個線程將會被一直阻塞直至該標志為真。一個線程如果將一個Event對象的信號標志設置為真,它將喚醒所有等待這個Event對象的線程。如果一個線程等待一個已經(jīng)被設置為真的Event對象,那么它將忽略這個事件, 繼續(xù)執(zhí)行

event.isSet():返回event的狀態(tài)值;

event.wait():如果 event.isSet()==False將阻塞線程;

event.set(): 設置event的狀態(tài)值為True,所有阻塞池的線程激活進入就緒狀態(tài), 等待操作系統(tǒng)調(diào)度;

event.clear():恢復event的狀態(tài)值為False。

例如,有多個工作線程嘗試鏈接MySQL,我們想要在鏈接前確保MySQL服務正常才讓那些工作線程去連接MySQL服務器,如果連接不成功,都會去嘗試重新連接。那么我們就可以采用threading.Event機制來協(xié)調(diào)各個工作線程的連接操作

from threading import Thread,Event
import threading
import time,random
def conn_mysql():
    count=1
    while not event.is_set():
        if count > 3:
            raise TimeoutError('鏈接超時')
        print('<%s>第%s次嘗試鏈接' % (threading.current_thread().getName(), count))
        event.wait(0.5)
        count+=1
    print('<%s>鏈接成功' %threading.current_thread().getName())

def check_mysql():
    print('\033[45m[%s]正在檢查mysql\033[0m' % threading.current_thread().getName())
    time.sleep(random.randint(2,4))
    event.set()
if __name__ == '__main__':
    event=Event()
    conn1=Thread(target=conn_mysql)
    conn2=Thread(target=conn_mysql)
    check=Thread(target=check_mysql)

    conn1.start()
    conn2.start()
    check.start()

十二、條件Condition(了解)

使得線程等待,只有滿足某條件時,才釋放n個線程

import threading

def run(n):
    con.acquire()
    con.wait()
    print("run the thread: %s" %n)
    con.release()

if __name__ == '__main__':

    con = threading.Condition()
    for i in range(10):
        t = threading.Thread(target=run, args=(i,))
        t.start()

    while True:
        inp = input('>>>')
        if inp == 'q':
            break
        con.acquire()
        con.notify(int(inp))
        con.release()
def condition_func():

    ret = False
    inp = input('>>>')
    if inp == '1':
        ret = True

    return ret

def run(n):
    con.acquire()
    con.wait_for(condition_func)
    print("run the thread: %s" %n)
    con.release()

if __name__ == '__main__':

    con = threading.Condition()
    for i in range(10):
        t = threading.Thread(target=run, args=(i,))
        t.start()

十三、定時器

定時器,指定n秒后執(zhí)行某操作

from threading import Timer

def hello():
    print("hello, world")

t = Timer(1, hello)
t.start()  # after 1 seconds, "hello, world" will be printed

驗證碼定時器

from threading import Timer
import random,time

class Code:
    def __init__(self):
        self.make_cache()

    def make_cache(self,interval=5):
        self.cache=self.make_code()
        print(self.cache)
        self.t=Timer(interval,self.make_cache)
        self.t.start()

    def make_code(self,n=4):
        res=''
        for i in range(n):
            s1=str(random.randint(0,9))
            s2=chr(random.randint(65,90))
            res+=random.choice([s1,s2])
        return res

    def check(self):
        while True:
            inp=input('>>: ').strip()
            if inp.upper() ==  self.cache:
                print('驗證成功',end='\n')
                self.t.cancel()
                break

if __name__ == '__main__':
    obj=Code()
    obj.check()

十四、線程queue

queue隊列 :使用import queue,用法與進程Queue一樣

queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

• class queue.Queue(maxsize=0) 先進先出

import queue

q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
結果(先進先出):
first
second
third
'''
  • class queue.LifoQueue(maxsize=0) 先進先出 # last in fisrt out 后進先出
import queue

q=queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
結果(后進先出):
third
second
first
'''
  • class queue.PriorityQueue(maxsize=0) # 存儲數(shù)據(jù)時可設置優(yōu)先級的隊列
import queue

q=queue.PriorityQueue()
#put進入一個元組,元組的第一個元素是優(yōu)先級(通常是數(shù)字,也可以是非數(shù)字之間的比較),數(shù)字越小優(yōu)先級越高
q.put((20,'a'))
q.put((10,'b'))
q.put((30,'c'))

print(q.get())
print(q.get())
print(q.get())
'''
結果(數(shù)字越小優(yōu)先級越高,優(yōu)先級高的優(yōu)先出隊):
(10, 'b')
(20, 'a')
(30, 'c')
'''

其他 

Constructor for a priority queue. maxsize is an integer that sets the upperbound limit on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If maxsize is less than or equal to zero, the queue size is infinite.

構造一個優(yōu)先級隊列,其中maxsize是一個整數(shù),用于設置可以放入隊列的項目數(shù)量的上限.一旦達到這個上限,插入就會阻塞,直到隊列中有項目被消耗。如果maxsize小于或等于0,則隊列長度為無窮大。

The lowest valued entries are retrieved first (the lowest valued entry is the one returned by sorted(list(entries))[0]). A typical pattern for entries is a tuple in the form: (priority_number, data).

首先檢索最低值的條目(最低值的條目是指列表經(jīng)過排序后取到的索引為0的那個元素,一般條目是(優(yōu)先級數(shù)字,數(shù)據(jù))這種元組的形式

exception queue.Empty
Exception raised when non-blocking get() (or get_nowait()) is called on a Queue object which is empty.

當表示非阻塞的get()或get_nowait()在一個空的隊列對象中被調(diào)用時,會拋出異常

exception queue.Full
Exception raised when non-blocking put() (or put_nowait()) is called on a Queue object which is full.

當表示非阻塞的put()或put_nowait()在一個滿的隊列對象中被調(diào)用時,會拋出異常

Queue.qsize()
Queue.empty() #return True if empty  

當隊列為空返回True

Queue.full() # return True if full 

當隊列為滿返回True

Queue.put(item, block=True, timeout=None)
Put item into the queue. If optional args block is true and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Full exception if no free slot was available within that time. Otherwise (block is false), put an item on the queue if a free slot is immediately available, else raise the Full exception (timeout is ignored in that case).

將一個項放入隊列。如果可選參數(shù)block為true并且timeout為None(默認值),則在必要時阻塞,直到有空閑槽可用。如果參數(shù)timeout是一個正數(shù),它最多阻塞timeout秒,如果在這段時間內(nèi)沒有可用的空閑槽,則會引發(fā)Full異常。否則(block為false),如果有空閑槽可用,則將一個項目放入隊列中,否則引發(fā)Full異常(在這種情況下,timeout被忽略)。

Queue.put_nowait(item)
Equivalent to put(item, False).

Queue.get(block=True, timeout=None)
Remove and return an item from the queue. If optional args block is true and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Empty exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise the Empty exception (timeout is ignored in that case).

從隊列中移除并返回一個項。如果可選參數(shù)block為true并且timeout為None(默認值),則在必要時阻塞,直到有可用的項。如果timeout為正數(shù),則最多阻塞timeout秒,如果在該時間內(nèi)沒有可用項,則拋出Empty異常。否則(block為false),如果一個項目可用,則返回那個項目,否則引發(fā)Empty異常(在這種情況下,timeout被忽略)。

Queue.get_nowait()
Equivalent to get(False).

Two methods are offered to support tracking whether enqueued tasks have been fully processed by daemon consumer threads.

提供了兩種方法來支持追蹤進入隊列的任務是否已被生產(chǎn)者的守護線程完全處理。

Queue.task_done()
Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

假定先前進入隊列的任務已完成。并且被隊列生產(chǎn)者使用。對于每個用于獲取任務的get(),后續(xù)對task_done()的調(diào)用都會告訴隊列任務的處理已經(jīng)完成。

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

如果join()當前正被阻塞,它將在所有項都被處理完時恢復(這意味著對于每個已經(jīng)put()到隊列中的項都接收到task_done()調(diào)用)。

Raises a ValueError if called more times than there were items placed in the queue.

如果調(diào)用次數(shù)超過放入隊列的項數(shù),將引發(fā)ValueError。

Queue.join() 

阻塞,直到queue被消費完畢

十五、Python標準模塊–concurrent.futures

# 1 介紹
concurrent.futures模塊提供了高度封裝的異步調(diào)用接口
ThreadPoolExecutor:線程池,提供異步調(diào)用
ProcessPoolExecutor: 進程池,提供異步調(diào)用
Both implement the same interface, which is defined by the abstract Executor class.

# 2 基本方法
# submit(fn, *args, **kwargs)
異步提交任務

# map(func, *iterables, timeout=None, chunksize=1) 
取代for循環(huán)submit的操作

# shutdown(wait=True) 
相當于進程池的pool.close()+pool.join()操作
wait=True,等待池內(nèi)所有任務執(zhí)行完畢回收完資源后才繼續(xù)
wait=False,立即返回,并不會等待池內(nèi)的任務執(zhí)行完畢
但不管wait參數(shù)為何值,整個程序都會等到所有任務執(zhí)行完畢
submit和map必須在shutdown之前

# result(timeout=None)
取得結果

# add_done_callback(fn)
回調(diào)函數(shù)

ProcessPoolExecutor

# 介紹
The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously. ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned.

ProcessPoolExecutor類是Executor的子類,它使用一個進程池來異步執(zhí)行調(diào)用。ProcessPoolExecutor會調(diào)用多進程模塊,這允許它避開全局解釋器鎖,但也意味著只能執(zhí)行和返回可pickle的對象。

class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None)
An Executor subclass that executes calls asynchronously using a pool of at most max_workers processes. If max_workers is None or not given, it will default to the number of processors on the machine. If max_workers is lower or equal to 0, then a ValueError will be raised.
能夠異步調(diào)用數(shù)量不超過參數(shù)max_workers的子進程,如果max_workers為None或未給出,則默認值為機器上的處理器數(shù)。如果max_workers小于或等于0,則會拋出異常ValueError

# 用法
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

import os,time,random
def task(n):
    print('%s is runing' %os.getpid())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':

    executor=ProcessPoolExecutor(max_workers=3)

    futures=[]
    for i in range(11):
        future=executor.submit(task,i)
        futures.append(future)
    executor.shutdown(True)
    print('+++>')
    for future in futures:
        print(future.result())

ThreadPoolExecutor

#介紹
ThreadPoolExecutor is an Executor subclass that uses a pool of threads to execute calls asynchronously.
ThreadPoolExecutor是Executor的一個子類,可以異步調(diào)用線程池里的線程
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='')An Executor subclass that uses a pool of at most max_workers threads to execute calls asynchronously.
能夠異步調(diào)用數(shù)量不超過參數(shù)max_workers的子進程.

Changed in version 3.5: If max_workers is None or not given, it will default to the number of processors on the machine, multiplied by 5, assuming that ThreadPoolExecutor is often used to overlap I/O instead of CPU work and the number of workers should be higher than the number of workers for ProcessPoolExecutor.
3.5版本中的變化:如果max_workers為None或者沒有被指定,它將默認為計算機的處理器個數(shù)乘以5,假設ThreadPoolExecutor(線程池)通常用于重復I / O操作而不是CPU的計算,那么它的實際效率會低于ProcessPoolExecutor(進程池)

New in version 3.6: The thread_name_prefix argument was added to allow users to control the threading.Thread names for worker threads created by the pool for easier debugging.
3.6版本中新增功能:添加了thread_name_prefix參數(shù),允許用戶更方便地控制線程??梢宰远x由線程池創(chuàng)建的線程名,以便于調(diào)試。

#用法
與ProcessPoolExecutor相同

map的用法

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

import os,time,random
def task(n):
    print('%s is runing' %os.getpid())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':

    executor=ThreadPoolExecutor(max_workers=3)

    # for i in range(11):
    #     future=executor.submit(task,i)

    executor.map(task,range(1,12)) #map取代了for+submit
回調(diào)函數(shù)
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from multiprocessing import Pool
import requests
import json
import os

def get_page(url):
    print('<進程%s> get %s' %(os.getpid(),url))
    respone=requests.get(url)
    if respone.status_code == 200:
        return {'url':url,'text':respone.text}

def parse_page(res):
    res=res.result()
    print('<進程%s> parse %s' %(os.getpid(),res['url']))
    parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))
    with open('db.txt','a') as f:
        f.write(parse_res)


if __name__ == '__main__':
    urls=[
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/'
    ]

    # p=Pool(3)
    # for url in urls:
    #     p.apply_async(get_page,args=(url,),callback=pasrse_page)
    # p.close()
    # p.join()

    p=ProcessPoolExecutor(3)
    for url in urls:
        p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一個future對象obj,需要用obj.result()拿到結果

總結 

到此這篇關于Python并發(fā)多線程的文章就介紹到這了,更多相關Python并發(fā)多線程操作內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • 在python3中使用shuffle函數(shù)要注意的地方

    在python3中使用shuffle函數(shù)要注意的地方

    今天小編就為大家分享一篇在python3中使用shuffle函數(shù)要注意的地方,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-02-02
  • 解決Shell執(zhí)行python文件,傳參空格引起的問題

    解決Shell執(zhí)行python文件,傳參空格引起的問題

    今天小編就為大家分享一篇解決Shell執(zhí)行python文件,傳參空格引起的問題,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2018-10-10
  • Python在字典中獲取帶權重的隨機值實現(xiàn)方式

    Python在字典中獲取帶權重的隨機值實現(xiàn)方式

    這篇文章主要介紹了Python在字典中獲取帶權重的隨機值,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2022-11-11
  • Python基于scipy實現(xiàn)信號濾波功能

    Python基于scipy實現(xiàn)信號濾波功能

    本文將以實戰(zhàn)的形式基于scipy模塊使用Python實現(xiàn)簡單濾波處理。這篇文章主要介紹了Python基于scipy實現(xiàn)信號濾波功能,需要的朋友可以參考下
    2019-05-05
  • python GUI計算器的實現(xiàn)

    python GUI計算器的實現(xiàn)

    這篇文章主要介紹了python gui計算器的實現(xiàn),幫助大家更好的理解和學習python gui編程,感興趣的朋友可以了解下
    2020-10-10
  • 利用Python實現(xiàn)繪制3D愛心的代碼分享

    利用Python實現(xiàn)繪制3D愛心的代碼分享

    最近你是否也被李峋的愛心跳動代碼所感動,心動不如行動,相同的代碼很多,我們今天換一個玩法!構建一個三維的跳動愛心!嗯!這篇博客本著開源的思想!不是說誰對浪漫過敏的
    2022-11-11
  • 在Django的URLconf中使用多個視圖前綴的方法

    在Django的URLconf中使用多個視圖前綴的方法

    這篇文章主要介紹了在Django的URLconf中使用多個視圖前綴的方法,Django是Python中最為著名的遵循MVC結構的開發(fā)框架,需要的朋友可以參考下
    2015-07-07
  • Python學習之自定義異常詳解

    Python學習之自定義異常詳解

    這篇文章主要為大家介紹了Python中如何自定義異常,以及自定義拋出異常的關鍵字—raise的用法,文中示例代碼講解詳細,感興趣的小伙伴可以了解一下
    2022-03-03
  • pytorch  RNN參數(shù)詳解(最新)

    pytorch  RNN參數(shù)詳解(最新)

    這篇文章主要介紹了pytorch  RNN參數(shù)詳解,這個示例代碼展示了如何使用 PyTorch 定義和訓練一個 LSTM 模型,并詳細解釋了每個類和方法的參數(shù)及其作用,需要的朋友可以參考下
    2024-06-06
  • python 第三方庫的安裝及pip的使用詳解

    python 第三方庫的安裝及pip的使用詳解

    下面小編就為大家?guī)硪黄猵ython 第三方庫的安裝及pip的使用詳解。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-05-05

最新評論