Skip to content
Merged
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
0cc42a7
feat(dumper): add SGLANG_DUMPER_CLEANUP env var to auto-remove old dumps
fzyzcjy Feb 19, 2026
3726326
more
fzyzcjy Feb 19, 2026
1d85c3a
more
fzyzcjy Feb 19, 2026
bc0a29c
feat(dumper): add watchdog timeout for collective communication ops
fzyzcjy Feb 19, 2026
c2ec469
test(dumper): add tests for collective communication timeout watchdog
fzyzcjy Feb 19, 2026
c45a0d6
more
fzyzcjy Feb 19, 2026
ff36758
refactor(test): extract _capture_stdout context manager for stdout re…
fzyzcjy Feb 19, 2026
ef77c2d
refactor(test): use nonlocal instead of list, add debug prints for ca…
fzyzcjy Feb 19, 2026
0c8dd0c
fmt
fzyzcjy Feb 19, 2026
294b10b
fix(test): use correct kwarg cleanup_previous instead of needs_cleanup
fzyzcjy Feb 19, 2026
3c5b578
Merge branch 'ac8398/0' into ac8398/1
fzyzcjy Feb 19, 2026
3079303
refactor(dumper): split SGLANG_DUMPER_WRITE_FILE into OUTPUT_FILE and…
fzyzcjy Feb 19, 2026
73e98bc
refactor(test): merge output file and console tests into symmetric Te…
fzyzcjy Feb 19, 2026
279749a
remove redundant distributed test_write_disabled, covered by TestOutp…
fzyzcjy Feb 19, 2026
681c86e
feat(dumper): add capture_output() context manager for in-memory dump…
fzyzcjy Feb 19, 2026
e829bfe
test(dumper): verify capture_output respects filter
fzyzcjy Feb 19, 2026
ff729d1
refactor(dumper): extract _DumperConfig frozen dataclass
fzyzcjy Feb 19, 2026
a0a6f1c
Revert "refactor(dumper): extract _DumperConfig frozen dataclass"
fzyzcjy Feb 19, 2026
96493cb
feat(dumper): support KV filtering on name, extra_kwargs, and global_ctx
fzyzcjy Feb 20, 2026
bb4dfaf
style(dumper): collapse double-if filter check into single condition
fzyzcjy Feb 20, 2026
9002a4a
refactor(dumper): pre-compute user_kwargs at grad hook registration time
fzyzcjy Feb 20, 2026
6a742c9
rename(dumper): user_kwargs -> tags
fzyzcjy Feb 20, 2026
9e2592b
rename(dumper): _format_kwargs -> _format_tags
fzyzcjy Feb 20, 2026
8dc3443
fmt
fzyzcjy Feb 20, 2026
e7cf6c3
Merge branch 'main' into ac8398/3
fzyzcjy Feb 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat(dumper): add watchdog timeout for collective communication ops
Collective operations (broadcast_object_list, all_gather_object) hang
silently when not all ranks participate. Add a configurable timeout
(default 60s) that prints a warning if a collective op doesn't complete,
helping users diagnose missing rank participation.
  • Loading branch information
fzyzcjy committed Feb 19, 2026
commit bc0a29cba2c5a17ff1fd3cefda50389481a52e51
53 changes: 45 additions & 8 deletions python/sglang/srt/debug_utils/dumper.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def __init__(
partial_name: Optional[str] = None,
enable_http_server: bool = True,
cleanup_previous: bool = False,
collective_timeout: int = 60,
):
# Config
self._enable = enable
Expand All @@ -68,6 +69,7 @@ def __init__(
self._enable_grad = enable_grad
self._enable_model_value = enable_model_value
self._enable_model_grad = enable_model_grad
self._collective_timeout = collective_timeout

# States
self._partial_name = partial_name
Expand Down Expand Up @@ -96,6 +98,7 @@ def from_env(cls) -> "_Dumper":
"SGLANG_ENABLE_DUMPER_HTTP_SERVER", "1"
),
cleanup_previous=get_bool_env_var("SGLANG_DUMPER_CLEANUP_PREVIOUS", "0"),
collective_timeout=60,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The collective_timeout is hardcoded to 60 seconds. While this is a reasonable default, it would be more flexible to allow users to configure this value through an environment variable, similar to other settings. This is particularly useful in environments with slower networks or for debugging complex distributed scenarios where collectives might take longer than expected.

