From 3e1a5fb8d0e0a6de50ac7c89b0ad5a2507a2ea95 Mon Sep 17 00:00:00 2001 From: ChristoGrab Date: Mon, 24 Feb 2025 14:53:51 -0800 Subject: [PATCH 01/12] chore: read CSV in-memory --- .../sources/declarative/decoders/composite_raw_decoder.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py index b8e8e3315..6b1280453 100644 --- a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py @@ -5,7 +5,7 @@ import logging from abc import ABC, abstractmethod from dataclasses import dataclass -from io import BufferedIOBase, TextIOWrapper +from io import BufferedIOBase, BytesIO, TextIOWrapper from typing import Any, Generator, MutableMapping, Optional import orjson @@ -124,7 +124,8 @@ def parse( """ Parse CSV data from decompressed bytes. """ - text_data = TextIOWrapper(data, encoding=self.encoding) # type: ignore + bytes_data = BytesIO(data.read()) + text_data = TextIOWrapper(bytes_data, encoding=self.encoding) # type: ignore reader = csv.DictReader(text_data, delimiter=self._get_delimiter() or ",") for row in reader: yield row From bb470d80a50089e52a4049cc9c978124977e14ee Mon Sep 17 00:00:00 2001 From: ChristoGrab Date: Tue, 25 Feb 2025 10:05:47 -0800 Subject: [PATCH 02/12] feat: prevent csv raw response from auto-closing during sync --- .../sources/declarative/decoders/composite_raw_decoder.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py index 6b1280453..a0753dc2f 100644 --- a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py @@ -152,6 +152,8 @@ def decode( self, response: requests.Response ) -> Generator[MutableMapping[str, Any], None, None]: if self.is_stream_response(): + response.raw.auto_close = False yield from self.parser.parse(data=response.raw) # type: ignore[arg-type] + response.raw.close() else: yield from self.parser.parse(data=io.BytesIO(response.content)) From 92c19147455b25e91437bbb8bcdbf7e03c2a3260 Mon Sep 17 00:00:00 2001 From: ChristoGrab Date: Tue, 25 Feb 2025 10:28:26 -0800 Subject: [PATCH 03/12] whoops remove in-memory read --- .../sources/declarative/decoders/composite_raw_decoder.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py index a0753dc2f..865de267a 100644 --- a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py @@ -5,7 +5,7 @@ import logging from abc import ABC, abstractmethod from dataclasses import dataclass -from io import BufferedIOBase, BytesIO, TextIOWrapper +from io import BufferedIOBase, TextIOWrapper from typing import Any, Generator, MutableMapping, Optional import orjson @@ -124,8 +124,7 @@ def parse( """ Parse CSV data from decompressed bytes. """ - bytes_data = BytesIO(data.read()) - text_data = TextIOWrapper(bytes_data, encoding=self.encoding) # type: ignore + text_data = TextIOWrapper(data, encoding=self.encoding) # type: ignore reader = csv.DictReader(text_data, delimiter=self._get_delimiter() or ",") for row in reader: yield row From 06ff3225460387984c9e1bee26f274b4353ec556 Mon Sep 17 00:00:00 2001 From: maxi297 Date: Tue, 4 Mar 2025 16:19:53 -0500 Subject: [PATCH 04/12] add test --- .../decoders/test_composite_decoder.py | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/unit_tests/sources/declarative/decoders/test_composite_decoder.py b/unit_tests/sources/declarative/decoders/test_composite_decoder.py index 745113925..60b058cdf 100644 --- a/unit_tests/sources/declarative/decoders/test_composite_decoder.py +++ b/unit_tests/sources/declarative/decoders/test_composite_decoder.py @@ -4,7 +4,9 @@ import csv import gzip import json +from http.server import BaseHTTPRequestHandler, HTTPServer from io import BytesIO, StringIO +from threading import Thread from unittest.mock import patch import pytest @@ -202,6 +204,28 @@ def test_composite_raw_decoder_csv_parser_values(requests_mock, encoding: str, d assert parsed_records == expected_data +class TestServer(BaseHTTPRequestHandler): + + def do_GET(self) -> None: + self.send_response(200) + self.end_headers() + self.wfile.write(bytes("col1,col2\nval1,val2", 'utf-8')) + + +def test_composite_raw_decoder_csv_parser(): + # start server + httpd = HTTPServer(('localhost', 8080), TestServer) + thread = Thread(target=httpd.serve_forever, args = ()) + thread.start() + + response = requests.get("http://localhost:8080", stream=True) + result = list(CompositeRawDecoder(parser=CsvParser()).decode(response)) + + assert len(result) == 1 + httpd.shutdown() # release port + thread.join() + + def test_given_response_already_consumed_when_decode_then_no_data_is_returned(requests_mock): requests_mock.register_uri( "GET", "https://airbyte.io/", content=json.dumps({"test": "test"}).encode() From 8fae49e755e6b26afe9a37072ed0eebb5f82aa83 Mon Sep 17 00:00:00 2001 From: maxi297 Date: Tue, 4 Mar 2025 16:21:54 -0500 Subject: [PATCH 05/12] document test better --- .../sources/declarative/decoders/test_composite_decoder.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/unit_tests/sources/declarative/decoders/test_composite_decoder.py b/unit_tests/sources/declarative/decoders/test_composite_decoder.py index 60b058cdf..41a0cc039 100644 --- a/unit_tests/sources/declarative/decoders/test_composite_decoder.py +++ b/unit_tests/sources/declarative/decoders/test_composite_decoder.py @@ -212,7 +212,10 @@ def do_GET(self) -> None: self.wfile.write(bytes("col1,col2\nval1,val2", 'utf-8')) -def test_composite_raw_decoder_csv_parser(): +def test_composite_raw_decoder_csv_parser_without_mocked_response(): + """ + This test reproduce a `ValueError: I/O operation on closed file` error we had with CSV parsing. We could not catch this with other tests because the closing of the mocked response from requests_mock was not the same as the one in requests. + """ # start server httpd = HTTPServer(('localhost', 8080), TestServer) thread = Thread(target=httpd.serve_forever, args = ()) From f276a6ea8b6b1f669f971d9047be83ec0420dd02 Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Tue, 4 Mar 2025 21:22:43 +0000 Subject: [PATCH 06/12] Auto-fix lint and format issues --- .../declarative/decoders/test_composite_decoder.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/unit_tests/sources/declarative/decoders/test_composite_decoder.py b/unit_tests/sources/declarative/decoders/test_composite_decoder.py index 41a0cc039..a67dd5c8d 100644 --- a/unit_tests/sources/declarative/decoders/test_composite_decoder.py +++ b/unit_tests/sources/declarative/decoders/test_composite_decoder.py @@ -205,20 +205,19 @@ def test_composite_raw_decoder_csv_parser_values(requests_mock, encoding: str, d class TestServer(BaseHTTPRequestHandler): - def do_GET(self) -> None: - self.send_response(200) - self.end_headers() - self.wfile.write(bytes("col1,col2\nval1,val2", 'utf-8')) + self.send_response(200) + self.end_headers() + self.wfile.write(bytes("col1,col2\nval1,val2", "utf-8")) def test_composite_raw_decoder_csv_parser_without_mocked_response(): """ - This test reproduce a `ValueError: I/O operation on closed file` error we had with CSV parsing. We could not catch this with other tests because the closing of the mocked response from requests_mock was not the same as the one in requests. + This test reproduce a `ValueError: I/O operation on closed file` error we had with CSV parsing. We could not catch this with other tests because the closing of the mocked response from requests_mock was not the same as the one in requests. """ # start server - httpd = HTTPServer(('localhost', 8080), TestServer) - thread = Thread(target=httpd.serve_forever, args = ()) + httpd = HTTPServer(("localhost", 8080), TestServer) + thread = Thread(target=httpd.serve_forever, args=()) thread.start() response = requests.get("http://localhost:8080", stream=True) From 5364059f268d7848b0906b3a1e87c3226e54e23b Mon Sep 17 00:00:00 2001 From: maxi297 Date: Wed, 5 Mar 2025 10:39:11 -0500 Subject: [PATCH 07/12] Improve documentation on the issue --- .../declarative/decoders/composite_raw_decoder.py | 2 ++ .../declarative/decoders/test_composite_decoder.py | 11 +++++++++-- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py index 865de267a..93775c1ec 100644 --- a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py @@ -151,6 +151,8 @@ def decode( self, response: requests.Response ) -> Generator[MutableMapping[str, Any], None, None]: if self.is_stream_response(): + # urllib mentions that some interfaces don't play nice with auto_close [here](https://urllib3.readthedocs.io/en/stable/user-guide.html#using-io-wrappers-with-response-content) + # We have indeed observed some issues with CSV parsing. Hence, we will manage the closing of the file ourselves until we find a better solution. response.raw.auto_close = False yield from self.parser.parse(data=response.raw) # type: ignore[arg-type] response.raw.close() diff --git a/unit_tests/sources/declarative/decoders/test_composite_decoder.py b/unit_tests/sources/declarative/decoders/test_composite_decoder.py index a67dd5c8d..97f6b9f4b 100644 --- a/unit_tests/sources/declarative/decoders/test_composite_decoder.py +++ b/unit_tests/sources/declarative/decoders/test_composite_decoder.py @@ -214,7 +214,15 @@ def do_GET(self) -> None: def test_composite_raw_decoder_csv_parser_without_mocked_response(): """ This test reproduce a `ValueError: I/O operation on closed file` error we had with CSV parsing. We could not catch this with other tests because the closing of the mocked response from requests_mock was not the same as the one in requests. + + We first identified this issue while working with the sample defined https://people.sc.fsu.edu/~jburkardt/data/csv/addresses.csv. + This should be reproducible by having the test server return the `self.wfile.write` statement as a comment below but it does not. However, it wasn't reproducible. + + Currently we use `self.wfile.write(bytes("col1,col2\nval1,val2", "utf-8"))` to reproduce which we know is not a valid csv as it does not end with a newline character. However, this is the only we were able to reproduce locally. """ + # self.wfile.write(bytes('John,Doe,120 jefferson st.,Riverside, NJ, 08075\nJack,McGinnis,220 hobo Av.,Phila, PA,09119\n"John ""Da Man""",Repici,120 Jefferson St.,Riverside, NJ,08075\nStephen,Tyler,"7452 Terrace ""At the Plaza"" road",SomeTown,SD, 91234\n,Blankman,,SomeTown, SD, 00298\n"Joan ""the bone"", Anne",Jet,"9th, at Terrace plc",Desert City,CO,00123\n', "utf-8")) + + # start server httpd = HTTPServer(("localhost", 8080), TestServer) thread = Thread(target=httpd.serve_forever, args=()) @@ -224,8 +232,7 @@ def test_composite_raw_decoder_csv_parser_without_mocked_response(): result = list(CompositeRawDecoder(parser=CsvParser()).decode(response)) assert len(result) == 1 - httpd.shutdown() # release port - thread.join() + httpd.shutdown() # release port and kill the thread def test_given_response_already_consumed_when_decode_then_no_data_is_returned(requests_mock): From db951b21e76d134d64d93e9de8fc8d6871953744 Mon Sep 17 00:00:00 2001 From: Maxime Carbonneau-Leclerc <3360483+maxi297@users.noreply.github.com> Date: Wed, 5 Mar 2025 11:09:19 -0500 Subject: [PATCH 08/12] Update unit_tests/sources/declarative/decoders/test_composite_decoder.py Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --- .../decoders/test_composite_decoder.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/unit_tests/sources/declarative/decoders/test_composite_decoder.py b/unit_tests/sources/declarative/decoders/test_composite_decoder.py index 97f6b9f4b..406019afe 100644 --- a/unit_tests/sources/declarative/decoders/test_composite_decoder.py +++ b/unit_tests/sources/declarative/decoders/test_composite_decoder.py @@ -227,13 +227,15 @@ def test_composite_raw_decoder_csv_parser_without_mocked_response(): httpd = HTTPServer(("localhost", 8080), TestServer) thread = Thread(target=httpd.serve_forever, args=()) thread.start() - - response = requests.get("http://localhost:8080", stream=True) - result = list(CompositeRawDecoder(parser=CsvParser()).decode(response)) - - assert len(result) == 1 - httpd.shutdown() # release port and kill the thread - + thread.start() + try: + response = requests.get(f"http://localhost:{port}", stream=True) + result = list(CompositeRawDecoder(parser=CsvParser()).decode(response)) + + assert len(result) == 1 + finally: + httpd.shutdown() # release port and kill the thread + thread.join(timeout=5) # ensure thread is cleaned up def test_given_response_already_consumed_when_decode_then_no_data_is_returned(requests_mock): requests_mock.register_uri( From e4311af0ee82f4ba9d40ea5a8b9be71b3676081b Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Wed, 5 Mar 2025 16:10:34 +0000 Subject: [PATCH 09/12] Auto-fix lint and format issues --- .../sources/declarative/decoders/test_composite_decoder.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/unit_tests/sources/declarative/decoders/test_composite_decoder.py b/unit_tests/sources/declarative/decoders/test_composite_decoder.py index 406019afe..b209a150d 100644 --- a/unit_tests/sources/declarative/decoders/test_composite_decoder.py +++ b/unit_tests/sources/declarative/decoders/test_composite_decoder.py @@ -222,7 +222,6 @@ def test_composite_raw_decoder_csv_parser_without_mocked_response(): """ # self.wfile.write(bytes('John,Doe,120 jefferson st.,Riverside, NJ, 08075\nJack,McGinnis,220 hobo Av.,Phila, PA,09119\n"John ""Da Man""",Repici,120 Jefferson St.,Riverside, NJ,08075\nStephen,Tyler,"7452 Terrace ""At the Plaza"" road",SomeTown,SD, 91234\n,Blankman,,SomeTown, SD, 00298\n"Joan ""the bone"", Anne",Jet,"9th, at Terrace plc",Desert City,CO,00123\n', "utf-8")) - # start server httpd = HTTPServer(("localhost", 8080), TestServer) thread = Thread(target=httpd.serve_forever, args=()) @@ -231,12 +230,13 @@ def test_composite_raw_decoder_csv_parser_without_mocked_response(): try: response = requests.get(f"http://localhost:{port}", stream=True) result = list(CompositeRawDecoder(parser=CsvParser()).decode(response)) - + assert len(result) == 1 finally: httpd.shutdown() # release port and kill the thread thread.join(timeout=5) # ensure thread is cleaned up + def test_given_response_already_consumed_when_decode_then_no_data_is_returned(requests_mock): requests_mock.register_uri( "GET", "https://airbyte.io/", content=json.dumps({"test": "test"}).encode() From a79256fec4e313d40ad7b500fdb796c30994f3a2 Mon Sep 17 00:00:00 2001 From: maxi297 Date: Wed, 5 Mar 2025 11:30:27 -0500 Subject: [PATCH 10/12] Fix ai suggestion --- .../sources/declarative/decoders/test_composite_decoder.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/unit_tests/sources/declarative/decoders/test_composite_decoder.py b/unit_tests/sources/declarative/decoders/test_composite_decoder.py index b209a150d..abecdb74c 100644 --- a/unit_tests/sources/declarative/decoders/test_composite_decoder.py +++ b/unit_tests/sources/declarative/decoders/test_composite_decoder.py @@ -226,9 +226,8 @@ def test_composite_raw_decoder_csv_parser_without_mocked_response(): httpd = HTTPServer(("localhost", 8080), TestServer) thread = Thread(target=httpd.serve_forever, args=()) thread.start() - thread.start() try: - response = requests.get(f"http://localhost:{port}", stream=True) + response = requests.get(f"http://localhost:8080", stream=True) result = list(CompositeRawDecoder(parser=CsvParser()).decode(response)) assert len(result) == 1 From 2af9ed2c2aaf2fb70b8761aa594f090197817350 Mon Sep 17 00:00:00 2001 From: maxi297 Date: Thu, 6 Mar 2025 12:21:58 -0500 Subject: [PATCH 11/12] find port dynamically --- .../declarative/decoders/test_composite_decoder.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/unit_tests/sources/declarative/decoders/test_composite_decoder.py b/unit_tests/sources/declarative/decoders/test_composite_decoder.py index abecdb74c..41999ef22 100644 --- a/unit_tests/sources/declarative/decoders/test_composite_decoder.py +++ b/unit_tests/sources/declarative/decoders/test_composite_decoder.py @@ -4,6 +4,7 @@ import csv import gzip import json +import socket from http.server import BaseHTTPRequestHandler, HTTPServer from io import BytesIO, StringIO from threading import Thread @@ -22,6 +23,12 @@ from airbyte_cdk.utils import AirbyteTracedException +def find_available_port() -> int: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(('localhost', 0)) + return s.getsockname()[1] # type: ignore # this should return a int + + def compress_with_gzip(data: str, encoding: str = "utf-8"): """ Compress the data using Gzip. @@ -223,11 +230,12 @@ def test_composite_raw_decoder_csv_parser_without_mocked_response(): # self.wfile.write(bytes('John,Doe,120 jefferson st.,Riverside, NJ, 08075\nJack,McGinnis,220 hobo Av.,Phila, PA,09119\n"John ""Da Man""",Repici,120 Jefferson St.,Riverside, NJ,08075\nStephen,Tyler,"7452 Terrace ""At the Plaza"" road",SomeTown,SD, 91234\n,Blankman,,SomeTown, SD, 00298\n"Joan ""the bone"", Anne",Jet,"9th, at Terrace plc",Desert City,CO,00123\n', "utf-8")) # start server - httpd = HTTPServer(("localhost", 8080), TestServer) + port = find_available_port() + httpd = HTTPServer(("localhost", port), TestServer) thread = Thread(target=httpd.serve_forever, args=()) thread.start() try: - response = requests.get(f"http://localhost:8080", stream=True) + response = requests.get(f"http://localhost:{port}", stream=True) result = list(CompositeRawDecoder(parser=CsvParser()).decode(response)) assert len(result) == 1 From a28c3b384ddce1622e262589e838116ed286e69c Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Thu, 6 Mar 2025 17:36:43 +0000 Subject: [PATCH 12/12] Auto-fix lint and format issues --- .../sources/declarative/decoders/test_composite_decoder.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/unit_tests/sources/declarative/decoders/test_composite_decoder.py b/unit_tests/sources/declarative/decoders/test_composite_decoder.py index 41999ef22..39e74d8e6 100644 --- a/unit_tests/sources/declarative/decoders/test_composite_decoder.py +++ b/unit_tests/sources/declarative/decoders/test_composite_decoder.py @@ -25,7 +25,7 @@ def find_available_port() -> int: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.bind(('localhost', 0)) + s.bind(("localhost", 0)) return s.getsockname()[1] # type: ignore # this should return a int