from __future__ import annotations
from typing import TYPE_CHECKING, Annotated
from uuid import UUID
from advanced_alchemy.extensions.litestar.dto import SQLAlchemyDTO
from litestar import Controller, delete, get, patch, post
from app.db import models as m
from app.domain.accounts.guards import requires_active_user, requires_superuser
from app.domain.tags.services import TagService
from app.lib import dto
from app.lib.deps import create_service_dependencies
from . import urls
if TYPE_CHECKING:
from advanced_alchemy.filters import FilterTypes
from advanced_alchemy.service import OffsetPagination
from litestar.dto import DTOData
from litestar.params import Dependency, Parameter
[docs]
class TagDTO(SQLAlchemyDTO[m.Tag]):
config = dto.config(max_nested_depth=0, exclude={"created_at", "updated_at", "teams"})
[docs]
class TagCreateDTO(SQLAlchemyDTO[m.Tag]):
config = dto.config(max_nested_depth=0, exclude={"id", "created_at", "updated_at", "teams"})
[docs]
class TagUpdateDTO(SQLAlchemyDTO[m.Tag]):
config = dto.config(max_nested_depth=0, exclude={"id", "created_at", "updated_at", "teams"}, partial=True)
[docs]
class TagController(Controller):
"""Handles the interactions within the Tag objects.
We use a DTO in this controller purely as an example of how to integrate it with SQLAlchemy.
You can use either pattern you see in this repository as an example.
"""
guards = [requires_active_user]
dependencies = create_service_dependencies(
TagService,
key="tags_service",
load=[m.Tag.teams],
filters={
"id_filter": UUID,
"created_at": True,
"updated_at": True,
"sort_field": "name",
"search": "name,slug",
},
)
tags = ["Tags"]
return_dto = TagDTO
@get(operation_id="ListTags", path=urls.TAG_LIST)
async def list_tags(
self,
tags_service: TagService,
filters: Annotated[list[FilterTypes], Dependency(skip_validation=True)],
) -> OffsetPagination[m.Tag]:
"""List tags."""
results, total = await tags_service.list_and_count(*filters)
return tags_service.to_schema(data=results, total=total, filters=filters)
@get(operation_id="GetTag", path=urls.TAG_DETAILS)
async def get_tag(
self,
tags_service: TagService,
tag_id: Annotated[UUID, Parameter(title="Tag ID", description="The tag to retrieve.")],
) -> m.Tag:
"""Get a tag."""
db_obj = await tags_service.get(tag_id)
return tags_service.to_schema(db_obj)
@post(operation_id="CreateTag", guards=[requires_superuser], path=urls.TAG_CREATE, dto=TagCreateDTO)
async def create_tag(self, tags_service: TagService, data: DTOData[m.Tag]) -> m.Tag:
"""Create a new tag."""
db_obj = await tags_service.create(data)
return tags_service.to_schema(db_obj)
@patch(operation_id="UpdateTag", path=urls.TAG_UPDATE, guards=[requires_superuser], dto=TagUpdateDTO)
async def update_tag(
self,
tags_service: TagService,
data: DTOData[m.Tag],
tag_id: Annotated[UUID, Parameter(title="Tag ID", description="The tag to update.")],
) -> m.Tag:
"""Update a tag."""
db_obj = await tags_service.update(item_id=tag_id, data=data.create_instance())
return tags_service.to_schema(db_obj)
@delete(operation_id="DeleteTag", path=urls.TAG_DELETE, guards=[requires_superuser], return_dto=None)
async def delete_tag(
self,
tags_service: TagService,
tag_id: Annotated[UUID, Parameter(title="Tag ID", description="The tag to delete.")],
) -> None:
"""Delete a tag."""
_ = await tags_service.delete(tag_id)