Skip to content

Commit

Permalink
refactor(fire alerts): experiment with ports and adapters approach
Browse files Browse the repository at this point in the history
  • Loading branch information
gtempus committed Jan 31, 2024
1 parent 55c01b1 commit 3a8d4c6
Show file tree
Hide file tree
Showing 12 changed files with 157 additions and 58 deletions.
2 changes: 1 addition & 1 deletion .isort.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
line_length = 88
multi_line_output = 3
include_trailing_comma = True
known_third_party = PIL,aenum,affine,aioboto3,async_lru,asyncpg,boto3,botocore,cachetools,fastapi,gino,httpx,httpx_auth,mercantile,numpy,pendulum,pydantic,pytest,rasterio,shapely,sqlalchemy,starlette
known_third_party = PIL,aenum,affine,aioboto3,async_lru,asyncpg,boto3,botocore,cachetools,fastapi,gino,httpx,httpx_auth,mercantile,numpy,pendulum,psycopg2,pydantic,pytest,rasterio,shapely,sqlalchemy,starlette
Empty file added app/adapters/__init__.py
Empty file.
33 changes: 33 additions & 0 deletions app/adapters/data_api_geostore_geometry_adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from typing import Any, Callable, Coroutine, Dict
from uuid import UUID

from fastapi import HTTPException

from app.domain.ports.geostore_geometry_port import GeostoreGeometryPort
from app.errors import BadResponseError, InvalidResponseError
from app.models.enumerators.geostore import GeostoreOrigin
from app.models.types import Geometry
from app.utils import rw_api


class DataApiGeostoreGeometryAdapter(GeostoreGeometryPort):
async def get_geometry(self, geostore_id, geostore_origin):
geostore_constructor: Dict[
str, Callable[[UUID], Coroutine[Any, Any, Geometry]]
] = {
# GeostoreOrigin.gfw: geostore.get_geostore_geometry,
GeostoreOrigin.rw: rw_api.get_geostore_geometry
}

try:
return await geostore_constructor[geostore_origin](geostore_id)
except KeyError:
raise HTTPException(
status_code=501,
detail=f"Geostore origin {geostore_origin} not fully implemented.",
)

except InvalidResponseError as e:
raise HTTPException(status_code=500, detail=str(e))
except BadResponseError as e:
raise HTTPException(status_code=400, detail=str(e))
32 changes: 4 additions & 28 deletions app/crud/async_db/vector_tiles/filters.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,17 @@
from typing import Any, Callable, Coroutine, Dict, List, Optional, Union
from uuid import UUID
from typing import Any, Dict, List, Optional, Union

from fastapi import HTTPException
from shapely.geometry import box, shape
from sqlalchemy.sql.elements import TextClause

from ....application import db
from ....errors import BadResponseError, InvalidResponseError
from ....models.enumerators.geostore import GeostoreOrigin
from ....models.types import Bounds, Geometry
from ....utils import rw_api
from ....models.types import Bounds


