Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
cdeb817
feat(airflowctl): add task instance management support
Suraj-kumar00 Mar 18, 2026
326c963
Fix ruff formatting and remove pointless try/except blocks in TaskIns…
Suraj-kumar00 Apr 1, 2026
eb51ad6
Fix PR review comments on Task Instance operations
Suraj-kumar00 Apr 7, 2026
f265e40
Fix test assertion and client property usage
Suraj-kumar00 Apr 7, 2026
f9d7127
Add task instance CLI commands (get, list, clear, update) to airflowctl
Suraj-kumar00 May 6, 2026
f062780
fix: mypy ci failure error added _list = list
Suraj-kumar00 May 10, 2026
dcaa410
fix: correct list field CLI parsing, taskinstance update output, and …
Suraj-kumar00 Jun 3, 2026
2a18fa3
add taskinstance help_texts
Suraj-kumar00 Jun 3, 2026
d80a10e
Fix taskinstance and xcom CLI integration test commands to use positi…
Suraj-kumar00 Jun 3, 2026
18ab353
Rename TaskInstanceOperations to TasksOperations, align CLI with Airf…
Suraj-kumar00 Jun 3, 2026
99f767f
Revert unrelated test_pool_command.py assertion changes
Suraj-kumar00 Jun 20, 2026
51287e8
Use --new-state=failed in tasks update integration test
Suraj-kumar00 Jun 20, 2026
ccc0ea4
Fix body bool fields overriding API defaults with explicit False
Suraj-kumar00 Jun 21, 2026
239572a
Regenerate airflowctl help SVGs for tasks command
Suraj-kumar00 Jun 21, 2026
93cf5cd
Omit unset include_deferred in pools update to avoid sending null
Suraj-kumar00 Jul 1, 2026
5e5b869
Sync assets command hash with upstream main
Suraj-kumar00 Jul 2, 2026
16a59b9
Default include_deferred to False on pools update to satisfy server P…
Suraj-kumar00 Jul 2, 2026
7313c23
Merge branch 'main' into feat/task-instance-cli-support
Suraj-kumar00 Jul 2, 2026
a6558b4
Merge branch 'main' into feat/task-instance-cli-support
Suraj-kumar00 Jul 2, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,15 @@ def date_param():
"dags update example_bash_operator --no-is-paused",
# Dag Run commands
"dagrun list --dag-id example_bash_operator --state success --limit=1",
# XCom commands - need a Dag run with completed tasks
# Tasks commands
'tasks list example_bash_operator "manual__{date_param}"',
'tasks get example_bash_operator "manual__{date_param}" runme_0',
"tasks clear example_bash_operator --dry-run",
# runme_0 completes as "success" once the triggered run finishes, so updating it
# to "success" is rejected with 409 "already in success state". Use "failed" to
# exercise a real state transition (valid states: success, failed, skipped).
'tasks update example_bash_operator "manual__{date_param}" runme_0 --new-state=failed',
# XCom commands - need a DAG run with completed tasks
'xcom add example_bash_operator "manual__{date_param}" runme_0 {xcom_key} \'{{"test": "value"}}\'',
'xcom get example_bash_operator "manual__{date_param}" runme_0 {xcom_key}',
'xcom list example_bash_operator "manual__{date_param}" runme_0',
Expand Down
3 changes: 2 additions & 1 deletion airflow-ctl/docs/images/command_hashes.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
main:27a22c00dcf32e7a1a4f06672dc8e3c8
main:164bc97843d5be583c0b48f7a34dc8c8
assets:6419e20452692f577c4c6f570b74be0c
auth:d79e9c7d00c432bdbcbc2a86e2e32053
backfill:74c8737b0a62a86ed3605fa9e6165874
Expand All @@ -12,4 +12,5 @@ providers:34502fe09dc0b8b0a13e7e46efdffda6
variables:f8fc76d3d398b2780f4e97f7cd816646
version:31f4efdf8de0dbaaa4fac71ff7efecc3
plugins:4864fd8f356704bd2b3cd1aec3567e35
tasks:7ab24cac521242b6b6012e2bcd317831
auth login:9fe2bb1dd5c602beea2eefb33a2b20a8
134 changes: 69 additions & 65 deletions airflow-ctl/docs/images/output_main.svg
Comment thread
Suraj-kumar00 marked this conversation as resolved.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
117 changes: 117 additions & 0 deletions airflow-ctl/docs/images/output_tasks.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
7 changes: 7 additions & 0 deletions airflow-ctl/src/airflowctl/api/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
PoolsOperations,
ProvidersOperations,
ServerResponseError,
TasksOperations,
VariablesOperations,
VersionOperations,
XComOperations,
Expand Down Expand Up @@ -474,6 +475,12 @@ def plugins(self):
"""Operations related to plugins."""
return PluginsOperations(self)

