Source code for litestar_permissions.resolver

from __future__ import annotations

import asyncio
import time
from collections import OrderedDict
from typing import TYPE_CHECKING
from uuid import UUID

from sqlalchemy import and_, or_, select

if TYPE_CHECKING:
    from sqlalchemy.ext.asyncio import AsyncSession

    from litestar_permissions.config import PermissionsConfig


class PermissionResolver:
    """Resolve whether a user holds a permission, optionally scoped to a resource.

    Supports hierarchical inheritance: if a user has org-admin on org X, they
    inherit all permissions on projects and applications within org X.

    Results of :meth:`can` are memoised in a per-instance LRU cache whose
    entries expire after ``config.cache_ttl`` seconds and whose size is capped
    at ``config.cache_max_size``.
    """

    def __init__(self, config: PermissionsConfig, models: dict[str, type]) -> None:
        """Store the configuration and model registry and start with an empty cache.

        Args:
            config: Settings object; this class reads ``cache_ttl``,
                ``cache_max_size``, ``hierarchy`` and ``resource_resolver``.
            models: Mapping that must provide the ``"Role"``, ``"Permission"``,
                ``"RolePermission"`` and ``"UserRoleAssignment"`` model classes.
        """
        self.config = config
        self.models = models
        # Key: (user, permission, resource type, resource id).
        # Value: (decision, time.monotonic() timestamp of when it was stored).
        self._cache: OrderedDict[
            tuple[str, str, str | None, str | None], tuple[bool, float]
        ] = OrderedDict()

    @staticmethod
    def _cache_key(
        user_id: UUID | str,
        permission: str,
        resource_type: str | None,
        resource_id: UUID | str | None,
    ) -> tuple[str, str, str | None, str | None]:
        """Build an unambiguous cache key for one permission check.

        A tuple is used instead of a delimiter-joined string so that values
        containing the delimiter (e.g. permission ``"project:read"``) can never
        make two different checks share a cache entry. IDs are stringified so
        a ``UUID`` and its string form address the same entry.
        """
        return (
            str(user_id),
            permission,
            resource_type,
            None if resource_id is None else str(resource_id),
        )

    async def can(
        self,
        user_id: UUID | str,
        permission: str,
        resource_type: str | None = None,
        resource_id: UUID | str | None = None,
        *,
        db: AsyncSession,
    ) -> bool:
        """Check if user has the given permission, optionally scoped to a resource.

        Args:
            user_id: Identifier of the user being checked.
            permission: Permission codename to look for.
            resource_type: Type of the resource the check is scoped to, if any.
            resource_id: Identifier of that resource, if any.
            db: Session used to run the lookup on a cache miss.

        Returns:
            ``True`` if the user holds the permission at the global scope, at
            the given resource, or at one of its resolved ancestors.
        """
        ttl = self.config.cache_ttl
        if ttl <= 0:
            # Caching disabled: always hit the database.
            return await self._resolve(user_id, permission, resource_type, resource_id, db=db)

        key = self._cache_key(user_id, permission, resource_type, resource_id)
        cached = self._cache.get(key)
        if cached is not None:
            allowed, stored_at = cached
            if time.monotonic() - stored_at < ttl:
                self._cache.move_to_end(key)  # mark as most recently used
                return allowed
            del self._cache[key]  # expired

        allowed = await self._resolve(user_id, permission, resource_type, resource_id, db=db)

        self._cache[key] = (allowed, time.monotonic())
        # Another task may have stored the same key while we awaited; make sure
        # the refreshed entry counts as the most recently used one.
        self._cache.move_to_end(key)

        # LRU eviction. The emptiness guard keeps a non-positive
        # ``cache_max_size`` from calling popitem() on an empty cache.
        max_size = self.config.cache_max_size
        while self._cache and len(self._cache) > max_size:
            self._cache.popitem(last=False)
        return allowed

    async def _collect_scopes(
        self,
        resource_type: str | None,
        resource_id: UUID | str | None,
        *,
        db: AsyncSession,
    ) -> list[tuple[str | None, UUID | str | None]]:
        """Return the scopes to check: global, the resource itself, then its ancestors.

        Ancestors are found by following ``config.hierarchy`` (child type ->
        parent type) and calling ``config.resource_resolver(type, id, db)``,
        which may be sync or async; the parent ID is read from the returned
        object's ``.parent.id``. The walk stops when the current type has no
        entry in the hierarchy, no resolver is configured, the resolver returns
        nothing or an object without a parent, or a scope repeats.
        """
        scopes: list[tuple[str | None, UUID | str | None]] = [(None, None)]  # global
        if not (resource_type and resource_id):
            return scopes
        scopes.append((resource_type, resource_id))

        hierarchy = self.config.hierarchy
        current_type, current_id = resource_type, resource_id
        while current_type in hierarchy:
            parent_type = hierarchy[current_type]
            resolver = self.config.resource_resolver
            if not resolver:
                break
            resource = resolver(current_type, current_id, db)
            # Support both sync and async resource resolvers.
            if asyncio.iscoroutine(resource):
                resource = await resource
            if not (resource and resource.parent):
                break
            parent_scope = (parent_type, resource.parent.id)
            if parent_scope in scopes:
                # Parent links form a cycle; every scope on it is already listed.
                break
            scopes.append(parent_scope)
            current_type, current_id = parent_scope
        return scopes

    def _codename_query(
        self,
        user_id: UUID | str,
        scopes: list[tuple[str | None, UUID | str | None]],
        permission: str | None = None,
    ):
        """Build the SELECT of permission codenames granted to a user within ``scopes``.

        Joins Permission -> RolePermission -> Role -> UserRoleAssignment and
        filters on the user and on the given scopes. When ``permission`` is
        provided the query is additionally restricted to that codename.
        """
        role_model = self.models["Role"]
        permission_model = self.models["Permission"]
        role_permission = self.models["RolePermission"]
        user_role_assignment = self.models["UserRoleAssignment"]

        stmt = (
            select(permission_model.codename)
            .join(role_permission, role_permission.permission_id == permission_model.id)
            .join(role_model, role_model.id == role_permission.role_id)
            .join(user_role_assignment, user_role_assignment.role_id == role_model.id)
            .where(user_role_assignment.user_id == user_id)
        )
        if permission is not None:
            stmt = stmt.where(permission_model.codename == permission)
        return stmt.where(or_(*_build_scope_filters(user_role_assignment, scopes)))

    async def _resolve(
        self,
        user_id: UUID | str,
        permission: str,
        resource_type: str | None,
        resource_id: UUID | str | None,
        *,
        db: AsyncSession,
    ) -> bool:
        """Core resolution logic (uncached).

        Returns ``True`` when the user has any role, at any of the collected
        scopes, that includes the requested permission.
        """
        scopes = await self._collect_scopes(resource_type, resource_id, db=db)
        outcome = await db.execute(self._codename_query(user_id, scopes, permission))
        return outcome.first() is not None

    async def get_user_permissions(
        self,
        user_id: UUID | str,
        resource_type: str | None = None,
        resource_id: UUID | str | None = None,
        *,
        db: AsyncSession,
    ) -> set[str]:
        """Get all permission codenames a user has at the given scope (+ ancestors).

        Args:
            user_id: Identifier of the user.
            resource_type: Type of the resource to scope the lookup to, if any.
            resource_id: Identifier of that resource, if any.
            db: Session used to run the query.

        Returns:
            The distinct codenames granted globally, on the resource, or on any
            of its resolved ancestors. This method does not use the cache.
        """
        scopes = await self._collect_scopes(resource_type, resource_id, db=db)
        outcome = await db.execute(self._codename_query(user_id, scopes))
        return {row[0] for row in outcome.all()}

    def invalidate_user(self, user_id: UUID | str) -> None:
        """Remove all cached entries for a user.

        The user component of each key is compared exactly, so invalidating
        one user never touches another user's entries.
        """
        target = str(user_id)
        stale_keys = [key for key in self._cache if key[0] == target]
        for key in stale_keys:
            del self._cache[key]

    def invalidate_all(self) -> None:
        """Clear the entire cache."""
        self._cache.clear()
def _build_scope_filters(
    user_role_assignment: type,
    scopes: list[tuple[str | None, UUID | str | None]],
) -> list:
    """Translate ``(resource_type, resource_id)`` scopes into SQLAlchemy clauses.

    One clause is produced per scope, in order. A scope whose type is ``None``
    matches assignments where both ``resource_type`` and ``resource_id`` are
    NULL; any other scope matches assignments whose ``resource_type`` and
    ``resource_id`` equal the given pair.
    """

    def clause_for(kind, ident):
        # Columns are looked up per scope, so an empty ``scopes`` list never
        # touches the model.
        if kind is not None:
            return and_(
                user_role_assignment.resource_type == kind,
                user_role_assignment.resource_id == ident,
            )
        return and_(
            user_role_assignment.resource_type.is_(None),
            user_role_assignment.resource_id.is_(None),
        )

    return [clause_for(kind, ident) for kind, ident in scopes]