欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

pytorch 數(shù)據(jù)預(yù)加載的實現(xiàn)示例

 更新時間:2023年12月05日 08:59:40   作者:SATAN 先生  
在PyTorch中,數(shù)據(jù)加載和預(yù)處理是深度學(xué)習(xí)中非常重要的一部分,本文主要介紹了pytorch 數(shù)據(jù)預(yù)加載的實現(xiàn)示例,具有一定的參考價值,感興趣的可以了解一下

1. Abstract

本文介紹一個工具 PreDataLoader,它包裝 torch.utils.data.DataLoader,接收該類的一個實例 loader,啟動一個線程 t,創(chuàng)建一個隊列 q,t 將 loader 中的數(shù)據(jù)預(yù)加載到隊列 q 中, 以在模型計算時也能啟動啟動數(shù)據(jù)加載程序, 節(jié)省數(shù)據(jù)加載時間。代碼:

class PreDataLoader(object):
	"""
	@Author: Yuwei from https://www.zhihu.com/people/aewil-zheng, with few changes

	** 包裝 torch.utils.data.DataLoader, 接收該類的一個實例 loader, 啟動一個線程 t, 創(chuàng)建一個隊列 q
	t 將 loader 中的數(shù)據(jù)預(yù)加載到隊列 q 中, 以在模型計算時也能啟動啟動數(shù)據(jù)加載程序, 節(jié)省數(shù)據(jù)加載時間

	** 若提供了 cuda device, 數(shù)據(jù)將直接被加載到 GPU 上
	"""

	def __init__(self, loader, device=None, queue_size=2):
		"""
		:param loader: torch.utils.data.DataLoader
		:param device: torch.device('cuda' or 'cpu'), to use cpu, set None
		:param queue_size: the number of samples to be preloaded
		"""
		self.__loader = loader
		self.__device = device
		self.__queue_size = queue_size

		self.__load_stream = torch.cuda.Stream(device=device) \
			if str(device).startswith('cuda') else None  # 如果提供了 cuda device, 則創(chuàng)建 cuda 流

		self.__queue = Queue(maxsize=self.__queue_size)
		self.__idx = 0
		self.__worker = Thread(target=self._load_loop)
		self.__worker.setDaemon(True)
		self.__worker.start()

	def _load_loop(self):
		""" 不斷的將數(shù)據(jù)加載到隊列里 """
		if str(self.__device).startswith('cuda'):
			logging.info(f'>>> data will be preloaded into device \'{self.__device}\'')
			logging.info(f'>>> this may cost more GPU memory!!!')
			# The loop that will load into the queue in the background
			torch.cuda.set_device(self.__device)
			while True:
				for sample in self.__loader:
					self.__queue.put(self._load_instance(sample))
		else:
			while True:
				for sample in self.__loader:
					self.__queue.put(sample)

	def _load_instance(self, sample):
		""" 將 batch 數(shù)據(jù)從 CPU 加載到 GPU 中 """
		if torch.is_tensor(sample):
			with torch.cuda.stream(self.__load_stream):
				return sample.to(self.__device, non_blocking=True)
		elif sample is None or type(sample) == str:
			return sample
		elif isinstance(sample, dict):
			return {k: self._load_instance(v) for k, v in sample.items()}
		else:
			return [self._load_instance(s) for s in sample]

	def __iter__(self):
		self.__idx = 0
		return self

	def __next__(self):
		# 加載線程掛了
		if not self.__worker.is_alive() and self.__queue.empty():
			self.__idx = 0
			self.__queue.join()
			self.__worker.join()
			raise StopIteration
		# 一個 epoch 加載完了
		elif self.__idx >= len(self.__loader):
			self.__idx = 0
			raise StopIteration
		# 下一個 batch
		else:
			out = self.__queue.get()
			self.__queue.task_done()
			self.__idx += 1
		return out

	def next(self):
		return self.__next__()

	def __len__(self):
		return len(self.__loader)

	@property
	def sampler(self):
		return self.__loader.sampler

	@property
	def dataset(self):
		return self.__loader.dataset

如果你對實現(xiàn)技術(shù)細(xì)節(jié)不感興趣,也可直接拿來用。后面我將對相關(guān)細(xì)節(jié)展開討論,包括:

  • python 中的并發(fā)與并行;
  • cuda 流:torch.cuda.Stream(device=device);

2. python 中的并發(fā)與并行

