🔖(prefect) add static and session indicators
- add indicators i4, i7
- add indicators u10, u11, u12, u13, u9, u5, u6
- add indicators c1, c2
- modify: timespan, tests, national, statique
philippe thomy committed Feb 1, 2025
1 parent ce08da3 commit 5473f46
Showing 34 changed files with 3,380 additions and 234 deletions.
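The diffs below replace the separate `period: IndicatorPeriod` / `at: datetime` flow arguments with a single `timespan: IndicatorTimeSpan` parameter. The model itself lives in `indicators/models.py` and is not among the files shown here; the sketch below is a hypothetical reconstruction, assuming a pydantic model (suggested by the `.model_dump()` calls in the diffs) whose `period` and `start` fields match the `timespan.period` / `timespan.start` usages in the flow run names. The enum members are placeholders.

```python
# Hypothetical sketch of the IndicatorTimeSpan model used throughout this
# commit; the real definition lives in indicators/models.py and may differ.
from datetime import datetime
from enum import Enum

from pydantic import BaseModel


class IndicatorPeriod(str, Enum):
    """Stub of the existing period enum (member values are assumptions)."""

    DAY = "d"
    WEEK = "w"
    MONTH = "m"


class IndicatorTimeSpan(BaseModel):
    """Pairs a reporting period with the start of the window it covers."""

    period: IndicatorPeriod
    start: datetime
```

Passing the start timestamp explicitly, instead of calling `pd.Timestamp.now()` inside the meta-flow as the removed i1 code did, makes reruns for past periods reproducible.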
15 changes: 15 additions & 0 deletions .vscode/launch.json
@@ -0,0 +1,15 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python Debugger: Current File",
"type": "debugpy",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal"
}
]
}
17 changes: 15 additions & 2 deletions src/prefect/CHANGELOG.md
@@ -10,7 +10,20 @@ and this project adheres to

### Added

- Implement t1 workflow
- Implement i1 workflow
- Implement i4 workflow
- Implement i7 workflow
- Implement c1 workflow
- Implement c2 workflow
- Implement u5 workflow
- Implement u6 workflow
- Implement u9 workflow
- Implement u10 workflow
- Implement u11 workflow
- Implement u12 workflow
- Implement u13 workflow

### Updated

- Use Statique table instead of PointDeCharge table

[unreleased]: https://github.com/MTES-MCT/qualicharge/
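The i1 diff below shows what this changelog entry means in practice: per-level counts are now read from the denormalized Statique table, so only the City join remains. A hedged before/after illustration, simplified from that diff; the Statique definition is not part of this commit, and the assumption is that it already carries the station and localisation columns.

```python
# Illustration only: contrast of the old join-heavy count query with the new
# Statique-based one, simplified from the i1.py diff below.
OLD_COUNT_QUERY = """
SELECT COUNT(DISTINCT PointDeCharge.id_pdc_itinerance) AS value
FROM PointDeCharge
INNER JOIN Station ON PointDeCharge.station_id = Station.id
INNER JOIN Localisation ON Station.localisation_id = Localisation.id
INNER JOIN City ON Localisation.code_insee_commune = City.code
"""

NEW_COUNT_QUERY = """
SELECT COUNT(DISTINCT id_pdc_itinerance) AS value
FROM Statique  -- assumed to denormalize PointDeCharge, Station and Localisation
INNER JOIN City ON code_insee_commune = City.code
"""
```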
64 changes: 29 additions & 35 deletions src/prefect/indicators/infrastructure/i1.py
@@ -3,37 +3,37 @@
I1: the number of publicly open points of charge.
"""

from datetime import datetime
from string import Template
from typing import List
from uuid import UUID

import numpy as np
import pandas as pd # type: ignore
from prefect import flow, runtime, task
from prefect.artifacts import create_markdown_artifact
from prefect.futures import wait
from prefect.task_runners import ThreadPoolTaskRunner
from sqlalchemy import text
from sqlalchemy.engine import Connection

from ..conf import settings
from ..models import Indicator, IndicatorPeriod, Level
from ..models import Indicator, IndicatorTimeSpan, Level
from ..utils import (
export_indic,
get_database_engine,
get_num_for_level_query_params,
get_targets_for_level,
)

NUM_POCS_FOR_LEVEL_QUERY_TEMPLATE = """
SELECT
COUNT(DISTINCT PointDeCharge.id_pdc_itinerance) AS value,
COUNT(DISTINCT id_pdc_itinerance) AS value,
$level_id AS level_id
FROM
PointDeCharge
INNER JOIN Station ON PointDeCharge.station_id = Station.id
INNER JOIN Localisation ON Station.localisation_id = Localisation.id
INNER JOIN City ON Localisation.code_insee_commune = City.code
Statique
--PointDeCharge
--INNER JOIN Station ON PointDeCharge.station_id = Station.id
--INNER JOIN Localisation ON Station.localisation_id = Localisation.id
INNER JOIN City ON code_insee_commune = City.code
$join_extras
WHERE $level_id IN ($indexes)
GROUP BY $level_id
@@ -46,22 +46,23 @@ def get_values_for_targets(
) -> pd.DataFrame:
"""Fetch points of charge given input level and target index."""
query_template = Template(NUM_POCS_FOR_LEVEL_QUERY_TEMPLATE)
query_params: dict = {"indexes": ",".join(f"'{i}'" for i in map(str, indexes))}
query_params = {"indexes": ",".join(f"'{i}'" for i in map(str, indexes))}
query_params |= get_num_for_level_query_params(level)
return pd.read_sql_query(query_template.substitute(query_params), con=connection)


@flow(
task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS),
flow_run_name="i1-{period.value}-{level:02d}-{at:%y-%m-%d}",
flow_run_name="i1-{timespan.period.value}-{level:02d}-{timespan.start:%y-%m-%d}",
)
def i1_for_level(
level: Level,
period: IndicatorPeriod,
at: datetime,
timespan: IndicatorTimeSpan,
chunk_size=settings.DEFAULT_CHUNK_SIZE,
) -> pd.DataFrame:
"""Calculate i1 for a level."""
if level == Level.NATIONAL:
return i1_national(timespan)
engine = get_database_engine()
with engine.connect() as connection:
targets = get_targets_for_level(connection, level)
@@ -87,8 +88,8 @@ def i1_for_level(
"value": merged["value"].fillna(0),
"code": "i1",
"level": level,
"period": period,
"timestamp": at,
"period": timespan.period,
"timestamp": timespan.start.isoformat(),
"category": None,
"extras": None,
}
@@ -97,9 +98,9 @@

@flow(
task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS),
flow_run_name="i1-{period.value}-00-{at:%y-%m-%d}",
flow_run_name="i1-{timespan.period.value}-00-{timespan.start:%y-%m-%d}",
)
def i1_national(period: IndicatorPeriod, at: datetime) -> pd.DataFrame:
def i1_national(timespan: IndicatorTimeSpan) -> pd.DataFrame:
"""Calculate i1 at the national level."""
engine = get_database_engine()
with engine.connect() as connection:
@@ -111,37 +112,30 @@ def i1_national(period: IndicatorPeriod, at: datetime) -> pd.DataFrame:
Indicator(
code="i1",
level=Level.NATIONAL,
period=period,
period=timespan.period,
value=count,
timestamp=at,
timestamp=timespan.start.isoformat(),
).model_dump(),
]
)


@flow(
task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS),
flow_run_name="meta-i1-{period.value}",
flow_run_name="meta-i1-{timespan.period.value}",
)
def calculate(
period: IndicatorPeriod, create_artifact: bool = False, chunk_size: int = 1000
timespan: IndicatorTimeSpan,
levels: List[Level],
create_artifact: bool = False,
chunk_size: int = 1000,
format_pd: bool = False,
) -> List[Indicator]:
"""Run all i1 subflows."""
now = pd.Timestamp.now()
subflows_results = [
i1_national(period, now),
i1_for_level(Level.REGION, period, now, chunk_size=chunk_size),
i1_for_level(Level.DEPARTMENT, period, now, chunk_size=chunk_size),
i1_for_level(Level.EPCI, period, now, chunk_size=chunk_size),
i1_for_level(Level.CITY, period, now, chunk_size=chunk_size),
i1_for_level(level, timespan, chunk_size=chunk_size) for level in levels
]
indicators = pd.concat(subflows_results, ignore_index=True)

if create_artifact:
create_markdown_artifact(
key=runtime.flow_run.name,
markdown=indicators.to_markdown(),
description=f"i1 report at {now} (period: {period.value})",
)

return [Indicator(**record) for record in indicators.to_dict(orient="records")] # type: ignore[misc]
description = f"i1 report at {timespan.start} (period: {timespan.period.value})"
flow_name = runtime.flow_run.name
return export_indic(indicators, create_artifact, flow_name, description, format_pd)
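With this refactor, the meta-flow no longer builds its own timestamp or hard-codes the five administrative levels; callers pass both. A hedged usage sketch: the import paths and the `IndicatorPeriod.DAY` member are assumptions, while the `Level` members appear in the removed code above.

```python
# Hypothetical invocation of the refactored i1 meta-flow; import paths and
# IndicatorPeriod.DAY are assumptions based on the file layout and imports.
from datetime import datetime

from indicators.infrastructure import i1
from indicators.models import IndicatorPeriod, IndicatorTimeSpan, Level

timespan = IndicatorTimeSpan(period=IndicatorPeriod.DAY, start=datetime(2025, 2, 1))
indicators = i1.calculate(
    timespan,
    levels=[Level.NATIONAL, Level.REGION, Level.DEPARTMENT, Level.EPCI, Level.CITY],
    create_artifact=True,
    chunk_size=1000,
)
```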
139 changes: 139 additions & 0 deletions src/prefect/indicators/infrastructure/i4.py
@@ -0,0 +1,139 @@
"""QualiCharge prefect indicators: infrastructure.
I4: the number of publicly open stations.
"""

from string import Template
from typing import List
from uuid import UUID

import numpy as np
import pandas as pd # type: ignore
from prefect import flow, runtime, task
from prefect.futures import wait
from prefect.task_runners import ThreadPoolTaskRunner
from sqlalchemy import text
from sqlalchemy.engine import Connection

from ..conf import settings
from ..models import Indicator, IndicatorTimeSpan, Level
from ..utils import (
export_indic,
get_database_engine,
get_num_for_level_query_params,
get_targets_for_level,
)

NUM_STATIONS_FOR_LEVEL_QUERY_TEMPLATE = """
SELECT
COUNT(DISTINCT id_station_itinerance) AS value,
$level_id AS level_id
FROM
Station
INNER JOIN Localisation ON Station.localisation_id = Localisation.id
INNER JOIN City ON Localisation.code_insee_commune = City.code
$join_extras
WHERE $level_id IN ($indexes)
GROUP BY $level_id
"""


@task(task_run_name="values-for-target-{level:02d}")
def get_values_for_targets(
connection: Connection, level: Level, indexes: List[UUID]
) -> pd.DataFrame:
"""Fetch station given input level and target index."""
query_template = Template(NUM_STATIONS_FOR_LEVEL_QUERY_TEMPLATE)
query_params: dict = {"indexes": ",".join(f"'{i}'" for i in map(str, indexes))}
query_params |= get_num_for_level_query_params(level)
return pd.read_sql_query(query_template.substitute(query_params), con=connection)


@flow(
task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS),
flow_run_name="i4-{timespan.period.value}-{level:02d}-{timespan.start:%y-%m-%d}",
)
def i4_for_level(
level: Level,
timespan: IndicatorTimeSpan,
chunk_size=settings.DEFAULT_CHUNK_SIZE,
) -> pd.DataFrame:
"""Calculate i4 for a level."""
if level == Level.NATIONAL:
return i4_national(timespan)
engine = get_database_engine()
with engine.connect() as connection:
targets = get_targets_for_level(connection, level)
ids = targets["id"]
chunks = (
np.array_split(ids, int(len(ids) / chunk_size))
if len(ids) > chunk_size
else [ids.to_numpy()]
)
futures = [
get_values_for_targets.submit(connection, level, chunk) # type: ignore[call-overload]
for chunk in chunks
]
wait(futures)

# Concatenate results and serialize indicators
results = pd.concat([future.result() for future in futures], ignore_index=True)
merged = targets.merge(results, how="left", left_on="id", right_on="level_id")

# Build result DataFrame
indicators = {
"target": merged["code"],
"value": merged["value"].fillna(0),
"code": "i4",
"level": level,
"period": timespan.period,
"timestamp": timespan.start.isoformat(),
"category": None,
"extras": None,
}
return pd.DataFrame(indicators)


@flow(
task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS),
flow_run_name="i4-{timespan.period.value}-00-{timespan.start:%y-%m-%d}",
)
def i4_national(timespan: IndicatorTimeSpan) -> pd.DataFrame:
"""Calculate i4 at the national level."""
engine = get_database_engine()
with engine.connect() as connection:
result = connection.execute(text("SELECT COUNT(*) FROM Station"))
count = result.one()[0]

return pd.DataFrame.from_records(
[
Indicator(
code="i4",
level=Level.NATIONAL,
period=timespan.period,
value=count,
timestamp=timespan.start.isoformat(),
).model_dump(),
]
)


@flow(
task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS),
flow_run_name="meta-i4-{timespan.period.value}",
)
def calculate(
timespan: IndicatorTimeSpan,
levels: List[Level],
create_artifact: bool = False,
chunk_size: int = 1000,
format_pd: bool = False,
) -> List[Indicator]:
"""Run all i4 subflows."""
subflows_results = [
i4_for_level(level, timespan, chunk_size=chunk_size) for level in levels
]
indicators = pd.concat(subflows_results, ignore_index=True)
description = f"i4 report at {timespan.start} (period: {timespan.period.value})"
flow_name = runtime.flow_run.name
return export_indic(indicators, create_artifact, flow_name, description, format_pd)
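Both i1 and i4 now end by delegating artifact creation and serialization to a shared `export_indic` helper imported from `..utils`. That helper is not among the files shown here; the sketch below reconstructs what it plausibly does from its call sites and from the artifact code it replaces in i1.py, so the exact names and behavior are assumptions.

```python
# Hypothetical sketch of indicators/utils.py::export_indic, reconstructed from
# its call sites; the real implementation may differ.
from typing import List, Union

import pandas as pd  # type: ignore
from prefect.artifacts import create_markdown_artifact

from .models import Indicator


def export_indic(
    indicators: pd.DataFrame,
    create_artifact: bool,
    flow_name: str,
    description: str,
    format_pd: bool = False,
) -> Union[List[Indicator], pd.DataFrame]:
    """Optionally publish a markdown artifact, then serialize the indicators."""
    if create_artifact:
        create_markdown_artifact(
            key=flow_name,
            markdown=indicators.to_markdown(),
            description=description,
        )
    if format_pd:
        # Assumed behaviour: callers may ask for the raw DataFrame back.
        return indicators
    return [Indicator(**record) for record in indicators.to_dict(orient="records")]
```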