Commit 9a6e8b0

feat: migrate compute_port_geometry

Author: herve.le-bars (committed)
Signed-off-by: herve.le-bars <[email protected]>
1 parent 9eb4364 · commit 9a6e8b0

File tree

7 files changed: +168 -165 lines changed


.env.template

Lines changed: 4 additions & 0 deletions
@@ -19,10 +19,14 @@ LOGGING_LEVEL=INFO
 
 AMP_DATA_CSV_PATH=
 PORT_DATA_CSV_PATH=
+PORT_POLYGON_DATA_CSV_PATH=
 VESSEL_DATA_CSV_PATH=
 
 VESSEL_POSITIONS_DATA_CSV_PATH=
 
+#PORT_RADIUS_M=3000
+#PORT_RESOLUTION=10
+
 
 ###############################################################################################
 # DOCKER SPECIFIC VARIABLES

src/bloom/config.py

Lines changed: 4 additions & 1 deletion
@@ -51,7 +51,10 @@ class Settings(BaseSettings):
     )
 
     amp_data_csv_path:Path = Path(__file__).parent.joinpath("../../data/zones_subset.csv")
-    port_data_csv_path:Path = Path(__file__).parent.joinpath("../../data/ports_rad3000_res10.csv")
+    port_data_csv_path:Path = Path(__file__).parent.joinpath("../../data/ports.csv")
+    port_polygon_data_csv_path:Path = Path(__file__).parent.joinpath("../../data/ports_rad3000_res10.csv")
+    port_radius_m:int = 3000 # Radius in meters
+    port_resolution:int = 10 # Number of points in the resulting polygon
     vessel_data_csv_path:Path = Path(__file__).parent.joinpath("../../data/chalutiers_pelagiques.csv")
 
 
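The new defaults (3000 m radius, 10 polygon points) mirror the parameters encoded in the old ports_rad3000_res10.csv filename, and the commented-out .env.template entries above suggest they can be overridden per environment. A minimal sketch of that override, assuming the project's Settings relies on standard pydantic BaseSettings environment-variable mapping; the PortSettings class below is a simplified stand-in, not the real src/bloom/config.py:

import os
from pydantic_settings import BaseSettings  # pydantic v2 style; with pydantic v1 this would be `from pydantic import BaseSettings`

class PortSettings(BaseSettings):
    # Simplified stand-in for the two Settings fields added above.
    port_radius_m: int = 3000      # Radius in meters
    port_resolution: int = 10      # Number of points in the resulting polygon

# Uncommenting PORT_RADIUS_M / PORT_RESOLUTION in the .env file (or exporting them)
# overrides the defaults without touching the code.
os.environ["PORT_RADIUS_M"] = "5000"
print(PortSettings().port_radius_m)    # -> 5000
print(PortSettings().port_resolution)  # -> 10 (default kept)
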
src/bloom/infra/repositories/repository_port.py

Lines changed: 5 additions & 2 deletions
@@ -40,8 +40,11 @@ def get_empty_geometry_buffer_ports(self, session: Session) -> list[Port]:
         return [PortRepository.map_to_domain(entity) for entity in q]
 
     def get_ports_updated_created_after(self, session: Session, created_updated_after: datetime) -> list[Port]:
-        stmt = select(sql_model.Port).where(or_(sql_model.Port.created_at >= created_updated_after,
-                                                sql_model.Port.updated_at >= created_updated_after))
+        if created_updated_after is None:
+            stmt = select(sql_model.Port)
+        else:
+            stmt = select(sql_model.Port).where(or_(sql_model.Port.created_at >= created_updated_after,
+                                                    sql_model.Port.updated_at >= created_updated_after))
         q = session.execute(stmt).scalars()
         if not q:
             return []
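Returning every port when created_updated_after is None supports the new buffer task below, which passes TaskExecutionRepository.get_point_in_time(...) straight through; presumably that value is None on the very first run, so all existing ports get a buffer computed. A toy sketch of that incremental "point in time" contract, using plain-Python stand-ins rather than the project's SQLAlchemy models:

from datetime import datetime, timedelta, timezone

def ports_updated_created_after(ports: list[dict], since: datetime | None) -> list[dict]:
    # Stand-in for the repository method above: None means "no point in time yet", take everything.
    if since is None:
        return ports
    return [p for p in ports if p["created_at"] >= since or p["updated_at"] >= since]

now = datetime.now(timezone.utc)
ports = [
    {"id": 1, "created_at": now - timedelta(days=2), "updated_at": now - timedelta(days=2)},
    {"id": 2, "created_at": now, "updated_at": now},
]
print(len(ports_updated_created_after(ports, None)))                     # 2 -> first run, all ports
print(len(ports_updated_created_after(ports, now - timedelta(days=1))))  # 1 -> only recently touched ports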

src/bloom/tasks/compute_port_geometry_buffer.py

Lines changed: 0 additions & 129 deletions
This file was deleted.

src/tasks/data/load_port_data.py

Lines changed: 1 addition & 1 deletion
@@ -15,7 +15,7 @@ def run(self,*args,**kwargs):
         engine = create_engine(settings.db_url, echo=False)
 
         df = pd.read_csv(
-            settings.port_data_csv_path,
+            settings.port_polygon_data_csv_path,
             sep=";",
         )
 
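With the settings split above, this loader keeps ingesting the precomputed polygon file (ports_rad3000_res10.csv, now reachable via port_polygon_data_csv_path), while port_data_csv_path points to the plain ports.csv read by LoadDimPortFromCsv below. A minimal sketch of loading such a ";"-separated CSV with WKT geometries into a GeoDataFrame, mirroring the pattern used elsewhere in this commit; the file path and column name are placeholders, not the actual polygon CSV schema:

import geopandas as gpd
import pandas as pd
from shapely import wkt

# Assumed layout: ";"-separated CSV with a WKT geometry column (names are placeholders).
df = pd.read_csv("ports_rad3000_res10.csv", sep=";")
df["geometry"] = df["geometry"].apply(wkt.loads)  # WKT text -> shapely geometries
gdf = gpd.GeoDataFrame(df, geometry="geometry", crs="EPSG:4326")
print(gdf.geom_type.value_counts())
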
Lines changed: 25 additions & 32 deletions
@@ -1,6 +1,5 @@
 from tasks.base import BaseTask
 from bloom.config import settings
-import logging
 
 from pathlib import Path
 from time import perf_counter
@@ -15,12 +14,10 @@
 from pydantic import ValidationError
 from shapely import wkt
 
-logging.basicConfig()
-logging.getLogger("bloom.tasks").setLevel(settings.logging_level)
 
 
 class LoadDimPortFromCsv(BaseTask):
-    def map_to_domain(row) -> Port:
+    def map_to_domain(self,row) -> Port:
         iso_code = pycountry.countries.get(name=row["country"])
         iso_code = iso_code.alpha_3 if iso_code is not None else "XXX"
 
@@ -33,38 +30,34 @@ def map_to_domain(row) -> Port:
             longitude=float(row["longitude"]),
             geometry_point=row["geometry_point"],
         )
-    def run(self):
-
-
-def run(csv_file_name: str) -> None:
-    use_cases = UseCases()
-    port_repository = use_cases.port_repository()
-    db = use_cases.db()
-
-    ports = []
-    total = 0
-    try:
-        df = pd.read_csv(csv_file_name, sep=";")
-        df["geometry_point"] = df["geometry_point"].apply(wkt.loads)
-        gdf = gpd.GeoDataFrame(df, geometry="geometry_point", crs=settings.srid)
-        ports = gdf.apply(map_to_domain, axis=1)
-        with db.session() as session:
-            ports = port_repository.batch_create_port(session, list(ports))
-            session.commit()
-            total = len(ports)
-    except ValidationError as e:
-        logger.error("Erreur de validation des données de port")
-        logger.error(e.errors())
-    except DBException as e:
-        logger.error("Erreur d'insertion en base")
-    logger.info(f"{total} ports(s) créés")
+    def run(self,*args,**kwargs):
+        use_cases = UseCases()
+        port_repository = use_cases.port_repository()
+        db = use_cases.db()
+
+        ports = []
+        total = 0
+        try:
+            df = pd.read_csv(settings.port_data_csv_path, sep=";")
+            df["geometry_point"] = df["geometry_point"].apply(wkt.loads)
+            gdf = gpd.GeoDataFrame(df, geometry="geometry_point", crs=settings.srid)
+            ports = gdf.apply(self.map_to_domain, axis=1)
+            with db.session() as session:
+                ports = port_repository.batch_create_port(session, list(ports))
+                session.commit()
+                total = len(ports)
+        except ValidationError as e:
+            logger.error("Erreur de validation des données de port")
+            logger.error(e.errors())
+        except DBException as e:
+            logger.error("Erreur d'insertion en base")
+        logger.info(f"{total} ports(s) créés")
 
 
 if __name__ == "__main__":
     time_start = perf_counter()
-    file_name = Path(settings.data_folder).joinpath("./ports.csv")
-    logger.info(f"DEBUT - Chargement des données de ports depuis le fichier {file_name}")
-    run(file_name)
+    logger.info(f"DEBUT - Chargement des données de ports depuis le fichier {settings.port_data_csv_path}")
+    LoadDimPortFromCsv().start()
     time_end = perf_counter()
     duration = time_end - time_start
     logger.info(f"FIN - Chargement des données de ports en {duration:.2f}s")
Lines changed: 129 additions & 0 deletions
@@ -0,0 +1,129 @@
+from time import perf_counter
+from tasks.base import BaseTask
+import geopandas as gpd
+import pandas as pd
+import pyproj
+import shapely
+from bloom.config import settings
+from bloom.container import UseCases
+from bloom.logger import logger
+from scipy.spatial import Voronoi
+from shapely.geometry import LineString, Polygon
+from bloom.infra.repositories.repository_task_execution import TaskExecutionRepository
+from datetime import datetime, timezone
+
+
+
+class ComputePort_GeometryBuffer(BaseTask):
+    radius_m = settings.port_radius_m # Radius in meters
+    resolution = settings.port_resolution # Number of points in the resulting polygon
+
+    # Function to create geodesic buffer around a point
+    def geodesic_point_buffer(lat: float, lon: float, radius_m: int, resolution: int) -> Polygon:
+        """
+        Input
+            lat: latitude of the center point
+            lon: longitude of the center point
+            radius_m: radius of the buffer in meters
+            resolution: number of points in the resulting polygon
+        """
+        geod = pyproj.Geod(ellps="WGS84") # Define the ellipsoid
+        # Create a circle in geodesic coordinates
+        angles = range(0, 360, 360 // resolution)
+        circle_points = []
+        for angle in angles:
+            # Calculate the point on the circle for this angle
+            lon2, lat2, _ = geod.fwd(lon, lat, angle, radius_m)
+            circle_points.append((lon2, lat2))
+        # Create a polygon from these points
+        return Polygon(circle_points)
+
+    def assign_voronoi_buffer(self,ports: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
+        """Computes a buffer around each port such as buffers do not overlap each other
+
+        :param gpd.GeoDataFrame ports: fields "id", "latitude", "longitude", "geometry_point"
+            from the table "dim_ports"
+        :return gpd.GeoDataFrame: same as input but field "geometry_point" is replaced
+            by "geometry_buffer"
+        """
+        # Convert to CRS 6933 to write distances as meters (instead of degrees)
+        ports.to_crs(6933, inplace=True)
+
+        # Create an 8km buffer around each port
+        # FIXME: maybe put the buffer distance as a global parameter
+        ports["buffer"] = ports["geometry_point"].apply(lambda p: shapely.buffer(p, 8000))
+
+        # Convert points back to CRS 4326 (lat/lon) --> buffers are still expressed in 6933
+        ports.to_crs(settings.srid, inplace=True)
+
+        # Convert buffers to 4326
+        ports = gpd.GeoDataFrame(ports, geometry="buffer", crs=6933).to_crs(settings.srid)
+
+        # Create Voronoi polygons, i.e. match each port to the area which is closest to this port
+        vor = Voronoi(list(zip(ports.longitude, ports.latitude)))
+        lines = []
+        for line in vor.ridge_vertices:
+            if -1 not in line:
+                lines.append(LineString(vor.vertices[line]))
+            else:
+                lines.append(LineString())
+        vor_polys = [poly for poly in shapely.ops.polygonize(lines)]
+
+        # Match each port to its Voronoi polygon
+        vor_gdf = gpd.GeoDataFrame(geometry=gpd.GeoSeries(vor_polys), crs=4326)
+        vor_gdf["voronoi_poly"] = vor_gdf["geometry"].copy()
+        ports = gpd.GeoDataFrame(ports, geometry="geometry_point", crs=4326)
+        vor_ports = ports.sjoin(vor_gdf, how="left").drop(columns=["index_right"])
+
+        # Corner case: ports at extremes latitude and longitude have no Voroni polygon
+        # (15 / >5800) cases --> duplicate regular buffer instead
+        vor_ports.loc[vor_ports.voronoi_poly.isna(), "voronoi_poly"] = vor_ports.loc[
+            vor_ports.voronoi_poly.isna(), "buffer"
+        ]
+
+        # Get intersection between fixed-size buffer and Voronoi polygon to get final buffer
+        vor_ports["geometry_buffer"] = vor_ports.apply(
+            lambda row: shapely.intersection(row["buffer"], row["voronoi_poly"]),
+            axis=1
+        )
+        vor_ports["buffer_voronoi"] = vor_ports["geometry_buffer"].copy()
+        vor_ports = gpd.GeoDataFrame(vor_ports, geometry="geometry_buffer", crs=4326)
+        vor_ports = vor_ports[["id", "latitude", "longitude", "geometry_buffer"]].copy()
+
+        return vor_ports
+
+
+    def run(self,*args,**kwargs)-> None:
+        use_cases = UseCases()
+        port_repository = use_cases.port_repository()
+        db = use_cases.db()
+        items = []
+        with db.session() as session:
+            point_in_time = TaskExecutionRepository.get_point_in_time(session, "compute_port_geometry_buffer")
+            logger.info(f"Point in time={point_in_time}")
+            now = datetime.now(timezone.utc)
+            ports = port_repository.get_ports_updated_created_after(session, point_in_time)
+            if ports:
+                df = pd.DataFrame(
+                    [[p.id, p.geometry_point, p.latitude, p.longitude] for p in ports],
+                    columns=["id", "geometry_point", "latitude", "longitude"],
+                )
+                gdf = gpd.GeoDataFrame(df, geometry="geometry_point", crs=settings.srid)
+
+                # Apply the buffer function to create buffers
+                gdf = self.assign_voronoi_buffer(gdf)
+
+                for row in gdf.itertuples():
+                    items.append({"id": row.id, "geometry_buffer": row.geometry_buffer})
+                port_repository.batch_update_geometry_buffer(session, items)
+            TaskExecutionRepository.set_point_in_time(session, "compute_port_geometry_buffer", now)
+            session.commit()
+        logger.info(f"{len(items)} buffer de ports mis à jour")
+
+if __name__ == "__main__":
+    time_start = perf_counter()
+    logger.info("DEBUT - Calcul des buffer de ports")
+    ComputePort_GeometryBuffer().start()
+    time_end = perf_counter()
+    duration = time_end - time_start
+    logger.info(f"FIN - Calcul des buffer de ports en {duration:.2f}s")
