Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@

## [Unreleased]

## [0.24.0] - 2022-10-28

- Airbyte integration
- `dp deploy` is able to add / update connections on Airbyte instance
- `dp deploy` is able to create DAG at the beggining of dbt builds that will execute ingestion tasks
- `dp deploy` accept additional attribute `auth-token` that can be used to authorize access to cloud services

## [0.23.0] - 2022-10-19

## [0.22.1] - 2022-10-11
Expand Down
52 changes: 12 additions & 40 deletions data_pipelines_cli/airbyte_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,49 +9,20 @@
import yaml

from .cli_constants import BUILD_DIR
from .cli_utils import echo_error, echo_info, get_idToken_from_service_account_file
from .errors import AirbyteFactoryError
from .cli_utils import echo_error, echo_info


class AirbyteFactory:
"""A class used to create and update Airbyte connections defined in config yaml file"""

airbyte_config_path: pathlib.Path
"""Path to config yaml file containing connections definitions"""
iap_enabled: bool
"""Whether Airbyte instance is secured with IAP"""
airbyte_iap_client_id: Optional[str]
"""IAP Client ID of Airbyte instance"""
gcp_sa_key_path: Optional[str]
"""Path to the key file of GCP service account for communication with IAP"""
""""""
auth_token: Optional[str]
"""Authorization OIDC ID token for a service account to communication with Airbyte instance"""

def __init__(
self,
airbyte_config_path: pathlib.Path,
iap_enabled: bool,
airbyte_iap_client_id: Optional[str] = None,
gcp_sa_key_path: Optional[str] = None,
) -> None:
def __init__(self, airbyte_config_path: pathlib.Path, auth_token: Optional[str]) -> None:
self.airbyte_config_path = airbyte_config_path
self.airbyte_url = None
self.id_token = None

if iap_enabled:
if airbyte_iap_client_id is None:
raise AirbyteFactoryError(
"Missing information to authorize IAP request to Airbyte."
"Provide `--airbyte-iap-client-id` argument to the dp command."
)
elif gcp_sa_key_path is None:
raise AirbyteFactoryError(
"Missing information to authorize IAP request to Airbyte."
"Provide `--gcp-sa-key-path` argument to the dp command."
)
else:
self.id_token = get_idToken_from_service_account_file(
gcp_sa_key_path, airbyte_iap_client_id
)
self.auth_token = auth_token

