From 5568b4db63d11d1b915c511938f277d5f8ed20fc Mon Sep 17 00:00:00 2001 From: duguwanglong Date: Thu, 14 May 2026 19:08:26 +0800 Subject: [PATCH 1/3] fix(workflow,syslog): prevent syslog-driven memory growth in stream_alert_dedup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Root causes identified: 1. Large alert lists (enriched_alerts, unique_alerts, …) were written verbatim into every workflow_execution SQLite row — both in outputResults and in each executionLog snapshot emitted by _on_step_complete — duplicating megabytes of data that is already persisted to JSONL on disk. 2. The problem affected both the syslog trigger path (manager.py) and the HTTP background execution path (routes/workflow.py), but only the former had a local fix applied; the HTTP path was left unprotected. Changes: - execution_store.py: add compact_outputs_for_storage() and compact_history_for_storage() as the canonical compact API. Lists whose key belongs to DEFAULT_LARGE_LIST_KEYS and whose length exceeds DEFAULT_COMPACT_SIZE_THRESHOLD (100) are replaced with a lightweight __count integer; small lists and unknown keys pass through unchanged, so ordinary metadata arrays are never silently dropped. - syslog/manager.py: remove the local _compact_outputs/_compact_history implementation and import the shared helpers from execution_store. - routes/workflow.py: apply compact_outputs_for_storage to outputResults and compact_history_for_storage to executionLog in both the background execution path and the workflow-center invoke path. Also compact each step dict inside _on_step_complete before appending to step_history so that every intermediate _write_progress snapshot stays bounded. - tests/workflow/test_execution_store_compact.py: 14 new unit tests covering the size-threshold guard, non-mutation contract, defensive pass-through for non-dict inputs, and the end-to-end size reduction (>1 000x on a 10K-alert payload). Co-authored-by: Cursor --- flocks/ingest/syslog/manager.py | 7 +- flocks/server/routes/workflow.py | 23 +- flocks/workflow/execution_store.py | 82 ++++++- .../workflow/test_execution_store_compact.py | 225 ++++++++++++++++++ 4 files changed, 331 insertions(+), 6 deletions(-) create mode 100644 tests/workflow/test_execution_store_compact.py diff --git a/flocks/ingest/syslog/manager.py b/flocks/ingest/syslog/manager.py index 91037db45..0ef9aae6b 100644 --- a/flocks/ingest/syslog/manager.py +++ b/flocks/ingest/syslog/manager.py @@ -9,6 +9,8 @@ from flocks.storage.storage import Storage from flocks.utils.log import Log from flocks.workflow.execution_store import ( + compact_history_for_storage, + compact_outputs_for_storage, create_execution_record, record_execution_result, resolve_execution_outcome, @@ -21,6 +23,7 @@ log = Log.create(service="syslog.manager") + # Maximum concurrent workflow executions per workflow to avoid FD exhaustion and SQLite write contention _MAX_CONCURRENT_EXECUTIONS = 8 # Maximum number of buffered syslog messages per workflow; excess messages are dropped with a warning. @@ -468,11 +471,11 @@ async def _trigger_workflow( duration = time.time() - start_time exec_data.update({ "status": status, - "outputResults": result.outputs if isinstance(result.outputs, dict) else {}, + "outputResults": compact_outputs_for_storage(result.outputs), "finishedAt": int(time.time() * 1000), "duration": duration, "errorMessage": error_msg, - "executionLog": list(result.history or []), + "executionLog": compact_history_for_storage(result.history), "currentNodeId": result.last_node_id, "currentPhase": status, "currentStepIndex": result.steps, diff --git a/flocks/server/routes/workflow.py b/flocks/server/routes/workflow.py index 8f77ee3f7..3a59fd5e4 100644 --- a/flocks/server/routes/workflow.py +++ b/flocks/server/routes/workflow.py @@ -46,6 +46,8 @@ ) from flocks.ingest.syslog.constants import WORKFLOW_SYSLOG_CONFIG_PREFIX from flocks.workflow.execution_store import ( + compact_history_for_storage, + compact_outputs_for_storage, create_execution_record, normalize_execution_status as _normalize_execution_status, record_execution_result as _record_execution_result, @@ -499,7 +501,15 @@ def _on_step_start(_run_id, step_index, node, _inputs): return step_index def _on_step_complete(step_result) -> None: + # Compact each step's outputs *before* appending so the running + # ``step_history`` (and every subsequent ``_write_progress`` snapshot + # that ships it to SQLite) stays bounded, even when a workflow node + # returns tens of thousands of alerts that are already persisted to + # JSONL on disk. step_dict = step_result.model_dump(mode="json") + raw_outputs = step_dict.get("outputs") + if isinstance(raw_outputs, dict): + step_dict["outputs"] = compact_outputs_for_storage(raw_outputs) step_history.append(step_dict) _write_progress({ "executionLog": list(step_history), @@ -530,12 +540,17 @@ def _on_step_complete(step_result) -> None: # status write still succeeds rather than blowing up. current_data = {"id": exec_id, "workflowId": workflow_id} status_value, error_message = _resolve_execution_outcome(result) + # ``result.history`` is the engine-side authoritative history (not + # yet compacted), while ``step_history`` was already compacted in + # ``_on_step_complete``. Prefer the former when available, then + # run it through ``compact_history_for_storage`` so the persisted + # row stays small in either branch. current_data.update({ - "outputResults": result.outputs, + "outputResults": compact_outputs_for_storage(result.outputs), "status": status_value, "finishedAt": int(time.time() * 1000), "duration": duration, - "executionLog": result.history or list(step_history), + "executionLog": compact_history_for_storage(result.history) or list(step_history), "errorMessage": error_message, "currentNodeId": result.last_node_id, "currentNodeType": current_data.get("currentNodeType"), @@ -1073,7 +1088,9 @@ async def workflow_center_invoke(workflow_id: str, req: WorkflowCenterInvokeRequ status_value = _normalize_execution_status(raw_status) success = status_value == "success" exec_data.update({ - "outputResults": result.get("outputs", {}) if isinstance(result, dict) else {}, + "outputResults": compact_outputs_for_storage( + result.get("outputs", {}) if isinstance(result, dict) else {} + ), "status": status_value, "finishedAt": int(time.time() * 1000), "duration": duration, diff --git a/flocks/workflow/execution_store.py b/flocks/workflow/execution_store.py index cf8bc3091..a160c6d5e 100644 --- a/flocks/workflow/execution_store.py +++ b/flocks/workflow/execution_store.py @@ -5,7 +5,7 @@ import asyncio import time import uuid -from typing import Any, Dict, Optional +from typing import Any, Dict, Iterable, List, Optional from flocks.session.recorder import Recorder from flocks.storage.storage import Storage @@ -14,6 +14,86 @@ log = Log.create(service="workflow.execution_store") +# Keys whose values are expected to be large alert/event lists that have +# already been persisted elsewhere (typically JSONL on disk). When writing +# the execution record to SQLite we replace them with a ``__count`` +# integer to keep row sizes bounded. Callers may extend or override this +# set via the ``keys`` argument of the compact helpers below. +DEFAULT_LARGE_LIST_KEYS: frozenset[str] = frozenset( + { + "enriched_alerts", + "unique_alerts", + "raw_alerts", + "normalized_alerts", + "filtered_alerts", + } +) + +# Lists smaller than this many items are passed through verbatim. The cap +# protects against accidentally stripping small metadata lists that happen +# to share a name with a known large-list key. +DEFAULT_COMPACT_SIZE_THRESHOLD: int = 100 + + +def compact_outputs_for_storage( + outputs: Any, + *, + keys: Iterable[str] = DEFAULT_LARGE_LIST_KEYS, + size_threshold: int = DEFAULT_COMPACT_SIZE_THRESHOLD, +) -> Dict[str, Any]: + """Return a copy of *outputs* with large alert lists replaced by counts. + + Only list values whose key is in *keys* AND whose length exceeds + *size_threshold* are compacted; everything else is passed through. + This prevents megabytes of alert data from being serialised into the + ``workflow_execution`` SQLite row on every invocation, while still + keeping small lists (e.g. error details, short configuration arrays) + fully inspectable in the execution-history UI. + """ + if not isinstance(outputs, dict): + return {} + key_set = frozenset(keys) + compacted: Dict[str, Any] = {} + for k, v in outputs.items(): + if ( + k in key_set + and isinstance(v, list) + and len(v) > size_threshold + ): + compacted[f"_{k}_count"] = len(v) + else: + compacted[k] = v + return compacted + + +def compact_history_for_storage( + history: Any, + *, + keys: Iterable[str] = DEFAULT_LARGE_LIST_KEYS, + size_threshold: int = DEFAULT_COMPACT_SIZE_THRESHOLD, +) -> List[Any]: + """Strip large alert lists from step outputs in workflow history. + + Returns an empty list when *history* is falsy. Non-dict step entries + (defensive: shouldn't happen with normal ``StepResult`` dumps) are + passed through unchanged so the caller sees no surprising drops. + """ + if not history: + return [] + result: List[Any] = [] + for step in history: + if not isinstance(step, dict): + result.append(step) + continue + step_copy = dict(step) + raw_outputs = step_copy.get("outputs") + if isinstance(raw_outputs, dict): + step_copy["outputs"] = compact_outputs_for_storage( + raw_outputs, keys=keys, size_threshold=size_threshold + ) + result.append(step_copy) + return result + # Maximum number of execution history records retained per workflow. # Older records are pruned automatically to prevent a syslog flood from bloating Storage. _MAX_EXECUTION_HISTORY_PER_WORKFLOW = 500 diff --git a/tests/workflow/test_execution_store_compact.py b/tests/workflow/test_execution_store_compact.py new file mode 100644 index 000000000..7da53f389 --- /dev/null +++ b/tests/workflow/test_execution_store_compact.py @@ -0,0 +1,225 @@ +"""Regression tests for ``compact_outputs_for_storage`` and +``compact_history_for_storage`` in ``flocks.workflow.execution_store``. + +These helpers protect the ``workflow_execution`` SQLite row from being +inflated to tens of MB per syslog message: each execution of +``stream_alert_dedup`` (and similar streaming workflows) can produce +``enriched_alerts``/``unique_alerts`` lists with thousands of items that +are already persisted to JSONL on disk. Without compaction, those lists +end up duplicated both in the final ``outputResults`` and in every +intermediate ``executionLog`` snapshot written by ``_on_step_complete``, +which is the root cause of the syslog-driven memory blow-up. + +The tests below pin the externally observable contract so future +refactors don't accidentally drop the protection or, conversely, start +stripping legitimately small metadata lists. +""" + +from __future__ import annotations + +from typing import Any, Dict, List + +import pytest + +from flocks.workflow.execution_store import ( + DEFAULT_COMPACT_SIZE_THRESHOLD, + DEFAULT_LARGE_LIST_KEYS, + compact_history_for_storage, + compact_outputs_for_storage, +) + + +def _make_alerts(n: int) -> List[Dict[str, Any]]: + return [{"sip": f"1.2.3.{i % 256}", "url": f"/p/{i}"} for i in range(n)] + + +# ── compact_outputs_for_storage ─────────────────────────────────────────────── + + +def test_compact_outputs_strips_large_alert_lists() -> None: + big = _make_alerts(5_000) + outputs = { + "enriched_alerts": big, + "unique_alerts": big[:1_000], + "dedup_key": "abc", + "stats": {"raw_count": 5_000}, + } + + compacted = compact_outputs_for_storage(outputs) + + assert compacted["_enriched_alerts_count"] == 5_000 + assert compacted["_unique_alerts_count"] == 1_000 + assert "enriched_alerts" not in compacted + assert "unique_alerts" not in compacted + # Non-list metadata is preserved verbatim. + assert compacted["dedup_key"] == "abc" + assert compacted["stats"] == {"raw_count": 5_000} + + +def test_compact_outputs_keeps_small_lists_verbatim() -> None: + """A list whose key matches but stays below the size threshold is + passed through unchanged: small metadata arrays (e.g. error details) + must remain inspectable in the execution-history UI. + """ + small = _make_alerts(10) + outputs = {"enriched_alerts": small, "stats": {"raw_count": 10}} + + compacted = compact_outputs_for_storage(outputs) + + assert compacted["enriched_alerts"] == small + assert "_enriched_alerts_count" not in compacted + + +def test_compact_outputs_ignores_unknown_keys() -> None: + big_unknown = _make_alerts(5_000) + outputs = {"some_other_alerts": big_unknown} + + compacted = compact_outputs_for_storage(outputs) + + # Unknown keys are not in the default large-list set; they must pass + # through even if huge, so callers don't get surprising drops. + assert compacted["some_other_alerts"] is big_unknown + + +def test_compact_outputs_accepts_custom_keys_and_threshold() -> None: + big = _make_alerts(150) + outputs = {"custom_payload": big, "enriched_alerts": _make_alerts(50)} + + compacted = compact_outputs_for_storage( + outputs, + keys={"custom_payload"}, + size_threshold=100, + ) + + assert compacted["_custom_payload_count"] == 150 + # Default key is no longer in the override set so its list is kept. + assert compacted["enriched_alerts"] == _make_alerts(50) + + +def test_compact_outputs_handles_non_dict_input() -> None: + assert compact_outputs_for_storage(None) == {} + assert compact_outputs_for_storage([1, 2, 3]) == {} + assert compact_outputs_for_storage("oops") == {} + + +def test_compact_outputs_does_not_mutate_input() -> None: + big = _make_alerts(5_000) + outputs = {"enriched_alerts": big, "dedup_key": "abc"} + + compact_outputs_for_storage(outputs) + + assert "enriched_alerts" in outputs + assert outputs["enriched_alerts"] is big + assert outputs["dedup_key"] == "abc" + + +def test_compact_outputs_drastically_reduces_serialised_size() -> None: + """End-to-end size guarantee: the typical 10K-alert payload should + shrink by more than 1000x once compacted, which is what makes the + SQLite row size bounded under syslog throughput. + """ + import json + + big = [ + { + "sip": f"1.2.3.{i % 256}", + "req_http_url": "/admin?id=" + "x" * 200, + "req_body": "b" * 300, + "dedup_key": "abc" * 10, + } + for i in range(10_000) + ] + outputs = {"enriched_alerts": big, "unique_alerts": big[:2_000]} + + before = len(json.dumps(outputs).encode()) + after = len(json.dumps(compact_outputs_for_storage(outputs)).encode()) + + assert before > 1_000_000 # ≥ 1 MB before + assert after < 1_000 # < 1 KB after + assert before / after > 1_000 + + +# ── compact_history_for_storage ─────────────────────────────────────────────── + + +def test_compact_history_compacts_each_step_outputs() -> None: + big = _make_alerts(5_000) + history = [ + {"node_id": "receive", "outputs": {"raw_alerts": big}}, + {"node_id": "normalize", "outputs": {"normalized_alerts": big}}, + {"node_id": "dedup", "outputs": {"enriched_alerts": big, "dedup_key": "x"}}, + ] + + compacted = compact_history_for_storage(history) + + assert compacted[0]["outputs"] == {"_raw_alerts_count": 5_000} + assert compacted[1]["outputs"] == {"_normalized_alerts_count": 5_000} + assert compacted[2]["outputs"]["_enriched_alerts_count"] == 5_000 + assert compacted[2]["outputs"]["dedup_key"] == "x" + # Top-level keys (node_id) untouched. + assert [s["node_id"] for s in compacted] == ["receive", "normalize", "dedup"] + + +def test_compact_history_passes_through_falsy_history() -> None: + assert compact_history_for_storage(None) == [] + assert compact_history_for_storage([]) == [] + + +def test_compact_history_does_not_mutate_input() -> None: + big = _make_alerts(5_000) + history = [{"node_id": "x", "outputs": {"enriched_alerts": big}}] + + compact_history_for_storage(history) + + assert history[0]["outputs"]["enriched_alerts"] is big + + +def test_compact_history_tolerates_non_dict_steps() -> None: + """Defensive: a malformed step entry should pass through rather than + crash the syslog/HTTP execution recorder. + """ + history = [ + "not-a-dict", + {"node_id": "ok", "outputs": {"enriched_alerts": _make_alerts(5_000)}}, + 42, + ] + + compacted = compact_history_for_storage(history) + + assert compacted[0] == "not-a-dict" + assert compacted[2] == 42 + assert compacted[1]["outputs"]["_enriched_alerts_count"] == 5_000 + + +def test_compact_history_skips_step_with_non_dict_outputs() -> None: + history = [{"node_id": "weird", "outputs": "string-output"}] + + compacted = compact_history_for_storage(history) + + # Non-dict outputs are left as-is (defensive pass-through). + assert compacted[0]["outputs"] == "string-output" + + +# ── Defaults exposed to callers ─────────────────────────────────────────────── + + +def test_default_large_list_keys_cover_stream_alert_dedup_outputs() -> None: + """The default key set must include every large list produced by the + stream_alert_dedup workflow; otherwise syslog memory growth regresses + silently. + """ + expected = { + "enriched_alerts", + "unique_alerts", + "raw_alerts", + "normalized_alerts", + "filtered_alerts", + } + assert expected <= DEFAULT_LARGE_LIST_KEYS + + +def test_default_compact_size_threshold_is_reasonable() -> None: + # The threshold must be high enough to keep ordinary metadata lists + # (a few dozen items at most) intact, but low enough that megabyte- + # scale payloads get compacted on every triggered execution. + assert 1 <= DEFAULT_COMPACT_SIZE_THRESHOLD <= 1_000 From 0528f0c84ac35bc5e1800fa91f28d5597cc0cf54 Mon Sep 17 00:00:00 2001 From: duguwanglong Date: Thu, 14 May 2026 19:28:05 +0800 Subject: [PATCH 2/3] fix(workflow): compact inputParams before persisting execution record MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit inputParams was written verbatim into every workflow_execution SQLite row. For HTTP /run batch calls that pass a large alert list as an input parameter (e.g. {"raw_alerts": [...10k items...]}) this inflates each row by the same order of magnitude as the uncompacted outputResults bug fixed in the previous commit. Changes: - execution_store.py: create_execution_record() now passes input_params through compact_outputs_for_storage() before building the record, so known large-list keys (raw_alerts, enriched_alerts, etc.) are replaced with lightweight __count integers. Scalar fields and unknown keys are preserved unchanged, so audit / replay use-cases are not affected. - tests/workflow/test_execution_store_compact.py: two new tests pin the expected behaviour — "alerts" (not in DEFAULT_LARGE_LIST_KEYS) passes through unchanged, while "raw_alerts" (in the default set) is compacted. Co-authored-by: Cursor --- flocks/workflow/execution_store.py | 11 ++++- .../workflow/test_execution_store_compact.py | 42 +++++++++++++++++++ 2 files changed, 51 insertions(+), 2 deletions(-) diff --git a/flocks/workflow/execution_store.py b/flocks/workflow/execution_store.py index a160c6d5e..50388929f 100644 --- a/flocks/workflow/execution_store.py +++ b/flocks/workflow/execution_store.py @@ -235,10 +235,17 @@ async def create_execution_record( input_params: Optional[Dict[str, Any]] = None, exec_id: Optional[str] = None, ) -> Dict[str, Any]: - """Create and persist a running workflow execution record.""" + """Create and persist a running workflow execution record. + + *input_params* is compacted with the same policy as outputResults so that + batch HTTP calls passing large alert lists (e.g. ``{"alerts": [...10k...]}`` + ) don't bloat the SQLite row with data that is already persisted to disk by + the workflow itself. + """ + compacted_params = compact_outputs_for_storage(input_params or {}) exec_data = build_initial_execution_record( workflow_id, - input_params=input_params, + input_params=compacted_params, exec_id=exec_id, ) await Storage.write(workflow_execution_key(exec_data["id"]), exec_data) diff --git a/tests/workflow/test_execution_store_compact.py b/tests/workflow/test_execution_store_compact.py index 7da53f389..05399282a 100644 --- a/tests/workflow/test_execution_store_compact.py +++ b/tests/workflow/test_execution_store_compact.py @@ -223,3 +223,45 @@ def test_default_compact_size_threshold_is_reasonable() -> None: # (a few dozen items at most) intact, but low enough that megabyte- # scale payloads get compacted on every triggered execution. assert 1 <= DEFAULT_COMPACT_SIZE_THRESHOLD <= 1_000 + + +# ── create_execution_record compacts inputParams ───────────────────────────── + + +def test_compact_outputs_covers_input_params_batch_key() -> None: + """HTTP /run batch calls may pass a large ``alerts`` list as inputParams. + ``compact_outputs_for_storage`` must compact it when the key is in + ``DEFAULT_LARGE_LIST_KEYS`` – this is what ``create_execution_record`` + now does before writing to SQLite. + """ + batch_inputs = { + "alerts": _make_alerts(5_000), + "filter_enabled": True, + "threshold": 0.7, + } + + compacted = compact_outputs_for_storage(batch_inputs) + + assert "_alerts_count" not in compacted, ( + "'alerts' is not in DEFAULT_LARGE_LIST_KEYS so it should pass through unchanged" + ) + # Scalar fields must survive unchanged. + assert compacted["filter_enabled"] is True + assert compacted["threshold"] == 0.7 + + +def test_compact_outputs_covers_raw_alerts_in_input_params() -> None: + """When inputParams contains ``raw_alerts`` (a key that IS in + DEFAULT_LARGE_LIST_KEYS), it must be compacted. + """ + batch_inputs = { + "raw_alerts": _make_alerts(5_000), + "source_log_type": "tdp", + } + + compacted = compact_outputs_for_storage(batch_inputs) + + assert "_raw_alerts_count" in compacted + assert compacted["_raw_alerts_count"] == 5_000 + assert "raw_alerts" not in compacted + assert compacted["source_log_type"] == "tdp" From 7d5fe4918e71680feaf66e153fc5b2d5371755d3 Mon Sep 17 00:00:00 2001 From: duguwanglong Date: Fri, 15 May 2026 10:33:54 +0800 Subject: [PATCH 3/3] fix(workflow): address PR #273 review notes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Five issues raised in code review, all fixed in this commit: 1. Docstring/behaviour mismatch — alerts key create_execution_record()'s docstring previously used {"alerts":[…10k]} as the example of what gets compacted. "alerts" is NOT in DEFAULT_LARGE_LIST_KEYS, so the row would silently stay large. The docstring now cites "raw_alerts" (which IS in the set) and adds an explicit note that keys outside the default set are stored verbatim. compact_outputs_for_storage() gains the same clarification so callers can easily see the full list of compacted keys. 2. workflow_center_invoke — executionLog path The invoke path proxies to an external service and never populates executionLog locally, so no data was at risk. Added an explanatory comment and an explicit compact_history_for_storage() call on the executionLog field as a forward-compatible guard against future code that might populate it before the final write. 3. tuple sequences not compacted compact_outputs_for_storage() checked isinstance(v, list), missing tuple values that some serialisation paths may produce. Changed to isinstance(v, (list, tuple)). New test test_compact_outputs_compacts_ tuple_sequences() pins this behaviour. 4. Unused import pytest Removed the bare "import pytest" from test_execution_store_compact.py; no pytest.raises / @pytest.mark usages exist in the file. 5. Observability note compact_outputs_for_storage() docstring now explicitly states that compacted keys are replaced with __count and that callers who need the full list contents must read from the JSONL files written by the workflow. Co-authored-by: Cursor --- flocks/server/routes/workflow.py | 5 +++ flocks/workflow/execution_store.py | 32 ++++++++++++------- .../workflow/test_execution_store_compact.py | 17 ++++++++-- 3 files changed, 41 insertions(+), 13 deletions(-) diff --git a/flocks/server/routes/workflow.py b/flocks/server/routes/workflow.py index 3a59fd5e4..77ea94825 100644 --- a/flocks/server/routes/workflow.py +++ b/flocks/server/routes/workflow.py @@ -1087,10 +1087,15 @@ async def workflow_center_invoke(workflow_id: str, req: WorkflowCenterInvokeRequ raw_status = result.get("status", "SUCCEEDED") if isinstance(result, dict) else "SUCCEEDED" status_value = _normalize_execution_status(raw_status) success = status_value == "success" + # workflow_center_invoke proxies to an external published service; no + # step callbacks run locally so executionLog stays as the empty list + # set by create_execution_record. We still run compact_history here + # as a forward-compatible guard in case a future code path populates it. exec_data.update({ "outputResults": compact_outputs_for_storage( result.get("outputs", {}) if isinstance(result, dict) else {} ), + "executionLog": compact_history_for_storage(exec_data.get("executionLog")), "status": status_value, "finishedAt": int(time.time() * 1000), "duration": duration, diff --git a/flocks/workflow/execution_store.py b/flocks/workflow/execution_store.py index 50388929f..c8df3b66a 100644 --- a/flocks/workflow/execution_store.py +++ b/flocks/workflow/execution_store.py @@ -43,12 +43,20 @@ def compact_outputs_for_storage( ) -> Dict[str, Any]: """Return a copy of *outputs* with large alert lists replaced by counts. - Only list values whose key is in *keys* AND whose length exceeds - *size_threshold* are compacted; everything else is passed through. - This prevents megabytes of alert data from being serialised into the - ``workflow_execution`` SQLite row on every invocation, while still - keeping small lists (e.g. error details, short configuration arrays) - fully inspectable in the execution-history UI. + Only **list or tuple** values whose key is in *keys* AND whose length + exceeds *size_threshold* are compacted to ``__count``; everything + else is passed through unchanged. This prevents megabytes of alert data + from being serialised into the ``workflow_execution`` SQLite row on every + invocation, while still keeping small sequences (e.g. error details, short + configuration arrays) fully inspectable in the execution-history UI. + + **Keys that are compacted by default** (see ``DEFAULT_LARGE_LIST_KEYS``): + ``enriched_alerts``, ``unique_alerts``, ``raw_alerts``, + ``normalized_alerts``, ``filtered_alerts``. Keys outside this set — such + as a generic ``alerts`` parameter — are *not* compacted unless the caller + passes a custom *keys* argument. Callers who depend on inspecting the + full list contents of compacted keys must read the data from the JSONL + files written by the workflow itself. """ if not isinstance(outputs, dict): return {} @@ -57,7 +65,7 @@ def compact_outputs_for_storage( for k, v in outputs.items(): if ( k in key_set - and isinstance(v, list) + and isinstance(v, (list, tuple)) and len(v) > size_threshold ): compacted[f"_{k}_count"] = len(v) @@ -237,10 +245,12 @@ async def create_execution_record( ) -> Dict[str, Any]: """Create and persist a running workflow execution record. - *input_params* is compacted with the same policy as outputResults so that - batch HTTP calls passing large alert lists (e.g. ``{"alerts": [...10k...]}`` - ) don't bloat the SQLite row with data that is already persisted to disk by - the workflow itself. + *input_params* is passed through ``compact_outputs_for_storage`` before + writing to SQLite so that batch HTTP calls whose inputs contain a key in + ``DEFAULT_LARGE_LIST_KEYS`` (e.g. ``{"raw_alerts": [...10k items...]}`` + ) don't bloat the row. Keys outside the default set — such as a generic + ``alerts`` parameter — are stored verbatim; pass a custom *keys* argument + to ``compact_outputs_for_storage`` directly if you need broader coverage. """ compacted_params = compact_outputs_for_storage(input_params or {}) exec_data = build_initial_execution_record( diff --git a/tests/workflow/test_execution_store_compact.py b/tests/workflow/test_execution_store_compact.py index 05399282a..311bb2a49 100644 --- a/tests/workflow/test_execution_store_compact.py +++ b/tests/workflow/test_execution_store_compact.py @@ -19,8 +19,6 @@ from typing import Any, Dict, List -import pytest - from flocks.workflow.execution_store import ( DEFAULT_COMPACT_SIZE_THRESHOLD, DEFAULT_LARGE_LIST_KEYS, @@ -96,6 +94,21 @@ def test_compact_outputs_accepts_custom_keys_and_threshold() -> None: assert compacted["enriched_alerts"] == _make_alerts(50) +def test_compact_outputs_compacts_tuple_sequences() -> None: + """``tuple`` values whose key is in the default set must be compacted just + like ``list`` values, since some serialisation paths (e.g. ``exec()`` + return values) may produce tuples instead of lists. + """ + big_tuple = tuple(_make_alerts(5_000)) + outputs = {"enriched_alerts": big_tuple, "dedup_key": "x"} + + compacted = compact_outputs_for_storage(outputs) + + assert compacted["_enriched_alerts_count"] == 5_000 + assert "enriched_alerts" not in compacted + assert compacted["dedup_key"] == "x" + + def test_compact_outputs_handles_non_dict_input() -> None: assert compact_outputs_for_storage(None) == {} assert compact_outputs_for_storage([1, 2, 3]) == {}