欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

教你掌握分布式訓(xùn)練PyTorch?DDP到Accelerate到Trainer

 更新時(shí)間:2023年02月17日 09:23:56   作者:HuggingFace  
這篇文章主要為大家介紹了教你掌握分布式訓(xùn)練PyTorch?DDP到Accelerate到Trainer

概述

本教程假定你已經(jīng)對(duì)于 PyToch 訓(xùn)練一個(gè)簡(jiǎn)單模型有一定的基礎(chǔ)理解。本教程將展示使用 3 種封裝層級(jí)不同的方法調(diào)用 DDP (DistributedDataParallel) 進(jìn)程,在多個(gè) GPU 上訓(xùn)練同一個(gè)模型:

  • 使用 pytorch.distributed 模塊的原生 PyTorch DDP 模塊
  • 使用 ?? Accelerate 對(duì) pytorch.distributed 的輕量封裝,確保程序可以在不修改代碼或者少量修改代碼的情況下在單個(gè) GPU 或 TPU 下正常運(yùn)行
  • 使用 ?? Transformer 的高級(jí) Trainer API ,該 API 抽象封裝了所有代碼模板并且支持不同設(shè)備和分布式場(chǎng)景。

什么是分布式訓(xùn)練,為什么它很重要?

下面是一些非?;A(chǔ)的 PyTorch 訓(xùn)練代碼,它基于 Pytorch 官方在 MNIST 上創(chuàng)建和訓(xùn)練模型的示例

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
class BasicNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.dropout1 = nn.Dropout(0.25)
        self.dropout2 = nn.Dropout(0.5)
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)
        self.act = F.relu
    def forward(self, x):
        x = self.act(self.conv1(x))
        x = self.act(self.conv2(x))
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = self.act(self.fc1(x))
        x = self.dropout2(x)
        x = self.fc2(x)
        output = F.log_softmax(x, dim=1)
        return output

我們定義訓(xùn)練設(shè)備 (cuda):

device = "cuda"

構(gòu)建一些基本的 PyTorch DataLoaders:

transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307), (0.3081))
])
train_dset = datasets.MNIST('data', train=True, download=True, transform=transform)
test_dset = datasets.MNIST('data', train=False, transform=transform)
train_loader = torch.utils.data.DataLoader(train_dset, shuffle=True, batch_size=64)
test_loader = torch.utils.data.DataLoader(test_dset, shuffle=False, batch_size=64)

把模型放入 CUDA 設(shè)備:

model = BasicNet().to(device)

構(gòu)建 PyTorch optimizer (優(yōu)化器)

optimizer = optim.AdamW(model.parameters(), lr=1e-3)

最終創(chuàng)建一個(gè)簡(jiǎn)單的訓(xùn)練和評(píng)估循環(huán),訓(xùn)練循環(huán)會(huì)使用全部訓(xùn)練數(shù)據(jù)集進(jìn)行訓(xùn)練,評(píng)估循環(huán)會(huì)計(jì)算訓(xùn)練后模型在測(cè)試數(shù)據(jù)集上的準(zhǔn)確度:

model.train()
for batch_idx, (data, target) in enumerate(train_loader):
    data, target = data.to(device), target.to(device)
    output = model(data)
    loss = F.nll_loss(output, target)
    loss.backward()
    optimizer.step()
    optimizer.zero_grad()
model.eval()
correct = 0
with torch.no_grad():
    for data, target in test_loader:
        output = model(data)
        pred = output.argmax(dim=1, keepdim=True)
        correct += pred.eq(target.view_as(pred)).sum().item()
print(f'Accuracy: {100. * correct / len(test_loader.dataset)}')

通常從這里開始,就可以將所有的代碼放入 Python 腳本或在 Jupyter Notebook 上運(yùn)行它。

然而,只執(zhí)行 python myscript.py 只會(huì)使用單個(gè) GPU 運(yùn)行腳本。如果有多個(gè) GPU 資源可用,您將如何讓這個(gè)腳本在兩個(gè) GPU 或多臺(tái)機(jī)器上運(yùn)行,通過分布式訓(xùn)練提高訓(xùn)練速度?這是 torch.distributed 發(fā)揮作用的地方。

PyTorch 分布式數(shù)據(jù)并行

