Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 157 additions & 35 deletions context_intelligence_server/neo4j_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,22 +133,32 @@


def _edge_merge_cypher(edge_type: str) -> str:
"""Return the UNWIND edge-MERGE query for *edge_type*.

src/dst are matched via the universal :Node label so each MATCH uses the
composite :Node(node_id, workspace) index (NodeIndexSeek) instead of an
AllNodesScan over the whole graph — the root cause of the multi-second
"Running" edge transactions on the 1.3M-node graph. The relationship
MERGE/SET logic is unchanged from the original label-free query.
"""Return the UNWIND edge-MERGE query for *edge_type* — self-healing endpoints.

src/dst are ``MERGE``d (not ``MATCH``ed) on the universal :Node label, then the
relationship is ``MERGE``d. ``MERGE`` on the composite :Node(node_id, workspace)
key uses a NodeIndexSeek (idx_node_universal / the :Node uniqueness constraint),
never an AllNodesScan, so the 1.3M-node index-seek behaviour from #19 is kept.

Why MERGE and not MATCH: the old ``MATCH (src) MATCH (dst)`` was an inner join
that SILENTLY dropped the edge whenever either endpoint was not yet committed
(the HAS_SUBSESSION parent-absent race — but pervasive: SOURCED_FROM and ~20
other edge types legitimately write edges to not-yet-committed endpoints).
``MERGE`` creates a bare ``:Node`` placeholder for an absent endpoint instead
of dropping the edge; the placeholder converges with the later typed write
because every node writer keys identity on ``:Node`` (Session/Event/etc. add
their type label via ``SET``), backed by the :Node(node_id, workspace)
uniqueness constraint so concurrent MERGEs stay atomic. Never-silent without
aborting the legitimate eventual-consistency ingest.

*edge_type* MUST already be validated via ``_validate_identifier`` by the
caller (it is interpolated into the Cypher, so it cannot be user input).
"""
return (
"UNWIND $rows AS row "
f"MATCH (src:{_UNIVERSAL_NODE_LABEL} "
f"MERGE (src:{_UNIVERSAL_NODE_LABEL} "
"{node_id: row.src_id, workspace: $workspace}) "
f"MATCH (dst:{_UNIVERSAL_NODE_LABEL} "
f"MERGE (dst:{_UNIVERSAL_NODE_LABEL} "
"{node_id: row.dst_id, workspace: $workspace}) "
f"MERGE (src)-[r:{edge_type}]->(dst) "
"SET r += row.props"
Expand Down Expand Up @@ -309,16 +319,36 @@ async def ensure_neo4j_schema(
"""
async with driver.session(database=database) as session:
# ------------------------------------------------------------------
# Step 1: deduplicate any pre-existing duplicate Session and Event nodes.
# For each duplicate (node_id, workspace) group keep the first node
# (as returned by collect()) and DETACH DELETE the remainder.
# This MUST run before constraint creation so a dirty graph does not
# cause the CREATE CONSTRAINT statement to fail. Both Session (Step 3)
# and Event (Step 4) carry a uniqueness constraint, so both must be
# deduplicated here or the corresponding constraint retry never
# converges.
# Step 1: deduplicate any pre-existing duplicate (node_id, workspace)
# nodes. This MUST run before constraint creation so a dirty graph does
# not cause the CREATE CONSTRAINT statements (Session, Event, and the
# universal :Node constraint in Step 6) to fail.
#
# Three passes:
# 1a. GLOBAL by (node_id, workspace) across EVERY label — required for
# the Step-6 :Node uniqueness constraint, which spans all node
# types (Session/Event/OrchestratorRun/Step/ToolExecution/...).
# Keeps the RICHEST node (most labels) so a fully-typed node always
# wins over a bare :Node placeholder; DETACH DELETE the rest.
# Dups here come from the #19 dead-backfill bug (an indexed
# MERGE (n:Node {...}) duplicated legacy untagged nodes).
# 1b/1c. Session / Event specifically — kept for clarity and because
# their per-label uniqueness constraints (Steps 3/4) still apply.
# (Redundant after 1a, but cheap and explicit.)
# All passes are best-effort (non-fatal) and do not affect the return.
# ------------------------------------------------------------------
try:
await session.run(
"MATCH (n) "
"WITH n.node_id AS nid, n.workspace AS ws, collect(n) AS nodes "
"WHERE size(nodes) > 1 "
# keep the node carrying the most labels (typed > bare placeholder)
"WITH nodes, reduce(best = nodes[0], x IN nodes | "
"CASE WHEN size(labels(x)) > size(labels(best)) THEN x ELSE best END) "
"AS keep "
"UNWIND [x IN nodes WHERE x <> keep] AS duplicate "
"DETACH DELETE duplicate"
)
await session.run(
"MATCH (s:Session) "
"WITH s.node_id AS nid, s.workspace AS ws, collect(s) AS nodes "
Expand Down Expand Up @@ -414,14 +444,15 @@ async def _create_index(statement: str) -> bool:
and fully_established
)