@lru_cache() # type: ignore[prop-decorator]
@property
def tasks(self):
"""Operations related to tasks."""
return TasksOperations(self)


# API Client Decorator for CLI Actions
@contextlib.contextmanager
Expand Down
60 changes: 56 additions & 4 deletions airflow-ctl/src/airflowctl/api/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
BulkBodyPoolBody,
BulkBodyVariableBody,
BulkResponse,
ClearTaskInstancesBody,
Config,
ConnectionBody,
ConnectionCollectionResponse,
Expand All @@ -59,6 +60,7 @@
ImportErrorCollectionResponse,
ImportErrorResponse,
JobCollectionResponse,
PatchTaskInstanceBody,
PluginCollectionResponse,
PluginImportErrorCollectionResponse,
PoolBody,
Expand All @@ -68,6 +70,8 @@
ProviderCollectionResponse,
QueuedEventCollectionResponse,
QueuedEventResponse,
TaskInstanceCollectionResponse,
TaskInstanceResponse,
TriggerDAGRunPostBody,
VariableBody,
VariableCollectionResponse,
Expand Down Expand Up @@ -733,10 +737,16 @@ def delete(self, pool: str) -> str | ServerResponseError:

def update(self, pool_body: PoolPatchBody) -> PoolResponse | ServerResponseError:
"""Update a pool."""
try:
self.response = self.client.patch(
f"pools/{pool_body.pool}", json=pool_body.model_dump(mode="json")
)
# Workaround: the server's PATCH handler validates the partial body
# against ``BasePool`` (see airflow-core/.../services/public/pools.py)
# which requires ``include_deferred``. Omitting it fails with
# "Field required", sending ``null`` fails with "bool_type". Always
# send ``include_deferred`` (defaulting to False when unset) so PATCH
# requests are accepted until the server switches to a partial validator.
body = pool_body.model_dump(mode="json", exclude_none=True)
body.setdefault("include_deferred", False)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add a regression test for updating only slots on a pool with include_deferred=True? I think this might accidentally send include_deferred=false and flip the existing setting.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For example, if the pool currently has include_deferred=True and I use airflowctl to set slots=10, this would also send include_deferred=False and may update that setting unintentionally.

try:
self.response = self.client.patch(f"pools/{pool_body.pool}", json=body)
return PoolResponse.model_validate_json(self.response.content)
except ServerResponseError as e:
raise e
Expand Down Expand Up @@ -950,3 +960,45 @@ def list_import_errors(self) -> PluginImportErrorCollectionResponse | ServerResp
return PluginImportErrorCollectionResponse.model_validate_json(self.response.content)
except ServerResponseError as e:
raise e


class TasksOperations(BaseOperations):
"""Tasks operations."""

def get(self, dag_id: str, dag_run_id: str, task_id: str) -> TaskInstanceResponse:
"""Get a task instance."""
self.response = self.client.get(f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}")
return TaskInstanceResponse.model_validate_json(self.response.content)

def list(self, dag_id: str, dag_run_id: str) -> TaskInstanceCollectionResponse | ServerResponseError:
"""List task instances."""
return super().execute_list(
path=f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances",
data_model=TaskInstanceCollectionResponse,
)

def clear(
self, dag_id: str, body: ClearTaskInstancesBody
) -> TaskInstanceCollectionResponse | ServerResponseError:
"""Clear task instances."""
self.response = self.client.post(
f"dags/{dag_id}/clearTaskInstances",
json=body.model_dump(mode="json", exclude_unset=True),
)
return TaskInstanceCollectionResponse.model_validate_json(self.response.content)

def update(
self,
dag_id: str,
dag_run_id: str,
task_id: str,
body: PatchTaskInstanceBody,
map_index: int | None = None,
) -> TaskInstanceCollectionResponse:
"""Update task instance state. When map_index is given, only that mapped instance is affected."""
if map_index is not None:
path = f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}"
else:
path = f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}"
self.response = self.client.patch(path, json=body.model_dump(mode="json", exclude_unset=True))
return TaskInstanceCollectionResponse.model_validate_json(self.response.content)
Loading
Loading