with open(self.airbyte_config_path, "r") as airbyte_config_file:
self.airbyte_config = yaml.safe_load(airbyte_config_file)
Expand Down Expand Up @@ -80,7 +51,7 @@ def create_update_connections(self) -> None:
def create_update_connection(self, connection_config: Dict[str, Any]) -> Any:
connection_config_copy = copy.deepcopy(connection_config)
response_search = self.request_handler(
f"{self.airbyte_url}/api/v1/connections/search",
"connections/search",
{
"sourceId": connection_config_copy["sourceId"],
"destinationId": connection_config_copy["destinationId"],
Expand All @@ -91,7 +62,7 @@ def create_update_connection(self, connection_config: Dict[str, Any]) -> Any:
if not response_search["connections"]:
echo_info(f"Creating connection config for {connection_config_copy['name']}")
response_create = self.request_handler(
f"{self.airbyte_url}/api/v1/connections/create",
"connections/create",
connection_config_copy,
)
os.environ[response_create["name"]] = response_create["connectionId"]
Expand All @@ -103,7 +74,7 @@ def create_update_connection(self, connection_config: Dict[str, Any]) -> Any:
"connectionId"
]
response_update = self.request_handler(
f"{self.airbyte_url}/api/v1/connections/update",
"connections/update",
connection_config_copy,
)
os.environ[response_update["name"]] = response_update["connectionId"]
Expand All @@ -112,13 +83,14 @@ def update_file(self, updated_config: Dict[str, Any]) -> None:
with open(self.airbyte_config_path, "w") as airbyte_config_file:
yaml.safe_dump(updated_config, airbyte_config_file)

def request_handler(self, url: str, config: Dict[str, Any]) -> Union[Dict[str, Any], Any]:
def request_handler(self, endpoint: str, config: Dict[str, Any]) -> Union[Dict[str, Any], Any]:
url = f"{self.airbyte_url}/api/v1/{endpoint}"
headers = {
"Accept": "application/json",
"Content-Type": "application/json",
}
if self.id_token is not None:
headers["Authorization"] = f"Bearer {self.id_token}"
if self.auth_token is not None:
headers["Authorization"] = f"Bearer {self.auth_token}"

try:
response = requests.post(url=url, headers=headers, data=json.dumps(config))
Expand Down
1 change: 1 addition & 0 deletions data_pipelines_cli/cli_commands/compile.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ def _dbt_compile(env: str) -> None:
run_dbt_command(("deps",), env, profiles_path)
run_dbt_command(("compile",), env, profiles_path)
run_dbt_command(("docs", "generate"), env, profiles_path)
run_dbt_command(("source", "freshness"), env, profiles_path)


def _copy_dbt_manifest() -> None:
Expand Down
30 changes: 8 additions & 22 deletions data_pipelines_cli/cli_commands/deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,7 @@ class DeployCommand:
e.g. path to a token, username, password, etc."""
env: str
bi_git_key_path: str
"""Path to JSON file containing key for GCP service account
used to communicate with IAP-secured applications"""
gcp_sa_key_path: Optional[str]
"""Client ID of Airbyte IAP-secured instance"""
airbyte_iap_client_id: Optional[str]
auth_token: Optional[str]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could add few words about this var for example
"""Authorization OIDC ID token for a service account to communication with Airbyte instance"""


def __init__(
self,
Expand All @@ -52,16 +48,14 @@ def __init__(
provider_kwargs_dict: Optional[Dict[str, Any]],
datahub_ingest: bool,
bi_git_key_path: str,
gcp_sa_key_path: Optional[str] = None,
airbyte_iap_client_id: Optional[str] = None,
auth_token: Optional[str],
) -> None:
self.docker_args = DockerArgs(env, None, {}) if docker_push else None
self.datahub_ingest = datahub_ingest
self.provider_kwargs_dict = provider_kwargs_dict or {}
self.env = env
self.bi_git_key_path = bi_git_key_path
self.gcp_sa_key_path = gcp_sa_key_path
self.airbyte_iap_client_id = airbyte_iap_client_id
self.auth_token = auth_token

try:
self.blob_address_path = (
Expand Down Expand Up @@ -153,10 +147,7 @@ def _enable_ingest(self) -> None:
echo_info("Ingesting airbyte config")
airbyte_config_path = AirbyteFactory.find_config_file(self.env, "airbyte")
AirbyteFactory(
airbyte_config_path=airbyte_config_path,
iap_enabled=True,
airbyte_iap_client_id=self.airbyte_iap_client_id,
gcp_sa_key_path=self.gcp_sa_key_path,
airbyte_config_path=airbyte_config_path, auth_token=self.auth_token
).create_update_connections()

def _sync_bucket(self) -> None:
Expand Down Expand Up @@ -199,13 +190,10 @@ def _sync_bucket(self) -> None:
help="Path to the key with write access to repo",
)
@click.option(
"--gcp-sa-key-path",
"--auth-token",
type=str,
required=False,
help="Path to the key file of GCP service account for communication with IAP",
)
@click.option(
"--airbyte-iap-client-id", type=str, required=False, help="IAP Client ID of Airbyte instance"
help="Authorization OIDC ID token for a service account to communication with cloud services",
)
def deploy_command(
env: str,
Expand All @@ -214,8 +202,7 @@ def deploy_command(
docker_push: bool,
datahub_ingest: bool,
bi_git_key_path: str,
gcp_sa_key_path: Optional[str],
airbyte_iap_client_id: Optional[str],
auth_token: Optional[str],
) -> None:
if blob_args:
try:
Expand All @@ -233,6 +220,5 @@ def deploy_command(
provider_kwargs_dict,
datahub_ingest,
bi_git_key_path,
gcp_sa_key_path,
airbyte_iap_client_id,
auth_token,
).deploy()
24 changes: 0 additions & 24 deletions data_pipelines_cli/cli_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
from typing import Any, List, Optional

import click
import google.auth.transport.requests
from google.oauth2 import service_account

from data_pipelines_cli.errors import (
DataPipelinesError,
Expand Down Expand Up @@ -124,25 +122,3 @@ def subprocess_run(
err.returncode,
err.output.decode(encoding=sys.stdout.encoding or "utf-8") if err.output else None,
)


def get_idToken_from_service_account_file(json_credentials_path: str, target_audience: str) -> str:
"""
Obtain ID Token for a Service Account against a provided target audience

:param json_credentials_path: Path to Service Account JSON credentials file
:type json_credentials_path: str
:param target_audience: The URL or target audience to obtain the ID token for.
:type target_audience: str
"""
credentials = service_account.IDTokenCredentials.from_service_account_file(
filename=json_credentials_path, target_audience=target_audience
)
request = google.auth.transport.requests.Request()
try:
credentials.refresh(request)
except google.auth.exceptions.RefreshError as err:
raise DataPipelinesError(
"An error occured while refreshing GCP Service Account credentials.", err
)
return credentials.token
7 changes: 0 additions & 7 deletions data_pipelines_cli/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,3 @@ def __init__(self) -> None:
"Variable 'target_id' cannot be found in 'bi.yml' "
"config file or the value not matched supported solutions."
)


class AirbyteFactoryError(DataPipelinesError):
"""Exception raised during Airbyte connections creation"""

def __init__(self, error_message: str) -> None:
super().__init__("An error occured during Airbyte connections creation." f"{error_message}")
Binary file modified docs/images/integration.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@

setup(
name="data_pipelines_cli",
version="0.23.0",
version="0.24.0",
description="CLI for data platform",
long_description=README,
long_description_content_type="text/markdown",
Expand Down
23 changes: 11 additions & 12 deletions tests/cli_commands/test_deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,7 @@ def mock_init(
provider_kwargs_dict,
_datahub_ingest,
_bi_git_key_path,
_gcp_sa_key_path,
_airbyte_iap_client_id,
_auth_token,
):
nonlocal result_provider_kwargs
result_provider_kwargs = provider_kwargs_dict
Expand Down Expand Up @@ -160,7 +159,7 @@ def test_no_datahub_method(self):
):
with self.assertRaises(DependencyNotInstalledError):
DeployCommand(
"base", False, self.storage_uri, self.provider_args, True, None, None, None
"base", False, self.storage_uri, self.provider_args, True, None, None
).deploy()

@patch("data_pipelines_cli.cli_commands.deploy.BUILD_DIR", goldens_dir_path)
Expand All @@ -172,7 +171,7 @@ def test_datahub_run(self):
"data_pipelines_cli.cli_commands.deploy.bi"
):
DeployCommand(
"base", False, self.storage_uri, self.provider_args, True, None, None, None
"base", False, self.storage_uri, self.provider_args, True, None, None
).deploy()
self.assertListEqual(
[
Expand All @@ -190,7 +189,7 @@ def test_no_docker_method(self):
), patch("data_pipelines_cli.cli_constants.BUILD_DIR", self.build_temp_dir):
with self.assertRaises(DependencyNotInstalledError):
DeployCommand(
"base", True, self.storage_uri, self.provider_args, False, None, None, None
"base", True, self.storage_uri, self.provider_args, False, None, None
).deploy()

@patch(
Expand All @@ -199,7 +198,7 @@ def test_no_docker_method(self):
)
def test_no_airflow_address(self):
with self.assertRaises(AirflowDagsPathKeyError):
DeployCommand("base", False, None, None, False, None, None, None)
DeployCommand("base", False, None, None, False, None, None)

def test_airflow_address(self):
with tempfile.TemporaryDirectory() as tmp_dir, patch(
Expand All @@ -214,7 +213,7 @@ def test_airflow_address(self):
tmp_airflow_path,
)

deploy_command = DeployCommand("base", False, None, None, False, None, None, None)
deploy_command = DeployCommand("base", False, None, None, False, None, None)
self.assertEqual(
"gcs://test-sync-project/sync-dir/dags/my-project-name",
deploy_command.blob_address_path,
Expand All @@ -236,7 +235,7 @@ def test_staging_airflow_address(self):
tmp_file_path,
)

deploy_command = DeployCommand("staging", False, None, None, False, None, None, None)
deploy_command = DeployCommand("staging", False, None, None, False, None, None)
self.assertEqual(
"gcs://test/jinja/path/com/my/project/name",
deploy_command.blob_address_path,
Expand Down Expand Up @@ -268,7 +267,7 @@ def _mock_docker(**kwargs):
"data_pipelines_cli.cli_commands.deploy.bi"
):
DeployCommand(
"base", True, self.storage_uri, self.provider_args, False, None, None, None
"base", True, self.storage_uri, self.provider_args, False, None, None
).deploy()

self.assertEqual("my_docker_repository_uri", docker_kwargs.get("repository"))
Expand Down Expand Up @@ -298,15 +297,15 @@ def _mock_docker(**_kwargs):
):
with self.assertRaises(DataPipelinesError):
DeployCommand(
"base", True, self.storage_uri, self.provider_args, False, None, None, None
"base", True, self.storage_uri, self.provider_args, False, None, None
).deploy()

def test_ingestion_is_false_by_default(self):
with patch("data_pipelines_cli.cli_constants.BUILD_DIR", self.build_temp_dir), patch(
"data_pipelines_cli.cli_commands.deploy.BUILD_DIR", self.build_temp_dir
):
deploy_command = DeployCommand(
"prod", True, self.storage_uri, self.provider_args, False, None, None, None
"prod", True, self.storage_uri, self.provider_args, False, None, None
)
self.assertEqual(deploy_command.enable_ingest, False)

Expand All @@ -315,6 +314,6 @@ def test_ingestion_is_read_from_env_directory(self):
"data_pipelines_cli.cli_commands.deploy.BUILD_DIR", self.build_temp_dir
):
deploy_command = DeployCommand(
"dev", True, self.storage_uri, self.provider_args, False, None, None, None
"dev", True, self.storage_uri, self.provider_args, False, None, None
)
self.assertEqual(deploy_command.enable_ingest, True)
Loading