Skip to content

Commit 3dd378d

Browse files
philippe thomyloco-philippe
authored andcommitted
add u6
1 parent bc2617c commit 3dd378d

File tree

5 files changed

+369
-0
lines changed

5 files changed

+369
-0
lines changed

src/prefect/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ and this project adheres to
1313
- Implement i4 workflow
1414
- Implement i7 workflow
1515
- Implement u5 workflow
16+
- Implement u6 workflow
1617
- Implement u9 workflow
1718
- Implement u10 workflow
1819
- Implement u11 workflow

src/prefect/indicators/usage/u6.py

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
"""QualiCharge prefect indicators: usage.
2+
3+
U6: session duration by power category.
4+
"""
5+
6+
from string import Template
7+
from typing import List
8+
from uuid import UUID
9+
10+
import numpy as np
11+
import pandas as pd # type: ignore
12+
from prefect import flow, runtime, task
13+
from prefect.futures import wait
14+
from prefect.task_runners import ThreadPoolTaskRunner
15+
from sqlalchemy.engine import Connection
16+
17+
from ..conf import settings
18+
from ..models import Indicator, IndicatorTimeSpan, Level
19+
from ..utils import (
20+
POWER_RANGE_CTE,
21+
export_indic,
22+
get_database_engine,
23+
get_num_for_level_query_params,
24+
get_targets_for_level,
25+
get_timespan_filter_query_params,
26+
)
27+
28+
DURATION_FOR_LEVEL_QUERY_TEMPLATE = """
29+
WITH
30+
$power_range,
31+
sessionf AS (
32+
SELECT
33+
point_de_charge_id,
34+
sum(session.end -session.start) as duree_pdc
35+
FROM
36+
session
37+
WHERE
38+
$timespan
39+
GROUP BY
40+
point_de_charge_id
41+
)
42+
SELECT
43+
extract ('epoch' from sum(duree_pdc)) / 3600.0 AS value,
44+
category,
45+
$level_id AS level_id
46+
FROM
47+
sessionf
48+
INNER JOIN PointDeCharge ON sessionf.point_de_charge_id = PointDeCharge.id
49+
LEFT JOIN Station ON station_id = Station.id
50+
LEFT JOIN Localisation ON localisation_id = Localisation.id
51+
LEFT JOIN City ON City.code = code_insee_commune
52+
LEFT JOIN puissance ON puissance_nominale::numeric <@ category
53+
$join_extras
54+
WHERE
55+
$level_id IN ($indexes)
56+
GROUP BY
57+
$level_id,
58+
category
59+
"""
60+
61+
QUERY_NATIONAL_TEMPLATE = """
62+
WITH
63+
$power_range,
64+
sessionf AS (
65+
SELECT
66+
point_de_charge_id,
67+
sum(session.end -session.start) as duree_pdc
68+
FROM
69+
session
70+
WHERE
71+
$timespan
72+
GROUP BY
73+
point_de_charge_id
74+
)
75+
SELECT
76+
extract ('epoch' from sum(duree_pdc)) / 3600.0 AS value,
77+
category
78+
FROM
79+
sessionf
80+
INNER JOIN PointDeCharge ON sessionf.point_de_charge_id = PointDeCharge.id
81+
LEFT JOIN puissance ON puissance_nominale::numeric <@ category
82+
GROUP BY
83+
category
84+
"""
85+
86+
87+
@task(task_run_name="values-for-target-{level:02d}")
88+
def get_values_for_targets(
89+
connection: Connection,
90+
level: Level,
91+
timespan: IndicatorTimeSpan,
92+
indexes: List[UUID],
93+
) -> pd.DataFrame:
94+
"""Fetch sessions given input level, timestamp and target index."""
95+
query_template = Template(DURATION_FOR_LEVEL_QUERY_TEMPLATE)
96+
query_params = {"indexes": ",".join(f"'{i}'" for i in map(str, indexes))}
97+
query_params |= POWER_RANGE_CTE
98+
query_params |= get_num_for_level_query_params(level)
99+
query_params |= get_timespan_filter_query_params(timespan, session=True)
100+
return pd.read_sql_query(query_template.substitute(query_params), con=connection)
101+
102+
103+
@flow(
104+
task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS),
105+
flow_run_name="u6-{timespan.period.value}-{level:02d}-{timespan.start:%y-%m-%d}",
106+
)
107+
def u6_for_level(
108+
level: Level,
109+
timespan: IndicatorTimeSpan,
110+
chunk_size=settings.DEFAULT_CHUNK_SIZE,
111+
) -> pd.DataFrame:
112+
"""Calculate u6 for a level and a timestamp."""
113+
if level == Level.NATIONAL:
114+
return u6_national(timespan)
115+
engine = get_database_engine()
116+
with engine.connect() as connection:
117+
targets = get_targets_for_level(connection, level)
118+
ids = targets["id"]
119+
chunks = (
120+
np.array_split(ids, int(len(ids) / chunk_size))
121+
if len(ids) > chunk_size
122+
else [ids.to_numpy()]
123+
)
124+
futures = [
125+
get_values_for_targets.submit(connection, level, timespan, chunk) # type: ignore[call-overload]
126+
for chunk in chunks
127+
]
128+
wait(futures)
129+
130+
# Concatenate results and serialize indicators
131+
results = pd.concat([future.result() for future in futures], ignore_index=True)
132+
merged = targets.merge(results, how="left", left_on="id", right_on="level_id")
133+
134+
# Build result DataFrame
135+
indicators = {
136+
"target": merged["code"],
137+
"value": merged["value"].fillna(0),
138+
"code": "u6",
139+
"level": level,
140+
"period": timespan.period,
141+
"timestamp": timespan.start.isoformat(),
142+
"category": merged["category"].astype("str"),
143+
"extras": None,
144+
}
145+
return pd.DataFrame(indicators)
146+
147+
148+
@flow(
149+
task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS),
150+
flow_run_name="u6-{timespan.period.value}-00-{timespan.start:%y-%m-%d}",
151+
)
152+
def u6_national(timespan: IndicatorTimeSpan) -> pd.DataFrame:
153+
"""Calculate u6 at the national level."""
154+
engine = get_database_engine()
155+
query_template = Template(QUERY_NATIONAL_TEMPLATE)
156+
query_params = get_timespan_filter_query_params(timespan, session=True)
157+
query_params |= POWER_RANGE_CTE
158+
with engine.connect() as connection:
159+
res = pd.read_sql_query(query_template.substitute(query_params), con=connection)
160+
indicators = {
161+
"target": None,
162+
"value": res["value"].fillna(0),
163+
"code": "u6",
164+
"level": Level.NATIONAL,
165+
"period": timespan.period,
166+
"timestamp": timespan.start.isoformat(),
167+
"category": res["category"].astype("str"),
168+
"extras": None,
169+
}
170+
return pd.DataFrame(indicators)
171+
172+
173+
@flow(
174+
task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS),
175+
flow_run_name="meta-u6-{timespan.period.value}",
176+
)
177+
def calculate(
178+
timespan: IndicatorTimeSpan,
179+
levels: List[Level],
180+
create_artifact: bool = False,
181+
chunk_size: int = 1000,
182+
format_pd: bool = False,
183+
) -> List[Indicator]:
184+
"""Run all u6 subflows."""
185+
subflows_results = [
186+
u6_for_level(level, timespan, chunk_size=chunk_size) for level in levels
187+
]
188+
indicators = pd.concat(subflows_results, ignore_index=True)
189+
description = f"u6 report at {timespan.start} (period: {timespan.period.value})"
190+
flow_name = runtime.flow_run.name
191+
return export_indic(indicators, create_artifact, flow_name, description, format_pd)

