流水线并行

模型规范

class deepspeed.pipe.PipelineModule(layers, num_stages=None, topology=None, loss_fn=None, seed_layers=False, seed_fn=None, base_seed=1234, partition_method='parameters', activation_checkpoint_interval=0, activation_checkpoint_func=<function checkpoint>, checkpointable_layers=None, dynamic_shape=False)[source]

使用流水线并行来并行化的模块。

使流水线并行成为可能的关键约束是将前向传递表示为一系列层,并对它们之间的简单接口进行强制。前向传递隐式地由模块 layers 定义。关键假设是,每层的输出可以直接作为输入传递给下一层,就像一个 torch.nn.Sequence 一样。前向传递隐式地

def forward(self, inputs):
    x = inputs
    for layer in self.layers:
        x = layer(x)
    return x

注意

流水线并行与 ZeRO-2 和 ZeRO-3 不兼容。

参数
  • layers (Iterable) – 定义流水线结构的一系列层。可以是 torch.nn.Sequential 模块。

  • num_stages (int, optional) – 流水线并行的程度。如果没有指定,则必须提供 topology

  • topology (deepspeed.runtime.pipe.ProcessTopology, optional) – 定义训练的并行轴。如果 num_stagesNone,则必须提供。

  • loss_fn (callable, optional) – 损失计算 loss = loss_fn(outputs, label)

  • seed_layers (bool, optional) – 为每层使用不同的种子。默认为 False。

  • seed_fn (type, optional) – 自定义种子生成函数。默认为随机种子生成器。

  • base_seed (int, optional) – 起始种子。默认为 1234。

  • partition_method (str, optional) – 层分区的方法。默认为‘parameters’。

  • activation_checkpoint_interval (int, optional) – 激活检查点的粒度,以层数表示。0 表示禁用激活检查点。

  • activation_checkpoint_func (callable, optional) – 用于激活检查点的函数。默认为 deepspeed.checkpointing.checkpoint

  • checkpointable_layers (list, optional) – 可检查点的层可能不会被检查点。默认为 None,不会进行额外的过滤。

  • dynamic_shape – 允许动态形状的输入。这可能会影响性能。

forward(forward_input)[source]

定义每次调用时执行的计算。

应由所有子类重写。

注意

尽管需要在此函数中定义前向传递的方案,但应在之后调用 Module 实例,而不是调用此函数,因为前者负责运行已注册的钩子,而后者则会静默地忽略它们。

allreduce_tied_weight_gradients()[source]

对绑定阶段之间的绑定权重的梯度进行全约简

topology()[source]

ProcessTopology 对象,用于查询进程映射。

ckpt_prefix(checkpoints_path, tag)[source]

构建此模块写入的所有检查点文件的首缀。

ckpt_layer_path(ckpt_dir, local_layer_idx)[source]

为特定流水线模块层自定义首缀。

ckpt_layer_path_list(ckpt_dir, local_layer_idx)[source]

获取特定流水线模块层的全部 ckpt 文件列表。

get_additional_losses()[source]

返回用于报告的模型特定的额外损失

返回一个字典,其中 {“损失名称”: 损失值},如果没有额外损失,则返回 None。

class deepspeed.pipe.LayerSpec(typename, *module_args, **module_kwargs)[source]

用于指定流水线并行模块的构建块。

LayerSpec 存储 PipelineModule 中每个阶段的类型信息和参数。例如

nn.Sequence(
    torch.nn.Linear(self.in_dim, self.hidden_dim, bias=False),
    torch.nn.Linear(self.hidden_hidden, self.out_dim)
)

变为

layer_specs = [
    LayerSpec(torch.nn.Linear, self.in_dim, self.hidden_dim, bias=False),
    LayerSpec(torch.nn.Linear, self.hidden_hidden, self.out_dim)]
]
build(log=False)[source]

构建存储的规范。

class deepspeed.pipe.TiedLayerSpec(key, typename, *module_args, forward_fn=None, tied_weight_attr=['weight'], **module_kwargs)[source]
class deepspeed.runtime.pipe.ProcessTopology(axes, dims)[source]

管理将 n 维笛卡尔坐标映射到线性索引。此映射用于将进程的排名映射到各种并行形式的网格。

张量的每个轴都由其名称访问。提供的轴顺序定义了拓扑的布局。ProcessTopology 使用张量轴的“行优先”布局,因此 axes=[‘x’, ‘y’] 将坐标 (x,y) 和 (x,y+1) 映射到相邻的线性索引。如果改为使用 axes=[‘y’, ‘x’],则坐标 (x,y) 和 (x+1,y) 将是相邻的。

某些方法返回 ProcessCoord 具名元组。

