Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
88fc534
feat(runtime): Implement DirectoryClient for mothership API integration
deanq Jan 3, 2026
3b4e771
feat(runtime): Implement ServiceRegistry for service discovery
deanq Jan 3, 2026
7bd5e72
feat(runtime): Implement CrossEndpointClient for HTTP execution
deanq Jan 3, 2026
e1d9d45
feat(runtime): Implement ProductionWrapper for cross-endpoint routing
deanq Jan 3, 2026
5ea12b6
integrate(runtime): Wire ProductionWrapper into stub layer
deanq Jan 3, 2026
3986335
test(integration): Add cross-endpoint routing integration tests
deanq Jan 3, 2026
1b194c0
fix: remove unused variables in test files (F841 linting errors)
deanq Jan 3, 2026
1fc33de
refactor: use ServerlessResource directly instead of CrossEndpointClient
deanq Jan 3, 2026
de0f6a6
refactor: move imports to top of conftest.py
deanq Jan 3, 2026
f029b96
refactor: document lessons learned from cross-endpoint routing improv…
deanq Jan 3, 2026
ecd4841
refactor: code quality improvements for cross-endpoint routing
deanq Jan 3, 2026
f93192b
refactor: rename RuntimeError to FlashRuntimeError to avoid shadowing…
deanq Jan 3, 2026
1c6e6f9
docs: add comprehensive cross-endpoint routing documentation
deanq Jan 5, 2026
0508c84
Merge branch 'main' into deanq/ae-1348-cross-endpoint-routing
deanq Jan 8, 2026
86a0cac
Merge branch 'main' into deanq/ae-1348-cross-endpoint-routing
deanq Jan 8, 2026
56a4397
Merge branch 'main' into deanq/ae-1348-cross-endpoint-routing
deanq Jan 8, 2026
48bdfce
refactor: extract serialization logic into shared utility module
deanq Jan 9, 2026
9aa2ab2
refactor: rename DirectoryClient to ManifestClient for clarity
deanq Jan 9, 2026
68fb480
refactor: convert manifest to type-safe dataclass structure
deanq Jan 9, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
refactor: extract serialization logic into shared utility module
Create new src/tetra_rp/runtime/serialization.py with reusable functions
for cloudpickle + base64 encoding/decoding to eliminate duplication across
6 production files:
- serialize_arg(), serialize_args(), serialize_kwargs()
- deserialize_arg(), deserialize_args(), deserialize_kwargs()

This addresses the PR #129 comment to refactor duplicated serialization
code. All serialization now goes through a single, consistent interface
with proper error handling via SerializationError.

Updated files:
- production_wrapper.py: Use serialize_args/kwargs
- live_serverless.py: Use serialize_args/kwargs
- execute_class.py: Use serialize_args/kwargs for constructor and method args
- generic_handler.py: Use deserialize/serialize utilities
- lb_handler.py: Use deserialize/serialize for /execute endpoint
- load_balancer_sls.py: Use serialize/deserialize for HTTP-based stub

All 581 tests passing. Code coverage: 65.37%.
  • Loading branch information
deanq committed Jan 9, 2026
commit 48bdfce50f9440fa9c76dadccd39edc2dc328e23
34 changes: 8 additions & 26 deletions src/tetra_rp/execute_class.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
prevent memory leaks through LRU eviction.
"""

import base64
import hashlib
import inspect
import logging
Expand All @@ -20,6 +19,8 @@
from .core.utils.constants import HASH_TRUNCATE_LENGTH, UUID_FALLBACK_LENGTH
from .core.utils.lru_cache import LRUCache
from .protos.remote_execution import FunctionRequest
from .runtime.exceptions import SerializationError
from .runtime.serialization import serialize_args, serialize_kwargs
from .stubs import stub_resource

log = logging.getLogger(__name__)
Expand All @@ -30,14 +31,7 @@

def serialize_constructor_args(args, kwargs):
"""Serialize constructor arguments for caching."""
serialized_args = [
base64.b64encode(cloudpickle.dumps(arg)).decode("utf-8") for arg in args
]
serialized_kwargs = {
k: base64.b64encode(cloudpickle.dumps(v)).decode("utf-8")
for k, v in kwargs.items()
}
return serialized_args, serialized_kwargs
return serialize_args(args), serialize_kwargs(kwargs)


def get_or_cache_class_data(
Expand Down Expand Up @@ -65,7 +59,7 @@ def get_or_cache_class_data(

log.debug(f"Cached class data for {cls.__name__} with key: {cache_key}")

except (TypeError, AttributeError, OSError) as e:
except (TypeError, AttributeError, OSError, SerializationError) as e:
log.warning(
f"Could not serialize constructor arguments for {cls.__name__}: {e}"
)
Expand Down Expand Up @@ -267,14 +261,8 @@ async def method_proxy(*args, **kwargs):
cached_data = _SERIALIZED_CLASS_CACHE.get(self._cache_key)

# Serialize method arguments (these change per call, so no caching)
method_args = [
base64.b64encode(cloudpickle.dumps(arg)).decode("utf-8")
for arg in args
]
method_kwargs = {
k: base64.b64encode(cloudpickle.dumps(v)).decode("utf-8")
for k, v in kwargs.items()
}
method_args = serialize_args(args)
method_kwargs = serialize_kwargs(kwargs)

# Handle constructor args - use cached if available, else serialize fresh
if cached_data["constructor_args"] is not None:
Expand All @@ -284,14 +272,8 @@ async def method_proxy(*args, **kwargs):
else:
# Constructor args couldn't be cached due to serialization issues
# Serialize them fresh for each method call (fallback behavior)
constructor_args = [
base64.b64encode(cloudpickle.dumps(arg)).decode("utf-8")
for arg in self._constructor_args
]
constructor_kwargs = {
k: base64.b64encode(cloudpickle.dumps(v)).decode("utf-8")
for k, v in self._constructor_kwargs.items()
}
constructor_args = serialize_args(self._constructor_args)
constructor_kwargs = serialize_kwargs(self._constructor_kwargs)

request = FunctionRequest(
execution_type="class",
Expand Down
14 changes: 4 additions & 10 deletions src/tetra_rp/runtime/generic_handler.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
"""Generic RunPod serverless handler factory for Flash."""

import base64
import json
import logging
import traceback
from pathlib import Path
from typing import Any, Callable, Dict

import cloudpickle
from .serialization import deserialize_args, deserialize_kwargs, serialize_arg

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -64,13 +63,8 @@ def deserialize_arguments(job_input: Dict[str, Any]) -> tuple[list, dict]:
Returns:
Tuple of (args list, kwargs dict) deserialized from cloudpickle
"""
args = [
cloudpickle.loads(base64.b64decode(arg)) for arg in job_input.get("args", [])
]
kwargs = {
k: cloudpickle.loads(base64.b64decode(v))
for k, v in job_input.get("kwargs", {}).items()
}
args = deserialize_args(job_input.get("args", []))
kwargs = deserialize_kwargs(job_input.get("kwargs", {}))
return args, kwargs


