Skip to content

Add blue-green materialization and staleness tracking for graphs#587

Merged
jfrench9 merged 3 commits into
mainfrom
feature/materialization-improvements
Apr 4, 2026
Merged

Add blue-green materialization and staleness tracking for graphs#587
jfrench9 merged 3 commits into
mainfrom
feature/materialization-improvements

Conversation

@jfrench9
Copy link
Copy Markdown
Member

@jfrench9 jfrench9 commented Apr 4, 2026

Summary

Introduces a blue-green materialization strategy for graph databases along with staleness tracking, ensuring zero-downtime data refreshes. This feature allows graphs to be materialized into a staging copy and atomically swapped into production, preventing readers from encountering partially-built or inconsistent data.

Key Accomplishments

Blue-Green Materialization

  • Materialization lock system (materialization_lock.py): Implements a distributed locking mechanism to ensure only one materialization process runs at a time, preventing race conditions during concurrent refresh attempts.
  • Database swap endpoint (routers/databases/swap.py): New API route to atomically swap a freshly materialized staging database into the active production slot, enabling seamless blue-green deployments.
  • Ladybug manager enhancements (core/ladybug/manager.py): Extended the graph manager to orchestrate the full blue-green lifecycle — build into staging, validate, and swap.

Staleness Tracking

  • Staleness detection (operations/extensions/staleness.py): New module to track when materialized data becomes stale relative to upstream source changes, enabling intelligent re-materialization decisions.
  • Ledger integration (routers/ledger/reports.py, routers/ledger/schedules.py): Reports and schedules endpoints now surface staleness metadata so consumers can assess data freshness.

Dagster Sensor for Automated Materialization

  • Materialization sensor (dagster/sensors/materialization.py): A new Dagster sensor that monitors for stale graphs and automatically triggers re-materialization jobs, integrated into the Dagster definitions.

MCP Tooling

  • Materialization tools (middleware/mcp/tools/materialization_tools.py): Exposes materialization operations (trigger, status, swap) as MCP tools, enabling AI agents and middleware consumers to manage graph refreshes programmatically.

Graph API Client Extensions

  • Extended the graph API client (client/client.py) with methods to interact with the new swap and materialization lock endpoints.

QuickBooks Pipeline Integration

  • Updated the QuickBooks load pipeline to participate in the staleness tracking lifecycle, marking downstream graphs as stale after upstream data ingestion.

Breaking Changes

None. All changes are additive. Existing materialization behavior is preserved; the blue-green swap is an opt-in enhancement layered on top.

Testing

Comprehensive test coverage added across all new components (5 new test files, 524+ lines of tests):

  • test_materialization_sensor.py — Validates sensor triggers on stale graphs and no-ops when data is fresh.
  • test_materialization_lock.py — Tests lock acquisition, re-entrancy prevention, and cleanup on failure.
  • test_swap.py — Exercises the swap endpoint including validation, rollback scenarios, and concurrent swap rejection.
  • test_materialization_tools.py — Verifies MCP tool registration and correct dispatch of materialization operations.
  • test_staleness.py — Confirms staleness detection logic against various upstream change scenarios.

Infrastructure Considerations

  • The materialization lock mechanism may require a shared state backend (e.g., database row or distributed lock) in multi-instance deployments — ensure the deployment environment supports the chosen locking strategy.
  • The Dagster sensor introduces a new polling workload; review sensor interval configuration to balance freshness requirements against system load.
  • The new swap endpoint performs an atomic database reference switch — verify that the underlying storage layer supports this operation in your deployment target.

🤖 Generated with Claude Code

Branch Info:

  • Source: feature/materialization-improvements
  • Target: main
  • Type: feature

Co-Authored-By: Claude noreply@anthropic.com