總所周知,由于 Global Interpreter Lock (GIL) 的存在,Python 語言中,任何時間點只有一個線程在執(zhí)行,即便在多核 CPU 上,Python 的多線程也無法實現(xiàn)真正的并行計算。

GIL 的原因在于 Python 的內(nèi)存管理并不是線程安全的。為了防止多個線程同時操作一個對象,造成數(shù)據(jù)混亂的問題,Python 設(shè)定了 GIL 來限制多線程的并發(fā)執(zhí)行。因此,盡管你可以在 Python 中創(chuàng)建多線程,并且看起來他們是同時運行的,但實質(zhì)上,在任一時刻,只有一個線程在執(zhí)行。

既然如此,上面代碼使用多線程是如何提高程序的效率的?再看:

然而,如果你的程序是 IO 密集型的,例如大量的網(wǎng)絡(luò)請求或文件讀寫操作,那么使用多線程還是能顯著提高程序的效率的,因為在等待 IO 的過程中,其他線程還可以繼續(xù)執(zhí)行。

數(shù)據(jù)的預(yù)加載應(yīng)該算是 IO 吧,那模型計算和數(shù)據(jù)加載能并行嗎?

2.1 Numpy 和 PyTorch 底層計算是多線程并行的

Numpy 的底層實現(xiàn)是 C 語言,計算速度和并發(fā)性遠勝于 Python,當(dāng)我們使用 numpy 進行計算時,特別是復(fù)雜的矩陣運算,Python 程序會把這個任務(wù)拋給底層的 C 語言進行計算,從而能夠使用 CPU 多核。驗證:

import time
import numpy as np

def dot():
	start = time.time()
	a = np.random.randn(10000, 10000)
	b = np.random.randn(10000, 10000)
	np.dot(a, b)
	end = time.time()
	print(end - start)

dot()

驗證代碼用 numpy.dot() 計算兩個 10000 10000 10000 維的矩陣乘法,觀察 CPU 的使用效率(i5-10400,6核心12線程),發(fā)現(xiàn) CPU 使用率很快從不足 20% 提升至 80% 左右。計算時間約為 15s。

為了確定是否真的使用了多核,再設(shè)計一個 Python 計算程序:

import time

def add():
	cnt = 1
	start = time.time()
	for i in range(500000000):  # 累加
		cnt += 1
	end = time.time()
	print(end - start)

add()

五億次加法運算,耗時約 20s,CPU 使用率全程維持在 20% 以下。如此說來,numpy 確實是在使用多核并行計算。

下面看一看 Python 多線程能不能使它們并行計算:

import threading
import time
import numpy as np

def dot():
	start = time.time()
	a = np.random.randn(10000, 10000)
	b = np.random.randn(10000, 10000)
	np.dot(a, b)
	end = time.time()
	print(end - start)

def add():
	cnt = 1
	start = time.time()
	for i in range(500000000):
		cnt += 1
	end = time.time()
	print(end - start)

t = threading.Thread(target=dot)

s = time.time()
add()
t.start()
t1.join()
e = time.time()
print(e - s)

輸出:

15.057043313980103
23.129913806915283
23.13091516494751

如果說整個程序只能同時使用一個 CPU 核,那么整體計算時間應(yīng)該是兩部分計算時間的和 35s 左右,但這里只用了 23s,可見 numpy 底層并行計算是實錘了。而且,這兩個函數(shù)的計算是并行的,即 np.dot() 在計算的時候,add() 也在計算。為什么 add 計算相比其單獨運行時多了 3s?而 np.dot() 計算時間基本沒變?

可以排除 CPU 資源不夠的可能,否則的話,np.dot() 的計算時間也要加長;再者我觀察了 CPU 利用率,全程未達到 100%。我覺得這是線程切換的開銷add() 可能不是一直在運行的,多個 Python 線程還是只能使用一個 CPU 核,線程之間交替執(zhí)行,只不過 np.dot() 線程在離開后,底層運行還在繼續(xù),而 add() 線程離開后,其不再運行。即:有那么 3s 時間,add() 沒運行,“單核 CPU” 轉(zhuǎn)向了線程 np.dot() 檢查計算結(jié)果是否已返回。

再增加一個 numpy 計算任務(wù)線程:

...
t1 = threading.Thread(target=dot)
t2 = threading.Thread(target=dot)

s = time.time()
add()
t1.start()
t2.start()
t1.join()
t2.join()
e = time.time()
print(e - s)

輸出:

25.624603986740112
27.81219220161438
30.751672983169556
30.752644538879395

