Source code for app.domain.accounts.guards

from __future__ import annotations

from typing import TYPE_CHECKING, Any

from litestar.exceptions import PermissionDeniedException
from litestar.security.jwt import OAuth2PasswordBearerAuth

from app.config import constants
from app.config.app import alchemy
from app.config.base import get_settings
from app.db.models import User
from app.domain.accounts import urls
from app.domain.accounts.dependencies import provide_users_service

if TYPE_CHECKING:
    from litestar.connection import ASGIConnection
    from litestar.handlers.base import BaseRouteHandler
    from litestar.security.jwt import Token


__all__ = ("requires_superuser", "requires_active_user", "requires_verified_user", "current_user_from_token", "auth")


settings = get_settings()


[docs] def requires_active_user(connection: ASGIConnection, _: BaseRouteHandler) -> None: """Request requires active user. Verifies the request user is active. Args: connection (ASGIConnection): HTTP Request _ (BaseRouteHandler): Route handler Raises: PermissionDeniedException: Permission denied exception """ if connection.user.is_active: return msg = "Inactive account" raise PermissionDeniedException(msg)
[docs] def requires_superuser(connection: ASGIConnection, _: BaseRouteHandler) -> None: """Request requires active superuser. Args: connection (ASGIConnection): HTTP Request _ (BaseRouteHandler): Route handler Raises: PermissionDeniedException: Permission denied exception Returns: None: Returns None when successful """ if connection.user.is_superuser: return raise PermissionDeniedException(detail="Insufficient privileges")
[docs] def requires_verified_user(connection: ASGIConnection, _: BaseRouteHandler) -> None: """Verify the connection user is a superuser. Args: connection (ASGIConnection): Request/Connection object. _ (BaseRouteHandler): Route handler. Raises: PermissionDeniedException: Not authorized Returns: None: Returns None when successful """ if connection.user.is_verified: return raise PermissionDeniedException(detail="User account is not verified.")
[docs] async def current_user_from_token(token: Token, connection: ASGIConnection[Any, Any, Any, Any]) -> User | None: """Lookup current user from local JWT token. Fetches the user information from the database Args: token (str): JWT Token Object connection (ASGIConnection[Any, Any, Any, Any]): ASGI connection. Returns: User: User record mapped to the JWT identifier """ service = await anext(provide_users_service(alchemy.provide_session(connection.app.state, connection.scope))) user = await service.get_one_or_none(email=token.sub) return user if user and user.is_active else None
auth = OAuth2PasswordBearerAuth[User]( retrieve_user_handler=current_user_from_token, token_secret=settings.app.SECRET_KEY, token_url=urls.ACCOUNT_LOGIN, exclude=[ constants.HEALTH_ENDPOINT, urls.ACCOUNT_LOGIN, urls.ACCOUNT_REGISTER, "^/schema", "^/public/", "^/saq/static/", ], )