From caa0c822aee0f110a39f84577209ce064ff726b7 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Mon, 9 Mar 2026 21:25:35 +0000 Subject: [PATCH 01/16] feat(cdk): add source-side memory introspection to emit controlled error messages before OOM kills Add MemoryMonitor class that reads cgroup v2/v1 memory files to detect memory pressure in containerized environments. Integrates into the AirbyteEntrypoint.read() loop to check memory every 1000 messages. - At 85% usage: logs a warning message - At 95% usage: raises MemoryLimitExceeded (AirbyteTracedException with transient_error failure type) for graceful shutdown - No-op when cgroup files unavailable (local dev / CI) - No new dependencies required Related to https://github.com/airbytehq/airbyte-internal-issues/issues/15938 Co-Authored-By: bot_apk --- airbyte_cdk/entrypoint.py | 6 + airbyte_cdk/utils/memory_monitor.py | 133 +++++++++++ unit_tests/utils/test_memory_monitor.py | 301 ++++++++++++++++++++++++ 3 files changed, 440 insertions(+) create mode 100644 airbyte_cdk/utils/memory_monitor.py create mode 100644 unit_tests/utils/test_memory_monitor.py diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index 54c207487..3b6e4a8f5 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -41,6 +41,7 @@ from airbyte_cdk.utils import is_cloud_environment, message_utils from airbyte_cdk.utils.airbyte_secrets_utils import get_secrets, update_secrets from airbyte_cdk.utils.constants import ENV_REQUEST_CACHE_PATH +from airbyte_cdk.utils.memory_monitor import DEFAULT_CHECK_INTERVAL, MemoryMonitor from airbyte_cdk.utils.traced_exception import AirbyteTracedException logger = init_logger("airbyte") @@ -277,8 +278,13 @@ def read( # The Airbyte protocol dictates that counts be expressed as float/double to better protect against integer overflows stream_message_counter: DefaultDict[HashableStreamDescriptor, float] = defaultdict(float) + 
memory_monitor = MemoryMonitor() + message_count = 0 for message in self.source.read(self.logger, config, catalog, state): yield self.handle_record_counts(message, stream_message_counter) + message_count += 1 + if message_count % DEFAULT_CHECK_INTERVAL == 0: + memory_monitor.check_memory_usage() for message in self._emit_queued_messages(self.source): yield self.handle_record_counts(message, stream_message_counter) diff --git a/airbyte_cdk/utils/memory_monitor.py b/airbyte_cdk/utils/memory_monitor.py new file mode 100644 index 000000000..1a3c5d298 --- /dev/null +++ b/airbyte_cdk/utils/memory_monitor.py @@ -0,0 +1,133 @@ +# +# Copyright (c) 2026 Airbyte, Inc., all rights reserved. +# + +"""Source-side memory introspection to emit controlled error messages before OOM kills.""" + +import logging +from pathlib import Path +from typing import Optional + +from airbyte_cdk.models import FailureType +from airbyte_cdk.utils.traced_exception import AirbyteTracedException + +logger = logging.getLogger("airbyte") + +# cgroup v2 paths +_CGROUP_V2_CURRENT = Path("/sys/fs/cgroup/memory.current") +_CGROUP_V2_MAX = Path("/sys/fs/cgroup/memory.max") + +# cgroup v1 paths +_CGROUP_V1_USAGE = Path("/sys/fs/cgroup/memory/memory.usage_in_bytes") +_CGROUP_V1_LIMIT = Path("/sys/fs/cgroup/memory/memory.limit_in_bytes") + +# Default thresholds +_DEFAULT_WARNING_THRESHOLD = 0.85 +_DEFAULT_CRITICAL_THRESHOLD = 0.95 + +# Check interval (every N messages) +DEFAULT_CHECK_INTERVAL = 1000 + + +class MemoryLimitExceeded(AirbyteTracedException): + """Raised when connector memory usage exceeds critical threshold.""" + + pass + + +class MemoryMonitor: + """Monitors container memory usage via cgroup files and emits warnings before OOM kills. + + On init, probes cgroup v2 then v1 files. Caches which version exists. + If neither is found (local dev / CI), all subsequent calls are instant no-ops. 
+ """ + + def __init__( + self, + warning_threshold: float = _DEFAULT_WARNING_THRESHOLD, + critical_threshold: float = _DEFAULT_CRITICAL_THRESHOLD, + ) -> None: + self._warning_threshold = warning_threshold + self._critical_threshold = critical_threshold + self._warning_emitted = False + self._critical_raised = False + self._cgroup_version: Optional[int] = None + + # Probe cgroup version on init + if _CGROUP_V2_CURRENT.exists() and _CGROUP_V2_MAX.exists(): + self._cgroup_version = 2 + elif _CGROUP_V1_USAGE.exists() and _CGROUP_V1_LIMIT.exists(): + self._cgroup_version = 1 + + if self._cgroup_version is None: + logger.debug("No cgroup memory files found. Memory monitoring disabled (likely local dev / CI).") + + def _read_memory(self) -> Optional[tuple[int, int]]: + """Read current memory usage and limit from cgroup files. + + Returns a tuple of (usage_bytes, limit_bytes) or None if unavailable. + """ + if self._cgroup_version is None: + return None + + if self._cgroup_version == 2: + usage_path = _CGROUP_V2_CURRENT + limit_path = _CGROUP_V2_MAX + else: + usage_path = _CGROUP_V1_USAGE + limit_path = _CGROUP_V1_LIMIT + + usage_text = usage_path.read_text().strip() + limit_text = limit_path.read_text().strip() + + # cgroup v2 memory.max can be the literal string "max" (unlimited) + if limit_text == "max": + return None + + usage_bytes = int(usage_text) + limit_bytes = int(limit_text) + + if limit_bytes <= 0: + return None + + return usage_bytes, limit_bytes + + def check_memory_usage(self) -> None: + """Check memory usage against thresholds. + + At the warning threshold (default 85%), logs a warning message. + At the critical threshold (default 95%), raises MemoryLimitExceeded to + trigger a graceful shutdown with an actionable error message. + + Each threshold triggers at most once per sync to avoid log spam. + This method is a no-op if cgroup files are unavailable. 
+ """ + if self._cgroup_version is None: + return + + memory_info = self._read_memory() + if memory_info is None: + return + + usage_bytes, limit_bytes = memory_info + usage_ratio = usage_bytes / limit_bytes + usage_percent = int(usage_ratio * 100) + + if usage_ratio >= self._critical_threshold and not self._critical_raised: + self._critical_raised = True + raise MemoryLimitExceeded( + internal_message=f"Memory usage is {usage_percent}% ({usage_bytes} / {limit_bytes} bytes). " + f"Critical threshold is {int(self._critical_threshold * 100)}%.", + message=f"Source exceeded memory limit ({usage_percent}% used) and must shut down. " + f"Reduce the number of streams or increase memory allocation.", + failure_type=FailureType.transient_error, + ) + + if usage_ratio >= self._warning_threshold and not self._warning_emitted: + self._warning_emitted = True + logger.warning( + "Source memory usage reached %d%% of container limit (%d / %d bytes).", + usage_percent, + usage_bytes, + limit_bytes, + ) diff --git a/unit_tests/utils/test_memory_monitor.py b/unit_tests/utils/test_memory_monitor.py new file mode 100644 index 000000000..5d34387d2 --- /dev/null +++ b/unit_tests/utils/test_memory_monitor.py @@ -0,0 +1,301 @@ +# +# Copyright (c) 2026 Airbyte, Inc., all rights reserved. 
+# + +from pathlib import Path +from unittest.mock import patch + +import pytest + +from airbyte_cdk.models import FailureType +from airbyte_cdk.utils.memory_monitor import ( + DEFAULT_CHECK_INTERVAL, + MemoryLimitExceeded, + MemoryMonitor, + _CGROUP_V1_LIMIT, + _CGROUP_V1_USAGE, + _CGROUP_V2_CURRENT, + _CGROUP_V2_MAX, +) + + +class TestMemoryMonitorInit: + """Tests for MemoryMonitor initialization and cgroup detection.""" + + def test_no_cgroup_files_disables_monitoring(self, tmp_path: Path) -> None: + """When no cgroup files exist, monitoring should be disabled (no-op).""" + with patch.object(Path, "exists", return_value=False): + monitor = MemoryMonitor() + assert monitor._cgroup_version is None + + def test_cgroup_v2_detected(self, tmp_path: Path) -> None: + """When cgroup v2 files exist, version should be 2.""" + + def mock_exists(self: Path) -> bool: + return self in (_CGROUP_V2_CURRENT, _CGROUP_V2_MAX) + + with patch.object(Path, "exists", mock_exists): + monitor = MemoryMonitor() + assert monitor._cgroup_version == 2 + + def test_cgroup_v1_detected(self, tmp_path: Path) -> None: + """When only cgroup v1 files exist, version should be 1.""" + + def mock_exists(self: Path) -> bool: + return self in (_CGROUP_V1_USAGE, _CGROUP_V1_LIMIT) + + with patch.object(Path, "exists", mock_exists): + monitor = MemoryMonitor() + assert monitor._cgroup_version == 1 + + def test_cgroup_v2_preferred_over_v1(self) -> None: + """When both cgroup v2 and v1 files exist, v2 should be preferred.""" + with patch.object(Path, "exists", return_value=True): + monitor = MemoryMonitor() + assert monitor._cgroup_version == 2 + + +class TestMemoryMonitorCheckMemory: + """Tests for the check_memory_usage method.""" + + def test_noop_when_no_cgroup(self) -> None: + """check_memory_usage should be a no-op when cgroup is unavailable.""" + with patch.object(Path, "exists", return_value=False): + monitor = MemoryMonitor() + # Should not raise + monitor.check_memory_usage() + + def 
test_noop_when_limit_is_max(self) -> None: + """When cgroup v2 memory.max is 'max' (unlimited), should be a no-op.""" + + def mock_exists(self: Path) -> bool: + return self in (_CGROUP_V2_CURRENT, _CGROUP_V2_MAX) + + def mock_read_text(self: Path) -> str: + if self == _CGROUP_V2_CURRENT: + return "1000000\n" + if self == _CGROUP_V2_MAX: + return "max\n" + return "" + + with patch.object(Path, "exists", mock_exists): + monitor = MemoryMonitor() + + with patch.object(Path, "read_text", mock_read_text): + # Should not raise even though "usage" is technically present + monitor.check_memory_usage() + + def test_no_warning_below_threshold(self) -> None: + """No warning should be emitted when usage is below 85%.""" + + def mock_exists(self: Path) -> bool: + return self in (_CGROUP_V2_CURRENT, _CGROUP_V2_MAX) + + def mock_read_text(self: Path) -> str: + if self == _CGROUP_V2_CURRENT: + return "500000000\n" # 500MB + if self == _CGROUP_V2_MAX: + return "1000000000\n" # 1GB + return "" + + with patch.object(Path, "exists", mock_exists): + monitor = MemoryMonitor() + + with patch.object(Path, "read_text", mock_read_text): + monitor.check_memory_usage() + + assert not monitor._warning_emitted + assert not monitor._critical_raised + + def test_warning_at_85_percent(self) -> None: + """Warning should be emitted at 85% usage.""" + + def mock_exists(self: Path) -> bool: + return self in (_CGROUP_V2_CURRENT, _CGROUP_V2_MAX) + + def mock_read_text(self: Path) -> str: + if self == _CGROUP_V2_CURRENT: + return "870000000\n" # 870MB = 87% of 1GB + if self == _CGROUP_V2_MAX: + return "1000000000\n" # 1GB + return "" + + with patch.object(Path, "exists", mock_exists): + monitor = MemoryMonitor() + + with patch.object(Path, "read_text", mock_read_text): + monitor.check_memory_usage() + + assert monitor._warning_emitted + assert not monitor._critical_raised + + def test_critical_at_95_percent_raises(self) -> None: + """MemoryLimitExceeded should be raised at 95% usage.""" + + def 
mock_exists(self: Path) -> bool: + return self in (_CGROUP_V2_CURRENT, _CGROUP_V2_MAX) + + def mock_read_text(self: Path) -> str: + if self == _CGROUP_V2_CURRENT: + return "960000000\n" # 960MB = 96% of 1GB + if self == _CGROUP_V2_MAX: + return "1000000000\n" # 1GB + return "" + + with patch.object(Path, "exists", mock_exists): + monitor = MemoryMonitor() + + with patch.object(Path, "read_text", mock_read_text): + with pytest.raises(MemoryLimitExceeded) as exc_info: + monitor.check_memory_usage() + + assert exc_info.value.failure_type == FailureType.transient_error + assert "96%" in (exc_info.value.message or "") + assert "shut down" in (exc_info.value.message or "") + + def test_warning_emitted_only_once(self) -> None: + """Warning should only be emitted once even if called multiple times.""" + + def mock_exists(self: Path) -> bool: + return self in (_CGROUP_V2_CURRENT, _CGROUP_V2_MAX) + + def mock_read_text(self: Path) -> str: + if self == _CGROUP_V2_CURRENT: + return "870000000\n" # 87% + if self == _CGROUP_V2_MAX: + return "1000000000\n" + return "" + + with patch.object(Path, "exists", mock_exists): + monitor = MemoryMonitor() + + with patch.object(Path, "read_text", mock_read_text): + monitor.check_memory_usage() + assert monitor._warning_emitted + + # Call again — should not error or re-emit + monitor.check_memory_usage() + assert monitor._warning_emitted + + def test_critical_raised_only_once(self) -> None: + """MemoryLimitExceeded should only be raised once.""" + + def mock_exists(self: Path) -> bool: + return self in (_CGROUP_V2_CURRENT, _CGROUP_V2_MAX) + + def mock_read_text(self: Path) -> str: + if self == _CGROUP_V2_CURRENT: + return "960000000\n" + if self == _CGROUP_V2_MAX: + return "1000000000\n" + return "" + + with patch.object(Path, "exists", mock_exists): + monitor = MemoryMonitor() + + with patch.object(Path, "read_text", mock_read_text): + with pytest.raises(MemoryLimitExceeded): + monitor.check_memory_usage() + + # Second call should NOT 
raise again + monitor.check_memory_usage() + + def test_cgroup_v1_reading(self) -> None: + """Memory reading should work with cgroup v1 paths.""" + + def mock_exists(self: Path) -> bool: + return self in (_CGROUP_V1_USAGE, _CGROUP_V1_LIMIT) + + def mock_read_text(self: Path) -> str: + if self == _CGROUP_V1_USAGE: + return "870000000\n" # 87% + if self == _CGROUP_V1_LIMIT: + return "1000000000\n" + return "" + + with patch.object(Path, "exists", mock_exists): + monitor = MemoryMonitor() + assert monitor._cgroup_version == 1 + + with patch.object(Path, "read_text", mock_read_text): + monitor.check_memory_usage() + + assert monitor._warning_emitted + + def test_custom_thresholds_warning(self) -> None: + """Custom warning threshold should be respected.""" + + def mock_exists(self: Path) -> bool: + return self in (_CGROUP_V2_CURRENT, _CGROUP_V2_MAX) + + def mock_read_text(self: Path) -> str: + if self == _CGROUP_V2_CURRENT: + return "750000000\n" # 75% + if self == _CGROUP_V2_MAX: + return "1000000000\n" + return "" + + with patch.object(Path, "exists", mock_exists): + monitor = MemoryMonitor(warning_threshold=0.70, critical_threshold=0.90) + + with patch.object(Path, "read_text", mock_read_text): + monitor.check_memory_usage() + + assert monitor._warning_emitted + assert not monitor._critical_raised + + def test_custom_thresholds_critical(self) -> None: + """Custom critical threshold should be respected.""" + + def mock_exists(self: Path) -> bool: + return self in (_CGROUP_V2_CURRENT, _CGROUP_V2_MAX) + + def mock_read_text(self: Path) -> str: + if self == _CGROUP_V2_CURRENT: + return "850000000\n" # 85% + if self == _CGROUP_V2_MAX: + return "1000000000\n" + return "" + + with patch.object(Path, "exists", mock_exists): + monitor = MemoryMonitor(warning_threshold=0.70, critical_threshold=0.80) + + with patch.object(Path, "read_text", mock_read_text): + with pytest.raises(MemoryLimitExceeded): + monitor.check_memory_usage() + + +class TestMemoryLimitExceeded: + """Tests 
for the MemoryLimitExceeded exception.""" + + def test_is_airbyte_traced_exception(self) -> None: + """MemoryLimitExceeded should be a subclass of AirbyteTracedException.""" + from airbyte_cdk.utils.traced_exception import AirbyteTracedException + + exc = MemoryLimitExceeded( + internal_message="test", + message="test message", + failure_type=FailureType.transient_error, + ) + assert isinstance(exc, AirbyteTracedException) + + def test_default_attributes(self) -> None: + """MemoryLimitExceeded should have correct default attributes.""" + exc = MemoryLimitExceeded( + internal_message="Memory at 96%", + message="Source exceeded memory limit.", + failure_type=FailureType.transient_error, + ) + assert exc.failure_type == FailureType.transient_error + assert exc.message == "Source exceeded memory limit." + assert exc.internal_message == "Memory at 96%" + + +class TestDefaultCheckInterval: + """Tests for the DEFAULT_CHECK_INTERVAL constant.""" + + def test_check_interval_is_positive(self) -> None: + assert DEFAULT_CHECK_INTERVAL > 0 + + def test_check_interval_value(self) -> None: + assert DEFAULT_CHECK_INTERVAL == 1000 From 8d059cff57f58c9a584332daeed9e7406a869690 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Mon, 9 Mar 2026 21:27:40 +0000 Subject: [PATCH 02/16] style: fix ruff format and import sorting Co-Authored-By: bot_apk --- airbyte_cdk/utils/memory_monitor.py | 4 +++- unit_tests/utils/test_memory_monitor.py | 6 +++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/airbyte_cdk/utils/memory_monitor.py b/airbyte_cdk/utils/memory_monitor.py index 1a3c5d298..9527eca06 100644 --- a/airbyte_cdk/utils/memory_monitor.py +++ b/airbyte_cdk/utils/memory_monitor.py @@ -60,7 +60,9 @@ def __init__( self._cgroup_version = 1 if self._cgroup_version is None: - logger.debug("No cgroup memory files found. Memory monitoring disabled (likely local dev / CI).") + logger.debug( + "No cgroup memory files found. 
Memory monitoring disabled (likely local dev / CI)." + ) def _read_memory(self) -> Optional[tuple[int, int]]: """Read current memory usage and limit from cgroup files. diff --git a/unit_tests/utils/test_memory_monitor.py b/unit_tests/utils/test_memory_monitor.py index 5d34387d2..8053be409 100644 --- a/unit_tests/utils/test_memory_monitor.py +++ b/unit_tests/utils/test_memory_monitor.py @@ -9,13 +9,13 @@ from airbyte_cdk.models import FailureType from airbyte_cdk.utils.memory_monitor import ( - DEFAULT_CHECK_INTERVAL, - MemoryLimitExceeded, - MemoryMonitor, _CGROUP_V1_LIMIT, _CGROUP_V1_USAGE, _CGROUP_V2_CURRENT, _CGROUP_V2_MAX, + DEFAULT_CHECK_INTERVAL, + MemoryLimitExceeded, + MemoryMonitor, ) From b2e233f941c131537e13a80261d3c793500a4fa6 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Mon, 9 Mar 2026 21:48:23 +0000 Subject: [PATCH 03/16] fix(cdk): add error handling to _read_memory() for graceful degradation Co-Authored-By: bot_apk --- airbyte_cdk/utils/memory_monitor.py | 35 ++++++++++++----------- unit_tests/utils/test_memory_monitor.py | 38 +++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 16 deletions(-) diff --git a/airbyte_cdk/utils/memory_monitor.py b/airbyte_cdk/utils/memory_monitor.py index 9527eca06..ae076360a 100644 --- a/airbyte_cdk/utils/memory_monitor.py +++ b/airbyte_cdk/utils/memory_monitor.py @@ -68,32 +68,35 @@ def _read_memory(self) -> Optional[tuple[int, int]]: """Read current memory usage and limit from cgroup files. Returns a tuple of (usage_bytes, limit_bytes) or None if unavailable. + Best-effort: failures to read memory info never crash a sync. 
""" if self._cgroup_version is None: return None - if self._cgroup_version == 2: - usage_path = _CGROUP_V2_CURRENT - limit_path = _CGROUP_V2_MAX - else: - usage_path = _CGROUP_V1_USAGE - limit_path = _CGROUP_V1_LIMIT + try: + if self._cgroup_version == 2: + usage_path = _CGROUP_V2_CURRENT + limit_path = _CGROUP_V2_MAX + else: + usage_path = _CGROUP_V1_USAGE + limit_path = _CGROUP_V1_LIMIT - usage_text = usage_path.read_text().strip() - limit_text = limit_path.read_text().strip() + limit_text = limit_path.read_text().strip() + # cgroup v2 memory.max can be the literal string "max" (unlimited) + if limit_text == "max": + return None - # cgroup v2 memory.max can be the literal string "max" (unlimited) - if limit_text == "max": - return None + usage_bytes = int(usage_path.read_text().strip()) + limit_bytes = int(limit_text) - usage_bytes = int(usage_text) - limit_bytes = int(limit_text) + if limit_bytes <= 0: + return None - if limit_bytes <= 0: + return usage_bytes, limit_bytes + except (OSError, ValueError): + logger.debug("Failed to read cgroup memory files; skipping memory check.") return None - return usage_bytes, limit_bytes - def check_memory_usage(self) -> None: """Check memory usage against thresholds. 
diff --git a/unit_tests/utils/test_memory_monitor.py b/unit_tests/utils/test_memory_monitor.py index 8053be409..bd43a4494 100644 --- a/unit_tests/utils/test_memory_monitor.py +++ b/unit_tests/utils/test_memory_monitor.py @@ -264,6 +264,44 @@ def mock_read_text(self: Path) -> str: with pytest.raises(MemoryLimitExceeded): monitor.check_memory_usage() + def test_malformed_cgroup_file_degrades_gracefully(self) -> None: + """Malformed cgroup files should not crash the sync.""" + + def mock_exists(self: Path) -> bool: + return self in (_CGROUP_V2_CURRENT, _CGROUP_V2_MAX) + + def mock_read_text(self: Path) -> str: + return "not_a_number\n" + + with patch.object(Path, "exists", mock_exists): + monitor = MemoryMonitor() + + with patch.object(Path, "read_text", mock_read_text): + # Should not raise — degrades gracefully + monitor.check_memory_usage() + + assert not monitor._warning_emitted + assert not monitor._critical_raised + + def test_os_error_degrades_gracefully(self) -> None: + """OSError reading cgroup files should not crash the sync.""" + + def mock_exists(self: Path) -> bool: + return self in (_CGROUP_V2_CURRENT, _CGROUP_V2_MAX) + + def mock_read_text(self: Path) -> str: + raise OSError("Permission denied") + + with patch.object(Path, "exists", mock_exists): + monitor = MemoryMonitor() + + with patch.object(Path, "read_text", mock_read_text): + # Should not raise — degrades gracefully + monitor.check_memory_usage() + + assert not monitor._warning_emitted + assert not monitor._critical_raised + class TestMemoryLimitExceeded: """Tests for the MemoryLimitExceeded exception.""" From ee935659b245dea43f183476e2e3585a0ec55007 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Mon, 9 Mar 2026 21:51:01 +0000 Subject: [PATCH 04/16] fix(cdk): wrap read() loop in try/finally to flush queued messages on shutdown Co-Authored-By: bot_apk --- airbyte_cdk/entrypoint.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 
deletions(-) diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index 3b6e4a8f5..7926a71c9 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -280,13 +280,15 @@ def read( stream_message_counter: DefaultDict[HashableStreamDescriptor, float] = defaultdict(float) memory_monitor = MemoryMonitor() message_count = 0 - for message in self.source.read(self.logger, config, catalog, state): - yield self.handle_record_counts(message, stream_message_counter) - message_count += 1 - if message_count % DEFAULT_CHECK_INTERVAL == 0: - memory_monitor.check_memory_usage() - for message in self._emit_queued_messages(self.source): - yield self.handle_record_counts(message, stream_message_counter) + try: + for message in self.source.read(self.logger, config, catalog, state): + yield self.handle_record_counts(message, stream_message_counter) + message_count += 1 + if message_count % DEFAULT_CHECK_INTERVAL == 0: + memory_monitor.check_memory_usage() + finally: + for message in self._emit_queued_messages(self.source): + yield self.handle_record_counts(message, stream_message_counter) @staticmethod def handle_record_counts( From cca6c50e6c4fd301a259e531bb19f0a160edc6db Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Mon, 9 Mar 2026 21:58:35 +0000 Subject: [PATCH 05/16] refactor(cdk): encapsulate check interval inside MemoryMonitor Move message counting and modulo logic from the entrypoint into MemoryMonitor.check_memory_usage(). The caller now simply calls check_memory_usage() on every message and the monitor decides internally whether to read cgroup files based on check_interval. This removes the coupling between the entrypoint and the monitor's internal check cadence. 
Co-Authored-By: bot_apk --- airbyte_cdk/entrypoint.py | 7 +--- airbyte_cdk/utils/memory_monitor.py | 11 +++++ unit_tests/utils/test_memory_monitor.py | 56 ++++++++++++++++++++----- 3 files changed, 58 insertions(+), 16 deletions(-) diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index 7926a71c9..fbbbceffa 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -41,7 +41,7 @@ from airbyte_cdk.utils import is_cloud_environment, message_utils from airbyte_cdk.utils.airbyte_secrets_utils import get_secrets, update_secrets from airbyte_cdk.utils.constants import ENV_REQUEST_CACHE_PATH -from airbyte_cdk.utils.memory_monitor import DEFAULT_CHECK_INTERVAL, MemoryMonitor +from airbyte_cdk.utils.memory_monitor import MemoryMonitor from airbyte_cdk.utils.traced_exception import AirbyteTracedException logger = init_logger("airbyte") @@ -279,13 +279,10 @@ def read( # The Airbyte protocol dictates that counts be expressed as float/double to better protect against integer overflows stream_message_counter: DefaultDict[HashableStreamDescriptor, float] = defaultdict(float) memory_monitor = MemoryMonitor() - message_count = 0 try: for message in self.source.read(self.logger, config, catalog, state): yield self.handle_record_counts(message, stream_message_counter) - message_count += 1 - if message_count % DEFAULT_CHECK_INTERVAL == 0: - memory_monitor.check_memory_usage() + memory_monitor.check_memory_usage() finally: for message in self._emit_queued_messages(self.source): yield self.handle_record_counts(message, stream_message_counter) diff --git a/airbyte_cdk/utils/memory_monitor.py b/airbyte_cdk/utils/memory_monitor.py index ae076360a..3ccfcbe36 100644 --- a/airbyte_cdk/utils/memory_monitor.py +++ b/airbyte_cdk/utils/memory_monitor.py @@ -46,9 +46,12 @@ def __init__( self, warning_threshold: float = _DEFAULT_WARNING_THRESHOLD, critical_threshold: float = _DEFAULT_CRITICAL_THRESHOLD, + check_interval: int = DEFAULT_CHECK_INTERVAL, ) -> None: 
self._warning_threshold = warning_threshold self._critical_threshold = critical_threshold + self._check_interval = check_interval + self._message_count = 0 self._warning_emitted = False self._critical_raised = False self._cgroup_version: Optional[int] = None @@ -100,6 +103,10 @@ def _read_memory(self) -> Optional[tuple[int, int]]: def check_memory_usage(self) -> None: """Check memory usage against thresholds. + Intended to be called on every message. The monitor internally tracks + a message counter and only reads cgroup files every ``check_interval`` + messages (default 1000) to minimise I/O overhead. + At the warning threshold (default 85%), logs a warning message. At the critical threshold (default 95%), raises MemoryLimitExceeded to trigger a graceful shutdown with an actionable error message. @@ -110,6 +117,10 @@ def check_memory_usage(self) -> None: if self._cgroup_version is None: return + self._message_count += 1 + if self._message_count % self._check_interval != 0: + return + memory_info = self._read_memory() if memory_info is None: return diff --git a/unit_tests/utils/test_memory_monitor.py b/unit_tests/utils/test_memory_monitor.py index bd43a4494..7df601593 100644 --- a/unit_tests/utils/test_memory_monitor.py +++ b/unit_tests/utils/test_memory_monitor.py @@ -79,7 +79,7 @@ def mock_read_text(self: Path) -> str: return "" with patch.object(Path, "exists", mock_exists): - monitor = MemoryMonitor() + monitor = MemoryMonitor(check_interval=1) with patch.object(Path, "read_text", mock_read_text): # Should not raise even though "usage" is technically present @@ -99,7 +99,7 @@ def mock_read_text(self: Path) -> str: return "" with patch.object(Path, "exists", mock_exists): - monitor = MemoryMonitor() + monitor = MemoryMonitor(check_interval=1) with patch.object(Path, "read_text", mock_read_text): monitor.check_memory_usage() @@ -121,7 +121,7 @@ def mock_read_text(self: Path) -> str: return "" with patch.object(Path, "exists", mock_exists): - monitor = 
MemoryMonitor() + monitor = MemoryMonitor(check_interval=1) with patch.object(Path, "read_text", mock_read_text): monitor.check_memory_usage() @@ -143,7 +143,7 @@ def mock_read_text(self: Path) -> str: return "" with patch.object(Path, "exists", mock_exists): - monitor = MemoryMonitor() + monitor = MemoryMonitor(check_interval=1) with patch.object(Path, "read_text", mock_read_text): with pytest.raises(MemoryLimitExceeded) as exc_info: @@ -167,7 +167,7 @@ def mock_read_text(self: Path) -> str: return "" with patch.object(Path, "exists", mock_exists): - monitor = MemoryMonitor() + monitor = MemoryMonitor(check_interval=1) with patch.object(Path, "read_text", mock_read_text): monitor.check_memory_usage() @@ -191,7 +191,7 @@ def mock_read_text(self: Path) -> str: return "" with patch.object(Path, "exists", mock_exists): - monitor = MemoryMonitor() + monitor = MemoryMonitor(check_interval=1) with patch.object(Path, "read_text", mock_read_text): with pytest.raises(MemoryLimitExceeded): @@ -214,7 +214,7 @@ def mock_read_text(self: Path) -> str: return "" with patch.object(Path, "exists", mock_exists): - monitor = MemoryMonitor() + monitor = MemoryMonitor(check_interval=1) assert monitor._cgroup_version == 1 with patch.object(Path, "read_text", mock_read_text): @@ -222,6 +222,32 @@ def mock_read_text(self: Path) -> str: assert monitor._warning_emitted + def test_check_interval_skips_intermediate_calls(self) -> None: + """Monitor should only check cgroup files every check_interval messages.""" + + def mock_exists(self: Path) -> bool: + return self in (_CGROUP_V2_CURRENT, _CGROUP_V2_MAX) + + def mock_read_text(self: Path) -> str: + if self == _CGROUP_V2_CURRENT: + return "870000000\n" # 87% + if self == _CGROUP_V2_MAX: + return "1000000000\n" + return "" + + with patch.object(Path, "exists", mock_exists): + monitor = MemoryMonitor(check_interval=3) + + with patch.object(Path, "read_text", mock_read_text): + # Calls 1 and 2 should be skipped + monitor.check_memory_usage() + 
assert not monitor._warning_emitted + monitor.check_memory_usage() + assert not monitor._warning_emitted + # Call 3 should trigger the actual check + monitor.check_memory_usage() + assert monitor._warning_emitted + def test_custom_thresholds_warning(self) -> None: """Custom warning threshold should be respected.""" @@ -236,7 +262,11 @@ def mock_read_text(self: Path) -> str: return "" with patch.object(Path, "exists", mock_exists): - monitor = MemoryMonitor(warning_threshold=0.70, critical_threshold=0.90) + monitor = MemoryMonitor( + warning_threshold=0.70, + critical_threshold=0.90, + check_interval=1, + ) with patch.object(Path, "read_text", mock_read_text): monitor.check_memory_usage() @@ -258,7 +288,11 @@ def mock_read_text(self: Path) -> str: return "" with patch.object(Path, "exists", mock_exists): - monitor = MemoryMonitor(warning_threshold=0.70, critical_threshold=0.80) + monitor = MemoryMonitor( + warning_threshold=0.70, + critical_threshold=0.80, + check_interval=1, + ) with patch.object(Path, "read_text", mock_read_text): with pytest.raises(MemoryLimitExceeded): @@ -274,7 +308,7 @@ def mock_read_text(self: Path) -> str: return "not_a_number\n" with patch.object(Path, "exists", mock_exists): - monitor = MemoryMonitor() + monitor = MemoryMonitor(check_interval=1) with patch.object(Path, "read_text", mock_read_text): # Should not raise — degrades gracefully @@ -293,7 +327,7 @@ def mock_read_text(self: Path) -> str: raise OSError("Permission denied") with patch.object(Path, "exists", mock_exists): - monitor = MemoryMonitor() + monitor = MemoryMonitor(check_interval=1) with patch.object(Path, "read_text", mock_read_text): # Should not raise — degrades gracefully From ec569439173461efef663296a9fc8a72917f4bfc Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Mon, 9 Mar 2026 22:02:17 +0000 Subject: [PATCH 06/16] refactor(cdk): move MemoryMonitor to AirbyteEntrypoint.__init__ Instantiate MemoryMonitor in 
__init__ so it persists for the lifetime of the entrypoint instance. This makes it available as self._memory_monitor for read() and any future command that does significant work (e.g. check). Also avoids resetting warning/critical flags if read() were ever called more than once. Co-Authored-By: bot_apk --- airbyte_cdk/entrypoint.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index fbbbceffa..3b2c771ac 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -61,6 +61,7 @@ def __init__(self, source: Source): self.source = source self.logger = logging.getLogger(f"airbyte.{getattr(source, 'name', '')}") + self._memory_monitor = MemoryMonitor() @staticmethod def parse_args(args: List[str]) -> argparse.Namespace: @@ -278,11 +279,10 @@ def read( # The Airbyte protocol dictates that counts be expressed as float/double to better protect against integer overflows stream_message_counter: DefaultDict[HashableStreamDescriptor, float] = defaultdict(float) - memory_monitor = MemoryMonitor() try: for message in self.source.read(self.logger, config, catalog, state): yield self.handle_record_counts(message, stream_message_counter) - memory_monitor.check_memory_usage() + self._memory_monitor.check_memory_usage() finally: for message in self._emit_queued_messages(self.source): yield self.handle_record_counts(message, stream_message_counter) From 7f7ab40b3038e8e819a536cf3e0ace1d021e7f3b Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Mon, 9 Mar 2026 22:18:43 +0000 Subject: [PATCH 07/16] test(cdk): add comprehensive memory monitor tests and graceful shutdown integration test - Complete test_memory_monitor.py with 24 tests across 4 test classes - TestMemoryMonitorInit: cgroup v1/v2 detection, lazy-init verification - TestMemoryMonitorCheckMemory: thresholds, error degradation, intervals, limit_bytes==0 - TestMemoryLimitExceeded: 
exception type and attribute validation - TestDefaultCheckInterval: constant value verification - Add test_memory_limit_exceeded_flushes_queued_messages to test_entrypoint.py verifying that try/finally in read() flushes queued state messages even when MemoryLimitExceeded propagates Co-Authored-By: bot_apk --- airbyte_cdk/utils/memory_monitor.py | 16 +- unit_tests/test_entrypoint.py | 93 +++++++ unit_tests/utils/test_memory_monitor.py | 312 +++++++++++++----------- 3 files changed, 277 insertions(+), 144 deletions(-) diff --git a/airbyte_cdk/utils/memory_monitor.py b/airbyte_cdk/utils/memory_monitor.py index 3ccfcbe36..9acaeaae1 100644 --- a/airbyte_cdk/utils/memory_monitor.py +++ b/airbyte_cdk/utils/memory_monitor.py @@ -38,7 +38,8 @@ class MemoryLimitExceeded(AirbyteTracedException): class MemoryMonitor: """Monitors container memory usage via cgroup files and emits warnings before OOM kills. - On init, probes cgroup v2 then v1 files. Caches which version exists. + Lazily probes cgroup v2 then v1 files on the first call to + ``check_memory_usage()``. Caches which version exists. If neither is found (local dev / CI), all subsequent calls are instant no-ops. """ @@ -55,8 +56,18 @@ def __init__( self._warning_emitted = False self._critical_raised = False self._cgroup_version: Optional[int] = None + self._probed = False + + def _probe_cgroup(self) -> None: + """Detect which cgroup version (if any) is available. + + Called lazily on the first ``check_memory_usage()`` invocation so + that ``spec`` and ``discover`` commands never incur filesystem I/O. + """ + if self._probed: + return + self._probed = True - # Probe cgroup version on init if _CGROUP_V2_CURRENT.exists() and _CGROUP_V2_MAX.exists(): self._cgroup_version = 2 elif _CGROUP_V1_USAGE.exists() and _CGROUP_V1_LIMIT.exists(): @@ -114,6 +125,7 @@ def check_memory_usage(self) -> None: Each threshold triggers at most once per sync to avoid log spam. This method is a no-op if cgroup files are unavailable. 
""" + self._probe_cgroup() if self._cgroup_version is None: return diff --git a/unit_tests/test_entrypoint.py b/unit_tests/test_entrypoint.py index 520131881..9d10888ac 100644 --- a/unit_tests/test_entrypoint.py +++ b/unit_tests/test_entrypoint.py @@ -835,6 +835,99 @@ def test_handle_record_counts( ) +def test_memory_limit_exceeded_flushes_queued_messages(mocker, spec_mock, config_mock): + """When MemoryLimitExceeded is raised mid-read, queued messages should still be flushed. + + The read() try/finally ensures _emit_queued_messages runs even when + MemoryLimitExceeded propagates. The exception still surfaces to the + caller, but all messages yielded before (records) and during (finally- + block state messages) the exception are available to the consumer. + """ + queued_state = AirbyteMessage( + type=Type.STATE, + state=AirbyteStateMessage( + type=AirbyteStateType.STREAM, + stream=AirbyteStreamState( + stream_descriptor=StreamDescriptor(name="stream"), + stream_state=AirbyteStateBlob(updated_at="2026-01-01"), + ), + ), + ) + message_repository = MagicMock() + # consume_queue calls: + # 1. run() preamble → initial queued control message + # 2. read() finally block → queued state (the key assertion) + # 3. 
run() outer finally → nothing + message_repository.consume_queue.side_effect = [ + [MESSAGE_FROM_REPOSITORY], + [queued_state], + [], + ] + mocker.patch.object( + MockSource, + "message_repository", + new_callable=mocker.PropertyMock, + return_value=message_repository, + ) + entrypoint = AirbyteEntrypoint(MockSource()) + + record = AirbyteMessage( + type=Type.RECORD, + record=AirbyteRecordMessage(stream="stream", data={"id": "1"}, emitted_at=1), + ) + mocker.patch.object(MockSource, "read_state", return_value={}) + mocker.patch.object(MockSource, "read_catalog", return_value={}) + mocker.patch.object(MockSource, "read", return_value=[record, record]) + + from airbyte_cdk.utils.memory_monitor import MemoryLimitExceeded + + call_count = 0 + + def _raise_on_second_call() -> None: + nonlocal call_count + call_count += 1 + if call_count >= 2: + raise MemoryLimitExceeded( + internal_message="Memory at 96%", + message="Source exceeded memory limit (96% used) and must shut down. " + "Reduce the number of streams or increase memory allocation.", + failure_type=FailureType.transient_error, + ) + + mocker.patch.object( + entrypoint._memory_monitor, "check_memory_usage", side_effect=_raise_on_second_call + ) + + parsed_args = Namespace( + command="read", config="config_path", state="statepath", catalog="catalogpath" + ) + + # The generator yields messages until MemoryLimitExceeded propagates. + # Collect everything yielded before the exception surfaces. + messages: list[str] = [] + with pytest.raises(MemoryLimitExceeded): + for msg in entrypoint.run(parsed_args): + messages.append(msg) + + # 1. The first record was yielded before the exception + record_messages = [m for m in messages if "RECORD" in m] + assert len(record_messages) >= 1, ( + "At least the first record should be yielded before MemoryLimitExceeded" + ) + + # 2. 
The queued state message was flushed by the finally block + state_messages = [m for m in messages if "STATE" in m] + assert len(state_messages) >= 1, ( + "Queued state message should be flushed even after MemoryLimitExceeded" + ) + + # 3. The flushed state has sourceStats.recordCount set by handle_record_counts. + # Both records are yielded (and counted) before the second check_memory_usage + # raises, so the counter is 2.0 at flush time. + state_json = orjson.loads(state_messages[0]) + assert state_json["state"]["sourceStats"]["recordCount"] == 2.0 + + def test_given_serialization_error_using_orjson_then_fallback_on_json( entrypoint: AirbyteEntrypoint, mocker, spec_mock, config_mock ): diff --git a/unit_tests/utils/test_memory_monitor.py b/unit_tests/utils/test_memory_monitor.py index 7df601593..f565c6572 100644 --- a/unit_tests/utils/test_memory_monitor.py +++ b/unit_tests/utils/test_memory_monitor.py @@ -18,40 +18,73 @@ MemoryMonitor, ) +_MOCK_USAGE_BELOW = "500000000\n" # 50% of 1 GB +_MOCK_USAGE_WARNING = "870000000\n" # 87% of 1 GB +_MOCK_USAGE_CRITICAL = "960000000\n" # 96% of 1 GB +_MOCK_LIMIT = "1000000000\n" # 1 GB + + +def _v2_exists(self: Path) -> bool: + return self in (_CGROUP_V2_CURRENT, _CGROUP_V2_MAX) + + +def _v1_exists(self: Path) -> bool: + return self in (_CGROUP_V1_USAGE, _CGROUP_V1_LIMIT) + class TestMemoryMonitorInit: - """Tests for MemoryMonitor initialization and cgroup detection.""" + """Tests for MemoryMonitor initialization and lazy cgroup detection.""" - def test_no_cgroup_files_disables_monitoring(self, tmp_path: Path) -> None: + def test_no_cgroup_files_disables_monitoring(self) -> None: """When no cgroup files exist, monitoring should be disabled (no-op).""" + monitor = MemoryMonitor() with patch.object(Path, "exists", return_value=False): - monitor = MemoryMonitor() + monitor.check_memory_usage() assert monitor._cgroup_version is None - def test_cgroup_v2_detected(self, tmp_path: Path) -> None: + def test_cgroup_v2_detected(self) 
-> None: """When cgroup v2 files exist, version should be 2.""" - - def mock_exists(self: Path) -> bool: - return self in (_CGROUP_V2_CURRENT, _CGROUP_V2_MAX) - - with patch.object(Path, "exists", mock_exists): - monitor = MemoryMonitor() + monitor = MemoryMonitor(check_interval=2) + with ( + patch.object(Path, "exists", _v2_exists), + patch.object(Path, "read_text", return_value=_MOCK_USAGE_BELOW), + ): + monitor.check_memory_usage() assert monitor._cgroup_version == 2 - def test_cgroup_v1_detected(self, tmp_path: Path) -> None: + def test_cgroup_v1_detected(self) -> None: """When only cgroup v1 files exist, version should be 1.""" - - def mock_exists(self: Path) -> bool: - return self in (_CGROUP_V1_USAGE, _CGROUP_V1_LIMIT) - - with patch.object(Path, "exists", mock_exists): - monitor = MemoryMonitor() + monitor = MemoryMonitor(check_interval=2) + with ( + patch.object(Path, "exists", _v1_exists), + patch.object(Path, "read_text", return_value=_MOCK_USAGE_BELOW), + ): + monitor.check_memory_usage() assert monitor._cgroup_version == 1 def test_cgroup_v2_preferred_over_v1(self) -> None: """When both cgroup v2 and v1 files exist, v2 should be preferred.""" - with patch.object(Path, "exists", return_value=True): - monitor = MemoryMonitor() + monitor = MemoryMonitor(check_interval=2) + with ( + patch.object(Path, "exists", return_value=True), + patch.object(Path, "read_text", return_value=_MOCK_USAGE_BELOW), + ): + monitor.check_memory_usage() + assert monitor._cgroup_version == 2 + + def test_lazy_probe_not_called_until_check(self) -> None: + """Cgroup probing should not happen during __init__, only on first check_memory_usage().""" + monitor = MemoryMonitor() + assert not monitor._probed + assert monitor._cgroup_version is None + + with ( + patch.object(Path, "exists", return_value=True), + patch.object(Path, "read_text", return_value=_MOCK_USAGE_BELOW), + ): + monitor.check_memory_usage() + + assert monitor._probed assert monitor._cgroup_version == 2 @@ -60,17 
+93,13 @@ class TestMemoryMonitorCheckMemory: def test_noop_when_no_cgroup(self) -> None: """check_memory_usage should be a no-op when cgroup is unavailable.""" + monitor = MemoryMonitor() with patch.object(Path, "exists", return_value=False): - monitor = MemoryMonitor() - # Should not raise - monitor.check_memory_usage() + monitor.check_memory_usage() def test_noop_when_limit_is_max(self) -> None: """When cgroup v2 memory.max is 'max' (unlimited), should be a no-op.""" - def mock_exists(self: Path) -> bool: - return self in (_CGROUP_V2_CURRENT, _CGROUP_V2_MAX) - def mock_read_text(self: Path) -> str: if self == _CGROUP_V2_CURRENT: return "1000000\n" @@ -78,30 +107,28 @@ def mock_read_text(self: Path) -> str: return "max\n" return "" - with patch.object(Path, "exists", mock_exists): - monitor = MemoryMonitor(check_interval=1) - - with patch.object(Path, "read_text", mock_read_text): - # Should not raise even though "usage" is technically present + monitor = MemoryMonitor(check_interval=1) + with ( + patch.object(Path, "exists", _v2_exists), + patch.object(Path, "read_text", mock_read_text), + ): monitor.check_memory_usage() def test_no_warning_below_threshold(self) -> None: """No warning should be emitted when usage is below 85%.""" - def mock_exists(self: Path) -> bool: - return self in (_CGROUP_V2_CURRENT, _CGROUP_V2_MAX) - def mock_read_text(self: Path) -> str: if self == _CGROUP_V2_CURRENT: - return "500000000\n" # 500MB + return _MOCK_USAGE_BELOW if self == _CGROUP_V2_MAX: - return "1000000000\n" # 1GB + return _MOCK_LIMIT return "" - with patch.object(Path, "exists", mock_exists): - monitor = MemoryMonitor(check_interval=1) - - with patch.object(Path, "read_text", mock_read_text): + monitor = MemoryMonitor(check_interval=1) + with ( + patch.object(Path, "exists", _v2_exists), + patch.object(Path, "read_text", mock_read_text), + ): monitor.check_memory_usage() assert not monitor._warning_emitted @@ -110,20 +137,18 @@ def mock_read_text(self: Path) -> str: def 
test_warning_at_85_percent(self) -> None: """Warning should be emitted at 85% usage.""" - def mock_exists(self: Path) -> bool: - return self in (_CGROUP_V2_CURRENT, _CGROUP_V2_MAX) - def mock_read_text(self: Path) -> str: if self == _CGROUP_V2_CURRENT: - return "870000000\n" # 870MB = 87% of 1GB + return _MOCK_USAGE_WARNING if self == _CGROUP_V2_MAX: - return "1000000000\n" # 1GB + return _MOCK_LIMIT return "" - with patch.object(Path, "exists", mock_exists): - monitor = MemoryMonitor(check_interval=1) - - with patch.object(Path, "read_text", mock_read_text): + monitor = MemoryMonitor(check_interval=1) + with ( + patch.object(Path, "exists", _v2_exists), + patch.object(Path, "read_text", mock_read_text), + ): monitor.check_memory_usage() assert monitor._warning_emitted @@ -132,114 +157,99 @@ def mock_read_text(self: Path) -> str: def test_critical_at_95_percent_raises(self) -> None: """MemoryLimitExceeded should be raised at 95% usage.""" - def mock_exists(self: Path) -> bool: - return self in (_CGROUP_V2_CURRENT, _CGROUP_V2_MAX) - def mock_read_text(self: Path) -> str: if self == _CGROUP_V2_CURRENT: - return "960000000\n" # 960MB = 96% of 1GB + return _MOCK_USAGE_CRITICAL if self == _CGROUP_V2_MAX: - return "1000000000\n" # 1GB + return _MOCK_LIMIT return "" - with patch.object(Path, "exists", mock_exists): - monitor = MemoryMonitor(check_interval=1) - - with patch.object(Path, "read_text", mock_read_text): + monitor = MemoryMonitor(check_interval=1) + with ( + patch.object(Path, "exists", _v2_exists), + patch.object(Path, "read_text", mock_read_text), + ): with pytest.raises(MemoryLimitExceeded) as exc_info: monitor.check_memory_usage() assert exc_info.value.failure_type == FailureType.transient_error assert "96%" in (exc_info.value.message or "") - assert "shut down" in (exc_info.value.message or "") def test_warning_emitted_only_once(self) -> None: """Warning should only be emitted once even if called multiple times.""" - def mock_exists(self: Path) -> bool: - 
return self in (_CGROUP_V2_CURRENT, _CGROUP_V2_MAX) - def mock_read_text(self: Path) -> str: if self == _CGROUP_V2_CURRENT: - return "870000000\n" # 87% + return _MOCK_USAGE_WARNING if self == _CGROUP_V2_MAX: - return "1000000000\n" + return _MOCK_LIMIT return "" - with patch.object(Path, "exists", mock_exists): - monitor = MemoryMonitor(check_interval=1) - - with patch.object(Path, "read_text", mock_read_text): + monitor = MemoryMonitor(check_interval=1) + with ( + patch.object(Path, "exists", _v2_exists), + patch.object(Path, "read_text", mock_read_text), + ): monitor.check_memory_usage() assert monitor._warning_emitted - - # Call again — should not error or re-emit monitor.check_memory_usage() assert monitor._warning_emitted def test_critical_raised_only_once(self) -> None: """MemoryLimitExceeded should only be raised once.""" - def mock_exists(self: Path) -> bool: - return self in (_CGROUP_V2_CURRENT, _CGROUP_V2_MAX) - def mock_read_text(self: Path) -> str: if self == _CGROUP_V2_CURRENT: - return "960000000\n" + return _MOCK_USAGE_CRITICAL if self == _CGROUP_V2_MAX: - return "1000000000\n" + return _MOCK_LIMIT return "" - with patch.object(Path, "exists", mock_exists): - monitor = MemoryMonitor(check_interval=1) - - with patch.object(Path, "read_text", mock_read_text): + monitor = MemoryMonitor(check_interval=1) + with ( + patch.object(Path, "exists", _v2_exists), + patch.object(Path, "read_text", mock_read_text), + ): with pytest.raises(MemoryLimitExceeded): monitor.check_memory_usage() - # Second call should NOT raise again monitor.check_memory_usage() def test_cgroup_v1_reading(self) -> None: """Memory reading should work with cgroup v1 paths.""" - def mock_exists(self: Path) -> bool: - return self in (_CGROUP_V1_USAGE, _CGROUP_V1_LIMIT) - def mock_read_text(self: Path) -> str: if self == _CGROUP_V1_USAGE: - return "870000000\n" # 87% + return _MOCK_USAGE_WARNING if self == _CGROUP_V1_LIMIT: - return "1000000000\n" + return _MOCK_LIMIT return "" - with 
patch.object(Path, "exists", mock_exists): - monitor = MemoryMonitor(check_interval=1) - assert monitor._cgroup_version == 1 - - with patch.object(Path, "read_text", mock_read_text): + monitor = MemoryMonitor(check_interval=1) + with ( + patch.object(Path, "exists", _v1_exists), + patch.object(Path, "read_text", mock_read_text), + ): monitor.check_memory_usage() + assert monitor._cgroup_version == 1 assert monitor._warning_emitted def test_check_interval_skips_intermediate_calls(self) -> None: """Monitor should only check cgroup files every check_interval messages.""" - def mock_exists(self: Path) -> bool: - return self in (_CGROUP_V2_CURRENT, _CGROUP_V2_MAX) - def mock_read_text(self: Path) -> str: if self == _CGROUP_V2_CURRENT: - return "870000000\n" # 87% + return _MOCK_USAGE_WARNING if self == _CGROUP_V2_MAX: - return "1000000000\n" + return _MOCK_LIMIT return "" - with patch.object(Path, "exists", mock_exists): - monitor = MemoryMonitor(check_interval=3) - - with patch.object(Path, "read_text", mock_read_text): - # Calls 1 and 2 should be skipped + monitor = MemoryMonitor(check_interval=3) + with ( + patch.object(Path, "exists", _v2_exists), + patch.object(Path, "read_text", mock_read_text), + ): monitor.check_memory_usage() assert not monitor._warning_emitted monitor.check_memory_usage() @@ -251,24 +261,22 @@ def mock_read_text(self: Path) -> str: def test_custom_thresholds_warning(self) -> None: """Custom warning threshold should be respected.""" - def mock_exists(self: Path) -> bool: - return self in (_CGROUP_V2_CURRENT, _CGROUP_V2_MAX) - def mock_read_text(self: Path) -> str: if self == _CGROUP_V2_CURRENT: - return "750000000\n" # 75% + return "750000000\n" if self == _CGROUP_V2_MAX: - return "1000000000\n" + return _MOCK_LIMIT return "" - with patch.object(Path, "exists", mock_exists): - monitor = MemoryMonitor( - warning_threshold=0.70, - critical_threshold=0.90, - check_interval=1, - ) - - with patch.object(Path, "read_text", mock_read_text): + monitor 
= MemoryMonitor( + warning_threshold=0.70, + critical_threshold=0.90, + check_interval=1, + ) + with ( + patch.object(Path, "exists", _v2_exists), + patch.object(Path, "read_text", mock_read_text), + ): monitor.check_memory_usage() assert monitor._warning_emitted @@ -277,41 +285,44 @@ def mock_read_text(self: Path) -> str: def test_custom_thresholds_critical(self) -> None: """Custom critical threshold should be respected.""" - def mock_exists(self: Path) -> bool: - return self in (_CGROUP_V2_CURRENT, _CGROUP_V2_MAX) - def mock_read_text(self: Path) -> str: if self == _CGROUP_V2_CURRENT: - return "850000000\n" # 85% + return "850000000\n" if self == _CGROUP_V2_MAX: - return "1000000000\n" + return _MOCK_LIMIT return "" - with patch.object(Path, "exists", mock_exists): - monitor = MemoryMonitor( - warning_threshold=0.70, - critical_threshold=0.80, - check_interval=1, - ) - - with patch.object(Path, "read_text", mock_read_text): + monitor = MemoryMonitor( + warning_threshold=0.70, + critical_threshold=0.80, + check_interval=1, + ) + with ( + patch.object(Path, "exists", _v2_exists), + patch.object(Path, "read_text", mock_read_text), + ): with pytest.raises(MemoryLimitExceeded): monitor.check_memory_usage() def test_malformed_cgroup_file_degrades_gracefully(self) -> None: """Malformed cgroup files should not crash the sync.""" + monitor = MemoryMonitor(check_interval=1) + with ( + patch.object(Path, "exists", _v2_exists), + patch.object(Path, "read_text", return_value="not_a_number\n"), + ): + monitor.check_memory_usage() - def mock_exists(self: Path) -> bool: - return self in (_CGROUP_V2_CURRENT, _CGROUP_V2_MAX) - - def mock_read_text(self: Path) -> str: - return "not_a_number\n" - - with patch.object(Path, "exists", mock_exists): - monitor = MemoryMonitor(check_interval=1) + assert not monitor._warning_emitted + assert not monitor._critical_raised - with patch.object(Path, "read_text", mock_read_text): - # Should not raise — degrades gracefully + def 
test_empty_cgroup_file_degrades_gracefully(self) -> None: + """Empty cgroup file content should not crash the sync.""" + monitor = MemoryMonitor(check_interval=1) + with ( + patch.object(Path, "exists", _v2_exists), + patch.object(Path, "read_text", return_value=""), + ): monitor.check_memory_usage() assert not monitor._warning_emitted @@ -320,17 +331,34 @@ def mock_read_text(self: Path) -> str: def test_os_error_degrades_gracefully(self) -> None: """OSError reading cgroup files should not crash the sync.""" - def mock_exists(self: Path) -> bool: - return self in (_CGROUP_V2_CURRENT, _CGROUP_V2_MAX) - def mock_read_text(self: Path) -> str: raise OSError("Permission denied") - with patch.object(Path, "exists", mock_exists): - monitor = MemoryMonitor(check_interval=1) + monitor = MemoryMonitor(check_interval=1) + with ( + patch.object(Path, "exists", _v2_exists), + patch.object(Path, "read_text", mock_read_text), + ): + monitor.check_memory_usage() + + assert not monitor._warning_emitted + assert not monitor._critical_raised + + def test_limit_bytes_zero_is_noop(self) -> None: + """When cgroup limit file contains '0', should be a no-op.""" + + def mock_read_text(self: Path) -> str: + if self == _CGROUP_V2_CURRENT: + return _MOCK_USAGE_BELOW + if self == _CGROUP_V2_MAX: + return "0\n" + return "" - with patch.object(Path, "read_text", mock_read_text): - # Should not raise — degrades gracefully + monitor = MemoryMonitor(check_interval=1) + with ( + patch.object(Path, "exists", _v2_exists), + patch.object(Path, "read_text", mock_read_text), + ): monitor.check_memory_usage() assert not monitor._warning_emitted From a021eb7dda7f917ba43da5a1fc442f4ba13ed38e Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Mon, 9 Mar 2026 22:26:19 +0000 Subject: [PATCH 08/16] fix(cdk): change MemoryLimitExceeded to system_error and update user-facing message Co-Authored-By: bot_apk --- airbyte_cdk/utils/memory_monitor.py | 5 ++--- 
unit_tests/test_entrypoint.py | 5 ++--- unit_tests/utils/test_memory_monitor.py | 8 ++++---- 3 files changed, 8 insertions(+), 10 deletions(-) diff --git a/airbyte_cdk/utils/memory_monitor.py b/airbyte_cdk/utils/memory_monitor.py index 9acaeaae1..ec60892fb 100644 --- a/airbyte_cdk/utils/memory_monitor.py +++ b/airbyte_cdk/utils/memory_monitor.py @@ -146,9 +146,8 @@ def check_memory_usage(self) -> None: raise MemoryLimitExceeded( internal_message=f"Memory usage is {usage_percent}% ({usage_bytes} / {limit_bytes} bytes). " f"Critical threshold is {int(self._critical_threshold * 100)}%.", - message=f"Source exceeded memory limit ({usage_percent}% used) and must shut down. " - f"Reduce the number of streams or increase memory allocation.", - failure_type=FailureType.transient_error, + message=f"Source exceeded memory limit ({usage_percent}% used) and must shut down to avoid an out-of-memory crash.", + failure_type=FailureType.system_error, ) if usage_ratio >= self._warning_threshold and not self._warning_emitted: diff --git a/unit_tests/test_entrypoint.py b/unit_tests/test_entrypoint.py index 9d10888ac..5a331f762 100644 --- a/unit_tests/test_entrypoint.py +++ b/unit_tests/test_entrypoint.py @@ -889,9 +889,8 @@ def _raise_on_second_call() -> None: if call_count >= 2: raise MemoryLimitExceeded( internal_message="Memory at 96%", - message="Source exceeded memory limit (96% used) and must shut down. 
" - "Reduce the number of streams or increase memory allocation.", - failure_type=FailureType.transient_error, + message="Source exceeded memory limit (96% used) and must shut down to avoid an out-of-memory crash.", + failure_type=FailureType.system_error, ) mocker.patch.object( diff --git a/unit_tests/utils/test_memory_monitor.py b/unit_tests/utils/test_memory_monitor.py index f565c6572..66887ab22 100644 --- a/unit_tests/utils/test_memory_monitor.py +++ b/unit_tests/utils/test_memory_monitor.py @@ -172,7 +172,7 @@ def mock_read_text(self: Path) -> str: with pytest.raises(MemoryLimitExceeded) as exc_info: monitor.check_memory_usage() - assert exc_info.value.failure_type == FailureType.transient_error + assert exc_info.value.failure_type == FailureType.system_error assert "96%" in (exc_info.value.message or "") def test_warning_emitted_only_once(self) -> None: @@ -375,7 +375,7 @@ def test_is_airbyte_traced_exception(self) -> None: exc = MemoryLimitExceeded( internal_message="test", message="test message", - failure_type=FailureType.transient_error, + failure_type=FailureType.system_error, ) assert isinstance(exc, AirbyteTracedException) @@ -384,9 +384,9 @@ def test_default_attributes(self) -> None: exc = MemoryLimitExceeded( internal_message="Memory at 96%", message="Source exceeded memory limit.", - failure_type=FailureType.transient_error, + failure_type=FailureType.system_error, ) - assert exc.failure_type == FailureType.transient_error + assert exc.failure_type == FailureType.system_error assert exc.message == "Source exceeded memory limit." 
assert exc.internal_message == "Memory at 96%" From fc71ababc4141d9b753af953e58e80590c1719a1 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Mon, 9 Mar 2026 22:28:27 +0000 Subject: [PATCH 09/16] refactor(cdk): make DEFAULT_CHECK_INTERVAL private and drop circular tests Co-Authored-By: bot_apk --- airbyte_cdk/utils/memory_monitor.py | 4 ++-- unit_tests/utils/test_memory_monitor.py | 11 ----------- 2 files changed, 2 insertions(+), 13 deletions(-) diff --git a/airbyte_cdk/utils/memory_monitor.py b/airbyte_cdk/utils/memory_monitor.py index ec60892fb..027efe906 100644 --- a/airbyte_cdk/utils/memory_monitor.py +++ b/airbyte_cdk/utils/memory_monitor.py @@ -26,7 +26,7 @@ _DEFAULT_CRITICAL_THRESHOLD = 0.95 # Check interval (every N messages) -DEFAULT_CHECK_INTERVAL = 1000 +_DEFAULT_CHECK_INTERVAL = 1000 class MemoryLimitExceeded(AirbyteTracedException): @@ -47,7 +47,7 @@ def __init__( self, warning_threshold: float = _DEFAULT_WARNING_THRESHOLD, critical_threshold: float = _DEFAULT_CRITICAL_THRESHOLD, - check_interval: int = DEFAULT_CHECK_INTERVAL, + check_interval: int = _DEFAULT_CHECK_INTERVAL, ) -> None: self._warning_threshold = warning_threshold self._critical_threshold = critical_threshold diff --git a/unit_tests/utils/test_memory_monitor.py b/unit_tests/utils/test_memory_monitor.py index 66887ab22..62eefea0f 100644 --- a/unit_tests/utils/test_memory_monitor.py +++ b/unit_tests/utils/test_memory_monitor.py @@ -13,7 +13,6 @@ _CGROUP_V1_USAGE, _CGROUP_V2_CURRENT, _CGROUP_V2_MAX, - DEFAULT_CHECK_INTERVAL, MemoryLimitExceeded, MemoryMonitor, ) @@ -389,13 +388,3 @@ def test_default_attributes(self) -> None: assert exc.failure_type == FailureType.system_error assert exc.message == "Source exceeded memory limit." 
assert exc.internal_message == "Memory at 96%" - - -class TestDefaultCheckInterval: - """Tests for the DEFAULT_CHECK_INTERVAL constant.""" - - def test_check_interval_is_positive(self) -> None: - assert DEFAULT_CHECK_INTERVAL > 0 - - def test_check_interval_value(self) -> None: - assert DEFAULT_CHECK_INTERVAL == 1000 From 5f7e16d539155ed8ef3dd7d52cba67a6cf282cec Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Mon, 9 Mar 2026 22:36:40 +0000 Subject: [PATCH 10/16] refactor(cdk): move memory check before yield, test observable behavior via caplog Co-Authored-By: bot_apk --- airbyte_cdk/entrypoint.py | 2 +- unit_tests/test_entrypoint.py | 10 +- unit_tests/utils/test_memory_monitor.py | 626 +++++++++++------------- 3 files changed, 287 insertions(+), 351 deletions(-) diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index 3b2c771ac..ad290ebce 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -281,8 +281,8 @@ def read( stream_message_counter: DefaultDict[HashableStreamDescriptor, float] = defaultdict(float) try: for message in self.source.read(self.logger, config, catalog, state): - yield self.handle_record_counts(message, stream_message_counter) self._memory_monitor.check_memory_usage() + yield self.handle_record_counts(message, stream_message_counter) finally: for message in self._emit_queued_messages(self.source): yield self.handle_record_counts(message, stream_message_counter) diff --git a/unit_tests/test_entrypoint.py b/unit_tests/test_entrypoint.py index 5a331f762..7cd51fdbb 100644 --- a/unit_tests/test_entrypoint.py +++ b/unit_tests/test_entrypoint.py @@ -908,7 +908,8 @@ def _raise_on_second_call() -> None: for msg in entrypoint.run(parsed_args): messages.append(msg) - # 1. The first record was yielded before the exception + # 1. 
Only the first record was yielded; the second triggered the exception + # before its yield, so it is never yielded (the finally block only flushes queued messages). record_messages = [m for m in messages if "RECORD" in m] assert len(record_messages) >= 1, ( "At least the first record should be yielded before MemoryLimitExceeded" @@ -921,10 +922,11 @@ def _raise_on_second_call() -> None: ) # 3. The flushed state has sourceStats.recordCount set by handle_record_counts. - # Both records are yielded (and counted) before the second check_memory_usage - # raises, so the counter is 2.0 at flush time. + # Only the first record is yielded (and counted) because the memory check + # now runs *before* yield — the second check raises before the second + # record is handled, so the counter is 1.0 at flush time. state_json = orjson.loads(state_messages[0]) - assert state_json["state"]["sourceStats"]["recordCount"] == 2.0 + assert state_json["state"]["sourceStats"]["recordCount"] == 1.0 def test_given_serialization_error_using_orjson_then_fallback_on_json( diff --git a/unit_tests/utils/test_memory_monitor.py b/unit_tests/utils/test_memory_monitor.py index 62eefea0f..9081f0f8e 100644 --- a/unit_tests/utils/test_memory_monitor.py +++ b/unit_tests/utils/test_memory_monitor.py @@ -2,6 +2,7 @@ # Copyright (c) 2026 Airbyte, Inc., all rights reserved. 
# +import logging from pathlib import Path from unittest.mock import patch @@ -31,360 +32,293 @@ def _v1_exists(self: Path) -> bool: return self in (_CGROUP_V1_USAGE, _CGROUP_V1_LIMIT) -class TestMemoryMonitorInit: - """Tests for MemoryMonitor initialization and lazy cgroup detection.""" +def _v2_mock_read(usage: str = _MOCK_USAGE_BELOW, limit: str = _MOCK_LIMIT): + """Return a mock_read_text function for cgroup v2 with the given usage/limit.""" - def test_no_cgroup_files_disables_monitoring(self) -> None: - """When no cgroup files exist, monitoring should be disabled (no-op).""" - monitor = MemoryMonitor() - with patch.object(Path, "exists", return_value=False): + def mock_read_text(self: Path) -> str: + if self == _CGROUP_V2_CURRENT: + return usage + if self == _CGROUP_V2_MAX: + return limit + return "" + + return mock_read_text + + +# --------------------------------------------------------------------------- +# check_memory_usage — no-op paths +# --------------------------------------------------------------------------- + + +def test_noop_when_no_cgroup(caplog: pytest.LogCaptureFixture) -> None: + """check_memory_usage should be a no-op when cgroup is unavailable.""" + monitor = MemoryMonitor() + with ( + caplog.at_level(logging.WARNING, logger="airbyte"), + patch.object(Path, "exists", return_value=False), + ): + monitor.check_memory_usage() + assert not caplog.records + + +def test_noop_when_limit_is_max(caplog: pytest.LogCaptureFixture) -> None: + """When cgroup v2 memory.max is 'max' (unlimited), should be a no-op.""" + monitor = MemoryMonitor(check_interval=1) + with ( + caplog.at_level(logging.WARNING, logger="airbyte"), + patch.object(Path, "exists", _v2_exists), + patch.object(Path, "read_text", _v2_mock_read(limit="max\n")), + ): + monitor.check_memory_usage() + assert not caplog.records + + +def test_noop_when_limit_is_zero(caplog: pytest.LogCaptureFixture) -> None: + """When cgroup limit file contains '0', should be a no-op.""" + monitor = 
MemoryMonitor(check_interval=1) + with ( + caplog.at_level(logging.WARNING, logger="airbyte"), + patch.object(Path, "exists", _v2_exists), + patch.object(Path, "read_text", _v2_mock_read(limit="0\n")), + ): + monitor.check_memory_usage() + assert not caplog.records + + +# --------------------------------------------------------------------------- +# check_memory_usage — below threshold +# --------------------------------------------------------------------------- + + +def test_no_warning_below_threshold(caplog: pytest.LogCaptureFixture) -> None: + """No warning should be emitted when usage is below 85%.""" + monitor = MemoryMonitor(check_interval=1) + with ( + caplog.at_level(logging.WARNING, logger="airbyte"), + patch.object(Path, "exists", _v2_exists), + patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_BELOW)), + ): + monitor.check_memory_usage() + assert not caplog.records + + +# --------------------------------------------------------------------------- +# check_memory_usage — warning threshold +# --------------------------------------------------------------------------- + + +def test_warning_at_85_percent(caplog: pytest.LogCaptureFixture) -> None: + """Warning log should be emitted at 87% usage (above 85% threshold).""" + monitor = MemoryMonitor(check_interval=1) + with ( + caplog.at_level(logging.WARNING, logger="airbyte"), + patch.object(Path, "exists", _v2_exists), + patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_WARNING)), + ): + monitor.check_memory_usage() + + assert len(caplog.records) == 1 + assert "87%" in caplog.records[0].message + + +def test_warning_emitted_only_once(caplog: pytest.LogCaptureFixture) -> None: + """Warning should only be logged once even if called multiple times.""" + monitor = MemoryMonitor(check_interval=1) + with ( + caplog.at_level(logging.WARNING, logger="airbyte"), + patch.object(Path, "exists", _v2_exists), + patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_WARNING)), + ): + 
monitor.check_memory_usage() + monitor.check_memory_usage() + + assert len(caplog.records) == 1 + + +def test_custom_thresholds_warning(caplog: pytest.LogCaptureFixture) -> None: + """Custom warning threshold should be respected.""" + monitor = MemoryMonitor( + warning_threshold=0.70, + critical_threshold=0.90, + check_interval=1, + ) + with ( + caplog.at_level(logging.WARNING, logger="airbyte"), + patch.object(Path, "exists", _v2_exists), + patch.object(Path, "read_text", _v2_mock_read(usage="750000000\n")), + ): + # 75% exceeds 70% warning threshold but is below 90% critical + monitor.check_memory_usage() + + assert len(caplog.records) == 1 + assert "75%" in caplog.records[0].message + + +# --------------------------------------------------------------------------- +# check_memory_usage — critical threshold +# --------------------------------------------------------------------------- + + +def test_critical_at_95_percent_raises() -> None: + """MemoryLimitExceeded should be raised at 96% usage.""" + monitor = MemoryMonitor(check_interval=1) + with ( + patch.object(Path, "exists", _v2_exists), + patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_CRITICAL)), + ): + with pytest.raises(MemoryLimitExceeded) as exc_info: monitor.check_memory_usage() - assert monitor._cgroup_version is None - - def test_cgroup_v2_detected(self) -> None: - """When cgroup v2 files exist, version should be 2.""" - monitor = MemoryMonitor(check_interval=2) - with ( - patch.object(Path, "exists", _v2_exists), - patch.object(Path, "read_text", return_value=_MOCK_USAGE_BELOW), - ): - monitor.check_memory_usage() - assert monitor._cgroup_version == 2 - - def test_cgroup_v1_detected(self) -> None: - """When only cgroup v1 files exist, version should be 1.""" - monitor = MemoryMonitor(check_interval=2) - with ( - patch.object(Path, "exists", _v1_exists), - patch.object(Path, "read_text", return_value=_MOCK_USAGE_BELOW), - ): - monitor.check_memory_usage() - assert 
monitor._cgroup_version == 1 - - def test_cgroup_v2_preferred_over_v1(self) -> None: - """When both cgroup v2 and v1 files exist, v2 should be preferred.""" - monitor = MemoryMonitor(check_interval=2) - with ( - patch.object(Path, "exists", return_value=True), - patch.object(Path, "read_text", return_value=_MOCK_USAGE_BELOW), - ): - monitor.check_memory_usage() - assert monitor._cgroup_version == 2 - - def test_lazy_probe_not_called_until_check(self) -> None: - """Cgroup probing should not happen during __init__, only on first check_memory_usage().""" - monitor = MemoryMonitor() - assert not monitor._probed - assert monitor._cgroup_version is None - - with ( - patch.object(Path, "exists", return_value=True), - patch.object(Path, "read_text", return_value=_MOCK_USAGE_BELOW), - ): - monitor.check_memory_usage() - - assert monitor._probed - assert monitor._cgroup_version == 2 - - -class TestMemoryMonitorCheckMemory: - """Tests for the check_memory_usage method.""" - - def test_noop_when_no_cgroup(self) -> None: - """check_memory_usage should be a no-op when cgroup is unavailable.""" - monitor = MemoryMonitor() - with patch.object(Path, "exists", return_value=False): - monitor.check_memory_usage() - - def test_noop_when_limit_is_max(self) -> None: - """When cgroup v2 memory.max is 'max' (unlimited), should be a no-op.""" - - def mock_read_text(self: Path) -> str: - if self == _CGROUP_V2_CURRENT: - return "1000000\n" - if self == _CGROUP_V2_MAX: - return "max\n" - return "" - - monitor = MemoryMonitor(check_interval=1) - with ( - patch.object(Path, "exists", _v2_exists), - patch.object(Path, "read_text", mock_read_text), - ): - monitor.check_memory_usage() - - def test_no_warning_below_threshold(self) -> None: - """No warning should be emitted when usage is below 85%.""" - - def mock_read_text(self: Path) -> str: - if self == _CGROUP_V2_CURRENT: - return _MOCK_USAGE_BELOW - if self == _CGROUP_V2_MAX: - return _MOCK_LIMIT - return "" - - monitor = 
MemoryMonitor(check_interval=1) - with ( - patch.object(Path, "exists", _v2_exists), - patch.object(Path, "read_text", mock_read_text), - ): - monitor.check_memory_usage() - - assert not monitor._warning_emitted - assert not monitor._critical_raised - - def test_warning_at_85_percent(self) -> None: - """Warning should be emitted at 85% usage.""" - def mock_read_text(self: Path) -> str: - if self == _CGROUP_V2_CURRENT: - return _MOCK_USAGE_WARNING - if self == _CGROUP_V2_MAX: - return _MOCK_LIMIT - return "" + assert exc_info.value.failure_type == FailureType.system_error + assert "96%" in (exc_info.value.message or "") - monitor = MemoryMonitor(check_interval=1) - with ( - patch.object(Path, "exists", _v2_exists), - patch.object(Path, "read_text", mock_read_text), - ): - monitor.check_memory_usage() - assert monitor._warning_emitted - assert not monitor._critical_raised - - def test_critical_at_95_percent_raises(self) -> None: - """MemoryLimitExceeded should be raised at 95% usage.""" - - def mock_read_text(self: Path) -> str: - if self == _CGROUP_V2_CURRENT: - return _MOCK_USAGE_CRITICAL - if self == _CGROUP_V2_MAX: - return _MOCK_LIMIT - return "" - - monitor = MemoryMonitor(check_interval=1) - with ( - patch.object(Path, "exists", _v2_exists), - patch.object(Path, "read_text", mock_read_text), - ): - with pytest.raises(MemoryLimitExceeded) as exc_info: - monitor.check_memory_usage() - - assert exc_info.value.failure_type == FailureType.system_error - assert "96%" in (exc_info.value.message or "") - - def test_warning_emitted_only_once(self) -> None: - """Warning should only be emitted once even if called multiple times.""" - - def mock_read_text(self: Path) -> str: - if self == _CGROUP_V2_CURRENT: - return _MOCK_USAGE_WARNING - if self == _CGROUP_V2_MAX: - return _MOCK_LIMIT - return "" - - monitor = MemoryMonitor(check_interval=1) - with ( - patch.object(Path, "exists", _v2_exists), - patch.object(Path, "read_text", mock_read_text), - ): - 
monitor.check_memory_usage() - assert monitor._warning_emitted +def test_critical_raised_only_once() -> None: + """MemoryLimitExceeded should only be raised once.""" + monitor = MemoryMonitor(check_interval=1) + with ( + patch.object(Path, "exists", _v2_exists), + patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_CRITICAL)), + ): + with pytest.raises(MemoryLimitExceeded): monitor.check_memory_usage() - assert monitor._warning_emitted - - def test_critical_raised_only_once(self) -> None: - """MemoryLimitExceeded should only be raised once.""" - - def mock_read_text(self: Path) -> str: - if self == _CGROUP_V2_CURRENT: - return _MOCK_USAGE_CRITICAL - if self == _CGROUP_V2_MAX: - return _MOCK_LIMIT - return "" - - monitor = MemoryMonitor(check_interval=1) - with ( - patch.object(Path, "exists", _v2_exists), - patch.object(Path, "read_text", mock_read_text), - ): - with pytest.raises(MemoryLimitExceeded): - monitor.check_memory_usage() - # Second call should NOT raise again + # Second call should NOT raise again + monitor.check_memory_usage() + + +def test_custom_thresholds_critical() -> None: + """Custom critical threshold should be respected.""" + monitor = MemoryMonitor( + warning_threshold=0.70, + critical_threshold=0.80, + check_interval=1, + ) + with ( + patch.object(Path, "exists", _v2_exists), + patch.object(Path, "read_text", _v2_mock_read(usage="850000000\n")), + ): + with pytest.raises(MemoryLimitExceeded): monitor.check_memory_usage() - def test_cgroup_v1_reading(self) -> None: - """Memory reading should work with cgroup v1 paths.""" - - def mock_read_text(self: Path) -> str: - if self == _CGROUP_V1_USAGE: - return _MOCK_USAGE_WARNING - if self == _CGROUP_V1_LIMIT: - return _MOCK_LIMIT - return "" - - monitor = MemoryMonitor(check_interval=1) - with ( - patch.object(Path, "exists", _v1_exists), - patch.object(Path, "read_text", mock_read_text), - ): - monitor.check_memory_usage() - - assert monitor._cgroup_version == 1 - assert 
monitor._warning_emitted - - def test_check_interval_skips_intermediate_calls(self) -> None: - """Monitor should only check cgroup files every check_interval messages.""" - def mock_read_text(self: Path) -> str: - if self == _CGROUP_V2_CURRENT: - return _MOCK_USAGE_WARNING - if self == _CGROUP_V2_MAX: - return _MOCK_LIMIT - return "" +# --------------------------------------------------------------------------- +# check_memory_usage — cgroup v1 path +# --------------------------------------------------------------------------- - monitor = MemoryMonitor(check_interval=3) - with ( - patch.object(Path, "exists", _v2_exists), - patch.object(Path, "read_text", mock_read_text), - ): - monitor.check_memory_usage() - assert not monitor._warning_emitted - monitor.check_memory_usage() - assert not monitor._warning_emitted - # Call 3 should trigger the actual check - monitor.check_memory_usage() - assert monitor._warning_emitted - - def test_custom_thresholds_warning(self) -> None: - """Custom warning threshold should be respected.""" - - def mock_read_text(self: Path) -> str: - if self == _CGROUP_V2_CURRENT: - return "750000000\n" - if self == _CGROUP_V2_MAX: - return _MOCK_LIMIT - return "" - - monitor = MemoryMonitor( - warning_threshold=0.70, - critical_threshold=0.90, - check_interval=1, - ) - with ( - patch.object(Path, "exists", _v2_exists), - patch.object(Path, "read_text", mock_read_text), - ): - monitor.check_memory_usage() - - assert monitor._warning_emitted - assert not monitor._critical_raised - - def test_custom_thresholds_critical(self) -> None: - """Custom critical threshold should be respected.""" - - def mock_read_text(self: Path) -> str: - if self == _CGROUP_V2_CURRENT: - return "850000000\n" - if self == _CGROUP_V2_MAX: - return _MOCK_LIMIT - return "" - - monitor = MemoryMonitor( - warning_threshold=0.70, - critical_threshold=0.80, - check_interval=1, - ) - with ( - patch.object(Path, "exists", _v2_exists), - patch.object(Path, "read_text", 
mock_read_text), - ): - with pytest.raises(MemoryLimitExceeded): - monitor.check_memory_usage() - - def test_malformed_cgroup_file_degrades_gracefully(self) -> None: - """Malformed cgroup files should not crash the sync.""" - monitor = MemoryMonitor(check_interval=1) - with ( - patch.object(Path, "exists", _v2_exists), - patch.object(Path, "read_text", return_value="not_a_number\n"), - ): - monitor.check_memory_usage() - - assert not monitor._warning_emitted - assert not monitor._critical_raised - - def test_empty_cgroup_file_degrades_gracefully(self) -> None: - """Empty cgroup file content should not crash the sync.""" - monitor = MemoryMonitor(check_interval=1) - with ( - patch.object(Path, "exists", _v2_exists), - patch.object(Path, "read_text", return_value=""), - ): - monitor.check_memory_usage() - - assert not monitor._warning_emitted - assert not monitor._critical_raised - - def test_os_error_degrades_gracefully(self) -> None: - """OSError reading cgroup files should not crash the sync.""" - - def mock_read_text(self: Path) -> str: - raise OSError("Permission denied") - - monitor = MemoryMonitor(check_interval=1) - with ( - patch.object(Path, "exists", _v2_exists), - patch.object(Path, "read_text", mock_read_text), - ): - monitor.check_memory_usage() - - assert not monitor._warning_emitted - assert not monitor._critical_raised - - def test_limit_bytes_zero_is_noop(self) -> None: - """When cgroup limit file contains '0', should be a no-op.""" - - def mock_read_text(self: Path) -> str: - if self == _CGROUP_V2_CURRENT: - return _MOCK_USAGE_BELOW - if self == _CGROUP_V2_MAX: - return "0\n" - return "" - - monitor = MemoryMonitor(check_interval=1) - with ( - patch.object(Path, "exists", _v2_exists), - patch.object(Path, "read_text", mock_read_text), - ): - monitor.check_memory_usage() - assert not monitor._warning_emitted - assert not monitor._critical_raised - - -class TestMemoryLimitExceeded: - """Tests for the MemoryLimitExceeded exception.""" - - def 
test_is_airbyte_traced_exception(self) -> None: - """MemoryLimitExceeded should be a subclass of AirbyteTracedException.""" - from airbyte_cdk.utils.traced_exception import AirbyteTracedException - - exc = MemoryLimitExceeded( - internal_message="test", - message="test message", - failure_type=FailureType.system_error, - ) - assert isinstance(exc, AirbyteTracedException) - - def test_default_attributes(self) -> None: - """MemoryLimitExceeded should have correct default attributes.""" - exc = MemoryLimitExceeded( - internal_message="Memory at 96%", - message="Source exceeded memory limit.", - failure_type=FailureType.system_error, - ) - assert exc.failure_type == FailureType.system_error - assert exc.message == "Source exceeded memory limit." - assert exc.internal_message == "Memory at 96%" +def test_cgroup_v1_emits_warning(caplog: pytest.LogCaptureFixture) -> None: + """Memory reading should work with cgroup v1 paths (proves v1 detection works).""" + + def mock_read_text(self: Path) -> str: + if self == _CGROUP_V1_USAGE: + return _MOCK_USAGE_WARNING + if self == _CGROUP_V1_LIMIT: + return _MOCK_LIMIT + return "" + + monitor = MemoryMonitor(check_interval=1) + with ( + caplog.at_level(logging.WARNING, logger="airbyte"), + patch.object(Path, "exists", _v1_exists), + patch.object(Path, "read_text", mock_read_text), + ): + monitor.check_memory_usage() + + assert len(caplog.records) == 1 + assert "87%" in caplog.records[0].message + + +# --------------------------------------------------------------------------- +# check_memory_usage — check interval +# --------------------------------------------------------------------------- + + +def test_check_interval_skips_intermediate_calls(caplog: pytest.LogCaptureFixture) -> None: + """Monitor should only check cgroup files every check_interval messages.""" + monitor = MemoryMonitor(check_interval=3) + with ( + caplog.at_level(logging.WARNING, logger="airbyte"), + patch.object(Path, "exists", _v2_exists), + patch.object(Path, 
"read_text", _v2_mock_read(usage=_MOCK_USAGE_WARNING)), + ): + monitor.check_memory_usage() + assert not caplog.records # call 1: skipped + monitor.check_memory_usage() + assert not caplog.records # call 2: skipped + # Call 3 should trigger the actual check + monitor.check_memory_usage() + assert len(caplog.records) == 1 + + +# --------------------------------------------------------------------------- +# check_memory_usage — graceful degradation +# --------------------------------------------------------------------------- + + +def test_malformed_cgroup_file_degrades_gracefully(caplog: pytest.LogCaptureFixture) -> None: + """Malformed cgroup files should not crash the sync.""" + monitor = MemoryMonitor(check_interval=1) + with ( + caplog.at_level(logging.WARNING, logger="airbyte"), + patch.object(Path, "exists", _v2_exists), + patch.object(Path, "read_text", return_value="not_a_number\n"), + ): + monitor.check_memory_usage() + assert not caplog.records + + +def test_empty_cgroup_file_degrades_gracefully(caplog: pytest.LogCaptureFixture) -> None: + """Empty cgroup file content should not crash the sync.""" + monitor = MemoryMonitor(check_interval=1) + with ( + caplog.at_level(logging.WARNING, logger="airbyte"), + patch.object(Path, "exists", _v2_exists), + patch.object(Path, "read_text", return_value=""), + ): + monitor.check_memory_usage() + assert not caplog.records + + +def test_os_error_degrades_gracefully(caplog: pytest.LogCaptureFixture) -> None: + """OSError reading cgroup files should not crash the sync.""" + + def mock_read_text(self: Path) -> str: + raise OSError("Permission denied") + + monitor = MemoryMonitor(check_interval=1) + with ( + caplog.at_level(logging.WARNING, logger="airbyte"), + patch.object(Path, "exists", _v2_exists), + patch.object(Path, "read_text", mock_read_text), + ): + monitor.check_memory_usage() + assert not caplog.records + + +# --------------------------------------------------------------------------- +# MemoryLimitExceeded 
exception +# --------------------------------------------------------------------------- + + +def test_memory_limit_exceeded_is_airbyte_traced_exception() -> None: + """MemoryLimitExceeded should be a subclass of AirbyteTracedException.""" + from airbyte_cdk.utils.traced_exception import AirbyteTracedException + + exc = MemoryLimitExceeded( + internal_message="test", + message="test message", + failure_type=FailureType.system_error, + ) + assert isinstance(exc, AirbyteTracedException) + + +def test_memory_limit_exceeded_attributes() -> None: + """MemoryLimitExceeded should have correct attributes.""" + exc = MemoryLimitExceeded( + internal_message="Memory at 96%", + message="Source exceeded memory limit.", + failure_type=FailureType.system_error, + ) + assert exc.failure_type == FailureType.system_error + assert exc.message == "Source exceeded memory limit." + assert exc.internal_message == "Memory at 96%" From 0485fa2770cb28bd361b4af7d559bfc6d8a7bc32 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Mon, 9 Mar 2026 23:02:21 +0000 Subject: [PATCH 11/16] refactor(cdk): move memory check back to after yield for zero data loss Co-Authored-By: bot_apk --- airbyte_cdk/entrypoint.py | 2 +- unit_tests/test_entrypoint.py | 15 +++++++-------- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index ad290ebce..3b2c771ac 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -281,8 +281,8 @@ def read( stream_message_counter: DefaultDict[HashableStreamDescriptor, float] = defaultdict(float) try: for message in self.source.read(self.logger, config, catalog, state): - self._memory_monitor.check_memory_usage() yield self.handle_record_counts(message, stream_message_counter) + self._memory_monitor.check_memory_usage() finally: for message in self._emit_queued_messages(self.source): yield self.handle_record_counts(message, 
stream_message_counter) diff --git a/unit_tests/test_entrypoint.py b/unit_tests/test_entrypoint.py index 7cd51fdbb..f551ce40d 100644 --- a/unit_tests/test_entrypoint.py +++ b/unit_tests/test_entrypoint.py @@ -908,11 +908,11 @@ def _raise_on_second_call() -> None: for msg in entrypoint.run(parsed_args): messages.append(msg) - # 1. Only the first record was yielded; the second triggered the exception - # before its yield so it is handled in the finally block instead. + # 1. Both records were yielded before the exception — the memory check + # runs after yield so every message pulled from the source is emitted. record_messages = [m for m in messages if "RECORD" in m] - assert len(record_messages) >= 1, ( - "At least the first record should be yielded before MemoryLimitExceeded" + assert len(record_messages) == 2, ( + "Both records should be yielded before MemoryLimitExceeded" ) # 2. The queued state message was flushed by the finally block @@ -922,11 +922,10 @@ def _raise_on_second_call() -> None: ) # 3. The flushed state has sourceStats.recordCount set by handle_record_counts. - # Only the first record is yielded (and counted) because the memory check - # now runs *before* yield — the second check raises before the second - # record is handled, so the counter is 1.0 at flush time. + # Both records are yielded (and counted) before the second check_memory_usage + # raises, so the counter is 2.0 at flush time. 
state_json = orjson.loads(state_messages[0]) - assert state_json["state"]["sourceStats"]["recordCount"] == 1.0 + assert state_json["state"]["sourceStats"]["recordCount"] == 2.0 def test_given_serialization_error_using_orjson_then_fallback_on_json( From 1621a39426d1a11109d3894c62d0ca340690581b Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Mon, 9 Mar 2026 23:03:12 +0000 Subject: [PATCH 12/16] style: fix ruff format in test_entrypoint.py Co-Authored-By: bot_apk --- unit_tests/test_entrypoint.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/unit_tests/test_entrypoint.py b/unit_tests/test_entrypoint.py index f551ce40d..191738e3e 100644 --- a/unit_tests/test_entrypoint.py +++ b/unit_tests/test_entrypoint.py @@ -911,9 +911,7 @@ def _raise_on_second_call() -> None: # 1. Both records were yielded before the exception — the memory check # runs after yield so every message pulled from the source is emitted. record_messages = [m for m in messages if "RECORD" in m] - assert len(record_messages) == 2, ( - "Both records should be yielded before MemoryLimitExceeded" - ) + assert len(record_messages) == 2, "Both records should be yielded before MemoryLimitExceeded" # 2. 
The queued state message was flushed by the finally block state_messages = [m for m in messages if "STATE" in m] From 4dda57cefa67fcce797145b9348a8d999da0f5d5 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 10 Mar 2026 02:32:43 +0000 Subject: [PATCH 13/16] refactor(cdk): display memory usage in GB instead of raw bytes Co-Authored-By: bot_apk --- airbyte_cdk/utils/memory_monitor.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/airbyte_cdk/utils/memory_monitor.py b/airbyte_cdk/utils/memory_monitor.py index 027efe906..85666f483 100644 --- a/airbyte_cdk/utils/memory_monitor.py +++ b/airbyte_cdk/utils/memory_monitor.py @@ -140,11 +140,13 @@ def check_memory_usage(self) -> None: usage_bytes, limit_bytes = memory_info usage_ratio = usage_bytes / limit_bytes usage_percent = int(usage_ratio * 100) + usage_gb = usage_bytes / (1024**3) + limit_gb = limit_bytes / (1024**3) if usage_ratio >= self._critical_threshold and not self._critical_raised: self._critical_raised = True raise MemoryLimitExceeded( - internal_message=f"Memory usage is {usage_percent}% ({usage_bytes} / {limit_bytes} bytes). " + internal_message=f"Memory usage is {usage_percent}% ({usage_gb:.2f} / {limit_gb:.2f} GB). 
" f"Critical threshold is {int(self._critical_threshold * 100)}%.", message=f"Source exceeded memory limit ({usage_percent}% used) and must shut down to avoid an out-of-memory crash.", failure_type=FailureType.system_error, @@ -153,8 +155,8 @@ def check_memory_usage(self) -> None: if usage_ratio >= self._warning_threshold and not self._warning_emitted: self._warning_emitted = True logger.warning( - "Source memory usage reached %d%% of container limit (%d / %d bytes).", + "Source memory usage reached %d%% of container limit (%.2f / %.2f GB).", usage_percent, - usage_bytes, - limit_bytes, + usage_gb, + limit_gb, ) From c96825ff61561bd888957b7fa7dadcd1ec4cedc7 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 10 Mar 2026 04:56:31 +0000 Subject: [PATCH 14/16] refactor(cdk): remove MemoryLimitExceeded subclass, raise AirbyteTracedException directly Co-Authored-By: bot_apk --- airbyte_cdk/utils/memory_monitor.py | 12 ++------ unit_tests/test_entrypoint.py | 16 +++++----- unit_tests/utils/test_memory_monitor.py | 41 ++++--------------------- 3 files changed, 17 insertions(+), 52 deletions(-) diff --git a/airbyte_cdk/utils/memory_monitor.py b/airbyte_cdk/utils/memory_monitor.py index 85666f483..607afb28b 100644 --- a/airbyte_cdk/utils/memory_monitor.py +++ b/airbyte_cdk/utils/memory_monitor.py @@ -29,12 +29,6 @@ _DEFAULT_CHECK_INTERVAL = 1000 -class MemoryLimitExceeded(AirbyteTracedException): - """Raised when connector memory usage exceeds critical threshold.""" - - pass - - class MemoryMonitor: """Monitors container memory usage via cgroup files and emits warnings before OOM kills. @@ -119,8 +113,8 @@ def check_memory_usage(self) -> None: messages (default 1000) to minimise I/O overhead. At the warning threshold (default 85%), logs a warning message. - At the critical threshold (default 95%), raises MemoryLimitExceeded to - trigger a graceful shutdown with an actionable error message. 
+ At the critical threshold (default 95%), raises ``AirbyteTracedException`` + to trigger a graceful shutdown with an actionable error message. Each threshold triggers at most once per sync to avoid log spam. This method is a no-op if cgroup files are unavailable. @@ -145,7 +139,7 @@ def check_memory_usage(self) -> None: if usage_ratio >= self._critical_threshold and not self._critical_raised: self._critical_raised = True - raise MemoryLimitExceeded( + raise AirbyteTracedException( internal_message=f"Memory usage is {usage_percent}% ({usage_gb:.2f} / {limit_gb:.2f} GB). " f"Critical threshold is {int(self._critical_threshold * 100)}%.", message=f"Source exceeded memory limit ({usage_percent}% used) and must shut down to avoid an out-of-memory crash.", diff --git a/unit_tests/test_entrypoint.py b/unit_tests/test_entrypoint.py index 191738e3e..8bbfb4bbd 100644 --- a/unit_tests/test_entrypoint.py +++ b/unit_tests/test_entrypoint.py @@ -836,10 +836,10 @@ def test_handle_record_counts( def test_memory_limit_exceeded_flushes_queued_messages(mocker, spec_mock, config_mock): - """When MemoryLimitExceeded is raised mid-read, queued messages should still be flushed. + """When AirbyteTracedException is raised mid-read, queued messages should still be flushed. The read() try/finally ensures _emit_queued_messages runs even when - MemoryLimitExceeded propagates. The exception still surfaces to the + AirbyteTracedException propagates. The exception still surfaces to the caller, but all messages yielded before (records) and during (finally- block state messages) the exception are available to the consumer. 
""" @@ -879,7 +879,7 @@ def test_memory_limit_exceeded_flushes_queued_messages(mocker, spec_mock, config mocker.patch.object(MockSource, "read_catalog", return_value={}) mocker.patch.object(MockSource, "read", return_value=[record, record]) - from airbyte_cdk.utils.memory_monitor import MemoryLimitExceeded + from airbyte_cdk.utils.traced_exception import AirbyteTracedException call_count = 0 @@ -887,7 +887,7 @@ def _raise_on_second_call() -> None: nonlocal call_count call_count += 1 if call_count >= 2: - raise MemoryLimitExceeded( + raise AirbyteTracedException( internal_message="Memory at 96%", message="Source exceeded memory limit (96% used) and must shut down to avoid an out-of-memory crash.", failure_type=FailureType.system_error, @@ -901,22 +901,22 @@ def _raise_on_second_call() -> None: command="read", config="config_path", state="statepath", catalog="catalogpath" ) - # The generator yields messages until MemoryLimitExceeded propagates. + # The generator yields messages until AirbyteTracedException propagates. # Collect everything yielded before the exception surfaces. messages: list[str] = [] - with pytest.raises(MemoryLimitExceeded): + with pytest.raises(AirbyteTracedException): for msg in entrypoint.run(parsed_args): messages.append(msg) # 1. Both records were yielded before the exception — the memory check # runs after yield so every message pulled from the source is emitted. record_messages = [m for m in messages if "RECORD" in m] - assert len(record_messages) == 2, "Both records should be yielded before MemoryLimitExceeded" + assert len(record_messages) == 2, "Both records should be yielded before AirbyteTracedException" # 2. The queued state message was flushed by the finally block state_messages = [m for m in messages if "STATE" in m] assert len(state_messages) >= 1, ( - "Queued state message should be flushed even after MemoryLimitExceeded" + "Queued state message should be flushed even after AirbyteTracedException" ) # 3. 
The flushed state has sourceStats.recordCount set by handle_record_counts. diff --git a/unit_tests/utils/test_memory_monitor.py b/unit_tests/utils/test_memory_monitor.py index 9081f0f8e..31fbb71fe 100644 --- a/unit_tests/utils/test_memory_monitor.py +++ b/unit_tests/utils/test_memory_monitor.py @@ -14,9 +14,9 @@ _CGROUP_V1_USAGE, _CGROUP_V2_CURRENT, _CGROUP_V2_MAX, - MemoryLimitExceeded, MemoryMonitor, ) +from airbyte_cdk.utils.traced_exception import AirbyteTracedException _MOCK_USAGE_BELOW = "500000000\n" # 50% of 1 GB _MOCK_USAGE_WARNING = "870000000\n" # 87% of 1 GB @@ -160,13 +160,13 @@ def test_custom_thresholds_warning(caplog: pytest.LogCaptureFixture) -> None: def test_critical_at_95_percent_raises() -> None: - """MemoryLimitExceeded should be raised at 96% usage.""" + """AirbyteTracedException should be raised at 96% usage.""" monitor = MemoryMonitor(check_interval=1) with ( patch.object(Path, "exists", _v2_exists), patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_CRITICAL)), ): - with pytest.raises(MemoryLimitExceeded) as exc_info: + with pytest.raises(AirbyteTracedException) as exc_info: monitor.check_memory_usage() assert exc_info.value.failure_type == FailureType.system_error @@ -174,13 +174,13 @@ def test_critical_at_95_percent_raises() -> None: def test_critical_raised_only_once() -> None: - """MemoryLimitExceeded should only be raised once.""" + """AirbyteTracedException should only be raised once.""" monitor = MemoryMonitor(check_interval=1) with ( patch.object(Path, "exists", _v2_exists), patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_CRITICAL)), ): - with pytest.raises(MemoryLimitExceeded): + with pytest.raises(AirbyteTracedException): monitor.check_memory_usage() # Second call should NOT raise again monitor.check_memory_usage() @@ -197,7 +197,7 @@ def test_custom_thresholds_critical() -> None: patch.object(Path, "exists", _v2_exists), patch.object(Path, "read_text", _v2_mock_read(usage="850000000\n")), ): - with 
pytest.raises(MemoryLimitExceeded): + with pytest.raises(AirbyteTracedException): monitor.check_memory_usage() @@ -293,32 +293,3 @@ def mock_read_text(self: Path) -> str: ): monitor.check_memory_usage() assert not caplog.records - - -# --------------------------------------------------------------------------- -# MemoryLimitExceeded exception -# --------------------------------------------------------------------------- - - -def test_memory_limit_exceeded_is_airbyte_traced_exception() -> None: - """MemoryLimitExceeded should be a subclass of AirbyteTracedException.""" - from airbyte_cdk.utils.traced_exception import AirbyteTracedException - - exc = MemoryLimitExceeded( - internal_message="test", - message="test message", - failure_type=FailureType.system_error, - ) - assert isinstance(exc, AirbyteTracedException) - - -def test_memory_limit_exceeded_attributes() -> None: - """MemoryLimitExceeded should have correct attributes.""" - exc = MemoryLimitExceeded( - internal_message="Memory at 96%", - message="Source exceeded memory limit.", - failure_type=FailureType.system_error, - ) - assert exc.failure_type == FailureType.system_error - assert exc.message == "Source exceeded memory limit." 
- assert exc.internal_message == "Memory at 96%" From cdc518cfb11b1ca892057f8d84209ba7ca672c63 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 10 Mar 2026 04:57:25 +0000 Subject: [PATCH 15/16] fix(cdk): validate check_interval >= 1 to prevent ZeroDivisionError Co-Authored-By: bot_apk --- airbyte_cdk/utils/memory_monitor.py | 2 ++ unit_tests/utils/test_memory_monitor.py | 17 +++++++++++++++++ 2 files changed, 19 insertions(+) diff --git a/airbyte_cdk/utils/memory_monitor.py b/airbyte_cdk/utils/memory_monitor.py index 607afb28b..82c96b48d 100644 --- a/airbyte_cdk/utils/memory_monitor.py +++ b/airbyte_cdk/utils/memory_monitor.py @@ -43,6 +43,8 @@ def __init__( critical_threshold: float = _DEFAULT_CRITICAL_THRESHOLD, check_interval: int = _DEFAULT_CHECK_INTERVAL, ) -> None: + if check_interval < 1: + raise ValueError(f"check_interval must be >= 1, got {check_interval}") self._warning_threshold = warning_threshold self._critical_threshold = critical_threshold self._check_interval = check_interval diff --git a/unit_tests/utils/test_memory_monitor.py b/unit_tests/utils/test_memory_monitor.py index 31fbb71fe..662bd491f 100644 --- a/unit_tests/utils/test_memory_monitor.py +++ b/unit_tests/utils/test_memory_monitor.py @@ -45,6 +45,23 @@ def mock_read_text(self: Path) -> str: return mock_read_text +# --------------------------------------------------------------------------- +# __init__ — input validation +# --------------------------------------------------------------------------- + + +def test_check_interval_zero_raises() -> None: + """check_interval=0 should raise ValueError at construction time.""" + with pytest.raises(ValueError, match="check_interval must be >= 1"): + MemoryMonitor(check_interval=0) + + +def test_check_interval_negative_raises() -> None: + """Negative check_interval should raise ValueError at construction time.""" + with pytest.raises(ValueError, match="check_interval must be >= 1"): + 
MemoryMonitor(check_interval=-1) + + # --------------------------------------------------------------------------- # check_memory_usage — no-op paths # --------------------------------------------------------------------------- From bf9db5f3f2c422c61c18ef0a5156c4ac8e0e01f6 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 10 Mar 2026 21:07:52 +0000 Subject: [PATCH 16/16] refactor(cdk): switch memory monitor to logging-only trial mode MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per pnilan's review feedback: - Remove AirbyteTracedException raise at critical threshold — log only - Change check_interval default from 1000 to 5000 messages - Log WARNING on every check above 90% (remove one-shot flags) - Simplify constructor (remove threshold params and state flags) - Remove try/finally from entrypoint read() (no longer raising exceptions) - Add TODO comment for cgroup v1 removal - Update tests: remove exception/one-shot tests, add repeated logging test Co-Authored-By: bot_apk --- airbyte_cdk/entrypoint.py | 12 ++- airbyte_cdk/utils/memory_monitor.py | 50 ++++------- unit_tests/test_entrypoint.py | 91 ------------------- unit_tests/utils/test_memory_monitor.py | 111 +++++------------------- 4 files changed, 44 insertions(+), 220 deletions(-) diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index 3b2c771ac..d6c92f9fa 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -279,13 +279,11 @@ def read( # The Airbyte protocol dictates that counts be expressed as float/double to better protect against integer overflows stream_message_counter: DefaultDict[HashableStreamDescriptor, float] = defaultdict(float) - try: - for message in self.source.read(self.logger, config, catalog, state): - yield self.handle_record_counts(message, stream_message_counter) - self._memory_monitor.check_memory_usage() - finally: - for message in 
self._emit_queued_messages(self.source): - yield self.handle_record_counts(message, stream_message_counter) + for message in self.source.read(self.logger, config, catalog, state): + yield self.handle_record_counts(message, stream_message_counter) + self._memory_monitor.check_memory_usage() + for message in self._emit_queued_messages(self.source): + yield self.handle_record_counts(message, stream_message_counter) @staticmethod def handle_record_counts( diff --git a/airbyte_cdk/utils/memory_monitor.py b/airbyte_cdk/utils/memory_monitor.py index 82c96b48d..0767ce3bf 100644 --- a/airbyte_cdk/utils/memory_monitor.py +++ b/airbyte_cdk/utils/memory_monitor.py @@ -2,55 +2,49 @@ # Copyright (c) 2026 Airbyte, Inc., all rights reserved. # -"""Source-side memory introspection to emit controlled error messages before OOM kills.""" +"""Source-side memory introspection to log memory usage approaching container limits.""" import logging from pathlib import Path from typing import Optional -from airbyte_cdk.models import FailureType -from airbyte_cdk.utils.traced_exception import AirbyteTracedException - logger = logging.getLogger("airbyte") # cgroup v2 paths _CGROUP_V2_CURRENT = Path("/sys/fs/cgroup/memory.current") _CGROUP_V2_MAX = Path("/sys/fs/cgroup/memory.max") -# cgroup v1 paths +# cgroup v1 paths — TODO: remove if all deployments are confirmed cgroup v2 _CGROUP_V1_USAGE = Path("/sys/fs/cgroup/memory/memory.usage_in_bytes") _CGROUP_V1_LIMIT = Path("/sys/fs/cgroup/memory/memory.limit_in_bytes") -# Default thresholds -_DEFAULT_WARNING_THRESHOLD = 0.85 -_DEFAULT_CRITICAL_THRESHOLD = 0.95 +# Log when usage is at or above 90% +_MEMORY_THRESHOLD = 0.90 # Check interval (every N messages) -_DEFAULT_CHECK_INTERVAL = 1000 +_DEFAULT_CHECK_INTERVAL = 5000 class MemoryMonitor: - """Monitors container memory usage via cgroup files and emits warnings before OOM kills. + """Monitors container memory usage via cgroup files and logs warnings when usage is high. 
Lazily probes cgroup v2 then v1 files on the first call to ``check_memory_usage()``. Caches which version exists. If neither is found (local dev / CI), all subsequent calls are instant no-ops. + + Logs a WARNING on every check interval (default 5000 messages) when memory + usage is at or above 90% of the container limit. This gives breadcrumb + trails showing whether memory is climbing, plateauing, or sawtoothing. """ def __init__( self, - warning_threshold: float = _DEFAULT_WARNING_THRESHOLD, - critical_threshold: float = _DEFAULT_CRITICAL_THRESHOLD, check_interval: int = _DEFAULT_CHECK_INTERVAL, ) -> None: if check_interval < 1: raise ValueError(f"check_interval must be >= 1, got {check_interval}") - self._warning_threshold = warning_threshold - self._critical_threshold = critical_threshold self._check_interval = check_interval self._message_count = 0 - self._warning_emitted = False - self._critical_raised = False self._cgroup_version: Optional[int] = None self._probed = False @@ -108,17 +102,15 @@ def _read_memory(self) -> Optional[tuple[int, int]]: return None def check_memory_usage(self) -> None: - """Check memory usage against thresholds. + """Check memory usage and log when above 90%. Intended to be called on every message. The monitor internally tracks a message counter and only reads cgroup files every ``check_interval`` - messages (default 1000) to minimise I/O overhead. + messages (default 5000) to minimise I/O overhead. - At the warning threshold (default 85%), logs a warning message. - At the critical threshold (default 95%), raises ``AirbyteTracedException`` - to trigger a graceful shutdown with an actionable error message. + Logs a WARNING on every check above 90% to provide breadcrumb trails + showing memory trends over the sync lifetime. - Each threshold triggers at most once per sync to avoid log spam. This method is a no-op if cgroup files are unavailable. 
""" self._probe_cgroup() @@ -139,19 +131,9 @@ def check_memory_usage(self) -> None: usage_gb = usage_bytes / (1024**3) limit_gb = limit_bytes / (1024**3) - if usage_ratio >= self._critical_threshold and not self._critical_raised: - self._critical_raised = True - raise AirbyteTracedException( - internal_message=f"Memory usage is {usage_percent}% ({usage_gb:.2f} / {limit_gb:.2f} GB). " - f"Critical threshold is {int(self._critical_threshold * 100)}%.", - message=f"Source exceeded memory limit ({usage_percent}% used) and must shut down to avoid an out-of-memory crash.", - failure_type=FailureType.system_error, - ) - - if usage_ratio >= self._warning_threshold and not self._warning_emitted: - self._warning_emitted = True + if usage_ratio >= _MEMORY_THRESHOLD: logger.warning( - "Source memory usage reached %d%% of container limit (%.2f / %.2f GB).", + "Source memory usage at %d%% of container limit (%.2f / %.2f GB).", usage_percent, usage_gb, limit_gb, diff --git a/unit_tests/test_entrypoint.py b/unit_tests/test_entrypoint.py index 8bbfb4bbd..520131881 100644 --- a/unit_tests/test_entrypoint.py +++ b/unit_tests/test_entrypoint.py @@ -835,97 +835,6 @@ def test_handle_record_counts( ) -def test_memory_limit_exceeded_flushes_queued_messages(mocker, spec_mock, config_mock): - """When AirbyteTracedException is raised mid-read, queued messages should still be flushed. - - The read() try/finally ensures _emit_queued_messages runs even when - AirbyteTracedException propagates. The exception still surfaces to the - caller, but all messages yielded before (records) and during (finally- - block state messages) the exception are available to the consumer. 
- """ - queued_state = AirbyteMessage( - type=Type.STATE, - state=AirbyteStateMessage( - type=AirbyteStateType.STREAM, - stream=AirbyteStreamState( - stream_descriptor=StreamDescriptor(name="stream"), - stream_state=AirbyteStateBlob(updated_at="2026-01-01"), - ), - ), - ) - message_repository = MagicMock() - # consume_queue calls: - # 1. run() preamble → initial queued control message - # 2. read() finally block → queued state (the key assertion) - # 3. run() outer finally → nothing - message_repository.consume_queue.side_effect = [ - [MESSAGE_FROM_REPOSITORY], - [queued_state], - [], - ] - mocker.patch.object( - MockSource, - "message_repository", - new_callable=mocker.PropertyMock, - return_value=message_repository, - ) - entrypoint = AirbyteEntrypoint(MockSource()) - - record = AirbyteMessage( - type=Type.RECORD, - record=AirbyteRecordMessage(stream="stream", data={"id": "1"}, emitted_at=1), - ) - mocker.patch.object(MockSource, "read_state", return_value={}) - mocker.patch.object(MockSource, "read_catalog", return_value={}) - mocker.patch.object(MockSource, "read", return_value=[record, record]) - - from airbyte_cdk.utils.traced_exception import AirbyteTracedException - - call_count = 0 - - def _raise_on_second_call() -> None: - nonlocal call_count - call_count += 1 - if call_count >= 2: - raise AirbyteTracedException( - internal_message="Memory at 96%", - message="Source exceeded memory limit (96% used) and must shut down to avoid an out-of-memory crash.", - failure_type=FailureType.system_error, - ) - - mocker.patch.object( - entrypoint._memory_monitor, "check_memory_usage", side_effect=_raise_on_second_call - ) - - parsed_args = Namespace( - command="read", config="config_path", state="statepath", catalog="catalogpath" - ) - - # The generator yields messages until AirbyteTracedException propagates. - # Collect everything yielded before the exception surfaces. 
- messages: list[str] = [] - with pytest.raises(AirbyteTracedException): - for msg in entrypoint.run(parsed_args): - messages.append(msg) - - # 1. Both records were yielded before the exception — the memory check - # runs after yield so every message pulled from the source is emitted. - record_messages = [m for m in messages if "RECORD" in m] - assert len(record_messages) == 2, "Both records should be yielded before AirbyteTracedException" - - # 2. The queued state message was flushed by the finally block - state_messages = [m for m in messages if "STATE" in m] - assert len(state_messages) >= 1, ( - "Queued state message should be flushed even after AirbyteTracedException" - ) - - # 3. The flushed state has sourceStats.recordCount set by handle_record_counts. - # Both records are yielded (and counted) before the second check_memory_usage - # raises, so the counter is 2.0 at flush time. - state_json = orjson.loads(state_messages[0]) - assert state_json["state"]["sourceStats"]["recordCount"] == 2.0 - - def test_given_serialization_error_using_orjson_then_fallback_on_json( entrypoint: AirbyteEntrypoint, mocker, spec_mock, config_mock ): diff --git a/unit_tests/utils/test_memory_monitor.py b/unit_tests/utils/test_memory_monitor.py index 662bd491f..cf8250465 100644 --- a/unit_tests/utils/test_memory_monitor.py +++ b/unit_tests/utils/test_memory_monitor.py @@ -8,7 +8,6 @@ import pytest -from airbyte_cdk.models import FailureType from airbyte_cdk.utils.memory_monitor import ( _CGROUP_V1_LIMIT, _CGROUP_V1_USAGE, @@ -16,11 +15,9 @@ _CGROUP_V2_MAX, MemoryMonitor, ) -from airbyte_cdk.utils.traced_exception import AirbyteTracedException _MOCK_USAGE_BELOW = "500000000\n" # 50% of 1 GB -_MOCK_USAGE_WARNING = "870000000\n" # 87% of 1 GB -_MOCK_USAGE_CRITICAL = "960000000\n" # 96% of 1 GB +_MOCK_USAGE_AT_90 = "910000000\n" # 91% of 1 GB _MOCK_LIMIT = "1000000000\n" # 1 GB @@ -108,7 +105,7 @@ def test_noop_when_limit_is_zero(caplog: pytest.LogCaptureFixture) -> None: def 
test_no_warning_below_threshold(caplog: pytest.LogCaptureFixture) -> None: - """No warning should be emitted when usage is below 85%.""" + """No warning should be emitted when usage is below 90%.""" monitor = MemoryMonitor(check_interval=1) with ( caplog.at_level(logging.WARNING, logger="airbyte"), @@ -120,102 +117,40 @@ def test_no_warning_below_threshold(caplog: pytest.LogCaptureFixture) -> None: # --------------------------------------------------------------------------- -# check_memory_usage — warning threshold +# check_memory_usage — at/above 90% threshold # --------------------------------------------------------------------------- -def test_warning_at_85_percent(caplog: pytest.LogCaptureFixture) -> None: - """Warning log should be emitted at 87% usage (above 85% threshold).""" +def test_logs_at_90_percent(caplog: pytest.LogCaptureFixture) -> None: + """Warning log should be emitted at 91% usage (above 90% threshold).""" monitor = MemoryMonitor(check_interval=1) with ( caplog.at_level(logging.WARNING, logger="airbyte"), patch.object(Path, "exists", _v2_exists), - patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_WARNING)), + patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_90)), ): monitor.check_memory_usage() assert len(caplog.records) == 1 - assert "87%" in caplog.records[0].message + assert "91%" in caplog.records[0].message -def test_warning_emitted_only_once(caplog: pytest.LogCaptureFixture) -> None: - """Warning should only be logged once even if called multiple times.""" +def test_logs_on_every_check_above_90_percent(caplog: pytest.LogCaptureFixture) -> None: + """Warning should be logged on EVERY check interval when above 90%, not just once.""" monitor = MemoryMonitor(check_interval=1) with ( caplog.at_level(logging.WARNING, logger="airbyte"), patch.object(Path, "exists", _v2_exists), - patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_WARNING)), + patch.object(Path, "read_text", 
_v2_mock_read(usage=_MOCK_USAGE_AT_90)), ): monitor.check_memory_usage() monitor.check_memory_usage() - - assert len(caplog.records) == 1 - - -def test_custom_thresholds_warning(caplog: pytest.LogCaptureFixture) -> None: - """Custom warning threshold should be respected.""" - monitor = MemoryMonitor( - warning_threshold=0.70, - critical_threshold=0.90, - check_interval=1, - ) - with ( - caplog.at_level(logging.WARNING, logger="airbyte"), - patch.object(Path, "exists", _v2_exists), - patch.object(Path, "read_text", _v2_mock_read(usage="750000000\n")), - ): - # 75% exceeds 70% warning threshold but is below 90% critical - monitor.check_memory_usage() - - assert len(caplog.records) == 1 - assert "75%" in caplog.records[0].message - - -# --------------------------------------------------------------------------- -# check_memory_usage — critical threshold -# --------------------------------------------------------------------------- - - -def test_critical_at_95_percent_raises() -> None: - """AirbyteTracedException should be raised at 96% usage.""" - monitor = MemoryMonitor(check_interval=1) - with ( - patch.object(Path, "exists", _v2_exists), - patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_CRITICAL)), - ): - with pytest.raises(AirbyteTracedException) as exc_info: - monitor.check_memory_usage() - - assert exc_info.value.failure_type == FailureType.system_error - assert "96%" in (exc_info.value.message or "") - - -def test_critical_raised_only_once() -> None: - """AirbyteTracedException should only be raised once.""" - monitor = MemoryMonitor(check_interval=1) - with ( - patch.object(Path, "exists", _v2_exists), - patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_CRITICAL)), - ): - with pytest.raises(AirbyteTracedException): - monitor.check_memory_usage() - # Second call should NOT raise again monitor.check_memory_usage() - -def test_custom_thresholds_critical() -> None: - """Custom critical threshold should be respected.""" - monitor = 
MemoryMonitor( - warning_threshold=0.70, - critical_threshold=0.80, - check_interval=1, - ) - with ( - patch.object(Path, "exists", _v2_exists), - patch.object(Path, "read_text", _v2_mock_read(usage="850000000\n")), - ): - with pytest.raises(AirbyteTracedException): - monitor.check_memory_usage() + # All three checks should produce a warning (no one-shot flag) + assert len(caplog.records) == 3 + for record in caplog.records: + assert "91%" in record.message # --------------------------------------------------------------------------- @@ -228,7 +163,7 @@ def test_cgroup_v1_emits_warning(caplog: pytest.LogCaptureFixture) -> None: def mock_read_text(self: Path) -> str: if self == _CGROUP_V1_USAGE: - return _MOCK_USAGE_WARNING + return _MOCK_USAGE_AT_90 if self == _CGROUP_V1_LIMIT: return _MOCK_LIMIT return "" @@ -242,7 +177,7 @@ def mock_read_text(self: Path) -> str: monitor.check_memory_usage() assert len(caplog.records) == 1 - assert "87%" in caplog.records[0].message + assert "91%" in caplog.records[0].message # --------------------------------------------------------------------------- @@ -252,17 +187,17 @@ def mock_read_text(self: Path) -> str: def test_check_interval_skips_intermediate_calls(caplog: pytest.LogCaptureFixture) -> None: """Monitor should only check cgroup files every check_interval messages.""" - monitor = MemoryMonitor(check_interval=3) + monitor = MemoryMonitor(check_interval=5000) with ( caplog.at_level(logging.WARNING, logger="airbyte"), patch.object(Path, "exists", _v2_exists), - patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_WARNING)), + patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_90)), ): - monitor.check_memory_usage() - assert not caplog.records # call 1: skipped - monitor.check_memory_usage() - assert not caplog.records # call 2: skipped - # Call 3 should trigger the actual check + # First 4999 calls should be skipped + for _ in range(4999): + monitor.check_memory_usage() + assert not caplog.records 
+ # Call 5000 should trigger the actual check monitor.check_memory_usage() assert len(caplog.records) == 1