Skip to content

Commit

Permalink
add u5
Browse files Browse the repository at this point in the history
  • Loading branch information
philippe thomy committed Jan 29, 2025
1 parent 008dfcd commit 7ebd34b
Show file tree
Hide file tree
Showing 16 changed files with 672 additions and 65 deletions.
2 changes: 2 additions & 0 deletions src/prefect/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ and this project adheres to

- Implement i4 workflow
- Implement i7 workflow
- Implement u5 workflow
- Implement u9 workflow
- Implement u10 workflow
- Implement u11 workflow
- Implement u12 workflow
- Implement u13 workflow

[unreleased]: https://github.com/MTES-MCT/qualicharge/
1 change: 0 additions & 1 deletion src/prefect/indicators/infrastructure/t1.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ def t1_national(timespan: IndicatorTimeSpan) -> pd.DataFrame:
query_template = Template(QUERY_NATIONAL_TEMPLATE)
query_params = POWER_RANGE_CTE
with engine.connect() as connection:
# result = pd.read_sql_query(QUERY_NATIONAL, con=connection)
res = pd.read_sql_query(query_template.substitute(query_params), con=connection)
indicators = {
"target": None,
Expand Down
179 changes: 179 additions & 0 deletions src/prefect/indicators/usage/u13.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
"""QualiCharge prefect indicators: usage.
U13: cumulative power of POC in operation.
"""

from string import Template
from typing import List
from uuid import UUID

import numpy as np
import pandas as pd # type: ignore
from prefect import flow, runtime, task
from prefect.futures import wait
from prefect.task_runners import ThreadPoolTaskRunner
from sqlalchemy.engine import Connection

from ..conf import settings
from ..models import Indicator, IndicatorTimeSpan, Level
from ..utils import (
POWER_RANGE_CTE,
export_indic,
get_database_engine,
get_num_for_level_query_params,
get_targets_for_level,
get_timespan_filter_query_params,
)

POWER_POC_IN_OPERATION_TEMPLATE = """
WITH
statusf AS (
SELECT
point_de_charge_id
FROM
status
WHERE
$timespan
GROUP BY
point_de_charge_id
)
SELECT
sum(puissance_nominale) AS value,
$level_id AS level_id
FROM
statusf
INNER JOIN PointDeCharge ON statusf.point_de_charge_id = PointDeCharge.id
LEFT JOIN Station ON station_id = Station.id
LEFT JOIN Localisation ON localisation_id = Localisation.id
LEFT JOIN City ON City.code = code_insee_commune
$join_extras
WHERE
$level_id IN ($indexes)
GROUP BY
$level_id
"""
QUERY_NATIONAL_TEMPLATE = """
WITH
statusf AS (
SELECT
point_de_charge_id
FROM
status
WHERE
$timespan
GROUP BY
point_de_charge_id
)
SELECT
sum(puissance_nominale) AS value
FROM
statusf
INNER JOIN PointDeCharge ON statusf.point_de_charge_id = PointDeCharge.id
"""


@task(task_run_name="values-for-target-{level:02d}")
def get_values_for_targets(
connection: Connection,
level: Level,
timespan: IndicatorTimeSpan,
indexes: List[UUID],
) -> pd.DataFrame:
"""Fetch sessions given input level, timestamp and target index."""
query_template = Template(POWER_POC_IN_OPERATION_TEMPLATE)
query_params = {"indexes": ",".join(f"'{i}'" for i in map(str, indexes))}
query_params |= POWER_RANGE_CTE
query_params |= get_num_for_level_query_params(level)
query_params |= get_timespan_filter_query_params(timespan, session=False)
return pd.read_sql_query(query_template.substitute(query_params), con=connection)


@flow(
task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS),
flow_run_name="u13-{timespan.period.value}-{level:02d}-{timespan.start:%y-%m-%d}",
)
def u13_for_level(
level: Level,
timespan: IndicatorTimeSpan,
chunk_size=settings.DEFAULT_CHUNK_SIZE,
) -> pd.DataFrame:
"""Calculate u13 for a level and a timestamp."""
if level == Level.NATIONAL:
return u13_national(timespan)
engine = get_database_engine()
with engine.connect() as connection:
targets = get_targets_for_level(connection, level)
ids = targets["id"]
chunks = (
np.array_split(ids, int(len(ids) / chunk_size))
if len(ids) > chunk_size
else [ids.to_numpy()]
)
futures = [
get_values_for_targets.submit(connection, level, timespan, chunk) # type: ignore[call-overload]
for chunk in chunks
]
wait(futures)

