Pytorch 多卡并行(3)—— 使用 DDP 加速 minGPT 训练
  VFpNeRYlMszB 2023年11月02日 108 0


  • 前文 并行原理简介和 DDP 并行实践 和 使用 torchrun 进行容错处理 在简单的随机数据上演示了使用 DDP 并行加速训练的方法,本文考虑一个更加复杂的 GPT 类模型,说明如何进行 DDP 并行实战
  • MinGPT 是 GPT 模型的一个流行的开源 PyTorch 复现项目,其实现简洁干净可解释,因而颇具教育意义。关于 MinGPT 的详细介绍可以参考 minGPT 代码详解(训练 GPT 模型执行两位数加法)
  • 本文参考自:Pytorch 官方教程
  • 完整代码下载:wxc971231/ddp-tutorial-series


文章目录

  • 0. 项目组织
  • 1. 参数准备
  • 2. 数据准备
  • 3. 程序入口
  • 4. 定义模型
  • 5. 定义 Trainer


0. 项目组织

  • 本文改写 MinGPT 库中的 chargpt 例程,这是一个 character-level 语言模型项目,组织如下
  • 说明一下主要文件内容
  1. data/input.txt 是训练用的数据集
  2. char_dataset.py 定义了一个 char-level 的 torch.utils.data.Dataset
  3. gpt_snapshot.pt 是程序运行过程中保存的快照,使用 torchrun 时可以从此重启所有进程的训练
  4. gpt2_train_cfg.yaml 是 yaml 配置文件,记录了训练超参数
  5. main.log 是 hydra 生成的 logging 文件
  6. main.py 是程序入口,符合前文 使用 torchrun 进行容错处理 第1节给出的标准形式
  7. model.py 定义了 GPT 模型结构和 optimizer 的构造方法
  8. trainer.py 定义了训练过程,包括快照保存和加载等操作

1. 参数准备

  • 本项目使用 YAML文件存储超参数设置。YAML 是一种轻量级的数据序列化格式。相较于JSON等其他格式,YAML 更加易读易写,也更加适合用于配置文件等场景。YAML的语法结构主要包含键值对、列表、注释等几种元素
data_config:
  path: ./data/input.txt
  block_size: 128   # 输入序列长度
  train_split: 0.9  # 训练集和测试集划分
  truncate: 0.02    # 只用5%的数据进行训练
gpt_config:
  n_layer: 8
  n_head: 8
  n_embd: 512       
trainer_config:
  max_epochs: 10
  batch_size: 216
  data_loader_workers: 4
  grad_norm_clip: 1.0
  snapshot_path: gpt_snapshot.pt
  save_every: 3
  use_amp: True
optimizer_config:
  weight_decay: 0.1
  learning_rate: 0.0003

hydra:
  run:
    dir: ./

使用yaml文件时,可以使用 ${node.key} 的方式引用yaml中的其他变量;如果超参数的值缺失,可以使用 ??? 输入缺失值,或使用 null 输入空值。

  • 使用 Hydra 来管理超参数,它可以以装饰器的形式方便地加载不同路径下的 yaml 配置文件,最小用例如下
import hydra
from omegaconf import DictConfig

@hydra.main(version_base=None, config_path='configs', config_name='config')
def main(cfg: DictConfig) -> None:
    cfg['key'] # 获得对应的参数值

if __name__ == '__main__':
    main()
  • 这样就把 ./configs/config.yaml 文件的参数加载到 main 函数中了,使用 cfg['key'] 这样的形式获取参数值
  • 使用 Hydra 还有一个好处是它对 logging 标准库进行了包装。在 hydra.main 装饰器中对 log 输出格式规范为 "[%(asctime)s][%(name)s][%(levelname)s] - %(message)s",并设置 level 为 info,运行程序就会自动生成 main.log 日志文件。可以通过命令行的hydra.verbose 参数修改 log 的显示 level

2. 数据准备

  • 使用的数据是 tiny-shakespear 数据集,它是一个记录了一些英文对话的文本文档,截取如下
First Citizen:
Before we proceed any further, hear me speak.

All:
Speak, speak.

First Citizen:
You are all resolved rather to die than to famish?