get_rank(**coord_kwargs)[source]

通过进程的坐标返回进程的全局排名。

坐标指定为 kwargs。例如

>>> X = ProcessTopology(axes=['x', 'y'], dims=[2,3])
>>> X.get_rank(x=0, y=1)
1
get_axis_names()[source]

返回拓扑排序中的轴名称列表。

get_rank_repr(rank, omit_axes=['data', 'pipe'], inner_sep='_', outer_sep='-')[source]

返回排名的字符串表示。

此方法主要用于检查点模型数据。

例如
>>> topo = Topo(axes=['a', 'b'], dims=[2, 2])
>>> topo.get_rank_repr(rank=3)
'a_01-b_01'
>>> topo.get_rank_repr(rank=3, omit_axes=['a'])
'b_01'
参数
  • rank (int) – 拓扑中的排名。

  • omit_axes (list, optional) – 应该不在表示中的轴。默认为 [‘data’, ‘pipe’]。

  • inner_sep (str, optional) – [描述]。默认为 ‘_’。

  • outer_sep (str, optional) – [描述]。默认为 ‘-‘。

返回

rank 拥有的坐标的字符串表示。

返回类型

str

get_dim(axis)[source]

返回给定轴上的进程数。

例如
>>> X = ProcessTopology(axes=['x', 'y'], dims=[2,3])
>>> X.get_dim('y')
3
get_coord(rank)[source]

返回由进程排名拥有的坐标。

返回的具名元组的轴可以直接作为成员访问。对于 .. rubric:: 示例

>>> X = ProcessTopology(axes=['x', 'y'], dims=[2,3])
>>> coord = X.get_coord(rank=1)
>>> coord.x
0
>>> coord.y
1
get_axis_comm_lists(axis)[source]

构建适合沿轴 axis 的通信器组的列表。

示例

>>> topo = Topo(axes=['pipe', 'data', 'model'], dims=[2, 2, 2])
>>> topo.get_axis_comm_lists('pipe')
[
    [0, 4], # data=0, model=0
    [1, 5], # data=0, model=1
    [2, 6], # data=1, model=0
    [3, 7], # data=1, model=1
]
返回

坐标在所有轴上匹配的列表列表, axis 之外。

filter_match(**filter_kwargs)[source]

返回其坐标与提供条件匹配的排名列表。

示例

>>> X = ProcessTopology(axes=['pipe', 'data', 'model'], dims=[2, 2, 2])
>>> X.filter_match(pipe=0, data=1)
[2, 3]
>>> [X.get_coord(rank) for rank in X.filter_match(pipe=0, data=1)]
[ProcessCoord(pipe=0, data=1, model=0), ProcessCoord(pipe=0, data=1, model=1)]
参数

**filter_kwargs (dict) – 用于选择坐标的条件。

返回

坐标与 filter_kwargs 匹配的排名列表。

get_axis_list(axis, idx)[source]

返回其坐标在轴中为 idx 的全局排名列表。

例如
>>> X = ProcessTopology(axes=['x', 'y'], dims=[2,3])
>>> X.get_axis_list(axis='x', idx=0)
[0, 1, 2]
>>> X.get_axis_list(axis='y', idx=0)
[0, 3]

培训

class deepspeed.runtime.pipe.engine.PipelineEngine(has_bool_tensors=False, *super_args, **super_kwargs)[source]

混合管道、数据和模型并行训练的训练引擎。

此引擎由 deepspeed.initialize() 创建,当提供 PipelineModule 时。

reset_activation_shape()[source]

当激活和梯度的形状发生变化时,重置缓冲区。例如,对于更改每个样本 seqlen 的课程学习,我们需要在 seqlen 将要改变时调用它。

train_batch(data_iter=None)[source]

推进管道以训练下一批数据。该引擎将总共接收 self.train_batch_size() 个样本,这些样本是在所有工作程序中共同收集的。

除非 deepspeed.initialize() 提供了训练集,否则应将迭代训练数据的迭代器作为参数提供。在这种情况下,将自动读取训练数据。

警告

每个管道将从 data_iter 中提取 self.gradient_accumulation_steps() 个条目。 data_iter 中必须有足够的数据,否则 StopIteration 将停止训练。

DeepSpeed 提供了一个便利类 deepspeed.utils.RepeatingLoader,它包装数据加载器以在 StopIteration 时自动重启。

参数

data_iter (Iterator, optional) – 训练数据的迭代器。

返回

本批计算的损失的算术平均值。

eval_batch(data_iter, return_logits=False, compute_loss=True, reduce_output='avg', bcast_loss=True, num_micro_batches=None)[source]

