diff --git a/examples/specdec_bench/README.md b/examples/specdec_bench/README.md index 1987167e7c8..1322fc00196 100644 --- a/examples/specdec_bench/README.md +++ b/examples/specdec_bench/README.md @@ -145,6 +145,54 @@ python3 run.py \ --runtime_params runtime_args_long_context.yaml ``` +## Uploading results to S3 + +Each `run.py` invocation writes a result directory containing `configuration.json`, +`timing.json`, `acceptance_rate.json`, and (when applicable) `mtbench.json` / `specbench.json`. +`upload_to_s3.py` is a single-file, drop-in tool that uploads one run — or an entire sweep — +to any S3-compatible bucket: + +```bash +python upload_to_s3.py /path/to/run_or_sweep_dir s3://your-bucket/some/prefix \ + --endpoint https://your-s3-endpoint \ + --key-id YOUR_KEY_ID \ + --secret YOUR_SECRET +``` + +`--endpoint`, `--key-id`, and `--secret` default to the `S3_ENDPOINT`, `S3_KEY_ID`, and +`S3_SECRET` environment variables. Omit `--endpoint` (or set `S3_ENDPOINT=""`) to use AWS S3's +default endpoint. Use `--dry-run` to preview the upload plan, and `--skip-existing` to skip +runs already present at the destination instead of failing. + +The tool handles two directory layouts and mirrors them into S3: + +- **Flat** — `LOCAL_DIR/run_name/{configuration,timing,...}.json` +- **Sweep** — `LOCAL_DIR/sweep_name/run_name/{configuration,timing,...}.json` + +`LOCAL_DIR`'s basename is preserved in the destination prefix, so re-uploads from the same +source land in the same place. + +### Optional attestation fields + +`run.py` reads two environment variables when writing `configuration.json`; they're optional +provenance hints that downstream consumers (dashboards, leaderboards) can use to attest a run: + +| Env var | Purpose | +|---|---| +| `JIRA_TICKET` | A tracking ID for the run (your tracker — JIRA key, GitHub issue, etc.) | +| `HUGGINGFACE_MODEL_ID` | The public model id on the Hugging Face Hub, so the model used can be independently fetched | + +Set them in the same shell that launches `run.py`: + +```bash +export JIRA_TICKET=MYTRACK-1234 +export HUGGINGFACE_MODEL_ID=meta-llama/Llama-3.3-70B-Instruct +python3 run.py ... +``` + +Both fields appear in the run's `configuration.json` as `jira_ticket` and `huggingface_model_id` +(or `null` when unset). They have no effect on the benchmark itself. + ## Notes The goal of this benchmark is to provide an easy way to configure, run, and compare speculative implementations across frameworks in an apples-to-apples method. diff --git a/examples/specdec_bench/requirements_speed.txt b/examples/specdec_bench/requirements_speed.txt index 549a5d73e81..b6e7d001ffd 100644 --- a/examples/specdec_bench/requirements_speed.txt +++ b/examples/specdec_bench/requirements_speed.txt @@ -1,3 +1,5 @@ +boto3>=1.34.0 +botocore>=1.34.0 datasets>=3.1.0 rich>=14.2.0 seaborn>=0.13.2 diff --git a/examples/specdec_bench/run.py b/examples/specdec_bench/run.py index 94932c787b8..337c5184f20 100644 --- a/examples/specdec_bench/run.py +++ b/examples/specdec_bench/run.py @@ -20,6 +20,7 @@ from specdec_bench import datasets, metrics, models, runners from specdec_bench.utils import ( decode_chat, + dump_env, encode_chat, get_tokenizer, postprocess_base, @@ -174,6 +175,10 @@ def run_simple(args): if args.save_dir is not None: for metric in metrics_list: metric.update_directory(args.save_dir) + # Stamp configuration.json BEFORE the run loop so the file lands even + # when the run crashes mid-way. Engine init is already done, so the + # live serving_config from the model is available. + dump_env(args, args.save_dir, overrides={"serving_config": model.get_serving_config()}) runner = runners.SimpleRunner(model, metrics=metrics_list) diff --git a/examples/specdec_bench/specdec_bench/__init__.py b/examples/specdec_bench/specdec_bench/__init__.py index 47f1c65a15f..7882d772f5e 100644 --- a/examples/specdec_bench/specdec_bench/__init__.py +++ b/examples/specdec_bench/specdec_bench/__init__.py @@ -13,3 +13,18 @@ # See the License for the specific language governing permissions and # limitations under the License. +# Methodology version. Bump: +# - minor (0.X.0) when adding a new metric or strictly-additive provenance field +# - major (X.0.0) when changing how an existing metric is computed OR its +# on-disk field names (incompatible with prior consumers / visualizers) +# The visualizer aggregates runs by major version to avoid apple-to-orange +# comparisons across methodology changes. +# +# 1.0.0: rename Request_AR / Category_AR / Average_AR → *_AL across the +# SpecBench / AcceptanceRate / MTBench metric writers, AND add +# Joint_Acceptance_Rate to the AcceptanceRate metric. The renamed +# values were always acceptance LENGTH (mean tokens generated per +# inference step), not a rate, and the visualizer reads *_AL. +# Pre-1.0.0 runs in S3 have *_AR and no Joint_AR; they must be +# re-run or post-processed before comparing. +__version__ = "1.0.0" diff --git a/examples/specdec_bench/specdec_bench/metrics/acceptance_rate.py b/examples/specdec_bench/specdec_bench/metrics/acceptance_rate.py index fffd9ebf789..c9bbe743647 100644 --- a/examples/specdec_bench/specdec_bench/metrics/acceptance_rate.py +++ b/examples/specdec_bench/specdec_bench/metrics/acceptance_rate.py @@ -55,23 +55,31 @@ def _process_lengths(self, lengths): self.out["Conditional_Acceptance_Rate"][k] = running_len / sum_lengths / prev_ratio prev_ratio = running_len / sum_lengths running_len -= v + # Joint acceptance rate at step k = product of conditional acceptance + # rates at steps 1..k = probability that ≥k tokens are accepted in + # a row. The visualizer renders this as a separate panel. + self.out["Joint_Acceptance_Rate"] = {} + running_joint = 1.0 + for k, cond_ar in self.out["Conditional_Acceptance_Rate"].items(): + running_joint *= cond_ar + self.out["Joint_Acceptance_Rate"][k] = running_joint def process_final(self, text_outputs): all_ar = [] lengths = {} - self.out["Request_AR"] = {} + self.out["Request_AL"] = {} self.prompt_ar = dict(sorted(self.prompt_ar.items(), key=lambda x: x[0])) for request_id, turns in self.prompt_ar.items(): - self.out["Request_AR"][request_id] = {} + self.out["Request_AL"][request_id] = {} for turn_id, turn in turns.items(): ar = sum(turn) / len(turn) - self.out["Request_AR"][request_id][turn_id] = ar + self.out["Request_AL"][request_id][turn_id] = ar all_ar.append(ar) self._get_lengths(turn, lengths) - print(request_id, turn_id, self.out["Request_AR"][request_id][turn_id]) + print(request_id, turn_id, self.out["Request_AL"][request_id][turn_id]) average_ar = sum(all_ar) / len(all_ar) print("Average AR:", average_ar) - self.out["Average_AR"] = average_ar + self.out["Average_AL"] = average_ar self._process_lengths(lengths) self.write() self._format_write_output(text_outputs) diff --git a/examples/specdec_bench/specdec_bench/metrics/mtbench.py b/examples/specdec_bench/specdec_bench/metrics/mtbench.py index 0a7dd8b485b..c073aa55ce5 100644 --- a/examples/specdec_bench/specdec_bench/metrics/mtbench.py +++ b/examples/specdec_bench/specdec_bench/metrics/mtbench.py @@ -34,29 +34,29 @@ class MTBench(AcceptanceRate): def process_final(self, text_outputs): i = 0 lengths = {} - self.out["Request_AR"] = {} + self.out["Request_AL"] = {} self.prompt_ar = dict(sorted(self.prompt_ar.items(), key=lambda x: x[0])) for request_id, turns in self.prompt_ar.items(): turn_1 = turns[0] turn_2 = turns[1] q_id = request_id mtbench_topic = MTBENCH_TOPICS[q_id // 10] - self.out["Request_AR"][request_id] = sum(turn_1 + turn_2) / len(turn_1 + turn_2) + self.out["Request_AL"][request_id] = sum(turn_1 + turn_2) / len(turn_1 + turn_2) self._get_lengths(turn_1, lengths) self._get_lengths(turn_2, lengths) print(mtbench_topic, sum(turn_1 + turn_2) / len(turn_1 + turn_2)) per_category = [[] for _ in range(len(MTBENCH_TOPICS))] - for q_id, ar in self.out["Request_AR"].items(): + for q_id, ar in self.out["Request_AL"].items(): per_category[q_id // 10].append(ar) - self.out["Category_AR"] = {} + self.out["Category_AL"] = {} for i, category in enumerate(per_category): if len(category) > 0: category_ar = sum(category) / len(category) - self.out["Category_AR"][MTBENCH_TOPICS[i]] = category_ar + self.out["Category_AL"][MTBENCH_TOPICS[i]] = category_ar print(f"{MTBENCH_TOPICS[i]} Average AR: {category_ar}") - average_ar = sum(self.out["Request_AR"].values()) / len(self.out["Request_AR"]) + average_ar = sum(self.out["Request_AL"].values()) / len(self.out["Request_AL"]) print("Average AR:", average_ar) - self.out["Average_AR"] = average_ar + self.out["Average_AL"] = average_ar self._process_lengths(lengths) self.write() self._format_write_output(text_outputs) diff --git a/examples/specdec_bench/specdec_bench/metrics/specbench.py b/examples/specdec_bench/specdec_bench/metrics/specbench.py index 32ab3d1c7a5..18902a659a3 100644 --- a/examples/specdec_bench/specdec_bench/metrics/specbench.py +++ b/examples/specdec_bench/specdec_bench/metrics/specbench.py @@ -44,26 +44,26 @@ def __init__(self, requests): def process_final(self, text_outputs): lengths = {} - self.out["Request_AR"] = {} + self.out["Request_AL"] = {} for request_id, request in enumerate(self.requests): turns = self.prompt_ar[request_id].values() assert len(turns) == len(request.turns), ( f"Number of turns {len(turns)} does not match number of turns in request {len(request.turns)}" ) - self.out["Request_AR"][request.question_id] = mean(list(chain(*turns))) + self.out["Request_AL"][request.question_id] = mean(list(chain(*turns))) for turn in turns: self._get_lengths(turn, lengths) - print(request.category, self.out["Request_AR"][request.question_id]) + print(request.category, self.out["Request_AL"][request.question_id]) per_category = defaultdict(list) for request in self.requests: - per_category[request.category].append(self.out["Request_AR"][request.question_id]) - self.out["Category_AR"] = {} + per_category[request.category].append(self.out["Request_AL"][request.question_id]) + self.out["Category_AL"] = {} for category_name, category_ar in per_category.items(): if len(category_ar) > 0: category_ar = mean(category_ar) - self.out["Category_AR"][category_name] = category_ar - average_ar = mean(self.out["Request_AR"].values()) - self.out["Average_AR"] = average_ar + self.out["Category_AL"][category_name] = category_ar + average_ar = mean(self.out["Request_AL"].values()) + self.out["Average_AL"] = average_ar self._process_lengths(lengths) self.write() self._format_write_output(text_outputs) @@ -96,12 +96,12 @@ def _pretty_print_results(self): table.add_column("Average AR", justify="right", style="green") # Add category rows - for category_name, category_ar in sorted(self.out["Category_AR"].items()): + for category_name, category_ar in sorted(self.out["Category_AL"].items()): table.add_row(category_name, f"{category_ar:.4f}") # Add separator and summary row table.add_section() - table.add_row("[bold]Overall Average[/bold]", f"[bold]{self.out['Average_AR']:.4f}[/bold]") + table.add_row("[bold]Overall Average[/bold]", f"[bold]{self.out['Average_AL']:.4f}[/bold]") console.print(table) @@ -124,8 +124,8 @@ def _create_visualizations( df_clean = pd.DataFrame.from_dict( { - "question_id": list(self.out["Request_AR"].keys()), - "acceptance_rate": list(self.out["Request_AR"].values()), + "question_id": list(self.out["Request_AL"].keys()), + "acceptance_rate": list(self.out["Request_AL"].values()), "category": [request.category for request in self.requests], "response_length": [ mean([len(c["content"]) for c in messages if c["role"] == "assistant"]) diff --git a/examples/specdec_bench/specdec_bench/models/base.py b/examples/specdec_bench/specdec_bench/models/base.py index ab26a4704dd..43d3a1337d3 100644 --- a/examples/specdec_bench/specdec_bench/models/base.py +++ b/examples/specdec_bench/specdec_bench/models/base.py @@ -27,5 +27,14 @@ async def run(self, prompt_ids, sampling_params, request_id, turn_id): """ raise NotImplementedError + def get_serving_config(self): + """Return a JSON-serializable dict describing the engine's effective config. + + Captured into configuration.json's `serving_config` for reproducibility. + Subclasses override to surface engine-specific defaults (max_model_len, + kv_cache_dtype, etc.) that don't appear in the CLI args. Default: empty. + """ + return {} + def stop(self): pass diff --git a/examples/specdec_bench/specdec_bench/models/sglang.py b/examples/specdec_bench/specdec_bench/models/sglang.py index 2133ec937eb..99a66b0647e 100644 --- a/examples/specdec_bench/specdec_bench/models/sglang.py +++ b/examples/specdec_bench/specdec_bench/models/sglang.py @@ -91,6 +91,7 @@ def __init__( if "mamba_scheduler_strategy" in kwargs: engine_kwargs["mamba_scheduler_strategy"] = kwargs["mamba_scheduler_strategy"] + self.engine_kwargs = engine_kwargs self.model = sgl.Engine(**engine_kwargs) self.sampling_config = sampling_kwargs @@ -129,3 +130,11 @@ async def run(self, prompt_ids, max_length, end_id, request_id, turn_id): output_dict["output_logits"] = None output_dict["token_times"] = timing return output_dict + + def get_serving_config(self): + """Dump the engine_kwargs dict supplied to sgl.Engine().""" + try: + # engine_kwargs is plain dict of scalars/None — already JSON-safe. + return dict(self.engine_kwargs) + except Exception: + return {} diff --git a/examples/specdec_bench/specdec_bench/models/vllm.py b/examples/specdec_bench/specdec_bench/models/vllm.py index fc595c1d579..52bf35f1a0d 100644 --- a/examples/specdec_bench/specdec_bench/models/vllm.py +++ b/examples/specdec_bench/specdec_bench/models/vllm.py @@ -89,6 +89,7 @@ def __init__(self, model_dir, max_concurrent_requests, sampling_kwargs, **kwargs async_scheduling=kwargs.get("async_scheduling", True), enforce_eager=False, ) + self.engine_args = engine_args self.model = AsyncLLM.from_engine_args(engine_args) self.sampling_kwargs = sampling_kwargs # https://github.com/vllm-project/vllm/blob/main/vllm/sampling_params.py @@ -151,6 +152,25 @@ async def generate(self, prompt_ids, request_id, turn_id): break return outputs, timing, full_tokens + def get_serving_config(self): + """Dump the AsyncEngineArgs dataclass plus the runtime vllm_config when available.""" + try: + import dataclasses + + cfg = dataclasses.asdict(self.engine_args) + except Exception: + cfg = {} + # vllm exposes the resolved engine config on the AsyncLLM instance once + # initialized — capture max_model_len / kv cache / dtype defaults that + # don't appear in AsyncEngineArgs. + try: + vllm_config = getattr(self.model, "vllm_config", None) + if vllm_config is not None and hasattr(vllm_config, "to_dict"): + cfg["vllm_config"] = vllm_config.to_dict() + except Exception: + pass + return cfg + def stop(self): try: self.loop.run_until_complete(self.model.shutdown()) diff --git a/examples/specdec_bench/specdec_bench/utils.py b/examples/specdec_bench/specdec_bench/utils.py index 14ded0f31b2..08ea07b8480 100644 --- a/examples/specdec_bench/specdec_bench/utils.py +++ b/examples/specdec_bench/specdec_bench/utils.py @@ -13,10 +13,26 @@ # See the License for the specific language governing permissions and # limitations under the License. +import datetime +import hashlib import json +import os +import subprocess +import sys +from pathlib import Path from transformers import AutoTokenizer +from . import __version__ as specdec_bench_version + +_SENSITIVE_SUBSTRINGS = ("token", "key", "secret", "password") +# Keys whose names contain a sensitive substring but are NOT actually secrets. +# Without this allowlist `tokenizer` redacts the model path because it contains +# `token`, losing meaningful provenance. +_SENSITIVE_KEY_ALLOWLIST = frozenset( + {"tokenizer", "tokenizer_path", "tokenizer_mode", "tokenizer_revision"} +) + def get_tokenizer(path, trust_remote_code=False): return AutoTokenizer.from_pretrained(path, trust_remote_code=trust_remote_code) @@ -58,3 +74,250 @@ def postprocess_gptoss(text): if "<|channel|>" in final_message: final_message = final_message.split("<|channel|>")[0] return final_message + + +def _get_engine_version(engine): + """Return the engine package's __version__, or None on failure.""" + try: + if engine in ("TRTLLM", "AUTO_DEPLOY"): + import tensorrt_llm + + return tensorrt_llm.__version__ + elif engine == "VLLM": + import vllm + + return vllm.__version__ + elif engine == "SGLANG": + import sglang + + return sglang.__version__ + except Exception: + pass + return None + + +def _get_gpu_name(): + try: + import torch + + if torch.cuda.is_available(): + return torch.cuda.get_device_name(0) + except Exception: + pass + return None + + +def _get_modelopt_version(): + try: + import modelopt + + return getattr(modelopt, "__version__", None) + except Exception: + return None + + +def _git_sha(path): + """git rev-parse HEAD inside `path`. Returns None if not a repo or git missing.""" + try: + out = subprocess.run( + ["git", "rev-parse", "HEAD"], + cwd=path, + capture_output=True, + text=True, + timeout=5, + ) + if out.returncode == 0: + return out.stdout.strip() + except (FileNotFoundError, subprocess.TimeoutExpired): + pass + return None + + +def _shard_files_from_index(index_path): + """Return the set of shard filenames referenced by a safetensors index JSON.""" + try: + with open(index_path) as f: + wm = json.load(f).get("weight_map", {}) or {} + return set(wm.values()) + except Exception: + return set() + + +def _checkpoint_provenance(model_dir): + """Cheap reproducibility fingerprint for a HuggingFace checkpoint directory. + + Returns {path, size_bytes, index_sha256, index_source}: + - index_sha256 hashes model.safetensors.index.json (or config.json fallback) + so it changes whenever the shard set or model config changes. + - size_bytes sums only the index-listed shards + config.json. For a + sharded 70B+ checkpoint this avoids a full rglob walk over hundreds + of tokenizer/cache files. Falls back to rglob when no index exists. + """ + if model_dir is None: + return None + try: + p = Path(model_dir) + if not p.is_dir(): + return {"path": str(model_dir)} + hash_target = None + for name in ("model.safetensors.index.json", "config.json"): + candidate = p / name + if candidate.is_file(): + hash_target = candidate + break + index_sha256 = None + if hash_target is not None: + h = hashlib.sha256() + with open(hash_target, "rb") as f: + for chunk in iter(lambda: f.read(65536), b""): + h.update(chunk) + index_sha256 = h.hexdigest() + # Size: shards listed in the safetensors index + the index/config file + # itself. Avoids walking the entire model directory (which can be huge + # for sharded multi-100B checkpoints). + size_bytes = 0 + if hash_target is not None and hash_target.name == "model.safetensors.index.json": + for shard_name in _shard_files_from_index(hash_target): + shard_path = p / shard_name + if shard_path.is_file(): + size_bytes += shard_path.stat().st_size + size_bytes += hash_target.stat().st_size + else: + # No shard index — fall back to summing every file under the dir. + size_bytes = sum(f.stat().st_size for f in p.rglob("*") if f.is_file()) + return { + "path": str(model_dir), + "size_bytes": size_bytes, + "index_sha256": index_sha256, + "index_source": hash_target.name if hash_target is not None else None, + } + except Exception: + return {"path": str(model_dir)} + + +def _is_sensitive_key(key): + klow = key.lower() + if klow in _SENSITIVE_KEY_ALLOWLIST: + return False + return any(s in klow for s in _SENSITIVE_SUBSTRINGS) + + +def _redact_value(value): + """Recursively redact secrets in nested dict/list values. + + The top-level `_redact_config` walks one level of keys, but engine configs + (serving_config from VLLMModel/SGLANGModel) and user-supplied runtime_params + are nested arbitrarily — fields like `hf_token`, `tokenizer_revision`, or + `aws_secret_access_key` need to be redacted at any depth. + """ + if isinstance(value, dict): + return { + k: ("***REDACTED***" if _is_sensitive_key(k) else _redact_value(v)) + for k, v in value.items() + } + if isinstance(value, list): + return [_redact_value(v) for v in value] + if isinstance(value, tuple): + return tuple(_redact_value(v) for v in value) + return value + + +def _redact_config(config): + return _redact_value(config) + + +def _redact_argv(argv): + """Mask values that follow a sensitive flag name (e.g. --hf_token VALUE). + + Conservative: only masks when the previous element looks like a flag whose + bare name (sans leading dashes) trips _is_sensitive_key. Also handles the + --flag=VALUE form. + """ + redacted = [] + prev_is_sensitive = False + for tok in argv: + s = str(tok) + if prev_is_sensitive: + redacted.append("***REDACTED***") + prev_is_sensitive = False + continue + if s.startswith("--"): + name, sep, val = s[2:].partition("=") + if _is_sensitive_key(name): + if sep: + redacted.append(f"--{name}=***REDACTED***") + prev_is_sensitive = False + else: + redacted.append(s) + prev_is_sensitive = True + continue + redacted.append(s) + prev_is_sensitive = False + return redacted + + +def dump_env(args, save_dir, overrides=None): + """Write configuration.json to save_dir capturing run args, engine version, and provenance. + + `overrides` is merged in last and is the channel for runtime-only fields + (e.g. the live engine's serving_config dict from runner.get_serving_config()). + """ + config = _redact_config(vars(args).copy()) + if overrides: + config.update(_redact_config(overrides)) + + config["engine_version"] = _get_engine_version(config.get("engine")) + config["gpu"] = _get_gpu_name() + config["python_version"] = sys.version + config["argv"] = _redact_argv(sys.argv[:]) + + # Provenance for reproducibility / apple-to-orange guarding. + # Each *_sha and modelopt_version prefers an env var set by the harness + # (because git/.git is typically not present inside the runtime container), + # then falls back to runtime detection for standalone usage outside the + # harness. container_image and nmm_sandbox_sha are env-only — there is no + # reasonable in-process way to know them. + config["specdec_bench_version"] = specdec_bench_version + specdec_bench_dir = Path(__file__).resolve().parent + config["specdec_bench_sha"] = ( + os.environ.get("SPECDEC_BENCH_SHA") or _git_sha(specdec_bench_dir) + ) + config["modelopt_version"] = ( + os.environ.get("MODELOPT_VERSION") or _get_modelopt_version() + ) + # Fallback assumes the in-tree layout examples/specdec_bench/specdec_bench/. + # parents[2] reaches the modelopt repo root in that case. When vendored + # elsewhere this would `git rev-parse` an unrelated repo; rely on the env + # var path instead for non-in-tree deployments. + config["modelopt_sha"] = ( + os.environ.get("MODELOPT_SHA") or _git_sha(specdec_bench_dir.parents[2]) + ) + config["nmm_sandbox_sha"] = os.environ.get("NMM_SANDBOX_SHA") or None + config["container_image"] = os.environ.get("CONTAINER_IMAGE") or None + # Checkpoint fingerprint. + config["checkpoint"] = _checkpoint_provenance(getattr(args, "model_dir", None)) + # UTC timestamp. + config["timestamp"] = datetime.datetime.now(datetime.timezone.utc).isoformat() + + # Attestation fields used by the visualizer to distinguish JIRA-tracked + # official runs (must be on HuggingFace Hub) from community-contributed + # runs. The harness sets JIRA_TICKET only after verifying the checkpoint + # resolves on Hub; standalone runs leave both empty. + config["jira_ticket"] = os.environ.get("JIRA_TICKET") or None + config["huggingface_model_id"] = os.environ.get("HUGGINGFACE_MODEL_ID") or None + + # Deterministic per-run UID. SHA-256 of the small set of inputs that + # together identify this run; same inputs → same UID, so accidental + # re-uploads of the same run dedupe in the visualizer. + uid_parts = "|".join([ + str(config["timestamp"]), + str(getattr(args, "model_dir", "") or ""), + str(getattr(args, "speculative_algorithm", "") or ""), + str(getattr(args, "concurrency", "") or ""), + str(config.get("specdec_bench_sha") or ""), + ]) + config["uid"] = hashlib.sha256(uid_parts.encode()).hexdigest()[:16] + + os.makedirs(save_dir, exist_ok=True) + with open(os.path.join(save_dir, "configuration.json"), "w") as f: + json.dump(config, f, indent=4, default=str) diff --git a/examples/specdec_bench/upload_to_s3.py b/examples/specdec_bench/upload_to_s3.py new file mode 100644 index 00000000000..517bd53cd1c --- /dev/null +++ b/examples/specdec_bench/upload_to_s3.py @@ -0,0 +1,233 @@ +#!/usr/bin/env python3 +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Upload specdec_bench results to S3. + +Handles both flat and sweep directory layouts: + + Flat: LOCAL_DIR/run_name/{configuration,timing,...}.json + Sweep: LOCAL_DIR/sweep_name/run_name/{configuration,timing,...}.json + +LOCAL_DIR's name is preserved under the S3 prefix: + + s3://bucket/prefix/LOCAL_DIR_NAME/[sweep_name/]run_name/ + +Usage examples: + + # Upload a sweep output directory + python upload_to_s3.py /data/sweep_outputs/my_sweep s3://team-specdec-workgroup/results + + # Upload a single run + python upload_to_s3.py /data/my_single_run s3://team-specdec-workgroup/results + + # Skip already-uploaded runs instead of failing + python upload_to_s3.py /data/sweep_outputs/my_sweep s3://team-specdec-workgroup/results --skip-existing +""" + +import argparse +import os +import sys +from pathlib import Path + +_RUN_SENTINELS = ("configuration.json", "timing.json", "acceptance_rate.json") + + +# ── S3 helpers ──────────────────────────────────────────────────────────────── +# Inlined from a former specdec_bench/s3_utils.py so this stays a drop-in +# single-file tool: a contributor can `wget upload_to_s3.py` and run it without +# installing the specdec_bench package. Defaults are empty — endpoint, key id, +# and secret are taken from --endpoint / --key-id / --secret (or the +# corresponding S3_ENDPOINT / S3_KEY_ID / S3_SECRET env vars). No team-specific +# infrastructure leaks into this public example. + + +def parse_s3_path(path: str) -> tuple[str, str]: + """'s3://bucket/prefix' → (bucket, prefix). prefix may be empty.""" + without_scheme = path[5:] # strip "s3://" + parts = without_scheme.split("/", 1) + bucket = parts[0] + prefix = parts[1].strip("/") if len(parts) > 1 else "" + return bucket, prefix + + +def make_s3_client(endpoint: str, key_id: str, secret: str): + import boto3 + from botocore.config import Config + return boto3.client( + "s3", + endpoint_url=endpoint or None, + aws_access_key_id=key_id, + aws_secret_access_key=secret, + region_name="us-east-1", + config=Config(s3={"addressing_style": "path"}), + ) + + +def s3_prefix_exists(s3, bucket: str, prefix: str) -> bool: + resp = s3.list_objects_v2(Bucket=bucket, Prefix=prefix.rstrip("/") + "/", MaxKeys=1) + return bool(resp.get("Contents")) + + +def _upload_files(s3, local_dir: Path, bucket: str, s3_prefix: str) -> None: + for file_path in sorted(local_dir.rglob("*")): + if not file_path.is_file(): + continue + rel = file_path.relative_to(local_dir).as_posix() + key = f"{s3_prefix}/{rel}" + s3.upload_file(str(file_path), bucket, key) + print(f" Uploaded: s3://{bucket}/{key}") + + +def upload_run_dir(s3, local_dir: Path, bucket: str, s3_prefix: str) -> None: + """Upload a single run directory to s3://bucket/s3_prefix/. + + Raises ValueError if the destination prefix already has any objects. + """ + s3_prefix = s3_prefix.rstrip("/") + if s3_prefix_exists(s3, bucket, s3_prefix): + raise ValueError( + f"S3 destination already exists: s3://{bucket}/{s3_prefix} — refusing to overwrite" + ) + _upload_files(s3, local_dir, bucket, s3_prefix) + + +def _is_run_dir(d: Path) -> bool: + return any((d / f).exists() for f in _RUN_SENTINELS) + + +def _discover_runs(local_root: Path, s3_prefix_base: str) -> list[tuple[Path, str]]: + """Return list of (local_run_dir, s3_key) pairs to upload. + + local_root's name is appended to s3_prefix_base, then contents mirrored: + local_root/run_name/ → s3_prefix_base/local_root.name/run_name/ + local_root/sweep_name/run_name/ → s3_prefix_base/local_root.name/sweep_name/run_name/ + """ + base = f"{s3_prefix_base}/{local_root.name}".lstrip("/") + queue: list[tuple[Path, str]] = [] + + if _is_run_dir(local_root): + # The directory itself is a single run + queue.append((local_root, base)) + return queue + + for child in sorted(local_root.iterdir()): + if not child.is_dir(): + continue + if _is_run_dir(child): + # Flat layout: local_root/run_name/ + queue.append((child, f"{base}/{child.name}")) + else: + # Sweep layout: local_root/sweep_name/run_name/ + queue.extend( + (grandchild, f"{base}/{child.name}/{grandchild.name}") + for grandchild in sorted(child.iterdir()) + if grandchild.is_dir() and _is_run_dir(grandchild) + ) + + return queue + + +def main(): + parser = argparse.ArgumentParser( + description="Upload specdec_bench results to S3.", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=__doc__.split("Usage examples:")[1] if "Usage examples:" in __doc__ else "", + ) + parser.add_argument("local_dir", help="Local results directory to upload") + parser.add_argument( + "s3_dest", help="S3 destination prefix, e.g. s3://team-specdec-workgroup/results" + ) + parser.add_argument( + "--endpoint", + default=os.environ.get("S3_ENDPOINT", ""), + help="S3 endpoint URL", + ) + parser.add_argument( + "--key-id", + default=os.environ.get("S3_KEY_ID", ""), + dest="key_id", + help="S3 access key ID", + ) + parser.add_argument( + "--secret", + default=os.environ.get("S3_SECRET", ""), + help="S3 secret access key", + ) + parser.add_argument( + "--skip-existing", + action="store_true", + help="Skip runs that already exist in S3 instead of failing", + ) + parser.add_argument( + "--dry-run", + action="store_true", + help="Print what would be uploaded without actually uploading", + ) + args = parser.parse_args() + + if not args.s3_dest.startswith("s3://"): + sys.exit("Error: s3_dest must start with s3://") + + local_root = Path(args.local_dir).resolve() + if not local_root.is_dir(): + sys.exit(f"Error: {local_root} is not a directory") + + bucket, s3_prefix_base = parse_s3_path(args.s3_dest) + queue = _discover_runs(local_root, s3_prefix_base) + + if not queue: + sys.exit("No run directories found to upload.") + + print(f"Found {len(queue)} run(s) to upload → s3://{bucket}/") + + if args.dry_run: + for local_run_dir, s3_key in queue: + print(f" {local_run_dir} → s3://{bucket}/{s3_key}") + return + + s3 = make_s3_client(args.endpoint, args.key_id, args.secret) + + errors = 0 + skipped = 0 + uploaded = 0 + for local_run_dir, s3_key in queue: + print(f"\n{local_run_dir.name} → s3://{bucket}/{s3_key}") + try: + upload_run_dir(s3, local_run_dir, bucket, s3_key) + uploaded += 1 + except ValueError as exc: + # Raised by upload_run_dir when the destination already exists. + if args.skip_existing: + print(f" Skipped: {exc}") + skipped += 1 + else: + print(f" Error: {exc}") + errors += 1 + except Exception as exc: # noqa: BLE001 - any boto3/IO failure should not abort the sweep + # ClientError (auth, throttling, network), OSError on a single file, + # etc. Keep going so the other runs in a sweep still upload, and + # report a per-run summary at the end. Sweep-level non-zero exit + # still flags the overall run as failed. + print(f" Error: {type(exc).__name__}: {exc}") + errors += 1 + + print(f"\nDone: {uploaded} uploaded, {skipped} skipped, {errors} failed.") + if errors: + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/tools/launcher/common/specdec_bench/run.sh b/tools/launcher/common/specdec_bench/run.sh new file mode 100755 index 00000000000..9b4fbe47b64 --- /dev/null +++ b/tools/launcher/common/specdec_bench/run.sh @@ -0,0 +1,53 @@ +#!/bin/bash + +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +SCRIPT_DIR="$(dirname "$(readlink -f "$0")")" +source ${SCRIPT_DIR}/../service_utils.sh + +trap 'error_handler $0 $LINENO' ERR +trap 'exit_handler' EXIT + +################################################################################################### +# Backend-agnostic specdec_bench entrypoint. The caller's YAML supplies --engine +# (VLLM | SGLANG | TRTLLM | NONE) and the dataset / sweep flags via "args"; this +# script just sources service_utils.sh, installs the speed-bench deps, and execs +# examples/specdec_bench/run.py. +# +# Required env: HF_MODEL_CKPT +# Optional env: HF_DRAFT_MODEL_CKPT (consumed by --draft_model_dir if the YAML passes it) + +# Skip the install when the deps are already present (warm container or +# previous task in the pipeline). Saves a few minutes per task and avoids +# silently drifting versions if upstream wheels move between launches. +if ! pip show boto3 >/dev/null 2>&1 || \ + ! pip show datasets >/dev/null 2>&1 || \ + ! pip show seaborn >/dev/null 2>&1; then + if ! pip install -r modules/Model-Optimizer/examples/specdec_bench/requirements_speed.txt; then + report_result "FAIL: specdec_bench: pip install requirements_speed.txt failed" + exit 1 + fi +fi + +if ! python3 modules/Model-Optimizer/examples/specdec_bench/run.py \ + --model_dir ${HF_MODEL_CKPT} \ + --tokenizer ${HF_MODEL_CKPT} \ + "${@}"; then + report_result "FAIL: specdec_bench: run.py exited non-zero" + exit 1 +fi + +report_result "PASS: specdec_bench run completed" diff --git a/tools/launcher/examples/Qwen/Qwen3.5-4B/specdec_bench.yaml b/tools/launcher/examples/Qwen/Qwen3.5-4B/specdec_bench.yaml new file mode 100644 index 00000000000..0c36ded7706 --- /dev/null +++ b/tools/launcher/examples/Qwen/Qwen3.5-4B/specdec_bench.yaml @@ -0,0 +1,48 @@ +# SPEED-bench smoke run for Qwen3.5-4B via vLLM (autoregressive baseline). +# +# Reads nvidia/SPEED-Bench-Internal/qualitative through Qwen/Qwen3.5-4B with +# --speculative_algorithm NONE (no draft model) and writes timing.json + +# aa_timing.json + acceptance_rate.json + specbench_responses.jsonl + +# specbench_results.json to /scratchspace/specdec_bench/. +# +# The qwen3_5 model_type needs transformers >= 4.58, which is NOT in +# vllm/vllm-openai:latest yet — use the qwen3_5-cu130 tag instead. +# +# Local run: +# uv run launch.py --yaml examples/Qwen/Qwen3.5-4B/specdec_bench.yaml \ +# hf_local=/home/omniml_data_3/hf-local --yes +# +# Slurm run on cw_dfw (where Qwen3.5-4B and SPEED-Bench-Internal are staged): +# uv run slurm.py --yaml modules/Model-Optimizer/tools/launcher/examples/Qwen/Qwen3.5-4B/specdec_bench.yaml --yes + +job_name: Qwen3.5-4B_specdec_bench + +pipeline: + global_vars: + hf_model: /hf-local/Qwen/Qwen3.5-4B + + task_0: + script: common/specdec_bench/run.sh + args: + - --dataset speed + - --dataset_path /hf-local/nvidia/SPEED-Bench-Internal/qualitative + - --engine VLLM + - --speculative_algorithm NONE + - --tp_size 1 + - --ep_size 1 + - --concurrency 1 + - --num_requests 80 + - --output_length 4096 + - --aa_timing + - --show_progress + - --save_dir /scratchspace/specdec_bench + - --trust_remote_code + environment: + - HF_MODEL_CKPT: <> + - HF_LOCAL: /hf-local + slurm_config: + _factory_: "slurm_factory" + nodes: 1 + ntasks_per_node: 1 + gpus_per_node: 1 + container: vllm/vllm-openai:qwen3_5-cu130 diff --git a/tools/launcher/examples/Qwen/Qwen3.5-4B/specdec_bench_mtp.yaml b/tools/launcher/examples/Qwen/Qwen3.5-4B/specdec_bench_mtp.yaml new file mode 100644 index 00000000000..abdb7e6842e --- /dev/null +++ b/tools/launcher/examples/Qwen/Qwen3.5-4B/specdec_bench_mtp.yaml @@ -0,0 +1,57 @@ +# SPEED-bench MTP speculative-decoding run for Qwen3.5-4B via vLLM. +# +# Companion to specdec_bench.yaml (the NONE/autoregressive baseline). This +# variant exercises Qwen3.5-4B's built-in MTP head (text_config.mtp_num_hidden_layers=1) +# with draft_length=3 to produce real acceptance-rate numbers instead of the +# trivial AR=1 that NONE yields. +# +# Slurm run on cw_dfw: +# uv run slurm.py --yaml modules/Model-Optimizer/tools/launcher/examples/Qwen/Qwen3.5-4B/specdec_bench_mtp.yaml --yes + +job_name: Qwen3.5-4B_specdec_bench_mtp + +pipeline: + global_vars: + hf_model: /hf-local/Qwen/Qwen3.5-4B + + task_0: + script: common/specdec_bench/run.sh + args: + - --dataset speed + - --dataset_path /hf-local/nvidia/SPEED-Bench-Internal/qualitative + - --engine VLLM + - --speculative_algorithm MTP + - --draft_length 3 + - --tp_size 1 + - --ep_size 1 + - --concurrency 1 + - --num_requests 80 + - --output_length 4096 + - --aa_timing + - --show_progress + - --save_dir /scratchspace/specdec_bench_mtp + - --trust_remote_code + environment: + - HF_MODEL_CKPT: <> + - HF_LOCAL: /hf-local + # dump_env() in examples/specdec_bench/run.py reads provenance from these + # env vars when present; they are intentionally NOT set here. Hardcoded + # SHAs in a committed YAML would stale-lie about the build that's + # actually running. Set them in the harness (planned for Phase 2 in + # OMNIML-4788, where slurm.py / launch.py will compute and inject them + # at launch time): + # + # SPECDEC_BENCH_SHA = git rev-parse HEAD:examples/specdec_bench + # MODELOPT_SHA = git -C modules/Model-Optimizer rev-parse HEAD + # MODELOPT_VERSION = modelopt.__version__ (or git describe --tags) + # NMM_SANDBOX_SHA = git -C nmm-sandbox rev-parse HEAD + # CONTAINER_IMAGE = slurm_config.container (the image:tag below) + # + # When run standalone (without the harness), dump_env falls back to + # git rev-parse / module imports on the host filesystem. + slurm_config: + _factory_: "slurm_factory" + nodes: 1 + ntasks_per_node: 1 + gpus_per_node: 1 + container: vllm/vllm-openai:qwen3_5-cu130