All:
Resolved. resolved.

First Citizen:
First, you know Caius Marcius is chief enemy to the people.

All:
We know't, we know't.

First Citizen:
Let us kill him, and we'll have corn at our own price.
Is't a verdict?

All:
No more talking on't; let it be done: away, away!
  • 下面来构造数据集,思路是把 txt 文件中所有字符去重排序生成 vocab table;样本生成时先把 txt 内容全部读取进来,然后构造 n-gram 样本。如下
import torch
from torch.utils.data import Dataset
import fsspec
from dataclasses import dataclass

"""
Adapted from https://github.com/karpathy/minGPT/blob/master/projects/chargpt/chargpt.py
"""

@dataclass
class DataConfig:
    path: str = None
    block_size: int = None      # 输入序列长度    
    train_split: float = None   # 训练集和测试集划分
    truncate: float = 1.0       # 用于训练的数据占全体数据的比例

class CharDataset(Dataset):

    def __init__(self, data_cfg: DataConfig): #data_path: str, block_size):
        # 加载所需比例的数据
        data = fsspec.open(data_cfg.path).open().read().decode('utf-8')
        data = data[ : int(len(data) * data_cfg.truncate)]

        # Set 去重,转 list 后排序得到数据集中的唯一字符列表作为词表
        chars = sorted(list(set(data))) 
        data_size, vocab_size = len(data), len(chars)
        print('Data has %d characters, %d unique.' % (data_size, vocab_size))

        # 得到字符和词表索引之间的双射
        self.stoi = {ch: i for i, ch in enumerate(chars)}   # 字符 -> 词表索引
        self.itos = {i: ch for i, ch in enumerate(chars)}   # 词表索引 -> 字符
        
        self.block_size = data_cfg.block_size  	# 模型输入序列长度
        self.vocab_size = vocab_size			# 词表尺寸
        self.data = data

    def __len__(self):
        return len(self.data) - self.block_size

    def __getitem__(self, idx):
        # grab a chunk of (block_size + 1) characters from the data
        chunk = self.data[idx:idx + self.block_size + 1]
        
        # encode every character to an integer
        dix = [self.stoi[s] for s in chunk]
        x = torch.tensor(dix[:-1], dtype=torch.long)
        y = torch.tensor(dix[1:], dtype=torch.long)
        return x, y

3. 程序入口

import os
import torch
from torch.utils.data import random_split
from torch.distributed import init_process_group, destroy_process_group
from model import GPT, GPTConfig, OptimizerConfig, create_optimizer
from trainer import Trainer, TrainerConfig
from char_dataset import CharDataset, DataConfig
from omegaconf import DictConfig
import hydra


def ddp_setup():
	os.environ["MASTER_ADDR"] = "localhost" # 由于这里是单机实验所以直接写 localhost
    os.environ["MASTER_PORT"] = "12355"     # 任意空闲端口
    init_process_group(backend="nccl")
    torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))

def get_train_objs(gpt_cfg: GPTConfig, opt_cfg: OptimizerConfig, data_cfg: DataConfig):
    dataset = CharDataset(data_cfg)
    train_len = int(len(dataset) * data_cfg.train_split)
    train_set, test_set = random_split(dataset, [train_len, len(dataset) - train_len])

    gpt_cfg.vocab_size = dataset.vocab_size
    gpt_cfg.block_size = dataset.block_size
    model = GPT(gpt_cfg)
    optimizer = create_optimizer(model, opt_cfg)
    
    return model, optimizer, train_set, test_set
 
@hydra.main(version_base=None, config_path=".", config_name="gpt2_train_cfg")
def main(cfg: DictConfig):
    # 初始化进程池
    ddp_setup()

    # 从 yaml 文件读取超参数
    gpt_cfg = GPTConfig(**cfg['gpt_config'])
    opt_cfg = OptimizerConfig(**cfg['optimizer_config'])
    data_cfg = DataConfig(**cfg['data_config'])
    trainer_cfg = TrainerConfig(**cfg['trainer_config'])

    # 创建训练对象
    model, optimizer, train_data, test_data = get_train_objs(gpt_cfg, opt_cfg, data_cfg)
    trainer = Trainer(trainer_cfg, model, optimizer, train_data, test_data)
    
    # 开始训练
    trainer.train()

    # 训练完成后,销毁进程池
    destroy_process_group()


