Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""add new executor field to db

Revision ID: 677fdbb7fc54
Revises: 1949afb29106
Create Date: 2024-04-01 15:26:59.186579

"""

import sqlalchemy as sa
from alembic import op


# revision identifiers, used by Alembic.
revision = '677fdbb7fc54'
down_revision = '1949afb29106'
branch_labels = None
depends_on = None
airflow_version = '2.10.0'


def upgrade():
"""Apply add executor field to task instance"""
op.add_column('task_instance', sa.Column('executor', sa.String(length=1000), default=None))


def downgrade():
"""Unapply add executor field to task instance"""
op.drop_column('task_instance', 'executor')
1 change: 1 addition & 0 deletions airflow/models/abstractoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
DEFAULT_OWNER: str = conf.get_mandatory_value("operators", "default_owner")
DEFAULT_POOL_SLOTS: int = 1
DEFAULT_PRIORITY_WEIGHT: int = 1
DEFAULT_EXECUTOR: str | None = None
DEFAULT_QUEUE: str = conf.get_mandatory_value("operators", "default_queue")
DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST: bool = conf.getboolean(
"scheduler", "ignore_first_depends_on_past_by_default"
Expand Down
13 changes: 13 additions & 0 deletions airflow/models/baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
)
from airflow.lineage import apply_lineage, prepare_lineage
from airflow.models.abstractoperator import (
DEFAULT_EXECUTOR,
DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST,
DEFAULT_OWNER,
DEFAULT_POOL_SLOTS,
Expand Down Expand Up @@ -208,6 +209,7 @@ def partial(**kwargs):
"wait_for_past_depends_before_skipping": DEFAULT_WAIT_FOR_PAST_DEPENDS_BEFORE_SKIPPING,
"wait_for_downstream": False,
"retries": DEFAULT_RETRIES,
"executor": DEFAULT_EXECUTOR,
"queue": DEFAULT_QUEUE,
"pool_slots": DEFAULT_POOL_SLOTS,
"execution_timeout": DEFAULT_TASK_EXECUTION_TIMEOUT,
Expand Down Expand Up @@ -259,6 +261,7 @@ def partial(
on_retry_callback: None | TaskStateChangeCallback | list[TaskStateChangeCallback] | ArgNotSet = NOTSET,
on_skipped_callback: None | TaskStateChangeCallback | list[TaskStateChangeCallback] | ArgNotSet = NOTSET,
run_as_user: str | None | ArgNotSet = NOTSET,
executor: str | None | ArgNotSet = NOTSET,
executor_config: dict | None | ArgNotSet = NOTSET,
inlets: Any | None | ArgNotSet = NOTSET,
outlets: Any | None | ArgNotSet = NOTSET,
Expand Down Expand Up @@ -326,6 +329,7 @@ def partial(
"on_success_callback": on_success_callback,
"on_skipped_callback": on_skipped_callback,
"run_as_user": run_as_user,
"executor": executor,
"executor_config": executor_config,
"inlets": inlets,
"outlets": outlets,
Expand Down Expand Up @@ -682,6 +686,7 @@ class derived from this one results in the creation of a task object,
runs across execution_dates.
:param max_active_tis_per_dagrun: When set, a task will be able to limit the concurrent
task instances per DAG run.
:param executor: Which executor to target when running this task. NOT YET SUPPORTED
:param executor_config: Additional task-level configuration parameters that are
interpreted by a specific executor. Parameters are namespaced by the name of
executor.
Expand Down Expand Up @@ -783,6 +788,7 @@ def say_hello_world(**context):
"do_xcom_push",
"multiple_outputs",
"allow_nested_operators",
"executor",
}

# Defines if the operator supports lineage without manual definitions
Expand Down Expand Up @@ -851,6 +857,7 @@ def __init__(
map_index_template: str | None = None,
max_active_tis_per_dag: int | None = None,
max_active_tis_per_dagrun: int | None = None,
executor: str | None = None,
executor_config: dict | None = None,
do_xcom_push: bool = True,
multiple_outputs: bool = False,
Expand Down Expand Up @@ -924,6 +931,12 @@ def __init__(
if end_date:
self.end_date = timezone.convert_to_utc(end_date)

if executor:
warnings.warn(
"Specifying executors for operators is not yet"
f"supported, the value {executor!r} will have no effect"
)
self.executor = executor
self.executor_config = executor_config or {}
self.run_as_user = run_as_user
self.retries = parse_retries(retries)
Expand Down
5 changes: 5 additions & 0 deletions airflow/models/mappedoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from airflow.compat.functools import cache
from airflow.exceptions import AirflowException, UnmappableOperator
from airflow.models.abstractoperator import (
DEFAULT_EXECUTOR,
DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST,
DEFAULT_OWNER,
DEFAULT_POOL_SLOTS,
Expand Down Expand Up @@ -620,6 +621,10 @@ def on_skipped_callback(self, value: TaskStateChangeCallbackAttrType) -> None:
def run_as_user(self) -> str | None:
return self.partial_kwargs.get("run_as_user")

@property
def executor(self) -> str | None:
return self.partial_kwargs.get("executor", DEFAULT_EXECUTOR)

@property
def executor_config(self) -> dict:
return self.partial_kwargs.get("executor_config", {})
Expand Down
7 changes: 7 additions & 0 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,7 @@ def _refresh_from_db(
task_instance.queued_dttm = ti.queued_dttm
task_instance.queued_by_job_id = ti.queued_by_job_id
task_instance.pid = ti.pid
task_instance.executor = ti.executor
task_instance.executor_config = ti.executor_config
task_instance.external_executor_id = ti.external_executor_id
task_instance.trigger_id = ti.trigger_id
Expand Down Expand Up @@ -952,6 +953,7 @@ def _refresh_from_task(
task_instance.run_as_user = task.run_as_user
# Do not set max_tries to task.retries here because max_tries is a cumulative
# value that needs to be stored in the db.
task_instance.executor = task.executor
task_instance.executor_config = task.executor_config
task_instance.operator = task.task_type
task_instance.custom_operator_name = getattr(task, "custom_operator_name", None)
Expand Down Expand Up @@ -1298,6 +1300,7 @@ class TaskInstance(Base, LoggingMixin):
queued_dttm = Column(UtcDateTime)
queued_by_job_id = Column(Integer)
pid = Column(Integer)
executor = Column(String(1000))
executor_config = Column(ExecutorConfigType(pickler=dill))
updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow)
rendered_map_index = Column(String(250))
Expand Down Expand Up @@ -1483,6 +1486,7 @@ def insert_mapping(run_id: str, task: Operator, map_index: int) -> dict[str, Any
"priority_weight": priority_weight,
"run_as_user": task.run_as_user,
"max_tries": task.retries,
"executor": task.executor,
"executor_config": task.executor_config,
"operator": task.task_type,
"custom_operator_name": getattr(task, "custom_operator_name", None),
Expand Down Expand Up @@ -3676,6 +3680,7 @@ def __init__(
try_number: int,
map_index: int,
state: str,
executor: str | None,
executor_config: Any,
pool: str,
queue: str,
Expand All @@ -3691,6 +3696,7 @@ def __init__(
self.end_date = end_date
self.try_number = try_number
self.state = state
self.executor = executor
self.executor_config = executor_config
self.run_as_user = run_as_user
self.pool = pool
Expand Down Expand Up @@ -3729,6 +3735,7 @@ def from_ti(cls, ti: TaskInstance) -> SimpleTaskInstance:
end_date=ti.end_date,
try_number=ti.try_number,
state=ti.state,
executor=ti.executor,
executor_config=ti.executor_config,
pool=ti.pool,
queue=ti.queue,
Expand Down
1 change: 1 addition & 0 deletions airflow/serialization/pydantic/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ class TaskInstancePydantic(BaseModelPydantic, LoggingMixin):
queued_dttm: Optional[datetime]
queued_by_job_id: Optional[int]
pid: Optional[int]
executor: Optional[str]
executor_config: Any
updated_at: Optional[datetime]
rendered_map_index: Optional[str]
Expand Down
1 change: 1 addition & 0 deletions airflow/serialization/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@
"params": { "$ref": "#/definitions/params_dict" },
"priority_weight": { "type": "number" },
"weight_rule": { "type": "string" },
"executor": { "type": "string" },
"executor_config": { "$ref": "#/definitions/dict" },
"do_xcom_push": { "type": "boolean" },
"ui_color": { "$ref": "#/definitions/color" },
Expand Down
2 changes: 1 addition & 1 deletion docs/apache-airflow/img/airflow_erd.sha256
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2a24225537326f38be5df14e0b7a8dca867122093e0fa932f1a11ac12d1fb11c
cccb1a4a3f22027e354cea27bb34996fd45146494cbe6893d938c02c2ddb1a61
Loading