Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
caa0c82
feat(cdk): add source-side memory introspection to emit controlled er…
devin-ai-integration[bot] Mar 9, 2026
8d059cf
style: fix ruff format and import sorting
devin-ai-integration[bot] Mar 9, 2026
b2e233f
fix(cdk): add error handling to _read_memory() for graceful degradation
devin-ai-integration[bot] Mar 9, 2026
ee93565
fix(cdk): wrap read() loop in try/finally to flush queued messages on…
devin-ai-integration[bot] Mar 9, 2026
cca6c50
refactor(cdk): encapsulate check interval inside MemoryMonitor
devin-ai-integration[bot] Mar 9, 2026
ec56943
refactor(cdk): move MemoryMonitor to AirbyteEntrypoint.__init__
devin-ai-integration[bot] Mar 9, 2026
7f7ab40
test(cdk): add comprehensive memory monitor tests and graceful shutdo…
devin-ai-integration[bot] Mar 9, 2026
a021eb7
fix(cdk): change MemoryLimitExceeded to system_error and update user-…
devin-ai-integration[bot] Mar 9, 2026
fc71aba
refactor(cdk): make DEFAULT_CHECK_INTERVAL private and drop circular …
devin-ai-integration[bot] Mar 9, 2026
5f7e16d
refactor(cdk): move memory check before yield, test observable behavi…
devin-ai-integration[bot] Mar 9, 2026
0485fa2
refactor(cdk): move memory check back to after yield for zero data loss
devin-ai-integration[bot] Mar 9, 2026
1621a39
style: fix ruff format in test_entrypoint.py
devin-ai-integration[bot] Mar 9, 2026
4dda57c
refactor(cdk): display memory usage in GB instead of raw bytes
devin-ai-integration[bot] Mar 10, 2026
c96825f
refactor(cdk): remove MemoryLimitExceeded subclass, raise AirbyteTrac…
devin-ai-integration[bot] Mar 10, 2026
cdc518c
fix(cdk): validate check_interval >= 1 to prevent ZeroDivisionError
devin-ai-integration[bot] Mar 10, 2026
bf9db5f
refactor(cdk): switch memory monitor to logging-only trial mode
devin-ai-integration[bot] Mar 10, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions airbyte_cdk/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from airbyte_cdk.utils import is_cloud_environment, message_utils
from airbyte_cdk.utils.airbyte_secrets_utils import get_secrets, update_secrets
from airbyte_cdk.utils.constants import ENV_REQUEST_CACHE_PATH
from airbyte_cdk.utils.memory_monitor import MemoryMonitor
from airbyte_cdk.utils.traced_exception import AirbyteTracedException

logger = init_logger("airbyte")
Expand All @@ -60,6 +61,7 @@ def __init__(self, source: Source):

self.source = source
self.logger = logging.getLogger(f"airbyte.{getattr(source, 'name', '')}")
self._memory_monitor = MemoryMonitor()