if __name__ == "__main__":
    main()
  • 注意其中使用 hydra.main 装饰器来加载参数;运行时使用以下命令指定 GPU 运行
CUDA_VISIBLE_DEVICES=1,2 torchrun --standalone --nproc_per_node=gpu main.py

4. 定义模型

  • 整个模型定义部分相比 MinGPT 原始代码逻辑上没有区别,只是换了一下写法看起来更清晰一点。首先定义两个 @dataclass 保存模型和优化器参数
from dataclasses import dataclass
import math
import torch
import torch.nn as nn
from torch.nn import functional as F

@dataclass
class GPTConfig:
    model_type: str = 'gpt2'
    # model configurations
    n_layer: int = None
    n_head: int = None
    n_embd: int =  None
    # openai's values for gpt2
    vocab_size: int = 50257 
    block_size: int = 1024
    # dropout hyperparameters
    embd_pdrop: float = 0.1
    resid_pdrop: float = 0.1
    attn_pdrop: float = 0.1

@dataclass
class OptimizerConfig:
    learning_rate: float = 3e-4
    weight_decay: float = 0.1
  • 定义多头 masked self-attention 模块,原本 MinGPT 库是全部手写的,这里则用了 pytorch 自己的多头注意力模块。具体做法是使用 torch.nn.MultiheadAttention 定义普通多头注意力层,在 forward 方法中用同一个序列输入构造 qkv 实现 self-attention,再用过对注意力输出设置遮盖实现 mask
class MultiheadAttentionLayer(nn.Module):
    """
    A multi-head masked self-attention layer with a projection at the end.
    """

    def __init__(self, config, device="cpu", dtype=torch.float32):
        super().__init__()
        assert config.n_embd % config.n_head == 0
        self.resid_drop = nn.Dropout(config.resid_pdrop)
        
        # output projection
        self.c_proj = nn.Linear(config.n_embd, config.n_embd, device=device, dtype=dtype)

        # Causal mask。注意这个mask是通过 self.register_buffer 方法登记的
        # 这样登记过的张量可以求梯度也可以随模型在 CPU/GPU 之间移动,但是不进行参数优化
        self.register_buffer("mask", torch.tril(torch.ones(config.block_size, config.block_size))
                             .view(1, 1, config.block_size, config.block_size))
        
        self.attn = torch.nn.MultiheadAttention(
            embed_dim=config.n_embd,
            num_heads=config.n_head,
            dropout=config.attn_pdrop,
            batch_first=True,
            device=device,
            dtype=dtype
        )

    def forward(self, x):
        _, seq_size, _ = x.size()   # batch size, sequence length, embedding dimensionality (n_embd)
        y = self.attn(x, x, x, attn_mask=self.mask[0, 0, :seq_size, :seq_size])[0]
        y = self.resid_drop(self.c_proj(y))
        return y

我感觉这里 self.attn(x, x, x, attn_mask=self.mask[0, 0, :seq_size, :seq_size])[0] 的调用有问题,因为 torch.nn.MultiheadAttention 的前向过程需要输入 query,key 和 value 三个 tensor,这里应该把 x 用三个线性层变换后再作为输入。如果读者有其他想法可以和我讨论。考虑到本文主要说明 DDP 并行,暂不关注此问题

  • 定义 Transformer block
class Block(nn.Module):
    """ an unassuming Transformer block """
    def __init__(self, config: GPTConfig):
        super().__init__()
        self.ln1 = nn.LayerNorm(config.n_embd)
        self.ln2 = nn.LayerNorm(config.n_embd)
        self.attn = MultiheadAttentionLayer(config)
        self.mlp = nn.Sequential(
            nn.Linear(config.n_embd, 4 * config.n_embd),
            nn.GELU(),
            nn.Linear(4 * config.n_embd, config.n_embd),
            nn.Dropout(config.resid_pdrop),
        )

    def forward(self, x):
        x = x + self.attn(self.ln1(x))
        x = x + self.mlp(self.ln2(x))
        return x
  • 定义字符嵌入层,用 nn.Embedding 嵌入 token,再设置一个 nn.Parameter 作为可学习的位置编码
