Python3的進(jìn)程和線程你了解嗎
1.概述
""" 基礎(chǔ)知識(shí): 1.多任務(wù):操作系統(tǒng)可以同時(shí)運(yùn)行多個(gè)任務(wù); 2.單核CPU執(zhí)行多任務(wù):操作系統(tǒng)輪流讓各個(gè)任務(wù)交替執(zhí)行; 3.一個(gè)任務(wù)即一個(gè)進(jìn)程(process),如:打開一個(gè)瀏覽器,即啟動(dòng)一個(gè)瀏覽器進(jìn)程; 4.在一個(gè)進(jìn)程內(nèi),要同時(shí)干多件事,需要同時(shí)運(yùn)行多個(gè)子任務(wù),把進(jìn)程內(nèi)的子任務(wù)稱為"線程(Thread)"; 5.每個(gè)進(jìn)程至少做一件事,因此,一個(gè)進(jìn)程至少有一個(gè)線程; 同時(shí)執(zhí)行多線程的解決方案: a.啟動(dòng)多個(gè)進(jìn)程,每個(gè)進(jìn)程雖然只有一個(gè)線程,但多個(gè)進(jìn)程可以一塊執(zhí)行多個(gè)任務(wù); b.啟動(dòng)一個(gè)進(jìn)程,在一個(gè)進(jìn)程內(nèi)啟動(dòng)多個(gè)線程,多個(gè)線程一塊執(zhí)行多個(gè)任務(wù); c.啟動(dòng)多個(gè)進(jìn)程,每個(gè)進(jìn)程啟動(dòng)多個(gè)線程; 即多任務(wù)的實(shí)現(xiàn)方式: a.多進(jìn)程模式; b.多線程模式; c.多進(jìn)程+多線程模式; """
2.多進(jìn)程
import os print("Process (%s) start..." % os.getpid()) """ 只能在Linux/Unix/Mac上工作 pid = os.fork() if pid == 0: print("I am child process (%s) and my parent is %s." % (os.getpid(), os.getppid())) else: print("I (%s) just created a child process (%s)." % (os.getpid(), pid)) """ print("Hello.")
# multiprocessing:跨平臺(tái)多線程模塊 # process_test.py文件,在交互下python process_test.py from multiprocessing import Process import os def run_process(name): print("Run child process %s (%s)..." % (name, os.getpid())) if __name__ == "__main__": print("Parent process %s." % os.getpid()) p = Process(target = run_process, args = ("test",)) print("Child process will start.") p.start() p.join() # join()方法可以等待子進(jìn)程結(jié)束后再繼續(xù)往下運(yùn)行,用于進(jìn)程間的同步 print("Child process end.")
# 結(jié)果輸出:
Parent process 28340.
Child process will start.
Run child process test (31152)...
Child process end.
# Pool:用進(jìn)程池批量創(chuàng)建子進(jìn)程 # process.py文件,交互下python process.py from multiprocessing import Pool import os, time, random def long_time_task(name): print('Run task %s (%s)...' % (name, os.getpid())) start = time.time() time.sleep(random.random() * 3) end = time.time() print('Task %s runs %0.2f seconds.' % (name, (end - start))) if __name__=='__main__': print('Parent process %s.' % os.getpid()) p = Pool(4) for i in range(5): p.apply_async(long_time_task, args=(i,)) print('Waiting for all subprocesses done...') p.close() p.join() print('All subprocesses done.')
# 結(jié)果輸出:
Parent process 31576.
Waiting for all subprocesses done...
Run task 0 (20416)...
Run task 1 (15900)...
Run task 2 (24716)...
Run task 3 (31148)...
Task 2 runs 0.72 seconds.
Run task 4 (24716)...
Task 4 runs 1.03 seconds.
Task 3 runs 1.82 seconds.
Task 1 runs 2.73 seconds.
Task 0 runs 2.82 seconds.
All subprocesses done.
3.子進(jìn)程
# subprocess模塊:啟動(dòng)一個(gè)子進(jìn)程,控制其輸入和輸出 # subprocess_test.py文件,注:文件名不要和模塊名相同,否則報(bào)錯(cuò) import subprocess print("$ nslookup www.python.org") r = subprocess.call(["nslookup", "www.python.org"]) print("Exit code:", r)
# 結(jié)果輸出:
$ nslookup www.python.org
服務(wù)器: cache-a.guangzhou.gd.cn
Address: 202.96.128.86
非權(quán)威應(yīng)答:
名稱: www.python.org
Addresses: 2a04:4e42:1a::223
151.101.72.223
Exit code: 0
# 子進(jìn)程需要輸入,通過communicate()方法 import subprocess print("$ nslookup") p = subprocess.Popen(["nslookup"], stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE) output, err = p.communicate(b"set q = mx\npython.org\nexit\n") print(output.decode("gbk")) print("Exit code:", p.returncode)
# 結(jié)果輸出:
$ nslookup
默認(rèn)服務(wù)器: cache-a.guangzhou.gd.cn
Address: 202.96.128.86
> Unrecognized command: set q = mx
> 服務(wù)器: cache-a.guangzhou.gd.cn
Address: 202.96.128.86
名稱: python.org
Address: 138.197.63.241
>
Exit code: 0
4.進(jìn)程間通信
# 在父進(jìn)程中創(chuàng)建兩個(gè)子進(jìn)程,一個(gè)往Queue里寫數(shù)據(jù),一個(gè)從Queue里讀數(shù)據(jù) # queue_test.py文件,交互下python queue_test.py from multiprocessing import Process, Queue import os, time, random def write(q): print("Process to write:%s" % os.getpid()) for value in ["W", "I", "L", "L", "A", "R", "D"]: print("Put %s to queue..." % value) q.put(value) time.sleep(random.random()) def read(q): print("Process to read:%s" % os.getpid()) while True: value = q.get(True) print("Get %s from queue." % value) if __name__ == "__main__": # 父進(jìn)程創(chuàng)建Queue,并傳給各個(gè)子進(jìn)程 q = Queue() pw = Process(target = write, args = (q,)) pr = Process(target = read, args = (q,)) # 啟動(dòng)子進(jìn)程pw,寫入 pw.start() # 啟動(dòng)子進(jìn)程pr,讀取 pr.start() # 等待pw結(jié)束 pw.join() # pr進(jìn)程是死循環(huán),無法等待其結(jié)束,需要強(qiáng)行終止 pr.terminate()
# 結(jié)果輸出:
Process to write:15720
Process to read:21524
Put W to queue...
Get W from queue.
Put I to queue...
Get I from queue.
Put L to queue...
Get L from queue.
Put L to queue...
Get L from queue.
Put A to queue...
Get A from queue.
Put R to queue...
Get R from queue.
Put D to queue...
Get D from queue.
5.多線程
# 線程庫:_thread和threading # 啟動(dòng)一個(gè)線程:即把一個(gè)函數(shù)傳入并創(chuàng)建一個(gè)Thread實(shí)例,然后調(diào)用start()開始執(zhí)行 # 任何進(jìn)程默認(rèn)啟動(dòng)一個(gè)線程,該線程稱為主線程,主線程可以啟動(dòng)新的線程 # current_thread()函數(shù):返回當(dāng)前線程的實(shí)例; # 主線程實(shí)例名字:MainThread; # 子線程名字的創(chuàng)建時(shí)指定,如果不指定,則自動(dòng)給線程命名為Thread-1、Thread-2... import time, threading def loop(): print("Thread %s is running..." % threading.current_thread().name) n = 0 while n < 5: n = n + 1 print("Thread %s >>> %s" % (threading.current_thread().name, n)) time.sleep(1) print("Thread %s ended." % threading.current_thread().name) print("Thread %s is running..." % threading.current_thread().name) thread1 = threading.Thread(target = loop, name = "LoopThread") thread1.start() thread1.join() print("Thread %s ended." % threading.current_thread().name)
# 結(jié)果輸出:
Thread MainThread is running...
Thread LoopThread is running...
Thread LoopThread >>> 1
Thread LoopThread >>> 2
Thread LoopThread >>> 3
Thread LoopThread >>> 4
Thread LoopThread >>> 5
Thread LoopThread ended.
Thread MainThread ended.
6.Lock
# 多進(jìn)程:同一個(gè)變量,各自有一份拷貝存在于每個(gè)進(jìn)程中,互不影響; # 多線程:所有變量由所有線程共享,任何一個(gè)變量可以被任何一個(gè)線程修改; # 多線程同時(shí)操作一個(gè)變量 # 多運(yùn)行幾次,發(fā)現(xiàn)結(jié)果不為0 import time, threading balance = 0 def change_it(n): global balance balance = balance + n balance = balance - n def run_thread(n): # 線程交替執(zhí)行,balance結(jié)果不一定為0 for i in range(2000000): change_it(n) thread1 = threading.Thread(target = run_thread, args = (5,)) thread2 = threading.Thread(target = run_thread, args = (8,)) thread1.start() thread2.start() thread1.join() thread2.join() print(balance) # 結(jié)果輸出: # 5(各自不同)
# 確保balance計(jì)算正確,需要給change_it()上一把鎖 # 當(dāng)線程開始執(zhí)行change_it()時(shí),該線程獲得鎖,其他線程不能同時(shí)執(zhí)行change_it(), # 只能等待,直到鎖被釋放,獲得該鎖后才能改; # 通過threading.Lock()創(chuàng)建鎖 import time, threading balance = 0 lock = threading.Lock() def change_it(n): global balance balance = balance + n balance = balance - n def run_thread(n): for i in range(2000000): lock.acquire() try: change_it(n) finally: # 釋放鎖 lock.release() thread1 = threading.Thread(target = run_thread, args = (5,)) thread2 = threading.Thread(target = run_thread, args = (8,)) thread1.start() thread2.start() thread1.join() thread2.join() print(balance) # 結(jié)果輸出: # 0
7.ThreadLocal
# 多線程環(huán)境下,每個(gè)線程有自己的數(shù)據(jù); # 一個(gè)線程使用自己的局部變量比使用全局變量好; import threading # 創(chuàng)建全局ThreadLocal對(duì)象 local_school = threading.local() def process_student(): # 獲取當(dāng)前線程關(guān)聯(lián)的student std = local_school.student print("Hello,%s (in %s)" % (std, threading.current_thread().name)) def process_thread(name): # 綁定ThreadLocal的student local_school.student = name process_student() thread1 = threading.Thread(target = process_thread, args = ("Willard",), name = "Thread-1") thread2 = threading.Thread(target = process_thread, args = ("WenYu",), name = "Thread-2") thread1.start() thread2.start() thread1.join() thread2.join()
# 結(jié)果輸出:
# Hello,Willard (in Thread-1)
# Hello,WenYu (in Thread-2)
8.進(jìn)程VS線程
# 進(jìn)程和線程優(yōu)缺點(diǎn): # 1.要實(shí)現(xiàn)多任務(wù),會(huì)設(shè)計(jì)Master-Worker模式,Master負(fù)責(zé)分配任務(wù),Worker負(fù)責(zé)執(zhí)行任務(wù), # 在多任務(wù)環(huán)境下,通常是一個(gè)Master,多個(gè)Worker; # a.如果使用多進(jìn)程實(shí)現(xiàn)Master-Worker,主進(jìn)程即Master,其他進(jìn)程即Worker; # b.如果使用多線程實(shí)現(xiàn)Master-Worker,主線程即Master,其他線程即Worker; # 2.多進(jìn)程優(yōu)點(diǎn):穩(wěn)定性高,一個(gè)子進(jìn)程崩潰不會(huì)影響主進(jìn)程和其他子進(jìn)程; # 3.多進(jìn)程缺點(diǎn):創(chuàng)建進(jìn)程的代價(jià)大,操作系統(tǒng)能同時(shí)運(yùn)行的進(jìn)程數(shù)有限; # 4.多線程缺點(diǎn):任何一個(gè)線程崩潰,可能直接造成整個(gè)進(jìn)程崩潰; # 線程切換: # 1.依次完成任務(wù)的方式稱為單任務(wù)模型,或批處理任務(wù)模型; # 2.任務(wù)1先做n分鐘,切換到任務(wù)2做n分鐘,再切換到任務(wù)3做n分鐘,依此類推,稱為多任務(wù)模型; # 計(jì)算密集型 VS IO密集型 # 1.計(jì)算密集型任務(wù):要進(jìn)行大量的計(jì)算,消耗CPU資源,如:對(duì)視頻進(jìn)行高清解碼等; # 2.IO密集型任務(wù):涉及到網(wǎng)絡(luò)、磁盤IO的任務(wù),均為IO密集型任務(wù); # 3.IO密集型任務(wù)消耗CPU少,大部分時(shí)間在等待IO操作完成; # 異步IO # 1.事件驅(qū)動(dòng)模型:用單進(jìn)程單線程模型來執(zhí)行多任務(wù); # 2.Python語言中,單線程的異步編程模型稱為協(xié)程;
9.分布式進(jìn)程
""" 實(shí)例: 有一個(gè)通過Queue通信的多進(jìn)程程序在同一機(jī)器上運(yùn)行,但現(xiàn)在處理任務(wù)的進(jìn)程任務(wù)繁重, 希望把發(fā)送任務(wù)的進(jìn)程和處理任務(wù)的進(jìn)程發(fā)布到兩臺(tái)機(jī)器上; """ # task_master_test.py # 交互環(huán)境中:python task_master_test.py import random, time, queue from multiprocessing.managers import BaseManager # 發(fā)送任務(wù)的隊(duì)列 task_queue = queue.Queue() # 接收結(jié)果的隊(duì)列 result_queue = queue.Queue() def return_task_queue(): global task_queue return task_queue def return_result_queue(): global task_queue return task_queue # 從BaseManager繼承的QueueManager class QueueManager(BaseManager): pass if __name__ == "__main__": # 把兩個(gè)Queue注冊(cè)到網(wǎng)絡(luò)上,callable參數(shù)關(guān)聯(lián)Queue對(duì)象 QueueManager.register("get_task_queue", callable = return_task_queue) QueueManager.register("get_result_queue", callable = return_result_queue) # 綁定端口5000,設(shè)置驗(yàn)證碼"Willard" manager = QueueManager(address = ("127.0.0.1", 5000), authkey = b"Willard") # 啟動(dòng)Queue manager.start() # 獲得通過網(wǎng)絡(luò)訪問的Queue對(duì)象 task = manager.get_task_queue() result = manager.get_result_queue() # 放任務(wù)進(jìn)去 for i in range(10): n = random.randint(0, 10000) print("Put task %d..." % n) task.put(n) # 從result隊(duì)列讀取結(jié)果 print("Try get results...") for i in range(10): r = result.get(timeout = 10) print("Result:%s" % r) # 關(guān)閉 manager.shutdown() print("Master Exit.")
# task_worker_test.py文件 # 交互環(huán)境python task_worker_test.py import time, sys, queue from multiprocessing.managers import BaseManager # 創(chuàng)建QueueManager class QueueManager(BaseManager): pass QueueManager.register("get_task_queue") QueueManager.register("get_result_queue") # 連接到服務(wù)器 server_address = "127.0.0.1" print("Connect to server %s..." % server_address) # 端口和驗(yàn)證碼 m = QueueManager(address = (server_address, 5000), authkey = b"Willard") # 網(wǎng)絡(luò)連接 m.connect() # 獲取Queue對(duì)象 task = m.get_task_queue() result = m.get_result_queue() # 從task隊(duì)列取任務(wù),把結(jié)果寫入result隊(duì)列 for i in range(10): try: n = task.get(timeout = 1) print("Run task %d * %d..." % (n, n)) r = "%d * %d = %d" % (n, n, n * n) time.sleep(1) result.put(r) except Queue.Empty: print("Task queue is empty.") print("Worker Exit.")
總結(jié)
本篇文章就到這里了,希望能夠給你帶來幫助,也希望您能夠多多關(guān)注腳本之家的更多內(nèi)容!
相關(guān)文章
python中從str中提取元素到list以及將list轉(zhuǎn)換為str的方法
今天小編就為大家分享一篇python中從str中提取元素到list以及將list轉(zhuǎn)換為str的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2018-06-06Python?Panda中索引和選擇?series?的數(shù)據(jù)
這篇文章主要介紹了Python?Panda中索引和選擇series的數(shù)據(jù),文章通過圍繞主題展開詳細(xì)的內(nèi)容介紹,具有一定的參考價(jià)值,需要的小伙伴可以參考一下2022-09-09Python創(chuàng)建二維數(shù)組實(shí)例(關(guān)于list的一個(gè)小坑)
下面小編就為大家?guī)硪黄狿ython創(chuàng)建二維數(shù)組實(shí)例(關(guān)于list的一個(gè)小坑)。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2017-11-11python+appium+yaml移動(dòng)端自動(dòng)化測試框架實(shí)現(xiàn)詳解
這篇文章主要介紹了python+appium+yaml移動(dòng)端自動(dòng)化測試框架實(shí)現(xiàn)詳解,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-11-11