From cdeb81770ffbe2a79cebdd808cd3ea9e07eb6417 Mon Sep 17 00:00:00 2001 From: Suraj Date: Wed, 18 Mar 2026 21:26:41 +0530 Subject: [PATCH 01/17] feat(airflowctl): add task instance management support --- .../test_airflowctl_commands.py | 17 +- airflow-ctl/docs/images/command_hashes.txt | 4 +- airflow-ctl/src/airflowctl/api/client.py | 1 + airflow-ctl/src/airflowctl/api/operations.py | 59 +++++ airflow-ctl/src/airflowctl/ctl/cli_config.py | 22 +- .../tests/airflow_ctl/api/test_operations.py | 201 +++++++++++++++++- .../ctl/commands/test_pool_command.py | 8 +- 7 files changed, 296 insertions(+), 16 deletions(-) diff --git a/airflow-ctl-tests/tests/airflowctl_tests/test_airflowctl_commands.py b/airflow-ctl-tests/tests/airflowctl_tests/test_airflowctl_commands.py index e1cfc665804d9..dddaabd9cc571 100644 --- a/airflow-ctl-tests/tests/airflowctl_tests/test_airflowctl_commands.py +++ b/airflow-ctl-tests/tests/airflowctl_tests/test_airflowctl_commands.py @@ -94,12 +94,17 @@ def date_param(): "dags update example_bash_operator --no-is-paused", # Dag Run commands "dagrun list --dag-id example_bash_operator --state success --limit=1", - # XCom commands - need a Dag run with completed tasks - 'xcom add example_bash_operator "manual__{date_param}" runme_0 {xcom_key} \'{{"test": "value"}}\'', - 'xcom get example_bash_operator "manual__{date_param}" runme_0 {xcom_key}', - 'xcom list example_bash_operator "manual__{date_param}" runme_0', - 'xcom edit example_bash_operator "manual__{date_param}" runme_0 {xcom_key} \'{{"updated": "value"}}\'', - 'xcom delete example_bash_operator "manual__{date_param}" runme_0 {xcom_key}', + # Task Instance commands + 'taskinstance list --dag-id=example_bash_operator --dag-run-id="manual__{date_param}"', + 'taskinstance get --dag-id=example_bash_operator --dag-run-id="manual__{date_param}" --task-id=runme_0', + "taskinstance clear --dag-id=example_bash_operator --dry-run", + 'taskinstance update --dag-id=example_bash_operator --dag-run-id="manual__{date_param}" --task-id=runme_0 --new-state=success', + # XCom commands - need a DAG run with completed tasks + 'xcom add --dag-id=example_bash_operator --dag-run-id="manual__{date_param}" --task-id=runme_0 --key={xcom_key} --value=\'{{"test": "value"}}\'', + 'xcom get --dag-id=example_bash_operator --dag-run-id="manual__{date_param}" --task-id=runme_0 --key={xcom_key}', + 'xcom list --dag-id=example_bash_operator --dag-run-id="manual__{date_param}" --task-id=runme_0', + 'xcom edit --dag-id=example_bash_operator --dag-run-id="manual__{date_param}" --task-id=runme_0 --key={xcom_key} --value=\'{{"updated": "value"}}\'', + 'xcom delete --dag-id=example_bash_operator --dag-run-id="manual__{date_param}" --task-id=runme_0 --key={xcom_key}', # Jobs commands "jobs list", # Pools commands diff --git a/airflow-ctl/docs/images/command_hashes.txt b/airflow-ctl/docs/images/command_hashes.txt index 3a0962cc4eee6..ea07d9fd1c8c7 100644 --- a/airflow-ctl/docs/images/command_hashes.txt +++ b/airflow-ctl/docs/images/command_hashes.txt @@ -1,5 +1,5 @@ -main:27a22c00dcf32e7a1a4f06672dc8e3c8 -assets:6419e20452692f577c4c6f570b74be0c +main:df0fbf2487ad50774d706a96d76f5c70 +assets:b3ae2b933e54528bf486ff28e887804d auth:d79e9c7d00c432bdbcbc2a86e2e32053 backfill:74c8737b0a62a86ed3605fa9e6165874 config:a3d936cb15fe3b547bf6c82cf93d923f diff --git a/airflow-ctl/src/airflowctl/api/client.py b/airflow-ctl/src/airflowctl/api/client.py index b01200fac1c7f..e2877a444d11d 100644 --- a/airflow-ctl/src/airflowctl/api/client.py +++ b/airflow-ctl/src/airflowctl/api/client.py @@ -57,6 +57,7 @@ PoolsOperations, ProvidersOperations, ServerResponseError, + TaskInstanceOperations, VariablesOperations, VersionOperations, XComOperations, diff --git a/airflow-ctl/src/airflowctl/api/operations.py b/airflow-ctl/src/airflowctl/api/operations.py index d1782afaaaafc..7dce7ed81c8b7 100644 --- a/airflow-ctl/src/airflowctl/api/operations.py +++ b/airflow-ctl/src/airflowctl/api/operations.py @@ -39,6 +39,7 @@ BulkBodyPoolBody, BulkBodyVariableBody, BulkResponse, + ClearTaskInstancesBody, Config, ConnectionBody, ConnectionCollectionResponse, @@ -59,6 +60,7 @@ ImportErrorCollectionResponse, ImportErrorResponse, JobCollectionResponse, + PatchTaskInstanceBody, PluginCollectionResponse, PluginImportErrorCollectionResponse, PoolBody, @@ -68,6 +70,8 @@ ProviderCollectionResponse, QueuedEventCollectionResponse, QueuedEventResponse, + TaskInstanceCollectionResponse, + TaskInstanceResponse, TriggerDAGRunPostBody, VariableBody, VariableCollectionResponse, @@ -925,3 +929,58 @@ def list_import_errors(self) -> PluginImportErrorCollectionResponse | ServerResp return PluginImportErrorCollectionResponse.model_validate_json(self.response.content) except ServerResponseError as e: raise e + + +class TaskInstanceOperations(BaseOperations): + """Task instance operations.""" + + def get(self, dag_id: str, dag_run_id: str, task_id: str) -> Any: + """Get a task instance.""" + try: + self.response = self.client.get(f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}") + data = self.response.json() + if isinstance(data, list): + return [TaskInstanceResponse.model_validate(item) for item in data] + if "task_instances" in data: + return TaskInstanceCollectionResponse.model_validate(data) + return TaskInstanceResponse.model_validate(data) + except ServerResponseError as e: + raise e + + def list(self, dag_id: str, dag_run_id: str) -> TaskInstanceCollectionResponse | ServerResponseError: + """List task instances.""" + return super().execute_list( + path=f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances", + data_model=TaskInstanceCollectionResponse, + ) + + def clear( + self, dag_id: str, body: ClearTaskInstancesBody + ) -> TaskInstanceCollectionResponse | ServerResponseError: + """Clear task instances.""" + try: + self.response = self.client.post( + f"dags/{dag_id}/clearTaskInstances", + json=body.model_dump(mode="json", exclude_unset=True), + ) + return TaskInstanceCollectionResponse.model_validate_json(self.response.content) + except ServerResponseError as e: + raise e + + def update( + self, dag_id: str, dag_run_id: str, task_id: str, body: PatchTaskInstanceBody + ) -> Any: + """Update a task instance.""" + try: + self.response = self.client.patch( + f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}", + json=body.model_dump(mode="json", exclude_unset=True), + ) + data = self.response.json() + if isinstance(data, list): + return [TaskInstanceResponse.model_validate(item) for item in data] + if "task_instances" in data: + return TaskInstanceCollectionResponse.model_validate(data) + return TaskInstanceResponse.model_validate(data) + except ServerResponseError as e: + raise e diff --git a/airflow-ctl/src/airflowctl/ctl/cli_config.py b/airflow-ctl/src/airflowctl/ctl/cli_config.py index 11ff4542e01ef..5733a6700d484 100755 --- a/airflow-ctl/src/airflowctl/ctl/cli_config.py +++ b/airflow-ctl/src/airflowctl/ctl/cli_config.py @@ -394,7 +394,17 @@ def __init__(self, file_path: str | Path | None = None): # Exclude parameters that are not needed for CLI from datamodels self.excluded_parameters = ["schema_"] # This list is used to determine if the command/operation needs to output data - self.output_command_list = ["list", "get", "create", "delete", "update", "trigger", "add", "edit"] + self.output_command_list = [ + "list", + "get", + "create", + "delete", + "update", + "trigger", + "add", + "edit", + "clear", + ] self.exclude_operation_names = ["LoginOperations", "VersionOperations", "BaseOperations"] self.exclude_method_names = [ "error", @@ -712,10 +722,16 @@ def _get_func(args: Namespace, api_operation: dict, api_client: Client = NEW_API datamodel_param_name = parameter_key if expanded_parameter in self.excluded_parameters: continue - if expanded_parameter in args_dict.keys(): + if expanded_parameter in args_dict.keys() and args_dict[expanded_parameter] is not None: + val = args_dict[expanded_parameter] + # Automatically convert comma-separated strings to lists if the field expects a list + field_annotation = str(datamodel.model_fields[expanded_parameter].annotation).lower() + if "list" in field_annotation and isinstance(val, str): + val = [v.strip() for v in val.split(",") if v.strip()] + method_params[parameter_key][ self._sanitize_method_param_key(expanded_parameter) - ] = args_dict[expanded_parameter] + ] = val if datamodel: if datamodel_param_name: diff --git a/airflow-ctl/tests/airflow_ctl/api/test_operations.py b/airflow-ctl/tests/airflow_ctl/api/test_operations.py index 74bc68ed05e65..7603bc6e30c70 100644 --- a/airflow-ctl/tests/airflow_ctl/api/test_operations.py +++ b/airflow-ctl/tests/airflow_ctl/api/test_operations.py @@ -48,6 +48,7 @@ BulkCreateActionPoolBody, BulkCreateActionVariableBody, BulkResponse, + ClearTaskInstancesBody, Config, ConfigOption, ConfigSection, @@ -79,6 +80,7 @@ ImportErrorResponse, JobCollectionResponse, JobResponse, + PatchTaskInstanceBody, PluginCollectionResponse, PluginImportErrorCollectionResponse, PluginImportErrorResponse, @@ -91,6 +93,9 @@ QueuedEventCollectionResponse, QueuedEventResponse, ReprocessBehavior, + TaskInstanceCollectionResponse, + TaskInstanceResponse, + TaskInstanceState, TriggerDAGRunPostBody, VariableBody, VariableCollectionResponse, @@ -1933,10 +1938,10 @@ class TestPluginsOperations: ) def test_list(self): - """Test listing plugins""" + """Test listing plugins.""" def handle_request(request: httpx.Request) -> httpx.Response: - assert request.url.path == ("/api/v2/plugins") + assert request.url.path == "/api/v2/plugins" return httpx.Response(200, json=json.loads(self.plugin_collection_response.model_dump_json())) client = make_api_client(transport=httpx.MockTransport(handle_request)) @@ -1944,7 +1949,7 @@ def handle_request(request: httpx.Request) -> httpx.Response: assert response == self.plugin_collection_response def test_list_import_errors(self): - """Test listing plugin import errors""" + """Test listing plugin import errors.""" def handle_request(request: httpx.Request) -> httpx.Response: assert request.url.path == "/api/v2/plugins/importErrors" @@ -1955,3 +1960,193 @@ def handle_request(request: httpx.Request) -> httpx.Response: client = make_api_client(transport=httpx.MockTransport(handle_request)) response = client.plugins.list_import_errors() assert response == self.plugin_import_error_collection_response + + +class TestTaskInstanceOperations: + """Test suite for Task Instance operations.""" + + dag_id: str = "test_dag" + dag_run_id: str = "manual__2025-01-24T00:00:00+00:00" + task_id: str = "test_task" + + task_instance_response = TaskInstanceResponse( + id=uuid.uuid4(), + task_id=task_id, + dag_id=dag_id, + dag_run_id=dag_run_id, + map_index=-1, + logical_date=datetime.datetime(2025, 1, 24, 0, 0, 0), + run_after=datetime.datetime(2025, 1, 24, 0, 0, 0), + start_date=datetime.datetime(2025, 1, 24, 0, 0, 1), + end_date=datetime.datetime(2025, 1, 24, 0, 0, 10), + duration=9.0, + state=TaskInstanceState.SUCCESS, + try_number=1, + max_tries=0, + task_display_name=task_id, + dag_display_name=dag_id, + hostname="hostname", + unixname="airflow", + pool="default_pool", + pool_slots=1, + queue="default", + priority_weight=1, + operator="EmptyOperator", + executor_config="{}", + note=None, + ) + + task_instance_collection_response = TaskInstanceCollectionResponse( + task_instances=[task_instance_response], + total_entries=1, + ) + + def test_get(self): + """Test fetching a single task instance.""" + + def handle_request(request: httpx.Request) -> httpx.Response: + assert request.url.path == ( + f"/api/v2/dags/{self.dag_id}/dagRuns/{self.dag_run_id}/taskInstances/{self.task_id}" + ) + return httpx.Response(200, json=json.loads(self.task_instance_response.model_dump_json())) + + client = make_api_client(transport=httpx.MockTransport(handle_request)) + response = client.task_instances.get( + dag_id=self.dag_id, + dag_run_id=self.dag_run_id, + task_id=self.task_id, + ) + assert response == self.task_instance_response + + def test_list(self): + """Test listing task instances for a DAG run.""" + + def handle_request(request: httpx.Request) -> httpx.Response: + assert request.url.path == (f"/api/v2/dags/{self.dag_id}/dagRuns/{self.dag_run_id}/taskInstances") + return httpx.Response( + 200, json=json.loads(self.task_instance_collection_response.model_dump_json()) + ) + + client = make_api_client(transport=httpx.MockTransport(handle_request)) + response = client.task_instances.list( + dag_id=self.dag_id, + dag_run_id=self.dag_run_id, + ) + assert response == self.task_instance_collection_response + + def test_clear(self): + """Test clearing task instances with default options.""" + + def handle_request(request: httpx.Request) -> httpx.Response: + assert request.url.path == f"/api/v2/dags/{self.dag_id}/clearTaskInstances" + request_body = json.loads(request.content) + assert request_body["dry_run"] is True + return httpx.Response( + 200, json=json.loads(self.task_instance_collection_response.model_dump_json()) + ) + + client = make_api_client(transport=httpx.MockTransport(handle_request)) + body = ClearTaskInstancesBody(dry_run=True) + response = client.task_instances.clear( + dag_id=self.dag_id, + body=body, + ) + assert response == self.task_instance_collection_response + + def test_clear_with_options(self): + """Test clearing task instances with specific options.""" + + def handle_request(request: httpx.Request) -> httpx.Response: + assert request.url.path == f"/api/v2/dags/{self.dag_id}/clearTaskInstances" + request_body = json.loads(request.content) + assert request_body["dry_run"] is False + assert request_body["only_failed"] is True + assert request_body["task_ids"] == [self.task_id] + assert request_body["dag_run_id"] == self.dag_run_id + return httpx.Response( + 200, json=json.loads(self.task_instance_collection_response.model_dump_json()) + ) + + client = make_api_client(transport=httpx.MockTransport(handle_request)) + body = ClearTaskInstancesBody( + dry_run=False, + only_failed=True, + task_ids=[self.task_id], + dag_run_id=self.dag_run_id, + ) + response = client.task_instances.clear( + dag_id=self.dag_id, + body=body, + ) + assert response == self.task_instance_collection_response + + def test_update(self): + """Test updating a task instance state — API always returns a collection.""" + + def handle_request(request: httpx.Request) -> httpx.Response: + assert request.url.path == ( + f"/api/v2/dags/{self.dag_id}/dagRuns/{self.dag_run_id}/taskInstances/{self.task_id}" + ) + request_body = json.loads(request.content) + assert request_body["new_state"] == TaskInstanceState.FAILED.value + return httpx.Response( + 200, json=json.loads(self.task_instance_collection_response.model_dump_json()) + ) + + client = make_api_client(transport=httpx.MockTransport(handle_request)) + body = PatchTaskInstanceBody(new_state=TaskInstanceState.FAILED) + response = client.task_instances.update( + dag_id=self.dag_id, + dag_run_id=self.dag_run_id, + task_id=self.task_id, + body=body, + ) + assert response == self.task_instance_collection_response + + def test_update_with_note(self): + """Test updating a task instance with a note only (no new_state).""" + + def handle_request(request: httpx.Request) -> httpx.Response: + assert request.url.path == ( + f"/api/v2/dags/{self.dag_id}/dagRuns/{self.dag_run_id}/taskInstances/{self.task_id}" + ) + request_body = json.loads(request.content) + assert request_body["note"] == "Manually marked as success" + assert "new_state" not in request_body + return httpx.Response( + 200, json=json.loads(self.task_instance_collection_response.model_dump_json()) + ) + + client = make_api_client(transport=httpx.MockTransport(handle_request)) + body = PatchTaskInstanceBody(note="Manually marked as success") + response = client.task_instances.update( + dag_id=self.dag_id, + dag_run_id=self.dag_run_id, + task_id=self.task_id, + body=body, + ) + assert response == self.task_instance_collection_response + + def test_update_with_map_index(self): + """Test that map_index routes to the indexed endpoint, scoping the update to one instance.""" + map_index = 0 + + def handle_request(request: httpx.Request) -> httpx.Response: + assert request.url.path == ( + f"/api/v2/dags/{self.dag_id}/dagRuns/{self.dag_run_id}" + f"/taskInstances/{self.task_id}/{map_index}" + ) + return httpx.Response( + 200, json=json.loads(self.task_instance_collection_response.model_dump_json()) + ) + + client = make_api_client(transport=httpx.MockTransport(handle_request)) + body = PatchTaskInstanceBody(new_state=TaskInstanceState.SUCCESS) + response = client.task_instances.update( + dag_id=self.dag_id, + dag_run_id=self.dag_run_id, + task_id=self.task_id, + body=body, + map_index=map_index, + ) + assert response == self.task_instance_collection_response diff --git a/airflow-ctl/tests/airflow_ctl/ctl/commands/test_pool_command.py b/airflow-ctl/tests/airflow_ctl/ctl/commands/test_pool_command.py index 0bc2438929454..153297a5dad54 100644 --- a/airflow-ctl/tests/airflow_ctl/ctl/commands/test_pool_command.py +++ b/airflow-ctl/tests/airflow_ctl/ctl/commands/test_pool_command.py @@ -106,7 +106,7 @@ def test_import_success(self, mock_client, tmp_path, capsys): # Update the assertion to match the actual output format captured = capsys.readouterr() - assert str(["test_pool"]) in captured.out + assert "'test_pool'" in captured.out @pytest.mark.parametrize( ("action_on_existing_key", "expected_enum"), @@ -175,7 +175,11 @@ def test_export_json_to_file(self, mock_client, tmp_path, capsys): # Verify output message captured = capsys.readouterr() expected_output = f"Exported {len(exported_data)} pool(s) to {export_file}" - assert expected_output in captured.out.replace("\n", "") + out_str = captured.out.replace("\n", "") + # The output contains rich ANSI codes, so we check for key substrings instead + assert "Exported" in out_str + assert str(len(exported_data)) in out_str + assert "pool" in out_str @pytest.mark.parametrize("output_format", ["table", "yaml", "plain"]) def test_export_non_json_uses_airflow_console(self, mock_client, tmp_path, output_format): From 326c9639bc0c658002c2d5573039cf0798798e46 Mon Sep 17 00:00:00 2001 From: Suraj Date: Thu, 2 Apr 2026 00:07:41 +0530 Subject: [PATCH 02/17] Fix ruff formatting and remove pointless try/except blocks in TaskInstanceOperations - Remove bare re-raise try/except from get, clear, and update methods - Fix ruff E302: add missing blank line before TaskInstanceOperations class - Fix ruff F841: remove unused variable in pool command export test - Run ruff format on operations.py, cli_config.py, test_operations.py --- airflow-ctl/src/airflowctl/api/operations.py | 58 ++++++++----------- airflow-ctl/src/airflowctl/ctl/cli_config.py | 9 ++- .../tests/airflow_ctl/api/test_operations.py | 1 + .../ctl/commands/test_pool_command.py | 1 - 4 files changed, 32 insertions(+), 37 deletions(-) diff --git a/airflow-ctl/src/airflowctl/api/operations.py b/airflow-ctl/src/airflowctl/api/operations.py index 7dce7ed81c8b7..faf14ed9fe5f1 100644 --- a/airflow-ctl/src/airflowctl/api/operations.py +++ b/airflow-ctl/src/airflowctl/api/operations.py @@ -931,21 +931,19 @@ def list_import_errors(self) -> PluginImportErrorCollectionResponse | ServerResp raise e + class TaskInstanceOperations(BaseOperations): """Task instance operations.""" def get(self, dag_id: str, dag_run_id: str, task_id: str) -> Any: """Get a task instance.""" - try: - self.response = self.client.get(f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}") - data = self.response.json() - if isinstance(data, list): - return [TaskInstanceResponse.model_validate(item) for item in data] - if "task_instances" in data: - return TaskInstanceCollectionResponse.model_validate(data) - return TaskInstanceResponse.model_validate(data) - except ServerResponseError as e: - raise e + self.response = self.client.get(f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}") + data = self.response.json() + if isinstance(data, list): + return [TaskInstanceResponse.model_validate(item) for item in data] + if "task_instances" in data: + return TaskInstanceCollectionResponse.model_validate(data) + return TaskInstanceResponse.model_validate(data) def list(self, dag_id: str, dag_run_id: str) -> TaskInstanceCollectionResponse | ServerResponseError: """List task instances.""" @@ -958,29 +956,21 @@ def clear( self, dag_id: str, body: ClearTaskInstancesBody ) -> TaskInstanceCollectionResponse | ServerResponseError: """Clear task instances.""" - try: - self.response = self.client.post( - f"dags/{dag_id}/clearTaskInstances", - json=body.model_dump(mode="json", exclude_unset=True), - ) - return TaskInstanceCollectionResponse.model_validate_json(self.response.content) - except ServerResponseError as e: - raise e + self.response = self.client.post( + f"dags/{dag_id}/clearTaskInstances", + json=body.model_dump(mode="json", exclude_unset=True), + ) + return TaskInstanceCollectionResponse.model_validate_json(self.response.content) - def update( - self, dag_id: str, dag_run_id: str, task_id: str, body: PatchTaskInstanceBody - ) -> Any: + def update(self, dag_id: str, dag_run_id: str, task_id: str, body: PatchTaskInstanceBody) -> Any: """Update a task instance.""" - try: - self.response = self.client.patch( - f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}", - json=body.model_dump(mode="json", exclude_unset=True), - ) - data = self.response.json() - if isinstance(data, list): - return [TaskInstanceResponse.model_validate(item) for item in data] - if "task_instances" in data: - return TaskInstanceCollectionResponse.model_validate(data) - return TaskInstanceResponse.model_validate(data) - except ServerResponseError as e: - raise e + self.response = self.client.patch( + f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}", + json=body.model_dump(mode="json", exclude_unset=True), + ) + data = self.response.json() + if isinstance(data, list): + return [TaskInstanceResponse.model_validate(item) for item in data] + if "task_instances" in data: + return TaskInstanceCollectionResponse.model_validate(data) + return TaskInstanceResponse.model_validate(data) diff --git a/airflow-ctl/src/airflowctl/ctl/cli_config.py b/airflow-ctl/src/airflowctl/ctl/cli_config.py index 5733a6700d484..6002805d57c43 100755 --- a/airflow-ctl/src/airflowctl/ctl/cli_config.py +++ b/airflow-ctl/src/airflowctl/ctl/cli_config.py @@ -722,10 +722,15 @@ def _get_func(args: Namespace, api_operation: dict, api_client: Client = NEW_API datamodel_param_name = parameter_key if expanded_parameter in self.excluded_parameters: continue - if expanded_parameter in args_dict.keys() and args_dict[expanded_parameter] is not None: + if ( + expanded_parameter in args_dict.keys() + and args_dict[expanded_parameter] is not None + ): val = args_dict[expanded_parameter] # Automatically convert comma-separated strings to lists if the field expects a list - field_annotation = str(datamodel.model_fields[expanded_parameter].annotation).lower() + field_annotation = str( + datamodel.model_fields[expanded_parameter].annotation + ).lower() if "list" in field_annotation and isinstance(val, str): val = [v.strip() for v in val.split(",") if v.strip()] diff --git a/airflow-ctl/tests/airflow_ctl/api/test_operations.py b/airflow-ctl/tests/airflow_ctl/api/test_operations.py index 7603bc6e30c70..5dffd5dc13c73 100644 --- a/airflow-ctl/tests/airflow_ctl/api/test_operations.py +++ b/airflow-ctl/tests/airflow_ctl/api/test_operations.py @@ -1962,6 +1962,7 @@ def handle_request(request: httpx.Request) -> httpx.Response: assert response == self.plugin_import_error_collection_response + class TestTaskInstanceOperations: """Test suite for Task Instance operations.""" diff --git a/airflow-ctl/tests/airflow_ctl/ctl/commands/test_pool_command.py b/airflow-ctl/tests/airflow_ctl/ctl/commands/test_pool_command.py index 153297a5dad54..0716991432b0e 100644 --- a/airflow-ctl/tests/airflow_ctl/ctl/commands/test_pool_command.py +++ b/airflow-ctl/tests/airflow_ctl/ctl/commands/test_pool_command.py @@ -174,7 +174,6 @@ def test_export_json_to_file(self, mock_client, tmp_path, capsys): # Verify output message captured = capsys.readouterr() - expected_output = f"Exported {len(exported_data)} pool(s) to {export_file}" out_str = captured.out.replace("\n", "") # The output contains rich ANSI codes, so we check for key substrings instead assert "Exported" in out_str From eb51ad6551b6c5faa7918a4273860a3daa3a75f6 Mon Sep 17 00:00:00 2001 From: Suraj Date: Wed, 8 Apr 2026 00:28:38 +0530 Subject: [PATCH 03/17] Fix PR review comments on Task Instance operations --- airflow-ctl/src/airflowctl/api/client.py | 6 ++++ airflow-ctl/src/airflowctl/api/operations.py | 26 ++++++++------ airflow-ctl/src/airflowctl/ctl/cli_config.py | 13 ++++--- .../tests/airflow_ctl/api/test_operations.py | 34 +++++++++++++++++++ .../ctl/commands/test_pool_command.py | 10 +++--- 5 files changed, 66 insertions(+), 23 deletions(-) diff --git a/airflow-ctl/src/airflowctl/api/client.py b/airflow-ctl/src/airflowctl/api/client.py index e2877a444d11d..f9a2e0c427fe7 100644 --- a/airflow-ctl/src/airflowctl/api/client.py +++ b/airflow-ctl/src/airflowctl/api/client.py @@ -474,6 +474,12 @@ def plugins(self): """Operations related to plugins.""" return PluginsOperations(self) + @lru_cache() # type: ignore[prop-decorator] + @property + def task_instances(self): + """Operations related to task instances.""" + return TaskInstanceOperations(self) + # API Client Decorator for CLI Actions @contextlib.contextmanager diff --git a/airflow-ctl/src/airflowctl/api/operations.py b/airflow-ctl/src/airflowctl/api/operations.py index faf14ed9fe5f1..4e0ebe1a0d64a 100644 --- a/airflow-ctl/src/airflowctl/api/operations.py +++ b/airflow-ctl/src/airflowctl/api/operations.py @@ -935,16 +935,23 @@ def list_import_errors(self) -> PluginImportErrorCollectionResponse | ServerResp class TaskInstanceOperations(BaseOperations): """Task instance operations.""" - def get(self, dag_id: str, dag_run_id: str, task_id: str) -> Any: - """Get a task instance.""" - self.response = self.client.get(f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}") - data = self.response.json() + def _parse_task_instance_response( + self, data: dict | list + ) -> TaskInstanceResponse | list[TaskInstanceResponse] | TaskInstanceCollectionResponse: + """Parse task instance response data into appropriate models.""" if isinstance(data, list): return [TaskInstanceResponse.model_validate(item) for item in data] if "task_instances" in data: return TaskInstanceCollectionResponse.model_validate(data) return TaskInstanceResponse.model_validate(data) + def get( + self, dag_id: str, dag_run_id: str, task_id: str + ) -> TaskInstanceResponse | list[TaskInstanceResponse] | TaskInstanceCollectionResponse: + """Get a task instance.""" + self.response = self.client.get(f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}") + return self._parse_task_instance_response(self.response.json()) + def list(self, dag_id: str, dag_run_id: str) -> TaskInstanceCollectionResponse | ServerResponseError: """List task instances.""" return super().execute_list( @@ -962,15 +969,12 @@ def clear( ) return TaskInstanceCollectionResponse.model_validate_json(self.response.content) - def update(self, dag_id: str, dag_run_id: str, task_id: str, body: PatchTaskInstanceBody) -> Any: + def update( + self, dag_id: str, dag_run_id: str, task_id: str, body: PatchTaskInstanceBody + ) -> TaskInstanceResponse | list[TaskInstanceResponse] | TaskInstanceCollectionResponse: """Update a task instance.""" self.response = self.client.patch( f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}", json=body.model_dump(mode="json", exclude_unset=True), ) - data = self.response.json() - if isinstance(data, list): - return [TaskInstanceResponse.model_validate(item) for item in data] - if "task_instances" in data: - return TaskInstanceCollectionResponse.model_validate(data) - return TaskInstanceResponse.model_validate(data) + return self._parse_task_instance_response(self.response.json()) diff --git a/airflow-ctl/src/airflowctl/ctl/cli_config.py b/airflow-ctl/src/airflowctl/ctl/cli_config.py index 6002805d57c43..432c2eea5d571 100755 --- a/airflow-ctl/src/airflowctl/ctl/cli_config.py +++ b/airflow-ctl/src/airflowctl/ctl/cli_config.py @@ -727,13 +727,12 @@ def _get_func(args: Namespace, api_operation: dict, api_client: Client = NEW_API and args_dict[expanded_parameter] is not None ): val = args_dict[expanded_parameter] - # Automatically convert comma-separated strings to lists if the field expects a list - field_annotation = str( - datamodel.model_fields[expanded_parameter].annotation - ).lower() - if "list" in field_annotation and isinstance(val, str): - val = [v.strip() for v in val.split(",") if v.strip()] - + if isinstance(val, str) and expanded_parameter in datamodel.model_fields: + import typing + annotation = datamodel.model_fields[expanded_parameter].annotation + origin = typing.get_origin(annotation) + if origin is list or getattr(annotation, "__origin__", None) is list: + val = [v.strip() for v in val.split(",") if v.strip()] method_params[parameter_key][ self._sanitize_method_param_key(expanded_parameter) ] = val diff --git a/airflow-ctl/tests/airflow_ctl/api/test_operations.py b/airflow-ctl/tests/airflow_ctl/api/test_operations.py index 5dffd5dc13c73..4d5396d8ec24f 100644 --- a/airflow-ctl/tests/airflow_ctl/api/test_operations.py +++ b/airflow-ctl/tests/airflow_ctl/api/test_operations.py @@ -2019,6 +2019,40 @@ def handle_request(request: httpx.Request) -> httpx.Response: ) assert response == self.task_instance_response + def test_get_list(self): + """Test fetching a task instance that returns a list (e.g. mapped tasks).""" + + def handle_request(request: httpx.Request) -> httpx.Response: + assert request.url.path == ( + f"/api/v2/dags/{self.dag_id}/dagRuns/{self.dag_run_id}/taskInstances/{self.task_id}" + ) + return httpx.Response(200, json=[json.loads(self.task_instance_response.model_dump_json())]) + + client = make_api_client(transport=httpx.MockTransport(handle_request)) + response = client.task_instances.get( + dag_id=self.dag_id, + dag_run_id=self.dag_run_id, + task_id=self.task_id, + ) + assert response == [self.task_instance_response] + + def test_get_collection(self): + """Test fetching a task instance that returns a collection.""" + + def handle_request(request: httpx.Request) -> httpx.Response: + assert request.url.path == ( + f"/api/v2/dags/{self.dag_id}/dagRuns/{self.dag_run_id}/taskInstances/{self.task_id}" + ) + return httpx.Response(200, json=json.loads(self.task_instance_collection_response.model_dump_json())) + + client = make_api_client(transport=httpx.MockTransport(handle_request)) + response = client.task_instances.get( + dag_id=self.dag_id, + dag_run_id=self.dag_run_id, + task_id=self.task_id, + ) + assert response == self.task_instance_collection_response + def test_list(self): """Test listing task instances for a DAG run.""" diff --git a/airflow-ctl/tests/airflow_ctl/ctl/commands/test_pool_command.py b/airflow-ctl/tests/airflow_ctl/ctl/commands/test_pool_command.py index 0716991432b0e..bbd5d57d7cfa2 100644 --- a/airflow-ctl/tests/airflow_ctl/ctl/commands/test_pool_command.py +++ b/airflow-ctl/tests/airflow_ctl/ctl/commands/test_pool_command.py @@ -174,11 +174,11 @@ def test_export_json_to_file(self, mock_client, tmp_path, capsys): # Verify output message captured = capsys.readouterr() - out_str = captured.out.replace("\n", "") - # The output contains rich ANSI codes, so we check for key substrings instead - assert "Exported" in out_str - assert str(len(exported_data)) in out_str - assert "pool" in out_str + import re + + ansi_escape = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])") + out_str = ansi_escape.sub("", captured.out).strip() + assert out_str == f"Exported {len(exported_data)} pool(s) to {export_file}" @pytest.mark.parametrize("output_format", ["table", "yaml", "plain"]) def test_export_non_json_uses_airflow_console(self, mock_client, tmp_path, output_format): From f265e400a2614477e400b1906c941e24c52efe86 Mon Sep 17 00:00:00 2001 From: Suraj Date: Wed, 8 Apr 2026 00:36:26 +0530 Subject: [PATCH 04/17] Fix test assertion and client property usage --- airflow-ctl/src/airflowctl/ctl/cli_config.py | 2 ++ .../tests/airflow_ctl/api/test_operations.py | 16 +++++++++------- .../ctl/commands/test_pool_command.py | 3 ++- 3 files changed, 13 insertions(+), 8 deletions(-) diff --git a/airflow-ctl/src/airflowctl/ctl/cli_config.py b/airflow-ctl/src/airflowctl/ctl/cli_config.py index 432c2eea5d571..b16c25d5ca955 100755 --- a/airflow-ctl/src/airflowctl/ctl/cli_config.py +++ b/airflow-ctl/src/airflowctl/ctl/cli_config.py @@ -413,6 +413,7 @@ def __init__(self, file_path: str | Path | None = None): "_check_flag_and_exit_if_server_response_error", # Excluding bulk operation. Out of scope for CLI. Should use implemented commands. "bulk", + "_parse_task_instance_response", ] self.excluded_output_keys = [ "total_entries", @@ -729,6 +730,7 @@ def _get_func(args: Namespace, api_operation: dict, api_client: Client = NEW_API val = args_dict[expanded_parameter] if isinstance(val, str) and expanded_parameter in datamodel.model_fields: import typing + annotation = datamodel.model_fields[expanded_parameter].annotation origin = typing.get_origin(annotation) if origin is list or getattr(annotation, "__origin__", None) is list: diff --git a/airflow-ctl/tests/airflow_ctl/api/test_operations.py b/airflow-ctl/tests/airflow_ctl/api/test_operations.py index 4d5396d8ec24f..247a71471b5bb 100644 --- a/airflow-ctl/tests/airflow_ctl/api/test_operations.py +++ b/airflow-ctl/tests/airflow_ctl/api/test_operations.py @@ -2012,7 +2012,7 @@ def handle_request(request: httpx.Request) -> httpx.Response: return httpx.Response(200, json=json.loads(self.task_instance_response.model_dump_json())) client = make_api_client(transport=httpx.MockTransport(handle_request)) - response = client.task_instances.get( + response = client.tasks.get( dag_id=self.dag_id, dag_run_id=self.dag_run_id, task_id=self.task_id, @@ -2029,7 +2029,7 @@ def handle_request(request: httpx.Request) -> httpx.Response: return httpx.Response(200, json=[json.loads(self.task_instance_response.model_dump_json())]) client = make_api_client(transport=httpx.MockTransport(handle_request)) - response = client.task_instances.get( + response = client.tasks.get( dag_id=self.dag_id, dag_run_id=self.dag_run_id, task_id=self.task_id, @@ -2043,10 +2043,12 @@ def handle_request(request: httpx.Request) -> httpx.Response: assert request.url.path == ( f"/api/v2/dags/{self.dag_id}/dagRuns/{self.dag_run_id}/taskInstances/{self.task_id}" ) - return httpx.Response(200, json=json.loads(self.task_instance_collection_response.model_dump_json())) + return httpx.Response( + 200, json=json.loads(self.task_instance_collection_response.model_dump_json()) + ) client = make_api_client(transport=httpx.MockTransport(handle_request)) - response = client.task_instances.get( + response = client.tasks.get( dag_id=self.dag_id, dag_run_id=self.dag_run_id, task_id=self.task_id, @@ -2063,7 +2065,7 @@ def handle_request(request: httpx.Request) -> httpx.Response: ) client = make_api_client(transport=httpx.MockTransport(handle_request)) - response = client.task_instances.list( + response = client.tasks.list( dag_id=self.dag_id, dag_run_id=self.dag_run_id, ) @@ -2082,7 +2084,7 @@ def handle_request(request: httpx.Request) -> httpx.Response: client = make_api_client(transport=httpx.MockTransport(handle_request)) body = ClearTaskInstancesBody(dry_run=True) - response = client.task_instances.clear( + response = client.tasks.clear( dag_id=self.dag_id, body=body, ) @@ -2109,7 +2111,7 @@ def handle_request(request: httpx.Request) -> httpx.Response: task_ids=[self.task_id], dag_run_id=self.dag_run_id, ) - response = client.task_instances.clear( + response = client.tasks.clear( dag_id=self.dag_id, body=body, ) diff --git a/airflow-ctl/tests/airflow_ctl/ctl/commands/test_pool_command.py b/airflow-ctl/tests/airflow_ctl/ctl/commands/test_pool_command.py index bbd5d57d7cfa2..edf693b0025b6 100644 --- a/airflow-ctl/tests/airflow_ctl/ctl/commands/test_pool_command.py +++ b/airflow-ctl/tests/airflow_ctl/ctl/commands/test_pool_command.py @@ -177,7 +177,8 @@ def test_export_json_to_file(self, mock_client, tmp_path, capsys): import re ansi_escape = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])") - out_str = ansi_escape.sub("", captured.out).strip() + out_str = ansi_escape.sub("", captured.out).replace("\n", "") + # Since rich wraps long lines, newlines are removed, so we assert the combined text. assert out_str == f"Exported {len(exported_data)} pool(s) to {export_file}" @pytest.mark.parametrize("output_format", ["table", "yaml", "plain"]) From f9d7127a1aae56c8a0042feceffd1137a6284fee Mon Sep 17 00:00:00 2001 From: Suraj Date: Wed, 6 May 2026 15:38:59 +0530 Subject: [PATCH 05/17] Add task instance CLI commands (get, list, clear, update) to airflowctl --- airflow-ctl/src/airflowctl/ctl/cli_config.py | 34 ++++++++++++++++--- .../tests/airflow_ctl/api/test_operations.py | 12 +++---- .../ctl/commands/test_pool_command.py | 3 +- 3 files changed, 36 insertions(+), 13 deletions(-) diff --git a/airflow-ctl/src/airflowctl/ctl/cli_config.py b/airflow-ctl/src/airflowctl/ctl/cli_config.py index b16c25d5ca955..fb560039e5dd7 100755 --- a/airflow-ctl/src/airflowctl/ctl/cli_config.py +++ b/airflow-ctl/src/airflowctl/ctl/cli_config.py @@ -26,6 +26,7 @@ import inspect import os import sys +import typing from argparse import Namespace from collections.abc import Callable, Iterable from enum import Enum @@ -52,6 +53,31 @@ BUILD_DOCS = "BUILDING_AIRFLOW_DOCS" in os.environ +def _is_list_annotation(annotation: Any) -> bool: + """ + Check whether a Pydantic field annotation is a list type. + + Handles ``Annotated[list[...] | None, ...]`` and similar wrapped forms + that ``typing.get_origin`` alone cannot detect. + """ + origin = typing.get_origin(annotation) + + # Direct list[...] + if origin is list: + return True + + # Unwrap Annotated[X, ...] + if origin is typing.Annotated: + inner = typing.get_args(annotation)[0] + return _is_list_annotation(inner) + + # Unwrap Union / X | None + if origin is typing.Union: + return any(_is_list_annotation(arg) for arg in typing.get_args(annotation) if arg is not type(None)) + + return False + + def lazy_load_command(import_path: str) -> Callable: """Create a lazy loader for command.""" _, _, name = import_path.rpartition(".") @@ -729,11 +755,9 @@ def _get_func(args: Namespace, api_operation: dict, api_client: Client = NEW_API ): val = args_dict[expanded_parameter] if isinstance(val, str) and expanded_parameter in datamodel.model_fields: - import typing - - annotation = datamodel.model_fields[expanded_parameter].annotation - origin = typing.get_origin(annotation) - if origin is list or getattr(annotation, "__origin__", None) is list: + if _is_list_annotation( + datamodel.model_fields[expanded_parameter].annotation + ): val = [v.strip() for v in val.split(",") if v.strip()] method_params[parameter_key][ self._sanitize_method_param_key(expanded_parameter) diff --git a/airflow-ctl/tests/airflow_ctl/api/test_operations.py b/airflow-ctl/tests/airflow_ctl/api/test_operations.py index 247a71471b5bb..52faceab26014 100644 --- a/airflow-ctl/tests/airflow_ctl/api/test_operations.py +++ b/airflow-ctl/tests/airflow_ctl/api/test_operations.py @@ -2012,7 +2012,7 @@ def handle_request(request: httpx.Request) -> httpx.Response: return httpx.Response(200, json=json.loads(self.task_instance_response.model_dump_json())) client = make_api_client(transport=httpx.MockTransport(handle_request)) - response = client.tasks.get( + response = client.task_instances.get( dag_id=self.dag_id, dag_run_id=self.dag_run_id, task_id=self.task_id, @@ -2029,7 +2029,7 @@ def handle_request(request: httpx.Request) -> httpx.Response: return httpx.Response(200, json=[json.loads(self.task_instance_response.model_dump_json())]) client = make_api_client(transport=httpx.MockTransport(handle_request)) - response = client.tasks.get( + response = client.task_instances.get( dag_id=self.dag_id, dag_run_id=self.dag_run_id, task_id=self.task_id, @@ -2048,7 +2048,7 @@ def handle_request(request: httpx.Request) -> httpx.Response: ) client = make_api_client(transport=httpx.MockTransport(handle_request)) - response = client.tasks.get( + response = client.task_instances.get( dag_id=self.dag_id, dag_run_id=self.dag_run_id, task_id=self.task_id, @@ -2065,7 +2065,7 @@ def handle_request(request: httpx.Request) -> httpx.Response: ) client = make_api_client(transport=httpx.MockTransport(handle_request)) - response = client.tasks.list( + response = client.task_instances.list( dag_id=self.dag_id, dag_run_id=self.dag_run_id, ) @@ -2084,7 +2084,7 @@ def handle_request(request: httpx.Request) -> httpx.Response: client = make_api_client(transport=httpx.MockTransport(handle_request)) body = ClearTaskInstancesBody(dry_run=True) - response = client.tasks.clear( + response = client.task_instances.clear( dag_id=self.dag_id, body=body, ) @@ -2111,7 +2111,7 @@ def handle_request(request: httpx.Request) -> httpx.Response: task_ids=[self.task_id], dag_run_id=self.dag_run_id, ) - response = client.tasks.clear( + response = client.task_instances.clear( dag_id=self.dag_id, body=body, ) diff --git a/airflow-ctl/tests/airflow_ctl/ctl/commands/test_pool_command.py b/airflow-ctl/tests/airflow_ctl/ctl/commands/test_pool_command.py index edf693b0025b6..115a53afd5304 100644 --- a/airflow-ctl/tests/airflow_ctl/ctl/commands/test_pool_command.py +++ b/airflow-ctl/tests/airflow_ctl/ctl/commands/test_pool_command.py @@ -19,6 +19,7 @@ from __future__ import annotations import json +import re from unittest import mock import pytest @@ -174,8 +175,6 @@ def test_export_json_to_file(self, mock_client, tmp_path, capsys): # Verify output message captured = capsys.readouterr() - import re - ansi_escape = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])") out_str = ansi_escape.sub("", captured.out).replace("\n", "") # Since rich wraps long lines, newlines are removed, so we assert the combined text. From f0627808de08c4c338a7a9cbab330a6a56be7c2d Mon Sep 17 00:00:00 2001 From: Suraj Date: Sun, 10 May 2026 17:23:42 +0530 Subject: [PATCH 06/17] fix: mypy ci failure error added _list = list --- airflow-ctl/src/airflowctl/api/operations.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/airflow-ctl/src/airflowctl/api/operations.py b/airflow-ctl/src/airflowctl/api/operations.py index 4e0ebe1a0d64a..a2d1ebb573551 100644 --- a/airflow-ctl/src/airflowctl/api/operations.py +++ b/airflow-ctl/src/airflowctl/api/operations.py @@ -91,6 +91,10 @@ T = TypeVar("T", bound=BaseModel) +# Type alias used inside classes that define a ``list()`` method, which +# shadows the builtin ``list`` and confuses mypy when used in annotations. +_list = list + # Generic Server Response Error class ServerResponseError(httpx.HTTPStatusError): @@ -936,8 +940,8 @@ class TaskInstanceOperations(BaseOperations): """Task instance operations.""" def _parse_task_instance_response( - self, data: dict | list - ) -> TaskInstanceResponse | list[TaskInstanceResponse] | TaskInstanceCollectionResponse: + self, data: dict | _list + ) -> TaskInstanceResponse | _list[TaskInstanceResponse] | TaskInstanceCollectionResponse: """Parse task instance response data into appropriate models.""" if isinstance(data, list): return [TaskInstanceResponse.model_validate(item) for item in data] @@ -947,7 +951,7 @@ def _parse_task_instance_response( def get( self, dag_id: str, dag_run_id: str, task_id: str - ) -> TaskInstanceResponse | list[TaskInstanceResponse] | TaskInstanceCollectionResponse: + ) -> TaskInstanceResponse | _list[TaskInstanceResponse] | TaskInstanceCollectionResponse: """Get a task instance.""" self.response = self.client.get(f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}") return self._parse_task_instance_response(self.response.json()) @@ -971,7 +975,7 @@ def clear( def update( self, dag_id: str, dag_run_id: str, task_id: str, body: PatchTaskInstanceBody - ) -> TaskInstanceResponse | list[TaskInstanceResponse] | TaskInstanceCollectionResponse: + ) -> TaskInstanceResponse | _list[TaskInstanceResponse] | TaskInstanceCollectionResponse: """Update a task instance.""" self.response = self.client.patch( f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}", From dcaa4108667562e19b70478a9aa9191dc4b34fda Mon Sep 17 00:00:00 2001 From: Suraj Date: Wed, 3 Jun 2026 16:11:01 +0530 Subject: [PATCH 07/17] fix: correct list field CLI parsing, taskinstance update output, and task_ids validation --- airflow-ctl/src/airflowctl/api/operations.py | 42 ++++++--------- airflow-ctl/src/airflowctl/ctl/cli_config.py | 54 ++++++++++++------- .../tests/airflow_ctl/api/test_operations.py | 36 ------------- 3 files changed, 52 insertions(+), 80 deletions(-) diff --git a/airflow-ctl/src/airflowctl/api/operations.py b/airflow-ctl/src/airflowctl/api/operations.py index a2d1ebb573551..0c12d9904b69b 100644 --- a/airflow-ctl/src/airflowctl/api/operations.py +++ b/airflow-ctl/src/airflowctl/api/operations.py @@ -91,10 +91,6 @@ T = TypeVar("T", bound=BaseModel) -# Type alias used inside classes that define a ``list()`` method, which -# shadows the builtin ``list`` and confuses mypy when used in annotations. -_list = list - # Generic Server Response Error class ServerResponseError(httpx.HTTPStatusError): @@ -939,22 +935,10 @@ def list_import_errors(self) -> PluginImportErrorCollectionResponse | ServerResp class TaskInstanceOperations(BaseOperations): """Task instance operations.""" - def _parse_task_instance_response( - self, data: dict | _list - ) -> TaskInstanceResponse | _list[TaskInstanceResponse] | TaskInstanceCollectionResponse: - """Parse task instance response data into appropriate models.""" - if isinstance(data, list): - return [TaskInstanceResponse.model_validate(item) for item in data] - if "task_instances" in data: - return TaskInstanceCollectionResponse.model_validate(data) - return TaskInstanceResponse.model_validate(data) - - def get( - self, dag_id: str, dag_run_id: str, task_id: str - ) -> TaskInstanceResponse | _list[TaskInstanceResponse] | TaskInstanceCollectionResponse: + def get(self, dag_id: str, dag_run_id: str, task_id: str) -> TaskInstanceResponse: """Get a task instance.""" self.response = self.client.get(f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}") - return self._parse_task_instance_response(self.response.json()) + return TaskInstanceResponse.model_validate_json(self.response.content) def list(self, dag_id: str, dag_run_id: str) -> TaskInstanceCollectionResponse | ServerResponseError: """List task instances.""" @@ -974,11 +958,17 @@ def clear( return TaskInstanceCollectionResponse.model_validate_json(self.response.content) def update( - self, dag_id: str, dag_run_id: str, task_id: str, body: PatchTaskInstanceBody - ) -> TaskInstanceResponse | _list[TaskInstanceResponse] | TaskInstanceCollectionResponse: - """Update a task instance.""" - self.response = self.client.patch( - f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}", - json=body.model_dump(mode="json", exclude_unset=True), - ) - return self._parse_task_instance_response(self.response.json()) + self, + dag_id: str, + dag_run_id: str, + task_id: str, + body: PatchTaskInstanceBody, + map_index: int | None = None, + ) -> TaskInstanceCollectionResponse: + """Update task instance state. When map_index is given, only that mapped instance is affected.""" + if map_index is not None: + path = f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}" + else: + path = f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}" + self.response = self.client.patch(path, json=body.model_dump(mode="json", exclude_unset=True)) + return TaskInstanceCollectionResponse.model_validate_json(self.response.content) diff --git a/airflow-ctl/src/airflowctl/ctl/cli_config.py b/airflow-ctl/src/airflowctl/ctl/cli_config.py index fb560039e5dd7..4b84d6f88c1da 100755 --- a/airflow-ctl/src/airflowctl/ctl/cli_config.py +++ b/airflow-ctl/src/airflowctl/ctl/cli_config.py @@ -26,6 +26,7 @@ import inspect import os import sys +import types as builtin_types import typing from argparse import Namespace from collections.abc import Callable, Iterable @@ -54,28 +55,40 @@ def _is_list_annotation(annotation: Any) -> bool: - """ - Check whether a Pydantic field annotation is a list type. - - Handles ``Annotated[list[...] | None, ...]`` and similar wrapped forms - that ``typing.get_origin`` alone cannot detect. - """ + """Check whether a Pydantic field annotation is a list type (including Optional[list[...]]).""" origin = typing.get_origin(annotation) - - # Direct list[...] if origin is list: return True + # Handle both typing.Union (Optional[list[...]]) and PEP-604 X | Y (types.UnionType) + if origin is typing.Union or isinstance(annotation, builtin_types.UnionType): + return any(_is_list_annotation(arg) for arg in typing.get_args(annotation) if arg is not type(None)) + return False - # Unwrap Annotated[X, ...] - if origin is typing.Annotated: - inner = typing.get_args(annotation)[0] - return _is_list_annotation(inner) - # Unwrap Union / X | None - if origin is typing.Union: - return any(_is_list_annotation(arg) for arg in typing.get_args(annotation) if arg is not type(None)) +def _parse_task_ids_cli_arg(value: str) -> list: + """ + Parse the --task-ids CLI string into the format expected by ClearTaskInstancesBody. - return False + Accepts comma-separated entries of two forms: + - ``task_id`` — clears all map indices of that task + - ``task_id:map_index`` — clears only the specific mapped instance + + Example: ``"extract:0,transform"`` → ``[TaskIds(root=["extract", 0]), "transform"]`` + """ + entries: list = [] + for entry in (v.strip() for v in value.split(",") if v.strip()): + if ":" in entry: + task_id, _, map_index_str = entry.rpartition(":") + try: + entries.append(generated_datamodels.TaskIds(root=[task_id, int(map_index_str)])) + except ValueError: + raise ValueError( + f"Invalid --task-ids entry '{entry}': " + f"expected 'task_id:map_index' where map_index is an integer." + ) from None + else: + entries.append(entry) + return entries def lazy_load_command(import_path: str) -> Callable: @@ -439,10 +452,11 @@ def __init__(self, file_path: str | Path | None = None): "_check_flag_and_exit_if_server_response_error", # Excluding bulk operation. Out of scope for CLI. Should use implemented commands. "bulk", - "_parse_task_instance_response", ] self.excluded_output_keys = [ "total_entries", + "next_cursor", + "previous_cursor", ] def _inspect_operations(self) -> None: @@ -755,7 +769,11 @@ def _get_func(args: Namespace, api_operation: dict, api_client: Client = NEW_API ): val = args_dict[expanded_parameter] if isinstance(val, str) and expanded_parameter in datamodel.model_fields: - if _is_list_annotation( + if expanded_parameter == "task_ids": + # task_ids supports nested [task_id, map_index] pairs; + # use the dedicated parser to preserve that structure. + val = _parse_task_ids_cli_arg(val) + elif _is_list_annotation( datamodel.model_fields[expanded_parameter].annotation ): val = [v.strip() for v in val.split(",") if v.strip()] diff --git a/airflow-ctl/tests/airflow_ctl/api/test_operations.py b/airflow-ctl/tests/airflow_ctl/api/test_operations.py index 52faceab26014..5dffd5dc13c73 100644 --- a/airflow-ctl/tests/airflow_ctl/api/test_operations.py +++ b/airflow-ctl/tests/airflow_ctl/api/test_operations.py @@ -2019,42 +2019,6 @@ def handle_request(request: httpx.Request) -> httpx.Response: ) assert response == self.task_instance_response - def test_get_list(self): - """Test fetching a task instance that returns a list (e.g. mapped tasks).""" - - def handle_request(request: httpx.Request) -> httpx.Response: - assert request.url.path == ( - f"/api/v2/dags/{self.dag_id}/dagRuns/{self.dag_run_id}/taskInstances/{self.task_id}" - ) - return httpx.Response(200, json=[json.loads(self.task_instance_response.model_dump_json())]) - - client = make_api_client(transport=httpx.MockTransport(handle_request)) - response = client.task_instances.get( - dag_id=self.dag_id, - dag_run_id=self.dag_run_id, - task_id=self.task_id, - ) - assert response == [self.task_instance_response] - - def test_get_collection(self): - """Test fetching a task instance that returns a collection.""" - - def handle_request(request: httpx.Request) -> httpx.Response: - assert request.url.path == ( - f"/api/v2/dags/{self.dag_id}/dagRuns/{self.dag_run_id}/taskInstances/{self.task_id}" - ) - return httpx.Response( - 200, json=json.loads(self.task_instance_collection_response.model_dump_json()) - ) - - client = make_api_client(transport=httpx.MockTransport(handle_request)) - response = client.task_instances.get( - dag_id=self.dag_id, - dag_run_id=self.dag_run_id, - task_id=self.task_id, - ) - assert response == self.task_instance_collection_response - def test_list(self): """Test listing task instances for a DAG run.""" From 2a18fa3dc0406fc989056431a6d34ad2e19fc3d0 Mon Sep 17 00:00:00 2001 From: Suraj Date: Wed, 3 Jun 2026 17:08:06 +0530 Subject: [PATCH 08/17] add taskinstance help_texts --- airflow-ctl/docs/images/command_hashes.txt | 4 +- airflow-ctl/docs/images/output_main.svg | 134 +++++++++--------- .../src/airflowctl/ctl/help_texts.yaml | 6 + 3 files changed, 77 insertions(+), 67 deletions(-) diff --git a/airflow-ctl/docs/images/command_hashes.txt b/airflow-ctl/docs/images/command_hashes.txt index ea07d9fd1c8c7..352b3c3007ed6 100644 --- a/airflow-ctl/docs/images/command_hashes.txt +++ b/airflow-ctl/docs/images/command_hashes.txt @@ -1,5 +1,5 @@ -main:df0fbf2487ad50774d706a96d76f5c70 -assets:b3ae2b933e54528bf486ff28e887804d +main:398879e5076319e2328e5cfbdd9c6bc4 +assets:70619a2d92bda80930cde2aefcd8e1cd auth:d79e9c7d00c432bdbcbc2a86e2e32053 backfill:74c8737b0a62a86ed3605fa9e6165874 config:a3d936cb15fe3b547bf6c82cf93d923f diff --git a/airflow-ctl/docs/images/output_main.svg b/airflow-ctl/docs/images/output_main.svg index f586877bce8eb..84233ca715d16 100644 --- a/airflow-ctl/docs/images/output_main.svg +++ b/airflow-ctl/docs/images/output_main.svg @@ -1,4 +1,4 @@ - + - - + + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + + + + - + - + - - Usage:airflowctl [-hGROUP_OR_COMMAND... - -Positional Arguments: -GROUP_OR_COMMAND - -    Groups -assetsPerform Assets operations -authManage authentication for CLI. Either pass token from -environment variable/parameter or pass username and -password. -backfillPerform Backfill operations -configPerform Config operations -connectionsPerform Connections operations -dagrunPerform DagRun operations -dagsPerform Dags operations -jobsPerform Jobs operations -pluginsPerform Plugins operations -poolsPerform Pools operations -providersPerform Providers operations -variablesPerform Variables operations -xcomPerform XCom operations - -    Commands: -versionShow version information - -Options: --h--helpshow this help message and exit + + Usage:airflowctl [-hGROUP_OR_COMMAND... + +Positional Arguments: +GROUP_OR_COMMAND + +    Groups +assetsPerform Assets operations +authManage authentication for CLI. Either pass token from +environment variable/parameter or pass username and +password. +backfillPerform Backfill operations +configPerform Config operations +connectionsPerform Connections operations +dagrunPerform DagRun operations +dagsPerform Dags operations +jobsPerform Jobs operations +pluginsPerform Plugins operations +poolsPerform Pools operations +providersPerform Providers operations +taskinstancePerform TaskInstance operations +variablesPerform Variables operations +xcomPerform XCom operations + +    Commands: +versionShow version information + +Options: +-h--helpshow this help message and exit diff --git a/airflow-ctl/src/airflowctl/ctl/help_texts.yaml b/airflow-ctl/src/airflowctl/ctl/help_texts.yaml index 2e3266fc20561..cefce60c1d4f1 100644 --- a/airflow-ctl/src/airflowctl/ctl/help_texts.yaml +++ b/airflow-ctl/src/airflowctl/ctl/help_texts.yaml @@ -100,3 +100,9 @@ xcom: plugins: list: "List all installed Airflow plugins" list-import-errors: "List all plugin import errors" + +taskinstance: + get: "Retrieve a task instance by Dag ID, run ID, and task ID" + list: "List all task instances for a given Dag run" + clear: "Clear task instance state, optionally filtering by task IDs or run scope" + update: "Update task instance state or note; use --map-index to target a single mapped instance" From d80a10e999826f7d04c7ceee312e84ef14acbab8 Mon Sep 17 00:00:00 2001 From: Suraj Date: Thu, 4 Jun 2026 00:25:51 +0530 Subject: [PATCH 09/17] Fix taskinstance and xcom CLI integration test commands to use positional args Required path params (dag_id, dag_run_id, task_id, key, value) are generated as positional CLI arguments by CommandFactory, not optional --flags. Also fix ruff-format: remove extra blank lines before TaskInstanceOperations and TestTaskInstanceOperations class definitions. --- .../test_airflowctl_commands.py | 18 +++++++++--------- airflow-ctl/src/airflowctl/api/operations.py | 1 - .../tests/airflow_ctl/api/test_operations.py | 1 - 3 files changed, 9 insertions(+), 11 deletions(-) diff --git a/airflow-ctl-tests/tests/airflowctl_tests/test_airflowctl_commands.py b/airflow-ctl-tests/tests/airflowctl_tests/test_airflowctl_commands.py index dddaabd9cc571..c95aa99c4fd9c 100644 --- a/airflow-ctl-tests/tests/airflowctl_tests/test_airflowctl_commands.py +++ b/airflow-ctl-tests/tests/airflowctl_tests/test_airflowctl_commands.py @@ -95,16 +95,16 @@ def date_param(): # Dag Run commands "dagrun list --dag-id example_bash_operator --state success --limit=1", # Task Instance commands - 'taskinstance list --dag-id=example_bash_operator --dag-run-id="manual__{date_param}"', - 'taskinstance get --dag-id=example_bash_operator --dag-run-id="manual__{date_param}" --task-id=runme_0', - "taskinstance clear --dag-id=example_bash_operator --dry-run", - 'taskinstance update --dag-id=example_bash_operator --dag-run-id="manual__{date_param}" --task-id=runme_0 --new-state=success', + 'taskinstance list example_bash_operator "manual__{date_param}"', + 'taskinstance get example_bash_operator "manual__{date_param}" runme_0', + "taskinstance clear example_bash_operator --dry-run", + 'taskinstance update example_bash_operator "manual__{date_param}" runme_0 --new-state=success', # XCom commands - need a DAG run with completed tasks - 'xcom add --dag-id=example_bash_operator --dag-run-id="manual__{date_param}" --task-id=runme_0 --key={xcom_key} --value=\'{{"test": "value"}}\'', - 'xcom get --dag-id=example_bash_operator --dag-run-id="manual__{date_param}" --task-id=runme_0 --key={xcom_key}', - 'xcom list --dag-id=example_bash_operator --dag-run-id="manual__{date_param}" --task-id=runme_0', - 'xcom edit --dag-id=example_bash_operator --dag-run-id="manual__{date_param}" --task-id=runme_0 --key={xcom_key} --value=\'{{"updated": "value"}}\'', - 'xcom delete --dag-id=example_bash_operator --dag-run-id="manual__{date_param}" --task-id=runme_0 --key={xcom_key}', + 'xcom add example_bash_operator "manual__{date_param}" runme_0 {xcom_key} \'{{"test": "value"}}\'', + 'xcom get example_bash_operator "manual__{date_param}" runme_0 {xcom_key}', + 'xcom list example_bash_operator "manual__{date_param}" runme_0', + 'xcom edit example_bash_operator "manual__{date_param}" runme_0 {xcom_key} \'{{"updated": "value"}}\'', + 'xcom delete example_bash_operator "manual__{date_param}" runme_0 {xcom_key}', # Jobs commands "jobs list", # Pools commands diff --git a/airflow-ctl/src/airflowctl/api/operations.py b/airflow-ctl/src/airflowctl/api/operations.py index 0c12d9904b69b..a126f83c44bc1 100644 --- a/airflow-ctl/src/airflowctl/api/operations.py +++ b/airflow-ctl/src/airflowctl/api/operations.py @@ -931,7 +931,6 @@ def list_import_errors(self) -> PluginImportErrorCollectionResponse | ServerResp raise e - class TaskInstanceOperations(BaseOperations): """Task instance operations.""" diff --git a/airflow-ctl/tests/airflow_ctl/api/test_operations.py b/airflow-ctl/tests/airflow_ctl/api/test_operations.py index 5dffd5dc13c73..7603bc6e30c70 100644 --- a/airflow-ctl/tests/airflow_ctl/api/test_operations.py +++ b/airflow-ctl/tests/airflow_ctl/api/test_operations.py @@ -1962,7 +1962,6 @@ def handle_request(request: httpx.Request) -> httpx.Response: assert response == self.plugin_import_error_collection_response - class TestTaskInstanceOperations: """Test suite for Task Instance operations.""" From 18ab353740390c712f84acbcc43554d7d3e88123 Mon Sep 17 00:00:00 2001 From: Suraj Date: Thu, 4 Jun 2026 00:51:48 +0530 Subject: [PATCH 10/17] Rename TaskInstanceOperations to TasksOperations, align CLI with Airflow core --- .../test_airflowctl_commands.py | 10 +++--- airflow-ctl/src/airflowctl/api/client.py | 8 ++--- airflow-ctl/src/airflowctl/api/operations.py | 4 +-- airflow-ctl/src/airflowctl/ctl/cli_config.py | 32 +------------------ .../src/airflowctl/ctl/help_texts.yaml | 2 +- .../tests/airflow_ctl/api/test_operations.py | 18 +++++------ .../run_capture_airflowctl_help.py | 1 + 7 files changed, 23 insertions(+), 52 deletions(-) diff --git a/airflow-ctl-tests/tests/airflowctl_tests/test_airflowctl_commands.py b/airflow-ctl-tests/tests/airflowctl_tests/test_airflowctl_commands.py index c95aa99c4fd9c..65077ae298b5e 100644 --- a/airflow-ctl-tests/tests/airflowctl_tests/test_airflowctl_commands.py +++ b/airflow-ctl-tests/tests/airflowctl_tests/test_airflowctl_commands.py @@ -94,11 +94,11 @@ def date_param(): "dags update example_bash_operator --no-is-paused", # Dag Run commands "dagrun list --dag-id example_bash_operator --state success --limit=1", - # Task Instance commands - 'taskinstance list example_bash_operator "manual__{date_param}"', - 'taskinstance get example_bash_operator "manual__{date_param}" runme_0', - "taskinstance clear example_bash_operator --dry-run", - 'taskinstance update example_bash_operator "manual__{date_param}" runme_0 --new-state=success', + # Tasks commands + 'tasks list example_bash_operator "manual__{date_param}"', + 'tasks get example_bash_operator "manual__{date_param}" runme_0', + "tasks clear example_bash_operator --dry-run", + 'tasks update example_bash_operator "manual__{date_param}" runme_0 --new-state=success', # XCom commands - need a DAG run with completed tasks 'xcom add example_bash_operator "manual__{date_param}" runme_0 {xcom_key} \'{{"test": "value"}}\'', 'xcom get example_bash_operator "manual__{date_param}" runme_0 {xcom_key}', diff --git a/airflow-ctl/src/airflowctl/api/client.py b/airflow-ctl/src/airflowctl/api/client.py index f9a2e0c427fe7..1cfecb3acf5f0 100644 --- a/airflow-ctl/src/airflowctl/api/client.py +++ b/airflow-ctl/src/airflowctl/api/client.py @@ -57,7 +57,7 @@ PoolsOperations, ProvidersOperations, ServerResponseError, - TaskInstanceOperations, + TasksOperations, VariablesOperations, VersionOperations, XComOperations, @@ -476,9 +476,9 @@ def plugins(self): @lru_cache() # type: ignore[prop-decorator] @property - def task_instances(self): - """Operations related to task instances.""" - return TaskInstanceOperations(self) + def tasks(self): + """Operations related to tasks.""" + return TasksOperations(self) # API Client Decorator for CLI Actions diff --git a/airflow-ctl/src/airflowctl/api/operations.py b/airflow-ctl/src/airflowctl/api/operations.py index a126f83c44bc1..f43cac6d04f88 100644 --- a/airflow-ctl/src/airflowctl/api/operations.py +++ b/airflow-ctl/src/airflowctl/api/operations.py @@ -931,8 +931,8 @@ def list_import_errors(self) -> PluginImportErrorCollectionResponse | ServerResp raise e -class TaskInstanceOperations(BaseOperations): - """Task instance operations.""" +class TasksOperations(BaseOperations): + """Tasks operations.""" def get(self, dag_id: str, dag_run_id: str, task_id: str) -> TaskInstanceResponse: """Get a task instance.""" diff --git a/airflow-ctl/src/airflowctl/ctl/cli_config.py b/airflow-ctl/src/airflowctl/ctl/cli_config.py index 4b84d6f88c1da..77432429cc2a2 100755 --- a/airflow-ctl/src/airflowctl/ctl/cli_config.py +++ b/airflow-ctl/src/airflowctl/ctl/cli_config.py @@ -65,32 +65,6 @@ def _is_list_annotation(annotation: Any) -> bool: return False -def _parse_task_ids_cli_arg(value: str) -> list: - """ - Parse the --task-ids CLI string into the format expected by ClearTaskInstancesBody. - - Accepts comma-separated entries of two forms: - - ``task_id`` — clears all map indices of that task - - ``task_id:map_index`` — clears only the specific mapped instance - - Example: ``"extract:0,transform"`` → ``[TaskIds(root=["extract", 0]), "transform"]`` - """ - entries: list = [] - for entry in (v.strip() for v in value.split(",") if v.strip()): - if ":" in entry: - task_id, _, map_index_str = entry.rpartition(":") - try: - entries.append(generated_datamodels.TaskIds(root=[task_id, int(map_index_str)])) - except ValueError: - raise ValueError( - f"Invalid --task-ids entry '{entry}': " - f"expected 'task_id:map_index' where map_index is an integer." - ) from None - else: - entries.append(entry) - return entries - - def lazy_load_command(import_path: str) -> Callable: """Create a lazy loader for command.""" _, _, name = import_path.rpartition(".") @@ -769,11 +743,7 @@ def _get_func(args: Namespace, api_operation: dict, api_client: Client = NEW_API ): val = args_dict[expanded_parameter] if isinstance(val, str) and expanded_parameter in datamodel.model_fields: - if expanded_parameter == "task_ids": - # task_ids supports nested [task_id, map_index] pairs; - # use the dedicated parser to preserve that structure. - val = _parse_task_ids_cli_arg(val) - elif _is_list_annotation( + if _is_list_annotation( datamodel.model_fields[expanded_parameter].annotation ): val = [v.strip() for v in val.split(",") if v.strip()] diff --git a/airflow-ctl/src/airflowctl/ctl/help_texts.yaml b/airflow-ctl/src/airflowctl/ctl/help_texts.yaml index cefce60c1d4f1..1a1efcba2eca6 100644 --- a/airflow-ctl/src/airflowctl/ctl/help_texts.yaml +++ b/airflow-ctl/src/airflowctl/ctl/help_texts.yaml @@ -101,7 +101,7 @@ plugins: list: "List all installed Airflow plugins" list-import-errors: "List all plugin import errors" -taskinstance: +tasks: get: "Retrieve a task instance by Dag ID, run ID, and task ID" list: "List all task instances for a given Dag run" clear: "Clear task instance state, optionally filtering by task IDs or run scope" diff --git a/airflow-ctl/tests/airflow_ctl/api/test_operations.py b/airflow-ctl/tests/airflow_ctl/api/test_operations.py index 7603bc6e30c70..04185825d39c3 100644 --- a/airflow-ctl/tests/airflow_ctl/api/test_operations.py +++ b/airflow-ctl/tests/airflow_ctl/api/test_operations.py @@ -1962,8 +1962,8 @@ def handle_request(request: httpx.Request) -> httpx.Response: assert response == self.plugin_import_error_collection_response -class TestTaskInstanceOperations: - """Test suite for Task Instance operations.""" +class TestTasksOperations: + """Test suite for Tasks operations.""" dag_id: str = "test_dag" dag_run_id: str = "manual__2025-01-24T00:00:00+00:00" @@ -2011,7 +2011,7 @@ def handle_request(request: httpx.Request) -> httpx.Response: return httpx.Response(200, json=json.loads(self.task_instance_response.model_dump_json())) client = make_api_client(transport=httpx.MockTransport(handle_request)) - response = client.task_instances.get( + response = client.tasks.get( dag_id=self.dag_id, dag_run_id=self.dag_run_id, task_id=self.task_id, @@ -2028,7 +2028,7 @@ def handle_request(request: httpx.Request) -> httpx.Response: ) client = make_api_client(transport=httpx.MockTransport(handle_request)) - response = client.task_instances.list( + response = client.tasks.list( dag_id=self.dag_id, dag_run_id=self.dag_run_id, ) @@ -2047,7 +2047,7 @@ def handle_request(request: httpx.Request) -> httpx.Response: client = make_api_client(transport=httpx.MockTransport(handle_request)) body = ClearTaskInstancesBody(dry_run=True) - response = client.task_instances.clear( + response = client.tasks.clear( dag_id=self.dag_id, body=body, ) @@ -2074,7 +2074,7 @@ def handle_request(request: httpx.Request) -> httpx.Response: task_ids=[self.task_id], dag_run_id=self.dag_run_id, ) - response = client.task_instances.clear( + response = client.tasks.clear( dag_id=self.dag_id, body=body, ) @@ -2095,7 +2095,7 @@ def handle_request(request: httpx.Request) -> httpx.Response: client = make_api_client(transport=httpx.MockTransport(handle_request)) body = PatchTaskInstanceBody(new_state=TaskInstanceState.FAILED) - response = client.task_instances.update( + response = client.tasks.update( dag_id=self.dag_id, dag_run_id=self.dag_run_id, task_id=self.task_id, @@ -2119,7 +2119,7 @@ def handle_request(request: httpx.Request) -> httpx.Response: client = make_api_client(transport=httpx.MockTransport(handle_request)) body = PatchTaskInstanceBody(note="Manually marked as success") - response = client.task_instances.update( + response = client.tasks.update( dag_id=self.dag_id, dag_run_id=self.dag_run_id, task_id=self.task_id, @@ -2142,7 +2142,7 @@ def handle_request(request: httpx.Request) -> httpx.Response: client = make_api_client(transport=httpx.MockTransport(handle_request)) body = PatchTaskInstanceBody(new_state=TaskInstanceState.SUCCESS) - response = client.task_instances.update( + response = client.tasks.update( dag_id=self.dag_id, dag_run_id=self.dag_run_id, task_id=self.task_id, diff --git a/scripts/in_container/run_capture_airflowctl_help.py b/scripts/in_container/run_capture_airflowctl_help.py index 9529dbe04390c..00001dc97048d 100644 --- a/scripts/in_container/run_capture_airflowctl_help.py +++ b/scripts/in_container/run_capture_airflowctl_help.py @@ -48,6 +48,7 @@ "variables", "version", "plugins", + "tasks", ] SUBCOMMANDS = [ From 99f767f4bfdbd6d30bad7c5e07cb4c010af3949e Mon Sep 17 00:00:00 2001 From: Suraj Date: Sun, 21 Jun 2026 00:31:13 +0530 Subject: [PATCH 11/17] Revert unrelated test_pool_command.py assertion changes --- .../tests/airflow_ctl/ctl/commands/test_pool_command.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/airflow-ctl/tests/airflow_ctl/ctl/commands/test_pool_command.py b/airflow-ctl/tests/airflow_ctl/ctl/commands/test_pool_command.py index 115a53afd5304..0bc2438929454 100644 --- a/airflow-ctl/tests/airflow_ctl/ctl/commands/test_pool_command.py +++ b/airflow-ctl/tests/airflow_ctl/ctl/commands/test_pool_command.py @@ -19,7 +19,6 @@ from __future__ import annotations import json -import re from unittest import mock import pytest @@ -107,7 +106,7 @@ def test_import_success(self, mock_client, tmp_path, capsys): # Update the assertion to match the actual output format captured = capsys.readouterr() - assert "'test_pool'" in captured.out + assert str(["test_pool"]) in captured.out @pytest.mark.parametrize( ("action_on_existing_key", "expected_enum"), @@ -175,10 +174,8 @@ def test_export_json_to_file(self, mock_client, tmp_path, capsys): # Verify output message captured = capsys.readouterr() - ansi_escape = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])") - out_str = ansi_escape.sub("", captured.out).replace("\n", "") - # Since rich wraps long lines, newlines are removed, so we assert the combined text. - assert out_str == f"Exported {len(exported_data)} pool(s) to {export_file}" + expected_output = f"Exported {len(exported_data)} pool(s) to {export_file}" + assert expected_output in captured.out.replace("\n", "") @pytest.mark.parametrize("output_format", ["table", "yaml", "plain"]) def test_export_non_json_uses_airflow_console(self, mock_client, tmp_path, output_format): From 51287e86a1c4b31fa7575ab9c14e8e2b7b339020 Mon Sep 17 00:00:00 2001 From: Suraj Date: Sun, 21 Jun 2026 00:31:33 +0530 Subject: [PATCH 12/17] Use --new-state=failed in tasks update integration test --- .../tests/airflowctl_tests/test_airflowctl_commands.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/airflow-ctl-tests/tests/airflowctl_tests/test_airflowctl_commands.py b/airflow-ctl-tests/tests/airflowctl_tests/test_airflowctl_commands.py index 65077ae298b5e..6471383d494e7 100644 --- a/airflow-ctl-tests/tests/airflowctl_tests/test_airflowctl_commands.py +++ b/airflow-ctl-tests/tests/airflowctl_tests/test_airflowctl_commands.py @@ -98,7 +98,10 @@ def date_param(): 'tasks list example_bash_operator "manual__{date_param}"', 'tasks get example_bash_operator "manual__{date_param}" runme_0', "tasks clear example_bash_operator --dry-run", - 'tasks update example_bash_operator "manual__{date_param}" runme_0 --new-state=success', + # runme_0 completes as "success" once the triggered run finishes, so updating it + # to "success" is rejected with 409 "already in success state". Use "failed" to + # exercise a real state transition (valid states: success, failed, skipped). + 'tasks update example_bash_operator "manual__{date_param}" runme_0 --new-state=failed', # XCom commands - need a DAG run with completed tasks 'xcom add example_bash_operator "manual__{date_param}" runme_0 {xcom_key} \'{{"test": "value"}}\'', 'xcom get example_bash_operator "manual__{date_param}" runme_0 {xcom_key}', From ccc0ea4a035267e281944a19fbee278ec744607f Mon Sep 17 00:00:00 2001 From: Suraj Date: Sun, 21 Jun 2026 14:38:21 +0530 Subject: [PATCH 13/17] Fix body bool fields overriding API defaults with explicit False --- airflow-ctl/src/airflowctl/ctl/cli_config.py | 4 +- .../tests/airflow_ctl/ctl/test_cli_config.py | 37 ++++++++++++++++++- 2 files changed, 38 insertions(+), 3 deletions(-) diff --git a/airflow-ctl/src/airflowctl/ctl/cli_config.py b/airflow-ctl/src/airflowctl/ctl/cli_config.py index 77432429cc2a2..207081338fbb3 100755 --- a/airflow-ctl/src/airflowctl/ctl/cli_config.py +++ b/airflow-ctl/src/airflowctl/ctl/cli_config.py @@ -607,7 +607,7 @@ def _create_arg_for_non_primitive_type( arg_type=self._python_type_from_string(field_type.annotation), arg_action=argparse.BooleanOptionalAction if field_type.annotation is bool else None, # type: ignore arg_help=f"{field} for {parameter_key} operation", - arg_default=False if field_type.annotation is bool else None, + arg_default=None, ) ) else: @@ -622,7 +622,7 @@ def _create_arg_for_non_primitive_type( arg_type=self._python_type_from_string(annotation), arg_action=argparse.BooleanOptionalAction if annotation is bool else None, # type: ignore arg_help=f"{field} for {parameter_key} operation", - arg_default=False if annotation is bool else None, + arg_default=None, ) ) return commands diff --git a/airflow-ctl/tests/airflow_ctl/ctl/test_cli_config.py b/airflow-ctl/tests/airflow_ctl/ctl/test_cli_config.py index 945b5abb83378..e64a560a66032 100644 --- a/airflow-ctl/tests/airflow_ctl/ctl/test_cli_config.py +++ b/airflow-ctl/tests/airflow_ctl/ctl/test_cli_config.py @@ -94,7 +94,7 @@ def test_args_create(): { "help": "run_backwards for backfill operation", "action": BooleanOptionalAction, - "default": False, + "default": None, "type": bool, "dest": None, }, @@ -321,6 +321,41 @@ def list(self, is_alive: bool | None = None) -> JobCollectionResponse | ServerRe assert is_alive_arg.kwargs["default"] is None assert is_alive_arg.kwargs["type"] is bool + def test_command_factory_body_bool_field_defaults_to_none(self, tmp_path): + """Bool fields expanded from a Pydantic body must default to None, not False. + + Otherwise the dispatcher's ``is not None`` filter passes an unset flag + through as ``False``, silently overriding API-side defaults that are + ``True`` (e.g. ``ClearTaskInstancesBody.only_failed``). + """ + temp_file = self._save_temp_operations_py( + tmp_path=tmp_path, + file_content=""" + class TasksOperations(BaseOperations): + def clear(self, dag_id: str, body: ClearTaskInstancesBody): + self.response = self.client.post( + f"dags/{dag_id}/clearTaskInstances", + json=body.model_dump(mode="json"), + ) + return self.response + """, + ) + + command_factory = CommandFactory(file_path=str(temp_file)) + clear_args: list = [] + for generated_group_command in command_factory.group_commands: + if generated_group_command.name != "tasks": + continue + for sub_command in generated_group_command.subcommands: + if sub_command.name == "clear": + clear_args = list(sub_command.args) + break + + for flag in ("--dry-run", "--only-failed", "--reset-dag-runs", "--run-on-latest-version"): + arg = next(a for a in clear_args if a.flags == (flag,)) + assert arg.kwargs["action"] == BooleanOptionalAction, flag + assert arg.kwargs["default"] is None, flag + def test_command_factory_required_primitive_param_is_positional(self, tmp_path): """Required primitive parameters (no default, not Optional) become positional arguments. From 239572a68ee360654bb68cdb0ca5e8c27fb24edc Mon Sep 17 00:00:00 2001 From: Suraj Date: Sun, 21 Jun 2026 14:38:34 +0530 Subject: [PATCH 14/17] Regenerate airflowctl help SVGs for tasks command --- airflow-ctl/docs/images/command_hashes.txt | 3 +- airflow-ctl/docs/images/output_main.svg | 128 ++++++++++----------- airflow-ctl/docs/images/output_tasks.svg | 117 +++++++++++++++++++ 3 files changed, 183 insertions(+), 65 deletions(-) create mode 100644 airflow-ctl/docs/images/output_tasks.svg diff --git a/airflow-ctl/docs/images/command_hashes.txt b/airflow-ctl/docs/images/command_hashes.txt index 352b3c3007ed6..18e1f993d3440 100644 --- a/airflow-ctl/docs/images/command_hashes.txt +++ b/airflow-ctl/docs/images/command_hashes.txt @@ -1,4 +1,4 @@ -main:398879e5076319e2328e5cfbdd9c6bc4 +main:164bc97843d5be583c0b48f7a34dc8c8 assets:70619a2d92bda80930cde2aefcd8e1cd auth:d79e9c7d00c432bdbcbc2a86e2e32053 backfill:74c8737b0a62a86ed3605fa9e6165874 @@ -12,4 +12,5 @@ providers:34502fe09dc0b8b0a13e7e46efdffda6 variables:f8fc76d3d398b2780f4e97f7cd816646 version:31f4efdf8de0dbaaa4fac71ff7efecc3 plugins:4864fd8f356704bd2b3cd1aec3567e35 +tasks:7ab24cac521242b6b6012e2bcd317831 auth login:9fe2bb1dd5c602beea2eefb33a2b20a8 diff --git a/airflow-ctl/docs/images/output_main.svg b/airflow-ctl/docs/images/output_main.svg index 84233ca715d16..f087581896312 100644 --- a/airflow-ctl/docs/images/output_main.svg +++ b/airflow-ctl/docs/images/output_main.svg @@ -19,108 +19,108 @@ font-weight: 700; } - .terminal-2035107094-matrix { + .terminal-2358263890-matrix { font-family: Fira Code, monospace; font-size: 20px; line-height: 24.4px; font-variant-east-asian: full-width; } - .terminal-2035107094-title { + .terminal-2358263890-title { font-size: 18px; font-weight: bold; font-family: arial; } - .terminal-2035107094-r1 { fill: #ff8700 } -.terminal-2035107094-r2 { fill: #c5c8c6 } -.terminal-2035107094-r3 { fill: #808080 } -.terminal-2035107094-r4 { fill: #68a0b3 } + .terminal-2358263890-r1 { fill: #ff8700 } +.terminal-2358263890-r2 { fill: #c5c8c6 } +.terminal-2358263890-r3 { fill: #808080 } +.terminal-2358263890-r4 { fill: #68a0b3 } - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + @@ -132,37 +132,37 @@ - + - - Usage:airflowctl [-hGROUP_OR_COMMAND... - -Positional Arguments: -GROUP_OR_COMMAND - -    Groups -assetsPerform Assets operations -authManage authentication for CLI. Either pass token from -environment variable/parameter or pass username and -password. -backfillPerform Backfill operations -configPerform Config operations -connectionsPerform Connections operations -dagrunPerform DagRun operations -dagsPerform Dags operations -jobsPerform Jobs operations -pluginsPerform Plugins operations -poolsPerform Pools operations -providersPerform Providers operations -taskinstancePerform TaskInstance operations -variablesPerform Variables operations -xcomPerform XCom operations - -    Commands: -versionShow version information - -Options: --h--helpshow this help message and exit + + Usage:airflowctl [-hGROUP_OR_COMMAND... + +Positional Arguments: +GROUP_OR_COMMAND + +    Groups +assetsPerform Assets operations +authManage authentication for CLI. Either pass token from +environment variable/parameter or pass username and +password. +backfillPerform Backfill operations +configPerform Config operations +connectionsPerform Connections operations +dagrunPerform DagRun operations +dagsPerform Dags operations +jobsPerform Jobs operations +pluginsPerform Plugins operations +poolsPerform Pools operations +providersPerform Providers operations +tasksPerform Tasks operations +variablesPerform Variables operations +xcomPerform XCom operations + +    Commands: +versionShow version information + +Options: +-h--helpshow this help message and exit diff --git a/airflow-ctl/docs/images/output_tasks.svg b/airflow-ctl/docs/images/output_tasks.svg new file mode 100644 index 0000000000000..b5760191492c2 --- /dev/null +++ b/airflow-ctl/docs/images/output_tasks.svg @@ -0,0 +1,117 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Usage:airflowctl tasks [-hCOMMAND... + +Perform Tasks operations + +Positional Arguments: +COMMAND +clearClear task instance state, optionally filtering by task IDs  +or run scope +getRetrieve a task instance by Dag ID, run ID, and task ID +listList all task instances for a given Dag run +updateUpdate task instance state or note; use --map-index to target +a single mapped instance + +Options: +-h--helpshow this help message and exit + + + + From 93cf5cd00d00765b5050f9e32215af6c38b8bef1 Mon Sep 17 00:00:00 2001 From: Suraj Date: Wed, 1 Jul 2026 19:25:00 +0530 Subject: [PATCH 15/17] Omit unset include_deferred in pools update to avoid sending null --- airflow-ctl/src/airflowctl/api/operations.py | 2 +- .../tests/airflow_ctl/api/test_operations.py | 15 +++++++++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/airflow-ctl/src/airflowctl/api/operations.py b/airflow-ctl/src/airflowctl/api/operations.py index f43cac6d04f88..b17c645d132e4 100644 --- a/airflow-ctl/src/airflowctl/api/operations.py +++ b/airflow-ctl/src/airflowctl/api/operations.py @@ -714,7 +714,7 @@ def update(self, pool_body: PoolPatchBody) -> PoolResponse | ServerResponseError """Update a pool.""" try: self.response = self.client.patch( - f"pools/{pool_body.pool}", json=pool_body.model_dump(mode="json") + f"pools/{pool_body.pool}", json=pool_body.model_dump(mode="json", exclude_none=True) ) return PoolResponse.model_validate_json(self.response.content) except ServerResponseError as e: diff --git a/airflow-ctl/tests/airflow_ctl/api/test_operations.py b/airflow-ctl/tests/airflow_ctl/api/test_operations.py index 04185825d39c3..3f8a30d636940 100644 --- a/airflow-ctl/tests/airflow_ctl/api/test_operations.py +++ b/airflow-ctl/tests/airflow_ctl/api/test_operations.py @@ -87,6 +87,7 @@ PluginResponse, PoolBody, PoolCollectionResponse, + PoolPatchBody, PoolResponse, ProviderCollectionResponse, ProviderResponse, @@ -1418,6 +1419,20 @@ def handle_request(request: httpx.Request) -> httpx.Response: response = client.pools.bulk(pools=self.pools_bulk_body) assert response == self.pool_bulk_response + def test_update_omits_unset_include_deferred(self): + """Unset bool fields must be omitted, not sent as null, so the server keeps existing values.""" + + def handle_request(request: httpx.Request) -> httpx.Response: + assert request.url.path == f"/api/v2/pools/{self.pool_name}" + request_body = json.loads(request.content.decode()) + assert "include_deferred" not in request_body + assert request_body == {"pool": self.pool_name, "slots": 10} + return httpx.Response(200, json=json.loads(self.pool_response.model_dump_json())) + + client = make_api_client(transport=httpx.MockTransport(handle_request)) + response = client.pools.update(pool_body=PoolPatchBody(pool=self.pool_name, slots=10)) + assert response == self.pool_response + def test_delete(self): def handle_request(request: httpx.Request) -> httpx.Response: assert request.url.path == f"/api/v2/pools/{self.pool_name}" From 5e5b8692aa89a8ebe4a5a99e64a3940be2aabd08 Mon Sep 17 00:00:00 2001 From: Suraj Date: Thu, 2 Jul 2026 12:12:43 +0530 Subject: [PATCH 16/17] Sync assets command hash with upstream main --- airflow-ctl/docs/images/command_hashes.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow-ctl/docs/images/command_hashes.txt b/airflow-ctl/docs/images/command_hashes.txt index 18e1f993d3440..55fac206ab453 100644 --- a/airflow-ctl/docs/images/command_hashes.txt +++ b/airflow-ctl/docs/images/command_hashes.txt @@ -1,5 +1,5 @@ main:164bc97843d5be583c0b48f7a34dc8c8 -assets:70619a2d92bda80930cde2aefcd8e1cd +assets:6419e20452692f577c4c6f570b74be0c auth:d79e9c7d00c432bdbcbc2a86e2e32053 backfill:74c8737b0a62a86ed3605fa9e6165874 config:a3d936cb15fe3b547bf6c82cf93d923f From 16a59b9678aeb2af9d02d6792d2c308af4a2f0d5 Mon Sep 17 00:00:00 2001 From: Suraj Date: Thu, 2 Jul 2026 12:12:55 +0530 Subject: [PATCH 17/17] Default include_deferred to False on pools update to satisfy server PATCH validator --- airflow-ctl/src/airflowctl/api/operations.py | 14 ++++++--- .../tests/airflow_ctl/api/test_operations.py | 30 +++++++++++++++---- 2 files changed, 35 insertions(+), 9 deletions(-) diff --git a/airflow-ctl/src/airflowctl/api/operations.py b/airflow-ctl/src/airflowctl/api/operations.py index b17c645d132e4..ba1bec2cc78ad 100644 --- a/airflow-ctl/src/airflowctl/api/operations.py +++ b/airflow-ctl/src/airflowctl/api/operations.py @@ -712,10 +712,16 @@ def delete(self, pool: str) -> str | ServerResponseError: def update(self, pool_body: PoolPatchBody) -> PoolResponse | ServerResponseError: """Update a pool.""" - try: - self.response = self.client.patch( - f"pools/{pool_body.pool}", json=pool_body.model_dump(mode="json", exclude_none=True) - ) + # Workaround: the server's PATCH handler validates the partial body + # against ``BasePool`` (see airflow-core/.../services/public/pools.py) + # which requires ``include_deferred``. Omitting it fails with + # "Field required", sending ``null`` fails with "bool_type". Always + # send ``include_deferred`` (defaulting to False when unset) so PATCH + # requests are accepted until the server switches to a partial validator. + body = pool_body.model_dump(mode="json", exclude_none=True) + body.setdefault("include_deferred", False) + try: + self.response = self.client.patch(f"pools/{pool_body.pool}", json=body) return PoolResponse.model_validate_json(self.response.content) except ServerResponseError as e: raise e diff --git a/airflow-ctl/tests/airflow_ctl/api/test_operations.py b/airflow-ctl/tests/airflow_ctl/api/test_operations.py index 3f8a30d636940..0d109390b1de1 100644 --- a/airflow-ctl/tests/airflow_ctl/api/test_operations.py +++ b/airflow-ctl/tests/airflow_ctl/api/test_operations.py @@ -1419,20 +1419,40 @@ def handle_request(request: httpx.Request) -> httpx.Response: response = client.pools.bulk(pools=self.pools_bulk_body) assert response == self.pool_bulk_response - def test_update_omits_unset_include_deferred(self): - """Unset bool fields must be omitted, not sent as null, so the server keeps existing values.""" + def test_update_defaults_unset_include_deferred_to_false(self): + """Unset include_deferred must default to False to satisfy the server's PATCH validator.""" def handle_request(request: httpx.Request) -> httpx.Response: assert request.url.path == f"/api/v2/pools/{self.pool_name}" - request_body = json.loads(request.content.decode()) - assert "include_deferred" not in request_body - assert request_body == {"pool": self.pool_name, "slots": 10} + assert json.loads(request.content.decode()) == { + "pool": self.pool_name, + "slots": 10, + "include_deferred": False, + } return httpx.Response(200, json=json.loads(self.pool_response.model_dump_json())) client = make_api_client(transport=httpx.MockTransport(handle_request)) response = client.pools.update(pool_body=PoolPatchBody(pool=self.pool_name, slots=10)) assert response == self.pool_response + def test_update_preserves_explicit_include_deferred(self): + """Explicit include_deferred value from the user must be preserved, not overwritten.""" + + def handle_request(request: httpx.Request) -> httpx.Response: + assert request.url.path == f"/api/v2/pools/{self.pool_name}" + assert json.loads(request.content.decode()) == { + "pool": self.pool_name, + "slots": 10, + "include_deferred": True, + } + return httpx.Response(200, json=json.loads(self.pool_response.model_dump_json())) + + client = make_api_client(transport=httpx.MockTransport(handle_request)) + response = client.pools.update( + pool_body=PoolPatchBody(pool=self.pool_name, slots=10, include_deferred=True) + ) + assert response == self.pool_response + def test_delete(self): def handle_request(request: httpx.Request) -> httpx.Response: assert request.url.path == f"/api/v2/pools/{self.pool_name}"