class EmbeddingStem(nn.Module):
    def __init__(self, config: GPTConfig, device="cpu", dtype=torch.float32):
        super().__init__()
        self.tok_emb = nn.Embedding(config.vocab_size, config.n_embd, device=device, dtype=dtype)
        self.pos_emb = nn.Parameter(torch.zeros(1, config.block_size, config.n_embd, device=device, dtype=dtype))
        self.drop = nn.Dropout(config.embd_pdrop)
        self.block_size = config.block_size

    def reset_parameters(self): 
        self.tok_emb.reset_parameters() # 将 nn.Embedding 层参数初始化为正态分布采样

    def forward(self, idx):
        b, t = idx.size()
        assert t <= self.block_size, f"Cannot forward sequence of length {t}, block size is only {self.block_size}"

        token_embeddings = self.tok_emb(idx)            # each index maps to a (learnable) embedding vector
        position_embeddings = self.pos_emb[:, :t, :]    # each position maps to a (learnable) position vector
        return self.drop(token_embeddings + position_embeddings)
  • 把以上组件合在一起,定义 GPT 模型
class GPT(nn.Module):
    """ GPT Language Model """

    def __init__(self, config: GPTConfig):
        super().__init__()
        self.block_size = config.block_size
        config = self._set_model_config(config)

        # input embedding stem
        self.emb_stem = EmbeddingStem(config)
        # transformer
        self.blocks = nn.Sequential(*[Block(config) for _ in range(config.n_layer)])
        # decoder head
        self.ln_f = nn.LayerNorm(config.n_embd)
        self.head = nn.Linear(config.n_embd, config.vocab_size, bias=False)

        # init all weights, and apply a special scaled init to the residual projections, per GPT-2 paper
        self.apply(self._init_weights)
        for pn, p in self.named_parameters():
            if pn.endswith('c_proj.weight'):
                p.data.normal_(mean=0.0, std=0.02/math.sqrt(2 * config.n_layer))

        # report number of parameters (note we don't count the decoder parameters in lm_head)
        n_params = sum(p.numel() for p in self.blocks.parameters())
        print("number of parameters: %.2fM" % (n_params/1e6,))

    def _set_model_config(self, config):
        type_given = config.model_type is not None
        params_given = all([config.n_layer is not None, config.n_head is not None, config.n_embd is not None])
        # assert type_given ^ params_given # exactly one of these (XOR)
        if type_given and not params_given:
            # translate from model_type to detailed configuration
            config.__dict__.update({
                # names follow the huggingface naming conventions
                # GPT-1
                'openai-gpt':   dict(n_layer=12, n_head=12, n_embd=768),  # 117M params
                # GPT-2 configs
                'gpt2':         dict(n_layer=12, n_head=12, n_embd=768),  # 124M params
                'gpt2-medium':  dict(n_layer=24, n_head=16, n_embd=1024), # 350M params
                'gpt2-large':   dict(n_layer=36, n_head=20, n_embd=1280), # 774M params
                'gpt2-xl':      dict(n_layer=48, n_head=25, n_embd=1600), # 1558M params
                # Gophers
                'gopher-44m':   dict(n_layer=8, n_head=16, n_embd=512),
                # (there are a number more...)
                # I made these tiny models up
                'gpt-mini':     dict(n_layer=6, n_head=6, n_embd=192),
                'gpt-micro':    dict(n_layer=4, n_head=4, n_embd=128),
                'gpt-nano':     dict(n_layer=3, n_head=3, n_embd=48),
            }[config.model_type])
        return config
    
    def _init_weights(self, module):
        if isinstance(module, (nn.Linear, nn.Embedding)):
            module.weight.data.normal_(mean=0.0, std=0.02)
            if isinstance(module, nn.Linear) and module.bias is not None:
                module.bias.data.zero_()
        elif isinstance(module, nn.LayerNorm):
            module.bias.data.zero_()
            module.weight.data.fill_(1.0)

    def forward(self, idx, targets=None):
        x = self.emb_stem(idx)
        x = self.blocks(x)
        x = self.ln_f(x)
        logits = self.head(x)

        # if we are given some desired targets also calculate the loss
        loss = None
        if targets is not None:
            loss = F.cross_entropy(logits.view(-1, logits.size(-1)), targets.view(-1), ignore_index=-1)

        return logits, loss

    @torch.no_grad()
    def generate(self, idx, max_new_tokens, temperature=1.0, do_sample=False, top_k=None):
        """
        Take a conditioning sequence of indices idx (LongTensor of shape (b,t)) and complete
        the sequence max_new_tokens times, feeding the predictions back into the model each time.
        Most likely you'll want to make sure to be in model.eval() mode of operation for this.
        """
        for _ in range(max_new_tokens):
            # if the sequence context is growing too long we must crop it at block_size
            idx_cond = idx if idx.size(1) <= self.block_size else idx[:, -self.block_size:]
            # forward the model to get the logits for the index in the sequence
            logits, _ = self(idx_cond)
            # pluck the logits at the final step and scale by desired temperature
            logits = logits[:, -1, :] / temperature
            # optionally crop the logits to only the top k options
            if top_k is not None:
                v, _ = torch.topk(logits, top_k)
                logits[logits < v[:, [-1]]] = -float('Inf')
            # apply softmax to convert logits to (normalized) probabilities
            probs = F.softmax(logits, dim=-1)
            # either sample from the distribution or take the most likely element
            if do_sample:
                idx_next = torch.multinomial(probs, num_samples=1)
            else:
                _, idx_next = torch.topk(probs, k=1, dim=-1)
            # append sampled index to the running sequence and continue
            idx = torch.cat((idx, idx_next), dim=1)

        return idx
  • 最后我们来定义优化器,
