diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index 54c207487..d6c92f9fa 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -41,6 +41,7 @@ from airbyte_cdk.utils import is_cloud_environment, message_utils from airbyte_cdk.utils.airbyte_secrets_utils import get_secrets, update_secrets from airbyte_cdk.utils.constants import ENV_REQUEST_CACHE_PATH +from airbyte_cdk.utils.memory_monitor import MemoryMonitor from airbyte_cdk.utils.traced_exception import AirbyteTracedException logger = init_logger("airbyte") @@ -60,6 +61,7 @@ def __init__(self, source: Source): self.source = source self.logger = logging.getLogger(f"airbyte.{getattr(source, 'name', '')}") + self._memory_monitor = MemoryMonitor() @staticmethod def parse_args(args: List[str]) -> argparse.Namespace: @@ -279,6 +281,7 @@ def read( stream_message_counter: DefaultDict[HashableStreamDescriptor, float] = defaultdict(float) for message in self.source.read(self.logger, config, catalog, state): yield self.handle_record_counts(message, stream_message_counter) + self._memory_monitor.check_memory_usage() for message in self._emit_queued_messages(self.source): yield self.handle_record_counts(message, stream_message_counter) diff --git a/airbyte_cdk/utils/memory_monitor.py b/airbyte_cdk/utils/memory_monitor.py new file mode 100644 index 000000000..0767ce3bf --- /dev/null +++ b/airbyte_cdk/utils/memory_monitor.py @@ -0,0 +1,140 @@ +# +# Copyright (c) 2026 Airbyte, Inc., all rights reserved. 
+#
+
+"""Source-side memory introspection to log memory usage approaching container limits."""
+
+import logging
+from pathlib import Path
+from typing import Optional
+
+logger = logging.getLogger("airbyte")
+
+# cgroup v2 paths
+_CGROUP_V2_CURRENT = Path("/sys/fs/cgroup/memory.current")
+_CGROUP_V2_MAX = Path("/sys/fs/cgroup/memory.max")
+
+# cgroup v1 paths — TODO: remove if all deployments are confirmed cgroup v2
+_CGROUP_V1_USAGE = Path("/sys/fs/cgroup/memory/memory.usage_in_bytes")
+_CGROUP_V1_LIMIT = Path("/sys/fs/cgroup/memory/memory.limit_in_bytes")
+
+# Default warning threshold: log when usage is at or above 90% of the limit
+_MEMORY_THRESHOLD = 0.90
+
+# Default check interval (every N messages)
+_DEFAULT_CHECK_INTERVAL = 5000
+
+
+class MemoryMonitor:
+    """Monitors container memory usage via cgroup files and logs warnings when usage is high.
+
+    Lazily probes cgroup v2 then v1 files on the first call to
+    ``check_memory_usage()`` and caches which version exists. If neither is
+    found (local dev / CI), all subsequent calls are instant no-ops.
+
+    Logs a WARNING on every check interval (default 5000 messages) while memory
+    usage is at or above ``threshold`` (default 90%) of the container limit.
+    Repeating the warning gives a breadcrumb trail showing whether memory is
+    climbing, plateauing, or sawtoothing.
+    """
+
+    def __init__(
+        self,
+        check_interval: int = _DEFAULT_CHECK_INTERVAL,
+        threshold: float = _MEMORY_THRESHOLD,
+    ) -> None:
+        """Create a monitor; no filesystem access happens here.
+
+        Args:
+            check_interval: Number of ``check_memory_usage()`` calls between
+                actual cgroup reads. Must be >= 1.
+            threshold: Fraction of the container limit (``0 < threshold <= 1``)
+                at or above which a warning is logged.
+
+        Raises:
+            ValueError: If ``check_interval`` or ``threshold`` is out of range.
+        """
+        if check_interval < 1:
+            raise ValueError(f"check_interval must be >= 1, got {check_interval}")
+        if not 0 < threshold <= 1:
+            raise ValueError(f"threshold must be in (0, 1], got {threshold}")
+        self._check_interval = check_interval
+        self._threshold = threshold
+        self._message_count = 0
+        # 1 or 2 once a cgroup hierarchy has been detected; None means unavailable.
+        self._cgroup_version: Optional[int] = None
+        self._probed = False
+
+    def _probe_cgroup(self) -> None:
+        """Detect which cgroup version (if any) is available.
+
+        Called lazily on the first ``check_memory_usage()`` invocation so
+        that ``spec`` and ``discover`` commands never incur filesystem I/O.
+        The result is cached: later calls return immediately.
+        """
+        if self._probed:
+            return
+        self._probed = True
+
+        # Prefer v2; a version only counts when both of its files exist.
+        if _CGROUP_V2_CURRENT.exists() and _CGROUP_V2_MAX.exists():
+            self._cgroup_version = 2
+        elif _CGROUP_V1_USAGE.exists() and _CGROUP_V1_LIMIT.exists():
+            self._cgroup_version = 1
+
+        if self._cgroup_version is None:
+            logger.debug(
+                "No cgroup memory files found. Memory monitoring disabled (likely local dev / CI)."
+            )
+
+    def _read_memory(self) -> Optional[tuple[int, int]]:
+        """Read current memory usage and limit from cgroup files.
+
+        Returns a tuple of (usage_bytes, limit_bytes), or None when no cgroup
+        was detected, the limit is unlimited ("max") or non-positive, or the
+        files cannot be read or parsed.
+        Best-effort: failures to read memory info never crash a sync.
+        """
+        if self._cgroup_version is None:
+            return None
+
+        try:
+            if self._cgroup_version == 2:
+                usage_path = _CGROUP_V2_CURRENT
+                limit_path = _CGROUP_V2_MAX
+            else:
+                usage_path = _CGROUP_V1_USAGE
+                limit_path = _CGROUP_V1_LIMIT
+
+            limit_text = limit_path.read_text().strip()
+            # cgroup v2 memory.max can be the literal string "max" (unlimited)
+            if limit_text == "max":
+                return None
+
+            usage_bytes = int(usage_path.read_text().strip())
+            limit_bytes = int(limit_text)
+
+            # A non-positive limit would make the usage ratio meaningless.
+            if limit_bytes <= 0:
+                return None
+
+            return usage_bytes, limit_bytes
+        except (OSError, ValueError):
+            logger.debug("Failed to read cgroup memory files; skipping memory check.")
+            return None
+
+    def check_memory_usage(self) -> None:
+        """Check memory usage and log when it is at or above the threshold.
+
+        Intended to be called on every message. The monitor internally tracks
+        a message counter and only reads cgroup files every ``check_interval``
+        messages (default 5000) to minimise I/O overhead.
+
+        Logs a WARNING on every check at or above the threshold to provide
+        breadcrumb trails showing memory trends over the sync lifetime.
+
+        This method is a no-op if cgroup files are unavailable; in that case
+        the message counter is not advanced either.
+        """
+        self._probe_cgroup()
+        if self._cgroup_version is None:
+            return
+
+        self._message_count += 1
+        if self._message_count % self._check_interval != 0:
+            return
+
+        memory_info = self._read_memory()
+        if memory_info is None:
+            return
+
+        usage_bytes, limit_bytes = memory_info
+        if usage_bytes / limit_bytes < self._threshold:
+            return
+
+        # Integer arithmetic: int(ratio * 100) can under-report by one because
+        # of float rounding (e.g. int(0.57 * 100) == 56).
+        usage_percent = usage_bytes * 100 // limit_bytes
+        bytes_per_gb = 1024**3
+        logger.warning(
+            "Source memory usage at %d%% of container limit (%.2f / %.2f GB).",
+            usage_percent,
+            usage_bytes / bytes_per_gb,
+            limit_bytes / bytes_per_gb,
+        )
diff --git a/unit_tests/utils/test_memory_monitor.py b/unit_tests/utils/test_memory_monitor.py
new file mode 100644
index 000000000..cf8250465
--- /dev/null
+++ b/unit_tests/utils/test_memory_monitor.py
@@ -0,0 +1,247 @@
+#
+# Copyright (c) 2026 Airbyte, Inc., all rights reserved.
+#
+
+import logging
+from pathlib import Path
+from unittest.mock import patch
+
+import pytest
+
+from airbyte_cdk.utils.memory_monitor import (
+    _CGROUP_V1_LIMIT,
+    _CGROUP_V1_USAGE,
+    _CGROUP_V2_CURRENT,
+    _CGROUP_V2_MAX,
+    MemoryMonitor,
+)
+
+_MOCK_USAGE_BELOW = "500000000\n"  # 50% of 1 GB
+_MOCK_USAGE_AT_90 = "910000000\n"  # 91% of 1 GB
+_MOCK_LIMIT = "1000000000\n"  # 1 GB
+
+
+def _v2_exists(self: Path) -> bool:
+    return self in (_CGROUP_V2_CURRENT, _CGROUP_V2_MAX)
+
+
+def _v1_exists(self: Path) -> bool:
+    return self in (_CGROUP_V1_USAGE, _CGROUP_V1_LIMIT)
+
+
+def _v2_mock_read(usage: str = _MOCK_USAGE_BELOW, limit: str = _MOCK_LIMIT):
+    """Return a mock_read_text function for cgroup v2 with the given usage/limit."""
+
+    def mock_read_text(self: Path) -> str:
+        if self == _CGROUP_V2_CURRENT:
+            return usage
+        if self == _CGROUP_V2_MAX:
+            return limit
+        return ""
+
+    return mock_read_text
+
+
+# ---------------------------------------------------------------------------
+# __init__ — input validation
+# ---------------------------------------------------------------------------
+
+
+def test_check_interval_zero_raises() -> None:
+    """check_interval=0 should raise ValueError at construction time."""
+    with pytest.raises(ValueError, match="check_interval must be >= 1"):
+        MemoryMonitor(check_interval=0)
+
+
+def test_check_interval_negative_raises() -> None:
+    """Negative check_interval should raise ValueError at construction time."""
+    with pytest.raises(ValueError, match="check_interval must be >= 1"):
+        MemoryMonitor(check_interval=-1)
+
+
+# ---------------------------------------------------------------------------
+# check_memory_usage — no-op paths
+# ---------------------------------------------------------------------------
+
+
+def test_noop_when_no_cgroup(caplog: pytest.LogCaptureFixture) -> None:
+    """check_memory_usage should be a no-op when cgroup is unavailable."""
+    monitor = MemoryMonitor()
+    with (
+        caplog.at_level(logging.WARNING, logger="airbyte"),
+        patch.object(Path, "exists", return_value=False),
+    ):
+        monitor.check_memory_usage()
+        assert not caplog.records
+
+
+def test_noop_when_limit_is_max(caplog: pytest.LogCaptureFixture) -> None:
+    """When cgroup v2 memory.max is 'max' (unlimited), should be a no-op."""
+    monitor = MemoryMonitor(check_interval=1)
+    with (
+        caplog.at_level(logging.WARNING, logger="airbyte"),
+        patch.object(Path, "exists", _v2_exists),
+        patch.object(Path, "read_text", _v2_mock_read(limit="max\n")),
+    ):
+        monitor.check_memory_usage()
+        assert not caplog.records
+
+
+def test_noop_when_limit_is_zero(caplog: pytest.LogCaptureFixture) -> None:
+    """When cgroup limit file contains '0', should be a no-op."""
+    monitor = MemoryMonitor(check_interval=1)
+    with (
+        caplog.at_level(logging.WARNING, logger="airbyte"),
+        patch.object(Path, "exists", _v2_exists),
+        patch.object(Path, "read_text", _v2_mock_read(limit="0\n")),
+    ):
+        monitor.check_memory_usage()
+        assert not caplog.records
+
+
+# ---------------------------------------------------------------------------
+# check_memory_usage — below threshold
+# ---------------------------------------------------------------------------
+
+
+def test_no_warning_below_threshold(caplog: pytest.LogCaptureFixture) -> None:
+    """No warning should be emitted when usage is below 90%."""
+    monitor = MemoryMonitor(check_interval=1)
+    with (
+        caplog.at_level(logging.WARNING, logger="airbyte"),
+        patch.object(Path, "exists", _v2_exists),
+        patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_BELOW)),
+    ):
+        monitor.check_memory_usage()
+        assert not caplog.records
+
+
+def test_no_warning_just_below_threshold(caplog: pytest.LogCaptureFixture) -> None:
+    """One byte under 90% of the limit must not trigger the warning."""
+    monitor = MemoryMonitor(check_interval=1)
+    with (
+        caplog.at_level(logging.WARNING, logger="airbyte"),
+        patch.object(Path, "exists", _v2_exists),
+        patch.object(Path, "read_text", _v2_mock_read(usage="899999999\n")),
+    ):
+        monitor.check_memory_usage()
+        assert not caplog.records
+
+
+# ---------------------------------------------------------------------------
+# check_memory_usage — at/above 90% threshold
+# ---------------------------------------------------------------------------
+
+
+def test_logs_at_exactly_90_percent(caplog: pytest.LogCaptureFixture) -> None:
+    """The threshold is inclusive: exactly 90% usage should emit the warning."""
+    monitor = MemoryMonitor(check_interval=1)
+    with (
+        caplog.at_level(logging.WARNING, logger="airbyte"),
+        patch.object(Path, "exists", _v2_exists),
+        patch.object(Path, "read_text", _v2_mock_read(usage="900000000\n")),
+    ):
+        monitor.check_memory_usage()
+
+    assert len(caplog.records) == 1
+    assert "90%" in caplog.records[0].message
+
+
+def test_logs_at_90_percent(caplog: pytest.LogCaptureFixture) -> None:
+    """Warning log should be emitted at 91% usage (above 90% threshold)."""
+    monitor = MemoryMonitor(check_interval=1)
+    with (
+        caplog.at_level(logging.WARNING, logger="airbyte"),
+        patch.object(Path, "exists", _v2_exists),
+        patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_90)),
+    ):
+        monitor.check_memory_usage()
+
+    assert len(caplog.records) == 1
+    assert "91%" in caplog.records[0].message
+
+
+def test_logs_on_every_check_above_90_percent(caplog: pytest.LogCaptureFixture) -> None:
+    """Warning should be logged on EVERY check interval when above 90%, not just once."""
+    monitor = MemoryMonitor(check_interval=1)
+    with (
+        caplog.at_level(logging.WARNING, logger="airbyte"),
+        patch.object(Path, "exists", _v2_exists),
+        patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_90)),
+    ):
+        monitor.check_memory_usage()
+        monitor.check_memory_usage()
+        monitor.check_memory_usage()
+
+    # All three checks should produce a warning (no one-shot flag)
+    assert len(caplog.records) == 3
+    for record in caplog.records:
+        assert "91%" in record.message
+
+
+# ---------------------------------------------------------------------------
+# check_memory_usage — cgroup v1 path
+# ---------------------------------------------------------------------------
+
+
+def test_cgroup_v1_emits_warning(caplog: pytest.LogCaptureFixture) -> None:
+    """Memory reading should work with cgroup v1 paths (proves v1 detection works)."""
+
+    def mock_read_text(self: Path) -> str:
+        if self == _CGROUP_V1_USAGE:
+            return _MOCK_USAGE_AT_90
+        if self == _CGROUP_V1_LIMIT:
+            return _MOCK_LIMIT
+        return ""
+
+    monitor = MemoryMonitor(check_interval=1)
+    with (
+        caplog.at_level(logging.WARNING, logger="airbyte"),
+        patch.object(Path, "exists", _v1_exists),
+        patch.object(Path, "read_text", mock_read_text),
+    ):
+        monitor.check_memory_usage()
+
+    assert len(caplog.records) == 1
+    assert "91%" in caplog.records[0].message
+
+
+# ---------------------------------------------------------------------------
+# check_memory_usage — check interval
+# ---------------------------------------------------------------------------
+
+
+def test_check_interval_skips_intermediate_calls(caplog: pytest.LogCaptureFixture) -> None:
+    """Monitor should only check cgroup files every check_interval messages."""
+    monitor = MemoryMonitor(check_interval=5000)
+    with (
+        caplog.at_level(logging.WARNING, logger="airbyte"),
+        patch.object(Path, "exists", _v2_exists),
+        patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_90)),
+    ):
+        # First 4999 calls should be skipped
+        for _ in range(4999):
+            monitor.check_memory_usage()
+        assert not caplog.records
+        # Call 5000 should trigger the actual check
+        monitor.check_memory_usage()
+        assert len(caplog.records) == 1
+
+
+# ---------------------------------------------------------------------------
+# check_memory_usage — graceful degradation
+# ---------------------------------------------------------------------------
+
+
+def test_malformed_cgroup_file_degrades_gracefully(caplog: pytest.LogCaptureFixture) -> None:
+    """Malformed cgroup files should not crash the sync."""
+    monitor = MemoryMonitor(check_interval=1)
+    with (
+        caplog.at_level(logging.WARNING, logger="airbyte"),
+        patch.object(Path, "exists", _v2_exists),
+        patch.object(Path, "read_text", return_value="not_a_number\n"),
+    ):
+        monitor.check_memory_usage()
+        assert not caplog.records
+
+
+def test_empty_cgroup_file_degrades_gracefully(caplog: pytest.LogCaptureFixture) -> None:
+    """Empty cgroup file content should not crash the sync."""
+    monitor = MemoryMonitor(check_interval=1)
+    with (
+        caplog.at_level(logging.WARNING, logger="airbyte"),
+        patch.object(Path, "exists", _v2_exists),
+        patch.object(Path, "read_text", return_value=""),
+    ):
+        monitor.check_memory_usage()
+        assert not caplog.records
+
+
+def test_os_error_degrades_gracefully(caplog: pytest.LogCaptureFixture) -> None:
+    """OSError reading cgroup files should not crash the sync."""
+
+    def mock_read_text(self: Path) -> str:
+        raise OSError("Permission denied")
+
+    monitor = MemoryMonitor(check_interval=1)
+    with (
+        caplog.at_level(logging.WARNING, logger="airbyte"),
+        patch.object(Path, "exists", _v2_exists),
+        patch.object(Path, "read_text", mock_read_text),
+    ):
+        monitor.check_memory_usage()
+        assert not caplog.records