顧名思義,torch.distributed 旨在配置分布式訓(xùn)練。你可以使用它配置多個(gè)節(jié)點(diǎn)進(jìn)行訓(xùn)練,例如:多機(jī)器下的單個(gè) GPU,或者單臺(tái)機(jī)器下的多個(gè) GPU,或者兩者的任意組合。

為了將上述代碼轉(zhuǎn)換為分布式訓(xùn)練,必須首先定義一些設(shè)置配置,具體細(xì)節(jié)請(qǐng)參閱 DDP 使用教程

首先必須聲明 setupcleanup 函數(shù)。這將創(chuàng)建一個(gè)進(jìn)程組,并且所有計(jì)算進(jìn)程都可以通過這個(gè)進(jìn)程組通信。

注意:在本教程的這一部分中,假定這些代碼是在 Python 腳本文件中啟動(dòng)。稍后將討論使用 ?? Accelerate 的啟動(dòng)器,就不必聲明 setupcleanup 函數(shù)了

import os
import torch.distributed as dist
def setup(rank, world_size):
    "Sets up the process group and configuration for PyTorch Distributed Data Parallelism"
    os.environ["MASTER_ADDR"] = 'localhost'
    os.environ["MASTER_PORT"] = "12355"
    # Initialize the process group
    dist.init_process_group("gloo", rank=rank, world_size=world_size)
def cleanup():
    "Cleans up the distributed environment"
    dist.destroy_process_group()

最后一個(gè)疑問是,我怎樣把我的數(shù)據(jù)和模型發(fā)送到另一個(gè) GPU 上?

這正是 DistributedDataParallel 模塊發(fā)揮作用的地方, 它將您的模型復(fù)制到每個(gè) GPU 上 ,并且當(dāng) loss.backward() 被調(diào)用進(jìn)行反向傳播的時(shí)候,所有這些模型副本的梯度將被同步地平均/下降 (reduce)。這確保每個(gè)設(shè)備在執(zhí)行優(yōu)化器步驟后具有相同的權(quán)重。

下面是我們的訓(xùn)練設(shè)置示例,我們使用了 DistributedDataParallel 重構(gòu)了訓(xùn)練函數(shù):

注意:此處的 rank 是當(dāng)前 GPU 與所有其他可用 GPU 相比的總體 rank,這意味著它們的 rank 為 0 -> n-1

from torch.nn.parallel import DistributedDataParallel as DDP
def train(model, rank, world_size):
    setup(rank, world_size)
    model = model.to(rank)
    ddp_model = DDP(model, device_ids=[rank])
    optimizer = optim.AdamW(ddp_model.parameters(), lr=1e-3)
    # Train for one epoch
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
    cleanup()

在上述的代碼中需要為每個(gè)副本設(shè)備上的模型 (因此在這里是ddp_model的參數(shù)而不是 model 的參數(shù)) 聲明優(yōu)化器,以便正確計(jì)算每個(gè)副本設(shè)備上的梯度。

最后,要運(yùn)行腳本,PyTorch 有一個(gè)方便的 torchrun 命令行模塊可以提供幫助。只需傳入它應(yīng)該使用的節(jié)點(diǎn)數(shù)以及要運(yùn)行的腳本即可:

torchrun --nproc_per_nodes=2 --nnodes=1 example_script.py

上面的代碼可以在在一臺(tái)機(jī)器上的兩個(gè) GPU 上運(yùn)行訓(xùn)練腳本,這是使用 PyTorch 只進(jìn)行分布式訓(xùn)練的情況 (不可以在單機(jī)單卡上運(yùn)行)。

現(xiàn)在讓我們談?wù)??? Accelerate,一個(gè)旨在使并行化更加無縫并有助于一些最佳實(shí)踐的庫。

?? Accelerate

?? Accelerate 是一個(gè)庫,旨在無需大幅修改代碼的情況下完成并行化。除此之外,?? Accelerate 附帶的數(shù)據(jù) pipeline 還可以提高代碼的性能。

首先,讓我們將剛剛執(zhí)行的所有上述代碼封裝到一個(gè)函數(shù)中,以幫助我們直觀地看到差異:

