from __future__ import annotations
from datetime import UTC, datetime, timedelta
from typing import TYPE_CHECKING, Any, cast
from advanced_alchemy.extensions.litestar import repository, service
from litestar.exceptions import ClientException, PermissionDeniedException
from sqlalchemy.orm import undefer_group
from app.db import models as m
from app.lib import constants, crypt
from app.lib.deps import CompositeServiceMixin
from app.lib.validation import PasswordValidationError, validate_password_strength
MAX_FAILED_RESET_ATTEMPTS = 5
if TYPE_CHECKING:
from uuid import UUID
from httpx_oauth.oauth2 import OAuth2Token
from app.domain.accounts.services._user_oauth_account import UserOAuthAccountService
[docs]
class UserService(CompositeServiceMixin, service.SQLAlchemyAsyncRepositoryService[m.User]):
"""Handles database operations for users."""
[docs]
class Repo(repository.SQLAlchemyAsyncRepository[m.User]):
"""User SQLAlchemy Repository."""
model_type = m.User
repository_type = Repo
default_role = constants.DEFAULT_ACCESS_ROLE
match_fields = ["email"]
@property
def oauth_accounts(self) -> UserOAuthAccountService:
"""Lazy-loaded OAuth account service sharing this session."""
from app.domain.accounts.services._user_oauth_account import UserOAuthAccountService
return self._get_service(UserOAuthAccountService)
[docs]
async def to_model_on_create(self, data: service.ModelDictT[m.User]) -> service.ModelDictT[m.User]:
return await self._populate_model(data)
[docs]
async def to_model_on_update(self, data: service.ModelDictT[m.User]) -> service.ModelDictT[m.User]:
return await self._populate_model(data)
[docs]
async def to_model_on_upsert(self, data: service.ModelDictT[m.User]) -> service.ModelDictT[m.User]:
return await self._populate_model(data)
[docs]
async def authenticate(self, username: str, password: bytes | str) -> m.User:
"""Authenticate a user against the stored hashed password.
Returns:
The user object if authentication is successful.
Raises:
PermissionDeniedException: If the user is not found, the password is invalid, or the account is inactive.
"""
db_obj = await self.get_one_or_none(email=username, load=[undefer_group("security_sensitive")])
if db_obj is None:
msg = "User not found or password invalid"
raise PermissionDeniedException(detail=msg)
if db_obj.hashed_password is None:
msg = "User not found or password invalid."
raise PermissionDeniedException(detail=msg)
if not await crypt.verify_password(password, db_obj.hashed_password):
msg = "User not found or password invalid"
raise PermissionDeniedException(detail=msg)
if not db_obj.is_active:
msg = "User account is inactive"
raise PermissionDeniedException(detail=msg)
return db_obj
[docs]
async def verify_email(self, user_id: UUID, email: str) -> m.User:
"""Mark user's email as verified.
Args:
user_id: The user's UUID
email: The email address that was verified
Returns:
The updated user object
Raises:
ClientException: If user not found or email doesn't match
"""
db_obj = await self.get_one_or_none(id=user_id)
if db_obj is None:
raise ClientException(detail="User not found", status_code=404)
if db_obj.email != email:
raise ClientException(detail="Email address does not match user account", status_code=400)
db_obj.is_verified = True
db_obj.verified_at = datetime.now(UTC).date()
return await self.update(data=db_obj)
[docs]
async def is_email_verified(self, user_id: UUID) -> bool:
"""Check if user's email is verified.
Args:
user_id: The user's UUID
Returns:
True if email is verified, False otherwise
"""
db_obj = await self.get_one_or_none(id=user_id)
return db_obj.is_verified if db_obj else False
[docs]
async def require_verified_email(self, user: m.User) -> None:
"""Raise exception if user's email is not verified.
Args:
user: The user object to check
Raises:
PermissionDeniedException: If email is not verified
"""
if not user.is_verified:
msg = "Email verification required"
raise PermissionDeniedException(detail=msg)
[docs]
async def update_password(self, data: dict[str, Any], db_obj: m.User) -> None:
"""Modify stored user password.
Raises:
PermissionDeniedException: If the user is not found, the password is invalid, or the account is inactive.
"""
db_obj = await self.get(db_obj.id, load=[undefer_group("security_sensitive")])
if db_obj.hashed_password is None:
msg = "User not found or password invalid."
raise PermissionDeniedException(detail=msg)
if not await crypt.verify_password(data["current_password"], db_obj.hashed_password):
msg = "User not found or password invalid."
raise PermissionDeniedException(detail=msg)
if not db_obj.is_active:
msg = "User account is not active"
raise PermissionDeniedException(detail=msg)
db_obj.hashed_password = await crypt.get_password_hash(data["new_password"])
await self.update(
item_id=db_obj.id,
data={"hashed_password": db_obj.hashed_password},
auto_commit=True,
)
[docs]
@staticmethod
async def has_role_id(db_obj: m.User, role_id: UUID) -> bool:
"""Return true if user has specified role ID"""
return any(assigned_role.role_id for assigned_role in db_obj.roles if assigned_role.role_id == role_id)
[docs]
@staticmethod
async def has_role(db_obj: m.User, role_name: str) -> bool:
"""Return true if user has specified role ID"""
return any(assigned_role.role_id for assigned_role in db_obj.roles if assigned_role.role_name == role_name)
@staticmethod
def is_superuser(user: m.User) -> bool:
return bool(
user.is_superuser
or any(
assigned_role.role_name
for assigned_role in user.roles
if assigned_role.role_name == constants.SUPERUSER_ACCESS_ROLE
),
)
[docs]
async def reset_password_with_token(self, user_id: UUID, new_password: str) -> m.User:
"""Reset user's password using a validated token.
Args:
user_id: The user's UUID
new_password: The new password
Returns:
The updated user object
Raises:
ClientException: If user not found or password validation fails
"""
try:
validate_password_strength(new_password)
except PasswordValidationError as e:
raise ClientException(detail=str(e), status_code=400) from e
db_obj = await self.get_one_or_none(id=user_id)
if db_obj is None:
raise ClientException(detail="User not found", status_code=404)
if not db_obj.is_active:
raise ClientException(detail="User account is inactive", status_code=403)
db_obj.hashed_password = await crypt.get_password_hash(new_password)
db_obj.password_reset_at = datetime.now(UTC)
db_obj.failed_reset_attempts = 0
db_obj.reset_locked_until = None
return await self.update(db_obj)
[docs]
async def is_reset_rate_limited(self, user_id: UUID) -> bool:
"""Check if user is rate limited for password resets.
Args:
user_id: The user's UUID
Returns:
True if user is rate limited, False otherwise
"""
db_obj = await self.get_one_or_none(id=user_id)
if db_obj is None:
return False
return bool(db_obj.reset_locked_until and db_obj.reset_locked_until > datetime.now(UTC))
[docs]
async def increment_failed_reset_attempt(self, user_id: UUID) -> None:
"""Increment failed reset attempts counter.
Args:
user_id: The user's UUID
"""
db_obj = await self.get_one_or_none(id=user_id)
if db_obj is None:
return
db_obj.failed_reset_attempts += 1
if db_obj.failed_reset_attempts >= MAX_FAILED_RESET_ATTEMPTS:
db_obj.reset_locked_until = datetime.now(UTC) + timedelta(hours=1)
await self.update(
item_id=db_obj.id,
data={
"failed_reset_attempts": db_obj.failed_reset_attempts,
"reset_locked_until": db_obj.reset_locked_until,
},
auto_commit=True,
)
async def _populate_model(self, data: service.ModelDictT[m.User]) -> service.ModelDictT[m.User]:
data = service.schema_dump(data)
data = await self._populate_with_hashed_password(data)
data = await self._populate_with_backup_codes(data)
return await self._populate_with_role(data)
async def _populate_with_hashed_password(self, data: service.ModelDictT[m.User]) -> service.ModelDictT[m.User]:
if service.is_dict(data) and (password := data.pop("password", None)) is not None:
data["hashed_password"] = await crypt.get_password_hash(password)
return data
async def _populate_with_backup_codes(self, data: service.ModelDictT[m.User]) -> service.ModelDictT[m.User]:
if not service.is_dict(data):
return data
if "backup_codes" not in data:
return data
codes = data.get("backup_codes")
if not isinstance(codes, list):
return data
typed_codes = cast("list[object]", codes)
if not all(code is None or isinstance(code, str) for code in typed_codes):
return data
validated_codes = cast("list[str | None]", typed_codes)
non_null_codes = [code for code in validated_codes if code is not None]
if not non_null_codes or all(code.startswith("$") for code in non_null_codes):
return data
data["backup_codes"] = [
None if code is None else await crypt.get_password_hash(code) for code in validated_codes
]
return data
async def _populate_with_role(self, data: service.ModelDictT[m.User]) -> service.ModelDictT[m.User]:
if service.is_dict(data) and (role_id := data.pop("role_id", None)) is not None:
data = await self.to_model(data)
data.roles.append(m.UserRole(role_id=role_id, assigned_at=datetime.now(UTC)))
return data
[docs]
async def create_user_from_oauth(
self,
oauth_data: dict[str, Any],
provider: str,
token_data: OAuth2Token,
) -> m.User:
"""Create new user from OAuth data.
Args:
oauth_data: User data from OAuth provider
provider: OAuth provider name (e.g., 'google')
token_data: OAuth token data
Returns:
The created user object
"""
email = oauth_data.get("email", "")
name = oauth_data.get("name", "")
user_data = {
"email": email,
"name": name,
"is_verified": True,
"verified_at": datetime.now(UTC).date(),
"is_active": True,
}
return await self.create(data=user_data)
[docs]
async def authenticate_or_create_oauth_user(
self,
provider: str,
oauth_data: dict[str, Any],
token_data: OAuth2Token,
) -> tuple[m.User, bool]:
"""Authenticate existing OAuth user or create new one.
Args:
provider: OAuth provider name
oauth_data: User data from OAuth provider
token_data: OAuth token data
Returns:
Tuple of (user, is_new_user)
"""
email = oauth_data.get("email", "")
existing_user = await self.get_one_or_none(email=email) if email else None
if existing_user:
await self.oauth_accounts.create_or_update_oauth_account(
user_id=existing_user.id,
provider=provider,
oauth_data=oauth_data,
token_data=token_data,
)
return existing_user, False
new_user = await self.create_user_from_oauth(
oauth_data=oauth_data,
provider=provider,
token_data=token_data,
)
await self.oauth_accounts.create_or_update_oauth_account(
user_id=new_user.id,
provider=provider,
oauth_data=oauth_data,
token_data=token_data,
)
return new_user, True