Skip to content
Prev Previous commit
Next Next commit
use graphql mutations for updating manifest resources
  • Loading branch information
jhcipar committed Jan 15, 2026
commit 6ce2d8002958915714a472c09ef13f3bf8b1449c
95 changes: 38 additions & 57 deletions src/tetra_rp/runtime/state_manager_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,30 +68,18 @@ async def get_persisted_manifest(
Raises:
ManifestServiceUnavailableError: If State Manager unavailable after retries.
"""
if httpx is None:
raise ImportError(
"httpx required for StateManagerClient. Install with: pip install httpx"
)

last_exception: Optional[Exception] = None

for attempt in range(self.max_retries):
try:
async with RunpodGraphQLClient() as client:
environment = await client.get_flash_environment({"flashEnvironmentId": mothership_id})
build_id = environment.get("activeBuildId")
if not build_id:
raise ValueError(f"active build for environment {mothership_id} not found")
build = await client.get_flash_build(build_id)

manifest = build.get("manifest")
if not manifest:
raise ManifestServiceUnavailableError(f"manifest not found for build {build["id"]}")
_, manifest = await self._fetch_build_and_manifest(
client, mothership_id
)

logger.debug(f"Persisted manifest loaded for {mothership_id}")
return manifest


except (
asyncio.TimeoutError,
ManifestServiceUnavailableError,
Expand Down Expand Up @@ -128,28 +116,20 @@ async def update_resource_state(
Raises:
ManifestServiceUnavailableError: If State Manager unavailable.
"""
if httpx is None:
raise ImportError(
"httpx required for StateManagerClient. Install with: pip install httpx"
)

last_exception: Optional[Exception] = None

for attempt in range(self.max_retries):
try:
client = await self._get_client()
response = await client.put(
f"{self.base_url}/api/v1/flash/manifests/{mothership_id}/resources/{resource_name}",
headers={"Authorization": f"Bearer {self.api_key}"},
json=resource_data,
timeout=self.timeout,
)

if response.status_code >= 400:
raise ManifestServiceUnavailableError(
f"State Manager returned {response.status_code}: "
f"{response.text[:200]}"
async with RunpodGraphQLClient() as client:
build_id, manifest = await self._fetch_build_and_manifest(
client, mothership_id
)
resources = manifest.setdefault("resources", {})
existing = resources.get(resource_name)
if not isinstance(existing, dict):
existing = {}
resources[resource_name] = {**existing, **resource_data}
await client.update_build_manifest(build_id, manifest)

logger.debug(
f"Updated resource state in State Manager: {mothership_id}/{resource_name}"
Expand Down Expand Up @@ -188,27 +168,18 @@ async def remove_resource_state(
Raises:
ManifestServiceUnavailableError: If State Manager unavailable.
"""
if httpx is None:
raise ImportError(
"httpx required for StateManagerClient. Install with: pip install httpx"
)

last_exception: Optional[Exception] = None

for attempt in range(self.max_retries):
try:
client = await self._get_client()
response = await client.delete(
f"{self.base_url}/api/v1/flash/manifests/{mothership_id}/resources/{resource_name}",
headers={"Authorization": f"Bearer {self.api_key}"},
timeout=self.timeout,
)

if response.status_code >= 400:
raise ManifestServiceUnavailableError(
f"State Manager returned {response.status_code}: "
f"{response.text[:200]}"
async with RunpodGraphQLClient() as client:
build_id, manifest = await self._fetch_build_and_manifest(
client, mothership_id
)
resources = manifest.get("resources") or {}
resources.pop(resource_name, None)
manifest["resources"] = resources
await client.update_build_manifest(build_id, manifest)

logger.debug(
f"Removed resource state from State Manager: {mothership_id}/{resource_name}"
Expand All @@ -235,18 +206,28 @@ async def remove_resource_state(
f"{last_exception}"
)

async def _get_client(self) -> httpx.AsyncClient:
"""Get or create HTTP client with proper configuration."""
if self._client is None or self._client.is_closed:
timeout = httpx.Timeout(self.timeout)
self._client = httpx.AsyncClient(timeout=timeout)

return self._client
async def _fetch_build_and_manifest(
    self, client: RunpodGraphQLClient, mothership_id: str
) -> tuple[str, Dict[str, Any]]:
    """Resolve an environment's active build and return it with its manifest.

    Args:
        client: GraphQL client used for both the environment and build lookups.
        mothership_id: Sent to the environment query as ``flashEnvironmentId``.

    Returns:
        A ``(build_id, manifest)`` pair, where ``build_id`` is the
        environment's ``activeBuildId`` and ``manifest`` is that build's
        ``manifest`` value.

    Raises:
        ManifestServiceUnavailableError: If the environment has no
            ``activeBuildId``, or the build's manifest is missing or empty.
    """
    env_record = await client.get_flash_environment(
        {"flashEnvironmentId": mothership_id}
    )
    build_id = env_record.get("activeBuildId")
    if build_id:
        build = await client.get_flash_build(build_id)
        build_manifest = build.get("manifest")
        if build_manifest:
            return build_id, build_manifest
        # Prefer the id reported by the build itself; fall back to the id
        # we queried with when the build payload carries none.
        raise ManifestServiceUnavailableError(
            f"manifest not found for build {build.get('id', build_id)}"
        )
    raise ManifestServiceUnavailableError(
        f"active build for environment {mothership_id} not found"
    )

async def close(self) -> None:
"""Close HTTP session."""
if self._client and not self._client.is_closed:
await self._client.aclose()
return

async def __aenter__(self):
"""Async context manager entry."""
Expand Down
Loading