def train_ddp(rank, world_size):
    setup(rank, world_size)
    # Build DataLoaders
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307), (0.3081))
    ])
    train_dset = datasets.MNIST('data', train=True, download=True, transform=transform)
    test_dset = datasets.MNIST('data', train=False, transform=transform)
    train_loader = torch.utils.data.DataLoader(train_dset, shuffle=True, batch_size=64)
    test_loader = torch.utils.data.DataLoader(test_dset, shuffle=False, batch_size=64)
    # Build model
    model = model.to(rank)
    ddp_model = DDP(model, device_ids=[rank])
    # Build optimizer
    optimizer = optim.AdamW(ddp_model.parameters(), lr=1e-3)
    # Train for a single epoch
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
    # Evaluate
    model.eval()
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()
    print(f'Accuracy: {100. * correct / len(test_loader.dataset)}')

接下來讓我們談?wù)??? Accelerate 如何便利地實(shí)現(xiàn)并行化的。上面的代碼有幾個(gè)問題:

  • 該代碼有點(diǎn)低效,因?yàn)槊總€(gè)設(shè)備都會(huì)創(chuàng)建一個(gè) dataloader
  • 這些代碼只能運(yùn)行在多 GPU 下,當(dāng)想讓這個(gè)代碼運(yùn)行在單個(gè) GPU 或 TPU 時(shí),還需要額外進(jìn)行一些修改。

?? Accelerate 通過 Accelerator 類解決上述問題。通過它,不論是單節(jié)點(diǎn)還是多節(jié)點(diǎn),除了三行代碼外,其余代碼幾乎保持不變,如下所示:

def train_ddp_accelerate():
    accelerator = Accelerator()
    # Build DataLoaders
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307), (0.3081))
    ])
    train_dset = datasets.MNIST('data', train=True, download=True, transform=transform)
    test_dset = datasets.MNIST('data', train=False, transform=transform)
    train_loader = torch.utils.data.DataLoader(train_dset, shuffle=True, batch_size=64)
    test_loader = torch.utils.data.DataLoader(test_dset, shuffle=False, batch_size=64)
    # Build model
    model = BasicModel()
    # Build optimizer
    optimizer = optim.AdamW(model.parameters(), lr=1e-3)
    # Send everything through `accelerator.prepare`
    train_loader, test_loader, model, optimizer = accelerator.prepare(
        train_loader, test_loader, model, optimizer
    )
    # Train for a single epoch
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        output = model(data)
        loss = F.nll_loss(output, target)
        accelerator.backward(loss)
        optimizer.step()
        optimizer.zero_grad()
    # Evaluate
    model.eval()
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()
    print(f'Accuracy: {100. * correct / len(test_loader.dataset)}')

借助 Accelerator 對(duì)象,您的 PyTorch 訓(xùn)練循環(huán)現(xiàn)在已配置為可以在任何分布式情況運(yùn)行。使用 Accelerator 改造后的代碼仍然可以通過 torchrun CLI 或通過 ?? Accelerate 自己的 CLI 界面啟動(dòng)(啟動(dòng)你的?? Accelerate 腳本)。

因此,現(xiàn)在可以盡可能保持 PyTorch 原生代碼不變的前提下,使用 ?? Accelerate 執(zhí)行分布式訓(xùn)練。

早些時(shí)候有人提到 ?? Accelerate 還可以使 DataLoaders 更高效。這是通過自定義采樣器實(shí)現(xiàn)的,它可以在訓(xùn)練期間自動(dòng)將部分批次發(fā)送到不同的設(shè)備,從而允許每個(gè)設(shè)備只需要儲(chǔ)存數(shù)據(jù)的一部分,而不是一次將數(shù)據(jù)復(fù)制四份存入內(nèi)存,具體取決于配置。因此,內(nèi)存總量中只有原始數(shù)據(jù)集的一個(gè)完整副本。該數(shù)據(jù)集 會(huì)拆分后分配到各個(gè)訓(xùn)練節(jié)點(diǎn)上,從而允許在單個(gè)實(shí)例上訓(xùn)練更大的數(shù)據(jù)集,而不會(huì)使內(nèi)存爆炸

使用 notebook_launcher

之前提到您可以直接從 Jupyter Notebook 運(yùn)行分布式代碼。這來自 ?? Accelerate 的 notebook_launcher 模塊,它可以在 Jupyter Notebook 內(nèi)部的代碼啟動(dòng)多 GPU 訓(xùn)練。

使用它就像導(dǎo)入 launcher 一樣簡(jiǎn)單:

from accelerate import notebook_launcher

