Skip to content

Commit

Permalink
⚡️(api) use the statique materialized view in API endpoints
Browse files Browse the repository at this point in the history
We now use the Statique materialized view to improve API performances.
  • Loading branch information
jmaupetit committed Jan 28, 2025
1 parent 003f9ce commit ebe318a
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 30 deletions.
1 change: 1 addition & 0 deletions src/api/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ and this project adheres to
- Improve JSON string parsing using pyarrow engine
- Add default values for optional Statique model fields
- Migrate database enum types from names to values
- Improve API performance by integrating the `Statique` materialized view
- Upgrade alembic to `1.14.1`
- Upgrade geoalchemy2 to `0.17.0`
- Upgrade psycopg to `3.2.4`
Expand Down
67 changes: 38 additions & 29 deletions src/api/qualicharge/api/v1/routers/static.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
)
from psycopg import Error as PGError
from pydantic import AnyHttpUrl, BaseModel, computed_field
from sqlalchemy import func
from sqlalchemy import any_, func
from sqlalchemy.dialects.postgresql import ARRAY, array
from sqlalchemy.exc import IntegrityError, OperationalError, ProgrammingError
from sqlalchemy.schema import Column as SAColumn
from sqlmodel import Session, select
Expand All @@ -36,13 +37,11 @@
PermissionDenied,
)
from qualicharge.models.static import Statique
from qualicharge.schemas.core import OperationalUnit, PointDeCharge, Station
from qualicharge.schemas.core import PointDeCharge, StatiqueMV
from qualicharge.schemas.sql import StatiqueImporter
from qualicharge.schemas.utils import (
are_pdcs_allowed_for_user,
build_statique,
is_pdc_allowed_for_user,
list_statique,
save_statique,
update_statique,
)
Expand Down Expand Up @@ -115,34 +114,38 @@ async def list(
),
session: Session = Depends(get_session),
) -> PaginatedStatiqueListResponse:
"""List statique items."""
"""List statique items.
Note that it can take up to 10 minutes for a created Statique item to appear in
this endpoint response.
"""
current_url = request.url
previous_url = next_url = None
total_statement = select(func.count(cast(SAColumn, PointDeCharge.id)))
operational_units = None

total_statement = select(func.count(cast(SAColumn, StatiqueMV.pdc_id)))

ou_filter: array | None = None
if not user.is_superuser:
operational_units = user.operational_units
total_statement = (
total_statement.join_from(
PointDeCharge,
Station,
PointDeCharge.station_id == Station.id, # type: ignore[arg-type]
)
.join_from(
Station,
OperationalUnit,
Station.operational_unit_id == OperationalUnit.id, # type: ignore[arg-type]
)
.where(
cast(SAColumn, OperationalUnit.id).in_(
ou.id for ou in user.operational_units
)
)
# If user has no assigned operational units, we filter on an empty VARCHAR array
ou_filter = array([f"{ou.code}%" for ou in user.operational_units] or [""])

if ou_filter is not None:
total_statement = total_statement.where(
cast(SAColumn, StatiqueMV.id_pdc_itinerance).like(any_(ou_filter))
)
total = session.exec(total_statement).one()

statement = select(StatiqueMV)
if ou_filter is not None:
statement = statement.where(
cast(SAColumn, StatiqueMV.id_pdc_itinerance).like(any_(ou_filter))
)
statement = (
statement.order_by(StatiqueMV.id_pdc_itinerance).offset(offset).limit(limit)
)
statiques = [
statique
for statique in list_statique(session, offset, limit, operational_units)
Statique(**s.model_dump(exclude={"pdc_id", "pdc_updated_at"}))
for s in session.exec(statement).all()
]

previous_offset = offset - limit if offset > limit else 0
Expand Down Expand Up @@ -176,18 +179,24 @@ async def read(
],
session: Session = Depends(get_session),
) -> Statique:
"""Read statique item (point de charge)."""
"""Read statique item (point de charge).
Note that it can take up to 10 minutes for a created Statique item to appear in
this endpoint response.
"""
if not is_pdc_allowed_for_user(id_pdc_itinerance, user):
raise PermissionDenied("You don't manage this point of charge")

try:
statique = build_statique(session, id_pdc_itinerance)
statique_mv = session.exec(
select(StatiqueMV).where(StatiqueMV.id_pdc_itinerance == id_pdc_itinerance)
).one()
except ObjectDoesNotExist as err:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Requested statique does not exist",
) from err
return statique
return Statique(**statique_mv.model_dump(exclude={"pdc_id", "pdc_updated_at"}))


@router.put("/{id_pdc_itinerance}", status_code=status.HTTP_200_OK)
Expand Down
7 changes: 7 additions & 0 deletions src/api/qualicharge/schemas/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,13 @@ class StatiqueMV(Statique, SQLModel):
pdc_id: UUID
pdc_updated_at: datetime

