From 616f131495445713fa8c8d38f5785a39c618f070 Mon Sep 17 00:00:00 2001 From: Tyler Calder Date: Fri, 22 Sep 2023 12:57:34 -0600 Subject: [PATCH 1/5] fix: Make dry_run optional per docs This fixes an issue where dry_run is not actually and optional parameter in the patch task_instance api. --- .../schemas/task_instance_schema.py | 2 +- .../endpoints/test_task_instance_endpoint.py | 30 +++++++++++++++++-- 2 files changed, 28 insertions(+), 4 deletions(-) diff --git a/airflow/api_connexion/schemas/task_instance_schema.py b/airflow/api_connexion/schemas/task_instance_schema.py index 1d5fd29665b16..0824f0779e964 100644 --- a/airflow/api_connexion/schemas/task_instance_schema.py +++ b/airflow/api_connexion/schemas/task_instance_schema.py @@ -180,7 +180,7 @@ def validate_form(self, data, **kwargs): class SetSingleTaskInstanceStateFormSchema(Schema): """Schema for handling the request of updating state of a single task instance.""" - dry_run = fields.Boolean(dump_default=True) + dry_run = fields.Boolean(dump_default=True, load_default=False) new_state = TaskInstanceStateField( required=True, validate=validate.OneOf( diff --git a/tests/api_connexion/endpoints/test_task_instance_endpoint.py b/tests/api_connexion/endpoints/test_task_instance_endpoint.py index f09b55cf41849..f0d6291fdc4fd 100644 --- a/tests/api_connexion/endpoints/test_task_instance_endpoint.py +++ b/tests/api_connexion/endpoints/test_task_instance_endpoint.py @@ -210,7 +210,8 @@ def test_should_respond_200(self, username, session): # This prevents issue when users upgrade to 2.0+ # from 1.10.x # https://github.com/apache/airflow/issues/14421 - session.query(TaskInstance).update({TaskInstance.operator: None}, synchronize_session="fetch") + session.query(TaskInstance).update( + {TaskInstance.operator: None}, synchronize_session="fetch") session.commit() response = self.client.get( "/api/v1/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/print_the_context", @@ -312,7 +313,8 @@ def test_should_respond_200_with_task_state_in_deferred(self, session): } def test_should_respond_200_with_task_state_in_removed(self, session): - self.create_task_instances(session, task_instances=[{"state": State.REMOVED}], update_extras=True) + self.create_task_instances(session, task_instances=[ + {"state": State.REMOVED}], update_extras=True) response = self.client.get( "/api/v1/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/print_the_context", environ_overrides={"REMOTE_USER": "test"}, @@ -667,7 +669,8 @@ def test_should_respond_200_for_dag_id_filter(self, session): ) assert response.status_code == 200 - count = session.query(TaskInstance).filter(TaskInstance.dag_id == "example_python_operator").count() + count = session.query(TaskInstance).filter( + TaskInstance.dag_id == "example_python_operator").count() assert count == response.json["total_entries"] assert count == len(response.json["task_instances"]) @@ -1772,6 +1775,27 @@ def test_should_update_task_instance_state(self, session): assert response2.status_code == 200 assert response2.json["state"] == NEW_STATE + def test_should_update_task_instance_state_default_dry_run_to_false(self, session): + self.create_task_instances(session) + + NEW_STATE = "failed" + + self.client.patch( + self.ENDPOINT_URL, + environ_overrides={"REMOTE_USER": "test"}, + json={ + "new_state": NEW_STATE, + }, + ) + + response2 = self.client.get( + self.ENDPOINT_URL, + environ_overrides={"REMOTE_USER": "test"}, + json={}, + ) + assert response2.status_code == 200 + assert response2.json["state"] == NEW_STATE + def test_should_update_mapped_task_instance_state(self, session): NEW_STATE = "failed" map_index = 1 From 926cbd831fae95ab21b73b88d65a86d63d3b96b7 Mon Sep 17 00:00:00 2001 From: Tyler Calder Date: Fri, 22 Sep 2023 15:06:10 -0600 Subject: [PATCH 2/5] chore: remove formatting changes --- .../endpoints/test_task_instance_endpoint.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/tests/api_connexion/endpoints/test_task_instance_endpoint.py b/tests/api_connexion/endpoints/test_task_instance_endpoint.py index f0d6291fdc4fd..ef6a917101e01 100644 --- a/tests/api_connexion/endpoints/test_task_instance_endpoint.py +++ b/tests/api_connexion/endpoints/test_task_instance_endpoint.py @@ -210,8 +210,7 @@ def test_should_respond_200(self, username, session): # This prevents issue when users upgrade to 2.0+ # from 1.10.x # https://github.com/apache/airflow/issues/14421 - session.query(TaskInstance).update( - {TaskInstance.operator: None}, synchronize_session="fetch") + session.query(TaskInstance).update({TaskInstance.operator: None}, synchronize_session="fetch") session.commit() response = self.client.get( "/api/v1/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/print_the_context", @@ -313,8 +312,7 @@ def test_should_respond_200_with_task_state_in_deferred(self, session): } def test_should_respond_200_with_task_state_in_removed(self, session): - self.create_task_instances(session, task_instances=[ - {"state": State.REMOVED}], update_extras=True) + self.create_task_instances(session, task_instances=[{"state": State.REMOVED}], update_extras=True) response = self.client.get( "/api/v1/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/print_the_context", environ_overrides={"REMOTE_USER": "test"}, @@ -669,8 +667,7 @@ def test_should_respond_200_for_dag_id_filter(self, session): ) assert response.status_code == 200 - count = session.query(TaskInstance).filter( - TaskInstance.dag_id == "example_python_operator").count() + count = session.query(TaskInstance).filter(TaskInstance.dag_id == "example_python_operator").count() assert count == response.json["total_entries"] assert count == len(response.json["task_instances"]) From d0b66a7b835914b0755c60672f12fd0b93ecac91 Mon Sep 17 00:00:00 2001 From: Tyler Calder Date: Mon, 25 Sep 2023 06:02:34 -0600 Subject: [PATCH 3/5] fix: Make changes for api docs This updates the docs and the code so that they are in alignment while also being consistent with all other endpoints. All other Endpoints have dry run set to be True by default. --- airflow/api_connexion/openapi/v1.yaml | 2 +- airflow/api_connexion/schemas/task_instance_schema.py | 2 +- tests/api_connexion/endpoints/test_task_instance_endpoint.py | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml index af9d0cf8a465e..6cd885f6dbaf2 100644 --- a/airflow/api_connexion/openapi/v1.yaml +++ b/airflow/api_connexion/openapi/v1.yaml @@ -4302,7 +4302,7 @@ components: If set, don't actually run this operation. The response will contain the task instance planned to be affected, but won't be modified in any way. type: boolean - default: false + default: true new_state: $ref: '#/components/schemas/UpdateTaskState' diff --git a/airflow/api_connexion/schemas/task_instance_schema.py b/airflow/api_connexion/schemas/task_instance_schema.py index 0824f0779e964..6def204214d21 100644 --- a/airflow/api_connexion/schemas/task_instance_schema.py +++ b/airflow/api_connexion/schemas/task_instance_schema.py @@ -180,7 +180,7 @@ def validate_form(self, data, **kwargs): class SetSingleTaskInstanceStateFormSchema(Schema): """Schema for handling the request of updating state of a single task instance.""" - dry_run = fields.Boolean(dump_default=True, load_default=False) + dry_run = fields.Boolean(dump_default=True, load_default=True) new_state = TaskInstanceStateField( required=True, validate=validate.OneOf( diff --git a/tests/api_connexion/endpoints/test_task_instance_endpoint.py b/tests/api_connexion/endpoints/test_task_instance_endpoint.py index ef6a917101e01..5056f7736dcec 100644 --- a/tests/api_connexion/endpoints/test_task_instance_endpoint.py +++ b/tests/api_connexion/endpoints/test_task_instance_endpoint.py @@ -1772,10 +1772,10 @@ def test_should_update_task_instance_state(self, session): assert response2.status_code == 200 assert response2.json["state"] == NEW_STATE - def test_should_update_task_instance_state_default_dry_run_to_false(self, session): + def test_should_update_task_instance_state_default_dry_run_to_true(self, session): self.create_task_instances(session) - NEW_STATE = "failed" + NEW_STATE = "running" self.client.patch( self.ENDPOINT_URL, From 54c25ebf0c6e195fc3fda6a59853e7112df92163 Mon Sep 17 00:00:00 2001 From: Tyler Calder Date: Wed, 27 Sep 2023 11:21:42 -0600 Subject: [PATCH 4/5] fix: Update static ts file for api change --- airflow/www/static/js/types/api-generated.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/www/static/js/types/api-generated.ts b/airflow/www/static/js/types/api-generated.ts index 9b86f9c5757d0..2361764feb04b 100644 --- a/airflow/www/static/js/types/api-generated.ts +++ b/airflow/www/static/js/types/api-generated.ts @@ -1904,7 +1904,7 @@ export interface components { * @description If set, don't actually run this operation. The response will contain the task instance * planned to be affected, but won't be modified in any way. * - * @default false + * @default true */ dry_run?: boolean; new_state?: components["schemas"]["UpdateTaskState"]; From 895efde5b16a294cc21d3ebefcf4296ce5b328b7 Mon Sep 17 00:00:00 2001 From: Tyler Calder Date: Fri, 29 Sep 2023 09:33:35 -0600 Subject: [PATCH 5/5] fix: Remove dump_default --- airflow/api_connexion/schemas/task_instance_schema.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/api_connexion/schemas/task_instance_schema.py b/airflow/api_connexion/schemas/task_instance_schema.py index 6def204214d21..02dc1fb3f6fa6 100644 --- a/airflow/api_connexion/schemas/task_instance_schema.py +++ b/airflow/api_connexion/schemas/task_instance_schema.py @@ -180,7 +180,7 @@ def validate_form(self, data, **kwargs): class SetSingleTaskInstanceStateFormSchema(Schema): """Schema for handling the request of updating state of a single task instance.""" - dry_run = fields.Boolean(dump_default=True, load_default=True) + dry_run = fields.Boolean(load_default=True) new_state = TaskInstanceStateField( required=True, validate=validate.OneOf(