fix: chunk Neo4j flush into bounded sub-transactions to prevent transaction-memory OOM#14
Closed
bkrabach wants to merge 1 commit into
Closed
fix: chunk Neo4j flush into bounded sub-transactions to prevent transaction-memory OOM#14bkrabach wants to merge 1 commit into
bkrabach wants to merge 1 commit into
Conversation
…action-memory OOM Previously, _flush_body wrote the entire node+edge+patch buffer in a single execute_write transaction. On failure, the finally block would restore and re-merge the whole snapshot, causing a self-amplifying grow-spiral under continued load—each failed flush became larger than the last, leading to unbounded transaction memory growth and MemoryPoolOutOfMemoryError. This fix rewrites the flush to: - Write in bounded sub-transactions (nodes→edges→patches) - Respect neo4j_flush_chunk_size (default 200) - Restore only the un-committed remainder on failure (killing the grow-spiral) - Maintain strict ordering for referential integrity Adds neo4j_flush_chunk_size tunable (AMPLIFIER_CONTEXT_INTELLIGENCE_SERVER_NEO4J_FLUSH_CHUNK_SIZE). Tests: 6/6 real-neo4j flush tests pass (4 existing = no regression, 2 new bounding + failure-restore tests), 1273 unit tests pass, ruff+pyright clean. Fixes: microsoft-amplifier/amplifier-support#278 Generated with [Amplifier](https://github.com/microsoft/amplifier) Co-Authored-By: Amplifier <240397093+microsoft-amplifier@users.noreply.github.com>
Collaborator
|
Thanks @bkrabach — we independently arrived at the same core fix (chunked bounded sub-transactions in #15 is a functional superset of this PR, so I'd propose consolidating onto it and closing this one:
Proposing we close this in favor of #15. Thanks for the independent confirmation of the root cause and fix. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Problem
Under sustained ingest backpressure (e.g. draining a large per-session backlog after a restart), the Neo4j write flush builds a single unbounded transaction and OOMs:
It is an unbounded transaction, not a sizing problem. Observed in the field: the "currently using" figure tracks whatever ceiling is configured (it climbed in lock-step when the ceiling was raised 20.6 GiB → 40 GiB) while the Neo4j process RSS stayed ~4.7 GiB. Raising memory only moves the wall.
Root cause
Neo4jGraphStore._flush_bodysnapshots the entire_node_buffer+_edge_buffer+_label_patchesand writes them in oneexecute_write(_write_batch, ...)transaction. On failure thefinallyblock restores the whole snapshot back into the buffers, so under continued load each failed flush makes the next one larger → a self-amplifying grow-spiral that never commits. The per-session queue offset therefore never advances, and the data is stuck (it stays safe on disk in the JSONL queue, but never reaches the graph). Both ingest (registry._flush_barrier) and session finalization (registry._finalize_session) funnel through the sameflush(), so both paths exhibit it.The existing
_DRAIN_MAX_BATCH = 100read cap does not help — the buffer/flush, not the read batch, is the unbounded unit.Fix
Write the buffered snapshot in bounded sub-transactions of at most
neo4j_flush_chunk_sizeitems each:_flush_bodynow writes in three ordered phases — nodes → edges → label-patches (ordering preserved for referential integrity: edges/patchesMATCHnodes that must already exist). Each chunk is its ownexecute_write, so per-transaction memory is bounded regardless of backlog size._write_batchis unchanged — still a pure function of its args; each phase passes only its category populated.New tunable
neo4j_flush_chunk_size(default 200) — added toSettings, reachable viaAMPLIFIER_CONTEXT_INTELLIGENCE_SERVER_NEO4J_FLUSH_CHUNK_SIZEorneo4j_flush_chunk_size:inserver-config.yaml, matching the existingwrite_concurrencypattern.Tests / proof
New, in
tests/neo4j/test_concurrent_flush.py(@pytest.mark.neo4j, real Neo4j container):test_chunked_flush_writes_all_in_bounded_chunks— buffers 500 nodes + 200 edges atchunk=50; asserts all rows land and no transaction exceeded 50 items (instruments_write_batchper-call sizes).test_chunked_flush_partial_failure_restores_only_remainder— injects a mid-flush failure; asserts already-committed chunks are durable, the buffer holds only the remainder (not the whole snapshot), and a retry completes.Results on this branch:
tests/neo4j/test_concurrent_flush.py— 6 passed (4 pre-existing flush/drain/poison tests = no regression, plus the 2 above), realneo4j:5.26.22-community.pytest -m "not neo4j and not integration"— 1273 passed.ruff+pyright— clean.Notes
neo4j_flush_chunk_size=200is conservative; operators can tune.main@750de9d.