Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions docs/en/get_started/quick_start.md
Original file line number Diff line number Diff line change
Expand Up @@ -580,3 +580,30 @@ miles has been deeply optimized for distributed training of large-scale Mixture
- [Example: 64xH100 Training GLM-4.5](../examples/glm4.5-355B-A32B.md)
- [Example: 128xH100 Training DeepSeek-R1](../examples/deepseek-r1.md)
- The scripts such as `scripts/run_qwen3_30b_a3b.py`, `scripts/run_glm45_355b_a32b.py` also support multi-node training, though there is currently little documentation about it.


## Verification of Router Health Check

This verifies the **Background Health Check** and its "Strike System".

```bash
# Start training with the Miles Router and a 10-second health check interval
python train.py \
${MODEL_ARGS[@]} \
--hf-checkpoint /root/GLM-Z1-9B-0414 \
--load /root/GLM-Z1-9B-0414_torch_dist \
--prompt-data /root/dapo-math-17k/dapo-math-17k.jsonl \
--input-key prompt \
--label-key label \
--apply-chat-template \
--rm-type deepscaler \
--use-miles-router \
--rollout-health-check-interval 10 \
--rollout-batch-size 16 \
--global-batch-size 16 \
--num-rollout 10 \
--colocate
```

The router monitors worker health silently. On the 3rd consecutive failure, it logs a `WARNING` and stops routing traffic to that worker. It logs an `INFO` message only when a worker recovers.

77 changes: 70 additions & 7 deletions miles/router/router.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import argparse
import asyncio
import json
import logging

import httpx
import uvicorn
Expand All @@ -9,6 +11,8 @@

from miles.utils.misc import load_function

logger = logging.getLogger(__name__)


def run_router(args):
"""
Expand All @@ -28,9 +32,11 @@ def __init__(self, args, verbose=False):
self.verbose = verbose

self.app = FastAPI()
self.app.add_event_handler("startup", self._start_background_health_check)

# Worker information
self.worker_urls: dict[str, int] = {}
self.worker_failure_counts: dict[str, int] = {}
self.max_weight_version = None

max_connections = getattr(args, "miles_router_max_connections", None)
Expand Down Expand Up @@ -63,9 +69,60 @@ def _setup_routes(self):
# Catch-all route for proxying to SGLang - must be registered LAST
self.app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])(self.proxy)

async def health_check(self, request: Request):
# TODO: do health check in background
pass
async def _start_background_health_check(self):
asyncio.create_task(self._health_check_loop())

async def _check_worker_health(self, url):
    """Probe a single worker's ``/health`` endpoint.

    Returns a ``(url, is_healthy)`` tuple so callers can gather many
    probes concurrently and still know which worker each result
    belongs to. Any exception (timeout, connection error, ...) is
    treated as an unhealthy result rather than propagated.
    """
    healthy = False
    try:
        response = await self.client.get(f"{url}/health", timeout=5.0)
    except Exception as e:
        logger.debug(f"[miles-router] Worker {url} health check failed: {e}")
    else:
        if response.status_code == 200:
            healthy = True
        else:
            logger.debug(f"[miles-router] Worker {url} is unhealthy (Status: {response.status_code})")
    return url, healthy

async def _health_check_loop(self):
    """Periodically probe every registered worker and track failures.

    Runs forever (until the task is cancelled). Every
    ``args.rollout_health_check_interval`` seconds it probes all
    workers concurrently and applies a simple strike system:

    * a failed probe increments the worker's consecutive-failure count;
      on the 3rd consecutive failure a WARNING is logged once, and
      ``_use_url`` stops routing traffic to that worker;
    * a successful probe resets the count, logging an INFO message only
      if the worker had previously struck out.
    """
    failure_threshold = 3  # consecutive failures before a worker is benched
    interval = self.args.rollout_health_check_interval

    while True:
        try:
            await asyncio.sleep(interval)

            urls = list(self.worker_urls.keys())
            if not urls:
                continue

            # Probe all workers concurrently.
            results = await asyncio.gather(*(self._check_worker_health(url) for url in urls))

            for url, is_healthy in results:
                prev_failures = self.worker_failure_counts.get(url, 0)

                if is_healthy:
                    # Log recovery only for workers that had been benched.
                    if prev_failures >= failure_threshold:
                        logger.info(f"[miles-router] Worker {url} has recovered and is now back in the pool.")
                    self.worker_failure_counts[url] = 0
                else:
                    new_failures = prev_failures + 1
                    self.worker_failure_counts[url] = new_failures

                    # Warn exactly once, at the moment the worker strikes out.
                    if new_failures == failure_threshold:
                        logger.warning(
                            f"[miles-router] Worker {url} failed 3 consecutive health checks. Stopping traffic."
                        )

            logger.debug("[miles-router] Health check complete.")

        except asyncio.CancelledError:
            # Normal shutdown: let cancellation propagate to end the task.
            raise
        except Exception:
            # Prevent the background task from dying silently; logs the traceback.
            logger.exception("[miles-router] Unexpected error in health check loop")
            # Brief backoff before retrying the loop.
            await asyncio.sleep(5)

async def proxy(self, request: Request, path: str):
"""Proxy all other requests to the SGLang router"""
Expand Down Expand Up @@ -126,6 +183,7 @@ async def add_worker(self, request: Request):
# Add if new, keep a simple request count per worker
if worker_url not in self.worker_urls:
self.worker_urls[worker_url] = 0
self.worker_failure_counts[worker_url] = 0
if self.verbose:
print(f"[miles-router] Added new worker: {worker_url}")

Expand Down Expand Up @@ -158,11 +216,16 @@ async def retrieve_from_text(self, request: Request):
return result

def _use_url(self):
"""Select a worker URL using round-robin strategy"""
assert len(self.worker_urls) > 0, "No workers available"
"""Select a worker URL using round-robin strategy, excluding unhealthy workers."""
# Filter for healthy workers (those with less than 3 consecutive failures)
healthy_urls = {
url: count for url, count in self.worker_urls.items() if self.worker_failure_counts.get(url, 0) < 3
}

assert len(healthy_urls) > 0, "No healthy workers available in the pool"

# get the url with mininal count
url = min(self.worker_urls, key=self.worker_urls.get)
# get the url with mininal count among healthy workers
url = min(healthy_urls, key=healthy_urls.get)
self.worker_urls[url] += 1
return url

Expand Down