Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
47ec7a4
skeleton of native declarative source support
aaronsteers Apr 7, 2024
8288ce8
improve implementation
aaronsteers Apr 7, 2024
be4ecd4
update with native execution, wrapping the proper CDK classes
aaronsteers May 30, 2024
d9445ca
Merge remote-tracking branch 'origin/main' into aj/feat/source-declar…
aaronsteers May 30, 2024
95d0561
update pyproject.toml
aaronsteers May 30, 2024
8857c41
add missing property 'reported_version'
aaronsteers May 30, 2024
2b263dc
partially working sync! 🎉
aaronsteers May 30, 2024
9f828d1
downgrade cdk to declared version in manifest
aaronsteers May 30, 2024
28cdb77
lint fix
aaronsteers May 30, 2024
24431d6
add error handler in declarative yaml
aaronsteers May 30, 2024
3a8e79f
fix start page
aaronsteers May 30, 2024
1ae64ac
feat: enable detecting yaml declarative sources in `get_available_con…
aaronsteers May 31, 2024
564f572
feat: auto-download manifest.yaml from github `master` branch
aaronsteers May 31, 2024
46be6f2
update example script
aaronsteers May 31, 2024
b438206
add hard-coded low-code exclusions
aaronsteers May 31, 2024
925f4de
raise errors when low-code definition isn't viable
aaronsteers May 31, 2024
bb49e79
print debug info on non-installable connectors
aaronsteers May 31, 2024
8ecf5cc
add simple test
aaronsteers Jun 1, 2024
5e18e3d
include printed count of connectors in example script
aaronsteers Jun 1, 2024
8f0d838
lint fixes
aaronsteers Jun 1, 2024
438fbce
chore: bump source-faker and airbyte-cdk to latest
aaronsteers Jun 1, 2024
b1c5c71
add integration tests for all hard-coded failures
aaronsteers Jun 1, 2024
8e28bfc
remove stale failures list
aaronsteers Jun 1, 2024
82baf69
tests: only autouse source registry in specific files
aaronsteers Jun 1, 2024
e6b2655
lint fixes
aaronsteers Jun 1, 2024
40cae57
fix autouse
aaronsteers Jun 1, 2024
f0183e0
fix some tests
aaronsteers Jun 2, 2024
7623619
add missing fixture inclusions
aaronsteers Jun 2, 2024
d3aef35
Merge remote-tracking branch 'origin/main' into aj/feat/source-declar…
aaronsteers Jun 3, 2024
b5d0924
improve fixture isolation for 'source_test_registry'
aaronsteers Jun 3, 2024
4376af1
chore: improve error message
aaronsteers Jun 3, 2024
b178d33
fix: add new bamboo connector to exception list
aaronsteers Jun 3, 2024
6753854
use enum values directly
aaronsteers Jun 3, 2024
83ead8e
make registry parsing safer and more resilient
aaronsteers Jun 3, 2024
836e00f
bump to latest airbyte-cdk
aaronsteers Jun 3, 2024
b34c45d
Auto-fix lint and format issues
Jun 3, 2024
8b09522
tests: xfail pokemon no-code on windows
aaronsteers Jun 3, 2024
3cdaf7e
Merge branch 'main' into aj/feat/source-declarative-manifest
aaronsteers Jun 4, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airbyte/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
documents,
exceptions, # noqa: ICN001 # No 'exc' alias for top-level module
experimental,
records,
results,
secrets,
sources,
Expand Down
1 change: 1 addition & 0 deletions airbyte/_processors/sql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from __future__ import annotations

from airbyte._processors.sql import snowflakecortex
from airbyte._processors.sql.snowflakecortex import (
SnowflakeCortexSqlProcessor,
SnowflakeCortexTypeConverter,
Expand Down
2 changes: 1 addition & 1 deletion airbyte/caches/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from __future__ import annotations

from airbyte.caches import bigquery, duckdb, motherduck, postgres, snowflake, util
from airbyte.caches import base, bigquery, duckdb, motherduck, postgres, snowflake, util
from airbyte.caches.base import CacheBase
from airbyte.caches.bigquery import BigQueryCache
from airbyte.caches.duckdb import DuckDBCache
Expand Down
104 changes: 104 additions & 0 deletions airbyte/sources/declarative.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
"""Support for declarative yaml source testing."""

from __future__ import annotations

import json
from pathlib import Path
from typing import TYPE_CHECKING, cast

from airbyte_cdk.entrypoint import AirbyteEntrypoint
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource

from airbyte._executor import Executor
from airbyte.exceptions import PyAirbyteInternalError
from airbyte.sources.base import Source


if TYPE_CHECKING:
from collections.abc import Iterator


class DeclarativeExecutor(Executor):
    """An executor for declarative sources.

    The manifest is wrapped in the CDK's `ManifestDeclarativeSource` and run in-process
    through `AirbyteEntrypoint`, so there is nothing to install or uninstall.
    """

    def __init__(
        self,
        manifest: str | dict | Path,
    ) -> None:
        """Initialize a declarative executor.

        - If `manifest` is a path, the file is read and parsed as JSON.
        - If `manifest` is a string, it is meant to be treated as an HTTP path. This is
          not implemented yet, so a `NotImplementedError` is raised.
        - If `manifest` is a dict, it will be used as is.

        Raises:
            NotImplementedError: If `manifest` is a string.
            PyAirbyteInternalError: If `manifest` is of an unsupported type, or if the
                parsed file content is not a dict.
        """
        manifest_dict: object
        if isinstance(manifest, Path):
            manifest_dict = cast(dict, json.loads(manifest.read_text()))
        elif isinstance(manifest, str):
            # TODO: Implement HTTP path parsing
            raise NotImplementedError("HTTP path parsing is not yet implemented.")
        else:
            # Dicts are used as-is. Anything else falls through to the type check below
            # instead of leaving the attribute unset (which used to surface as an
            # AttributeError rather than the intended error).
            manifest_dict = manifest

        if not isinstance(manifest_dict, dict):
            raise PyAirbyteInternalError(message="Manifest must be a dict.")

        self._manifest_dict: dict = manifest_dict
        self.declarative_source = ManifestDeclarativeSource(source_config=self._manifest_dict)
        self.reported_version: str | None = None  # TODO: Consider adding version detection

    def execute(self, args: list[str]) -> Iterator[str]:
        """Execute the declarative source.

        The CLI-style `args` are parsed by an `AirbyteEntrypoint` built around the
        declarative source, and whatever the entrypoint's `run()` produces is yielded.
        """
        source_entrypoint = AirbyteEntrypoint(self.declarative_source)
        parsed_args = source_entrypoint.parse_args(args)
        yield from source_entrypoint.run(parsed_args)

    def ensure_installation(self, *, auto_fix: bool = True) -> None:
        """No-op. The declarative source is included with PyAirbyte."""
        _ = auto_fix  # Accepted for interface compatibility; intentionally unused.

    def install(self) -> None:
        """No-op. The declarative source is included with PyAirbyte."""

    def uninstall(self) -> None:
        """No-op. The declarative source is included with PyAirbyte."""


class DeclarativeSource(Source):
    """A declarative source using Airbyte's Yaml low-code/no-code framework."""

    def __init__(
        self,
        manifest: str | dict | Path,
    ) -> None:
        """Create a source that is driven by a declarative manifest.

        The manifest is kept on the instance as `manifest`, placed in the source config
        under the "manifest" key, and handed to a new `DeclarativeExecutor`, which
        decides how each accepted form (path, string or dict) is interpreted.

        Args:
            manifest: The manifest for the declarative source, given as a `Path`, a
                string, or an already-parsed dict.
        """
        # TODO: Conform manifest to a dict or str (TBD)
        self.manifest = manifest

        declarative_executor = DeclarativeExecutor(manifest)
        source_config = {  # TODO: Put 'real' config here
            "manifest": manifest,
        }

        # Delegate the rest of the setup to the base `Source` implementation.
        super().__init__(
            name="Declarative",  # TODO: Get name from manifest
            config=source_config,
            executor=declarative_executor,
        )
155 changes: 151 additions & 4 deletions airbyte/sources/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@

import json
import os
import warnings
from copy import copy
from dataclasses import dataclass
from enum import Enum
from pathlib import Path

import requests
Expand All @@ -19,6 +21,89 @@
# Name of the environment variable that `_get_registry_url` looks for in `os.environ`.
_REGISTRY_ENV_VAR = "AIRBYTE_LOCAL_REGISTRY"
# URL of the hosted OSS connector registry JSON.
# NOTE(review): presumably the default used when the variable above is unset - confirm.
_REGISTRY_URL = "https://connectors.airbyte.com/files/registries/v0/oss_registry.json"

# Registry tag looked up in a connector entry's "tags" to detect low-code connectors.
_LOWCODE_LABEL = "cdk:low-code"

# Low-code connectors that are excluded from the YAML install type.
# NOTE(review): going by the name, these also need custom Python code and so cannot
# run from the manifest alone - confirm.
_LOWCODE_CONNECTORS_NEEDING_PYTHON = [
    "source-alpha-vantage",
    "source-amplitude",
    "source-apify-dataset",
    "source-avni",
    "source-bamboo-hr",
    "source-braintree",
    "source-braze",
    "source-chargebee",
    "source-close-com",
    "source-commercetools",
    "source-facebook-pages",
    "source-fastbill",
    "source-freshdesk",
    "source-gitlab",
    "source-gnews",
    "source-greenhouse",
    "source-instatus",
    "source-intercom",
    "source-iterable",
    "source-jira",
    "source-klaviyo",
    "source-mailchimp",
    "source-mixpanel",
    "source-monday",
    "source-my-hours",
    "source-notion",
    "source-okta",
    "source-outreach",
    "source-partnerstack",
    "source-paypal-transaction",
    "source-pinterest",
    "source-pipedrive",
    "source-pocket",
    "source-posthog",
    "source-prestashop",
    "source-public-apis",
    "source-qualaroo",
    "source-quickbooks",
    "source-railz",
    "source-recharge",
    "source-retently",
    "source-rss",
    "source-slack",
    "source-surveymonkey",
    "source-the-guardian-api",
    "source-trello",
    "source-typeform",
    "source-xero",
    "source-younium",
    "source-zendesk-chat",
    "source-zendesk-sunshine",
    "source-zendesk-support",
    "source-zendesk-talk",
    "source-zenloop",
    "source-zoom",
]
# NOTE(review): presumably connectors whose manifest does not pass validation - confirm.
_LOWCODE_CONNECTORS_FAILING_VALIDATION = [
    "source-amazon-ads",
]
# NOTE(review): presumably connectors whose manifest download returns HTTP 404 - confirm.
_LOWCODE_CONNECTORS_404 = [
    "source-unleash",
]
# Union of the lists above; `get_available_connectors` filters these names out of the
# results for the YAML install type.
_LOWCODE_CONNECTORS_EXCLUDED: list[str] = [
    *_LOWCODE_CONNECTORS_FAILING_VALIDATION,
    *_LOWCODE_CONNECTORS_404,
    *_LOWCODE_CONNECTORS_NEEDING_PYTHON,
]


class InstallType(str, Enum):
    """The install types a connector can support.

    Members are also `str` instances, so they compare equal to their plain string values.
    """

    YAML = "yaml"
    PYTHON = "python"
    DOCKER = "docker"
    JAVA = "java"


class Language(str, Enum):
    """The implementation language of a connector.

    Values are taken from the matching `InstallType` members so the two enums share
    the same strings.
    """

    PYTHON = InstallType.PYTHON.value
    JAVA = InstallType.JAVA.value


@dataclass
class ConnectorMetadata:
Expand All @@ -33,6 +118,12 @@ class ConnectorMetadata:
pypi_package_name: str | None
"""The name of the PyPI package for the connector, if it exists."""

language: Language | None
"""The language of the connector."""

install_types: set[InstallType]
"""The supported install types for the connector."""


def _get_registry_url() -> str:
if _REGISTRY_ENV_VAR in os.environ:
Expand All @@ -43,14 +134,36 @@ def _get_registry_url() -> str:

def _registry_entry_to_connector_metadata(entry: dict) -> ConnectorMetadata:
    """Convert one raw registry entry into a `ConnectorMetadata` object.

    Args:
        entry: A single connector entry from the registry. Only "dockerRepository" is
            required; "language", "remoteRegistries", "dockerImageTag" and "tags" are
            read when present.

    Returns:
        The metadata for the connector, including the set of install types inferred
        from the entry.

    Raises:
        KeyError: If the entry has no "dockerRepository" key.
    """
    name = entry["dockerRepository"].replace("airbyte/", "")

    language: Language | None = None
    raw_language = entry.get("language")
    if raw_language is not None:
        try:
            language = Language(raw_language)
        except (ValueError, TypeError):
            # An unknown language should not break parsing of the whole registry.
            warnings.warn(
                message=f"Invalid language for connector {name}: {raw_language}",
                stacklevel=2,
            )

    # `or {}` / `or []` also cover keys that are present but explicitly null.
    remote_registries: dict = entry.get("remoteRegistries") or {}
    pypi_registry: dict = remote_registries.get("pypi") or {}
    pypi_package_name: str | None = pypi_registry.get("packageName", None)
    pypi_enabled: bool = pypi_registry.get("enabled", False)

    install_types: set[InstallType] = set()
    if entry.get("dockerImageTag"):
        install_types.add(InstallType.DOCKER)
    if pypi_enabled:
        install_types.add(InstallType.PYTHON)
    if language == Language.JAVA:
        install_types.add(InstallType.JAVA)
    if _LOWCODE_LABEL in (entry.get("tags") or []):
        install_types.add(InstallType.YAML)

    return ConnectorMetadata(
        name=name,
        latest_available_version=entry.get("dockerImageTag", None),
        # The package name is only reported when PyPI publishing is enabled.
        pypi_package_name=pypi_package_name if pypi_enabled else None,
        language=language,
        install_types=install_types,
    )


Expand Down Expand Up @@ -114,11 +227,45 @@ def get_connector_metadata(name: str) -> ConnectorMetadata:
return cache[name]


def get_available_connectors(install_type: InstallType | str = InstallType.PYTHON) -> list[str]:
    """Return a list of all available connectors.

    Connectors will be returned in alphabetical order, with the standard prefix "source-".

    Args:
        install_type: The install type to filter by, as an `InstallType` or its string
            value. Defaults to `InstallType.PYTHON`.

    Returns:
        The sorted connector names matching the install type:
        - PYTHON: connectors that have a PyPI package name.
        - JAVA: connectors whose language is Java (a warning is emitted first).
        - DOCKER: every connector in the registry.
        - YAML: connectors that support the YAML install type and are not in the
          hard-coded exclusion list.

    Raises:
        ValueError: If `install_type` is a string that is not a valid `InstallType` value.
    """
    if not isinstance(install_type, InstallType):
        install_type = InstallType(install_type)

    if install_type == InstallType.PYTHON:
        return sorted(
            conn.name
            for conn in _get_registry_cache().values()
            if conn.pypi_package_name is not None
        )

    if install_type == InstallType.JAVA:
        warnings.warn(
            message="Java connectors are not yet supported.",
            stacklevel=2,
        )
        return sorted(
            conn.name for conn in _get_registry_cache().values() if conn.language == Language.JAVA
        )

    if install_type == InstallType.DOCKER:
        # No filtering here: every registry entry is returned for the docker type.
        return sorted(conn.name for conn in _get_registry_cache().values())

    if install_type == InstallType.YAML:
        return sorted(
            conn.name
            for conn in _get_registry_cache().values()
            if InstallType.YAML in conn.install_types
            and conn.name not in _LOWCODE_CONNECTORS_EXCLUDED
        )

    # pragma: no cover  # Should never be reached.
    # Every `InstallType` member is handled above; this guards against new members
    # being added without a matching branch.
    raise exc.PyAirbyteInputError(
        message="Invalid install type.",
        context={
            "install_type": install_type,
        },
    )
Loading