Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
88fc534
feat(runtime): Implement DirectoryClient for mothership API integration
deanq Jan 3, 2026
3b4e771
feat(runtime): Implement ServiceRegistry for service discovery
deanq Jan 3, 2026
7bd5e72
feat(runtime): Implement CrossEndpointClient for HTTP execution
deanq Jan 3, 2026
e1d9d45
feat(runtime): Implement ProductionWrapper for cross-endpoint routing
deanq Jan 3, 2026
5ea12b6
integrate(runtime): Wire ProductionWrapper into stub layer
deanq Jan 3, 2026
3986335
test(integration): Add cross-endpoint routing integration tests
deanq Jan 3, 2026
1b194c0
fix: remove unused variables in test files (F841 linting errors)
deanq Jan 3, 2026
1fc33de
refactor: use ServerlessResource directly instead of CrossEndpointClient
deanq Jan 3, 2026
de0f6a6
refactor: move imports to top of conftest.py
deanq Jan 3, 2026
f029b96
refactor: document lessons learned from cross-endpoint routing improv…
deanq Jan 3, 2026
ecd4841
refactor: code quality improvements for cross-endpoint routing
deanq Jan 3, 2026
f93192b
refactor: rename RuntimeError to FlashRuntimeError to avoid shadowing…
deanq Jan 3, 2026
1c6e6f9
docs: add comprehensive cross-endpoint routing documentation
deanq Jan 5, 2026
0508c84
Merge branch 'main' into deanq/ae-1348-cross-endpoint-routing
deanq Jan 8, 2026
86a0cac
Merge branch 'main' into deanq/ae-1348-cross-endpoint-routing
deanq Jan 8, 2026
56a4397
Merge branch 'main' into deanq/ae-1348-cross-endpoint-routing
deanq Jan 8, 2026
48bdfce
refactor: extract serialization logic into shared utility module
deanq Jan 9, 2026
9aa2ab2
refactor: rename DirectoryClient to ManifestClient for clarity
deanq Jan 9, 2026
68fb480
refactor: convert manifest to type-safe dataclass structure
deanq Jan 9, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
943 changes: 943 additions & 0 deletions docs/Cross_Endpoint_Routing.md

Large diffs are not rendered by default.

55 changes: 40 additions & 15 deletions src/tetra_rp/cli/commands/build_utils/handler_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
import importlib.util
import logging
from pathlib import Path
from typing import Any, Dict, List
from typing import Any, Dict, List, Union

from tetra_rp.runtime.models import Manifest

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -38,39 +40,60 @@
class HandlerGenerator:
"""Generates handler_<name>.py files for each resource config."""

def __init__(self, manifest: Dict[str, Any], build_dir: Path):
def __init__(self, manifest: Union[Dict[str, Any], Manifest], build_dir: Path):
self.manifest = manifest
self.build_dir = build_dir

def generate_handlers(self) -> List[Path]:
"""Generate all handler files for queue-based (non-LB) resources."""
handler_paths = []

for resource_name, resource_data in self.manifest.get("resources", {}).items():
# Handle both dict and Manifest types
resources = (
self.manifest.resources
if isinstance(self.manifest, Manifest)
else self.manifest.get("resources", {})
)

for resource_name, resource_data in resources.items():
# Skip load-balanced resources (handled by LBHandlerGenerator)
if resource_data.get("resource_type") == "LoadBalancerSlsResource":
resource_type = (
resource_data.resource_type
if hasattr(resource_data, "resource_type")
else resource_data.get("resource_type")
)
if resource_type == "LoadBalancerSlsResource":
continue

handler_path = self._generate_handler(resource_name, resource_data)
handler_paths.append(handler_path)

return handler_paths

def _generate_handler(
self, resource_name: str, resource_data: Dict[str, Any]
) -> Path:
def _generate_handler(self, resource_name: str, resource_data: Any) -> Path:
"""Generate a single handler file."""
handler_filename = f"handler_{resource_name}.py"
handler_path = self.build_dir / handler_filename

# Get timestamp from manifest
timestamp = self.manifest.get("generated_at", "")
timestamp = (
self.manifest.generated_at
if isinstance(self.manifest, Manifest)
else self.manifest.get("generated_at", "")
)

# Get functions from resource (handle both dict and ResourceConfig)
functions = (
resource_data.functions
if hasattr(resource_data, "functions")
else resource_data.get("functions", [])
)

# Generate imports section
imports = self._generate_imports(resource_data.get("functions", []))
imports = self._generate_imports(functions)

# Generate function registry
registry = self._generate_registry(resource_data.get("functions", []))
registry = self._generate_registry(functions)