- Implemented blue-green materialization for LadybugDB databases, allowing for seamless promotion of WIP databases to active status with minimal downtime.
- Introduced a new staleness tracking mechanism that marks graphs as stale after OLTP writes, ensuring the AI and UI can accurately reflect the freshness of data.
- Added endpoints for swapping and rolling back databases, enhancing database management capabilities.
- Created a materialization sensor to automatically trigger materialization jobs for stale graphs.
- Updated various components to support the new materialization and staleness features, including the GraphClient and LadybugDatabaseManager.
- Enhanced error handling and logging throughout the materialization process for better traceability.
@jfrench9
Copy link
Copy Markdown
Member Author

jfrench9 commented Apr 4, 2026

@claude please review this PR

@claude
Copy link
Copy Markdown

claude Bot commented Apr 4, 2026

Claude finished @jfrench9's task in 3m 30s —— View job


PR Review: Add blue-green materialization and staleness tracking for graphs

  • Gather context and understand the request
  • Read core new files (materialization lock, swap router, staleness, sensor)
  • Read modified files (manager, client, MCP tools, operations)
  • Read test files
  • Analyze and post comprehensive review

Overall this is well-architected. The blue-green pattern is sound, the lock design is correct, and the test coverage is solid. A few issues need attention before merge.

Bugs

1. mark_graph_stale is synchronous but called from async route handlers — blocks the event loop

staleness.py:18 defines mark_graph_stale as a synchronous function that opens a DB session and commits. It's called directly in async FastAPI handlers in reports.py and schedules.py, plus the QuickBooks Dagster op. A blocking DB call in an async context will stall the event loop for all concurrent requests.

Fix: wrap in asyncio.to_thread at the call sites, or make it async with async with SessionFactory() as session.

Fix this →

2. sync_status states "materializing" and "failed" are advertised but never returned

materialization_tools.py:36 — the tool description says sync_status can be "fresh" | "stale" | "materializing" | "failed", but execute() only ever returns "fresh" or "stale". There's no code path that returns the other two states. This will mislead AI agents that rely on the tool description.

Fix this →

3. Sensor cursor permanently blocks graphs that fail to rematerialize

materialization.py:107active_ids = submitted_ids & still_stale. If a materialization Dagster job fails and does not clear the staleness flag (i.e., mark_fresh is never called), the graph stays in still_stale and stays in active_ids, preventing re-submission indefinitely. There's no timeout or retry cap.

Fix this →

4. MaterializeGraphTool uses hasattr(env, "API_BASE_URL") — fragile config access

materialization_tools.py:155:

api_base = (
  env.API_BASE_URL if hasattr(env, "API_BASE_URL") else "http://localhost:8000"
)

The CLAUDE.md rule is to use the centralized config directly (env.SOME_VAR). Using hasattr implies API_BASE_URL isn't guaranteed to be in the config schema, meaning the fallback to localhost:8000 could silently take effect in production if the env var is missing. This should be a first-class config field with proper validation, or use the existing env.API_INTERNAL_URL / equivalent.

Fix this →


Design Issues

5. MaterializationLock has no async context manager

materialization_lock.py — distributed locks should implement __aenter__/__aexit__ to guarantee release on exceptions. Without it, callers must wrap every acquire in a try/finally, and any missed finally leaves the lock held until the 1-hour TTL expires. This is particularly risky during the _materialize_blue_green exception path.

async def __aenter__(self):
    if not await self.acquire():
        raise RuntimeError(f"Could not acquire materialization lock for {self.lock_key}")
    return self

async def __aexit__(self, *args):
    await self.release()

Fix this →

6. Rollback endpoint is not usable after a successful swap

manager.py:swap_database Step 6 unconditionally deletes -prev on success. The rollback_database endpoint and the router docstring ("renamed to -prev (rollback safety)") imply the previous version is preserved for rollback, but it isn't. Calling /rollback immediately after a successful swap returns 404.

The rollback endpoint is only useful for a partial failure between rename steps — which is an extremely narrow window. Either:

  • Keep -prev around and rely on the explicit delete call (make Step 6 opt-in), or
  • Update the docstring on both the router and the swap method to clarify that rollback is only for partial failures, not a general post-swap undo.