async def geometry_filter(
geostore_id: Optional[UUID], bounds: Bounds, geostore_origin: str
bounds: Bounds, geometry: Dict[str, Any] | None
) -> Optional[TextClause]:
if geostore_id:
geometry: Geometry = await _get_geostore_geometry(geostore_id, geostore_origin)
if geometry:
envelope = shape(geometry).envelope
if not envelope.intersects(box(*bounds)):
raise HTTPException(
Expand Down Expand Up @@ -78,22 +73,3 @@ def filter_intersects(field, geometry) -> TextClause:
f = f.bindparams(**values)

return f


async def _get_geostore_geometry(geostore_id: UUID, geostore_origin: str) -> Geometry:
geostore_constructor: Dict[str, Callable[[UUID], Coroutine[Any, Any, Geometry]]] = {
# GeostoreOrigin.gfw: geostore.get_geostore_geometry,
GeostoreOrigin.rw: rw_api.get_geostore_geometry
}

try:
return await geostore_constructor[geostore_origin](geostore_id)
except KeyError:
raise HTTPException(
status_code=501,
detail=f"Geostore origin {geostore_origin} not fully implemented.",
)
except InvalidResponseError as e:
raise HTTPException(status_code=500, detail=str(e))
except BadResponseError as e:
raise HTTPException(status_code=400, detail=str(e))
Empty file added app/domain/ports/__init__.py
Empty file.
6 changes: 6 additions & 0 deletions app/domain/ports/geostore_geometry_port.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from typing import Any, Dict


class GeostoreGeometryPort:
async def get_geometry(self, geostore_id, geostore_origin) -> Dict[str, Any]:
pass
Empty file added app/domain/services/__init__.py
Empty file.
11 changes: 11 additions & 0 deletions app/domain/services/geostore_geometry_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from app.domain.ports.geostore_geometry_port import GeostoreGeometryPort


class GeostoreGeometryService:
def __init__(self, geostore_geometry_port: GeostoreGeometryPort):
self.geostore_geometry_port = geostore_geometry_port

async def get_geometry(self, geostore_id, geostore_origin):
return await self.geostore_geometry_port.get_geometry(
geostore_id, geostore_origin
)
55 changes: 55 additions & 0 deletions app/domain/services/nasa_viirs_fire_alerts_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from fastapi import HTTPException
from psycopg2._psycopg import QueryCanceledError

from app.crud.async_db.vector_tiles import nasa_viirs_fire_alerts
from app.crud.async_db.vector_tiles.filters import (
contextual_filter,
date_filter,
geometry_filter,
)
from app.domain.services.geostore_geometry_service import GeostoreGeometryService


class NasaViirsFireAlertsService:
def __init__(self, geostore_geometry_service: GeostoreGeometryService):
self.geostore_geometry_service = geostore_geometry_service

async def get_aggregated_tile(
self,
version,
geostore_id,
geostore_origin,
bbox,
extent,
high_confidence_only,
start_date,
end_date,
contextual_filters,
include_attribute,
):
geometry = None
if geostore_id:
geometry = self.geostore_geometry_service.get_geometry(
geostore_id, geostore_origin
)

filters = [
await geometry_filter(bbox, geometry),
nasa_viirs_fire_alerts.confidence_filter(high_confidence_only),
date_filter("alert__date", start_date, end_date),
] + contextual_filter(**contextual_filters)

# Remove empty filters
filters = [f for f in filters if f is not None]

try:
tile = await nasa_viirs_fire_alerts.get_aggregated_tile(
version, bbox, extent, include_attribute, filters
)
except QueryCanceledError:
raise HTTPException(
status_code=524,
detail="A timeout occurred while processing the request. Request canceled.",
)
else:
return tile
11 changes: 10 additions & 1 deletion app/routes/dynamic_vector_tiles.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
from sqlalchemy.sql import Select, TableClause
from sqlalchemy.sql.elements import ColumnClause

from ..adapters.data_api_geostore_geometry_adapter import DataApiGeostoreGeometryAdapter
from ..application import db
from ..crud.async_db.vector_tiles import get_mvt_table, get_tile
from ..crud.async_db.vector_tiles.filters import geometry_filter
from ..crud.sync_db.tile_cache_assets import get_attributes
from ..domain.services.geostore_geometry_service import GeostoreGeometryService
from ..models.enumerators.geostore import GeostoreOrigin
from ..models.types import Bounds
from ..responses import VectorTileResponse
Expand Down Expand Up @@ -55,7 +57,14 @@ async def dynamic_vector_tile(

filters: List[TableClause] = list()

geom_filter: TableClause = await geometry_filter(geostore_id, bbox, geostore_origin)
geometry = None
if geostore_id:
geostore_geometry_service = GeostoreGeometryService(
DataApiGeostoreGeometryAdapter
)
geometry = geostore_geometry_service.get_geometry(geostore_id, geostore_origin)

geom_filter: TableClause = await geometry_filter(bbox, geometry)

if geom_filter is not None:
filters.append(geom_filter)
Expand Down
44 changes: 18 additions & 26 deletions app/routes/nasa_viirs_fire_alerts/vector_tiles.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,16 @@

import pendulum
from aenum import Enum, extend_enum
from asyncpg import QueryCanceledError
from fastapi import APIRouter, Depends, HTTPException, Path, Query, Response
from fastapi.responses import ORJSONResponse

from ...crud.async_db.vector_tiles import nasa_viirs_fire_alerts
from ...crud.async_db.vector_tiles.filters import (
contextual_filter,
date_filter,
geometry_filter,
from ...adapters.data_api_geostore_geometry_adapter import (
DataApiGeostoreGeometryAdapter,
)
from ...crud.async_db.vector_tiles.max_date import get_max_date
from ...crud.sync_db.tile_cache_assets import default_end, default_start, get_versions
from ...domain.services.geostore_geometry_service import GeostoreGeometryService
from ...domain.services.nasa_viirs_fire_alerts_service import NasaViirsFireAlertsService
from ...errors import RecordNotFoundError
from ...models.enumerators.geostore import GeostoreOrigin
from ...models.enumerators.nasa_viirs_fire_alerts.supported_attributes import (
Expand Down Expand Up @@ -108,15 +106,6 @@ async def nasa_viirs_fire_alerts_tile(
bbox, _, extent = bbox_z
validate_dates(start_date, end_date, force_date_range)

filters = [
await geometry_filter(geostore_id, bbox, geostore_origin),
nasa_viirs_fire_alerts.confidence_filter(high_confidence_only),
date_filter("alert__date", start_date, end_date),
] + contextual_filter(**contextual_filters)

# Remove empty filters
filters = [f for f in filters if f is not None]

# If one of the default dates is used, we cannot cache the response for long,
# as content might change after next update. For non-default values we can be certain,
# that response will always be the same b/c we only add newer dates
Expand All @@ -128,17 +117,20 @@ async def nasa_viirs_fire_alerts_tile(
else:
response.headers["Cache-Control"] = "max-age=31536000" # 1 year

try:
tile = await nasa_viirs_fire_alerts.get_aggregated_tile(
version, bbox, extent, include_attribute, filters
)
except QueryCanceledError:
raise HTTPException(
status_code=524,
detail="A timeout occurred while processing the request. Request canceled.",
)
else:
return tile
return await NasaViirsFireAlertsService(
GeostoreGeometryService(DataApiGeostoreGeometryAdapter)
).get_aggregated_tile(
version,
geostore_id,
geostore_origin,
bbox,
extent,
high_confidence_only,
start_date,
end_date,
contextual_filters,
include_attribute,
)


@router.get(
Expand Down
21 changes: 19 additions & 2 deletions app/routes/umd_modis_burned_areas/vector_tiles.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
from asyncpg import QueryCanceledError
from fastapi import APIRouter, Depends, HTTPException, Path, Query, Response

from ...adapters.data_api_geostore_geometry_adapter import (
DataApiGeostoreGeometryAdapter,
)
from ...application import db
from ...crud.async_db.vector_tiles import get_mvt_table, get_tile
from ...crud.async_db.vector_tiles.filters import (
Expand All @@ -14,6 +17,7 @@
geometry_filter,
)
from ...crud.sync_db.tile_cache_assets import default_end, default_start, get_versions
from ...domain.services.geostore_geometry_service import GeostoreGeometryService
from ...models.enumerators.geostore import GeostoreOrigin
from ...models.enumerators.tile_caches import TileCacheType
from ...models.enumerators.versions import Versions
Expand All @@ -27,7 +31,11 @@


class UmdModisBurnedAreas(str, Enum):
"""MODIS burned areas versions. When using `latest` call will be redirected (307) to version tagged as latest."""
"""MODIS burned areas versions.
When using `latest` call will be redirected (307) to version tagged
as latest.
"""

latest = "latest"

Expand Down Expand Up @@ -83,8 +91,17 @@ async def umd_modis_burned_areas_tile(
bbox, z, extent = bbox_z
validate_dates(start_date, end_date, force_date_range)

geometry = None
if geostore_id:
geostore_geometry_service = GeostoreGeometryService(
DataApiGeostoreGeometryAdapter
)
geometry = await geostore_geometry_service.get_geometry(
geostore_id, geostore_origin
)

filters = [
await geometry_filter(geostore_id, bbox, geostore_origin),
await geometry_filter(bbox, geometry),
date_filter("alert__date", start_date, end_date),
]

Expand Down

0 comments on commit 3a8d4c6

Please sign in to comment.