From 62c98045af1c0dc7b42726a17fb0a3eb7b1d9df1 Mon Sep 17 00:00:00 2001 From: Andrey Anshin Date: Wed, 17 Apr 2024 15:34:51 +0400 Subject: [PATCH 1/4] Capture warnings during collect DAGs --- airflow/models/dagbag.py | 33 ++++++++++++++++---- tests/dags/test_dag_warnings.py | 50 +++++++++++++++++++++++++++++++ tests/dags/test_dag_warnings.zip | Bin 0 -> 983 bytes tests/models/test_dagbag.py | 49 +++++++++++++++++++++++++++++- 4 files changed, 126 insertions(+), 6 deletions(-) create mode 100644 tests/dags/test_dag_warnings.py create mode 100644 tests/dags/test_dag_warnings.zip diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py index 1902c985be771..d13866393904c 100644 --- a/airflow/models/dagbag.py +++ b/airflow/models/dagbag.py @@ -67,13 +67,23 @@ class FileLoadStat(NamedTuple): - """Information about single file.""" + """ + Information about single file. + + :param file: Loaded file. + :param duration: Time spent on process file. + :param dag_num: Total number of DAGs loaded in this file. + :param task_num: Total number of Tasks loaded in this file. + :param dags: DAGs names loaded in this file. + :param warnings: Total warnings captured during the process file. + """ file: str duration: timedelta dag_num: int task_num: int dags: str + warnings: int class DagBag(LoggingMixin): @@ -139,6 +149,7 @@ def __init__( # the file's last modified timestamp when we last read it self.file_last_changed: dict[str, datetime] = {} self.import_errors: dict[str, str] = {} + self.captured_warnings: dict[str, tuple[str, ...]] = {} self.has_logged = False self.read_dags_from_db = read_dags_from_db # Only used by read_dags_from_db=True @@ -314,10 +325,21 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True): # Ensure we don't pick up anything else we didn't mean to DagContext.autoregistered_dags.clear() - if filepath.endswith(".py") or not zipfile.is_zipfile(filepath): - mods = self._load_modules_from_file(filepath, safe_mode) - else: - mods = self._load_modules_from_zip(filepath, safe_mode) + self.captured_warnings.pop(filepath, None) + with warnings.catch_warnings(record=True) as captured_warnings: + if filepath.endswith(".py") or not zipfile.is_zipfile(filepath): + mods = self._load_modules_from_file(filepath, safe_mode) + else: + mods = self._load_modules_from_zip(filepath, safe_mode) + + if captured_warnings: + formatted_warnings = [] + for msg in captured_warnings: + category = msg.category.__name__ + if (module := msg.category.__module__) != "builtins": + category = f"{module}.{category}" + formatted_warnings.append(f"{msg.filename}:{msg.lineno}: {category}: {msg.message}") + self.captured_warnings[filepath] = tuple(formatted_warnings) found_dags = self._process_modules(filepath, mods, file_last_changed_on_disk) @@ -566,6 +588,7 @@ def collect_dags( dag_num=len(found_dags), task_num=sum(len(dag.tasks) for dag in found_dags), dags=str([dag.dag_id for dag in found_dags]), + warnings=len(self.captured_warnings.get(filepath, [])), ) ) except Exception as e: diff --git a/tests/dags/test_dag_warnings.py b/tests/dags/test_dag_warnings.py new file mode 100644 index 0000000000000..3eb3e4ccf7d8a --- /dev/null +++ b/tests/dags/test_dag_warnings.py @@ -0,0 +1,50 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import warnings +from datetime import datetime + +from airflow.exceptions import RemovedInAirflow3Warning +from airflow.models.baseoperator import BaseOperator +from airflow.models.dag import DAG + +DAG_ID = "test_dag_warnings" + + +class TestOperator(BaseOperator): + def __init__(self, *, parameter: str | None = None, deprecated_parameter: str | None = None, **kwargs): + super().__init__(**kwargs) + if deprecated_parameter: + warnings.warn("Deprecated Parameter", category=RemovedInAirflow3Warning, stacklevel=2) + parameter = deprecated_parameter + self.parameter = parameter + + def execute(self, context): + return None + + +def some_warning(): + warnings.warn("Some Warning", category=UserWarning, stacklevel=1) + + +with DAG(DAG_ID, start_date=datetime(2024, 1, 1), schedule=None): + TestOperator(task_id="test-task", parameter="foo") + TestOperator(task_id="test-task-deprecated", deprecated_parameter="bar") + +some_warning() diff --git a/tests/dags/test_dag_warnings.zip b/tests/dags/test_dag_warnings.zip new file mode 100644 index 0000000000000000000000000000000000000000..eb35a415ffa0839c89eb71e14a94155f56d49f72 GIT binary patch literal 983 zcmWIWW@Zs#U}E54_*OkJVoCCmBfiWG4D9R-3?d8+3?-?>CGjbV>G9=>MR}Qd>BV{l zmAxUc`HS5|YM+P4Ocb|PN-=*MtYu$u-LrewrqvHjZmFBJam%qvSQ*DX3azg>{~>Ym zwY6E1+a%BJ{6436{mw=DMo(`QxP?u;Ypg3h@sW1e&fM0ts*rXo;jmS+(oX*<^xKi* zyDIH{*OX;mj8j(K%GzliAv2}cQK&U--HvT)DqS;ozj}Sv?AITMojS6%lb0=DSgZY3 zP9Sqd=&z|oAwrRAvB`mx))WSm8XdSC!!h^dE{Dg@R-|s+zU0=dtv64z-Q9N1{f~v0 zVeQ?8nl*A)6E_$Z+OP;-c)#NDn|rPD8Egy|{|YY8|9@qocHxq!NGl1u>uPo0J2zh7 ziVO|*3S8c2V_{v|aA3VN@7B#s1s8uRO!ZGxSyRqxb@ASJoTlGrCM&A5AG*UJ)T_y1wa&v$V+m_5F; ze6wY`X;$-7O@>TPbBTJ%V3FTnG9}#$9rQ1Ln|}85w76;h{-?j1Rdr;3cA0PfI&@2~ zS$2%=rJ}#0N8%TM)lzT^$~cnWTgImlyi3M%cFu(r2XsB`WiA(J70)~r6)WR%Fl0sk zn(XcJysy2a%>9^KCCwI1+7_+M{LH?lb>8&s(7;Jg*sCwhQSocMB$jffTA}K&WKof? z&-sJ%Th^|rNa?xm?5S89x616z^d-V34D)w1-P2h$-S$cQ^nLHDqD@Qp{`A}Y;t zjrV`wFk?z#-fg{4K{WGu^!t&cL^9w$dD(pZO2I ze$@^UO?&Y)?pedX&6O3eDi6(3Fxr3TAxpvfi@7iA&iuLB>9D_-#a^L2NTW99&+Qhb z6Jjs3Pu|}VbLZ^+trn$i^SztS%@c^WaGkHXyys_c@Slrq9d|EJyZ=T-{OpZuyH4|N zF6LRRCD(2xJ)yo>mf5L!^2*!khks2v*k63*om|?D`H}mbWgT{lZrUSZURU=0;k)Av zc8_e?8Ti+{3HWqqrSz0Nx3#s;WGiiU-QHvV?$rD@7Y?1D`sw-q_Ojx0H}~&J=dM>b w>8gDH_~SpO0B=SnduH4j5SRzSU`Zp0!j>Hayjj^ma*ROe1f=VM*_VL<0IdhaSO5S3 literal 0 HcmV?d00001 diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py index 95baefff2c11a..af0de68a5e38c 100644 --- a/tests/models/test_dagbag.py +++ b/tests/models/test_dagbag.py @@ -22,6 +22,7 @@ import pathlib import sys import textwrap +import warnings import zipfile from copy import deepcopy from datetime import datetime, timedelta, timezone @@ -35,7 +36,7 @@ import airflow.example_dags from airflow import settings -from airflow.exceptions import SerializationError +from airflow.exceptions import RemovedInAirflow3Warning, SerializationError from airflow.models.dag import DAG, DagModel from airflow.models.dagbag import DagBag from airflow.models.serialized_dag import SerializedDagModel @@ -1145,3 +1146,49 @@ def test_dagbag_dag_collection(self): # test that dagbag.dags is not empty if collect_dags is True dagbag = DagBag(dag_folder=TEST_DAGS_FOLDER, include_examples=False) assert dagbag.dags + + @pytest.mark.filterwarnings("default::airflow.exceptions.RemovedInAirflow3Warning") + def test_dabgag_captured_warnings(self): + dag_file = os.path.join(TEST_DAGS_FOLDER, "test_dag_warnings.py") + dagbag = DagBag(dag_folder=dag_file, include_examples=False, collect_dags=False) + assert dag_file not in dagbag.captured_warnings + + dagbag.collect_dags(dag_folder=dagbag.dag_folder, include_examples=False, only_if_updated=False) + assert len(dagbag.dag_ids) == 1 + assert dag_file in dagbag.captured_warnings + captured_warnings = dagbag.captured_warnings[dag_file] + assert len(captured_warnings) == 2 + assert dagbag.dagbag_stats[0].warnings == 2 + + assert captured_warnings[0] == ( + f"{dag_file}:48: airflow.exceptions.RemovedInAirflow3Warning: Deprecated Parameter" + ) + assert captured_warnings[1] == f"{dag_file}:50: UserWarning: Some Warning" + + with warnings.catch_warnings(): + # Disable capture RemovedInAirflow3Warning, and it should be reflected in captured warnings + warnings.simplefilter("ignore", RemovedInAirflow3Warning) + dagbag.collect_dags(dag_folder=dagbag.dag_folder, include_examples=False, only_if_updated=False) + assert dag_file in dagbag.captured_warnings + assert len(dagbag.captured_warnings[dag_file]) == 1 + assert dagbag.dagbag_stats[0].warnings == 1 + + # Disable all warnings, no captured warnings expected + warnings.simplefilter("ignore") + dagbag.collect_dags(dag_folder=dagbag.dag_folder, include_examples=False, only_if_updated=False) + assert dag_file not in dagbag.captured_warnings + assert dagbag.dagbag_stats[0].warnings == 0 + + @pytest.mark.filterwarnings("default::airflow.exceptions.RemovedInAirflow3Warning") + def test_dabgag_captured_warnings_zip(self): + dag_file = os.path.join(TEST_DAGS_FOLDER, "test_dag_warnings.zip") + in_zip_dag_file = f"{dag_file}/test_dag_warnings.py" + dagbag = DagBag(dag_folder=dag_file, include_examples=False) + assert len(dagbag.dag_ids) == 1 + assert dag_file in dagbag.captured_warnings + captured_warnings = dagbag.captured_warnings[dag_file] + assert len(captured_warnings) == 2 + assert captured_warnings[0] == ( + f"{in_zip_dag_file}:48: airflow.exceptions.RemovedInAirflow3Warning: Deprecated Parameter" + ) + assert captured_warnings[1] == f"{in_zip_dag_file}:50: UserWarning: Some Warning" From c0267927dbcbe045d9be0db4575487a9ca3546c1 Mon Sep 17 00:00:00 2001 From: Andrey Anshin Date: Thu, 18 Apr 2024 16:11:07 +0400 Subject: [PATCH 2/4] Reraise captured warnings --- airflow/models/dagbag.py | 3 +- airflow/utils/warnings.py | 40 ++++++++++++++++++ tests/utils/test_warnings.py | 79 ++++++++++++++++++++++++++++++++++++ 3 files changed, 121 insertions(+), 1 deletion(-) create mode 100644 airflow/utils/warnings.py create mode 100644 tests/utils/test_warnings.py diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py index d13866393904c..b692f4c8e95ef 100644 --- a/airflow/models/dagbag.py +++ b/airflow/models/dagbag.py @@ -58,6 +58,7 @@ from airflow.utils.session import NEW_SESSION, provide_session from airflow.utils.timeout import timeout from airflow.utils.types import NOTSET +from airflow.utils.warnings import capture_with_reraise if TYPE_CHECKING: from sqlalchemy.orm import Session @@ -326,7 +327,7 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True): DagContext.autoregistered_dags.clear() self.captured_warnings.pop(filepath, None) - with warnings.catch_warnings(record=True) as captured_warnings: + with capture_with_reraise() as captured_warnings: if filepath.endswith(".py") or not zipfile.is_zipfile(filepath): mods = self._load_modules_from_file(filepath, safe_mode) else: diff --git a/airflow/utils/warnings.py b/airflow/utils/warnings.py new file mode 100644 index 0000000000000..bcff4c06fa286 --- /dev/null +++ b/airflow/utils/warnings.py @@ -0,0 +1,40 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import warnings +from collections.abc import Generator +from contextlib import contextmanager + + +@contextmanager +def capture_with_reraise() -> Generator[list[warnings.WarningMessage], None, None]: + """Capture warnings in context and re-raise it on exit from the context manager.""" + captured_warnings = [] + try: + with warnings.catch_warnings(record=True) as captured_warnings: + yield captured_warnings + finally: + if captured_warnings: + for cw in captured_warnings: + warnings.warn_explicit( + message=cw.message, + category=cw.category, + filename=cw.filename, + lineno=cw.lineno, + source=cw.source, + ) diff --git a/tests/utils/test_warnings.py b/tests/utils/test_warnings.py new file mode 100644 index 0000000000000..666e565d796e2 --- /dev/null +++ b/tests/utils/test_warnings.py @@ -0,0 +1,79 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import warnings + +import pytest + +from airflow.utils.warnings import capture_with_reraise + + +class TestCaptureWithReraise: + @staticmethod + def raise_warnings(): + warnings.warn("Foo", UserWarning, stacklevel=2) + warnings.warn("Bar", UserWarning, stacklevel=2) + warnings.warn("Baz", UserWarning, stacklevel=2) + + def test_capture_no_warnings(self): + with warnings.catch_warnings(): + warnings.simplefilter("error") + with capture_with_reraise() as cw: + pass + assert cw == [] + + def test_capture_warnings(self): + with pytest.warns(UserWarning, match="(Foo|Bar|Baz)") as ctx: + with capture_with_reraise() as cw: + self.raise_warnings() + assert len(cw) == 3 + assert len(ctx.list) == 3 + + def test_capture_warnings_with_parent_error_filter(self): + with warnings.catch_warnings(record=True) as records: + warnings.filterwarnings("error", message="Bar") + with capture_with_reraise() as cw: + with pytest.raises(UserWarning, match="Bar"): + self.raise_warnings() + assert len(cw) == 1 + assert len(records) == 1 + + def test_capture_warnings_with_parent_ignore_filter(self): + with warnings.catch_warnings(record=True) as records: + warnings.filterwarnings("ignore", message="Baz") + with capture_with_reraise() as cw: + self.raise_warnings() + assert len(cw) == 2 + assert len(records) == 2 + + def test_capture_warnings_with_filters(self): + with warnings.catch_warnings(record=True) as records: + with capture_with_reraise() as cw: + warnings.filterwarnings("ignore", message="Foo") + self.raise_warnings() + assert len(cw) == 2 + assert len(records) == 2 + + def test_capture_warnings_with_error_filters(self): + with warnings.catch_warnings(record=True) as records: + with capture_with_reraise() as cw: + warnings.filterwarnings("error", message="Bar") + with pytest.raises(UserWarning, match="Bar"): + self.raise_warnings() + assert len(cw) == 1 + assert len(records) == 1 From 8d4d85df3fca5f5fb4ea7e557574f6bf18a25c38 Mon Sep 17 00:00:00 2001 From: Andrey Anshin Date: Fri, 19 Apr 2024 12:36:54 +0400 Subject: [PATCH 3/4] Update airflow/models/dagbag.py Co-authored-by: Tzu-ping Chung --- airflow/models/dagbag.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py index b692f4c8e95ef..82155d19edff9 100644 --- a/airflow/models/dagbag.py +++ b/airflow/models/dagbag.py @@ -76,7 +76,7 @@ class FileLoadStat(NamedTuple): :param dag_num: Total number of DAGs loaded in this file. :param task_num: Total number of Tasks loaded in this file. :param dags: DAGs names loaded in this file. - :param warnings: Total warnings captured during the process file. + :param warnings: Total number of warnings captured from processing this file. """ file: str From 7332527787e3e543034507b2f337298dec8eeeab Mon Sep 17 00:00:00 2001 From: Andrey Anshin Date: Fri, 19 Apr 2024 12:40:56 +0400 Subject: [PATCH 4/4] warnings -> warning_num in FileLoadStat --- airflow/models/dagbag.py | 6 +++--- tests/models/test_dagbag.py | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py index 82155d19edff9..1afea71c00167 100644 --- a/airflow/models/dagbag.py +++ b/airflow/models/dagbag.py @@ -76,7 +76,7 @@ class FileLoadStat(NamedTuple): :param dag_num: Total number of DAGs loaded in this file. :param task_num: Total number of Tasks loaded in this file. :param dags: DAGs names loaded in this file. - :param warnings: Total number of warnings captured from processing this file. + :param warning_num: Total number of warnings captured from processing this file. """ file: str @@ -84,7 +84,7 @@ class FileLoadStat(NamedTuple): dag_num: int task_num: int dags: str - warnings: int + warning_num: int class DagBag(LoggingMixin): @@ -589,7 +589,7 @@ def collect_dags( dag_num=len(found_dags), task_num=sum(len(dag.tasks) for dag in found_dags), dags=str([dag.dag_id for dag in found_dags]), - warnings=len(self.captured_warnings.get(filepath, [])), + warning_num=len(self.captured_warnings.get(filepath, [])), ) ) except Exception as e: diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py index af0de68a5e38c..3623aa38a554e 100644 --- a/tests/models/test_dagbag.py +++ b/tests/models/test_dagbag.py @@ -1158,7 +1158,7 @@ def test_dabgag_captured_warnings(self): assert dag_file in dagbag.captured_warnings captured_warnings = dagbag.captured_warnings[dag_file] assert len(captured_warnings) == 2 - assert dagbag.dagbag_stats[0].warnings == 2 + assert dagbag.dagbag_stats[0].warning_num == 2 assert captured_warnings[0] == ( f"{dag_file}:48: airflow.exceptions.RemovedInAirflow3Warning: Deprecated Parameter" @@ -1171,13 +1171,13 @@ def test_dabgag_captured_warnings(self): dagbag.collect_dags(dag_folder=dagbag.dag_folder, include_examples=False, only_if_updated=False) assert dag_file in dagbag.captured_warnings assert len(dagbag.captured_warnings[dag_file]) == 1 - assert dagbag.dagbag_stats[0].warnings == 1 + assert dagbag.dagbag_stats[0].warning_num == 1 # Disable all warnings, no captured warnings expected warnings.simplefilter("ignore") dagbag.collect_dags(dag_folder=dagbag.dag_folder, include_examples=False, only_if_updated=False) assert dag_file not in dagbag.captured_warnings - assert dagbag.dagbag_stats[0].warnings == 0 + assert dagbag.dagbag_stats[0].warning_num == 0 @pytest.mark.filterwarnings("default::airflow.exceptions.RemovedInAirflow3Warning") def test_dabgag_captured_warnings_zip(self):