7. Inline imports of mark_graph_stale in router functions

In reports.py, schedules.py, and quickbooks/pipeline/load.py, mark_graph_stale is imported inside function bodies rather than at the module top. Per Python convention, imports belong at the top of the file. This also makes static analysis (mypy/basedpyright) less effective.


Sensor

8. run_key uses now.isoformat() — defeats Dagster deduplication

materialization.py:89:

run_key=f"stale_materialize_{graph_id}_{now.isoformat()}"

Dagster uses run_key to prevent duplicate runs. Since now changes every sensor tick, Dagster won't deduplicate, and if the sensor fires twice in quick succession (e.g., during a restart), it could submit two concurrent materializations for the same graph — exactly what the lock is supposed to prevent. Consider using the graph's graph_stale_at timestamp as part of the key instead of now.

Fix this →


Test Coverage Gaps

9. test_swap_moves_wal_files doesn't assert the WAL ends up at the right place

test_swap.py:72-83 — the test only verifies the WIP WAL is gone. It should also assert (db_dir / "kg123.lbug.wal").read_text() == "wip-wal" to confirm the WAL was promoted, not just deleted.

10. Missing @pytest.mark.unit on new test classes

All five new test files are missing the @pytest.mark.unit decorator on their test classes/functions, per the testing conventions in CLAUDE.md. Without the marker, just test (unit tests only) may not pick them up depending on marker filtering configuration.


Nits

  • materialization_lock.py:84import asyncio and import time inside acquire() should be at the module top level; these aren't conditional imports.
  • materialization_tools.pyGetGraphSyncStatusTool and MaterializeGraphTool import httpx, SessionFactory, and Graph inside execute(). These should move to the module level.
  • test_materialization_tools.py:47 — patches robosystems.database.SessionFactory but the tool imports it from robosystems.database. Verify the patch target matches the actual import path in materialization_tools.py (which is from robosystems.database import SessionFactory).

Summary

The core blue-green implementation is solid — the file-rename swap, WAL handling, connection pool cleanup, and automatic rollback on failure are all correct. The main items blocking merge are:

Priority Issue
🔴 Bug Sync mark_graph_stale called from async route handlers
🔴 Bug Sensor cursor permanently blocks graphs that fail to rematerialize
🟡 Design sync_status missing states vs. description
🟡 Design hasattr(env, "API_BASE_URL") — needs proper config entry
🟡 Design MaterializationLock needs __aenter__/__aexit__
🟡 Design run_key uses now — defeats Dagster deduplication
🟢 Minor Inline imports, rollback docs, test markers, WAL assertion

Comment thread robosystems/middleware/mcp/tools/materialization_tools.py Fixed
Comment thread robosystems/operations/extensions/materialize.py Fixed
Comment thread robosystems/operations/extensions/materialize.py Fixed
- Added JSON cursor handling and expiry logic to the stale graph materialization sensor, improving the management of in-progress materializations.
- Updated the swap endpoint to include materialization lock verification, ensuring safe promotion of WIP databases to active status.
- Removed rollback functionality from the database manager and related endpoints, streamlining the database management process.
- Enhanced error handling and logging throughout the materialization and database operations for better traceability and reliability.
- Updated tests to cover new cursor logic and database management changes, ensuring robust functionality.
@jfrench9
Copy link
Copy Markdown
Member Author

jfrench9 commented Apr 4, 2026

@claude can you review the latest changes

@claude
Copy link
Copy Markdown

claude Bot commented Apr 4, 2026

Claude finished @jfrench9's task in 2m 31s —— View job


PR Review: Latest Changes

  • Gather context and understand the request
  • Check git diff to see what changed since last review
  • Read changed files
  • Post comprehensive review

Good progress on the second commit — most of the issues from the previous review are resolved. One newly introduced bug needs attention before merge.

What Was Fixed

