from __future__ import annotations
import asyncio
import time
from collections import OrderedDict
from typing import TYPE_CHECKING
from uuid import UUID
from sqlalchemy import and_, or_, select
if TYPE_CHECKING:
from sqlalchemy.ext.asyncio import AsyncSession
from litestar_permissions.config import PermissionsConfig
[docs]
class PermissionResolver:
    """Resolve whether a user holds a permission, optionally scoped to a resource.

    Supports hierarchical inheritance: if a user has org-admin on org X,
    they inherit all permissions on projects and applications within org X.

    Results of :meth:`can` are memoised in an in-process LRU cache. Entries
    live for ``config.cache_ttl`` seconds and the cache holds at most
    ``config.cache_max_size`` entries; a ``cache_ttl`` of zero or less
    disables caching altogether.
    """

    def __init__(self, config: PermissionsConfig, models: dict[str, type]) -> None:
        """Store the configuration and model registry; start with an empty cache.

        Args:
            config: Settings object. This class reads ``cache_ttl``,
                ``cache_max_size``, ``hierarchy`` and ``resource_resolver``.
            models: Mapping from model name to model class. The keys
                ``"Role"``, ``"Permission"``, ``"RolePermission"`` and
                ``"UserRoleAssignment"`` are looked up when querying.
        """
        self.config = config
        self.models = models
        # cache key -> (result, time.monotonic() at insertion); the
        # OrderedDict order doubles as the LRU order (oldest first).
        self._cache: OrderedDict[str, tuple[bool, float]] = OrderedDict()

    async def can(
        self,
        user_id: UUID | str,
        permission: str,
        resource_type: str | None = None,
        resource_id: UUID | str | None = None,
        *,
        db: AsyncSession,
    ) -> bool:
        """Check if user has the given permission, optionally scoped to a resource.

        Args:
            user_id: Identifier of the user being checked.
            permission: Permission codename to look for.
            resource_type: Type of the resource the check is scoped to, if any.
            resource_id: Identifier of that resource, if any.
            db: Session used to run the lookup on a cache miss.

        Returns:
            ``True`` if a matching role assignment exists, ``False`` otherwise.
            A fresh cached answer is returned without touching ``db``.
        """
        cache_key = f"{user_id}:{permission}:{resource_type}:{resource_id}"

        if self.config.cache_ttl > 0:
            cached = self._cache.get(cache_key)
            if cached is not None:
                result, ts = cached
                if time.monotonic() - ts < self.config.cache_ttl:
                    # Fresh hit: mark as most recently used.
                    self._cache.move_to_end(cache_key)
                    return result
                # Expired: drop it so the re-insert below lands at the MRU end.
                del self._cache[cache_key]

        result = await self._resolve(user_id, permission, resource_type, resource_id, db=db)

        if self.config.cache_ttl > 0:
            self._cache[cache_key] = (result, time.monotonic())
            # LRU eviction. The emptiness check keeps a negative
            # cache_max_size from calling popitem() on an empty dict.
            while self._cache and len(self._cache) > self.config.cache_max_size:
                self._cache.popitem(last=False)
        return result

    async def _collect_scopes(
        self,
        resource_type: str | None,
        resource_id: UUID | str | None,
        *,
        db: AsyncSession,
    ) -> list[tuple[str | None, UUID | str | None]]:
        """Return the scopes to check: global, the resource itself, then its ancestors.

        Ancestors are discovered by walking ``config.hierarchy`` (child type ->
        parent type) and asking ``config.resource_resolver`` for each step. The
        walk stops when the current type has no parent type, when no resolver
        is configured, when the resolver yields nothing with a ``parent``, or
        when a scope repeats (a cyclic hierarchy).
        """
        scopes: list[tuple[str | None, UUID | str | None]] = [(None, None)]  # global
        if not (resource_type and resource_id):
            return scopes
        scopes.append((resource_type, resource_id))

        current_type, current_id = resource_type, resource_id
        while current_type in self.config.hierarchy:
            resolver = self.config.resource_resolver
            if not resolver:
                break
            parent_type = self.config.hierarchy[current_type]
            resolved = resolver(current_type, current_id, db)
            # Support both sync and async resource resolvers.
            if asyncio.iscoroutine(resolved):
                resolved = await resolved
            if not (resolved and resolved.parent):
                break
            parent_scope = (parent_type, resolved.parent.id)
            if parent_scope in scopes:
                # Already visited: the hierarchy loops back on itself, and
                # continuing would never terminate.
                break
            scopes.append(parent_scope)
            current_type, current_id = parent_scope
        return scopes

    def _scoped_codename_query(
        self,
        user_id: UUID | str,
        scopes: list[tuple[str | None, UUID | str | None]],
    ):
        """Build the SELECT of permission codenames granted to ``user_id`` at ``scopes``."""
        role_model = self.models["Role"]
        permission_model = self.models["Permission"]
        role_permission = self.models["RolePermission"]
        user_role_assignment = self.models["UserRoleAssignment"]
        return (
            select(permission_model.codename)
            .join(role_permission, role_permission.permission_id == permission_model.id)
            .join(role_model, role_model.id == role_permission.role_id)
            .join(user_role_assignment, user_role_assignment.role_id == role_model.id)
            .where(user_role_assignment.user_id == user_id)
            # A role assignment at ANY of the scopes counts.
            .where(or_(*_build_scope_filters(user_role_assignment, scopes)))
        )

    async def _resolve(
        self,
        user_id: UUID | str,
        permission: str,
        resource_type: str | None,
        resource_id: UUID | str | None,
        *,
        db: AsyncSession,
    ) -> bool:
        """Core resolution logic: query the database, bypassing the cache.

        Returns ``True`` when the user has any role, at any scope from the
        resource up through its ancestors (or globally), that includes
        ``permission``.
        """
        scopes = await self._collect_scopes(resource_type, resource_id, db=db)
        stmt = (
            self._scoped_codename_query(user_id, scopes)
            .where(self.models["Permission"].codename == permission)
            # Only existence matters, so do not ask for more than one row.
            .limit(1)
        )
        result = await db.execute(stmt)
        return result.first() is not None

    async def get_user_permissions(
        self,
        user_id: UUID | str,
        resource_type: str | None = None,
        resource_id: UUID | str | None = None,
        *,
        db: AsyncSession,
    ) -> set[str]:
        """Get all permission codenames a user has at the given scope (+ ancestors).

        Args:
            user_id: Identifier of the user.
            resource_type: Type of the resource to scope to, if any.
            resource_id: Identifier of that resource, if any.
            db: Session used to run the query.

        Returns:
            The set of codenames selected by the query. This method does not
            consult or populate the cache.
        """
        scopes = await self._collect_scopes(resource_type, resource_id, db=db)
        result = await db.execute(self._scoped_codename_query(user_id, scopes))
        return {row[0] for row in result.all()}

    def invalidate_user(self, user_id: UUID | str) -> None:
        """Remove all cached entries for a user."""
        prefix = f"{user_id}:"
        # Materialise the keys first; deleting while iterating the dict is an error.
        stale_keys = [key for key in self._cache if key.startswith(prefix)]
        for key in stale_keys:
            del self._cache[key]

    def invalidate_all(self) -> None:
        """Clear the entire cache."""
        self._cache.clear()
def _build_scope_filters(
user_role_assignment: type,
scopes: list[tuple[str | None, UUID | str | None]],
) -> list:
"""Build SQLAlchemy scope filter clauses for the given scopes."""
scope_filters = []
for scope_type, scope_id in scopes:
if scope_type is None:
scope_filters.append(
and_(
user_role_assignment.resource_type.is_(None),
user_role_assignment.resource_id.is_(None),
)
)
else:
scope_filters.append(
and_(
user_role_assignment.resource_type == scope_type,
user_role_assignment.resource_id == scope_id,
)
)
return scope_filters