# Format template
handler_code = HANDLER_TEMPLATE.format(
Expand All @@ -87,7 +110,7 @@ def _generate_handler(

return handler_path

def _generate_imports(self, functions: List[Dict[str, Any]]) -> str:
def _generate_imports(self, functions: List[Any]) -> str:
"""Generate import statements for functions using dynamic imports.

Uses importlib.import_module() to handle module names with invalid
Expand All @@ -98,24 +121,26 @@ def _generate_imports(self, functions: List[Dict[str, Any]]) -> str:

imports = []
for func in functions:
module = func.get("module")
name = func.get("name")
# Handle both dict and FunctionMetadata
module = func.module if hasattr(func, "module") else func.get("module")
name = func.name if hasattr(func, "name") else func.get("name")

if module and name:
# Use dynamic import to handle invalid identifiers
imports.append(f"{name} = importlib.import_module('{module}').{name}")

return "\n".join(imports) if imports else "# No functions to import"

def _generate_registry(self, functions: List[Dict[str, Any]]) -> str:
def _generate_registry(self, functions: List[Any]) -> str:
"""Generate function registry dictionary."""
if not functions:
return " # No functions registered"

registry_lines = []

for func in functions:
name = func.get("name")
# Handle both dict and FunctionMetadata
name = func.name if hasattr(func, "name") else func.get("name")
registry_lines.append(f' "{name}": {name},')

return "\n".join(registry_lines)
Expand Down
72 changes: 53 additions & 19 deletions src/tetra_rp/cli/commands/build_utils/lb_handler_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import importlib.util
import logging
from pathlib import Path
from typing import Any, Dict, List
from typing import Any, Dict, List, Union

from tetra_rp.runtime.models import Manifest

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -58,17 +60,28 @@ def ping():
class LBHandlerGenerator:
"""Generates FastAPI handlers for LoadBalancerSlsResource endpoints."""

def __init__(self, manifest: Dict[str, Any], build_dir: Path):
def __init__(self, manifest: Union[Dict[str, Any], Manifest], build_dir: Path):
self.manifest = manifest
self.build_dir = build_dir

def generate_handlers(self) -> List[Path]:
"""Generate all LB handler files."""
handler_paths = []

for resource_name, resource_data in self.manifest.get("resources", {}).items():
# Handle both dict and Manifest types
resources = (
self.manifest.resources
if isinstance(self.manifest, Manifest)
else self.manifest.get("resources", {})
)

for resource_name, resource_data in resources.items():
# Generate for both LiveLoadBalancer (local dev) and LoadBalancerSlsResource (deployed)
resource_type = resource_data.get("resource_type")
resource_type = (
resource_data.resource_type
if hasattr(resource_data, "resource_type")
else resource_data.get("resource_type")
)
if resource_type not in ["LoadBalancerSlsResource", "LiveLoadBalancer"]:
continue

Expand All @@ -77,26 +90,39 @@ def generate_handlers(self) -> List[Path]:

return handler_paths

def _generate_handler(
self, resource_name: str, resource_data: Dict[str, Any]
) -> Path:
def _generate_handler(self, resource_name: str, resource_data: Any) -> Path:
"""Generate a single FastAPI handler file."""
handler_filename = f"handler_{resource_name}.py"
handler_path = self.build_dir / handler_filename

# Get timestamp from manifest
timestamp = self.manifest.get("generated_at", "")
timestamp = (
self.manifest.generated_at
if isinstance(self.manifest, Manifest)
else self.manifest.get("generated_at", "")
)

# Determine if /execute endpoint should be included
# LiveLoadBalancer (local dev) includes /execute, deployed LoadBalancerSlsResource does not
resource_type = resource_data.get("resource_type", "LoadBalancerSlsResource")
resource_type = (
resource_data.resource_type
if hasattr(resource_data, "resource_type")
else resource_data.get("resource_type", "LoadBalancerSlsResource")
)
include_execute = resource_type == "LiveLoadBalancer"

# Get functions from resource (handle both dict and ResourceConfig)
functions = (
resource_data.functions
if hasattr(resource_data, "functions")
else resource_data.get("functions", [])
)

# Generate imports section
imports = self._generate_imports(resource_data.get("functions", []))
imports = self._generate_imports(functions)

# Generate route registry
registry = self._generate_route_registry(resource_data.get("functions", []))
registry = self._generate_route_registry(functions)

# Format template
handler_code = LB_HANDLER_TEMPLATE.format(
Expand All @@ -114,27 +140,28 @@ def _generate_handler(

return handler_path

def _generate_imports(self, functions: List[Dict[str, Any]]) -> str:
def _generate_imports(self, functions: List[Any]) -> str:
"""Generate import statements for functions.

Args:
functions: List of function metadata dicts
functions: List of function metadata (dicts or FunctionMetadata objects)

Returns:
Import statements as string
"""
imports = []

for func in functions:
module = func.get("module")
name = func.get("name")
# Handle both dict and FunctionMetadata
module = func.module if hasattr(func, "module") else func.get("module")
name = func.name if hasattr(func, "name") else func.get("name")

if module and name:
imports.append(f"from {module} import {name}")

return "\n".join(imports) if imports else "# No functions to import"

def _generate_route_registry(self, functions: List[Dict[str, Any]]) -> str:
def _generate_route_registry(self, functions: List[Any]) -> str:
"""Generate route registry for FastAPI app.

Creates mapping of (method, path) tuples to function names.
Expand All @@ -151,9 +178,16 @@ def _generate_route_registry(self, functions: List[Dict[str, Any]]) -> str:
registry_lines = []

for func in functions:
name = func.get("name")
method = func.get("http_method")
path = func.get("http_path")
# Handle both dict and FunctionMetadata
name = func.name if hasattr(func, "name") else func.get("name")
method = (
func.http_method
if hasattr(func, "http_method")
else func.get("http_method")
)
path = (
func.http_path if hasattr(func, "http_path") else func.get("http_path")
)

if name and method and path:
# Create tuple key: ("GET", "/api/process")
Expand Down
34 changes: 8 additions & 26 deletions src/tetra_rp/execute_class.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
prevent memory leaks through LRU eviction.
"""

import base64
import hashlib
import inspect
import logging
Expand All @@ -20,6 +19,8 @@
from .core.utils.constants import HASH_TRUNCATE_LENGTH, UUID_FALLBACK_LENGTH
from .core.utils.lru_cache import LRUCache
from .protos.remote_execution import FunctionRequest
from .runtime.exceptions import SerializationError
from .runtime.serialization import serialize_args, serialize_kwargs
from .stubs import stub_resource

log = logging.getLogger(__name__)
Expand All @@ -30,14 +31,7 @@

def serialize_constructor_args(args, kwargs):
"""Serialize constructor arguments for caching."""
serialized_args = [
base64.b64encode(cloudpickle.dumps(arg)).decode("utf-8") for arg in args
]
serialized_kwargs = {
k: base64.b64encode(cloudpickle.dumps(v)).decode("utf-8")
for k, v in kwargs.items()
}
return serialized_args, serialized_kwargs
return serialize_args(args), serialize_kwargs(kwargs)


def get_or_cache_class_data(
Expand Down Expand Up @@ -65,7 +59,7 @@ def get_or_cache_class_data(

log.debug(f"Cached class data for {cls.__name__} with key: {cache_key}")

except (TypeError, AttributeError, OSError) as e:
except (TypeError, AttributeError, OSError, SerializationError) as e:
log.warning(
f"Could not serialize constructor arguments for {cls.__name__}: {e}"
)
Expand Down Expand Up @@ -267,14 +261,8 @@ async def method_proxy(*args, **kwargs):
cached_data = _SERIALIZED_CLASS_CACHE.get(self._cache_key)

# Serialize method arguments (these change per call, so no caching)
method_args = [
base64.b64encode(cloudpickle.dumps(arg)).decode("utf-8")
for arg in args
]
method_kwargs = {
k: base64.b64encode(cloudpickle.dumps(v)).decode("utf-8")
for k, v in kwargs.items()
}
method_args = serialize_args(args)
method_kwargs = serialize_kwargs(kwargs)

# Handle constructor args - use cached if available, else serialize fresh
if cached_data["constructor_args"] is not None:
Expand All @@ -284,14 +272,8 @@ async def method_proxy(*args, **kwargs):
else:
# Constructor args couldn't be cached due to serialization issues
# Serialize them fresh for each method call (fallback behavior)
constructor_args = [
base64.b64encode(cloudpickle.dumps(arg)).decode("utf-8")
for arg in self._constructor_args
]
constructor_kwargs = {
k: base64.b64encode(cloudpickle.dumps(v)).decode("utf-8")
for k, v in self._constructor_kwargs.items()
}
constructor_args = serialize_args(self._constructor_args)
constructor_kwargs = serialize_kwargs(self._constructor_kwargs)

request = FunctionRequest(
execution_type="class",
Expand Down
12 changes: 12 additions & 0 deletions src/tetra_rp/runtime/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
"""Configuration constants for runtime module."""

# HTTP client configuration
DEFAULT_REQUEST_TIMEOUT = 10 # seconds
DEFAULT_MAX_RETRIES = 3
DEFAULT_BACKOFF_BASE = 2

# Directory cache configuration
DEFAULT_CACHE_TTL = 300 # seconds

# Serialization limits
MAX_PAYLOAD_SIZE = 10 * 1024 * 1024 # 10MB
31 changes: 31 additions & 0 deletions src/tetra_rp/runtime/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"""Custom exceptions for cross-endpoint runtime."""


class FlashRuntimeError(Exception):
    """Base exception for runtime errors in cross-endpoint execution.

    Every runtime-specific exception in this module derives from this
    class, so callers can catch the entire family with a single handler.
    """


class RemoteExecutionError(FlashRuntimeError):
    """Raised when remote function execution fails.

    Subclass of :class:`FlashRuntimeError`; carries whatever message the
    raiser supplies about the failed remote call.
    """


class SerializationError(FlashRuntimeError):
    """Raised when serialization or deserialization of arguments fails.

    Subclass of :class:`FlashRuntimeError`.
    """


class ManifestError(FlashRuntimeError):
    """Raised when manifest is invalid, missing, or has unexpected structure.

    Subclass of :class:`FlashRuntimeError`.
    """


class ManifestServiceUnavailableError(FlashRuntimeError):
    """Raised when manifest directory service is unavailable.

    Subclass of :class:`FlashRuntimeError`; distinct from
    :class:`ManifestError` so transient availability failures can be
    handled separately from structurally bad manifests.
    """
Loading
Loading