From 549feaaf8ea0ed2c83eebbc8d115b676fd93181a Mon Sep 17 00:00:00 2001 From: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> Date: Fri, 9 Apr 2021 11:53:09 -0600 Subject: [PATCH 01/13] WIP: Sync DAG specific permissions when parsing --- airflow/models/serialized_dag.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/airflow/models/serialized_dag.py b/airflow/models/serialized_dag.py index 0184307aca6dd..ac6ba1cc0439a 100644 --- a/airflow/models/serialized_dag.py +++ b/airflow/models/serialized_dag.py @@ -37,10 +37,25 @@ from airflow.utils import timezone from airflow.utils.session import provide_session from airflow.utils.sqlalchemy import UtcDateTime +from airflow.www.security import AirflowSecurityManager log = logging.getLogger(__name__) +class SimpleSecurityManager(AirflowSecurityManager): + """Security Manager that doesn't need the whole flask app""" + + def __init__(self): # pylint: disable=super-init-not-called + self.session = None + + @property + def get_session(self): + return self.session + + +security_manager = SimpleSecurityManager() + + class SerializedDagModel(Base): """A table for serialized DAGs. 
@@ -134,6 +149,11 @@ def write_dag(cls, dag: DAG, min_update_interval: Optional[int] = None, session: log.debug("Writing Serialized DAG: %s to the DB", dag.dag_id) session.merge(new_serialized_dag) + + log.debug("Syncing DAG permissions: %s to the DB", dag.dag_id) + security_manager.session = session + security_manager.sync_perm_for_dag(dag.dag_id, dag.access_control) + log.debug("DAG: %s written to the DB", dag.dag_id) @classmethod From 0cd68c96ac4391192dbaf0a82ad4a686eb3e8281 Mon Sep 17 00:00:00 2001 From: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> Date: Thu, 8 Apr 2021 16:16:43 -0600 Subject: [PATCH 02/13] cli sync-perm and webserver start skips dag specific perms Fix access_control syncing --- airflow/cli/cli_parser.py | 9 ++- airflow/cli/commands/sync_perm_command.py | 10 +--- airflow/www/security.py | 25 ++++----- tests/cli/commands/test_sync_perm_command.py | 41 +++++--------- tests/www/test_security.py | 59 ++++++++++++++------ 5 files changed, 76 insertions(+), 68 deletions(-) diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py index 87c87fc81a672..f3f0344886d63 100644 --- a/airflow/cli/cli_parser.py +++ b/airflow/cli/cli_parser.py @@ -729,6 +729,11 @@ def _check(value): help="If passed, this command will be successful even if multiple matching alive jobs are found.", ) +# sync-perm +ARG_INCLUDE_DAGS = Arg( + ("--include-dags",), help="If passed, DAG specific permissions will also be synced.", action="store_true" +) + ALTERNATIVE_CONN_SPECS_ARGS = [ ARG_CONN_TYPE, ARG_CONN_DESCRIPTION, @@ -1556,9 +1561,9 @@ class GroupCommand(NamedTuple): ), ActionCommand( name='sync-perm', - help="Update permissions for existing roles and DAGs", + help="Update permissions for existing roles and optionally DAGs", func=lazy_load_command('airflow.cli.commands.sync_perm_command.sync_perm'), - args=(), + args=(ARG_INCLUDE_DAGS,), ), ActionCommand( name='rotate-fernet-key', diff --git a/airflow/cli/commands/sync_perm_command.py 
b/airflow/cli/commands/sync_perm_command.py index e382b89998c45..d957fcbfaaeac 100644 --- a/airflow/cli/commands/sync_perm_command.py +++ b/airflow/cli/commands/sync_perm_command.py @@ -16,7 +16,6 @@ # specific language governing permissions and limitations # under the License. """Sync permission command""" -from airflow.models import DagBag from airflow.utils import cli as cli_utils from airflow.www.app import cached_app @@ -29,9 +28,6 @@ def sync_perm(args): # Add missing permissions for all the Base Views _before_ syncing/creating roles appbuilder.add_permissions(update_perms=True) appbuilder.sm.sync_roles() - print('Updating permission on all DAG views') - dagbag = DagBag(read_dags_from_db=True) - dagbag.collect_dags_from_db() - dags = dagbag.dags.values() - for dag in dags: - appbuilder.sm.sync_perm_for_dag(dag.dag_id, dag.access_control) + if args.include_dags: + print('Updating permission on all DAG views') + appbuilder.sm.create_dag_specific_permissions() diff --git a/airflow/www/security.py b/airflow/www/security.py index 25ff0b9952f98..97f6e0a664635 100644 --- a/airflow/www/security.py +++ b/airflow/www/security.py @@ -27,9 +27,8 @@ from sqlalchemy import or_ from sqlalchemy.orm import joinedload -from airflow import models from airflow.exceptions import AirflowException -from airflow.models import DagModel +from airflow.models import DagBag, DagModel from airflow.security import permissions from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.session import provide_session @@ -540,24 +539,25 @@ def _get_all_roles_with_permissions(self) -> Dict[str, Role]: def create_dag_specific_permissions(self) -> None: """ - Creates 'can_read' and 'can_edit' permissions for all active and paused DAGs. + Creates 'can_read' and 'can_edit' permissions for all active and paused DAGs, + along with any `access_control` permissions provided in the DAG. :return: None. 
""" perms = self.get_all_permissions() - rows = ( - self.get_session.query(models.DagModel.dag_id) - .filter(or_(models.DagModel.is_active, models.DagModel.is_paused)) - .all() - ) + dagbag = DagBag(read_dags_from_db=True) + dagbag.collect_dags_from_db() + dags = dagbag.dags.values() - for row in rows: - dag_id = row[0] + for dag in dags: + dag_resource_name = self.prefixed_dag_id(dag.dag_id) for perm_name in self.DAG_PERMS: - dag_resource_name = self.prefixed_dag_id(dag_id) if (perm_name, dag_resource_name) not in perms: self._merge_perm(perm_name, dag_resource_name) + if dag.access_control: + self._sync_dag_view_permissions(dag_resource_name, dag.access_control) + def update_admin_perm_view(self): """ Admin should have all the permission-views, except the dag views. @@ -595,7 +595,6 @@ def sync_roles(self): """ # Create global all-dag VM self.create_perm_vm_for_all_dag() - self.create_dag_specific_permissions() # Sync the default roles (Admin, Viewer, User, Op, public) with related permissions self.bulk_sync_roles(self.ROLE_CONFIGS) @@ -617,7 +616,7 @@ def sync_resource_permissions(self, perms=None): def sync_perm_for_dag(self, dag_id, access_control=None): """ Sync permissions for given dag id. 
The dag id surely exists in our dag bag - as only / refresh button or cli.sync_perm will call this function + as only / refresh button or SerializedDagModel will call this function :param dag_id: the ID of the DAG whose permissions should be updated :type dag_id: str diff --git a/tests/cli/commands/test_sync_perm_command.py b/tests/cli/commands/test_sync_perm_command.py index 5f7f86e477cf7..9fff6d686fa19 100644 --- a/tests/cli/commands/test_sync_perm_command.py +++ b/tests/cli/commands/test_sync_perm_command.py @@ -21,48 +21,33 @@ from airflow.cli import cli_parser from airflow.cli.commands import sync_perm_command -from airflow.models.dag import DAG -from airflow.models.dagbag import DagBag -from airflow.security import permissions class TestCliSyncPerm(unittest.TestCase): @classmethod def setUpClass(cls): - cls.dagbag = DagBag(include_examples=True) cls.parser = cli_parser.get_parser() @mock.patch("airflow.cli.commands.sync_perm_command.cached_app") - @mock.patch("airflow.cli.commands.sync_perm_command.DagBag") - def test_cli_sync_perm(self, dagbag_mock, mock_cached_app): - dags = [ - DAG('has_access_control', access_control={'Public': {permissions.ACTION_CAN_READ}}), - DAG('no_access_control'), - ] + def test_cli_sync_perm(self, mock_cached_app): + appbuilder = mock_cached_app.return_value.appbuilder + appbuilder.sm = mock.Mock() - collect_dags_from_db_mock = mock.Mock() - dagbag = mock.Mock() + args = self.parser.parse_args(['sync-perm']) + sync_perm_command.sync_perm(args) - dagbag.dags = {dag.dag_id: dag for dag in dags} - dagbag.collect_dags_from_db = collect_dags_from_db_mock - dagbag_mock.return_value = dagbag + appbuilder.add_permissions.assert_called_once_with(update_perms=True) + appbuilder.sm.sync_roles.assert_called_once_with() + appbuilder.sm.create_dag_specific_permissions.assert_not_called() + @mock.patch("airflow.cli.commands.sync_perm_command.cached_app") + def test_cli_sync_perm_include_dags(self, mock_cached_app): appbuilder = 
mock_cached_app.return_value.appbuilder appbuilder.sm = mock.Mock() - args = self.parser.parse_args(['sync-perm']) + args = self.parser.parse_args(['sync-perm', '--include-dags']) sync_perm_command.sync_perm(args) - assert appbuilder.sm.sync_roles.call_count == 1 - - dagbag_mock.assert_called_once_with(read_dags_from_db=True) - collect_dags_from_db_mock.assert_called_once_with() - assert 2 == len(appbuilder.sm.sync_perm_for_dag.mock_calls) - appbuilder.sm.sync_perm_for_dag.assert_any_call( - 'has_access_control', {'Public': {permissions.ACTION_CAN_READ}} - ) - appbuilder.sm.sync_perm_for_dag.assert_any_call( - 'no_access_control', - None, - ) appbuilder.add_permissions.assert_called_once_with(update_perms=True) + appbuilder.sm.sync_roles.assert_called_once_with() + appbuilder.sm.create_dag_specific_permissions.assert_called_once_with() diff --git a/tests/www/test_security.py b/tests/www/test_security.py index 8a85d9671725c..87ce7635dcd58 100644 --- a/tests/www/test_security.py +++ b/tests/www/test_security.py @@ -29,6 +29,7 @@ from airflow import settings from airflow.exceptions import AirflowException from airflow.models import DagModel +from airflow.models.dag import DAG from airflow.security import permissions from airflow.www import app as application from airflow.www.utils import CustomSQLAInterface @@ -431,7 +432,7 @@ def test_has_all_dag_access(self, mock_has_role, mock_has_perm): def test_access_control_with_non_existent_role(self): with pytest.raises(AirflowException) as ctx: - self.security_manager.sync_perm_for_dag( + self.security_manager._sync_dag_view_permissions( dag_id='access-control-test', access_control={ 'this-role-does-not-exist': [permissions.ACTION_CAN_EDIT, permissions.ACTION_CAN_READ] @@ -473,7 +474,7 @@ def test_access_control_with_invalid_permission(self): for permission in invalid_permissions: self.expect_user_is_in_role(user, rolename='team-a') with pytest.raises(AirflowException) as ctx: - self.security_manager.sync_perm_for_dag( + 
self.security_manager._sync_dag_view_permissions( 'access_control_test', access_control={'team-a': {permission}} ) assert "invalid permissions" in str(ctx.value) @@ -489,7 +490,7 @@ def test_access_control_is_set_on_init(self): permissions=[], ) self.expect_user_is_in_role(user, rolename='team-a') - self.security_manager.sync_perm_for_dag( + self.security_manager._sync_dag_view_permissions( 'access_control_test', access_control={'team-a': [permissions.ACTION_CAN_EDIT, permissions.ACTION_CAN_READ]}, ) @@ -517,12 +518,12 @@ def test_access_control_stale_perms_are_revoked(self): permissions=[], ) self.expect_user_is_in_role(user, rolename='team-a') - self.security_manager.sync_perm_for_dag( + self.security_manager._sync_dag_view_permissions( 'access_control_test', access_control={'team-a': READ_WRITE} ) self.assert_user_has_dag_perms(perms=READ_WRITE, dag_id='access_control_test', user=user) - self.security_manager.sync_perm_for_dag( + self.security_manager._sync_dag_view_permissions( 'access_control_test', access_control={'team-a': READ_ONLY} ) self.assert_user_has_dag_perms( @@ -562,24 +563,46 @@ def test_correct_roles_have_perms_to_read_config(self): f"on {permissions.RESOURCE_CONFIG}" ) - def test_create_dag_specific_permissions(self): - dag_id = 'some_dag_id' - dag_permission_name = self.security_manager.prefixed_dag_id(dag_id) - assert ('can_read', dag_permission_name) not in self.security_manager.get_all_permissions() + @mock.patch("airflow.www.security.DagBag") + def test_create_dag_specific_permissions(self, dagbag_mock): + access_control = {'Public': {permissions.ACTION_CAN_READ}} + dags = [ + DAG('has_access_control', access_control=access_control), + DAG('no_access_control'), + ] - dag_model = DagModel( - dag_id=dag_id, fileloc='/tmp/dag_.py', schedule_interval='2 2 * * *', is_paused=True - ) - self.session.add(dag_model) - self.session.commit() + collect_dags_from_db_mock = mock.Mock() + dagbag = mock.Mock() + + dagbag.dags = {dag.dag_id: dag for dag in 
dags} + dagbag.collect_dags_from_db = collect_dags_from_db_mock + dagbag_mock.return_value = dagbag + + self.security_manager._sync_dag_view_permissions = mock.Mock() + + for dag in dags: + prefixed_dag_id = self.security_manager.prefixed_dag_id(dag.dag_id) + all_perms = self.security_manager.get_all_permissions() + assert ('can_read', prefixed_dag_id) not in all_perms + assert ('can_edit', prefixed_dag_id) not in all_perms self.security_manager.create_dag_specific_permissions() - self.session.commit() - assert ('can_read', dag_permission_name) in self.security_manager.get_all_permissions() + dagbag_mock.assert_called_once_with(read_dags_from_db=True) + collect_dags_from_db_mock.assert_called_once_with() + + for dag in dags: + prefixed_dag_id = self.security_manager.prefixed_dag_id(dag.dag_id) + all_perms = self.security_manager.get_all_permissions() + assert ('can_read', prefixed_dag_id) in all_perms + assert ('can_edit', prefixed_dag_id) in all_perms + + self.security_manager._sync_dag_view_permissions.assert_called_once_with( + self.security_manager.prefixed_dag_id('has_access_control'), access_control + ) - # Make sure we short circuit when the perms already exist - with assert_queries_count(2): # One query to get DagModels, one query to get all perms + del dagbag.dags["has_access_control"] + with assert_queries_count(1): # one query to get all perms; dagbag is mocked self.security_manager.create_dag_specific_permissions() def test_get_all_permissions(self): From a2cbb06c2190acd1c6115309bf5f9abb9b30eb31 Mon Sep 17 00:00:00 2001 From: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> Date: Mon, 12 Apr 2021 14:01:18 -0600 Subject: [PATCH 03/13] better test coverage --- airflow/www/security.py | 4 +--- tests/models/test_serialized_dag.py | 17 ++++++++++++++--- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/airflow/www/security.py b/airflow/www/security.py index 97f6e0a664635..2e82738022ea5 100644 --- a/airflow/www/security.py +++ 
b/airflow/www/security.py @@ -628,9 +628,7 @@ def sync_perm_for_dag(self, dag_id, access_control=None): """ prefixed_dag_id = self.prefixed_dag_id(dag_id) for dag_perm in self.DAG_PERMS: - perm_on_dag = self.find_permission_view_menu(dag_perm, prefixed_dag_id) - if perm_on_dag is None: - self.add_permission_view_menu(dag_perm, prefixed_dag_id) + self.add_permission_view_menu(dag_perm, prefixed_dag_id) if access_control: self._sync_dag_view_permissions(prefixed_dag_id, access_control) diff --git a/tests/models/test_serialized_dag.py b/tests/models/test_serialized_dag.py index 72380c55d153d..acd8a3b952d7e 100644 --- a/tests/models/test_serialized_dag.py +++ b/tests/models/test_serialized_dag.py @@ -19,6 +19,7 @@ """Unit tests for SerializedDagModel.""" import unittest +from unittest import mock from airflow import DAG, example_dags as example_dags_module from airflow.models import DagBag @@ -60,7 +61,8 @@ def _write_example_dags(self): SDM.write_dag(dag) return example_dags - def test_write_dag(self): + @mock.patch('airflow.models.serialized_dag.security_manager') + def test_write_dag(self, mock_security_manager): """DAGs can be written into database.""" example_dags = self._write_example_dags() @@ -73,12 +75,16 @@ def test_write_dag(self): # Verifies JSON schema. 
SerializedDAG.validate_schema(result.data) - def test_serialized_dag_is_updated_only_if_dag_is_changed(self): + mock_security_manager.sync_perm_for_dag.assert_any_call(dag.dag_id, dag.access_control) + + @mock.patch('airflow.models.serialized_dag.security_manager') + def test_serialized_dag_is_updated_only_if_dag_is_changed(self, mock_security_manager): """Test Serialized DAG is updated if DAG is changed""" example_dags = make_example_dags(example_dags_module) example_bash_op_dag = example_dags.get("example_bash_operator") SDM.write_dag(dag=example_bash_op_dag) + mock_security_manager.reset_mock() with create_session() as session: s_dag = session.query(SDM).get(example_bash_op_dag.dag_id) @@ -90,8 +96,10 @@ def test_serialized_dag_is_updated_only_if_dag_is_changed(self): assert s_dag_1.dag_hash == s_dag.dag_hash assert s_dag.last_updated == s_dag_1.last_updated + mock_security_manager.sync_perm_for_dag.assert_not_called() # Update DAG + mock_security_manager.reset_mock() example_bash_op_dag.tags += ["new_tag"] assert set(example_bash_op_dag.tags) == {"example", "example2", "new_tag"} @@ -101,6 +109,9 @@ def test_serialized_dag_is_updated_only_if_dag_is_changed(self): assert s_dag.last_updated != s_dag_2.last_updated assert s_dag.dag_hash != s_dag_2.dag_hash assert s_dag_2.data["dag"]["tags"] == ["example", "example2", "new_tag"] + mock_security_manager.sync_perm_for_dag.assert_any_call( + example_bash_op_dag.dag_id, example_bash_op_dag.access_control + ) def test_read_dags(self): """DAGs can be read from database.""" @@ -144,5 +155,5 @@ def test_bulk_sync_to_db(self): DAG("dag_2"), DAG("dag_3"), ] - with assert_queries_count(10): + with assert_queries_count(48): SDM.bulk_sync_to_db(dags) From 7fd507a86ae820c031581bfbfbd32419381523a4 Mon Sep 17 00:00:00 2001 From: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> Date: Tue, 13 Apr 2021 12:23:32 -0600 Subject: [PATCH 04/13] Move simple security manager class to www --- airflow/models/serialized_dag.py 
| 16 ++-------------- airflow/www/security.py | 11 +++++++++++ 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/airflow/models/serialized_dag.py b/airflow/models/serialized_dag.py index ac6ba1cc0439a..4500b21e638bb 100644 --- a/airflow/models/serialized_dag.py +++ b/airflow/models/serialized_dag.py @@ -37,23 +37,11 @@ from airflow.utils import timezone from airflow.utils.session import provide_session from airflow.utils.sqlalchemy import UtcDateTime -from airflow.www.security import AirflowSecurityManager +from airflow.www.security import SimpleAirflowSecurityManager log = logging.getLogger(__name__) - -class SimpleSecurityManager(AirflowSecurityManager): - """Security Manager that doesn't need the whole flask app""" - - def __init__(self): # pylint: disable=super-init-not-called - self.session = None - - @property - def get_session(self): - return self.session - - -security_manager = SimpleSecurityManager() +security_manager = SimpleAirflowSecurityManager() class SerializedDagModel(Base): diff --git a/airflow/www/security.py b/airflow/www/security.py index 2e82738022ea5..4b0d571adefa0 100644 --- a/airflow/www/security.py +++ b/airflow/www/security.py @@ -725,3 +725,14 @@ def check_authorization( return False return True + + +class SimpleAirflowSecurityManager(AirflowSecurityManager): + """Security Manager that doesn't need the whole flask app""" + + def __init__(self): # pylint: disable=super-init-not-called + self.session = None + + @property + def get_session(self): + return self.session From 650a37c35b120724ad9ecfc368e6f68e9a041b02 Mon Sep 17 00:00:00 2001 From: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> Date: Wed, 14 Apr 2021 13:40:01 -0600 Subject: [PATCH 05/13] PR feedback --- airflow/models/serialized_dag.py | 7 +++---- airflow/www/security.py | 4 ++-- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/airflow/models/serialized_dag.py b/airflow/models/serialized_dag.py index 4500b21e638bb..28923697d1df5 100644 
--- a/airflow/models/serialized_dag.py +++ b/airflow/models/serialized_dag.py @@ -37,12 +37,9 @@ from airflow.utils import timezone from airflow.utils.session import provide_session from airflow.utils.sqlalchemy import UtcDateTime -from airflow.www.security import SimpleAirflowSecurityManager log = logging.getLogger(__name__) -security_manager = SimpleAirflowSecurityManager() - class SerializedDagModel(Base): """A table for serialized DAGs. @@ -139,7 +136,9 @@ def write_dag(cls, dag: DAG, min_update_interval: Optional[int] = None, session: session.merge(new_serialized_dag) log.debug("Syncing DAG permissions: %s to the DB", dag.dag_id) - security_manager.session = session + from airflow.www.security import SimpleAirflowSecurityManager + + security_manager = SimpleAirflowSecurityManager(session=session) security_manager.sync_perm_for_dag(dag.dag_id, dag.access_control) log.debug("DAG: %s written to the DB", dag.dag_id) diff --git a/airflow/www/security.py b/airflow/www/security.py index 4b0d571adefa0..dbeba2215b1ce 100644 --- a/airflow/www/security.py +++ b/airflow/www/security.py @@ -730,8 +730,8 @@ def check_authorization( class SimpleAirflowSecurityManager(AirflowSecurityManager): """Security Manager that doesn't need the whole flask app""" - def __init__(self): # pylint: disable=super-init-not-called - self.session = None + def __init__(self, session=None): # pylint: disable=super-init-not-called + self.session = session @property def get_session(self): From e4cb59cde41dd8598b4e08920c9bc6d22f38e12f Mon Sep 17 00:00:00 2001 From: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> Date: Wed, 14 Apr 2021 14:52:31 -0600 Subject: [PATCH 06/13] Add entry to UPDATING --- UPDATING.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/UPDATING.md b/UPDATING.md index bbacbe05202af..d1fa7a828e9ff 100644 --- a/UPDATING.md +++ b/UPDATING.md @@ -95,6 +95,12 @@ The `default_queue` configuration option has been moved from `[celery]` section This allows Airflow 
to work more reliably with some environments (like Azure) by default. +### `sync-perm` CLI no longer syncs DAG specific permissions by default + +The `sync-perm` CLI command will no longer sync DAG specific permissions by default as they are now being handled during +DAG parsing. If you need or want the old behavior, you can pass `--include-dags` to have `sync-perm` also sync DAG +specific permissions. + ## Airflow 2.0.1 ### Permission to view Airflow Configurations has been removed from `User` and `Viewer` role From 78a12a14224ac885942dc2e658945f09603b7e4f Mon Sep 17 00:00:00 2001 From: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> Date: Wed, 14 Apr 2021 14:58:17 -0600 Subject: [PATCH 07/13] PR feedback --- airflow/www/security.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/airflow/www/security.py b/airflow/www/security.py index dbeba2215b1ce..a1c35a66daf12 100644 --- a/airflow/www/security.py +++ b/airflow/www/security.py @@ -539,8 +539,11 @@ def _get_all_roles_with_permissions(self) -> Dict[str, Role]: def create_dag_specific_permissions(self) -> None: """ - Creates 'can_read' and 'can_edit' permissions for all active and paused DAGs, - along with any `access_control` permissions provided in the DAG. + Creates 'can_read' and 'can_edit' permissions for all DAGs, + along with any `access_control` permissions provided in them. + + This does iterate through ALL the DAGs, which can be slow. See `sync_perm_for_dag` + if you only need to sync a single DAG. :return: None. 
""" From 57dcfce335709569116729cc5b213898009e60fc Mon Sep 17 00:00:00 2001 From: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> Date: Thu, 15 Apr 2021 09:21:10 -0600 Subject: [PATCH 08/13] Fix tests --- tests/models/test_serialized_dag.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/tests/models/test_serialized_dag.py b/tests/models/test_serialized_dag.py index acd8a3b952d7e..f66ee798c1eb4 100644 --- a/tests/models/test_serialized_dag.py +++ b/tests/models/test_serialized_dag.py @@ -61,7 +61,7 @@ def _write_example_dags(self): SDM.write_dag(dag) return example_dags - @mock.patch('airflow.models.serialized_dag.security_manager') + @mock.patch('airflow.www.security.SimpleAirflowSecurityManager') def test_write_dag(self, mock_security_manager): """DAGs can be written into database.""" example_dags = self._write_example_dags() @@ -75,9 +75,11 @@ def test_write_dag(self, mock_security_manager): # Verifies JSON schema. SerializedDAG.validate_schema(result.data) - mock_security_manager.sync_perm_for_dag.assert_any_call(dag.dag_id, dag.access_control) + mock_security_manager.return_value.sync_perm_for_dag.assert_any_call( + dag.dag_id, dag.access_control + ) - @mock.patch('airflow.models.serialized_dag.security_manager') + @mock.patch('airflow.www.security.SimpleAirflowSecurityManager') def test_serialized_dag_is_updated_only_if_dag_is_changed(self, mock_security_manager): """Test Serialized DAG is updated if DAG is changed""" @@ -96,7 +98,7 @@ def test_serialized_dag_is_updated_only_if_dag_is_changed(self, mock_security_ma assert s_dag_1.dag_hash == s_dag.dag_hash assert s_dag.last_updated == s_dag_1.last_updated - mock_security_manager.sync_perm_for_dag.assert_not_called() + mock_security_manager.return_value.sync_perm_for_dag.assert_not_called() # Update DAG mock_security_manager.reset_mock() @@ -109,7 +111,7 @@ def test_serialized_dag_is_updated_only_if_dag_is_changed(self, mock_security_ma assert s_dag.last_updated 
!= s_dag_2.last_updated assert s_dag.dag_hash != s_dag_2.dag_hash assert s_dag_2.data["dag"]["tags"] == ["example", "example2", "new_tag"] - mock_security_manager.sync_perm_for_dag.assert_any_call( + mock_security_manager.return_value.sync_perm_for_dag.assert_any_call( example_bash_op_dag.dag_id, example_bash_op_dag.access_control ) From b6bba033d2d4d0ebea51df7cf03bdfbda5374f6c Mon Sep 17 00:00:00 2001 From: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> Date: Thu, 15 Apr 2021 13:00:13 -0600 Subject: [PATCH 09/13] Better tests --- tests/models/test_serialized_dag.py | 8 +++++ tests/test_utils/security_manager.py | 52 ++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+) create mode 100644 tests/test_utils/security_manager.py diff --git a/tests/models/test_serialized_dag.py b/tests/models/test_serialized_dag.py index f66ee798c1eb4..10295384b4551 100644 --- a/tests/models/test_serialized_dag.py +++ b/tests/models/test_serialized_dag.py @@ -28,6 +28,7 @@ from airflow.serialization.serialized_objects import SerializedDAG from airflow.utils.session import create_session from tests.test_utils.asserts import assert_queries_count +from tests.test_utils.security_manager import delete_dag_specific_permissions # To move it to a shared module. 
@@ -45,6 +46,10 @@ def clear_db_serialized_dags(): class SerializedDagModelTest(unittest.TestCase): """Unit tests for SerializedDagModel.""" + @classmethod + def setUpClass(cls): + delete_dag_specific_permissions() + def setUp(self): clear_db_serialized_dags() @@ -159,3 +164,6 @@ def test_bulk_sync_to_db(self): ] with assert_queries_count(48): SDM.bulk_sync_to_db(dags) + + with assert_queries_count(3): + SDM.bulk_sync_to_db(dags) diff --git a/tests/test_utils/security_manager.py b/tests/test_utils/security_manager.py new file mode 100644 index 0000000000000..1eb6b9dad7891 --- /dev/null +++ b/tests/test_utils/security_manager.py @@ -0,0 +1,52 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ + +from flask_appbuilder.security.sqla.models import assoc_permissionview_role + +from airflow.security.permissions import RESOURCE_DAG_PREFIX +from airflow.utils.session import create_session +from airflow.www.security import SimpleAirflowSecurityManager + + +def delete_dag_specific_permissions(): + with create_session() as session: + security_manager = SimpleAirflowSecurityManager(session=session) + + dag_vms = ( + session.query(security_manager.viewmenu_model) + .filter(security_manager.viewmenu_model.name.like(f"{RESOURCE_DAG_PREFIX}%")) + .all() + ) + vm_ids = [d.id for d in dag_vms] + + dag_pvms = ( + session.query(security_manager.permissionview_model) + .filter(security_manager.permissionview_model.view_menu_id.in_(vm_ids)) + .all() + ) + pvm_ids = [d.id for d in dag_pvms] + + session.query(assoc_permissionview_role).filter( + assoc_permissionview_role.c.permission_view_id.in_(pvm_ids) + ).delete(synchronize_session=False) + session.query(security_manager.permissionview_model).filter( + security_manager.permissionview_model.view_menu_id.in_(vm_ids) + ).delete(synchronize_session=False) + session.query(security_manager.viewmenu_model).filter( + security_manager.viewmenu_model.id.in_(vm_ids) + ).delete(synchronize_session=False) From 99194364478ab18cdfecb1dc47d79251405f26c8 Mon Sep 17 00:00:00 2001 From: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> Date: Thu, 15 Apr 2021 15:26:45 -0600 Subject: [PATCH 10/13] Move dag perm syncing to dagbag --- airflow/models/dagbag.py | 8 ++++++- airflow/models/serialized_dag.py | 16 +++++-------- tests/models/test_dagbag.py | 37 +++++++++++++++++++++++++++++ tests/models/test_serialized_dag.py | 36 +++++++--------------------- 4 files changed, 59 insertions(+), 38 deletions(-) diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py index 7099e185eef4d..ba7953073f9c1 100644 --- a/airflow/models/dagbag.py +++ b/airflow/models/dagbag.py @@ -539,11 +539,17 @@ def 
_serialize_dag_capturing_errors(dag, session): return [] try: # We cant use bulk_write_to_db as we want to capture each error individually - SerializedDagModel.write_dag( + dag_was_updated = SerializedDagModel.write_dag( dag, min_update_interval=settings.MIN_SERIALIZED_DAG_UPDATE_INTERVAL, session=session, ) + if dag_was_updated: + self.log.debug("Syncing DAG permissions: %s to the DB", dag.dag_id) + from airflow.www.security import SimpleAirflowSecurityManager + + security_manager = SimpleAirflowSecurityManager(session=session) + security_manager.sync_perm_for_dag(dag.dag_id, dag.access_control) return [] except OperationalError: raise diff --git a/airflow/models/serialized_dag.py b/airflow/models/serialized_dag.py index 28923697d1df5..c2706f492fab8 100644 --- a/airflow/models/serialized_dag.py +++ b/airflow/models/serialized_dag.py @@ -101,7 +101,7 @@ def __repr__(self): @classmethod @provide_session - def write_dag(cls, dag: DAG, min_update_interval: Optional[int] = None, session: Session = None): + def write_dag(cls, dag: DAG, min_update_interval: Optional[int] = None, session: Session = None) -> bool: """Serializes a DAG and writes it into database. If the record already exists, it checks if the Serialized DAG changed or not. If it is changed, it updates the record, ignores otherwise. 
@@ -109,6 +109,8 @@ def write_dag(cls, dag: DAG, min_update_interval: Optional[int] = None, session: :param dag: a DAG to be written into database :param min_update_interval: minimal interval in seconds to update serialized DAG :param session: ORM Session + + :returns: Boolean indicating if the DAG was written to the DB """ # Checks if (Current Time - Time when the DAG was written to DB) < min_update_interval # If Yes, does nothing @@ -122,7 +124,7 @@ def write_dag(cls, dag: DAG, min_update_interval: Optional[int] = None, session: ) ) ).scalar(): - return + return False log.debug("Checking if DAG (%s) changed", dag.dag_id) new_serialized_dag = cls(dag) @@ -130,18 +132,12 @@ def write_dag(cls, dag: DAG, min_update_interval: Optional[int] = None, session: if serialized_dag_hash_from_db == new_serialized_dag.dag_hash: log.debug("Serialized DAG (%s) is unchanged. Skipping writing to DB", dag.dag_id) - return + return False log.debug("Writing Serialized DAG: %s to the DB", dag.dag_id) session.merge(new_serialized_dag) - - log.debug("Syncing DAG permissions: %s to the DB", dag.dag_id) - from airflow.www.security import SimpleAirflowSecurityManager - - security_manager = SimpleAirflowSecurityManager(session=session) - security_manager.sync_perm_for_dag(dag.dag_id, dag.access_control) - log.debug("DAG: %s written to the DB", dag.dag_id) + return True @classmethod @provide_session diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py index 950d99d128ee5..f247d1342a4da 100644 --- a/tests/models/test_dagbag.py +++ b/tests/models/test_dagbag.py @@ -687,6 +687,43 @@ def test_sync_to_db_is_retried(self, mock_bulk_write_to_db, mock_s10n_write_dag, ] ) + @patch("airflow.models.dagbag.settings.MIN_SERIALIZED_DAG_UPDATE_INTERVAL", 5) + @patch("airflow.www.security.SimpleAirflowSecurityManager") + def test_sync_to_db_handles_dag_specific_permissions(self, mock_security_manager): + """ + Test that when dagbag.sync_to_db is called new DAGs and updates DAGs have their + 
DAG specific permissions synced + """ + with create_session() as session: + # New DAG + dagbag = DagBag( + dag_folder=os.path.join(TEST_DAGS_FOLDER, "test_example_bash_operator.py"), + include_examples=False, + ) + with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 0)): + dagbag.sync_to_db(session=session) + + mock_security_manager.return_value.sync_perm_for_dag.assert_called_once_with( + "test_example_bash_operator", None + ) + + # DAG is updated + mock_security_manager.reset_mock() + dagbag.dags["test_example_bash_operator"].tags = ["new_tag"] + with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 20)): + dagbag.sync_to_db(session=session) + + mock_security_manager.return_value.sync_perm_for_dag.assert_called_once_with( + "test_example_bash_operator", None + ) + + # DAG isn't updated + mock_security_manager.reset_mock() + with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 40)): + dagbag.sync_to_db(session=session) + + mock_security_manager.return_value.sync_perm_for_dag.assert_not_called() + @patch("airflow.models.dagbag.settings.MIN_SERIALIZED_DAG_UPDATE_INTERVAL", 5) @patch("airflow.models.dagbag.settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL", 5) def test_get_dag_with_dag_serialization(self): diff --git a/tests/models/test_serialized_dag.py b/tests/models/test_serialized_dag.py index 10295384b4551..db8282f943f7b 100644 --- a/tests/models/test_serialized_dag.py +++ b/tests/models/test_serialized_dag.py @@ -19,7 +19,6 @@ """Unit tests for SerializedDagModel.""" import unittest -from unittest import mock from airflow import DAG, example_dags as example_dags_module from airflow.models import DagBag @@ -28,7 +27,6 @@ from airflow.serialization.serialized_objects import SerializedDAG from airflow.utils.session import create_session from tests.test_utils.asserts import assert_queries_count -from tests.test_utils.security_manager import delete_dag_specific_permissions # To move it to a shared module. 
@@ -46,10 +44,6 @@ def clear_db_serialized_dags(): class SerializedDagModelTest(unittest.TestCase): """Unit tests for SerializedDagModel.""" - @classmethod - def setUpClass(cls): - delete_dag_specific_permissions() - def setUp(self): clear_db_serialized_dags() @@ -66,8 +60,7 @@ def _write_example_dags(self): SDM.write_dag(dag) return example_dags - @mock.patch('airflow.www.security.SimpleAirflowSecurityManager') - def test_write_dag(self, mock_security_manager): + def test_write_dag(self): """DAGs can be written into database.""" example_dags = self._write_example_dags() @@ -80,45 +73,37 @@ def test_write_dag(self, mock_security_manager): # Verifies JSON schema. SerializedDAG.validate_schema(result.data) - mock_security_manager.return_value.sync_perm_for_dag.assert_any_call( - dag.dag_id, dag.access_control - ) - - @mock.patch('airflow.www.security.SimpleAirflowSecurityManager') - def test_serialized_dag_is_updated_only_if_dag_is_changed(self, mock_security_manager): + def test_serialized_dag_is_updated_only_if_dag_is_changed(self): """Test Serialized DAG is updated if DAG is changed""" example_dags = make_example_dags(example_dags_module) example_bash_op_dag = example_dags.get("example_bash_operator") - SDM.write_dag(dag=example_bash_op_dag) - mock_security_manager.reset_mock() + dag_updated = SDM.write_dag(dag=example_bash_op_dag) + assert dag_updated is True with create_session() as session: s_dag = session.query(SDM).get(example_bash_op_dag.dag_id) # Test that if DAG is not changed, Serialized DAG is not re-written and last_updated # column is not updated - SDM.write_dag(dag=example_bash_op_dag) + dag_updated = SDM.write_dag(dag=example_bash_op_dag) s_dag_1 = session.query(SDM).get(example_bash_op_dag.dag_id) assert s_dag_1.dag_hash == s_dag.dag_hash assert s_dag.last_updated == s_dag_1.last_updated - mock_security_manager.return_value.sync_perm_for_dag.assert_not_called() + assert dag_updated is False # Update DAG - mock_security_manager.reset_mock() 
example_bash_op_dag.tags += ["new_tag"] assert set(example_bash_op_dag.tags) == {"example", "example2", "new_tag"} - SDM.write_dag(dag=example_bash_op_dag) + dag_updated = SDM.write_dag(dag=example_bash_op_dag) s_dag_2 = session.query(SDM).get(example_bash_op_dag.dag_id) assert s_dag.last_updated != s_dag_2.last_updated assert s_dag.dag_hash != s_dag_2.dag_hash assert s_dag_2.data["dag"]["tags"] == ["example", "example2", "new_tag"] - mock_security_manager.return_value.sync_perm_for_dag.assert_any_call( - example_bash_op_dag.dag_id, example_bash_op_dag.access_control - ) + assert dag_updated is True def test_read_dags(self): """DAGs can be read from database.""" @@ -162,8 +147,5 @@ def test_bulk_sync_to_db(self): DAG("dag_2"), DAG("dag_3"), ] - with assert_queries_count(48): - SDM.bulk_sync_to_db(dags) - - with assert_queries_count(3): + with assert_queries_count(10): SDM.bulk_sync_to_db(dags) From 50f9266359b2461372bd0e6fd24eb2f7225653e4 Mon Sep 17 00:00:00 2001 From: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> Date: Thu, 15 Apr 2021 16:34:42 -0600 Subject: [PATCH 11/13] Rename class --- airflow/models/dagbag.py | 4 ++-- airflow/www/security.py | 2 +- tests/models/test_dagbag.py | 2 +- tests/test_utils/security_manager.py | 4 ++-- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py index ba7953073f9c1..10662e5ab21c3 100644 --- a/airflow/models/dagbag.py +++ b/airflow/models/dagbag.py @@ -546,9 +546,9 @@ def _serialize_dag_capturing_errors(dag, session): ) if dag_was_updated: self.log.debug("Syncing DAG permissions: %s to the DB", dag.dag_id) - from airflow.www.security import SimpleAirflowSecurityManager + from airflow.www.security import ApplessAirflowSecurityManager - security_manager = SimpleAirflowSecurityManager(session=session) + security_manager = ApplessAirflowSecurityManager(session=session) security_manager.sync_perm_for_dag(dag.dag_id, dag.access_control) return [] except 
OperationalError: diff --git a/airflow/www/security.py b/airflow/www/security.py index a1c35a66daf12..39c437a2a2ecf 100644 --- a/airflow/www/security.py +++ b/airflow/www/security.py @@ -730,7 +730,7 @@ def check_authorization( return True -class SimpleAirflowSecurityManager(AirflowSecurityManager): +class ApplessAirflowSecurityManager(AirflowSecurityManager): """Security Manager that doesn't need the whole flask app""" def __init__(self, session=None): # pylint: disable=super-init-not-called diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py index f247d1342a4da..2a0d589aff7ac 100644 --- a/tests/models/test_dagbag.py +++ b/tests/models/test_dagbag.py @@ -688,7 +688,7 @@ def test_sync_to_db_is_retried(self, mock_bulk_write_to_db, mock_s10n_write_dag, ) @patch("airflow.models.dagbag.settings.MIN_SERIALIZED_DAG_UPDATE_INTERVAL", 5) - @patch("airflow.www.security.SimpleAirflowSecurityManager") + @patch("airflow.www.security.ApplessAirflowSecurityManager") def test_sync_to_db_handles_dag_specific_permissions(self, mock_security_manager): """ Test that when dagbag.sync_to_db is called new DAGs and updates DAGs have their diff --git a/tests/test_utils/security_manager.py b/tests/test_utils/security_manager.py index 1eb6b9dad7891..93ad09275ee6a 100644 --- a/tests/test_utils/security_manager.py +++ b/tests/test_utils/security_manager.py @@ -20,12 +20,12 @@ from airflow.security.permissions import RESOURCE_DAG_PREFIX from airflow.utils.session import create_session -from airflow.www.security import SimpleAirflowSecurityManager +from airflow.www.security import ApplessAirflowSecurityManager def delete_dag_specific_permissions(): with create_session() as session: - security_manager = SimpleAirflowSecurityManager(session=session) + security_manager = ApplessAirflowSecurityManager(session=session) dag_vms = ( session.query(security_manager.viewmenu_model) From a3ac0537f4f5543e6b96571a7f6e5458a64d6b19 Mon Sep 17 00:00:00 2001 From: Jed Cunningham 
<66968678+jedcunningham@users.noreply.github.com> Date: Thu, 15 Apr 2021 17:40:16 -0600 Subject: [PATCH 12/13] test cleanup --- tests/test_utils/security_manager.py | 52 ---------------------------- 1 file changed, 52 deletions(-) delete mode 100644 tests/test_utils/security_manager.py diff --git a/tests/test_utils/security_manager.py b/tests/test_utils/security_manager.py deleted file mode 100644 index 93ad09275ee6a..0000000000000 --- a/tests/test_utils/security_manager.py +++ /dev/null @@ -1,52 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
- - -from flask_appbuilder.security.sqla.models import assoc_permissionview_role - -from airflow.security.permissions import RESOURCE_DAG_PREFIX -from airflow.utils.session import create_session -from airflow.www.security import ApplessAirflowSecurityManager - - -def delete_dag_specific_permissions(): - with create_session() as session: - security_manager = ApplessAirflowSecurityManager(session=session) - - dag_vms = ( - session.query(security_manager.viewmenu_model) - .filter(security_manager.viewmenu_model.name.like(f"{RESOURCE_DAG_PREFIX}%")) - .all() - ) - vm_ids = [d.id for d in dag_vms] - - dag_pvms = ( - session.query(security_manager.permissionview_model) - .filter(security_manager.permissionview_model.view_menu_id.in_(vm_ids)) - .all() - ) - pvm_ids = [d.id for d in dag_pvms] - - session.query(assoc_permissionview_role).filter( - assoc_permissionview_role.c.permission_view_id.in_(pvm_ids) - ).delete(synchronize_session=False) - session.query(security_manager.permissionview_model).filter( - security_manager.permissionview_model.view_menu_id.in_(vm_ids) - ).delete(synchronize_session=False) - session.query(security_manager.viewmenu_model).filter( - security_manager.viewmenu_model.id.in_(vm_ids) - ).delete(synchronize_session=False) From 1ce6787dda578094311f6d24f84054601d1c5a69 Mon Sep 17 00:00:00 2001 From: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> Date: Thu, 15 Apr 2021 17:44:18 -0600 Subject: [PATCH 13/13] PR feedback --- UPDATING.md | 2 +- airflow/www/security.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/UPDATING.md b/UPDATING.md index d1fa7a828e9ff..5e925b59f63e9 100644 --- a/UPDATING.md +++ b/UPDATING.md @@ -98,7 +98,7 @@ This allows Airflow to work more reliably with some environments (like Azure) by ### `sync-perm` CLI no longer syncs DAG specific permissions by default The `sync-perm` CLI command will no longer sync DAG specific permissions by default as they are now being handled during -DAG parsing. 
If you need or want the old behavoir, you can pass `--include-dags` to have `sync-perm` also sync DAG +DAG parsing. If you need or want the old behavior, you can pass `--include-dags` to have `sync-perm` also sync DAG specific permissions. ## Airflow 2.0.1 diff --git a/airflow/www/security.py b/airflow/www/security.py index 39c437a2a2ecf..dcccf47233ef3 100644 --- a/airflow/www/security.py +++ b/airflow/www/security.py @@ -619,7 +619,7 @@ def sync_resource_permissions(self, perms=None): def sync_perm_for_dag(self, dag_id, access_control=None): """ Sync permissions for given dag id. The dag id surely exists in our dag bag - as only / refresh button or SerializedDagModel will call this function + as only / refresh button or DagBag will call this function :param dag_id: the ID of the DAG whose permissions should be updated :type dag_id: str