@staticmethod
def parse_args(args: List[str]) -> argparse.Namespace:
Expand Down Expand Up @@ -279,6 +281,7 @@ def read(
stream_message_counter: DefaultDict[HashableStreamDescriptor, float] = defaultdict(float)
for message in self.source.read(self.logger, config, catalog, state):
yield self.handle_record_counts(message, stream_message_counter)
self._memory_monitor.check_memory_usage()
for message in self._emit_queued_messages(self.source):
yield self.handle_record_counts(message, stream_message_counter)

Expand Down
140 changes: 140 additions & 0 deletions airbyte_cdk/utils/memory_monitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
#
# Copyright (c) 2026 Airbyte, Inc., all rights reserved.
#

"""Source-side memory introspection to log memory usage approaching container limits."""

import logging
from pathlib import Path
from typing import Optional

logger = logging.getLogger("airbyte")

# cgroup v2 paths
_CGROUP_V2_CURRENT = Path("/sys/fs/cgroup/memory.current")
_CGROUP_V2_MAX = Path("/sys/fs/cgroup/memory.max")

# cgroup v1 paths — TODO: remove if all deployments are confirmed cgroup v2
_CGROUP_V1_USAGE = Path("/sys/fs/cgroup/memory/memory.usage_in_bytes")
_CGROUP_V1_LIMIT = Path("/sys/fs/cgroup/memory/memory.limit_in_bytes")

# Log when usage is at or above 90%
_MEMORY_THRESHOLD = 0.90

# Check interval (every N messages)
_DEFAULT_CHECK_INTERVAL = 5000


class MemoryMonitor:
"""Monitors container memory usage via cgroup files and logs warnings when usage is high.

Lazily probes cgroup v2 then v1 files on the first call to
``check_memory_usage()``. Caches which version exists.
If neither is found (local dev / CI), all subsequent calls are instant no-ops.

Logs a WARNING on every check interval (default 5000 messages) when memory
usage is at or above 90% of the container limit. This gives breadcrumb
trails showing whether memory is climbing, plateauing, or sawtoothing.
"""

def __init__(
self,
check_interval: int = _DEFAULT_CHECK_INTERVAL,
) -> None:
if check_interval < 1:
raise ValueError(f"check_interval must be >= 1, got {check_interval}")
self._check_interval = check_interval
self._message_count = 0
self._cgroup_version: Optional[int] = None
self._probed = False

def _probe_cgroup(self) -> None:
"""Detect which cgroup version (if any) is available.

Called lazily on the first ``check_memory_usage()`` invocation so
that ``spec`` and ``discover`` commands never incur filesystem I/O.
"""
if self._probed:
return
self._probed = True

if _CGROUP_V2_CURRENT.exists() and _CGROUP_V2_MAX.exists():
self._cgroup_version = 2
elif _CGROUP_V1_USAGE.exists() and _CGROUP_V1_LIMIT.exists():
self._cgroup_version = 1

if self._cgroup_version is None:
logger.debug(
"No cgroup memory files found. Memory monitoring disabled (likely local dev / CI)."
)

def _read_memory(self) -> Optional[tuple[int, int]]:
"""Read current memory usage and limit from cgroup files.

Returns a tuple of (usage_bytes, limit_bytes) or None if unavailable.
Best-effort: failures to read memory info never crash a sync.
"""
if self._cgroup_version is None:
return None

try:
if self._cgroup_version == 2:
usage_path = _CGROUP_V2_CURRENT
limit_path = _CGROUP_V2_MAX
else:
usage_path = _CGROUP_V1_USAGE
limit_path = _CGROUP_V1_LIMIT

limit_text = limit_path.read_text().strip()
# cgroup v2 memory.max can be the literal string "max" (unlimited)
if limit_text == "max":
return None

usage_bytes = int(usage_path.read_text().strip())
limit_bytes = int(limit_text)

if limit_bytes <= 0:
return None

return usage_bytes, limit_bytes
except (OSError, ValueError):
logger.debug("Failed to read cgroup memory files; skipping memory check.")
return None

def check_memory_usage(self) -> None:
"""Check memory usage and log when above 90%.

Intended to be called on every message. The monitor internally tracks
a message counter and only reads cgroup files every ``check_interval``
messages (default 5000) to minimise I/O overhead.

Logs a WARNING on every check above 90% to provide breadcrumb trails
showing memory trends over the sync lifetime.

This method is a no-op if cgroup files are unavailable.
"""
self._probe_cgroup()
if self._cgroup_version is None:
return

self._message_count += 1
if self._message_count % self._check_interval != 0:
return
Comment thread
pnilan marked this conversation as resolved.

memory_info = self._read_memory()
if memory_info is None:
return

usage_bytes, limit_bytes = memory_info
usage_ratio = usage_bytes / limit_bytes
usage_percent = int(usage_ratio * 100)
usage_gb = usage_bytes / (1024**3)
limit_gb = limit_bytes / (1024**3)

if usage_ratio >= _MEMORY_THRESHOLD:
logger.warning(
"Source memory usage at %d%% of container limit (%.2f / %.2f GB).",
usage_percent,
usage_gb,
limit_gb,
)
Loading
Loading