
[feat] support async dataflow by resampling in next rollout step #1198

Merged
YanhuiDua merged 20 commits into InternLM:main from YanhuiDua:async_dataflow
Nov 5, 2025

Conversation

@YanhuiDua (Collaborator) commented Oct 31, 2025

  1. This PR supports async dataflow by resampling in the next rollout step, and uses non-streaming inference by calling abort_request on the inference engine. An aborted (paused) request carries "abort" as its finish_reason; LMDeploy supports this feature in PR: add endpoint /abort_request lmdeploy#4092
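A minimal sketch of the resampling idea described above (illustrative names only, not the PR's actual code): rollout results whose finish_reason is "abort" are separated from finished samples so they can be re-enqueued in the next rollout step instead of being discarded.

```python
def partition_rollout(results: list[dict]) -> tuple[list[dict], list[dict]]:
    """Split rollout outputs into finished samples and aborted (paused)
    requests to be resampled in the next rollout step.

    Assumes each result dict carries a "finish_reason" key, with "abort"
    marking requests that the engine paused via abort_request.
    """
    finished = [r for r in results if r.get("finish_reason") != "abort"]
    aborted = [r for r in results if r.get("finish_reason") == "abort"]
    return finished, aborted
```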

  2. This PR standardizes the concurrency parameters used by the dataflow process. XTUNER_MAX_CONCURRENCY now controls the concurrency of the dataflow itself. The inference engine's concurrency is derived from the dataflow concurrency, prompt_repeat_k, and tp_size; the httpx concurrency is the inference engine's concurrency multiplied by tp_size; and RAY_MAX_CONCURRENCY, which controls Ray's concurrency, is set to the dataflow concurrency multiplied by prompt_repeat_k.
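The relationships above can be sketched as follows. This is a hedged illustration, not the PR's code: the function name is invented, and the exact engine-concurrency formula is a guess from the description (the PR only states it is derived from the dataflow concurrency, prompt_repeat_k, and tp_size); the httpx and Ray relations follow the description directly.

```python
import os


def derive_concurrency(prompt_repeat_k: int, tp_size: int) -> dict:
    # XTUNER_MAX_CONCURRENCY controls the dataflow's own concurrency.
    dataflow = int(os.environ.get("XTUNER_MAX_CONCURRENCY", "512"))
    # Engine concurrency derived from dataflow concurrency, prompt_repeat_k,
    # and tp_size -- this exact expression is an assumption.
    engine = dataflow * prompt_repeat_k // tp_size
    # httpx concurrency = engine concurrency * tp_size (per the PR text).
    httpx_limit = engine * tp_size
    # RAY_MAX_CONCURRENCY = dataflow concurrency * prompt_repeat_k.
    ray = dataflow * prompt_repeat_k
    return {"dataflow": dataflow, "engine": engine,
            "httpx": httpx_limit, "ray": ray}
```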

@YanhuiDua changed the title from "[feat] support sync dataflow by resampling in next sample (version1)" to "[feat] support async dataflow by resampling in next training step (version1)" on Nov 3, 2025
@YanhuiDua changed the title to "[feat] support async dataflow by resampling in next rollout step" on Nov 4, 2025
tensor_parallel_size=rollout_tp_size,
expert_parallel_size=rollout_ep_size,
gpu_memory_utilization=0.8,
context_length = max_response_length + 2048,

Suggested change
context_length = max_response_length + 2048,
context_length = max_response_length + max_prompt_length,
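The rationale behind the suggested change can be illustrated with a small sketch (the helper name is invented for illustration): the engine's context window must hold the longest prompt plus the longest response, so a fixed +2048 margin silently under-provisions whenever prompts exceed 2048 tokens.

```python
def required_context_length(max_prompt_length: int,
                            max_response_length: int) -> int:
    # The context window must accommodate the full prompt plus the full
    # generated response; a hard-coded margin such as +2048 breaks once
    # prompts grow past that bound.
    return max_prompt_length + max_response_length
```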

eval_data_path = os.environ["EVAL_DATA_PATH"]
enable_evaluate = True if eval_data_path != "" else False
enbale_partial_rollout = int(os.environ.get("ENBALE_PARTIAL_ROLLOUT", "0"))
max_concurrent = int(os.environ.get("XTUNER_MAX_CONCURRENCY", "512"))

Suggested change
max_concurrent = int(os.environ.get("XTUNER_MAX_CONCURRENCY", "512"))
max_concurrent = int(os.environ.get("XTUNER_MAX_CONCURRENCY", 512))

eval_data_path = os.environ["EVAL_DATA_PATH"]
enable_evaluate = True if eval_data_path != "" else False
enbale_partial_rollout = int(os.environ.get("ENBALE_PARTIAL_ROLLOUT", "0"))
max_concurrent = int(os.environ.get("XTUNER_MAX_CONCURRENCY", "512"))
@hhaAndroid (Collaborator) commented Nov 5, 2025


Add a TODO noting that this should later be refactored into a single user-facing parameter, defined per GPU; all other internal parameters should be derived from it automatically.

@YanhuiDua YanhuiDua merged commit a8e3b07 into InternLM:main Nov 5, 2025
3 of 4 checks passed
@YanhuiDua YanhuiDua deleted the async_dataflow branch March 3, 2026 11:36