diff --git a/airbyte/_connector_base.py b/airbyte/_connector_base.py index 2007cbfa5..8d7dbc00a 100644 --- a/airbyte/_connector_base.py +++ b/airbyte/_connector_base.py @@ -401,14 +401,32 @@ def _execute( # Fail early if the connector is not installed. self.executor.ensure_installation(auto_fix=False) + # When calculating MB read, we need to account for the envelope size. + # Note our priority is to keep performance up, while providing at least rough + # alignment with comparable metrics in Airbyte Cloud. + envelope_size = len( + json.dumps( + { + "type": "RECORD", + "record": { + "stream": "", + "data": {}, + "emitted_at": 1234567890, + # "namespace": "", # We're knowingly omitting this to keep perf impact low. + }, + } + ) + ) + try: for line in self.executor.execute(args, stdin=stdin): try: message: AirbyteMessage = AirbyteMessage.model_validate_json(json_data=line) if progress_tracker and message.record: + stream_name = message.record.stream progress_tracker.tally_bytes_read( - len(line), - stream_name=message.record.stream, + bytes_read=len(line) - envelope_size - len(stream_name), + stream_name=stream_name, ) self._peek_airbyte_message(message) yield message