From 48028567192877dd6bb0e87d9b91f2fc322cdc69 Mon Sep 17 00:00:00 2001 From: Marianob Span Date: Wed, 5 Mar 2025 20:59:41 -0300 Subject: [PATCH 1/5] Fix state handling when passed in read() method Low-code connectors instantiate the source without a state and pass the state via the read() method. This commit updates the ConcurrentDeclarativeSource to support this behavior. Changes are: - Instantiate the ConnectorStateManager with the state passed in read() method. - Update the ModelToComponentFactory to use the stream_state parameter when creating a cursor. - Add a test to verify the behavior: instantiate the source without a state and pass the state via the read() method. --- .../concurrent_declarative_source.py | 3 + .../parsers/model_to_component_factory.py | 4 +- .../test_concurrent_declarative_source.py | 67 +++++++++++++++++++ 3 files changed, 73 insertions(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index c92ffb150..4333d341e 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -135,6 +135,7 @@ def read( catalog: ConfiguredAirbyteCatalog, state: Optional[List[AirbyteStateMessage]] = None, ) -> Iterator[AirbyteMessage]: + self._connector_state_manager = ConnectorStateManager(state=state) concurrent_streams, _ = self._group_streams(config=config) # ConcurrentReadProcessor pops streams that are finished being read so before syncing, the names of @@ -267,6 +268,7 @@ def _group_streams( component_definition=incremental_sync_component_definition, # type: ignore # Not None because of the if condition above stream_name=declarative_stream.name, stream_namespace=declarative_stream.namespace, + stream_state=stream_state, config=config or {}, ) else: @@ -275,6 +277,7 @@ def _group_streams( component_definition=incremental_sync_component_definition, # type: 
ignore # Not None because of the if condition above stream_name=declarative_stream.name, stream_namespace=declarative_stream.namespace, + stream_state=stream_state, config=config or {}, ) partition_generator = StreamSlicerPartitionGenerator( diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 39058f834..322fbbf28 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -983,7 +983,8 @@ def create_concurrent_cursor_from_datetime_based_cursor( ) -> ConcurrentCursor: # Per-partition incremental streams can dynamically create child cursors which will pass their current # state via the stream_state keyword argument. Incremental syncs without parent streams use the - # incoming state and connector_state_manager that is initialized when the component factory is created + # incoming state and connector_state_manager that is initialized when the component factory is created. + # stream_state is also used by low-code connectors, where the state is passed via the read() method. stream_state = ( self._connector_state_manager.get_stream_state(stream_name, stream_namespace) if "stream_state" not in kwargs @@ -1209,6 +1210,7 @@ def create_concurrent_cursor_from_incrementing_count_cursor( # Per-partition incremental streams can dynamically create child cursors which will pass their current # state via the stream_state keyword argument. Incremental syncs without parent streams use the # incoming state and connector_state_manager that is initialized when the component factory is created + # stream_state is also used by low-code connectors, where the state is passed via the read() method. 
stream_state = ( self._connector_state_manager.get_stream_state(stream_name, stream_namespace) if "stream_state" not in kwargs diff --git a/unit_tests/sources/declarative/test_concurrent_declarative_source.py b/unit_tests/sources/declarative/test_concurrent_declarative_source.py index 4a043ac82..fd975d77d 100644 --- a/unit_tests/sources/declarative/test_concurrent_declarative_source.py +++ b/unit_tests/sources/declarative/test_concurrent_declarative_source.py @@ -825,6 +825,73 @@ def test_create_concurrent_cursor(): assert incremental_counting_cursor._end_provider() == math.inf +@freezegun.freeze_time(_NOW) +def test_concurrent_cursor_with_state_in_read_method(): + """ + This test mimics the behavior of a source in a real-life low-code connector + where the source is instantiated without a state and the state is provided + via the read() method. + + Note: this test specifically checks this behavior for DatetimeBasedCursor + """ + state = [ + AirbyteStateMessage( + type=AirbyteStateType.STREAM, + stream=AirbyteStreamState( + stream_descriptor=StreamDescriptor(name="party_members", namespace=None), + stream_state=AirbyteStateBlob(updated_at="2024-09-05"), + ), + ), + ] + + catalog = ConfiguredAirbyteCatalog( + streams=[ + ConfiguredAirbyteStream( + stream=AirbyteStream( + name="party_members", json_schema={}, supported_sync_modes=[SyncMode.incremental] + ), + sync_mode=SyncMode.incremental, + destination_sync_mode=DestinationSyncMode.append, + ), + ] + ) + + + def get_source(): + return ConcurrentDeclarativeSource( + source_config=_MANIFEST, config=_CONFIG, catalog=catalog, state=[] + ) + + + source = get_source() + with HttpMocker() as http_mocker: + _mock_party_members_requests(http_mocker, _NO_STATE_PARTY_MEMBERS_SLICES_AND_RESPONSES) + messages_iterator = source.read(logger=source.logger, config=_CONFIG, catalog=catalog, state=state) + + # Get the first message only to get the stream running so that we can check the cursor + first_message = next(messages_iterator) + 
concurrent_streams, _ = source._group_streams(config=_CONFIG) + party_members_stream = [s for s in concurrent_streams if s.name == "party_members"][0] + + assert party_members_stream is not None, "Could not find party_members stream" + party_members_cursor = party_members_stream.cursor + + assert isinstance(party_members_cursor, ConcurrentCursor) + assert party_members_cursor._stream_name == "party_members" + assert party_members_cursor._cursor_field.cursor_field_key == "updated_at" + + cursor_value = AirbyteDateTime.strptime("2024-09-05", "%Y-%m-%d") + + assert len(party_members_cursor._concurrent_state["slices"]) == 1 + assert party_members_cursor._concurrent_state["slices"][0]["most_recent_cursor_value"] == cursor_value + + messages = list(messages_iterator) + party_members_records = get_records_for_stream("party_members", messages) + # There is only one record after 2024-09-05 + assert len(party_members_records) == 1 + assert party_members_records[0].data["id"] == "yoshizawa" + def test_check(): """ Verifies that the ConcurrentDeclarativeSource check command is run against synchronous streams From 5c341a8c652dfdd9b6ac35090a23ab9f65fe2a84 Mon Sep 17 00:00:00 2001 From: Marianob Span Date: Wed, 5 Mar 2025 21:05:06 -0300 Subject: [PATCH 2/5] Format tests --- .../test_concurrent_declarative_source.py | 26 ++++++++++++------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/unit_tests/sources/declarative/test_concurrent_declarative_source.py b/unit_tests/sources/declarative/test_concurrent_declarative_source.py index fd975d77d..641655d11 100644 --- a/unit_tests/sources/declarative/test_concurrent_declarative_source.py +++ b/unit_tests/sources/declarative/test_concurrent_declarative_source.py @@ -848,7 +848,9 @@ def test_concurrent_cursor_with_state_in_read_method(): streams=[ ConfiguredAirbyteStream( stream=AirbyteStream( - name="party_members", json_schema={}, supported_sync_modes=[SyncMode.incremental] + name="party_members", + json_schema={}, + 
supported_sync_modes=[SyncMode.incremental], ), sync_mode=SyncMode.incremental, destination_sync_mode=DestinationSyncMode.append, @@ -856,21 +858,21 @@ def test_concurrent_cursor_with_state_in_read_method(): ] ) - def get_source(): return ConcurrentDeclarativeSource( source_config=_MANIFEST, config=_CONFIG, catalog=catalog, state=[] ) - source = get_source() with HttpMocker() as http_mocker: _mock_party_members_requests(http_mocker, _NO_STATE_PARTY_MEMBERS_SLICES_AND_RESPONSES) - messages_iterator = source.read(logger=source.logger, config=_CONFIG, catalog=catalog, state=state) - + messages_iterator = source.read( + logger=source.logger, config=_CONFIG, catalog=catalog, state=state + ) + # Get the first message only to get the stream running so that we can check the cursor first_message = next(messages_iterator) - + concurrent_streams, _ = source._group_streams(config=_CONFIG) party_members_stream = [s for s in concurrent_streams if s.name == "party_members"][0] @@ -880,18 +882,22 @@ def get_source(): assert isinstance(party_members_cursor, ConcurrentCursor) assert party_members_cursor._stream_name == "party_members" assert party_members_cursor._cursor_field.cursor_field_key == "updated_at" - + cursor_value = AirbyteDateTime.strptime("2024-09-05", "%Y-%m-%d") - + assert len(party_members_cursor._concurrent_state["slices"]) == 1 - assert party_members_cursor._concurrent_state["slices"][0]["most_recent_cursor_value"] == cursor_value - + assert ( + party_members_cursor._concurrent_state["slices"][0]["most_recent_cursor_value"] + == cursor_value + ) + messages = list(messages_iterator) party_members_records = get_records_for_stream("party_members", messages) # There is only one record after 2024-09-05 assert len(party_members_records) == 1 assert party_members_records[0].data["id"] == "yoshizawa" + def test_check(): """ Verifies that the ConcurrentDeclarativeSource check command is run against synchronous streams From 19da94da23fc29227d1ba17b3f88f15fcf39c4d5 Mon Sep 17 
00:00:00 2001 From: Marianob Span Date: Wed, 5 Mar 2025 22:01:54 -0300 Subject: [PATCH 3/5] State can be missing --- .../sources/declarative/concurrent_declarative_source.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 4333d341e..357bc95e1 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -135,7 +135,8 @@ def read( catalog: ConfiguredAirbyteCatalog, state: Optional[List[AirbyteStateMessage]] = None, ) -> Iterator[AirbyteMessage]: - self._connector_state_manager = ConnectorStateManager(state=state) + if state: + self._connector_state_manager = ConnectorStateManager(state=state) concurrent_streams, _ = self._group_streams(config=config) # ConcurrentReadProcessor pops streams that are finished being read so before syncing, the names of From 5055173c6c945491326780106f3a9c3577894f7e Mon Sep 17 00:00:00 2001 From: Marianob Span Date: Wed, 5 Mar 2025 22:46:14 -0300 Subject: [PATCH 4/5] Fix test --- .../test_concurrent_declarative_source.py | 29 +++++++++---------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/unit_tests/sources/declarative/test_concurrent_declarative_source.py b/unit_tests/sources/declarative/test_concurrent_declarative_source.py index 641655d11..f4148c13a 100644 --- a/unit_tests/sources/declarative/test_concurrent_declarative_source.py +++ b/unit_tests/sources/declarative/test_concurrent_declarative_source.py @@ -839,7 +839,7 @@ def test_concurrent_cursor_with_state_in_read_method(): type=AirbyteStateType.STREAM, stream=AirbyteStreamState( stream_descriptor=StreamDescriptor(name="party_members", namespace=None), - stream_state=AirbyteStateBlob(updated_at="2024-09-05"), + stream_state=AirbyteStateBlob(updated_at="2024-09-04"), ), ), ] @@ -858,20 +858,15 @@ def 
test_concurrent_cursor_with_state_in_read_method(): ] ) - def get_source(): - return ConcurrentDeclarativeSource( - source_config=_MANIFEST, config=_CONFIG, catalog=catalog, state=[] - ) - - source = get_source() + source = ConcurrentDeclarativeSource( + source_config=_MANIFEST, config=_CONFIG, catalog=catalog, state=[] + ) + with HttpMocker() as http_mocker: _mock_party_members_requests(http_mocker, _NO_STATE_PARTY_MEMBERS_SLICES_AND_RESPONSES) - messages_iterator = source.read( + messages = list(source.read( logger=source.logger, config=_CONFIG, catalog=catalog, state=state - ) - - # Get the first message only to get the stream running so that we can check the cursor - first_message = next(messages_iterator) + )) concurrent_streams, _ = source._group_streams(config=_CONFIG) party_members_stream = [s for s in concurrent_streams if s.name == "party_members"][0] @@ -883,7 +878,7 @@ def get_source(): assert party_members_cursor._stream_name == "party_members" assert party_members_cursor._cursor_field.cursor_field_key == "updated_at" - cursor_value = AirbyteDateTime.strptime("2024-09-05", "%Y-%m-%d") + cursor_value = AirbyteDateTime.strptime("2024-09-04", "%Y-%m-%d") assert len(party_members_cursor._concurrent_state["slices"]) == 1 assert ( @@ -891,12 +886,16 @@ def get_source(): == cursor_value ) - messages = list(messages_iterator) - party_members_records = get_records_for_stream("party_members", messages) # There is only one record after 2024-09-05 + party_members_records = get_records_for_stream("party_members", messages) assert len(party_members_records) == 1 assert party_members_records[0].data["id"] == "yoshizawa" + # Emitted state should have the updated_at of the one record read + states = get_states_for_stream("party_members", messages) + assert len(states) == 2 + assert states[1].stream.stream_state == AirbyteStateBlob(updated_at=party_members_records[0].data["updated_at"]) + def test_check(): """ From cdde727dd1df4d8a20e594d97fa546a90e44dfda Mon Sep 17 
00:00:00 2001 From: Marianob Span Date: Wed, 5 Mar 2025 22:51:25 -0300 Subject: [PATCH 5/5] Format --- .../test_concurrent_declarative_source.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/unit_tests/sources/declarative/test_concurrent_declarative_source.py b/unit_tests/sources/declarative/test_concurrent_declarative_source.py index f4148c13a..3a0ae682a 100644 --- a/unit_tests/sources/declarative/test_concurrent_declarative_source.py +++ b/unit_tests/sources/declarative/test_concurrent_declarative_source.py @@ -861,12 +861,12 @@ def test_concurrent_cursor_with_state_in_read_method(): source = ConcurrentDeclarativeSource( source_config=_MANIFEST, config=_CONFIG, catalog=catalog, state=[] ) - + with HttpMocker() as http_mocker: _mock_party_members_requests(http_mocker, _NO_STATE_PARTY_MEMBERS_SLICES_AND_RESPONSES) - messages = list(source.read( - logger=source.logger, config=_CONFIG, catalog=catalog, state=state - )) + messages = list( + source.read(logger=source.logger, config=_CONFIG, catalog=catalog, state=state) + ) concurrent_streams, _ = source._group_streams(config=_CONFIG) party_members_stream = [s for s in concurrent_streams if s.name == "party_members"][0] @@ -894,7 +894,9 @@ def test_concurrent_cursor_with_state_in_read_method(): # Emitted state should have the updated_at of the one record read states = get_states_for_stream("party_members", messages) assert len(states) == 2 - assert states[1].stream.stream_state == AirbyteStateBlob(updated_at=party_members_records[0].data["updated_at"]) + assert states[1].stream.stream_state == AirbyteStateBlob( + updated_at=party_members_records[0].data["updated_at"] + ) def test_check():