Expand All @@ -83,7 +77,7 @@ def serialize_result(result: Any) -> str:
Returns:
Base64-encoded cloudpickle of result
"""
return base64.b64encode(cloudpickle.dumps(result)).decode("utf-8")
return serialize_arg(result)


def execute_function(
Expand Down
44 changes: 16 additions & 28 deletions src/tetra_rp/runtime/lb_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,18 @@
Users should NOT expose the /execute endpoint to untrusted clients.
"""

import base64
import inspect
import logging
from typing import Any, Callable, Dict

import cloudpickle
from fastapi import FastAPI, Request

from .serialization import (
deserialize_args,
deserialize_kwargs,
serialize_arg,
)

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -94,29 +98,15 @@ async def execute_remote_function(request: Request) -> Dict[str, Any]:
}

# Deserialize arguments
args = []
for arg_b64 in body.get("args", []):
try:
arg = cloudpickle.loads(base64.b64decode(arg_b64))
args.append(arg)
except Exception as e:
logger.error(f"Failed to deserialize argument: {e}")
return {
"success": False,
"error": f"Failed to deserialize argument: {e}",
}

kwargs = {}
for key, val_b64 in body.get("kwargs", {}).items():
try:
val = cloudpickle.loads(base64.b64decode(val_b64))
kwargs[key] = val
except Exception as e:
logger.error(f"Failed to deserialize kwarg '{key}': {e}")
return {
"success": False,
"error": f"Failed to deserialize kwarg '{key}': {e}",
}
try:
args = deserialize_args(body.get("args", []))
kwargs = deserialize_kwargs(body.get("kwargs", {}))
except Exception as e:
logger.error(f"Failed to deserialize arguments: {e}")
return {
"success": False,
"error": f"Failed to deserialize arguments: {e}",
}

# Execute function in isolated namespace
namespace: Dict[str, Any] = {}
Expand Down Expand Up @@ -160,9 +150,7 @@ async def execute_remote_function(request: Request) -> Dict[str, Any]:

# Serialize result
try:
result_b64 = base64.b64encode(cloudpickle.dumps(result)).decode(
"utf-8"
)
result_b64 = serialize_arg(result)
return {"success": True, "result": result_b64}
except Exception as e:
logger.error(f"Failed to serialize result: {e}")
Expand Down
21 changes: 4 additions & 17 deletions src/tetra_rp/runtime/production_wrapper.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
"""Production wrapper for cross-endpoint function routing."""

import base64
import logging
from typing import Any, Callable, Dict, Optional

import cloudpickle

from tetra_rp.core.resources.serverless import ServerlessResource

