From 566889de43a47d62091a47e9826f58161569cbaa Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Tue, 17 Sep 2024 13:39:49 -0700 Subject: [PATCH 1/3] fix: offset MB processed by size of RECORD envelope --- airbyte/_connector_base.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/airbyte/_connector_base.py b/airbyte/_connector_base.py index 2007cbfa5..9b1b99aa2 100644 --- a/airbyte/_connector_base.py +++ b/airbyte/_connector_base.py @@ -401,13 +401,20 @@ def _execute( # Fail early if the connector is not installed. self.executor.ensure_installation(auto_fix=False) + # When calculating MB read, we need to account for the envelope size. + envelope_size = ( + len('{"type": "RECORD", "record": }') + + len('{"stream": "", "data": {}, "emitted_at": 1234567890}') + # + len('"namespace": "", ') # We're knowingly omitting this to keep perf impact low. + ) + try: for line in self.executor.execute(args, stdin=stdin): try: message: AirbyteMessage = AirbyteMessage.model_validate_json(json_data=line) if progress_tracker and message.record: progress_tracker.tally_bytes_read( - len(line), + bytes_read=len(line) - envelope_size - len(message.record.stream), stream_name=message.record.stream, ) self._peek_airbyte_message(message) From d046eea74f38dbc3a096cc0032a3c353341b74a2 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Tue, 17 Sep 2024 13:41:05 -0700 Subject: [PATCH 2/3] add comment --- airbyte/_connector_base.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/airbyte/_connector_base.py b/airbyte/_connector_base.py index 9b1b99aa2..eef9af876 100644 --- a/airbyte/_connector_base.py +++ b/airbyte/_connector_base.py @@ -402,6 +402,8 @@ def _execute( self.executor.ensure_installation(auto_fix=False) # When calculating MB read, we need to account for the envelope size. + # Note our priority is to keep performance up, while providing at least rough + # alignment with comparable metrics in Airbyte Cloud.
envelope_size = ( len('{"type": "RECORD", "record": }') + len('{"stream": "", "data": {}, "emitted_at": 1234567890}') From b962f3946c6af6cc240e147d0a445bf8dc2acce4 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Tue, 17 Sep 2024 14:15:48 -0700 Subject: [PATCH 3/3] apply pr suggestion --- airbyte/_connector_base.py | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/airbyte/_connector_base.py b/airbyte/_connector_base.py index eef9af876..8d7dbc00a 100644 --- a/airbyte/_connector_base.py +++ b/airbyte/_connector_base.py @@ -404,10 +404,18 @@ def _execute( # When calculating MB read, we need to account for the envelope size. # Note our priority is to keep performance up, while providing at least rough # alignment with comparable metrics in Airbyte Cloud. - envelope_size = ( - len('{"type": "RECORD", "record": }') - + len('{"stream": "", "data": {}, "emitted_at": 1234567890}') - # + len('"namespace": "", ') # We're knowingly omitting this to keep perf impact low. + envelope_size = len( + json.dumps( + { + "type": "RECORD", + "record": { + "stream": "", + "data": {}, + "emitted_at": 1234567890, + # "namespace": "", # We're knowingly omitting this to keep perf impact low. + }, + } + ) ) try: @@ -415,9 +423,10 @@ def _execute( try: message: AirbyteMessage = AirbyteMessage.model_validate_json(json_data=line) if progress_tracker and message.record: + stream_name = message.record.stream progress_tracker.tally_bytes_read( - bytes_read=len(line) - envelope_size - len(message.record.stream), - stream_name=message.record.stream, + bytes_read=len(line) - envelope_size - len(stream_name), + stream_name=stream_name, ) self._peek_airbyte_message(message) yield message