Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -448,8 +448,14 @@ def observe(self, record: Record) -> None:
"Invalid state as stream slices that are emitted should refer to an existing cursor"
)

# if the current record has no cursor value, we cannot meaningfully update the state based on it, so there is nothing more to do
try:
record_cursor_value = self._cursor_field.extract_value(record)
except ValueError:
return

record_cursor = self._connector_state_converter.output_format(
self._connector_state_converter.parse_value(self._cursor_field.extract_value(record))
self._connector_state_converter.parse_value(record_cursor_value)
)
self._update_global_cursor(record_cursor)
if not self._use_global_cursor:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1460,6 +1460,279 @@ def run_incremental_parent_state_test(
],
},
),
(
"test_incremental_parent_state_one_record_without_cursor",
SUBSTREAM_MANIFEST,
[
# Fetch the first page of posts
(
f"https://api.example.com/community/posts?per_page=100&start_time={PARENT_POSTS_CURSOR}",
{
"posts": [
{"id": 1, "updated_at": POST_1_UPDATED_AT},
]
},
),
# Fetch the first page of comments for post 1
(
"https://api.example.com/community/posts/1/comments?per_page=100",
{
"comments": [
{
"id": 9,
"post_id": 1,
"updated_at": COMMENT_9_OLDEST,
},
{
"id": 10,
"post_id": 1,
"updated_at": COMMENT_10_UPDATED_AT,
},
{
"id": 11,
"post_id": 1,
"updated_at": COMMENT_11_UPDATED_AT,
},
]
},
),
# Fetch the first page of votes for comment 10 of post 1 (vote without cursor field)
(
f"https://api.example.com/community/posts/1/comments/10/votes?per_page=100&start_time={INITIAL_STATE_PARTITION_10_CURSOR}",
{
"votes": [
{
"id": 100,
"comment_id": 10,
}
],
},
),
# Fetch the first page of votes for comment 11 of post 1
(
f"https://api.example.com/community/posts/1/comments/11/votes"
f"?per_page=100&start_time={INITIAL_STATE_PARTITION_11_CURSOR}",
{"votes": [{"id": 111, "comment_id": 11, "created_at": VOTE_111_CREATED_AT}]},
),
# Requests with intermediate states
# Fetch votes for comment 11 of post 1
(
f"https://api.example.com/community/posts/1/comments/11/votes?per_page=100&start_time={VOTE_111_CREATED_AT}",
{
"votes": [{"id": 111, "comment_id": 11, "created_at": VOTE_111_CREATED_AT}],
},
),
],
# Expected records
[
{"comment_id": 10, "comment_updated_at": COMMENT_10_UPDATED_AT, "id": 100},
{
"comment_id": 11,
"comment_updated_at": COMMENT_11_UPDATED_AT,
"created_at": "2024-01-13T00:00:00Z",
"id": 111,
},
],
# Number of intermediate states - 6 as number of parent partitions
2,
# Initial state
{
"parent_state": {
"post_comments": {
"states": [
{
"partition": {"id": 1, "parent_slice": {}},
"cursor": {"updated_at": PARENT_COMMENT_CURSOR_PARTITION_1},
}
],
"parent_state": {"posts": {"updated_at": PARENT_POSTS_CURSOR}},
}
},
"state": {"created_at": INITIAL_GLOBAL_CURSOR},
"states": [
{
"partition": {
"id": 10,
"parent_slice": {"id": 1, "parent_slice": {}},
},
"cursor": {"created_at": INITIAL_STATE_PARTITION_10_CURSOR},
},
{
"partition": {
"id": 11,
"parent_slice": {"id": 1, "parent_slice": {}},
},
"cursor": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR},
},
],
"lookback_window": 86400,
},
# Expected state
{
"state": {"created_at": VOTE_111_CREATED_AT},
"parent_state": {
"post_comments": {
"use_global_cursor": False,
"state": {"updated_at": COMMENT_10_UPDATED_AT}, # 10 is the "latest"
"parent_state": {
"posts": {"updated_at": POST_1_UPDATED_AT}
}, # post 1 is the latest
"lookback_window": 1,
"states": [
{
"partition": {"id": 1, "parent_slice": {}},
"cursor": {"updated_at": COMMENT_10_UPDATED_AT},
},
],
}
},
"lookback_window": 1,
"use_global_cursor": False,
"states": [
{
"partition": {"id": 10, "parent_slice": {"id": 1, "parent_slice": {}}},
# initial state because record doesn't have a cursor field
"cursor": {"created_at": INITIAL_STATE_PARTITION_10_CURSOR},
},
{
"partition": {"id": 11, "parent_slice": {"id": 1, "parent_slice": {}}},
"cursor": {"created_at": VOTE_111_CREATED_AT},
},
],
},
),
(
"test_incremental_parent_state_all_records_without_cursor",
SUBSTREAM_MANIFEST,
[
# Fetch the first page of posts
(
f"https://api.example.com/community/posts?per_page=100&start_time={PARENT_POSTS_CURSOR}",
{
"posts": [
{"id": 1, "updated_at": POST_1_UPDATED_AT},
]
},
),
# Fetch the first page of comments for post 1
(
"https://api.example.com/community/posts/1/comments?per_page=100",
{
"comments": [
{
"id": 9,
"post_id": 1,
"updated_at": COMMENT_9_OLDEST,
},
{
"id": 10,
"post_id": 1,
"updated_at": COMMENT_10_UPDATED_AT,
},
{
"id": 11,
"post_id": 1,
"updated_at": COMMENT_11_UPDATED_AT,
},
]
},
),
# Fetch the first page of votes for comment 10 of post 1 (vote without cursor field)
(
f"https://api.example.com/community/posts/1/comments/10/votes?per_page=100&start_time={INITIAL_STATE_PARTITION_10_CURSOR}",
{
"votes": [
{
"id": 100,
"comment_id": 10,
}
],
},
),
# Fetch the first page of votes for comment 11 of post 1 (vote without cursor field)
(
f"https://api.example.com/community/posts/1/comments/11/votes"
f"?per_page=100&start_time={INITIAL_STATE_PARTITION_11_CURSOR}",
{"votes": [{"id": 111, "comment_id": 11}]},
),
],
# Expected records
[
{"comment_id": 10, "comment_updated_at": COMMENT_10_UPDATED_AT, "id": 100},
{
"comment_id": 11,
"comment_updated_at": COMMENT_11_UPDATED_AT,
"id": 111,
},
],
# Number of intermediate states - 6 as number of parent partitions
2,
# Initial state
{
"parent_state": {
"post_comments": {
"states": [
{
"partition": {"id": 1, "parent_slice": {}},
"cursor": {"updated_at": PARENT_COMMENT_CURSOR_PARTITION_1},
}
],
"parent_state": {"posts": {"updated_at": PARENT_POSTS_CURSOR}},
}
},
"state": {"created_at": INITIAL_GLOBAL_CURSOR},
"states": [
{
"partition": {
"id": 10,
"parent_slice": {"id": 1, "parent_slice": {}},
},
"cursor": {"created_at": INITIAL_STATE_PARTITION_10_CURSOR},
},
{
"partition": {
"id": 11,
"parent_slice": {"id": 1, "parent_slice": {}},
},
"cursor": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR},
},
],
"lookback_window": 86400,
},
# Expected state
{
"state": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR},
"parent_state": {
"post_comments": {
"use_global_cursor": False,
"state": {"updated_at": COMMENT_10_UPDATED_AT}, # 10 is the "latest"
"parent_state": {
"posts": {"updated_at": POST_1_UPDATED_AT}
}, # post 1 is the latest
"lookback_window": 1,
"states": [
{
"partition": {"id": 1, "parent_slice": {}},
"cursor": {"updated_at": COMMENT_10_UPDATED_AT},
},
],
}
},
"lookback_window": 1,
"use_global_cursor": False,
"states": [
{
"partition": {"id": 10, "parent_slice": {"id": 1, "parent_slice": {}}},
# initial state because record doesn't have a cursor field
"cursor": {"created_at": INITIAL_STATE_PARTITION_10_CURSOR},
},
{
"partition": {"id": 11, "parent_slice": {"id": 1, "parent_slice": {}}},
"cursor": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR},
},
],
},
),
],
)
def test_incremental_parent_state(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,12 @@ def test_dynamic_streams_with_parametrized_components_resolver():
if record.type == Type.RECORD
]

assert len(records) == 3
assert [dict(sorted(record.record.data.items())) for record in records] == [
expected_records = [
{"field1": "Customers info", "field2": "Related to customers field"},
{"field1": "Refunds info", "field2": "Related to refunds field"},
{"field1": "Orders info", "field2": "Related to orders field"},
]
assert len(records) == len(expected_records)
Comment thread
darynaishchenko marked this conversation as resolved.
for record in records:
record_data = record.record.data
assert record_data in expected_records
Loading