# Composite index backing the universal-label node MERGE. The
# non-Session node write does MERGE (n:Node {node_id, workspace}); this
# index turns that into a NodeIndexSeek instead of an AllNodesScan over
# the whole graph (root cause of the 1.3M-node drain stall).
await _create_index(
f"CREATE INDEX idx_node_universal IF NOT EXISTS "
f"FOR (n:{_UNIVERSAL_NODE_LABEL}) ON (n.node_id, n.workspace)"
)
# NOTE: the composite :Node(node_id, workspace) lookup index that backs the
# universal-label node MERGE (NodeIndexSeek, not AllNodesScan — the
# 1.3M-node drain-stall fix) is now provided by the :Node UNIQUENESS
# CONSTRAINT created in Step 6 below (a uniqueness constraint carries its
# own backing range index). A standalone ``CREATE INDEX idx_node_universal``
# here would CONFLICT with that constraint ("a constraint cannot be created
# until the index has been dropped"), so it is intentionally NOT created
# here; Step 6 drops any legacy idx_node_universal first, then creates the
# constraint.

# ------------------------------------------------------------------
# Steps 3 & 4: uniqueness constraints on Session then Event nodes.
Expand Down Expand Up @@ -496,8 +527,6 @@ async def _create_constraint(session: Any, name: str, statement: str) -> bool:
and fully_established
)

return fully_established