def create_optimizer(model: torch.nn.Module, opt_config: OptimizerConfig):
    """
    This long function is unfortunately doing something very simple and is being very defensive:
    We are separating out all parameters of the model into two buckets: those that will experience
    weight decay for regularization and those that won't (biases, and layernorm/embedding weights).
    We are then returning the PyTorch optimizer object.
    """

    # separate out all parameters to those that will and won't experience regularizing weight decay
    decay = set()
    no_decay = set()
    whitelist_weight_modules = (torch.nn.Linear, )
    blacklist_weight_modules = (torch.nn.LayerNorm, torch.nn.Embedding)
    for mn, m in model.named_modules():
        for pn, p in m.named_parameters():
            fpn = '%s.%s' % (mn, pn) if mn else pn # full param name
            # random note: because named_modules and named_parameters are recursive
            # we will see the same tensors p many many times. but doing it this way
            # allows us to know which parent module any tensor p belongs to...
            if pn.endswith('bias'):
                # all biases will not be decayed
                no_decay.add(fpn)
            elif pn.endswith('weight') and isinstance(m, whitelist_weight_modules):
                # weights of whitelist modules will be weight decayed
                decay.add(fpn)
            elif pn.endswith('in_proj_weight'):
                # MHA projection layer
                decay.add(fpn)
            elif pn.endswith('weight') and isinstance(m, blacklist_weight_modules):
                # weights of blacklist modules will NOT be weight decayed
                no_decay.add(fpn)
            elif pn.endswith('pos_emb'):
                # positional embedding shouldn't be decayed
                no_decay.add(fpn)

    # validate that we considered every parameter
    param_dict = {pn: p for pn, p in model.named_parameters()}
    inter_params = decay & no_decay
    union_params = decay | no_decay
    assert len(inter_params) == 0, "parameters %s made it into both decay/no_decay sets!" % (str(inter_params), )
    assert len(param_dict.keys() - union_params) == 0, "parameters %s were not separated into either decay/no_decay set!" \
                                                % (str(param_dict.keys() - union_params), )

    # create the pytorch optimizer object
    optim_groups = [
        {"params": [param_dict[pn] for pn in sorted(list(decay))], "weight_decay": opt_config.weight_decay},
        {"params": [param_dict[pn] for pn in sorted(list(no_decay))], "weight_decay": 0.0},
    ]
    optimizer = torch.optim.AdamW(optim_groups, lr=opt_config.learning_rate, betas=(0.9, 0.95))
    return optimizer
  • 这里主要是通过权重衰减方法来进行正则化避免过拟合。注意到作者通过一个二重遍历考察 GPT 模型所有 sub module 的所有 parameters,仅对所有 torch.nn.Linear 层的 weight 参数进行衰减,bias 参数及所有 torch.nn.LayerNormtorch.nn.Embedding 模块的参数都不做处理。由于模块是递归组织的,这个二重遍历会重复访问很多参数,所以通过 set 自动去重,最后根据处理结果定义 torch.optim.AdamW 优化器返回

