From f76467fb2c62cc856bfed9227110dec38b2e86ed Mon Sep 17 00:00:00 2001 From: Raja Sekhar Rao Dheekonda Date: Fri, 29 Aug 2025 10:57:50 -0700 Subject: [PATCH 1/4] Update export runs call to leverage platform paginated runs and saves to disk --- docs/sdk/api.mdx | 83 ++++++++---- docs/sdk/data_types.mdx | 6 +- docs/sdk/main.mdx | 8 +- docs/sdk/metric.mdx | 8 +- docs/sdk/scorers.mdx | 10 +- dreadnode/api/client.py | 231 ++++++++++++++++++++++++++++---- dreadnode/credential_manager.py | 2 +- 7 files changed, 278 insertions(+), 70 deletions(-) diff --git a/docs/sdk/api.mdx b/docs/sdk/api.mdx index 214574a2..f7419ad5 100644 --- a/docs/sdk/api.mdx +++ b/docs/sdk/api.mdx @@ -88,6 +88,7 @@ def __init__( headers=headers, base_url=self._base_url, timeout=30, + cookies=_cookies, ) if debug: @@ -169,7 +170,6 @@ def export_metrics( Returns: A DataFrame containing the exported metric data. """ - import pandas as pd response = self.request( "GET", @@ -264,7 +264,6 @@ def export_parameters( Returns: A DataFrame containing the exported parameter data. """ - import pandas as pd response = self.request( "GET", @@ -293,36 +292,48 @@ export_runs( filter: str | None = None, status: StatusFilter = "completed", aggregations: list[MetricAggregationType] | None = None, -) -> pd.DataFrame + format: ExportFormat = "parquet", + base_dir: str | None = None, +) -> str ``` -Exports run data for a specific project. +Export runs using pagination - always writes to disk. **Parameters:** * **`project`** (`str`) - –The project identifier. + –The project identifier * **`filter`** (`str | None`, default: `None` ) - –A filter to apply to the exported data. Defaults to None. + –A filter to apply to the exported data * **`status`** (`StatusFilter`, default: `'completed'` ) - –The status of runs to include. Defaults to "completed". + –The status of runs to include * **`aggregations`** (`list[MetricAggregationType] | None`, default: `None` ) - –A list of aggregation types to apply. Defaults to None. 
+ –A list of aggregation types to apply +* **`format`** + (`ExportFormat`, default: + `'parquet'` + ) + –Output format - "parquet", "csv", "json", "jsonl" +* **`base_dir`** + (`str | None`, default: + `None` + ) + –Base directory for export (defaults to "./strikes-data") **Returns:** -* `DataFrame` - –A DataFrame containing the exported run data. +* **`str`** ( `str` + ) –Path to the export directory ```python @@ -331,35 +342,61 @@ def export_runs( project: str, *, filter: str | None = None, - # format: ExportFormat = "parquet", status: StatusFilter = "completed", aggregations: list[MetricAggregationType] | None = None, -) -> "pd.DataFrame": + format: ExportFormat = "parquet", + base_dir: str | None = None, +) -> str: """ - Exports run data for a specific project. + Export runs using pagination - always writes to disk. Args: - project: The project identifier. - filter: A filter to apply to the exported data. Defaults to None. - status: The status of runs to include. Defaults to "completed". - aggregations: A list of aggregation types to apply. Defaults to None. + project: The project identifier + filter: A filter to apply to the exported data + status: The status of runs to include + aggregations: A list of aggregation types to apply + format: Output format - "parquet", "csv", "json", "jsonl" + base_dir: Base directory for export (defaults to "./strikes-data") Returns: - A DataFrame containing the exported run data. 
+ str: Path to the export directory """ - import pandas as pd - response = self.request( + logger.info(f"Starting paginated export for project '{project}', format='{format}'") + + page = 1 + first_response = self.request( "GET", - f"/strikes/projects/{project!s}/export", + f"/strikes/projects/{project!s}/export/paginated", params={ - "format": "parquet", + "page": page, "status": status, **({"filter": filter} if filter else {}), **({"aggregations": aggregations} if aggregations else {}), }, ) - return pd.read_parquet(io.BytesIO(response.content)) + + if not first_response.content: + logger.info("No data found") + + first_chunk = pd.read_parquet(io.BytesIO(first_response.content)) + + total_runs = int(first_response.headers.get("x-total", "0")) + has_more = first_response.headers.get("x-has-more", "false") == "true" + + logger.info(f"Total runs: {total_runs}, Has more: {has_more}") + + logger.info(f"Writing {total_runs} runs to disk") + return self._export_to_disk( + project, + first_chunk, + dict(first_response.headers), + filter, + status, + aggregations, + format, + str(base_dir) if base_dir else None, + ) ``` diff --git a/docs/sdk/data_types.mdx b/docs/sdk/data_types.mdx index da56e3ea..d47a6ad2 100644 --- a/docs/sdk/data_types.mdx +++ b/docs/sdk/data_types.mdx @@ -643,10 +643,12 @@ def to_serializable(self) -> tuple[bytes, dict[str, t.Any]]: Returns: A tuple of (video_bytes, metadata_dict) """ - import numpy as np # type: ignore[import,unused-ignore] + import numpy as np # type: ignore[import,unused-ignore] # noqa: PLC0415 try: - from moviepy.video.VideoClip import VideoClip # type: ignore[import,unused-ignore] + from moviepy.video.VideoClip import ( # type: ignore[import,unused-ignore,import-untyped] # noqa: PLC0415 + VideoClip, + ) except ImportError: VideoClip = None # noqa: N806 diff --git a/docs/sdk/main.mdx b/docs/sdk/main.mdx index 300287e8..803a286e 100644 --- a/docs/sdk/main.mdx +++ b/docs/sdk/main.mdx @@ -283,10 +283,7 @@ def configure( with 
contextlib.suppress(Exception): user_config = UserConfig.read() profile_name = profile or os.environ.get(ENV_PROFILE) - if profile_name: - active_profile = profile_name - else: - active_profile = user_config.active_profile_name + active_profile = profile_name or user_config.active_profile_name if active_profile: config_source = f"profile: {active_profile}" @@ -460,7 +457,6 @@ def initialize(self) -> None: This method is called automatically when you call `configure()`. """ - from s3fs import S3FileSystem # type: ignore [import-untyped] if self._initialized: return @@ -976,7 +972,7 @@ with dreadnode.run("my_run"): def log_metric( self, name: str, - value: float | bool | Metric, + value: float | bool | Metric, # noqa: FBT001 *, step: int = 0, origin: t.Any | None = None, diff --git a/docs/sdk/metric.mdx b/docs/sdk/metric.mdx index 34a6afe6..0a537149 100644 --- a/docs/sdk/metric.mdx +++ b/docs/sdk/metric.mdx @@ -31,8 +31,8 @@ Metric Metric( value: float, step: int = 0, - timestamp: datetime = lambda: datetime.now( - timezone.utc + timestamp: datetime = ( + lambda: datetime.now(timezone.utc) )(), attributes: JsonDict = dict(), ) @@ -136,9 +136,7 @@ def apply_mode(self, mode: MetricAggMode, others: "list[Metric]") -> "Metric": self.value = len(others) + 1 elif mode == "avg" and prior_values: current_avg = prior_values[-1] - self.value = current_avg + (self.value - current_avg) / ( - len(prior_values) + 1 - ) + self.value = current_avg + (self.value - current_avg) / (len(prior_values) + 1) return self ``` diff --git a/docs/sdk/scorers.mdx b/docs/sdk/scorers.mdx index 573c2e9f..ff9b4863 100644 --- a/docs/sdk/scorers.mdx +++ b/docs/sdk/scorers.mdx @@ -128,7 +128,7 @@ def zero_shot_classification( ) try: - from transformers import ( # type: ignore [attr-defined,import-not-found,unused-ignore] + from transformers import ( # type: ignore [attr-defined,import-not-found,unused-ignore] # noqa: PLC0415 pipeline, ) except ImportError: @@ -846,7 +846,7 @@ def 
detect_harm_with_openai( model: The moderation model to use. name: Name of the scorer. """ - import openai + import openai # noqa: PLC0415 async def evaluate(data: t.Any) -> Metric: text = str(data) @@ -1816,7 +1816,7 @@ def detect_pii_with_presidio( ) try: - import presidio_analyzer # type: ignore[import-not-found,unused-ignore] # noqa: F401 + import presidio_analyzer # type: ignore[import-not-found,unused-ignore] # noqa: F401, PLC0415 except ImportError: warn_at_user_stacklevel(presidio_import_error_msg, UserWarning) @@ -2020,7 +2020,7 @@ def wrap_chat( """ async def evaluate(chat: "Chat") -> Metric: - from rigging.chat import Chat + from rigging.chat import Chat # noqa: PLC0415 # Fall through to the inner scorer if chat is not a Chat instance if not isinstance(chat, Chat): @@ -2479,7 +2479,7 @@ def similarity_with_litellm( or self-hosted models. name: Name of the scorer. """ - import litellm + import litellm # noqa: PLC0415 async def evaluate(data: t.Any) -> Metric: nonlocal reference, model diff --git a/dreadnode/api/client.py b/dreadnode/api/client.py index fda85b9e..41a81f6a 100644 --- a/dreadnode/api/client.py +++ b/dreadnode/api/client.py @@ -1,11 +1,14 @@ import io import json +import shutil import time import typing as t from datetime import datetime, timezone +from pathlib import Path from urllib.parse import urlparse import httpx +import pandas as pd from loguru import logger from pydantic import BaseModel from ulid import ULID @@ -13,6 +16,7 @@ from dreadnode.api.models import ( AccessRefreshTokenResponse, DeviceCodeResponse, + ExportFormat, GithubTokenResponse, MetricAggregationType, Project, @@ -371,80 +375,252 @@ def get_run_trace( trace = sorted(trace, key=lambda x: x.timestamp) return trace if format == "flat" else convert_flat_trace_to_tree(trace) - # Data exports + # Data Exports - def export_runs( + def export_metrics( self, project: str, *, filter: str | None = None, # format: ExportFormat = "parquet", status: StatusFilter = "completed", + 
metrics: list[str] | None = None, aggregations: list[MetricAggregationType] | None = None, ) -> "pd.DataFrame": """ - Exports run data for a specific project. + Exports metric data for a specific project. Args: project: The project identifier. filter: A filter to apply to the exported data. Defaults to None. - status: The status of runs to include. Defaults to "completed". + status: The status of metrics to include. Defaults to "completed". + metrics: A list of metric names to include. Defaults to None. aggregations: A list of aggregation types to apply. Defaults to None. Returns: - A DataFrame containing the exported run data. + A DataFrame containing the exported metric data. """ - import pandas as pd # noqa: PLC0415 response = self.request( "GET", - f"/strikes/projects/{project!s}/export", + f"/strikes/projects/{project!s}/export/metrics", params={ "format": "parquet", "status": status, - **({"filter": filter} if filter else {}), + "filter": filter, + **({"metrics": metrics} if metrics else {}), **({"aggregations": aggregations} if aggregations else {}), }, ) return pd.read_parquet(io.BytesIO(response.content)) - def export_metrics( + def export_runs( self, project: str, *, filter: str | None = None, - # format: ExportFormat = "parquet", status: StatusFilter = "completed", - metrics: list[str] | None = None, aggregations: list[MetricAggregationType] | None = None, - ) -> "pd.DataFrame": + format: ExportFormat = "parquet", + base_dir: str | None = None, + ) -> str: """ - Exports metric data for a specific project. + Export runs using pagination - always writes to disk. Args: - project: The project identifier. - filter: A filter to apply to the exported data. Defaults to None. - status: The status of metrics to include. Defaults to "completed". - metrics: A list of metric names to include. Defaults to None. - aggregations: A list of aggregation types to apply. Defaults to None. 
+ project: The project identifier + filter: A filter to apply to the exported data + status: The status of runs to include + aggregations: A list of aggregation types to apply + format: Output format - "parquet", "csv", "json", "jsonl" + base_dir: Base directory for export (defaults to "./strikes-data") Returns: - A DataFrame containing the exported metric data. + str: Path to the export directory """ - import pandas as pd # noqa: PLC0415 - response = self.request( + logger.info(f"Starting paginated export for project '{project}', format='{format}'") + + page = 1 + first_response = self.request( "GET", - f"/strikes/projects/{project!s}/export/metrics", + f"/strikes/projects/{project!s}/export/paginated", params={ - "format": "parquet", + "page": page, "status": status, - "filter": filter, - **({"metrics": metrics} if metrics else {}), + **({"filter": filter} if filter else {}), **({"aggregations": aggregations} if aggregations else {}), }, ) - return pd.read_parquet(io.BytesIO(response.content)) + + if not first_response.content: + logger.info("No data found") + + first_chunk = pd.read_parquet(io.BytesIO(first_response.content)) + + total_runs = int(first_response.headers.get("x-total", "0")) + has_more = first_response.headers.get("x-has-more", "false") == "true" + + logger.info(f"Total runs: {total_runs}, Has more: {has_more}") + + logger.info(f"Writing {total_runs} runs to disk") + return self._export_to_disk( + project, + first_chunk, + dict(first_response.headers), + filter, + status, + aggregations, + format, + str(base_dir) if base_dir else None, + ) + + def _export_to_disk( + self, + project: str, + first_chunk: "pd.DataFrame", + first_headers: dict[str, str], + filter: str | None, + status: StatusFilter, + aggregations: list[MetricAggregationType] | None, + format: str, + base_dir: str | None, + ) -> str: + """Handle dataset export to disk - one file per chunk.""" + + if base_dir: + export_base = Path(base_dir) / "strikes-data" / "export-runs" + else: + 
export_base = Path("./strikes-data") / "export-runs" + + project_dir = export_base / project + + logger.info(f"Using project name: '{project}'") + logger.info(f"Project directory will be: {project_dir}") + + # Clean up old project data + if project_dir.exists(): + logger.info(f"Removing old export data: {project_dir}") + shutil.rmtree(project_dir) + + project_dir.mkdir(parents=True, exist_ok=True) + logger.info(f"Created directory: {project_dir}") + + page = 1 + total_exported_runs = 0 + + # Write first chunk + filename = self._write_chunk_file(first_chunk, project_dir, page, format) + chunk_run_count = len(first_chunk["run_id"].unique()) + total_exported_runs += chunk_run_count + logger.info(f"Page {page}: Wrote {filename} ({chunk_run_count} runs)") + + has_more = first_headers.get("x-has-more", "false") == "true" + total_runs = int(first_headers.get("x-total", "0")) + + logger.info(f"Total runs to export: {total_runs}") + + # Loop through remaining pages - SDK just increments page until has_more = false + while has_more: + page += 1 + logger.info(f"Fetching page {page}") + + try: + response = self.request( + "GET", + f"/strikes/projects/{project!s}/export/paginated", + params={ + "page": page, + "status": status, + **({"filter": filter} if filter else {}), + **({"aggregations": aggregations} if aggregations else {}), + }, + ) + + if not response.content: + logger.info("No more data - empty response") + break + + # Parse response + chunk_df = pd.read_parquet(io.BytesIO(response.content)) + + if chunk_df.empty: + logger.info("Empty chunk received - breaking") + break + + # Write chunk + filename = self._write_chunk_file(chunk_df, project_dir, page, format) + chunk_run_count = len(chunk_df["run_id"].unique()) + total_exported_runs += chunk_run_count + logger.info(f"Page {page}: Wrote {filename} ({chunk_run_count} runs)") + + # Check if API has more pages + has_more = response.headers.get("x-has-more", "false") == "true" + + except Exception as e: # noqa: BLE001 + 
logger.error(f"Error fetching page {page}: {e}") + break + + logger.info(f"Export complete to {project_dir}") + logger.info(f"Total pages: {page}, Total runs: {total_exported_runs}") + + return str(project_dir) + + def _write_chunk_file( + self, df: "pd.DataFrame", project_dir: Path, page: int, format: str + ) -> str: + """Write chunk to a single file with intelligent naming.""" + + if df.empty: + return "" + + total_runs = len(df["run_id"].unique()) + total_rows = len(df) + + logger.info(f"Writing chunk: {total_rows} rows, {total_runs} unique runs, page {page}") + + if total_runs == 1: + # Single run - use the run ID + run_id = df["run_id"].iloc[0] + base_name = f"run_{run_id}" + elif total_runs <= 10: # noqa: PLR2004 + # Few runs - include count + base_name = f"runs_{total_runs}_page_{page}" + elif total_runs <= 100: # noqa: PLR2004 + # Medium batch - include count + base_name = f"runs_{total_runs}_batch_{page}" + else: + base_name = f"runs_chunk_{page}_{total_runs}_runs" + + # Write based on format + if format == "csv": + filename = f"{base_name}.csv" + filepath = project_dir / filename + df.to_csv(filepath, index=False) + logger.info(f"Wrote CSV: {filepath}") + elif format == "parquet": + filename = f"{base_name}.parquet" + filepath = project_dir / filename + df.to_parquet(filepath, index=False) + logger.info(f"Wrote Parquet: {filepath}") + elif format == "json": + filename = f"{base_name}.json" + filepath = project_dir / filename + df.to_json(filepath, orient="records", indent=2) + logger.info(f"Wrote JSON: {filepath}") + elif format == "jsonl": + filename = f"{base_name}.jsonl" + filepath = project_dir / filename + df.to_json(filepath, orient="records", lines=True) + logger.info(f"Wrote JSONL: {filepath}") + else: + # Default to parquet + filename = f"{base_name}.parquet" + filepath = project_dir / filename + df.to_parquet(filepath, index=False) + logger.info(f"Wrote default Parquet: {filepath}") + + return filename def export_parameters( self, @@ -471,7 
+647,6 @@ def export_parameters( Returns: A DataFrame containing the exported parameter data. """ - import pandas as pd # noqa: PLC0415 response = self.request( "GET", @@ -512,7 +687,7 @@ def export_timeseries( Returns: A DataFrame containing the exported timeseries data. """ - import pandas as pd # noqa: PLC0415 + import pandas as pd response = self.request( "GET", diff --git a/dreadnode/credential_manager.py b/dreadnode/credential_manager.py index afc9c2a3..daf81a5d 100644 --- a/dreadnode/credential_manager.py +++ b/dreadnode/credential_manager.py @@ -83,7 +83,7 @@ def _refresh_credentials(self) -> None: self._filesystem = new_filesystem self._prefix = f"{new_credentials.bucket}/{new_credentials.prefix}/" - logger.info("Storage credentials refreshed, valid until %s", self._credentials_expiry) + logger.info(f"Storage credentials refreshed, valid until {self._credentials_expiry}") except Exception: logger.exception("Failed to refresh storage credentials") From 70366a4e4e96d2f30bdcc3f6db8a50d2c421ef43 Mon Sep 17 00:00:00 2001 From: Raja Sekhar Rao Dheekonda Date: Fri, 29 Aug 2025 12:48:39 -0700 Subject: [PATCH 2/4] add doc updates --- docs/usage/export.mdx | 117 +++++++++++++++++++++++++++++++----------- 1 file changed, 86 insertions(+), 31 deletions(-) diff --git a/docs/usage/export.mdx b/docs/usage/export.mdx index 1bad59b7..e17a2719 100644 --- a/docs/usage/export.mdx +++ b/docs/usage/export.mdx @@ -31,46 +31,99 @@ import dreadnode api = dreadnode.api() # List all runs in a project -runs = api.strikes.list_runs('project-name') +runs = api.list_runs('project-name') print(f"Found {len(runs)} runs") # Get the trace for a specific run -trace = api.strikes.get_run_trace(runs[0].id) +trace = api.get_run_trace(runs[0].id) # Export different types of data as pandas DataFrames -df_metrics = api.strikes.export_metrics('project-name') -df_params = api.strikes.export_parameters('project-name') -df_runs = api.strikes.export_runs('project-name') -df_timeseries = 
api.strikes.export_timeseries('project-name') +df_metrics = api.export_metrics('project-name') +df_params = api.export_parameters('project-name') +export_runs_path = api.export_runs('project-name') +df_timeseries = api.export_timeseries('project-name') ``` ## Export Types ### Export Runs -Export all run data including parameters, tags, and aggregated metrics. +Export all run data including parameters, tags, and aggregated metrics. All runs are written to organized files on disk, and the method returns the path to the export directory. ```python -df = api.strikes.export_runs( +# Export runs - always writes to disk and returns directory path +export_path = api.export_runs( 'project-name', filter='tags.contains("production")', # Optional filter expression status='completed', # 'all', 'completed', or 'failed' - aggregations=['avg', 'min', 'max'] # Metrics aggregations to include + aggregations=['avg', 'min', 'max'], # Metrics aggregations to include + format='parquet', # Format: 'parquet', 'csv', 'json', 'jsonl' + base_dir='/my/data' # Optional: custom base directory ) + +print(f"Data exported to: {export_path}") +# Returns: /my/data/strikes-data/export-runs/project-name/ ``` -The resulting `DataFrame` contains: +#### File Structure +Each page of results is written to its own file, named according to how many runs it contains: +```text +./strikes-data/export-runs/project-name/ +├── runs_chunk_1_1000_runs.parquet # Large chunks (101+ runs) +├── runs_chunk_2_1000_runs.parquet +├── runs_50_batch_3.parquet # Medium chunks (11-100 runs) +└── runs_5_page_4.parquet # Small chunks (2-10 runs) +``` + +#### Loading Exported Data + +```python +import pandas as pd +from pathlib import Path + +def load_exported_runs(export_path: str) -> pd.DataFrame: + """Load all exported run files into a single DataFrame.""" + export_dir = Path(export_path) + + # For parquet files + parquet_files = list(export_dir.glob("*.parquet")) + if parquet_files: + df = pd.read_parquet(export_path) + return df + + # For CSV files + csv_files = 
list(export_dir.glob("*.csv")) + if csv_files: + chunks = [pd.read_csv(file) for file in csv_files] + return pd.concat(chunks, ignore_index=True) + + # For JSON files + json_files = list(export_dir.glob("*.json")) + if json_files: + chunks = [pd.read_json(file) for file in json_files] + return pd.concat(chunks, ignore_index=True) + + return pd.DataFrame() + +# Usage +export_path = api.export_runs('my-project') +df = load_exported_runs(export_path) +print(f"Loaded {len(df)} runs from exported files") +``` + +The resulting DataFrame contains: + - Run metadata (ID, name, start time, duration, status) -- Parameters (prefixed with `param_`) -- Tags (prefixed with `tag_`) -- Aggregated metrics (prefixed with `metric_`) +- Parameters (prefixed with param_) +- Tags (prefixed with tag_) +- Aggregated metrics (prefixed with metric_) ### Export Metrics Focus on the metrics data with detailed information about each metric point. ```python -df = api.strikes.export_metrics( +df = api.export_metrics( 'project-name', filter='name.contains("training")', # Optional filter expression status='completed', # 'all', 'completed', or 'failed' @@ -90,7 +143,7 @@ The resulting `DataFrame` contains: Analyze how different parameter values affect your metrics. ```python -df = api.strikes.export_parameters( +df = api.export_parameters( 'project-name', filter='metrics.accuracy.max > 0.9', # Optional filter expression status='completed', # 'all', 'completed', or 'failed' @@ -110,7 +163,7 @@ The resulting `DataFrame` shows how different parameter values influence your me Get time-based data for your metrics, with options for time representation. ```python -df = api.strikes.export_timeseries( +df = api.export_timeseries( 'project-name', filter='params.model == "resnet50"', # Optional filter expression status='completed', # 'all', 'completed', or 'failed' @@ -133,16 +186,17 @@ All export functions support filtering to narrow down the results. 
The filter ex ```python # Filter by tags -df = api.strikes.export_runs('project-name', filter='tags.contains("production")') +export_path = api.export_runs('project-name', filter='tags.contains("production")') +df = load_exported_runs(export_path) # Filter by parameters -df = api.strikes.export_metrics('project-name', filter='params.learning_rate < 0.01') +df = api.export_metrics('project-name', filter='params.learning_rate < 0.01') # Filter by metrics -df = api.strikes.export_parameters('project-name', filter='metrics.accuracy.max > 0.9') +df = api.export_parameters('project-name', filter='metrics.accuracy.max > 0.9') # Combine filters -df = api.strikes.export_timeseries( +df = api.export_timeseries( 'project-name', filter='params.model == "resnet50" and metrics.loss.min < 0.1' ) @@ -188,12 +242,12 @@ You can list all runs in a project and inspect their metadata: ```python # List all runs in a project -runs = api.strikes.list_runs('project-name') +runs = api.list_runs('project-name') for run in runs: print(run.id, run.name, run.status, run.start_time) # Get full details for a specific run -run = api.strikes.get_run(runs[0].id) +run = api.get_run(runs[0].id) print(run) ``` @@ -203,7 +257,7 @@ A trace provides a complete record of all tasks and spans executed during a run, ```python # Get the full trace for a run (as a flat list or tree) -trace = api.strikes.get_run_trace(run_id, format="flat") # or format="tree" +trace = api.get_run_trace(run_id, format="flat") # or format="tree" for span in trace: print(span.name, span.timestamp) ``` @@ -215,12 +269,12 @@ You can also pull just the tasks for a run, including their arguments (inputs), ```python # Get all tasks for a run -tasks = api.strikes.get_run_tasks(run_id, format="flat") +tasks = api.get_run_tasks(run_id, format="flat") for task in tasks: print(task.name, task.timestamp, task.inputs, task.output) # Get tasks as a tree (shows parent/child relationships) -task_tree = api.strikes.get_run_tasks(run_id, 
format="tree") +task_tree = api.get_run_tasks(run_id, format="tree") ``` - Each task object contains its input arguments, output, status, and timing. @@ -231,7 +285,7 @@ task_tree = api.strikes.get_run_tasks(run_id, format="tree") You can use the above methods to build a complete picture of how your code executed, what data was processed, and what results were produced. For example, to view all inputs and outputs for every task in a run: ```python -for task in api.strikes.get_run_tasks(run_id): +for task in api.get_run_tasks(run_id): print(f"Task: {task.name}") print(f" Inputs: {task.inputs}") print(f" Output: {task.output}") @@ -245,7 +299,7 @@ This is especially useful for debugging, auditing, or building custom visualizat ```python # Get parameters and their impact on metrics -df = api.strikes.export_parameters( +df = api.export_parameters( 'my-experiment', parameters=['learning_rate', 'batch_size', 'model'], metrics=['accuracy', 'loss'], @@ -267,7 +321,7 @@ plt.show() ```python # Get timeseries data for loss metrics -df = api.strikes.export_timeseries( +df = api.export_timeseries( 'my-experiment', metrics=['train_loss', 'val_loss'], time_axis='step' @@ -300,7 +354,7 @@ You can export trace information for debugging and performance analysis: ```python # Get the trace for a specific run -trace = api.strikes.get_run_trace(run_id) +trace = api.get_run_trace(run_id) # Extract spans and analyze them spans = [span for span in trace if hasattr(span, 'duration')] @@ -322,8 +376,9 @@ For more complex analyses, you can combine different exports: ```python # Get runs and parameters -runs_df = api.strikes.export_runs('project-name') -params_df = api.strikes.export_parameters('project-name') +runs_export_path = api.export_runs('project-name') +runs_df = load_exported_runs(runs_export_path) +params_df = api.export_parameters('project-name') # Join them for additional insights merged = runs_df.merge(params_df, left_on='run_id', right_on='run_id') From 
d6f56670081075464915489fe467f791cacd777e Mon Sep 17 00:00:00 2001 From: Raja Sekhar Rao Dheekonda Date: Fri, 29 Aug 2025 13:00:02 -0700 Subject: [PATCH 3/4] Fix ruff error --- dreadnode/api/client.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/dreadnode/api/client.py b/dreadnode/api/client.py index 41a81f6a..9108770d 100644 --- a/dreadnode/api/client.py +++ b/dreadnode/api/client.py @@ -48,9 +48,6 @@ ModelT = t.TypeVar("ModelT", bound=BaseModel) -if t.TYPE_CHECKING: - import pandas as pd - class ApiClient: """ From 75b583f63a9bf2d1bb0f2c78c472a034f55d9624 Mon Sep 17 00:00:00 2001 From: Raja Sekhar Rao Dheekonda Date: Fri, 29 Aug 2025 13:03:42 -0700 Subject: [PATCH 4/4] Fix ruff error --- dreadnode/api/client.py | 1 - 1 file changed, 1 deletion(-) diff --git a/dreadnode/api/client.py b/dreadnode/api/client.py index 9108770d..53b4ab91 100644 --- a/dreadnode/api/client.py +++ b/dreadnode/api/client.py @@ -684,7 +684,6 @@ def export_timeseries( Returns: A DataFrame containing the exported timeseries data. """ - import pandas as pd response = self.request( "GET",