diff --git a/airflow/migrations/versions/0141_2_10_0_add_new_executor_field_to_db.py b/airflow/migrations/versions/0141_2_10_0_add_new_executor_field_to_db.py
new file mode 100644
index 0000000000000..67e463f1cc1d5
--- /dev/null
+++ b/airflow/migrations/versions/0141_2_10_0_add_new_executor_field_to_db.py
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""add new executor field to db
+
+Revision ID: 677fdbb7fc54
+Revises: 1949afb29106
+Create Date: 2024-04-01 15:26:59.186579
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+
+
+# revision identifiers, used by Alembic.
+revision = '677fdbb7fc54'
+down_revision = '1949afb29106'
+branch_labels = None
+depends_on = None
+airflow_version = '2.10.0'
+
+
+def upgrade():
+ """Apply add executor field to task instance"""
+ op.add_column('task_instance', sa.Column('executor', sa.String(length=1000), default=None))
+
+
+def downgrade():
+ """Unapply add executor field to task instance"""
+ op.drop_column('task_instance', 'executor')
diff --git a/airflow/models/abstractoperator.py b/airflow/models/abstractoperator.py
index 380c6fcf00f3d..c78eeb9ee7d6f 100644
--- a/airflow/models/abstractoperator.py
+++ b/airflow/models/abstractoperator.py
@@ -60,6 +60,7 @@
DEFAULT_OWNER: str = conf.get_mandatory_value("operators", "default_owner")
DEFAULT_POOL_SLOTS: int = 1
DEFAULT_PRIORITY_WEIGHT: int = 1
+DEFAULT_EXECUTOR: str | None = None
DEFAULT_QUEUE: str = conf.get_mandatory_value("operators", "default_queue")
DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST: bool = conf.getboolean(
"scheduler", "ignore_first_depends_on_past_by_default"
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 5334fc90205aa..05807db47e1b7 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -63,6 +63,7 @@
)
from airflow.lineage import apply_lineage, prepare_lineage
from airflow.models.abstractoperator import (
+ DEFAULT_EXECUTOR,
DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST,
DEFAULT_OWNER,
DEFAULT_POOL_SLOTS,
@@ -208,6 +209,7 @@ def partial(**kwargs):
"wait_for_past_depends_before_skipping": DEFAULT_WAIT_FOR_PAST_DEPENDS_BEFORE_SKIPPING,
"wait_for_downstream": False,
"retries": DEFAULT_RETRIES,
+ "executor": DEFAULT_EXECUTOR,
"queue": DEFAULT_QUEUE,
"pool_slots": DEFAULT_POOL_SLOTS,
"execution_timeout": DEFAULT_TASK_EXECUTION_TIMEOUT,
@@ -259,6 +261,7 @@ def partial(
on_retry_callback: None | TaskStateChangeCallback | list[TaskStateChangeCallback] | ArgNotSet = NOTSET,
on_skipped_callback: None | TaskStateChangeCallback | list[TaskStateChangeCallback] | ArgNotSet = NOTSET,
run_as_user: str | None | ArgNotSet = NOTSET,
+ executor: str | None | ArgNotSet = NOTSET,
executor_config: dict | None | ArgNotSet = NOTSET,
inlets: Any | None | ArgNotSet = NOTSET,
outlets: Any | None | ArgNotSet = NOTSET,
@@ -326,6 +329,7 @@ def partial(
"on_success_callback": on_success_callback,
"on_skipped_callback": on_skipped_callback,
"run_as_user": run_as_user,
+ "executor": executor,
"executor_config": executor_config,
"inlets": inlets,
"outlets": outlets,
@@ -682,6 +686,7 @@ class derived from this one results in the creation of a task object,
runs across execution_dates.
:param max_active_tis_per_dagrun: When set, a task will be able to limit the concurrent
task instances per DAG run.
+ :param executor: Which executor to target when running this task. NOT YET SUPPORTED
:param executor_config: Additional task-level configuration parameters that are
interpreted by a specific executor. Parameters are namespaced by the name of
executor.
@@ -783,6 +788,7 @@ def say_hello_world(**context):
"do_xcom_push",
"multiple_outputs",
"allow_nested_operators",
+ "executor",
}
# Defines if the operator supports lineage without manual definitions
@@ -851,6 +857,7 @@ def __init__(
map_index_template: str | None = None,
max_active_tis_per_dag: int | None = None,
max_active_tis_per_dagrun: int | None = None,
+ executor: str | None = None,
executor_config: dict | None = None,
do_xcom_push: bool = True,
multiple_outputs: bool = False,
@@ -924,6 +931,12 @@ def __init__(
if end_date:
self.end_date = timezone.convert_to_utc(end_date)
+ if executor:
+ warnings.warn(
+ "Specifying executors for operators is not yet"
+ f"supported, the value {executor!r} will have no effect"
+ )
+ self.executor = executor
self.executor_config = executor_config or {}
self.run_as_user = run_as_user
self.retries = parse_retries(retries)
diff --git a/airflow/models/mappedoperator.py b/airflow/models/mappedoperator.py
index 994e041d9fa5c..9e1200c0494c8 100644
--- a/airflow/models/mappedoperator.py
+++ b/airflow/models/mappedoperator.py
@@ -28,6 +28,7 @@
from airflow.compat.functools import cache
from airflow.exceptions import AirflowException, UnmappableOperator
from airflow.models.abstractoperator import (
+ DEFAULT_EXECUTOR,
DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST,
DEFAULT_OWNER,
DEFAULT_POOL_SLOTS,
@@ -620,6 +621,10 @@ def on_skipped_callback(self, value: TaskStateChangeCallbackAttrType) -> None:
def run_as_user(self) -> str | None:
return self.partial_kwargs.get("run_as_user")
+ @property
+ def executor(self) -> str | None:
+ return self.partial_kwargs.get("executor", DEFAULT_EXECUTOR)
+
@property
def executor_config(self) -> dict:
return self.partial_kwargs.get("executor_config", {})
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index c9bd2ce617154..c0db3d6c101f7 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -547,6 +547,7 @@ def _refresh_from_db(
task_instance.queued_dttm = ti.queued_dttm
task_instance.queued_by_job_id = ti.queued_by_job_id
task_instance.pid = ti.pid
+ task_instance.executor = ti.executor
task_instance.executor_config = ti.executor_config
task_instance.external_executor_id = ti.external_executor_id
task_instance.trigger_id = ti.trigger_id
@@ -952,6 +953,7 @@ def _refresh_from_task(
task_instance.run_as_user = task.run_as_user
# Do not set max_tries to task.retries here because max_tries is a cumulative
# value that needs to be stored in the db.
+ task_instance.executor = task.executor
task_instance.executor_config = task.executor_config
task_instance.operator = task.task_type
task_instance.custom_operator_name = getattr(task, "custom_operator_name", None)
@@ -1298,6 +1300,7 @@ class TaskInstance(Base, LoggingMixin):
queued_dttm = Column(UtcDateTime)
queued_by_job_id = Column(Integer)
pid = Column(Integer)
+ executor = Column(String(1000))
executor_config = Column(ExecutorConfigType(pickler=dill))
updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow)
rendered_map_index = Column(String(250))
@@ -1483,6 +1486,7 @@ def insert_mapping(run_id: str, task: Operator, map_index: int) -> dict[str, Any
"priority_weight": priority_weight,
"run_as_user": task.run_as_user,
"max_tries": task.retries,
+ "executor": task.executor,
"executor_config": task.executor_config,
"operator": task.task_type,
"custom_operator_name": getattr(task, "custom_operator_name", None),
@@ -3676,6 +3680,7 @@ def __init__(
try_number: int,
map_index: int,
state: str,
+ executor: str | None,
executor_config: Any,
pool: str,
queue: str,
@@ -3691,6 +3696,7 @@ def __init__(
self.end_date = end_date
self.try_number = try_number
self.state = state
+ self.executor = executor
self.executor_config = executor_config
self.run_as_user = run_as_user
self.pool = pool
@@ -3729,6 +3735,7 @@ def from_ti(cls, ti: TaskInstance) -> SimpleTaskInstance:
end_date=ti.end_date,
try_number=ti.try_number,
state=ti.state,
+ executor=ti.executor,
executor_config=ti.executor_config,
pool=ti.pool,
queue=ti.queue,
diff --git a/airflow/serialization/pydantic/taskinstance.py b/airflow/serialization/pydantic/taskinstance.py
index 16c44860998fa..cf27d755b5ed9 100644
--- a/airflow/serialization/pydantic/taskinstance.py
+++ b/airflow/serialization/pydantic/taskinstance.py
@@ -99,6 +99,7 @@ class TaskInstancePydantic(BaseModelPydantic, LoggingMixin):
queued_dttm: Optional[datetime]
queued_by_job_id: Optional[int]
pid: Optional[int]
+ executor: Optional[str]
executor_config: Any
updated_at: Optional[datetime]
rendered_map_index: Optional[str]
diff --git a/airflow/serialization/schema.json b/airflow/serialization/schema.json
index a2a6732763641..85631e0965078 100644
--- a/airflow/serialization/schema.json
+++ b/airflow/serialization/schema.json
@@ -261,6 +261,7 @@
"params": { "$ref": "#/definitions/params_dict" },
"priority_weight": { "type": "number" },
"weight_rule": { "type": "string" },
+ "executor": { "type": "string" },
"executor_config": { "$ref": "#/definitions/dict" },
"do_xcom_push": { "type": "boolean" },
"ui_color": { "$ref": "#/definitions/color" },
diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256
index 09f84daea2acb..d1514428254b4 100644
--- a/docs/apache-airflow/img/airflow_erd.sha256
+++ b/docs/apache-airflow/img/airflow_erd.sha256
@@ -1 +1 @@
-2a24225537326f38be5df14e0b7a8dca867122093e0fa932f1a11ac12d1fb11c
\ No newline at end of file
+cccb1a4a3f22027e354cea27bb34996fd45146494cbe6893d938c02c2ddb1a61
\ No newline at end of file
diff --git a/docs/apache-airflow/img/airflow_erd.svg b/docs/apache-airflow/img/airflow_erd.svg
index dc32fe0566902..58ea95b0f403f 100644
--- a/docs/apache-airflow/img/airflow_erd.svg
+++ b/docs/apache-airflow/img/airflow_erd.svg
@@ -220,99 +220,99 @@
ab_user_role
-
-ab_user_role
-
-id
- [INTEGER]
- NOT NULL
-
-role_id
- [INTEGER]
-
-user_id
- [INTEGER]
+
+ab_user_role
+
+id
+ [INTEGER]
+ NOT NULL
+
+role_id
+ [INTEGER]
+
+user_id
+ [INTEGER]
ab_user--ab_user_role
-
-0..N
-{0,1}
+
+0..N
+{0,1}
dag_run_note
-
-dag_run_note
-
-dag_run_id
- [INTEGER]
- NOT NULL
-
-content
- [VARCHAR(1000)]
-
-created_at
- [TIMESTAMP]
- NOT NULL
-
-updated_at
- [TIMESTAMP]
- NOT NULL
-
-user_id
- [INTEGER]
+
+dag_run_note
+
+dag_run_id
+ [INTEGER]
+ NOT NULL
+
+content
+ [VARCHAR(1000)]
+
+created_at
+ [TIMESTAMP]
+ NOT NULL
+
+updated_at
+ [TIMESTAMP]
+ NOT NULL
+
+user_id
+ [INTEGER]
ab_user--dag_run_note
-
-0..N
-{0,1}
+
+0..N
+{0,1}
task_instance_note
-
-task_instance_note
-
-dag_id
- [VARCHAR(250)]
- NOT NULL
-
-map_index
- [INTEGER]
- NOT NULL
-
-run_id
- [VARCHAR(250)]
- NOT NULL
-
-task_id
- [VARCHAR(250)]
- NOT NULL
-
-content
- [VARCHAR(1000)]
-
-created_at
- [TIMESTAMP]
- NOT NULL
-
-updated_at
- [TIMESTAMP]
- NOT NULL
-
-user_id
- [INTEGER]
+
+task_instance_note
+
+dag_id
+ [VARCHAR(250)]
+ NOT NULL
+
+map_index
+ [INTEGER]
+ NOT NULL
+
+run_id
+ [VARCHAR(250)]
+ NOT NULL
+
+task_id
+ [VARCHAR(250)]
+ NOT NULL
+
+content
+ [VARCHAR(1000)]
+
+created_at
+ [TIMESTAMP]
+ NOT NULL
+
+updated_at
+ [TIMESTAMP]
+ NOT NULL
+
+user_id
+ [INTEGER]
ab_user--task_instance_note
-
-0..N
-{0,1}
+
+0..N
+{0,1}
@@ -940,507 +940,510 @@
dag_run--dag_run_note
-
-1
-1
+
+1
+1
dagrun_dataset_event
-
-dagrun_dataset_event
-
-dag_run_id
- [INTEGER]
- NOT NULL
-
-event_id
- [INTEGER]
- NOT NULL
+
+dagrun_dataset_event
+
+dag_run_id
+ [INTEGER]
+ NOT NULL
+
+event_id
+ [INTEGER]
+ NOT NULL
dag_run--dagrun_dataset_event
-
-1
-1
+
+1
+1
task_instance
-
-task_instance
-
-dag_id
- [VARCHAR(250)]
- NOT NULL
-
-map_index
- [INTEGER]
- NOT NULL
-
-run_id
- [VARCHAR(250)]
- NOT NULL
-
-task_id
- [VARCHAR(250)]
- NOT NULL
-
-custom_operator_name
- [VARCHAR(1000)]
-
-duration
- [DOUBLE_PRECISION]
-
-end_date
- [TIMESTAMP]
-
-executor_config
- [BYTEA]
-
-external_executor_id
- [VARCHAR(250)]
-
-hostname
- [VARCHAR(1000)]
-
-job_id
- [INTEGER]
-
-max_tries
- [INTEGER]
-
-next_kwargs
- [JSON]
-
-next_method
- [VARCHAR(1000)]
-
-operator
- [VARCHAR(1000)]
-
-pid
- [INTEGER]
-
-pool
- [VARCHAR(256)]
- NOT NULL
-
-pool_slots
- [INTEGER]
- NOT NULL
-
-priority_weight
- [INTEGER]
-
-queue
- [VARCHAR(256)]
-
-queued_by_job_id
- [INTEGER]
-
-queued_dttm
- [TIMESTAMP]
-
-rendered_map_index
- [VARCHAR(250)]
-
-start_date
- [TIMESTAMP]
-
-state
- [VARCHAR(20)]
-
-task_display_name
- [VARCHAR(2000)]
-
-trigger_id
- [INTEGER]
-
-trigger_timeout
- [TIMESTAMP]
-
-try_number
- [INTEGER]
-
-unixname
- [VARCHAR(1000)]
-
-updated_at
- [TIMESTAMP]
+
+task_instance
+
+dag_id
+ [VARCHAR(250)]
+ NOT NULL
+
+map_index
+ [INTEGER]
+ NOT NULL
+
+run_id
+ [VARCHAR(250)]
+ NOT NULL
+
+task_id
+ [VARCHAR(250)]
+ NOT NULL
+
+custom_operator_name
+ [VARCHAR(1000)]
+
+duration
+ [DOUBLE_PRECISION]
+
+end_date
+ [TIMESTAMP]
+
+executor
+ [VARCHAR(1000)]
+
+executor_config
+ [BYTEA]
+
+external_executor_id
+ [VARCHAR(250)]
+
+hostname
+ [VARCHAR(1000)]
+
+job_id
+ [INTEGER]
+
+max_tries
+ [INTEGER]
+
+next_kwargs
+ [JSON]
+
+next_method
+ [VARCHAR(1000)]
+
+operator
+ [VARCHAR(1000)]
+
+pid
+ [INTEGER]
+
+pool
+ [VARCHAR(256)]
+ NOT NULL
+
+pool_slots
+ [INTEGER]
+ NOT NULL
+
+priority_weight
+ [INTEGER]
+
+queue
+ [VARCHAR(256)]
+
+queued_by_job_id
+ [INTEGER]
+
+queued_dttm
+ [TIMESTAMP]
+
+rendered_map_index
+ [VARCHAR(250)]
+
+start_date
+ [TIMESTAMP]
+
+state
+ [VARCHAR(20)]
+
+task_display_name
+ [VARCHAR(2000)]
+
+trigger_id
+ [INTEGER]
+
+trigger_timeout
+ [TIMESTAMP]
+
+try_number
+ [INTEGER]
+
+unixname
+ [VARCHAR(1000)]
+
+updated_at
+ [TIMESTAMP]
dag_run--task_instance
-
-1
-1
+
+1
+1
dag_run--task_instance
-
-1
-1
+
+1
+1
task_reschedule
-
-task_reschedule
-
-id
- [INTEGER]
- NOT NULL
-
-dag_id
- [VARCHAR(250)]
- NOT NULL
-
-duration
- [INTEGER]
- NOT NULL
-
-end_date
- [TIMESTAMP]
- NOT NULL
-
-map_index
- [INTEGER]
- NOT NULL
-
-reschedule_date
- [TIMESTAMP]
- NOT NULL
-
-run_id
- [VARCHAR(250)]
- NOT NULL
-
-start_date
- [TIMESTAMP]
- NOT NULL
-
-task_id
- [VARCHAR(250)]
- NOT NULL
-
-try_number
- [INTEGER]
- NOT NULL
+
+task_reschedule
+
+id
+ [INTEGER]
+ NOT NULL
+
+dag_id
+ [VARCHAR(250)]
+ NOT NULL
+
+duration
+ [INTEGER]
+ NOT NULL
+
+end_date
+ [TIMESTAMP]
+ NOT NULL
+
+map_index
+ [INTEGER]
+ NOT NULL
+
+reschedule_date
+ [TIMESTAMP]
+ NOT NULL
+
+run_id
+ [VARCHAR(250)]
+ NOT NULL
+
+start_date
+ [TIMESTAMP]
+ NOT NULL
+
+task_id
+ [VARCHAR(250)]
+ NOT NULL
+
+try_number
+ [INTEGER]
+ NOT NULL
dag_run--task_reschedule
-
-0..N
-1
+
+0..N
+1
dag_run--task_reschedule
-
-0..N
-1
+
+0..N
+1
task_instance--task_instance_note
-
-1
-1
+
+1
+1
task_instance--task_instance_note
-
-1
-1
+
+1
+1
task_instance--task_instance_note
-
-1
-1
+
+1
+1
task_instance--task_instance_note
-
-1
-1
+
+1
+1
task_instance--task_reschedule
-
-0..N
-1
+
+0..N
+1
task_instance--task_reschedule
-
-0..N
-1
+
+0..N
+1
task_instance--task_reschedule
-
-0..N
-1
+
+0..N
+1
task_instance--task_reschedule
-
-0..N
-1
+
+0..N
+1
task_fail
-
-task_fail
-
-id
- [INTEGER]
- NOT NULL
-
-dag_id
- [VARCHAR(250)]
- NOT NULL
-
-duration
- [INTEGER]
-
-end_date
- [TIMESTAMP]
-
-map_index
- [INTEGER]
- NOT NULL
-
-run_id
- [VARCHAR(250)]
- NOT NULL
-
-start_date
- [TIMESTAMP]
-
-task_id
- [VARCHAR(250)]
- NOT NULL
+
+task_fail
+
+id
+ [INTEGER]
+ NOT NULL
+
+dag_id
+ [VARCHAR(250)]
+ NOT NULL
+
+duration
+ [INTEGER]
+
+end_date
+ [TIMESTAMP]
+
+map_index
+ [INTEGER]
+ NOT NULL
+
+run_id
+ [VARCHAR(250)]
+ NOT NULL
+
+start_date
+ [TIMESTAMP]
+
+task_id
+ [VARCHAR(250)]
+ NOT NULL
task_instance--task_fail
-
-0..N
-1
+
+0..N
+1
task_instance--task_fail
-
-0..N
-1
+
+0..N
+1
task_instance--task_fail
-
-0..N
-1
+
+0..N
+1
task_instance--task_fail
-
-0..N
-1
+
+0..N
+1
task_map
-
-task_map
-
-dag_id
- [VARCHAR(250)]
- NOT NULL
-
-map_index
- [INTEGER]
- NOT NULL
-
-run_id
- [VARCHAR(250)]
- NOT NULL
-
-task_id
- [VARCHAR(250)]
- NOT NULL
-
-keys
- [JSON]
-
-length
- [INTEGER]
- NOT NULL
+
+task_map
+
+dag_id
+ [VARCHAR(250)]
+ NOT NULL
+
+map_index
+ [INTEGER]
+ NOT NULL
+
+run_id
+ [VARCHAR(250)]
+ NOT NULL
+
+task_id
+ [VARCHAR(250)]
+ NOT NULL
+
+keys
+ [JSON]
+
+length
+ [INTEGER]
+ NOT NULL
task_instance--task_map
-
-1
-1
+
+1
+1
task_instance--task_map
-
-1
-1
+
+1
+1
task_instance--task_map
-
-1
-1
+
+1
+1
task_instance--task_map
-
-1
-1
+
+1
+1
xcom
-
-xcom
-
-dag_run_id
- [INTEGER]
- NOT NULL
-
-key
- [VARCHAR(512)]
- NOT NULL
-
-map_index
- [INTEGER]
- NOT NULL
-
-task_id
- [VARCHAR(250)]
- NOT NULL
-
-dag_id
- [VARCHAR(250)]
- NOT NULL
-
-run_id
- [VARCHAR(250)]
- NOT NULL
-
-timestamp
- [TIMESTAMP]
- NOT NULL
-
-value
- [BYTEA]
+
+xcom
+
+dag_run_id
+ [INTEGER]
+ NOT NULL
+
+key
+ [VARCHAR(512)]
+ NOT NULL
+
+map_index
+ [INTEGER]
+ NOT NULL
+
+task_id
+ [VARCHAR(250)]
+ NOT NULL
+
+dag_id
+ [VARCHAR(250)]
+ NOT NULL
+
+run_id
+ [VARCHAR(250)]
+ NOT NULL
+
+timestamp
+ [TIMESTAMP]
+ NOT NULL
+
+value
+ [BYTEA]
task_instance--xcom
-
-1
-1
+
+0..N
+1
task_instance--xcom
-
-0..N
-1
+
+1
+1
task_instance--xcom
-
-1
-1
+
+0..N
+1
task_instance--xcom
-
-0..N
-1
+
+1
+1
rendered_task_instance_fields
-
-rendered_task_instance_fields
-
-dag_id
- [VARCHAR(250)]
- NOT NULL
-
-map_index
- [INTEGER]
- NOT NULL
-
-run_id
- [VARCHAR(250)]
- NOT NULL
-
-task_id
- [VARCHAR(250)]
- NOT NULL
-
-k8s_pod_yaml
- [JSON]
-
-rendered_fields
- [JSON]
- NOT NULL
+
+rendered_task_instance_fields
+
+dag_id
+ [VARCHAR(250)]
+ NOT NULL
+
+map_index
+ [INTEGER]
+ NOT NULL
+
+run_id
+ [VARCHAR(250)]
+ NOT NULL
+
+task_id
+ [VARCHAR(250)]
+ NOT NULL
+
+k8s_pod_yaml
+ [JSON]
+
+rendered_fields
+ [JSON]
+ NOT NULL
task_instance--rendered_task_instance_fields
-
-1
-1
+
+1
+1
task_instance--rendered_task_instance_fields
-
-1
-1
+
+1
+1
task_instance--rendered_task_instance_fields
-
-1
-1
+
+1
+1
task_instance--rendered_task_instance_fields
-
-1
-1
+
+1
+1
@@ -1482,25 +1485,25 @@
ab_permission_view_role
-
-ab_permission_view_role
-
-id
- [INTEGER]
- NOT NULL
-
-permission_view_id
- [INTEGER]
-
-role_id
- [INTEGER]
+
+ab_permission_view_role
+
+id
+ [INTEGER]
+ NOT NULL
+
+permission_view_id
+ [INTEGER]
+
+role_id
+ [INTEGER]
ab_permission_view--ab_permission_view_role
-
-0..N
-{0,1}
+
+0..N
+{0,1}
@@ -1540,16 +1543,16 @@
ab_role--ab_user_role
-
-0..N
-{0,1}
+
+0..N
+{0,1}
ab_role--ab_permission_view_role
-
-0..N
-{0,1}
+
+0..N
+{0,1}
@@ -1588,9 +1591,9 @@
dataset_event--dagrun_dataset_event
-
-1
-1
+
+1
+1
@@ -1620,9 +1623,9 @@
trigger--task_instance
-
-0..N
-{0,1}
+
+0..N
+{0,1}
diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst
index 13c70abe9d64f..ac509b0c03b49 100644
--- a/docs/apache-airflow/migrations-ref.rst
+++ b/docs/apache-airflow/migrations-ref.rst
@@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are executed via when you ru
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version | Description |
+=================================+===================+===================+==============================================================+
-| ``1949afb29106`` (head) | ``ee1467d4aa35`` | ``2.9.0`` | update trigger kwargs type |
+| ``677fdbb7fc54`` (head) | ``1949afb29106`` | ``2.10.0`` | add new executor field to db |
++---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
+| ``1949afb29106`` | ``ee1467d4aa35`` | ``2.9.0`` | update trigger kwargs type |
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
| ``ee1467d4aa35`` | ``b4078ac230a1`` | ``2.9.0`` | add display name for dag and task instance |
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index e6187311429c9..8dacc839cb8a3 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -3315,6 +3315,7 @@ def test_refresh_from_db(self, create_task_instance):
"rendered_map_index": None,
"queued_by_job_id": 321,
"pid": 123,
+ "executor": "some_executor",
"executor_config": {"Some": {"extra": "information"}},
"external_executor_id": "some_executor_id",
"trigger_timeout": None,
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index 4b5632cd20f8a..b02cda5e3be37 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -1266,6 +1266,7 @@ def test_no_new_fields_added_to_base_operator(self):
"email_on_failure": True,
"email_on_retry": True,
"execution_timeout": None,
+ "executor": None,
"executor_config": {},
"ignore_first_depends_on_past": True,
"inlets": [],
diff --git a/tests/www/views/test_views_tasks.py b/tests/www/views/test_views_tasks.py
index 90cc5bbc1c67a..bc7ce29cec73f 100644
--- a/tests/www/views/test_views_tasks.py
+++ b/tests/www/views/test_views_tasks.py
@@ -1043,6 +1043,7 @@ def test_task_instances(admin_client):
"duration": None,
"end_date": None,
"execution_date": DEFAULT_DATE.isoformat(),
+ "executor": None,
"executor_config": {},
"external_executor_id": None,
"hostname": "",
@@ -1077,6 +1078,7 @@ def test_task_instances(admin_client):
"duration": None,
"end_date": None,
"execution_date": DEFAULT_DATE.isoformat(),
+ "executor": None,
"executor_config": {},
"external_executor_id": None,
"hostname": "",
@@ -1111,6 +1113,7 @@ def test_task_instances(admin_client):
"duration": None,
"end_date": None,
"execution_date": DEFAULT_DATE.isoformat(),
+ "executor": None,
"executor_config": {},
"external_executor_id": None,
"hostname": "",
@@ -1145,6 +1148,7 @@ def test_task_instances(admin_client):
"duration": None,
"end_date": None,
"execution_date": DEFAULT_DATE.isoformat(),
+ "executor": None,
"executor_config": {},
"external_executor_id": None,
"hostname": "",
@@ -1179,6 +1183,7 @@ def test_task_instances(admin_client):
"duration": None,
"end_date": None,
"execution_date": DEFAULT_DATE.isoformat(),
+ "executor": None,
"executor_config": {},
"external_executor_id": None,
"hostname": "",
@@ -1213,6 +1218,7 @@ def test_task_instances(admin_client):
"duration": None,
"end_date": None,
"execution_date": DEFAULT_DATE.isoformat(),
+ "executor": None,
"executor_config": {},
"external_executor_id": None,
"hostname": "",
@@ -1247,6 +1253,7 @@ def test_task_instances(admin_client):
"duration": None,
"end_date": None,
"execution_date": DEFAULT_DATE.isoformat(),
+ "executor": None,
"executor_config": {},
"external_executor_id": None,
"hostname": "",