From 883b8967a287d67ecf5b970c2581cf3c121ba639 Mon Sep 17 00:00:00 2001 From: darynaishchenko Date: Thu, 19 Jun 2025 14:16:45 +0300 Subject: [PATCH 1/7] sync records without cursor field in ConcurrentPerPartitionCursor --- .../incremental/concurrent_partition_cursor.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py index a2194c75a..77c65523a 100644 --- a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py +++ b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py @@ -448,8 +448,14 @@ def observe(self, record: Record) -> None: "Invalid state as stream slices that are emitted should refer to an existing cursor" ) + # if the current record has no cursor value, we cannot meaningfully update the state based on it, so there is nothing more to do + try: + record_cursor_value = self._cursor_field.extract_value(record) + except ValueError: + return + record_cursor = self._connector_state_converter.output_format( - self._connector_state_converter.parse_value(self._cursor_field.extract_value(record)) + self._connector_state_converter.parse_value(record_cursor_value) ) self._update_global_cursor(record_cursor) if not self._use_global_cursor: From 683d97453195708839808b84238d084b49964cd3 Mon Sep 17 00:00:00 2001 From: darynaishchenko Date: Thu, 19 Jun 2025 16:16:01 +0300 Subject: [PATCH 2/7] fix assert for expected records in test_dynamic_streams_with_parametrized_components_resolver --- .../resolvers/test_parametrized_components_resolver.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/unit_tests/sources/declarative/resolvers/test_parametrized_components_resolver.py b/unit_tests/sources/declarative/resolvers/test_parametrized_components_resolver.py index 279f96008..ea66bd8bb 100644 --- a/unit_tests/sources/declarative/resolvers/test_parametrized_components_resolver.py +++ b/unit_tests/sources/declarative/resolvers/test_parametrized_components_resolver.py @@ -172,9 +172,12 @@ def test_dynamic_streams_with_parametrized_components_resolver(): if record.type == Type.RECORD ] - assert len(records) == 3 - assert [dict(sorted(record.record.data.items())) for record in records] == [ + expected_records = [ {"field1": "Customers info", "field2": "Related to customers field"}, {"field1": "Refunds info", "field2": "Related to refunds field"}, {"field1": "Orders info", "field2": "Related to orders field"}, ] + assert len(records) == len(expected_records) + for record in records: + record_data = record.record.data + assert record_data in expected_records From 2aec1df90ee0bd51472bbc995d028f1d667b9618 Mon Sep 17 00:00:00 2001 From: darynaishchenko Date: Mon, 23 Jun 2025 18:08:22 +0300 Subject: [PATCH 3/7] added unit test --- .../test_concurrent_perpartitioncursor.py | 172 ++++++++++++++++++ 1 file changed, 172 insertions(+) diff --git a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py index ae6ec0713..cdfd99e82 100644 --- a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py +++ b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py @@ -1460,6 +1460,178 @@ def run_incremental_parent_state_test( ], }, ), + ( + "test_incremental_parent_state_records_without_cursor", + SUBSTREAM_MANIFEST, + [ + # Fetch the first page of posts + ( + f"https://api.example.com/community/posts?per_page=100&start_time={PARENT_POSTS_CURSOR}", + { + "posts": [ + {"id": 1, "updated_at": POST_1_UPDATED_AT}, + ] + }, + ), + # Fetch the first page of comments for post 1 + ( + "https://api.example.com/community/posts/1/comments?per_page=100", + { + "comments": [ + { + "id": 9, + "post_id": 1, + "updated_at": COMMENT_9_OLDEST, + }, + { + "id": 10, + "post_id": 1, + "updated_at": COMMENT_10_UPDATED_AT, + }, + { + "id": 11, + "post_id": 1, + "updated_at": COMMENT_11_UPDATED_AT, + }, + ] + }, + ), + # Fetch the first page of votes for comment 10 of post 1 (vote without cursor field) + ( + f"https://api.example.com/community/posts/1/comments/10/votes?per_page=100&start_time={INITIAL_STATE_PARTITION_10_CURSOR}", + { + "votes": [ + { + "id": 100, + "comment_id": 10, + } + ], + }, + ), + # Fetch the first page of votes for comment 11 of post 1 + ( + f"https://api.example.com/community/posts/1/comments/11/votes" + f"?per_page=100&start_time={INITIAL_STATE_PARTITION_11_CURSOR}", + {"votes": [{"id": 111, "comment_id": 11, "created_at": VOTE_111_CREATED_AT}]}, + ), + # Fetch the first page of votes for comment 12 of post 1 + ( + f"https://api.example.com/community/posts/1/comments/12/votes?per_page=100&start_time={LOOKBACK_DATE}", + {"votes": []}, + ), + # Fetch the first page of comments for post 2 + ( + "https://api.example.com/community/posts/2/comments?per_page=100", + { + "comments": [{"id": 20, "post_id": 2, "updated_at": COMMENT_20_UPDATED_AT}], + "next_page": "https://api.example.com/community/posts/2/comments?per_page=100&page=2", + }, + ), + # Requests with intermediate states + # Fetch votes for comment 10 of post 1 + ( + f"https://api.example.com/community/posts/1/comments/10/votes?per_page=100&start_time={VOTE_100_CREATED_AT}", + { + "votes": [{"id": 100, "comment_id": 10, "created_at": VOTE_100_CREATED_AT}], + }, + ), + # Fetch votes for comment 11 of post 1 + ( + f"https://api.example.com/community/posts/1/comments/11/votes?per_page=100&start_time={VOTE_111_CREATED_AT}", + { + "votes": [{"id": 111, "comment_id": 11, "created_at": VOTE_111_CREATED_AT}], + }, + ), + # Fetch votes for comment 12 of post 1 + ( + f"https://api.example.com/community/posts/1/comments/12/votes?per_page=100&start_time={VOTE_111_CREATED_AT}", + { + "votes": [], + }, + ), + ], + # Expected records + [ + { + "comment_id": 10, + "comment_updated_at": COMMENT_10_UPDATED_AT, + "id": 100 + }, + { + "comment_id": 11, + "comment_updated_at": COMMENT_11_UPDATED_AT, + "created_at": "2024-01-13T00:00:00Z", + "id": 111 + } + ], + # Number of intermediate states - 6 as number of parent partitions + 2, + # Initial state + { + "parent_state": { + "post_comments": { + "states": [ + { + "partition": {"id": 1, "parent_slice": {}}, + "cursor": {"updated_at": PARENT_COMMENT_CURSOR_PARTITION_1}, + } + ], + "parent_state": {"posts": {"updated_at": PARENT_POSTS_CURSOR}}, + } + }, + "state": {"created_at": INITIAL_GLOBAL_CURSOR}, + "states": [ + { + "partition": { + "id": 10, + "parent_slice": {"id": 1, "parent_slice": {}}, + }, + "cursor": {"created_at": INITIAL_STATE_PARTITION_10_CURSOR}, + }, + { + "partition": { + "id": 11, + "parent_slice": {"id": 1, "parent_slice": {}}, + }, + "cursor": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR}, + }, + ], + "lookback_window": 86400, + }, + # Expected state + { + "state": {"created_at": VOTE_111_CREATED_AT}, + "parent_state": { + "post_comments": { + "use_global_cursor": False, + "state": {"updated_at": COMMENT_10_UPDATED_AT}, # 10 is the "latest" + "parent_state": { + "posts": {"updated_at": POST_1_UPDATED_AT} + }, # post 1 is the latest + "lookback_window": 1, + "states": [ + { + "partition": {"id": 1, "parent_slice": {}}, + "cursor": {"updated_at": COMMENT_10_UPDATED_AT}, + }, + ], + } + }, + "lookback_window": 1, + "use_global_cursor": False, + "states": [ + { + "partition": {"id": 10, "parent_slice": {"id": 1, "parent_slice": {}}}, + # initial state because record doesn't have a cursor field + "cursor": {"created_at": INITIAL_STATE_PARTITION_10_CURSOR}, + }, + { + "partition": {"id": 11, "parent_slice": {"id": 1, "parent_slice": {}}}, + "cursor": {"created_at": VOTE_111_CREATED_AT}, + }, + ], + }, + ), ], ) def test_incremental_parent_state( From 11039b903335c6fd88e463869e287fa76e1f561d Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Mon, 23 Jun 2025 15:09:51 +0000 Subject: [PATCH 4/7] Auto-fix lint and format issues --- .../incremental/test_concurrent_perpartitioncursor.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py index cdfd99e82..2deadbd29 100644 --- a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py +++ b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py @@ -1552,17 +1552,13 @@ def run_incremental_parent_state_test( ], # Expected records [ - { - "comment_id": 10, - "comment_updated_at": COMMENT_10_UPDATED_AT, - "id": 100 - }, + {"comment_id": 10, "comment_updated_at": COMMENT_10_UPDATED_AT, "id": 100}, { "comment_id": 11, "comment_updated_at": COMMENT_11_UPDATED_AT, "created_at": "2024-01-13T00:00:00Z", - "id": 111 - } + "id": 111, + }, ], # Number of intermediate states - 6 as number of parent partitions 2, From f7aa910d535247970f6805fb37e700910222a85b Mon Sep 17 00:00:00 2001 From: darynaishchenko Date: Mon, 23 Jun 2025 18:36:30 +0300 Subject: [PATCH 5/7] updated unit test --- .../test_concurrent_perpartitioncursor.py | 167 +++++++++++++++++- 1 file changed, 158 insertions(+), 9 deletions(-) diff --git a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py index 2deadbd29..1f79befb7 100644 --- a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py +++ b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py @@ -1461,7 +1461,7 @@ def run_incremental_parent_state_test( }, ), ( - "test_incremental_parent_state_records_without_cursor", + "test_incremental_parent_state_one_record_without_cursor", SUBSTREAM_MANIFEST, [ # Fetch the first page of posts @@ -1499,14 +1499,7 @@ def run_incremental_parent_state_test( # Fetch the first page of votes for comment 10 of post 1 (vote without cursor field) ( f"https://api.example.com/community/posts/1/comments/10/votes?per_page=100&start_time={INITIAL_STATE_PARTITION_10_CURSOR}", - { - "votes": [ - { - "id": 100, - "comment_id": 10, - } - ], - }, + {"votes": [{"id": 100, "comment_id": 10,}],}, ), # Fetch the first page of votes for comment 11 of post 1 ( @@ -1628,6 +1621,162 @@ def run_incremental_parent_state_test( ], }, ), + ( + "test_incremental_parent_state_all_records_without_cursor", + SUBSTREAM_MANIFEST, + [ + # Fetch the first page of posts + ( + f"https://api.example.com/community/posts?per_page=100&start_time={PARENT_POSTS_CURSOR}", + { + "posts": [ + {"id": 1, "updated_at": POST_1_UPDATED_AT}, + ] + }, + ), + # Fetch the first page of comments for post 1 + ( + "https://api.example.com/community/posts/1/comments?per_page=100", + { + "comments": [ + { + "id": 9, + "post_id": 1, + "updated_at": COMMENT_9_OLDEST, + }, + { + "id": 10, + "post_id": 1, + "updated_at": COMMENT_10_UPDATED_AT, + }, + { + "id": 11, + "post_id": 1, + "updated_at": COMMENT_11_UPDATED_AT, + }, + ] + }, + ), + # Fetch the first page of votes for comment 10 of post 1 (vote without cursor field) + ( + f"https://api.example.com/community/posts/1/comments/10/votes?per_page=100&start_time={INITIAL_STATE_PARTITION_10_CURSOR}", + {"votes": [{"id": 100, "comment_id": 10,}],}, + ), + # Fetch the first page of votes for comment 11 of post 1 (vote without cursor field) + ( + f"https://api.example.com/community/posts/1/comments/11/votes" + f"?per_page=100&start_time={INITIAL_STATE_PARTITION_11_CURSOR}", + {"votes": [{"id": 111, "comment_id": 11}]}, + ), + # Fetch the first page of votes for comment 12 of post 1 + ( + f"https://api.example.com/community/posts/1/comments/12/votes?per_page=100&start_time={LOOKBACK_DATE}", + {"votes": []}, + ), + # Fetch the first page of comments for post 2 + ( + "https://api.example.com/community/posts/2/comments?per_page=100", + { + "comments": [{"id": 20, "post_id": 2, "updated_at": COMMENT_20_UPDATED_AT}], + "next_page": "https://api.example.com/community/posts/2/comments?per_page=100&page=2", + }, + ), + # Requests with intermediate states + # Fetch votes for comment 10 of post 1 + ( + f"https://api.example.com/community/posts/1/comments/10/votes?per_page=100&start_time={VOTE_100_CREATED_AT}", + { + "votes": [{"id": 100, "comment_id": 10, "created_at": VOTE_100_CREATED_AT}], + }, + ), + # Fetch votes for comment 11 of post 1 + ( + f"https://api.example.com/community/posts/1/comments/11/votes?per_page=100&start_time={VOTE_111_CREATED_AT}", + { + "votes": [{"id": 111, "comment_id": 11, "created_at": VOTE_111_CREATED_AT}], + }, + ), + # Fetch votes for comment 12 of post 1 + ( + f"https://api.example.com/community/posts/1/comments/12/votes?per_page=100&start_time={VOTE_111_CREATED_AT}", + { + "votes": [], + }, + ), + ], + # Expected records + [ + {"comment_id": 10, "comment_updated_at": COMMENT_10_UPDATED_AT, "id": 100}, + {"comment_id": 11, "comment_updated_at": COMMENT_11_UPDATED_AT, "id": 111,}, + ], + # Number of intermediate states - 6 as number of parent partitions + 2, + # Initial state + { + "parent_state": { + "post_comments": { + "states": [ + { + "partition": {"id": 1, "parent_slice": {}}, + "cursor": {"updated_at": PARENT_COMMENT_CURSOR_PARTITION_1}, + } + ], + "parent_state": {"posts": {"updated_at": PARENT_POSTS_CURSOR}}, + } + }, + "state": {"created_at": INITIAL_GLOBAL_CURSOR}, + "states": [ + { + "partition": { + "id": 10, + "parent_slice": {"id": 1, "parent_slice": {}}, + }, + "cursor": {"created_at": INITIAL_STATE_PARTITION_10_CURSOR}, + }, + { + "partition": { + "id": 11, + "parent_slice": {"id": 1, "parent_slice": {}}, + }, + "cursor": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR}, + }, + ], + "lookback_window": 86400, + }, + # Expected state + { + "state": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR}, + "parent_state": { + "post_comments": { + "use_global_cursor": False, + "state": {"updated_at": COMMENT_10_UPDATED_AT}, # 10 is the "latest" + "parent_state": { + "posts": {"updated_at": POST_1_UPDATED_AT} + }, # post 1 is the latest + "lookback_window": 1, + "states": [ + { + "partition": {"id": 1, "parent_slice": {}}, + "cursor": {"updated_at": COMMENT_10_UPDATED_AT}, + }, + ], + } + }, + "lookback_window": 1, + "use_global_cursor": False, + "states": [ + { + "partition": {"id": 10, "parent_slice": {"id": 1, "parent_slice": {}}}, + # initial state because record doesn't have a cursor field + "cursor": {"created_at": INITIAL_STATE_PARTITION_10_CURSOR}, + }, + { + "partition": {"id": 11, "parent_slice": {"id": 1, "parent_slice": {}}}, + "cursor": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR}, + }, + ], + }, + ), ], ) def test_incremental_parent_state( From 6ae9a612c2285b4c6955e1105d15001458733130 Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Mon, 23 Jun 2025 15:40:34 +0000 Subject: [PATCH 6/7] Auto-fix lint and format issues --- .../test_concurrent_perpartitioncursor.py | 24 ++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py index 1f79befb7..0457d88b3 100644 --- a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py +++ b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py @@ -1499,7 +1499,14 @@ def run_incremental_parent_state_test( # Fetch the first page of votes for comment 10 of post 1 (vote without cursor field) ( f"https://api.example.com/community/posts/1/comments/10/votes?per_page=100&start_time={INITIAL_STATE_PARTITION_10_CURSOR}", - {"votes": [{"id": 100, "comment_id": 10,}],}, + { + "votes": [ + { + "id": 100, + "comment_id": 10, + } + ], + }, ), # Fetch the first page of votes for comment 11 of post 1 ( @@ -1660,7 +1667,14 @@ def run_incremental_parent_state_test( # Fetch the first page of votes for comment 10 of post 1 (vote without cursor field) ( f"https://api.example.com/community/posts/1/comments/10/votes?per_page=100&start_time={INITIAL_STATE_PARTITION_10_CURSOR}", - {"votes": [{"id": 100, "comment_id": 10,}],}, + { + "votes": [ + { + "id": 100, + "comment_id": 10, + } + ], + }, ), # Fetch the first page of votes for comment 11 of post 1 (vote without cursor field) ( @@ -1707,7 +1721,11 @@ def run_incremental_parent_state_test( # Expected records [ {"comment_id": 10, "comment_updated_at": COMMENT_10_UPDATED_AT, "id": 100}, - {"comment_id": 11, "comment_updated_at": COMMENT_11_UPDATED_AT, "id": 111,}, + { + "comment_id": 11, + "comment_updated_at": COMMENT_11_UPDATED_AT, + "id": 111, + }, ], # Number of intermediate states - 6 as number of parent partitions 2, From 53a5196919840af11035912795c4dd6a1ab468e8 Mon Sep 17 00:00:00 2001 From: darynaishchenko Date: Mon, 23 Jun 2025 19:01:18 +0300 Subject: [PATCH 7/7] delete redundant mocks --- .../test_concurrent_perpartitioncursor.py | 62 ------------------- 1 file changed, 62 deletions(-) diff --git a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py index 0457d88b3..13d1194dd 100644 --- a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py +++ b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py @@ -1514,27 +1514,7 @@ def run_incremental_parent_state_test( f"?per_page=100&start_time={INITIAL_STATE_PARTITION_11_CURSOR}", {"votes": [{"id": 111, "comment_id": 11, "created_at": VOTE_111_CREATED_AT}]}, ), - # Fetch the first page of votes for comment 12 of post 1 - ( - f"https://api.example.com/community/posts/1/comments/12/votes?per_page=100&start_time={LOOKBACK_DATE}", - {"votes": []}, - ), - # Fetch the first page of comments for post 2 - ( - "https://api.example.com/community/posts/2/comments?per_page=100", - { - "comments": [{"id": 20, "post_id": 2, "updated_at": COMMENT_20_UPDATED_AT}], - "next_page": "https://api.example.com/community/posts/2/comments?per_page=100&page=2", - }, - ), # Requests with intermediate states - # Fetch votes for comment 10 of post 1 - ( - f"https://api.example.com/community/posts/1/comments/10/votes?per_page=100&start_time={VOTE_100_CREATED_AT}", - { - "votes": [{"id": 100, "comment_id": 10, "created_at": VOTE_100_CREATED_AT}], - }, - ), # Fetch votes for comment 11 of post 1 ( f"https://api.example.com/community/posts/1/comments/11/votes?per_page=100&start_time={VOTE_111_CREATED_AT}", @@ -1542,13 +1522,6 @@ def run_incremental_parent_state_test( "votes": [{"id": 111, "comment_id": 11, "created_at": VOTE_111_CREATED_AT}], }, ), - # Fetch votes for comment 12 of post 1 - ( - f"https://api.example.com/community/posts/1/comments/12/votes?per_page=100&start_time={VOTE_111_CREATED_AT}", - { - "votes": [], - }, - ), ], # Expected records [ @@ -1682,41 +1655,6 @@ def run_incremental_parent_state_test( f"?per_page=100&start_time={INITIAL_STATE_PARTITION_11_CURSOR}", {"votes": [{"id": 111, "comment_id": 11}]}, ), - # Fetch the first page of votes for comment 12 of post 1 - ( - f"https://api.example.com/community/posts/1/comments/12/votes?per_page=100&start_time={LOOKBACK_DATE}", - {"votes": []}, - ), - # Fetch the first page of comments for post 2 - ( - "https://api.example.com/community/posts/2/comments?per_page=100", - { - "comments": [{"id": 20, "post_id": 2, "updated_at": COMMENT_20_UPDATED_AT}], - "next_page": "https://api.example.com/community/posts/2/comments?per_page=100&page=2", - }, - ), - # Requests with intermediate states - # Fetch votes for comment 10 of post 1 - ( - f"https://api.example.com/community/posts/1/comments/10/votes?per_page=100&start_time={VOTE_100_CREATED_AT}", - { - "votes": [{"id": 100, "comment_id": 10, "created_at": VOTE_100_CREATED_AT}], - }, - ), - # Fetch votes for comment 11 of post 1 - ( - f"https://api.example.com/community/posts/1/comments/11/votes?per_page=100&start_time={VOTE_111_CREATED_AT}", - { - "votes": [{"id": 111, "comment_id": 11, "created_at": VOTE_111_CREATED_AT}], - }, - ), - # Fetch votes for comment 12 of post 1 - ( - f"https://api.example.com/community/posts/1/comments/12/votes?per_page=100&start_time={VOTE_111_CREATED_AT}", - { - "votes": [], - }, - ), ], # Expected records [