Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions languages/python/sqlalchemy-oso/sqlalchemy_oso/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,9 @@ def scoped_session(
scopefunc = scopefunc or (lambda: None)

def _scopefunc():
checked_permissions = frozenset(get_checked_permissions().items())
return (get_oso(), checked_permissions, get_user(), scopefunc())
perms = get_checked_permissions()
perms = frozenset() if perms is None else frozenset(perms.items())
return (get_oso(), perms, get_user(), scopefunc())

factory = authorized_sessionmaker(
get_oso, get_user, get_checked_permissions, **kwargs
Expand Down Expand Up @@ -315,6 +316,7 @@ def do_orm_execute(execute_state):
else:
logger.warning(f"Policy did not return filter for entity {entity}")


except ImportError:
from sqlalchemy.orm.query import Query

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ def get_joinedload_entities(stmt):

return entities


except ImportError:
# This code should not be called for SQLAlchemy 1.4.
def all_entities_in_statement(statement):
Expand Down
11 changes: 11 additions & 0 deletions languages/python/sqlalchemy-oso/tests/test_sqlalchemy.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,17 @@ def test_authorized_session_relationship(engine, oso, fixture_data):
assert post_7.created_by is None


def test_scoped_session_with_no_checked_permissions(engine, oso, fixture_data):
# the policy denies all requests
oso.load_str("allow(_, _, _) if false;")
# but passing None skips authorization
session = scoped_session(lambda: oso, lambda: "user", lambda: None)
session.configure(bind=engine)
posts = session.query(Post)
# check that any posts are allowed
assert posts.count()


def test_scoped_session_relationship(engine, oso, fixture_data):
oso.load_str(
"""allow("user", "read", post: Post) if post.id = 1;
Expand Down