# WKBElement to Coordinate
@field_serializer("coordonneesXY")
@staticmethod
def _wkb_to_coordinates(value: WKBElement):
"""Convert WKB to Coordinate."""
return Localisation._wkb_to_coordinates(value)


class _StatiqueMV(SQLModel):
"""Statique Materialized view.
Expand Down
7 changes: 7 additions & 0 deletions src/api/tests/api/v1/routers/test_statique.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from pydantic_extra_types.coordinate import Coordinate
from sqlalchemy import Column as SAColumn
from sqlalchemy import func
from sqlalchemy_utils import refresh_materialized_view
from sqlmodel import select

from qualicharge.auth.factories import GroupFactory
Expand All @@ -18,6 +19,7 @@
from qualicharge.factories.static import StatiqueFactory
from qualicharge.models.static import Statique
from qualicharge.schemas.core import (
STATIQUE_MV_TABLE_NAME,
OperationalUnit,
PointDeCharge,
Station,
Expand Down Expand Up @@ -63,6 +65,7 @@ def test_list_for_superuser(client_auth, db_session):
n_statiques = 3
statiques = StatiqueFactory.batch(n_statiques)
save_statiques(db_session, statiques)
refresh_materialized_view(db_session, STATIQUE_MV_TABLE_NAME)
db_statiques = sorted(
statiques,
key=lambda s: s.id_pdc_itinerance,
Expand Down Expand Up @@ -113,6 +116,7 @@ def test_list_for_user(client_auth, db_session):
# Create statiques
n_statiques = 20
save_statiques(db_session, StatiqueFactory.batch(n_statiques))
refresh_materialized_view(db_session, STATIQUE_MV_TABLE_NAME)

# Select operational units linked to stations
operational_units = db_session.exec(
Expand Down Expand Up @@ -221,6 +225,7 @@ def test_list_pagination(client_auth, db_session):
"""Test the /statique/ list endpoint results pagination."""
n_statiques = 3
save_statiques(db_session, StatiqueFactory.batch(n_statiques))
refresh_materialized_view(db_session, STATIQUE_MV_TABLE_NAME)

offset = 0
limit = 2
Expand Down Expand Up @@ -288,6 +293,7 @@ def test_read_for_superuser(client_auth, db_session):
db_statique = save_statique(
db_session, StatiqueFactory.build(id_pdc_itinerance=id_pdc_itinerance)
)
refresh_materialized_view(db_session, STATIQUE_MV_TABLE_NAME)

response = client_auth.get(f"/statique/{id_pdc_itinerance}")
assert response.status_code == status.HTTP_200_OK
Expand Down Expand Up @@ -318,6 +324,7 @@ def test_read_for_user(client_auth, db_session):
db_statique = save_statique(
db_session, StatiqueFactory.build(id_pdc_itinerance=id_pdc_itinerance)
)
refresh_materialized_view(db_session, STATIQUE_MV_TABLE_NAME)

# User has no assigned operational units
response = client_auth.get(f"/statique/{id_pdc_itinerance}")
Expand Down
17 changes: 16 additions & 1 deletion src/api/tests/schemas/test_static.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
StatiqueFactory,
)
from qualicharge.schemas.core import (
STATIQUE_MV_TABLE_NAME,
Amenageur,
Localisation,
OperationalUnit,
Expand Down Expand Up @@ -410,7 +411,7 @@ def test_statique_materialized_view(db_session):
n_pdc = 4
statiques = StatiqueFactory.batch(n_pdc)
save_statiques(db_session, statiques)
refresh_materialized_view(db_session, "statique")
refresh_materialized_view(db_session, STATIQUE_MV_TABLE_NAME)

db_statiques = db_session.exec(select(StatiqueMV)).all()
assert len(db_statiques) == n_pdc
Expand All @@ -421,3 +422,17 @@ def test_statique_materialized_view(db_session):
assert isinstance(db_statiques[0].coordonneesXY, WKBElement)
assert isinstance(db_statiques[0].pdc_id, UUID)
assert isinstance(db_statiques[0].pdc_updated_at, datetime)


def test_statique_materialized_view_coordonneesXY_field(db_session):
"""Test the coordonneesXY field from the Statique schema."""
n_pdc = 1
statiques = StatiqueFactory.batch(n_pdc)
save_statiques(db_session, statiques)
refresh_materialized_view(db_session, STATIQUE_MV_TABLE_NAME)

db_statique = db_session.exec(select(StatiqueMV)).one()
assert isinstance(db_statique.coordonneesXY, WKBElement)
crds = Coordinate(**db_statique.model_dump()["coordonneesXY"])
assert hasattr(crds, "latitude")
assert hasattr(crds, "longitude")

0 comments on commit ebe318a

Please sign in to comment.