From c3ed43b37939e708953a9a3142e59e3e8e16ac0f Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Sat, 9 Sep 2023 21:30:03 +0200 Subject: [PATCH 1/4] Make param validation consistent for DAG validation and triggering --- airflow/models/dag.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 2cdcfc197abce..ceb2ca2eb70a5 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -84,6 +84,7 @@ AirflowSkipException, DuplicateTaskIdFound, FailStopDagInvalidTriggerRule, + ParamValidationError, RemovedInAirflow3Warning, TaskNotFound, ) @@ -3276,20 +3277,20 @@ def set_edge_info(self, upstream_task_id: str, downstream_task_id: str, info: Ed def validate_schedule_and_params(self): """ - Validate Param values when the schedule_interval is not None. + Validate Param values when the DAG has schedule defined. - Raise exception if there are any Params in the DAG which neither have a default value nor - have the null in schema['type'] list, but the DAG have a schedule_interval which is not None. + Raise exception if there are any Params which can not be resolved by their schema definition. """ if not self.timetable.can_be_scheduled: return - for v in self.params.values(): - # As type can be an array, we would check if `null` is an allowed type or not - if not v.has_value and ("type" not in v.schema or "null" not in v.schema["type"]): - raise AirflowException( - "DAG Schedule must be None, if there are any required params without default values" - ) + try: + self.params.validate() + except ParamValidationError as pverr: + raise AirflowException( + "DAG is not allowed to define a Schedule, " + "if there are any required params without default values or default values are not valid." 
+ ) from pverr def iter_invalid_owner_links(self) -> Iterator[tuple[str, str]]: """ From d5fa69ce77b279597e18c1321785960c991b7fbc Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Sun, 10 Sep 2023 00:15:56 +0200 Subject: [PATCH 2/4] Allow optional defaults in required fields with manual triggered dags --- .../example_params_ui_tutorial.py | 24 +++------ airflow/models/dag.py | 6 +-- docs/apache-airflow/core-concepts/params.rst | 9 +++- tests/dags/test_invalid_param.py | 2 +- tests/dags/test_invalid_param2.py | 45 +++++++++++++++++ tests/dags/test_invalid_param3.py | 45 +++++++++++++++++ tests/dags/test_invalid_param4.py | 45 +++++++++++++++++ tests/dags/test_valid_param.py | 49 +++++++++++++++++++ tests/dags/test_valid_param2.py | 47 ++++++++++++++++++ tests/jobs/test_scheduler_job.py | 3 ++ tests/models/test_dagbag.py | 25 +++++++++- 11 files changed, 277 insertions(+), 23 deletions(-) create mode 100644 tests/dags/test_invalid_param2.py create mode 100644 tests/dags/test_invalid_param3.py create mode 100644 tests/dags/test_invalid_param4.py create mode 100644 tests/dags/test_valid_param.py create mode 100644 tests/dags/test_valid_param2.py diff --git a/airflow/example_dags/example_params_ui_tutorial.py b/airflow/example_dags/example_params_ui_tutorial.py index a9da876bf2b7c..c63fc61b73b0a 100644 --- a/airflow/example_dags/example_params_ui_tutorial.py +++ b/airflow/example_dags/example_params_ui_tutorial.py @@ -25,16 +25,10 @@ import datetime import json from pathlib import Path -from typing import TYPE_CHECKING from airflow import DAG from airflow.decorators import task -from airflow.exceptions import AirflowSkipException -from airflow.models.param import Param - -if TYPE_CHECKING: - from airflow.models.dagrun import DagRun - from airflow.models.taskinstance import TaskInstance +from airflow.models.param import Param, ParamsDict with DAG( dag_id=Path(__file__).stem, @@ -170,9 +164,11 @@ ), # Fields can be required or not. 
If the defined fields are typed they are getting required by default # (else they would not pass JSON schema validation) - to make typed fields optional you must - # permit the optional "null" type + # permit the optional "null" type. + # You can omit a default value if the DAG is triggered manually "required_field": Param( - "You can not trigger if no text is given here!", + # In this example we have no default value + # Form will enforce a value supplied by users to be able to trigger type="string", title="Required text field", description="This field is required. You can not submit without having text in here.", @@ -303,13 +299,9 @@ }, ) as dag: - @task(task_id="show_params") + @task def show_params(**kwargs) -> None: - ti: TaskInstance = kwargs["ti"] - dag_run: DagRun = ti.dag_run - if not dag_run.conf: - print("Uups, no parameters supplied as DagRun.conf, was the trigger w/o form?") - raise AirflowSkipException("No DagRun.conf parameters supplied.") - print(f"This DAG was triggered with the following parameters:\n{json.dumps(dag_run.conf, indent=4)}") + params: ParamsDict = kwargs["params"] + print(f"This DAG was triggered with the following parameters:\n\n{json.dumps(params, indent=4)}\n") show_params() diff --git a/airflow/models/dag.py b/airflow/models/dag.py index ceb2ca2eb70a5..18f2f32eeace5 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -710,7 +710,7 @@ def validate(self): f"inconsistent schedule: timetable {self.timetable.summary!r} " f"does not match schedule_interval {self.schedule_interval!r}", ) - self.params.validate() + self.validate_schedule_and_params() self.timetable.validate() self.validate_setup_teardown() @@ -3288,8 +3288,8 @@ def validate_schedule_and_params(self): self.params.validate() except ParamValidationError as pverr: raise AirflowException( - "DAG is not allowed to define a Schedule, " - "if there are any required params without default values or default values are not valid." 
+ "There are either required params without default values or default values are not valid " + f"in DAG {self.dag_id}. This is not allowed if the DAG defines a Schedule." ) from pverr def iter_invalid_owner_links(self) -> Iterator[tuple[str, str]]: diff --git a/docs/apache-airflow/core-concepts/params.rst b/docs/apache-airflow/core-concepts/params.rst index 4159231505b5b..b4fe054a0744b 100644 --- a/docs/apache-airflow/core-concepts/params.rst +++ b/docs/apache-airflow/core-concepts/params.rst @@ -158,6 +158,11 @@ JSON Schema Validation }, ): +.. note:: + Schema validation on DAG definitions is only made if a schedule is defined. If a manual or external trigger + is defined (e.g. via ``schedule=None``) then params are validated during submission. This allows to enforce + undefined parameters being defined at trigger time. + .. note:: As of now, for security reasons, one can not use Param objects derived out of custom classes. We are planning to have a registration system for custom Param classes, just like we've for Operator ExtraLinks. @@ -175,7 +180,7 @@ The following features are supported in the Trigger UI Form: - Direct scalar values (boolean, int, string, lists, dicts) from top-level DAG params are interpreted and render a corresponding field type. The name of the param is used as label and no further validation is made, all values are treated as optional. -- If you use the :class:`~airflow.modules.param.Param` class as definition of the param value, the following parameters can be added: +- If you use the :class:`~airflow.models.param.Param` class as definition of the param value, the following attributes can be added: - The Param attribute ``title`` is used to render the form field label of the entry box - The Param attribute ``description`` is rendered below an entry field as help text in gray color. 
@@ -198,6 +203,8 @@ The following features are supported in the Trigger UI Form: - Note: Per default if you specify a type, a field will be made required with input - because of JSON validation. If you want to have a field value being added optional only, you must allow JSON schema validation allowing null values via: ``type=["null", "string"]`` + - Note: If the field is required you need to specify a valid default value as well. Exception is if DAG is defined with + ``schedule=None`` which enforces the user to provide a value in the form at time of submission. - The Param attribute ``enum`` generates a drop-down select list for scalar values. As of JSON validation, a value must be selected or the field must be marked as optional explicit. diff --git a/tests/dags/test_invalid_param.py b/tests/dags/test_invalid_param.py index 54a149b8a9cda..4f52761762502 100644 --- a/tests/dags/test_invalid_param.py +++ b/tests/dags/test_invalid_param.py @@ -25,7 +25,7 @@ with DAG( "test_invalid_param", start_date=datetime(2021, 1, 1), - schedule="@once", + schedule="0 0 * * *", params={ # a mandatory str param "str_param": Param(type="string", minLength=2, maxLength=4), diff --git a/tests/dags/test_invalid_param2.py b/tests/dags/test_invalid_param2.py new file mode 100644 index 0000000000000..7ed9d5c443c10 --- /dev/null +++ b/tests/dags/test_invalid_param2.py @@ -0,0 +1,45 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from datetime import datetime + +from airflow import DAG +from airflow.models.param import Param +from airflow.operators.python import PythonOperator + +with DAG( + "test_invalid_param2", + start_date=datetime(2021, 1, 1), + schedule="0 0 * * *", + params={ + # a mandatory str param but pass None as value which is invalid + "str_param": Param(default=None, type="string", minLength=2, maxLength=4), + }, +) as the_dag: + + def print_these(*params): + for param in params: + print(param) + + PythonOperator( + task_id="ref_params", + python_callable=print_these, + op_args=[ + "{{ params.str_param }}", + ], + ) diff --git a/tests/dags/test_invalid_param3.py b/tests/dags/test_invalid_param3.py new file mode 100644 index 0000000000000..67cb1bbff7ce2 --- /dev/null +++ b/tests/dags/test_invalid_param3.py @@ -0,0 +1,45 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from datetime import datetime + +from airflow import DAG +from airflow.models.param import Param +from airflow.operators.python import PythonOperator + +with DAG( + "test_invalid_param3", + start_date=datetime(2021, 1, 1), + schedule="0 0 * * *", + params={ + # a mandatory number param but pass a string as default value + "int_param": Param(default="banana", type="integer"), + }, +) as the_dag: + + def print_these(*params): + for param in params: + print(param) + + PythonOperator( + task_id="ref_params", + python_callable=print_these, + op_args=[ + "{{ params.int_param }}", + ], + ) diff --git a/tests/dags/test_invalid_param4.py b/tests/dags/test_invalid_param4.py new file mode 100644 index 0000000000000..7c6354af0dd60 --- /dev/null +++ b/tests/dags/test_invalid_param4.py @@ -0,0 +1,45 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +from datetime import datetime + +from airflow import DAG +from airflow.models.param import Param +from airflow.operators.python import PythonOperator + +with DAG( + "test_invalid_param4", + start_date=datetime(2021, 1, 1), + schedule="0 0 * * *", + params={ + # a mandatory string but the default is not valid in length validation + "str_param": Param(default="banana", type="string", minLength=2, maxLength=4), + }, +) as the_dag: + + def print_these(*params): + for param in params: + print(param) + + PythonOperator( + task_id="ref_params", + python_callable=print_these, + op_args=[ + "{{ params.str_param }}", + ], + ) diff --git a/tests/dags/test_valid_param.py b/tests/dags/test_valid_param.py new file mode 100644 index 0000000000000..67a6b4e814919 --- /dev/null +++ b/tests/dags/test_valid_param.py @@ -0,0 +1,49 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +from datetime import datetime + +from airflow import DAG +from airflow.models.param import Param +from airflow.operators.python import PythonOperator + +with DAG( + "test_valid_param", + start_date=datetime(2021, 1, 1), + schedule=None, + params={ + # a string default is not mandatory as DAG has no schedule + "str_param": Param(type="string", minLength=2, maxLength=4), + # a string with None as default is also accepted as no schedule + "str_param2": Param(None, type="string", minLength=2, maxLength=4), + # But of course adding a valid default is also fine + "str_param3": Param("valid_default", type="string", minLength=2, maxLength=15), + }, +) as the_dag: + + def print_these(*params): + for param in params: + print(param) + + PythonOperator( + task_id="ref_params", + python_callable=print_these, + op_args=[ + "{{ params.str_param }}", + ], + ) diff --git a/tests/dags/test_valid_param2.py b/tests/dags/test_valid_param2.py new file mode 100644 index 0000000000000..9767357d3b3e0 --- /dev/null +++ b/tests/dags/test_valid_param2.py @@ -0,0 +1,47 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +from datetime import datetime + +from airflow import DAG +from airflow.models.param import Param +from airflow.operators.python import PythonOperator + +with DAG( + "test_valid_param2", + start_date=datetime(2021, 1, 1), + schedule="0 0 * * *", + params={ + # mandatory string has default, this is how we want it! + "str_param": Param("some_default", type="string", minLength=2, maxLength=12), + # Field does not need to have a default if type is nullable + "optional_str_param": Param(None, type=["null", "string"]), + }, +) as the_dag: + + def print_these(*params): + for param in params: + print(param) + + PythonOperator( + task_id="ref_params", + python_callable=print_these, + op_args=[ + "{{ params.str_param }}", + ], + ) diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 4fdb006a38a66..e1b98a9d5b01c 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -3113,6 +3113,9 @@ def test_list_py_file_paths(self): "test_invalid_dup_task.py", "test_ignore_this.py", "test_invalid_param.py", + "test_invalid_param2.py", + "test_invalid_param3.py", + "test_invalid_param4.py", "test_nested_dag.py", "test_imports.py", "__init__.py", diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py index e0a999b9d8a95..a34fb7b92b6dd 100644 --- a/tests/models/test_dagbag.py +++ b/tests/models/test_dagbag.py @@ -283,9 +283,14 @@ def test_process_file_cron_validity_check( def test_process_file_invalid_param_check(self, tmp_path): """ - test if an invalid param in the dag param can be identified + test if an invalid param in the dags can be identified """ - invalid_dag_files = ["test_invalid_param.py"] + invalid_dag_files = [ + "test_invalid_param.py", + "test_invalid_param2.py", + "test_invalid_param3.py", + "test_invalid_param4.py", + ] dagbag = models.DagBag(dag_folder=os.fspath(tmp_path), include_examples=False) assert len(dagbag.import_errors) == 0 @@ -294,6 +299,22 @@ 
def test_process_file_invalid_param_check(self, tmp_path): assert len(dagbag.import_errors) == len(invalid_dag_files) assert len(dagbag.dags) == 0 + def test_process_file_valid_param_check(self, tmp_path): + """ + test if valid params in the dags param can be validated (positive test) + """ + valid_dag_files = [ + "test_valid_param.py", + "test_valid_param2.py", + ] + dagbag = models.DagBag(dag_folder=os.fspath(tmp_path), include_examples=False) + + assert len(dagbag.import_errors) == 0 + for file in valid_dag_files: + dagbag.process_file(os.path.join(TEST_DAGS_FOLDER, file)) + assert len(dagbag.import_errors) == 0 + assert len(dagbag.dags) == len(valid_dag_files) + @patch.object(DagModel, "get_current") def test_get_dag_without_refresh(self, mock_dagmodel): """ From 407418d276831534b7cc8f002aa9bb604c1f179a Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Tue, 26 Sep 2023 22:51:10 +0200 Subject: [PATCH 3/4] Documentation review feedback --- docs/apache-airflow/core-concepts/params.rst | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/docs/apache-airflow/core-concepts/params.rst b/docs/apache-airflow/core-concepts/params.rst index f2d1ba3b48562..fb30286137066 100644 --- a/docs/apache-airflow/core-concepts/params.rst +++ b/docs/apache-airflow/core-concepts/params.rst @@ -160,9 +160,10 @@ JSON Schema Validation ): .. note:: - Schema validation on DAG definitions is only made if a schedule is defined. If a manual or external trigger - is defined (e.g. via ``schedule=None``) then params are validated during submission. This allows to enforce - undefined parameters being defined at trigger time. + If ``schedule`` is defined for a DAG params with defaults must be valid. This is validated during DAG parsing. + If ``schedule=None`` then params are not validated while DAG is parsed but finally at trigger of DAG. 
+ This is useful in cases where the DAG author does not want to provide defaults but wants to force users provide valid parameters + at time of trigger. .. note:: As of now, for security reasons, one can not use :class:`~airflow.models.param.Param` objects derived out of custom classes. We are @@ -314,8 +315,8 @@ The following features are supported in the Trigger UI Form: - If you want to render custom HTML as form on top of the provided features, you can use the ``custom_html_form`` attribute. .. note:: - If the field is required you need to specify a valid default value as well. Exception is if DAG is defined with - ``schedule=None`` which enforces the user to provide a value in the form at time of submission. + If the field is required the default value must be valid according to the schema as well. If the DAG is defined with + ``schedule=None`` the parameter value validation is made at time of trigger. For examples also please take a look to two example DAGs provided: ``example_params_trigger_ui`` and ``example_params_ui_tutorial``. From 90a0bcb01e40017d815aca0172dd89c1df113bac Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Mon, 2 Oct 2023 23:32:53 +0200 Subject: [PATCH 4/4] Review feedback --- docs/apache-airflow/core-concepts/params.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/apache-airflow/core-concepts/params.rst b/docs/apache-airflow/core-concepts/params.rst index 00b07369ba787..86de69d5c58e0 100644 --- a/docs/apache-airflow/core-concepts/params.rst +++ b/docs/apache-airflow/core-concepts/params.rst @@ -160,8 +160,8 @@ JSON Schema Validation ): .. note:: - If ``schedule`` is defined for a DAG params with defaults must be valid. This is validated during DAG parsing. - If ``schedule=None`` then params are not validated while DAG is parsed but finally at trigger of DAG. + If ``schedule`` is defined for a DAG, params with defaults must be valid. This is validated during DAG parsing. 
+ If ``schedule=None`` then params are not validated during DAG parsing but before triggering a DAG. This is useful in cases where the DAG author does not want to provide defaults but wants to force users provide valid parameters at time of trigger.