Distributed Data Parallel (DDP)

PyTorch's built-in data parallelism.

Pros:
1. Data parallelism: each process works on a different shard of the data, which greatly shortens an epoch when the dataset is large.
2. Less data per GPU, which eases the memory pressure on each card.

Cons:
1. The model and gradient graph are not sharded; every GPU holds a full copy.
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
from tqdm import trange

def main(rank, world_size):
    # rank == 0 is the main process; do IO (save, load, log) only on rank == 0 where possible
    dist.init_process_group("nccl", rank=rank, world_size=world_size)
    torch.cuda.set_device(rank)
    model = Model()
    # wrap the model so that each process holds a replica on its own GPU
    model = DDP(model.cuda(), device_ids=[rank], output_device=rank)

    # training loop
    unwrapped_model = model.module  # access the underlying (unwrapped) model
    optimizer = ...
    for epoch in trange(total_epoch, disable=rank != 0):
        for batch in dataloader:  # the dataloader should use a DistributedSampler (see below)
            optimizer.zero_grad()
            output = model(**batch)
            loss = loss_func(output)
            loss.backward()
            optimizer.step()
    dist.destroy_process_group()

if __name__ == "__main__":
    world_size = torch.cuda.device_count()
    mp.spawn(main, args=(world_size,), nprocs=world_size)
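
DDP only replicates the model; each process still has to see a different shard of the data, which is usually done with a DistributedSampler. A minimal sketch that would live inside main(rank, world_size) above (Dataset("train"), batch size, and total_epoch are placeholders, as elsewhere in these notes):

from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler

# DistributedSampler splits the dataset by rank, so each GPU sees a disjoint shard
dataset = Dataset("train")
sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank, shuffle=True)
dataloader = DataLoader(dataset, batch_size=2, sampler=sampler)

for epoch in range(total_epoch):
    sampler.set_epoch(epoch)  # so the shuffling differs between epochs
    for batch in dataloader:
        ...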

deepspeed

Pros:
1. Multiple (ZeRO) stages that can drastically reduce GPU memory usage.

Cons:
1. Complex to configure; it is normally used together with the Hugging Face Trainer or accelerate.

You need to write a ds_config.json; when used with the Hugging Face Trainer, the fields set to "auto" are filled in from TrainingArguments:

{
    "fp16": {
        "enabled": "auto",
        "loss_scale": 0,
        "loss_scale_window": 1000,
        "initial_scale_power": 16,
        "hysteresis": 2,
        "min_loss_scale": 1
    },
    "bf16": {
        "enabled": false
    },
    "zero_optimization": {
        "stage": 3,
        "offload_optimizer": {
            "device": "none",
            "pin_memory": true
        },
        "offload_param": {
            "device": "none",
            "pin_memory": true
        },
        "overlap_comm": true,
        "contiguous_gradients": true,
        "sub_group_size": 1e9,
        "reduce_bucket_size": "auto",
        "stage3_prefetch_bucket_size": "auto",
        "stage3_param_persistence_threshold": "auto",
        "stage3_max_live_parameters": 1e9,
        "stage3_max_reuse_distance": 1e9,
        "stage3_gather_16bit_weights_on_model_save": true
    },
 
    "gradient_accumulation_steps": "auto",
    "gradient_clipping": "auto",
    "steps_per_print": 100,
    "train_batch_size": "auto",
    "train_micro_batch_size_per_gpu": 2,
    "wall_clock_breakdown": false
}

Using it from Hugging Face code:

from transformers import Trainer, TrainingArguments
# no need to import deepspeed explicitly
training_args = TrainingArguments(...)

model = Model(logger)

trainer = CustomTrainer(model=model.lm, args=training_args, ...)

trainer.train()
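
The DeepSpeed config is wired in through TrainingArguments. A hedged sketch (output_dir and the batch-size values here are placeholders, chosen to match the ds_config.json above):

from transformers import TrainingArguments

# the "auto" fields in ds_config.json are resolved from these arguments by the Trainer
training_args = TrainingArguments(
    output_dir="ckpt",                  # placeholder
    deepspeed="ds_config.json",         # path to the DeepSpeed config above
    per_device_train_batch_size=2,      # must agree with train_micro_batch_size_per_gpu
    gradient_accumulation_steps=1,
    fp16=True,
)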

With a script built on the Hugging Face Trainer, launch it via deepspeed:

deepspeed --module hf_trainer
# or
deepspeed hf_trainer.py

To change which GPUs are used:

deepspeed --num_gpus 2 hf_trainer.py
deepspeed --include localhost:0,2,3,4 hf_trainer.py

Note that these options must come before the script name.

The launcher appends an extra argument to hf_trainer.py: --local_rank <rank>

If you don't want that:

deepspeed --no_local_rank hf_trainer.py
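
If you keep the default, the training script has to accept that extra flag; a minimal sketch (everything except the flag name is illustrative):

import argparse

import torch

parser = argparse.ArgumentParser()
# the deepspeed launcher injects this flag, with a different value per process
parser.add_argument("--local_rank", type=int, default=-1)
args = parser.parse_args()

torch.cuda.set_device(args.local_rank)  # bind this process to its GPU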

accelerate

Pros:
1. Simple to use.

Cons:
1. Not a distributed engine by itself; it needs DeepSpeed or FSDP underneath.

It also needs a config.yaml (it can be generated interactively by running accelerate config):

compute_environment: LOCAL_MACHINE
debug: false
deepspeed_config:
  gradient_accumulation_steps: 1
  gradient_clipping: 1.0
  offload_optimizer_device: cpu
  offload_param_device: cpu
  zero3_init_flag: true
  zero3_save_16bit_model: true
  zero_stage: 3
distributed_type: DEEPSPEED
downcast_bf16: 'no'
enable_cpu_affinity: false
machine_rank: 0
main_training_function: main
mixed_precision: 'no'
num_machines: 1
num_processes: 8
rdzv_backend: static
same_network: true
tpu_env: []
tpu_use_cluster: false
tpu_use_sudo: false
use_cpu: false

Code:

from accelerate import Accelerator
import torch
from tqdm import tqdm, trange

accelerator = Accelerator()  # create it as early as possible

model = Model(logger)
dataloader = torch.utils.data.DataLoader(Dataset("train"), ...)
optimizer = ...
scheduler = ...

# This step shards/distributes the model across the GPUs, following the strategy in config.yaml
wrapped_model, optimizer, scheduler, dataloader = accelerator.prepare(model, optimizer, scheduler, dataloader)  # the order doesn't matter, as long as outputs match inputs one-to-one
wrapped_model.train()

# train loop
for epoch in trange(total_epoch, disable=not accelerator.is_main_process):
    for batch in tqdm(dataloader, disable=not accelerator.is_main_process):
        # no_sync skips gradient synchronization; it is normally only useful for
        # gradient accumulation steps (see the note after this block)
        with accelerator.no_sync(wrapped_model):
            optimizer.zero_grad()
            outputs = wrapped_model(**batch)
            loss = outputs.loss
            # instead of loss.backward()
            accelerator.backward(loss)
            optimizer.step()
    scheduler.step()
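
If the no_sync above is meant for gradient accumulation, accelerate also has the accumulate context manager, which only syncs gradients (and lets the prepared optimizer actually step) every gradient_accumulation_steps batches. A sketch of the inner loop under that assumption (the step count 4 is arbitrary):

accelerator = Accelerator(gradient_accumulation_steps=4)
...
for batch in tqdm(dataloader, disable=not accelerator.is_main_process):
    # gradients are synchronized, and the prepared optimizer steps, only every
    # 4th batch; on the other batches backward() just accumulates gradients
    with accelerator.accumulate(wrapped_model):
        outputs = wrapped_model(**batch)
        accelerator.backward(outputs.loss)
        optimizer.step()
        optimizer.zero_grad()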

Usage:

accelerate launch --config_file config.yaml --module train

A complete example:

import json
import time
import os
 
from accelerate import Accelerator
from peft import get_peft_model_state_dict, set_peft_model_state_dict
import torch
import torch.distributed
from torch.utils.data import DataLoader
from tqdm import trange, tqdm
 
from config.config import config
from dataset import SistorDataset
from data_utils import collate_fn_lm_pretrain
from qwen import Model
 
 
if __name__ == "__main__":
    accelerator = Accelerator()
    time_stamp = time.strftime("%Y-%m-%d-%H-%M-%S")
    accelerator.print("---------Loading Model---------")
    wrapper_model = Model(accelerator.print)
    accelerator.print("---------Loading Dataloader---------")
    dataset = SistorDataset("train")
    dataloader = DataLoader(
        dataset,
        batch_size=config["dataset"]["batch_size"],
        num_workers=config["dataset"]["num_workers"],
        persistent_workers=config["dataset"]["persistent_workers"],
        drop_last=config["dataset"]["drop_last"],
        collate_fn=lambda batch: collate_fn_lm_pretrain(batch, wrapper_model.tokenizer),
    )
    accelerator.print("---------Loading Optimizer---------")
    optimizer = torch.optim.AdamW(wrapper_model.lm.parameters(), lr=2e-4, betas=[0.9, 0.99], weight_decay=0.0)
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, eta_min=1e-6, T_max=100)
    # Pretrain Model
    if config["pretrain"]["checkpoints"]:
        accelerator.print("---------Loading Pretrain Model---------")
        set_peft_model_state_dict(wrapper_model.lm, torch.load(config["pretrain"]["checkpoints"]))
    # Accelerator
    accelerator.print("---------Loading Accelerator---------")
    model, optimizer, dataloader, scheduler = accelerator.prepare(wrapper_model.lm, optimizer, dataloader, scheduler)
    model.train()
 
    accelerator.print("---------Training---------")
    train_info = {}
    if accelerator.is_main_process and config["save"]:
        os.makedirs(f"/2022233318/sistor/ckpt/lm_pretrain-{time_stamp}", exist_ok=True)
 
    for epoch in trange(100000000, disable=not accelerator.is_local_main_process):
        loss_dict = {}
        for idx, item in enumerate(tqdm(dataloader, disable=not accelerator.is_local_main_process)):
            with accelerator.no_sync(model):
                optimizer.zero_grad()
                outputs = wrapper_model(**item)
                loss = outputs.loss
                accelerator.backward(loss)
                optimizer.step()
 
            verbose = {"outputs": loss.item()}
            for k, v in verbose.items():
                if k not in loss_dict:
                    loss_dict[k] = v
                else:
                    loss_dict[k] += v
 
        scheduler.step()
 
        for k, v in loss_dict.items():
            loss_dict[k] = v / len(dataloader)
        accelerator.print("---------Saving---------")
        accelerator.wait_for_everyone()
        unwrapped_model = accelerator.unwrap_model(model)
        if config["save"]:
            accelerator.save(
                get_peft_model_state_dict(unwrapped_model, state_dict=accelerator.get_state_dict(model)),
                f"/2022233318/sistor/ckpt/lm_pretrain-{time_stamp}/epoch{epoch}.pt",
            )
            if accelerator.is_main_process:
                train_info["epoch_" + str(epoch)] = {"outputs": loss_dict["outputs"]}
                with open(
                    f"/2022233318/sistor_log/lm_logs/log_{time_stamp}.json",
                    "w",
                ) as f:
                    json.dump(train_info, f)
 

Misc

Manually saving files

Saving a model usually means saving its state dict; to load it, just load the state dict before the distributed wrapping.

The way to save:


        # don't return before `wait_for_everyone`
        accelerator.wait_for_everyone()
        state_dict = accelerator.get_state_dict(wrapped_model)
        # non-main processes must return here: for them state_dict is None
        if not accelerator.is_main_process:
            return
        accelerator.save(state_dict, "...")
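
Loading is the mirror image: restore the state dict into the plain model before accelerator.prepare wraps it. A minimal sketch (the checkpoint path is a placeholder; optimizer and dataloader are assumed to be set up as above):

model = Model()
# load on CPU and restore the weights before any distributed wrapping / accelerator.prepare
model.load_state_dict(torch.load("ckpt/epoch0.pt", map_location="cpu"))

model, optimizer, dataloader = accelerator.prepare(model, optimizer, dataloader)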
		
Aggregating the loss across GPUs (reduce)

torch.distributed makes it easy to aggregate the per-GPU losses. Note that dist.reduce leaves the aggregated value only on the destination rank, whereas dist.all_reduce (used below) puts the result on every process; the rank check at the end is just so that only the main process goes on to log it.


        # assumes: import torch.distributed as dist
        loss = ...
        loss_reduced = torch.tensor(loss).float().to(current_process_device)
        dist.all_reduce(loss_reduced)  # the default reduce op is SUM
        loss_reduced = loss_reduced / dist.get_world_size()  # mean over processes
        if dist.get_rank() != 0:
            return
        # only rank 0 continues from here, e.g. to log loss_reduced
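
The same can be done through accelerate without touching torch.distributed directly; a small sketch using accelerator.gather (the loss value here is a placeholder):

import torch
from accelerate import Accelerator

accelerator = Accelerator()
loss = 0.123  # per-process scalar loss, placeholder

# gather concatenates the per-process tensors, so every process can take the mean
mean_loss = accelerator.gather(torch.tensor([loss], device=accelerator.device)).mean()
if accelerator.is_main_process:
    accelerator.print(f"mean loss: {mean_loss.item():.4f}")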