Feat: Add performance metrics and mb/s tracking#356
Conversation
|
Warning Rate limit exceededAaron ("AJ") Steers (@aaronsteers) has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 3 minutes and 33 seconds before requesting another review. How to resolve this issue?After the wait time has elapsed, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout. Please see our FAQ for further information. WalkthroughWalkthroughThe changes introduce a progress tracking feature across multiple components of the Airbyte codebase. The Changes
Sequence Diagram(s)sequenceDiagram
participant User
participant Connector
participant ProgressTracker
participant Source
User->>Connector: Start execution
Connector->>ProgressTracker: Initialize progress tracking
Connector->>Source: Read data
Source->>Connector: Yield record
Connector->>ProgressTracker: Tally bytes read
Connector->>User: Provide execution status
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Actionable comments posted: 1
Outside diff range, codebase verification and nitpick comments (4)
airbyte/_connector_base.py (1)
356-357: Signature change looks good. Consider updating the docstring?The new
progress_trackerparameter is well-integrated into the method signature. Great job on making it a keyword-only argument with proper type hinting!Would you like to update the method's docstring to include information about the new
progress_trackerparameter? This could help other developers understand its purpose and usage. What do you think?airbyte/sources/base.py (1)
Line range hint
1-1: Consider enhancing error handling with progress tracking. Thoughts?The integration of the
progress_trackerlooks good overall. To further improve its usage, wdyt about:
- Adding more granular progress updates within long-running operations?
- Enhancing error handling to provide more detailed progress information when exceptions occur?
These changes could provide more insights into the execution process and make debugging easier. What do you think?
airbyte/progress.py (2)
243-269: New byte tracking properties and methods look great!The additions provide a comprehensive set of tools for tracking and accessing byte-level metrics. The implementation is consistent with the existing code style and looks correct.
One small suggestion: Have you considered adding type hints for the return values of
total_bytes_readandtotal_megabytes_read? It might improve code readability. For example:def total_bytes_read(self) -> int | None: ... def total_megabytes_read(self) -> float | None: ...What do you think? This is just a minor suggestion, and the current implementation is already good.
432-489: Expanded _log_read_metrics looks great!The addition of byte-related metrics significantly enhances the logging functionality. The implementation is well-structured and consistent with the existing code style.
One small suggestion: Have you considered adding a try-except block around the JSON dumping operation? This could help handle potential serialization errors gracefully. For example:
try: self._file_logger.info(json.dumps({"read_performance_metrics": perf_metrics})) except json.JSONEncodeError as e: self._file_logger.error(f"Failed to serialize performance metrics: {e}")What do you think? This is just a suggestion for added robustness, and the current implementation is already good.
Example output printed to log files:
Metrics from Faker to a local DuckDB cache:
{ "read_performance_metrics": { "job_description": { "description": "source-faker -> DuckDBCache", "source": "source-faker", "cache": "DuckDBCache" }, "total_records_read": 200100, "read_start_time": 1725301258.959898, "read_end_time": 1725301282.203239, "records_per_second": 9252.684942052056, "mb_read": 92.544833, "mb_per_second": 4.2793, "stream_metrics": { "products": { "records_read": 100, "read_start_time": 1725301260.57727, "read_end_time": 1725301260.579796, "read_time_seconds": 0.0025260448455810547, "records_per_second": 39587.57904672015, "mb_read": 0.023553, "mb_per_second": 9.3241 }, "users": { "records_read": 100000, "read_start_time": 1725301261.9061, "read_end_time": 1725301273.219753, "read_time_seconds": 11.313652992248535, "records_per_second": 8838.878129682274, "mb_read": 60.902998, "mb_per_second": 5.3831 }, "purchases": { "records_read": 100000, "read_start_time": 1725301274.128414, "read_end_time": 1725301282.041147, "read_time_seconds": 7.91273307800293, "records_per_second": 12637.858375129052, "mb_read": 31.618282, "mb_per_second": 3.9959 } } } }Metrics from e2e source to e2e destination, with caching disabled:
{ "read_performance_metrics": { "job_description": { "description": "source-e2e -> destination-e2e-test", "source": "source-e2e", "destination": "destination-e2e-test" }, "total_records_read": 240000, "read_start_time": 1725301176.394967, "read_end_time": 1725301184.583842, "records_per_second": 36316.86986395221, "mb_read": 62.88, "mb_per_second": 9.515, "stream_metrics": { "stream1": { "records_read": 240000, "read_start_time": 1725301177.975788, "read_end_time": null } } } }We also print MB/s in live progress tracking:
Summary by CodeRabbit
New Features
Bug Fixes
_read_with_catalogfunction to support progress tracking, enhancing usability during data reading processes.