時間增加了不少,基本快趕上計算一次 dot() 時間的兩倍了。這大概是由于 CPU 的計算達到了極限:

CPU 利用率長時間維持在 100%。

以上驗證對于 PyTorch 也是一樣的。

結(jié)論:numpy 和 pytorch 的計算不受 GIL 的限制,可以使用 CPU 多核;一個線程中,numpy 和 pytorch 將計算丟給底層的 C/C++ 語言后,“等待計算結(jié)果”類似于 IO,會釋放 GIL 鎖,而計算還在繼續(xù),其他 python 線程可以得到執(zhí)行。
推論:使用 GPU 計算是同樣的道理,python 程序?qū)⒂嬎銇G給 GPU 后,等待計算結(jié)果,當(dāng)前線程阻塞,釋放 GIL 鎖,其他 python 線程得以執(zhí)行,從而提高計算效率。

3. torch.cuda.Stream(device=device)

torch.cuda.Stream 是 PyTorch 庫中的一個類,用于管理 GPU 上的異步操作。

在 GPU 上執(zhí)行計算任務(wù)時,通??梢允褂枚鄠€流(stream)來并行執(zhí)行不同的操作。每個流都有自己的命令隊列,可以獨立地執(zhí)行操作,從而提高計算效率。torch.cuda.Stream 就是用來創(chuàng)建和管理這些流的。

使用 torch.cuda.Stream,可以將一系列 GPU 操作放入一個流中,并且可以通過調(diào)用流的 synchronize() 方法來等待流中所有操作完成。這對于需要處理多個 GPU 操作的情況非常有用。

以下是一個使用 torch.cuda.Stream 的示例代碼:

import torch

stream = torch.cuda.Stream()  # 創(chuàng)建流對象

with torch.cuda.stream(stream):  # 在流中執(zhí)行操作
	# 執(zhí)行GPU操作
	# ...

stream.synchronize()  # 等待流中操作完成

在上述示例中,我們首先創(chuàng)建了一個 torch.cuda.Stream 對象 stream。然后,我們使用 with 語句塊將一些 GPU 操作放入流中執(zhí)行。最后,我們調(diào)用 stream.synchronize() 來等待流中的操作完成。

通過使用 torch.cuda.Stream,我們可以更靈活地控制 GPU 操作的執(zhí)行順序和并行性,以優(yōu)化計算性能。

以上是 GPT3.5 給出的關(guān)于 torch.cuda.Stream 的簡介。另外,還可參考教程《如何在 Pytorch 中使用 CUDA 流(CUDA stream)》 講的不錯。我現(xiàn)在將其搬過來:

什么是 CUDA 流(CUDA stream)?

CUDA 流是一種在 GPU 上并行執(zhí)行操作的機制。在默認(rèn)情況下,PyTorch 會在默認(rèn)的流上執(zhí)行所有的操作,即在主流(default stream)上進行。但是,當(dāng)我們有一些可以并行執(zhí)行的操作時,通過將這些操作分配到不同的流上,我們可以在 GPU 上更有效地利用計算資源。

第一句就強調(diào):并行執(zhí)行操作的機制。

如何創(chuàng)建 CUDA 流?

可以通過 torch.cuda.Stream() 函數(shù)來創(chuàng)建 CUDA 流:

stream = torch.cuda.Stream()

使用 torch.cuda.Stream() 函數(shù)創(chuàng)建了一個名為 stream 的 CUDA 流。

如何使用 CUDA 流?

通過 with 上下文管理操作,并使用 stream.synchronize() 方法等待操作完成:

import torch

# 創(chuàng)建兩個CUDA流
stream1 = torch.cuda.Stream()
stream2 = torch.cuda.Stream()

# 分別將操作記錄到兩個流上
with torch.cuda.stream(stream1):
	# 執(zhí)行操作1
	# ...

with torch.cuda.stream(stream2):
	# 執(zhí)行操作2
	# ...

# 等待兩個流上的操作完成
torch.cuda.synchronize(stream1)
torch.cuda.synchronize(stream2)

我們創(chuàng)建了兩個 CUDA 流 stream1 和 stream2。然后,在兩個流上分別記錄操作,并使用torch.cuda.synchronize() 方法等待這些操作完成。

如何利用 CUDA 流提高性能?

一種常見的用法是將計算數(shù)據(jù)傳輸操作分配到不同的流上,從而實現(xiàn)計算和數(shù)據(jù)傳輸?shù)牟⑿袌?zhí)行。

