Skip to content
6 changes: 6 additions & 0 deletions UPDATING.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ The `default_queue` configuration option has been moved from `[celery]` section

This allows Airflow to work more reliably with some environments (like Azure) by default.

### `sync-perm` CLI no longer syncs DAG specific permissions by default

The `sync-perm` CLI command will no longer sync DAG specific permissions by default as they are now being handled during
DAG parsing. If you need or want the old behavior, you can pass `--include-dags` to have `sync-perm` also sync DAG
specific permissions.

## Airflow 2.0.1

### Permission to view Airflow Configurations has been removed from `User` and `Viewer` role
Expand Down
9 changes: 7 additions & 2 deletions airflow/cli/cli_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,11 @@ def _check(value):
help="If passed, this command will be successful even if multiple matching alive jobs are found.",
)

# sync-perm
ARG_INCLUDE_DAGS = Arg(
("--include-dags",), help="If passed, DAG specific permissions will also be synced.", action="store_true"
)

ALTERNATIVE_CONN_SPECS_ARGS = [
ARG_CONN_TYPE,
ARG_CONN_DESCRIPTION,
Expand Down Expand Up @@ -1556,9 +1561,9 @@ class GroupCommand(NamedTuple):
),
ActionCommand(
name='sync-perm',
help="Update permissions for existing roles and DAGs",
help="Update permissions for existing roles and optionally DAGs",
func=lazy_load_command('airflow.cli.commands.sync_perm_command.sync_perm'),
args=(),
args=(ARG_INCLUDE_DAGS,),
),
ActionCommand(
name='rotate-fernet-key',
Expand Down
10 changes: 3 additions & 7 deletions airflow/cli/commands/sync_perm_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
# specific language governing permissions and limitations
# under the License.
"""Sync permission command"""
from airflow.models import DagBag
from airflow.utils import cli as cli_utils
from airflow.www.app import cached_app

Expand All @@ -29,9 +28,6 @@ def sync_perm(args):
# Add missing permissions for all the Base Views _before_ syncing/creating roles
appbuilder.add_permissions(update_perms=True)
appbuilder.sm.sync_roles()
print('Updating permission on all DAG views')
dagbag = DagBag(read_dags_from_db=True)
dagbag.collect_dags_from_db()
dags = dagbag.dags.values()
for dag in dags:
appbuilder.sm.sync_perm_for_dag(dag.dag_id, dag.access_control)
if args.include_dags:
print('Updating permission on all DAG views')
appbuilder.sm.create_dag_specific_permissions()
8 changes: 7 additions & 1 deletion airflow/models/dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -539,11 +539,17 @@ def _serialize_dag_capturing_errors(dag, session):
return []
try:
# We cant use bulk_write_to_db as we want to capture each error individually
SerializedDagModel.write_dag(
dag_was_updated = SerializedDagModel.write_dag(
dag,
min_update_interval=settings.MIN_SERIALIZED_DAG_UPDATE_INTERVAL,
session=session,
)
if dag_was_updated:
self.log.debug("Syncing DAG permissions: %s to the DB", dag.dag_id)
from airflow.www.security import ApplessAirflowSecurityManager

security_manager = ApplessAirflowSecurityManager(session=session)
security_manager.sync_perm_for_dag(dag.dag_id, dag.access_control)
return []
except OperationalError:
raise
Expand Down
9 changes: 6 additions & 3 deletions airflow/models/serialized_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,16 @@ def __repr__(self):

@classmethod
@provide_session
def write_dag(cls, dag: DAG, min_update_interval: Optional[int] = None, session: Session = None):
def write_dag(cls, dag: DAG, min_update_interval: Optional[int] = None, session: Session = None) -> bool:
"""Serializes a DAG and writes it into database.
If the record already exists, it checks if the Serialized DAG changed or not. If it is
changed, it updates the record, ignores otherwise.

:param dag: a DAG to be written into database
:param min_update_interval: minimal interval in seconds to update serialized DAG
:param session: ORM Session

:returns: Boolean indicating if the DAG was written to the DB
"""
# Checks if (Current Time - Time when the DAG was written to DB) < min_update_interval
# If Yes, does nothing
Expand All @@ -122,19 +124,20 @@ def write_dag(cls, dag: DAG, min_update_interval: Optional[int] = None, session:
)
)
).scalar():
return
return False

log.debug("Checking if DAG (%s) changed", dag.dag_id)
new_serialized_dag = cls(dag)
serialized_dag_hash_from_db = session.query(cls.dag_hash).filter(cls.dag_id == dag.dag_id).scalar()

if serialized_dag_hash_from_db == new_serialized_dag.dag_hash:
log.debug("Serialized DAG (%s) is unchanged. Skipping writing to DB", dag.dag_id)
return
return False

log.debug("Writing Serialized DAG: %s to the DB", dag.dag_id)
session.merge(new_serialized_dag)
log.debug("DAG: %s written to the DB", dag.dag_id)
return True

@classmethod
@provide_session
Expand Down
43 changes: 27 additions & 16 deletions airflow/www/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,8 @@
from sqlalchemy import or_
from sqlalchemy.orm import joinedload

from airflow import models
from airflow.exceptions import AirflowException
from airflow.models import DagModel
from airflow.models import DagBag, DagModel
from airflow.security import permissions
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import provide_session
Expand Down Expand Up @@ -540,24 +539,28 @@ def _get_all_roles_with_permissions(self) -> Dict[str, Role]:

def create_dag_specific_permissions(self) -> None:
"""
Creates 'can_read' and 'can_edit' permissions for all active and paused DAGs.
Creates 'can_read' and 'can_edit' permissions for all DAGs,
along with any `access_control` permissions provided in them.

This does iterate through ALL the DAGs, which can be slow. See `sync_perm_for_dag`
if you only need to sync a single DAG.

:return: None.
"""
perms = self.get_all_permissions()
rows = (
self.get_session.query(models.DagModel.dag_id)
.filter(or_(models.DagModel.is_active, models.DagModel.is_paused))
Comment thread
jedcunningham marked this conversation as resolved.
Outdated
.all()
)
dagbag = DagBag(read_dags_from_db=True)
dagbag.collect_dags_from_db()
Comment thread
kaxil marked this conversation as resolved.
Outdated
dags = dagbag.dags.values()

for row in rows:
dag_id = row[0]
for dag in dags:
dag_resource_name = self.prefixed_dag_id(dag.dag_id)
for perm_name in self.DAG_PERMS:
dag_resource_name = self.prefixed_dag_id(dag_id)
if (perm_name, dag_resource_name) not in perms:
self._merge_perm(perm_name, dag_resource_name)

if dag.access_control:
self._sync_dag_view_permissions(dag_resource_name, dag.access_control)

def update_admin_perm_view(self):
"""
Admin should have all the permission-views, except the dag views.
Expand Down Expand Up @@ -595,7 +598,6 @@ def sync_roles(self):
"""
# Create global all-dag VM
self.create_perm_vm_for_all_dag()
self.create_dag_specific_permissions()

# Sync the default roles (Admin, Viewer, User, Op, public) with related permissions
self.bulk_sync_roles(self.ROLE_CONFIGS)
Expand All @@ -617,7 +619,7 @@ def sync_resource_permissions(self, perms=None):
def sync_perm_for_dag(self, dag_id, access_control=None):
"""
Sync permissions for given dag id. The dag id surely exists in our dag bag
as only / refresh button or cli.sync_perm will call this function
as only / refresh button or DagBag will call this function

:param dag_id: the ID of the DAG whose permissions should be updated
:type dag_id: str
Expand All @@ -629,9 +631,7 @@ def sync_perm_for_dag(self, dag_id, access_control=None):
"""
prefixed_dag_id = self.prefixed_dag_id(dag_id)
for dag_perm in self.DAG_PERMS:
perm_on_dag = self.find_permission_view_menu(dag_perm, prefixed_dag_id)
if perm_on_dag is None:
self.add_permission_view_menu(dag_perm, prefixed_dag_id)
self.add_permission_view_menu(dag_perm, prefixed_dag_id)

