流水线并行
模型规范
- class deepspeed.pipe.PipelineModule(layers, num_stages=None, topology=None, loss_fn=None, seed_layers=False, seed_fn=None, base_seed=1234, partition_method='parameters', activation_checkpoint_interval=0, activation_checkpoint_func=<function checkpoint>, checkpointable_layers=None, dynamic_shape=False)[source]
使用流水线并行来并行化的模块。
使流水线并行成为可能的关键约束是将前向传递表示为一系列层,并对它们之间的简单接口进行强制。前向传递隐式地由模块
layers
定义。关键假设是,每层的输出可以直接作为输入传递给下一层,就像一个torch.nn.Sequence
一样。前向传递隐式地def forward(self, inputs): x = inputs for layer in self.layers: x = layer(x) return x
注意
流水线并行与 ZeRO-2 和 ZeRO-3 不兼容。
- 参数
layers (Iterable) – 定义流水线结构的一系列层。可以是
torch.nn.Sequential
模块。num_stages (int, optional) – 流水线并行的程度。如果没有指定,则必须提供
topology
。topology (
deepspeed.runtime.pipe.ProcessTopology
, optional) – 定义训练的并行轴。如果num_stages
为None
,则必须提供。loss_fn (callable, optional) – 损失计算
loss = loss_fn(outputs, label)
seed_layers (bool, optional) – 为每层使用不同的种子。默认为 False。
seed_fn (type, optional) – 自定义种子生成函数。默认为随机种子生成器。
base_seed (int, optional) – 起始种子。默认为 1234。
partition_method (str, optional) – 层分区的方法。默认为‘parameters’。
activation_checkpoint_interval (int, optional) – 激活检查点的粒度,以层数表示。0 表示禁用激活检查点。
activation_checkpoint_func (callable, optional) – 用于激活检查点的函数。默认为
deepspeed.checkpointing.checkpoint
。checkpointable_layers (list, optional) – 可检查点的层可能不会被检查点。默认为 None,不会进行额外的过滤。
dynamic_shape – 允许动态形状的输入。这可能会影响性能。
- class deepspeed.pipe.LayerSpec(typename, *module_args, **module_kwargs)[source]
用于指定流水线并行模块的构建块。
LayerSpec 存储 PipelineModule 中每个阶段的类型信息和参数。例如
nn.Sequence( torch.nn.Linear(self.in_dim, self.hidden_dim, bias=False), torch.nn.Linear(self.hidden_hidden, self.out_dim) )
变为
layer_specs = [ LayerSpec(torch.nn.Linear, self.in_dim, self.hidden_dim, bias=False), LayerSpec(torch.nn.Linear, self.hidden_hidden, self.out_dim)] ]
- class deepspeed.pipe.TiedLayerSpec(key, typename, *module_args, forward_fn=None, tied_weight_attr=['weight'], **module_kwargs)[source]
- class deepspeed.runtime.pipe.ProcessTopology(axes, dims)[source]
管理将 n 维笛卡尔坐标映射到线性索引。此映射用于将进程的排名映射到各种并行形式的网格。
张量的每个轴都由其名称访问。提供的轴顺序定义了拓扑的布局。ProcessTopology 使用张量轴的“行优先”布局,因此 axes=[‘x’, ‘y’] 将坐标 (x,y) 和 (x,y+1) 映射到相邻的线性索引。如果改为使用 axes=[‘y’, ‘x’],则坐标 (x,y) 和 (x+1,y) 将是相邻的。
某些方法返回 ProcessCoord 具名元组。
- get_rank(**coord_kwargs)[source]
通过进程的坐标返回进程的全局排名。
坐标指定为 kwargs。例如
>>> X = ProcessTopology(axes=['x', 'y'], dims=[2,3]) >>> X.get_rank(x=0, y=1) 1
- get_rank_repr(rank, omit_axes=['data', 'pipe'], inner_sep='_', outer_sep='-')[source]
返回排名的字符串表示。
此方法主要用于检查点模型数据。
- 例如
>>> topo = Topo(axes=['a', 'b'], dims=[2, 2]) >>> topo.get_rank_repr(rank=3) 'a_01-b_01' >>> topo.get_rank_repr(rank=3, omit_axes=['a']) 'b_01'
- 参数
rank (int) – 拓扑中的排名。
omit_axes (list, optional) – 应该不在表示中的轴。默认为 [‘data’, ‘pipe’]。
inner_sep (str, optional) – [描述]。默认为 ‘_’。
outer_sep (str, optional) – [描述]。默认为 ‘-‘。
- 返回
由
rank
拥有的坐标的字符串表示。- 返回类型
str
- get_dim(axis)[source]
返回给定轴上的进程数。
- 例如
>>> X = ProcessTopology(axes=['x', 'y'], dims=[2,3]) >>> X.get_dim('y') 3
- get_coord(rank)[source]
返回由进程排名拥有的坐标。
返回的具名元组的轴可以直接作为成员访问。对于 .. rubric:: 示例
>>> X = ProcessTopology(axes=['x', 'y'], dims=[2,3]) >>> coord = X.get_coord(rank=1) >>> coord.x 0 >>> coord.y 1
- get_axis_comm_lists(axis)[source]
构建适合沿轴
axis
的通信器组的列表。示例
>>> topo = Topo(axes=['pipe', 'data', 'model'], dims=[2, 2, 2]) >>> topo.get_axis_comm_lists('pipe') [ [0, 4], # data=0, model=0 [1, 5], # data=0, model=1 [2, 6], # data=1, model=0 [3, 7], # data=1, model=1 ]
- 返回
坐标在所有轴上匹配的列表列表,除
axis
之外。
- filter_match(**filter_kwargs)[source]
返回其坐标与提供条件匹配的排名列表。
示例
>>> X = ProcessTopology(axes=['pipe', 'data', 'model'], dims=[2, 2, 2]) >>> X.filter_match(pipe=0, data=1) [2, 3] >>> [X.get_coord(rank) for rank in X.filter_match(pipe=0, data=1)] [ProcessCoord(pipe=0, data=1, model=0), ProcessCoord(pipe=0, data=1, model=1)]
- 参数
**filter_kwargs (dict) – 用于选择坐标的条件。
- 返回
坐标与 filter_kwargs 匹配的排名列表。
培训
- class deepspeed.runtime.pipe.engine.PipelineEngine(has_bool_tensors=False, *super_args, **super_kwargs)[source]
混合管道、数据和模型并行训练的训练引擎。
此引擎由
deepspeed.initialize()
创建,当提供PipelineModule
时。- reset_activation_shape()[source]
当激活和梯度的形状发生变化时,重置缓冲区。例如,对于更改每个样本 seqlen 的课程学习,我们需要在 seqlen 将要改变时调用它。
- train_batch(data_iter=None)[source]
推进管道以训练下一批数据。该引擎将总共接收
self.train_batch_size()
个样本,这些样本是在所有工作程序中共同收集的。除非
deepspeed.initialize()
提供了训练集,否则应将迭代训练数据的迭代器作为参数提供。在这种情况下,将自动读取训练数据。警告
每个管道将从
data_iter
中提取self.gradient_accumulation_steps()
个条目。data_iter
中必须有足够的数据,否则StopIteration
将停止训练。DeepSpeed 提供了一个便利类
deepspeed.utils.RepeatingLoader
,它包装数据加载器以在StopIteration
时自动重启。- 参数
data_iter (Iterator, optional) – 训练数据的迭代器。
- 返回
本批计算的损失的算术平均值。
- eval_batch(data_iter, return_logits=False, compute_loss=True, reduce_output='avg', bcast_loss=True, num_micro_batches=None)[source]
在来自
data_iter
的一批数据上评估管道。该引擎将总共评估self.train_batch_size()
个样本,这些样本是在所有工作程序中共同收集的。此方法等效于
module.eval() with torch.no_grad(): output = module(batch)
警告
每个管道将从
data_iter
中提取self.gradient_accumulation_steps()
个条目。data_iter
中必须有足够的数据,否则StopIteration
将停止训练。DeepSpeed 提供了一个便利类
deepspeed.utils.RepeatingLoader
,它包装数据加载器以在StopIteration
时自动重启。- 参数
data_iter (Iterator) – 要评估的数据的迭代器。
- 返回
本批计算的损失的算术平均值。
- set_train_batch_size(train_batch_size)[source]
通过增加或减少微批次的数量(即梯度累积步数)来调整全局批次大小。每个微批次的大小(即
train_micro_batch_size_per_gpu
)不会改变。:param train_batch_size: 训练的新全局批次大小。 :type train_batch_size: int- 引发
ValueError – 如果
train_batch_size
不能被配置的微批次大小和数据并行度整除。
- is_gradient_accumulation_boundary()[source]
如果引擎正在执行梯度减少或优化器步骤指令,则为 True。
此方法从
DeepSpeedEngine
中覆盖,以在管道引擎被指示执行时强制进行减少和步骤。- 返回
是否应该进行减少和优化器步骤。
- 返回类型
bool
扩展管道并行性
- class deepspeed.runtime.pipe.schedule.PipeSchedule(micro_batches, stages, stage_id)[source]
通过生成
PipeInstruction
序列来指导管道引擎的执行。调度程序是生成器,它会生成
PipeInstruction
序列来处理一个批次中的微批次。每个生成的步骤都是原子的,这意味着可以在连续的步骤之间放置一个屏障同步,而不会出现死锁。下面是一个实现具有梯度累积的数据并行的示例调度程序
class DataParallelSchedule(PipeSchedule): def steps(self): for step_id in range(self.micro_batches): cmds = [ LoadMicroBatch(buffer_id=0), ForwardPass(buffer_id=0), BackwardPass(buffer_id=0), ] if step_id == self.micro_batches - 1: cmds.extend([ ReduceGrads(), OptimizerStep(), ]) yield cmds def num_pipe_buffers(self): return 1
- 参数
micro_batches (int) – 构成一个批次的微批次数量。
stages (int) – 管道阶段的数量。
stage_id (int) – 将执行生成的调度的管道阶段。
- abstract steps()[source]
为调度中的每个步骤生成
PipeInstruction
列表。注意
调度必须实现
steps()
来定义调度。- 返回
要作为管道的一个步骤执行的指令
- num_pipe_buffers()[source]
此阶段将使用的管道缓冲区数量。
注意
调度程序应专门化
num_pipe_buffers()
以在规模上节省内存。- 返回
引擎要分配的缓冲区数量。
- property stage
用于配置此调度的阶段索引。
- property num_stages
用于配置此调度的总管道阶段数。
- property num_micro_batches
用于配置此调度的总微批次数量。
- property is_first_stage
如果配置的
stage_id
是管道中的第一个阶段,则为 True。
- property is_last_stage
如果配置的
stage_id
是管道中的最后一个阶段,则为 True。
- class deepspeed.runtime.pipe.schedule.InferenceSchedule(micro_batches, stages, stage_id)[source]
使用管道并行性推断批次的调度程序。
- class deepspeed.runtime.pipe.schedule.TrainSchedule(micro_batches, stages, stage_id)[source]
使用混合并行性训练一个批次的调度程序。
管道并行性是通过梯度累积提取的,因此收敛遵循具有相同批次大小的数据并行方法的收敛。
- class deepspeed.runtime.pipe.schedule.DataParallelSchedule(micro_batches, stages, stage_id)[source]
使用传统的具有梯度累积的数据并行性进行训练的示例调度程序。
- class deepspeed.runtime.pipe.schedule.PipeInstruction(**kwargs)[source]
所有管道引擎执行指令的基类。
所有关键字参数都作为成员存储,类似于
namedtuple
。这些成员在执行期间可以被PipeEngine
访问。- 参数
kwargs (可选) – 作为成员存储的关键字参数
- class deepspeed.runtime.pipe.schedule.OptimizerStep(**kwargs)[source]
使用优化器执行一步操作并归零梯度。
注意
应在
ReduceGrads
和ReduceTiedGrads
之后发出。注意
可以是数据并行进程之间的同步点。
- class deepspeed.runtime.pipe.schedule.ReduceTiedGrads(**kwargs)[source]
在一个管道并行组内,归约计算出的绑定模块的梯度。
警告
直到模型在管道阶段之间进行分区,该同步点包含的阶段才可知。在最坏的情况下,它包含所有管道阶段。应谨慎安排此指令以避免死锁。
- class deepspeed.runtime.pipe.schedule.BufferOpInstruction(buffer_id, **kwargs)[source]
对管道缓冲区进行操作的管道指令。
- 参数
buffer_id (int) – 要修改的管道缓冲区索引。
- class deepspeed.runtime.pipe.schedule.LoadMicroBatch(buffer_id, **kwargs)[source]
将微批次加载到缓冲区中。
大致
buffers['inputs'][buffer_id] = next(data_iter)
- class deepspeed.runtime.pipe.schedule.ForwardPass(buffer_id, **kwargs)[source]
计算前向传播。
大致
buffers['outputs'][buffer_id] = forward(buffers['inputs'][buffer_id])
- class deepspeed.runtime.pipe.schedule.BackwardPass(buffer_id, **kwargs)[source]
计算反向传播并累积梯度。
大致
outputs = buffers['outputs'][buffer_id] gradients = buffers['gradients'][buffer_id] torch.autograd.backward(tensors=outputs, grad_tensors=gradients)
- class deepspeed.runtime.pipe.schedule.SendActivation(buffer_id, **kwargs)[source]
将激活发送到管道中的下一个阶段。
大致
send(buffers['outputs'][buffer_id])
注意
通信是阻塞的,必须与下一个管道阶段上的
RecvActivation
配合使用以避免死锁。
- class deepspeed.runtime.pipe.schedule.RecvActivation(buffer_id, **kwargs)[source]
从管道中的上一个阶段接收激活。
大致
buffers['inputs'][buffer_id] = recv()
注意
通信是阻塞的,必须与上一个管道阶段上的
SendActivation
配合使用以避免死锁。