Skip to content

Commit 7434e9a

Browse files
add background health check to miles native router (#260)
Co-authored-by: zhaochenyang20 <zhaochen20@outlook.com>
1 parent 47a5bdf commit 7434e9a

File tree

2 files changed

+94
-18
lines changed

2 files changed

+94
-18
lines changed

miles/router/router.py

Lines changed: 88 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import argparse
2+
import asyncio
23
import json
4+
import logging
35

46
import httpx
57
import uvicorn
@@ -9,6 +11,8 @@
911

1012
from miles.utils.misc import load_function
1113

14+
logger = logging.getLogger(__name__)
15+
1216

1317
def run_router(args):
1418
"""
@@ -28,9 +32,14 @@ def __init__(self, args, verbose=False):
2832
self.verbose = verbose
2933

3034
self.app = FastAPI()
31-
32-
# Worker information
33-
self.worker_urls: dict[str, int] = {}
35+
self.app.add_event_handler("startup", self._start_background_health_check)
36+
37+
# URL -> Active Request Count (load state)
38+
self.worker_request_counts: dict[str, int] = {}
39+
# URL -> Consecutive Failures
40+
self.worker_failure_counts: dict[str, int] = {}
41+
# Quarantined workers excluded from routing pool
42+
self.dead_workers: set[str] = set()
3443
self.max_weight_version = None
3544

3645
max_connections = getattr(args, "miles_router_max_connections", None)
@@ -63,9 +72,61 @@ def _setup_routes(self):
6372
# Catch-all route for proxying to SGLang - must be registered LAST
6473
self.app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])(self.proxy)
6574

66-
async def health_check(self, request: Request):
67-
# TODO: do health check in background
68-
pass
75+
async def _start_background_health_check(self):
76+
asyncio.create_task(self._health_check_loop())
77+
78+
async def _check_worker_health(self, url):
79+
"""Encapsulated health check logic for better maintainability."""
80+
try:
81+
response = await self.client.get(f"{url}/health", timeout=5.0)
82+
if response.status_code == 200:
83+
return url, True
84+
logger.debug(f"[miles-router] Worker {url} is unhealthy (Status: {response.status_code})")
85+
except Exception as e:
86+
logger.debug(f"[miles-router] Worker {url} health check failed: {e}")
87+
return url, False
88+
89+
async def _health_check_loop(self):
90+
"""Background loop to monitor worker health and adjust routing pool."""
91+
interval = self.args.rollout_health_check_interval
92+
threshold = self.args.miles_router_health_check_failure_threshold
93+
94+
while True:
95+
try:
96+
await asyncio.sleep(interval)
97+
98+
urls = [u for u in self.worker_request_counts if u not in self.dead_workers]
99+
if not urls:
100+
continue
101+
102+
results = await asyncio.gather(*(self._check_worker_health(url) for url in urls))
103+
104+
for url, is_healthy in results:
105+
if not is_healthy:
106+
failures = self.worker_failure_counts.get(url, 0) + 1
107+
self.worker_failure_counts[url] = failures
108+
109+
if failures >= threshold:
110+
logger.warning(
111+
f"[miles-router] Worker {url} failed {threshold} consecutive health checks. Marking as DEAD."
112+
)
113+
self.dead_workers.add(url)
114+
# TODO (chenyang): Connect back 'dead' workers requires a mechanism to sync
115+
# model versions to avoid off-policy issues from stale weights, since these
116+
# dead workers' parameters may not be refitted.
117+
else:
118+
self.worker_failure_counts[url] = 0
119+
120+
logger.debug(
121+
f"[miles-router] Health check complete. {len(self.worker_request_counts) - len(self.dead_workers)} workers healthy."
122+
)
123+
124+
except asyncio.CancelledError:
125+
logger.warning("[miles-router] Background health check loop is being cancelled.")
126+
raise
127+
except Exception as e:
128+
logger.error(f"[miles-router] Unexpected error in health check loop: {e}", exc_info=True)
129+
await asyncio.sleep(5)
69130

70131
async def proxy(self, request: Request, path: str):
71132
"""Proxy all other requests to the SGLang router"""
@@ -124,16 +185,17 @@ async def add_worker(self, request: Request):
124185
)
125186

126187
# Add if new, keep a simple request count per worker
127-
if worker_url not in self.worker_urls:
128-
self.worker_urls[worker_url] = 0
188+
if worker_url not in self.worker_request_counts:
189+
self.worker_request_counts[worker_url] = 0
190+
self.worker_failure_counts[worker_url] = 0
129191
if self.verbose:
130192
print(f"[miles-router] Added new worker: {worker_url}")
131193

132-
return {"status": "success", "worker_urls": self.worker_urls}
194+
return {"status": "success", "worker_urls": self.worker_request_counts}
133195

134196
async def list_workers(self, request: Request):
135197
"""List all registered workers"""
136-
return {"urls": list(self.worker_urls.keys())}
198+
return {"urls": list(self.worker_request_counts.keys())}
137199

138200
async def retrieve_from_text(self, request: Request):
139201
"""Get token information from text input"""
@@ -158,19 +220,27 @@ async def retrieve_from_text(self, request: Request):
158220
return result
159221

160222
def _use_url(self):
161-
"""Select a worker URL using round-robin strategy"""
162-
assert len(self.worker_urls) > 0, "No workers available"
223+
"""Select worker URL with minimal active requests."""
224+
225+
if not self.dead_workers:
226+
# Healthy path: select from all workers
227+
url = min(self.worker_request_counts, key=self.worker_request_counts.get)
228+
else:
229+
# Degraded path: select from workers not in dead_workers
230+
valid_workers = (w for w in self.worker_request_counts if w not in self.dead_workers)
231+
try:
232+
url = min(valid_workers, key=self.worker_request_counts.get)
233+
except ValueError:
234+
raise RuntimeError("No healthy workers available in the pool") from None
163235

164-
# get the url with mininal count
165-
url = min(self.worker_urls, key=self.worker_urls.get)
166-
self.worker_urls[url] += 1
236+
self.worker_request_counts[url] += 1
167237
return url
168238

169239
def _finish_url(self, url):
170240
"""Mark the request to the given URL as finished"""
171-
assert url in self.worker_urls, f"URL {url} not recognized"
172-
self.worker_urls[url] -= 1
173-
assert self.worker_urls[url] >= 0, f"URL {url} count went negative"
241+
assert url in self.worker_request_counts, f"URL {url} not recognized"
242+
self.worker_request_counts[url] -= 1
243+
assert self.worker_request_counts[url] >= 0, f"URL {url} count went negative"
174244

175245

176246
if __name__ == "__main__":

miles/utils/arguments.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -919,6 +919,12 @@ def add_router_arguments(parser):
919919
default=None,
920920
help="Max connections for MilesRouter HTTP client.",
921921
)
922+
parser.add_argument(
923+
"--miles-router-health-check-failure-threshold",
924+
type=int,
925+
default=3,
926+
help="Number of consecutive failures before marking a worker as unhealthy.",
927+
)
922928
RouterArgs.add_cli_args(parser, use_router_prefix=True, exclude_host_port=True)
923929
return parser
924930

0 commit comments

Comments
 (0)