diff --git a/docs/Flash_Deploy_Guide.md b/docs/Flash_Deploy_Guide.md index ee3c7500..7a105039 100644 --- a/docs/Flash_Deploy_Guide.md +++ b/docs/Flash_Deploy_Guide.md @@ -1102,26 +1102,32 @@ On mothership boot: ## Testing & Debugging -### flash test-mothership +### flash build --preview -Local testing of mothership provisioning without deploying to RunPod. +Local testing of your distributed system without deploying to RunPod. ```bash -flash test-mothership +flash build --preview ``` **What it does**: -1. Loads flash_manifest.json from current directory -2. Creates temporary resource configs (prefixed with `tmp-`) -3. Simulates mothership provisioning locally -4. Displays resource creation output -5. Auto-cleanup on exit +1. Builds your project (creates archive, manifest) +2. Creates a Docker network for inter-container communication +3. Starts one Docker container per resource config: + - Mothership container (orchestrator) + - All worker containers (GPU, CPU, etc.) +4. Exposes mothership on `localhost:8000` +5. All containers communicate via Docker DNS +6. Auto-cleanup on exit (Ctrl+C) **Use Cases**: - Validate manifest structure before deployment - Test resource provisioning logic +- Debug distributed function calls +- Test endpoint auto-discovery +- Verify container networking -**Code Reference**: `src/tetra_rp/cli/commands/test_mothership.py` +**Code Reference**: `src/tetra_rp/cli/commands/preview.py` ### Local Docker Testing diff --git a/scripts/verify-manifest-constants.sh b/scripts/verify-manifest-constants.sh index 6d9d2aad..fd180f6a 100755 --- a/scripts/verify-manifest-constants.sh +++ b/scripts/verify-manifest-constants.sh @@ -27,7 +27,8 @@ BLUE='\033[0;34m' NC='\033[0m' REPO_ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." 
&& pwd)" -EXAMPLES_DIR="/Users/deanquinanola/Github/python/flash-examples/01_getting_started/01_hello_world" +: "${FLASH_EXAMPLES_DIR:="$REPO_ROOT/../flash-examples/01_getting_started/01_hello_world"}" +EXAMPLES_DIR="$FLASH_EXAMPLES_DIR" TEST_RESULTS=() echo -e "${BLUE}============================================================${NC}" diff --git a/src/tetra_rp/cli/commands/build.py b/src/tetra_rp/cli/commands/build.py index f1619b2b..5355674e 100644 --- a/src/tetra_rp/cli/commands/build.py +++ b/src/tetra_rp/cli/commands/build.py @@ -203,6 +203,11 @@ def build_command( "--use-local-tetra", help="Bundle local tetra_rp source instead of PyPI version (for development/testing)", ), + preview: bool = typer.Option( + False, + "--preview", + help="Launch local test environment after successful build", + ), ): """ Build Flash application for deployment. @@ -214,6 +219,7 @@ def build_command( flash build # Build with all dependencies flash build --no-deps # Skip transitive dependencies flash build --keep-build # Keep temporary build directory + flash build --preview # Build and launch local test environment flash build -o my-app.tar.gz # Custom archive name flash build --exclude torch,torchvision # Exclude large packages (assume in base image) """ @@ -226,6 +232,11 @@ def build_command( console.print("Run [bold]flash init[/bold] to create a Flash project") raise typer.Exit(1) + # Auto-enable keep_build if preview requested (preview needs build directory) + if preview: + keep_build = True + logger.debug("Preview mode: automatically enabling keep_build") + # Create build directory first to ensure clean state before collecting files build_dir = create_build_directory(project_dir, app_name) @@ -478,6 +489,30 @@ def build_command( # Success summary _display_build_summary(archive_path, app_name, len(files), len(requirements)) + # Launch preview environment if requested + if preview: + console.print( + "\n[bold cyan]Launching multi-container preview...[/bold cyan]" + ) + 
console.print("[dim]Starting all endpoints locally in Docker...[/dim]\n") + + try: + from .preview import launch_preview + + # Manifest is in .flash/flash_manifest.json + manifest_path = project_dir / ".flash" / "flash_manifest.json" + + launch_preview( + build_dir=build_dir, + manifest_path=manifest_path, + ) + except KeyboardInterrupt: + console.print("\n[yellow]Preview stopped by user[/yellow]") + except Exception as e: + console.print(f"[red]Preview error:[/red] {e}") + logger.exception("Preview launch failed") + raise typer.Exit(1) + except KeyboardInterrupt: console.print("\n[yellow]Build cancelled by user[/yellow]") raise typer.Exit(1) diff --git a/src/tetra_rp/cli/commands/preview.py b/src/tetra_rp/cli/commands/preview.py new file mode 100644 index 00000000..242f9c19 --- /dev/null +++ b/src/tetra_rp/cli/commands/preview.py @@ -0,0 +1,469 @@ +"""Flash preview command - Launch local distributed system test environment.""" + +import json +import logging +import shutil +import subprocess +import time +from dataclasses import dataclass +from pathlib import Path + +import typer +from rich.console import Console +from rich.table import Table + +from tetra_rp.core.resources.constants import TETRA_CPU_LB_IMAGE + +logger = logging.getLogger(__name__) +console = Console() + +# Container archive mount path - expected location where containers unpack the archive +CONTAINER_ARCHIVE_PATH = "/root/.runpod/archive.tar.gz" + + +@dataclass +class ContainerInfo: + """Information about a running preview container.""" + + id: str # Docker container ID + name: str # Resource name (e.g., "mothership", "gpu_config") + port: int # Local port + is_mothership: bool + url: str # Connection URL + + +def launch_preview( + build_dir: Path, + manifest_path: Path, +) -> None: + """Launch full distributed system preview locally. + + Creates one Docker container per resource config: + - Mothership (orchestrator) + - All child endpoints (gpu, cpu, etc.) 
+ + All containers connected via Docker network with inter-container + communication via Docker DNS. + + Args: + build_dir: Path to .flash/.build directory + manifest_path: Path to flash_manifest.json + + Raises: + typer.Exit: On errors (Docker issues, container startup failures) + """ + try: + # Verify prerequisites + _verify_docker_prerequisites() + + # Load and parse manifest + manifest = _load_manifest(manifest_path) + resources = _parse_resources_from_manifest(manifest) + + if not resources: + console.print("[red]Error:[/red] No resources found in manifest") + raise typer.Exit(1) + + # Create Docker network + network_name = _create_docker_network() + console.print(f"[dim]Docker network: {network_name}[/dim]\n") + + # Start containers for each resource + containers = [] + try: + for resource_name, resource_config in resources.items(): + container = _start_resource_container( + resource_name=resource_name, + resource_config=resource_config, + build_dir=build_dir, + network=network_name, + ) + containers.append(container) + except Exception as e: + # Cleanup on partial failure + console.print(f"[red]Error starting containers:[/red] {e}") + _cleanup_preview(containers, network_name) + raise typer.Exit(1) + + # Display connection info + _display_preview_info(containers) + + # Wait for user interrupt + try: + _wait_for_shutdown() + except KeyboardInterrupt: + console.print("\n[yellow]Shutting down preview...[/yellow]") + finally: + # Cleanup + _cleanup_preview(containers, network_name) + console.print("[green]✓ Preview stopped[/green]") + + except typer.Exit: + raise + except Exception as e: + console.print(f"[red]Preview error:[/red] {e}") + logger.exception("Preview launch failed") + raise typer.Exit(1) + + +def _verify_docker_prerequisites() -> None: + """Verify Docker and Docker daemon are available.""" + # Check Docker command exists + if not shutil.which("docker"): + console.print("[red]Error:[/red] Docker is not installed or not in PATH") + console.print( + 
"Install Docker from: https://www.docker.com/products/docker-desktop" + ) + raise typer.Exit(1) + + # Check Docker daemon is running + try: + subprocess.run( + ["docker", "ps"], + capture_output=True, + check=True, + timeout=5, + ) + except ( + subprocess.CalledProcessError, + subprocess.TimeoutExpired, + FileNotFoundError, + ): + console.print("[red]Error:[/red] Docker daemon is not running") + console.print("Start Docker and try again") + raise typer.Exit(1) + + +def _load_manifest(manifest_path: Path) -> dict: + """Load and parse manifest JSON file. + + Args: + manifest_path: Path to flash_manifest.json + + Returns: + Parsed manifest dictionary + + Raises: + typer.Exit: If manifest not found or invalid + """ + if not manifest_path.exists(): + console.print(f"[red]Error:[/red] Manifest not found at {manifest_path}") + raise typer.Exit(1) + + try: + with open(manifest_path) as f: + return json.load(f) + except json.JSONDecodeError as e: + console.print(f"[red]Error:[/red] Invalid manifest JSON: {e}") + raise typer.Exit(1) + + +def _parse_resources_from_manifest(manifest: dict) -> dict: + """Parse resource configs from manifest. 
+ + Args: + manifest: Parsed manifest dictionary + + Returns: + Dictionary of resource_name -> resource_config + """ + resources = {} + + # Parse resources from manifest + manifest_resources = manifest.get("resources", {}) + for resource_name, resource_data in manifest_resources.items(): + resources[resource_name] = { + "is_mothership": resource_data.get("is_mothership", False), + "imageName": resource_data.get("imageName", TETRA_CPU_LB_IMAGE), + "functions": resource_data.get("functions", []), + } + + # Fallback: If no mothership found in manifest, create default + has_mothership = any(r.get("is_mothership") for r in resources.values()) + if not has_mothership: + resources["mothership"] = { + "is_mothership": True, + "imageName": TETRA_CPU_LB_IMAGE, + } + + return resources + + +def _create_docker_network() -> str: + """Create Docker network for preview containers. + + Returns: + Docker network name + + Raises: + typer.Exit: If network creation fails + """ + network_name = f"flash-preview-{int(time.time())}" + + try: + subprocess.run( + ["docker", "network", "create", network_name], + capture_output=True, + check=True, + ) + return network_name + except subprocess.CalledProcessError as e: + console.print("[red]Error:[/red] Failed to create Docker network") + error_detail = e.stderr.decode() if e.stderr else str(e) + console.print(f"[dim]{error_detail}[/dim]") + raise typer.Exit(1) + + +def _start_resource_container( + resource_name: str, + resource_config: dict, + build_dir: Path, + network: str, +) -> ContainerInfo: + """Start a single resource container. 
+ + Args: + resource_name: Name of resource (e.g., "gpu_config") + resource_config: Resource configuration dictionary + build_dir: Path to .flash/.build directory + network: Docker network name + + Returns: + ContainerInfo with container details + + Raises: + Exception: If container startup fails + """ + # Determine Docker image + image = resource_config.get("imageName", TETRA_CPU_LB_IMAGE) + is_mothership = resource_config.get("is_mothership", False) + + # Assign port + port = _assign_container_port(resource_name, is_mothership) + + # Container name for Docker network DNS + container_name = f"flash-preview-{resource_name}" + + # Verify archive exists + archive_path = build_dir.parent / "archive.tar.gz" + if not archive_path.exists(): + raise FileNotFoundError( + f"Archive not found at {archive_path}. Run 'flash build' first." + ) + + # Build Docker command + docker_cmd = [ + "docker", + "run", + "-d", + "--name", + container_name, + "--network", + network, + "-v", + f"{archive_path}:{CONTAINER_ARCHIVE_PATH}:ro", + "-v", + f"{build_dir}:/workspace", + "-e", + f"FLASH_RESOURCE_NAME={resource_name}", + "-e", + f"RUNPOD_ENDPOINT_ID=preview-{resource_name}", + "-p", + f"{port}:80", + ] + + if is_mothership: + docker_cmd.extend(["-e", "FLASH_IS_MOTHERSHIP=true"]) + + # Use image's default CMD (uvicorn lb_handler:app) + docker_cmd.append(image) + + # Execute + try: + result = subprocess.run( + docker_cmd, + capture_output=True, + text=True, + check=True, + timeout=30, + ) + container_id = result.stdout.strip() + + logger.info(f"Started container {resource_name}: {container_id}") + + # Verify container is actually running (not crashed immediately) + _verify_container_health(container_id, resource_name) + + return ContainerInfo( + id=container_id, + name=resource_name, + port=port, + is_mothership=is_mothership, + url=f"http://localhost:{port}", + ) + + except subprocess.CalledProcessError as e: + error_msg = e.stderr if e.stderr else str(e) + 
console.print(f"[red]Error:[/red] Failed to start {resource_name} container")
+        console.print(f"[dim]{error_msg}[/dim]")
+        raise
+
+
+def _verify_container_health(container_id: str, resource_name: str) -> None:
+    """Verify container is running and didn't crash immediately.
+
+    Args:
+        container_id: Docker container ID
+        resource_name: Name of resource (for error messages)
+
+    Raises:
+        Exception: If container is not running or crashed
+    """
+    # Wait briefly for container to start and unpack archive
+    time.sleep(2)
+
+    # Check container status
+    check_result = subprocess.run(
+        ["docker", "inspect", "--format", "{{.State.Status}}", container_id],
+        capture_output=True,
+        text=True,
+    )
+
+    status = check_result.stdout.strip()
+    if status != "running":
+        # Get logs for debugging
+        logs_result = subprocess.run(
+            ["docker", "logs", container_id],
+            capture_output=True,
+            text=True,
+        )
+        error_msg = f"Container {resource_name} failed to start (status: {status})"
+        if logs_result.stderr:
+            error_msg += f"\n{logs_result.stderr[:500]}"
+        if logs_result.stdout:
+            error_msg += f"\n{logs_result.stdout[:500]}"
+        raise Exception(error_msg)
+
+
+def _assign_container_port(resource_name: str, is_mothership: bool) -> int:
+    """Assign a local port for the container.
+
+    Mothership uses 8000, workers use 8001+
+
+    Args:
+        resource_name: Name of resource
+        is_mothership: Whether this is mothership
+
+    Returns:
+        Port number to use
+    """
+    if is_mothership:
+        return 8000
+
+    # For workers, assign incrementally: 8001, 8002, etc.
+    # Built-in resources have fixed ports in the map; unknown resources get a port
+    # derived from Python's built-in str hash. NOTE: str hashing is randomized per
+    # process (PYTHONHASHSEED), so the port may differ between runs and may collide
+    # with other assigned ports. This simple strategy suffices for local preview.
+ port_map = { + "gpu_config": 8001, + "cpu_config": 8002, + } + + return port_map.get(resource_name, 8001 + (hash(resource_name) % 99)) + + +def _display_preview_info(containers: list[ContainerInfo]) -> None: + """Display information about running containers. + + Args: + containers: List of ContainerInfo objects + """ + table = Table(title="Preview Environment Running", show_header=True) + table.add_column("Resource", style="cyan") + table.add_column("Port", style="magenta") + table.add_column("URL", style="green") + table.add_column("Type", style="blue") + + # Sort: mothership first, then others + sorted_containers = sorted(containers, key=lambda c: (not c.is_mothership, c.name)) + + for container in sorted_containers: + container_type = "Mothership" if container.is_mothership else "Worker" + table.add_row( + container.name, str(container.port), container.url, container_type + ) + + console.print() + console.print(table) + console.print() + + # Display usage instructions + console.print("[bold]Access your application:[/bold]") + mothership = next((c for c in containers if c.is_mothership), None) + if mothership: + console.print(f" [dim]Main: {mothership.url}[/dim]") + console.print(f" [dim]Health: curl {mothership.url}/ping[/dim]") + + console.print() + console.print("[bold]Container communication:[/bold]") + console.print( + " [dim]Containers communicate via Docker DNS on internal port 80[/dim]" + ) + console.print(" [dim]Example: http://flash-preview-gpu_config:80[/dim]") + + console.print() + console.print("[bold][yellow]Press Ctrl+C to stop and cleanup[/yellow][/bold]") + console.print() + + +def _wait_for_shutdown() -> None: + """Block until user requests shutdown (Ctrl+C).""" + try: + while True: + time.sleep(0.1) + except KeyboardInterrupt: + raise + + +def _cleanup_preview(containers: list[ContainerInfo], network: str) -> None: + """Stop all containers and remove Docker network. 
+ + Args: + containers: List of ContainerInfo objects + network: Docker network name + """ + # Stop containers + for container in containers: + try: + subprocess.run( + ["docker", "stop", container.id], + capture_output=True, + timeout=10, + ) + logger.info(f"Stopped container {container.name}") + except Exception as e: + logger.warning(f"Failed to stop container {container.name}: {e}") + + # Remove containers + for container in containers: + try: + subprocess.run( + ["docker", "rm", container.id], + capture_output=True, + timeout=10, + ) + logger.info(f"Removed container {container.name}") + except Exception as e: + logger.warning(f"Failed to remove container {container.name}: {e}") + + # Remove network + try: + subprocess.run( + ["docker", "network", "rm", network], + capture_output=True, + timeout=10, + ) + logger.info(f"Removed Docker network {network}") + except Exception as e: + logger.warning(f"Failed to remove Docker network: {e}") diff --git a/src/tetra_rp/cli/commands/test_mothership.py b/src/tetra_rp/cli/commands/test_mothership.py deleted file mode 100644 index 620c4415..00000000 --- a/src/tetra_rp/cli/commands/test_mothership.py +++ /dev/null @@ -1,460 +0,0 @@ -"""Flash test-mothership command - Test mothership boot locally with Docker.""" - -import logging -import shutil -import subprocess -import sys -import time -from pathlib import Path -from typing import Optional - -import typer -from rich.console import Console -from rich.panel import Panel - -from tetra_rp.core.resources.constants import TETRA_CPU_LB_IMAGE - -logger = logging.getLogger(__name__) -console = Console() - - -def _clear_resource_cache() -> None: - """Clear ResourceManager cache for clean test environment. - - Test-mothership deploys temporary endpoints that should not persist - between test runs. 
Clearing the cache prevents: - - Stale resources from previous tests being redeployed - - Name conflicts between old and new test resources - - Confusion from endpoints that no longer exist in the codebase - """ - cache_file = Path.home() / ".runpod" / "resources.pkl" - if cache_file.exists(): - try: - cache_file.unlink() - console.print( - "[dim]Cleared resource cache for clean test environment[/dim]" - ) - logger.debug(f"Removed cache file: {cache_file}") - except Exception as e: - console.print(f"[yellow]Warning: Could not clear cache: {e}[/yellow]") - logger.warning(f"Failed to remove cache file {cache_file}: {e}") - - -def test_mothership_command( - image: str = typer.Option( - TETRA_CPU_LB_IMAGE, - "--image", - help="Docker image to use for testing", - ), - port: int = typer.Option(8000, "--port", help="Local port to expose"), - endpoint_id: Optional[str] = typer.Option( - None, "--endpoint-id", help="RunPod endpoint ID (auto-generated if omitted)" - ), - build_dir: str = typer.Option( - ".flash/.build", "--build-dir", help="Path to build directory" - ), - no_build: bool = typer.Option( - False, "--no-build", help="Skip running flash build first" - ), -): - """ - Test mothership boot locally with Docker. - - Runs the application in a Docker container with mothership provisioning enabled. - This simulates the mothership deployment process, including auto-provisioning of - child resources to RunPod. On shutdown (Ctrl+C or docker stop), automatically - cleans up all deployed endpoints. 
- - Examples: - flash test-mothership # Default setup - flash test-mothership --port 9000 # Custom port - flash test-mothership --image custom:latest # Custom Docker image - flash test-mothership --no-build # Skip flash build step - """ - try: - # Verify prerequisites - _verify_prerequisites() - - # Clear resource cache to prevent stale entries in test mode - _clear_resource_cache() - - # Build if needed - if not no_build: - _run_flash_build() - - # Generate endpoint ID if not provided - if not endpoint_id: - endpoint_id = f"test-mothership-{int(time.time())}" - - # Create entrypoint script for cleanup on shutdown - _create_entrypoint_script(build_dir) - - # Display configuration - _display_test_objectives() - _display_config(build_dir, image, port, endpoint_id) - - # Build Docker command - docker_cmd = _build_docker_command(image, port, endpoint_id, build_dir) - - # Run Docker container - _run_docker_container(docker_cmd, port) - - except typer.Exit: - raise - except Exception as e: - console.print(f"[red]Error:[/red] {e}") - logger.exception("Unexpected error in test_mothership_command") - raise typer.Exit(1) - - -def _verify_prerequisites() -> None: - """Verify that Docker and RUNPOD_API_KEY are available.""" - # Check Docker - result = shutil.which("docker") - if not result: - console.print("[red]Error:[/red] Docker is not installed or not in PATH") - console.print( - "Install Docker from: https://www.docker.com/products/docker-desktop" - ) - raise typer.Exit(1) - - # Check Docker daemon - try: - subprocess.run( - ["docker", "ps"], - capture_output=True, - check=True, - timeout=5, - ) - except ( - subprocess.CalledProcessError, - subprocess.TimeoutExpired, - FileNotFoundError, - ): - console.print("[red]Error:[/red] Docker daemon is not running") - console.print("Start Docker and try again") - raise typer.Exit(1) - - # Check RUNPOD_API_KEY - import os - - if not os.getenv("RUNPOD_API_KEY"): - console.print("[red]Error:[/red] RUNPOD_API_KEY environment variable 
not set") - console.print("Set it with: export RUNPOD_API_KEY=your-api-key") - raise typer.Exit(1) - - -def _run_flash_build() -> None: - """Run flash build command.""" - console.print("[cyan]Running flash build...[/cyan]") - result = subprocess.run( - ["flash", "build", "--keep-build", "--use-local-tetra"], - capture_output=False, - ) - if result.returncode != 0: - console.print("[red]Error:[/red] flash build failed") - raise typer.Exit(1) - - -def _get_manifest_provisioning_code() -> str: - """Generate Python code to provision resources from flash_manifest.json. - - Uses the manifest as a guide to discover which modules contain resource configs. - Imports the actual resource configs from source (endpoint files) to get full - configuration (workers, GPUs, etc.). This ensures test-mothership provisions - exactly what was built, without discovering skeleton templates. - - Returns: - Python code as a string to be executed - """ - return """ -import asyncio -import importlib -import json -import logging -import os -import sys -from pathlib import Path -from tetra_rp.core.deployment import DeploymentOrchestrator - -logger = logging.getLogger(__name__) - -# Configure logging to match the rest of the system -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s | %(levelname)-5s | %(message)s', - datefmt='%Y-%m-%d %H:%M:%S' -) - -async def provision_from_manifest(): - manifest_path = Path("flash_manifest.json") - if not manifest_path.exists(): - print("[dim]No flash_manifest.json found, skipping manifest-based provisioning[/dim]") - return - - try: - with open(manifest_path) as f: - manifest = json.load(f) - except Exception as e: - logger.error(f"Error loading manifest: {e}") - return - - # Set test-mothership mode for resource naming - os.environ["FLASH_IS_TEST_MOTHERSHIP"] = "true" - - resources = [] - for resource_name, resource_data in manifest.get("resources", {}).items(): - try: - # Get list of modules that contain this resource's functions - functions 
= resource_data.get("functions", []) - if not functions: - logger.warning(f"No functions found for resource {resource_name}") - continue - - # Import the first function's module to get access to the config - first_func = functions[0] - module_name = first_func.get("module") - if not module_name: - logger.warning(f"No module found for resource {resource_name}") - continue - - # Import the module and look for resource config variable - try: - module = importlib.import_module(module_name) - - config = None - - # Try config_variable from manifest first (most reliable) - config_variable = resource_data.get("config_variable") - if config_variable and hasattr(module, config_variable): - config = getattr(module, config_variable) - logger.info(f"Loaded resource config from {module_name}: {config.name} (variable: {config_variable})") - else: - # Fallback to old search logic for backward compatibility - config_names = [ - "gpu_config", "cpu_config", - "resource_config", "config", - f"{resource_name.lower()}_config", - ] - - for config_name in config_names: - if hasattr(module, config_name): - config = getattr(module, config_name) - break - - if config: - logger.info(f"Loaded resource config from {module_name}: {config.name}") - else: - logger.warning(f"No config variable found in {module_name} for {resource_name}") - - if config: - # Apply test-mothership naming convention - if not resource_name.startswith("tmp-"): - config.name = f"tmp-{resource_name}" - else: - config.name = resource_name - - resources.append(config) - - except Exception as e: - logger.warning(f"Failed to import resource config from {module_name}: {e}") - - except Exception as e: - logger.error(f"Failed to process resource {resource_name}: {e}") - - if resources: - try: - logger.info(f"Provisioning {len(resources)} resource(s)...") - orchestrator = DeploymentOrchestrator() - await orchestrator.deploy_all(resources, show_progress=True) - except Exception as e: - logger.warning(f"Provisioning error: {e}") - 
else: - logger.warning("No resources loaded from manifest") - -asyncio.run(provision_from_manifest()) -""" - - -def _create_entrypoint_script(build_dir: str) -> None: - """Create entrypoint.sh script for Docker container. - - This script handles signal trapping and cleanup on shutdown. - It runs manifest-based provisioning then flash run (without --auto-provision - to avoid duplicate discovery from bundled dependencies). - """ - build_path = Path(build_dir) - - # Ensure build directory exists - if not build_path.exists(): - console.print( - f"[yellow]Warning:[/yellow] Build directory {build_dir} does not exist" - ) - return - - script_path = build_path / "entrypoint.sh" - provisioning_script_path = build_path / "provision_from_manifest.py" - - # Write provisioning script to file - provisioning_code = _get_manifest_provisioning_code() - provisioning_script_path.write_text(provisioning_code) - - script_content = """#!/bin/bash -set -e - -# Ensure bundled dependencies are available to Python -# /workspace contains all the pip-installed packages (.so files, pure Python modules, etc) -export PYTHONPATH="/workspace:${PYTHONPATH}" - -# Signal test-mothership provisioning context for resource naming -export FLASH_IS_TEST_MOTHERSHIP="true" - -cleanup() { - echo "" - echo "==========================================" - echo "Shutting down test-mothership..." - echo "Cleaning up all temporary endpoints..." 
- echo "==========================================" - python -m tetra_rp.cli.main undeploy --all --force || true - echo "Cleanup complete" - exit 0 -} - -trap cleanup SIGTERM SIGINT - -echo "==========================================" -echo "Starting mothership test environment" -echo "Phase 1: Mothership container startup" -echo "==========================================" - -# Provision resources from manifest before starting server -# This uses the same method as production mothership, avoiding -# false discovery from bundled skeleton templates -python3 provision_from_manifest.py - -# Start server without --auto-provision to avoid re-discovering resources -python -m tetra_rp.cli.main run --host 0.0.0.0 --port 8000 & -PID=$! - -wait $PID -""" - - script_path.write_text(script_content) - script_path.chmod(0o755) - - -def _display_test_objectives() -> None: - """Display what test-mothership tests and important warnings.""" - objectives_text = """[bold cyan]What this tests:[/bold cyan] -• Mothership container deployment -• Child endpoint auto-provisioning via State Manager -• Manifest persistence and State Manager integration - -[bold yellow]⚠ Important:[/bold yellow] -• Uses peer-to-peer architecture (no hub-and-spoke) -• All endpoints query State Manager directly -• Child endpoints are [bold]temporary[/bold] - prefixed with 'tmp-' -• All child endpoints will be [bold]automatically cleaned up[/bold] on shutdown - -[dim]These are test deployments only. 
Use 'flash deploy' for production.[/dim]""" - - console.print( - Panel( - objectives_text, - title="Test-Mothership Overview", - border_style="cyan", - ) - ) - console.print() - - -def _display_config(build_dir: str, image: str, port: int, endpoint_id: str) -> None: - """Display test configuration.""" - config_text = f"""[bold]Build directory:[/bold] {build_dir} -[bold]Command:[/bold] flash run -[bold]Docker image:[/bold] {image} -[bold]Endpoint ID:[/bold] {endpoint_id} -[bold]Port:[/bold] http://localhost:{port}""" - - console.print(Panel(config_text, title="🚀 Starting mothership test container")) - - -def _build_docker_command( - image: str, port: int, endpoint_id: str, build_dir: str -) -> list: - """Build the docker run command.""" - import os - - build_path = Path(build_dir).resolve() - - cmd = [ - "docker", - "run", - "--platform", - "linux/amd64", - "--rm", - ] - - # Add interactive flags only if running in a TTY environment - if sys.stdin.isatty() and sys.stdout.isatty(): - cmd.extend(["-it"]) - - cmd.extend( - [ - "-e", - "FLASH_IS_MOTHERSHIP=true", - "-e", - "FLASH_IS_TEST_MOTHERSHIP=true", - "-e", - f"RUNPOD_ENDPOINT_ID={endpoint_id}", - "-e", - f"RUNPOD_API_KEY={os.getenv('RUNPOD_API_KEY')}", - "-e", - "FLASH_MANIFEST_PATH=/workspace/flash_manifest.json", - "-v", - f"{build_path}:/workspace", - "-p", - f"{port}:8000", - "--workdir", - "/workspace", - image, - "/workspace/entrypoint.sh", - ] - ) - - return cmd - - -def _run_docker_container(docker_cmd: list, port: int) -> None: - """Run the Docker container with helpful output.""" - console.print("[cyan]✅ Container started successfully[/cyan]\n") - console.print(f"[dim]Local: http://localhost:{port}[/dim]\n") - console.print("[dim]Verification commands:[/dim]") - console.print(f"[dim] Health: curl http://localhost:{port}/ping[/dim]") - console.print( - "[dim] State Manager Query: All endpoints query State Manager directly[/dim]" - ) - console.print("[dim] No /manifest endpoint - peer-to-peer 
architecture[/dim]\n") - console.print("[bold]Test phases:[/bold]") - console.print(" [dim]1. Mothership startup and health check[/dim]") - console.print( - " [dim]2. Auto-provisioning child endpoints (prefixed with 'tmp-')[/dim]" - ) - console.print(" [dim]3. Manifest update with child endpoint URLs[/dim]") - console.print() - console.print("[dim]Watch container logs below for provisioning progress...[/dim]") - console.print("[dim]Press Ctrl+C to stop and cleanup all endpoints.\n[/dim]") - - try: - result = subprocess.run(docker_cmd, check=False, capture_output=False) - if result.returncode != 0: - console.print( - "\n[yellow]Container exited with an error.[/yellow] " - "Check the logs above for details. Common issues: missing RUNPOD_API_KEY, " - "port already in use, or Docker daemon not running." - ) - except KeyboardInterrupt: - console.print("\n[yellow]Container stopped[/yellow]") - except Exception as e: - console.print(f"[red]Error running container:[/red] {e}") - raise typer.Exit(1) diff --git a/src/tetra_rp/cli/docs/README.md b/src/tetra_rp/cli/docs/README.md index f75e81f4..44a09278 100644 --- a/src/tetra_rp/cli/docs/README.md +++ b/src/tetra_rp/cli/docs/README.md @@ -63,12 +63,14 @@ flash build [OPTIONS] - `--keep-build`: Keep `.flash/.build` directory after creating archive - `--output, -o`: Custom archive name (default: archive.tar.gz) - `--exclude`: Comma-separated packages to exclude (e.g., 'torch,torchvision') +- `--preview`: Launch local test environment after build **Example:** ```bash flash build +flash build --preview # Build and test locally flash build --keep-build --output deploy.tar.gz -flash build --exclude torch,torchvision,torchaudio # Exclude large packages +flash build --exclude torch,torchvision,torchaudio # Exclude large packages ``` [Full documentation](./flash-build.md) diff --git a/src/tetra_rp/cli/docs/flash-build.md b/src/tetra_rp/cli/docs/flash-build.md index c01c80fa..801e2001 100644 --- a/src/tetra_rp/cli/docs/flash-build.md 
+++ b/src/tetra_rp/cli/docs/flash-build.md @@ -13,6 +13,8 @@ flash build [OPTIONS] - `--no-deps`: Skip transitive dependencies during pip install (default: false) - `--keep-build`: Keep `.flash/.build` directory after creating archive (default: false) - `--output, -o`: Custom archive name (default: archive.tar.gz) +- `--exclude`: Comma-separated packages to exclude (e.g., 'torch,torchvision') +- `--preview`: Launch local test environment after successful build (auto-enables `--keep-build`) ## Examples @@ -26,6 +28,9 @@ flash build --no-deps # Keep temporary build directory for inspection flash build --keep-build +# Build and launch local test environment +flash build --preview + # Custom output filename flash build --output my-app.tar.gz @@ -87,6 +92,33 @@ Only installs direct dependencies specified in `@remote` decorators: - Smaller deployment packages - Useful when base image already includes dependencies +## Preview Environment + +```bash +flash build --preview +``` + +Launch a local Docker-based test environment immediately after building. This allows you to test your distributed system locally before deploying to RunPod. + +**What happens:** +1. Builds your project (creates archive, manifest) +2. Creates a Docker network for inter-container communication +3. Starts one Docker container per resource config: + - Mothership container (orchestrator) + - All worker containers (GPU, CPU, etc.) +4. Exposes the mothership on `localhost:8000` +5. All containers communicate via Docker DNS +6. On shutdown (Ctrl+C), automatically stops and removes all containers + +**When to use:** +- Test deployment before production +- Validate manifest structure +- Debug resource provisioning +- Verify endpoint auto-discovery +- Test distributed function calls + +**Note:** `--preview` automatically enables `--keep-build` since the preview needs the build directory. 
+ ## Keep Build Directory ```bash diff --git a/src/tetra_rp/cli/main.py b/src/tetra_rp/cli/main.py index 6ec076ec..f1735aba 100644 --- a/src/tetra_rp/cli/main.py +++ b/src/tetra_rp/cli/main.py @@ -9,7 +9,6 @@ init, run, build, - test_mothership, # resource, deploy, apps, @@ -39,7 +38,6 @@ def get_version() -> str: app.command("init")(init.init_command) app.command("run")(run.run_command) app.command("build")(build.build_command) -app.command("test-mothership")(test_mothership.test_mothership_command) # app.command("report")(resource.report_command) diff --git a/src/tetra_rp/core/resources/constants.py b/src/tetra_rp/core/resources/constants.py index 9dbbed30..5c0c5b33 100644 --- a/src/tetra_rp/core/resources/constants.py +++ b/src/tetra_rp/core/resources/constants.py @@ -18,19 +18,18 @@ def _endpoint_domain_from_base_url(base_url: str) -> str: ENDPOINT_DOMAIN = _endpoint_domain_from_base_url(runpod.endpoint_url_base) + # Docker image configuration TETRA_IMAGE_TAG = os.environ.get("TETRA_IMAGE_TAG", "latest") -TETRA_GPU_IMAGE = os.environ.get( - "TETRA_GPU_IMAGE", f"runpod/tetra-rp:{TETRA_IMAGE_TAG}" -) +_RESOLVED_TAG = TETRA_IMAGE_TAG + +TETRA_GPU_IMAGE = os.environ.get("TETRA_GPU_IMAGE", f"runpod/tetra-rp:{_RESOLVED_TAG}") TETRA_CPU_IMAGE = os.environ.get( - "TETRA_CPU_IMAGE", f"runpod/tetra-rp-cpu:{TETRA_IMAGE_TAG}" -) -TETRA_LB_IMAGE = os.environ.get( - "TETRA_LB_IMAGE", f"runpod/tetra-rp-lb:{TETRA_IMAGE_TAG}" + "TETRA_CPU_IMAGE", f"runpod/tetra-rp-cpu:{_RESOLVED_TAG}" ) +TETRA_LB_IMAGE = os.environ.get("TETRA_LB_IMAGE", f"runpod/tetra-rp-lb:{_RESOLVED_TAG}") TETRA_CPU_LB_IMAGE = os.environ.get( - "TETRA_CPU_LB_IMAGE", f"runpod/tetra-rp-lb-cpu:{TETRA_IMAGE_TAG}" + "TETRA_CPU_LB_IMAGE", f"runpod/tetra-rp-lb-cpu:{_RESOLVED_TAG}" ) # Worker configuration defaults diff --git a/tests/unit/cli/commands/test_preview.py b/tests/unit/cli/commands/test_preview.py new file mode 100644 index 00000000..524fea27 --- /dev/null +++ b/tests/unit/cli/commands/test_preview.py @@ 
-0,0 +1,418 @@ +"""Unit tests for flash preview command.""" + +import json + +import pytest + +from unittest.mock import MagicMock, patch + +from tetra_rp.cli.commands.preview import ( + CONTAINER_ARCHIVE_PATH, + ContainerInfo, + _assign_container_port, + _display_preview_info, + _load_manifest, + _parse_resources_from_manifest, + _verify_container_health, +) + + +class TestContainerInfo: + """Tests for ContainerInfo dataclass.""" + + def test_container_info_creation(self): + """Test creating a ContainerInfo instance.""" + info = ContainerInfo( + id="abc123", + name="gpu_config", + port=8001, + is_mothership=False, + url="http://localhost:8001", + ) + + assert info.id == "abc123" + assert info.name == "gpu_config" + assert info.port == 8001 + assert info.is_mothership is False + assert info.url == "http://localhost:8001" + + def test_mothership_container_info(self): + """Test creating mothership container info.""" + info = ContainerInfo( + id="def456", + name="mothership", + port=8000, + is_mothership=True, + url="http://localhost:8000", + ) + + assert info.is_mothership is True + assert info.port == 8000 + + +class TestAssignContainerPort: + """Tests for _assign_container_port function.""" + + def test_mothership_gets_port_8000(self): + """Test that mothership always gets port 8000.""" + assert _assign_container_port("mothership", True) == 8000 + + def test_gpu_config_gets_port_8001(self): + """Test that gpu_config gets port 8001.""" + port = _assign_container_port("gpu_config", False) + assert port == 8001 + + def test_cpu_config_gets_port_8002(self): + """Test that cpu_config gets port 8002.""" + port = _assign_container_port("cpu_config", False) + assert port == 8002 + + def test_unknown_resource_gets_deterministic_port(self): + """Test that unknown resources get deterministic ports.""" + port1 = _assign_container_port("custom_worker", False) + port2 = _assign_container_port("custom_worker", False) + # Same resource name should get same port + assert port1 == 
port2 + # Port should be in valid range + assert 8000 < port1 < 8100 + + +class TestLoadManifest: + """Tests for _load_manifest function.""" + + def test_load_valid_manifest(self, tmp_path): + """Test loading a valid manifest file.""" + manifest = {"resources": {"gpu_config": {"functions": []}}} + manifest_path = tmp_path / "manifest.json" + manifest_path.write_text(json.dumps(manifest)) + + loaded = _load_manifest(manifest_path) + assert loaded == manifest + + def test_manifest_not_found(self, tmp_path): + """Test error when manifest file doesn't exist.""" + import typer + + manifest_path = tmp_path / "nonexistent.json" + + with pytest.raises(typer.Exit): + _load_manifest(manifest_path) + + def test_invalid_json(self, tmp_path): + """Test error when manifest contains invalid JSON.""" + import typer + + manifest_path = tmp_path / "manifest.json" + manifest_path.write_text("{ invalid json") + + with pytest.raises(typer.Exit): + _load_manifest(manifest_path) + + +class TestParseResourcesFromManifest: + """Tests for _parse_resources_from_manifest function.""" + + def test_parse_empty_manifest(self): + """Test parsing manifest with no resources creates default mothership.""" + manifest = {"resources": {}} + resources = _parse_resources_from_manifest(manifest) + + # Should create default mothership when none specified + assert "mothership" in resources + assert resources["mothership"]["is_mothership"] is True + + def test_parse_manifest_with_gpu_config_creates_default_mothership(self): + """Test parsing manifest with GPU resource but no mothership.""" + manifest = { + "resources": { + "gpu_config": { + "imageName": "custom-gpu:latest", + "is_mothership": False, + "functions": [{"name": "gpu_fn", "module": "workers.gpu.endpoint"}], + } + } + } + + resources = _parse_resources_from_manifest(manifest) + + # Should include default mothership since none specified + assert "mothership" in resources + assert resources["mothership"]["is_mothership"] is True + + # Should include 
gpu_config + assert "gpu_config" in resources + assert resources["gpu_config"]["imageName"] == "custom-gpu:latest" + assert resources["gpu_config"]["is_mothership"] is False + + def test_parse_manifest_with_explicit_mothership(self): + """Test parsing manifest with explicit mothership resource.""" + manifest = { + "resources": { + "my_custom_mothership": { + "imageName": "custom-lb:latest", + "is_mothership": True, + "functions": [], + }, + "gpu_config": { + "imageName": "gpu:latest", + "is_mothership": False, + "functions": [], + }, + } + } + + resources = _parse_resources_from_manifest(manifest) + + # Should NOT create default mothership + assert "mothership" not in resources + + # Should use explicit mothership from manifest + assert "my_custom_mothership" in resources + assert resources["my_custom_mothership"]["is_mothership"] is True + + # Should include worker + assert "gpu_config" in resources + assert resources["gpu_config"]["is_mothership"] is False + + def test_parse_manifest_with_multiple_resources(self): + """Test parsing manifest with multiple resources.""" + manifest = { + "resources": { + "gpu_config": { + "imageName": "gpu:latest", + "is_mothership": False, + "functions": [], + }, + "cpu_config": { + "imageName": "cpu:latest", + "is_mothership": False, + "functions": [], + }, + } + } + + resources = _parse_resources_from_manifest(manifest) + + assert len(resources) == 3 # mothership (default) + gpu + cpu + assert "mothership" in resources + assert "gpu_config" in resources + assert "cpu_config" in resources + + def test_parse_manifest_with_named_mothership(self): + """Test manifest with resource literally named 'mothership'.""" + manifest = { + "resources": { + "mothership": { + "imageName": "custom-mothership:latest", + "is_mothership": True, + "functions": [], + } + } + } + + resources = _parse_resources_from_manifest(manifest) + + # Should use the mothership from manifest + assert "mothership" in resources + assert 
resources["mothership"]["imageName"] == "custom-mothership:latest" + assert resources["mothership"]["is_mothership"] is True + + def test_parse_manifest_missing_image_name(self): + """Test parsing resource without imageName uses default.""" + manifest = { + "resources": {"gpu_config": {"is_mothership": False, "functions": []}} + } + + resources = _parse_resources_from_manifest(manifest) + + assert "gpu_config" in resources + # Should have a default imageName + assert "imageName" in resources["gpu_config"] + + def test_parse_manifest_missing_is_mothership_defaults_false(self): + """Test parsing resource without is_mothership defaults to False.""" + manifest = { + "resources": {"gpu_config": {"imageName": "gpu:latest", "functions": []}} + } + + resources = _parse_resources_from_manifest(manifest) + + assert "gpu_config" in resources + assert resources["gpu_config"]["is_mothership"] is False + # Should create default mothership since none explicitly marked + assert "mothership" in resources + assert resources["mothership"]["is_mothership"] is True + + +class TestDisplayPreviewInfo: + """Tests for _display_preview_info function.""" + + def test_display_with_mothership_and_workers(self): + """Test display with multiple containers.""" + containers = [ + ContainerInfo( + id="abc123", + name="mothership", + port=8000, + is_mothership=True, + url="http://localhost:8000", + ), + ContainerInfo( + id="def456", + name="gpu_config", + port=8001, + is_mothership=False, + url="http://localhost:8001", + ), + ] + + # Should not raise an exception + _display_preview_info(containers) + + def test_display_sorts_mothership_first(self): + """Test that display sorts mothership first.""" + containers = [ + ContainerInfo( + id="def456", + name="gpu_config", + port=8001, + is_mothership=False, + url="http://localhost:8001", + ), + ContainerInfo( + id="abc123", + name="mothership", + port=8000, + is_mothership=True, + url="http://localhost:8000", + ), + ] + + # Should not raise an exception + 
_display_preview_info(containers) + + def test_display_with_single_mothership(self): + """Test display with only mothership.""" + containers = [ + ContainerInfo( + id="abc123", + name="mothership", + port=8000, + is_mothership=True, + url="http://localhost:8000", + ) + ] + + # Should not raise an exception + _display_preview_info(containers) + + +class TestVerifyContainerHealth: + """Tests for _verify_container_health function.""" + + @patch("tetra_rp.cli.commands.preview.subprocess.run") + @patch("tetra_rp.cli.commands.preview.time.sleep") + def test_container_running_succeeds(self, mock_sleep, mock_run): + """Test that running container passes health check.""" + # Mock docker inspect to return 'running' status + mock_run.return_value = MagicMock( + stdout="running\n", + stderr="", + ) + + # Should not raise + _verify_container_health("abc123", "test_resource") + mock_sleep.assert_called_once_with(2) + + @patch("tetra_rp.cli.commands.preview.subprocess.run") + @patch("tetra_rp.cli.commands.preview.time.sleep") + def test_container_exited_raises_error(self, mock_sleep, mock_run): + """Test that exited container raises error.""" + # Mock docker inspect to return 'exited' status + # First call returns 'exited', second call (logs) returns output + mock_run.side_effect = [ + MagicMock(stdout="exited\n", stderr=""), + MagicMock(stdout="Error unpacking archive", stderr=""), + ] + + with pytest.raises(Exception, match="failed to start"): + _verify_container_health("abc123", "test_resource") + + @patch("tetra_rp.cli.commands.preview.subprocess.run") + @patch("tetra_rp.cli.commands.preview.time.sleep") + def test_container_health_check_includes_logs(self, mock_sleep, mock_run): + """Test that error message includes container logs.""" + error_log = "FileNotFoundError: archive.tar.gz not found" + # First call returns 'exited', second call returns logs + mock_run.side_effect = [ + MagicMock(stdout="exited\n", stderr=""), + MagicMock(stdout=error_log, stderr=""), + ] + + with 
pytest.raises(Exception) as exc_info: + _verify_container_health("abc123", "test_resource") + + assert error_log in str(exc_info.value) + + +class TestStartResourceContainer: + """Tests for archive validation in _start_resource_container.""" + + def test_archive_path_validation(self, tmp_path): + """Test that missing archive raises FileNotFoundError.""" + from tetra_rp.cli.commands.preview import _start_resource_container + + build_dir = tmp_path / ".flash" / ".build" + build_dir.mkdir(parents=True) + + # Archive does not exist + resource_config = {"imageName": "test:latest"} + + with pytest.raises(FileNotFoundError, match="Archive not found"): + _start_resource_container( + resource_name="test", + resource_config=resource_config, + build_dir=build_dir, + network="test-network", + ) + + @patch("tetra_rp.cli.commands.preview.subprocess.run") + @patch("tetra_rp.cli.commands.preview._verify_container_health") + def test_archive_mount_in_docker_command(self, mock_health, mock_run, tmp_path): + """Test that archive is mounted at correct location.""" + from tetra_rp.cli.commands.preview import _start_resource_container + + build_dir = tmp_path / ".flash" / ".build" + build_dir.mkdir(parents=True) + + # Create archive + archive_path = tmp_path / ".flash" / "archive.tar.gz" + archive_path.parent.mkdir(parents=True, exist_ok=True) + archive_path.write_text("dummy archive") + + # Mock docker run + mock_run.return_value = MagicMock(stdout="container123\n", stderr="") + + resource_config = {"imageName": "test:latest"} + + _start_resource_container( + resource_name="test", + resource_config=resource_config, + build_dir=build_dir, + network="test-network", + ) + + # Verify docker command includes archive mount + command_list = mock_run.call_args.args[ + 0 + ] # First positional arg is the command list + assert "-v" in command_list + # Find the volume mount for archive + v_indices = [i for i, arg in enumerate(command_list) if arg == "-v"] + archive_mounts = [ + command_list[i + 
1] + for i in v_indices + if "archive.tar.gz" in command_list[i + 1] + ] + assert len(archive_mounts) > 0 + assert f"{CONTAINER_ARCHIVE_PATH}:ro" in archive_mounts[0] diff --git a/tests/unit/resources/test_live_load_balancer.py b/tests/unit/resources/test_live_load_balancer.py index 7bb313e5..88795aff 100644 --- a/tests/unit/resources/test_live_load_balancer.py +++ b/tests/unit/resources/test_live_load_balancer.py @@ -29,9 +29,10 @@ def test_live_load_balancer_creation_with_local_tag(self, monkeypatch): importlib.reload(ls_module) lb = ls_module.LiveLoadBalancer(name="test-lb") - assert lb.imageName == "runpod/tetra-rp-lb:local" + expected_image = "runpod/tetra-rp-lb:local" + assert lb.imageName == expected_image assert lb.template is not None - assert lb.template.imageName == "runpod/tetra-rp-lb:local" + assert lb.template.imageName == expected_image def test_live_load_balancer_default_image_tag(self): """Test LiveLoadBalancer uses default image tag.""" @@ -194,9 +195,10 @@ def test_cpu_live_load_balancer_creation_with_local_tag(self, monkeypatch): importlib.reload(ls_module) lb = ls_module.CpuLiveLoadBalancer(name="test-lb") - assert lb.imageName == "runpod/tetra-rp-lb-cpu:local" + expected_image = "runpod/tetra-rp-lb-cpu:local" + assert lb.imageName == expected_image assert lb.template is not None - assert lb.template.imageName == "runpod/tetra-rp-lb-cpu:local" + assert lb.template.imageName == expected_image def test_cpu_live_load_balancer_default_image_tag(self): """Test CpuLiveLoadBalancer uses default CPU LB image tag."""