from __future__ import annotations
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any
from uuid import UUID # noqa: TCH003
from advanced_alchemy.repository import Empty, EmptyType, ErrorMessages
from advanced_alchemy.service import (
ModelDictT,
SQLAlchemyAsyncRepositoryService,
is_dict,
is_msgspec_model,
is_pydantic_model,
)
from litestar.exceptions import PermissionDeniedException
from app.config import constants
from app.db.models import Role, User, UserOauthAccount, UserRole
from app.lib import crypt
from .repositories import RoleRepository, UserOauthAccountRepository, UserRepository, UserRoleRepository
if TYPE_CHECKING:
from collections.abc import Iterable
from advanced_alchemy.repository import LoadSpec
from sqlalchemy.orm import InstrumentedAttribute
[docs]
class UserService(SQLAlchemyAsyncRepositoryService[User]):
"""Handles database operations for users."""
repository_type = UserRepository
default_role = constants.DEFAULT_USER_ROLE
[docs]
def __init__(self, **repo_kwargs: Any) -> None:
self.repository: UserRepository = self.repository_type(**repo_kwargs)
self.model_type = self.repository.model_type
[docs]
async def create(
self,
data: ModelDictT[User],
*,
auto_commit: bool | None = None,
auto_expunge: bool | None = None,
auto_refresh: bool | None = None,
error_messages: ErrorMessages | None | EmptyType = Empty,
) -> User:
"""Create a new User and assign default Role."""
if isinstance(data, dict):
role_id: UUID | None = data.pop("role_id", None)
data = await self.to_model(data, "create")
if role_id:
data.roles.append(UserRole(role_id=role_id, assigned_at=datetime.now(timezone.utc))) # noqa: UP017
return await super().create(
data=data,
auto_commit=auto_commit,
auto_expunge=auto_expunge,
auto_refresh=auto_refresh,
error_messages=error_messages,
)
[docs]
async def update(
self,
data: ModelDictT[User],
item_id: Any | None = None,
*,
id_attribute: str | InstrumentedAttribute[Any] | None = None,
attribute_names: Iterable[str] | None = None,
with_for_update: bool | None = None,
auto_commit: bool | None = None,
auto_expunge: bool | None = None,
auto_refresh: bool | None = None,
error_messages: ErrorMessages | None | EmptyType = Empty,
load: LoadSpec | None = None,
execution_options: dict[str, Any] | None = None,
) -> User:
if isinstance(data, dict):
role_id: UUID | None = data.pop("role_id", None)
data = await self.to_model(data, "update")
if role_id:
data.roles.append(UserRole(role_id=role_id, assigned_at=datetime.now(timezone.utc))) # noqa: UP017
return await super().update(
data=data,
item_id=item_id,
attribute_names=attribute_names,
with_for_update=with_for_update,
auto_commit=auto_commit,
auto_expunge=auto_expunge,
auto_refresh=auto_refresh,
id_attribute=id_attribute,
error_messages=error_messages,
load=load,
execution_options=execution_options,
)
[docs]
async def authenticate(self, username: str, password: bytes | str) -> User:
"""Authenticate a user.
Args:
username (str): _description_
password (str | bytes): _description_
Raises:
NotAuthorizedException: Raised when the user doesn't exist, isn't verified, or is not active.
Returns:
User: The user object
"""
db_obj = await self.get_one_or_none(email=username)
if db_obj is None:
msg = "User not found or password invalid"
raise PermissionDeniedException(detail=msg)
if db_obj.hashed_password is None:
msg = "User not found or password invalid."
raise PermissionDeniedException(detail=msg)
if not await crypt.verify_password(password, db_obj.hashed_password):
msg = "User not found or password invalid"
raise PermissionDeniedException(detail=msg)
if not db_obj.is_active:
msg = "User account is inactive"
raise PermissionDeniedException(detail=msg)
return db_obj
[docs]
async def update_password(self, data: dict[str, Any], db_obj: User) -> None:
"""Update stored user password.
This is only used when not used IAP authentication.
Args:
data (UserPasswordUpdate): _description_
db_obj (User): _description_
Raises:
PermissionDeniedException: _description_
"""
if db_obj.hashed_password is None:
msg = "User not found or password invalid."
raise PermissionDeniedException(detail=msg)
if not await crypt.verify_password(data["current_password"], db_obj.hashed_password):
msg = "User not found or password invalid."
raise PermissionDeniedException(detail=msg)
if not db_obj.is_active:
msg = "User account is not active"
raise PermissionDeniedException(detail=msg)
db_obj.hashed_password = await crypt.get_password_hash(data["new_password"])
await self.repository.update(db_obj)
[docs]
@staticmethod
async def has_role_id(db_obj: User, role_id: UUID) -> bool:
"""Return true if user has specified role ID"""
return any(assigned_role.role_id for assigned_role in db_obj.roles if assigned_role.role_id == role_id)
[docs]
@staticmethod
async def has_role(db_obj: User, role_name: str) -> bool:
"""Return true if user has specified role ID"""
return any(assigned_role.role_id for assigned_role in db_obj.roles if assigned_role.role_name == role_name)
@staticmethod
def is_superuser(user: User) -> bool:
return bool(
user.is_superuser
or any(
assigned_role.role.name
for assigned_role in user.roles
if assigned_role.role.name in {constants.SUPERUSER_ACCESS_ROLE}
),
)
[docs]
async def to_model(self, data: ModelDictT[User], operation: str | None = None) -> User:
if isinstance(data, dict) and "password" in data:
password: bytes | str | None = data.pop("password", None)
if password is not None:
data.update({"hashed_password": await crypt.get_password_hash(password)})
return await super().to_model(data, operation)
[docs]
class RoleService(SQLAlchemyAsyncRepositoryService[Role]):
"""Handles database operations for users."""
repository_type = RoleRepository
match_fields = ["name"]
[docs]
def __init__(self, **repo_kwargs: Any) -> None:
self.repository: RoleRepository = self.repository_type(**repo_kwargs)
self.model_type = self.repository.model_type
[docs]
async def to_model(self, data: ModelDictT[Role], operation: str | None = None) -> Role:
if (is_msgspec_model(data) or is_pydantic_model(data)) and operation == "create" and data.slug is None: # type: ignore[union-attr]
data.slug = await self.repository.get_available_slug(data.name) # type: ignore[union-attr]
if (is_msgspec_model(data) or is_pydantic_model(data)) and operation == "update" and data.slug is None: # type: ignore[union-attr]
data.slug = await self.repository.get_available_slug(data.name) # type: ignore[union-attr]
if is_dict(data) and "slug" not in data and operation == "create":
data["slug"] = await self.repository.get_available_slug(data["name"])
if is_dict(data) and "slug" not in data and "name" in data and operation == "update":
data["slug"] = await self.repository.get_available_slug(data["name"])
return await super().to_model(data, operation)
[docs]
class UserRoleService(SQLAlchemyAsyncRepositoryService[UserRole]):
"""Handles database operations for user roles."""
repository_type = UserRoleRepository
[docs]
class UserOAuthAccountService(SQLAlchemyAsyncRepositoryService[UserOauthAccount]):
"""Handles database operations for user roles."""
repository_type = UserOauthAccountRepository