关于权重衰减的理论说明,参考:机器学习基础(6)—— 使用权重衰减和丢弃法缓解过拟合问题

5. 定义 Trainer

  • Trainer 定义和原始 MinGPT 库主要有两个区别
  1. 按指定周期要求 rank0 进程保存 snapshot,本项目中应包含 epoch、模型参数和优化器参数三部分内容;初始化 Trainer 时应当加载可能存在的 snapshot 文件,这样在 torchrun 自动重启进程时可以从最近的 snapshot 恢复训练
  2. 可以使用 torch.cuda.amp.GradScaler 进行混合精度训练
  • 混合精度训练(Mixed Precision Training)是一种训练深度学习模型的技术,旨在提高模型的训练速度和效率。它利用了现代GPU可以混合计算精度的硬件特性,使用FP16数据类型对模型中的某些操作进行加速。具体而言,模型的参数通常使用FP32数据类型,而输入数据和梯度则使用FP16数据类型,从而减少内存开销,加速计算速度,提高模型的训练效率。此外,混合精度训练还可以通过减少浮点运算和内存访问,降低能源消
  • 混合精度训练的主要困难在于 fp16 的表示范围有限,在训练中常出现溢出问题,尤其是下溢出,因为在网络训练的后期,模型的梯度往往很小;另外还有舍入误差问题,这是指当梯度过小,小于当前区间内的最小间隔时,该次梯度更新可能会失效
  • 解决以上问题的方法包括损失缩放FP32权重备份等,前者对计算出的 loss 值进行缩放(scale),这样梯度也会被缩放进而平移到 FP16 的有效范围内存储,在进行梯度更新之前先将缩放后的梯度转化为 FP32 再unscale回去;后者将模型权重、激活值、梯度等数据用 FP16 来存储,同时维护一份 FP32 的模型权重副本用于更新。在反向传播得到 FP16 的梯度以后,将其转化成 FP32 并 unscale,最后更新 FP32 的模型权重。因为整个更新过程是在 FP32 的环境中进行的,所以不会出现舍入误差
  • 有一些代码库可以帮助我们快速实现混合精度训练,而无需大幅修改代码,包括 nvidia 的 apex 库和 pytorch 1.6 后引入的 amp 库等
  1. 本项目使用 pytorch 的 amp 库进行混合精度训练,主要用到 GradScaler 和 autocast 两个组件。其中 Gradscalar 对会检查梯度是否发现溢出,并对优化器进行控制 (将丢弃的batches转换为 no-op);autocast 是一个上下文管理器,当进入 autocast 上下文后,tensor 的数据类型会自动转换为半精度浮点型,从而在不损失训练精度的情况下加快运算,而不需要手动调用 .half()。 一个最小实践示例为
from torch.cuda.amp import autocast as autocast, GradScaler
'''
other code
'''
 
# 在训练最开始之前实例化一个GradScaler对象
scaler = GradScaler()
'''
other code
'''
        # 前向过程(model + loss)开启 autocast
        with autocast():
            output = model(input)
            loss = loss_fn(output, target)
 
        # Scales loss,这是因为半精度的数值范围有限,因此需要用它放大
        scaler.scale(loss).backward()
 
        # scaler.step() unscale之前放大后的梯度,但是scale太多可能出现inf或NaN
        # 故其会判断是否出现了inf/NaN
        # 如果梯度的值不是 infs 或者 NaNs, 那么调用optimizer.step()来更新权重,
        # 如果检测到出现了inf或者NaN,就跳过这次梯度更新,同时动态调整scaler的大小
        scaler.step(optimizer)
 
        # 查看是否要更新scaler,这个要注意不能丢
        scaler.update()
 
