From e54702b12eaaedf92c117a557312ad897cfdf8c7 Mon Sep 17 00:00:00 2001 From: vinodkc Date: Mon, 9 Feb 2026 12:33:24 -0800 Subject: [PATCH 1/7] Validate SimpleDataSourceStreamReader end offset advances --- .../tutorial/sql/python_data_source.rst | 6 +++- python/pyspark/errors/error-conditions.json | 5 ++++ python/pyspark/sql/datasource_internal.py | 17 +++++++++++ .../tests/test_python_streaming_datasource.py | 30 +++++++++++++++++++ 4 files changed, 57 insertions(+), 1 deletion(-) diff --git a/python/docs/source/tutorial/sql/python_data_source.rst b/python/docs/source/tutorial/sql/python_data_source.rst index b3267405ffdd7..f57cb4eaa108a 100644 --- a/python/docs/source/tutorial/sql/python_data_source.rst +++ b/python/docs/source/tutorial/sql/python_data_source.rst @@ -309,7 +309,11 @@ This is the same dummy streaming reader that generates 2 rows every batch implem def read(self, start: dict) -> Tuple[Iterator[Tuple], dict]: """ Takes start offset as an input, return an iterator of tuples and - the start offset of next read. + the end offset (start offset for the next read). The end offset must + advance past the start offset when returning data; otherwise Spark + raises a validation exception. + For example, returning 2 records from start_idx 0 means end should + be {"offset": 2} (i.e. start + 2). """ start_idx = start["offset"] it = iter([(i,) for i in range(start_idx, start_idx + 2)]) diff --git a/python/pyspark/errors/error-conditions.json b/python/pyspark/errors/error-conditions.json index ee35e237b8983..e417056379918 100644 --- a/python/pyspark/errors/error-conditions.json +++ b/python/pyspark/errors/error-conditions.json @@ -1185,6 +1185,11 @@ "SparkContext or SparkSession should be created first." ] }, + "SIMPLE_STREAM_READER_OFFSET_DID_NOT_ADVANCE": { + "message": [ + "SimpleDataSourceStreamReader.read() returned a non-empty batch but the end offset did not advance past the start offset. Returning end equal to start with data would cause the same batch to be processed repeatedly. The end offset must represent the position after the last record returned." + ] + }, "SLICE_WITH_STEP": { "message": [ "Slice with step is not supported." diff --git a/python/pyspark/sql/datasource_internal.py b/python/pyspark/sql/datasource_internal.py index 92a968cf05723..aee6e56df0461 100644 --- a/python/pyspark/sql/datasource_internal.py +++ b/python/pyspark/sql/datasource_internal.py @@ -93,6 +93,22 @@ def getDefaultReadLimit(self) -> ReadLimit: # We do not consider providing different read limit on simple stream reader. return ReadAllAvailable() + def _validate_read_result(self, start: dict, end: dict, it: Iterator[Tuple]) -> None: + """ + Validates that read() did not return a non-empty batch with end equal to start, + which would cause the same batch to be processed repeatedly. + """ + if json.dumps(end) != json.dumps(start): + return + try: + next(it) + except StopIteration: + return + raise PySparkException( + errorClass="SIMPLE_STREAM_READER_OFFSET_DID_NOT_ADVANCE", + messageParameters={}, + ) + def latestOffset(self, start: dict, limit: ReadLimit) -> dict: assert start is not None, "start offset should not be None" assert isinstance( @@ -100,6 +116,7 @@ def latestOffset(self, start: dict, limit: ReadLimit) -> dict: ), "simple stream reader does not support read limit" (iter, end) = self.simple_reader.read(start) + self._validate_read_result(start, end, iter) self.cache.append(PrefetchedCacheEntry(start, end, iter)) return end diff --git a/python/pyspark/sql/tests/test_python_streaming_datasource.py b/python/pyspark/sql/tests/test_python_streaming_datasource.py index bef85f7ba8457..3c1e4540593b9 100644 --- a/python/pyspark/sql/tests/test_python_streaming_datasource.py +++ b/python/pyspark/sql/tests/test_python_streaming_datasource.py @@ -41,6 +41,7 @@ have_pyarrow, pyarrow_requirement_message, ) +from pyspark.errors import PySparkException from pyspark.testing import assertDataFrameEqual from pyspark.testing.utils import eventually from pyspark.testing.sqlutils import ReusedSQLTestCase @@ -509,6 +510,35 @@ def check_batch(df, batch_id): q.awaitTermination(timeout=30) self.assertIsNone(q.exception(), "No exception has to be propagated.") + def test_simple_stream_reader_offset_did_not_advance_raises(self): + """Validate that returning end == start with non-empty data raises SIMPLE_STREAM_READER_OFFSET_DID_NOT_ADVANCE.""" + from pyspark.sql.datasource_internal import _SimpleStreamReaderWrapper + + class BuggySimpleStreamReader(SimpleDataSourceStreamReader): + def initialOffset(self): + return {"offset": 0} + + def read(self, start: dict): + # Bug: return same offset as end despite returning data + start_idx = start["offset"] + it = iter([(i,) for i in range(start_idx, start_idx + 3)]) + return (it, {"offset": start_idx}) + + def readBetweenOffsets(self, start: dict, end: dict): + return iter([]) + + def commit(self, end: dict): + pass + + reader = BuggySimpleStreamReader() + wrapper = _SimpleStreamReaderWrapper(reader) + with self.assertRaises(PySparkException) as cm: + wrapper.latestOffset({"offset": 0}, ReadAllAvailable()) + self.assertEqual( + cm.exception.getCondition(), + "SIMPLE_STREAM_READER_OFFSET_DID_NOT_ADVANCE", + ) + def test_stream_writer(self): input_dir = tempfile.TemporaryDirectory(prefix="test_data_stream_write_input") output_dir = tempfile.TemporaryDirectory(prefix="test_data_stream_write_output") From 78c35637856fe13d2fbe91bd518975445f99bfbf Mon Sep 17 00:00:00 2001 From: vinodkc Date: Mon, 9 Feb 2026 17:46:25 -0800 Subject: [PATCH 2/7] Cache only when start != end --- python/docs/source/tutorial/sql/python_data_source.rst | 2 ++ python/pyspark/sql/datasource.py | 4 ++++ python/pyspark/sql/datasource_internal.py | 6 ++++-- 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/python/docs/source/tutorial/sql/python_data_source.rst b/python/docs/source/tutorial/sql/python_data_source.rst index f57cb4eaa108a..4219c14bc5bcf 100644 --- a/python/docs/source/tutorial/sql/python_data_source.rst +++ b/python/docs/source/tutorial/sql/python_data_source.rst @@ -314,6 +314,8 @@ This is the same dummy streaming reader that generates 2 rows every batch implem raises a validation exception. For example, returning 2 records from start_idx 0 means end should be {"offset": 2} (i.e. start + 2). + When there is no data to read, you may return the same offset as end and + start,but you must provide an empty iterator. """ start_idx = start["offset"] it = iter([(i,) for i in range(start_idx, start_idx + 2)]) diff --git a/python/pyspark/sql/datasource.py b/python/pyspark/sql/datasource.py index bb73a7a9206b1..3a4c5eece7ec8 100644 --- a/python/pyspark/sql/datasource.py +++ b/python/pyspark/sql/datasource.py @@ -928,6 +928,10 @@ def read(self, start: dict) -> Tuple[Iterator[Tuple], dict]: Read all available data from start offset and return the offset that next read attempt starts from. + When there is data, the end offset must advance past the start offset; otherwise + Spark raises a validation exception. When there is no data to read, the implementation + may return the same offset as both start and end, but must provide an empty iterator. + Parameters ---------- start : dict diff --git a/python/pyspark/sql/datasource_internal.py b/python/pyspark/sql/datasource_internal.py index aee6e56df0461..8b550a4a6d3da 100644 --- a/python/pyspark/sql/datasource_internal.py +++ b/python/pyspark/sql/datasource_internal.py @@ -96,9 +96,12 @@ def getDefaultReadLimit(self) -> ReadLimit: def _validate_read_result(self, start: dict, end: dict, it: Iterator[Tuple]) -> None: """ Validates that read() did not return a non-empty batch with end equal to start, - which would cause the same batch to be processed repeatedly. + which would cause the same batch to be processed repeatedly. When end != start, + appends the result to the cache; when end == start with empty iterator, does not + cache (avoids unbounded cache growth). """ if json.dumps(end) != json.dumps(start): + self.cache.append(PrefetchedCacheEntry(start, end, it)) return try: next(it) @@ -117,7 +120,6 @@ def latestOffset(self, start: dict, limit: ReadLimit) -> dict: (iter, end) = self.simple_reader.read(start) self._validate_read_result(start, end, iter) - self.cache.append(PrefetchedCacheEntry(start, end, iter)) return end def commit(self, end: dict) -> None: From 3644539db34a5a06d7e545e9d0e15ffc1042e7f7 Mon Sep 17 00:00:00 2001 From: vinodkc Date: Mon, 9 Feb 2026 17:53:33 -0800 Subject: [PATCH 3/7] Add test for empty iter and start == end offset --- .../tests/test_python_streaming_datasource.py | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/python/pyspark/sql/tests/test_python_streaming_datasource.py b/python/pyspark/sql/tests/test_python_streaming_datasource.py index 3c1e4540593b9..9e36ecd485616 100644 --- a/python/pyspark/sql/tests/test_python_streaming_datasource.py +++ b/python/pyspark/sql/tests/test_python_streaming_datasource.py @@ -539,6 +539,31 @@ def commit(self, end: dict): "SIMPLE_STREAM_READER_OFFSET_DID_NOT_ADVANCE", ) + def test_simple_stream_reader_empty_iterator_start_equals_end_allowed(self): + """When read() returns end == start with an empty iterator, no exception and no cache entry.""" + from pyspark.sql.datasource_internal import _SimpleStreamReaderWrapper + + class EmptyBatchReader(SimpleDataSourceStreamReader): + def initialOffset(self): + return {"offset": 0} + + def read(self, start: dict): + # Valid: same offset as end but empty iterator (no data) + return (iter([]), {"offset": start["offset"]}) + + def readBetweenOffsets(self, start: dict, end: dict): + return iter([]) + + def commit(self, end: dict): + pass + + reader = EmptyBatchReader() + wrapper = _SimpleStreamReaderWrapper(reader) + start = {"offset": 0} + end = wrapper.latestOffset(start, ReadAllAvailable()) + self.assertEqual(end, start) + self.assertEqual(len(wrapper.cache), 0) + def test_stream_writer(self): input_dir = tempfile.TemporaryDirectory(prefix="test_data_stream_write_input") output_dir = tempfile.TemporaryDirectory(prefix="test_data_stream_write_output") From 71b4e1f5a44a0a11af6c64a126d7a761aba654cb Mon Sep 17 00:00:00 2001 From: vinodkc Date: Mon, 9 Feb 2026 18:27:53 -0800 Subject: [PATCH 4/7] Rename error class --- python/pyspark/errors/error-conditions.json | 4 ++-- python/pyspark/sql/datasource.py | 4 ---- python/pyspark/sql/datasource_internal.py | 2 +- python/pyspark/sql/tests/test_python_streaming_datasource.py | 4 ++-- 4 files changed, 5 insertions(+), 9 deletions(-) diff --git a/python/pyspark/errors/error-conditions.json b/python/pyspark/errors/error-conditions.json index e417056379918..be5003ce0a389 100644 --- a/python/pyspark/errors/error-conditions.json +++ b/python/pyspark/errors/error-conditions.json @@ -1185,9 +1185,9 @@ "SparkContext or SparkSession should be created first." ] }, - "SIMPLE_STREAM_READER_OFFSET_DID_NOT_ADVANCE": { + "STREAM_READER_OFFSET_DID_NOT_ADVANCE": { "message": [ - "SimpleDataSourceStreamReader.read() returned a non-empty batch but the end offset did not advance past the start offset. Returning end equal to start with data would cause the same batch to be processed repeatedly. The end offset must represent the position after the last record returned." + "Stream reader read() returned a non-empty batch but the end offset did not advance past the start offset. Returning end equal to start with data would cause the same batch to be processed repeatedly. The end offset must represent the position after the last record returned." ] }, "SLICE_WITH_STEP": { diff --git a/python/pyspark/sql/datasource.py b/python/pyspark/sql/datasource.py index 3a4c5eece7ec8..bb73a7a9206b1 100644 --- a/python/pyspark/sql/datasource.py +++ b/python/pyspark/sql/datasource.py @@ -928,10 +928,6 @@ def read(self, start: dict) -> Tuple[Iterator[Tuple], dict]: Read all available data from start offset and return the offset that next read attempt starts from. - When there is data, the end offset must advance past the start offset; otherwise - Spark raises a validation exception. When there is no data to read, the implementation - may return the same offset as both start and end, but must provide an empty iterator. - Parameters ---------- start : dict diff --git a/python/pyspark/sql/datasource_internal.py b/python/pyspark/sql/datasource_internal.py index 8b550a4a6d3da..e97e3811ea2dc 100644 --- a/python/pyspark/sql/datasource_internal.py +++ b/python/pyspark/sql/datasource_internal.py @@ -108,7 +108,7 @@ def _validate_read_result(self, start: dict, end: dict, it: Iterator[Tuple]) -> except StopIteration: return raise PySparkException( - errorClass="SIMPLE_STREAM_READER_OFFSET_DID_NOT_ADVANCE", + errorClass="STREAM_READER_OFFSET_DID_NOT_ADVANCE", messageParameters={}, ) diff --git a/python/pyspark/sql/tests/test_python_streaming_datasource.py b/python/pyspark/sql/tests/test_python_streaming_datasource.py index 9e36ecd485616..2b519083ecf9e 100644 --- a/python/pyspark/sql/tests/test_python_streaming_datasource.py +++ b/python/pyspark/sql/tests/test_python_streaming_datasource.py @@ -511,7 +511,7 @@ def check_batch(df, batch_id): self.assertIsNone(q.exception(), "No exception has to be propagated.") def test_simple_stream_reader_offset_did_not_advance_raises(self): - """Validate that returning end == start with non-empty data raises SIMPLE_STREAM_READER_OFFSET_DID_NOT_ADVANCE.""" + """Validate that returning end == start with non-empty data raises STREAM_READER_OFFSET_DID_NOT_ADVANCE.""" from pyspark.sql.datasource_internal import _SimpleStreamReaderWrapper class BuggySimpleStreamReader(SimpleDataSourceStreamReader): @@ -536,7 +536,7 @@ def commit(self, end: dict): wrapper.latestOffset({"offset": 0}, ReadAllAvailable()) self.assertEqual( cm.exception.getCondition(), - "SIMPLE_STREAM_READER_OFFSET_DID_NOT_ADVANCE", + "STREAM_READER_OFFSET_DID_NOT_ADVANCE", ) def test_simple_stream_reader_empty_iterator_start_equals_end_allowed(self): From 28cdcdcd8371d553fe9d7dbbb2970ef262545769 Mon Sep 17 00:00:00 2001 From: vinodkc Date: Mon, 9 Feb 2026 20:32:21 -0800 Subject: [PATCH 5/7] Change order of ERROR class --- python/pyspark/errors/error-conditions.json | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/python/pyspark/errors/error-conditions.json b/python/pyspark/errors/error-conditions.json index be5003ce0a389..ecb1445807a35 100644 --- a/python/pyspark/errors/error-conditions.json +++ b/python/pyspark/errors/error-conditions.json @@ -1185,11 +1185,6 @@ "SparkContext or SparkSession should be created first." ] }, - "STREAM_READER_OFFSET_DID_NOT_ADVANCE": { - "message": [ - "Stream reader read() returned a non-empty batch but the end offset did not advance past the start offset. Returning end equal to start with data would cause the same batch to be processed repeatedly. The end offset must represent the position after the last record returned." - ] - }, "SLICE_WITH_STEP": { "message": [ "Slice with step is not supported." @@ -1215,6 +1210,11 @@ "Cannot serialize the function ``. If you accessed the Spark session, or a DataFrame defined outside of the function, or any object that contains a Spark session, please be aware that they are not allowed in Spark Connect. For `foreachBatch`, please access the Spark session using `df.sparkSession`, where `df` is the first parameter in your `foreachBatch` function. For `StreamingQueryListener`, please access the Spark session using `self.spark`. For details please check out the PySpark doc for `foreachBatch` and `StreamingQueryListener`." ] }, + "STREAM_READER_OFFSET_DID_NOT_ADVANCE": { + "message": [ + "Stream reader read() returned a non-empty batch but the end offset did not advance past the start offset. Returning end equal to start with data would cause the same batch to be processed repeatedly. The end offset must represent the position after the last record returned." + ] + }, "ST_INVALID_ALGORITHM_VALUE" : { "message" : [ "Invalid or unsupported edge interpolation algorithm value: ''." From 3c0b2057c115b6911408001b414b89ad1ad45558 Mon Sep 17 00:00:00 2001 From: vinodkc Date: Tue, 10 Feb 2026 11:32:20 -0800 Subject: [PATCH 6/7] Fix review comments --- .../docs/source/tutorial/sql/python_data_source.rst | 2 +- python/pyspark/errors/error-conditions.json | 10 +++++----- python/pyspark/sql/datasource_internal.py | 13 ++++++++----- .../sql/tests/test_python_streaming_datasource.py | 8 ++++---- 4 files changed, 18 insertions(+), 15 deletions(-) diff --git a/python/docs/source/tutorial/sql/python_data_source.rst b/python/docs/source/tutorial/sql/python_data_source.rst index 4219c14bc5bcf..07f35722e73ff 100644 --- a/python/docs/source/tutorial/sql/python_data_source.rst +++ b/python/docs/source/tutorial/sql/python_data_source.rst @@ -315,7 +315,7 @@ This is the same dummy streaming reader that generates 2 rows every batch implem For example, returning 2 records from start_idx 0 means end should be {"offset": 2} (i.e. start + 2). When there is no data to read, you may return the same offset as end and - start,but you must provide an empty iterator. + start, but you must provide an empty iterator. """ start_idx = start["offset"] it = iter([(i,) for i in range(start_idx, start_idx + 2)]) diff --git a/python/pyspark/errors/error-conditions.json b/python/pyspark/errors/error-conditions.json index ecb1445807a35..bbc4d005b490a 100644 --- a/python/pyspark/errors/error-conditions.json +++ b/python/pyspark/errors/error-conditions.json @@ -1185,6 +1185,11 @@ "SparkContext or SparkSession should be created first." ] }, + "SIMPLE_STREAM_READER_OFFSET_DID_NOT_ADVANCE": { + "message": [ + "SimpleDataSourceStreamReader.read() returned a non-empty batch but the end offset: did not advance past the start offset: . The end offset must represent the position after the last record returned." + ] + }, "SLICE_WITH_STEP": { "message": [ "Slice with step is not supported." @@ -1210,11 +1215,6 @@ "Cannot serialize the function ``. If you accessed the Spark session, or a DataFrame defined outside of the function, or any object that contains a Spark session, please be aware that they are not allowed in Spark Connect. For `foreachBatch`, please access the Spark session using `df.sparkSession`, where `df` is the first parameter in your `foreachBatch` function. For `StreamingQueryListener`, please access the Spark session using `self.spark`. For details please check out the PySpark doc for `foreachBatch` and `StreamingQueryListener`." ] }, - "STREAM_READER_OFFSET_DID_NOT_ADVANCE": { - "message": [ - "Stream reader read() returned a non-empty batch but the end offset did not advance past the start offset. Returning end equal to start with data would cause the same batch to be processed repeatedly. The end offset must represent the position after the last record returned." - ] - }, "ST_INVALID_ALGORITHM_VALUE" : { "message" : [ "Invalid or unsupported edge interpolation algorithm value: ''." diff --git a/python/pyspark/sql/datasource_internal.py b/python/pyspark/sql/datasource_internal.py index e97e3811ea2dc..51c1794641bef 100644 --- a/python/pyspark/sql/datasource_internal.py +++ b/python/pyspark/sql/datasource_internal.py @@ -93,14 +93,14 @@ def getDefaultReadLimit(self) -> ReadLimit: # We do not consider providing different read limit on simple stream reader. return ReadAllAvailable() - def _validate_read_result(self, start: dict, end: dict, it: Iterator[Tuple]) -> None: + def add_result_to_cache(self, start: dict, end: dict, it: Iterator[Tuple]) -> None: """ Validates that read() did not return a non-empty batch with end equal to start, which would cause the same batch to be processed repeatedly. When end != start, appends the result to the cache; when end == start with empty iterator, does not cache (avoids unbounded cache growth). """ - if json.dumps(end) != json.dumps(start): + if end != start: self.cache.append(PrefetchedCacheEntry(start, end, it)) return try: @@ -108,8 +108,11 @@ def _validate_read_result(self, start: dict, end: dict, it: Iterator[Tuple]) -> except StopIteration: return raise PySparkException( - errorClass="STREAM_READER_OFFSET_DID_NOT_ADVANCE", - messageParameters={}, + errorClass="SIMPLE_STREAM_READER_OFFSET_DID_NOT_ADVANCE", + messageParameters={ + "start_offset": start, + "end_offset": end, + }, ) def latestOffset(self, start: dict, limit: ReadLimit) -> dict: @@ -119,7 +122,7 @@ def latestOffset(self, start: dict, limit: ReadLimit) -> dict: ), "simple stream reader does not support read limit" (iter, end) = self.simple_reader.read(start) - self._validate_read_result(start, end, iter) + self.add_result_to_cache(start, end, iter) return end def commit(self, end: dict) -> None: diff --git a/python/pyspark/sql/tests/test_python_streaming_datasource.py b/python/pyspark/sql/tests/test_python_streaming_datasource.py index 2b519083ecf9e..5f6aaf10fe01a 100644 --- a/python/pyspark/sql/tests/test_python_streaming_datasource.py +++ b/python/pyspark/sql/tests/test_python_streaming_datasource.py @@ -511,7 +511,7 @@ def check_batch(df, batch_id): self.assertIsNone(q.exception(), "No exception has to be propagated.") def test_simple_stream_reader_offset_did_not_advance_raises(self): - """Validate that returning end == start with non-empty data raises STREAM_READER_OFFSET_DID_NOT_ADVANCE.""" + """Validate that returning end == start with non-empty data raises SIMPLE_STREAM_READER_OFFSET_DID_NOT_ADVANCE.""" from pyspark.sql.datasource_internal import _SimpleStreamReaderWrapper class BuggySimpleStreamReader(SimpleDataSourceStreamReader): @@ -522,7 +522,7 @@ def read(self, start: dict): # Bug: return same offset as end despite returning data start_idx = start["offset"] it = iter([(i,) for i in range(start_idx, start_idx + 3)]) - return (it, {"offset": start_idx}) + return (it, start) def readBetweenOffsets(self, start: dict, end: dict): return iter([]) @@ -536,7 +536,7 @@ def commit(self, end: dict): wrapper.latestOffset({"offset": 0}, ReadAllAvailable()) self.assertEqual( cm.exception.getCondition(), - "STREAM_READER_OFFSET_DID_NOT_ADVANCE", + "SIMPLE_STREAM_READER_OFFSET_DID_NOT_ADVANCE", ) def test_simple_stream_reader_empty_iterator_start_equals_end_allowed(self): @@ -549,7 +549,7 @@ def initialOffset(self): def read(self, start: dict): # Valid: same offset as end but empty iterator (no data) - return (iter([]), {"offset": start["offset"]}) + return (iter([]), start) def readBetweenOffsets(self, start: dict, end: dict): return iter([]) From a2908753c92e9ae52046e3ee52adb0e7b1bc6b42 Mon Sep 17 00:00:00 2001 From: vinodkc Date: Tue, 10 Feb 2026 12:17:05 -0800 Subject: [PATCH 7/7] Fix test failure --- python/pyspark/sql/datasource_internal.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/python/pyspark/sql/datasource_internal.py b/python/pyspark/sql/datasource_internal.py index 51c1794641bef..2ac6c280e822e 100644 --- a/python/pyspark/sql/datasource_internal.py +++ b/python/pyspark/sql/datasource_internal.py @@ -100,7 +100,9 @@ def add_result_to_cache(self, start: dict, end: dict, it: Iterator[Tuple]) -> No appends the result to the cache; when end == start with empty iterator, does not cache (avoids unbounded cache growth). """ - if end != start: + start_str = json.dumps(start) + end_str = json.dumps(end) + if end_str != start_str: self.cache.append(PrefetchedCacheEntry(start, end, it)) return try: @@ -110,8 +112,8 @@ def add_result_to_cache(self, start: dict, end: dict, it: Iterator[Tuple]) -> No raise PySparkException( errorClass="SIMPLE_STREAM_READER_OFFSET_DID_NOT_ADVANCE", messageParameters={ - "start_offset": start, - "end_offset": end, + "start_offset": start_str, + "end_offset": end_str, }, )