from .exceptions import RemoteExecutionError, SerializationError
from .exceptions import RemoteExecutionError
from .serialization import serialize_args, serialize_kwargs
from .service_registry import ServiceRegistry

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -174,22 +172,11 @@ async def _execute_remote(
Execution result.

Raises:
SerializationError: If serialization fails.
RemoteExecutionError: If remote execution fails.
"""
# Serialize arguments
try:
serialized_args = [
base64.b64encode(cloudpickle.dumps(arg)).decode("utf-8") for arg in args
]
serialized_kwargs = {
k: base64.b64encode(cloudpickle.dumps(v)).decode("utf-8")
for k, v in kwargs.items()
}
except Exception as e:
raise SerializationError(
f"Failed to serialize arguments for {function_name}: {e}"
) from e
serialized_args = serialize_args(args)
serialized_kwargs = serialize_kwargs(kwargs)

# Build payload matching RunPod format
payload = {
Expand Down
124 changes: 124 additions & 0 deletions src/tetra_rp/runtime/serialization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
"""Shared serialization utilities for cloudpickle + base64 encoding."""

import base64
from typing import Any, Dict, List

import cloudpickle

from .exceptions import SerializationError


def serialize_arg(arg: Any) -> str:
    """Encode a single value as base64 text of its cloudpickle bytes.

    Args:
        arg: Any picklable value.

    Returns:
        The cloudpickle payload, base64-encoded and decoded to a UTF-8 str.

    Raises:
        SerializationError: When pickling or encoding the value fails.
    """
    try:
        pickled = cloudpickle.dumps(arg)
        return base64.b64encode(pickled).decode("utf-8")
    except Exception as exc:
        raise SerializationError(f"Failed to serialize argument: {exc}") from exc


def serialize_args(args: tuple) -> List[str]:
    """Encode each positional argument via :func:`serialize_arg`.

    Args:
        args: Positional arguments to encode.

    Returns:
        Base64-encoded payloads, one per argument, in order.

    Raises:
        SerializationError: When any argument cannot be serialized.
    """
    encoded: List[str] = []
    try:
        for item in args:
            encoded.append(serialize_arg(item))
    except SerializationError:
        # Already carries a per-argument message; propagate unchanged.
        raise
    except Exception as exc:
        raise SerializationError(f"Failed to serialize args: {exc}") from exc
    return encoded


def serialize_kwargs(kwargs: dict) -> Dict[str, str]:
    """Encode every keyword-argument value via :func:`serialize_arg`.

    Args:
        kwargs: Mapping of keyword names to values.

    Returns:
        A new dict with the same keys and base64-encoded values.

    Raises:
        SerializationError: When any value cannot be serialized.
    """
    encoded: Dict[str, str] = {}
    try:
        for name, value in kwargs.items():
            encoded[name] = serialize_arg(value)
    except SerializationError:
        # Already carries a per-value message; propagate unchanged.
        raise
    except Exception as exc:
        raise SerializationError(f"Failed to serialize kwargs: {exc}") from exc
    return encoded


def deserialize_arg(arg_b64: str) -> Any:
    """Inverse of :func:`serialize_arg`: base64-decode, then unpickle.

    Args:
        arg_b64: Base64 text produced by :func:`serialize_arg`.

    Returns:
        The original Python value.

    Raises:
        SerializationError: When decoding or unpickling fails.
    """
    try:
        raw = base64.b64decode(arg_b64)
        return cloudpickle.loads(raw)
    except Exception as exc:
        raise SerializationError(f"Failed to deserialize argument: {exc}") from exc


def deserialize_args(args_b64: List[str]) -> List[Any]:
    """Decode a list of base64 payloads back into Python values.

    Args:
        args_b64: Encoded positional arguments, as produced by
            :func:`serialize_args`.

    Returns:
        The decoded values, in the same order.

    Raises:
        SerializationError: When any element cannot be deserialized.
    """
    try:
        return list(map(deserialize_arg, args_b64))
    except SerializationError:
        # Already carries a per-element message; propagate unchanged.
        raise
    except Exception as exc:
        raise SerializationError(f"Failed to deserialize args: {exc}") from exc


def deserialize_kwargs(kwargs_b64: Dict[str, str]) -> Dict[str, Any]:
    """Decode a mapping of base64 payloads back into Python values.

    Args:
        kwargs_b64: Encoded keyword arguments, as produced by
            :func:`serialize_kwargs`.

    Returns:
        A new dict with the same keys and decoded values.

    Raises:
        SerializationError: When any value cannot be deserialized.
    """
    decoded: Dict[str, Any] = {}
    try:
        for name, payload in kwargs_b64.items():
            decoded[name] = deserialize_arg(payload)
    except SerializationError:
        # Already carries a per-value message; propagate unchanged.
        raise
    except Exception as exc:
        raise SerializationError(f"Failed to deserialize kwargs: {exc}") from exc
    return decoded
10 changes: 3 additions & 7 deletions src/tetra_rp/stubs/live_serverless.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
FunctionResponse,
RemoteExecutorStub,
)
from ..runtime.serialization import serialize_args, serialize_kwargs

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -100,14 +101,9 @@ def prepare_request(

# Serialize arguments using cloudpickle
if args:
request["args"] = [
base64.b64encode(cloudpickle.dumps(arg)).decode("utf-8") for arg in args
]
request["args"] = serialize_args(args)
if kwargs:
request["kwargs"] = {
k: base64.b64encode(cloudpickle.dumps(v)).decode("utf-8")
for k, v in kwargs.items()
}
request["kwargs"] = serialize_kwargs(kwargs)

return FunctionRequest(**request)

Expand Down
Loading