Previous Issue Status
mark_graph_stale sync in async handlers ✅ Fixed — run_in_executor added
sync_status advertised states don't exist ✅ Fixed — description trimmed to "fresh" | "stale"
Sensor cursor permanently blocks failed graphs ✅ Fixed — JSON cursor with 2-hour expiry
hasattr(env, "API_BASE_URL") fragile config ✅ Fixed — env.ROBOSYSTEMS_API_URL used directly
MaterializationLock missing __aenter__/__aexit__ ✅ Fixed
run_key uses now — defeats Dagster deduplication ✅ Fixed — uses graph.graph_stale_at
WAL assertion missing in swap test ✅ Fixed — read_text() == "wip" added
Rollback docs inconsistency ✅ Fixed — rollback removed entirely (clean choice)
Empty except: pass blocks ✅ Fixed — now logs with logger.warning
-wip/-prev counted toward capacity limits ✅ Fixed — list_databases() now filters them out

Bugs

1. Double-lock acquisition causes 409 on every blue-green materialization

This is a newly introduced regression. _materialize_blue_green in materialize.py:910-930 acquires the materialization lock, then calls client.swap_database(graph_id) at line 953. The swap endpoint (swap.py:70-76) tries to acquire the same lock when no X-Materialization-Lock-Token header is present — and since the caller already holds it, the acquire fails and the endpoint returns 409 CONFLICT.

In other words: the operations layer acquires the lock, then calls the swap endpoint, which tries to acquire the same lock and gets blocked by itself.

The fix is to thread the lock token through. The client should forward the token via the header:

# client/client.py — add lock_token parameter
async def swap_database(self, graph_id: str, lock_token: str | None = None) -> dict[str, Any]:
    headers = {}
    if lock_token:
        headers["X-Materialization-Lock-Token"] = lock_token
    response = await self._request("POST", f"/databases/{graph_id}/swap", headers=headers)
    return response.json()

And in materialize.py:953, pass the token:

await client.swap_database(graph_id, lock_token=lock.token if lock else None)

Fix this →


Design Issues

2. asyncio.get_event_loop().run_in_executor() — use get_running_loop() instead

reports.py:298-300 and schedules.py:118-120 use asyncio.get_event_loop(). Inside an async function there is always a running loop, so the correct API is asyncio.get_running_loop(). get_event_loop() is deprecated in Python 3.10+ in contexts without a running loop, and while it won't fail here, it's the wrong call. Also, the returned Future is not awaited — this is intentional (fire-and-forget), but any exception from mark_graph_stale will be silently dropped. Since staleness is advisory, this is acceptable — but worth a comment.

Preferred:

asyncio.get_running_loop().run_in_executor(None, mark_graph_stale, graph_id, "report_generated")

Or more idiomatically in modern Python: asyncio.create_task(asyncio.to_thread(mark_graph_stale, graph_id, "report_generated")).

Fix this →

3. from_token trusts the caller without Valkey verification

materialization_lock.py:162-176from_token() creates a lock with _acquired = True without checking Valkey to confirm the token is actually held. A caller passing any arbitrary token bypasses the 409 guard. This is a low risk internally (the endpoint is internal-only), but the method contract is misleading — the docstring says "for passthrough" but it's really just a trust-the-caller shortcut.

This is acceptable for now, but consider renaming to from_trusted_token() or adding a note to the docstring that this does not verify ownership.


Nits

  • materialization.py:73from dateutil import parser as date_parser is inside a loop. Python caches imports so there's no repeated disk I/O, but it's unusual style. Moving it to the top of the function body (with the other deferred imports) would be cleaner.
  • materialization.py:80 — the bare except Exception: pass for malformed cursor entries has no logging — inconsistent with the rest of the file where empty except blocks were cleaned up. A logger.debug here would help debug corrupted cursors.
  • @pytest.mark.unit still missing from all 5 new test files — flagged in the previous review, still unaddressed. Without the marker, just test may not include these tests depending on the marker filter config. Add @pytest.mark.unit to each test class.
  • The inline imports at the top of _materialize_blue_green (from robosystems.graph_api.core.ladybug.materialization_lock import MaterializationLock and Valkey imports) were previously noted — they haven't been moved to module level.