在来自 data_iter 的一批数据上评估管道。该引擎将总共评估 self.train_batch_size() 个样本,这些样本是在所有工作程序中共同收集的。

此方法等效于

module.eval()
with torch.no_grad():
    output = module(batch)

警告

每个管道将从 data_iter 中提取 self.gradient_accumulation_steps() 个条目。 data_iter 中必须有足够的数据,否则 StopIteration 将停止训练。

DeepSpeed 提供了一个便利类 deepspeed.utils.RepeatingLoader,它包装数据加载器以在 StopIteration 时自动重启。

参数

data_iter (Iterator) – 要评估的数据的迭代器。

返回

本批计算的损失的算术平均值。

set_train_batch_size(train_batch_size)[source]

通过增加或减少微批次的数量(即梯度累积步数)来调整全局批次大小。每个微批次的大小(即 train_micro_batch_size_per_gpu)不会改变。:param train_batch_size: 训练的新全局批次大小。 :type train_batch_size: int

引发

ValueError – 如果 train_batch_size 不能被配置的微批次大小和数据并行度整除。

is_first_stage()[source]

如果此进程位于管道中的第一个阶段,则为 True。

is_last_stage()[source]

如果此进程位于管道中的最后一个阶段,则为 True。

set_dataiterator(iterator)[source]

存储一个迭代器以对训练数据进行采样。

set_batch_fn(fn)[source]

对输入数据执行后处理函数。

参数

fn (function) – 要运行的函数。

is_gradient_accumulation_boundary()[source]

如果引擎正在执行梯度减少或优化器步骤指令,则为 True。

此方法从 DeepSpeedEngine 中覆盖,以在管道引擎被指示执行时强制进行减少和步骤。

返回

是否应该进行减少和优化器步骤。

返回类型

bool

forward(*args, **kwargs)[source]

在管道并行训练中禁用。参见 train_batch()

backward(*args, **kwargs)[source]

在管道并行训练中禁用。参见 train_batch()

step(*args, **kwargs)[source]

在管道并行训练中禁用。参见 train_batch()

module_state_dict(exclude_frozen_parameters=False)[source]

覆盖 hack 以保存管道模型并返回保存的目录路径。

此方法应该只由 DeepSpeed 的 save_checkpoint() 调用。保存 PipelineModule 的推荐方法是在 save_checkpoint() 之外使用 save_state_dict()

返回

None

load_module_state_dict(checkpoint, strict=True, custom_load_fn=None, fetch_z3_params=False)[source]

覆盖 hack 以改为使用目录路径。

这很重要,因为管道模型按层而不是按排名进行检查点。

如果 state_dict 不是 Nonestr,我们将恢复到 super(),期望一个 dict

参数
  • state_dict (str, None) – 未使用

  • strict (bool, optional) – 严格状态加载。默认为 True。

扩展管道并行性

class deepspeed.runtime.pipe.schedule.PipeSchedule(micro_batches, stages, stage_id)[source]

通过生成 PipeInstruction 序列来指导管道引擎的执行。

调度程序是生成器,它会生成 PipeInstruction 序列来处理一个批次中的微批次。每个生成的步骤都是原子的,这意味着可以在连续的步骤之间放置一个屏障同步,而不会出现死锁。

下面是一个实现具有梯度累积的数据并行的示例调度程序

class DataParallelSchedule(PipeSchedule):
    def steps(self):
        for step_id in range(self.micro_batches):
            cmds = [
                LoadMicroBatch(buffer_id=0),
                ForwardPass(buffer_id=0),
                BackwardPass(buffer_id=0),
            ]
            if step_id == self.micro_batches - 1:
                cmds.extend([
                    ReduceGrads(),
                    OptimizerStep(),
                ])
            yield cmds

    def num_pipe_buffers(self):
        return 1
参数
  • micro_batches (int) – 构成一个批次的微批次数量。

  • stages (int) – 管道阶段的数量。

  • stage_id (int) – 将执行生成的调度的管道阶段。

abstract steps()[source]

为调度中的每个步骤生成 PipeInstruction 列表。

注意

调度必须实现 steps() 来定义调度。

返回

要作为管道的一个步骤执行的指令

num_pipe_buffers()[source]

此阶段将使用的管道缓冲区数量。

注意

调度程序应专门化 num_pipe_buffers() 以在规模上节省内存。

返回

引擎要分配的缓冲区数量。

property stage

用于配置此调度的阶段索引。

property num_stages

用于配置此调度的总管道阶段数。

property num_micro_batches

用于配置此调度的总微批次数量。

property is_first_stage

如果配置的 stage_id 是管道中的第一个阶段,则为 True。

property is_last_stage

