diff --git a/src/api/qualicharge/api/v1/routers/dynamic.py b/src/api/qualicharge/api/v1/routers/dynamic.py index f0ddbc9c..7cd0a4fb 100644 --- a/src/api/qualicharge/api/v1/routers/dynamic.py +++ b/src/api/qualicharge/api/v1/routers/dynamic.py @@ -15,6 +15,7 @@ from qualicharge.auth.schemas import ScopesEnum, User from qualicharge.conf import settings from qualicharge.db import get_session +from qualicharge.exceptions import PermissionDenied from qualicharge.models.dynamic import ( SessionCreate, StatusCreate, @@ -22,6 +23,7 @@ ) from qualicharge.schemas.core import PointDeCharge, Station, Status from qualicharge.schemas.core import Session as QCSession +from qualicharge.schemas.utils import is_pdc_allowed_for_user logger = logging.getLogger(__name__) @@ -160,6 +162,9 @@ async def read_status( session: Session = Depends(get_session), ) -> StatusRead: """Read last known point of charge status.""" + if not is_pdc_allowed_for_user(id_pdc_itinerance, user): + raise PermissionDenied("You cannot read the status of this point of charge") + # Get target point de charge pdc_id = session.exec( select(PointDeCharge.id).where( @@ -226,6 +231,9 @@ async def read_status_history( session: Session = Depends(get_session), ) -> List[StatusRead]: """Read point of charge status history.""" + if not is_pdc_allowed_for_user(id_pdc_itinerance, user): + raise PermissionDenied("You cannot read statuses of this point of charge") + pdc_id = session.exec( select(PointDeCharge.id).where( PointDeCharge.id_pdc_itinerance == id_pdc_itinerance @@ -272,6 +280,9 @@ async def create_status( session: Session = Depends(get_session), ) -> None: """Create a status.""" + if not is_pdc_allowed_for_user(status.id_pdc_itinerance, user): + raise PermissionDenied("You cannot create statuses for this point of charge") + pdc = session.exec( select(PointDeCharge).where( PointDeCharge.id_pdc_itinerance == status.id_pdc_itinerance @@ -295,6 +306,12 @@ async def create_status_bulk( session: Session = Depends(get_session), ) -> None: """Create a statuses batch.""" + for status in statuses: + if not is_pdc_allowed_for_user(status.id_pdc_itinerance, user): + raise PermissionDenied( + "You cannot submit data for an organization you are not assigned to" + ) + # Check if all points of charge exist # ids_pdc_itinerance = list({status.id_pdc_itinerance for status in statuses}) ids_pdc_itinerance = [status.id_pdc_itinerance for status in statuses] @@ -335,6 +352,9 @@ async def create_session( db_session: Session = Depends(get_session), ) -> None: """Create a session.""" + if not is_pdc_allowed_for_user(session.id_pdc_itinerance, user): + raise PermissionDenied("You cannot create sessions for this point of charge") + # ⚠️ Please pay attention to the semantic: # # - `db_session` / `Session` refers to the database session, while, @@ -362,6 +382,12 @@ async def create_session_bulk( db_session: Session = Depends(get_session), ) -> None: """Create a sessions batch.""" + for session in sessions: + if not is_pdc_allowed_for_user(session.id_pdc_itinerance, user): + raise PermissionDenied( + "You cannot submit data for an organization you are not assigned to" + ) + # Check if all points of charge exist ids_pdc_itinerance = [session.id_pdc_itinerance for session in sessions] ids_pdc_itinerance_set = set(ids_pdc_itinerance) diff --git a/src/api/qualicharge/api/v1/routers/static.py b/src/api/qualicharge/api/v1/routers/static.py index 8a5efd43..86850013 100644 --- a/src/api/qualicharge/api/v1/routers/static.py +++ b/src/api/qualicharge/api/v1/routers/static.py @@ -23,11 +23,12 @@ from qualicharge.auth.schemas import ScopesEnum, User from qualicharge.conf import settings from qualicharge.db import get_session -from qualicharge.exceptions import IntegrityError, ObjectDoesNotExist +from qualicharge.exceptions import IntegrityError, ObjectDoesNotExist, PermissionDenied from qualicharge.models.static import Statique from qualicharge.schemas.core import OperationalUnit, PointDeCharge, Station from qualicharge.schemas.utils import ( build_statique, + is_pdc_allowed_for_user, list_statique, save_statique, save_statiques, @@ -150,6 +151,9 @@ async def read( session: Session = Depends(get_session), ) -> Statique: """Read statique item (point de charge).""" + if not is_pdc_allowed_for_user(id_pdc_itinerance, user): + raise PermissionDenied("You don't manage this point of charge") + try: statique = build_statique(session, id_pdc_itinerance) except ObjectDoesNotExist as err: @@ -176,6 +180,9 @@ async def update( session: Session = Depends(get_session), ) -> Statique: """Update statique item (point de charge).""" + if not is_pdc_allowed_for_user(id_pdc_itinerance, user): + raise PermissionDenied("You don't manage this point of charge") + try: update = update_statique(session, id_pdc_itinerance, statique) except IntegrityError as err: @@ -199,6 +206,11 @@ async def create( session: Session = Depends(get_session), ) -> StatiqueItemsCreatedResponse: """Create a statique item.""" + if not is_pdc_allowed_for_user(statique.id_pdc_itinerance, user): + raise PermissionDenied( + "You cannot submit data for an organization you are not assigned to" + ) + try: db_statique = save_statique(session, statique) except ObjectDoesNotExist as err: @@ -217,6 +229,12 @@ async def bulk( session: Session = Depends(get_session), ) -> StatiqueItemsCreatedResponse: """Create a set of statique items.""" + for statique in statiques: + if not is_pdc_allowed_for_user(statique.id_pdc_itinerance, user): + raise PermissionDenied( + "You cannot submit data for an organization you are not assigned to" + ) + try: statiques = [statique for statique in save_statiques(session, statiques)] except ObjectDoesNotExist as err: diff --git a/src/api/qualicharge/models/static.py b/src/api/qualicharge/models/static.py index 098505c0..f677dc68 100644 --- a/src/api/qualicharge/models/static.py +++ b/src/api/qualicharge/models/static.py @@ -13,11 +13,12 @@ PositiveFloat, PositiveInt, WithJsonSchema, + model_validator, ) from pydantic.types import PastDate from pydantic_extra_types.coordinate import Coordinate from pydantic_extra_types.phone_numbers import PhoneNumber -from typing_extensions import Annotated +from typing_extensions import Annotated, Self from .utils import ModelSchemaMixin @@ -140,3 +141,15 @@ class Statique(ModelSchemaMixin, BaseModel): observations: Optional[str] date_maj: PastDate cable_t2_attache: Optional[bool] + + @model_validator(mode="after") + def check_afirev_prefix(self) -> Self: + """Check that id_pdc_itinerance and id_station_itinerance prefixes match.""" + if self.id_station_itinerance[:5] != self.id_station_itinerance[:5]: + raise ValueError( + ( + "AFIREV prefixes from id_station_itinerance and " + "id_pdc_itinerance do not match" + ) + ) + return self diff --git a/src/api/qualicharge/schemas/utils.py b/src/api/qualicharge/schemas/utils.py index a25b787b..d11e8fee 100644 --- a/src/api/qualicharge/schemas/utils.py +++ b/src/api/qualicharge/schemas/utils.py @@ -8,6 +8,8 @@ from sqlalchemy.schema import Column as SAColumn from sqlmodel import Session, SQLModel, select +from qualicharge.auth.schemas import User + from ..exceptions import ( DatabaseQueryException, DuplicateEntriesSubmitted, @@ -340,3 +342,12 @@ def list_statique( ) for pdc in session.exec(statement).all(): yield pdc_to_statique(pdc) + + +def is_pdc_allowed_for_user(id_pdc_itinerance: str, user: User): + """Check if a user can create/read/update a PDC given its identifier.""" + if user.is_superuser: + return True + if id_pdc_itinerance[:5] in [ou.code for ou in user.operational_units]: + return True + return False