diff --git a/airflow/www/security.py b/airflow/www/security.py index 93f8fd11fcbec..3e61008501fca 100644 --- a/airflow/www/security.py +++ b/airflow/www/security.py @@ -682,25 +682,14 @@ def sync_perm_for_dag( for dag_action_name in self.DAG_ACTIONS: self.create_permission(dag_action_name, dag_resource_name) - def _revoke_all_stale_permissions(resource: Resource): - existing_dag_perms = self.get_resource_permissions(resource) - for perm in existing_dag_perms: - non_admin_roles = [role for role in perm.role if role.name != "Admin"] - for role in non_admin_roles: - self.log.info( - "Revoking '%s' on DAG '%s' for role '%s'", - perm.action, - dag_resource_name, - role.name, - ) - self.remove_permission_from_role(role, perm) - - if access_control: + if access_control is not None: + self.log.info("Syncing DAG-level permissions for DAG '%s'", dag_resource_name) self._sync_dag_view_permissions(dag_resource_name, access_control) else: - resource = self.get_resource(dag_resource_name) - if resource: - _revoke_all_stale_permissions(resource) + self.log.info( + "Not syncing DAG-level permissions for DAG '%s' as access control is unset.", + dag_resource_name, + ) def _sync_dag_view_permissions(self, dag_id: str, access_control: dict[str, Collection[str]]) -> None: """ diff --git a/docs/apache-airflow/security/access-control.rst b/docs/apache-airflow/security/access-control.rst index 0ac191f6f80e7..7692a54cb5bf9 100644 --- a/docs/apache-airflow/security/access-control.rst +++ b/docs/apache-airflow/security/access-control.rst @@ -229,3 +229,47 @@ List Plugins Plugins.can_read List Task Reschedules Task Reschedules.can_read Admin List Triggers Triggers.can_read Admin ====================================== ======================================================================= ============ + +These DAG-level controls can be set directly through the UI / CLI, or encoded in the dags themselves through the access_control arg. + +Order of precedence for DAG-level permissions +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Since DAG-level access control can be configured in multiple places, conflicts are inevitable and a clear resolution strategy is required. As a result, +Airflow considers the ``access_control`` argument supplied on a DAG itself to be completely authoritative if present, which has a few effects: + +Setting ``access_control`` on a DAG will overwrite any previously existing DAG-level permissions if it is any value other than ``None``: + +.. code-block:: python + + DAG( + dag_id="example_fine_grained_access", + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + access_control={ + "Viewer": {"can_edit", "can_create", "can_delete"}, + }, + ) + +This also means that setting ``access_control={}`` will wipe any existing DAG-level permissions for a given DAG from the DB: + +.. code-block:: python + + DAG( + dag_id="example_no_fine_grained_access", + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + access_control={}, + ) + +Conversely, removing the access_control block from a DAG altogether (or setting it to ``None``) won't make any changes and can leave dangling permissions. + +.. code-block:: python + + DAG( + dag_id="example_indifferent_to_fine_grained_access", + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + ) + +In the case that there is no ``access_control`` defined on the DAG itself, Airflow will defer to existing permissions defined in the DB, which +may have been set through the UI, CLI or by previous access_control args on the DAG in question. + +In all cases, system-wide roles such as ``Can edit on DAG`` take precedence over dag-level access controls, such that they can be considered ``Can edit on DAG: *`` diff --git a/tests/www/test_security.py b/tests/www/test_security.py index 6ec4193913f7f..8dc0ebec26542 100644 --- a/tests/www/test_security.py +++ b/tests/www/test_security.py @@ -537,6 +537,140 @@ def test_sync_perm_for_dag_creates_permissions_on_resources(security_manager): assert security_manager.get_permission(permissions.ACTION_CAN_EDIT, prefixed_test_dag_id) is not None +def test_sync_perm_for_dag_creates_permissions_for_specified_roles(app, security_manager): + test_dag_id = "TEST_DAG" + test_role = "limited-role" + security_manager.bulk_sync_roles([{"role": test_role, "perms": []}]) + with app.app_context(): + with create_user_scope( + app, + username="test_user", + role_name=test_role, + permissions=[], + ) as user: + security_manager.sync_perm_for_dag( + test_dag_id, access_control={test_role: {"can_read", "can_edit"}} + ) + assert security_manager.can_read_dag(test_dag_id, user) + assert security_manager.can_edit_dag(test_dag_id, user) + assert not security_manager.can_delete_dag(test_dag_id, user) + + +def test_sync_perm_for_dag_removes_existing_permissions_if_empty(app, security_manager): + test_dag_id = "TEST_DAG" + test_role = "limited-role" + + with app.app_context(): + with create_user_scope( + app, + username="test_user", + role_name=test_role, + permissions=[], + ) as user: + + security_manager.bulk_sync_roles( + [ + { + "role": test_role, + "perms": [ + (permissions.ACTION_CAN_READ, f"DAG:{test_dag_id}"), + (permissions.ACTION_CAN_EDIT, f"DAG:{test_dag_id}"), + (permissions.ACTION_CAN_DELETE, f"DAG:{test_dag_id}"), + ], + } + ] + ) + + assert security_manager.can_read_dag(test_dag_id, user) + assert security_manager.can_edit_dag(test_dag_id, user) + assert security_manager.can_delete_dag(test_dag_id, user) + + # Need to clear cache on user perms + user._perms = None + + security_manager.sync_perm_for_dag(test_dag_id, access_control={test_role: {}}) + + assert not security_manager.can_read_dag(test_dag_id, user) + assert not security_manager.can_edit_dag(test_dag_id, user) + assert not security_manager.can_delete_dag(test_dag_id, user) + + +def test_sync_perm_for_dag_removes_permissions_from_other_roles(app, security_manager): + test_dag_id = "TEST_DAG" + test_role = "limited-role" + + with app.app_context(): + with create_user_scope( + app, + username="test_user", + role_name=test_role, + permissions=[], + ) as user: + + security_manager.bulk_sync_roles( + [ + { + "role": test_role, + "perms": [ + (permissions.ACTION_CAN_READ, f"DAG:{test_dag_id}"), + (permissions.ACTION_CAN_EDIT, f"DAG:{test_dag_id}"), + (permissions.ACTION_CAN_DELETE, f"DAG:{test_dag_id}"), + ], + }, + {"role": "other_role", "perms": []}, + ] + ) + + assert security_manager.can_read_dag(test_dag_id, user) + assert security_manager.can_edit_dag(test_dag_id, user) + assert security_manager.can_delete_dag(test_dag_id, user) + + # Need to clear cache on user perms + user._perms = None + + security_manager.sync_perm_for_dag(test_dag_id, access_control={"other_role": {"can_read"}}) + + assert not security_manager.can_read_dag(test_dag_id, user) + assert not security_manager.can_edit_dag(test_dag_id, user) + assert not security_manager.can_delete_dag(test_dag_id, user) + + +def test_sync_perm_for_dag_does_not_prune_roles_when_access_control_unset(app, security_manager): + test_dag_id = "TEST_DAG" + test_role = "limited-role" + + with app.app_context(): + with create_user_scope( + app, + username="test_user", + role_name=test_role, + permissions=[], + ) as user: + + security_manager.bulk_sync_roles( + [ + { + "role": test_role, + "perms": [ + (permissions.ACTION_CAN_READ, f"DAG:{test_dag_id}"), + (permissions.ACTION_CAN_EDIT, f"DAG:{test_dag_id}"), + ], + }, + ] + ) + + assert security_manager.can_read_dag(test_dag_id, user) + assert security_manager.can_edit_dag(test_dag_id, user) + + # Need to clear cache on user perms + user._perms = None + + security_manager.sync_perm_for_dag(test_dag_id, access_control=None) + + assert security_manager.can_read_dag(test_dag_id, user) + assert security_manager.can_edit_dag(test_dag_id, user) + + def test_has_all_dag_access(app, security_manager): for role_name in ["Admin", "Viewer", "Op", "User"]: with app.app_context():