'''
other code
'''
  • 下面开始分析 trainer 代码,首先定义两个 @dataclass 存储 Trainer 参数和 snapshot 参数
@dataclass
class TrainerConfig:
    max_epochs: int = None
    batch_size: int = None
    data_loader_workers: int = None
    grad_norm_clip: float = None
    snapshot_path: Optional[str] = None
    save_every: int = None
    use_amp: bool = None

@dataclass
class Snapshot:
    model_state: 'OrderedDict[str, torch.Tensor]'
    optimizer_state: Dict[str, Any]
    finished_epoch: int
  • 定义 Trianer 的初始化方法
class Trainer:
    def __init__(self, trainer_config: TrainerConfig, model, optimizer, train_dataset, test_dataset=None):
        self.config = trainer_config
        # set torchrun variables
        self.local_rank = int(os.environ["LOCAL_RANK"]) # 在所有node的所有进程中当前GPU进程的rank
        self.global_rank = int(os.environ["RANK"])      # 在当前node中当前GPU进程的rank
        
        # data stuff
        self.train_dataset = train_dataset
        self.train_loader = self._prepare_dataloader(train_dataset)
        self.test_loader = self._prepare_dataloader(test_dataset) if test_dataset else None
        
        # initialize train states
        self.epochs_run = 0
        self.model = model.to(self.local_rank)
        self.optimizer = optimizer        
        self.save_every = self.config.save_every

        # load snapshot if available. only necessary on the first node.
        if self.config.snapshot_path is None:
            self.config.snapshot_path = "snapshot.pt"
        self._load_snapshot()

        # wrap with DDP. this step will synch model across all the processes.
        self.model = DDP(self.model, device_ids=[self.local_rank])

        # torch.cuda.amp.GradScaler 是一个用于自动混合精度训练的 PyTorch 工具,它可以帮助加速模型训练并减少显存使用量
        # 具体来说,GradScaler 可以将梯度缩放到较小的范围,以避免数值下溢或溢出的问题,同时保持足够的精度以避免模型的性能下降
        if self.config.use_amp: 
            self.scaler = torch.cuda.amp.GradScaler()

注意几点

  1. torchrun 帮助我们自动分发进程,通过环境变量获取当前运行代码的 GPU rank 信息
  2. 初始化 Trainer 时加载可能存在的 snapshot,实现断点续训
  3. 模型使用 DDP 进行包装
  4. 定义混合精度训练所需的 torch.cuda.amp.GradScaler()
  • 定义 DataLoder,注意使用 DistributedSampler 来分发训练数据
def _prepare_dataloader(self, dataset: Dataset):
   return DataLoader(
        dataset,
        batch_size=self.config.batch_size,
        pin_memory=True,
        shuffle=False,
        num_workers=self.config.data_loader_workers,
        sampler=DistributedSampler(dataset)                 # 这个 sampler 自动将数据分块后送个各个 GPU,它能避免数据重叠
    )
  • 定义 snapshot 的加载和保存方法
def _save_snapshot(self, epoch):
	# capture snapshot
	model = self.model
	raw_model = model.module if hasattr(model, "module") else model
	snapshot = Snapshot(
	    model_state=raw_model.state_dict(),
	    optimizer_state=self.optimizer.state_dict(),
	    finished_epoch=epoch
	)
	# save snapshot
	snapshot = asdict(snapshot)
	torch.save(snapshot, self.config.snapshot_path)
	print(f"Snapshot saved at epoch {epoch}")

def _load_snapshot(self):
    try:
        snapshot = fsspec.open(self.config.snapshot_path)   # fsspec 为各种后端存储系统提供统一的 Python 接口,可以用相同的语法打开本地、AWS S3 和 GCS 等各种云存储平台的文件
        with snapshot as f:
            snapshot_data = torch.load(f, map_location="cpu")
    except FileNotFoundError:
        print("Snapshot not found. Training model from scratch")
        return 

    snapshot = Snapshot(**snapshot_data)
    self.model.load_state_dict(snapshot.model_state)
    self.optimizer.load_state_dict(snapshot.optimizer_state)
    self.epochs_run = snapshot.finished_epoch
    print(f"Resuming training from snapshot at Epoch {self.epochs_run}")
  • 定义训练流程