# Concatenate results and serialize indicators
results = pd.concat([future.result() for future in futures], ignore_index=True)
merged = targets.merge(results, how="left", left_on="id", right_on="level_id")

# Build result DataFrame
indicators = {
"target": merged["code"],
"value": merged["value"].fillna(0),
"code": "u13",
"level": level,
"period": timespan.period,
"timestamp": timespan.start.isoformat(),
"category": None,
"extras": None,
}
return pd.DataFrame(indicators)


@flow(
task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS),
flow_run_name="u13-{timespan.period.value}-00-{timespan.start:%y-%m-%d}",
)
def u13_national(timespan: IndicatorTimeSpan) -> pd.DataFrame:
"""Calculate u13 at the national level."""
engine = get_database_engine()
query_template = Template(QUERY_NATIONAL_TEMPLATE)
query_params = get_timespan_filter_query_params(timespan, session=False)
query_params |= POWER_RANGE_CTE
with engine.connect() as connection:
res = pd.read_sql_query(query_template.substitute(query_params), con=connection)
indicators = {
"target": None,
"value": res["value"].fillna(0),
"code": "u13",
"level": Level.NATIONAL,
"period": timespan.period,
"timestamp": timespan.start.isoformat(),
"category": None,
"extras": None,
}
return pd.DataFrame(indicators)


@flow(
task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS),
flow_run_name="meta-u13-{timespan.period.value}",
)
def calculate(
timespan: IndicatorTimeSpan,
levels: List[Level] = [Level.NATIONAL, Level.REGION],
create_artifact: bool = False,
chunk_size: int = 1000,
format_pd: bool = False,
) -> List[Indicator]:
"""Run all u13 subflows."""
subflows_results = [
u13_for_level(level, timespan, chunk_size=chunk_size) for level in levels
]
indicators = pd.concat(subflows_results, ignore_index=True)
description = f"u13 report at {timespan.start} (period: {timespan.period.value})"
flow_name = runtime.flow_run.name
return export_indic(indicators, create_artifact, flow_name, description, format_pd)
162 changes: 162 additions & 0 deletions src/prefect/indicators/usage/u5.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
"""QualiCharge prefect indicators: usage.
U5: Hourly distribution of sessions (number).
"""

from string import Template
from typing import List
from uuid import UUID

import numpy as np
import pandas as pd # type: ignore
from prefect import flow, runtime, task
from prefect.futures import wait
from prefect.task_runners import ThreadPoolTaskRunner
from sqlalchemy.engine import Connection

from ..conf import settings
from ..models import Indicator, IndicatorTimeSpan, Level
from ..utils import (
export_indic,
get_database_engine,
get_num_for_level_query_params,
get_targets_for_level,
get_timespan_filter_query_params,
)

HOURLY_SESSIONS_QUERY_TEMPLATE = """
SELECT
count(Session.id) AS value,
extract(HOUR FROM start) AS category,
$level_id AS level_id
FROM
Session
INNER JOIN PointDeCharge ON point_de_charge_id = PointDeCharge.id
LEFT JOIN Station ON station_id = Station.id
LEFT JOIN Localisation ON localisation_id = Localisation.id
LEFT JOIN City ON City.code = code_insee_commune
$join_extras
WHERE
$timespan
AND $level_id IN ($indexes)
GROUP BY
category,
$level_id
"""

QUERY_NATIONAL_TEMPLATE = """
SELECT
count(Session.id) AS value,
extract(HOUR FROM start) AS category
FROM
SESSION
WHERE
$timespan
GROUP BY
category
"""


