From a6658e6ead091bf2dce5de0a49926c39beacfbd2 Mon Sep 17 00:00:00 2001 From: Aldo Gonzalez Date: Thu, 17 Apr 2025 08:13:16 -0700 Subject: [PATCH 01/17] connector builder: initial changes to pass file reference info to data --- .../parsers/model_to_component_factory.py | 11 +++- .../declarative/retrievers/file_uploader.py | 61 +++++++++++++++++-- .../declarative/yaml_declarative_source.py | 2 + .../declarative/file/test_file_stream.py | 49 ++++++++++++++- 4 files changed, 114 insertions(+), 9 deletions(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 466b661f1..00753c7f7 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -481,7 +481,8 @@ SimpleRetriever, SimpleRetrieverTestReadDecorator, ) -from airbyte_cdk.sources.declarative.retrievers.file_uploader import FileUploader +from airbyte_cdk.sources.declarative.retrievers.file_uploader import FileUploader, FileWriter, NoopFileWriter, \ + ConnectorBuilderFileUploader, BaseFileUploader from airbyte_cdk.sources.declarative.schema import ( ComplexFieldType, DefaultSchemaLoader, @@ -3592,7 +3593,7 @@ def create_fixed_window_call_rate_policy( def create_file_uploader( self, model: FileUploaderModel, config: Config, **kwargs: Any - ) -> FileUploader: + ) -> BaseFileUploader: name = "File Uploader" requester = self._create_component_from_model( model=model.requester, @@ -3606,14 +3607,18 @@ def create_file_uploader( name=name, **kwargs, ) - return FileUploader( + emit_connector_builder_messages = self._emit_connector_builder_messages + file_uploader = FileUploader( requester=requester, download_target_extractor=download_target_extractor, config=config, + file_writer=NoopFileWriter() if emit_connector_builder_messages else FileWriter(), parameters=model.parameters or {}, filename_extractor=model.filename_extractor if model.filename_extractor else None, ) + return ConnectorBuilderFileUploader(file_uploader) if emit_connector_builder_messages else file_uploader + def create_moving_window_call_rate_policy( self, model: MovingWindowCallRatePolicyModel, config: Config, **kwargs: Any ) -> MovingWindowCallRatePolicy: diff --git a/airbyte_cdk/sources/declarative/retrievers/file_uploader.py b/airbyte_cdk/sources/declarative/retrievers/file_uploader.py index 98342e1af..b025dde2b 100644 --- a/airbyte_cdk/sources/declarative/retrievers/file_uploader.py +++ b/airbyte_cdk/sources/declarative/retrievers/file_uploader.py @@ -9,6 +9,7 @@ from pathlib import Path from typing import Any, Mapping, Optional, Union +from abc import ABC, abstractmethod from airbyte_cdk.models import AirbyteRecordMessageFileReference from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor from airbyte_cdk.sources.declarative.interpolation.interpolated_string import ( @@ -24,12 +25,56 @@ logger = logging.getLogger("airbyte") +@dataclass +class BaseFileUploader(ABC): + """ + Base class for file uploader + """ + + @abstractmethod + def upload(self, record: Record) -> None: + """ + Uploads the file to the specified location + """ + ... + +class BaseFileWriter(ABC): + """ + Base File writer class + """ + + @abstractmethod + def write(self, file_path: Path, content: bytes) -> int: + """ + Writes the file to the specified location + """ + ... + +class FileWriter(BaseFileWriter): + + def write(self, file_path: Path, content: bytes) -> int: + """ + Writes the file to the specified location + """ + with open(str(file_path), "wb") as f: + f.write(content) + + return file_path.stat().st_size + +class NoopFileWriter(BaseFileWriter): + + def write(self, file_path: Path, content: bytes) -> int: + """ + Noop file writer + """ + return 0 @dataclass -class FileUploader: +class FileUploader(BaseFileUploader): requester: Requester download_target_extractor: RecordExtractor config: Config + file_writer: BaseFileWriter parameters: InitVar[Mapping[str, Any]] filename_extractor: Optional[Union[InterpolatedString, str]] = None @@ -77,9 +122,7 @@ def upload(self, record: Record) -> None: full_path = files_directory / file_relative_path full_path.parent.mkdir(parents=True, exist_ok=True) - with open(str(full_path), "wb") as f: - f.write(response.content) - file_size_bytes = full_path.stat().st_size + file_size_bytes = self.file_writer.write(full_path, content=response.content) logger.info("File uploaded successfully") logger.info(f"File url: {str(full_path)}") @@ -91,3 +134,13 @@ def upload(self, record: Record) -> None: source_file_relative_path=str(file_relative_path), file_size_bytes=file_size_bytes, ) + + +@dataclass +class ConnectorBuilderFileUploader(BaseFileUploader): + file_uploader: FileUploader + + def upload(self, record: Record) -> None: + self.file_uploader.upload(record=record) + for file_reference_attribute in [file_reference_attribute for file_reference_attribute in record.file_reference.__dict__ if not file_reference_attribute.startswith('_')]: + record.data[file_reference_attribute] = getattr(record.file_reference, file_reference_attribute) diff --git a/airbyte_cdk/sources/declarative/yaml_declarative_source.py b/airbyte_cdk/sources/declarative/yaml_declarative_source.py index 93bdc55e9..ce902e1c4 100644 --- a/airbyte_cdk/sources/declarative/yaml_declarative_source.py +++ b/airbyte_cdk/sources/declarative/yaml_declarative_source.py @@ -24,6 +24,7 @@ def __init__( catalog: Optional[ConfiguredAirbyteCatalog] = None, config: Optional[Mapping[str, Any]] = None, state: Optional[List[AirbyteStateMessage]] = None, + emit_connector_builder_messages: Optional[bool] = False ) -> None: """ :param path_to_yaml: Path to the yaml file describing the source @@ -36,6 +37,7 @@ def __init__( config=config or {}, state=state or [], source_config=source_config, + emit_connector_builder_messages=emit_connector_builder_messages ) def _read_and_parse_yaml_file(self, path_to_yaml_file: str) -> ConnectionDefinition: diff --git a/unit_tests/sources/declarative/file/test_file_stream.py b/unit_tests/sources/declarative/file/test_file_stream.py index e6ee40d5b..1adf628da 100644 --- a/unit_tests/sources/declarative/file/test_file_stream.py +++ b/unit_tests/sources/declarative/file/test_file_stream.py @@ -3,7 +3,7 @@ from pathlib import Path from typing import Any, Dict, List, Optional from unittest import TestCase -from unittest.mock import Mock +from unittest.mock import Mock, patch from airbyte_cdk.models import AirbyteStateMessage, ConfiguredAirbyteCatalog, Status from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource @@ -34,6 +34,7 @@ def _source( config: Dict[str, Any], state: Optional[List[AirbyteStateMessage]] = None, yaml_file: Optional[str] = None, + emit_connector_builder_messages: Optional[bool] = False ) -> YamlDeclarativeSource: if not yaml_file: yaml_file = "file_stream_manifest.yaml" @@ -42,6 +43,7 @@ def _source( catalog=catalog, config=config, state=state, + emit_connector_builder_messages=emit_connector_builder_messages ) @@ -51,11 +53,12 @@ def read( state_builder: Optional[StateBuilder] = None, expecting_exception: bool = False, yaml_file: Optional[str] = None, + emit_connector_builder_messages: Optional[bool] = False ) -> EntrypointOutput: config = config_builder.build() state = state_builder.build() if state_builder else StateBuilder().build() return entrypoint_read( - _source(catalog, config, state, yaml_file), config, catalog, state, expecting_exception + _source(catalog, config, state, yaml_file, emit_connector_builder_messages), config, catalog, state, expecting_exception ) @@ -190,6 +193,48 @@ def test_get_article_attachments_with_filename_extractor(self) -> None: ) assert file_reference.file_size_bytes + def test_get_article_attachments_messages_for_connector_builder(self) -> None: + with HttpMocker() as http_mocker: + http_mocker.get( + HttpRequest(url=STREAM_URL), + HttpResponse(json.dumps(find_template("file_api/articles", __file__)), 200), + ) + http_mocker.get( + HttpRequest(url=STREAM_ATTACHMENTS_URL), + HttpResponse( + json.dumps(find_template("file_api/article_attachments", __file__)), 200 + ), + ) + http_mocker.get( + HttpRequest(url=STREAM_ATTACHMENT_CONTENT_URL), + HttpResponse( + find_binary_response("file_api/article_attachment_content.png", __file__), 200 + ), + ) + + output = read( + self._config(), + CatalogBuilder() + .with_stream(ConfiguredAirbyteStreamBuilder().with_name("article_attachments")) + .build(), + yaml_file="test_file_stream_with_filename_extractor.yaml", + emit_connector_builder_messages=True, + ) + + assert len(output.records) == 1 + file_reference = output.records[0].record.file_reference + assert file_reference + assert file_reference.staging_file_url + assert file_reference.source_file_relative_path + # because we didn't write the file, the size is 0 + assert file_reference.file_size_bytes == 0 + + # Assert file reference fields are copied to record data + record_data = output.records[0].record.data + assert record_data["staging_file_url"] == file_reference.staging_file_url + assert record_data["source_file_relative_path"] == file_reference.source_file_relative_path + assert record_data["file_size_bytes"] == file_reference.file_size_bytes + def test_discover_article_attachments(self) -> None: output = discover(self._config()) From 66c1f7d78a3dd6443c9773f5ce0512e414b98f5f Mon Sep 17 00:00:00 2001 From: Aldo Gonzalez Date: Thu, 17 Apr 2025 16:09:10 -0700 Subject: [PATCH 02/17] file-mode-api: refactor classes to independent files --- .../retrievers/file_uploader/__init__.py | 8 +++ .../file_uploader/base_file_uploader.py | 22 +++++++ .../file_uploader/base_file_writer.py | 19 ++++++ .../connector_builder_file_uploader.py | 22 +++++++ .../{ => file_uploader}/file_uploader.py | 58 +++---------------- .../retrievers/file_uploader/file_writer.py | 19 ++++++ .../file_uploader/noop_file_writer.py | 16 +++++ 7 files changed, 114 insertions(+), 50 deletions(-) create mode 100644 airbyte_cdk/sources/declarative/retrievers/file_uploader/__init__.py create mode 100644 airbyte_cdk/sources/declarative/retrievers/file_uploader/base_file_uploader.py create mode 100644 airbyte_cdk/sources/declarative/retrievers/file_uploader/base_file_writer.py create mode 100644 airbyte_cdk/sources/declarative/retrievers/file_uploader/connector_builder_file_uploader.py rename airbyte_cdk/sources/declarative/retrievers/{ => file_uploader}/file_uploader.py (71%) create mode 100644 airbyte_cdk/sources/declarative/retrievers/file_uploader/file_writer.py create mode 100644 airbyte_cdk/sources/declarative/retrievers/file_uploader/noop_file_writer.py diff --git a/airbyte_cdk/sources/declarative/retrievers/file_uploader/__init__.py b/airbyte_cdk/sources/declarative/retrievers/file_uploader/__init__.py new file mode 100644 index 000000000..e6102c93a --- /dev/null +++ b/airbyte_cdk/sources/declarative/retrievers/file_uploader/__init__.py @@ -0,0 +1,8 @@ +from .file_uploader import FileUploader +from .base_file_uploader import BaseFileUploader +from .base_file_writer import BaseFileWriter +from .connector_builder_file_uploader import ConnectorBuilderFileUploader +from .noop_file_writer import NoopFileWriter +from .file_writer import FileWriter + +__all__ = ["FileUploader", "FileWriter", "NoopFileWriter", "ConnectorBuilderFileUploader", "BaseFileUploader", "BaseFileWriter"] \ No newline at end of file diff --git a/airbyte_cdk/sources/declarative/retrievers/file_uploader/base_file_uploader.py b/airbyte_cdk/sources/declarative/retrievers/file_uploader/base_file_uploader.py new file mode 100644 index 000000000..c4014b163 --- /dev/null +++ b/airbyte_cdk/sources/declarative/retrievers/file_uploader/base_file_uploader.py @@ -0,0 +1,22 @@ +# +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +# + +from dataclasses import dataclass + +from abc import ABC, abstractmethod +from airbyte_cdk.sources.declarative.types import Record + + +@dataclass +class BaseFileUploader(ABC): + """ + Base class for file uploader + """ + + @abstractmethod + def upload(self, record: Record) -> None: + """ + Uploads the file to the specified location + """ + ... \ No newline at end of file diff --git a/airbyte_cdk/sources/declarative/retrievers/file_uploader/base_file_writer.py b/airbyte_cdk/sources/declarative/retrievers/file_uploader/base_file_writer.py new file mode 100644 index 000000000..33ba85fa3 --- /dev/null +++ b/airbyte_cdk/sources/declarative/retrievers/file_uploader/base_file_writer.py @@ -0,0 +1,19 @@ +# +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +# + +from pathlib import Path + +from abc import ABC, abstractmethod + +class BaseFileWriter(ABC): + """ + Base File writer class + """ + + @abstractmethod + def write(self, file_path: Path, content: bytes) -> int: + """ + Writes the file to the specified location + """ + ... \ No newline at end of file diff --git a/airbyte_cdk/sources/declarative/retrievers/file_uploader/connector_builder_file_uploader.py b/airbyte_cdk/sources/declarative/retrievers/file_uploader/connector_builder_file_uploader.py new file mode 100644 index 000000000..ae3475550 --- /dev/null +++ b/airbyte_cdk/sources/declarative/retrievers/file_uploader/connector_builder_file_uploader.py @@ -0,0 +1,22 @@ +# +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +# + +from dataclasses import dataclass + +from airbyte_cdk.sources.declarative.retrievers.file_uploader import BaseFileUploader, FileUploader +from airbyte_cdk.sources.declarative.types import Record + + +@dataclass +class ConnectorBuilderFileUploader(BaseFileUploader): + """ + Connector builder file uploader + Acts as a decorator or wrapper around a FileUploader instance, copying the attributes from record.file_reference into the record.data. + """ + file_uploader: FileUploader + + def upload(self, record: Record) -> None: + self.file_uploader.upload(record=record) + for file_reference_attribute in [file_reference_attribute for file_reference_attribute in record.file_reference.__dict__ if not file_reference_attribute.startswith('_')]: + record.data[file_reference_attribute] = getattr(record.file_reference, file_reference_attribute) \ No newline at end of file diff --git a/airbyte_cdk/sources/declarative/retrievers/file_uploader.py b/airbyte_cdk/sources/declarative/retrievers/file_uploader/file_uploader.py similarity index 71% rename from airbyte_cdk/sources/declarative/retrievers/file_uploader.py rename to airbyte_cdk/sources/declarative/retrievers/file_uploader/file_uploader.py index b025dde2b..da34fac55 100644 --- a/airbyte_cdk/sources/declarative/retrievers/file_uploader.py +++ b/airbyte_cdk/sources/declarative/retrievers/file_uploader/file_uploader.py @@ -9,7 +9,6 @@ from pathlib import Path from typing import Any, Mapping, Optional, Union -from abc import ABC, abstractmethod from airbyte_cdk.models import AirbyteRecordMessageFileReference from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor from airbyte_cdk.sources.declarative.interpolation.interpolated_string import ( @@ -23,54 +22,20 @@ from airbyte_cdk.sources.types import Config from airbyte_cdk.sources.utils.files_directory import get_files_directory -logger = logging.getLogger("airbyte") - -@dataclass -class BaseFileUploader(ABC): - """ - Base class for file uploader - """ - - @abstractmethod - def upload(self, record: Record) -> None: - """ - Uploads the file to the specified location - """ - ... - -class BaseFileWriter(ABC): - """ - Base File writer class - """ - - @abstractmethod - def write(self, file_path: Path, content: bytes) -> int: - """ - Writes the file to the specified location - """ - ... - -class FileWriter(BaseFileWriter): +from .base_file_uploader import BaseFileUploader +from .base_file_writer import BaseFileWriter - def write(self, file_path: Path, content: bytes) -> int: - """ - Writes the file to the specified location - """ - with open(str(file_path), "wb") as f: - f.write(content) - - return file_path.stat().st_size +logger = logging.getLogger("airbyte") -class NoopFileWriter(BaseFileWriter): - def write(self, file_path: Path, content: bytes) -> int: - """ - Noop file writer - """ - return 0 @dataclass class FileUploader(BaseFileUploader): + """ + File uploader class + Handles the upload logic: fetching the download target, making the request via its requester, determining the file path, and calling self.file_writer.write() + Different types of file_writer:BaseFileWriter can be injected to handle different file writing strategies. + """ requester: Requester download_target_extractor: RecordExtractor config: Config @@ -136,11 +101,4 @@ def upload(self, record: Record) -> None: ) -@dataclass -class ConnectorBuilderFileUploader(BaseFileUploader): - file_uploader: FileUploader - def upload(self, record: Record) -> None: - self.file_uploader.upload(record=record) - for file_reference_attribute in [file_reference_attribute for file_reference_attribute in record.file_reference.__dict__ if not file_reference_attribute.startswith('_')]: - record.data[file_reference_attribute] = getattr(record.file_reference, file_reference_attribute) diff --git a/airbyte_cdk/sources/declarative/retrievers/file_uploader/file_writer.py b/airbyte_cdk/sources/declarative/retrievers/file_uploader/file_writer.py new file mode 100644 index 000000000..47914bb10 --- /dev/null +++ b/airbyte_cdk/sources/declarative/retrievers/file_uploader/file_writer.py @@ -0,0 +1,19 @@ +# +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +# + +from pathlib import Path + +from airbyte_cdk.sources.declarative.retrievers.file_uploader import BaseFileWriter + + +class FileWriter(BaseFileWriter): + + def write(self, file_path: Path, content: bytes) -> int: + """ + Writes the file to the specified location + """ + with open(str(file_path), "wb") as f: + f.write(content) + + return file_path.stat().st_size \ No newline at end of file diff --git a/airbyte_cdk/sources/declarative/retrievers/file_uploader/noop_file_writer.py b/airbyte_cdk/sources/declarative/retrievers/file_uploader/noop_file_writer.py new file mode 100644 index 000000000..edbc68521 --- /dev/null +++ b/airbyte_cdk/sources/declarative/retrievers/file_uploader/noop_file_writer.py @@ -0,0 +1,16 @@ +# +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +# + +from pathlib import Path + +from airbyte_cdk.sources.declarative.retrievers.file_uploader import BaseFileWriter + + +class NoopFileWriter(BaseFileWriter): + + def write(self, file_path: Path, content: bytes) -> int: + """ + Noop file writer + """ + return 0 \ No newline at end of file From 5aaa9fcdd3864e0e3dfe9068b030533de6fce4b4 Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Thu, 17 Apr 2025 23:11:53 +0000 Subject: [PATCH 03/17] Auto-fix lint and format issues --- .../parsers/model_to_component_factory.py | 15 ++++++++++++--- .../retrievers/file_uploader/__init__.py | 13 ++++++++++--- .../file_uploader/base_file_uploader.py | 4 ++-- .../retrievers/file_uploader/base_file_writer.py | 4 ++-- .../connector_builder_file_uploader.py | 11 +++++++++-- .../retrievers/file_uploader/file_uploader.py | 5 +---- .../retrievers/file_uploader/file_writer.py | 3 +-- .../retrievers/file_uploader/noop_file_writer.py | 3 +-- .../declarative/yaml_declarative_source.py | 4 ++-- .../sources/declarative/file/test_file_stream.py | 16 +++++++++++----- 10 files changed, 51 insertions(+), 27 deletions(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 00753c7f7..5299c85b1 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -481,8 +481,13 @@ SimpleRetriever, SimpleRetrieverTestReadDecorator, ) -from airbyte_cdk.sources.declarative.retrievers.file_uploader import FileUploader, FileWriter, NoopFileWriter, \ - ConnectorBuilderFileUploader, BaseFileUploader +from airbyte_cdk.sources.declarative.retrievers.file_uploader import ( + BaseFileUploader, + ConnectorBuilderFileUploader, + FileUploader, + FileWriter, + NoopFileWriter, +) from airbyte_cdk.sources.declarative.schema import ( ComplexFieldType, DefaultSchemaLoader, @@ -3617,7 +3622,11 @@ def create_file_uploader( filename_extractor=model.filename_extractor if model.filename_extractor else None, ) - return ConnectorBuilderFileUploader(file_uploader) if emit_connector_builder_messages else file_uploader + return ( + ConnectorBuilderFileUploader(file_uploader) + if emit_connector_builder_messages + else file_uploader + ) def create_moving_window_call_rate_policy( self, model: MovingWindowCallRatePolicyModel, config: Config, **kwargs: Any diff --git a/airbyte_cdk/sources/declarative/retrievers/file_uploader/__init__.py b/airbyte_cdk/sources/declarative/retrievers/file_uploader/__init__.py index e6102c93a..69e3e4bbf 100644 --- a/airbyte_cdk/sources/declarative/retrievers/file_uploader/__init__.py +++ b/airbyte_cdk/sources/declarative/retrievers/file_uploader/__init__.py @@ -1,8 +1,15 @@ -from .file_uploader import FileUploader from .base_file_uploader import BaseFileUploader from .base_file_writer import BaseFileWriter from .connector_builder_file_uploader import ConnectorBuilderFileUploader -from .noop_file_writer import NoopFileWriter +from .file_uploader import FileUploader from .file_writer import FileWriter +from .noop_file_writer import NoopFileWriter -__all__ = ["FileUploader", "FileWriter", "NoopFileWriter", "ConnectorBuilderFileUploader", "BaseFileUploader", "BaseFileWriter"] \ No newline at end of file +__all__ = [ + "FileUploader", + "FileWriter", + "NoopFileWriter", + "ConnectorBuilderFileUploader", + "BaseFileUploader", + "BaseFileWriter", +] diff --git a/airbyte_cdk/sources/declarative/retrievers/file_uploader/base_file_uploader.py b/airbyte_cdk/sources/declarative/retrievers/file_uploader/base_file_uploader.py index c4014b163..124997e32 100644 --- a/airbyte_cdk/sources/declarative/retrievers/file_uploader/base_file_uploader.py +++ b/airbyte_cdk/sources/declarative/retrievers/file_uploader/base_file_uploader.py @@ -2,9 +2,9 @@ # Copyright (c) 2025 Airbyte, Inc., all rights reserved. # +from abc import ABC, abstractmethod from dataclasses import dataclass -from abc import ABC, abstractmethod from airbyte_cdk.sources.declarative.types import Record @@ -19,4 +19,4 @@ def upload(self, record: Record) -> None: """ Uploads the file to the specified location """ - ... \ No newline at end of file + ... diff --git a/airbyte_cdk/sources/declarative/retrievers/file_uploader/base_file_writer.py b/airbyte_cdk/sources/declarative/retrievers/file_uploader/base_file_writer.py index 33ba85fa3..1cdcb1d31 100644 --- a/airbyte_cdk/sources/declarative/retrievers/file_uploader/base_file_writer.py +++ b/airbyte_cdk/sources/declarative/retrievers/file_uploader/base_file_writer.py @@ -2,9 +2,9 @@ # Copyright (c) 2025 Airbyte, Inc., all rights reserved. # +from abc import ABC, abstractmethod from pathlib import Path -from abc import ABC, abstractmethod class BaseFileWriter(ABC): """ @@ -16,4 +16,4 @@ def write(self, file_path: Path, content: bytes) -> int: """ Writes the file to the specified location """ - ... \ No newline at end of file + ... diff --git a/airbyte_cdk/sources/declarative/retrievers/file_uploader/connector_builder_file_uploader.py b/airbyte_cdk/sources/declarative/retrievers/file_uploader/connector_builder_file_uploader.py index ae3475550..7f791f5c5 100644 --- a/airbyte_cdk/sources/declarative/retrievers/file_uploader/connector_builder_file_uploader.py +++ b/airbyte_cdk/sources/declarative/retrievers/file_uploader/connector_builder_file_uploader.py @@ -14,9 +14,16 @@ class ConnectorBuilderFileUploader(BaseFileUploader): Connector builder file uploader Acts as a decorator or wrapper around a FileUploader instance, copying the attributes from record.file_reference into the record.data. """ + file_uploader: FileUploader def upload(self, record: Record) -> None: self.file_uploader.upload(record=record) - for file_reference_attribute in [file_reference_attribute for file_reference_attribute in record.file_reference.__dict__ if not file_reference_attribute.startswith('_')]: - record.data[file_reference_attribute] = getattr(record.file_reference, file_reference_attribute) \ No newline at end of file + for file_reference_attribute in [ + file_reference_attribute + for file_reference_attribute in record.file_reference.__dict__ + if not file_reference_attribute.startswith("_") + ]: + record.data[file_reference_attribute] = getattr( + record.file_reference, file_reference_attribute + ) diff --git a/airbyte_cdk/sources/declarative/retrievers/file_uploader/file_uploader.py b/airbyte_cdk/sources/declarative/retrievers/file_uploader/file_uploader.py index da34fac55..858e18c58 100644 --- a/airbyte_cdk/sources/declarative/retrievers/file_uploader/file_uploader.py +++ b/airbyte_cdk/sources/declarative/retrievers/file_uploader/file_uploader.py @@ -28,7 +28,6 @@ logger = logging.getLogger("airbyte") - @dataclass class FileUploader(BaseFileUploader): """ @@ -36,6 +35,7 @@ class FileUploader(BaseFileUploader): Handles the upload logic: fetching the download target, making the request via its requester, determining the file path, and calling self.file_writer.write() Different types of file_writer:BaseFileWriter can be injected to handle different file writing strategies. """ + requester: Requester download_target_extractor: RecordExtractor config: Config @@ -99,6 +99,3 @@ def upload(self, record: Record) -> None: source_file_relative_path=str(file_relative_path), file_size_bytes=file_size_bytes, ) - - - diff --git a/airbyte_cdk/sources/declarative/retrievers/file_uploader/file_writer.py b/airbyte_cdk/sources/declarative/retrievers/file_uploader/file_writer.py index 47914bb10..6ac4a93fa 100644 --- a/airbyte_cdk/sources/declarative/retrievers/file_uploader/file_writer.py +++ b/airbyte_cdk/sources/declarative/retrievers/file_uploader/file_writer.py @@ -8,7 +8,6 @@ class FileWriter(BaseFileWriter): - def write(self, file_path: Path, content: bytes) -> int: """ Writes the file to the specified location @@ -16,4 +15,4 @@ def write(self, file_path: Path, content: bytes) -> int: with open(str(file_path), "wb") as f: f.write(content) - return file_path.stat().st_size \ No newline at end of file + return file_path.stat().st_size diff --git a/airbyte_cdk/sources/declarative/retrievers/file_uploader/noop_file_writer.py b/airbyte_cdk/sources/declarative/retrievers/file_uploader/noop_file_writer.py index edbc68521..81a9c190c 100644 --- a/airbyte_cdk/sources/declarative/retrievers/file_uploader/noop_file_writer.py +++ b/airbyte_cdk/sources/declarative/retrievers/file_uploader/noop_file_writer.py @@ -8,9 +8,8 @@ class NoopFileWriter(BaseFileWriter): - def write(self, file_path: Path, content: bytes) -> int: """ Noop file writer """ - return 0 \ No newline at end of file + return 0 diff --git a/airbyte_cdk/sources/declarative/yaml_declarative_source.py b/airbyte_cdk/sources/declarative/yaml_declarative_source.py index ce902e1c4..ac50a1346 100644 --- a/airbyte_cdk/sources/declarative/yaml_declarative_source.py +++ b/airbyte_cdk/sources/declarative/yaml_declarative_source.py @@ -24,7 +24,7 @@ def __init__( catalog: Optional[ConfiguredAirbyteCatalog] = None, config: Optional[Mapping[str, Any]] = None, state: Optional[List[AirbyteStateMessage]] = None, - emit_connector_builder_messages: Optional[bool] = False + emit_connector_builder_messages: Optional[bool] = False, ) -> None: """ :param path_to_yaml: Path to the yaml file describing the source @@ -37,7 +37,7 @@ def __init__( config=config or {}, state=state or [], source_config=source_config, - emit_connector_builder_messages=emit_connector_builder_messages + emit_connector_builder_messages=emit_connector_builder_messages, ) def _read_and_parse_yaml_file(self, path_to_yaml_file: str) -> ConnectionDefinition: diff --git a/unit_tests/sources/declarative/file/test_file_stream.py b/unit_tests/sources/declarative/file/test_file_stream.py index 1adf628da..47963e50f 100644 --- a/unit_tests/sources/declarative/file/test_file_stream.py +++ b/unit_tests/sources/declarative/file/test_file_stream.py @@ -34,7 +34,7 @@ def _source( config: Dict[str, Any], state: Optional[List[AirbyteStateMessage]] = None, yaml_file: Optional[str] = None, - emit_connector_builder_messages: Optional[bool] = False + emit_connector_builder_messages: Optional[bool] = False, ) -> YamlDeclarativeSource: if not yaml_file: yaml_file = "file_stream_manifest.yaml" @@ -43,7 +43,7 @@ def _source( catalog=catalog, config=config, state=state, - emit_connector_builder_messages=emit_connector_builder_messages + emit_connector_builder_messages=emit_connector_builder_messages, ) @@ -53,12 +53,16 @@ def read( state_builder: Optional[StateBuilder] = None, expecting_exception: bool = False, yaml_file: Optional[str] = None, - emit_connector_builder_messages: Optional[bool] = False + emit_connector_builder_messages: Optional[bool] = False, ) -> EntrypointOutput: config = config_builder.build() state = state_builder.build() if state_builder else StateBuilder().build() return entrypoint_read( - _source(catalog, config, state, yaml_file, emit_connector_builder_messages), config, catalog, state, expecting_exception + _source(catalog, config, state, yaml_file, emit_connector_builder_messages), + config, + catalog, + state, + expecting_exception, ) @@ -232,7 +236,9 @@ def test_get_article_attachments_messages_for_connector_builder(self) -> None: # Assert file reference fields are copied to record data record_data = output.records[0].record.data assert record_data["staging_file_url"] == file_reference.staging_file_url - assert record_data["source_file_relative_path"] == file_reference.source_file_relative_path + assert ( + record_data["source_file_relative_path"] == file_reference.source_file_relative_path + ) assert record_data["file_size_bytes"] == file_reference.file_size_bytes def test_discover_article_attachments(self) -> None: From 3ff54b71208c9942ae25d0c6044a1336297e1b8a Mon Sep 17 00:00:00 2001 From: Aldo Gonzalez Date: Thu, 17 Apr 2025 16:21:11 -0700 Subject: [PATCH 04/17] file-mode-api: fix imports --- .../file_uploader/connector_builder_file_uploader.py | 4 ++-- .../declarative/retrievers/file_uploader/file_writer.py | 3 +-- .../declarative/retrievers/file_uploader/noop_file_writer.py | 3 +-- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/airbyte_cdk/sources/declarative/retrievers/file_uploader/connector_builder_file_uploader.py b/airbyte_cdk/sources/declarative/retrievers/file_uploader/connector_builder_file_uploader.py index 7f791f5c5..4fe064860 100644 --- a/airbyte_cdk/sources/declarative/retrievers/file_uploader/connector_builder_file_uploader.py +++ b/airbyte_cdk/sources/declarative/retrievers/file_uploader/connector_builder_file_uploader.py @@ -4,9 +4,9 @@ from dataclasses import dataclass -from airbyte_cdk.sources.declarative.retrievers.file_uploader import BaseFileUploader, FileUploader from airbyte_cdk.sources.declarative.types import Record - +from .base_file_uploader import BaseFileUploader +from .file_uploader import FileUploader @dataclass class ConnectorBuilderFileUploader(BaseFileUploader): diff --git a/airbyte_cdk/sources/declarative/retrievers/file_uploader/file_writer.py b/airbyte_cdk/sources/declarative/retrievers/file_uploader/file_writer.py index 6ac4a93fa..128485b32 100644 --- a/airbyte_cdk/sources/declarative/retrievers/file_uploader/file_writer.py +++ b/airbyte_cdk/sources/declarative/retrievers/file_uploader/file_writer.py @@ -4,8 +4,7 @@ from pathlib import Path -from airbyte_cdk.sources.declarative.retrievers.file_uploader import BaseFileWriter - +from .base_file_writer import BaseFileWriter class FileWriter(BaseFileWriter): def write(self, file_path: Path, content: bytes) -> int: diff --git a/airbyte_cdk/sources/declarative/retrievers/file_uploader/noop_file_writer.py b/airbyte_cdk/sources/declarative/retrievers/file_uploader/noop_file_writer.py index 81a9c190c..8db299e2b 100644 --- a/airbyte_cdk/sources/declarative/retrievers/file_uploader/noop_file_writer.py +++ b/airbyte_cdk/sources/declarative/retrievers/file_uploader/noop_file_writer.py @@ -4,8 +4,7 @@ from pathlib import Path -from airbyte_cdk.sources.declarative.retrievers.file_uploader import BaseFileWriter - +from .file_uploader import BaseFileWriter class NoopFileWriter(BaseFileWriter): def write(self, file_path: Path, content: bytes) -> int: From c568be281feb8776b88560ccc69ba13cb9f9f40b Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Thu, 17 Apr 2025 23:22:13 +0000 Subject: [PATCH 05/17] Auto-fix lint and format issues --- .../retrievers/file_uploader/connector_builder_file_uploader.py | 2 ++ .../sources/declarative/retrievers/file_uploader/file_writer.py | 1 + .../declarative/retrievers/file_uploader/noop_file_writer.py | 1 + 3 files changed, 4 insertions(+) diff --git a/airbyte_cdk/sources/declarative/retrievers/file_uploader/connector_builder_file_uploader.py b/airbyte_cdk/sources/declarative/retrievers/file_uploader/connector_builder_file_uploader.py index 4fe064860..14f162848 100644 --- a/airbyte_cdk/sources/declarative/retrievers/file_uploader/connector_builder_file_uploader.py +++ b/airbyte_cdk/sources/declarative/retrievers/file_uploader/connector_builder_file_uploader.py @@ -5,9 +5,11 @@ from dataclasses import dataclass from airbyte_cdk.sources.declarative.types import Record + from .base_file_uploader import BaseFileUploader from .file_uploader import FileUploader + @dataclass class ConnectorBuilderFileUploader(BaseFileUploader): """ diff --git a/airbyte_cdk/sources/declarative/retrievers/file_uploader/file_writer.py b/airbyte_cdk/sources/declarative/retrievers/file_uploader/file_writer.py index 128485b32..eff6333d0 100644 --- a/airbyte_cdk/sources/declarative/retrievers/file_uploader/file_writer.py +++ b/airbyte_cdk/sources/declarative/retrievers/file_uploader/file_writer.py @@ -6,6 +6,7 @@ from .base_file_writer import BaseFileWriter + class FileWriter(BaseFileWriter): def write(self, file_path: Path, content: bytes) -> int: """ diff --git a/airbyte_cdk/sources/declarative/retrievers/file_uploader/noop_file_writer.py b/airbyte_cdk/sources/declarative/retrievers/file_uploader/noop_file_writer.py index 8db299e2b..a4257bc71 100644 --- a/airbyte_cdk/sources/declarative/retrievers/file_uploader/noop_file_writer.py +++ b/airbyte_cdk/sources/declarative/retrievers/file_uploader/noop_file_writer.py @@ -6,6 +6,7 @@ from .file_uploader import BaseFileWriter + class NoopFileWriter(BaseFileWriter): def write(self, file_path: Path, content: bytes) -> int: """ From 52abace68796a2ed19f5d82fa09f06503d586de0 Mon Sep 17 00:00:00 2001 From: Aldo Gonzalez Date: Thu, 17 Apr 2025 17:00:35 -0700 Subject: [PATCH 06/17] file-api: fix mypy typing --- .../retrievers/file_uploader/connector_builder_file_uploader.py | 2 +- airbyte_cdk/sources/declarative/yaml_declarative_source.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/declarative/retrievers/file_uploader/connector_builder_file_uploader.py b/airbyte_cdk/sources/declarative/retrievers/file_uploader/connector_builder_file_uploader.py index 14f162848..bfa40d911 100644 --- a/airbyte_cdk/sources/declarative/retrievers/file_uploader/connector_builder_file_uploader.py +++ b/airbyte_cdk/sources/declarative/retrievers/file_uploader/connector_builder_file_uploader.py @@ -26,6 +26,6 @@ def upload(self, record: Record) -> None: for file_reference_attribute in record.file_reference.__dict__ if not file_reference_attribute.startswith("_") ]: - record.data[file_reference_attribute] = getattr( + record.data[file_reference_attribute] = getattr( # type: ignore record.file_reference, file_reference_attribute ) diff --git a/airbyte_cdk/sources/declarative/yaml_declarative_source.py b/airbyte_cdk/sources/declarative/yaml_declarative_source.py index ac50a1346..87703e5ff 100644 --- a/airbyte_cdk/sources/declarative/yaml_declarative_source.py +++ b/airbyte_cdk/sources/declarative/yaml_declarative_source.py @@ -37,7 +37,7 @@ def __init__( config=config or {}, state=state or [], source_config=source_config, - emit_connector_builder_messages=emit_connector_builder_messages, + emit_connector_builder_messages=emit_connector_builder_messages or False, ) def _read_and_parse_yaml_file(self, path_to_yaml_file: str) -> ConnectionDefinition: From 6136e2cd304ddb80abe79410b410375b8721ce03 Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Fri, 18 Apr 2025 00:02:18 +0000 Subject: [PATCH 07/17] Auto-fix lint and format issues --- .../retrievers/file_uploader/connector_builder_file_uploader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/retrievers/file_uploader/connector_builder_file_uploader.py b/airbyte_cdk/sources/declarative/retrievers/file_uploader/connector_builder_file_uploader.py index bfa40d911..be6c9c2d4 100644 --- a/airbyte_cdk/sources/declarative/retrievers/file_uploader/connector_builder_file_uploader.py +++ b/airbyte_cdk/sources/declarative/retrievers/file_uploader/connector_builder_file_uploader.py @@ -26,6 +26,6 @@ def upload(self, record: Record) -> None: for file_reference_attribute in record.file_reference.__dict__ if not file_reference_attribute.startswith("_") ]: - record.data[file_reference_attribute] = getattr( # type: ignore + record.data[file_reference_attribute] = getattr( # type: ignore record.file_reference, file_reference_attribute ) From 0fbf122195cecacaf15c17625463b1668dc6d796 Mon Sep 17 00:00:00 2001 From: Aldo Gonzalez Date: Fri, 18 Apr 2025 08:43:38 -0700 Subject: [PATCH 08/17] file-api: minor changes to connector builder file uploader --- .../file_uploader/connector_builder_file_uploader.py | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/airbyte_cdk/sources/declarative/retrievers/file_uploader/connector_builder_file_uploader.py b/airbyte_cdk/sources/declarative/retrievers/file_uploader/connector_builder_file_uploader.py index be6c9c2d4..fdc858e70 100644 --- a/airbyte_cdk/sources/declarative/retrievers/file_uploader/connector_builder_file_uploader.py +++ b/airbyte_cdk/sources/declarative/retrievers/file_uploader/connector_builder_file_uploader.py @@ -21,11 +21,6 @@ class ConnectorBuilderFileUploader(BaseFileUploader): def upload(self, record: Record) -> None: self.file_uploader.upload(record=record) - for file_reference_attribute in [ - file_reference_attribute - for file_reference_attribute in record.file_reference.__dict__ - if not file_reference_attribute.startswith("_") - ]: - record.data[file_reference_attribute] = getattr( # type: ignore - record.file_reference, file_reference_attribute - ) + for file_reference_key, file_reference_value in record.file_reference.__dict__.items(): + if not file_reference_key.startswith("_"): + record.data[file_reference_key] = file_reference_value # type: ignore From 1e61e193bbda4b94a9cb3d306f2b2b923ed141f7 Mon Sep 17 00:00:00 2001 From: Aldo Gonzalez <168454423+aldogonzalez8@users.noreply.github.com> Date: Wed, 23 Apr 2025 11:41:22 -0600 Subject: [PATCH 09/17] Update airbyte_cdk/sources/declarative/retrievers/file_uploader/noop_file_writer.py Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --- .../declarative/retrievers/file_uploader/noop_file_writer.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/retrievers/file_uploader/noop_file_writer.py b/airbyte_cdk/sources/declarative/retrievers/file_uploader/noop_file_writer.py index a4257bc71..f10aaf03a 100644 --- a/airbyte_cdk/sources/declarative/retrievers/file_uploader/noop_file_writer.py +++ b/airbyte_cdk/sources/declarative/retrievers/file_uploader/noop_file_writer.py @@ -4,7 +4,8 @@ from pathlib import Path -from .file_uploader import BaseFileWriter +-from .file_uploader import BaseFileWriter ++from .base_file_writer import BaseFileWriter class NoopFileWriter(BaseFileWriter): From 06e55813c7f395feff8aa521d851c0ebeaafe4b1 Mon Sep 17 00:00:00 2001 From: Aldo Gonzalez <168454423+aldogonzalez8@users.noreply.github.com> Date: Wed, 23 Apr 2025 11:42:12 -0600 Subject: [PATCH 10/17] Update unit_tests/sources/declarative/file/test_file_stream.py Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --- unit_tests/sources/declarative/file/test_file_stream.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/unit_tests/sources/declarative/file/test_file_stream.py b/unit_tests/sources/declarative/file/test_file_stream.py index 47963e50f..82c258bbc 100644 --- a/unit_tests/sources/declarative/file/test_file_stream.py +++ b/unit_tests/sources/declarative/file/test_file_stream.py @@ -3,7 +3,7 @@ from pathlib import Path from typing import Any, Dict, List, Optional from unittest import TestCase -from unittest.mock import Mock, patch +from unittest.mock import Mock from airbyte_cdk.models import AirbyteStateMessage, ConfiguredAirbyteCatalog, Status from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource From 5d113ec8e899fa8eb674b604fb1037f5b7282212 Mon Sep 17 00:00:00 2001 From: Aldo Gonzalez Date: Wed, 23 Apr 2025 10:58:43 -0700 Subject: [PATCH 11/17] file-api: fix coderabbit messing --- .../declarative/retrievers/file_uploader/noop_file_writer.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/declarative/retrievers/file_uploader/noop_file_writer.py b/airbyte_cdk/sources/declarative/retrievers/file_uploader/noop_file_writer.py index f10aaf03a..3860bf4a9 100644 --- a/airbyte_cdk/sources/declarative/retrievers/file_uploader/noop_file_writer.py +++ b/airbyte_cdk/sources/declarative/retrievers/file_uploader/noop_file_writer.py @@ -4,8 +4,7 @@ from pathlib import Path --from .file_uploader import BaseFileWriter -+from .base_file_writer import BaseFileWriter +from .base_file_writer import BaseFileWriter class NoopFileWriter(BaseFileWriter): From e860e3f982752f24cdde07e52e4529763327e0b6 Mon Sep 17 00:00:00 2001 From: Aldo Gonzalez Date: Wed, 23 Apr 2025 15:36:30 -0700 Subject: [PATCH 12/17] file-mode-api: fix classes and names to fit our vocabulary. --- .../declarative/extractors/record_selector.py | 4 +- .../parsers/model_to_component_factory.py | 16 +-- .../retrievers/file_uploader/__init__.py | 14 +-- .../file_uploader/base_file_uploader.py | 22 ---- .../connector_builder_file_uploader.py | 6 +- .../file_uploader/default_file_uploader.py | 101 ++++++++++++++++++ .../retrievers/file_uploader/file_uploader.py | 99 ++--------------- .../retrievers/file_uploader/file_writer.py | 13 +-- ...er.py => local_file_system_file_writer.py} | 13 ++- .../file_uploader/noop_file_writer.py | 7 +- .../declarative/file/test_file_stream.py | 4 +- 11 files changed, 150 insertions(+), 149 deletions(-) delete mode 100644 airbyte_cdk/sources/declarative/retrievers/file_uploader/base_file_uploader.py create mode 100644 airbyte_cdk/sources/declarative/retrievers/file_uploader/default_file_uploader.py rename airbyte_cdk/sources/declarative/retrievers/file_uploader/{base_file_writer.py => local_file_system_file_writer.py} (53%) diff --git a/airbyte_cdk/sources/declarative/extractors/record_selector.py b/airbyte_cdk/sources/declarative/extractors/record_selector.py index 73e854076..80d0155e3 100644 --- a/airbyte_cdk/sources/declarative/extractors/record_selector.py +++ b/airbyte_cdk/sources/declarative/extractors/record_selector.py @@ -15,7 +15,7 @@ ) from airbyte_cdk.sources.declarative.interpolation import InterpolatedString from airbyte_cdk.sources.declarative.models import SchemaNormalization -from airbyte_cdk.sources.declarative.retrievers.file_uploader import FileUploader +from airbyte_cdk.sources.declarative.retrievers.file_uploader import DefaultFileUploader from airbyte_cdk.sources.declarative.transformations import RecordTransformation from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState from airbyte_cdk.sources.utils.transform import TypeTransformer @@ -43,7 +43,7 @@ class RecordSelector(HttpSelector): record_filter: Optional[RecordFilter] = None transformations: List[RecordTransformation] = field(default_factory=lambda: []) transform_before_filtering: bool = False - file_uploader: Optional[FileUploader] = None + file_uploader: Optional[DefaultFileUploader] = None def __post_init__(self, parameters: Mapping[str, Any]) -> None: self._parameters = parameters diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 5299c85b1..4416f9d8c 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -482,10 +482,10 @@ SimpleRetrieverTestReadDecorator, ) from airbyte_cdk.sources.declarative.retrievers.file_uploader import ( - BaseFileUploader, - ConnectorBuilderFileUploader, FileUploader, - FileWriter, + ConnectorBuilderFileUploader, + DefaultFileUploader, + LocalFileSystemFileWriter, NoopFileWriter, ) from airbyte_cdk.sources.declarative.schema import ( @@ -2812,7 +2812,7 @@ def create_record_selector( transformations: List[RecordTransformation] | None = None, decoder: Decoder | None = None, client_side_incremental_sync: Dict[str, Any] | None = None, - file_uploader: Optional[FileUploader] = None, + file_uploader: Optional[DefaultFileUploader] = None, **kwargs: Any, ) -> RecordSelector: extractor = self._create_component_from_model( @@ -2908,7 +2908,7 @@ def create_simple_retriever( stop_condition_on_cursor: bool = False, client_side_incremental_sync: Optional[Dict[str, Any]] = None, transformations: List[RecordTransformation], - file_uploader: Optional[FileUploader] = None, + file_uploader: Optional[DefaultFileUploader] = None, incremental_sync: Optional[ Union[ IncrementingCountCursorModel, DatetimeBasedCursorModel, CustomIncrementalSyncModel @@ -3598,7 +3598,7 @@ def create_fixed_window_call_rate_policy( def create_file_uploader( self, model: FileUploaderModel, config: Config, **kwargs: Any - ) -> BaseFileUploader: + ) -> FileUploader: name = "File Uploader" requester = self._create_component_from_model( model=model.requester, @@ -3613,11 +3613,11 @@ def create_file_uploader( **kwargs, ) emit_connector_builder_messages = self._emit_connector_builder_messages - file_uploader = FileUploader( + file_uploader = DefaultFileUploader( requester=requester, download_target_extractor=download_target_extractor, config=config, - file_writer=NoopFileWriter() if emit_connector_builder_messages else FileWriter(), + file_writer=NoopFileWriter() if emit_connector_builder_messages else LocalFileSystemFileWriter(), parameters=model.parameters or {}, filename_extractor=model.filename_extractor if model.filename_extractor else None, ) diff --git a/airbyte_cdk/sources/declarative/retrievers/file_uploader/__init__.py b/airbyte_cdk/sources/declarative/retrievers/file_uploader/__init__.py index 69e3e4bbf..267477678 100644 --- a/airbyte_cdk/sources/declarative/retrievers/file_uploader/__init__.py +++ b/airbyte_cdk/sources/declarative/retrievers/file_uploader/__init__.py @@ -1,15 +1,15 @@ -from .base_file_uploader import BaseFileUploader -from .base_file_writer import BaseFileWriter -from .connector_builder_file_uploader import ConnectorBuilderFileUploader from .file_uploader import FileUploader from .file_writer import FileWriter +from .connector_builder_file_uploader import ConnectorBuilderFileUploader +from .default_file_uploader import DefaultFileUploader +from .local_file_system_file_writer import LocalFileSystemFileWriter from .noop_file_writer import NoopFileWriter __all__ = [ - "FileUploader", - "FileWriter", + "DefaultFileUploader", + "LocalFileSystemFileWriter", "NoopFileWriter", "ConnectorBuilderFileUploader", - "BaseFileUploader", - "BaseFileWriter", + "FileUploader", + "FileWriter", ] diff --git a/airbyte_cdk/sources/declarative/retrievers/file_uploader/base_file_uploader.py b/airbyte_cdk/sources/declarative/retrievers/file_uploader/base_file_uploader.py deleted file mode 100644 index 124997e32..000000000 --- a/airbyte_cdk/sources/declarative/retrievers/file_uploader/base_file_uploader.py +++ /dev/null @@ -1,22 +0,0 @@ -# -# Copyright (c) 2025 Airbyte, Inc., all rights reserved. -# - -from abc import ABC, abstractmethod -from dataclasses import dataclass - -from airbyte_cdk.sources.declarative.types import Record - - -@dataclass -class BaseFileUploader(ABC): - """ - Base class for file uploader - """ - - @abstractmethod - def upload(self, record: Record) -> None: - """ - Uploads the file to the specified location - """ - ... diff --git a/airbyte_cdk/sources/declarative/retrievers/file_uploader/connector_builder_file_uploader.py b/airbyte_cdk/sources/declarative/retrievers/file_uploader/connector_builder_file_uploader.py index fdc858e70..438917c37 100644 --- a/airbyte_cdk/sources/declarative/retrievers/file_uploader/connector_builder_file_uploader.py +++ b/airbyte_cdk/sources/declarative/retrievers/file_uploader/connector_builder_file_uploader.py @@ -6,18 +6,18 @@ from airbyte_cdk.sources.declarative.types import Record -from .base_file_uploader import BaseFileUploader from .file_uploader import FileUploader +from .default_file_uploader import DefaultFileUploader @dataclass -class ConnectorBuilderFileUploader(BaseFileUploader): +class ConnectorBuilderFileUploader(FileUploader): """ Connector builder file uploader Acts as a decorator or wrapper around a FileUploader instance, copying the attributes from record.file_reference into the record.data. """ - file_uploader: FileUploader + file_uploader: DefaultFileUploader def upload(self, record: Record) -> None: self.file_uploader.upload(record=record) diff --git a/airbyte_cdk/sources/declarative/retrievers/file_uploader/default_file_uploader.py b/airbyte_cdk/sources/declarative/retrievers/file_uploader/default_file_uploader.py new file mode 100644 index 000000000..db2d38906 --- /dev/null +++ b/airbyte_cdk/sources/declarative/retrievers/file_uploader/default_file_uploader.py @@ -0,0 +1,101 @@ +# +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +# + +import json +import logging +import uuid +from dataclasses import InitVar, dataclass, field +from pathlib import Path +from typing import Any, Mapping, Optional, Union + +from airbyte_cdk.models import AirbyteRecordMessageFileReference +from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor +from airbyte_cdk.sources.declarative.interpolation.interpolated_string import ( + InterpolatedString, +) +from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import ( + SafeResponse, +) +from airbyte_cdk.sources.declarative.requesters import Requester +from airbyte_cdk.sources.declarative.types import Record, StreamSlice +from airbyte_cdk.sources.types import Config +from airbyte_cdk.sources.utils.files_directory import get_files_directory + +from .file_uploader import FileUploader +from .file_writer import FileWriter + +logger = logging.getLogger("airbyte") + + +@dataclass +class DefaultFileUploader(FileUploader): + """ + File uploader class + Handles the upload logic: fetching the download target, making the request via its requester, determining the file path, and calling self.file_writer.write() + Different types of file_writer:BaseFileWriter can be injected to handle different file writing strategies. + """ + + requester: Requester + download_target_extractor: RecordExtractor + config: Config + file_writer: FileWriter + parameters: InitVar[Mapping[str, Any]] + + filename_extractor: Optional[Union[InterpolatedString, str]] = None + content_extractor: Optional[RecordExtractor] = None + + def __post_init__(self, parameters: Mapping[str, Any]) -> None: + if self.filename_extractor: + self.filename_extractor = InterpolatedString.create( + self.filename_extractor, + parameters=parameters, + ) + + def upload(self, record: Record) -> None: + mocked_response = SafeResponse() + mocked_response.content = json.dumps(record.data).encode() + download_targets = list(self.download_target_extractor.extract_records(mocked_response)) + if not download_targets: + raise ValueError("No download targets found") + + download_target = download_targets[0] # we just expect one download target + if not isinstance(download_target, str): + raise ValueError( + f"download_target is expected to be a str but was {type(download_target)}: {download_target}" + ) + + response = self.requester.send_request( + stream_slice=StreamSlice( + partition={}, cursor_slice={}, extra_fields={"download_target": download_target} + ), + ) + + if self.content_extractor: + raise NotImplementedError("Content extraction is not yet implemented. The content_extractor component is currently not supported.") + else: + files_directory = Path(get_files_directory()) + + file_name = ( + self.filename_extractor.eval(self.config, record=record) + if self.filename_extractor + else str(uuid.uuid4()) + ) + file_name = file_name.lstrip("/") + file_relative_path = Path(record.stream_name) / Path(file_name) + + full_path = files_directory / file_relative_path + full_path.parent.mkdir(parents=True, exist_ok=True) + + file_size_bytes = self.file_writer.write(full_path, content=response.content) + + logger.info("File uploaded successfully") + logger.info(f"File url: {str(full_path)}") + logger.info(f"File size: {file_size_bytes / 1024} KB") + logger.info(f"File relative path: {str(file_relative_path)}") + + record.file_reference = AirbyteRecordMessageFileReference( + staging_file_url=str(full_path), + source_file_relative_path=str(file_relative_path), + file_size_bytes=file_size_bytes, + ) diff --git a/airbyte_cdk/sources/declarative/retrievers/file_uploader/file_uploader.py b/airbyte_cdk/sources/declarative/retrievers/file_uploader/file_uploader.py index 858e18c58..f68de46ef 100644 --- a/airbyte_cdk/sources/declarative/retrievers/file_uploader/file_uploader.py +++ b/airbyte_cdk/sources/declarative/retrievers/file_uploader/file_uploader.py @@ -2,100 +2,21 @@ # Copyright (c) 2025 Airbyte, Inc., all rights reserved. # -import json -import logging -import uuid -from dataclasses import InitVar, dataclass, field -from pathlib import Path -from typing import Any, Mapping, Optional, Union +from abc import ABC, abstractmethod +from dataclasses import dataclass -from airbyte_cdk.models import AirbyteRecordMessageFileReference -from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor -from airbyte_cdk.sources.declarative.interpolation.interpolated_string import ( - InterpolatedString, -) -from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import ( - SafeResponse, -) -from airbyte_cdk.sources.declarative.requesters import Requester -from airbyte_cdk.sources.declarative.types import Record, StreamSlice -from airbyte_cdk.sources.types import Config -from airbyte_cdk.sources.utils.files_directory import get_files_directory - -from .base_file_uploader import BaseFileUploader -from .base_file_writer import BaseFileWriter - -logger = logging.getLogger("airbyte") +from airbyte_cdk.sources.declarative.types import Record @dataclass -class FileUploader(BaseFileUploader): +class FileUploader(ABC): """ - File uploader class - Handles the upload logic: fetching the download target, making the request via its requester, determining the file path, and calling self.file_writer.write() - Different types of file_writer:BaseFileWriter can be injected to handle different file writing strategies. + Base class for file uploader """ - requester: Requester - download_target_extractor: RecordExtractor - config: Config - file_writer: BaseFileWriter - parameters: InitVar[Mapping[str, Any]] - - filename_extractor: Optional[Union[InterpolatedString, str]] = None - content_extractor: Optional[RecordExtractor] = None - - def __post_init__(self, parameters: Mapping[str, Any]) -> None: - if self.filename_extractor: - self.filename_extractor = InterpolatedString.create( - self.filename_extractor, - parameters=parameters, - ) - + @abstractmethod def upload(self, record: Record) -> None: - mocked_response = SafeResponse() - mocked_response.content = json.dumps(record.data).encode() - download_targets = list(self.download_target_extractor.extract_records(mocked_response)) - if not download_targets: - raise ValueError("No download targets found") - - download_target = download_targets[0] # we just expect one download target - if not isinstance(download_target, str): - raise ValueError( - f"download_target is expected to be a str but was {type(download_target)}: {download_target}" - ) - - response = self.requester.send_request( - stream_slice=StreamSlice( - partition={}, cursor_slice={}, extra_fields={"download_target": download_target} - ), - ) - - if self.content_extractor: - raise NotImplementedError("TODO") - else: - files_directory = Path(get_files_directory()) - - file_name = ( - self.filename_extractor.eval(self.config, record=record) - if self.filename_extractor - else str(uuid.uuid4()) - ) - file_name = file_name.lstrip("/") - file_relative_path = Path(record.stream_name) / Path(file_name) - - full_path = files_directory / file_relative_path - full_path.parent.mkdir(parents=True, exist_ok=True) - - file_size_bytes = self.file_writer.write(full_path, content=response.content) - - logger.info("File uploaded successfully") - logger.info(f"File url: {str(full_path)}") - logger.info(f"File size: {file_size_bytes / 1024} KB") - logger.info(f"File relative path: {str(file_relative_path)}") - - record.file_reference = AirbyteRecordMessageFileReference( - staging_file_url=str(full_path), - source_file_relative_path=str(file_relative_path), - file_size_bytes=file_size_bytes, - ) + """ + Uploads the file to the specified location + """ + ... diff --git a/airbyte_cdk/sources/declarative/retrievers/file_uploader/file_writer.py b/airbyte_cdk/sources/declarative/retrievers/file_uploader/file_writer.py index eff6333d0..b91923c8a 100644 --- a/airbyte_cdk/sources/declarative/retrievers/file_uploader/file_writer.py +++ b/airbyte_cdk/sources/declarative/retrievers/file_uploader/file_writer.py @@ -2,17 +2,18 @@ # Copyright (c) 2025 Airbyte, Inc., all rights reserved. # +from abc import ABC, abstractmethod from pathlib import Path -from .base_file_writer import BaseFileWriter +class FileWriter(ABC): + """ + Base File writer class + """ -class FileWriter(BaseFileWriter): + @abstractmethod def write(self, file_path: Path, content: bytes) -> int: """ Writes the file to the specified location """ - with open(str(file_path), "wb") as f: - f.write(content) - - return file_path.stat().st_size + ... diff --git a/airbyte_cdk/sources/declarative/retrievers/file_uploader/base_file_writer.py b/airbyte_cdk/sources/declarative/retrievers/file_uploader/local_file_system_file_writer.py similarity index 53% rename from airbyte_cdk/sources/declarative/retrievers/file_uploader/base_file_writer.py rename to airbyte_cdk/sources/declarative/retrievers/file_uploader/local_file_system_file_writer.py index 1cdcb1d31..8880fe1e4 100644 --- a/airbyte_cdk/sources/declarative/retrievers/file_uploader/base_file_writer.py +++ b/airbyte_cdk/sources/declarative/retrievers/file_uploader/local_file_system_file_writer.py @@ -2,18 +2,17 @@ # Copyright (c) 2025 Airbyte, Inc., all rights reserved. # -from abc import ABC, abstractmethod from pathlib import Path +from .file_writer import FileWriter -class BaseFileWriter(ABC): - """ - Base File writer class - """ - @abstractmethod +class LocalFileSystemFileWriter(FileWriter): def write(self, file_path: Path, content: bytes) -> int: """ Writes the file to the specified location """ - ... + with open(str(file_path), "wb") as f: + f.write(content) + + return file_path.stat().st_size diff --git a/airbyte_cdk/sources/declarative/retrievers/file_uploader/noop_file_writer.py b/airbyte_cdk/sources/declarative/retrievers/file_uploader/noop_file_writer.py index 3860bf4a9..e79c3739f 100644 --- a/airbyte_cdk/sources/declarative/retrievers/file_uploader/noop_file_writer.py +++ b/airbyte_cdk/sources/declarative/retrievers/file_uploader/noop_file_writer.py @@ -4,12 +4,13 @@ from pathlib import Path -from .base_file_writer import BaseFileWriter +from .file_writer import FileWriter -class NoopFileWriter(BaseFileWriter): +class NoopFileWriter(FileWriter): + NOOP_FILE_SIZE = -1 def write(self, file_path: Path, content: bytes) -> int: """ Noop file writer """ - return 0 + return self.NOOP_FILE_SIZE diff --git a/unit_tests/sources/declarative/file/test_file_stream.py b/unit_tests/sources/declarative/file/test_file_stream.py index 82c258bbc..f77f259ac 100644 --- a/unit_tests/sources/declarative/file/test_file_stream.py +++ b/unit_tests/sources/declarative/file/test_file_stream.py @@ -14,7 +14,7 @@ from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse from airbyte_cdk.test.mock_http.response_builder import find_binary_response, find_template from airbyte_cdk.test.state_builder import StateBuilder - +from airbyte_cdk.sources.declarative.retrievers.file_uploader.noop_file_writer import NoopFileWriter class ConfigBuilder: def build(self) -> Dict[str, Any]: @@ -231,7 +231,7 @@ def test_get_article_attachments_messages_for_connector_builder(self) -> None: assert file_reference.staging_file_url assert file_reference.source_file_relative_path # because we didn't write the file, the size is 0 - assert file_reference.file_size_bytes == 0 + assert file_reference.file_size_bytes == NoopFileWriter.NOOP_FILE_SIZE # Assert file reference fields are copied to record data record_data = output.records[0].record.data From 3ea1674026016bc50bbcf2e9da52e5445b72c5c7 Mon Sep 17 00:00:00 2001 From: Aldo Gonzalez Date: Wed, 23 Apr 2025 15:41:16 -0700 Subject: [PATCH 13/17] file-mode-api: run ruff format --- .../sources/declarative/parsers/model_to_component_factory.py | 4 +++- .../retrievers/file_uploader/default_file_uploader.py | 4 +++- .../declarative/retrievers/file_uploader/noop_file_writer.py | 1 + unit_tests/sources/declarative/file/test_file_stream.py | 1 + 4 files changed, 8 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 4416f9d8c..e4d10f375 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -3617,7 +3617,9 @@ def create_file_uploader( requester=requester, download_target_extractor=download_target_extractor, config=config, - file_writer=NoopFileWriter() if emit_connector_builder_messages else LocalFileSystemFileWriter(), + file_writer=NoopFileWriter() + if emit_connector_builder_messages + else LocalFileSystemFileWriter(), parameters=model.parameters or {}, filename_extractor=model.filename_extractor if model.filename_extractor else None, ) diff --git a/airbyte_cdk/sources/declarative/retrievers/file_uploader/default_file_uploader.py b/airbyte_cdk/sources/declarative/retrievers/file_uploader/default_file_uploader.py index db2d38906..1ef1a98bd 100644 --- a/airbyte_cdk/sources/declarative/retrievers/file_uploader/default_file_uploader.py +++ b/airbyte_cdk/sources/declarative/retrievers/file_uploader/default_file_uploader.py @@ -72,7 +72,9 @@ def upload(self, record: Record) -> None: ) if self.content_extractor: - raise NotImplementedError("Content extraction is not yet implemented. The content_extractor component is currently not supported.") + raise NotImplementedError( + "Content extraction is not yet implemented. The content_extractor component is currently not supported." + ) else: files_directory = Path(get_files_directory()) diff --git a/airbyte_cdk/sources/declarative/retrievers/file_uploader/noop_file_writer.py b/airbyte_cdk/sources/declarative/retrievers/file_uploader/noop_file_writer.py index e79c3739f..a074e1787 100644 --- a/airbyte_cdk/sources/declarative/retrievers/file_uploader/noop_file_writer.py +++ b/airbyte_cdk/sources/declarative/retrievers/file_uploader/noop_file_writer.py @@ -9,6 +9,7 @@ class NoopFileWriter(FileWriter): NOOP_FILE_SIZE = -1 + def write(self, file_path: Path, content: bytes) -> int: """ Noop file writer diff --git a/unit_tests/sources/declarative/file/test_file_stream.py b/unit_tests/sources/declarative/file/test_file_stream.py index f77f259ac..cff68527d 100644 --- a/unit_tests/sources/declarative/file/test_file_stream.py +++ b/unit_tests/sources/declarative/file/test_file_stream.py @@ -16,6 +16,7 @@ from airbyte_cdk.test.state_builder import StateBuilder from airbyte_cdk.sources.declarative.retrievers.file_uploader.noop_file_writer import NoopFileWriter + class ConfigBuilder: def build(self) -> Dict[str, Any]: return { From 20359566f521ef8a536217d76ffbd75492121e6f Mon Sep 17 00:00:00 2001 From: Aldo Gonzalez Date: Wed, 23 Apr 2025 15:44:15 -0700 Subject: [PATCH 14/17] file-mode-api: run ruff check . --fix --- .../sources/declarative/parsers/model_to_component_factory.py | 2 +- .../sources/declarative/retrievers/file_uploader/__init__.py | 4 ++-- .../file_uploader/connector_builder_file_uploader.py | 2 +- unit_tests/sources/declarative/file/test_file_stream.py | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index e4d10f375..258b3975c 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -482,9 +482,9 @@ SimpleRetrieverTestReadDecorator, ) from airbyte_cdk.sources.declarative.retrievers.file_uploader import ( - FileUploader, ConnectorBuilderFileUploader, DefaultFileUploader, + FileUploader, LocalFileSystemFileWriter, NoopFileWriter, ) diff --git a/airbyte_cdk/sources/declarative/retrievers/file_uploader/__init__.py b/airbyte_cdk/sources/declarative/retrievers/file_uploader/__init__.py index 267477678..e839d9ba1 100644 --- a/airbyte_cdk/sources/declarative/retrievers/file_uploader/__init__.py +++ b/airbyte_cdk/sources/declarative/retrievers/file_uploader/__init__.py @@ -1,7 +1,7 @@ -from .file_uploader import FileUploader -from .file_writer import FileWriter from .connector_builder_file_uploader import ConnectorBuilderFileUploader from .default_file_uploader import DefaultFileUploader +from .file_uploader import FileUploader +from .file_writer import FileWriter from .local_file_system_file_writer import LocalFileSystemFileWriter from .noop_file_writer import NoopFileWriter diff --git a/airbyte_cdk/sources/declarative/retrievers/file_uploader/connector_builder_file_uploader.py b/airbyte_cdk/sources/declarative/retrievers/file_uploader/connector_builder_file_uploader.py index 438917c37..6db5a608f 100644 --- a/airbyte_cdk/sources/declarative/retrievers/file_uploader/connector_builder_file_uploader.py +++ b/airbyte_cdk/sources/declarative/retrievers/file_uploader/connector_builder_file_uploader.py @@ -6,8 +6,8 @@ from airbyte_cdk.sources.declarative.types import Record -from .file_uploader import FileUploader from .default_file_uploader import DefaultFileUploader +from .file_uploader import FileUploader @dataclass diff --git a/unit_tests/sources/declarative/file/test_file_stream.py b/unit_tests/sources/declarative/file/test_file_stream.py index cff68527d..f209d6a5e 100644 --- a/unit_tests/sources/declarative/file/test_file_stream.py +++ b/unit_tests/sources/declarative/file/test_file_stream.py @@ -6,6 +6,7 @@ from unittest.mock import Mock from airbyte_cdk.models import AirbyteStateMessage, ConfiguredAirbyteCatalog, Status +from airbyte_cdk.sources.declarative.retrievers.file_uploader.noop_file_writer import NoopFileWriter from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource from airbyte_cdk.test.catalog_builder import CatalogBuilder, ConfiguredAirbyteStreamBuilder from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput @@ -14,7 +15,6 @@ from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse from airbyte_cdk.test.mock_http.response_builder import find_binary_response, find_template from airbyte_cdk.test.state_builder import StateBuilder -from airbyte_cdk.sources.declarative.retrievers.file_uploader.noop_file_writer import NoopFileWriter class ConfigBuilder: From aa0a832b6ea1c0389a7b93cfbbcb30b42bbb41b2 Mon Sep 17 00:00:00 2001 From: Aldo Gonzalez Date: Wed, 23 Apr 2025 16:38:14 -0700 Subject: [PATCH 15/17] file-mode-api: Mock the test instead of messing with Classes --- .../declarative/yaml_declarative_source.py | 4 +- .../declarative/file/test_file_stream.py | 68 +++++++++++-------- 2 files changed, 39 insertions(+), 33 deletions(-) diff --git a/airbyte_cdk/sources/declarative/yaml_declarative_source.py b/airbyte_cdk/sources/declarative/yaml_declarative_source.py index 87703e5ff..0b99a9ea6 100644 --- a/airbyte_cdk/sources/declarative/yaml_declarative_source.py +++ b/airbyte_cdk/sources/declarative/yaml_declarative_source.py @@ -24,7 +24,6 @@ def __init__( catalog: Optional[ConfiguredAirbyteCatalog] = None, config: Optional[Mapping[str, Any]] = None, state: Optional[List[AirbyteStateMessage]] = None, - emit_connector_builder_messages: Optional[bool] = False, ) -> None: """ :param path_to_yaml: Path to the yaml file describing the source @@ -36,8 +35,7 @@ def __init__( catalog=catalog or ConfiguredAirbyteCatalog(streams=[]), config=config or {}, state=state or [], - source_config=source_config, - emit_connector_builder_messages=emit_connector_builder_messages or False, + source_config=source_config ) def _read_and_parse_yaml_file(self, path_to_yaml_file: str) -> ConnectionDefinition: diff --git a/unit_tests/sources/declarative/file/test_file_stream.py b/unit_tests/sources/declarative/file/test_file_stream.py index f209d6a5e..738d98af5 100644 --- a/unit_tests/sources/declarative/file/test_file_stream.py +++ b/unit_tests/sources/declarative/file/test_file_stream.py @@ -3,9 +3,10 @@ from pathlib import Path from typing import Any, Dict, List, Optional from unittest import TestCase -from unittest.mock import Mock +from unittest.mock import Mock, patch from airbyte_cdk.models import AirbyteStateMessage, ConfiguredAirbyteCatalog, Status +from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import ModelToComponentFactory as OriginalModelToComponentFactory from airbyte_cdk.sources.declarative.retrievers.file_uploader.noop_file_writer import NoopFileWriter from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource from airbyte_cdk.test.catalog_builder import CatalogBuilder, ConfiguredAirbyteStreamBuilder @@ -35,7 +36,6 @@ def _source( config: Dict[str, Any], state: Optional[List[AirbyteStateMessage]] = None, yaml_file: Optional[str] = None, - emit_connector_builder_messages: Optional[bool] = False, ) -> YamlDeclarativeSource: if not yaml_file: yaml_file = "file_stream_manifest.yaml" @@ -43,8 +43,7 @@ def _source( path_to_yaml=str(Path(__file__).parent / yaml_file), catalog=catalog, config=config, - state=state, - emit_connector_builder_messages=emit_connector_builder_messages, + state=state ) @@ -53,13 +52,12 @@ def read( catalog: ConfiguredAirbyteCatalog, state_builder: Optional[StateBuilder] = None, expecting_exception: bool = False, - yaml_file: Optional[str] = None, - emit_connector_builder_messages: Optional[bool] = False, + yaml_file: Optional[str] = None ) -> EntrypointOutput: config = config_builder.build() state = state_builder.build() if state_builder else StateBuilder().build() return entrypoint_read( - _source(catalog, config, state, yaml_file, emit_connector_builder_messages), + _source(catalog, config, state, yaml_file), config, catalog, state, @@ -185,7 +183,7 @@ def test_get_article_attachments_with_filename_extractor(self) -> None: yaml_file="test_file_stream_with_filename_extractor.yaml", ) - assert output.records + assert len(output.records) == 1 file_reference = output.records[0].record.file_reference assert file_reference assert ( @@ -217,30 +215,40 @@ def test_get_article_attachments_messages_for_connector_builder(self) -> None: ), ) - output = read( - self._config(), - CatalogBuilder() - .with_stream(ConfiguredAirbyteStreamBuilder().with_name("article_attachments")) - .build(), - yaml_file="test_file_stream_with_filename_extractor.yaml", - emit_connector_builder_messages=True, - ) + # Define a mock factory that forces emit_connector_builder_messages=True + class MockModelToComponentFactory(OriginalModelToComponentFactory): + def __init__(self, *args, **kwargs): + kwargs['emit_connector_builder_messages'] = True + super().__init__(*args, **kwargs) - assert len(output.records) == 1 - file_reference = output.records[0].record.file_reference - assert file_reference - assert file_reference.staging_file_url - assert file_reference.source_file_relative_path - # because we didn't write the file, the size is 0 - assert file_reference.file_size_bytes == NoopFileWriter.NOOP_FILE_SIZE + # Patch the factory class where ConcurrentDeclarativeSource (parent of YamlDeclarativeSource) imports it + with patch( + "airbyte_cdk.sources.declarative.concurrent_declarative_source.ModelToComponentFactory", + new=MockModelToComponentFactory + ): + output = read( + self._config(), + CatalogBuilder() + .with_stream(ConfiguredAirbyteStreamBuilder().with_name("article_attachments")) + .build(), + yaml_file="test_file_stream_with_filename_extractor.yaml" + ) - # Assert file reference fields are copied to record data - record_data = output.records[0].record.data - assert record_data["staging_file_url"] == file_reference.staging_file_url - assert ( - record_data["source_file_relative_path"] == file_reference.source_file_relative_path - ) - assert record_data["file_size_bytes"] == file_reference.file_size_bytes + assert len(output.records) == 1 + file_reference = output.records[0].record.file_reference + assert file_reference + assert file_reference.staging_file_url + assert file_reference.source_file_relative_path + # because we didn't write the file, the size is NOOP_FILE_SIZE + assert file_reference.file_size_bytes == NoopFileWriter.NOOP_FILE_SIZE + + # Assert file reference fields are copied to record data + record_data = output.records[0].record.data + assert record_data["staging_file_url"] == file_reference.staging_file_url + assert ( + record_data["source_file_relative_path"] == file_reference.source_file_relative_path + ) + assert record_data["file_size_bytes"] == file_reference.file_size_bytes def test_discover_article_attachments(self) -> None: output = discover(self._config()) From c8e29d1db01688a92021d38c0deb95987ef61593 Mon Sep 17 00:00:00 2001 From: Aldo Gonzalez Date: Wed, 23 Apr 2025 16:40:35 -0700 Subject: [PATCH 16/17] file-api: run ruff format . --- .../declarative/yaml_declarative_source.py | 2 +- .../declarative/file/test_file_stream.py | 17 ++++++++++------- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/airbyte_cdk/sources/declarative/yaml_declarative_source.py b/airbyte_cdk/sources/declarative/yaml_declarative_source.py index 0b99a9ea6..93bdc55e9 100644 --- a/airbyte_cdk/sources/declarative/yaml_declarative_source.py +++ b/airbyte_cdk/sources/declarative/yaml_declarative_source.py @@ -35,7 +35,7 @@ def __init__( catalog=catalog or ConfiguredAirbyteCatalog(streams=[]), config=config or {}, state=state or [], - source_config=source_config + source_config=source_config, ) def _read_and_parse_yaml_file(self, path_to_yaml_file: str) -> ConnectionDefinition: diff --git a/unit_tests/sources/declarative/file/test_file_stream.py b/unit_tests/sources/declarative/file/test_file_stream.py index 738d98af5..7b645f540 100644 --- a/unit_tests/sources/declarative/file/test_file_stream.py +++ b/unit_tests/sources/declarative/file/test_file_stream.py @@ -6,7 +6,9 @@ from unittest.mock import Mock, patch from airbyte_cdk.models import AirbyteStateMessage, ConfiguredAirbyteCatalog, Status -from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import ModelToComponentFactory as OriginalModelToComponentFactory +from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import ( + ModelToComponentFactory as OriginalModelToComponentFactory, +) from airbyte_cdk.sources.declarative.retrievers.file_uploader.noop_file_writer import NoopFileWriter from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource from airbyte_cdk.test.catalog_builder import CatalogBuilder, ConfiguredAirbyteStreamBuilder @@ -43,7 +45,7 @@ def _source( path_to_yaml=str(Path(__file__).parent / yaml_file), catalog=catalog, config=config, - state=state + state=state, ) @@ -52,7 +54,7 @@ def read( catalog: ConfiguredAirbyteCatalog, state_builder: Optional[StateBuilder] = None, expecting_exception: bool = False, - yaml_file: Optional[str] = None + yaml_file: Optional[str] = None, ) -> EntrypointOutput: config = config_builder.build() state = state_builder.build() if state_builder else StateBuilder().build() @@ -218,20 +220,20 @@ def test_get_article_attachments_messages_for_connector_builder(self) -> None: # Define a mock factory that forces emit_connector_builder_messages=True class MockModelToComponentFactory(OriginalModelToComponentFactory): def __init__(self, *args, **kwargs): - kwargs['emit_connector_builder_messages'] = True + kwargs["emit_connector_builder_messages"] = True super().__init__(*args, **kwargs) # Patch the factory class where ConcurrentDeclarativeSource (parent of YamlDeclarativeSource) imports it with patch( "airbyte_cdk.sources.declarative.concurrent_declarative_source.ModelToComponentFactory", - new=MockModelToComponentFactory + new=MockModelToComponentFactory, ): output = read( self._config(), CatalogBuilder() .with_stream(ConfiguredAirbyteStreamBuilder().with_name("article_attachments")) .build(), - yaml_file="test_file_stream_with_filename_extractor.yaml" + yaml_file="test_file_stream_with_filename_extractor.yaml", ) assert len(output.records) == 1 @@ -246,7 +248,8 @@ def __init__(self, *args, **kwargs): record_data = output.records[0].record.data assert record_data["staging_file_url"] == file_reference.staging_file_url assert ( - record_data["source_file_relative_path"] == file_reference.source_file_relative_path + record_data["source_file_relative_path"] + == file_reference.source_file_relative_path ) assert record_data["file_size_bytes"] == file_reference.file_size_bytes From bc4c7d637ccffef8484c4693461f2fbe043d0c38 Mon Sep 17 00:00:00 2001 From: Aldo Gonzalez Date: Mon, 28 Apr 2025 12:36:55 -0700 Subject: [PATCH 17/17] file-api: remove unused content_extractor --- .../file_uploader/default_file_uploader.py | 46 ++++++++----------- 1 file changed, 20 insertions(+), 26 deletions(-) diff --git a/airbyte_cdk/sources/declarative/retrievers/file_uploader/default_file_uploader.py b/airbyte_cdk/sources/declarative/retrievers/file_uploader/default_file_uploader.py index 1ef1a98bd..1312ab34d 100644 --- a/airbyte_cdk/sources/declarative/retrievers/file_uploader/default_file_uploader.py +++ b/airbyte_cdk/sources/declarative/retrievers/file_uploader/default_file_uploader.py @@ -43,7 +43,6 @@ class DefaultFileUploader(FileUploader): parameters: InitVar[Mapping[str, Any]] filename_extractor: Optional[Union[InterpolatedString, str]] = None - content_extractor: Optional[RecordExtractor] = None def __post_init__(self, parameters: Mapping[str, Any]) -> None: if self.filename_extractor: @@ -71,33 +70,28 @@ def upload(self, record: Record) -> None: ), ) - if self.content_extractor: - raise NotImplementedError( - "Content extraction is not yet implemented. The content_extractor component is currently not supported." - ) - else: - files_directory = Path(get_files_directory()) + files_directory = Path(get_files_directory()) - file_name = ( - self.filename_extractor.eval(self.config, record=record) - if self.filename_extractor - else str(uuid.uuid4()) - ) - file_name = file_name.lstrip("/") - file_relative_path = Path(record.stream_name) / Path(file_name) + file_name = ( + self.filename_extractor.eval(self.config, record=record) + if self.filename_extractor + else str(uuid.uuid4()) + ) + file_name = file_name.lstrip("/") + file_relative_path = Path(record.stream_name) / Path(file_name) - full_path = files_directory / file_relative_path - full_path.parent.mkdir(parents=True, exist_ok=True) + full_path = files_directory / file_relative_path + full_path.parent.mkdir(parents=True, exist_ok=True) - file_size_bytes = self.file_writer.write(full_path, content=response.content) + file_size_bytes = self.file_writer.write(full_path, content=response.content) - logger.info("File uploaded successfully") - logger.info(f"File url: {str(full_path)}") - logger.info(f"File size: {file_size_bytes / 1024} KB") - logger.info(f"File relative path: {str(file_relative_path)}") + logger.info("File uploaded successfully") + logger.info(f"File url: {str(full_path)}") + logger.info(f"File size: {file_size_bytes / 1024} KB") + logger.info(f"File relative path: {str(file_relative_path)}") - record.file_reference = AirbyteRecordMessageFileReference( - staging_file_url=str(full_path), - source_file_relative_path=str(file_relative_path), - file_size_bytes=file_size_bytes, - ) + record.file_reference = AirbyteRecordMessageFileReference( + staging_file_url=str(full_path), + source_file_relative_path=str(file_relative_path), + file_size_bytes=file_size_bytes, + )