[Do Not Merge]: adding diffuser router for diffusion RL #544
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,69 @@ | ||||||
| # Miles Diffusion Router | ||||||
|
|
||||||
| Load-balances requests across multiple `sglang-diffusion` worker instances using least-request routing with background health checks and worker quarantine. | ||||||
|
|
||||||
| ## Quick Start | ||||||
|
|
||||||
| ```bash | ||||||
| # Start the router with two diffusion backends | ||||||
| python examples/diffusion_router/demo.py --port 30080 \ | ||||||
| --worker-urls http://localhost:10090 http://localhost:10091 | ||||||
|
|
||||||
| # Or start empty and add workers dynamically | ||||||
| python examples/diffusion_router/demo.py --port 30080 | ||||||
| curl -X POST 'http://localhost:30080/add_worker?url=http://localhost:10090' | ||||||
| ``` | ||||||
|
|
||||||
| ## API Endpoints | ||||||
|
|
||||||
| | Method | Path | Description | | ||||||
| |--------|------|-------------| | ||||||
| | `POST` | `/generate` | Image generation (forwards to `/v1/images/generations`) | | ||||||
| | `POST` | `/generate_video` | Video generation (forwards to `/v1/videos/generations`) | | ||||||
| | `GET` | `/health` | Aggregated router health | | ||||||
| | `GET` | `/health_workers` | Per-worker health and load info | | ||||||
| | `POST` | `/add_worker` | Register a diffusion worker (`?url=...` or JSON body) | | ||||||
| | `GET` | `/list_workers` | List registered workers | | ||||||
| | `POST` | `/update_weights_from_disk` | Broadcast weight reload to all workers | | ||||||
| | `*` | `/{path}` | Catch-all proxy to least-loaded worker | | ||||||
|
|
||||||
| ## Load Balancing | ||||||
|
|
||||||
| The router uses a **least-request** strategy: each incoming request is forwarded to the worker with the fewest in-flight requests. This is workload-aware and avoids hot-spotting compared to round-robin. When a request completes, the worker's count is decremented, keeping the load state accurate in real time. | ||||||
|
|
||||||
| Workers that fail consecutive health checks (default: 3) are quarantined and excluded from the routing pool. A background loop pings each worker's `GET /health` endpoint at a configurable interval (default: 10s). | ||||||
|
||||||
|
|
||||||
| ## Notes | ||||||
|
|
||||||
| - Health check endpoint follows Miles/SGLang convention: `GET /health`. | ||||||
| - Responses are fully buffered; streaming and large-response handling are not supported yet (planned for a follow-up PR). | ||||||
|
|
||||||
| ## Example Requests | ||||||
|
|
||||||
| ```bash | ||||||
| # Check health | ||||||
| curl http://localhost:30080/health | ||||||
|
|
||||||
| # Generate an image | ||||||
| curl -X POST http://localhost:30080/generate \ | ||||||
| -H 'Content-Type: application/json' \ | ||||||
| -d '{"model": "stabilityai/stable-diffusion-3", "prompt": "a cat", "n": 1, "size": "1024x1024"}' | ||||||
|
|
||||||
| # Reload weights on all workers | ||||||
| curl -X POST http://localhost:30080/update_weights_from_disk \ | ||||||
| -H 'Content-Type: application/json' \ | ||||||
| -d '{"model_path": "/path/to/new/weights"}' | ||||||
| ``` | ||||||
|
|
||||||
| ## CLI Options | ||||||
|
|
||||||
| ``` | ||||||
| --host Bind address (default: 0.0.0.0) | ||||||
| --port Port (default: 30080) | ||||||
| --worker-urls Initial worker URLs | ||||||
| --max-connections Max concurrent connections (default: 100) | ||||||
| --timeout Request timeout in seconds | ||||||
|
||||||
| --timeout Request timeout in seconds | |
| --timeout Request timeout in seconds for router-to-worker requests |
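
The least-request selection and quarantine behaviour described under Load Balancing can be sketched in a few lines of Python. This is a toy model, not the router's actual code: the class `LeastRequestPool` and its methods `acquire`, `release`, and `record_health` are hypothetical names chosen for illustration. Only the two counter dictionaries reuse names that appear in `demo.py`. The sketch also assumes that a successful health check resets the failure counter and that quarantined workers are never restored, neither of which the README states.

```python
class LeastRequestPool:
    """Toy model of least-request routing with quarantine (illustrative only)."""

    def __init__(self, failure_threshold: int = 3) -> None:
        self.failure_threshold = failure_threshold
        # In-flight request count and consecutive health-check failures per worker URL.
        self.worker_request_counts: dict[str, int] = {}
        self.worker_failure_counts: dict[str, int] = {}
        self.quarantined: set[str] = set()

    def add_worker(self, url: str) -> None:
        # Registering the same URL twice keeps the existing counters.
        self.worker_request_counts.setdefault(url, 0)
        self.worker_failure_counts.setdefault(url, 0)

    def acquire(self) -> str:
        """Pick the non-quarantined worker with the fewest in-flight requests."""
        healthy = [u for u in self.worker_request_counts if u not in self.quarantined]
        if not healthy:
            raise RuntimeError("no healthy workers available")
        # Ties go to the worker that was registered first.
        url = min(healthy, key=self.worker_request_counts.__getitem__)
        self.worker_request_counts[url] += 1
        return url

    def release(self, url: str) -> None:
        """Decrement the in-flight count once a request has completed."""
        self.worker_request_counts[url] = max(0, self.worker_request_counts[url] - 1)

    def record_health(self, url: str, ok: bool) -> None:
        """Track consecutive failures; quarantine once the threshold is reached."""
        if ok:
            self.worker_failure_counts[url] = 0  # assumption: success resets the streak
            return
        self.worker_failure_counts[url] += 1
        if self.worker_failure_counts[url] >= self.failure_threshold:
            self.quarantined.add(url)
```

In a real router each `acquire` would be paired with a `release` in a `finally` block, so that a failed upstream request does not leave the worker's count permanently inflated.
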
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,57 @@ | ||
| """ | ||
| Demo script for the Miles Diffusion Router. | ||
|
|
||
| Starts a diffusion router that load-balances requests across multiple | ||
| sglang-diffusion worker instances using least-request routing. | ||
|
|
||
| Usage: | ||
| # Start with no workers (add them dynamically via /add_worker): | ||
| python examples/diffusion_router/demo.py --port 30080 | ||
|
|
||
| # Start with pre-registered workers: | ||
| python examples/diffusion_router/demo.py --port 30080 \ | ||
| --worker-urls http://localhost:10090 http://localhost:10091 | ||
|
|
||
| # Then interact: | ||
| curl http://localhost:30080/health | ||
| curl -X POST 'http://localhost:30080/add_worker?url=http://localhost:10092' | ||
| curl http://localhost:30080/list_workers | ||
| curl -X POST http://localhost:30080/generate -H 'Content-Type: application/json' \ | ||
| -d '{"model": "stabilityai/stable-diffusion-3", "prompt": "a cat", "n": 1, "size": "1024x1024"}' | ||
| """ | ||
|
|
||
| import argparse | ||
|
|
||
| import uvicorn | ||
|
|
||
| from miles.router.diffusion_router import DiffusionRouter | ||
|
|
||
|
|
||
| def main(): | ||
| parser = argparse.ArgumentParser(description="Miles Diffusion Router Demo") | ||
| parser.add_argument("--host", type=str, default="0.0.0.0", help="Router bind address") | ||
| parser.add_argument("--port", type=int, default=30080, help="Router port") | ||
| parser.add_argument("--worker-urls", nargs="*", default=[], help="Initial diffusion worker URLs") | ||
| parser.add_argument("--max-connections", type=int, default=100, help="Max concurrent connections to workers") | ||
| parser.add_argument("--timeout", type=float, default=None, help="Request timeout in seconds") | ||
| parser.add_argument("--health-check-interval", type=int, default=10, help="Seconds between health checks") | ||
| parser.add_argument("--health-check-failure-threshold", type=int, default=3, help="Failures before quarantine") | ||
| parser.add_argument("--verbose", action="store_true", help="Enable verbose logging") | ||
| args = parser.parse_args() | ||
|
|
||
| router = DiffusionRouter(args, verbose=args.verbose) | ||
|
|
||
| # Pre-register any workers specified on the command line | ||
| for url in args.worker_urls: | ||
| router.worker_request_counts[url] = 0 | ||
| router.worker_failure_counts[url] = 0 | ||
|
||
| if args.verbose: | ||
| print(f"[demo] Pre-registered worker: {url}") | ||
|
|
||
| print(f"[demo] Starting diffusion router on {args.host}:{args.port}") | ||
| print(f"[demo] Workers: {list(router.worker_request_counts.keys()) or '(none — add via POST /add_worker)'}") | ||
| uvicorn.run(router.app, host=args.host, port=args.port, log_level="info") | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| main() | ||
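
The `curl` calls in the demo docstring could also be issued from Python. The sketch below uses only the standard library and builds the requests without sending them. The helper names `add_worker_request` and `generate_request` are hypothetical and not part of the PR. It also assumes the router decodes a percent-encoded `url` query parameter, which is standard query-string handling but is not confirmed by the diff.

```python
import json
import urllib.parse
import urllib.request

ROUTER = "http://localhost:30080"  # router address used in the demo docstring


def add_worker_request(worker_url: str, router: str = ROUTER) -> urllib.request.Request:
    """Build POST /add_worker?url=..., percent-encoding the worker URL."""
    query = urllib.parse.urlencode({"url": worker_url})
    return urllib.request.Request(f"{router}/add_worker?{query}", method="POST")


def generate_request(
    prompt: str,
    model: str,
    router: str = ROUTER,
    n: int = 1,
    size: str = "1024x1024",
) -> urllib.request.Request:
    """Build POST /generate with the same JSON fields as the curl example."""
    body = json.dumps({"model": model, "prompt": prompt, "n": n, "size": size})
    return urllib.request.Request(
        f"{router}/generate",
        data=body.encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
```

With a router running, either request can be sent with `urllib.request.urlopen(req)`. Because `--timeout` defaults to `None` in the demo, passing an explicit `timeout=` to `urlopen` on the client side may be worth considering for long generations.
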
In the API Endpoints table, the method for the catch-all proxy is listed as `*`. This is not a standard HTTP method. For clarity, you should specify `ANY` or list the actual methods supported by the implementation, which are `GET`, `POST`, `PUT`, `DELETE`.
This should be changed as Gemini suggested.