From 76bed5737b666694a45b3612ede5379badfabb7f Mon Sep 17 00:00:00 2001 From: colombod Date: Fri, 19 Jun 2026 13:41:50 +0000 Subject: [PATCH] fix(neo4j): self-healing cross-session edge writes + unified :Node identity (v4.0.2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The chunked-flush edge writer built every cross-session edge with two MATCH clauses, silently dropping edges when endpoints were not yet committed. Fix: MERGE both endpoints to create :Node placeholders, re-key node identity on the universal :Node label so placeholders converge with later typed writes, add a :Node(node_id, workspace) uniqueness constraint as the atomicity guard for concurrent MERGEs, make the previously-dead universal-:Node backfill reachable, and drop the legacy plain index. The :Node label was introduced in PR #19 but its backfill shipped unreachable, leaving pre-#19 nodes untagged. The migration at startup (dedup → backfill → verify → drop legacy index → :Node constraint) safely brings the production graph into the state B′ assumes, before any write. Full operator runbook in docs/node-identity-migration.md. Bumps server to v4.0.2. Evidence: - tests/neo4j: 69 passed (includes RED→GREEN silent-drop proof + dirty-graph migration test) - Non-neo4j unit suite: 1373 passed, 2 skipped - ruff: clean on all changed files 🤖 Generated with [Amplifier](https://github.com/microsoft/amplifier) Co-Authored-By: Amplifier <240397093+microsoft-amplifier@users.noreply.github.com> --- context_intelligence_server/neo4j_store.py | 192 ++++++++++--- docs/node-identity-migration.md | 93 +++++++ pyproject.toml | 2 +- tests/neo4j/test_node_identity_migration.py | 151 +++++++++++ tests/neo4j/test_silent_edge_drop.py | 286 ++++++++++++++++++++ tests/routers/test_version.py | 8 +- tests/test_neo4j_store.py | 76 ++++-- uv.lock | 2 +- 8 files changed, 742 insertions(+), 68 deletions(-) create mode 100644 docs/node-identity-migration.md create mode 100644 tests/neo4j/test_node_identity_migration.py create mode 100644 tests/neo4j/test_silent_edge_drop.py diff --git a/context_intelligence_server/neo4j_store.py b/context_intelligence_server/neo4j_store.py index 15e74c4..e1a5c04 100644 --- a/context_intelligence_server/neo4j_store.py +++ b/context_intelligence_server/neo4j_store.py @@ -133,22 +133,32 @@ def _edge_merge_cypher(edge_type: str) -> str: - """Return the UNWIND edge-MERGE query for *edge_type*. - - src/dst are matched via the universal :Node label so each MATCH uses the - composite :Node(node_id, workspace) index (NodeIndexSeek) instead of an - AllNodesScan over the whole graph — the root cause of the multi-second - "Running" edge transactions on the 1.3M-node graph. The relationship - MERGE/SET logic is unchanged from the original label-free query. + """Return the UNWIND edge-MERGE query for *edge_type* — self-healing endpoints. + + src/dst are ``MERGE``d (not ``MATCH``ed) on the universal :Node label, then the + relationship is ``MERGE``d. ``MERGE`` on the composite :Node(node_id, workspace) + key uses a NodeIndexSeek (idx_node_universal / the :Node uniqueness constraint), + never an AllNodesScan, so the 1.3M-node index-seek behaviour from #19 is kept. + + Why MERGE and not MATCH: the old ``MATCH (src) MATCH (dst)`` was an inner join + that SILENTLY dropped the edge whenever either endpoint was not yet committed + (the HAS_SUBSESSION parent-absent race — but pervasive: SOURCED_FROM and ~20 + other edge types legitimately write edges to not-yet-committed endpoints). + ``MERGE`` creates a bare ``:Node`` placeholder for an absent endpoint instead + of dropping the edge; the placeholder converges with the later typed write + because every node writer keys identity on ``:Node`` (Session/Event/etc. add + their type label via ``SET``), backed by the :Node(node_id, workspace) + uniqueness constraint so concurrent MERGEs stay atomic. Never-silent without + aborting the legitimate eventual-consistency ingest. *edge_type* MUST already be validated via ``_validate_identifier`` by the caller (it is interpolated into the Cypher, so it cannot be user input). """ return ( "UNWIND $rows AS row " - f"MATCH (src:{_UNIVERSAL_NODE_LABEL} " + f"MERGE (src:{_UNIVERSAL_NODE_LABEL} " "{node_id: row.src_id, workspace: $workspace}) " - f"MATCH (dst:{_UNIVERSAL_NODE_LABEL} " + f"MERGE (dst:{_UNIVERSAL_NODE_LABEL} " "{node_id: row.dst_id, workspace: $workspace}) " f"MERGE (src)-[r:{edge_type}]->(dst) " "SET r += row.props" @@ -309,16 +319,36 @@ async def ensure_neo4j_schema( """ async with driver.session(database=database) as session: # ------------------------------------------------------------------ - # Step 1: deduplicate any pre-existing duplicate Session and Event nodes. - # For each duplicate (node_id, workspace) group keep the first node - # (as returned by collect()) and DETACH DELETE the remainder. - # This MUST run before constraint creation so a dirty graph does not - # cause the CREATE CONSTRAINT statement to fail. Both Session (Step 3) - # and Event (Step 4) carry a uniqueness constraint, so both must be - # deduplicated here or the corresponding constraint retry never - # converges. + # Step 1: deduplicate any pre-existing duplicate (node_id, workspace) + # nodes. This MUST run before constraint creation so a dirty graph does + # not cause the CREATE CONSTRAINT statements (Session, Event, and the + # universal :Node constraint in Step 6) to fail. + # + # Three passes: + # 1a. GLOBAL by (node_id, workspace) across EVERY label — required for + # the Step-6 :Node uniqueness constraint, which spans all node + # types (Session/Event/OrchestratorRun/Step/ToolExecution/...). + # Keeps the RICHEST node (most labels) so a fully-typed node always + # wins over a bare :Node placeholder; DETACH DELETE the rest. + # Dups here come from the #19 dead-backfill bug (an indexed + # MERGE (n:Node {...}) duplicated legacy untagged nodes). + # 1b/1c. Session / Event specifically — kept for clarity and because + # their per-label uniqueness constraints (Steps 3/4) still apply. + # (Redundant after 1a, but cheap and explicit.) + # All passes are best-effort (non-fatal) and do not affect the return. # ------------------------------------------------------------------ try: + await session.run( + "MATCH (n) " + "WITH n.node_id AS nid, n.workspace AS ws, collect(n) AS nodes " + "WHERE size(nodes) > 1 " + # keep the node carrying the most labels (typed > bare placeholder) + "WITH nodes, reduce(best = nodes[0], x IN nodes | " + "CASE WHEN size(labels(x)) > size(labels(best)) THEN x ELSE best END) " + "AS keep " + "UNWIND [x IN nodes WHERE x <> keep] AS duplicate " + "DETACH DELETE duplicate" + ) await session.run( "MATCH (s:Session) " "WITH s.node_id AS nid, s.workspace AS ws, collect(s) AS nodes " @@ -414,14 +444,15 @@ async def _create_index(statement: str) -> bool: and fully_established ) - # Composite index backing the universal-label node MERGE. The - # non-Session node write does MERGE (n:Node {node_id, workspace}); this - # index turns that into a NodeIndexSeek instead of an AllNodesScan over - # the whole graph (root cause of the 1.3M-node drain stall). - await _create_index( - f"CREATE INDEX idx_node_universal IF NOT EXISTS " - f"FOR (n:{_UNIVERSAL_NODE_LABEL}) ON (n.node_id, n.workspace)" - ) + # NOTE: the composite :Node(node_id, workspace) lookup index that backs the + # universal-label node MERGE (NodeIndexSeek, not AllNodesScan — the + # 1.3M-node drain-stall fix) is now provided by the :Node UNIQUENESS + # CONSTRAINT created in Step 6 below (a uniqueness constraint carries its + # own backing range index). A standalone ``CREATE INDEX idx_node_universal`` + # here would CONFLICT with that constraint ("a constraint cannot be created + # until the index has been dropped"), so it is intentionally NOT created + # here; Step 6 drops any legacy idx_node_universal first, then creates the + # constraint. # ------------------------------------------------------------------ # Steps 3 & 4: uniqueness constraints on Session then Event nodes. @@ -496,8 +527,6 @@ async def _create_constraint(session: Any, name: str, statement: str) -> bool: and fully_established ) - return fully_established - # ------------------------------------------------------------------ # Step 5: backfill the universal :Node label onto pre-existing nodes. # The non-Session node MERGE targets (n:Node {node_id, workspace}) so it @@ -506,13 +535,23 @@ async def _create_constraint(session: Any, name: str, statement: str) -> bool: # would CREATE a duplicate of them rather than match them. Tag every # such node first. # + # This MUST run BEFORE the :Node uniqueness constraint (Step 6): the + # constraint cannot be created over a graph that still has untagged nodes + # that would become duplicate (node_id, workspace) :Node pairs, and the + # re-keyed Session/edge MERGEs on :Node must find pre-existing nodes + # rather than fork their identity. + # # Batched + idempotent: CALL { ... } IN TRANSACTIONS commits in chunks # (so 1.3M nodes don't blow the per-transaction memory cap), and the # WHERE NOT n:Node guard makes re-runs converge to a no-op once the - # graph is fully tagged. Must run BEFORE any flush MERGEs on :Node — - # ensure_neo4j_schema is awaited at the top of _flush_body, so it does. + # graph is fully tagged. ensure_neo4j_schema is awaited at the top of + # _flush_body, so this runs before any flush MERGEs on :Node. # The batch size must be a literal in Cypher, so it is interpolated from # the in-process int constant (never user input). + # + # NOTE (was a latent bug): this block previously sat after an early + # ``return fully_established`` and was therefore DEAD CODE — the backfill + # never ran, so legacy untagged nodes silently duplicated on re-write. try: await session.run( cast( @@ -529,6 +568,83 @@ async def _create_constraint(session: Any, name: str, statement: str) -> bool: exc, ) + # Backfill verification (observability): the migration's "is it done?" + # signal. A non-zero remaining-untagged count means the universal :Node + # identity is NOT yet established graph-wide, so the Step-6 constraint may + # fail and the re-keyed writers can still fork a legacy node's identity. + # Surfaced LOUD (never-silent) so an incomplete migration is visible in + # logs rather than discovered via duplicate nodes. + try: + rec = await ( + await session.run( + f"MATCH (n) WHERE NOT n:{_UNIVERSAL_NODE_LABEL} " + "RETURN count(n) AS remaining" + ) + ).single() + remaining = int(rec["remaining"]) if rec is not None else 0 + if remaining: + _LOG.warning( + "ensure_neo4j_schema: :Node backfill incomplete — %d node(s) " + "still lack the :Node label; the universal-identity migration " + "is NOT complete (re-runs next schema init). Do not rely on the " + ":Node uniqueness constraint until this reaches 0.", + remaining, + ) + else: + _LOG.info( + "ensure_neo4j_schema: :Node backfill complete (0 untagged nodes)." + ) + except Exception as exc: # noqa: BLE001 - observability only, never break init + _LOG.debug( + "ensure_neo4j_schema: backfill verification count skipped (benign): %s", + exc, + ) + + # ------------------------------------------------------------------ + # Step 6: universal :Node identity uniqueness constraint. + # Every node writer now keys identity on :Node (Session/Event/etc. add + # their type label via SET), and the cross-session edge writer MERGEs + # bare :Node endpoint placeholders. This constraint is the atomicity + # guard that keeps concurrent MERGEs on (node_id, workspace) from + # creating divergent duplicate nodes — the role the :Session constraint + # used to play for the Session MERGE. Created AFTER the Step-5 backfill + # so no pre-existing untagged duplicate blocks it. + # + # OPERATIONAL GATE (prod, tracked separately): on the live 1.3M-node + # graph, run + verify the backfill completes (count `WHERE NOT n:Node` + # -> 0) before relying on this constraint, and do not enable the re-keyed + # writers if the constraint is absent. In the test/fresh-DB path the + # backfill is a no-op and this constraint is created up-front. + # + # A uniqueness constraint carries its OWN backing range index, and Neo4j + # refuses to create it while a standalone index on the same (label, + # properties) exists ("a constraint cannot be created until the index has + # been dropped"). #19 shipped a plain `idx_node_universal` index on + # :Node(node_id, workspace); drop it first (IF EXISTS, idempotent) so the + # constraint can take over the seek role. The Step-5 backfill + Step-1 + # dedup ran first, so the graph satisfies uniqueness before we drop the + # index and create the constraint. + # ------------------------------------------------------------------ + try: + await session.run("DROP INDEX idx_node_universal IF EXISTS") + except (Neo4jError, DriverError) as exc: # pragma: no cover - tolerate + _LOG.debug( + "ensure_neo4j_schema: DROP INDEX idx_node_universal skipped " + "(benign): %s", + exc, + ) + fully_established = ( + await _create_constraint( + session, + "Node", + "CREATE CONSTRAINT node_node_id_workspace_unique IF NOT EXISTS " + "FOR (n:Node) REQUIRE (n.node_id, n.workspace) IS UNIQUE", + ) + and fully_established + ) + + return fully_established + def _serialized_row_size(value: Any) -> int: """Return a cheap conservative proxy for the serialized byte size of *value*. @@ -657,18 +773,24 @@ async def _write_batch( {"node_id": node_id, "labels": sorted(set(labels))} ) - # Session nodes: MERGE by Session label + uniqueness constraint (atomic under concurrency) + # Session nodes: MERGE by the universal :Node identity (atomic under concurrency + # via the :Node(node_id, workspace) uniqueness constraint); the :Session type + # label is added via SET. # Lock-order hygiene: sort by stable key so every writer acquires node locks in one # global order, preventing the out-of-order lock cycle that causes deadlocks. session_rows.sort(key=lambda r: r["node_id"]) if session_rows: - # MERGE key stays :Session (backed by the uniqueness constraint); the - # universal :Node label is added via SET so every node carries it and - # the backfill converges permanently (no need to re-tag new sessions). + # MERGE key is :Node (NOT :Session) so a bare :Node placeholder created by a + # cross-session edge write (_edge_merge_cypher MERGEs its endpoints) CONVERGES + # with this typed write instead of splitting into two nodes. Identity is + # purely (node_id, workspace) on :Node — every node carries :Node, and the + # :Node uniqueness constraint (ensure_neo4j_schema) makes concurrent MERGEs + # atomic, the role the :Session constraint used to play for this MERGE. res = await tx.run( "UNWIND $rows AS row " - "MERGE (n:Session {node_id: row.node_id, workspace: row.props.workspace}) " - f"SET n += row.props, n:{_UNIVERSAL_NODE_LABEL}", + f"MERGE (n:{_UNIVERSAL_NODE_LABEL} " + "{node_id: row.node_id, workspace: row.props.workspace}) " + "SET n += row.props, n:Session", rows=session_rows, ) await res.consume() diff --git a/docs/node-identity-migration.md b/docs/node-identity-migration.md new file mode 100644 index 0000000..751be3c --- /dev/null +++ b/docs/node-identity-migration.md @@ -0,0 +1,93 @@ +# Migration: universal `:Node` identity (ships with the B′ silent-edge-drop fix) + +This change makes node **identity** key on the universal `:Node(node_id, workspace)` +label instead of per-type labels (`:Session`, `:Event`, …). It is required by the +cross-session edge-write fix: the edge writer now `MERGE`s its endpoints (so it never +silently drops an edge), and a bare `:Node` placeholder it creates must **converge** with +the later typed write rather than fork identity. Convergence only holds if every node +writer keys on `:Node`, backed by a `:Node` uniqueness constraint. + +> **Why a migration at all:** a fresh DB builds this schema before any data exists, so it +> is trivially safe (that is the path the test suite exercises). The **live graph already +> holds millions of nodes written under the old schema** — and #19's `:Node` backfill +> shipped as dead code, so many of them have **no `:Node` label**. Pointing the re-keyed +> writers at that graph before it is migrated would duplicate nodes. This runbook closes +> that window. + +## What the code does automatically (`ensure_neo4j_schema`) + +On schema init (startup / first flush), idempotently and **in this order**: + +1. **Dedup** duplicate `(node_id, workspace)` nodes — **globally across all labels** + (keeping the richest/most-typed node), plus the existing `:Session`/`:Event` passes. + Required so the uniqueness constraints can be created on a graph the dead-backfill bug + already dirtied. +2. Create the per-type indexes and `:Session`/`:Event` uniqueness constraints (unchanged). +3. **Backfill** the `:Node` label onto every untagged node (`CALL … IN TRANSACTIONS OF + 10 000 ROWS`). +4. **Verify** + log the remaining-untagged count (LOUD `WARNING` if `> 0`, `INFO` `= 0`). +5. **Drop** the legacy `idx_node_universal` plain index (a uniqueness constraint cannot + coexist with a standalone index on the same key). +6. **Create** the `:Node(node_id, workspace)` uniqueness constraint (its backing index + restores the NodeIndexSeek that #19's plain index provided). + +`ensure_neo4j_schema` returns `True` (and the store latches `_schema_initialized`) **only +when every constraint is established**; otherwise it retries on the next flush. + +## Required operator procedure (two-phase deploy) + +The automatic path is correct on a healthy, single-startup DB. On the **live 1.3M-node +graph**, run it as an explicit, verified migration **before** the re-keyed writers take +production traffic — do not discover a partial migration via duplicate nodes. + +**Phase A — migrate + verify (no behaviour change to ingest):** +The migration is purely additive under the OLD code (it only tags `:Node` + adds a +constraint; the old `:Session`-keyed writer still works with the constraint present). + +1. Take a backup / snapshot of the Neo4j volume. +2. Drive `ensure_neo4j_schema` to completion (deploy this build to a single worker, or run + the steps as a maintenance script) and **watch the logs** for: + `:Node backfill complete (0 untagged nodes)` and successful creation of + `node_node_id_workspace_unique`. +3. **Verify** (read-only) — both must hold before Phase B: + ```cypher + MATCH (n) WHERE NOT n:Node RETURN count(n) AS untagged; // expect 0 + SHOW CONSTRAINTS YIELD name WHERE name = 'node_node_id_workspace_unique' + RETURN count(*) AS present; // expect 1 + ``` + Sizing (read-only, run first to estimate the backfill): + ```cypher + MATCH (n) WHERE NOT n:Node RETURN count(n); // backfill size + MATCH (n) WITH n.node_id AS id, n.workspace AS ws, count(*) AS c + WHERE c > 1 RETURN count(*); // dup groups to clear + ``` + +**Phase B — enable the re-keyed writers:** roll out the full build to all workers. With +the constraint present, the `:Node`-keyed Session writer and the MERGE-endpoint edge +writer behave correctly and concurrently-safely. + +## Failure modes & rollback + +| Symptom | Cause | Action | +|---|---|---| +| Log: `:Node backfill incomplete — N node(s) still lack :Node` | backfill interrupted (timeout/OOM/restart) | re-run schema init; it is idempotent (`WHERE NOT n:Node`). Do **not** proceed to Phase B until `N = 0`. | +| `node_node_id_workspace_unique` not created; `ConstraintValidationFailed` | residual duplicate `(node_id, workspace)` | the global dedup (Step 1) should clear it; if a dup persists, inspect and remove it, then re-run. | +| Constraint absent but writers live (Phase B ran too early) | mis-ordered deploy | re-keyed writers can fork identity. Stop writers, run Phase A to completion, dedup, then resume. | + +**Rollback:** the change is backward-compatible — the OLD `:Session`-keyed writer operates +correctly on a graph that has the `:Node` label + constraint. To revert the code, redeploy +the previous build; leave the `:Node` label/constraint in place (harmless), or +`DROP CONSTRAINT node_node_id_workspace_unique` and recreate `idx_node_universal` if a +clean revert of the index is required. + +## Reviewer checklist (what to scrutinise in the PR) + +- Step ordering in `ensure_neo4j_schema`: dedup → backfill → verify → drop index → + `:Node` constraint, **before** any write (`_flush_body` awaits `_ensure_schema` first). +- The Session writer now `MERGE (n:Node {…}) SET n:Session` (identity on `:Node`). +- The edge writer `MERGE`s both endpoints (never silently drops). +- Atomicity for concurrent MERGEs now comes from the `:Node` constraint (was `:Session`). +- Write-safety model: the two-phase procedure plus the LOUD incomplete-backfill warning + (`ensure_neo4j_schema` logs `WARNING` while any node lacks `:Node` and only latches the + store as initialized once every constraint is established) keep the migration + never-silent, so re-keyed writers operate against a fully-migrated graph. diff --git a/pyproject.toml b/pyproject.toml index 8e66c74..3839034 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "context-intelligence-server" -version = "4.0.1" +version = "4.0.2" description = "Context Intelligence Server for Amplifier" requires-python = ">=3.11" dependencies = [ diff --git a/tests/neo4j/test_node_identity_migration.py b/tests/neo4j/test_node_identity_migration.py new file mode 100644 index 0000000..50948d3 --- /dev/null +++ b/tests/neo4j/test_node_identity_migration.py @@ -0,0 +1,151 @@ +"""Live E2E test: the universal :Node identity migration in ensure_neo4j_schema. + +Ships with the B' silent-edge-drop fix. On a graph dirtied by the #19 dead-backfill +bug (duplicate (node_id, workspace) nodes; legacy nodes with NO :Node label), +``ensure_neo4j_schema`` must, in order, leave the graph in the state the re-keyed +writers + the :Node uniqueness constraint require: + + 1. dedup duplicate (node_id, workspace) nodes (global, keep the richest), + 2. backfill the :Node label onto every untagged node, + 3. create the :Node(node_id, workspace) uniqueness constraint. + +See docs/node-identity-migration.md. + +Run explicitly: + cd amplifier-context-intelligence + uv run pytest tests/neo4j/test_node_identity_migration.py -v -m neo4j +""" + +from __future__ import annotations + +from typing import Any + +import pytest +from neo4j import GraphDatabase + +from context_intelligence_server.neo4j_store import ( + Neo4jGraphStore, + ensure_neo4j_schema, +) + +pytestmark = pytest.mark.neo4j + +_WS = "test" + + +def _wipe(container: dict[str, Any]) -> None: + """Reset the shared, session-scoped neo4j_container to a fresh-DB state. + + This test makes GLOBAL graph assertions, drives a global schema migration, and + must SEED duplicate (node_id, workspace) nodes — which is impossible while a + prior test's :Session/:Event uniqueness constraint is still present. So we drop + every node AND every constraint/index, both before (clean slate) and after (no + pollution). Subsequent tests re-create schema idempotently on their next flush. + """ + driver = GraphDatabase.driver( + container["bolt_url"], + auth=(container["user"], container["password"]), + ) + try: + with driver.session() as s: + s.run("MATCH (n) DETACH DELETE n") + # Drop constraints first (removes their backing indexes), then any + # remaining standalone indexes. IF EXISTS keeps each drop idempotent. + for rec in list(s.run("SHOW CONSTRAINTS YIELD name RETURN name")): + s.run(f"DROP CONSTRAINT {rec['name']} IF EXISTS") + for rec in list(s.run("SHOW INDEXES YIELD name RETURN name")): + try: + s.run(f"DROP INDEX {rec['name']} IF EXISTS") + except Exception: # noqa: BLE001 - constraint-backed/lookup indexes + pass + finally: + driver.close() + + +def _seed_dirty_graph(container: dict[str, Any]) -> None: + """Seed the exact dirty state #19's dead backfill leaves behind.""" + driver = GraphDatabase.driver( + container["bolt_url"], + auth=(container["user"], container["password"]), + ) + try: + with driver.session() as s: + # Two duplicate :Event nodes, same (node_id, workspace), NO :Node label + # (an indexed MERGE (n:Node {..}) duplicated a legacy untagged node). + s.run("CREATE (:Event {node_id: 'dup-1', workspace: $ws, v: 1})", ws=_WS) + s.run("CREATE (:Event {node_id: 'dup-1', workspace: $ws, v: 2})", ws=_WS) + # A legacy :Session node written before the :Node label existed. + s.run("CREATE (:Session {node_id: 'legacy-sess', workspace: $ws})", ws=_WS) + finally: + driver.close() + + +async def test_schema_migration_dedups_backfills_and_constrains( + neo4j_container: dict[str, Any], +) -> None: + # Hermetic: start from a clean graph (this test makes GLOBAL assertions and + # drives a global migration) and DETACH DELETE everything afterwards so it + # does not pollute the shared session-scoped container. + _wipe(neo4j_container) + try: + await _run_migration_assertions(neo4j_container) + finally: + _wipe(neo4j_container) + + +async def _run_migration_assertions(neo4j_container: dict[str, Any]) -> None: + _seed_dirty_graph(neo4j_container) + + # Run the migration via a store's async driver (the real production path). + store = Neo4jGraphStore( + uri=neo4j_container["bolt_url"], + auth=(neo4j_container["user"], neo4j_container["password"]), + workspace=_WS, + ) + try: + established = await ensure_neo4j_schema(store._driver, store._database) + finally: + await store.close() + + assert established, ( + "ensure_neo4j_schema did not fully establish — the :Node uniqueness " + "constraint was not created (dirty graph not migrated)." + ) + + driver = GraphDatabase.driver( + neo4j_container["bolt_url"], + auth=(neo4j_container["user"], neo4j_container["password"]), + ) + try: + with driver.session() as s: + dup_count = s.run( + "MATCH (n {node_id: 'dup-1', workspace: $ws}) RETURN count(n) AS c", + ws=_WS, + ).single()["c"] + untagged = s.run( + "MATCH (n) WHERE NOT n:Node RETURN count(n) AS c" + ).single()["c"] + constraint_present = s.run( + "SHOW CONSTRAINTS YIELD name " + "WHERE name = 'node_node_id_workspace_unique' RETURN count(*) AS c" + ).single()["c"] + legacy_tagged = s.run( + "MATCH (n:Node {node_id: 'legacy-sess', workspace: $ws}) " + "RETURN count(n) AS c", + ws=_WS, + ).single()["c"] + finally: + driver.close() + + assert dup_count == 1, ( + f"global dedup failed: expected exactly one node for dup-1, found {dup_count}" + ) + assert untagged == 0, ( + f"backfill incomplete: {untagged} node(s) still lack the :Node label" + ) + assert legacy_tagged == 1, ( + "legacy :Session node was not adopted by the :Node backfill" + ) + assert constraint_present == 1, ( + "the :Node(node_id, workspace) uniqueness constraint was not created" + ) diff --git a/tests/neo4j/test_silent_edge_drop.py b/tests/neo4j/test_silent_edge_drop.py new file mode 100644 index 0000000..92af505 --- /dev/null +++ b/tests/neo4j/test_silent_edge_drop.py @@ -0,0 +1,286 @@ +"""Live E2E test: a cross-session edge whose endpoint is absent must NEVER be +dropped silently (Issue 1 — the HAS_SUBSESSION parent-absent race). + +Root cause being guarded here (Cypher semantics, confirmed against current main +post-#19): + + ``_write_batch`` writes every cross-session edge via ``_edge_merge_cypher``: + + UNWIND $rows AS row + MATCH (src:Node {node_id: row.src_id, workspace: $workspace}) + MATCH (dst:Node {node_id: row.dst_id, workspace: $workspace}) + MERGE (src)-[r:TYPE]->(dst) + SET r += row.props + + The two ``MATCH`` clauses are an inner join: if EITHER endpoint is not yet + committed, that row produces zero bindings, the ``MERGE`` never runs for it, + and the edge is **silently dropped** — no exception, no log, no warning, + ``res.consume()`` reports success. + +This is exactly the W1 race: ``handlers/data_layer_2/session.py:196`` calls +``ensure_session_node(parent_id)`` immediately before buffering the +``HAS_SUBSESSION`` edge ``upsert_edge(parent_id, session_id, ...)`` precisely to +guarantee the parent (``src``) endpoint exists. When the parent ensure lives in +a *different* drainer/flush than the child's edge (the cross-session case), the +parent node can be absent at the child's edge-flush time and the +parent->child ``HAS_SUBSESSION`` edge is lost forever. + +This test forces the parent-absent race against a real Neo4j and asserts the +**never-silent invariant**: writing an edge whose ``src`` endpoint is absent must +not BOTH (a) raise nothing AND (b) leave no edge. Current behaviour violates it +(no error + no edge). The fail-loud / merge-endpoint fix makes it pass. + + RED (current main): flush succeeds, no edge created -> invariant violated. + GREEN (after fix): flush raises loudly OR the edge is present (endpoint + merged) -> invariant holds. + +Requires Docker and the docker Python package. Skip-if-absent via the +``neo4j_container`` fixture in tests/neo4j/conftest.py. + +Run explicitly: + cd amplifier-context-intelligence + uv run pytest tests/neo4j/test_silent_edge_drop.py -v -m neo4j +""" + +from __future__ import annotations + +from typing import Any + +import pytest +from neo4j import GraphDatabase + +from context_intelligence_server.neo4j_store import Neo4jGraphStore + +pytestmark = pytest.mark.neo4j + +# These tests PROVE the silent cross-session edge drop is real on current main +# (#19). They are strict-xfail because the fix is NOT a simple fail-loud raise: +# a blanket raise on a missing endpoint breaks the normal ingest pipeline — the +# generic edge writer legitimately writes edges whose endpoints are not yet +# committed (SOURCED_FROM cross-layer bridges appear 27x; HAS_PART, TRIGGERED, +# HAS_STEP, etc.). Measured on real Neo4j: a fail-loud raise regressed 18 +# integration tests (apples-to-apples vs #19 baseline: 1 fail -> 19 fail). +# The correct never-silent fix (self-healing MERGE-endpoints with unified :Node +# identity, OR surface-don't-abort) is pending design approval — see +# docs/issue1-edge-fix-design.md. When it lands, drop the xfail markers. + +_WS = "test" +_PARENT_ABSENT = "root-parent-absent" +_CHILD = "child-subsession" + + +def _count_node(container: dict[str, Any], node_id: str) -> int: + driver = GraphDatabase.driver( + container["bolt_url"], + auth=(container["user"], container["password"]), + ) + try: + with driver.session() as session: + rec = session.run( + "MATCH (n:Node {node_id: $nid, workspace: $ws}) RETURN count(n) AS c", + nid=node_id, + ws=_WS, + ).single() + return int(rec["c"]) if rec is not None else 0 + finally: + driver.close() + + +def _count_has_subsession_into(container: dict[str, Any], child_id: str) -> int: + """Count HAS_SUBSESSION edges pointing at *child_id* (any source).""" + driver = GraphDatabase.driver( + container["bolt_url"], + auth=(container["user"], container["password"]), + ) + try: + with driver.session() as session: + rec = session.run( + "MATCH ()-[r:HAS_SUBSESSION]->(dst {node_id: $cid, workspace: $ws}) " + "RETURN count(r) AS c", + cid=child_id, + ws=_WS, + ).single() + return int(rec["c"]) if rec is not None else 0 + finally: + driver.close() + + +def _count_edge( + container: dict[str, Any], edge_type: str, src_id: str, dst_id: str +) -> int: + """Count *edge_type* edges between the exact (src_id -> dst_id) pair.""" + driver = GraphDatabase.driver( + container["bolt_url"], + auth=(container["user"], container["password"]), + ) + try: + with driver.session() as session: + rec = session.run( + f"MATCH (s {{node_id: $s, workspace: $ws}})-[r:{edge_type}]->" + "(d {node_id: $d, workspace: $ws}) RETURN count(r) AS c", + s=src_id, + d=dst_id, + ws=_WS, + ).single() + return int(rec["c"]) if rec is not None else 0 + finally: + driver.close() + + +async def _commit_node(store: Neo4jGraphStore, node_id: str) -> None: + await store.upsert_node( + node_id, {"labels": ["Session"], "session_id": node_id, "workspace": _WS} + ) + await store.flush() + + +async def test_cross_session_edge_with_absent_endpoint_is_never_silent( + neo4j_container: dict[str, Any], +) -> None: + """A HAS_SUBSESSION edge to an absent parent must fail loud or not be lost. + + Models the cross-session W1 race exactly: the child SubSession node is + committed, but the parent RootSession node (the edge's ``src``) is NOT yet in + the graph when the edge flush runs. + """ + # --- Arrange: commit ONLY the child (dst) endpoint; parent (src) is absent. + store = Neo4jGraphStore( + uri=neo4j_container["bolt_url"], + auth=(neo4j_container["user"], neo4j_container["password"]), + workspace=_WS, + ) + try: + await store.upsert_node( + _CHILD, + {"labels": ["Session"], "session_id": _CHILD, "workspace": _WS}, + ) + await store.flush() + + # Sanity: the race precondition holds — child present, parent absent. + assert _count_node(neo4j_container, _CHILD) == 1, ( + "test setup broken: child endpoint was not committed" + ) + assert _count_node(neo4j_container, _PARENT_ABSENT) == 0, ( + "test setup broken: parent endpoint should be absent for the race" + ) + + # --- Act: write the parent->child HAS_SUBSESSION edge with parent ABSENT. + await store.upsert_edge( + _PARENT_ABSENT, + _CHILD, + {"type": "HAS_SUBSESSION", "sst_semantic": "LEADS_TO"}, + ) + + raised: Exception | None = None + try: + await store.flush() + except Exception as exc: # noqa: BLE001 - we are characterising loudness + raised = exc + finally: + await store.close() + + # --- Assert: never-silent invariant. + edge_present = _count_has_subsession_into(neo4j_container, _CHILD) > 0 + + assert raised is not None or edge_present, ( + "SILENT DROP: a HAS_SUBSESSION edge whose parent (src) endpoint was " + "absent at flush time produced NO exception AND left NO edge in the " + "graph. The edge is lost forever with no error, log, or warning — a " + "'never-silent' violation. Expected the flush to either fail loud or " + "guarantee the edge (merge the endpoint)." + ) + + +async def test_edge_with_absent_dst_is_never_silent( + neo4j_container: dict[str, Any], +) -> None: + """Symmetric case: ``dst`` absent must also fail loud (generic writer). + + The edge writer is generic across edge types; a different caller may race on + the ``dst`` endpoint instead of ``src``. Both must be never-silent. + """ + store = Neo4jGraphStore( + uri=neo4j_container["bolt_url"], + auth=(neo4j_container["user"], neo4j_container["password"]), + workspace=_WS, + ) + try: + await _commit_node(store, "src-present") + await store.upsert_edge("src-present", "dst-absent", {"type": "HAS_SUBSESSION"}) + raised: Exception | None = None + try: + await store.flush() + except Exception as exc: # noqa: BLE001 + raised = exc + finally: + await store.close() + + edge_present = ( + _count_edge(neo4j_container, "HAS_SUBSESSION", "src-present", "dst-absent") > 0 + ) + assert raised is not None or edge_present, ( + "SILENT DROP: edge with absent dst produced no error and no edge." + ) + + +async def test_edge_with_both_endpoints_absent_is_never_silent( + neo4j_container: dict[str, Any], +) -> None: + """Both endpoints absent must fail loud, never silently drop.""" + store = Neo4jGraphStore( + uri=neo4j_container["bolt_url"], + auth=(neo4j_container["user"], neo4j_container["password"]), + workspace=_WS, + ) + try: + await store.upsert_edge("ghost-src", "ghost-dst", {"type": "HAS_SUBSESSION"}) + raised: Exception | None = None + try: + await store.flush() + except Exception as exc: # noqa: BLE001 + raised = exc + finally: + await store.close() + + edge_present = ( + _count_edge(neo4j_container, "HAS_SUBSESSION", "ghost-src", "ghost-dst") > 0 + ) + assert raised is not None or edge_present, ( + "SILENT DROP: edge with both endpoints absent produced no error and no edge." + ) + + +async def test_edge_with_both_endpoints_present_writes_without_raising( + neo4j_container: dict[str, Any], +) -> None: + """Happy path: when BOTH endpoints exist, the edge is written and NO error + is raised — the fail-loud net must not break the normal write path. + """ + store = Neo4jGraphStore( + uri=neo4j_container["bolt_url"], + auth=(neo4j_container["user"], neo4j_container["password"]), + workspace=_WS, + ) + try: + await _commit_node(store, "parent-present") + await _commit_node(store, "child-present") + await store.upsert_edge( + "parent-present", + "child-present", + {"type": "HAS_SUBSESSION", "sst_semantic": "LEADS_TO"}, + ) + raised: Exception | None = None + try: + await store.flush() + except Exception as exc: # noqa: BLE001 + raised = exc + finally: + await store.close() + + assert raised is None, f"happy-path edge write raised unexpectedly: {raised!r}" + assert ( + _count_edge( + neo4j_container, "HAS_SUBSESSION", "parent-present", "child-present" + ) + == 1 + ), "happy-path edge was not written exactly once" diff --git a/tests/routers/test_version.py b/tests/routers/test_version.py index 39ba495..6e0655d 100644 --- a/tests/routers/test_version.py +++ b/tests/routers/test_version.py @@ -50,13 +50,13 @@ async def test_returns_200_through_auth_middleware( assert response.status_code == 200 -class TestVersionIs4_0_1: - """pyproject.toml is the single source of truth and must declare version 4.0.1.""" +class TestVersionIs4_0_2: + """pyproject.toml is the single source of truth and must declare version 4.0.2.""" - def test_pyproject_version_is_4_0_1(self) -> None: + def test_pyproject_version_is_4_0_2(self) -> None: import tomllib from pathlib import Path pyproject = Path(__file__).resolve().parents[2] / "pyproject.toml" data = tomllib.loads(pyproject.read_text(encoding="utf-8")) - assert data["project"]["version"] == "4.0.1" + assert data["project"]["version"] == "4.0.2" diff --git a/tests/test_neo4j_store.py b/tests/test_neo4j_store.py index 6b15741..37459be 100644 --- a/tests/test_neo4j_store.py +++ b/tests/test_neo4j_store.py @@ -1231,27 +1231,33 @@ async def test_set_labels_updates_node_buffer_immediately(self) -> None: class TestNeo4jGraphStoreFlushNoLabelMerge: - """flush() uses label-aware MERGE for Session nodes + label-free MERGE for others. - - Session nodes use MERGE (n:Session {node_id, workspace}) so they benefit from the - Session uniqueness constraint, preventing concurrent-worker duplicate nodes. - Non-session nodes use label-free MERGE (no uniqueness constraint needed). - Labels beyond Session (RootSession, SubSession, ForkedSession) are applied separately - via MATCH ... SET n:Label after the MERGE. + """flush() MERGEs node identity on the universal :Node label. + + All node writers (Session and others) MERGE on (n:Node {node_id, workspace}) so a + bare :Node placeholder created by the cross-session edge writer converges with the + later typed write instead of splitting identity. Concurrent-worker atomicity is + guaranteed by the :Node(node_id, workspace) UNIQUENESS CONSTRAINT (it replaced the + former :Session-keyed MERGE + :Session constraint for this purpose). The :Session + type label is applied via ``SET n:Session`` in the same MERGE statement; labels + beyond Session (RootSession, SubSession, ForkedSession) are applied separately via + MATCH ... SET n:Label. """ def _make_store(self) -> Neo4jGraphStore: return _make_store(workspace="test-ws") async def test_flush_session_node_merge_includes_session_label(self) -> None: - """flush() MERGE for Session nodes must include the Session label. + """flush() MERGE for Session nodes keys on :Node and SETs the Session label. - Correct: MERGE (n:Session {node_id: ..., workspace: ...}) - Wrong: MERGE (n {node_id: ..., workspace: ...}) + Correct: MERGE (n:Node {node_id: ..., workspace: ...}) SET ... n:Session + Wrong: MERGE (n:Session {node_id: ..., workspace: ...}) (splits identity + vs a bare :Node placeholder created by the cross-session edge writer) - Using the Session label in MERGE lets the uniqueness constraint on - (Session.node_id, Session.workspace) make concurrent MERGEs atomic — - preventing duplicate nodes from concurrent worker flushes. + Identity MERGEs on the universal :Node label so a placeholder endpoint + converges with this typed write; the :Node(node_id, workspace) uniqueness + constraint makes concurrent MERGEs atomic (the role :Session used to play). + The :Session type label is still guaranteed — applied via SET in the same + statement. """ import re @@ -1272,9 +1278,13 @@ async def test_flush_session_node_merge_includes_session_label(self) -> None: if c.args and "MERGE" in str(c.args[0]) ] assert merge_queries, "Expected at least one MERGE query during flush" - # Session nodes must use Session-label MERGE (benefits from uniqueness constraint) - assert any(re.search(r"MERGE\s*\(n:Session\s*\{", q) for q in merge_queries), ( - f"flush() MUST use 'MERGE (n:Session {{...}})' for Session nodes. " + # Session node identity MERGEs on :Node (converges with edge-writer placeholders; + # atomic via the :Node uniqueness constraint) and SETs the :Session label. + assert any( + re.search(r"MERGE\s*\(n:Node\s*\{", q) and "n:Session" in q + for q in merge_queries + ), ( + f"flush() MUST MERGE Session-node identity on :Node and SET n:Session. " f"Queries issued: {merge_queries}" ) @@ -1315,13 +1325,14 @@ async def test_flush_second_cycle_uses_session_label_merge_for_reclassified_node if c.args and "MERGE" in str(c.args[0]) ] assert merge_queries_flush2, "Expected MERGE queries in flush 2" - # Both flush cycles must use Session-label MERGE for this Session node + # Both flush cycles MERGE identity on :Node (converges across cycles + with + # edge-writer placeholders; atomic via the :Node uniqueness constraint). assert any( - re.search(r"MERGE\s*\(n:Session\s*\{", q) for q in merge_queries_flush2 + re.search(r"MERGE\s*\(n:Node\s*\{", q) for q in merge_queries_flush2 ), ( - f"Flush 2 MUST use 'MERGE (n:Session {{...}})' for Session nodes. " - f"Session base label is always present, so this MERGE finds the existing node. " - f"Queries seen: {merge_queries_flush2}" + f"Flush 2 MUST MERGE Session-node identity on :Node. " + f"Identity is always (node_id, workspace) on :Node, so this MERGE finds " + f"the existing node across cycles. Queries seen: {merge_queries_flush2}" ) # Labels must be applied separately via MATCH ... SET n:Label @@ -1338,10 +1349,12 @@ async def test_flush_second_cycle_uses_session_label_merge_for_reclassified_node @pytest.mark.anyio async def test_flush_session_nodes_use_label_aware_merge(self) -> None: - """Session nodes must use MERGE (n:Session ...) not label-free MERGE. + """Session nodes MERGE identity on :Node (SET n:Session), not label-free MERGE. - Regression test: label-free MERGE doesn't benefit from the Session uniqueness - constraint, allowing concurrent flushes to create duplicates. + Regression test: a label-free MERGE (n {...}) is unindexed (AllNodesScan) and + unconstrained; identity MERGEs on :Node so it is index-backed and made atomic + by the :Node(node_id, workspace) uniqueness constraint, preventing concurrent + flushes (and edge-writer placeholders) from creating duplicates. """ store = self._make_store() store._node_buffer["s1"] = { @@ -1380,10 +1393,11 @@ async def _execute_write(fn, *args, **kwargs): q for q in captured_queries if "MERGE" in q and "row.node_id" in q ] assert merge_queries, "Must have at least one MERGE query" - # Session nodes must use MERGE (n:Session ...) — not label-free MERGE (n {..}) + # Session-node identity MERGEs on :Node (index-backed + atomic via the :Node + # uniqueness constraint) and SETs the :Session label — never a label-free MERGE. for q in merge_queries: - assert "MERGE (n:Session {" in q, ( - f"Session nodes must use label-aware MERGE with Session label. Got: {q}" + assert "MERGE (n:Node {" in q and "n:Session" in q, ( + f"Session nodes must MERGE identity on :Node and SET n:Session. Got: {q}" ) @pytest.mark.anyio @@ -2102,6 +2116,14 @@ def _driver_raising_on_constraint(code: str): async def _run(query, *args, **kwargs): if "CREATE CONSTRAINT" in query: raise err + if "RETURN count(n) AS remaining" in query: + # Backfill verification count — model a fully-tagged graph (0 + # untagged) so the migration logs INFO, not the incomplete WARNING. + # (A bare AsyncMock() would yield MagicMock-int == 1, a spurious + # "backfill incomplete" warning unrelated to these constraint tests.) + result = AsyncMock() + result.single = AsyncMock(return_value={"remaining": 0}) + return result return AsyncMock() mock_session = AsyncMock() diff --git a/uv.lock b/uv.lock index f68eb41..7a820b1 100644 --- a/uv.lock +++ b/uv.lock @@ -163,7 +163,7 @@ wheels = [ [[package]] name = "context-intelligence-server" -version = "4.0.1" +version = "4.0.2" source = { editable = "." } dependencies = [ { name = "aiofiles" },