src/prefect/prefect.yaml

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,20 @@ deployments:
7373
work_pool:
7474
name: indicators
7575
work_queue_name: default
76+
- name: u6-daily
77+
entrypoint: indicators/run.py:u6_calculate
78+
concurrency_limit: 10
79+
schedules:
80+
- cron: "5 15 * * *"
81+
timezone: "Europe/Paris"
82+
active: true
83+
parameters:
84+
period: d
85+
create_artifact: true
86+
chunk_size: 1000
87+
work_pool:
88+
name: indicators
89+
work_queue_name: default
7690
- name: u9-daily
7791
entrypoint: indicators/run.py:u9_calculate
7892
concurrency_limit: 10
@@ -129,3 +143,17 @@ deployments:
129143
work_pool:
130144
name: indicators
131145
work_queue_name: default
146+
- name: u13-daily
147+
entrypoint: indicators/run.py:u13_calculate
148+
concurrency_limit: 10
149+
schedules:
150+
- cron: "14 15 * * *"
151+
timezone: "Europe/Paris"
152+
active: true
153+
parameters:
154+
period: d
155+
create_artifact: true
156+
chunk_size: 1000
157+
work_pool:
158+
name: indicators
159+
work_queue_name: default

src/prefect/tests/usage/test_u6.py

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
"""QualiCharge prefect indicators tests: usage.
2+
3+
U6: session duration by power category.
4+
"""
5+
6+
from datetime import datetime
7+
8+
import pytest # type: ignore
9+
from sqlalchemy import text
10+
11+
from indicators.models import IndicatorPeriod, IndicatorTimeSpan, Level # type: ignore
12+
from indicators.usage import u6 # type: ignore
13+
14+
from ..param_tests import (
15+
PARAM_FLOW,
16+
PARAM_VALUE,
17+
PARAMETERS_CHUNK,
18+
)
19+
20+
# expected result for level [city, epci, dpt, reg]
21+
N_LEVEL = [18, 167, 85, 584]
22+
N_LEVEL_NATIONAL = 1509
23+
24+
TIMESPAN = IndicatorTimeSpan(start=datetime(2024, 12, 24), period=IndicatorPeriod.DAY)
25+
PARAMETERS_FLOW = [prm + (lvl,) for prm, lvl in zip(PARAM_FLOW, N_LEVEL, strict=True)]
26+
PARAMETERS_VALUE = [prm + (lvl,) for prm, lvl in zip(PARAM_VALUE, N_LEVEL, strict=True)]
27+
28+
29+
@pytest.mark.parametrize("level,query,expected", PARAMETERS_VALUE)
30+
def test_task_get_values_for_target(db_connection, level, query, expected):
31+
"""Test the `get_values_for_target` task."""
32+
result = db_connection.execute(text(query))
33+
indexes = list(result.scalars().all())
34+
values = u6.get_values_for_targets.fn(db_connection, level, TIMESPAN, indexes)
35+
assert len(set(values["level_id"])) == len(indexes)
36+
assert int(values["value"].sum()) == expected
37+
38+
39+
def test_task_get_values_for_target_unexpected_level(db_connection):
40+
"""Test the `get_values_for_target` task (unknown level)."""
41+
with pytest.raises(NotImplementedError, match="Unsupported level"):
42+
u6.get_values_for_targets.fn(db_connection, Level.NATIONAL, TIMESPAN, [])
43+
44+
45+
@pytest.mark.parametrize("level,query,targets,expected", PARAMETERS_FLOW)
46+
def test_flow_u6_for_level(db_connection, level, query, targets, expected):
47+
"""Test the `u6_for_level` flow."""
48+
indicators = u6.u6_for_level(level, TIMESPAN, chunk_size=1000)
49+
# assert len(indicators) == db_connection.execute(text(query)).scalars().one()
50+
assert (
51+
int(indicators.loc[indicators["target"].isin(targets), "value"].sum())
52+
== expected
53+
)
54+
55+
56+
@pytest.mark.parametrize("chunk_size", PARAMETERS_CHUNK)
57+
def test_flow_u6_for_level_with_various_chunk_sizes(chunk_size):
58+
"""Test the `u6_for_level` flow with various chunk sizes."""
59+
level, query, targets, expected = PARAMETERS_FLOW[2]
60+
indicators = u6.u6_for_level(level, TIMESPAN, chunk_size=chunk_size)
61+
# assert len(indicators) == N_DPTS
62+
assert (
63+
int(indicators.loc[indicators["target"].isin(targets), "value"].sum())
64+
== expected
65+
)
66+
67+
68+
def test_flow_u6_national(db_connection):
69+
"""Test the `u6_national` flow."""
70+
indicators = u6.u6_national(TIMESPAN)
71+
assert int(indicators["value"].sum()) == N_LEVEL_NATIONAL
72+
73+
74+
def test_flow_u6_calculate(db_connection):
75+
"""Test the `calculate` flow."""
76+
all_levels = [
77+
Level.NATIONAL,
78+
Level.REGION,
79+
Level.DEPARTMENT,
80+
Level.CITY,
81+
Level.EPCI,
82+
]
83+
indicators = u6.calculate(
84+
TIMESPAN, all_levels, create_artifact=True, format_pd=True
85+
)
86+
assert list(indicators["level"].unique()) == all_levels
87+
88+
89+
# query used to get N_LEVEL
90+
N_LEVEL_NAT = """
91+
WITH
92+
sessionf AS (
93+
SELECT
94+
point_de_charge_id,
95+
sum(SESSION.end - SESSION.start) AS duree_pdc
96+
FROM
97+
SESSION
98+
WHERE
99+
START >= date '2024-12-24'
100+
AND START < date '2024-12-25'
101+
GROUP BY
102+
point_de_charge_id
103+
)
104+
SELECT
105+
extract(
106+
'epoch'
107+
FROM
108+
sum(duree_pdc)
109+
) / 3600.0 AS duree
110+
FROM
111+
sessionf
112+
INNER JOIN PointDeCharge ON sessionf.point_de_charge_id = PointDeCharge.id
113+
LEFT JOIN station ON station_id = station.id
114+
LEFT JOIN localisation ON localisation_id = localisation.id
115+
LEFT JOIN city ON city.code = code_insee_commune
116+
LEFT JOIN department ON city.department_id = department.id
117+
LEFT JOIN region ON department.region_id = region.id
118+
"""
119+
N_LEVEL_3 = """
120+
WITH
121+
sessionf AS (
122+
SELECT
123+
point_de_charge_id,
124+
sum(SESSION.end - SESSION.start) AS duree_pdc
125+
FROM
126+
SESSION
127+
WHERE
128+
START >= date '2024-12-24'
129+
AND START < date '2024-12-25'
130+
GROUP BY
131+
point_de_charge_id
132+
)
133+
SELECT
134+
extract(
135+
'epoch'
136+
FROM
137+
sum(duree_pdc)
138+
) / 3600.0 AS duree
139+
FROM
140+
sessionf
141+
INNER JOIN PointDeCharge ON sessionf.point_de_charge_id = PointDeCharge.id
142+
LEFT JOIN station ON station_id = station.id
143+
LEFT JOIN localisation ON localisation_id = localisation.id
144+
LEFT JOIN city ON city.code = code_insee_commune
145+
LEFT JOIN department ON city.department_id = department.id
146+
LEFT JOIN region ON department.region_id = region.id
147+
WHERE
148+
region.code IN ('11', '84', '75')
149+
"""

0 commit comments

Comments
 (0)