diff --git a/.gitignore b/.gitignore index e30da25..0f05876 100644 --- a/.gitignore +++ b/.gitignore @@ -206,3 +206,6 @@ marimo/_static/ marimo/_lsp/ __marimo__/ *.pkl + +.flash/ +.runpod/ diff --git a/03_advanced_workers/05_load_balancer/.env.example b/03_advanced_workers/05_load_balancer/.env.example new file mode 100644 index 0000000..8360712 --- /dev/null +++ b/03_advanced_workers/05_load_balancer/.env.example @@ -0,0 +1,4 @@ +# FLASH_HOST=localhost +# FLASH_PORT=8888 +# LOG_LEVEL=INFO +# RUNPOD_API_KEY=your_api_key_here diff --git a/03_advanced_workers/05_load_balancer/.flashignore b/03_advanced_workers/05_load_balancer/.flashignore new file mode 100644 index 0000000..ea5988c --- /dev/null +++ b/03_advanced_workers/05_load_balancer/.flashignore @@ -0,0 +1,40 @@ +# Flash Build Ignore Patterns + +# Python cache +__pycache__/ +*.pyc + +# Virtual environments +venv/ +.venv/ +env/ + +# IDE +.vscode/ +.idea/ + +# Environment files +.env +.env.local + +# Git +.git/ +.gitignore + +# Build artifacts +dist/ +build/ +*.egg-info/ + +# Flash resources +.tetra_resources.pkl + +# Tests +tests/ +test_*.py +*_test.py + +# Documentation +docs/ +*.md +!README.md diff --git a/03_advanced_workers/05_load_balancer/.gitignore b/03_advanced_workers/05_load_balancer/.gitignore new file mode 100644 index 0000000..9e84778 --- /dev/null +++ b/03_advanced_workers/05_load_balancer/.gitignore @@ -0,0 +1,44 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +env/ +venv/ +.venv/ +ENV/ +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# IDEs +.vscode/ +.idea/ +*.swp +*.swo +*~ + +# Environment +.env +.env.local + +# Flash +.tetra_resources.pkl +dist/ + +# OS +.DS_Store +Thumbs.db diff --git a/03_advanced_workers/05_load_balancer/README.md b/03_advanced_workers/05_load_balancer/README.md new file mode 100644 index 0000000..b8b63f6 --- /dev/null +++ b/03_advanced_workers/05_load_balancer/README.md @@ -0,0 +1,502 @@ +# Load Balancer Endpoints Example + +Demonstrates Flash's load-balancer endpoints with custom HTTP routes using the `@remote` decorator with `method` and `path` parameters. This example shows how to create low-latency APIs with direct HTTP routing on a single serverless endpoint. + +## What Are Load-Balancer Endpoints? + +Load-balancer endpoints use direct HTTP routing to serverless workers, providing lower latency compared to queue-based endpoints. They support custom HTTP methods (GET, POST, PUT, DELETE, PATCH) and multiple routes on a single endpoint. + +| Feature | Queue-Based (QB) | Load-Balanced (LB) | +|---------|------------------|-------------------| +| Request model | Sequential queue | Direct HTTP routing | +| Latency | Higher (queuing) | Lower (direct) | +| Custom routes | Limited | Full HTTP support (GET, POST, PUT, DELETE, PATCH) | +| Automatic retries | Yes | No (client handles) | +| Configuration | Default `ServerlessType.QB` | Use `LiveLoadBalancer` or `LoadBalancerSlsResource` | +| Use case | Batch processing, long-running tasks | Real-time APIs, request/response patterns | + +**Load-balancer endpoints are ideal for:** +- Low-latency REST APIs +- Custom HTTP routes with different methods (GET, POST, etc.) +- Request/response patterns that require direct HTTP communication +- Multiple endpoints on a single serverless instance +- Services requiring fast turnaround times + +## Quick Start + +### 1. Install Dependencies + +```bash +pip install -r requirements.txt +``` + +### 2. 
Configure Environment
+
+```bash
+cp .env.example .env
+# Add your RUNPOD_API_KEY to .env
+```
+
+Get your API key from [Runpod Settings](https://www.runpod.io/console/user/settings).
+
+### 3. Run Locally (from repository root)
+
+```bash
+flash run
+```
+
+Visit **http://localhost:8888/docs** for interactive API documentation (unified app with all examples).
+
+### 4. Test Endpoints (via unified app)
+
+When using `flash run` from the repository root, routes are prefixed with the example name:
+
+**GPU Service (Compute)**:
+```bash
+# Health check
+curl http://localhost:8888/05_load_balancer/gpu/health
+
+# List GPU info
+curl http://localhost:8888/05_load_balancer/gpu/info
+
+# Compute with GPU
+curl -X POST http://localhost:8888/05_load_balancer/gpu/compute \
+  -H "Content-Type: application/json" \
+  -d '{"numbers": [1, 2, 3, 4, 5]}'
+```
+
+**CPU Service (Data Processing)**:
+```bash
+# Health check
+curl http://localhost:8888/05_load_balancer/cpu/health
+
+# Validate text
+curl -X POST http://localhost:8888/05_load_balancer/cpu/validate \
+  -H "Content-Type: application/json" \
+  -d '{"text": "Hello world"}'
+
+# Transform text
+curl -X POST http://localhost:8888/05_load_balancer/cpu/transform \
+  -H "Content-Type: application/json" \
+  -d '{"text": "hello", "operation": "uppercase"}'
+```
+
+### 5. Run Standalone (for this example only)
+
+```bash
+cd 03_advanced_workers/05_load_balancer
+python main.py
+```
+
+This serves the example at **http://localhost:8000**, without the example-name prefix. Standalone mode defaults to port 8000 unless `FLASH_PORT` is set; the unified app runs on 8888.
+
+## How Load-Balancer Endpoints Work
+
+### Defining Routes with the @remote Decorator
+
+Load-balancer endpoints use the `@remote` decorator with `method` and `path` parameters to define HTTP routes. The decorator automatically registers the function as an HTTP endpoint on the load-balancer runtime.
+
+```python
+from tetra_rp import remote, LiveLoadBalancer
+
+# Create load-balanced endpoint (for local development)
+lb = LiveLoadBalancer(name="my_service")
+
+# Define HTTP routes with method and path parameters
+@remote(lb, method="GET", path="/health")
+async def health_check() -> dict:
+    """Health check endpoint."""
+    return {"status": "healthy"}
+
+@remote(lb, method="POST", path="/compute")
+async def compute_data(numbers: list[int]) -> dict:
+    """Compute the sum of squared numbers."""
+    result = sum(x ** 2 for x in numbers)
+    return {"result": result}
+
+@remote(lb, method="GET", path="/info")
+async def get_info() -> dict:
+    """Get service information."""
+    return {"info": "service running"}
+```
+
+**Key parameters for @remote:**
+- `method`: HTTP verb (GET, POST, PUT, DELETE, PATCH)
+- `path`: Route path (must start with `/`)
+- Resource (first argument): use `LiveLoadBalancer` for local development, `LoadBalancerSlsResource` for production deployment
+
+**How routing works:**
+1. Each `@remote`-decorated function becomes an HTTP endpoint
+2. The `method` parameter specifies the HTTP verb
+3. The `path` parameter specifies the URL route
+4. When an HTTP request matches the method and path, the function is called with the request data
+
+### Multiple Routes on One Endpoint
+
+One load-balanced endpoint can have multiple routes:
+
+```python
+api = LiveLoadBalancer(name="user_api")
+
+@remote(api, method="GET", path="/users")
+async def list_users(): ...
+
+@remote(api, method="POST", path="/users")
+async def create_user(name: str): ...
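+
+# A hypothetical PUT route for updates; same registration pattern, different verb:
+@remote(api, method="PUT", path="/users/{user_id}")
+async def update_user(user_id: int, name: str): ...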
+ +@remote(api, method="DELETE", path="/users/{user_id}") +async def delete_user(user_id: int): ... +``` + +All routes are automatically registered on the same load-balanced endpoint. + +### Reserved Paths + +The following paths are reserved and cannot be used: +- `/ping` - Health check endpoint (reserved) +- `/execute` - Framework endpoint (local development only) + +## Project Structure + +``` +05_load_balancer/ +├── main.py # FastAPI application +├── workers/ +│ ├── gpu/ # GPU load-balancer endpoints +│ │ ├── __init__.py # Router with endpoints +│ │ └── endpoint.py # @remote functions +│ └── cpu/ # CPU load-balancer endpoints +│ ├── __init__.py # Router with endpoints +│ └── endpoint.py # @remote functions +├── .env.example # Environment template +├── requirements.txt # Dependencies +└── README.md # This file +``` + +## GPU Service Endpoints + +**Compute-intensive operations on GPU** + +### GET /gpu/health +Health check for GPU service. +```bash +curl http://localhost:8888/05_load_balancer/gpu/health +``` + +### GET /gpu/info +Get GPU availability and device information. +```bash +curl http://localhost:8888/05_load_balancer/gpu/info +``` + +### POST /gpu/compute +Perform compute-intensive operations on GPU. + +Request: +```json +{ + "numbers": [1, 2, 3, 4, 5] +} +``` + +Response: +```json +{ + "status": "success", + "input_count": 5, + "sum_of_squares": 55, + "mean": 3.0, + "max": 5, + "min": 1, + "compute_time_ms": 0.42 +} +``` + +## CPU Service Endpoints + +**Data processing operations** + +### GET /cpu/health +Health check for CPU service. + +### POST /cpu/validate +Validate and analyze text data. + +Request: +```json +{ + "text": "Hello world from load balancer" +} +``` + +Response: +```json +{ + "status": "success", + "is_valid": true, + "character_count": 30, + "word_count": 5, + "average_word_length": 6.0 +} +``` + +### POST /cpu/transform +Transform text with various operations. + +Request: +```json +{ + "text": "hello world", + "operation": "uppercase" +} +``` + +Operations: +- `uppercase` - Convert to uppercase +- `lowercase` - Convert to lowercase +- `reverse` - Reverse the text +- `title` - Convert to title case + +Response: +```json +{ + "status": "success", + "original": "hello world", + "transformed": "HELLO WORLD", + "operation": "uppercase" +} +``` + +## Resource Types + +### LiveLoadBalancer (Local Development) + +`LiveLoadBalancer` is used for local development and testing. It provides all load-balancer features in a development environment without requiring a full deployment. + +```python +from tetra_rp import LiveLoadBalancer, remote + +# Create load-balanced endpoint for local development +lb = LiveLoadBalancer(name="my_api") + +@remote(lb, method="POST", path="/process") +async def process(data: dict) -> dict: + """Process data on the load-balanced endpoint.""" + return {"result": "success", "processed": data} +``` + +**When to use:** +- Local development and testing +- Testing @remote decorated functions before deployment +- Running examples with `flash run` from the repository root + +**Features:** +- Automatically uses the `tetra-rp-lb` container image +- Local execution with `/execute` endpoint for development +- Perfect for testing and debugging +- No GPU/CPU configuration needed (inherits from resource type) + +### LoadBalancerSlsResource (Production Deployment) + +`LoadBalancerSlsResource` is the production resource for deploying load-balancer endpoints to RunPod. 
+ +```python +from tetra_rp import LoadBalancerSlsResource, remote + +# Create load-balanced endpoint for production deployment +lb = LoadBalancerSlsResource( + name="my_api", + imageName="runpod/tetra-rp-lb:latest", + workersMin=1, + workersMax=5, +) + +@remote(lb, method="POST", path="/process") +async def process(data: dict) -> dict: + """Process data on the deployed load-balanced endpoint.""" + return {"result": "success", "processed": data} +``` + +**When to use:** +- Production deployment to RunPod +- Scaling requirements (auto-scaling based on request count) +- Multi-region deployment + +**Features:** +- Direct HTTP routing to healthy workers +- Auto-scaling based on request count (default scaler) +- No `/execute` endpoint (security - direct routes only) +- Client handles retries (no automatic retries) +- Lower latency for request/response patterns +- Custom HTTP routes on a single endpoint + +## Testing Workers Locally + +```bash +# Test GPU worker +python -m workers.gpu.endpoint + +# Test CPU worker +python -m workers.cpu.endpoint +``` + +## Deployment + +### Build for Production + +```bash +flash build +``` + +This generates handlers for your load-balancer endpoints. + +### Deploy to RunPod + +```bash +flash deploy new production +flash deploy send production +``` + +## Local vs Deployed + +The `@remote` decorator with method and path works the same way in both local and production environments. The only difference is the resource configuration. + +**Local Development (LiveLoadBalancer):** +- Use `LiveLoadBalancer` for testing load-balancer endpoints locally +- Automatically uses `tetra-rp-lb` container image +- Includes `/execute` endpoint for development/testing +- Testing via `flash run` or direct Python execution +- Perfect for development, testing, and debugging + +**Production Deployment (LoadBalancerSlsResource):** +- Use `LoadBalancerSlsResource` when deploying to RunPod +- Specifies the container image and scaling parameters +- No `/execute` endpoint (security - direct routes only) +- All execution flows through custom HTTP routes +- Automatic scaling based on request count +- Direct HTTP routing to healthy workers + +**Migration path:** Code remains identical - just change the resource from `LiveLoadBalancer` to `LoadBalancerSlsResource` when deploying to production! + +## Key Concepts + +### Async Functions +All `@remote` functions should be async: +```python +@remote(config, method="POST", path="/process") +async def process_data(input: str) -> dict: + # Your code here + return {"result": "success"} +``` + +### Error Handling +For load-balancer endpoints, raise `ValueError` for validation errors. 
The framework automatically handles these as HTTP 400 Bad Request responses:
+```python
+@remote(lb, method="POST", path="/process")
+async def process(text: str) -> dict:
+    if not text:
+        raise ValueError("text cannot be empty")
+    if not isinstance(text, str):
+        raise ValueError("text must be a string")
+    return {"result": text.upper()}
+```
+
+**HTTP Error Mapping:**
+- `ValueError` → 400 Bad Request
+- Other exceptions → 500 Internal Server Error
+
+### Dependencies
+Specify Python dependencies in the decorator:
+```python
+@remote(
+    config,
+    method="POST",
+    path="/analyze",
+    dependencies=["torch", "transformers"]
+)
+async def analyze(data: str) -> dict:
+    import torch
+    # Your code here
+```
+
+## Environment Variables
+
+```bash
+# Required
+RUNPOD_API_KEY=your_api_key_here
+
+# Optional
+FLASH_HOST=localhost  # Server host (default: localhost)
+FLASH_PORT=8000       # Server port (default: 8000)
+LOG_LEVEL=INFO        # Logging level (default: INFO)
+```
+
+## Cost Estimates
+
+Load-balancer endpoints are cost-efficient for request/response patterns:
+
+**GPU Service (Compute)**
+- Instance type: GPU (depends on your configuration)
+- Minimum workers: 1
+- Idle timeout: Varies by configuration
+- Cost: Pay-per-second while running
+
+**CPU Service (Data Processing)**
+- Instance type: 1 vCPU, 2 GB RAM (CPU3C_1_2)
+- Minimum workers: 1
+- Cost: ~$0.0002 per hour per worker (idle)
+
+**Comparison with queue-based endpoints:**
+- Load-balanced: Lower latency, pay for active processing time only
+- Queue-based: Higher throughput, automatic retries, better for batch jobs
+
+For current pricing, see [RunPod Pricing](https://www.runpod.io/pricing).
+
+## Troubleshooting
+
+### Load-balancer endpoints not responding
+
+**Problem**: Endpoints return 502 or timeout
+- Ensure workers are properly deployed with `flash deploy`
+- Check worker logs via RunPod console
+- Verify `method` and `path` parameters match your HTTP requests
+- Confirm the resource configuration (GPU/CPU types) is available
+
+### ValueError not mapping to 400 responses
+
+**Problem**: Validation errors return 500 instead of 400
+- Ensure you're raising `ValueError` for validation errors
+- For custom error types, the framework may return 500
+- Raise `ValueError` for all user input validation errors
+
+### Workers not starting
+
+**Problem**: Workers fail to initialize
+- Check that all dependencies in the `dependencies` parameter are available
+- Verify the container image has required system packages
+- Check worker function imports and module availability
+- Review worker logs in the RunPod console
+
+### Mixed latency in responses
+
+**Problem**: Some requests are fast, others are slow
+- The load balancer uses direct HTTP routing (no queue)
+- The first request to a cold worker will be slower (initialization)
+- Adjust `workersMin` to keep workers warm if consistent low latency is critical (see "Keeping Workers Warm" below)
+- Consider using `idleTimeout` to reduce cold starts
+
+### Endpoint discovery not working
+
+**Problem**: Example doesn't load in unified app with `flash run`
+- Ensure routers are named `gpu_router` and `cpu_router`
+- Verify routers are properly exported in `__init__.py` files
+- Check that `main.py` includes routers with `app.include_router()`
+- Run `flash run` from the repository root, not the example directory
+
+## Next Steps
+
+1. Explore the endpoints via Swagger UI (`/docs`)
+2. Modify the `@remote` functions to add your logic
+3. Add new routes with different `method` and `path` values
+4. Deploy to RunPod when ready
+5. Monitor performance and scaling behavior
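+
+## Keeping Workers Warm
+
+A minimal sketch for the "Mixed latency in responses" tip above. It uses only parameters already shown in this README (`name`, `imageName`, `workersMin`, `workersMax`); the values are illustrative, not recommendations:
+
+```python
+from tetra_rp import LoadBalancerSlsResource
+
+# Keep one worker always running so first requests avoid cold starts.
+lb = LoadBalancerSlsResource(
+    name="my_api",
+    imageName="runpod/tetra-rp-lb:latest",
+    workersMin=1,  # warm pool: no cold start for steady traffic
+    workersMax=5,  # upper bound for request-count auto-scaling
+)
+```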
diff --git a/03_advanced_workers/05_load_balancer/main.py b/03_advanced_workers/05_load_balancer/main.py
new file mode 100644
index 0000000..7115e75
--- /dev/null
+++ b/03_advanced_workers/05_load_balancer/main.py
@@ -0,0 +1,55 @@
+import logging
+import os
+
+from fastapi import FastAPI
+from workers.cpu import cpu_router
+from workers.gpu import gpu_router
+
+logger = logging.getLogger(__name__)
+
+
+app = FastAPI(
+    title="Load Balancer Example",
+    description="Demonstrates load-balancer endpoints with custom HTTP routes using Flash",
+    version="0.1.0",
+)
+
+# Include routers for unified app discovery
+app.include_router(gpu_router, prefix="/gpu", tags=["GPU Compute Service"])
+app.include_router(cpu_router, prefix="/cpu", tags=["CPU Data Service"])
+
+
+@app.get("/")
+def home():
+    return {
+        "message": "Load Balancer Example API",
+        "description": "Demonstrates load-balancer endpoints with custom HTTP routes",
+        "note": "Load-balancer endpoints are defined with @remote(resource, method=..., path=...)",
+        "docs": "/docs",
+        "gpu_endpoints": {
+            "health": "GET /gpu/health",
+            "compute": "POST /gpu/compute",
+            "info": "GET /gpu/info",
+        },
+        "cpu_endpoints": {
+            "health": "GET /cpu/health",
+            "validate": "POST /cpu/validate",
+            "transform": "POST /cpu/transform",
+        },
+    }
+
+
+@app.get("/ping")
+def ping():
+    """Health check endpoint for load balancer."""
+    return {"status": "healthy"}
+
+
+if __name__ == "__main__":
+    import uvicorn
+
+    host = os.getenv("FLASH_HOST", "localhost")
+    port = int(os.getenv("FLASH_PORT", 8000))
+    logger.info(f"Starting Load Balancer Example on {host}:{port}")
+
+    uvicorn.run(app, host=host, port=port)
diff --git a/03_advanced_workers/05_load_balancer/pyproject.toml b/03_advanced_workers/05_load_balancer/pyproject.toml
new file mode 100644
index 0000000..b4e6aa8
--- /dev/null
+++ b/03_advanced_workers/05_load_balancer/pyproject.toml
@@ -0,0 +1,8 @@
+[project]
+name = "load-balancer-example"
+version = "0.1.0"
+description = "Load-balancer endpoints with custom HTTP routes using Flash"
+requires-python = ">=3.11"
+dependencies = [
+    "tetra_rp",
+]
diff --git a/03_advanced_workers/05_load_balancer/requirements.txt b/03_advanced_workers/05_load_balancer/requirements.txt
new file mode 100644
index 0000000..532a30b
--- /dev/null
+++ b/03_advanced_workers/05_load_balancer/requirements.txt
@@ -0,0 +1 @@
+tetra_rp
diff --git a/03_advanced_workers/05_load_balancer/workers/__init__.py b/03_advanced_workers/05_load_balancer/workers/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/03_advanced_workers/05_load_balancer/workers/cpu/__init__.py b/03_advanced_workers/05_load_balancer/workers/cpu/__init__.py
new file mode 100644
index 0000000..d7f9ef7
--- /dev/null
+++ b/03_advanced_workers/05_load_balancer/workers/cpu/__init__.py
@@ -0,0 +1,56 @@
+"""CPU Load-Balancer Endpoints
+
+Load-balancer endpoints use the @remote decorator with method and path parameters.
+These decorators automatically create HTTP routes that are registered with the
+Load Balancer runtime.
+
+For the unified app discovery, we export a router that documents the endpoints.
+The actual HTTP handling is managed by the tetra-rp-lb framework.
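+
+Example: a GET to /health on the deployed endpoint runs cpu_health() from
+endpoint.py; the router below exposes the same route for local testing.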
+""" + +from fastapi import APIRouter +from pydantic import BaseModel + +from .endpoint import cpu_config, cpu_health, transform_data, validate_data + + +# Pydantic models for request validation +class ValidateRequest(BaseModel): + """Request model for text validation.""" + + text: str + + +class TransformRequest(BaseModel): + """Request model for text transformation.""" + + text: str + operation: str = "uppercase" + + +# Export for unified app discovery +cpu_router = APIRouter() + + +@cpu_router.get("/health") +async def get_cpu_health(): + """CPU service health check.""" + return await cpu_health() + + +@cpu_router.post("/validate") +async def post_validate(request: ValidateRequest): + """Validate and analyze text data.""" + return await validate_data(request.text) + + +@cpu_router.post("/transform") +async def post_transform(request: TransformRequest): + """Transform text data with specified operation.""" + return await transform_data(request.text, request.operation) + + +__all__ = ["cpu_config", "cpu_health", "cpu_router", "transform_data", "validate_data"] diff --git a/03_advanced_workers/05_load_balancer/workers/cpu/endpoint.py b/03_advanced_workers/05_load_balancer/workers/cpu/endpoint.py new file mode 100644 index 0000000..02ca0b5 --- /dev/null +++ b/03_advanced_workers/05_load_balancer/workers/cpu/endpoint.py @@ -0,0 +1,122 @@ +from tetra_rp import CpuInstanceType, CpuLiveLoadBalancer, PodTemplate, remote + +VALID_OPERATIONS = ["uppercase", "lowercase", "reverse", "title"] + +cpu_config = CpuLiveLoadBalancer( + name="03_05_load_balancer_cpu", + workersMin=1, + instanceIds=[CpuInstanceType.CPU3C_1_2], + template=PodTemplate(containerDiskInGb=10), +) + + +@remote(cpu_config, method="GET", path="/health") +async def cpu_health() -> dict: + """Health check endpoint for CPU service.""" + return {"status": "healthy", "service": "cpu"} + + +@remote(cpu_config, method="POST", path="/validate") +async def validate_data(text: str) -> dict: + """Validate and analyze text data. + + Args: + text: Text to validate + + Returns: + Validation results + """ + import time + from datetime import datetime, timezone + + if not text or not text.strip(): + raise ValueError("text cannot be empty") + + start_time = time.time() + + # Simple text analysis + words = text.split() + char_count = len(text) + word_count = len(words) + avg_word_length = char_count / word_count if word_count > 0 else 0 + + process_time = (time.time() - start_time) * 1000 + + return { + "status": "success", + "is_valid": True, + "character_count": char_count, + "word_count": word_count, + "average_word_length": round(avg_word_length, 2), + "process_time_ms": round(process_time, 2), + "timestamp": datetime.now(timezone.utc).isoformat(), + } + + +@remote(cpu_config, method="POST", path="/transform") +async def transform_data(text: str, operation: str = "uppercase") -> dict: + """Transform text data. 
+ + Args: + text: Text to transform + operation: Transform operation (uppercase, lowercase, reverse, title) + + Returns: + Transformed text + """ + import time + from datetime import datetime, timezone + + if not text or not text.strip(): + raise ValueError("text cannot be empty") + + if operation not in VALID_OPERATIONS: + raise ValueError(f"operation must be one of: {', '.join(VALID_OPERATIONS)}") + + start_time = time.time() + result = "" + + # Perform transformation + if operation == "uppercase": + result = text.upper() + elif operation == "lowercase": + result = text.lower() + elif operation == "reverse": + result = text[::-1] + elif operation == "title": + result = text.title() + + process_time = (time.time() - start_time) * 1000 + + return { + "status": "success", + "original": text, + "transformed": result, + "operation": operation, + "process_time_ms": round(process_time, 2), + "timestamp": datetime.now(timezone.utc).isoformat(), + } + + +# Test locally with: python -m workers.cpu.endpoint +if __name__ == "__main__": + import asyncio + + async def test(): + print("Testing CPU worker endpoints...\n") + + print("1. Health check:") + result = await cpu_health() + print(f" {result}\n") + + print("2. Validate text:") + result = await validate_data("Hello world from load balancer") + print(f" Characters: {result['character_count']}") + print(f" Words: {result['word_count']}\n") + + print("3. Transform text:") + result = await transform_data("Hello World", "uppercase") + print(f" Original: {result['original']}") + print(f" Transformed: {result['transformed']}") + + asyncio.run(test()) diff --git a/03_advanced_workers/05_load_balancer/workers/gpu/__init__.py b/03_advanced_workers/05_load_balancer/workers/gpu/__init__.py new file mode 100644 index 0000000..347f5e2 --- /dev/null +++ b/03_advanced_workers/05_load_balancer/workers/gpu/__init__.py @@ -0,0 +1,60 @@ +"""GPU Load-Balancer Endpoints + +Load-balancer endpoints use the @remote decorator with method and path parameters. +These decorators automatically create HTTP routes that are registered with the +Load Balancer runtime. + +For the unified app discovery, we export a router that documents the endpoints. +The actual HTTP handling is managed by the tetra-rp-lb framework. 
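+
+Example: a POST to /compute on the deployed endpoint runs compute_intensive()
+from endpoint.py; the router below exposes the same route for local testing.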
+"""
+
+from fastapi import APIRouter
+from pydantic import BaseModel, field_validator
+
+from .endpoint import compute_intensive, gpu_config, gpu_health, gpu_info
+
+
+class ComputeRequest(BaseModel):
+    """Request model for compute-intensive operations."""
+
+    numbers: list[int]
+
+    @field_validator("numbers")
+    @classmethod
+    def validate_numbers(cls, v: list[int]) -> list[int]:
+        """Validate that numbers list is not empty."""
+        if not v:
+            raise ValueError("numbers list cannot be empty")
+        return v
+
+
+# Export for unified app discovery
+gpu_router = APIRouter()
+
+
+@gpu_router.get("/health")
+async def get_gpu_health():
+    """GPU service health check."""
+    return await gpu_health()
+
+
+@gpu_router.post("/compute")
+async def post_gpu_compute(request: ComputeRequest):
+    """Perform compute-intensive operation on GPU."""
+    return await compute_intensive(request.model_dump())
+
+
+@gpu_router.get("/info")
+async def get_gpu_info():
+    """Get GPU device information."""
+    return await gpu_info()
+
+
+__all__ = [
+    "ComputeRequest",
+    "compute_intensive",
+    "gpu_config",
+    "gpu_health",
+    "gpu_info",
+    "gpu_router",
+]
diff --git a/03_advanced_workers/05_load_balancer/workers/gpu/endpoint.py b/03_advanced_workers/05_load_balancer/workers/gpu/endpoint.py
new file mode 100644
index 0000000..48ac62a
--- /dev/null
+++ b/03_advanced_workers/05_load_balancer/workers/gpu/endpoint.py
@@ -0,0 +1,97 @@
+from tetra_rp import GpuGroup, LiveLoadBalancer, remote
+
+gpu_config = LiveLoadBalancer(
+    name="03_05_load_balancer_gpu",
+    workersMin=1,
+    gpus=[GpuGroup.ANY],
+)
+
+
+@remote(gpu_config, method="GET", path="/health")
+async def gpu_health() -> dict:
+    """Health check endpoint for GPU service."""
+    return {"status": "healthy", "service": "gpu"}
+
+
+@remote(gpu_config, method="POST", path="/compute")
+async def compute_intensive(request: dict) -> dict:
+    """Perform compute-intensive operation on GPU.
+
+    Args:
+        request: Request dict with numbers to process
+
+    Returns:
+        Computation results
+    """
+    import time
+    from datetime import datetime, timezone
+
+    numbers = request.get("numbers", [])
+    if not numbers:
+        # An empty list would crash the statistics below; ValueError maps to HTTP 400.
+        raise ValueError("numbers list cannot be empty")
+
+    start_time = time.time()
+
+    # Simulate GPU-intensive computation
+    result = sum(x**2 for x in numbers)
+    mean = sum(numbers) / len(numbers)
+    max_val = max(numbers)
+    min_val = min(numbers)
+
+    compute_time = (time.time() - start_time) * 1000
+
+    return {
+        "status": "success",
+        "input_count": len(numbers),
+        "sum_of_squares": result,
+        "mean": mean,
+        "max": max_val,
+        "min": min_val,
+        "compute_time_ms": round(compute_time, 2),
+        "timestamp": datetime.now(timezone.utc).isoformat(),
+    }
+
+
+@remote(gpu_config, method="GET", path="/info")
+async def gpu_info() -> dict:
+    """Get GPU availability information."""
+    try:
+        import torch
+
+        if torch.cuda.is_available():
+            info = {
+                "available": True,
+                "device": torch.cuda.get_device_name(0),
+                "count": torch.cuda.device_count(),
+            }
+        else:
+            info = {"available": False, "device": "No GPU", "count": 0}
+    except Exception as e:
+        info = {"available": False, "device": str(e), "count": 0}
+
+    return info
+
+
+# Test locally with: python -m workers.gpu.endpoint
+if __name__ == "__main__":
+    import asyncio
+
+    async def test():
+        print("Testing GPU worker endpoints...\n")
+
+        print("1. Health check:")
+        result = await gpu_health()
+        print(f"   {result}\n")
+
+        print("2. 
Compute intensive:") + request_data = {"numbers": [1, 2, 3, 4, 5]} + result = await compute_intensive(request_data) + print(f" Sum of squares: {result['sum_of_squares']}") + print(f" Mean: {result['mean']}\n") + + print("3. GPU Info:") + result = await gpu_info() + print(f" {result}") + + asyncio.run(test()) diff --git a/03_advanced_workers/README.md b/03_advanced_workers/README.md index 80dbefa..3a1607b 100644 --- a/03_advanced_workers/README.md +++ b/03_advanced_workers/README.md @@ -4,6 +4,25 @@ Production-ready worker patterns for building robust, scalable applications. ## Examples +### 05_load_balancer +Load-balancer endpoints with custom HTTP routes. + +**What you'll learn:** +- Creating load-balanced endpoints +- Custom HTTP routing (GET, POST, PUT, DELETE, PATCH) +- Low-latency request/response patterns +- Multiple routes on a single endpoint + +**Use cases:** +- Real-time APIs +- REST services +- Direct HTTP communication +- Low-latency inference services + +**Resources:** +- `LiveLoadBalancer` - Local development +- `LoadBalancerSlsResource` - Production deployment + ### 01_streaming _(coming soon)_ Streaming responses with Server-Sent Events (SSE) and WebSockets. diff --git a/CLAUDE.md b/CLAUDE.md index d39c441..877580f 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -59,8 +59,9 @@ make consolidate-deps from tetra_rp import remote, LiveServerless, GpuGroup # Configure resource requirements +# Naming convention: {category}_{example}_{worker_type} gpu_config = LiveServerless( - name="my_example_gpu", + name="01_01_getting_started_gpu", gpus=[GpuGroup.ANY], workersMin=0, workersMax=3, diff --git a/README.md b/README.md index 2c933b2..967fc71 100644 --- a/README.md +++ b/README.md @@ -79,6 +79,7 @@ Production-ready worker patterns. - 02_batch_processing - Batch inference optimization _(coming soon)_ - 03_caching - Model and result caching strategies _(coming soon)_ - 04_custom_images - Custom Docker images _(coming soon)_ +- **[05_load_balancer](./03_advanced_workers/05_load_balancer/)** - Load-balancer endpoints with custom HTTP routes ### 04 - Scaling & Performance Optimize for production workloads. diff --git a/uv.lock b/uv.lock index 8646b69..8cf10cf 100644 --- a/uv.lock +++ b/uv.lock @@ -3875,7 +3875,7 @@ wheels = [ [[package]] name = "tetra-rp" -version = "0.18.0" +version = "0.19.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "cloudpickle" }, @@ -3887,9 +3887,9 @@ dependencies = [ { name = "runpod" }, { name = "typer" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/85/98/58cbd75b34d7163be860a2b6af47b0f2c7a43626f31f10f89c6327d4a2bd/tetra_rp-0.18.0.tar.gz", hash = "sha256:a4fc9c262d4d6a08bce0f88f683c931c3914ca0db2e530b3125e5667cf15fccc", size = 85404 } +sdist = { url = "https://files.pythonhosted.org/packages/57/56/4210c7f9382749c4ed4c0de004de8705ff13a1a2352e4aea57e561e1edb5/tetra_rp-0.19.0.tar.gz", hash = "sha256:baad99bf3a1f59419b303c367492500e449e6ae50481a371024ec3bfa89791a8", size = 99321 } wheels = [ - { url = "https://files.pythonhosted.org/packages/c9/b4/9aaf1c2113f223099fb21c8a6f242a80b01faa96db57d3ac28679a5ca1cf/tetra_rp-0.18.0-py3-none-any.whl", hash = "sha256:d78fab260e3e31cfb613b29796f1094bd8fe70c59351dbe269ce7f54bae9382e", size = 84284 }, + { url = "https://files.pythonhosted.org/packages/da/ed/5ed3d95f6ba9c2765ac5b7f8ba33a279ac168be86c2bbe0a42607adc74a2/tetra_rp-0.19.0-py3-none-any.whl", hash = "sha256:4c6c230a0d5a7a9236d8e1c39403a5286bf54551535f5eb40259e69c5e4bde26", size = 99156 }, ] [[package]]