def _run_batch(self, source, targets, train: bool = True) -> float:
    with torch.set_grad_enabled(train), torch.cuda.amp.autocast(dtype=torch.float16, enabled=(self.config.use_amp)):
        _, loss = self.model(source, targets)
    
    if train:
        self.optimizer.zero_grad(set_to_none=True)
        if self.config.use_amp: 
            self.scaler.scale(loss).backward()
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), self.config.grad_norm_clip)
            self.scaler.step(self.optimizer)
            self.scaler.update()
        else:
            loss.backward()
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), self.config.grad_norm_clip)
            self.optimizer.step()
    
    #return loss.item()
    return loss

def _run_epoch(self, epoch: int, dataloader: DataLoader, train: bool = True):
    dataloader.sampler.set_epoch(epoch)
    for iter, (source, targets) in enumerate(dataloader):
        step_type = "Train" if train else "Eval"
        source = source.to(self.local_rank)
        targets = targets.to(self.local_rank)
        batch_loss = self._run_batch(source, targets, train)
        if iter % 100 == 0:
            #print(f"[GPU{self.global_rank}] Epoch {epoch} | Iter {iter} | {step_type} Loss {batch_loss.item():.5f}")
            if train:
                print(f"[GPU{self.global_rank}] Epoch {epoch} | Iter {iter} | {step_type} Loss {batch_loss.item():.5f}")
            else:
                eval_loss_list = [torch.zeros_like(batch_loss) for _ in range(int(os.environ['WORLD_SIZE']))]
                dist.gather(
                    batch_loss,
                    eval_loss_list if self.local_rank == 0 else None, 
                    dst=0
                )
                if self.local_rank == 0:
                    for i, loss in enumerate(eval_loss_list):
                        print(f"[GPU{i}] Epoch {epoch} | Iter {iter} | {step_type} Loss {loss.item():.5f}")

def train(self):
    for epoch in range(self.epochs_run, self.config.max_epochs):
        epoch += 1
        
        # train for one epoch
        self._run_epoch(epoch, self.train_loader, train=True)

        # 各个 GPU 上都在跑一样的训练进程,这里指定 rank0 进程保存 snapshot 以免重复保存
        if self.local_rank == 0 and epoch % self.save_every == 0:
            self._save_snapshot(epoch)

        # eval run
        if self.test_loader:
            self._run_epoch(epoch, self.test_loader, train=False)

这里需要注意几点:

  1. 指定 rank0 进程保存 snapshot 以免重复保存
  2. _run_batch 方法中,计算 loss 的部分设置在 torch.amp.autocast 上下文中,启动混合精度训练
  3. _run_epoch 方法中,使用 torch.distributed.gather 原语汇聚各个 GPU 的验证损失信息到 rank0 上,常用这种操作进行 log 训练信息。除此以外 Pytorch 一共提供了六个进程通信原语,如下
import torch.distributed as dist

dist.broadcast(tensor, src, group)				# 将 tensor 从 src 复制到所有其他进程。
dist.reduce(tensor, dst, op, group)				# 将 op 应用于每个 tensor 并将结果存储在 dst 中。
dist.all_reduce(tensor, op, group)				# 与 reduce 相同,但结果存储在所有进程中。
dist.scatter(tensor, scatter_list, src, group)	# 复制  tensor scatter_lost[i] 到  进程
dist.gather(tensor,gather_list, dst, group)		# 从 dst 中的所有进程复制 tensor。
dist.all_gather(tensor_list, tensor, group)		# 将所有进程的 tensor 复制到所有进程上的 tensor_list。
dist.barrier(group)								# 阻塞组中的所有进程,直到每个进程都进入该函数。
  1. 其中 op 操作有四种
dist.ReduceOp.SUM,
dist.ReduceOp.PRODUCT,
dist.ReduceOp.MAX,
dist.ReduceOp.MIN.
  1. 这些方法在需要手动汇聚或分发信息时特别有用,具体用法可以参考 pytorch 官方文档


【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月08日 0

暂无评论

推荐阅读
VFpNeRYlMszB