如果配置的 stage_id 是管道中的最后一个阶段,则为 True。

class deepspeed.runtime.pipe.schedule.InferenceSchedule(micro_batches, stages, stage_id)[source]

使用管道并行性推断批次的调度程序。

num_pipe_buffers()[source]

推断只需要两个管道缓冲区。

返回

2

class deepspeed.runtime.pipe.schedule.TrainSchedule(micro_batches, stages, stage_id)[source]

使用混合并行性训练一个批次的调度程序。

管道并行性是通过梯度累积提取的,因此收敛遵循具有相同批次大小的数据并行方法的收敛。

num_pipe_buffers()[source]

返回此阶段所需的管道缓冲区数量。

这等效于正在进行的前向传递的最大数量,因为我们需要记住前向传递的激活才能运行反向传播。对于同步 1F1B,这等效于此阶段和最后一个阶段之间的索引差。

class deepspeed.runtime.pipe.schedule.DataParallelSchedule(micro_batches, stages, stage_id)[source]

使用传统的具有梯度累积的数据并行性进行训练的示例调度程序。

num_pipe_buffers()[source]

只需要一个管道缓冲区。

class deepspeed.runtime.pipe.schedule.PipeInstruction(**kwargs)[source]

所有管道引擎执行指令的基类。

所有关键字参数都作为成员存储,类似于 namedtuple。这些成员在执行期间可以被 PipeEngine 访问。

参数

kwargs (可选) – 作为成员存储的关键字参数

class deepspeed.runtime.pipe.schedule.OptimizerStep(**kwargs)[source]

使用优化器执行一步操作并归零梯度。

注意

应在 ReduceGradsReduceTiedGrads 之后发出。

注意

可以是数据并行进程之间的同步点。

class deepspeed.runtime.pipe.schedule.ReduceGrads(**kwargs)[source]

在阶段内,将计算出的梯度在数据并行进程之间进行归约。

class deepspeed.runtime.pipe.schedule.ReduceTiedGrads(**kwargs)[source]

在一个管道并行组内,归约计算出的绑定模块的梯度。

警告

直到模型在管道阶段之间进行分区,该同步点包含的阶段才可知。在最坏的情况下,它包含所有管道阶段。应谨慎安排此指令以避免死锁。

class deepspeed.runtime.pipe.schedule.BufferOpInstruction(buffer_id, **kwargs)[source]

对管道缓冲区进行操作的管道指令。

参数

buffer_id (int) – 要修改的管道缓冲区索引。

class deepspeed.runtime.pipe.schedule.LoadMicroBatch(buffer_id, **kwargs)[source]

将微批次加载到缓冲区中。

大致

buffers['inputs'][buffer_id] = next(data_iter)
class deepspeed.runtime.pipe.schedule.ForwardPass(buffer_id, **kwargs)[source]

计算前向传播。

大致

buffers['outputs'][buffer_id] = forward(buffers['inputs'][buffer_id])
class deepspeed.runtime.pipe.schedule.BackwardPass(buffer_id, **kwargs)[source]

计算反向传播并累积梯度。

大致

outputs = buffers['outputs'][buffer_id]
gradients = buffers['gradients'][buffer_id]
torch.autograd.backward(tensors=outputs,
                        grad_tensors=gradients)
class deepspeed.runtime.pipe.schedule.SendActivation(buffer_id, **kwargs)[source]

将激活发送到管道中的下一个阶段。

大致

send(buffers['outputs'][buffer_id])

注意

通信是阻塞的,必须与下一个管道阶段上的 RecvActivation 配合使用以避免死锁。

class deepspeed.runtime.pipe.schedule.RecvActivation(buffer_id, **kwargs)[source]

从管道中的上一个阶段接收激活。

大致

buffers['inputs'][buffer_id] = recv()

注意

通信是阻塞的,必须与上一个管道阶段上的 SendActivation 配合使用以避免死锁。

class deepspeed.runtime.pipe.schedule.SendGrad(buffer_id, **kwargs)[source]

将计算出的梯度发送到上一个管道阶段,相对于接收到的激活。

注意

只有 requires_grad==True 的接收张量才会生成梯度。在接收阶段,缺失的梯度将被替换为 None

注意

通信是阻塞的,必须与下一个管道阶段上的 RecvGrad 配合使用以避免死锁。

class deepspeed.runtime.pipe.schedule.RecvGrad(buffer_id, **kwargs)[source]

接收下一个管道阶段计算出的梯度。

注意

只有 requires_grad==True 的激活才会生成梯度。缺失的梯度将被替换为 None

注意

通信是阻塞的,必须与下一个管道阶段上的 SendGrad 配合使用以避免死锁。