From e822ddf68ef796fafb4da02068eeedc0bd3bfdc1 Mon Sep 17 00:00:00 2001 From: maxi297 Date: Thu, 20 Mar 2025 16:22:55 -0400 Subject: [PATCH 1/2] Ensure most_recent_cursor_value is deserialized --- .../abstract_stream_state_converter.py | 2 + .../sources/streams/concurrent/test_cursor.py | 59 +++++++++++++++++-- 2 files changed, 57 insertions(+), 4 deletions(-) diff --git a/airbyte_cdk/sources/streams/concurrent/state_converters/abstract_stream_state_converter.py b/airbyte_cdk/sources/streams/concurrent/state_converters/abstract_stream_state_converter.py index ccff41ba7..b51da51eb 100644 --- a/airbyte_cdk/sources/streams/concurrent/state_converters/abstract_stream_state_converter.py +++ b/airbyte_cdk/sources/streams/concurrent/state_converters/abstract_stream_state_converter.py @@ -71,6 +71,8 @@ def deserialize(self, state: MutableMapping[str, Any]) -> MutableMapping[str, An for stream_slice in state.get("slices", []): stream_slice[self.START_KEY] = self._from_state_message(stream_slice[self.START_KEY]) stream_slice[self.END_KEY] = self._from_state_message(stream_slice[self.END_KEY]) + if self.MOST_RECENT_RECORD_KEY in stream_slice: + stream_slice[self.MOST_RECENT_RECORD_KEY] = self._from_state_message(stream_slice[self.MOST_RECENT_RECORD_KEY]) return state def serialize( diff --git a/unit_tests/sources/streams/concurrent/test_cursor.py b/unit_tests/sources/streams/concurrent/test_cursor.py index e87976964..53f59615e 100644 --- a/unit_tests/sources/streams/concurrent/test_cursor.py +++ b/unit_tests/sources/streams/concurrent/test_cursor.py @@ -1,7 +1,7 @@ # # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # - +from copy import deepcopy from datetime import datetime, timedelta, timezone from functools import partial from typing import Any, Mapping, Optional @@ -84,7 +84,7 @@ def _cursor_with_slice_boundary_fields( return ConcurrentCursor( _A_STREAM_NAME, _A_STREAM_NAMESPACE, - {}, + deepcopy(_NO_STATE), self._message_repository, self._state_manager, EpochValueConcurrentStreamStateConverter(is_sequential_state), @@ -99,7 +99,7 @@ def _cursor_without_slice_boundary_fields(self) -> ConcurrentCursor: return ConcurrentCursor( _A_STREAM_NAME, _A_STREAM_NAMESPACE, - {}, + deepcopy(_NO_STATE), self._message_repository, self._state_manager, EpochValueConcurrentStreamStateConverter(is_sequential_state=True), @@ -265,7 +265,7 @@ def test_given_no_state_when_generate_slices_then_create_slice_from_start_to_end cursor = ConcurrentCursor( _A_STREAM_NAME, _A_STREAM_NAMESPACE, - _NO_STATE, + deepcopy(_NO_STATE), self._message_repository, self._state_manager, EpochValueConcurrentStreamStateConverter(is_sequential_state=False), @@ -950,6 +950,57 @@ def test_given_initial_state_is_sequential_and_start_provided_when_generate_slic }, # State message is updated to the legacy format before being emitted ) + @freezegun.freeze_time(time_to_freeze=datetime.fromtimestamp(50, timezone.utc)) + def test_given_most_recent_cursor_value_in_input_state_when_emit_state_then_serialize_state_properly( + self, + ) -> None: + cursor = ConcurrentCursor( + _A_STREAM_NAME, + _A_STREAM_NAMESPACE, + { + "state_type": ConcurrencyCompatibleStateType.date_range.value, + "slices": [ + { + EpochValueConcurrentStreamStateConverter.START_KEY: 0, + EpochValueConcurrentStreamStateConverter.END_KEY: 20, + EpochValueConcurrentStreamStateConverter.MOST_RECENT_RECORD_KEY: 15, + }, + ], + }, + self._message_repository, + self._state_manager, + EpochValueConcurrentStreamStateConverter(is_sequential_state=False), + CursorField(_A_CURSOR_FIELD_KEY), + _SLICE_BOUNDARY_FIELDS, + datetime.fromtimestamp(0, timezone.utc), + EpochValueConcurrentStreamStateConverter.get_end_provider(), + _NO_LOOKBACK_WINDOW, + ) + + cursor.close_partition(_partition( + StreamSlice( + partition={}, + cursor_slice={ + _LOWER_SLICE_BOUNDARY_FIELD: 20, + _UPPER_SLICE_BOUNDARY_FIELD: 50, + }, + ), + _stream_name=_A_STREAM_NAME, + )) + + expected_state = { + "state_type": ConcurrencyCompatibleStateType.date_range.value, + "slices": [ + { + EpochValueConcurrentStreamStateConverter.START_KEY: 0, + EpochValueConcurrentStreamStateConverter.END_KEY: 50, + EpochValueConcurrentStreamStateConverter.MOST_RECENT_RECORD_KEY: 15, + }, + ], + } + self._state_manager.update_state_for_stream.assert_called_once_with(_A_STREAM_NAME, _A_STREAM_NAMESPACE, expected_state) + + class ClampingIntegrationTest(TestCase): def setUp(self) -> None: From 0b2620df2c3528f0f0658a33f21f6f0ab425aeaf Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Thu, 20 Mar 2025 20:27:27 +0000 Subject: [PATCH 2/2] Auto-fix lint and format issues --- .../abstract_stream_state_converter.py | 4 ++- .../sources/streams/concurrent/test_cursor.py | 27 ++++++++++--------- 2 files changed, 18 insertions(+), 13 deletions(-) diff --git a/airbyte_cdk/sources/streams/concurrent/state_converters/abstract_stream_state_converter.py b/airbyte_cdk/sources/streams/concurrent/state_converters/abstract_stream_state_converter.py index b51da51eb..7489eaf40 100644 --- a/airbyte_cdk/sources/streams/concurrent/state_converters/abstract_stream_state_converter.py +++ b/airbyte_cdk/sources/streams/concurrent/state_converters/abstract_stream_state_converter.py @@ -72,7 +72,9 @@ def deserialize(self, state: MutableMapping[str, Any]) -> MutableMapping[str, An stream_slice[self.START_KEY] = self._from_state_message(stream_slice[self.START_KEY]) stream_slice[self.END_KEY] = self._from_state_message(stream_slice[self.END_KEY]) if self.MOST_RECENT_RECORD_KEY in stream_slice: - stream_slice[self.MOST_RECENT_RECORD_KEY] = self._from_state_message(stream_slice[self.MOST_RECENT_RECORD_KEY]) + stream_slice[self.MOST_RECENT_RECORD_KEY] = self._from_state_message( + stream_slice[self.MOST_RECENT_RECORD_KEY] + ) return state def serialize( diff --git a/unit_tests/sources/streams/concurrent/test_cursor.py b/unit_tests/sources/streams/concurrent/test_cursor.py index 53f59615e..ddca1c689 100644 --- a/unit_tests/sources/streams/concurrent/test_cursor.py +++ b/unit_tests/sources/streams/concurrent/test_cursor.py @@ -977,16 +977,18 @@ def test_given_most_recent_cursor_value_in_input_state_when_emit_state_then_seri _NO_LOOKBACK_WINDOW, ) - cursor.close_partition(_partition( - StreamSlice( - partition={}, - cursor_slice={ - _LOWER_SLICE_BOUNDARY_FIELD: 20, - _UPPER_SLICE_BOUNDARY_FIELD: 50, - }, - ), - _stream_name=_A_STREAM_NAME, - )) + cursor.close_partition( + _partition( + StreamSlice( + partition={}, + cursor_slice={ + _LOWER_SLICE_BOUNDARY_FIELD: 20, + _UPPER_SLICE_BOUNDARY_FIELD: 50, + }, + ), + _stream_name=_A_STREAM_NAME, + ) + ) expected_state = { "state_type": ConcurrencyCompatibleStateType.date_range.value, @@ -998,8 +1000,9 @@ def test_given_most_recent_cursor_value_in_input_state_when_emit_state_then_seri }, ], } - self._state_manager.update_state_for_stream.assert_called_once_with(_A_STREAM_NAME, _A_STREAM_NAMESPACE, expected_state) - + self._state_manager.update_state_for_stream.assert_called_once_with( + _A_STREAM_NAME, _A_STREAM_NAMESPACE, expected_state + ) class ClampingIntegrationTest(TestCase):