From dba509e65c537b35a1ea9332f96459633585e8f0 Mon Sep 17 00:00:00 2001 From: Julien Maupetit Date: Fri, 24 Jan 2025 14:53:34 +0100 Subject: [PATCH] =?UTF-8?q?=E2=9A=A1=EF=B8=8F(api)=20use=20the=20statique?= =?UTF-8?q?=20materialized=20view=20in=20API=20endpoints?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit We now use the Statique materialized view to improve API performances. --- src/api/CHANGELOG.md | 1 + src/api/qualicharge/api/v1/routers/static.py | 81 +++++++++++-------- src/api/qualicharge/schemas/core.py | 7 ++ src/api/tests/api/v1/routers/test_statique.py | 17 +++- src/api/tests/schemas/test_static.py | 17 +++- 5 files changed, 88 insertions(+), 35 deletions(-) diff --git a/src/api/CHANGELOG.md b/src/api/CHANGELOG.md index 6a88f767..03c83281 100644 --- a/src/api/CHANGELOG.md +++ b/src/api/CHANGELOG.md @@ -29,6 +29,7 @@ and this project adheres to - Improve JSON string parsing using pyarrow engine - Add default values for optional Statique model fields - Migrate database enum types from names to values +- Improve API performance by integrating the `Statique` materialized view - Upgrade alembic to `1.14.1` - Upgrade geoalchemy2 to `0.17.0` - Upgrade psycopg to `3.2.4` diff --git a/src/api/qualicharge/api/v1/routers/static.py b/src/api/qualicharge/api/v1/routers/static.py index bc8f3011..949c6e8d 100644 --- a/src/api/qualicharge/api/v1/routers/static.py +++ b/src/api/qualicharge/api/v1/routers/static.py @@ -18,8 +18,14 @@ ) from psycopg import Error as PGError from pydantic import AnyHttpUrl, BaseModel, computed_field -from sqlalchemy import func -from sqlalchemy.exc import IntegrityError, OperationalError, ProgrammingError +from sqlalchemy import any_, func +from sqlalchemy.dialects.postgresql import array +from sqlalchemy.exc import ( + IntegrityError, + NoResultFound, + OperationalError, + ProgrammingError, +) from sqlalchemy.schema import Column as SAColumn from sqlmodel import Session, select @@ -36,13 +42,11 @@ PermissionDenied, ) from qualicharge.models.static import Statique -from qualicharge.schemas.core import OperationalUnit, PointDeCharge, Station +from qualicharge.schemas.core import PointDeCharge, StatiqueMV from qualicharge.schemas.sql import StatiqueImporter from qualicharge.schemas.utils import ( are_pdcs_allowed_for_user, - build_statique, is_pdc_allowed_for_user, - list_statique, save_statique, update_statique, ) @@ -115,34 +119,38 @@ async def list( ), session: Session = Depends(get_session), ) -> PaginatedStatiqueListResponse: - """List statique items.""" + """List statique items. + + Note that it can take up to 10 minutes for a created Statique item to appear in + this endpoint response. + """ current_url = request.url previous_url = next_url = None - total_statement = select(func.count(cast(SAColumn, PointDeCharge.id))) - operational_units = None + + total_statement = select(func.count(cast(SAColumn, StatiqueMV.pdc_id))) + + ou_filter: array | None = None if not user.is_superuser: - operational_units = user.operational_units - total_statement = ( - total_statement.join_from( - PointDeCharge, - Station, - PointDeCharge.station_id == Station.id, # type: ignore[arg-type] - ) - .join_from( - Station, - OperationalUnit, - Station.operational_unit_id == OperationalUnit.id, # type: ignore[arg-type] - ) - .where( - cast(SAColumn, OperationalUnit.id).in_( - ou.id for ou in user.operational_units - ) - ) + # If user has no assigned operational units, we filter on an empty VARCHAR array + ou_filter = array([f"{ou.code}%" for ou in user.operational_units] or [""]) + + if ou_filter is not None: + total_statement = total_statement.where( + cast(SAColumn, StatiqueMV.id_pdc_itinerance).like(any_(ou_filter)) ) total = session.exec(total_statement).one() + + statement = select(StatiqueMV) + if ou_filter is not None: + statement = statement.where( + cast(SAColumn, StatiqueMV.id_pdc_itinerance).like(any_(ou_filter)) + ) + statement = ( + statement.order_by(StatiqueMV.id_pdc_itinerance).offset(offset).limit(limit) + ) statiques = [ - statique - for statique in list_statique(session, offset, limit, operational_units) + Statique(**s.model_dump(exclude={"pdc_id", "pdc_updated_at"})) + for s in session.exec(statement).all() ] previous_offset = offset - limit if offset > limit else 0 @@ -176,18 +184,27 @@ async def read( ], session: Session = Depends(get_session), ) -> Statique: - """Read statique item (point de charge).""" + """Read statique item (point de charge). + + Note that it can take up to 10 minutes for a created Statique item to appear in + this endpoint response. + """ if not is_pdc_allowed_for_user(id_pdc_itinerance, user): raise PermissionDenied("You don't manage this point of charge") try: - statique = build_statique(session, id_pdc_itinerance) - except ObjectDoesNotExist as err: + statique_mv = session.exec( + select(StatiqueMV).where(StatiqueMV.id_pdc_itinerance == id_pdc_itinerance) + ).one() + except NoResultFound as err: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, - detail="Requested statique does not exist", + detail=( + "Requested statique does not exist yet. You should wait up to " + "10 minutes for a newly created entry." + ), ) from err - return statique + return Statique(**statique_mv.model_dump(exclude={"pdc_id", "pdc_updated_at"})) @router.put("/{id_pdc_itinerance}", status_code=status.HTTP_200_OK) diff --git a/src/api/qualicharge/schemas/core.py b/src/api/qualicharge/schemas/core.py index 9b070169..88f4926a 100644 --- a/src/api/qualicharge/schemas/core.py +++ b/src/api/qualicharge/schemas/core.py @@ -494,6 +494,13 @@ class StatiqueMV(Statique, SQLModel): pdc_id: UUID pdc_updated_at: datetime + # WKBElement to Coordinate + @field_serializer("coordonneesXY") + @staticmethod + def _wkb_to_coordinates(value: WKBElement): + """Convert WKB to Coordinate.""" + return Localisation._wkb_to_coordinates(value) + class _StatiqueMV(SQLModel): """Statique Materialized view. diff --git a/src/api/tests/api/v1/routers/test_statique.py b/src/api/tests/api/v1/routers/test_statique.py index 4e1fc6cb..0cc3524a 100644 --- a/src/api/tests/api/v1/routers/test_statique.py +++ b/src/api/tests/api/v1/routers/test_statique.py @@ -10,6 +10,7 @@ from pydantic_extra_types.coordinate import Coordinate from sqlalchemy import Column as SAColumn from sqlalchemy import func +from sqlalchemy_utils import refresh_materialized_view from sqlmodel import select from qualicharge.auth.factories import GroupFactory @@ -18,6 +19,7 @@ from qualicharge.factories.static import StatiqueFactory from qualicharge.models.static import Statique from qualicharge.schemas.core import ( + STATIQUE_MV_TABLE_NAME, OperationalUnit, PointDeCharge, Station, @@ -63,6 +65,7 @@ def test_list_for_superuser(client_auth, db_session): n_statiques = 3 statiques = StatiqueFactory.batch(n_statiques) save_statiques(db_session, statiques) + refresh_materialized_view(db_session, STATIQUE_MV_TABLE_NAME) db_statiques = sorted( statiques, key=lambda s: s.id_pdc_itinerance, @@ -113,6 +116,7 @@ def test_list_for_user(client_auth, db_session): # Create statiques n_statiques = 20 save_statiques(db_session, StatiqueFactory.batch(n_statiques)) + refresh_materialized_view(db_session, STATIQUE_MV_TABLE_NAME) # Select operational units linked to stations operational_units = db_session.exec( @@ -221,6 +225,7 @@ def test_list_pagination(client_auth, db_session): """Test the /statique/ list endpoint results pagination.""" n_statiques = 3 save_statiques(db_session, StatiqueFactory.batch(n_statiques)) + refresh_materialized_view(db_session, STATIQUE_MV_TABLE_NAME) offset = 0 limit = 2 @@ -288,6 +293,7 @@ def test_read_for_superuser(client_auth, db_session): db_statique = save_statique( db_session, StatiqueFactory.build(id_pdc_itinerance=id_pdc_itinerance) ) + refresh_materialized_view(db_session, STATIQUE_MV_TABLE_NAME) response = client_auth.get(f"/statique/{id_pdc_itinerance}") assert response.status_code == status.HTTP_200_OK @@ -318,6 +324,7 @@ def test_read_for_user(client_auth, db_session): db_statique = save_statique( db_session, StatiqueFactory.build(id_pdc_itinerance=id_pdc_itinerance) ) + refresh_materialized_view(db_session, STATIQUE_MV_TABLE_NAME) # User has no assigned operational units response = client_auth.get(f"/statique/{id_pdc_itinerance}") @@ -343,7 +350,12 @@ def test_read_when_statique_does_not_exist(client_auth): response = client_auth.get(f"/statique/{id_pdc_itinerance}") assert response.status_code == status.HTTP_404_NOT_FOUND json_response = response.json() - assert json_response == {"detail": "Requested statique does not exist"} + assert json_response == { + "detail": ( + "Requested statique does not exist yet. You should wait up to " + "10 minutes for a newly created entry." + ) + } @pytest.mark.parametrize( @@ -384,7 +396,7 @@ def test_create_for_superuser(client_auth): assert json_response["items"][0] == id_pdc_itinerance -def test_create_without_optional_fields(client_auth): +def test_create_without_optional_fields(client_auth, db_session): """Test the /statique/ create endpoint when optional fields are not provided.""" id_pdc_itinerance = "FR911E1111ER1" data = Statique( @@ -410,6 +422,7 @@ def test_create_without_optional_fields(client_auth): assert json_response["items"][0] == id_pdc_itinerance # Get created Statique and check defaults + refresh_materialized_view(db_session, STATIQUE_MV_TABLE_NAME) response = client_auth.get(f"/statique/{id_pdc_itinerance}") assert response.status_code == status.HTTP_200_OK json_response = response.json() diff --git a/src/api/tests/schemas/test_static.py b/src/api/tests/schemas/test_static.py index 0d2e606e..b79c3754 100644 --- a/src/api/tests/schemas/test_static.py +++ b/src/api/tests/schemas/test_static.py @@ -24,6 +24,7 @@ StatiqueFactory, ) from qualicharge.schemas.core import ( + STATIQUE_MV_TABLE_NAME, Amenageur, Localisation, OperationalUnit, @@ -410,7 +411,7 @@ def test_statique_materialized_view(db_session): n_pdc = 4 statiques = StatiqueFactory.batch(n_pdc) save_statiques(db_session, statiques) - refresh_materialized_view(db_session, "statique") + refresh_materialized_view(db_session, STATIQUE_MV_TABLE_NAME) db_statiques = db_session.exec(select(StatiqueMV)).all() assert len(db_statiques) == n_pdc @@ -421,3 +422,17 @@ def test_statique_materialized_view(db_session): assert isinstance(db_statiques[0].coordonneesXY, WKBElement) assert isinstance(db_statiques[0].pdc_id, UUID) assert isinstance(db_statiques[0].pdc_updated_at, datetime) + + +def test_statique_materialized_view_coordonneesXY_field(db_session): + """Test the coordonneesXY field from the Statique schema.""" + n_pdc = 1 + statiques = StatiqueFactory.batch(n_pdc) + save_statiques(db_session, statiques) + refresh_materialized_view(db_session, STATIQUE_MV_TABLE_NAME) + + db_statique = db_session.exec(select(StatiqueMV)).one() + assert isinstance(db_statique.coordonneesXY, WKBElement) + crds = Coordinate(**db_statique.model_dump()["coordonneesXY"]) + assert hasattr(crds, "latitude") + assert hasattr(crds, "longitude")