Summary

The core architecture is solid. The critical item before merge is bug #1 — every normal blue-green materialization will fail with 409 because the lock is re-acquired by the swap endpoint while the caller already holds it. That's a straightforward fix in the client and the call site. Everything else is minor.

Priority Issue
🔴 Bug swap_database re-acquires lock already held by _materialize_blue_green → 409 on every swap
🟡 Design asyncio.get_event_loop() → use get_running_loop()
🟡 Design from_token() trusts caller without verifying against Valkey
🟢 Minor Missing @pytest.mark.unit markers on all 5 new test files
🟢 Minor Inline import inside loop, unlogged cursor drop

- Renamed `from_token` method to `from_trusted_token` in the MaterializationLock class to clarify its purpose and usage.
- Enhanced error handling in the stale graph materialization sensor by adding detailed logging for malformed cursor entries.
- Updated the GraphClient's swap_database method to accept an optional lock token, ensuring safe database swaps without re-acquiring locks.
- Adjusted related tests to reflect the new method name and improved functionality, ensuring robust coverage for materialization lock handling.
@jfrench9
Copy link
Copy Markdown
Member Author

jfrench9 commented Apr 4, 2026

@claude all issues resolved now?

@claude
Copy link
Copy Markdown

claude Bot commented Apr 4, 2026

Claude finished @jfrench9's task in 1m 41s —— View job


PR Review: Status Check

  • Gather context
  • Read current state of files with outstanding issues
  • Post updated review

Almost everything is fixed. All critical and design issues are resolved — only three minor nits remain.

What's Fixed Since Last Review

Previous Issue Status
🔴 swap_database re-acquires lock → 409 on every swap ✅ Fixed — lock_token threaded through client and call site
🟡 asyncio.get_event_loop() → use get_running_loop() ✅ Fixed — both reports.py:298 and schedules.py:118 now use get_running_loop()
🟡 from_token() trusts caller without Valkey verification ✅ Fixed — renamed to from_trusted_token() with explicit docstring warning
Code quality: empty except blocks in materialize.py ✅ Fixed — both cleanup paths now log warnings
Code quality: empty except in materialization_tools.py ✅ Fixed — logs logger.warning with the offending value
Sensor: dateutil import inside loop ✅ Fixed — moved above the for loop
Sensor: malformed cursor drop has no logging ✅ Fixed — logger.debug added

Still Open (Minor)

1. @pytest.mark.unit still missing from all 5 new test files

Flagged in both previous reviews. Without the marker, just test may skip these depending on the marker filter:

  • tests/dagster/test_materialization_sensor.py
  • tests/graph_api/core/ladybug/test_materialization_lock.py
  • tests/graph_api/core/ladybug/test_swap.py
  • tests/middleware/mcp/tools/test_materialization_tools.py
  • tests/operations/extensions/test_staleness.py

Fix this →

2. import asyncio / import time still inside acquire()materialization_lock.py:84-85

These are unconditional imports that belong at the module top. Minor, but was flagged twice.

3. Inline imports inside execute() methods in materialization_tools.py

GetGraphSyncStatusTool.execute() imports SessionFactory, Graph, and dateutil inline (lines 46-48, 78). MaterializeGraphTool.execute() imports httpx and env inline (lines 149, 151). Flagged in both reviews — still unaddressed.


Summary

The three remaining items are all style nits with no functional impact. The core logic — blue-green swap, lock passthrough, cursor expiry, async-safe staleness marking, Dagster deduplication — is all correct. This is ready to merge if you're OK leaving the nits for a follow-up.

@jfrench9 jfrench9 merged commit 16c63e3 into main Apr 4, 2026
7 checks passed
@jfrench9 jfrench9 deleted the feature/materialization-improvements branch April 4, 2026 19:24
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant