Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
sqlalchemy no checked permissions bug fix
  • Loading branch information
Gwen Whelan committed Dec 10, 2021
commit e1bb032bad7e728f083b8e169c48000e6198c88c
6 changes: 4 additions & 2 deletions languages/python/sqlalchemy-oso/sqlalchemy_oso/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,9 @@ def scoped_session(
scopefunc = scopefunc or (lambda: None)

def _scopefunc():
checked_permissions = frozenset(get_checked_permissions().items())
return (get_oso(), checked_permissions, get_user(), scopefunc())
perms = get_checked_permissions()
perms = frozenset() if perms is None else frozenset(perms.items())
return (get_oso(), perms, get_user(), scopefunc())

factory = authorized_sessionmaker(
get_oso, get_user, get_checked_permissions, **kwargs
Expand Down Expand Up @@ -315,6 +316,7 @@ def do_orm_execute(execute_state):
else:
logger.warning(f"Policy did not return filter for entity {entity}")


except ImportError:
from sqlalchemy.orm.query import Query

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ def get_joinedload_entities(stmt):

return entities


except ImportError:
# This code should not be called for SQLAlchemy 1.4.
def all_entities_in_statement(statement):
Expand Down
11 changes: 11 additions & 0 deletions languages/python/sqlalchemy-oso/tests/test_sqlalchemy.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,17 @@ def test_authorized_session_relationship(engine, oso, fixture_data):
assert post_7.created_by is None


def test_scoped_session_with_no_checked_permissions(engine, oso, fixture_data):
# the policy denies all requests
oso.load_str('allow("user", "read", _) if false;')
# but passing None skips authorization
session = scoped_session(lambda: oso, lambda: "user", lambda: None)
session.configure(bind=engine)
posts = session.query(Post)
# check that any posts are allowed
assert posts.count()


def test_scoped_session_relationship(engine, oso, fixture_data):
oso.load_str(
"""allow("user", "read", post: Post) if post.id = 1;
Expand Down