3.1 對 PreDataLoader 中 CUDA 流的解釋

with torch.cuda.stream(self.__load_stream):
	return sample.to(self.__device, non_blocking=True)

這一句 sample.to(self.__device, non_blocking=True) 算是數(shù)據(jù)傳輸吧,它處在一個數(shù)據(jù)預(yù)加載線程中,想要與模型計算并行。那么按照上面的教程:一個 CUDA 流中的操作是順序執(zhí)行的,模型計算使用的是默認(rèn)流(default stream),平時我們的代碼 sample.to(device) 也使用了默認(rèn)流,這意味著數(shù)據(jù)的傳輸和模型計算是串行的。

所以,PreDataLoader 中定義了一個新的 CUDA 流,把 sample.to(self.__device, non_blocking=True) 放入這個新 CUDA 流,就可以和模型計算并行了。

4. @property

@property 是一個裝飾器,用于將類的方法轉(zhuǎn)換為屬性。通過使用 @property,您可以定義一個方法,并將其作為實例的屬性來訪問,而不需要使用函數(shù)調(diào)用的語法。

下面是一個示例,說明如何使用 @property 裝飾器:

class Circle:
	def __init__(self, radius):
		self.radius = radius

	@property
	def diameter(self):
		return 2 * self.radius

	@diameter.setter
	def diameter(self, value):
		self.radius = value / 2


# 創(chuàng)建 Circle 對象
circle = Circle(5)

# 訪問 diameter 屬性(實際上是調(diào)用了 diameter 方法)
print(circle.diameter)  # 輸出:10

# 設(shè)置 diameter 屬性(實際上是調(diào)用了 diameter.setter 方法)
circle.diameter = 14
print(circle.radius)  # 輸出:7

在上面的示例中,Circle 類定義了一個 radius 實例變量和一個 diameter 方法(被 @property 裝飾)。當(dāng)我們像訪問屬性一樣訪問 circle.diameter 時,實際上是調(diào)用了 diameter 方法并返回其結(jié)果。

此外,我們還可以使用 @property 創(chuàng)建一個 setter 方法,用于設(shè)置屬性的值。在示例中,diameter 屬性的 setter 方法名為 diameter.setter,它接受一個參數(shù) value,我們可以在 setter 方法中對 self.radius 進行更新。

總結(jié):使用 @property 裝飾器可以將一個方法定義為屬性,并提供更加方便和易讀的方式來訪問和設(shè)置屬性。

既然擔(dān)心 Python 線程的 GIL 問題,為何不直接用多進程?

:多進程沒那么好用,進程是重量級的,有獨立的內(nèi)存管理,共享內(nèi)存是比較麻煩的:

import multiprocessing

class Int(object):
	def __init__(self, i):
		self.__int = i

	def add(self):
		self.__int += 1

	def print(self):
		print(self.__int)

def add(integer: Int):
	integer.add()
	integer.print()
	print(id(integer))

if __name__ == '__main__':
	a_integer = Int(0)
	p1 = multiprocessing.Process(target=add, args=(a_integer,))
	p2 = multiprocessing.Process(target=add, args=(a_integer,))
	p3 = multiprocessing.Process(target=add, args=(a_integer,))

	p1.start()
	p2.start()
	p3.start()

	add(a_integer)
	a_integer.print()

輸出:

1
1839132811024
1
1
2091010788944
1
1721319788112
1
2095109213776

可見,各進程操作的 Int 對象不是同一個,即,創(chuàng)建子進程時傳入?yún)?shù)會是參數(shù)的一份拷貝。

如果將 multiprocessing.Process 換成 threading.Thread,則輸出:

1
2691328945888
2
2691328945888
3
2691328945888
4
2691328945888
4

創(chuàng)建線程時傳入?yún)?shù)會是參數(shù)對象本身

此外,子進程不能訪問主線程的變量,如果:

def add(integer: Int):
	integer.add()
	integer.print()
	b_integer.add()  # 加一個主進程中的變量
	print(id(integer))

則會報錯。而線程則可以。

可以看到,PreDataLoader 中的線程是訪問了主程序的數(shù)據(jù)了的,如果用進程,一是編程比較麻煩,二是效率也未必就高。

到此這篇關(guān)于pytorch 數(shù)據(jù)預(yù)加載的實現(xiàn)示例的文章就介紹到這了,更多相關(guān)pytorch 數(shù)據(jù)預(yù)加載內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評論