From d487a3137dc3bca0ede87411ebc664a5eb4521c5 Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Sat, 20 Jul 2024 00:22:53 +0200 Subject: [PATCH 1/2] Add support for basic authentication on internal API client --- airflow/api_internal/internal_api_call.py | 32 +++++++++++++++++------ 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/airflow/api_internal/internal_api_call.py b/airflow/api_internal/internal_api_call.py index c3a67d03ee18c..dbdda7859fe66 100644 --- a/airflow/api_internal/internal_api_call.py +++ b/airflow/api_internal/internal_api_call.py @@ -21,10 +21,11 @@ import json import logging from functools import wraps -from typing import Callable, TypeVar +from typing import TYPE_CHECKING, Callable, TypeVar import requests import tenacity +from requests.auth import HTTPBasicAuth from urllib3.exceptions import NewConnectionError from airflow.configuration import conf @@ -32,6 +33,9 @@ from airflow.settings import _ENABLE_AIP_44 from airflow.typing_compat import ParamSpec +if TYPE_CHECKING: + from requests.auth import AuthBase + PS = ParamSpec("PS") RT = TypeVar("RT") @@ -44,6 +48,7 @@ class InternalApiConfig: _initialized = False _use_internal_api = False _internal_api_endpoint = "" + _internal_api_auth: AuthBase | None = None @staticmethod def force_database_direct_access(): @@ -68,21 +73,31 @@ def get_internal_api_endpoint(): InternalApiConfig._init_values() return InternalApiConfig._internal_api_endpoint + @staticmethod + def get_auth() -> AuthBase | None: + return InternalApiConfig._internal_api_auth + @staticmethod def _init_values(): use_internal_api = conf.getboolean("core", "database_access_isolation", fallback=False) if use_internal_api and not _ENABLE_AIP_44: raise RuntimeError("The AIP_44 is not enabled so you cannot use it.") - internal_api_endpoint = "" if use_internal_api: - internal_api_url = conf.get("core", "internal_api_url") - internal_api_endpoint = internal_api_url + "/internal_api/v1/rpcapi" - if not internal_api_endpoint.startswith("http://"): - raise AirflowConfigException("[core]internal_api_url must start with http://") + internal_api_endpoint = conf.get("core", "internal_api_url") + if internal_api_endpoint.find("/", 8) == -1: + internal_api_endpoint = internal_api_endpoint + "/internal_api/v1/rpcapi" + if not internal_api_endpoint.startswith("http://") and not internal_api_endpoint.startswith( + "https://" + ): + raise AirflowConfigException("[core]internal_api_url must start with http:// or https://") + InternalApiConfig._internal_api_endpoint = internal_api_endpoint + internal_api_user = conf.get("core", "internal_api_user") + internal_api_password = conf.get("core", "internal_api_password") + if internal_api_user and internal_api_password: + InternalApiConfig._internal_api_auth = HTTPBasicAuth(internal_api_user, internal_api_password) InternalApiConfig._initialized = True InternalApiConfig._use_internal_api = use_internal_api - InternalApiConfig._internal_api_endpoint = internal_api_endpoint def internal_api_call(func: Callable[PS, RT]) -> Callable[PS, RT]: @@ -112,7 +127,8 @@ def internal_api_call(func: Callable[PS, RT]) -> Callable[PS, RT]: def make_jsonrpc_request(method_name: str, params_json: str) -> bytes: data = {"jsonrpc": "2.0", "method": method_name, "params": params_json} internal_api_endpoint = InternalApiConfig.get_internal_api_endpoint() - response = requests.post(url=internal_api_endpoint, data=json.dumps(data), headers=headers) + auth = InternalApiConfig.get_auth() + response = requests.post(url=internal_api_endpoint, data=json.dumps(data), headers=headers, auth=auth) if response.status_code != 200: raise AirflowException( f"Got {response.status_code}:{response.reason} when sending " From 2847a6859af9df5ff2ffdb6532677d82e4ad7ee2 Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Sat, 20 Jul 2024 12:05:40 +0200 Subject: [PATCH 2/2] Review Feedback --- airflow/api_internal/internal_api_call.py | 14 +++++++------- airflow/config_templates/config.yml | 16 ++++++++++++++++ 2 files changed, 23 insertions(+), 7 deletions(-) diff --git a/airflow/api_internal/internal_api_call.py b/airflow/api_internal/internal_api_call.py index dbdda7859fe66..5768fbdd6d807 100644 --- a/airflow/api_internal/internal_api_call.py +++ b/airflow/api_internal/internal_api_call.py @@ -22,6 +22,7 @@ import logging from functools import wraps from typing import TYPE_CHECKING, Callable, TypeVar +from urllib.parse import urlparse import requests import tenacity @@ -83,14 +84,13 @@ def _init_values(): if use_internal_api and not _ENABLE_AIP_44: raise RuntimeError("The AIP_44 is not enabled so you cannot use it.") if use_internal_api: - internal_api_endpoint = conf.get("core", "internal_api_url") - if internal_api_endpoint.find("/", 8) == -1: - internal_api_endpoint = internal_api_endpoint + "/internal_api/v1/rpcapi" - if not internal_api_endpoint.startswith("http://") and not internal_api_endpoint.startswith( - "https://" - ): + url_conf = urlparse(conf.get("core", "internal_api_url")) + api_path = url_conf.path + if len(api_path) < 2: + api_path = "/internal_api/v1/rpcapi" + if url_conf.scheme in ["http", "https"]: raise AirflowConfigException("[core]internal_api_url must start with http:// or https://") - InternalApiConfig._internal_api_endpoint = internal_api_endpoint + InternalApiConfig._internal_api_endpoint = f"{url_conf.scheme}://{url_conf.netloc}{api_path}" internal_api_user = conf.get("core", "internal_api_user") internal_api_password = conf.get("core", "internal_api_password") if internal_api_user and internal_api_password: diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 1cd39166ba084..81eeda7d4f819 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -513,6 +513,22 @@ core: type: string default: ~ example: 'http://localhost:8080' + internal_api_user: + description: | + (experimental) If internal API is access-protected the user name for basic authentication. + version_added: 2.10.0 + type: string + default: ~ + example: 'api_user' + internal_api_password: + description: | + (experimental) If internal API is access-protected the password for basic authentication. + Note: we expect not to hard-code the password but recommend starting the processes via an env + ``AIRFLOW__CORE__INTERNAL_API_PASSWORD``. + version_added: 2.10.0 + type: string + default: ~ + example: 'i_willKeep!thisSecret4All?' test_connection: description: | The ability to allow testing connections across Airflow UI, API and CLI.