Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 0 additions & 24 deletions airflow-core/newsfragments/68175.significant.rst

This file was deleted.

2 changes: 0 additions & 2 deletions airflow-core/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@ dependencies = [
"universal-pathlib>=0.3.8",
"uuid6>=2024.7.10",
"apache-airflow-task-sdk<1.4.0,>=1.3.0",
"apache-airflow-ctl<0.1.6,>=0.1.5",
# pre-installed providers
"apache-airflow-providers-common-compat>=1.7.4",
"apache-airflow-providers-common-io>=1.6.3",
Expand Down Expand Up @@ -328,7 +327,6 @@ required-version = ">=0.11.8"

[tool.uv.sources]
apache-airflow-core = {workspace = true}
apache-airflow-ctl = {workspace = true}
apache-airflow-devel-common = { workspace = true }

[tool.airflow]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
Expand All @@ -14,25 +15,12 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""API Client that allows interacting with Airflow API."""

from __future__ import annotations

import subprocess
import sys
from airflow.api.client.local_client import Client


def test_airflowctl_is_importable():
# checks if airflowctl imports correctly
result = subprocess.run(
[
sys.executable,
"-c",
"import airflowctl; print('airflowctl imported successfully')",
],
capture_output=True,
text=True,
check=False,
)
assert result.returncode == 0, (
f"airflowctl import failed!\nstdout: {result.stdout}\nstderr: {result.stderr}"
)
def get_current_api_client() -> Client:
return Client()
107 changes: 107 additions & 0 deletions airflow-core/src/airflow/api/client/local_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Local client API."""

from __future__ import annotations

import httpx

from airflow.api.common import delete_dag, trigger_dag
from airflow.exceptions import AirflowBadRequest, PoolNotFound
from airflow.models.pool import Pool
from airflow.utils.types import DagRunTriggeredByType


class Client:
"""Local API client implementation."""

def __init__(self, auth=None, session: httpx.Client | None = None):
self._session: httpx.Client = session or httpx.Client()
if auth:
self._session.auth = auth

def trigger_dag(
self,
dag_id,
run_id=None,
conf=None,
logical_date=None,
triggering_user_name=None,
replace_microseconds=True,
) -> dict | None:
dag_run = trigger_dag.trigger_dag(
dag_id=dag_id,
triggered_by=DagRunTriggeredByType.CLI,
triggering_user_name=triggering_user_name,
run_id=run_id,
conf=conf,
logical_date=logical_date,
replace_microseconds=replace_microseconds,
)
if dag_run:
return {
"conf": dag_run.conf,
"dag_id": dag_run.dag_id,
"dag_run_id": dag_run.run_id,
"data_interval_start": dag_run.data_interval_start,
"data_interval_end": dag_run.data_interval_end,
"end_date": dag_run.end_date,
"last_scheduling_decision": dag_run.last_scheduling_decision,
"logical_date": dag_run.logical_date,
"run_type": dag_run.run_type,
"start_date": dag_run.start_date,
"state": dag_run.state,
"triggering_user_name": dag_run.triggering_user_name,
}
return dag_run

def delete_dag(self, dag_id):
count = delete_dag.delete_dag(dag_id)
return f"Removed {count} record(s)"

def get_pool(self, name):
pool = Pool.get_pool(pool_name=name)
if not pool:
raise PoolNotFound(f"Pool {name} not found")
return pool.pool, pool.slots, pool.description, pool.include_deferred, pool.team_name

def get_pools(self):
return [(p.pool, p.slots, p.description, p.include_deferred, p.team_name) for p in Pool.get_pools()]

def create_pool(self, name, slots, description, include_deferred, team_name=None):
if not (name and name.strip()):
raise AirflowBadRequest("Pool name shouldn't be empty")
pool_name_length = Pool.pool.property.columns[0].type.length
if len(name) > pool_name_length:
raise AirflowBadRequest(f"Pool name cannot be more than {pool_name_length} characters")
try:
slots = int(slots)
except ValueError:
raise AirflowBadRequest(f"Invalid value for `slots`: {slots}")
pool = Pool.create_or_update_pool(
name=name,
slots=slots,
description=description,
include_deferred=include_deferred,
team_name=team_name,
)
return pool.pool, pool.slots, pool.description, pool.team_name

def delete_pool(self, name):
pool = Pool.delete_pool(name=name)
return pool.pool, pool.slots, pool.description
Original file line number Diff line number Diff line change
Expand Up @@ -180,22 +180,6 @@ def generate_jwt(
self.serialize_user(user)
)

def get_cli_user(self) -> T:
"""
Return the user the local CLI acts as when calling the API server.

The Airflow CLI mints a short-lived JWT for this user (via :meth:`generate_jwt`)
so it can talk to the API server without persisting any credentials. A generic
auth manager cannot know which user is authorized for local CLI access, so the
default raises. Auth managers that support local CLI usage should override this
to return an administrative user. Otherwise, operators must provide a token via
the ``AIRFLOW_CLI_TOKEN`` environment variable.
"""
raise NotImplementedError(
f"{type(self).__name__} does not support minting a local CLI token. "
"Set the AIRFLOW_CLI_TOKEN environment variable with a valid API token instead."
)

@abstractmethod
def get_url_login(self, **kwargs) -> str:
"""Return the login page url."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,9 +238,6 @@ def deserialize_user(self, token: dict[str, Any]) -> SimpleAuthManagerUser:
def serialize_user(self, user: SimpleAuthManagerUser) -> dict[str, Any]:
return {"sub": user.username, "role": user.role, "teams": user.teams}

def get_cli_user(self) -> SimpleAuthManagerUser:
return SimpleAuthManagerUser(username="cli", role=SimpleAuthManagerRole.ADMIN.name)

def is_authorized_configuration(
self,
*,
Expand Down
129 changes: 0 additions & 129 deletions airflow-core/src/airflow/cli/api_client.py

This file was deleted.

Loading
Loading