Source code for app.domain.accounts.services

from __future__ import annotations

from datetime import UTC, datetime
from typing import Any
from uuid import UUID  # noqa: TC003

from advanced_alchemy.repository import (
    SQLAlchemyAsyncRepository,
    SQLAlchemyAsyncSlugRepository,
)
from advanced_alchemy.service import (
    ModelDictT,
    SQLAlchemyAsyncRepositoryService,
    is_dict,
    is_dict_with_field,
    is_dict_without_field,
    schema_dump,
)
from litestar.exceptions import PermissionDeniedException

from app.config import constants
from app.db import models as m
from app.lib import crypt


[docs] class UserService(SQLAlchemyAsyncRepositoryService[m.User]): """Handles database operations for users."""
[docs] class UserRepository(SQLAlchemyAsyncRepository[m.User]): """User SQLAlchemy Repository.""" model_type = m.User
repository_type = UserRepository default_role = constants.DEFAULT_USER_ROLE match_fields = ["email"]
[docs] async def to_model_on_create(self, data: ModelDictT[m.User]) -> ModelDictT[m.User]: return await self._populate_model(data)
[docs] async def to_model_on_update(self, data: ModelDictT[m.User]) -> ModelDictT[m.User]: return await self._populate_model(data)
[docs] async def to_model_on_upsert(self, data: ModelDictT[m.User]) -> ModelDictT[m.User]: return await self._populate_model(data)
[docs] async def authenticate(self, username: str, password: bytes | str) -> m.User: """Authenticate a user against the stored hashed password.""" db_obj = await self.get_one_or_none(email=username) if db_obj is None: msg = "User not found or password invalid" raise PermissionDeniedException(detail=msg) if db_obj.hashed_password is None: msg = "User not found or password invalid." raise PermissionDeniedException(detail=msg) if not await crypt.verify_password(password, db_obj.hashed_password): msg = "User not found or password invalid" raise PermissionDeniedException(detail=msg) if not db_obj.is_active: msg = "User account is inactive" raise PermissionDeniedException(detail=msg) return db_obj
[docs] async def update_password(self, data: dict[str, Any], db_obj: m.User) -> None: """Modify stored user password.""" if db_obj.hashed_password is None: msg = "User not found or password invalid." raise PermissionDeniedException(detail=msg) if not await crypt.verify_password(data["current_password"], db_obj.hashed_password): msg = "User not found or password invalid." raise PermissionDeniedException(detail=msg) if not db_obj.is_active: msg = "User account is not active" raise PermissionDeniedException(detail=msg) db_obj.hashed_password = await crypt.get_password_hash(data["new_password"]) await self.repository.update(db_obj)
[docs] @staticmethod async def has_role_id(db_obj: m.User, role_id: UUID) -> bool: """Return true if user has specified role ID""" return any(assigned_role.role_id for assigned_role in db_obj.roles if assigned_role.role_id == role_id)
[docs] @staticmethod async def has_role(db_obj: m.User, role_name: str) -> bool: """Return true if user has specified role ID""" return any(assigned_role.role_id for assigned_role in db_obj.roles if assigned_role.role_name == role_name)
@staticmethod def is_superuser(user: m.User) -> bool: return bool( user.is_superuser or any(assigned_role.role.name for assigned_role in user.roles if assigned_role.role.name in {"Superuser"}), ) async def _populate_model(self, data: ModelDictT[m.User]) -> ModelDictT[m.User]: data = schema_dump(data) data = await self._populate_with_hashed_password(data) return await self._populate_with_role(data) async def _populate_with_hashed_password(self, data: ModelDictT[m.User]) -> ModelDictT[m.User]: if is_dict(data) and (password := data.pop("password", None)) is not None: data["hashed_password"] = await crypt.get_password_hash(password) return data async def _populate_with_role(self, data: ModelDictT[m.User]) -> ModelDictT[m.User]: if is_dict(data) and (role_id := data.pop("role_id", None)) is not None: data = await self.to_model(data) data.roles.append(m.UserRole(role_id=role_id, assigned_at=datetime.now(UTC))) return data
[docs] class RoleService(SQLAlchemyAsyncRepositoryService[m.Role]): """Handles database operations for users."""
[docs] class Repository(SQLAlchemyAsyncSlugRepository[m.Role]): """User SQLAlchemy Repository.""" model_type = m.Role
repository_type = Repository match_fields = ["name"]
[docs] async def to_model_on_create(self, data: ModelDictT[m.Role]) -> ModelDictT[m.Role]: data = schema_dump(data) if is_dict_without_field(data, "slug"): data["slug"] = await self.repository.get_available_slug(data["name"]) return data
[docs] async def to_model_on_update(self, data: ModelDictT[m.Role]) -> ModelDictT[m.Role]: data = schema_dump(data) if is_dict_without_field(data, "slug") and is_dict_with_field(data, "name"): data["slug"] = await self.repository.get_available_slug(data["name"]) return data
[docs] class UserRoleService(SQLAlchemyAsyncRepositoryService[m.UserRole]): """Handles database operations for user roles."""
[docs] class Repository(SQLAlchemyAsyncRepository[m.UserRole]): """User Role SQLAlchemy Repository.""" model_type = m.UserRole
repository_type = Repository
[docs] class UserOAuthAccountService(SQLAlchemyAsyncRepositoryService[m.UserOauthAccount]): """Handles database operations for user roles."""
[docs] class Repository(SQLAlchemyAsyncRepository[m.UserOauthAccount]): """User SQLAlchemy Repository.""" model_type = m.UserOauthAccount
repository_type = Repository