# ------------------------------------------------------------------
# Step 5: backfill the universal :Node label onto pre-existing nodes.
# The non-Session node MERGE targets (n:Node {node_id, workspace}) so it
Expand All @@ -506,13 +535,23 @@ async def _create_constraint(session: Any, name: str, statement: str) -> bool:
# would CREATE a duplicate of them rather than match them. Tag every
# such node first.
#
# This MUST run BEFORE the :Node uniqueness constraint (Step 6): the
# constraint cannot be created over a graph that still has untagged nodes
# that would become duplicate (node_id, workspace) :Node pairs, and the
# re-keyed Session/edge MERGEs on :Node must find pre-existing nodes
# rather than fork their identity.
#
# Batched + idempotent: CALL { ... } IN TRANSACTIONS commits in chunks
# (so 1.3M nodes don't blow the per-transaction memory cap), and the
# WHERE NOT n:Node guard makes re-runs converge to a no-op once the
# graph is fully tagged. Must run BEFORE any flush MERGEs on :Node —
# ensure_neo4j_schema is awaited at the top of _flush_body, so it does.
# graph is fully tagged. ensure_neo4j_schema is awaited at the top of
# _flush_body, so this runs before any flush MERGEs on :Node.
# The batch size must be a literal in Cypher, so it is interpolated from
# the in-process int constant (never user input).
#
# NOTE (was a latent bug): this block previously sat after an early
# ``return fully_established`` and was therefore DEAD CODE — the backfill
# never ran, so legacy untagged nodes silently duplicated on re-write.
try:
await session.run(
cast(
Expand All @@ -529,6 +568,83 @@ async def _create_constraint(session: Any, name: str, statement: str) -> bool:
exc,
)

# Backfill verification (observability): the migration's "is it done?"
# signal. A non-zero remaining-untagged count means the universal :Node
# identity is NOT yet established graph-wide, so the Step-6 constraint may
# fail and the re-keyed writers can still fork a legacy node's identity.
# Surfaced LOUD (never-silent) so an incomplete migration is visible in
# logs rather than discovered via duplicate nodes.
try:
rec = await (
await session.run(
f"MATCH (n) WHERE NOT n:{_UNIVERSAL_NODE_LABEL} "
"RETURN count(n) AS remaining"
)
).single()
remaining = int(rec["remaining"]) if rec is not None else 0
if remaining:
_LOG.warning(
"ensure_neo4j_schema: :Node backfill incomplete — %d node(s) "
"still lack the :Node label; the universal-identity migration "
"is NOT complete (re-runs next schema init). Do not rely on the "
":Node uniqueness constraint until this reaches 0.",
remaining,
)
else:
_LOG.info(
"ensure_neo4j_schema: :Node backfill complete (0 untagged nodes)."
)
except Exception as exc: # noqa: BLE001 - observability only, never break init
_LOG.debug(
"ensure_neo4j_schema: backfill verification count skipped (benign): %s",
exc,
)

# ------------------------------------------------------------------
# Step 6: universal :Node identity uniqueness constraint.
# Every node writer now keys identity on :Node (Session/Event/etc. add
# their type label via SET), and the cross-session edge writer MERGEs
# bare :Node endpoint placeholders. This constraint is the atomicity
# guard that keeps concurrent MERGEs on (node_id, workspace) from
# creating divergent duplicate nodes — the role the :Session constraint
# used to play for the Session MERGE. Created AFTER the Step-5 backfill
# so no pre-existing untagged duplicate blocks it.
#
# OPERATIONAL GATE (prod, tracked separately): on the live 1.3M-node
# graph, run + verify the backfill completes (count `WHERE NOT n:Node`
# -> 0) before relying on this constraint, and do not enable the re-keyed
# writers if the constraint is absent. In the test/fresh-DB path the
# backfill is a no-op and this constraint is created up-front.
#
# A uniqueness constraint carries its OWN backing range index, and Neo4j
# refuses to create it while a standalone index on the same (label,
# properties) exists ("a constraint cannot be created until the index has
# been dropped"). #19 shipped a plain `idx_node_universal` index on
# :Node(node_id, workspace); drop it first (IF EXISTS, idempotent) so the
# constraint can take over the seek role. The Step-5 backfill + Step-1
# dedup ran first, so the graph satisfies uniqueness before we drop the
# index and create the constraint.
# ------------------------------------------------------------------
try:
await session.run("DROP INDEX idx_node_universal IF EXISTS")
except (Neo4jError, DriverError) as exc: # pragma: no cover - tolerate
_LOG.debug(
"ensure_neo4j_schema: DROP INDEX idx_node_universal skipped "
"(benign): %s",
exc,
)
fully_established = (
await _create_constraint(
session,
"Node",
"CREATE CONSTRAINT node_node_id_workspace_unique IF NOT EXISTS "
"FOR (n:Node) REQUIRE (n.node_id, n.workspace) IS UNIQUE",
)
and fully_established
)

return fully_established


def _serialized_row_size(value: Any) -> int:
"""Return a cheap conservative proxy for the serialized byte size of *value*.
Expand Down Expand Up @@ -657,18 +773,24 @@ async def _write_batch(
{"node_id": node_id, "labels": sorted(set(labels))}
)

# Session nodes: MERGE by Session label + uniqueness constraint (atomic under concurrency)
# Session nodes: MERGE by the universal :Node identity (atomic under concurrency
# via the :Node(node_id, workspace) uniqueness constraint); the :Session type
# label is added via SET.
# Lock-order hygiene: sort by stable key so every writer acquires node locks in one
# global order, preventing the out-of-order lock cycle that causes deadlocks.
session_rows.sort(key=lambda r: r["node_id"])
if session_rows:
# MERGE key stays :Session (backed by the uniqueness constraint); the
# universal :Node label is added via SET so every node carries it and
# the backfill converges permanently (no need to re-tag new sessions).
# MERGE key is :Node (NOT :Session) so a bare :Node placeholder created by a
# cross-session edge write (_edge_merge_cypher MERGEs its endpoints) CONVERGES
# with this typed write instead of splitting into two nodes. Identity is
# purely (node_id, workspace) on :Node — every node carries :Node, and the
# :Node uniqueness constraint (ensure_neo4j_schema) makes concurrent MERGEs
# atomic, the role the :Session constraint used to play for this MERGE.
res = await tx.run(
"UNWIND $rows AS row "
"MERGE (n:Session {node_id: row.node_id, workspace: row.props.workspace}) "
f"SET n += row.props, n:{_UNIVERSAL_NODE_LABEL}",
f"MERGE (n:{_UNIVERSAL_NODE_LABEL} "
"{node_id: row.node_id, workspace: row.props.workspace}) "
"SET n += row.props, n:Session",
rows=session_rows,
)
await res.consume()
Expand Down
Loading
Loading