接著傳遞我們之前聲明的訓(xùn)練函數(shù)、要傳遞的任何參數(shù)以及要使用的進(jìn)程數(shù)(例如 TPU 上的 8 個(gè),或兩個(gè) GPU 上的 2 個(gè))。下面兩個(gè)訓(xùn)練函數(shù)都可以運(yùn)行,但請(qǐng)注意,啟動(dòng)單次啟動(dòng)后,實(shí)例需要重新啟動(dòng)才能產(chǎn)生另一個(gè):

notebook_launcher(train_ddp, args=(), num_processes=2)

或者:

notebook_launcher(train_accelerate_ddp, args=(), num_processes=2)

使用 ?? Trainer

終于我們來到了最高級(jí)的 API——Hugging Face Trainer.

它涵蓋了盡可能多的訓(xùn)練類型,同時(shí)仍然能夠在分布式系統(tǒng)上進(jìn)行訓(xùn)練,用戶根本不需要做任何事情。

首先我們需要導(dǎo)入 ?? Trainer:

from transformers import Trainer

然后我們定義一些 TrainingArguments 來控制所有常用的超參數(shù)。?? Trainer 需要的訓(xùn)練數(shù)據(jù)是字典類型的,因此需要制作自定義整理功能。

最后,我們將訓(xùn)練器子類化并編寫我們自己的 compute_loss.

之后,這段代碼也可以分布式運(yùn)行,而無需修改任何訓(xùn)練代碼!

from transformers import Trainer, TrainingArguments
model = BasicNet()
training_args = TrainingArguments(
    "basic-trainer",
    per_device_train_batch_size=64,
    per_device_eval_batch_size=64,
    num_train_epochs=1,
    evaluation_strategy="epoch",
    remove_unused_columns=False
)
def collate_fn(examples):
    pixel_values = torch.stack([example[0] for example in examples])
    labels = torch.tensor([example[1] for example in examples])
    return {"x":pixel_values, "labels":labels}
class MyTrainer(Trainer):
    def compute_loss(self, model, inputs, return_outputs=False):
        outputs = model(inputs["x"])
        target = inputs["labels"]
        loss = F.nll_loss(outputs, target)
        return (loss, outputs) if return_outputs else loss
trainer = MyTrainer(
    model,
    training_args,
    train_dataset=train_dset,
    eval_dataset=test_dset,
    data_collator=collate_fn,
)
trainer.train()
***** Running training *****
  Num examples = 60000
  Num Epochs = 1
  Instantaneous batch size per device = 64
  Total train batch size (w. parallel, distributed & accumulation) = 64
  Gradient Accumulation steps = 1
  Total optimization steps = 938
Epoch訓(xùn)練損失驗(yàn)證損失
10.8757000.282633

與上面的 notebook_launcher 示例類似,也可以將這個(gè)過程封裝成一個(gè)訓(xùn)練函數(shù):

def train_trainer_ddp():
    model = BasicNet()
    training_args = TrainingArguments(
        "basic-trainer",
        per_device_train_batch_size=64,
        per_device_eval_batch_size=64,
        num_train_epochs=1,
        evaluation_strategy="epoch",
        remove_unused_columns=False
    )
    def collate_fn(examples):
        pixel_values = torch.stack([example[0] for example in examples])
        labels = torch.tensor([example[1] for example in examples])
        return {"x":pixel_values, "labels":labels}
    class MyTrainer(Trainer):
        def compute_loss(self, model, inputs, return_outputs=False):
            outputs = model(inputs["x"])
            target = inputs["labels"]
            loss = F.nll_loss(outputs, target)
            return (loss, outputs) if return_outputs else loss
    trainer = MyTrainer(
        model,
        training_args,
        train_dataset=train_dset,
        eval_dataset=test_dset,
        data_collator=collate_fn,
    )
    trainer.train()
notebook_launcher(train_trainer_ddp, args=(), num_processes=2)

相關(guān)資源

原文作者:Zachary Mueller

譯者:innovation64 (李洋)

審校:yaoqi (胡耀淇)

排版:zhongdongy (阿東)

以上就是教你掌握分布式訓(xùn)練PyTorch DDP到Accelerate到Trainer的詳細(xì)內(nèi)容,更多關(guān)于分布式訓(xùn)練PyTorch DDP Accelerate Trainer的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

最新評(píng)論