Source code for app.domain.teams.services._team

from __future__ import annotations

from typing import TYPE_CHECKING, Any

from advanced_alchemy.exceptions import RepositoryError
from advanced_alchemy.extensions.litestar import repository, service

from app.db import models as m
from app.lib import constants
from app.lib.deps import CompositeServiceMixin

if TYPE_CHECKING:
    from uuid import UUID

    from advanced_alchemy.service import ModelDictT

    from app.domain.tags.services import TagService


class TeamService(CompositeServiceMixin, service.SQLAlchemyAsyncRepositoryService[m.Team]):
    """Service layer for ``Team`` records.

    On top of the base repository service this class:

    * fills in a ``slug`` from ``name`` when the payload has none,
    * requires an owner on create and registers that owner as an admin member,
    * upserts tags given by name and attaches them to the team,
    * moves the ``is_owner`` flag between members when ``update`` receives an
      ``owner_id``.
    """

    class Repo(repository.SQLAlchemyAsyncSlugRepository[m.Team]):
        """Team Repository."""

        model_type = m.Team

    repository_type = Repo
    match_fields = ["name"]

    @property
    def tags(self) -> TagService:
        """Lazy-loaded tag service sharing this session."""
        # Imported here rather than at module level; the module-level import
        # of TagService is guarded by TYPE_CHECKING.
        from app.domain.tags.services import TagService

        return self._get_service(TagService)

    async def to_model_on_create(self, data: ModelDictT[m.Team]) -> ModelDictT[m.Team]:
        """Prepare ``data`` for a create operation (slug, owner and tags)."""
        return await self._prepare_model_data(data, "create")

    async def to_model_on_update(self, data: ModelDictT[m.Team]) -> ModelDictT[m.Team]:
        """Prepare ``data`` for an update operation (slug and tags)."""
        return await self._prepare_model_data(data, "update")

    async def to_model_on_upsert(self, data: ModelDictT[m.Team]) -> ModelDictT[m.Team]:
        """Prepare ``data`` for an upsert operation (slug and tags)."""
        return await self._prepare_model_data(data, "upsert")

    @staticmethod
    def can_view_all(user: m.User) -> bool:
        """Return ``True`` when ``user`` may see every team.

        That is the case for superusers and for users holding the role named
        by ``constants.SUPERUSER_ACCESS_ROLE``.
        """
        return bool(
            user.is_superuser
            or any(
                assigned_role.role.name == constants.SUPERUSER_ACCESS_ROLE
                for assigned_role in user.roles
            ),
        )

    async def _prepare_model_data(
        self,
        data: ModelDictT[m.Team],
        operation: str | None,
    ) -> ModelDictT[m.Team]:
        """Run the conversion pipeline shared by the ``to_model_on_*`` hooks.

        Args:
            data: Incoming payload (schema object, dict, or model instance).
            operation: ``"create"``, ``"update"`` or ``"upsert"``.

        Returns:
            The payload after dumping, slug population and owner/tag handling.
        """
        data = service.schema_dump(data)
        data = await self._populate_slug(data)
        return await self._populate_with_owner_and_tags(data, operation)

    async def _populate_slug(self, data: ModelDictT[m.Team]) -> ModelDictT[m.Team]:
        """Add a ``slug`` derived from ``name`` when the dict payload lacks one."""
        if service.is_dict_without_field(data, "slug") and service.is_dict_with_field(data, "name"):
            data["slug"] = await self.repository.get_available_slug(data["name"])
        return data

    async def _populate_with_owner_and_tags(
        self,
        data: ModelDictT[m.Team],
        operation: str | None,
    ) -> ModelDictT[m.Team]:
        """Convert a dict payload to a model, wiring up the owner and tags.

        Non-dict payloads are returned unchanged. ``owner_id``, ``owner`` and
        ``tags`` are popped from the dict (mutating it) before it is handed to
        the base ``to_model``.

        Args:
            data: Payload to convert.
            operation: Name of the operation being prepared; owner handling
                only applies to ``"create"``.

        Returns:
            The model built from ``data``, or ``data`` itself when it is not a dict.

        Raises:
            RepositoryError: On create when neither ``owner_id`` nor ``owner``
                is supplied.
        """
        if not service.is_dict(data):
            return data

        owner_id: UUID | None = data.pop("owner_id", None)
        owner: m.User | None = data.pop("owner", None)
        input_tags: list[str] | None = data.pop("tags", None)

        if operation == "create" and owner_id is None and owner is None:
            msg = "'owner_id' is required to create a team."
            raise RepositoryError(msg)

        data = await super().to_model(data)

        # For create, add the owner as an admin member
        if operation == "create" and (owner or owner_id):
            owner_data: dict[str, Any] = {"user": owner} if owner else {"user_id": owner_id}
            data.members.append(m.TeamMember(**owner_data, role=m.TeamRoles.ADMIN, is_owner=True))

        # Set tags - for updates, SQLAlchemy will replace the existing tags when
        # the attribute is copied to the existing instance in service.update()
        if input_tags is not None:
            data.tags = [await self.tags.upsert({"name": tag_text}) for tag_text in input_tags]

        return data

    async def update(
        self,
        data: ModelDictT[m.Team],
        item_id: Any | None = None,
        **kwargs: Any,
    ) -> m.Team:
        """Update team with owner change support.

        Handles owner changes by updating member records after the base update.

        Args:
            data: Update payload. An ``owner_id`` key is only detected when
                ``data`` is a dict.
            item_id: Identifier forwarded to the base ``update``.
            **kwargs: Extra keyword arguments forwarded to the base ``update``.

        Returns:
            The updated team.
        """
        # Capture owner_id up front: the to_model_on_update hook pops it from
        # the dict while the base update runs.
        pending_owner_id: UUID | None = None
        if service.is_dict(data):
            pending_owner_id = data.get("owner_id")

        # Perform the base update
        team = await super().update(data, item_id=item_id, **kwargs)

        # Handle ownership transfer if needed
        if pending_owner_id is not None:
            await self._transfer_ownership(team, pending_owner_id)

        return team

    async def _transfer_ownership(self, team: m.Team, new_owner_id: UUID) -> None:
        """Transfer team ownership to an existing member.

        The member whose ``user_id`` equals ``new_owner_id`` becomes owner with
        the admin role, and every other member flagged as owner loses the flag.
        If no member matches ``new_owner_id`` the current ownership is left
        unchanged, so the team is never left without an owner.

        Args:
            team: Team whose members are updated in place.
            new_owner_id: User id of the member who should become owner.
        """
        # Reload the members relationship so the loop sees current rows.
        await self.repository.session.refresh(team, ["members"])

        # Guard: demoting the current owner without promoting anyone would
        # leave the team ownerless.
        if not any(member.user_id == new_owner_id for member in team.members):
            return

        for member in team.members:
            if member.user_id == new_owner_id:
                member.is_owner = True
                member.role = m.TeamRoles.ADMIN
            elif member.is_owner:
                member.is_owner = False