if access_control:
self._sync_dag_view_permissions(prefixed_dag_id, access_control)
Expand Down Expand Up @@ -728,3 +728,14 @@ def check_authorization(
return False

return True


class ApplessAirflowSecurityManager(AirflowSecurityManager):
"""Security Manager that doesn't need the whole flask app"""

def __init__(self, session=None): # pylint: disable=super-init-not-called
self.session = session

@property
def get_session(self):
return self.session
41 changes: 13 additions & 28 deletions tests/cli/commands/test_sync_perm_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,48 +21,33 @@

from airflow.cli import cli_parser
from airflow.cli.commands import sync_perm_command
from airflow.models.dag import DAG
from airflow.models.dagbag import DagBag
from airflow.security import permissions


class TestCliSyncPerm(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.dagbag = DagBag(include_examples=True)
cls.parser = cli_parser.get_parser()

@mock.patch("airflow.cli.commands.sync_perm_command.cached_app")
@mock.patch("airflow.cli.commands.sync_perm_command.DagBag")
def test_cli_sync_perm(self, dagbag_mock, mock_cached_app):
dags = [
DAG('has_access_control', access_control={'Public': {permissions.ACTION_CAN_READ}}),
DAG('no_access_control'),
]
def test_cli_sync_perm(self, mock_cached_app):
appbuilder = mock_cached_app.return_value.appbuilder
appbuilder.sm = mock.Mock()

collect_dags_from_db_mock = mock.Mock()
dagbag = mock.Mock()
args = self.parser.parse_args(['sync-perm'])
sync_perm_command.sync_perm(args)

dagbag.dags = {dag.dag_id: dag for dag in dags}
dagbag.collect_dags_from_db = collect_dags_from_db_mock
dagbag_mock.return_value = dagbag
appbuilder.add_permissions.assert_called_once_with(update_perms=True)
appbuilder.sm.sync_roles.assert_called_once_with()
appbuilder.sm.create_dag_specific_permissions.assert_not_called()

@mock.patch("airflow.cli.commands.sync_perm_command.cached_app")
def test_cli_sync_perm_include_dags(self, mock_cached_app):
appbuilder = mock_cached_app.return_value.appbuilder
appbuilder.sm = mock.Mock()

args = self.parser.parse_args(['sync-perm'])
args = self.parser.parse_args(['sync-perm', '--include-dags'])
sync_perm_command.sync_perm(args)

assert appbuilder.sm.sync_roles.call_count == 1

dagbag_mock.assert_called_once_with(read_dags_from_db=True)
collect_dags_from_db_mock.assert_called_once_with()
assert 2 == len(appbuilder.sm.sync_perm_for_dag.mock_calls)
appbuilder.sm.sync_perm_for_dag.assert_any_call(
'has_access_control', {'Public': {permissions.ACTION_CAN_READ}}
)
appbuilder.sm.sync_perm_for_dag.assert_any_call(
'no_access_control',
None,
)
appbuilder.add_permissions.assert_called_once_with(update_perms=True)
appbuilder.sm.sync_roles.assert_called_once_with()
appbuilder.sm.create_dag_specific_permissions.assert_called_once_with()
37 changes: 37 additions & 0 deletions tests/models/test_dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,43 @@ def test_sync_to_db_is_retried(self, mock_bulk_write_to_db, mock_s10n_write_dag,
]
)

@patch("airflow.models.dagbag.settings.MIN_SERIALIZED_DAG_UPDATE_INTERVAL", 5)
@patch("airflow.www.security.ApplessAirflowSecurityManager")
def test_sync_to_db_handles_dag_specific_permissions(self, mock_security_manager):
"""
Test that when dagbag.sync_to_db is called new DAGs and updates DAGs have their
DAG specific permissions synced
"""
with create_session() as session:
# New DAG
dagbag = DagBag(
dag_folder=os.path.join(TEST_DAGS_FOLDER, "test_example_bash_operator.py"),
include_examples=False,
)
with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 0)):
dagbag.sync_to_db(session=session)

mock_security_manager.return_value.sync_perm_for_dag.assert_called_once_with(
"test_example_bash_operator", None
)

# DAG is updated
mock_security_manager.reset_mock()
dagbag.dags["test_example_bash_operator"].tags = ["new_tag"]
with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 20)):
dagbag.sync_to_db(session=session)

mock_security_manager.return_value.sync_perm_for_dag.assert_called_once_with(
"test_example_bash_operator", None
)

# DAG isn't updated
mock_security_manager.reset_mock()
with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 40)):
dagbag.sync_to_db(session=session)

mock_security_manager.return_value.sync_perm_for_dag.assert_not_called()

@patch("airflow.models.dagbag.settings.MIN_SERIALIZED_DAG_UPDATE_INTERVAL", 5)
@patch("airflow.models.dagbag.settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL", 5)
def test_get_dag_with_dag_serialization(self):
Expand Down
9 changes: 6 additions & 3 deletions tests/models/test_serialized_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,29 +78,32 @@ def test_serialized_dag_is_updated_only_if_dag_is_changed(self):

example_dags = make_example_dags(example_dags_module)
example_bash_op_dag = example_dags.get("example_bash_operator")
SDM.write_dag(dag=example_bash_op_dag)
dag_updated = SDM.write_dag(dag=example_bash_op_dag)
assert dag_updated is True

with create_session() as session:
s_dag = session.query(SDM).get(example_bash_op_dag.dag_id)

# Test that if DAG is not changed, Serialized DAG is not re-written and last_updated
# column is not updated
SDM.write_dag(dag=example_bash_op_dag)
dag_updated = SDM.write_dag(dag=example_bash_op_dag)
s_dag_1 = session.query(SDM).get(example_bash_op_dag.dag_id)

assert s_dag_1.dag_hash == s_dag.dag_hash
assert s_dag.last_updated == s_dag_1.last_updated
assert dag_updated is False

# Update DAG
example_bash_op_dag.tags += ["new_tag"]
assert set(example_bash_op_dag.tags) == {"example", "example2", "new_tag"}

SDM.write_dag(dag=example_bash_op_dag)
dag_updated = SDM.write_dag(dag=example_bash_op_dag)
s_dag_2 = session.query(SDM).get(example_bash_op_dag.dag_id)

assert s_dag.last_updated != s_dag_2.last_updated
assert s_dag.dag_hash != s_dag_2.dag_hash
assert s_dag_2.data["dag"]["tags"] == ["example", "example2", "new_tag"]
assert dag_updated is True

def test_read_dags(self):
"""DAGs can be read from database."""
Expand Down
Loading