feat(cloud): Add ability to view Cloud sync logs, including SyncAttempt abstraction for fetching job attempts #781
Co-Authored-By: AJ Steers <aj@airbyte.io>
👋 Greetings, Airbyte Team Member!
Here are some helpful tips and reminders for your convenience.
Testing This PyAirbyte Version
You can test this version of PyAirbyte using the following:
# Run PyAirbyte CLI from this branch:
uvx --from 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1757543890-sync-attempt-abstraction' pyairbyte --help
# Install PyAirbyte from this branch for development:
pip install 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1757543890-sync-attempt-abstraction'
PR Slash Commands
Airbyte Maintainers can execute slash commands on this PR.
Community Support
Questions? Join the #pyairbyte channel in our Slack workspace.
- Add SyncAttempt dataclass with Config API integration
- Add get_attempts() method to SyncResult class
- Follow existing PyAirbyte patterns for lazy loading and API access
- Use _make_config_api_request helper for Config API calls
- Include proper timezone handling for datetime fields
- Export SyncAttempt in __all__ list

Basic implementation complete - ready for log operations in next phase

Co-Authored-By: AJ Steers <aj@airbyte.io>
📝 Walkthrough
Adds a new public SyncAttempt class and per-attempt lazy-loading in SyncResult; exposes attempt-level properties and logs. Updates MCP cloud ops to return structured sync status, optionally include attempts, and provide a new per-attempt logs tool. Improves config API error handling with richer context. Adds docstring examples for testing tools.
Sequence Diagram(s)
sequenceDiagram
autonumber
actor Caller
participant SyncResult
participant CloudAPI as Config API
participant SyncAttempt
Caller->>SyncResult: get_attempts()
alt attempts not cached
SyncResult->>CloudAPI: GET /jobs/get (job with attempts)
CloudAPI-->>SyncResult: job payload incl. attempts
SyncResult->>SyncResult: cache _job_with_attempts_info
end
SyncResult-->>Caller: [SyncAttempt...]
Caller->>SyncAttempt: get_full_log_text()
alt logs available
SyncAttempt->>SyncAttempt: reconstruct per-attempt logs
SyncAttempt-->>Caller: concatenated text
else no logs
SyncAttempt-->>Caller: ""
end
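The caching branch in the diagram above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the PR's implementation: `FakeSyncResult`, `fetch_job`, `get_attempt_numbers`, and the payload shape are hypothetical stand-ins for `SyncResult`, the Config API request, and its response.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class FakeSyncResult:
    """Hypothetical stand-in for SyncResult; only the caching logic is shown."""

    job_id: int
    fetch_job: Callable[[int], dict[str, Any]]  # stand-in for the Config API call
    _job_with_attempts_info: dict[str, Any] | None = field(default=None, repr=False)

    def _fetch_job_with_attempts(self) -> dict[str, Any]:
        # Only call the API when nothing has been cached yet.
        if self._job_with_attempts_info is None:
            self._job_with_attempts_info = self.fetch_job(self.job_id)
        return self._job_with_attempts_info

    def get_attempt_numbers(self) -> list[int]:
        # One entry per attempt in the (assumed) "attempts" array.
        attempts = self._fetch_job_with_attempts().get("attempts", [])
        return list(range(len(attempts)))


calls: list[int] = []


def fake_fetch(job_id: int) -> dict[str, Any]:
    """Record each simulated API call and return a made-up payload."""
    calls.append(job_id)
    return {"job": {"id": job_id}, "attempts": [{}, {}]}


result = FakeSyncResult(job_id=42, fetch_job=fake_fetch)
first = result.get_attempt_numbers()
second = result.get_attempt_numbers()  # served from the cache, no second call
```

A cache like this stays stale while a job is still running, which is why the review later raises a `force_refresh` toggle.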
sequenceDiagram
autonumber
actor Client
participant MCP as MCP Cloud Ops
participant Cloud as Airbyte Cloud
Client->>MCP: get_cloud_sync_status(connection_id, job_id?, include_attempts?)
MCP->>Cloud: fetch latest sync (and attempts if requested)
Cloud-->>MCP: job summary (+ attempts)
MCP-->>Client: {status, job_id, bytes, records, start_time, job_url, attempts?}
Client->>MCP: get_cloud_sync_logs(connection_id, job_id?, attempt_number?)
MCP->>Cloud: fetch job (+ attempts/logs)
Cloud-->>MCP: job/attempt data
alt valid attempt
MCP-->>Client: log text
else missing/invalid
MCP-->>Client: error string
end
Estimated code review effort
🎯 4 (Complex) | ⏱️ ~60 minutes
Pre-merge checks and finishing touches
❌ Failed checks (1 inconclusive)
✅ Passed checks (2 passed)
Actionable comments posted: 2
🧹 Nitpick comments (3)
airbyte/cloud/sync_results.py (3)
148-151: Type: consider enum for attempt status.
If there’s an AttemptStatusEnum (or similar) in api_imports, should we return that instead of str for stronger typing, wdyt?
169-183: Error handling: surface a helpful exception when attempt isn’t found.
If the API returns 200 but the attempt doesn’t exist, we’ll KeyError later. Would you add a check and raise an AirbyteError (or PyAirbyteInputError) with context, wdyt?
Example:
- self._attempt_info = api_util.get_attempt_for_job(
+ resp = api_util.get_attempt_for_job(
      api_root=self.workspace.api_root,
@@
- )
- return self._attempt_info
+ )
+ attempt = resp.get("attempt", None)
+ if attempt is None:
+     raise RuntimeError(f"No attempt found for job_id={self.job_id}, attempt_number={self.attempt_number}")
+ self._attempt_info = attempt
+ return attempt
270-280: Stubbed behavior: get_attempts() returns only attempt 0.
Since the docstring says “list of attempts,” should we either (a) update the docstring to note partial support or (b) fetch all attempts and build the list now, wdyt?
Sketch (if API supports listing all):
- return [
-     SyncAttempt(
-         workspace=self.workspace,
-         connection=self.connection,
-         job_id=self.job_id,
-         attempt_number=0,
-     )
- ]
+ # TODO: Fetch attempts count or IDs from Config API and enumerate.
+ return [SyncAttempt(self.workspace, self.connection, self.job_id, 0)]
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
poetry.lock is excluded by !**/*.lock
📒 Files selected for processing (1)
airbyte/cloud/sync_results.py (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
airbyte/cloud/sync_results.py (3)
airbyte/cloud/workspaces.py (1)
CloudWorkspace (56-444)
airbyte/cloud/connections.py (1)
CloudConnection (19-245)
airbyte/_util/api_util.py (1)
_make_config_api_request (779-816)
🪛 GitHub Actions: Run Linters
airbyte/cloud/sync_results.py
[error] 173-173: poetry run ruff check failed: 1 error. SLF001 Private member accessed: _make_config_api_request.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
- GitHub Check: Pytest (All, Python 3.11, Windows)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Windows)
- GitHub Check: Pytest (No Creds)
- GitHub Check: Pytest (Fast)
🔇 Additional comments (2)
airbyte/cloud/sync_results.py (2)
427-428: Export update looks good.
Adding "SyncAttempt" to __all__ matches the new public surface. LGTM.
142-161: Defensive parsing for attempt payload (airbyte/cloud/sync_results.py:142-161)
- Could we guard against schema drift by replacing direct indexing with a safe lookup? Example:
  - return self._fetch_attempt_info()["attempt"]["id"]
  + info = self._fetch_attempt_info()
  + attempt = info.get("attempt", info)
  + return attempt["id"]
  Repeat the same pattern for status, bytes_synced, and records_synced, wdyt?
- FYI: the correct Config API endpoint is POST /api/v1/attempt/get_for_job (singular). The Attempt schema includes id, createdAt (int64), bytesSynced and recordsSynced (int64). Can you confirm whether createdAt is epoch milliseconds vs seconds? wdyt?
- Add _fetch_job_with_attempts() method to get job info with attempts from Config API
- Use /v1/jobs/get endpoint to retrieve JobInfoRead with attempts array
- Fix get_attempts() to return SyncAttempt objects for all attempts (0 to num_attempts-1)
- Add _job_with_attempts_info field to SyncResult dataclass for caching
- Addresses GitHub comment about returning all attempts instead of just attempt 0

Co-Authored-By: AJ Steers <aj@airbyte.io>

- Import timezone from datetime module to fix 'type[datetime] has no attribute timezone' error
- Resolves MyPy Check CI failure in SyncAttempt.created_at property

Co-Authored-By: AJ Steers <aj@airbyte.io>

- Add noqa comments for _make_config_api_request usage
- This follows established pattern in PyAirbyte codebase for Config API access
- Resolves Ruff Lint Check CI failure

Co-Authored-By: AJ Steers <aj@airbyte.io>
Actionable comments posted: 1
♻️ Duplicate comments (2)
airbyte/cloud/sync_results.py (2)
106-106: Timezone import fix looks good.
Importing timezone and using tz=timezone.utc resolves the earlier NameError, nice catch; this aligns with the previous feedback. Wdyt?
173-183: Blocker: avoid calling private _make_config_api_request (ruff SLF001) here.
This will fail linting and couples us to a private helper. Can we switch to a public wrapper as below, wdyt?
- self._attempt_info = api_util._make_config_api_request(  # noqa: SLF001  # Config API helper
-     api_root=self.workspace.api_root,
-     path="/v1/attempts/get_for_job",
-     json={
-         "jobId": self.job_id,
-         "attemptNumber": self.attempt_number,
-     },
-     client_id=self.workspace.client_id,
-     client_secret=self.workspace.client_secret,
- )
+ self._attempt_info = api_util.get_attempt_for_job(
+     api_root=self.workspace.api_root,
+     job_id=self.job_id,
+     attempt_number=self.attempt_number,
+     client_id=self.workspace.client_id,
+     client_secret=self.workspace.client_secret,
+ )
Add this public wrapper in airbyte/_util/api_util.py:
def get_attempt_for_job(
    *,
    api_root: str,
    job_id: int,
    attempt_number: int,
    client_id: SecretString,
    client_secret: SecretString,
) -> dict[str, Any]:
    return _make_config_api_request(
        api_root=api_root,
        path="/v1/attempts/get_for_job",
        json={"jobId": job_id, "attemptNumber": attempt_number},
        client_id=client_id,
        client_secret=client_secret,
    )
🧹 Nitpick comments (2)
airbyte/cloud/sync_results.py (2)
153-161: Guard against None from the API for bytes/records.
Airbyte may return null for these; .get(..., 0) won’t cover the “present but null” case. Shall we coerce via or 0, wdyt?
- return self._fetch_attempt_info()["attempt"].get("bytesSynced", 0)
+ return self._fetch_attempt_info()["attempt"].get("bytesSynced") or 0
@@
- return self._fetch_attempt_info()["attempt"].get("recordsSynced", 0)
+ return self._fetch_attempt_info()["attempt"].get("recordsSynced") or 0
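To make the "present but null" case concrete, here is a small sketch of the difference between the two lookups. The helper name `synced_count` and the sample payloads are made up for illustration; only the `bytesSynced` key comes from the thread.

```python
from __future__ import annotations

from typing import Any


def synced_count(attempt: dict[str, Any], key: str) -> int:
    """Return a counter from an attempt payload, treating missing and null as 0."""
    return attempt.get(key) or 0


missing: dict[str, Any] = {}
null = {"bytesSynced": None}
present = {"bytesSynced": 1024}

# dict.get() only applies its default when the key is absent,
# so a key that is present with a null value still yields None.
naive_null = null.get("bytesSynced", 0)

safe_missing = synced_count(missing, "bytesSynced")
safe_null = synced_count(null, "bytesSynced")
safe_present = synced_count(present, "bytesSynced")
```

One trade-off of `or 0`: callers can no longer distinguish "API reported 0" from "API reported nothing", which seems acceptable for display counters.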
287-301: Pre-hydrate SyncAttempt to avoid an extra API call per attempt.
Since /v1/jobs/get already returns attempt payloads, we can seed _attempt_info to keep attempt_id/status/... access zero-cost. Also, returning range(len(attempts_data)) satisfies the 0..n-1 indexing noted by Aaron ("AJ") Steers (@aaronsteers), nice. Shall we hydrate as below, wdyt?
- attempts_data = job_with_attempts.get("attempts", [])
-
- return [
-     SyncAttempt(
-         workspace=self.workspace,
-         connection=self.connection,
-         job_id=self.job_id,
-         attempt_number=i,
-     )
-     for i in range(len(attempts_data))
- ]
+ attempts_data = job_with_attempts.get("attempts", [])
+ return [
+     SyncAttempt(
+         workspace=self.workspace,
+         connection=self.connection,
+         job_id=self.job_id,
+         attempt_number=i,
+         _attempt_info={"attempt": attempt},  # pre-hydrate to avoid another API call
+     )
+     for i, attempt in enumerate(attempts_data)
+ ]
If you prefer to keep lazy-loading, we could at least document that property access triggers one fetch per attempt.
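The cost difference between the two approaches can be illustrated with a toy model. Everything here is hypothetical (`FakeAttempt`, `fake_get_attempt`, the payload shape); it only counts simulated requests and does not call the real Config API.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any

api_calls: list[tuple[int, int]] = []  # one entry per simulated per-attempt request


def fake_get_attempt(job_id: int, attempt_number: int) -> dict[str, Any]:
    """Hypothetical stand-in for the per-attempt Config API request."""
    api_calls.append((job_id, attempt_number))
    return {"attempt": {"id": attempt_number, "status": "succeeded"}}


@dataclass
class FakeAttempt:
    """Hypothetical stand-in for SyncAttempt with an optional pre-seeded cache."""

    job_id: int
    attempt_number: int
    _attempt_info: dict[str, Any] | None = None

    @property
    def status(self) -> str:
        # Fetch lazily only when no payload was seeded at construction time.
        if self._attempt_info is None:
            self._attempt_info = fake_get_attempt(self.job_id, self.attempt_number)
        return self._attempt_info["attempt"]["status"]


job_payload = {"attempts": [{"id": 0, "status": "failed"}, {"id": 1, "status": "succeeded"}]}

# Pre-hydrated: reuse the attempt payloads already present in the job response.
hydrated = [FakeAttempt(7, i, {"attempt": a}) for i, a in enumerate(job_payload["attempts"])]
hydrated_statuses = [a.status for a in hydrated]
calls_after_hydrated = len(api_calls)

# Lazy: each attempt triggers its own request on first property access.
lazy = [FakeAttempt(7, i) for i in range(len(job_payload["attempts"]))]
lazy_statuses = [a.status for a in lazy]
calls_after_lazy = len(api_calls)
```

In this toy run the hydrated list needs no extra requests, while the lazy list issues one per attempt.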
📒 Files selected for processing (1)
airbyte/cloud/sync_results.py (5 hunks)
🔇 Additional comments (2)
airbyte/cloud/sync_results.py (2)
163-167: UTC timestamp conversion looks correct.
Dividing ms by 1000 and using timezone.utc is solid. Wdyt?
446-449: Export surface update looks good.
Adding "SyncAttempt" to __all__ is consistent with the new public type. Wdyt?
Actionable comments posted: 1
♻️ Duplicate comments (2)
airbyte/cloud/sync_results.py (2)
175-184: Replace private Config API call with a public wrapper (ruff SLF001 and coupling).
Can we switch to a public helper to satisfy the linter and reduce coupling, wdyt?
- self._attempt_info = api_util._make_config_api_request(  # noqa: SLF001  # Config API helper
-     api_root=self.workspace.api_root,
-     path="/v1/attempts/get_for_job",
-     json={
-         "jobId": self.job_id,
-         "attemptNumber": self.attempt_number,
-     },
-     client_id=self.workspace.client_id,
-     client_secret=self.workspace.client_secret,
- )
+ self._attempt_info = api_util.get_attempt_for_job(
+     api_root=self.workspace.api_root,
+     job_id=self.job_id,
+     attempt_number=self.attempt_number,
+     client_id=self.workspace.client_id,
+     client_secret=self.workspace.client_secret,
+ )
Add in airbyte/_util/api_util.py (outside this diff):
def get_attempt_for_job(
    *,
    api_root: str,
    job_id: int,
    attempt_number: int,
    client_id: SecretString,
    client_secret: SecretString,
) -> dict[str, Any]:
    return _make_config_api_request(
        api_root=api_root,
        path="/v1/attempts/get_for_job",
        json={"jobId": job_id, "attemptNumber": attempt_number},
        client_id=client_id,
        client_secret=client_secret,
    )
#!/bin/bash
# Verify we no longer call the private helper directly in cloud code.
rg -n -C1 -P 'api_util\._make_config_api_request\s*\(' airbyte | sed -n '1,200p'
273-288: Same: avoid private _make_config_api_request for job-with-attempts.
Shall we expose a get_job_with_attempts(...) wrapper and use it here? Also, consider a force_refresh: bool = False to bypass cache while jobs are running, wdyt?
- self._job_with_attempts_info = api_util._make_config_api_request(  # noqa: SLF001  # Config API helper
-     api_root=self.workspace.api_root,
-     path="/v1/jobs/get",
-     json={
-         "id": self.job_id,
-     },
-     client_id=self.workspace.client_id,
-     client_secret=self.workspace.client_secret,
- )
+ self._job_with_attempts_info = api_util.get_job_with_attempts(
+     api_root=self.workspace.api_root,
+     job_id=self.job_id,
+     client_id=self.workspace.client_id,
+     client_secret=self.workspace.client_secret,
+ )
Add in airbyte/_util/api_util.py:
def get_job_with_attempts(
    *,
    api_root: str,
    job_id: int,
    client_id: SecretString,
    client_secret: SecretString,
) -> dict[str, Any]:
    return _make_config_api_request(
        api_root=api_root,
        path="/v1/jobs/get",
        json={"id": job_id},
        client_id=client_id,
        client_secret=client_secret,
    )
🧹 Nitpick comments (2)
airbyte/cloud/sync_results.py (2)
128-141: SyncAttempt dataclass looks clean.
Minor: do we really need to keep a strong reference to CloudConnection here, or would workspace+job_id be sufficient to avoid a heavier object graph? Happy to keep as-is if it simplifies usage, wdyt?
289-303: Reduce N extra API calls by hydrating attempts from the job payload.
Since /v1/jobs/get already returns attempts, want to seed SyncAttempt._attempt_info to avoid one request per attempt and derive attempt_number from payload when present, wdyt?
- return [
-     SyncAttempt(
-         workspace=self.workspace,
-         connection=self.connection,
-         job_id=self.job_id,
-         attempt_number=i,
-     )
-     for i in range(len(attempts_data))
- ]
+ attempts: list[SyncAttempt] = []
+ for i, attempt in enumerate(attempts_data):
+     # attempt may be a dict with nested "attempt" or the attempt itself
+     cached = attempt if isinstance(attempt, dict) and "attempt" in attempt else (
+         {"attempt": attempt} if isinstance(attempt, dict) else None
+     )
+     number = (
+         attempt.get("attempt", {}).get("attemptNumber", i)
+         if isinstance(attempt, dict) else i
+     )
+     attempts.append(
+         SyncAttempt(
+             workspace=self.workspace,
+             connection=self.connection,
+             job_id=self.job_id,
+             attempt_number=number,
+             _attempt_info=cached,
+         )
+     )
+ return attempts
🪛 GitHub Actions: Run Linters
airbyte/cloud/sync_results.py
[error] 171-171: Command 'poetry run ruff check .' failed: Ruff TD003 Missing issue link on the line following this TODO.
🔇 Additional comments (4)
airbyte/cloud/sync_results.py (4)
106-106: Good fix: timezone import is correct now.
datetime.fromtimestamp(..., tz=timezone.utc) will work reliably.
204-204: LGTM on caching container for job+attempts.
The private field makes sense for lazy job/attempt hydration.
448-451: Export looks good.
Publicly exposing SyncAttempt is consistent with the new API surface.
171-173: Fix Ruff TD003 on TODO (pipeline is failing).
Add an issue/PR reference on the next line (or disable the rule inline) to unblock CI, wdyt?
- # TODO: Additionally check if status is of the completed type.
- # If not, we should get the latest. (Unless all properties are immutable anyway.)
+ # TODO: Refresh cache when attempt status is non-final.  # noqa: TD003
+ # Refs: https://github.com/airbytehq/PyAirbyte/pull/781
⛔ Skipped due to learnings
Learnt from: aaronsteers
PR: airbytehq/PyAirbyte#411
File: airbyte/cli.py:26-26
Timestamp: 2024-10-06T17:43:09.319Z
Learning: In the PyAirbyte project, when reviewing Python code and encountering a TODO comment without an issue link, I should post a friendly reminder to resolve it before merging, instead of suggesting to add an issue link.

Learnt from: aaronsteers
PR: airbytehq/PyAirbyte#411
File: airbyte/cli.py:26-26
Timestamp: 2024-10-11T22:05:15.550Z
Learning: In the PyAirbyte project, when reviewing Python code and encountering a TODO comment without an issue link, I should post a friendly reminder to resolve it before merging, instead of suggesting to add an issue link.
Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
…atus with attempts info

- Add new get_cloud_sync_logs() MCP tool for retrieving sync attempt logs
- Enhance get_cloud_sync_status() to include attempts information with include_attempts parameter
- Both tools leverage existing SyncResult.get_attempts() and SyncAttempt.get_full_log_text() methods
- Follow existing MCP patterns with proper error handling and FastMCP integration
- Register new tool in register_cloud_ops_tools() function

Co-Authored-By: AJ Steers <aj@airbyte.io>
Actionable comments posted: 0
♻️ Duplicate comments (3)
airbyte/cloud/sync_results.py (3)
162-167: Fix timestamp unit handling in created_at (seconds vs. milliseconds).
Config API timestamps are seconds; dividing by 1000 yields incorrect datetimes. Could we guard for both units?
Apply this diff?
- timestamp = self._fetch_attempt_info()["attempt"]["createdAt"]
- return datetime.fromtimestamp(timestamp / 1000, tz=timezone.utc)
+ ts = self._fetch_attempt_info()["attempt"]["createdAt"]
+ # Accept seconds (Config API default) and milliseconds.
+ if isinstance(ts, (int, float)) and ts > 10**12:
+     ts = ts / 1000
+ return datetime.fromtimestamp(ts, tz=timezone.utc)
Wdyt?
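As a quick sanity check of the threshold heuristic, here is a standalone sketch. The helper name `to_utc_datetime` is hypothetical, and the thread does not settle which unit the Config API actually returns, so this only shows that both inputs normalize to the same instant.

```python
from datetime import datetime, timezone

# Epoch seconds will not exceed this value for tens of thousands of years,
# while epoch milliseconds passed it in 2001, so it separates the two units.
MS_THRESHOLD = 10**12


def to_utc_datetime(ts: float) -> datetime:
    """Convert an epoch timestamp in seconds or milliseconds to an aware UTC datetime."""
    if ts > MS_THRESHOLD:
        ts = ts / 1000  # treat large values as milliseconds
    return datetime.fromtimestamp(ts, tz=timezone.utc)


from_seconds = to_utc_datetime(1_700_000_000)
from_millis = to_utc_datetime(1_700_000_000_000)
```

Both calls yield 2023-11-14T22:13:20+00:00. The heuristic would misread millisecond timestamps from before September 2001, which should not matter for sync attempts.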
175-185: Stop calling private _make_config_api_request; add/use a public wrapper.
Accessing a private helper (even with # noqa: SLF001) couples us to internals and was previously flagged in CI. Shall we switch to a small public wrapper?
Apply this diff here:
- self._attempt_info = api_util._make_config_api_request(  # noqa: SLF001  # Config API helper
-     api_root=self.workspace.api_root,
-     path="/v1/attempts/get_for_job",
-     json={
-         "jobId": self.job_id,
-         "attemptNumber": self.attempt_number,
-     },
-     client_id=self.workspace.client_id,
-     client_secret=self.workspace.client_secret,
- )
+ self._attempt_info = api_util.get_attempt_for_job(
+     api_root=self.workspace.api_root,
+     job_id=self.job_id,
+     attempt_number=self.attempt_number,
+     client_id=self.workspace.client_id,
+     client_secret=self.workspace.client_secret,
+ )
Add this wrapper (outside this file):
# airbyte/_util/api_util.py
def get_attempt_for_job(
    *,
    api_root: str,
    job_id: int,
    attempt_number: int,
    client_id: SecretString,
    client_secret: SecretString,
) -> dict[str, Any]:
    return _make_config_api_request(
        api_root=api_root,
        path="/v1/attempts/get_for_job",
        json={"jobId": job_id, "attemptNumber": attempt_number},
        client_id=client_id,
        client_secret=client_secret,
    )
We could also add force_refresh: bool = False to your fetcher if you want a simple bypass while jobs are running, wdyt?
315-330: Likewise, avoid private Config API here; add get_job_with_attempts(...).
Can we switch to a public wrapper to reduce coupling and drop the # noqa?
Apply this diff:
- self._job_with_attempts_info = api_util._make_config_api_request(  # noqa: SLF001  # Config API helper
-     api_root=self.workspace.api_root,
-     path="/v1/jobs/get",
-     json={
-         "id": self.job_id,
-     },
-     client_id=self.workspace.client_id,
-     client_secret=self.workspace.client_secret,
- )
+ self._job_with_attempts_info = api_util.get_job_with_attempts(
+     api_root=self.workspace.api_root,
+     job_id=self.job_id,
+     client_id=self.workspace.client_id,
+     client_secret=self.workspace.client_secret,
+ )
Add this helper (outside this file):
# airbyte/_util/api_util.py
def get_job_with_attempts(
    *,
    api_root: str,
    job_id: int,
    client_id: SecretString,
    client_secret: SecretString,
) -> dict[str, Any]:
    return _make_config_api_request(
        api_root=api_root,
        path="/v1/jobs/get",
        json={"id": job_id},
        client_id=client_id,
        client_secret=client_secret,
    )
Do you also want a force_refresh: bool = False toggle here to bypass the cache while a job is active, wdyt?
🧹 Nitpick comments (2)
airbyte/cloud/sync_results.py (2)
187-221: Optional: format numeric event timestamps as ISO 8601 in logs.
If events[].timestamp is numeric, printing raw values hurts readability. Want to normalize to RFC3339?
Apply this diff?
- for event in log_events:
-     timestamp = event.get("timestamp", "")
+ for event in log_events:
+     ts = event.get("timestamp", "")
+     if isinstance(ts, (int, float)):
+         if ts > 10**12:
+             ts = ts / 1000
+         timestamp = datetime.fromtimestamp(ts, tz=timezone.utc).isoformat()
+     else:
+         timestamp = str(ts)
      level = event.get("level", "INFO")
      message = event.get("message", "")
      log_lines.append(f"[{timestamp}] {level}: {message}")
Wdyt?
331-345: Verify attempt indexing aligns with API ordering.
We index attempts by list position; does the API guarantee list order == attemptNumber? If not, shall we read attempt.attemptNumber when present?
Proposed change:
- return [
-     SyncAttempt(
-         workspace=self.workspace,
-         connection=self.connection,
-         job_id=self.job_id,
-         attempt_number=i,
-     )
-     for i in range(len(attempts_data))
- ]
+ attempts: list[SyncAttempt] = []
+ for i, a in enumerate(attempts_data):
+     num = a.get("attempt", {}).get("attemptNumber", i)
+     attempts.append(
+         SyncAttempt(
+             workspace=self.workspace,
+             connection=self.connection,
+             job_id=self.job_id,
+             attempt_number=num,
+         )
+     )
+ return attempts
Wdyt?
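The fallback in the proposed change can be exercised in isolation. This sketch assumes each entry nests its data under an "attempt" key with an optional "attemptNumber", as the suggested diff does; the real payload shape is not confirmed in this thread, and `resolve_attempt_numbers` is a hypothetical name.

```python
from __future__ import annotations

from typing import Any


def resolve_attempt_numbers(attempts_data: list[dict[str, Any]]) -> list[int]:
    """Prefer the payload's attemptNumber; fall back to the list position."""
    return [
        entry.get("attempt", {}).get("attemptNumber", index)
        for index, entry in enumerate(attempts_data)
    ]


# The payload carries explicit numbers in a different order than the list.
out_of_order = resolve_attempt_numbers(
    [{"attempt": {"attemptNumber": 1}}, {"attempt": {"attemptNumber": 0}}]
)

# No explicit numbers: list positions are used instead.
no_numbers = resolve_attempt_numbers([{"attempt": {}}, {}])
```

Here `out_of_order` is `[1, 0]` and `no_numbers` is `[0, 1]`, so the fallback only changes behavior when the API actually supplies a number.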
🧰 Additional context used
🧬 Code graph analysis (1)
airbyte/cloud/sync_results.py (3)
airbyte/cloud/workspaces.py (1)
CloudWorkspace (57-454)
airbyte/cloud/connections.py (1)
CloudConnection (19-252)
airbyte/_util/api_util.py (1)
_make_config_api_request (807-844)
🪛 GitHub Actions: Run Linters
airbyte/cloud/sync_results.py
[error] 171-171: Ruff TD003: Missing issue link on the line following this TODO. (Command: poetry run ruff check .)
🔇 Additional comments (4)
airbyte/cloud/sync_results.py (4)
106-106: Good fix: import timezone alongside datetime.
This resolves the earlier undefined datetime.timezone usage. Nice catch.
239-239: LGTM: add _job_with_attempts_info cache slot.
This aligns with your lazy-loading pattern.
492-493: Exporting SyncAttempt is good.
Public API looks consistent with the new feature surface.
171-173: Unblock CI: add issue links for TODOs (ruff TD003)
Ruff TD003 is failing: three TODOs in airbyte/cloud/sync_results.py lack issue links (lines 171, 248, 433).
Proposed minimal fix (apply under each TODO):
- # TODO: Additionally check if status is of the completed type.
+ # TODO: Additionally check if status is of the completed type.
+ # See: https://github.com/airbytehq/PyAirbyte/issues/NNNN
Want me to open the issues and drop the links for you, or will you resolve/add links yourself? wdyt?
Actionable comments posted: 1
🧹 Nitpick comments (4)
airbyte/mcp/_cloud_ops.py (4)
4-4: Prefer a typed response over Any for tool I/O
Would you consider defining TypedDicts (e.g., SyncStatus and AttemptInfo) instead of returning dict[str, Any], so MCP tool consumers get a stable, documented shape, wdyt?
321-323: Stabilize response shape when no sync found
Returning only three keys can break consumers expecting the full shape. Can we return all fields with None defaults for consistency, wdyt?
Apply this diff:
- return {"status": None, "job_id": None, "attempts": []}
+ return {
+     "status": None,
+     "job_id": None,
+     "bytes_synced": None,
+     "records_synced": None,
+     "start_time": None,
+     "job_url": None,
+     "attempts": [],
+ }
383-444: Fix linter failure TRY300: move final return to else
Ruff flags the return logs inside try. Shall we move the terminal return to a try/except/else to satisfy TRY300, wdyt?
Apply this diff:
  def get_cloud_sync_logs(
@@
-         logs = target_attempt.get_full_log_text()
-
-         if not logs:
-             return (
-                 f"No logs available for job '{sync_result.job_id}', "
-                 f"attempt {target_attempt.attempt_number}"
-             )
-
-         return logs
-
-     except Exception as ex:
-         return f"Failed to get logs for connection '{connection_id}': {ex}"
+         logs = target_attempt.get_full_log_text()
+     except Exception as ex:
+         return f"Failed to get logs for connection '{connection_id}': {ex}"
+     else:
+         if not logs:
+             return (
+                 f"No logs available for job '{sync_result.job_id}', "
+                 f"attempt {target_attempt.attempt_number}"
+             )
+         return logs
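For readers unfamiliar with TRY300, the shape Ruff prefers can be shown in a few lines. `get_logs_or_error` and its messages are hypothetical; only the try/except/else structure mirrors the suggestion above.

```python
from typing import Callable


def get_logs_or_error(fetch_logs: Callable[[], str], connection_id: str) -> str:
    """Hypothetical helper mirroring the try/except/else shape suggested above."""
    try:
        # Only the call that can raise lives inside the try body.
        logs = fetch_logs()
    except Exception as ex:
        return f"Failed to get logs for connection '{connection_id}': {ex}"
    else:
        # Runs only when fetch_logs() did not raise.
        if not logs:
            return f"No logs available for connection '{connection_id}'"
        return logs


def failing_fetch() -> str:
    """Simulate an API failure."""
    raise RuntimeError("boom")


ok_result = get_logs_or_error(lambda: "line 1\nline 2", "conn-1")
empty_result = get_logs_or_error(lambda: "", "conn-1")
error_result = get_logs_or_error(failing_fetch, "conn-1")
```

Keeping the success path in `else` means an unexpected error raised while post-processing the logs is not silently turned into the "Failed to get logs" string.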
427-429: Improve error message with available attempts
Would including the available attempt numbers help debuggability, wdyt?
Apply this diff:
- if target_attempt is None:
-     return f"Attempt number {attempt_number} not found for job '{sync_result.job_id}'"
+ if target_attempt is None:
+     available = sorted(a.attempt_number for a in attempts)
+     return (
+         f"Attempt number {attempt_number} not found for job '{sync_result.job_id}'. "
+         f"Available attempts: {available}"
+     )
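A self-contained sketch of the selection logic with the richer message follows. `select_attempt` is a hypothetical helper, and defaulting to the highest attempt number when none is requested is an assumption made for this example, not confirmed behavior of the tool.

```python
from __future__ import annotations


def select_attempt(
    attempt_numbers: list[int],
    requested: int | None,
    job_id: int,
) -> tuple[int | None, str | None]:
    """Return (attempt_number, None) on success or (None, error_message)."""
    if not attempt_numbers:
        return None, f"No attempts found for job '{job_id}'"
    if requested is None:
        # Assumption for this sketch: default to the latest attempt.
        return max(attempt_numbers), None
    if requested in attempt_numbers:
        return requested, None
    return None, (
        f"Attempt number {requested} not found for job '{job_id}'. "
        f"Available attempts: {sorted(attempt_numbers)}"
    )


latest = select_attempt([0, 1, 2], None, 99)
exact = select_attempt([0, 1, 2], 1, 99)
missing = select_attempt([2, 0, 1], 5, 99)
```

Listing the available numbers lets an MCP client retry with a valid `attempt_number` without a second status call.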
📒 Files selected for processing (1)
airbyte/mcp/_cloud_ops.py (4 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
airbyte/mcp/_cloud_ops.py (4)
airbyte/cloud/workspaces.py (2)
CloudWorkspace (57-454)
get_connection (98-110)
airbyte/_util/api_util.py (1)
get_connection (298-325)
airbyte/cloud/sync_results.py (13)
SyncResult (224-487)
get_job_status (282-284)
bytes_synced (153-155)
bytes_synced (300-302)
records_synced (158-160)
records_synced (305-307)
start_time (310-313)
job_url (242-251)
get_attempts (331-344)
attempt_id (143-145)
status (148-150)
created_at (163-166)
get_full_log_text (187-220)
airbyte/cloud/connections.py (1)
get_sync_result (205-230)
🪛 GitHub Actions: Run Linters
airbyte/mcp/_cloud_ops.py
[error] 440-440: poetry run ruff check: TRY300 Consider moving this statement to an else block.
🔇 Additional comments (3)
airbyte/mcp/_cloud_ops.py (3)
302-307: API/behavior change: new parameter and return-shape shift
This introduces include_attempts and returns a structured dict (not the previous enum/primitive). Do you want to (a) keep the prior function as a thin wrapper for backward compat or (b) rename this to a new tool name to avoid breaking existing callers, wdyt?
324-331: Ensure JSON-serializable fields
If get_job_status() returns an enum, some clients may prefer a string. Do you want to serialize via str(status) or .value to avoid enum leakage, wdyt?
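The two options are not equivalent for a plain Enum, which this short sketch illustrates. `FakeJobStatus` is a made-up enum standing in for the real status type, whose base class is not shown in this thread; if the real enum mixes in `str`, the results would differ.

```python
from __future__ import annotations

import json
from enum import Enum


class FakeJobStatus(Enum):
    """Hypothetical plain Enum standing in for the real job status type."""

    SUCCEEDED = "succeeded"


def serialize_status(status: Enum | None) -> str | None:
    """Return the enum's underlying value, or None when there is no status."""
    return status.value if status is not None else None


# A plain Enum member is rejected by the default JSON encoder.
try:
    json.dumps({"status": FakeJobStatus.SUCCEEDED})
    raw_enum_serializes = True
except TypeError:
    raw_enum_serializes = False

# str() on a plain Enum includes the class name; .value does not.
as_str = str(FakeJobStatus.SUCCEEDED)
payload = json.dumps({"status": serialize_status(FakeJobStatus.SUCCEEDED)})
```

For a plain Enum, `.value` yields the wire-friendly string while `str()` yields `FakeJobStatus.SUCCEEDED`, so `.value` looks like the safer choice for tool output.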
458-468: LGTM on tool registration
Registering get_cloud_sync_logs alongside status looks good and cohesive.
… statement

- Fix missing return statement in get_cloud_sync_status() when include_attempts=True
- Address CodeRabbit comments about function return flow
- Maintain backward compatibility with include_attempts=False default

Co-Authored-By: AJ Steers <aj@airbyte.io>
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
airbyte/mcp/_cloud_ops.py (1)
290-307: Potential breaking change: get_cloud_sync_status return type altered
Changing the public tool from JobStatusEnum | None to dict[str, Any] can break existing callers. Could we either: (a) add a new tool (e.g., get_cloud_sync_overview) and keep the old get_cloud_sync_status behavior, or (b) keep get_cloud_sync_status and add a separate wrapper returning only the enum for backward compatibility, at least for one release? Also, would a TypedDict/Pydantic model help keep the response shape stable? wdyt?
♻️ Duplicate comments (1)
airbyte/mcp/_cloud_ops.py (1)
333-349: Follow-up: missing return on include_attempts=True is fixed
The unconditional return result resolves the earlier fallthrough. Thanks for addressing it. wdyt about also making the response shape consistent (see next comment)?
🧹 Nitpick comments (4)
airbyte/mcp/_cloud_ops.py (4)
321-323: Stabilize response shape for “no job” and error cases
The keys differ across success, “no job”, and exception cases. Shall we return a consistent schema with null/zero defaults to simplify clients? wdyt?
- return {"status": None, "job_id": None, "attempts": []}
+ return {
+     "status": None,
+     "job_id": None,
+     "bytes_synced": 0,
+     "records_synced": 0,
+     "start_time": None,
+     "job_url": None,
+     "attempts": [],
+ }
@@
- return {
-     "status": None,
-     "job_id": job_id,
-     "error": f"Failed to get sync status for connection '{connection_id}': {ex}",
-     "attempts": [],
- }
+ return {
+     "status": None,
+     "job_id": job_id,
+     "bytes_synced": 0,
+     "records_synced": 0,
+     "start_time": None,
+     "job_url": None,
+     "error": f"Failed to get sync status for connection '{connection_id}': {ex}",
+     "attempts": [],
+ }
Also applies to: 351-357
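One way to keep the three return paths from drifting apart is to build every response from a single set of defaults. The sketch below is illustrative only: `build_status_payload` and `STATUS_KEYS` are hypothetical names, and it uses None defaults throughout, whereas the diff above proposes 0 for the counters.

```python
from __future__ import annotations

from typing import Any

# Hypothetical canonical key set, taken from the fields named in the review.
STATUS_KEYS = (
    "status",
    "job_id",
    "bytes_synced",
    "records_synced",
    "start_time",
    "job_url",
    "attempts",
    "error",
)


def build_status_payload(**fields: Any) -> dict[str, Any]:
    """Return a payload that always contains every key in STATUS_KEYS."""
    unknown = set(fields) - set(STATUS_KEYS)
    if unknown:
        raise KeyError(f"Unexpected status fields: {sorted(unknown)}")
    payload: dict[str, Any] = {key: None for key in STATUS_KEYS}
    payload["attempts"] = []  # fresh list per call, never shared between payloads
    payload.update(fields)
    return payload


no_job = build_status_payload()
failed = build_status_payload(job_id=123, error="Failed to get sync status")
succeeded = build_status_payload(
    status="succeeded", job_id=123, bytes_synced=2048, records_synced=10
)
```

All three payloads expose the same keys, so a client can read `payload["error"]` or `payload["attempts"]` without first checking which branch produced the response.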
337-343: Defensive None-handling for attempt timestamps and countersIf
created_atis missing,.isoformat()will raise. Shall we guard and also default counts to 0? wdyt?- "bytes_synced": attempt.bytes_synced, - "records_synced": attempt.records_synced, - "created_at": attempt.created_at.isoformat(), + "bytes_synced": (attempt.bytes_synced or 0), + "records_synced": (attempt.records_synced or 0), + "created_at": attempt.created_at.isoformat() if getattr(attempt, "created_at", None) else None,
468-468: Tool registration: consider gating new APIs while WIP
Since this PR is draft/“do not merge,” do we want to gate `get_cloud_sync_logs` (and the structured `get_cloud_sync_status` if kept) behind a feature flag or keep them unregistered until SyncAttempt lands fully? wdyt?
290-351: Optional: define a TypedDict/Pydantic model for the status payload
Would you like to introduce a `TypedDict` (or Pydantic model) for the response to lock schema and aid downstream typing? wdyt?
Example (outside this hunk):

    from typing import TypedDict


    class CloudSyncStatus(TypedDict, total=False):
        status: str | None
        job_id: int | None
        bytes_synced: int
        records_synced: int
        start_time: str | None
        job_url: str | None
        attempts: list[dict]
        error: str | None
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
airbyte/mcp/_cloud_ops.py (4 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
airbyte/mcp/_cloud_ops.py (3)
airbyte/cloud/workspaces.py (1)
  get_connection (98-110)
airbyte/cloud/sync_results.py (9)
  SyncResult (224-487), bytes_synced (153-155), bytes_synced (300-302), records_synced (158-160), records_synced (305-307), start_time (310-313), job_url (242-251), get_attempts (331-344), get_full_log_text (187-220)
airbyte/cloud/connections.py (1)
  get_sync_result (205-230)
🪛 GitHub Actions: Run Linters
airbyte/mcp/_cloud_ops.py
[error] 349-349: TRY300 Consider moving this statement to an 'else' block. (Command: poetry run ruff check .)
[error] 441-441: TRY300 Consider moving this statement to an 'else' block. (Command: poetry run ruff check .)
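For context on the two TRY300 findings: the rule asks that the success-path `return` live in an `else` block rather than inside the `try` body, so that only the call that can fail is guarded. A minimal sketch of that layout follows; `fetch_status` is a hypothetical stand-in for the real lookup, not the actual MCP tool code.

```python
from typing import Any, Callable


def get_status_payload(fetch_status: Callable[[], dict[str, Any]]) -> dict[str, Any]:
    """Illustrate the try/except/else layout that ruff's TRY300 rule suggests."""
    try:
        # Only the operation that can fail sits inside the try body.
        result = fetch_status()
    except Exception as ex:
        return {"status": None, "error": f"Failed to get sync status: {ex}"}
    else:
        # The success-path return is outside the guarded region.
        return result
```

The behavior is unchanged compared with returning inside `try`; the layout just makes it explicit which statement the `except` clause is meant to cover.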
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
- GitHub Check: Pytest (All, Python 3.11, Windows)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Windows)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (No Creds)
- GitHub Check: Pytest (Fast)
🔇 Additional comments (1)
airbyte/mcp/_cloud_ops.py (1)
4-4: Import update looks good
Adding `Any` is appropriate for the structured payload return.
- Remove TODO comment that was causing TD003 lint error
- Fix whitespace issues in blank lines
- MyPy Check now passes (CodeRabbit feedback addressed)
- 2 TRY300 warnings remain but may be acceptable in codebase

Co-Authored-By: AJ Steers <aj@airbyte.io>
…ng-mcp' into devin/1757543890-sync-attempt-abstraction
…ab_datetime_parse
- Replace datetime.fromtimestamp() with ab_datetime_parse() in SyncAttempt.created_at
- Replace datetime.fromisoformat() with ab_datetime_parse() in SyncResult.start_time
- Use ab_datetime_parse from airbyte_cdk.utils.datetime_helpers for robust ISO datetime parsing
- Add usage examples to sample script with connection ID that has sync data
- Fixes 'Invalid isoformat string: 2025-03-25T17:06:19Z' error by using standard datetime utility

Co-Authored-By: AJ Steers <aj@airbyte.io>
Actionable comments posted: 0
♻️ Duplicate comments (4)
airbyte/_util/meta.py (1)
85-99: Order MCP/CI checks before notebook checks in `is_interactive()`.
As noted earlier, MCP inside notebooks should still be non-interactive. Shall we reorder to short-circuit MCP/CI first, wdyt?

    - if is_colab() or is_jupyter():
    -     return True
    -
    - if is_ci() or is_mcp_mode():
    -     return False
    + if is_ci() or is_mcp_mode():
    +     return False
    +
    + if is_colab() or is_jupyter():
    +     return True

airbyte/cloud/sync_results.py (2)
172-188: Stop calling private `_make_config_api_request` (ruff SLF001): add public wrapper.
This line still triggers CI. Shall we switch to a public helper and remove the inline noqa, wdyt?

    - self._attempt_info = api_util._make_config_api_request(  # noqa: SLF001  # Config API helper
    -     api_root=self.workspace.api_root,
    -     path="/v1/attempts/get_for_job",
    -     json={
    -         "jobId": self.job_id,
    -         "attemptNumber": self.attempt_number,
    -     },
    -     client_id=self.workspace.client_id,
    -     client_secret=self.workspace.client_secret,
    - )
    + self._attempt_info = api_util.get_attempt_for_job(
    +     api_root=self.workspace.api_root,
    +     job_id=self.job_id,
    +     attempt_number=self.attempt_number,
    +     client_id=self.workspace.client_id,
    +     client_secret=self.workspace.client_secret,
    + )

And similarly below for jobs (see next comment) plus add the two tiny wrappers in `airbyte/_util/api_util.py` (shown after). Wdyt?
317-332: Same: replace private Config API call for job+attempts with public wrapper.

    - self._job_with_attempts_info = api_util._make_config_api_request(  # noqa: SLF001  # Config API helper
    -     api_root=self.workspace.api_root,
    -     path="/v1/jobs/get",
    -     json={
    -         "id": self.job_id,
    -     },
    -     client_id=self.workspace.client_id,
    -     client_secret=self.workspace.client_secret,
    - )
    + self._job_with_attempts_info = api_util.get_job_with_attempts(
    +     api_root=self.workspace.api_root,
    +     job_id=self.job_id,
    +     client_id=self.workspace.client_id,
    +     client_secret=self.workspace.client_secret,
    + )

airbyte/mcp/_util.py (1)
40-47: Duplicate `.envrc` load and wrong precedence: remove this block.
Loading `.envrc` before an explicit dotenv (`AIRBYTE_MCP_ENV_FILE`) means the explicit file cannot override (python-dotenv’s default `override=False`). Also, this block is duplicated below. Can we drop this early block to ensure explicit config wins, wdyt?

    - envrc_path = Path.cwd() / ".envrc"
    - if envrc_path.exists():
    -     envrc_secret_mgr = DotenvSecretManager(envrc_path)
    -     _load_dotenv_file(envrc_path)
    -     register_secret_manager(
    -         envrc_secret_mgr,
    -     )
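To make the precedence problem concrete, here is a small stand-in for "load values without overriding". It does not call python-dotenv; `load_values` is a hypothetical function that only mimics the documented `override=False` behavior, and `EXAMPLE_CLIENT_ID` is an invented variable name.

```python
def load_values(
    values: dict[str, str],
    env: dict[str, str],
    *,
    override: bool = False,
) -> None:
    """Copy values into env; existing keys win unless override=True.

    Hypothetical stand-in for a dotenv loader, not python-dotenv itself.
    """
    for key, value in values.items():
        if override or key not in env:
            env[key] = value


env: dict[str, str] = {}
# The implicit .envrc is loaded first...
load_values({"EXAMPLE_CLIENT_ID": "from-envrc"}, env)
# ...so the explicit file, loaded second without override, cannot replace it.
load_values({"EXAMPLE_CLIENT_ID": "from-explicit-file"}, env)
winner_default = env["EXAMPLE_CLIENT_ID"]

# Passing override=True for the explicit file restores the expected precedence.
load_values({"EXAMPLE_CLIENT_ID": "from-explicit-file"}, env, override=True)
winner_override = env["EXAMPLE_CLIENT_ID"]
```

Under these assumptions the first-loaded value wins by default, which is why dropping the early `.envrc` block (or loading the explicit file first) changes which configuration takes effect.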
🧹 Nitpick comments (4)
airbyte/_util/meta.py (1)
49-60: Fix ruff SIM103 and simplify `is_mcp_mode()` return.
Current tail `return False` triggers SIM103. Could we return the condition directly and keep short-circuit semantics, wdyt?

    -def is_mcp_mode() -> bool:
    -    """Return True if running in MCP (Model Context Protocol) mode."""
    -    if _MCP_MODE_ENABLED:
    -        return True
    -
    -    script_name = get_python_script_name()
    -    if script_name and "airbyte-mcp" in script_name:
    -        return True
    -
    -    # Else, we are not running in an MCP context.
    -    return False
    +def is_mcp_mode() -> bool:
    +    """Return True if running in MCP (Model Context Protocol) mode."""
    +    script_name = get_python_script_name()
    +    return bool(_MCP_MODE_ENABLED or (script_name and "airbyte-mcp" in script_name))

airbyte/mcp/_util.py (1)
56-63: Register `.envrc` as a fallback, and optionally gate behind opt-in.
To avoid surprising overrides and align with principle “explicit beats implicit”, could we make `.envrc` opt-in and append it as backup, wdyt?

    - envrc_path = Path.cwd() / ".envrc"
    - if envrc_path.exists():
    -     envrc_secret_mgr = DotenvSecretManager(envrc_path)
    -     _load_dotenv_file(envrc_path)
    -     register_secret_manager(
    -         envrc_secret_mgr,
    -     )
    + envrc_path = Path.cwd() / ".envrc"
    + if os.environ.get("AIRBYTE_MCP_LOAD_ENVRC") == "1" and envrc_path.exists():
    +     envrc_secret_mgr = DotenvSecretManager(envrc_path)
    +     # Keep dotenv precedence lower than explicit file:
    +     _load_dotenv_file(envrc_path)
    +     register_secret_manager(envrc_secret_mgr, as_backup=True)

Optionally, if we want explicit precedence even when env vars were set by `.envrc`, we could pass `override=True` when loading the explicit dotenv (or load it first). Prefer avoiding overrides by relying on SecretManagers rather than mutating `os.environ`. Wdyt?

airbyte/cloud/sync_results.py (2)
106-110: Fix lints: move `datetime` import to TYPE_CHECKING and drop unused `timezone`.
Ruff errors:
- TC003: `datetime.datetime` only used for typing; move under TYPE_CHECKING.
- F401: `timezone` is unused.

Proposed fix:

    -from datetime import datetime, timezone
    -from typing import TYPE_CHECKING, Any
    +from typing import TYPE_CHECKING, Any

And add inside the existing TYPE_CHECKING block:

     if TYPE_CHECKING:
    +    from datetime import datetime
         import sqlalchemy
189-223: Log reconstruction is reasonable.
Covers both `events` and `logLines` variants with graceful empty handling; looks good. If we later see very large logs, do we want a `max_lines`/`since` option?
📒 Files selected for processing (4)
airbyte/_util/meta.py (3 hunks)
airbyte/cloud/sync_results.py (5 hunks)
airbyte/mcp/_util.py (4 hunks)
bin/test_mcp_tool.py (1 hunks)
✅ Files skipped from review due to trivial changes (1)
- bin/test_mcp_tool.py
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2024-10-09T21:11:11.706Z
Learnt from: aaronsteers
PR: airbytehq/PyAirbyte#415
File: airbyte/cli.py:0-0
Timestamp: 2024-10-09T21:11:11.706Z
Learning: In `_resolve_source_job`, returning `None` when `config` is falsy distinguishes between empty config and not-yet-provided config, allowing `_resolve_config()` to handle non-null inputs effectively.
Applied to files:
airbyte/mcp/_util.py
🧬 Code graph analysis (2)
airbyte/mcp/_util.py (4)
airbyte/_util/meta.py (1)
  is_interactive (85-99)
airbyte/secrets/env_vars.py (1)
  DotenvSecretManager (31-64)
airbyte/secrets/base.py (1)
  SecretSourceEnum (23-31)
airbyte/secrets/config.py (2)
  register_secret_manager (45-60), disable_secret_source (68-81)
airbyte/cloud/sync_results.py (2)
airbyte/cloud/workspaces.py (1)
  CloudWorkspace (57-454)
airbyte/_util/api_util.py (1)
  _make_config_api_request (807-844)
🪛 GitHub Actions: Run Linters
airbyte/cloud/sync_results.py
[error] 106-106: TC003 Move standard library import datetime.datetime into a type-checking block
[error] 106-106: F401 'datetime.timezone' imported but unused
airbyte/_util/meta.py
[error] 55-55: SIM103 Return the condition directly
🔇 Additional comments (9)
airbyte/_util/meta.py (3)
20-22: MCP mode flag looks good.
Simple, explicit module-level toggle; no concerns.
34-36: Caching `is_ci()` is fine.
Env var seldom changes at runtime; caching is appropriate.
39-47: API to enable MCP mode looks solid.
Clear docs and minimal surface; nice.
airbyte/mcp/_util.py (2)
12-19: Imports and new guards look fine.
The non-interactive guard via `is_interactive()` + `disable_secret_source` is a good call.
73-75: Non-interactive prompt disablement: LGTM.
This will prevent unexpected input prompts in MCP/CI contexts.
airbyte/cloud/sync_results.py (4)
333-346: Attempts enumeration approach looks good.
Indexing 0..N-1 from the job’s `attempts` list matches the API’s attempt numbering.
Could we confirm `attempts_data` is always a list (not `None`) across Cloud/OSS variants, and that attempt numbering aligns with the index, wdyt?
166-171: Datetime parsing via `ab_datetime_parse`: nice and robust.
Switching away from manual ms/s handling removes a class of bugs. LGTM.
301-316: Start time parsing via `ab_datetime_parse`: LGTM.
Consistent with other datetime handling.
492-495: Public export of `SyncAttempt`: LGTM.
Good to surface the new type.
- Replace datetime.fromtimestamp() and datetime.fromisoformat() with ab_datetime_parse()
- Handle ISO datetime strings with 'Z' suffix properly using standard PyAirbyte utility
- Add error handling for airbyte_api library datetime parsing failures in SyncResult.start_time
- Update sample script with valid connection ID from current workspace
- Fixes 'Invalid isoformat string: 2025-03-25T17:06:19Z' error

Resolves datetime parsing issues in the SyncAttempt abstraction where the created_at property and SyncResult.start_time failed to parse ISO format datetime strings returned by the Airbyte Cloud API.

Co-Authored-By: AJ Steers <aj@airbyte.io>
Actionable comments posted: 2
♻️ Duplicate comments (1)
airbyte/cloud/sync_results.py (1)
172-188: Stop calling private `_make_config_api_request`; add/use public wrappers to satisfy ruff SLF001 and reduce coupling.
CI is failing on SLF001. Can we switch to public helpers as suggested earlier and used elsewhere, wdyt?
Apply in this file:

     def _fetch_attempt_info(self) -> dict[str, Any]:
    @@
    -    self._attempt_info = api_util._make_config_api_request(  # noqa: SLF001  # Config API helper
    -        api_root=self.workspace.api_root,
    -        path="/v1/attempts/get_for_job",
    -        json={
    -            "jobId": self.job_id,
    -            "attemptNumber": self.attempt_number,
    -        },
    -        client_id=self.workspace.client_id,
    -        client_secret=self.workspace.client_secret,
    -    )
    +    self._attempt_info = api_util.get_attempt_for_job(
    +        api_root=self.workspace.api_root,
    +        job_id=self.job_id,
    +        attempt_number=self.attempt_number,
    +        client_id=self.workspace.client_id,
    +        client_secret=self.workspace.client_secret,
    +    )
         return self._attempt_info

     def start_time(self) -> datetime:
         """Return the start time of the sync job in UTC."""
         try:
             return ab_datetime_parse(self._fetch_latest_job_info().start_time)
    -    except (ValueError, TypeError) as e:
    -        if "Invalid isoformat string" in str(e):
    -            job_info_raw = api_util._make_config_api_request(
    -                api_root=self.workspace.api_root,
    -                path="/v1/jobs/get",
    -                json={"id": self.job_id},
    -                client_id=self.workspace.client_id,
    -                client_secret=self.workspace.client_secret,
    -            )
    -            raw_start_time = job_info_raw.get("startTime")
    -            if raw_start_time:
    -                return ab_datetime_parse(raw_start_time)
    -        raise
    +    except (ValueError, TypeError):
    +        job_info_raw = api_util.get_job_with_attempts(
    +            api_root=self.workspace.api_root,
    +            job_id=self.job_id,
    +            client_id=self.workspace.client_id,
    +            client_secret=self.workspace.client_secret,
    +        )
    +        raw_start_time = job_info_raw.get("startTime")
    +        if raw_start_time:
    +            return ab_datetime_parse(raw_start_time)
    +        raise

     def _fetch_job_with_attempts(self) -> dict[str, Any]:
    @@
    -    self._job_with_attempts_info = api_util._make_config_api_request(  # noqa: SLF001  # Config API helper
    -        api_root=self.workspace.api_root,
    -        path="/v1/jobs/get",
    -        json={
    -            "id": self.job_id,
    -        },
    -        client_id=self.workspace.client_id,
    -        client_secret=self.workspace.client_secret,
    -    )
    +    self._job_with_attempts_info = api_util.get_job_with_attempts(
    +        api_root=self.workspace.api_root,
    +        job_id=self.job_id,
    +        client_id=self.workspace.client_id,
    +        client_secret=self.workspace.client_secret,
    +    )
         return self._job_with_attempts_info

Add these wrappers (in airbyte/_util/api_util.py):

    def get_attempt_for_job(
        *,
        api_root: str,
        job_id: int,
        attempt_number: int,
        client_id: SecretString,
        client_secret: SecretString,
    ) -> dict[str, Any]:
        """Public wrapper: fetch a single attempt of a job from the Config API."""
        return _make_config_api_request(
            api_root=api_root,
            path="/v1/attempts/get_for_job",
            json={"jobId": job_id, "attemptNumber": attempt_number},
            client_id=client_id,
            client_secret=client_secret,
        )


    def get_job_with_attempts(
        *,
        api_root: str,
        job_id: int,
        client_id: SecretString,
        client_secret: SecretString,
    ) -> dict[str, Any]:
        """Public wrapper: fetch a job, including its attempts, from the Config API."""
        return _make_config_api_request(
            api_root=api_root,
            path="/v1/jobs/get",
            json={"id": job_id},
            client_id=client_id,
            client_secret=client_secret,
        )

Would you also like a `force_refresh: bool = False` param on `_fetch_job_with_attempts()` to bypass the cache while a job is running, wdyt?
Also applies to: 314-329, 330-345
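The `force_refresh` idea can be sketched independently of the Airbyte classes. In the snippet below, `CachedJobInfo` and its `fetch` callable are hypothetical names used only to show the lazy-load-with-bypass pattern; the real change would live inside `SyncResult`.

```python
from typing import Any, Callable, Optional


class CachedJobInfo:
    """Lazy-loading cache with an opt-in bypass (illustrative only)."""

    def __init__(self, fetch: Callable[[], dict[str, Any]]) -> None:
        self._fetch = fetch
        self._cached: Optional[dict[str, Any]] = None

    def get(self, *, force_refresh: bool = False) -> dict[str, Any]:
        # Fetch on first use, or whenever the caller asks for fresh data,
        # e.g. while a job is still running and attempts may be added.
        if self._cached is None or force_refresh:
            self._cached = self._fetch()
        return self._cached


calls = {"count": 0}


def fake_fetch() -> dict[str, Any]:
    """Pretend API call that reports how many times it has been invoked."""
    calls["count"] += 1
    return {"attempts": list(range(calls["count"]))}


info = CachedJobInfo(fake_fetch)
first = info.get()
second = info.get()  # served from the cache, no new call
third = info.get(force_refresh=True)  # bypasses the cache
```

Keeping `force_refresh` keyword-only, as in the review suggestion, avoids accidental positional use and leaves the default behavior unchanged for existing callers.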
🧹 Nitpick comments (1)
airbyte/cloud/sync_results.py (1)
189-223: Optional: normalize and order log events for readability.
Shall we sort events by timestamp and normalize timestamps to ISO using `ab_datetime_parse` so output is stable and consistent, wdyt?

    - log_lines = []
    - for event in log_events:
    -     timestamp = event.get("timestamp", "")
    -     level = event.get("level", "INFO")
    -     message = event.get("message", "")
    -     log_lines.append(f"[{timestamp}] {level}: {message}")
    + log_lines = []
    + for event in sorted(log_events, key=lambda e: e.get("timestamp", 0)):
    +     ts_raw = event.get("timestamp", "")
    +     ts_str = ab_datetime_parse(ts_raw).isoformat() if ts_raw else ""
    +     level = event.get("level", "INFO")
    +     message = event.get("message", "")
    +     log_lines.append(f"[{ts_str}] {level}: {message}")
📒 Files selected for processing (2)
airbyte/cloud/sync_results.py (5 hunks)
bin/test_mcp_tool.py (1 hunks)
✅ Files skipped from review due to trivial changes (1)
- bin/test_mcp_tool.py
🧰 Additional context used
🧬 Code graph analysis (1)
airbyte/cloud/sync_results.py (3)
airbyte/cloud/workspaces.py (1)
  CloudWorkspace (57-454)
airbyte/cloud/connections.py (1)
  CloudConnection (19-252)
airbyte/_util/api_util.py (1)
  _make_config_api_request (807-844)
🪛 GitHub Actions: Run Linters
airbyte/cloud/sync_results.py
[error] 106-106: TC003: Move standard library import 'datetime.datetime' into a type-checking block.
[error] 106-106: F401: 'datetime.timezone' imported but unused.
[error] 318-318: SLF001: Private member accessed: '_make_config_api_request'
🔇 Additional comments (4)
airbyte/cloud/sync_results.py (4)
132-165: SyncAttempt dataclass and accessors look good.
The fields, lazy fetch, and simple derived properties are clean and aligned with existing patterns. LGTM, wdyt?
167-171: Confirm `ab_datetime_parse` input types for `createdAt`.
Config API often returns epoch seconds; sometimes strings. Does `ab_datetime_parse` accept both (int/str) here, or should we coerce with `str(ts)` first to be safe, wdyt?
346-359: Attempt indexing: confirm 0..n-1 aligns with API semantics.
This returns attempt numbers via `range(len(attempts_data))`, which assumes 0-based contiguous numbering. Can we confirm the Config API uses 0-based `attemptNumber` consistently (and no gaps), wdyt?
506-508: Export surface LGTM.
Including `SyncAttempt` in `__all__` makes sense given the new API. 👍
- Remove /v1 prefix from path parameters in _make_config_api_request calls
- Fix SyncAttempt._fetch_attempt_info to use /attempts/get_for_job instead of /v1/attempts/get_for_job
- Fix SyncResult._fetch_job_with_attempts to use /jobs/get instead of /v1/jobs/get
- Fix SyncResult.start_time fallback to use /jobs/get instead of /v1/jobs/get
- Resolves 403 Forbidden errors caused by malformed URLs like https://cloud.airbyte.com/api/v1/v1/jobs/get

Co-Authored-By: AJ Steers <aj@airbyte.io>
…ude full URL path
- Add explicit error message for 403 Forbidden errors with full URL
- Include detailed context: full_url, config_api_root, path, status_code
- Improves debugging of API endpoint construction issues
- Tested with MCP tools to verify enhanced error output

Co-Authored-By: AJ Steers <aj@airbyte.io>
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
airbyte/cloud/sync_results.py (1)
123-130: Add typing-only import of `datetime` under TYPE_CHECKING.
This complements the top-level removal and satisfies ruff’s TC003. Wdyt?

     if TYPE_CHECKING:
         import sqlalchemy

         from airbyte._util.api_imports import ConnectionResponse, JobResponse, JobStatusEnum
         from airbyte.caches.base import CacheBase
         from airbyte.cloud.connections import CloudConnection
         from airbyte.cloud.workspaces import CloudWorkspace
    +    from datetime import datetime
♻️ Duplicate comments (3)
airbyte/cloud/sync_results.py (3)
346-359: LGTM on attempt numbering; consider `force_refresh` for running jobs.
Enumerating `range(len(attempts_data))` returns `[0..n-1]` and matches the earlier request to expose attempts by index. Would you add a `force_refresh: bool = False` to refresh attempts while a job is still running, wdyt?

    - def _fetch_job_with_attempts(self) -> dict[str, Any]:
    + def _fetch_job_with_attempts(self, *, force_refresh: bool = False) -> dict[str, Any]:
          """Fetch job info with attempts from Config API using lazy loading pattern."""
    -     if self._job_with_attempts_info is not None:
    +     if self._job_with_attempts_info is not None and not force_refresh:
              return self._job_with_attempts_info
    @@
    - def get_attempts(self) -> list[SyncAttempt]:
    + def get_attempts(self, *, force_refresh: bool = False) -> list[SyncAttempt]:
          """Return a list of attempts for this sync job."""
    -     job_with_attempts = self._fetch_job_with_attempts()
    +     job_with_attempts = self._fetch_job_with_attempts(force_refresh=force_refresh)
330-345: Same issue here: switch to public wrapper and ensure `/v1` path.
Let’s use the `get_job_with_attempts` helper to resolve SLF001 and path concerns, wdyt?

    - self._job_with_attempts_info = api_util._make_config_api_request(  # noqa: SLF001  # Config API helper
    -     api_root=self.workspace.api_root,
    -     path="/jobs/get",
    -     json={
    -         "id": self.job_id,
    -     },
    -     client_id=self.workspace.client_id,
    -     client_secret=self.workspace.client_secret,
    - )
    + self._job_with_attempts_info = api_util.get_job_with_attempts(
    +     api_root=self.workspace.api_root,
    +     job_id=self.job_id,
    +     client_id=self.workspace.client_id,
    +     client_secret=self.workspace.client_secret,
    + )
106-111: Fix lint: move `datetime` to TYPE_CHECKING and drop unused `timezone` (TC003, F401).
`datetime` is only used in annotations (PEP 563/PEP 649 friendly), and `timezone` isn’t used. Shall we move `datetime` under TYPE_CHECKING and remove `timezone` so CI goes green, wdyt?

    -from datetime import datetime, timezone
    -from typing import TYPE_CHECKING, Any
    +from typing import TYPE_CHECKING, Any
🧹 Nitpick comments (1)
airbyte/cloud/sync_results.py (1)
189-223: Verify logs payload shape; optionally normalize formatting.
Does `/v1/attempts/get_for_job` actually include `logs` with either `events` or `logLines` inline, or is a separate logs endpoint required? If inline is not guaranteed, `get_full_log_text()` may silently return empty. Shall we (a) confirm payload and (b) optionally normalize timestamps with `ab_datetime_parse` when present, wdyt?
Example optional tweak to format event timestamps:

    - timestamp = event.get("timestamp", "")
    + ts_raw = event.get("timestamp", "")
    + try:
    +     timestamp = str(ab_datetime_parse(ts_raw)) if ts_raw else ""
    + except Exception:
    +     timestamp = str(ts_raw)
📒 Files selected for processing (1)
airbyte/cloud/sync_results.py (5 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
airbyte/cloud/sync_results.py (3)
airbyte/cloud/workspaces.py (1)
  CloudWorkspace (57-454)
airbyte/cloud/connections.py (1)
  CloudConnection (19-252)
airbyte/_util/api_util.py (1)
  _make_config_api_request (807-844)
🪛 GitHub Actions: Run Linters
airbyte/cloud/sync_results.py
[error] 106-106: TC003 Move standard library import datetime.datetime into a type-checking block
[error] 106-106: F401 Import 'datetime.timezone' is unused
[error] 318-318: SLF001 Private member accessed: _make_config_api_request
🔇 Additional comments (1)
airbyte/cloud/sync_results.py (1)
505-508: Exporting `SyncAttempt` via `__all__` looks good.
Public surface matches added class and enables discoverability. 👍
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
airbyte/_util/api_util.py (1)
841-851: Do not include `response.__dict__` in error context: it leaks bearer tokens.
`response.__dict__` contains the `PreparedRequest` with `Authorization: Bearer <token>`. This risks credential leakage via logs/telemetry. Could we sanitize and only include safe fields (status, URL, truncated body/text, request-id), dropping raw request/response objects, wdyt?
Apply this diff to narrow the context:

     raise AirbyteError(
         message=error_message,
         context={
             "full_url": full_url,
             "config_api_root": config_api_root,
             "path": path,
             "status_code": response.status_code,
    -        "url": response.request.url,
    -        "body": response.request.body,
    -        "response": response.__dict__,
    +        "url": response.request.url,
    +        # Avoid leaking secrets; capture safe diagnostics only
    +        "response_text": response.text[:2048],
    +        "request_id": response.headers.get("x-request-id"),
         },
     ) from ex
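As a rough illustration of "safe diagnostics only", the sketch below builds an error context from plain values instead of raw request/response objects. `redact_headers` and `safe_error_context` are hypothetical helpers, and the list of sensitive header names is an assumption rather than an exhaustive set.

```python
from typing import Any, Mapping

# Assumed set of header names worth masking; extend as needed.
_SENSITIVE_HEADERS = frozenset({"authorization", "proxy-authorization", "cookie"})


def redact_headers(headers: Mapping[str, str]) -> dict[str, str]:
    """Return a copy of the headers with credential-bearing values masked."""
    return {
        name: ("REDACTED" if name.lower() in _SENSITIVE_HEADERS else value)
        for name, value in headers.items()
    }


def safe_error_context(
    *,
    url: str,
    status_code: int,
    response_text: str,
    request_headers: Mapping[str, str],
    max_chars: int = 2048,
) -> dict[str, Any]:
    """Collect only plain, bounded values for logging or telemetry."""
    return {
        "url": url,
        "status_code": status_code,
        "response_text": response_text[:max_chars],  # truncate large bodies
        "request_headers": redact_headers(request_headers),
    }


context = safe_error_context(
    url="https://cloud.airbyte.com/api/v1/jobs/get",
    status_code=403,
    response_text="Forbidden",
    request_headers={"Authorization": "Bearer secret-token", "Accept": "application/json"},
)
```

Because the context holds only strings and integers, nothing in it can carry a reference back to the prepared request or its headers.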
🧹 Nitpick comments (1)
airbyte/_util/api_util.py (1)
826-833: Safer URL join + add request timeouts.
Joining with `+` risks double slashes; also `requests` without a timeout can hang indefinitely. Shall we use `urljoin` and set a sane connect/read timeout, wdyt?
Apply this diff:

    - full_url = config_api_root + path
    + full_url = urljoin(config_api_root.rstrip("/") + "/", path.lstrip("/"))
      response = requests.request(
          method="POST",
    -     url=full_url,
    +     url=full_url,
          headers=headers,
          json=json,
    +     timeout=(3.05, 30),  # connect, read
      )

Add the import (outside this hunk):

    +from urllib.parse import urljoin
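The `rstrip`/`lstrip` normalization in the suggested join matters because `urljoin` treats a leading slash in the second argument as "replace the whole path". A small sketch, where `build_url` is a hypothetical helper name and the root URL is taken from the commit message above:

```python
from urllib.parse import urljoin


def build_url(api_root: str, path: str) -> str:
    """Join an API root and a relative path without dropping or doubling segments."""
    return urljoin(api_root.rstrip("/") + "/", path.lstrip("/"))


root = "https://cloud.airbyte.com/api/v1"

# Normalized join: the path is appended under the root.
joined = build_url(root, "/jobs/get")

# Plain urljoin with a leading slash replaces the root's path entirely.
naive_absolute = urljoin(root, "/jobs/get")

# Plain urljoin without a trailing slash on the root replaces its last segment.
naive_relative = urljoin(root, "jobs/get")

# A path that already carries the version prefix still duplicates it,
# so the join does not protect against the /v1/v1 mistake by itself.
duplicated = build_url(root, "/v1/jobs/get")
```

The last case is worth noting: safer joining fixes slash handling, but whether the `/v1` prefix belongs in the root or in the path still has to be decided in one place.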
📒 Files selected for processing (1)
airbyte/_util/api_util.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
airbyte/_util/api_util.py (1)
airbyte/exceptions.py (1)
  AirbyteError (432-447)
🪛 GitHub Actions: Run Linters
airbyte/_util/api_util.py
[error] 838-838: PLR2004: Magic value used in comparison, consider replacing 403 with a constant variable.
…rnal endpoint
- Refactor SyncAttempt to use _attempt_data from job response instead of calling /v1/attempt/get_for_job
- Eliminates 403 Forbidden errors when accessing attempt-level data
- Remove _fetch_attempt_info() method that called internal API endpoint
- Update SyncAttempt constructor to accept _attempt_data parameter
- Modify get_attempts() to pass attempt data directly to SyncAttempt instances
- Maintain existing datetime parsing fixes using ab_datetime_parse
- Fixes MCP tools get_cloud_sync_status with include_attempts=true and get_cloud_sync_logs

Co-Authored-By: AJ Steers <aj@airbyte.io>
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
airbyte/cloud/sync_results.py (1)
101-110: Re-run CI/linters after removing private Config API calls?
- _make_config_api_request still present: airbyte/cloud/sync_results.py:309, 326 (defined in airbyte/_util/api_util.py:807).
- Local lint run failed: "poetry: command not found" — ruff didn't run. Re-run CI/linters and confirm TC003/F401 cleared, wdyt?
♻️ Duplicate comments (2)
airbyte/cloud/sync_results.py (2)
305-320: Replace private Config API call and handle response shape.
Use a public wrapper and support nested job payloads; avoids SLF001 and missing “/v1”, wdyt?

     try:
         return ab_datetime_parse(self._fetch_latest_job_info().start_time)
     except (ValueError, TypeError) as e:
         if "Invalid isoformat string" in str(e):
    -        job_info_raw = api_util._make_config_api_request(
    -            api_root=self.workspace.api_root,
    -            path="/jobs/get",
    -            json={"id": self.job_id},
    -            client_id=self.workspace.client_id,
    -            client_secret=self.workspace.client_secret,
    -        )
    -        raw_start_time = job_info_raw.get("startTime")
    +        job_info_raw = api_util.get_job_with_attempts(
    +            api_root=self.workspace.api_root,
    +            job_id=self.job_id,
    +            client_id=self.workspace.client_id,
    +            client_secret=self.workspace.client_secret,
    +        )
    +        raw_job = job_info_raw.get("job", job_info_raw)
    +        raw_start_time = raw_job.get("startTime")
             if raw_start_time:
                 return ab_datetime_parse(raw_start_time)
         raise
321-336: Same: avoid private `_make_config_api_request` in `_fetch_job_with_attempts`.
Switch to a public wrapper and cache as before, wdyt?

     def _fetch_job_with_attempts(self) -> dict[str, Any]:
         """Fetch job info with attempts from Config API using lazy loading pattern."""
         if self._job_with_attempts_info is not None:
             return self._job_with_attempts_info

    -    self._job_with_attempts_info = api_util._make_config_api_request(  # noqa: SLF001  # Config API helper
    -        api_root=self.workspace.api_root,
    -        path="/jobs/get",
    -        json={
    -            "id": self.job_id,
    -        },
    -        client_id=self.workspace.client_id,
    -        client_secret=self.workspace.client_secret,
    -    )
    +    self._job_with_attempts_info = api_util.get_job_with_attempts(
    +        api_root=self.workspace.api_root,
    +        job_id=self.job_id,
    +        client_id=self.workspace.client_id,
    +        client_secret=self.workspace.client_secret,
    +    )
         return self._job_with_attempts_info
🧹 Nitpick comments (3)
airbyte/cloud/sync_results.py (3)
166-171: Harden `created_at` parsing (and clear TC003/F401).
Handle seconds vs. milliseconds and use datetime at runtime so the lints pass, wdyt?

     @property
     def created_at(self) -> datetime:
         """Return the creation time of the attempt."""
    -    timestamp = self._get_attempt_data()["createdAt"]
    -    return ab_datetime_parse(timestamp)
    +    ts = self._get_attempt_data().get("createdAt")
    +    if isinstance(ts, (int, float)):
    +        # Support ms or s
    +        if ts > 10**12:
    +            ts /= 1000
    +        return datetime.fromtimestamp(ts, tz=timezone.utc)
    +    return ab_datetime_parse(ts)
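The seconds-versus-milliseconds heuristic can be exercised in isolation with the standard library. In this sketch `parse_epoch` is a hypothetical helper; the `10**12` threshold is the one from the suggestion and assumes timestamps later than September 2001, since earlier millisecond values fall below it.

```python
from datetime import datetime, timezone


def parse_epoch(ts: float) -> datetime:
    """Convert an epoch value in seconds or milliseconds to an aware UTC datetime.

    Values above 10**12 are assumed to be milliseconds; that cutoff is a
    heuristic and misclassifies millisecond timestamps before 2001-09-09.
    """
    if ts > 10**12:
        ts = ts / 1000
    return datetime.fromtimestamp(ts, tz=timezone.utc)


# The same instant expressed in seconds and in milliseconds.
from_seconds = parse_epoch(1_742_922_379)
from_millis = parse_epoch(1_742_922_379_000)
```

Both calls resolve to the same UTC instant, 2025-03-25T17:06:19, which is the timestamp quoted in the earlier "Invalid isoformat string" error.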
172-177: Wrap long error string (E501).
Break the message over lines, wdyt?

     def _get_attempt_data(self) -> dict[str, Any]:
         """Get attempt data from the provided attempt data."""
         if self._attempt_data is None:
    -        raise ValueError("Attempt data not provided. SyncAttempt should be created via SyncResult.get_attempts().")
    +        raise ValueError(
    +            "Attempt data not provided. "
    +            "SyncAttempt should be created via SyncResult.get_attempts()."
    +        )
         return self._attempt_data["attempt"]
337-351: Be robust to nested job payloads when extracting attempts.
Support both `{job:{attempts:[...]}}` and flat `{attempts:[...]}` shapes, wdyt?

     def get_attempts(self) -> list[SyncAttempt]:
         """Return a list of attempts for this sync job."""
    -    job_with_attempts = self._fetch_job_with_attempts()
    -    attempts_data = job_with_attempts.get("attempts", [])
    +    job_with_attempts = self._fetch_job_with_attempts()
    +    raw_job = job_with_attempts.get("job", job_with_attempts)
    +    attempts_data = raw_job.get("attempts", [])
         return [
             SyncAttempt(
                 workspace=self.workspace,
                 connection=self.connection,
                 job_id=self.job_id,
                 attempt_number=i,
                 _attempt_data=attempt_data,
             )
             for i, attempt_data in enumerate(attempts_data)
         ]
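A slightly more defensive variant would look for `attempts` at the top level first and only then under `job`, so that a payload carrying a `job` object next to a sibling `attempts` list is handled as well. The payload shapes below are assumptions for illustration, not confirmed Config API responses, and `extract_attempts` is a hypothetical helper.

```python
from typing import Any


def extract_attempts(payload: dict[str, Any]) -> list[dict[str, Any]]:
    """Return the attempts list from a flat, sibling, or nested payload shape."""
    # Check the top level before the nested "job" object.
    for candidate in (payload, payload.get("job")):
        if isinstance(candidate, dict) and isinstance(candidate.get("attempts"), list):
            return candidate["attempts"]
    # Missing or null attempts degrade to an empty list.
    return []


flat = extract_attempts({"attempts": [{"id": 0}]})
sibling = extract_attempts({"job": {"id": 7}, "attempts": [{"id": 0}, {"id": 1}]})
nested = extract_attempts({"job": {"id": 7, "attempts": [{"id": 0}]}})
missing = extract_attempts({"job": {"id": 7}, "attempts": None})
```

Checking the top level first avoids losing attempts when `job` exists but does not itself contain them, and the final fallback also covers a `None` value where a list was expected.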
📒 Files selected for processing (1)
airbyte/cloud/sync_results.py (5 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
airbyte/cloud/sync_results.py (3)
airbyte/cloud/workspaces.py (1)
  CloudWorkspace (57-454)
airbyte/cloud/connections.py (1)
  CloudConnection (19-252)
airbyte/_util/api_util.py (1)
  _make_config_api_request (807-853)
🪛 GitHub Actions: Run Linters
airbyte/cloud/sync_results.py
[error] 106-106: TC003 Move standard library import datetime.datetime into a type-checking block
[error] 106-106: F401: 'datetime.timezone' imported but unused
[error] 175-175: E501 Line too long (119 > 100)
[error] 178-178: PLR0911 Too many return statements (7 > 6)
[error] 186-186: W293 Blank line contains whitespace
[error] 309-309: SLF001 Private member accessed: _make_config_api_request
🔇 Additional comments (1)
airbyte/cloud/sync_results.py (1)
497-500: API surface looks good.
Exporting SyncAttempt alongside SyncResult makes sense; thanks for keeping the public surface tidy, wdyt?
Addresses GitHub comment from aaronsteers about making enumerate start parameter explicit in get_attempts() method.

Co-Authored-By: AJ Steers <aj@airbyte.io>
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
airbyte/_util/api_util.py (1)
875-889: Avoid leaking bearer tokens in exception context; sanitize or drop headers.
response.__dict__ contains the request object and headers, including Authorization: Bearer <token>. If this bubbles into logs/telemetry, it’s a secret leak. Can we redact headers and avoid passing the entire response internals, wdyt?
-        error_message = f"API request failed with status {response.status_code}"
-        if response.status_code == HTTPStatus.FORBIDDEN:  # 403 error
-            error_message += f" (Forbidden) when accessing: {full_url}"
-        raise AirbyteError(
-            message=error_message,
-            context={
-                "full_url": full_url,
-                "config_api_root": config_api_root,
-                "path": path,
-                "status_code": response.status_code,
-                "url": response.request.url,
-                "body": response.request.body,
-                "response": response.__dict__,
-            },
-        ) from ex
+        error_message = f"API request failed with status {response.status_code}"
+        if response.status_code == HTTPStatus.FORBIDDEN:
+            error_message += f" (Forbidden) when accessing: {full_url}"
+        # Redact sensitive headers to avoid bearer token leaks in logs/telemetry.
+        try:
+            request_headers = {
+                k: ("REDACTED" if k.lower() == "authorization" else v)
+                for k, v in response.request.headers.items()
+            }
+        except Exception:
+            request_headers = None
+        raise AirbyteError(
+            message=error_message,
+            context={
+                "full_url": full_url,
+                "config_api_root": config_api_root,
+                "path": path,
+                "status_code": response.status_code,
+                "url": response.request.url,
+                "body": response.request.body,
+                "request_headers": request_headers,
+                "response_text": response.text[:2000],
+            },
+        ) from ex
Also, the inline comment “# 403 error” is redundant next to HTTPStatus.FORBIDDEN; I dropped it in the patch.
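The redaction step can also be pulled into a small helper so it is testable on its own. This is a minimal sketch: `redact_headers` and `SENSITIVE_HEADERS` are hypothetical names, and covering `proxy-authorization` and `cookie` in addition to `authorization` is an assumption that goes beyond what the patch above redacts.

```python
from collections.abc import Mapping

# Assumption: header names treated as secrets. The patch above only covers "authorization".
SENSITIVE_HEADERS = frozenset({"authorization", "proxy-authorization", "cookie"})


def redact_headers(headers: Mapping[str, str]) -> dict[str, str]:
    """Return a copy of the headers with sensitive values replaced (hypothetical helper)."""
    return {
        name: ("REDACTED" if name.lower() in SENSITIVE_HEADERS else value)
        for name, value in headers.items()
    }


safe = redact_headers({"Authorization": "Bearer abc123", "Accept": "application/json"})
print(safe)  # {'Authorization': 'REDACTED', 'Accept': 'application/json'}
```

Matching on the lowercased name keeps the check case-insensitive while the returned dict preserves the original header spelling.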
♻️ Duplicate comments (3)
airbyte/cloud/sync_results.py (3)
308-308: Verify the Config API path includes /v1 prefix
The path "/jobs/get" might be missing the /v1 prefix. Could you verify if this should be "/v1/jobs/get" to match the Config API pattern, wdyt?
#!/bin/bash
# Check for other Config API paths in the codebase to see the pattern
rg -n '"/v1/(jobs|attempts)' --type py -C2
rg -n 'path.*"/jobs' --type py -C2
302-316: Address the use of private API function
The code uses api_util._make_config_api_request which is a private function. Past reviews suggested creating a public wrapper. Should we add a public API function for this use case, wdyt?
-        job_info_raw = api_util._make_config_api_request(  # noqa: SLF001
-            api_root=self.workspace.api_root,
-            path="/jobs/get",
-            json={"id": self.job_id},
-            client_id=self.workspace.client_id,
-            client_secret=self.workspace.client_secret,
-        )
+        job_info_raw = api_util.get_job_details(
+            api_root=self.workspace.api_root,
+            job_id=self.job_id,
+            client_id=self.workspace.client_id,
+            client_secret=self.workspace.client_secret,
+        )
And add to airbyte/_util/api_util.py:
def get_job_details(
    *,
    api_root: str,
    job_id: int,
    client_id: SecretString,
    client_secret: SecretString
) -> dict[str, Any]:
    """Get detailed job information from the Config API."""
    return _make_config_api_request(
        api_root=api_root,
        path="/v1/jobs/get",
        json={"id": job_id},
        client_id=client_id,
        client_secret=client_secret,
    )
318-332: Replace private API call with public wrapper
Similar to the previous comment, this also uses the private _make_config_api_request. Should we use the same public wrapper here, wdyt?
🧹 Nitpick comments (6)
airbyte/_util/api_util.py (1)
864-868: Prefer robust URL joining to avoid accidental double/missing slashes.
Building full_url via concatenation can be brittle. Shall we use urljoin for consistency across roots/paths, wdyt?
-    full_url = config_api_root + path
+    full_url = urljoin(config_api_root.rstrip("/") + "/", path.lstrip("/"))
Note: the import for urljoin is included in my import diff above.
airbyte/mcp/_cloud_ops.py (3)
394-394: Consider extracting return from try block to satisfy Ruff TRY300
Ruff flags the return inside the try block. Would you like to move it out to an else clause to follow the linter's best practices, wdyt?
+    result_data: dict[str, Any]
     try:
         workspace: CloudWorkspace = _get_cloud_workspace()
         connection = workspace.get_connection(connection_id=connection_id)
         # If a job ID is provided, get the job by ID.
         sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)
         if not sync_result:
             return {"status": None, "job_id": None, "attempts": []}
         result = {
             "status": sync_result.get_job_status(),
             "job_id": sync_result.job_id,
             "bytes_synced": sync_result.bytes_synced,
             "records_synced": sync_result.records_synced,
             "start_time": sync_result.start_time.isoformat(),
             "job_url": sync_result.job_url,
             "attempts": [],
         }
         if include_attempts:
             attempts = sync_result.get_attempts()
             result["attempts"] = [
                 {
                     "attempt_number": attempt.attempt_number,
                     "attempt_id": attempt.attempt_id,
                     "status": attempt.status,
                     "bytes_synced": attempt.bytes_synced,
                     "records_synced": attempt.records_synced,
                     "created_at": attempt.created_at.isoformat(),
                 }
                 for attempt in attempts
             ]
-        return result  # noqa: TRY300
-
+        result_data = result
     except Exception as ex:
         return {
             "status": None,
             "job_id": job_id,
             "error": f"Failed to get sync status for connection '{connection_id}': {ex}",
             "attempts": [],
         }
+    else:
+        return result_data
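For readers unfamiliar with the pattern behind TRY300, here is a minimal, self-contained sketch of try/except/else on an unrelated task. The function `parse_status` and its return shape are illustrative assumptions, not code from this PR:

```python
import json
from typing import Any


def parse_status(raw: str) -> dict[str, Any]:
    """Parse a JSON object and report its status, or an error on bad input."""
    try:
        # Keep only the statement that can raise inside the try body.
        data = json.loads(raw)
    except json.JSONDecodeError as ex:
        return {"status": None, "error": f"Invalid JSON: {ex}"}
    else:
        # Runs only when the try body raised nothing.
        # Assumes the payload is a JSON object (dict).
        return {"status": data.get("status"), "error": None}


print(parse_status('{"status": "succeeded"}'))
print(parse_status("not json")["status"])
```

The point of the else clause is that the success-path return is no longer guarded by the except handler, so an unexpected error raised while building the return value is not silently caught.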
486-486: Consider moving return outside try block (Ruff TRY300)
Similar to the previous function, would you consider restructuring to avoid the return inside try, wdyt?
+    logs_result: str
     try:
         workspace: CloudWorkspace = _get_cloud_workspace()
         connection = workspace.get_connection(connection_id=connection_id)
         sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)
         if not sync_result:
             return f"No sync job found for connection '{connection_id}'"
         attempts = sync_result.get_attempts()
         if not attempts:
             return f"No attempts found for job '{sync_result.job_id}'"
         if attempt_number is not None:
             target_attempt = None
             for attempt in attempts:
                 if attempt.attempt_number == attempt_number:
                     target_attempt = attempt
                     break
             if target_attempt is None:
                 return f"Attempt number {attempt_number} not found for job '{sync_result.job_id}'"
         else:
             target_attempt = max(attempts, key=lambda a: a.attempt_number)
         logs = target_attempt.get_full_log_text()
         if not logs:
             return (
                 f"No logs available for job '{sync_result.job_id}', "
                 f"attempt {target_attempt.attempt_number}"
             )
-        return logs  # noqa: TRY300
-
+        logs_result = logs
     except Exception as ex:
         return f"Failed to get logs for connection '{connection_id}': {ex}"
+    else:
+        return logs_result
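The attempt-selection logic in the function above (a specific attempt if requested, otherwise the latest) can be isolated as follows. This is a sketch under stated assumptions: `Attempt` is a hypothetical stand-in for `SyncAttempt` carrying only the one field used here, and `pick_attempt` is not an existing PyAirbyte function.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Attempt:
    """Hypothetical stand-in for SyncAttempt with only the field used here."""

    attempt_number: int


def pick_attempt(
    attempts: list[Attempt],
    attempt_number: Optional[int] = None,
) -> Optional[Attempt]:
    """Return the requested attempt, or the latest one when no number is given."""
    if not attempts:
        return None
    if attempt_number is None:
        # "Latest" is taken to mean the highest attempt number.
        return max(attempts, key=lambda a: a.attempt_number)
    # next() with a default replaces the explicit loop-and-break.
    return next((a for a in attempts if a.attempt_number == attempt_number), None)


attempts = [Attempt(0), Attempt(2), Attempt(1)]
print(pick_attempt(attempts))      # Attempt(attempt_number=2)
print(pick_attempt(attempts, 1))   # Attempt(attempt_number=1)
print(pick_attempt(attempts, 5))   # None
```

Returning None for both "no attempts" and "number not found" leaves the caller free to choose the user-facing message for each case.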
1-516: Run ruff format to fix formatting
The pipeline is failing because the file needs formatting. Could you run ruff format to apply the required formatting changes?
#!/bin/bash
# Run ruff format on the file
cd /path/to/repo
poetry run ruff format airbyte/mcp/_cloud_ops.py
airbyte/cloud/sync_results.py (2)
180-210: Consider simplifying the log text extraction logic
The get_full_log_text method has multiple return points and nested conditions. Would you consider consolidating the logic to reduce complexity, wdyt?
     def get_full_log_text(self) -> str:
         """Return the complete log text for this attempt.

         Returns:
             String containing all log text for this attempt, with lines separated by newlines.
         """
-        if self._attempt_data is None:
-            return ""
-
-        logs_data = self._attempt_data.get("logs")
-        if not logs_data:
-            return ""
-
-        result = ""
-
-        if "events" in logs_data:
-            log_events = logs_data["events"]
-            if log_events:
-                log_lines = []
-                for event in log_events:
-                    timestamp = event.get("timestamp", "")
-                    level = event.get("level", "INFO")
-                    message = event.get("message", "")
-                    log_lines.append(f"[{timestamp}] {level}: {message}")
-                result = "\n".join(log_lines)
-        elif "logLines" in logs_data:
-            log_lines = logs_data["logLines"]
-            if log_lines:
-                result = "\n".join(log_lines)
-
-        return result
+        if not self._attempt_data:
+            return ""
+
+        logs_data = self._attempt_data.get("logs", {})
+
+        # Try events format first
+        if events := logs_data.get("events"):
+            log_lines = [
+                f"[{event.get('timestamp', '')}] {event.get('level', 'INFO')}: {event.get('message', '')}"
+                for event in events
+            ]
+            return "\n".join(log_lines)
+
+        # Fall back to logLines format
+        if log_lines := logs_data.get("logLines"):
+            return "\n".join(log_lines)
+
+        return ""
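To try the consolidated logic outside the class, it can be sketched as a plain function over a dict. `format_log_text` and the sample payloads are hypothetical; the `events` and `logLines` keys are taken from the diff above, and the `or {}` guard is an added assumption so that a null `logs` value does not raise.

```python
from typing import Any, Optional


def format_log_text(attempt_data: Optional[dict[str, Any]]) -> str:
    """Return newline-joined log text from attempt data (hypothetical standalone version)."""
    if not attempt_data:
        return ""

    # "or {}" also covers the case where the key exists but holds None.
    logs_data = attempt_data.get("logs") or {}

    # Structured events: render each as "[timestamp] LEVEL: message".
    events = logs_data.get("events")
    if events:
        return "\n".join(
            f"[{e.get('timestamp', '')}] {e.get('level', 'INFO')}: {e.get('message', '')}"
            for e in events
        )

    # Fall back to pre-rendered lines.
    log_lines = logs_data.get("logLines")
    if log_lines:
        return "\n".join(log_lines)

    return ""


sample = {"logs": {"events": [{"timestamp": "t0", "level": "WARN", "message": "slow read"}]}}
print(format_log_text(sample))                                 # [t0] WARN: slow read
print(format_log_text({"logs": {"logLines": ["a", "b"]}}))     # prints "a" then "b" on separate lines
```

One behavioral difference from the original if/elif version is worth noting: here an empty `events` list falls through to `logLines`, whereas the original returned an empty string whenever the `events` key was present.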
1-497: Format the file with ruff to fix linting
The pipeline indicates this file needs formatting. Could you run ruff format on it?
#!/bin/bash
# Run ruff format on the file
cd /path/to/repo
poetry run ruff format airbyte/cloud/sync_results.py
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
airbyte/_util/api_util.py (2 hunks)
airbyte/cloud/sync_results.py (6 hunks)
airbyte/mcp/_cloud_ops.py (4 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
airbyte/cloud/sync_results.py (3)
airbyte/cloud/workspaces.py (1)
CloudWorkspace (57-452)
airbyte/cloud/connections.py (1)
CloudConnection (19-279)
airbyte/_util/api_util.py (1)
_make_config_api_request (845-891)
airbyte/mcp/_cloud_ops.py (4)
airbyte/cloud/workspaces.py (1)
get_connection (98-110)
airbyte/_util/api_util.py (1)
get_connection (336-363)
airbyte/cloud/sync_results.py (11)
SyncResult (214-491)
get_job_status (272-274)
bytes_synced (156-158)
bytes_synced (290-292)
records_synced (161-163)
records_synced (295-297)
start_time (300-316)
job_url (232-241)
get_attempts (334-348)
status (151-153)
get_full_log_text (180-210)
airbyte/cloud/connections.py (1)
get_sync_result (232-257)
airbyte/_util/api_util.py (1)
airbyte/exceptions.py (1)
AirbyteError(432-447)
🪛 GitHub Actions: Run Linters
airbyte/cloud/sync_results.py
[error] 1-1: Ruff format would reformat this file. Run 'ruff format' to apply changes.
airbyte/mcp/_cloud_ops.py
[error] 1-1: Ruff format would reformat this file. Run 'ruff format' to apply changes.
airbyte/_util/api_util.py
[error] 14-14: ruff: Import block is un-sorted or un-formatted. 2 issues are fixable with '--fix'.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
- GitHub Check: Pytest (All, Python 3.10, Windows)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Windows)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (No Creds)
🔇 Additional comments (7)
airbyte/_util/api_util.py (1)
16-23: Import order fixed in airbyte/_util/api_util.py: ruff --fix reordered the imports; no further changes required.
Want me to also add from urllib.parse import urljoin if you adopt the URL-join nit, wdyt?
airbyte/mcp/_cloud_ops.py (2)
360-395: LGTM! The structured response and error handling look solid.
The refactored get_cloud_sync_status now returns structured data that's easier to consume, and the comprehensive error handling ensures robustness.
429-490: Nice addition! The attempt-based log retrieval enhances debugging.
The new get_cloud_sync_logs function provides granular access to sync attempt logs, which will be valuable for debugging sync issues.
airbyte/cloud/sync_results.py (4)
131-211: Great addition! The SyncAttempt class provides valuable per-attempt visibility.
The new SyncAttempt dataclass effectively encapsulates attempt-level data with lazy loading and proper abstractions for status, metrics, and logs.
334-348: Good implementation of lazy-loaded attempts!
The get_attempts method effectively creates SyncAttempt instances with the fetched data, maintaining good encapsulation and lazy loading patterns.
347-347: Confirm attempt-number base: enumerate currently starts at 0
Search shows attempts are exposed as attempt.attempt_number (used/compared in airbyte/mcp/_cloud_ops.py) while airbyte/cloud/sync_results.py builds attempts with enumerate(attempts_data, start=0).
Should we change to enumerate(attempts_data, start=1) so attempt_number aligns with API/consumer expectations, wdyt?
Files to check: airbyte/cloud/sync_results.py (~344–347), airbyte/mcp/_cloud_ops.py (~384, 466–477).
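The difference between the two bases is easy to see in isolation. The sample data below is illustrative only, and which base actually matches the API's own attempt numbering is not established here:

```python
# Illustrative attempt payloads; not real API data.
attempts_data = [{"status": "failed"}, {"status": "succeeded"}]

# enumerate() numbers items from the given start value.
zero_based = [i for i, _ in enumerate(attempts_data, start=0)]
one_based = [i for i, _ in enumerate(attempts_data, start=1)]

print(zero_based)  # [0, 1]
print(one_based)   # [1, 2]
```

Whichever base is chosen, a caller that looks up an attempt by number needs to use the same one, otherwise a request for the first attempt resolves to the wrong entry or to none at all.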
165-169: Confirm createdAt timestamp format
created_at calls ab_datetime_parse(timestamp) directly without dividing by 1000. Could you confirm that the API’s createdAt value is provided in seconds (or as an ISO8601 string) and not milliseconds? wdyt?
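Why the unit matters can be sketched with the standard library alone. This does not use `ab_datetime_parse`; `epoch_to_datetime` is a hypothetical helper, and the explicit `unit` argument is an assumption about how a caller might state what the API returns:

```python
from datetime import datetime, timezone


def epoch_to_datetime(value: float, *, unit: str = "s") -> datetime:
    """Convert an epoch timestamp in seconds ("s") or milliseconds ("ms") to UTC."""
    if unit == "ms":
        value = value / 1000
    elif unit != "s":
        raise ValueError(f"Unsupported unit: {unit!r}")
    return datetime.fromtimestamp(value, tz=timezone.utc)


seconds = 1_700_000_000
from_seconds = epoch_to_datetime(seconds)
from_millis = epoch_to_datetime(seconds * 1000, unit="ms")

# Both calls describe the same instant once the unit is stated correctly.
print(from_seconds == from_millis)  # True
```

Passing a millisecond value as if it were seconds typically raises or yields an implausibly distant date, so confirming the unit once against a real response is cheaper than guessing.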
- Fix import sorting in api_util.py and sync_results.py
- Fix formatting and indentation in _cloud_ops.py
- All local checks now pass with poe fix-and-check
Co-Authored-By: AJ Steers <aj@airbyte.io>
- Update dependency markers after running lint fixes
- Keeps lock file in sync with resolved dependencies
Co-Authored-By: AJ Steers <aj@airbyte.io>
SyncAttempt abstraction for fetching job attempts
7b3406e
into
main
feat(cloud): Add SyncAttempt abstraction for job attempt handling (do not merge)
Summary
This PR aims to add a SyncAttempt class to PyAirbyte to provide proper abstractions for job attempts and enable log retrieval functionality. Currently, this PR only contains poetry.lock updates to establish the working branch.
Planned Implementation:
- SyncAttempt dataclass to sync_results.py
- SyncResult.get_attempts() -> list[SyncAttempt] method
- /v1/attempts/get_for_job endpoint
Current Status: Only dependency lock file updates - no implementation code yet.
Review & Testing Checklist for Human
Notes
/v1/attempts/get_for_job endpoint
Summary by CodeRabbit
New Features
Bug Fixes
Documentation
Important
Auto-merge enabled.
This PR is set to merge automatically when all requirements are met.
Note
Auto-merge may have been disabled. Please check the PR status to confirm.