diff --git a/UPDATING.md b/UPDATING.md index bbacbe05202af..5e925b59f63e9 100644 --- a/UPDATING.md +++ b/UPDATING.md @@ -95,6 +95,12 @@ The `default_queue` configuration option has been moved from `[celery]` section This allows Airflow to work more reliably with some environments (like Azure) by default. +### `sync-perm` CLI no longer syncs DAG-specific permissions by default + +The `sync-perm` CLI command will no longer sync DAG-specific permissions by default because they are now handled during +DAG parsing. If you need or want the old behavior, you can pass `--include-dags` to have `sync-perm` also sync +DAG-specific permissions. + ## Airflow 2.0.1 ### Permission to view Airflow Configurations has been removed from `User` and `Viewer` role diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py index 87c87fc81a672..f3f0344886d63 100644 --- a/airflow/cli/cli_parser.py +++ b/airflow/cli/cli_parser.py @@ -729,6 +729,11 @@ def _check(value): help="If passed, this command will be successful even if multiple matching alive jobs are found.", ) +# sync-perm +ARG_INCLUDE_DAGS = Arg( + ("--include-dags",), help="If passed, DAG specific permissions will also be synced.", action="store_true" +) + ALTERNATIVE_CONN_SPECS_ARGS = [ ARG_CONN_TYPE, ARG_CONN_DESCRIPTION, @@ -1556,9 +1561,9 @@ class GroupCommand(NamedTuple): ), ActionCommand( name='sync-perm', - help="Update permissions for existing roles and DAGs", + help="Update permissions for existing roles and optionally DAGs", func=lazy_load_command('airflow.cli.commands.sync_perm_command.sync_perm'), - args=(), + args=(ARG_INCLUDE_DAGS,), ), ActionCommand( name='rotate-fernet-key', diff --git a/airflow/cli/commands/sync_perm_command.py b/airflow/cli/commands/sync_perm_command.py index e382b89998c45..d957fcbfaaeac 100644 --- a/airflow/cli/commands/sync_perm_command.py +++ b/airflow/cli/commands/sync_perm_command.py @@ -16,7 +16,6 @@ # specific language governing permissions and limitations # under the License. 
"""Sync permission command""" -from airflow.models import DagBag from airflow.utils import cli as cli_utils from airflow.www.app import cached_app @@ -29,9 +28,6 @@ def sync_perm(args): # Add missing permissions for all the Base Views _before_ syncing/creating roles appbuilder.add_permissions(update_perms=True) appbuilder.sm.sync_roles() - print('Updating permission on all DAG views') - dagbag = DagBag(read_dags_from_db=True) - dagbag.collect_dags_from_db() - dags = dagbag.dags.values() - for dag in dags: - appbuilder.sm.sync_perm_for_dag(dag.dag_id, dag.access_control) + if args.include_dags: + print('Updating permission on all DAG views') + appbuilder.sm.create_dag_specific_permissions() diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py index 7099e185eef4d..10662e5ab21c3 100644 --- a/airflow/models/dagbag.py +++ b/airflow/models/dagbag.py @@ -539,11 +539,17 @@ def _serialize_dag_capturing_errors(dag, session): return [] try: # We cant use bulk_write_to_db as we want to capture each error individually - SerializedDagModel.write_dag( + dag_was_updated = SerializedDagModel.write_dag( dag, min_update_interval=settings.MIN_SERIALIZED_DAG_UPDATE_INTERVAL, session=session, ) + if dag_was_updated: + self.log.debug("Syncing DAG permissions: %s to the DB", dag.dag_id) + from airflow.www.security import ApplessAirflowSecurityManager + + security_manager = ApplessAirflowSecurityManager(session=session) + security_manager.sync_perm_for_dag(dag.dag_id, dag.access_control) return [] except OperationalError: raise diff --git a/airflow/models/serialized_dag.py b/airflow/models/serialized_dag.py index 0184307aca6dd..c2706f492fab8 100644 --- a/airflow/models/serialized_dag.py +++ b/airflow/models/serialized_dag.py @@ -101,7 +101,7 @@ def __repr__(self): @classmethod @provide_session - def write_dag(cls, dag: DAG, min_update_interval: Optional[int] = None, session: Session = None): + def write_dag(cls, dag: DAG, min_update_interval: Optional[int] = None, session: 
Session = None) -> bool: """Serializes a DAG and writes it into database. If the record already exists, it checks if the Serialized DAG changed or not. If it is changed, it updates the record, ignores otherwise. @@ -109,6 +109,8 @@ def write_dag(cls, dag: DAG, min_update_interval: Optional[int] = None, session: :param dag: a DAG to be written into database :param min_update_interval: minimal interval in seconds to update serialized DAG :param session: ORM Session + + :returns: Boolean indicating if the DAG was written to the DB """ # Checks if (Current Time - Time when the DAG was written to DB) < min_update_interval # If Yes, does nothing @@ -122,7 +124,7 @@ def write_dag(cls, dag: DAG, min_update_interval: Optional[int] = None, session: ) ) ).scalar(): - return + return False log.debug("Checking if DAG (%s) changed", dag.dag_id) new_serialized_dag = cls(dag) @@ -130,11 +132,12 @@ def write_dag(cls, dag: DAG, min_update_interval: Optional[int] = None, session: if serialized_dag_hash_from_db == new_serialized_dag.dag_hash: log.debug("Serialized DAG (%s) is unchanged. 
Skipping writing to DB", dag.dag_id) - return + return False log.debug("Writing Serialized DAG: %s to the DB", dag.dag_id) session.merge(new_serialized_dag) log.debug("DAG: %s written to the DB", dag.dag_id) + return True @classmethod @provide_session diff --git a/airflow/www/security.py b/airflow/www/security.py index 25ff0b9952f98..dcccf47233ef3 100644 --- a/airflow/www/security.py +++ b/airflow/www/security.py @@ -27,9 +27,8 @@ from sqlalchemy import or_ from sqlalchemy.orm import joinedload -from airflow import models from airflow.exceptions import AirflowException -from airflow.models import DagModel +from airflow.models import DagBag, DagModel from airflow.security import permissions from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.session import provide_session @@ -540,24 +539,28 @@ def _get_all_roles_with_permissions(self) -> Dict[str, Role]: def create_dag_specific_permissions(self) -> None: """ - Creates 'can_read' and 'can_edit' permissions for all active and paused DAGs. + Creates 'can_read' and 'can_edit' permissions for all DAGs, + along with any `access_control` permissions provided in them. + + This does iterate through ALL the DAGs, which can be slow. See `sync_perm_for_dag` + if you only need to sync a single DAG. :return: None. 
""" perms = self.get_all_permissions() - rows = ( - self.get_session.query(models.DagModel.dag_id) - .filter(or_(models.DagModel.is_active, models.DagModel.is_paused)) - .all() - ) + dagbag = DagBag(read_dags_from_db=True) + dagbag.collect_dags_from_db() + dags = dagbag.dags.values() - for row in rows: - dag_id = row[0] + for dag in dags: + dag_resource_name = self.prefixed_dag_id(dag.dag_id) for perm_name in self.DAG_PERMS: - dag_resource_name = self.prefixed_dag_id(dag_id) if (perm_name, dag_resource_name) not in perms: self._merge_perm(perm_name, dag_resource_name) + if dag.access_control: + self._sync_dag_view_permissions(dag_resource_name, dag.access_control) + def update_admin_perm_view(self): """ Admin should have all the permission-views, except the dag views. @@ -595,7 +598,6 @@ def sync_roles(self): """ # Create global all-dag VM self.create_perm_vm_for_all_dag() - self.create_dag_specific_permissions() # Sync the default roles (Admin, Viewer, User, Op, public) with related permissions self.bulk_sync_roles(self.ROLE_CONFIGS) @@ -617,7 +619,7 @@ def sync_resource_permissions(self, perms=None): def sync_perm_for_dag(self, dag_id, access_control=None): """ Sync permissions for given dag id. 
The dag id surely exists in our dag bag - as only / refresh button or cli.sync_perm will call this function + as only / refresh button or DagBag will call this function :param dag_id: the ID of the DAG whose permissions should be updated :type dag_id: str @@ -629,9 +631,7 @@ def sync_perm_for_dag(self, dag_id, access_control=None): """ prefixed_dag_id = self.prefixed_dag_id(dag_id) for dag_perm in self.DAG_PERMS: - perm_on_dag = self.find_permission_view_menu(dag_perm, prefixed_dag_id) - if perm_on_dag is None: - self.add_permission_view_menu(dag_perm, prefixed_dag_id) + self.add_permission_view_menu(dag_perm, prefixed_dag_id) if access_control: self._sync_dag_view_permissions(prefixed_dag_id, access_control) @@ -728,3 +728,14 @@ def check_authorization( return False return True + + +class ApplessAirflowSecurityManager(AirflowSecurityManager): + """Security Manager that doesn't need the whole flask app""" + + def __init__(self, session=None): # pylint: disable=super-init-not-called + self.session = session + + @property + def get_session(self): + return self.session diff --git a/tests/cli/commands/test_sync_perm_command.py b/tests/cli/commands/test_sync_perm_command.py index 5f7f86e477cf7..9fff6d686fa19 100644 --- a/tests/cli/commands/test_sync_perm_command.py +++ b/tests/cli/commands/test_sync_perm_command.py @@ -21,48 +21,33 @@ from airflow.cli import cli_parser from airflow.cli.commands import sync_perm_command -from airflow.models.dag import DAG -from airflow.models.dagbag import DagBag -from airflow.security import permissions class TestCliSyncPerm(unittest.TestCase): @classmethod def setUpClass(cls): - cls.dagbag = DagBag(include_examples=True) cls.parser = cli_parser.get_parser() @mock.patch("airflow.cli.commands.sync_perm_command.cached_app") - @mock.patch("airflow.cli.commands.sync_perm_command.DagBag") - def test_cli_sync_perm(self, dagbag_mock, mock_cached_app): - dags = [ - DAG('has_access_control', access_control={'Public': 
{permissions.ACTION_CAN_READ}}), - DAG('no_access_control'), - ] + def test_cli_sync_perm(self, mock_cached_app): + appbuilder = mock_cached_app.return_value.appbuilder + appbuilder.sm = mock.Mock() - collect_dags_from_db_mock = mock.Mock() - dagbag = mock.Mock() + args = self.parser.parse_args(['sync-perm']) + sync_perm_command.sync_perm(args) - dagbag.dags = {dag.dag_id: dag for dag in dags} - dagbag.collect_dags_from_db = collect_dags_from_db_mock - dagbag_mock.return_value = dagbag + appbuilder.add_permissions.assert_called_once_with(update_perms=True) + appbuilder.sm.sync_roles.assert_called_once_with() + appbuilder.sm.create_dag_specific_permissions.assert_not_called() + @mock.patch("airflow.cli.commands.sync_perm_command.cached_app") + def test_cli_sync_perm_include_dags(self, mock_cached_app): appbuilder = mock_cached_app.return_value.appbuilder appbuilder.sm = mock.Mock() - args = self.parser.parse_args(['sync-perm']) + args = self.parser.parse_args(['sync-perm', '--include-dags']) sync_perm_command.sync_perm(args) - assert appbuilder.sm.sync_roles.call_count == 1 - - dagbag_mock.assert_called_once_with(read_dags_from_db=True) - collect_dags_from_db_mock.assert_called_once_with() - assert 2 == len(appbuilder.sm.sync_perm_for_dag.mock_calls) - appbuilder.sm.sync_perm_for_dag.assert_any_call( - 'has_access_control', {'Public': {permissions.ACTION_CAN_READ}} - ) - appbuilder.sm.sync_perm_for_dag.assert_any_call( - 'no_access_control', - None, - ) appbuilder.add_permissions.assert_called_once_with(update_perms=True) + appbuilder.sm.sync_roles.assert_called_once_with() + appbuilder.sm.create_dag_specific_permissions.assert_called_once_with() diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py index 950d99d128ee5..2a0d589aff7ac 100644 --- a/tests/models/test_dagbag.py +++ b/tests/models/test_dagbag.py @@ -687,6 +687,43 @@ def test_sync_to_db_is_retried(self, mock_bulk_write_to_db, mock_s10n_write_dag, ] ) + 
@patch("airflow.models.dagbag.settings.MIN_SERIALIZED_DAG_UPDATE_INTERVAL", 5) + @patch("airflow.www.security.ApplessAirflowSecurityManager") + def test_sync_to_db_handles_dag_specific_permissions(self, mock_security_manager): + """ + Test that when dagbag.sync_to_db is called new DAGs and updated DAGs have their + DAG specific permissions synced + """ + with create_session() as session: + # New DAG + dagbag = DagBag( + dag_folder=os.path.join(TEST_DAGS_FOLDER, "test_example_bash_operator.py"), + include_examples=False, + ) + with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 0)): + dagbag.sync_to_db(session=session) + + mock_security_manager.return_value.sync_perm_for_dag.assert_called_once_with( + "test_example_bash_operator", None + ) + + # DAG is updated + mock_security_manager.reset_mock() + dagbag.dags["test_example_bash_operator"].tags = ["new_tag"] + with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 20)): + dagbag.sync_to_db(session=session) + + mock_security_manager.return_value.sync_perm_for_dag.assert_called_once_with( + "test_example_bash_operator", None + ) + + # DAG isn't updated + mock_security_manager.reset_mock() + with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 40)): + dagbag.sync_to_db(session=session) + + mock_security_manager.return_value.sync_perm_for_dag.assert_not_called() + @patch("airflow.models.dagbag.settings.MIN_SERIALIZED_DAG_UPDATE_INTERVAL", 5) @patch("airflow.models.dagbag.settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL", 5) def test_get_dag_with_dag_serialization(self): diff --git a/tests/models/test_serialized_dag.py b/tests/models/test_serialized_dag.py index 72380c55d153d..db8282f943f7b 100644 --- a/tests/models/test_serialized_dag.py +++ b/tests/models/test_serialized_dag.py @@ -78,29 +78,32 @@ def test_serialized_dag_is_updated_only_if_dag_is_changed(self): example_dags = make_example_dags(example_dags_module) example_bash_op_dag = example_dags.get("example_bash_operator") - SDM.write_dag(dag=example_bash_op_dag) + dag_updated = 
SDM.write_dag(dag=example_bash_op_dag) + assert dag_updated is True with create_session() as session: s_dag = session.query(SDM).get(example_bash_op_dag.dag_id) # Test that if DAG is not changed, Serialized DAG is not re-written and last_updated # column is not updated - SDM.write_dag(dag=example_bash_op_dag) + dag_updated = SDM.write_dag(dag=example_bash_op_dag) s_dag_1 = session.query(SDM).get(example_bash_op_dag.dag_id) assert s_dag_1.dag_hash == s_dag.dag_hash assert s_dag.last_updated == s_dag_1.last_updated + assert dag_updated is False # Update DAG example_bash_op_dag.tags += ["new_tag"] assert set(example_bash_op_dag.tags) == {"example", "example2", "new_tag"} - SDM.write_dag(dag=example_bash_op_dag) + dag_updated = SDM.write_dag(dag=example_bash_op_dag) s_dag_2 = session.query(SDM).get(example_bash_op_dag.dag_id) assert s_dag.last_updated != s_dag_2.last_updated assert s_dag.dag_hash != s_dag_2.dag_hash assert s_dag_2.data["dag"]["tags"] == ["example", "example2", "new_tag"] + assert dag_updated is True def test_read_dags(self): """DAGs can be read from database.""" diff --git a/tests/www/test_security.py b/tests/www/test_security.py index 8a85d9671725c..87ce7635dcd58 100644 --- a/tests/www/test_security.py +++ b/tests/www/test_security.py @@ -29,6 +29,7 @@ from airflow import settings from airflow.exceptions import AirflowException from airflow.models import DagModel +from airflow.models.dag import DAG from airflow.security import permissions from airflow.www import app as application from airflow.www.utils import CustomSQLAInterface @@ -431,7 +432,7 @@ def test_has_all_dag_access(self, mock_has_role, mock_has_perm): def test_access_control_with_non_existent_role(self): with pytest.raises(AirflowException) as ctx: - self.security_manager.sync_perm_for_dag( + self.security_manager._sync_dag_view_permissions( dag_id='access-control-test', access_control={ 'this-role-does-not-exist': [permissions.ACTION_CAN_EDIT, permissions.ACTION_CAN_READ] @@ -473,7 +474,7 
@@ def test_access_control_with_invalid_permission(self): for permission in invalid_permissions: self.expect_user_is_in_role(user, rolename='team-a') with pytest.raises(AirflowException) as ctx: - self.security_manager.sync_perm_for_dag( + self.security_manager._sync_dag_view_permissions( 'access_control_test', access_control={'team-a': {permission}} ) assert "invalid permissions" in str(ctx.value) @@ -489,7 +490,7 @@ def test_access_control_is_set_on_init(self): permissions=[], ) self.expect_user_is_in_role(user, rolename='team-a') - self.security_manager.sync_perm_for_dag( + self.security_manager._sync_dag_view_permissions( 'access_control_test', access_control={'team-a': [permissions.ACTION_CAN_EDIT, permissions.ACTION_CAN_READ]}, ) @@ -517,12 +518,12 @@ def test_access_control_stale_perms_are_revoked(self): permissions=[], ) self.expect_user_is_in_role(user, rolename='team-a') - self.security_manager.sync_perm_for_dag( + self.security_manager._sync_dag_view_permissions( 'access_control_test', access_control={'team-a': READ_WRITE} ) self.assert_user_has_dag_perms(perms=READ_WRITE, dag_id='access_control_test', user=user) - self.security_manager.sync_perm_for_dag( + self.security_manager._sync_dag_view_permissions( 'access_control_test', access_control={'team-a': READ_ONLY} ) self.assert_user_has_dag_perms( @@ -562,24 +563,46 @@ def test_correct_roles_have_perms_to_read_config(self): f"on {permissions.RESOURCE_CONFIG}" ) - def test_create_dag_specific_permissions(self): - dag_id = 'some_dag_id' - dag_permission_name = self.security_manager.prefixed_dag_id(dag_id) - assert ('can_read', dag_permission_name) not in self.security_manager.get_all_permissions() + @mock.patch("airflow.www.security.DagBag") + def test_create_dag_specific_permissions(self, dagbag_mock): + access_control = {'Public': {permissions.ACTION_CAN_READ}} + dags = [ + DAG('has_access_control', access_control=access_control), + DAG('no_access_control'), + ] - dag_model = DagModel( - dag_id=dag_id, 
fileloc='/tmp/dag_.py', schedule_interval='2 2 * * *', is_paused=True - ) - self.session.add(dag_model) - self.session.commit() + collect_dags_from_db_mock = mock.Mock() + dagbag = mock.Mock() + + dagbag.dags = {dag.dag_id: dag for dag in dags} + dagbag.collect_dags_from_db = collect_dags_from_db_mock + dagbag_mock.return_value = dagbag + + self.security_manager._sync_dag_view_permissions = mock.Mock() + + for dag in dags: + prefixed_dag_id = self.security_manager.prefixed_dag_id(dag.dag_id) + all_perms = self.security_manager.get_all_permissions() + assert ('can_read', prefixed_dag_id) not in all_perms + assert ('can_edit', prefixed_dag_id) not in all_perms self.security_manager.create_dag_specific_permissions() - self.session.commit() - assert ('can_read', dag_permission_name) in self.security_manager.get_all_permissions() + dagbag_mock.assert_called_once_with(read_dags_from_db=True) + collect_dags_from_db_mock.assert_called_once_with() + + for dag in dags: + prefixed_dag_id = self.security_manager.prefixed_dag_id(dag.dag_id) + all_perms = self.security_manager.get_all_permissions() + assert ('can_read', prefixed_dag_id) in all_perms + assert ('can_edit', prefixed_dag_id) in all_perms + + self.security_manager._sync_dag_view_permissions.assert_called_once_with( + self.security_manager.prefixed_dag_id('has_access_control'), access_control + ) - # Make sure we short circuit when the perms already exist - with assert_queries_count(2): # One query to get DagModels, one query to get all perms + del dagbag.dags["has_access_control"] + with assert_queries_count(1): # one query to get all perms; dagbag is mocked self.security_manager.create_dag_specific_permissions() def test_get_all_permissions(self):