diff --git a/airflow/example_dags/example_params_ui_tutorial.py b/airflow/example_dags/example_params_ui_tutorial.py index 4373676b6a39b..12992c545c25a 100644 --- a/airflow/example_dags/example_params_ui_tutorial.py +++ b/airflow/example_dags/example_params_ui_tutorial.py @@ -25,16 +25,10 @@ import datetime import json from pathlib import Path -from typing import TYPE_CHECKING from airflow.decorators import task -from airflow.exceptions import AirflowSkipException from airflow.models.dag import DAG -from airflow.models.param import Param - -if TYPE_CHECKING: - from airflow.models.dagrun import DagRun - from airflow.models.taskinstance import TaskInstance +from airflow.models.param import Param, ParamsDict with DAG( dag_id=Path(__file__).stem, @@ -170,9 +164,11 @@ ), # Fields can be required or not. If the defined fields are typed they are getting required by default # (else they would not pass JSON schema validation) - to make typed fields optional you must - # permit the optional "null" type + # permit the optional "null" type. + # You can omit a default value if the DAG is triggered manually "required_field": Param( - "You can not trigger if no text is given here!", + # In this example we have no default value + # Form will enforce a value supplied by users to be able to trigger type="string", title="Required text field", description="This field is required. 
You can not submit without having text in here.", @@ -303,13 +299,9 @@ }, ) as dag: - @task(task_id="show_params") + @task def show_params(**kwargs) -> None: - ti: TaskInstance = kwargs["ti"] - dag_run: DagRun = ti.dag_run - if not dag_run.conf: - print("Uups, no parameters supplied as DagRun.conf, was the trigger w/o form?") - raise AirflowSkipException("No DagRun.conf parameters supplied.") - print(f"This DAG was triggered with the following parameters:\n{json.dumps(dag_run.conf, indent=4)}") + params: ParamsDict = kwargs["params"] + print(f"This DAG was triggered with the following parameters:\n\n{json.dumps(params, indent=4)}\n") show_params() diff --git a/airflow/models/dag.py b/airflow/models/dag.py index d2540a250baa5..bef0bde9a7164 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -726,7 +726,7 @@ def validate(self): f"inconsistent schedule: timetable {self.timetable.summary!r} " f"does not match schedule_interval {self.schedule_interval!r}", ) - self.params.validate() + self.validate_schedule_and_params() self.timetable.validate() self.validate_setup_teardown() diff --git a/docs/apache-airflow/core-concepts/params.rst b/docs/apache-airflow/core-concepts/params.rst index 42fd523688068..b2b95252ec719 100644 --- a/docs/apache-airflow/core-concepts/params.rst +++ b/docs/apache-airflow/core-concepts/params.rst @@ -160,6 +160,12 @@ JSON Schema Validation }, ): +.. note:: + If ``schedule`` is defined for a DAG, params with defaults must be valid. This is validated during DAG parsing. + If ``schedule=None`` then params are not validated during DAG parsing but before triggering a DAG. + This is useful in cases where the DAG author does not want to provide defaults but wants to force users to provide valid parameters + at time of trigger. + .. note:: As of now, for security reasons, one can not use :class:`~airflow.models.param.Param` objects derived out of custom classes. 
We are planning to have a registration system for custom :class:`~airflow.models.param.Param` classes, just like we've for Operator ExtraLinks. @@ -298,7 +304,6 @@ The following features are supported in the Trigger UI Form: - - ``Param(None, type=["null", "string"])`` - - If a form field is left empty, it is passed as ``None`` value to the params dict. - Form fields are rendered in the order of definition of ``params`` in the DAG. - If you want to add sections to the Form, add the attribute ``section`` to each field. The text will be used as section label. @@ -310,6 +315,10 @@ The following features are supported in the Trigger UI Form: If you want to change values manually, the JSON configuration can be adjusted. Changes are overridden when form fields change. - If you want to render custom HTML as form on top of the provided features, you can use the ``custom_html_form`` attribute. +.. note:: + If the field is required, the default value must be valid according to the schema as well. If the DAG is defined with + ``schedule=None``, the parameter value validation is performed at the time of trigger. + For examples also please take a look to two example DAGs provided: ``example_params_trigger_ui`` and ``example_params_ui_tutorial``. .. 
image:: ../img/trigger-dag-tutorial-form.png diff --git a/tests/dags/test_invalid_param.py b/tests/dags/test_invalid_param.py index 7e278cf02f543..9f5fdfa05f660 100644 --- a/tests/dags/test_invalid_param.py +++ b/tests/dags/test_invalid_param.py @@ -25,7 +25,7 @@ with DAG( "test_invalid_param", start_date=datetime(2021, 1, 1), - schedule="@once", + schedule="0 0 * * *", params={ # a mandatory str param "str_param": Param(type="string", minLength=2, maxLength=4), diff --git a/tests/dags/test_invalid_param2.py b/tests/dags/test_invalid_param2.py new file mode 100644 index 0000000000000..7ed9d5c443c10 --- /dev/null +++ b/tests/dags/test_invalid_param2.py @@ -0,0 +1,45 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +from datetime import datetime + +from airflow import DAG +from airflow.models.param import Param +from airflow.operators.python import PythonOperator + +with DAG( + "test_invalid_param2", + start_date=datetime(2021, 1, 1), + schedule="0 0 * * *", + params={ + # a mandatory str param but pass None as value which is invalid + "str_param": Param(default=None, type="string", minLength=2, maxLength=4), + }, +) as the_dag: + + def print_these(*params): + for param in params: + print(param) + + PythonOperator( + task_id="ref_params", + python_callable=print_these, + op_args=[ + "{{ params.str_param }}", + ], + ) diff --git a/tests/dags/test_invalid_param3.py b/tests/dags/test_invalid_param3.py new file mode 100644 index 0000000000000..67cb1bbff7ce2 --- /dev/null +++ b/tests/dags/test_invalid_param3.py @@ -0,0 +1,45 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +from datetime import datetime + +from airflow import DAG +from airflow.models.param import Param +from airflow.operators.python import PythonOperator + +with DAG( + "test_invalid_param3", + start_date=datetime(2021, 1, 1), + schedule="0 0 * * *", + params={ + # a mandatory number param but pass a string as default value + "int_param": Param(default="banana", type="integer"), + }, +) as the_dag: + + def print_these(*params): + for param in params: + print(param) + + PythonOperator( + task_id="ref_params", + python_callable=print_these, + op_args=[ + "{{ params.int_param }}", + ], + ) diff --git a/tests/dags/test_invalid_param4.py b/tests/dags/test_invalid_param4.py new file mode 100644 index 0000000000000..7c6354af0dd60 --- /dev/null +++ b/tests/dags/test_invalid_param4.py @@ -0,0 +1,45 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +from datetime import datetime + +from airflow import DAG +from airflow.models.param import Param +from airflow.operators.python import PythonOperator + +with DAG( + "test_invalid_param4", + start_date=datetime(2021, 1, 1), + schedule="0 0 * * *", + params={ + # a mandatory string but the default is not valid in length validation + "str_param": Param(default="banana", type="string", minLength=2, maxLength=4), + }, +) as the_dag: + + def print_these(*params): + for param in params: + print(param) + + PythonOperator( + task_id="ref_params", + python_callable=print_these, + op_args=[ + "{{ params.str_param }}", + ], + ) diff --git a/tests/dags/test_valid_param.py b/tests/dags/test_valid_param.py new file mode 100644 index 0000000000000..67a6b4e814919 --- /dev/null +++ b/tests/dags/test_valid_param.py @@ -0,0 +1,49 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +from datetime import datetime + +from airflow import DAG +from airflow.models.param import Param +from airflow.operators.python import PythonOperator + +with DAG( + "test_valid_param", + start_date=datetime(2021, 1, 1), + schedule=None, + params={ + # a string default is not mandatory as DAG has no schedule + "str_param": Param(type="string", minLength=2, maxLength=4), + # a string with None as default is also accepted as no schedule + "str_param2": Param(None, type="string", minLength=2, maxLength=4), + # But of course adding a valid default is also fine + "str_param3": Param("valid_default", type="string", minLength=2, maxLength=15), + }, +) as the_dag: + + def print_these(*params): + for param in params: + print(param) + + PythonOperator( + task_id="ref_params", + python_callable=print_these, + op_args=[ + "{{ params.str_param }}", + ], + ) diff --git a/tests/dags/test_valid_param2.py b/tests/dags/test_valid_param2.py new file mode 100644 index 0000000000000..9767357d3b3e0 --- /dev/null +++ b/tests/dags/test_valid_param2.py @@ -0,0 +1,47 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +from datetime import datetime + +from airflow import DAG +from airflow.models.param import Param +from airflow.operators.python import PythonOperator + +with DAG( + "test_valid_param2", + start_date=datetime(2021, 1, 1), + schedule="0 0 * * *", + params={ + # mandatory string has default, this is how we want it! + "str_param": Param("some_default", type="string", minLength=2, maxLength=12), + # Field does not need to have a default if type is nullable + "optional_str_param": Param(None, type=["null", "string"]), + }, +) as the_dag: + + def print_these(*params): + for param in params: + print(param) + + PythonOperator( + task_id="ref_params", + python_callable=print_these, + op_args=[ + "{{ params.str_param }}", + ], + ) diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 5e83c77a6463e..43cf0dc282377 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -3110,6 +3110,9 @@ def test_list_py_file_paths(self): "test_invalid_dup_task.py", "test_ignore_this.py", "test_invalid_param.py", + "test_invalid_param2.py", + "test_invalid_param3.py", + "test_invalid_param4.py", "test_nested_dag.py", "test_imports.py", "__init__.py", diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py index e204a16d1072e..8ebff1dc16e32 100644 --- a/tests/models/test_dagbag.py +++ b/tests/models/test_dagbag.py @@ -284,9 +284,14 @@ def test_process_file_cron_validity_check( def test_process_file_invalid_param_check(self, tmp_path): """ - test if an invalid param in the dag param can be identified + test if an invalid param in the dags can be identified """ - invalid_dag_files = ["test_invalid_param.py"] + invalid_dag_files = [ + "test_invalid_param.py", + "test_invalid_param2.py", + "test_invalid_param3.py", + "test_invalid_param4.py", + ] dagbag = DagBag(dag_folder=os.fspath(tmp_path), include_examples=False) assert len(dagbag.import_errors) == 0 @@ -295,6 +300,22 @@ def 
test_process_file_invalid_param_check(self, tmp_path): assert len(dagbag.import_errors) == len(invalid_dag_files) assert len(dagbag.dags) == 0 + def test_process_file_valid_param_check(self, tmp_path): + """ + test if valid params in the dags param can be validated (positive test) + """ + valid_dag_files = [ + "test_valid_param.py", + "test_valid_param2.py", + ] + dagbag = DagBag(dag_folder=os.fspath(tmp_path), include_examples=False) + + assert len(dagbag.import_errors) == 0 + for file in valid_dag_files: + dagbag.process_file(os.path.join(TEST_DAGS_FOLDER, file)) + assert len(dagbag.import_errors) == 0 + assert len(dagbag.dags) == len(valid_dag_files) + @patch.object(DagModel, "get_current") def test_get_dag_without_refresh(self, mock_dagmodel): """