Suggested change
collective_timeout=60,
collective_timeout=get_int_env_var("SGLANG_DUMPER_COLLECTIVE_TIMEOUT", 60),

)

def on_forward_pass_start(self):
Expand All @@ -119,11 +122,11 @@ def _ensure_http_server(self):
if self._http_server_handled:
return
self._http_server_handled = True
_start_maybe_http_server(self)
_start_maybe_http_server(self, timeout_seconds=self._collective_timeout)

def _ensure_partial_name(self):
if self._partial_name is None:
self._partial_name = _get_partial_name()
self._partial_name = _get_partial_name(timeout_seconds=self._collective_timeout)
print(f"[Dumper] Choose partial_name={self._partial_name}")

def set_ctx(self, **kwargs):
Expand Down Expand Up @@ -331,11 +334,37 @@ def _torch_save(value, path: str):
print(f"[Dumper] Observe error={e} when saving data, skip the tensor")


def _get_partial_name():
def _collective_with_timeout(fn, operation_name: str, timeout_seconds: int = 60):
completed = threading.Event()

def watchdog():
if not completed.wait(timeout=timeout_seconds):
print(
f"\n[Dumper] WARNING: '{operation_name}' has not completed after "
f"{timeout_seconds}s. This usually means not all ranks are "
f"participating in this collective operation.\n",
flush=True,
)

thread = threading.Thread(target=watchdog, daemon=True)
thread.start()
try:
return fn()
finally:
completed.set()


def _get_partial_name(timeout_seconds: int = 60):
    """Pick a run-unique partial name, kept consistent across ranks.

    Rank 0 derives the name from the current wall-clock time; when torch
    distributed is initialized, the value is broadcast to all ranks (guarded
    by the collective watchdog so a missing rank surfaces as a warning
    instead of a silent hang).
    """
    candidate = [str(time.time())] if _get_rank() == 0 else [None]

    if dist.is_initialized():
        _collective_with_timeout(
            lambda: dist.broadcast_object_list(candidate, device="cuda"),
            operation_name="broadcast_object_list in _get_partial_name",
            timeout_seconds=timeout_seconds,
        )

    return candidate[0]


Expand Down Expand Up @@ -494,14 +523,16 @@ def _collect_megatron_parallel_info():
# -------------------------------------- http control server ------------------------------------------


def _start_maybe_http_server(dumper):
def _start_maybe_http_server(dumper, timeout_seconds: int = 60):
http_port = get_int_env_var("SGLANG_DUMPER_SERVER_PORT", 40000)
zmq_base_port = get_int_env_var("SGLANG_DUMPER_ZMQ_BASE_PORT", 16800)
if http_port <= 0:
return

local_handler = _DumperRpcHandler(dumper)
rpc_handles = _create_zmq_rpc_handles(local_handler, base_port=zmq_base_port)
rpc_handles = _create_zmq_rpc_handles(
local_handler, base_port=zmq_base_port, timeout_seconds=timeout_seconds
)

if _get_rank() == 0:
handler_class = _make_dumper_http_handler(rpc_handles=rpc_handles)
Expand Down Expand Up @@ -549,7 +580,9 @@ def set_enable(self, enable: bool):
# -------------------------------------- zmq rpc ------------------------------------------


def _create_zmq_rpc_handles(handler, base_port: int) -> Optional[List["_ZmqRpcHandle"]]:
def _create_zmq_rpc_handles(
handler, base_port: int, timeout_seconds: int = 60
) -> Optional[List["_ZmqRpcHandle"]]:
import zmq

rank = _get_rank()
Expand Down Expand Up @@ -578,7 +611,11 @@ def serve_loop():

if dist.is_initialized():
all_addresses = [None] * world_size
dist.all_gather_object(all_addresses, local_addr)
_collective_with_timeout(
lambda: dist.all_gather_object(all_addresses, local_addr),
operation_name="all_gather_object in _create_zmq_rpc_handles",
timeout_seconds=timeout_seconds,
)
else:
all_addresses = [local_addr]
print(f"[Dumper.ZmqRpc] rank={rank} all_addresses={all_addresses}")
Expand Down