From 20a409e9985b6f84461e39d28c6d202078a242e4 Mon Sep 17 00:00:00 2001 From: Max Yu <18641481+maxyu1115@users.noreply.github.com> Date: Sun, 26 Apr 2026 20:41:39 -0700 Subject: [PATCH 1/2] Fix BF16_Optimizer last-microbatch grad leak under ZeRO-1 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In `DeepSpeedEngine._backward_epilogue`, `allreduce_gradients()` ran before `optimizer.backward_epilogue()`. For `BF16_Optimizer` (used when bf16 model + grad_accum_dtype=fp32 + ZeRO-1) without `immediate_grad_update`, this means the boundary microbatch's gradient is added to the rank-local fp32 accumulator *after* the cross-rank allreduce, so it is silently excluded from the average. Effect: each rank's fp32 buffer ends up holding `fp32_buffer_rank_i = avg_ranks(sum_{m=0..ga-2} grad_m) + local_grad_{ga-1}_rank_i`, which biases the global gradient by `(world_size-1)/world_size * 1/ga_steps`. Because the bias scales with the per-microbatch gradient weight, training trajectories diverge depending on `per_device_train_batch_size` even with identical effective batch size — the symptom is loss/grad-norm curves that drift apart between otherwise-equivalent configs. Fix: call `optimizer.backward_epilogue()` *before* `allreduce_gradients()` so the boundary microbatch's grad is in the buffer being reduced. `exit_backward()` remains after the allreduce; it only manages backward-hook state and has no ordering dependency on the accumulator. This is a no-op for `DeepSpeedZeroOptimizer_Stage1And2` and Stage3, whose `backward_epilogue` does not mutate the reduction buffer (their grads are either on `param.grad` already populated by autograd or accumulated via inline backward hooks). It is also a no-op for `BF16_Optimizer` with `immediate_grad_update=true` because the per-param hooks already fill the fp32 buffer synchronously during backward.
Signed-off-by: Max Yu <18641481+maxyu1115@users.noreply.github.com> --- deepspeed/runtime/engine.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/deepspeed/runtime/engine.py b/deepspeed/runtime/engine.py index dac7aa10a7fb..a055f69fc7d8 100755 --- a/deepspeed/runtime/engine.py +++ b/deepspeed/runtime/engine.py @@ -2470,12 +2470,21 @@ def _backward_prologue(self): def _backward_epilogue(self): self._stop_timers(self.engine_timers.backward_inner_timers) self._start_timers(self.engine_timers.backward_reduce_timers) + # Run optimizer.backward_epilogue() before allreduce so the boundary + # microbatch grad lands in the optimizer accumulator that gets reduced. + # BF16_Optimizer (without immediate_grad_update) accumulates into a + # separate fp32 buffer here; if allreduce ran first, the boundary + # microbatch grad would be added locally afterward and silently skipped + # by the cross-rank average. No-op for Stage1And2/Stage3 whose epilogues + # do not mutate the reduction buffer. 
+ if isinstance(self.optimizer, ZeROOptimizer): + self.optimizer.backward_epilogue() + if self.enable_backward_allreduce and not self.inside_no_sync_ctxt: # Traditional code path that allreduces the module parameter grads self.allreduce_gradients() if isinstance(self.optimizer, ZeROOptimizer): - self.optimizer.backward_epilogue() self.optimizer.exit_backward() see_memory_usage("Engine after backward", force=self.memory_breakdown()) From d5e54e81200e2ce595aae1975037218b65f05c00 Mon Sep 17 00:00:00 2001 From: Masahiro Tanaka Date: Tue, 28 Apr 2026 02:45:40 -0700 Subject: [PATCH 2/2] fix #7985 for non-BF16 optimizer Signed-off-by: Masahiro Tanaka --- deepspeed/runtime/engine.py | 15 ++++---- tests/unit/v1/zero/test_zero.py | 63 +++++++++++++++++++++++++++++++++ 2 files changed, 70 insertions(+), 8 deletions(-) diff --git a/deepspeed/runtime/engine.py b/deepspeed/runtime/engine.py index a055f69fc7d8..94f662b66f3d 100755 --- a/deepspeed/runtime/engine.py +++ b/deepspeed/runtime/engine.py @@ -2470,14 +2470,11 @@ def _backward_prologue(self): def _backward_epilogue(self): self._stop_timers(self.engine_timers.backward_inner_timers) self._start_timers(self.engine_timers.backward_reduce_timers) - # Run optimizer.backward_epilogue() before allreduce so the boundary - # microbatch grad lands in the optimizer accumulator that gets reduced. - # BF16_Optimizer (without immediate_grad_update) accumulates into a - # separate fp32 buffer here; if allreduce ran first, the boundary - # microbatch grad would be added locally afterward and silently skipped - # by the cross-rank average. No-op for Stage1And2/Stage3 whose epilogues - # do not mutate the reduction buffer. - if isinstance(self.optimizer, ZeROOptimizer): + # BF16_Optimizer (without immediate_grad_update) accumulates low + # precision grads into a separate fp32 buffer in backward_epilogue(). + # Run it before allreduce so the boundary microbatch is reduced. 
+ bf16_optimizer = isinstance(self.optimizer, BF16_Optimizer) + if bf16_optimizer: self.optimizer.backward_epilogue() if self.enable_backward_allreduce and not self.inside_no_sync_ctxt: @@ -2485,6 +2482,8 @@ def _backward_epilogue(self): self.allreduce_gradients() if isinstance(self.optimizer, ZeROOptimizer): + if not bf16_optimizer: + self.optimizer.backward_epilogue() self.optimizer.exit_backward() see_memory_usage("Engine after backward", force=self.memory_breakdown()) diff --git a/tests/unit/v1/zero/test_zero.py b/tests/unit/v1/zero/test_zero.py index fb0e393dd5da..fde4a42bfb15 100644 --- a/tests/unit/v1/zero/test_zero.py +++ b/tests/unit/v1/zero/test_zero.py @@ -21,6 +21,7 @@ import deepspeed from deepspeed.runtime.engine import DeepSpeedEngine +from deepspeed.runtime.bf16_optimizer import BF16_Optimizer from deepspeed.runtime.zero.partition_parameters import ZeroParamStatus from deepspeed.utils.zero_to_fp32 import load_state_dict_from_zero_checkpoint from deepspeed.runtime.zero.utils import ZeRORuntimeException @@ -53,6 +54,68 @@ def dump_state_dict(model): print(f"{name} {param.data}") +class TestBF16OptimizerGradReduction(DistributedTest): + world_size = 2 + + def test_boundary_microbatch_grad_is_reduced(self): + if not get_accelerator().is_bf16_supported(): + pytest.skip("bfloat16 is not supported on this accelerator") + + class ScaleModel(Module): + + def __init__(self): + super().__init__() + self.weight = Parameter(torch.ones(4)) + + def forward(self, x): + return (self.weight * x).sum() + + config_dict = { + "train_micro_batch_size_per_gpu": 1, + "gradient_accumulation_steps": 2, + "zero_optimization": { + "stage": 1 + }, + "optimizer": { + "type": "Adam", + "params": { + "lr": 1e-3 + } + }, + "bf16": { + "enabled": True, + "immediate_grad_update": False, + }, + "data_types": { + "grad_accum_dtype": "fp32" + } + } + + model = ScaleModel() + engine, _, _, _ = deepspeed.initialize(config=config_dict, model=model, model_parameters=model.parameters()) + 
assert isinstance(engine.optimizer, BF16_Optimizer) + + rank = dist.get_rank() + rank_offset = 18 * rank + inputs = [ + torch.tensor([1, 3, 5, 7], dtype=torch.bfloat16, device=engine.device) + rank_offset, + torch.tensor([11, 13, 15, 17], dtype=torch.bfloat16, device=engine.device) + rank_offset, + ] + for i, x in enumerate(inputs): + engine.set_gradient_accumulation_boundary(i == len(inputs) - 1) + engine.backward(engine(x)) + + grad = engine.optimizer.fp32_groups_gradients_flat[0].detach().clone() + expected = torch.tensor([15, 17, 19, 21], dtype=grad.dtype, device=grad.device) + torch.testing.assert_close(grad, expected) + + gathered_grads = [torch.zeros_like(grad) for _ in range(dist.get_world_size())] + dist.all_gather(gathered_grads, grad) + torch.testing.assert_close(gathered_grads[0], gathered_grads[1]) + + engine.destroy() + + @pytest.mark.parametrize("zero_stage", [1, 2, 3]) class TestZeroUnbalancedGradients(DistributedTest): world_size = 1