[Do Not Merge]: adding diffuser router for diffusion RL #544
sniper35 wants to merge 14 commits into radixark:main
Conversation
Summary of Changes: Hello @sniper35, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request introduces the "Miles Diffusion Router," a new component designed to efficiently distribute requests across multiple diffusion worker instances.
Code Review
This pull request introduces a new diffusion router for load-balancing requests across multiple worker instances. The implementation includes least-request routing, background health checks, and worker quarantine capabilities. The changes are well-structured, with a core router implementation, a demo script, and documentation.
My review focuses on a critical issue in the health check logic that prevents dead workers from being resurrected, which could impact high availability. I've also provided suggestions to improve code structure by reducing duplication and improving encapsulation, simplifying some logic for better readability, enhancing API consistency, and clarifying the documentation.
examples/diffusion_router/README.md
| `POST` | `/add_worker` | Register a diffusion worker (`?url=...` or JSON body) |
| `GET` | `/list_workers` | List registered workers |
| `POST` | `/update_weights_from_disk` | Broadcast weight reload to all workers |
| `*` | `/{path}` | Catch-all proxy to least-loaded worker |
In the API Endpoints table, the method for the catch-all proxy is listed as *. This is not a standard HTTP method. For clarity, you should specify ANY or list the actual methods supported by the implementation, which are GET, POST, PUT, DELETE.
| `*` | `/{path}` | Catch-all proxy to least-loaded worker |
| `GET, POST, PUT, DELETE` | `/{path}` | Catch-all proxy to least-loaded worker |
This should be changed as Gemini suggested.
examples/diffusion_router/README.md
--port              Port (default: 30080)
--worker-urls       Initial worker URLs
--max-connections   Max concurrent connections (default: 100)
--timeout           Request timeout in seconds
The description for the --timeout option is "Request timeout in seconds", which is ambiguous. It's not clear if this applies to client-to-router connections or router-to-worker connections. Based on the code, it's for router-to-worker requests. Please clarify this in the documentation for better user understanding.
--timeout           Request timeout in seconds
--timeout           Request timeout in seconds for router-to-worker requests
miles/router/diffusion_router.py
```python
try:
    data = json.loads(content)
    return JSONResponse(content=data, status_code=response.status_code, headers=resp_headers)
except Exception:
```
```python
worker_url = request.query_params.get("url") or request.query_params.get("worker_url")

if not worker_url:
    body = await request.body()
    try:
        payload = json.loads(body) if body else {}
    except json.JSONDecodeError:
        return JSONResponse(status_code=400, content={"error": "Invalid JSON body"})
    worker_url = payload.get("url") or payload.get("worker_url")
```
The add_worker endpoint accepts both url and worker_url as parameter names, in both the query string and the JSON body. This can be confusing for API users. To improve clarity and consistency, it's best to stick to a single name. Since the documentation already uses url, I suggest removing support for worker_url.
```python
worker_url = request.query_params.get("url")
if not worker_url:
    body = await request.body()
    try:
        payload = json.loads(body) if body else {}
    except json.JSONDecodeError:
        return JSONResponse(status_code=400, content={"error": "Invalid JSON body"})
    worker_url = payload.get("url")
```
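To see the suggested single-name lookup in isolation, here is a framework-free sketch of the same logic; the function name and plain-dict signature are illustrative, not the router's actual API:

```python
import json
from typing import Optional

def extract_worker_url(query_params: dict, body: bytes) -> Optional[str]:
    # Check the query string first, then fall back to a JSON body,
    # accepting only the documented parameter name "url".
    worker_url = query_params.get("url")
    if not worker_url and body:
        try:
            payload = json.loads(body)
        except json.JSONDecodeError:
            return None  # the endpoint would answer HTTP 400 here
        worker_url = payload.get("url")
    return worker_url

print(extract_worker_url({"url": "http://w1:30000"}, b""))    # query string wins
print(extract_worker_url({}, b'{"url": "http://w2:30000"}'))  # JSON body fallback
```

Dropping `worker_url` as an alias keeps the accepted parameter set identical across the query string and the body, which is what the README already documents.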
miles/router/diffusion_router.py
```python
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Miles Diffusion Router")
    parser.add_argument("--host", type=str, default="0.0.0.0")
    parser.add_argument("--port", type=int, default=30080)
    parser.add_argument("--worker-urls", nargs="*", default=[], help="Initial worker URLs to register")
    parser.add_argument("--max-connections", type=int, default=100)
    parser.add_argument("--timeout", type=float, default=None)
    parser.add_argument("--health-check-interval", type=int, default=10)
    parser.add_argument("--health-check-failure-threshold", type=int, default=3)
    parser.add_argument("--verbose", action="store_true")
    args = parser.parse_args()

    router = DiffusionRouter(args, verbose=args.verbose)
    for url in args.worker_urls:
        router.worker_request_counts[url] = 0
        router.worker_failure_counts[url] = 0

    uvicorn.run(router.app, host=args.host, port=args.port, log_level="info")
```
The if __name__ == "__main__" block contains logic for argument parsing and pre-registering workers that is nearly identical to the logic in examples/diffusion_router/demo.py. This code duplication makes maintenance harder.
Furthermore, this block (and the demo script) directly manipulates the router's internal state (worker_request_counts, worker_failure_counts), which breaks encapsulation.
Consider removing this __main__ block to establish diffusion_router.py as a pure library module and rely on demo.py as the single executable example. This would improve code structure and maintainability.
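One way to restore the encapsulation described above is a public `add_worker()` method that owns counter initialization, so callers never reach into the dicts. A minimal sketch with hypothetical names (the real router class is larger and async):

```python
class WorkerRegistry:
    """Illustrative stand-in for the router's worker bookkeeping."""

    def __init__(self):
        self.worker_request_counts = {}  # url -> in-flight request count
        self.worker_failure_counts = {}  # url -> consecutive health-check failures

    def add_worker(self, url: str) -> bool:
        """Register a worker idempotently; returns False if already known."""
        if url in self.worker_request_counts:
            return False
        self.worker_request_counts[url] = 0
        self.worker_failure_counts[url] = 0
        return True

registry = WorkerRegistry()
for url in ["http://w1:30000", "http://w2:30000", "http://w1:30000"]:
    registry.add_worker(url)
print(sorted(registry.worker_request_counts))  # duplicate w1 registered once
```

With this in place, both demo.py and any CLI entry point can pre-register workers through the same method the HTTP `/add_worker` endpoint uses.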
examples/diffusion_router/README.md
The router uses a **least-request** strategy: each incoming request is forwarded to the worker with the fewest in-flight requests. This is workload-aware and avoids hot-spotting compared to round-robin. When a request completes, the worker's count is decremented, keeping the load state accurate in real time.

Workers that fail consecutive health checks (default: 3) are quarantined and excluded from the routing pool. A background loop pings each worker's `GET /health` endpoint at a configurable interval (default: 10s).
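The two paragraphs above boil down to a few lines of selection logic. A hedged, framework-free sketch (the actual router maintains these counters around async proxy calls):

```python
FAILURE_THRESHOLD = 3  # consecutive failed health checks before quarantine

request_counts = {"http://w1": 2, "http://w2": 0, "http://w3": 1}  # in-flight
failure_counts = {"http://w1": 0, "http://w2": 3, "http://w3": 1}

def pick_worker():
    # Exclude quarantined workers, then pick the least-loaded survivor.
    healthy = [u for u, f in failure_counts.items() if f < FAILURE_THRESHOLD]
    if not healthy:
        return None
    return min(healthy, key=lambda u: request_counts[u])

chosen = pick_worker()       # w2 is quarantined, so w3 (1 in-flight) beats w1 (2)
request_counts[chosen] += 1  # increment before forwarding the request
# ... proxy to the chosen worker ...
request_counts[chosen] -= 1  # decrement once the response completes
print(chosen)
```

The increment/decrement pair around the proxy call is what keeps the load state accurate in real time; if the decrement is skipped on error paths, counts drift and routing degrades.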
Thanks for the help @sniper35. Could you ping me on WeChat if you have my contact? Also, this PR will be held for a while under a confidential experiment. We will acknowledge your PR in our blog. Thanks!

Also, please clean up your code and documentation.
Hey, could you try using this to profile the scalability of your router? `python/sglang/multimodal_gen/benchmarks/bench_serving.py` in the sglang main repo. The idea is to benchmark the scalability and accuracy of your router: say, use the router to launch 1/2/4/8 diffusion servers, provide 1000 requests, and set concurrency to 1/10/100, etc.

Sure, I'll get the test results ASAP. Thanks for your immediate response!
@zhaochenyang20 When using SD3 with a small (max_concurrency / num_workers) ratio and a small num_prompts, i.e. when the hardware is underutilized, performance with the router is actually worse than without it. When I increased the ratio (max_concurrency(24) / num_workers(8)) to 3 and raised num_prompts to 1920, it started to perform slightly better than the no-router baseline. I need more compute resources to run a larger experiment, but hopefully this says something about the advantage brought by even a very basic router. I'll get more benchmark data and update the table. Model: stabilityai/stable-diffusion-3-medium-diffusers
Nicely done. Thanks for your contribution. I do not think there would be a case where using a router could be worse. My assumption is that we keep the number of servers fixed, say always 8; the router then just adds a routing algorithm on top. In other words, could you try comparing round-robin vs. minimal-workload routing? I think the latter would always be better. At the same time, I am reviewing this PR: after it is merged, please rebase your code and implement the
I found the issue in the test script, refactored it, and produced the comparison files by running least-workload against round-robin and random selection.
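For intuition on why least-workload should beat round-robin when workers are heterogeneous, here is a tiny deterministic simulation (my own sketch, not the bench_serving.py harness); it approximates "fewest in-flight requests" by dispatching each request to the worker that frees up earliest, and the per-worker service times are hypothetical:

```python
import itertools

SERVICE_TIME = {"w1": 1.0, "w2": 1.0, "w3": 4.0}  # w3 is a slow straggler
N_REQUESTS = 12

def makespan(policy: str) -> float:
    busy_until = {w: 0.0 for w in SERVICE_TIME}  # time each worker becomes free
    rr = itertools.cycle(SERVICE_TIME)
    for _ in range(N_REQUESTS):
        if policy == "round_robin":
            w = next(rr)
        else:  # least-loaded: dispatch to the worker that frees up earliest
            w = min(busy_until, key=busy_until.get)
        busy_until[w] += SERVICE_TIME[w]
    return max(busy_until.values())

print(makespan("round_robin"))   # w3 gets 4 requests -> finishes at 16.0
print(makespan("least_loaded"))  # slow worker gets fewer requests -> 8.0
```

With identical workers the two policies tie, which matches the observation above that the router's advantage only shows up once load or worker speed is uneven.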
Sure, sounds good. I'll wait for sgl-project/sglang#18154 to be merged and then evaluate using the new profiling tools. Thanks for your quick response!
https://github.com/zhaochenyang20/sglang-diffusion-routing/invitations Since this PR has been blocked by the actual diffusion RL pipeline, we want to detach it from Miles right now and move it to an independent testing repo for quick development. Would you mind if we acknowledge your code and move it to https://github.com/zhaochenyang20/sglang-diffusion-routing? It would be much more convenient to make a Python binding to it 😂 Thanks so much. By the way, if you would like to connect with me in person, let's discuss some details. You can reach out to me on WeChat at 18015766633.
@zhaochenyang20 Seems @klhhhhh has already made a branch https://github.com/zhaochenyang20/sglang-diffusion-routing/tree/kl_dev for it; is this what you are looking for? Thanks @klhhhhh




WIP, to close #541

- Develop the miles-diffusion-router script/module.
- Implement basic load-balancing logic (Least-Request).
- Create a demo script showing the router coordinating multiple sglang-diffusion backends. Will run the test and provide test results.
- Document the setup process and API usage (`examples/diffusion_router/`).