@task(task_run_name="values-for-target-{level:02d}")
def get_values_for_targets(
connection: Connection,
level: Level,
timespan: IndicatorTimeSpan,
indexes: List[UUID],
) -> pd.DataFrame:
"""Fetch sessions given input level, timestamp and target index."""
query_template = Template(HOURLY_SESSIONS_QUERY_TEMPLATE)
query_params = {"indexes": ",".join(f"'{i}'" for i in map(str, indexes))}
query_params |= get_num_for_level_query_params(level)
query_params |= get_timespan_filter_query_params(timespan, session=True)
return pd.read_sql_query(query_template.substitute(query_params), con=connection)


@flow(
task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS),
flow_run_name="u5-{timespan.period.value}-{level:02d}-{timespan.start:%y-%m-%d}",
)
def u5_for_level(
level: Level,
timespan: IndicatorTimeSpan,
chunk_size=settings.DEFAULT_CHUNK_SIZE,
) -> pd.DataFrame:
"""Calculate u5 for a level and a timestamp."""
if level == Level.NATIONAL:
return u5_national(timespan)
engine = get_database_engine()
with engine.connect() as connection:
targets = get_targets_for_level(connection, level)
ids = targets["id"]
chunks = (
np.array_split(ids, int(len(ids) / chunk_size))
if len(ids) > chunk_size
else [ids.to_numpy()]
)
futures = [
get_values_for_targets.submit(connection, level, timespan, chunk) # type: ignore[call-overload]
for chunk in chunks
]
wait(futures)

# Concatenate results and serialize indicators
results = pd.concat([future.result() for future in futures], ignore_index=True)
merged = targets.merge(results, how="left", left_on="id", right_on="level_id")

# Build result DataFrame
indicators = {
"target": merged["code"],
"value": merged["value"].fillna(0),
"code": "u5",
"level": level,
"period": timespan.period,
"timestamp": timespan.start.isoformat(),
"category": merged["category"].astype("str"),
"extras": None,
}
return pd.DataFrame(indicators)


@flow(
task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS),
flow_run_name="u5-{timespan.period.value}-00-{timespan.start:%y-%m-%d}",
)
def u5_national(timespan: IndicatorTimeSpan) -> pd.DataFrame:
"""Calculate u5 at the national level."""
engine = get_database_engine()
query_template = Template(QUERY_NATIONAL_TEMPLATE)
query_params = get_timespan_filter_query_params(timespan, session=True)
with engine.connect() as connection:
res = pd.read_sql_query(query_template.substitute(query_params), con=connection)
indicators = {
"target": None,
"value": res["value"].fillna(0),
"code": "u5",
"level": Level.NATIONAL,
"period": timespan.period,
"timestamp": timespan.start.isoformat(),
"category": res["category"].astype("str"),
"extras": None,
}
return pd.DataFrame(indicators)


@flow(
task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS),
flow_run_name="meta-u5-{timespan.period.value}",
)
def calculate(
timespan: IndicatorTimeSpan,
levels: List[Level],
create_artifact: bool = False,
chunk_size: int = 1000,
format_pd: bool = False,
) -> List[Indicator]:
"""Run all u5 subflows."""
subflows_results = [
u5_for_level(level, timespan, chunk_size=chunk_size) for level in levels
]
indicators = pd.concat(subflows_results, ignore_index=True)
description = f"u5 report at {timespan.start} (period: {timespan.period.value})"
flow_name = runtime.flow_run.name
return export_indic(indicators, create_artifact, flow_name, description, format_pd)
14 changes: 14 additions & 0 deletions src/prefect/prefect.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,20 @@ deployments:
work_pool:
name: indicators
work_queue_name: default
- name: u5-daily
entrypoint: indicators/run.py:u9_calculate
concurrency_limit: 10
schedules:
- cron: "4 15 * * *"
timezone: "Europe/Paris"
active: true
parameters:
period: d
create_artifact: true
chunk_size: 1000
work_pool:
name: indicators
work_queue_name: default
- name: u9-daily
entrypoint: indicators/run.py:u9_calculate
concurrency_limit: 10
Expand Down
Loading

0 comments on commit 7ebd34b

Please sign in to comment.