From 5473f46974c117c4d51dd657d47201243f6c1179 Mon Sep 17 00:00:00 2001 From: philippe thomy Date: Mon, 13 Jan 2025 22:25:45 +0100 Subject: [PATCH] =?UTF-8?q?=F0=9F=94=96(prefect)=20add=20static=20and=20se?= =?UTF-8?q?ssion=20indicators?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - add indicators i4, i7 - add indicators u10, u11, u12, u13, u9, u5, u6 - add indicators c1, c2 - modif : timespan, tests, national, statique --- .vscode/launch.json | 15 ++ src/prefect/CHANGELOG.md | 17 +- src/prefect/indicators/infrastructure/i1.py | 64 +++---- src/prefect/indicators/infrastructure/i4.py | 139 ++++++++++++++ src/prefect/indicators/infrastructure/i7.py | 143 +++++++++++++++ src/prefect/indicators/infrastructure/t1.py | 79 ++++---- src/prefect/indicators/run.py | 11 ++ src/prefect/indicators/usage/__init__.py | 1 + src/prefect/indicators/usage/c1.py | 161 ++++++++++++++++ src/prefect/indicators/usage/c2.py | 161 ++++++++++++++++ src/prefect/indicators/usage/u10.py | 157 ++++++++++++++++ src/prefect/indicators/usage/u11.py | 160 ++++++++++++++++ src/prefect/indicators/usage/u12.py | 189 +++++++++++++++++++ src/prefect/indicators/usage/u13.py | 180 ++++++++++++++++++ src/prefect/indicators/usage/u5.py | 163 +++++++++++++++++ src/prefect/indicators/usage/u6.py | 193 ++++++++++++++++++++ src/prefect/indicators/usage/u9.py | 193 ++++++++++++++++++++ src/prefect/indicators/utils.py | 52 +++++- src/prefect/prefect.yaml | 166 +++++++++++++++++ src/prefect/tests/infrastructure/test_i1.py | 130 ++++++------- src/prefect/tests/infrastructure/test_i4.py | 82 +++++++++ src/prefect/tests/infrastructure/test_i7.py | 90 +++++++++ src/prefect/tests/infrastructure/test_t1.py | 106 ++++------- src/prefect/tests/param_tests.py | 53 ++++++ src/prefect/tests/usage/__init__.py | 1 + src/prefect/tests/usage/test_c1.py | 106 +++++++++++ src/prefect/tests/usage/test_c2.py | 112 ++++++++++++ src/prefect/tests/usage/test_u10.py | 81 ++++++++ 
src/prefect/tests/usage/test_u11.py | 112 ++++++++++++ src/prefect/tests/usage/test_u12.py | 85 +++++++++ src/prefect/tests/usage/test_u13.py | 86 +++++++++ src/prefect/tests/usage/test_u5.py | 88 +++++++++ src/prefect/tests/usage/test_u6.py | 149 +++++++++++++++ src/prefect/tests/usage/test_u9.py | 89 +++++++++ 34 files changed, 3380 insertions(+), 234 deletions(-) create mode 100644 .vscode/launch.json create mode 100644 src/prefect/indicators/infrastructure/i4.py create mode 100644 src/prefect/indicators/infrastructure/i7.py create mode 100644 src/prefect/indicators/usage/__init__.py create mode 100644 src/prefect/indicators/usage/c1.py create mode 100644 src/prefect/indicators/usage/c2.py create mode 100644 src/prefect/indicators/usage/u10.py create mode 100644 src/prefect/indicators/usage/u11.py create mode 100644 src/prefect/indicators/usage/u12.py create mode 100644 src/prefect/indicators/usage/u13.py create mode 100644 src/prefect/indicators/usage/u5.py create mode 100644 src/prefect/indicators/usage/u6.py create mode 100644 src/prefect/indicators/usage/u9.py create mode 100644 src/prefect/tests/infrastructure/test_i4.py create mode 100644 src/prefect/tests/infrastructure/test_i7.py create mode 100644 src/prefect/tests/param_tests.py create mode 100644 src/prefect/tests/usage/__init__.py create mode 100644 src/prefect/tests/usage/test_c1.py create mode 100644 src/prefect/tests/usage/test_c2.py create mode 100644 src/prefect/tests/usage/test_u10.py create mode 100644 src/prefect/tests/usage/test_u11.py create mode 100644 src/prefect/tests/usage/test_u12.py create mode 100644 src/prefect/tests/usage/test_u13.py create mode 100644 src/prefect/tests/usage/test_u5.py create mode 100644 src/prefect/tests/usage/test_u6.py create mode 100644 src/prefect/tests/usage/test_u9.py diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 00000000..6b76b4fa --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,15 @@ +{ + // Use IntelliSense to learn 
about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Python Debugger: Current File", + "type": "debugpy", + "request": "launch", + "program": "${file}", + "console": "integratedTerminal" + } + ] +} \ No newline at end of file diff --git a/src/prefect/CHANGELOG.md b/src/prefect/CHANGELOG.md index 2949a8ad..4aa3e890 100644 --- a/src/prefect/CHANGELOG.md +++ b/src/prefect/CHANGELOG.md @@ -10,7 +10,20 @@ and this project adheres to ### Added -- Implement t1 workflow -- Implement i1 workflow +- Implement i4 workflow +- Implement i7 workflow +- Implement c1 workflow +- Implement c2 workflow +- Implement u5 workflow +- Implement u6 workflow +- Implement u9 workflow +- Implement u10 workflow +- Implement u11 workflow +- Implement u12 workflow +- Implement u13 workflow + +### Changed + +- Use Statique table instead of PointDeCharge table [unreleased]: https://github.com/MTES-MCT/qualicharge/ diff --git a/src/prefect/indicators/infrastructure/i1.py b/src/prefect/indicators/infrastructure/i1.py index ff74a383..13060e42 100644 --- a/src/prefect/indicators/infrastructure/i1.py +++ b/src/prefect/indicators/infrastructure/i1.py @@ -3,7 +3,6 @@ I1: the number of publicly open points of charge.
""" -from datetime import datetime from string import Template from typing import List from uuid import UUID @@ -11,15 +10,15 @@ import numpy as np import pandas as pd # type: ignore from prefect import flow, runtime, task -from prefect.artifacts import create_markdown_artifact from prefect.futures import wait from prefect.task_runners import ThreadPoolTaskRunner from sqlalchemy import text from sqlalchemy.engine import Connection from ..conf import settings -from ..models import Indicator, IndicatorPeriod, Level +from ..models import Indicator, IndicatorTimeSpan, Level from ..utils import ( + export_indic, get_database_engine, get_num_for_level_query_params, get_targets_for_level, @@ -27,13 +26,14 @@ NUM_POCS_FOR_LEVEL_QUERY_TEMPLATE = """ SELECT - COUNT(DISTINCT PointDeCharge.id_pdc_itinerance) AS value, + COUNT(DISTINCT id_pdc_itinerance) AS value, $level_id AS level_id FROM - PointDeCharge - INNER JOIN Station ON PointDeCharge.station_id = Station.id - INNER JOIN Localisation ON Station.localisation_id = Localisation.id - INNER JOIN City ON Localisation.code_insee_commune = City.code + Statique + --PointDeCharge + --INNER JOIN Station ON PointDeCharge.station_id = Station.id + --INNER JOIN Localisation ON Station.localisation_id = Localisation.id + INNER JOIN City ON code_insee_commune = City.code $join_extras WHERE $level_id IN ($indexes) GROUP BY $level_id @@ -46,22 +46,23 @@ def get_values_for_targets( ) -> pd.DataFrame: """Fetch points of charge given input level and target index.""" query_template = Template(NUM_POCS_FOR_LEVEL_QUERY_TEMPLATE) - query_params: dict = {"indexes": ",".join(f"'{i}'" for i in map(str, indexes))} + query_params = {"indexes": ",".join(f"'{i}'" for i in map(str, indexes))} query_params |= get_num_for_level_query_params(level) return pd.read_sql_query(query_template.substitute(query_params), con=connection) @flow( task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS), - 
flow_run_name="i1-{period.value}-{level:02d}-{at:%y-%m-%d}", + flow_run_name="i1-{timespan.period.value}-{level:02d}-{timespan.start:%y-%m-%d}", ) def i1_for_level( level: Level, - period: IndicatorPeriod, - at: datetime, + timespan: IndicatorTimeSpan, chunk_size=settings.DEFAULT_CHUNK_SIZE, ) -> pd.DataFrame: """Calculate i1 for a level.""" + if level == Level.NATIONAL: + return i1_national(timespan) engine = get_database_engine() with engine.connect() as connection: targets = get_targets_for_level(connection, level) @@ -87,8 +88,8 @@ def i1_for_level( "value": merged["value"].fillna(0), "code": "i1", "level": level, - "period": period, - "timestamp": at, + "period": timespan.period, + "timestamp": timespan.start.isoformat(), "category": None, "extras": None, } @@ -97,9 +98,9 @@ def i1_for_level( @flow( task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS), - flow_run_name="i1-{period.value}-00-{at:%y-%m-%d}", + flow_run_name="i1-{timespan.period.value}-00-{timespan.start:%y-%m-%d}", ) -def i1_national(period: IndicatorPeriod, at: datetime) -> pd.DataFrame: +def i1_national(timespan: IndicatorTimeSpan) -> pd.DataFrame: """Calculate i1 at the national level.""" engine = get_database_engine() with engine.connect() as connection: @@ -111,9 +112,9 @@ def i1_national(period: IndicatorPeriod, at: datetime) -> pd.DataFrame: Indicator( code="i1", level=Level.NATIONAL, - period=period, + period=timespan.period, value=count, - timestamp=at, + timestamp=timespan.start.isoformat(), ).model_dump(), ] ) @@ -121,27 +122,20 @@ def i1_national(period: IndicatorPeriod, at: datetime) -> pd.DataFrame: @flow( task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS), - flow_run_name="meta-i1-{period.value}", + flow_run_name="meta-i1-{timespan.period.value}", ) def calculate( - period: IndicatorPeriod, create_artifact: bool = False, chunk_size: int = 1000 + timespan: IndicatorTimeSpan, + levels: List[Level], + create_artifact: bool = False, 
+ chunk_size: int = 1000, + format_pd: bool = False, ) -> List[Indicator]: """Run all i1 subflows.""" - now = pd.Timestamp.now() subflows_results = [ - i1_national(period, now), - i1_for_level(Level.REGION, period, now, chunk_size=chunk_size), - i1_for_level(Level.DEPARTMENT, period, now, chunk_size=chunk_size), - i1_for_level(Level.EPCI, period, now, chunk_size=chunk_size), - i1_for_level(Level.CITY, period, now, chunk_size=chunk_size), + i1_for_level(level, timespan, chunk_size=chunk_size) for level in levels ] indicators = pd.concat(subflows_results, ignore_index=True) - - if create_artifact: - create_markdown_artifact( - key=runtime.flow_run.name, - markdown=indicators.to_markdown(), - description=f"i1 report at {now} (period: {period.value})", - ) - - return [Indicator(**record) for record in indicators.to_dict(orient="records")] # type: ignore[misc] + description = f"i1 report at {timespan.start} (period: {timespan.period.value})" + flow_name = runtime.flow_run.name + return export_indic(indicators, create_artifact, flow_name, description, format_pd) diff --git a/src/prefect/indicators/infrastructure/i4.py b/src/prefect/indicators/infrastructure/i4.py new file mode 100644 index 00000000..df649e46 --- /dev/null +++ b/src/prefect/indicators/infrastructure/i4.py @@ -0,0 +1,139 @@ +"""QualiCharge prefect indicators: infrastructure. + +I4: the number of publicly open stations. 
+""" + +from string import Template +from typing import List +from uuid import UUID + +import numpy as np +import pandas as pd # type: ignore +from prefect import flow, runtime, task +from prefect.futures import wait +from prefect.task_runners import ThreadPoolTaskRunner +from sqlalchemy import text +from sqlalchemy.engine import Connection + +from ..conf import settings +from ..models import Indicator, IndicatorTimeSpan, Level +from ..utils import ( + export_indic, + get_database_engine, + get_num_for_level_query_params, + get_targets_for_level, +) + +NUM_STATIONS_FOR_LEVEL_QUERY_TEMPLATE = """ + SELECT + COUNT(DISTINCT id_station_itinerance) AS value, + $level_id AS level_id + FROM + Station + INNER JOIN Localisation ON Station.localisation_id = Localisation.id + INNER JOIN City ON Localisation.code_insee_commune = City.code + $join_extras + WHERE $level_id IN ($indexes) + GROUP BY $level_id + """ + + +@task(task_run_name="values-for-target-{level:02d}") +def get_values_for_targets( + connection: Connection, level: Level, indexes: List[UUID] +) -> pd.DataFrame: + """Fetch station given input level and target index.""" + query_template = Template(NUM_STATIONS_FOR_LEVEL_QUERY_TEMPLATE) + query_params: dict = {"indexes": ",".join(f"'{i}'" for i in map(str, indexes))} + query_params |= get_num_for_level_query_params(level) + return pd.read_sql_query(query_template.substitute(query_params), con=connection) + + +@flow( + task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS), + flow_run_name="i4-{timespan.period.value}-{level:02d}-{timespan.start:%y-%m-%d}", +) +def i4_for_level( + level: Level, + timespan: IndicatorTimeSpan, + chunk_size=settings.DEFAULT_CHUNK_SIZE, +) -> pd.DataFrame: + """Calculate i4 for a level.""" + if level == Level.NATIONAL: + return i4_national(timespan) + engine = get_database_engine() + with engine.connect() as connection: + targets = get_targets_for_level(connection, level) + ids = targets["id"] + chunks = ( + 
np.array_split(ids, int(len(ids) / chunk_size)) + if len(ids) > chunk_size + else [ids.to_numpy()] + ) + futures = [ + get_values_for_targets.submit(connection, level, chunk) # type: ignore[call-overload] + for chunk in chunks + ] + wait(futures) + + # Concatenate results and serialize indicators + results = pd.concat([future.result() for future in futures], ignore_index=True) + merged = targets.merge(results, how="left", left_on="id", right_on="level_id") + + # Build result DataFrame + indicators = { + "target": merged["code"], + "value": merged["value"].fillna(0), + "code": "i4", + "level": level, + "period": timespan.period, + "timestamp": timespan.start.isoformat(), + "category": None, + "extras": None, + } + return pd.DataFrame(indicators) + + +@flow( + task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS), + flow_run_name="i4-{timespan.period.value}-00-{timespan.start:%y-%m-%d}", +) +def i4_national(timespan: IndicatorTimeSpan) -> pd.DataFrame: + """Calculate i4 at the national level.""" + engine = get_database_engine() + with engine.connect() as connection: + result = connection.execute(text("SELECT COUNT(*) FROM Station")) + count = result.one()[0] + + return pd.DataFrame.from_records( + [ + Indicator( + code="i4", + level=Level.NATIONAL, + period=timespan.period, + value=count, + timestamp=timespan.start.isoformat(), + ).model_dump(), + ] + ) + + +@flow( + task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS), + flow_run_name="meta-i4-{timespan.period.value}", +) +def calculate( + timespan: IndicatorTimeSpan, + levels: List[Level], + create_artifact: bool = False, + chunk_size: int = 1000, + format_pd: bool = False, +) -> List[Indicator]: + """Run all i4 subflows.""" + subflows_results = [ + i4_for_level(level, timespan, chunk_size=chunk_size) for level in levels + ] + indicators = pd.concat(subflows_results, ignore_index=True) + description = f"i4 report at {timespan.start} (period: 
{timespan.period.value})" + flow_name = runtime.flow_run.name + return export_indic(indicators, create_artifact, flow_name, description, format_pd) diff --git a/src/prefect/indicators/infrastructure/i7.py b/src/prefect/indicators/infrastructure/i7.py new file mode 100644 index 00000000..15719cf8 --- /dev/null +++ b/src/prefect/indicators/infrastructure/i7.py @@ -0,0 +1,143 @@ +"""QualiCharge prefect indicators: infrastructure. + +I7: installed power. +""" + +from string import Template +from typing import List +from uuid import UUID + +import numpy as np +import pandas as pd # type: ignore +from prefect import flow, runtime, task +from prefect.futures import wait +from prefect.task_runners import ThreadPoolTaskRunner +from sqlalchemy import text +from sqlalchemy.engine import Connection + +from ..conf import settings +from ..models import Indicator, IndicatorTimeSpan, Level +from ..utils import ( + export_indic, + get_database_engine, + get_num_for_level_query_params, + get_targets_for_level, +) + +SUM_POWER_FOR_LEVEL_QUERY_TEMPLATE = """ + SELECT + sum(puissance_nominale) AS value, + $level_id AS level_id + FROM + statique + --pointdecharge + --INNER JOIN station ON station.id = station_id + --INNER JOIN localisation ON localisation_id = localisation.id + INNER JOIN city on city.code = code_insee_commune + $join_extras + WHERE $level_id IN ($indexes) + GROUP BY $level_id + ORDER BY value DESC + """ + + +@task(task_run_name="values-for-target-{level:02d}") +def get_values_for_targets( + connection: Connection, level: Level, indexes: List[UUID] +) -> pd.DataFrame: + """Fetch pdc given input level and target index.""" + query_template = Template(SUM_POWER_FOR_LEVEL_QUERY_TEMPLATE) + query_params = {"indexes": ",".join(f"'{i}'" for i in map(str, indexes))} + query_params |= get_num_for_level_query_params(level) + return pd.read_sql_query(query_template.substitute(query_params), con=connection) + + +@flow( + 
task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS), + flow_run_name="i7-{timespan.period.value}-{level:02d}-{timespan.start:%y-%m-%d}", +) +def i7_for_level( + level: Level, + timespan: IndicatorTimeSpan, + chunk_size=settings.DEFAULT_CHUNK_SIZE, +) -> pd.DataFrame: + """Calculate i7 for a level.""" + if level == Level.NATIONAL: + return i7_national(timespan) + engine = get_database_engine() + with engine.connect() as connection: + targets = get_targets_for_level(connection, level) + ids = targets["id"] + chunks = ( + np.array_split(ids, int(len(ids) / chunk_size)) + if len(ids) > chunk_size + else [ids.to_numpy()] + ) + futures = [ + get_values_for_targets.submit(connection, level, chunk) # type: ignore[call-overload] + for chunk in chunks + ] + wait(futures) + + # Concatenate results and serialize indicators + results = pd.concat([future.result() for future in futures], ignore_index=True) + merged = targets.merge(results, how="left", left_on="id", right_on="level_id") + + # Build result DataFrame + indicators = { + "target": merged["code"], + "value": merged["value"].fillna(0), + "code": "i7", + "level": level, + "period": timespan.period, + "timestamp": timespan.start.isoformat(), + "category": None, + "extras": None, + } + return pd.DataFrame(indicators) + + +@flow( + task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS), + flow_run_name="i7-{timespan.period.value}-00-{timespan.start:%y-%m-%d}", +) +def i7_national(timespan: IndicatorTimeSpan) -> pd.DataFrame: + """Calculate i7 at the national level.""" + engine = get_database_engine() + with engine.connect() as connection: + result = connection.execute( + text("SELECT sum(puissance_nominale) FROM pointdecharge") + ) + count = result.one()[0] + return pd.DataFrame.from_records( + [ + Indicator( + code="i7", + level=Level.NATIONAL, + period=timespan.period, + value=count, + timestamp=timespan.start.isoformat(), + ).model_dump(), + ] + ) + + +@flow( + 
task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS), + flow_run_name="meta-i7-{timespan.period.value}", +) +def calculate( + timespan: IndicatorTimeSpan, + levels: List[Level], + create_artifact: bool = False, + chunk_size: int = 1000, + format_pd: bool = False, +) -> List[Indicator]: + """Run all i7 subflows.""" + subflows_results = [ + i7_for_level(level, timespan, chunk_size=chunk_size) for level in levels + ] + indicators = pd.concat(subflows_results, ignore_index=True) + description = f"i7 report at {timespan.start} (period: {timespan.period.value})" + flow_name = runtime.flow_run.name + return export_indic(indicators, create_artifact, flow_name, description, format_pd) diff --git a/src/prefect/indicators/infrastructure/t1.py b/src/prefect/indicators/infrastructure/t1.py index 708feef7..00bcc679 100644 --- a/src/prefect/indicators/infrastructure/t1.py +++ b/src/prefect/indicators/infrastructure/t1.py @@ -3,7 +3,6 @@ T1: the number of publicly open points of charge by power level. 
""" -from datetime import datetime from string import Template from typing import List from uuid import UUID @@ -11,15 +10,15 @@ import numpy as np import pandas as pd # type: ignore from prefect import flow, runtime, task -from prefect.artifacts import create_markdown_artifact from prefect.futures import wait from prefect.task_runners import ThreadPoolTaskRunner from sqlalchemy.engine import Connection from ..conf import settings -from ..models import Indicator, IndicatorPeriod, Level +from ..models import Indicator, IndicatorTimeSpan, Level from ..utils import ( POWER_RANGE_CTE, + export_indic, get_database_engine, get_num_for_level_query_params, get_targets_for_level, @@ -29,14 +28,15 @@ WITH $power_range SELECT - COUNT(DISTINCT PointDeCharge.id_pdc_itinerance) AS value, + COUNT(DISTINCT id_pdc_itinerance) AS value, category, $level_id AS level_id FROM - PointDeCharge - INNER JOIN Station ON PointDeCharge.station_id = Station.id - INNER JOIN Localisation ON Station.localisation_id = Localisation.id - INNER JOIN City ON Localisation.code_insee_commune = City.code + Statique + --PointDeCharge + --INNER JOIN Station ON PointDeCharge.station_id = Station.id + --INNER JOIN Localisation ON Station.localisation_id = Localisation.id + INNER JOIN City ON code_insee_commune = City.code LEFT JOIN puissance ON puissance_nominale::numeric <@ category $join_extras WHERE @@ -51,10 +51,11 @@ WITH $power_range SELECT - COUNT(DISTINCT PointDeCharge.id_pdc_itinerance) AS value, + COUNT(DISTINCT id_pdc_itinerance) AS value, category FROM - PointDeCharge + Statique + --PointDeCharge LEFT JOIN puissance ON puissance_nominale::numeric <@ category GROUP BY category @@ -69,25 +70,24 @@ def get_values_for_targets( ) -> pd.DataFrame: """Fetch points of charge per power level given input level and target index.""" query_template = Template(NUM_POCS_BY_POWER_RANGE_FOR_LEVEL_QUERY_TEMPLATE) - query_params: dict = { - "indexes": ",".join(f"'{i}'" for i in map(str, indexes)), - "power_range": 
POWER_RANGE_CTE, - } + query_params = {"indexes": ",".join(f"'{i}'" for i in map(str, indexes))} + query_params |= POWER_RANGE_CTE query_params |= get_num_for_level_query_params(level) return pd.read_sql_query(query_template.substitute(query_params), con=connection) @flow( task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS), - flow_run_name="t1-{period.value}-{level:02d}-{at:%y-%m-%d}", + flow_run_name="t1-{timespan.period.value}-{level:02d}-{timespan.start:%y-%m-%d}", ) def t1_for_level( level: Level, - period: IndicatorPeriod, - at: datetime, + timespan: IndicatorTimeSpan, chunk_size=settings.DEFAULT_CHUNK_SIZE, ) -> pd.DataFrame: """Calculate t1 for a level.""" + if level == Level.NATIONAL: + return t1_national(timespan) engine = get_database_engine() with engine.connect() as connection: targets = get_targets_for_level(connection, level) @@ -113,8 +113,8 @@ def t1_for_level( "value": merged["value"].fillna(0), "code": "t1", "level": level, - "period": period, - "timestamp": at, + "period": timespan.period, + "timestamp": timespan.start.isoformat(), "category": merged["category"].astype("str"), "extras": None, } @@ -123,23 +123,22 @@ def t1_for_level( @flow( task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS), - flow_run_name="t1-{period.value}-00-{at:%y-%m-%d}", + flow_run_name="t1-{timespan.period.value}-00-{timespan.start:%y-%m-%d}", ) -def t1_national(period: IndicatorPeriod, at: datetime) -> pd.DataFrame: +def t1_national(timespan: IndicatorTimeSpan) -> pd.DataFrame: """Calculate t1 at the national level.""" engine = get_database_engine() query_template = Template(QUERY_NATIONAL_TEMPLATE) - query_params = {"power_range": POWER_RANGE_CTE} + query_params = POWER_RANGE_CTE with engine.connect() as connection: - # result = pd.read_sql_query(QUERY_NATIONAL, con=connection) res = pd.read_sql_query(query_template.substitute(query_params), con=connection) indicators = { "target": None, "value": 
res["value"].fillna(0), "code": "t1", "level": Level.NATIONAL, - "period": period, - "timestamp": at, + "period": timespan.period, + "timestamp": timespan.start.isoformat(), "category": res["category"].astype("str"), "extras": None, } @@ -148,26 +147,20 @@ def t1_national(period: IndicatorPeriod, at: datetime) -> pd.DataFrame: @flow( task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS), - flow_run_name="meta-t1-{period.value}", + flow_run_name="meta-t1-{timespan.period.value}", ) def calculate( - period: IndicatorPeriod, create_artifact: bool = False, chunk_size: int = 1000 + timespan: IndicatorTimeSpan, + levels: List[Level], + create_artifact: bool = False, + chunk_size: int = 1000, + format_pd: bool = False, ) -> List[Indicator]: """Run all t1 subflows.""" - now = pd.Timestamp.now() - subflows_res = [ - t1_national(period, now), - t1_for_level(Level.REGION, period, now, chunk_size=chunk_size), - t1_for_level(Level.DEPARTMENT, period, now, chunk_size=chunk_size), - t1_for_level(Level.EPCI, period, now, chunk_size=chunk_size), - t1_for_level(Level.CITY, period, now, chunk_size=chunk_size), + subflows_results = [ + t1_for_level(level, timespan, chunk_size=chunk_size) for level in levels ] - indicators = pd.concat(subflows_res, ignore_index=True) - if create_artifact: - create_markdown_artifact( - key=runtime.flow_run.name, - markdown=indicators.to_markdown(), - description=f"t1 report at {now} (period: {period.value})", - ) - - return [Indicator(**record) for record in indicators.to_dict(orient="records")] # type: ignore[misc] + indicators = pd.concat(subflows_results, ignore_index=True) + description = f"t1 report at {timespan.start} (period: {timespan.period.value})" + flow_name = runtime.flow_run.name + return export_indic(indicators, create_artifact, flow_name, description, format_pd) diff --git a/src/prefect/indicators/run.py b/src/prefect/indicators/run.py index 55c9fb42..37b8f653 100644 --- a/src/prefect/indicators/run.py +++ 
b/src/prefect/indicators/run.py @@ -2,4 +2,15 @@ # ruff: noqa: F401 from .infrastructure.i1 import calculate as i1_calculate +from .infrastructure.i4 import calculate as i4_calculate +from .infrastructure.i7 import calculate as i7_calculate from .infrastructure.t1 import calculate as t1_calculate +from .usage.c1 import calculate as c1_calculate +from .usage.c2 import calculate as c2_calculate +from .usage.u5 import calculate as u5_calculate +from .usage.u6 import calculate as u6_calculate +from .usage.u9 import calculate as u9_calculate +from .usage.u10 import calculate as u10_calculate +from .usage.u11 import calculate as u11_calculate +from .usage.u12 import calculate as u12_calculate +from .usage.u13 import calculate as u13_calculate diff --git a/src/prefect/indicators/usage/__init__.py b/src/prefect/indicators/usage/__init__.py new file mode 100644 index 00000000..d1f9f6fc --- /dev/null +++ b/src/prefect/indicators/usage/__init__.py @@ -0,0 +1 @@ +"""QualiCharge prefect indicators: usage.""" diff --git a/src/prefect/indicators/usage/c1.py b/src/prefect/indicators/usage/c1.py new file mode 100644 index 00000000..96bbc046 --- /dev/null +++ b/src/prefect/indicators/usage/c1.py @@ -0,0 +1,161 @@ +"""QualiCharge prefect indicators: usage. + +C1: Number of sessions by operator. 
+""" + +from string import Template +from typing import List +from uuid import UUID + +import numpy as np +import pandas as pd # type: ignore +from prefect import flow, runtime, task +from prefect.futures import wait +from prefect.task_runners import ThreadPoolTaskRunner +from sqlalchemy.engine import Connection + +from ..conf import settings +from ..models import Indicator, IndicatorTimeSpan, Level +from ..utils import ( + export_indic, + get_database_engine, + get_num_for_level_query_params, + get_targets_for_level, + get_timespan_filter_query_params, +) + +SESSIONS_BY_OPERATOR_QUERY_TEMPLATE = """ + SELECT + count(*) AS value, + nom_operateur AS category, + $level_id AS level_id + FROM + Session + INNER JOIN statique ON point_de_charge_id = pdc_id + LEFT JOIN City ON City.code = code_insee_commune + $join_extras + WHERE + $timespan + AND $level_id IN ($indexes) + GROUP BY + category, + $level_id + """ + +QUERY_NATIONAL_TEMPLATE = """ + SELECT + count(*) AS value, + nom_operateur AS category + FROM + SESSION + INNER JOIN statique ON point_de_charge_id = pdc_id + WHERE + $timespan + GROUP BY + category + """ + + +@task(task_run_name="values-for-target-{level:02d}") +def get_values_for_targets( + connection: Connection, + level: Level, + timespan: IndicatorTimeSpan, + indexes: List[UUID], +) -> pd.DataFrame: + """Fetch sessions given input level, timestamp and target index.""" + query_template = Template(SESSIONS_BY_OPERATOR_QUERY_TEMPLATE) + query_params = {"indexes": ",".join(f"'{i}'" for i in map(str, indexes))} + query_params |= get_num_for_level_query_params(level) + query_params |= get_timespan_filter_query_params(timespan, session=True) + return pd.read_sql_query(query_template.substitute(query_params), con=connection) + + +@flow( + task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS), + flow_run_name="c1-{timespan.period.value}-{level:02d}-{timespan.start:%y-%m-%d}", +) +def c1_for_level( + level: Level, + timespan: 
IndicatorTimeSpan, + chunk_size=settings.DEFAULT_CHUNK_SIZE, +) -> pd.DataFrame: + """Calculate c1 for a level and a timestamp.""" + if level == Level.NATIONAL: + return c1_national(timespan) + engine = get_database_engine() + with engine.connect() as connection: + targets = get_targets_for_level(connection, level) + ids = targets["id"] + chunks = ( + np.array_split(ids, int(len(ids) / chunk_size)) + if len(ids) > chunk_size + else [ids.to_numpy()] + ) + futures = [ + get_values_for_targets.submit(connection, level, timespan, chunk) # type: ignore[call-overload] + for chunk in chunks + ] + wait(futures) + + # Concatenate results and serialize indicators + results = pd.concat([future.result() for future in futures], ignore_index=True) + merged = targets.merge(results, how="left", left_on="id", right_on="level_id") + + # Build result DataFrame + indicators = { + "target": merged["code"], + "value": merged["value"].fillna(0), + "code": "c1", + "level": level, + "period": timespan.period, + "timestamp": timespan.start.isoformat(), + "category": merged["category"].astype("str"), + "extras": None, + } + return pd.DataFrame(indicators) + + +@flow( + task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS), + flow_run_name="c1-{timespan.period.value}-00-{timespan.start:%y-%m-%d}", +) +def c1_national(timespan: IndicatorTimeSpan) -> pd.DataFrame: + """Calculate c1 at the national level.""" + engine = get_database_engine() + query_template = Template(QUERY_NATIONAL_TEMPLATE) + query_params = get_timespan_filter_query_params(timespan, session=True) + with engine.connect() as connection: + res = pd.read_sql_query(query_template.substitute(query_params), con=connection) + indicators = { + "target": None, + "value": res["value"].fillna(0), + "code": "c1", + "level": Level.NATIONAL, + "period": timespan.period, + "timestamp": timespan.start.isoformat(), + "category": res["category"].astype("str"), + "extras": None, + } + return pd.DataFrame(indicators) + + 
+@flow( + task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS), + flow_run_name="meta-c1-{timespan.period.value}", +) +def calculate( + timespan: IndicatorTimeSpan, + levels: List[Level], + create_artifact: bool = False, + chunk_size: int = 1000, + format_pd: bool = False, +) -> List[Indicator]: + """Run all c1 subflows.""" + subflows_results = [ + c1_for_level(level, timespan, chunk_size=chunk_size) for level in levels + ] + indicators = pd.concat(subflows_results, ignore_index=True) + description = f"c1 report at {timespan.start} (period: {timespan.period.value})" + flow_name = runtime.flow_run.name + return export_indic(indicators, create_artifact, flow_name, description, format_pd) diff --git a/src/prefect/indicators/usage/c2.py b/src/prefect/indicators/usage/c2.py new file mode 100644 index 00000000..0d548b20 --- /dev/null +++ b/src/prefect/indicators/usage/c2.py @@ -0,0 +1,161 @@ +"""QualiCharge prefect indicators: usage. + +C2: Energy cumulate by operator. 
"""

from string import Template
from typing import List
from uuid import UUID

import numpy as np
import pandas as pd  # type: ignore
from prefect import flow, runtime, task
from prefect.futures import wait
from prefect.task_runners import ThreadPoolTaskRunner
from sqlalchemy.engine import Connection

from ..conf import settings
from ..models import Indicator, IndicatorTimeSpan, Level
from ..utils import (
    export_indic,
    get_database_engine,
    get_num_for_level_query_params,
    get_targets_for_level,
    get_timespan_filter_query_params,
)

# Cumulated session energy per operator, grouped by the requested level.
# NOTE(review): the / 1000.0 suggests energy is stored in a smaller unit and
# reported in a larger one (e.g. kWh -> MWh) — confirm units against schema.
SESSIONS_BY_OPERATOR_QUERY_TEMPLATE = """
    SELECT
        sum(energy) / 1000.0 AS value,
        nom_operateur AS category,
        $level_id AS level_id
    FROM
        Session
        INNER JOIN statique ON point_de_charge_id = pdc_id
        LEFT JOIN City ON City.code = code_insee_commune
        $join_extras
    WHERE
        $timespan
        AND $level_id IN ($indexes)
    GROUP BY
        category,
        $level_id
    """

QUERY_NATIONAL_TEMPLATE = """
    SELECT
        sum(energy) / 1000.0 AS value,
        nom_operateur AS category
    FROM
        SESSION
        INNER JOIN statique ON point_de_charge_id = pdc_id
    WHERE
        $timespan
    GROUP BY
        category
    """


@task(task_run_name="values-for-target-{level:02d}")
def get_values_for_targets(
    connection: Connection,
    level: Level,
    timespan: IndicatorTimeSpan,
    indexes: List[UUID],
) -> pd.DataFrame:
    """Fetch cumulated energy per operator for the given level, timespan and
    target indexes (one row per (operator, level entity) pair)."""
    query_template = Template(SESSIONS_BY_OPERATOR_QUERY_TEMPLATE)
    query_params = {"indexes": ",".join(f"'{i}'" for i in map(str, indexes))}
    query_params |= get_num_for_level_query_params(level)
    query_params |= get_timespan_filter_query_params(timespan, session=True)
    return pd.read_sql_query(query_template.substitute(query_params), con=connection)


@flow(
    task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS),
    flow_run_name="c2-{timespan.period.value}-{level:02d}-{timespan.start:%y-%m-%d}",
)
def c2_for_level(
    level: Level,
    timespan: IndicatorTimeSpan,
    chunk_size=settings.DEFAULT_CHUNK_SIZE,
) -> pd.DataFrame:
    """Calculate c2 (energy by operator) for a level and a timestamp."""
    if level == Level.NATIONAL:
        return c2_national(timespan)
    engine = get_database_engine()
    with engine.connect() as connection:
        targets = get_targets_for_level(connection, level)
        ids = targets["id"]
        # Fan out one task per chunk of roughly chunk_size target ids
        # (np.array_split sections may slightly exceed chunk_size).
        chunks = (
            np.array_split(ids, int(len(ids) / chunk_size))
            if len(ids) > chunk_size
            else [ids.to_numpy()]
        )
        futures = [
            get_values_for_targets.submit(connection, level, timespan, chunk)  # type: ignore[call-overload]
            for chunk in chunks
        ]
        wait(futures)

    # Concatenate results and serialize indicators
    results = pd.concat([future.result() for future in futures], ignore_index=True)
    merged = targets.merge(results, how="left", left_on="id", right_on="level_id")

    # Build result DataFrame
    indicators = {
        "target": merged["code"],
        "value": merged["value"].fillna(0),
        "code": "c2",
        "level": level,
        "period": timespan.period,
        "timestamp": timespan.start.isoformat(),
        "category": merged["category"].astype("str"),
        "extras": None,
    }
    return pd.DataFrame(indicators)


@flow(
    task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS),
    flow_run_name="c2-{timespan.period.value}-00-{timespan.start:%y-%m-%d}",
)
def c2_national(timespan: IndicatorTimeSpan) -> pd.DataFrame:
    """Calculate c2 at the national level."""
    engine = get_database_engine()
    query_template = Template(QUERY_NATIONAL_TEMPLATE)
    query_params = get_timespan_filter_query_params(timespan, session=True)
    with engine.connect() as connection:
        res = pd.read_sql_query(query_template.substitute(query_params), con=connection)
    indicators = {
        "target": None,
        "value": res["value"].fillna(0),
        "code": "c2",
        "level": Level.NATIONAL,
        "period": timespan.period,
        "timestamp": timespan.start.isoformat(),
        "category": res["category"].astype("str"),
        "extras": None,
    }
    return pd.DataFrame(indicators)


@flow(
    task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS),
    flow_run_name="meta-c2-{timespan.period.value}",
)
def calculate(
    timespan: IndicatorTimeSpan,
    levels: List[Level],
    create_artifact: bool = False,
    chunk_size: int = 1000,
    format_pd: bool = False,
) -> List[Indicator]:
    """Run all c2 subflows and export the concatenated indicators."""
    subflows_results = [
        c2_for_level(level, timespan, chunk_size=chunk_size) for level in levels
    ]
    indicators = pd.concat(subflows_results, ignore_index=True)
    description = f"c2 report at {timespan.start} (period: {timespan.period.value})"
    flow_name = runtime.flow_run.name
    return export_indic(indicators, create_artifact, flow_name, description, format_pd)
diff --git a/src/prefect/indicators/usage/u10.py b/src/prefect/indicators/usage/u10.py
new file mode 100644
index 00000000..6d78de82
--- /dev/null
+++ b/src/prefect/indicators/usage/u10.py
@@ -0,0 +1,157 @@
"""QualiCharge prefect indicators: usage.

U10: the number of sessions.
"""

from string import Template
from typing import List
from uuid import UUID

import numpy as np
import pandas as pd  # type: ignore
from prefect import flow, runtime, task
from prefect.futures import wait
from prefect.task_runners import ThreadPoolTaskRunner
from sqlalchemy.engine import Connection

from ..conf import settings
from ..models import Indicator, IndicatorTimeSpan, Level
from ..utils import (
    export_indic,
    get_database_engine,
    get_num_for_level_query_params,
    get_targets_for_level,
    get_timespan_filter_query_params,
)

# Number of sessions in the timespan, grouped by the requested level.
NUM_SESSIONS_FOR_LEVEL_QUERY_TEMPLATE = """
    SELECT
        count(*) AS value,
        $level_id AS level_id
    FROM
        SESSION
        INNER JOIN statique ON point_de_charge_id = pdc_id
        --INNER JOIN PointDeCharge ON point_de_charge_id = PointDeCharge.id
        --LEFT JOIN station ON station_id = station.id
        --LEFT JOIN localisation ON localisation_id = localisation.id
        LEFT JOIN city ON city.code = code_insee_commune
        $join_extras
    WHERE
        $timespan
        AND $level_id IN ($indexes)
    GROUP BY $level_id
    """

QUERY_NATIONAL_TEMPLATE = """
    SELECT
        count(*) AS value
    FROM
        SESSION
    WHERE
        $timespan
    """


@task(task_run_name="values-for-target-{level:02d}")
def get_values_for_targets(
    connection: Connection,
    level: Level,
    timespan: IndicatorTimeSpan,
    indexes: List[UUID],
) -> pd.DataFrame:
    """Count sessions for the given level, timespan and target indexes."""
    query_template = Template(NUM_SESSIONS_FOR_LEVEL_QUERY_TEMPLATE)
    query_params = {"indexes": ",".join(f"'{i}'" for i in map(str, indexes))}
    query_params |= get_num_for_level_query_params(level)
    query_params |= get_timespan_filter_query_params(timespan)
    return pd.read_sql_query(query_template.substitute(query_params), con=connection)


@flow(
    task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS),
    flow_run_name="u10-{timespan.period.value}-{level:02d}-{timespan.start:%y-%m-%d}",
)
def u10_for_level(
    level: Level,
    timespan: IndicatorTimeSpan,
    chunk_size=settings.DEFAULT_CHUNK_SIZE,
) -> pd.DataFrame:
    """Calculate u10 (number of sessions) for a level and a timestamp."""
    if level == Level.NATIONAL:
        return u10_national(timespan)
    engine = get_database_engine()
    with engine.connect() as connection:
        targets = get_targets_for_level(connection, level)
        ids = targets["id"]
        # Fan out one task per chunk of roughly chunk_size target ids.
        chunks = (
            np.array_split(ids, int(len(ids) / chunk_size))
            if len(ids) > chunk_size
            else [ids.to_numpy()]
        )
        futures = [
            get_values_for_targets.submit(connection, level, timespan, chunk)  # type: ignore[call-overload]
            for chunk in chunks
        ]
        wait(futures)

    # Concatenate results and serialize indicators
    results = pd.concat([future.result() for future in futures], ignore_index=True)
    merged = targets.merge(results, how="left", left_on="id", right_on="level_id")

    # Build result DataFrame
    indicators = {
        "target": merged["code"],
        "value": merged["value"].fillna(0),
        "code": "u10",
        "level": level,
        "period": timespan.period,
        "timestamp": timespan.start.isoformat(),
        "category": None,
        "extras": None,
    }
    return pd.DataFrame(indicators)


@flow(
    task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS),
    flow_run_name="u10-{timespan.period.value}-00-{timespan.start:%y-%m-%d}",
)
def u10_national(timespan: IndicatorTimeSpan) -> pd.DataFrame:
    """Calculate u10 at the national level."""
    engine = get_database_engine()
    query_template = Template(QUERY_NATIONAL_TEMPLATE)
    query_params = get_timespan_filter_query_params(timespan)
    with engine.connect() as connection:
        res = pd.read_sql_query(query_template.substitute(query_params), con=connection)
    indicators = {
        "target": None,
        "value": res["value"].fillna(0),
        "code": "u10",
        "level": Level.NATIONAL,
        "period": timespan.period,
        "timestamp": timespan.start.isoformat(),
        "category": None,
        "extras": None,
    }
    return pd.DataFrame(indicators)


@flow(
    task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS),
    flow_run_name="meta-u10-{timespan.period.value}",
)
def calculate(
    timespan: IndicatorTimeSpan,
    levels: List[Level],
    create_artifact: bool = False,
    chunk_size: int = 1000,
    format_pd: bool = False,
) -> List[Indicator]:
    """Run all u10 subflows and export the concatenated indicators."""
    subflows_results = [
        u10_for_level(level, timespan, chunk_size=chunk_size) for level in levels
    ]
    indicators = pd.concat(subflows_results, ignore_index=True)
    description = f"u10 report at {timespan.start} (period: {timespan.period.value})"
    flow_name = runtime.flow_run.name
    return export_indic(indicators, create_artifact, flow_name, description, format_pd)
diff --git a/src/prefect/indicators/usage/u11.py b/src/prefect/indicators/usage/u11.py
new file mode 100644
index 00000000..a1cf003f
--- /dev/null
+++ b/src/prefect/indicators/usage/u11.py
@@ -0,0 +1,160 @@
"""QualiCharge prefect indicators: usage.

U11: the number of successful sessions.
"""

from string import Template
from typing import List
from uuid import UUID

import numpy as np
import pandas as pd  # type: ignore
from prefect import flow, runtime, task
from prefect.futures import wait
from prefect.task_runners import ThreadPoolTaskRunner
from sqlalchemy.engine import Connection

from ..conf import settings
from ..models import Indicator, IndicatorTimeSpan, Level
from ..utils import (
    export_indic,
    get_database_engine,
    get_num_for_level_query_params,
    get_targets_for_level,
    get_timespan_filter_query_params,
)

# A session counts as "successful" when it delivered more than 0.5 (energy
# unit as stored) AND lasted more than 3 minutes — see the WHERE clauses.
NUM_SUCCESSFUL_SESSIONS_FOR_LEVEL_QUERY_TEMPLATE = """
    SELECT
        count(*) AS value,
        $level_id AS level_id
    FROM
        SESSION
        INNER JOIN statique ON point_de_charge_id = pdc_id
        --INNER JOIN PointDeCharge ON point_de_charge_id = PointDeCharge.id
        --LEFT JOIN Station ON station_id = Station.id
        --LEFT JOIN Localisation ON localisation_id = Localisation.id
        LEFT JOIN City ON City.code = code_insee_commune
        $join_extras
    WHERE
        $timespan
        AND $level_id IN ($indexes)
        AND energy > 0.5
        AND Session.end - Session.start > '3 minutes'::interval
    GROUP BY $level_id
    """

QUERY_NATIONAL_TEMPLATE = """
    SELECT
        count(*) AS value
    FROM
        SESSION
    WHERE
        $timespan
        AND energy > 0.5
        AND Session.end - Session.start > '3 minutes'::interval
    """


@task(task_run_name="values-for-target-{level:02d}")
def get_values_for_targets(
    connection: Connection,
    level: Level,
    timespan: IndicatorTimeSpan,
    indexes: List[UUID],
) -> pd.DataFrame:
    """Count successful sessions (energy > 0.5, duration > 3 minutes) for the
    given level, timespan and target indexes."""
    query_template = Template(NUM_SUCCESSFUL_SESSIONS_FOR_LEVEL_QUERY_TEMPLATE)
    query_params = {"indexes": ",".join(f"'{i}'" for i in map(str, indexes))}
    query_params |= get_num_for_level_query_params(level)
    query_params |= get_timespan_filter_query_params(timespan)
    return pd.read_sql_query(query_template.substitute(query_params), con=connection)


@flow(
    task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS),
    flow_run_name="u11-{timespan.period.value}-{level:02d}-{timespan.start:%y-%m-%d}",
)
def u11_for_level(
    level: Level,
    timespan: IndicatorTimeSpan,
    chunk_size=settings.DEFAULT_CHUNK_SIZE,
) -> pd.DataFrame:
    """Calculate u11 (successful sessions) for a level and a timestamp."""
    if level == Level.NATIONAL:
        return u11_national(timespan)
    engine = get_database_engine()
    with engine.connect() as connection:
        targets = get_targets_for_level(connection, level)
        ids = targets["id"]
        # Fan out one task per chunk of roughly chunk_size target ids.
        chunks = (
            np.array_split(ids, int(len(ids) / chunk_size))
            if len(ids) > chunk_size
            else [ids.to_numpy()]
        )
        futures = [
            get_values_for_targets.submit(connection, level, timespan, chunk)  # type: ignore[call-overload]
            for chunk in chunks
        ]
        wait(futures)

    # Concatenate results and serialize indicators
    results = pd.concat([future.result() for future in futures], ignore_index=True)
    merged = targets.merge(results, how="left", left_on="id", right_on="level_id")

    # Build result DataFrame
    indicators = {
        "target": merged["code"],
        "value": merged["value"].fillna(0),
        "code": "u11",
        "level": level,
        "period": timespan.period,
        "timestamp": timespan.start.isoformat(),
        "category": None,
        "extras": None,
    }
    return pd.DataFrame(indicators)


@flow(
    task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS),
    flow_run_name="u11-{timespan.period.value}-00-{timespan.start:%y-%m-%d}",
)
def u11_national(timespan: IndicatorTimeSpan) -> pd.DataFrame:
    """Calculate u11 at the national level."""
    engine = get_database_engine()
    query_template = Template(QUERY_NATIONAL_TEMPLATE)
    query_params = get_timespan_filter_query_params(timespan)
    with engine.connect() as connection:
        res = pd.read_sql_query(query_template.substitute(query_params), con=connection)
    indicators = {
        "target": None,
        "value": res["value"].fillna(0),
        "code": "u11",
        "level": Level.NATIONAL,
        "period": timespan.period,
        "timestamp": timespan.start.isoformat(),
        "category": None,
        "extras": None,
    }
    return pd.DataFrame(indicators)


@flow(
    task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS),
    flow_run_name="meta-u11-{timespan.period.value}",
)
def calculate(
    timespan: IndicatorTimeSpan,
    levels: List[Level],
    create_artifact: bool = False,
    chunk_size: int = 1000,
    format_pd: bool = False,
) -> List[Indicator]:
    """Run all u11 subflows and export the concatenated indicators."""
    subflows_results = [
        u11_for_level(level, timespan, chunk_size=chunk_size) for level in levels
    ]
    indicators = pd.concat(subflows_results, ignore_index=True)
    description = f"u11 report at {timespan.start} (period: {timespan.period.value})"
    flow_name = runtime.flow_run.name
    return export_indic(indicators, create_artifact, flow_name, description, format_pd)
diff --git a/src/prefect/indicators/usage/u12.py b/src/prefect/indicators/usage/u12.py
new file mode 100644
index 00000000..b68a0634
--- /dev/null
+++ b/src/prefect/indicators/usage/u12.py
@@ -0,0 +1,189 @@
"""QualiCharge prefect indicators: usage.

U12: the number of POC in operation by power category.
+""" + +from string import Template +from typing import List +from uuid import UUID + +import numpy as np +import pandas as pd # type: ignore +from prefect import flow, runtime, task +from prefect.futures import wait +from prefect.task_runners import ThreadPoolTaskRunner +from sqlalchemy.engine import Connection + +from ..conf import settings +from ..models import Indicator, IndicatorTimeSpan, Level +from ..utils import ( + POWER_RANGE_CTE, + export_indic, + get_database_engine, + get_num_for_level_query_params, + get_targets_for_level, + get_timespan_filter_query_params, +) + +NUM_POC_IN_OPERATION_FOR_LEVEL_QUERY_TEMPLATE = """ + WITH + $power_range, + statusf AS ( + SELECT + point_de_charge_id + FROM + status + WHERE + $timespan + GROUP BY + point_de_charge_id + ) + SELECT + count(*) AS value, + category, + $level_id AS level_id + FROM + statusf + INNER JOIN statique ON point_de_charge_id = pdc_id + --INNER JOIN PointDeCharge ON statusf.point_de_charge_id = PointDeCharge.id + --LEFT JOIN Station ON station_id = Station.id + --LEFT JOIN Localisation ON localisation_id = Localisation.id + LEFT JOIN City ON City.code = code_insee_commune + LEFT JOIN puissance ON puissance_nominale::numeric <@ category + $join_extras + WHERE + $level_id IN ($indexes) + GROUP BY + $level_id, + category + """ +QUERY_NATIONAL_TEMPLATE = """ + WITH + $power_range, + statusf AS ( + SELECT + point_de_charge_id + FROM + status + WHERE + $timespan + GROUP BY + point_de_charge_id + ) + SELECT + count(*) AS value, + category + FROM + statusf + INNER JOIN PointDeCharge ON statusf.point_de_charge_id = PointDeCharge.id + LEFT JOIN puissance ON puissance_nominale::numeric <@ category + GROUP BY + category + """ + + +@task(task_run_name="values-for-target-{level:02d}") +def get_values_for_targets( + connection: Connection, + level: Level, + timespan: IndicatorTimeSpan, + indexes: List[UUID], +) -> pd.DataFrame: + """Fetch sessions given input level, timestamp and target index.""" + query_template = 
Template(NUM_POC_IN_OPERATION_FOR_LEVEL_QUERY_TEMPLATE) + query_params = {"indexes": ",".join(f"'{i}'" for i in map(str, indexes))} + query_params |= POWER_RANGE_CTE + query_params |= get_num_for_level_query_params(level) + query_params |= get_timespan_filter_query_params(timespan, session=False) + return pd.read_sql_query(query_template.substitute(query_params), con=connection) + + +@flow( + task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS), + flow_run_name="u12-{timespan.period.value}-{level:02d}-{timespan.start:%y-%m-%d}", +) +def u12_for_level( + level: Level, + timespan: IndicatorTimeSpan, + chunk_size=settings.DEFAULT_CHUNK_SIZE, +) -> pd.DataFrame: + """Calculate u12 for a level and a timestamp.""" + if level == Level.NATIONAL: + return u12_national(timespan) + engine = get_database_engine() + with engine.connect() as connection: + targets = get_targets_for_level(connection, level) + ids = targets["id"] + chunks = ( + np.array_split(ids, int(len(ids) / chunk_size)) + if len(ids) > chunk_size + else [ids.to_numpy()] + ) + futures = [ + get_values_for_targets.submit(connection, level, timespan, chunk) # type: ignore[call-overload] + for chunk in chunks + ] + wait(futures) + + # Concatenate results and serialize indicators + results = pd.concat([future.result() for future in futures], ignore_index=True) + merged = targets.merge(results, how="left", left_on="id", right_on="level_id") + + # Build result DataFrame + indicators = { + "target": merged["code"], + "value": merged["value"].fillna(0), + "code": "u12", + "level": level, + "period": timespan.period, + "timestamp": timespan.start.isoformat(), + "category": merged["category"].astype("str"), + "extras": None, + } + return pd.DataFrame(indicators) + + +@flow( + task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS), + flow_run_name="u12-{timespan.period.value}-00-{timespan.start:%y-%m-%d}", +) +def u12_national(timespan: IndicatorTimeSpan) -> pd.DataFrame: 
+ """Calculate u12 at the national level.""" + engine = get_database_engine() + query_template = Template(QUERY_NATIONAL_TEMPLATE) + query_params = get_timespan_filter_query_params(timespan, session=False) + query_params |= POWER_RANGE_CTE + with engine.connect() as connection: + res = pd.read_sql_query(query_template.substitute(query_params), con=connection) + indicators = { + "target": None, + "value": res["value"].fillna(0), + "code": "u12", + "level": Level.NATIONAL, + "period": timespan.period, + "timestamp": timespan.start.isoformat(), + "category": res["category"].astype("str"), + "extras": None, + } + return pd.DataFrame(indicators) + + +@flow( + task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS), + flow_run_name="meta-u12-{timespan.period.value}", +) +def calculate( + timespan: IndicatorTimeSpan, + levels: List[Level] = [Level.NATIONAL, Level.REGION], + create_artifact: bool = False, + chunk_size: int = 1000, + format_pd: bool = False, +) -> List[Indicator]: + """Run all u12 subflows.""" + subflows_results = [ + u12_for_level(level, timespan, chunk_size=chunk_size) for level in levels + ] + indicators = pd.concat(subflows_results, ignore_index=True) + description = f"u12 report at {timespan.start} (period: {timespan.period.value})" + flow_name = runtime.flow_run.name + return export_indic(indicators, create_artifact, flow_name, description, format_pd) diff --git a/src/prefect/indicators/usage/u13.py b/src/prefect/indicators/usage/u13.py new file mode 100644 index 00000000..4a8f280a --- /dev/null +++ b/src/prefect/indicators/usage/u13.py @@ -0,0 +1,180 @@ +"""QualiCharge prefect indicators: usage. + +U13: cumulative power of POC in operation. 
+""" + +from string import Template +from typing import List +from uuid import UUID + +import numpy as np +import pandas as pd # type: ignore +from prefect import flow, runtime, task +from prefect.futures import wait +from prefect.task_runners import ThreadPoolTaskRunner +from sqlalchemy.engine import Connection + +from ..conf import settings +from ..models import Indicator, IndicatorTimeSpan, Level +from ..utils import ( + POWER_RANGE_CTE, + export_indic, + get_database_engine, + get_num_for_level_query_params, + get_targets_for_level, + get_timespan_filter_query_params, +) + +POWER_POC_IN_OPERATION_TEMPLATE = """ + WITH + statusf AS ( + SELECT + point_de_charge_id + FROM + status + WHERE + $timespan + GROUP BY + point_de_charge_id + ) + SELECT + sum(puissance_nominale) AS value, + $level_id AS level_id + FROM + statusf + INNER JOIN statique ON point_de_charge_id = pdc_id + --INNER JOIN PointDeCharge ON statusf.point_de_charge_id = PointDeCharge.id + --LEFT JOIN Station ON station_id = Station.id + --LEFT JOIN Localisation ON localisation_id = Localisation.id + LEFT JOIN City ON City.code = code_insee_commune + $join_extras + WHERE + $level_id IN ($indexes) + GROUP BY + $level_id + """ +QUERY_NATIONAL_TEMPLATE = """ + WITH + statusf AS ( + SELECT + point_de_charge_id + FROM + status + WHERE + $timespan + GROUP BY + point_de_charge_id + ) + SELECT + sum(puissance_nominale) AS value + FROM + statusf + INNER JOIN PointDeCharge ON statusf.point_de_charge_id = PointDeCharge.id + """ + + +@task(task_run_name="values-for-target-{level:02d}") +def get_values_for_targets( + connection: Connection, + level: Level, + timespan: IndicatorTimeSpan, + indexes: List[UUID], +) -> pd.DataFrame: + """Fetch sessions given input level, timestamp and target index.""" + query_template = Template(POWER_POC_IN_OPERATION_TEMPLATE) + query_params = {"indexes": ",".join(f"'{i}'" for i in map(str, indexes))} + query_params |= POWER_RANGE_CTE + query_params |= 
get_num_for_level_query_params(level) + query_params |= get_timespan_filter_query_params(timespan, session=False) + return pd.read_sql_query(query_template.substitute(query_params), con=connection) + + +@flow( + task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS), + flow_run_name="u13-{timespan.period.value}-{level:02d}-{timespan.start:%y-%m-%d}", +) +def u13_for_level( + level: Level, + timespan: IndicatorTimeSpan, + chunk_size=settings.DEFAULT_CHUNK_SIZE, +) -> pd.DataFrame: + """Calculate u13 for a level and a timestamp.""" + if level == Level.NATIONAL: + return u13_national(timespan) + engine = get_database_engine() + with engine.connect() as connection: + targets = get_targets_for_level(connection, level) + ids = targets["id"] + chunks = ( + np.array_split(ids, int(len(ids) / chunk_size)) + if len(ids) > chunk_size + else [ids.to_numpy()] + ) + futures = [ + get_values_for_targets.submit(connection, level, timespan, chunk) # type: ignore[call-overload] + for chunk in chunks + ] + wait(futures) + + # Concatenate results and serialize indicators + results = pd.concat([future.result() for future in futures], ignore_index=True) + merged = targets.merge(results, how="left", left_on="id", right_on="level_id") + + # Build result DataFrame + indicators = { + "target": merged["code"], + "value": merged["value"].fillna(0), + "code": "u13", + "level": level, + "period": timespan.period, + "timestamp": timespan.start.isoformat(), + "category": None, + "extras": None, + } + return pd.DataFrame(indicators) + + +@flow( + task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS), + flow_run_name="u13-{timespan.period.value}-00-{timespan.start:%y-%m-%d}", +) +def u13_national(timespan: IndicatorTimeSpan) -> pd.DataFrame: + """Calculate u13 at the national level.""" + engine = get_database_engine() + query_template = Template(QUERY_NATIONAL_TEMPLATE) + query_params = get_timespan_filter_query_params(timespan, session=False) + 
query_params |= POWER_RANGE_CTE + with engine.connect() as connection: + res = pd.read_sql_query(query_template.substitute(query_params), con=connection) + indicators = { + "target": None, + "value": res["value"].fillna(0), + "code": "u13", + "level": Level.NATIONAL, + "period": timespan.period, + "timestamp": timespan.start.isoformat(), + "category": None, + "extras": None, + } + return pd.DataFrame(indicators) + + +@flow( + task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS), + flow_run_name="meta-u13-{timespan.period.value}", +) +def calculate( + timespan: IndicatorTimeSpan, + levels: List[Level] = [Level.NATIONAL, Level.REGION], + create_artifact: bool = False, + chunk_size: int = 1000, + format_pd: bool = False, +) -> List[Indicator]: + """Run all u13 subflows.""" + subflows_results = [ + u13_for_level(level, timespan, chunk_size=chunk_size) for level in levels + ] + indicators = pd.concat(subflows_results, ignore_index=True) + description = f"u13 report at {timespan.start} (period: {timespan.period.value})" + flow_name = runtime.flow_run.name + return export_indic(indicators, create_artifact, flow_name, description, format_pd) diff --git a/src/prefect/indicators/usage/u5.py b/src/prefect/indicators/usage/u5.py new file mode 100644 index 00000000..0b933b54 --- /dev/null +++ b/src/prefect/indicators/usage/u5.py @@ -0,0 +1,163 @@ +"""QualiCharge prefect indicators: usage. + +U5: Hourly distribution of sessions (number). 
"""

from string import Template
from typing import List
from uuid import UUID

import numpy as np
import pandas as pd  # type: ignore
from prefect import flow, runtime, task
from prefect.futures import wait
from prefect.task_runners import ThreadPoolTaskRunner
from sqlalchemy.engine import Connection

from ..conf import settings
from ..models import Indicator, IndicatorTimeSpan, Level
from ..utils import (
    export_indic,
    get_database_engine,
    get_num_for_level_query_params,
    get_targets_for_level,
    get_timespan_filter_query_params,
)

# Session counts bucketed by hour of day (category = extract(HOUR FROM start)),
# grouped by the requested level.
HOURLY_SESSIONS_QUERY_TEMPLATE = """
    SELECT
        count(Session.id) AS value,
        extract(HOUR FROM start) AS category,
        $level_id AS level_id
    FROM
        Session
        INNER JOIN statique ON point_de_charge_id = pdc_id
        --INNER JOIN PointDeCharge ON point_de_charge_id = PointDeCharge.id
        --LEFT JOIN Station ON station_id = Station.id
        --LEFT JOIN Localisation ON localisation_id = Localisation.id
        LEFT JOIN City ON City.code = code_insee_commune
        $join_extras
    WHERE
        $timespan
        AND $level_id IN ($indexes)
    GROUP BY
        category,
        $level_id
    """

QUERY_NATIONAL_TEMPLATE = """
    SELECT
        count(Session.id) AS value,
        extract(HOUR FROM start) AS category
    FROM
        SESSION
    WHERE
        $timespan
    GROUP BY
        category
    """


@task(task_run_name="values-for-target-{level:02d}")
def get_values_for_targets(
    connection: Connection,
    level: Level,
    timespan: IndicatorTimeSpan,
    indexes: List[UUID],
) -> pd.DataFrame:
    """Count sessions per hour of day for the given level, timespan and
    target indexes."""
    query_template = Template(HOURLY_SESSIONS_QUERY_TEMPLATE)
    query_params = {"indexes": ",".join(f"'{i}'" for i in map(str, indexes))}
    query_params |= get_num_for_level_query_params(level)
    query_params |= get_timespan_filter_query_params(timespan, session=True)
    return pd.read_sql_query(query_template.substitute(query_params), con=connection)


@flow(
    task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS),
    flow_run_name="u5-{timespan.period.value}-{level:02d}-{timespan.start:%y-%m-%d}",
)
def u5_for_level(
    level: Level,
    timespan: IndicatorTimeSpan,
    chunk_size=settings.DEFAULT_CHUNK_SIZE,
) -> pd.DataFrame:
    """Calculate u5 (hourly session distribution) for a level and a timestamp."""
    if level == Level.NATIONAL:
        return u5_national(timespan)
    engine = get_database_engine()
    with engine.connect() as connection:
        targets = get_targets_for_level(connection, level)
        ids = targets["id"]
        # Fan out one task per chunk of roughly chunk_size target ids.
        chunks = (
            np.array_split(ids, int(len(ids) / chunk_size))
            if len(ids) > chunk_size
            else [ids.to_numpy()]
        )
        futures = [
            get_values_for_targets.submit(connection, level, timespan, chunk)  # type: ignore[call-overload]
            for chunk in chunks
        ]
        wait(futures)

    # Concatenate results and serialize indicators
    results = pd.concat([future.result() for future in futures], ignore_index=True)
    merged = targets.merge(results, how="left", left_on="id", right_on="level_id")

    # Build result DataFrame
    indicators = {
        "target": merged["code"],
        "value": merged["value"].fillna(0),
        "code": "u5",
        "level": level,
        "period": timespan.period,
        "timestamp": timespan.start.isoformat(),
        "category": merged["category"].astype("str"),
        "extras": None,
    }
    return pd.DataFrame(indicators)


@flow(
    task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS),
    flow_run_name="u5-{timespan.period.value}-00-{timespan.start:%y-%m-%d}",
)
def u5_national(timespan: IndicatorTimeSpan) -> pd.DataFrame:
    """Calculate u5 at the national level."""
    engine = get_database_engine()
    query_template = Template(QUERY_NATIONAL_TEMPLATE)
    query_params = get_timespan_filter_query_params(timespan, session=True)
    with engine.connect() as connection:
        res = pd.read_sql_query(query_template.substitute(query_params), con=connection)
    indicators = {
        "target": None,
        "value": res["value"].fillna(0),
        "code": "u5",
        "level": Level.NATIONAL,
        "period": timespan.period,
        "timestamp": timespan.start.isoformat(),
        "category": res["category"].astype("str"),
        "extras": None,
    }
    return pd.DataFrame(indicators)


@flow(
    task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS),
    flow_run_name="meta-u5-{timespan.period.value}",
)
def calculate(
    timespan: IndicatorTimeSpan,
    levels: List[Level],
    create_artifact: bool = False,
    chunk_size: int = 1000,
    format_pd: bool = False,
) -> List[Indicator]:
    """Run all u5 subflows and export the concatenated indicators."""
    subflows_results = [
        u5_for_level(level, timespan, chunk_size=chunk_size) for level in levels
    ]
    indicators = pd.concat(subflows_results, ignore_index=True)
    description = f"u5 report at {timespan.start} (period: {timespan.period.value})"
    flow_name = runtime.flow_run.name
    return export_indic(indicators, create_artifact, flow_name, description, format_pd)
diff --git a/src/prefect/indicators/usage/u6.py b/src/prefect/indicators/usage/u6.py
new file mode 100644
index 00000000..fb01efed
--- /dev/null
+++ b/src/prefect/indicators/usage/u6.py
@@ -0,0 +1,193 @@
"""QualiCharge prefect indicators: usage.

U6: session duration by power category.
"""

from string import Template
from typing import List
from uuid import UUID

import numpy as np
import pandas as pd  # type: ignore
from prefect import flow, runtime, task
from prefect.futures import wait
from prefect.task_runners import ThreadPoolTaskRunner
from sqlalchemy.engine import Connection

from ..conf import settings
from ..models import Indicator, IndicatorTimeSpan, Level
from ..utils import (
    POWER_RANGE_CTE,
    export_indic,
    get_database_engine,
    get_num_for_level_query_params,
    get_targets_for_level,
    get_timespan_filter_query_params,
)

# Cumulated session duration per POC, reported in hours
# (extract('epoch' ...) / 3600.0), by power category and requested level.
DURATION_FOR_LEVEL_QUERY_TEMPLATE = """
    WITH
    $power_range,
    sessionf AS (
        SELECT
            point_de_charge_id,
            sum(session.end -session.start) as duree_pdc
        FROM
            session
        WHERE
            $timespan
        GROUP BY
            point_de_charge_id
    )
    SELECT
        extract ('epoch' from sum(duree_pdc)) / 3600.0 AS value,
        category,
        $level_id AS level_id
    FROM
        sessionf
        INNER JOIN statique ON point_de_charge_id = pdc_id
        --INNER JOIN PointDeCharge ON sessionf.point_de_charge_id = PointDeCharge.id
        --LEFT JOIN Station ON station_id = Station.id
        --LEFT JOIN Localisation ON localisation_id = Localisation.id
        LEFT JOIN City ON City.code = code_insee_commune
        LEFT JOIN puissance ON puissance_nominale::numeric <@ category
        $join_extras
    WHERE
        $level_id IN ($indexes)
    GROUP BY
        $level_id,
        category
    """

QUERY_NATIONAL_TEMPLATE = """
    WITH
    $power_range,
    sessionf AS (
        SELECT
            point_de_charge_id,
            sum(session.end -session.start) as duree_pdc
        FROM
            session
        WHERE
            $timespan
        GROUP BY
            point_de_charge_id
    )
    SELECT
        extract ('epoch' from sum(duree_pdc)) / 3600.0 AS value,
        category
    FROM
        sessionf
        INNER JOIN statique ON point_de_charge_id = pdc_id
        --INNER JOIN PointDeCharge ON sessionf.point_de_charge_id = PointDeCharge.id
        LEFT JOIN puissance ON puissance_nominale::numeric <@ category
    GROUP BY
        category
    """


@task(task_run_name="values-for-target-{level:02d}")
def get_values_for_targets(
    connection: Connection,
    level: Level,
    timespan: IndicatorTimeSpan,
    indexes: List[UUID],
) -> pd.DataFrame:
    """Sum session durations (in hours) by power category for the given
    level, timespan and target indexes."""
    query_template = Template(DURATION_FOR_LEVEL_QUERY_TEMPLATE)
    query_params = {"indexes": ",".join(f"'{i}'" for i in map(str, indexes))}
    query_params |= POWER_RANGE_CTE
    query_params |= get_num_for_level_query_params(level)
    query_params |= get_timespan_filter_query_params(timespan, session=True)
    return pd.read_sql_query(query_template.substitute(query_params), con=connection)


@flow(
    task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS),
    flow_run_name="u6-{timespan.period.value}-{level:02d}-{timespan.start:%y-%m-%d}",
)
def u6_for_level(
    level: Level,
    timespan: IndicatorTimeSpan,
    chunk_size=settings.DEFAULT_CHUNK_SIZE,
) -> pd.DataFrame:
    """Calculate u6 (session duration by power category) for a level and a
    timestamp."""
    if level == Level.NATIONAL:
        return u6_national(timespan)
    engine = get_database_engine()
    with engine.connect() as connection:
        targets = get_targets_for_level(connection, level)
        ids = targets["id"]
        # Fan out one task per chunk of roughly chunk_size target ids.
        chunks = (
            np.array_split(ids, int(len(ids) / chunk_size))
            if len(ids) > chunk_size
            else [ids.to_numpy()]
        )
        futures = [
            get_values_for_targets.submit(connection, level, timespan, chunk)  # type: ignore[call-overload]
            for chunk in chunks
        ]
        wait(futures)

    # Concatenate results and serialize indicators
    results = pd.concat([future.result() for future in futures], ignore_index=True)
    merged = targets.merge(results, how="left", left_on="id", right_on="level_id")

    # Build result DataFrame
    indicators = {
        "target": merged["code"],
        "value": merged["value"].fillna(0),
        "code": "u6",
        "level": level,
        "period": timespan.period,
        "timestamp": timespan.start.isoformat(),
        "category": merged["category"].astype("str"),
        "extras": None,
    }
    return pd.DataFrame(indicators)


@flow(
    task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS),
    flow_run_name="u6-{timespan.period.value}-00-{timespan.start:%y-%m-%d}",
)
def u6_national(timespan: IndicatorTimeSpan) -> pd.DataFrame:
    """Calculate u6 at the national level."""
    engine = get_database_engine()
    query_template = Template(QUERY_NATIONAL_TEMPLATE)
    query_params = get_timespan_filter_query_params(timespan, session=True)
    query_params |= POWER_RANGE_CTE
    with engine.connect() as connection:
        res = pd.read_sql_query(query_template.substitute(query_params), con=connection)
    indicators = {
        "target": None,
        "value": res["value"].fillna(0),
        "code": "u6",
        "level": Level.NATIONAL,
        "period": timespan.period,
        "timestamp": timespan.start.isoformat(),
        "category": res["category"].astype("str"),
        "extras": None,
    }
    return pd.DataFrame(indicators)


@flow(
    task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS),
    flow_run_name="meta-u6-{timespan.period.value}",
)
def calculate(
    timespan: IndicatorTimeSpan,
    levels: List[Level],
    create_artifact: bool = False,
    chunk_size: int = 1000,
    format_pd: bool = False,
) -> List[Indicator]:
    """Run all u6 subflows and export the concatenated indicators."""
    subflows_results = [
        u6_for_level(level, timespan, chunk_size=chunk_size) for level in levels
    ]
    indicators = pd.concat(subflows_results, ignore_index=True)
    description = f"u6 report at {timespan.start} (period: {timespan.period.value})"
    flow_name = runtime.flow_run.name
    return export_indic(indicators, create_artifact, flow_name, description, format_pd)
diff --git a/src/prefect/indicators/usage/u9.py b/src/prefect/indicators/usage/u9.py
new file mode 100644
index 00000000..bb087a8e
--- /dev/null
+++ b/src/prefect/indicators/usage/u9.py
@@ -0,0 +1,193 @@
"""QualiCharge prefect indicators: usage.

U9: energy by power category.
+""" + +from string import Template +from typing import List +from uuid import UUID + +import numpy as np +import pandas as pd # type: ignore +from prefect import flow, runtime, task +from prefect.futures import wait +from prefect.task_runners import ThreadPoolTaskRunner +from sqlalchemy.engine import Connection + +from ..conf import settings +from ..models import Indicator, IndicatorTimeSpan, Level +from ..utils import ( + POWER_RANGE_CTE, + export_indic, + get_database_engine, + get_num_for_level_query_params, + get_targets_for_level, + get_timespan_filter_query_params, +) + +ENERGY_FOR_LEVEL_QUERY_TEMPLATE = """ + WITH + $power_range, + sessionf AS ( + SELECT + point_de_charge_id, + sum(energy) as energy_pdc + FROM + session + WHERE + $timespan + GROUP BY + point_de_charge_id + ) + SELECT + sum(energy_pdc) AS value, + category, + $level_id AS level_id + FROM + sessionf + INNER JOIN statique ON point_de_charge_id = pdc_id + --INNER JOIN PointDeCharge ON sessionf.point_de_charge_id = PointDeCharge.id + --LEFT JOIN Station ON station_id = Station.id + --LEFT JOIN Localisation ON localisation_id = Localisation.id + LEFT JOIN City ON City.code = code_insee_commune + LEFT JOIN puissance ON puissance_nominale::numeric <@ category + $join_extras + WHERE + $level_id IN ($indexes) + GROUP BY + $level_id, + category + """ + +QUERY_NATIONAL_TEMPLATE = """ + WITH + $power_range, + sessionf AS ( + SELECT + point_de_charge_id, + sum(energy) as energy_pdc + FROM + session + WHERE + $timespan + GROUP BY + point_de_charge_id + ) + SELECT + sum(energy_pdc) AS value, + category + FROM + sessionf + INNER JOIN statique ON point_de_charge_id = pdc_id + --INNER JOIN PointDeCharge ON sessionf.point_de_charge_id = PointDeCharge.id + LEFT JOIN puissance ON puissance_nominale::numeric <@ category + GROUP BY + category + """ + + +@task(task_run_name="values-for-target-{level:02d}") +def get_values_for_targets( + connection: Connection, + level: Level, + timespan: IndicatorTimeSpan, + 
indexes: List[UUID], +) -> pd.DataFrame: + """Fetch sessions given input level, timestamp and target index.""" + query_template = Template(ENERGY_FOR_LEVEL_QUERY_TEMPLATE) + query_params = {"indexes": ",".join(f"'{i}'" for i in map(str, indexes))} + query_params |= POWER_RANGE_CTE + query_params |= get_num_for_level_query_params(level) + query_params |= get_timespan_filter_query_params(timespan, session=True) + return pd.read_sql_query(query_template.substitute(query_params), con=connection) + + +@flow( + task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS), + flow_run_name="u9-{timespan.period.value}-{level:02d}-{timespan.start:%y-%m-%d}", +) +def u9_for_level( + level: Level, + timespan: IndicatorTimeSpan, + chunk_size=settings.DEFAULT_CHUNK_SIZE, +) -> pd.DataFrame: + """Calculate u9 for a level and a timestamp.""" + if level == Level.NATIONAL: + return u9_national(timespan) + engine = get_database_engine() + with engine.connect() as connection: + targets = get_targets_for_level(connection, level) + ids = targets["id"] + chunks = ( + np.array_split(ids, int(len(ids) / chunk_size)) + if len(ids) > chunk_size + else [ids.to_numpy()] + ) + futures = [ + get_values_for_targets.submit(connection, level, timespan, chunk) # type: ignore[call-overload] + for chunk in chunks + ] + wait(futures) + + # Concatenate results and serialize indicators + results = pd.concat([future.result() for future in futures], ignore_index=True) + merged = targets.merge(results, how="left", left_on="id", right_on="level_id") + + # Build result DataFrame + indicators = { + "target": merged["code"], + "value": merged["value"].fillna(0), + "code": "u9", + "level": level, + "period": timespan.period, + "timestamp": timespan.start.isoformat(), + "category": merged["category"].astype("str"), + "extras": None, + } + return pd.DataFrame(indicators) + + +@flow( + task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS), + 
flow_run_name="u9-{timespan.period.value}-00-{timespan.start:%y-%m-%d}", +) +def u9_national(timespan: IndicatorTimeSpan) -> pd.DataFrame: + """Calculate u9 at the national level.""" + engine = get_database_engine() + query_template = Template(QUERY_NATIONAL_TEMPLATE) + query_params = get_timespan_filter_query_params(timespan, session=True) + query_params |= POWER_RANGE_CTE + with engine.connect() as connection: + res = pd.read_sql_query(query_template.substitute(query_params), con=connection) + indicators = { + "target": None, + "value": res["value"].fillna(0), + "code": "u9", + "level": Level.NATIONAL, + "period": timespan.period, + "timestamp": timespan.start.isoformat(), + "category": res["category"].astype("str"), + "extras": None, + } + return pd.DataFrame(indicators) + + +@flow( + task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS), + flow_run_name="meta-u9-{timespan.period.value}", +) +def calculate( + timespan: IndicatorTimeSpan, + levels: List[Level], + create_artifact: bool = False, + chunk_size: int = 1000, + format_pd: bool = False, +) -> List[Indicator]: + """Run all u9 subflows.""" + subflows_results = [ + u9_for_level(level, timespan, chunk_size=chunk_size) for level in levels + ] + indicators = pd.concat(subflows_results, ignore_index=True) + description = f"u9 report at {timespan.start} (period: {timespan.period.value})" + flow_name = runtime.flow_run.name + return export_indic(indicators, create_artifact, flow_name, description, format_pd) diff --git a/src/prefect/indicators/utils.py b/src/prefect/indicators/utils.py index bbcfb270..321ecd8f 100644 --- a/src/prefect/indicators/utils.py +++ b/src/prefect/indicators/utils.py @@ -3,27 +3,44 @@ Common indicators functions and constants. 
""" +from string import Template + import pandas as pd # type: ignore from prefect import task +from prefect.artifacts import create_markdown_artifact from sqlalchemy import create_engine from sqlalchemy.engine import Connection, Engine from .conf import settings -from .models import Level +from .models import Indicator, IndicatorTimeSpan, Level, PeriodDuration -POWER_RANGE_CTE = """ +POWER_RANGE_CTE = { + "power_range": """ puissance(category, p_cat) AS ( VALUES - (numrange(0, 15.0), 1), - (numrange(15.0, 26.0), 2), - (numrange(26, 65.0), 3), - (numrange(65, 175.0), 4), - (numrange(175, 360.0), 5), - (numrange(360, NULL), 6) + (numrange(0.0, 7.4), 1), + (numrange(7.4, 22.0), 2), + (numrange(22.0, 50.0), 3), + (numrange(50, 150.0), 4), + (numrange(150, 350.0), 5), + (numrange(350, NULL), 6) )""" +} + + +def get_timespan_filter_query_params(timespan: IndicatorTimeSpan, session: bool = True): + """Get timespan query parameters.""" + date_end = timespan.start + PeriodDuration[timespan.period.name].value + sql_start = "'" + timespan.start.isoformat(sep=" ") + "'" + sql_end = "'" + date_end.isoformat(sep=" ") + "'" + interval_session = "start >= timestamp $start AND start < timestamp $end" + interval_status = "horodatage >= timestamp $start AND horodatage < timestamp $end" + interval = interval_session if session else interval_status + query_params = {"start": sql_start, "end": sql_end} + return {"timespan": Template(interval).substitute(query_params)} -def get_num_for_level_query_params(level): +def get_num_for_level_query_params(level: Level): """Get level_id and join_extras query parameters.""" match level: case Level.CITY: @@ -64,3 +81,20 @@ def get_targets_for_level(connection: Connection, level: Level) -> pd.DataFrame: if level == Level.NATIONAL: raise NotImplementedError("Unsupported level %d", level) return pd.read_sql_table(level.name.lower(), con=connection) + + +def export_indic( + indicators: pd.DataFrame, + create_artifact: bool, + flow_name: str, + 
description: str, + format_pd: bool, +): + """Export indicators.""" + if format_pd: + return indicators + if create_artifact: + create_markdown_artifact( + key=flow_name, markdown=indicators.to_markdown(), description=description + ) + return [Indicator(**record) for record in indicators.to_dict(orient="records")] # type: ignore[misc] diff --git a/src/prefect/prefect.yaml b/src/prefect/prefect.yaml index b9495a19..0f041db5 100644 --- a/src/prefect/prefect.yaml +++ b/src/prefect/prefect.yaml @@ -17,6 +17,37 @@ deployments: work_pool: name: indicators work_queue_name: default + + - name: i4-daily + entrypoint: indicators/run.py:i4_calculate + concurrency_limit: 10 + schedules: + - cron: "2 14 * * *" + timezone: "Europe/Paris" + active: true + parameters: + period: d + create_artifact: true + chunk_size: 1000 + work_pool: + name: indicators + work_queue_name: default + + - name: i7-daily + entrypoint: indicators/run.py:i7_calculate + concurrency_limit: 10 + schedules: + - cron: "4 14 * * *" + timezone: "Europe/Paris" + active: true + parameters: + period: d + create_artifact: true + chunk_size: 1000 + work_pool: + name: indicators + work_queue_name: default + - name: t1-daily entrypoint: indicators/run.py:t1_calculate concurrency_limit: 10 @@ -31,3 +62,138 @@ deployments: work_pool: name: indicators work_queue_name: default + + - name: c1-daily + entrypoint: indicators/run.py:c1_calculate + concurrency_limit: 10 + schedules: + - cron: "0 15 * * *" + timezone: "Europe/Paris" + active: true + parameters: + period: d + create_artifact: true + chunk_size: 1000 + work_pool: + name: indicators + work_queue_name: default + + - name: c2-daily + entrypoint: indicators/run.py:c2_calculate + concurrency_limit: 10 + schedules: + - cron: "0 15 * * *" + timezone: "Europe/Paris" + active: true + parameters: + period: d + create_artifact: true + chunk_size: 1000 + work_pool: + name: indicators + work_queue_name: default + + - name: u5-daily + entrypoint: 
indicators/run.py:u5_calculate
+    concurrency_limit: 10
+    schedules:
+      - cron: "4 15 * * *"
+        timezone: "Europe/Paris"
+        active: true
+    parameters:
+      period: d
+      create_artifact: true
+      chunk_size: 1000
+    work_pool:
+      name: indicators
+      work_queue_name: default
+
+  - name: u6-daily
+    entrypoint: indicators/run.py:u6_calculate
+    concurrency_limit: 10
+    schedules:
+      - cron: "5 15 * * *"
+        timezone: "Europe/Paris"
+        active: true
+    parameters:
+      period: d
+      create_artifact: true
+      chunk_size: 1000
+    work_pool:
+      name: indicators
+      work_queue_name: default
+
+  - name: u9-daily
+    entrypoint: indicators/run.py:u9_calculate
+    concurrency_limit: 10
+    schedules:
+      - cron: "6 15 * * *"
+        timezone: "Europe/Paris"
+        active: true
+    parameters:
+      period: d
+      create_artifact: true
+      chunk_size: 1000
+    work_pool:
+      name: indicators
+      work_queue_name: default
+
+  - name: u10-daily
+    entrypoint: indicators/run.py:u10_calculate
+    concurrency_limit: 10
+    schedules:
+      - cron: "8 15 * * *"
+        timezone: "Europe/Paris"
+        active: true
+    parameters:
+      period: d
+      create_artifact: true
+      chunk_size: 1000
+    work_pool:
+      name: indicators
+      work_queue_name: default
+
+  - name: u11-daily
+    entrypoint: indicators/run.py:u11_calculate
+    concurrency_limit: 10
+    schedules:
+      - cron: "10 15 * * *"
+        timezone: "Europe/Paris"
+        active: true
+    parameters:
+      period: d
+      create_artifact: true
+      chunk_size: 1000
+    work_pool:
+      name: indicators
+      work_queue_name: default
+
+  - name: u12-daily
+    entrypoint: indicators/run.py:u12_calculate
+    concurrency_limit: 10
+    schedules:
+      - cron: "12 15 * * *"
+        timezone: "Europe/Paris"
+        active: true
+    parameters:
+      period: d
+      create_artifact: true
+      chunk_size: 1000
+    work_pool:
+      name: indicators
+      work_queue_name: default
+
+- name: u13-daily
+  entrypoint: indicators/run.py:u13_calculate
+  concurrency_limit: 10
+  schedules:
+    - cron: "14 15 * * *"
+      timezone: "Europe/Paris"
+      active: true
+  parameters:
+    period: d
+    create_artifact: true
+    chunk_size: 1000
+  work_pool:
+ name: indicators + work_queue_name: default diff --git a/src/prefect/tests/infrastructure/test_i1.py b/src/prefect/tests/infrastructure/test_i1.py index 265b9062..292eb086 100644 --- a/src/prefect/tests/infrastructure/test_i1.py +++ b/src/prefect/tests/infrastructure/test_i1.py @@ -3,67 +3,31 @@ I1: the number of publicly open points of charge. """ -import pandas as pd # type: ignore +from datetime import datetime + import pytest # type: ignore from sqlalchemy import text from indicators.infrastructure import i1 # type: ignore -from indicators.models import IndicatorPeriod, Level # type: ignore +from indicators.models import IndicatorPeriod, IndicatorTimeSpan, Level # type: ignore -PARAMETERS_FLOW = [ - ( - Level.CITY, - "SELECT COUNT(*) FROM City", - ["75056", "13055", "69123"], - 212, - ), - ( - Level.EPCI, - "SELECT COUNT(*) FROM EPCI", - ["200054781", "200054807", "200046977"], - 2257, - ), - ( - Level.DEPARTMENT, - "SELECT COUNT(*) FROM Department", - ["59", "75", "13"], - 1493, - ), - ( - Level.REGION, - "SELECT COUNT(*) FROM Region", - ["11", "84", "75"], - 8734, - ), -] -PARAMETERS_GET_VALUES = [ - ( - Level.CITY, - "SELECT id FROM City WHERE name IN ('Paris', 'Marseille', 'Lyon')", - 212, - ), - ( - Level.EPCI, - "SELECT id FROM EPCI WHERE code IN ('200054781', '200054807', '200046977')", - 2257, - ), - ( - Level.DEPARTMENT, - "SELECT id FROM Department WHERE code IN ('59', '75', '13')", - 1493, - ), - ( - Level.REGION, - "SELECT id FROM Region WHERE code IN ('11', '84', '75')", - 8734, - ), -] -PARAMETERS_CHUNK = [10, 50, 100, 500] -PERIOD = IndicatorPeriod.DAY +from ..param_tests import ( + PARAM_FLOW, + PARAM_VALUE, + PARAMETERS_CHUNK, +) + +# expected result for level [city, epci, dpt, reg] +N_LEVEL = [212, 2250, 1489, 8724] N_DPTS = 109 +N_NAT_REG_DPT_EPCI_CITY = 36465 +TIMESPAN = IndicatorTimeSpan(start=datetime.now(), period=IndicatorPeriod.DAY) +PARAMETERS_FLOW = [prm + (lvl,) for prm, lvl in zip(PARAM_FLOW, N_LEVEL, strict=True)] 
+PARAMETERS_VALUE = [prm + (lvl,) for prm, lvl in zip(PARAM_VALUE, N_LEVEL, strict=True)] -@pytest.mark.parametrize("level,query,expected", PARAMETERS_GET_VALUES) + +@pytest.mark.parametrize("level,query,expected", PARAMETERS_VALUE) def test_task_get_values_for_target(db_connection, level, query, expected): """Test the `get_values_for_target` task.""" result = db_connection.execute(text(query)) @@ -82,8 +46,7 @@ def test_task_get_values_for_target_unexpected_level(db_connection): @pytest.mark.parametrize("level,query,targets,expected", PARAMETERS_FLOW) def test_flow_i1_for_level(db_connection, level, query, targets, expected): """Test the `i1_for_level` flow.""" - now = pd.Timestamp.now() - indicators = i1.i1_for_level(level, PERIOD, now, chunk_size=1000) + indicators = i1.i1_for_level(level, TIMESPAN, chunk_size=1000) assert len(indicators) == db_connection.execute(text(query)).scalars().one() assert indicators.loc[indicators["target"].isin(targets), "value"].sum() == expected @@ -91,9 +54,8 @@ def test_flow_i1_for_level(db_connection, level, query, targets, expected): @pytest.mark.parametrize("chunk_size", PARAMETERS_CHUNK) def test_flow_i1_for_level_with_various_chunk_sizes(chunk_size): """Test the `i1_for_level` flow with various chunk sizes.""" - now = pd.Timestamp.now() level, query, targets, expected = PARAMETERS_FLOW[2] - indicators = i1.i1_for_level(level, PERIOD, now, chunk_size=chunk_size) + indicators = i1.i1_for_level(level, TIMESPAN, chunk_size=chunk_size) assert len(indicators) == N_DPTS assert indicators.loc[indicators["target"].isin(targets), "value"].sum() == expected @@ -102,23 +64,47 @@ def test_flow_i1_national(db_connection): """Test the `i1_national` flow.""" result = db_connection.execute(text("SELECT COUNT(id) FROM PointDeCharge")) expected = result.scalars().one() - indicators = i1.i1_national(PERIOD, pd.Timestamp.now()) + indicators = i1.i1_national(TIMESPAN) assert indicators.at[0, "value"] == expected -def 
test_flow_calculate(db_connection): +def test_flow_i1_calculate(db_connection): """Test the `calculate` flow.""" - result = db_connection.execute( - text( - """ - SELECT - (SELECT COUNT(*) AS region_count FROM Region), - (SELECT COUNT(*) AS department_count FROM Department), - (SELECT COUNT(*) AS epci_count FROM EPCI), - (SELECT COUNT(*) AS city_count FROM City) - """ - ) - ) - expected = sum(result.one()) + 1 - indicators = i1.calculate(PERIOD, create_artifact=True) + expected = N_NAT_REG_DPT_EPCI_CITY + all_levels = [ + Level.NATIONAL, + Level.REGION, + Level.DEPARTMENT, + Level.CITY, + Level.EPCI, + ] + indicators = i1.calculate(TIMESPAN, all_levels, create_artifact=True) assert len(indicators) == expected + + +# query used to get N_LEVEL +N_LEVEL_NAT = """ +SELECT + count(*) AS value +FROM + PointDeCharge +""" +N_LEVEL_3 = """ +SELECT + sum(value) +FROM + ( + SELECT + COUNT(DISTINCT PointDeCharge.id_pdc_itinerance) AS value + FROM + PointDeCharge + INNER JOIN Station ON PointDeCharge.station_id = Station.id + INNER JOIN Localisation ON Station.localisation_id = Localisation.id + INNER JOIN City ON Localisation.code_insee_commune = City.code + INNER JOIN Department ON City.department_id = Department.id + INNER JOIN Region ON Department.region_id = Region.id + WHERE + region.code IN ('11', '84', '75') + GROUP BY region.code + ) AS query +""" diff --git a/src/prefect/tests/infrastructure/test_i4.py b/src/prefect/tests/infrastructure/test_i4.py new file mode 100644 index 00000000..22ba9217 --- /dev/null +++ b/src/prefect/tests/infrastructure/test_i4.py @@ -0,0 +1,82 @@ +"""QualiCharge prefect indicators tests: infrastructure. + +I4: the number of publicly open points of charge. 
+""" + +from datetime import datetime + +import pytest # type: ignore +from sqlalchemy import text + +from indicators.infrastructure import i4 # type: ignore +from indicators.models import IndicatorPeriod, IndicatorTimeSpan, Level # type: ignore + +from ..param_tests import ( + PARAM_FLOW, + PARAM_VALUE, + PARAMETERS_CHUNK, +) + +# expected result +N_LEVEL = [65, 1068, 419, 3786] +N_DPTS = 109 +N_NAT_REG_DPT_EPCI_CITY = 36465 + +TIMESPAN = IndicatorTimeSpan(start=datetime.now(), period=IndicatorPeriod.DAY) +PARAMETERS_FLOW = [prm + (lvl,) for prm, lvl in zip(PARAM_FLOW, N_LEVEL, strict=True)] +PARAMETERS_VALUE = [prm + (lvl,) for prm, lvl in zip(PARAM_VALUE, N_LEVEL, strict=True)] + + +@pytest.mark.parametrize("level,query,expected", PARAMETERS_VALUE) +def test_task_get_values_for_target(db_connection, level, query, expected): + """Test the `get_values_for_target` task.""" + result = db_connection.execute(text(query)) + indexes = list(result.scalars().all()) + values = i4.get_values_for_targets.fn(db_connection, level, indexes) + assert len(values) == len(indexes) + assert values["value"].sum() == expected + + +def test_task_get_values_for_target_unexpected_level(db_connection): + """Test the `get_values_for_target` task (unknown level).""" + with pytest.raises(NotImplementedError, match="Unsupported level"): + i4.get_values_for_targets.fn(db_connection, Level.NATIONAL, []) + + +@pytest.mark.parametrize("level,query,targets,expected", PARAMETERS_FLOW) +def test_flow_i4_for_level(db_connection, level, query, targets, expected): + """Test the `i4_for_level` flow.""" + indicators = i4.i4_for_level(level, TIMESPAN, chunk_size=1000) + assert len(indicators) == db_connection.execute(text(query)).scalars().one() + assert indicators.loc[indicators["target"].isin(targets), "value"].sum() == expected + + +@pytest.mark.parametrize("chunk_size", PARAMETERS_CHUNK) +def test_flow_i4_for_level_with_various_chunk_sizes(chunk_size): + """Test the `i4_for_level` flow with various 
chunk sizes.""" + level, query, targets, expected = PARAMETERS_FLOW[2] + indicators = i4.i4_for_level(level, TIMESPAN, chunk_size=chunk_size) + assert len(indicators) == N_DPTS + assert indicators.loc[indicators["target"].isin(targets), "value"].sum() == expected + + +def test_flow_i4_national(db_connection): + """Test the `i4_national` flow.""" + result = db_connection.execute(text("SELECT COUNT(id) FROM Station")) + expected = result.scalars().one() + indicators = i4.i4_national(TIMESPAN) + assert indicators.at[0, "value"] == expected + + +def test_flow_i4_calculate(db_connection): + """Test the `calculate` flow.""" + expected = N_NAT_REG_DPT_EPCI_CITY + all_levels = [ + Level.NATIONAL, + Level.REGION, + Level.DEPARTMENT, + Level.CITY, + Level.EPCI, + ] + indicators = i4.calculate(TIMESPAN, all_levels, create_artifact=True) + assert len(indicators) == expected diff --git a/src/prefect/tests/infrastructure/test_i7.py b/src/prefect/tests/infrastructure/test_i7.py new file mode 100644 index 00000000..938635df --- /dev/null +++ b/src/prefect/tests/infrastructure/test_i7.py @@ -0,0 +1,90 @@ +"""QualiCharge prefect indicators tests: infrastructure. + +I7: installed power. 
+""" + +from datetime import datetime + +import pytest # type: ignore +from sqlalchemy import text + +from indicators.infrastructure import i7 # type: ignore +from indicators.models import IndicatorPeriod, IndicatorTimeSpan, Level # type: ignore + +from ..param_tests import ( + PARAM_FLOW, + PARAM_VALUE, + PARAMETERS_CHUNK, +) + +# expected result +N_LEVEL = [18998, 137622, 132546, 664670] +N_DPTS = 109 +N_NAT_REG_DPT_EPCI_CITY = 36465 + +TIMESPAN = IndicatorTimeSpan(start=datetime.now(), period=IndicatorPeriod.DAY) +PARAMETERS_FLOW = [prm + (lvl,) for prm, lvl in zip(PARAM_FLOW, N_LEVEL, strict=True)] +PARAMETERS_VALUE = [prm + (lvl,) for prm, lvl in zip(PARAM_VALUE, N_LEVEL, strict=True)] + + +@pytest.mark.parametrize("level,query,expected", PARAMETERS_VALUE) +def test_task_get_values_for_target(db_connection, level, query, expected): + """Test the `get_values_for_target` task.""" + result = db_connection.execute(text(query)) + indexes = list(result.scalars().all()) + values = i7.get_values_for_targets.fn(db_connection, level, indexes) + assert len(values) == len(indexes) + assert int(values["value"].sum()) == expected + + +def test_task_get_values_for_target_unexpected_level(db_connection): + """Test the `get_values_for_target` task (unknown level).""" + with pytest.raises(NotImplementedError, match="Unsupported level"): + i7.get_values_for_targets.fn(db_connection, Level.NATIONAL, []) + + +@pytest.mark.parametrize("level,query,targets,expected", PARAMETERS_FLOW) +def test_flow_i7_for_level(db_connection, level, query, targets, expected): + """Test the `i7_for_level` flow.""" + indicators = i7.i7_for_level(level, TIMESPAN, chunk_size=1000) + assert len(indicators) == db_connection.execute(text(query)).scalars().one() + assert ( + int(indicators.loc[indicators["target"].isin(targets), "value"].sum()) + == expected + ) + + +@pytest.mark.parametrize("chunk_size", PARAMETERS_CHUNK) +def test_flow_i7_for_level_with_various_chunk_sizes(chunk_size): + """Test the 
`i7_for_level` flow with various chunk sizes.""" + level, query, targets, expected = PARAMETERS_FLOW[2] + indicators = i7.i7_for_level(level, TIMESPAN, chunk_size=chunk_size) + assert len(indicators) == N_DPTS + assert ( + int(indicators.loc[indicators["target"].isin(targets), "value"].sum()) + == expected + ) + + +def test_flow_i7_national(db_connection): + """Test the `i7_national` flow.""" + result = db_connection.execute( + text("SELECT sum(puissance_nominale) FROM pointdecharge") + ) + expected = int(result.scalars().one()) + indicators = i7.i7_national(TIMESPAN) + assert int(indicators.at[0, "value"]) == expected + + +def test_flow_i7_calculate(db_connection): + """Test the `calculate` flow.""" + expected = N_NAT_REG_DPT_EPCI_CITY + all_levels = [ + Level.NATIONAL, + Level.REGION, + Level.DEPARTMENT, + Level.CITY, + Level.EPCI, + ] + indicators = i7.calculate(TIMESPAN, all_levels, create_artifact=True) + assert len(indicators) == expected diff --git a/src/prefect/tests/infrastructure/test_t1.py b/src/prefect/tests/infrastructure/test_t1.py index f7910d4a..e64894cb 100644 --- a/src/prefect/tests/infrastructure/test_t1.py +++ b/src/prefect/tests/infrastructure/test_t1.py @@ -3,66 +3,31 @@ T1: the number of publicly open points of charge by power level. 
""" -import pandas as pd # type: ignore +from datetime import datetime + import pytest # type: ignore from sqlalchemy import text from indicators.infrastructure import t1 # type: ignore -from indicators.models import IndicatorPeriod, Level # type: ignore +from indicators.models import IndicatorPeriod, IndicatorTimeSpan, Level # type: ignore -PARAMETERS_FLOW = [ - ( - Level.CITY, - "SELECT COUNT(*) FROM City", - ["75056", "13055", "69123"], - 212, - ), - ( - Level.EPCI, - "SELECT COUNT(*) FROM EPCI", - ["200054781", "200054807", "200046977"], - 2257, - ), - ( - Level.DEPARTMENT, - "SELECT COUNT(*) FROM Department", - ["59", "75", "13"], - 1493, - ), - ( - Level.REGION, - "SELECT COUNT(*) FROM Region", - ["11", "84", "75"], - 8734, - ), -] -PARAMETERS_GET_VALUES = [ - ( - Level.CITY, - "SELECT id FROM City WHERE name IN ('Paris', 'Marseille', 'Lyon')", - 212, - ), - ( - Level.EPCI, - "SELECT id FROM EPCI WHERE code IN ('200054781', '200054807', '200046977')", - 2257, - ), - ( - Level.DEPARTMENT, - "SELECT id FROM Department WHERE code IN ('59', '75', '13')", - 1493, - ), - ( - Level.REGION, - "SELECT id FROM Region WHERE code IN ('11', '84', '75')", - 8734, - ), -] -PARAMETERS_CHUNK = [10, 50, 100, 500] -PERIOD = IndicatorPeriod.DAY +from ..param_tests import ( + PARAM_FLOW, + PARAM_VALUE, + PARAMETERS_CHUNK, +) + +# expected result +N_LEVEL = [212, 2250, 1489, 8724] +N_DPTS = 109 +N_NAT_REG_DPT_EPCI_CITY = 36465 +TIMESPAN = IndicatorTimeSpan(start=datetime.now(), period=IndicatorPeriod.DAY) +PARAMETERS_FLOW = [prm + (lvl,) for prm, lvl in zip(PARAM_FLOW, N_LEVEL, strict=True)] +PARAMETERS_VALUE = [prm + (lvl,) for prm, lvl in zip(PARAM_VALUE, N_LEVEL, strict=True)] -@pytest.mark.parametrize("level,query,expected", PARAMETERS_GET_VALUES) + +@pytest.mark.parametrize("level,query,expected", PARAMETERS_VALUE) def test_task_get_values_for_target(db_connection, level, query, expected): """Test the `get_values_for_target` task.""" result = 
db_connection.execute(text(query)) @@ -81,8 +46,7 @@ def test_task_get_values_for_target_unexpected_level(db_connection): @pytest.mark.parametrize("level,query,targets,expected", PARAMETERS_FLOW) def test_flow_t1_for_level(db_connection, level, query, targets, expected): """Test the `t1_for_level` flow.""" - now = pd.Timestamp.now() - indicators = t1.t1_for_level(level, PERIOD, now, chunk_size=1000) + indicators = t1.t1_for_level(level, TIMESPAN, chunk_size=1000) # assert len(indicators) == db_connection.execute(text(query)).scalars().one() assert indicators.loc[indicators["target"].isin(targets), "value"].sum() == expected @@ -91,8 +55,7 @@ def test_flow_t1_for_level(db_connection, level, query, targets, expected): def test_flow_t1_for_level_with_various_chunk_sizes(chunk_size): """Test the `t1_for_level` flow with various chunk sizes.""" level, query, targets, expected = PARAMETERS_FLOW[2] - now = pd.Timestamp.now() - indicators = t1.t1_for_level(level, PERIOD, now, chunk_size=chunk_size) + indicators = t1.t1_for_level(level, TIMESPAN, chunk_size=chunk_size) assert indicators.loc[indicators["target"].isin(targets), "value"].sum() == expected @@ -100,24 +63,21 @@ def test_flow_t1_national(db_connection): """Test the `t1_national` flow.""" query = "SELECT COUNT(*) FROM PointDeCharge WHERE puissance_nominale::numeric >= 0" expected = db_connection.execute(text(query)).scalars().one() - indicators = t1.t1_national(PERIOD, pd.Timestamp.now()) + indicators = t1.t1_national(TIMESPAN) assert indicators["value"].sum() == expected -def test_flow_calculate(db_connection): +def test_flow_t1_calculate(db_connection): """Test the `calculate` flow.""" - now = pd.Timestamp.now() - expected = sum( - [ - t1.t1_for_level(Level.CITY, PERIOD, now, chunk_size=1000)["value"].sum(), - t1.t1_for_level(Level.EPCI, PERIOD, now, chunk_size=1000)["value"].sum(), - t1.t1_for_level(Level.DEPARTMENT, PERIOD, now, chunk_size=1000)[ - "value" - ].sum(), - t1.t1_for_level(Level.REGION, PERIOD, 
now, chunk_size=1000)["value"].sum(), - t1.t1_national(PERIOD, now)["value"].sum(), - ] + all_levels = [ + Level.NATIONAL, + Level.REGION, + Level.DEPARTMENT, + Level.CITY, + Level.EPCI, + ] + indicators = t1.calculate( + TIMESPAN, all_levels, create_artifact=True, format_pd=True ) - indicators = t1.calculate(PERIOD, create_artifact=True) - pd_indics = pd.DataFrame.from_records([indic.model_dump() for indic in indicators]) - assert pd_indics["value"].sum() == expected + # assert indicators["value"].sum() == expected + assert list(indicators["level"].unique()) == all_levels diff --git a/src/prefect/tests/param_tests.py b/src/prefect/tests/param_tests.py new file mode 100644 index 00000000..dc6303c3 --- /dev/null +++ b/src/prefect/tests/param_tests.py @@ -0,0 +1,53 @@ +"""Common parameters for tests.""" + +from indicators.models import Level # type: ignore + +# number of departments +N_DPTS = 109 + +# cumulate number of national(1), regions, departments, EPCI, city +N_NAT_REG_DPT_EPCI_CITY = 36465 + +PARAMETERS_CHUNK = [10, 50, 100, 500] + +PARAM_FLOW = [ + ( + Level.CITY, + "SELECT COUNT(*) FROM City", + ["75056", "13055", "69123"], + ), + ( + Level.EPCI, + "SELECT COUNT(*) FROM EPCI", + ["200054781", "200054807", "200046977"], + ), + ( + Level.DEPARTMENT, + "SELECT COUNT(*) FROM Department", + ["59", "75", "13"], + ), + ( + Level.REGION, + "SELECT COUNT(*) FROM Region", + ["11", "84", "75"], + ), +] + +PARAM_VALUE = [ + ( + Level.CITY, + "SELECT id FROM City WHERE name IN ('Paris', 'Marseille', 'Lyon')", + ), + ( + Level.EPCI, + "SELECT id FROM EPCI WHERE code IN ('200054781', '200054807', '200046977')", + ), + ( + Level.DEPARTMENT, + "SELECT id FROM Department WHERE code IN ('59', '75', '13')", + ), + ( + Level.REGION, + "SELECT id FROM Region WHERE code IN ('11', '84', '75')", + ), +] diff --git a/src/prefect/tests/usage/__init__.py b/src/prefect/tests/usage/__init__.py new file mode 100644 index 00000000..b11ef4a6 --- /dev/null +++ 
b/src/prefect/tests/usage/__init__.py @@ -0,0 +1 @@ +"""QualiCharge prefect indicators tests: usage.""" diff --git a/src/prefect/tests/usage/test_c1.py b/src/prefect/tests/usage/test_c1.py new file mode 100644 index 00000000..0ee050b3 --- /dev/null +++ b/src/prefect/tests/usage/test_c1.py @@ -0,0 +1,106 @@ +"""QualiCharge prefect indicators tests: usage. + +C1: Number of sessions by operator. +""" + +from datetime import datetime + +import pytest # type: ignore +from sqlalchemy import text + +from indicators.models import IndicatorPeriod, IndicatorTimeSpan, Level # type: ignore +from indicators.usage import c1 # type: ignore + +from ..param_tests import ( + PARAM_FLOW, + PARAM_VALUE, + PARAMETERS_CHUNK, +) + +# expected result for level [city, epci, dpt, reg] +N_LEVEL = [32, 307, 172, 1055] +N_LEVEL_NATIONAL = 2718 + +TIMESPAN = IndicatorTimeSpan(start=datetime(2024, 12, 24), period=IndicatorPeriod.DAY) +PARAMETERS_FLOW = [prm + (lvl,) for prm, lvl in zip(PARAM_FLOW, N_LEVEL, strict=True)] +PARAMETERS_VALUE = [prm + (lvl,) for prm, lvl in zip(PARAM_VALUE, N_LEVEL, strict=True)] + + +@pytest.mark.parametrize("level,query,expected", PARAMETERS_VALUE) +def test_task_get_values_for_target(db_connection, level, query, expected): + """Test the `get_values_for_target` task.""" + result = db_connection.execute(text(query)) + indexes = list(result.scalars().all()) + values = c1.get_values_for_targets.fn(db_connection, level, TIMESPAN, indexes) + assert len(set(values["level_id"])) == len(indexes) + assert values["value"].sum() == expected + + +def test_task_get_values_for_target_unexpected_level(db_connection): + """Test the `get_values_for_target` task (unknown level).""" + with pytest.raises(NotImplementedError, match="Unsupported level"): + c1.get_values_for_targets.fn(db_connection, Level.NATIONAL, TIMESPAN, []) + + +@pytest.mark.parametrize("level,query,targets,expected", PARAMETERS_FLOW) +def test_flow_c1_for_level(db_connection, level, query, targets, expected): + 
"""Test the `c1_for_level` flow.""" + indicators = c1.c1_for_level(level, TIMESPAN, chunk_size=1000) + # assert len(indicators) == db_connection.execute(text(query)).scalars().one() + assert indicators.loc[indicators["target"].isin(targets), "value"].sum() == expected + + +@pytest.mark.parametrize("chunk_size", PARAMETERS_CHUNK) +def test_flow_c1_for_level_with_various_chunk_sizes(chunk_size): + """Test the `c1_for_level` flow with various chunk sizes.""" + level, query, targets, expected = PARAMETERS_FLOW[2] + indicators = c1.c1_for_level(level, TIMESPAN, chunk_size=chunk_size) + assert indicators.loc[indicators["target"].isin(targets), "value"].sum() == expected + + +def test_flow_c1_national(db_connection): + """Test the `c1_national` flow.""" + indicators = c1.c1_national(TIMESPAN) + assert indicators["value"].sum() == N_LEVEL_NATIONAL + + +def test_flow_c1_calculate(db_connection): + """Test the `calculate` flow.""" + all_levels = [ + Level.NATIONAL, + Level.REGION, + Level.DEPARTMENT, + Level.CITY, + Level.EPCI, + ] + indicators = c1.calculate( + TIMESPAN, all_levels, create_artifact=True, format_pd=True + ) + assert list(indicators["level"].unique()) == all_levels + + +# query used to get N_LEVEL +N_LEVEL_NAT = """ + SELECT + count(*) AS value + FROM + SESSION + INNER JOIN statique ON point_de_charge_id = pdc_id + WHERE + START >= timestamp '2024-12-24' + AND START < timestamp '2024-12-25' +""" +N_LEVEL_3 = """ + SELECT + count(*) AS value + FROM + Session + INNER JOIN statique ON point_de_charge_id = pdc_id + LEFT JOIN City ON City.code = code_insee_commune + INNER JOIN Department ON City.department_id = Department.id + INNER JOIN Region ON Department.region_id = Region.id + WHERE + START >= timestamp '2024-12-24' + AND START < timestamp '2024-12-25' + AND region.code IN ('11', '84', '75') +""" diff --git a/src/prefect/tests/usage/test_c2.py b/src/prefect/tests/usage/test_c2.py new file mode 100644 index 00000000..09cd8cc0 --- /dev/null +++ 
b/src/prefect/tests/usage/test_c2.py @@ -0,0 +1,112 @@ +"""QualiCharge prefect indicators tests: usage. + +C2: Energy cumulate by operator. +""" + +from datetime import datetime + +import pytest # type: ignore +from sqlalchemy import text + +from indicators.models import IndicatorPeriod, IndicatorTimeSpan, Level # type: ignore +from indicators.usage import c2 # type: ignore + +from ..param_tests import ( + PARAM_FLOW, + PARAM_VALUE, + PARAMETERS_CHUNK, +) + +# expected result for level [city, epci, dpt, reg] +N_LEVEL = [1, 10, 5, 34] +N_LEVEL_NATIONAL = 88 + +TIMESPAN = IndicatorTimeSpan(start=datetime(2024, 12, 24), period=IndicatorPeriod.DAY) +PARAMETERS_FLOW = [prm + (lvl,) for prm, lvl in zip(PARAM_FLOW, N_LEVEL, strict=True)] +PARAMETERS_VALUE = [prm + (lvl,) for prm, lvl in zip(PARAM_VALUE, N_LEVEL, strict=True)] + + +@pytest.mark.parametrize("level,query,expected", PARAMETERS_VALUE) +def test_task_get_values_for_target(db_connection, level, query, expected): + """Test the `get_values_for_target` task.""" + result = db_connection.execute(text(query)) + indexes = list(result.scalars().all()) + values = c2.get_values_for_targets.fn(db_connection, level, TIMESPAN, indexes) + assert len(set(values["level_id"])) == len(indexes) + assert int(values["value"].sum()) == expected + + +def test_task_get_values_for_target_unexpected_level(db_connection): + """Test the `get_values_for_target` task (unknown level).""" + with pytest.raises(NotImplementedError, match="Unsupported level"): + c2.get_values_for_targets.fn(db_connection, Level.NATIONAL, TIMESPAN, []) + + +@pytest.mark.parametrize("level,query,targets,expected", PARAMETERS_FLOW) +def test_flow_c2_for_level(db_connection, level, query, targets, expected): + """Test the `c2_for_level` flow.""" + indicators = c2.c2_for_level(level, TIMESPAN, chunk_size=1000) + # assert len(indicators) == db_connection.execute(text(query)).scalars().one() + assert ( + int(indicators.loc[indicators["target"].isin(targets), 
"value"].sum()) + == expected + ) + + +@pytest.mark.parametrize("chunk_size", PARAMETERS_CHUNK) +def test_flow_c2_for_level_with_various_chunk_sizes(chunk_size): + """Test the `c2_for_level` flow with various chunk sizes.""" + level, query, targets, expected = PARAMETERS_FLOW[2] + indicators = c2.c2_for_level(level, TIMESPAN, chunk_size=chunk_size) + assert ( + int(indicators.loc[indicators["target"].isin(targets), "value"].sum()) + == expected + ) + + +def test_flow_c2_national(db_connection): + """Test the `c2_national` flow.""" + indicators = c2.c2_national(TIMESPAN) + assert int(indicators["value"].sum()) == N_LEVEL_NATIONAL + + +def test_flow_c2_calculate(db_connection): + """Test the `calculate` flow.""" + all_levels = [ + Level.NATIONAL, + Level.REGION, + Level.DEPARTMENT, + Level.CITY, + Level.EPCI, + ] + indicators = c2.calculate( + TIMESPAN, all_levels, create_artifact=True, format_pd=True + ) + assert list(indicators["level"].unique()) == all_levels + + +# query used to get N_LEVEL +N_LEVEL_NAT = """ + SELECT + sum(energy) / 1000.0 AS value + FROM + SESSION + INNER JOIN statique ON point_de_charge_id = pdc_id + WHERE + START >= timestamp '2024-12-24' + AND START < timestamp '2024-12-25' +""" +N_LEVEL_3 = """ + SELECT + sum(energy) / 1000.0 AS value + FROM + Session + INNER JOIN statique ON point_de_charge_id = pdc_id + LEFT JOIN City ON City.code = code_insee_commune + INNER JOIN Department ON City.department_id = Department.id + INNER JOIN Region ON Department.region_id = Region.id + WHERE + START >= timestamp '2024-12-24' + AND START < timestamp '2024-12-25' + AND region.code IN ('11', '84', '75') +""" diff --git a/src/prefect/tests/usage/test_u10.py b/src/prefect/tests/usage/test_u10.py new file mode 100644 index 00000000..f2c44cc2 --- /dev/null +++ b/src/prefect/tests/usage/test_u10.py @@ -0,0 +1,81 @@ +"""QualiCharge prefect indicators tests: usage. + +U10: the number of sessions. 
+"""
+
+from datetime import datetime
+
+import pytest # type: ignore
+from sqlalchemy import text
+
+from indicators.models import IndicatorPeriod, IndicatorTimeSpan, Level # type: ignore
+from indicators.usage import u10 # type: ignore
+
+from ..param_tests import (
+    N_DPTS,
+    N_NAT_REG_DPT_EPCI_CITY,
+    PARAM_FLOW,
+    PARAM_VALUE,
+    PARAMETERS_CHUNK,
+)
+
+# expected result for level [city, epci, dpt, reg]
+N_LEVEL = [32, 307, 172, 1055]
+N_LEVEL_NATIONAL = 2718
+
+TIMESPAN = IndicatorTimeSpan(start=datetime(2024, 12, 24), period=IndicatorPeriod.DAY)
+PARAMETERS_FLOW = [prm + (lvl,) for prm, lvl in zip(PARAM_FLOW, N_LEVEL, strict=True)]
+PARAMETERS_VALUE = [prm + (lvl,) for prm, lvl in zip(PARAM_VALUE, N_LEVEL, strict=True)]
+
+
+@pytest.mark.parametrize("level,query,expected", PARAMETERS_VALUE)
+def test_task_get_values_for_target(db_connection, level, query, expected):
+    """Test the `get_values_for_target` task."""
+    result = db_connection.execute(text(query))
+    indexes = list(result.scalars().all())
+    values = u10.get_values_for_targets.fn(db_connection, level, TIMESPAN, indexes)
+    assert len(values) == len(indexes)
+    assert values["value"].sum() == expected
+
+
+def test_task_get_values_for_target_unexpected_level(db_connection):
+    """Test the `get_values_for_target` task (unknown level)."""
+    with pytest.raises(NotImplementedError, match="Unsupported level"):
+        u10.get_values_for_targets.fn(db_connection, Level.NATIONAL, TIMESPAN, [])
+
+
+@pytest.mark.parametrize("level,query,targets,expected", PARAMETERS_FLOW)
+def test_flow_u10_for_level(db_connection, level, query, targets, expected):
+    """Test the `u10_for_level` flow."""
+    indicators = u10.u10_for_level(level, TIMESPAN, chunk_size=1000)
+    assert len(indicators) == db_connection.execute(text(query)).scalars().one()
+    assert indicators.loc[indicators["target"].isin(targets), "value"].sum() == expected
+
+
+@pytest.mark.parametrize("chunk_size", PARAMETERS_CHUNK)
+def 
test_flow_u10_for_level_with_various_chunk_sizes(chunk_size): + """Test the `u10_for_level` flow with various chunk sizes.""" + level, query, targets, expected = PARAMETERS_FLOW[2] + indicators = u10.u10_for_level(level, TIMESPAN, chunk_size=chunk_size) + assert len(indicators) == N_DPTS + assert indicators.loc[indicators["target"].isin(targets), "value"].sum() == expected + + +def test_flow_u10_national(db_connection): + """Test the `u10_national` flow.""" + indicators = u10.u10_national(TIMESPAN) + assert indicators.at[0, "value"] == N_LEVEL_NATIONAL + + +def test_flow_u10_calculate(db_connection): + """Test the `calculate` flow.""" + expected = N_NAT_REG_DPT_EPCI_CITY + all_levels = [ + Level.NATIONAL, + Level.REGION, + Level.DEPARTMENT, + Level.CITY, + Level.EPCI, + ] + indicators = u10.calculate(TIMESPAN, all_levels, create_artifact=True) + assert len(indicators) == expected diff --git a/src/prefect/tests/usage/test_u11.py b/src/prefect/tests/usage/test_u11.py new file mode 100644 index 00000000..a9d64ce0 --- /dev/null +++ b/src/prefect/tests/usage/test_u11.py @@ -0,0 +1,112 @@ +"""QualiCharge prefect indicators tests: usage. + +U11: the number of successful sessions. 
+""" + +from datetime import datetime + +import pytest # type: ignore +from sqlalchemy import text + +from indicators.models import IndicatorPeriod, IndicatorTimeSpan, Level # type: ignore +from indicators.usage import u11 # type: ignore + +from ..param_tests import ( + N_DPTS, + N_NAT_REG_DPT_EPCI_CITY, + PARAM_FLOW, + PARAM_VALUE, + PARAMETERS_CHUNK, +) + +# expected result for level [city, epci, dpt, reg] +N_LEVEL = [32, 301, 166, 1031] +N_LEVEL_NATIONAL = 2639 + +TIMESPAN = IndicatorTimeSpan(start=datetime(2024, 12, 24), period=IndicatorPeriod.DAY) +PARAMETERS_FLOW = [prm + (lvl,) for prm, lvl in zip(PARAM_FLOW, N_LEVEL, strict=True)] +PARAMETERS_VALUE = [prm + (lvl,) for prm, lvl in zip(PARAM_VALUE, N_LEVEL, strict=True)] + + +@pytest.mark.parametrize("level,query,expected", PARAMETERS_VALUE) +def test_task_get_values_for_target(db_connection, level, query, expected): + """Test the `get_values_for_target` task.""" + result = db_connection.execute(text(query)) + indexes = list(result.scalars().all()) + values = u11.get_values_for_targets.fn(db_connection, level, TIMESPAN, indexes) + assert len(values) == len(indexes) + assert values["value"].sum() == expected + + +def test_task_get_values_for_target_unexpected_level(db_connection): + """Test the `get_values_for_target` task (unknown level).""" + with pytest.raises(NotImplementedError, match="Unsupported level"): + u11.get_values_for_targets.fn(db_connection, Level.NATIONAL, TIMESPAN, []) + + +@pytest.mark.parametrize("level,query,targets,expected", PARAMETERS_FLOW) +def test_flow_u11_for_level(db_connection, level, query, targets, expected): + """Test the `u11_for_level` flow.""" + indicators = u11.u11_for_level(level, TIMESPAN, chunk_size=1000) + assert len(indicators) == db_connection.execute(text(query)).scalars().one() + assert indicators.loc[indicators["target"].isin(targets), "value"].sum() == expected + + +@pytest.mark.parametrize("chunk_size", PARAMETERS_CHUNK) +def 
test_flow_u11_for_level_with_various_chunk_sizes(chunk_size): + """Test the `u11_for_level` flow with various chunk sizes.""" + level, query, targets, expected = PARAMETERS_FLOW[2] + indicators = u11.u11_for_level(level, TIMESPAN, chunk_size=chunk_size) + assert len(indicators) == N_DPTS + assert indicators.loc[indicators["target"].isin(targets), "value"].sum() == expected + + +def test_flow_u11_national(db_connection): + """Test the `u11_national` flow.""" + indicators = u11.u11_national(TIMESPAN) + assert indicators.at[0, "value"] == N_LEVEL_NATIONAL + + +def test_flow_u11_calculate(db_connection): + """Test the `calculate` flow.""" + expected = N_NAT_REG_DPT_EPCI_CITY + all_levels = [ + Level.NATIONAL, + Level.REGION, + Level.DEPARTMENT, + Level.CITY, + Level.EPCI, + ] + indicators = u11.calculate(TIMESPAN, all_levels, create_artifact=True) + assert len(indicators) == expected + + +# query used to get N_LEVEL +N_LEVEL_NAT = """ +SELECT + count(*) AS VALUE +FROM + SESSION +WHERE + START >= timestamp '2024-12-24' + AND START < timestamp '2024-12-25' + AND energy > 0.5 + AND SESSION.end - SESSION.start > '3 minutes'::interval +""" +N_LEVEL_3 = """ +SELECT + count(*) AS VALUE +FROM + SESSION + INNER JOIN PointDeCharge ON point_de_charge_id = PointDeCharge.id + LEFT JOIN Station ON station_id = Station.id + LEFT JOIN Localisation ON localisation_id = Localisation.id + LEFT JOIN City ON City.code = code_insee_commune + INNER JOIN Department ON City.department_id = Department.id + INNER JOIN Region ON Department.region_id = Region.id +WHERE + START >= timestamp '2024-12-24' + AND START < timestamp '2024-12-25' + AND energy > 0.5 + AND SESSION.end - SESSION.start > '3 minutes'::interval + AND region.code IN ('11', '84', '75')""" diff --git a/src/prefect/tests/usage/test_u12.py b/src/prefect/tests/usage/test_u12.py new file mode 100644 index 00000000..8c540c9f --- /dev/null +++ b/src/prefect/tests/usage/test_u12.py @@ -0,0 +1,85 @@ +"""QualiCharge prefect indicators 
tests: usage. + +U12: the number of POC in operation. +""" + +from datetime import datetime + +import pytest # type: ignore +from sqlalchemy import text + +from indicators.models import IndicatorPeriod, IndicatorTimeSpan, Level # type: ignore +from indicators.usage import u12 # type: ignore + +from ..param_tests import ( + PARAM_FLOW, + PARAM_VALUE, + PARAMETERS_CHUNK, +) + +# expected result for level [city, epci, dpt, reg] +N_LEVEL = [36, 386, 261, 1462] +N_LEVEL_NATIONAL = 3851 + +TIMESPAN = IndicatorTimeSpan(start=datetime(2024, 12, 24), period=IndicatorPeriod.DAY) +PARAMETERS_FLOW = [prm + (lvl,) for prm, lvl in zip(PARAM_FLOW, N_LEVEL, strict=True)] +PARAMETERS_VALUE = [prm + (lvl,) for prm, lvl in zip(PARAM_VALUE, N_LEVEL, strict=True)] + + +@pytest.mark.parametrize("level,query,expected", PARAMETERS_VALUE) +def test_task_get_values_for_target(db_connection, level, query, expected): + """Test the `get_values_for_target` task.""" + result = db_connection.execute(text(query)) + indexes = list(result.scalars().all()) + poc_by_power = u12.get_values_for_targets.fn( + db_connection, level, TIMESPAN, indexes + ) + assert len(set(poc_by_power["level_id"])) == len(indexes) + assert poc_by_power["value"].sum() == expected + + +def test_task_get_values_for_target_unexpected_level(db_connection): + """Test the `get_values_for_target` task (unknown level).""" + with pytest.raises(NotImplementedError, match="Unsupported level"): + u12.get_values_for_targets.fn(db_connection, Level.NATIONAL, TIMESPAN, []) + + +@pytest.mark.parametrize("level,query,targets,expected", PARAMETERS_FLOW) +def test_flow_u12_for_level(db_connection, level, query, targets, expected): + """Test the `u12_for_level` flow.""" + indicators = u12.u12_for_level(level, TIMESPAN, chunk_size=1000) + # assert len(indicators) == db_connection.execute(text(query)).scalars().one() + assert indicators.loc[indicators["target"].isin(targets), "value"].sum() == expected + + +@pytest.mark.parametrize("chunk_size", 
PARAMETERS_CHUNK) +def test_flow_u12_for_level_with_various_chunk_sizes(chunk_size): + """Test the `u12_for_level` flow with various chunk sizes.""" + level, query, targets, expected = PARAMETERS_FLOW[2] + indicators = u12.u12_for_level(level, TIMESPAN, chunk_size=chunk_size) + # assert len(indicators) == N_DPTS + assert indicators.loc[indicators["target"].isin(targets), "value"].sum() == expected + + +def test_flow_u12_national(db_connection): + """Test the `u12_national` flow.""" + indicators = u12.u12_national(TIMESPAN) + assert indicators["value"].sum() == N_LEVEL_NATIONAL + + +def test_flow_u12_calculate(db_connection): + """Test the `calculate` flow.""" + all_levels = [ + Level.NATIONAL, + Level.REGION, + Level.DEPARTMENT, + Level.CITY, + Level.EPCI, + ] + indicators = u12.calculate( + TIMESPAN, all_levels, create_artifact=True, format_pd=True + ) + assert list(indicators["level"].unique()) == all_levels + + +# query used to get N_LEVEL diff --git a/src/prefect/tests/usage/test_u13.py b/src/prefect/tests/usage/test_u13.py new file mode 100644 index 00000000..e7397e65 --- /dev/null +++ b/src/prefect/tests/usage/test_u13.py @@ -0,0 +1,86 @@ +"""QualiCharge prefect indicators tests: usage. + +U13: cumulative power of POC in operation. 
+""" + +from datetime import datetime + +import pytest # type: ignore +from sqlalchemy import text + +from indicators.models import IndicatorPeriod, IndicatorTimeSpan, Level # type: ignore +from indicators.usage import u13 # type: ignore + +from ..param_tests import ( + N_NAT_REG_DPT_EPCI_CITY, + PARAM_FLOW, + PARAM_VALUE, + PARAMETERS_CHUNK, +) + +# expected result for level [city, epci, dpt, reg] +N_LEVEL = [3079, 25870, 25793, 114966] +N_LEVEL_NATIONAL = 326782 + +TIMESPAN = IndicatorTimeSpan(start=datetime(2024, 12, 24), period=IndicatorPeriod.DAY) +PARAMETERS_FLOW = [prm + (lvl,) for prm, lvl in zip(PARAM_FLOW, N_LEVEL, strict=True)] +PARAMETERS_VALUE = [prm + (lvl,) for prm, lvl in zip(PARAM_VALUE, N_LEVEL, strict=True)] + + +@pytest.mark.parametrize("level,query,expected", PARAMETERS_VALUE) +def test_task_get_values_for_target(db_connection, level, query, expected): + """Test the `get_values_for_target` task.""" + result = db_connection.execute(text(query)) + indexes = list(result.scalars().all()) + values = u13.get_values_for_targets.fn(db_connection, level, TIMESPAN, indexes) + assert len(values) == len(indexes) + assert int(values["value"].sum()) == expected + + +def test_task_get_values_for_target_unexpected_level(db_connection): + """Test the `get_values_for_target` task (unknown level).""" + with pytest.raises(NotImplementedError, match="Unsupported level"): + u13.get_values_for_targets.fn(db_connection, Level.NATIONAL, TIMESPAN, []) + + +@pytest.mark.parametrize("level,query,targets,expected", PARAMETERS_FLOW) +def test_flow_u13_for_level(db_connection, level, query, targets, expected): + """Test the `u13_for_level` flow.""" + indicators = u13.u13_for_level(level, TIMESPAN, chunk_size=1000) + # assert len(indicators) == db_connection.execute(text(query)).scalars().one() + assert ( + int(indicators.loc[indicators["target"].isin(targets), "value"].sum()) + == expected + ) + + +@pytest.mark.parametrize("chunk_size", PARAMETERS_CHUNK) +def 
test_flow_u13_for_level_with_various_chunk_sizes(chunk_size): + """Test the `u13_for_level` flow with various chunk sizes.""" + level, query, targets, expected = PARAMETERS_FLOW[2] + indicators = u13.u13_for_level(level, TIMESPAN, chunk_size=chunk_size) + # assert len(indicators) == N_DPTS + assert ( + int(indicators.loc[indicators["target"].isin(targets), "value"].sum()) + == expected + ) + + +def test_flow_u13_national(db_connection): + """Test the `u13_national` flow.""" + indicators = u13.u13_national(TIMESPAN) + assert int(indicators["value"].sum()) == N_LEVEL_NATIONAL + + +def test_flow_u13_calculate(db_connection): + """Test the `calculate` flow.""" + expected = N_NAT_REG_DPT_EPCI_CITY + all_levels = [ + Level.NATIONAL, + Level.REGION, + Level.DEPARTMENT, + Level.CITY, + Level.EPCI, + ] + indicators = u13.calculate(TIMESPAN, all_levels, create_artifact=True) + assert len(indicators) == expected diff --git a/src/prefect/tests/usage/test_u5.py b/src/prefect/tests/usage/test_u5.py new file mode 100644 index 00000000..00b67dc0 --- /dev/null +++ b/src/prefect/tests/usage/test_u5.py @@ -0,0 +1,88 @@ +"""QualiCharge prefect indicators tests: usage. + +U5: Hourly distribution of sessions (number). 
+""" + +from datetime import datetime + +import pytest # type: ignore +from sqlalchemy import text + +from indicators.models import IndicatorPeriod, IndicatorTimeSpan, Level # type: ignore +from indicators.usage import u5 # type: ignore + +from ..param_tests import ( + PARAM_FLOW, + PARAM_VALUE, + PARAMETERS_CHUNK, +) + +# expected result for level [city, epci, dpt, reg] +N_LEVEL = [32, 307, 172, 1055] +N_LEVEL_NATIONAL = 2718 + +TIMESPAN = IndicatorTimeSpan(start=datetime(2024, 12, 24), period=IndicatorPeriod.DAY) +PARAMETERS_FLOW = [prm + (lvl,) for prm, lvl in zip(PARAM_FLOW, N_LEVEL, strict=True)] +PARAMETERS_VALUE = [prm + (lvl,) for prm, lvl in zip(PARAM_VALUE, N_LEVEL, strict=True)] + + +@pytest.mark.parametrize("level,query,expected", PARAMETERS_VALUE) +def test_task_get_values_for_target(db_connection, level, query, expected): + """Test the `get_values_for_target` task.""" + result = db_connection.execute(text(query)) + indexes = list(result.scalars().all()) + ses_by_hour = u5.get_values_for_targets.fn(db_connection, level, TIMESPAN, indexes) + assert len(set(ses_by_hour["level_id"])) == len(indexes) + assert ses_by_hour["value"].sum() == expected + + +def test_task_get_values_for_target_unexpected_level(db_connection): + """Test the `get_values_for_target` task (unknown level).""" + with pytest.raises(NotImplementedError, match="Unsupported level"): + u5.get_values_for_targets.fn(db_connection, Level.NATIONAL, TIMESPAN, []) + + +@pytest.mark.parametrize("level,query,targets,expected", PARAMETERS_FLOW) +def test_flow_u5_for_level(db_connection, level, query, targets, expected): + """Test the `u5_for_level` flow.""" + indicators = u5.u5_for_level(level, TIMESPAN, chunk_size=1000) + # assert len(indicators) == db_connection.execute(text(query)).scalars().one() + assert indicators.loc[indicators["target"].isin(targets), "value"].sum() == expected + + +@pytest.mark.parametrize("chunk_size", PARAMETERS_CHUNK) +def 
test_flow_u5_for_level_with_various_chunk_sizes(chunk_size): + """Test the `u5_for_level` flow with various chunk sizes.""" + level, query, targets, expected = PARAMETERS_FLOW[2] + indicators = u5.u5_for_level(level, TIMESPAN, chunk_size=chunk_size) + assert indicators.loc[indicators["target"].isin(targets), "value"].sum() == expected + + +def test_flow_u5_national(db_connection): + """Test the `u5_national` flow.""" + indicators = u5.u5_national(TIMESPAN) + assert indicators["value"].sum() == N_LEVEL_NATIONAL + + +def test_flow_u5_calculate(db_connection): + """Test the `calculate` flow.""" + all_levels = [ + Level.NATIONAL, + Level.REGION, + Level.DEPARTMENT, + Level.CITY, + Level.EPCI, + ] + indicators = u5.calculate( + TIMESPAN, all_levels, create_artifact=True, format_pd=True + ) + assert list(indicators["level"].unique()) == all_levels + + +# query used to get N_LEVEL +N_LEVEL_NAT = """ + +""" +N_LEVEL_3 = """ + +""" diff --git a/src/prefect/tests/usage/test_u6.py b/src/prefect/tests/usage/test_u6.py new file mode 100644 index 00000000..79222ad6 --- /dev/null +++ b/src/prefect/tests/usage/test_u6.py @@ -0,0 +1,149 @@ +"""QualiCharge prefect indicators tests: usage. + +U6: session duration by power category. 
+""" + +from datetime import datetime + +import pytest # type: ignore +from sqlalchemy import text + +from indicators.models import IndicatorPeriod, IndicatorTimeSpan, Level # type: ignore +from indicators.usage import u6 # type: ignore + +from ..param_tests import ( + PARAM_FLOW, + PARAM_VALUE, + PARAMETERS_CHUNK, +) + +# expected result for level [city, epci, dpt, reg] +N_LEVEL = [18, 167, 85, 584] +N_LEVEL_NATIONAL = 1509 + +TIMESPAN = IndicatorTimeSpan(start=datetime(2024, 12, 24), period=IndicatorPeriod.DAY) +PARAMETERS_FLOW = [prm + (lvl,) for prm, lvl in zip(PARAM_FLOW, N_LEVEL, strict=True)] +PARAMETERS_VALUE = [prm + (lvl,) for prm, lvl in zip(PARAM_VALUE, N_LEVEL, strict=True)] + + +@pytest.mark.parametrize("level,query,expected", PARAMETERS_VALUE) +def test_task_get_values_for_target(db_connection, level, query, expected): + """Test the `get_values_for_target` task.""" + result = db_connection.execute(text(query)) + indexes = list(result.scalars().all()) + values = u6.get_values_for_targets.fn(db_connection, level, TIMESPAN, indexes) + assert len(set(values["level_id"])) == len(indexes) + assert int(values["value"].sum()) == expected + + +def test_task_get_values_for_target_unexpected_level(db_connection): + """Test the `get_values_for_target` task (unknown level).""" + with pytest.raises(NotImplementedError, match="Unsupported level"): + u6.get_values_for_targets.fn(db_connection, Level.NATIONAL, TIMESPAN, []) + + +@pytest.mark.parametrize("level,query,targets,expected", PARAMETERS_FLOW) +def test_flow_u6_for_level(db_connection, level, query, targets, expected): + """Test the `u6_for_level` flow.""" + indicators = u6.u6_for_level(level, TIMESPAN, chunk_size=1000) + # assert len(indicators) == db_connection.execute(text(query)).scalars().one() + assert ( + int(indicators.loc[indicators["target"].isin(targets), "value"].sum()) + == expected + ) + + +@pytest.mark.parametrize("chunk_size", PARAMETERS_CHUNK) +def 
test_flow_u6_for_level_with_various_chunk_sizes(chunk_size): + """Test the `u6_for_level` flow with various chunk sizes.""" + level, query, targets, expected = PARAMETERS_FLOW[2] + indicators = u6.u6_for_level(level, TIMESPAN, chunk_size=chunk_size) + # assert len(indicators) == N_DPTS + assert ( + int(indicators.loc[indicators["target"].isin(targets), "value"].sum()) + == expected + ) + + +def test_flow_u6_national(db_connection): + """Test the `u6_national` flow.""" + indicators = u6.u6_national(TIMESPAN) + assert int(indicators["value"].sum()) == N_LEVEL_NATIONAL + + +def test_flow_u6_calculate(db_connection): + """Test the `calculate` flow.""" + all_levels = [ + Level.NATIONAL, + Level.REGION, + Level.DEPARTMENT, + Level.CITY, + Level.EPCI, + ] + indicators = u6.calculate( + TIMESPAN, all_levels, create_artifact=True, format_pd=True + ) + assert list(indicators["level"].unique()) == all_levels + + +# query used to get N_LEVEL +N_LEVEL_NAT = """ +WITH + sessionf AS ( + SELECT + point_de_charge_id, + sum(SESSION.end - SESSION.start) AS duree_pdc + FROM + SESSION + WHERE + START >= date '2024-12-24' + AND START < date '2024-12-25' + GROUP BY + point_de_charge_id + ) +SELECT + extract( + 'epoch' + FROM + sum(duree_pdc) + ) / 3600.0 AS duree +FROM + sessionf + INNER JOIN PointDeCharge ON sessionf.point_de_charge_id = PointDeCharge.id + LEFT JOIN station ON station_id = station.id + LEFT JOIN localisation ON localisation_id = localisation.id + LEFT JOIN city ON city.code = code_insee_commune + LEFT JOIN department ON city.department_id = department.id + LEFT JOIN region ON department.region_id = region.id +""" +N_LEVEL_3 = """ +WITH + sessionf AS ( + SELECT + point_de_charge_id, + sum(SESSION.end - SESSION.start) AS duree_pdc + FROM + SESSION + WHERE + START >= date '2024-12-24' + AND START < date '2024-12-25' + GROUP BY + point_de_charge_id + ) +SELECT + extract( + 'epoch' + FROM + sum(duree_pdc) + ) / 3600.0 AS duree +FROM + sessionf + INNER JOIN PointDeCharge ON 
sessionf.point_de_charge_id = PointDeCharge.id + LEFT JOIN station ON station_id = station.id + LEFT JOIN localisation ON localisation_id = localisation.id + LEFT JOIN city ON city.code = code_insee_commune + LEFT JOIN department ON city.department_id = department.id + LEFT JOIN region ON department.region_id = region.id +WHERE + region.code IN ('11', '84', '75') +""" diff --git a/src/prefect/tests/usage/test_u9.py b/src/prefect/tests/usage/test_u9.py new file mode 100644 index 00000000..07f4ce6a --- /dev/null +++ b/src/prefect/tests/usage/test_u9.py @@ -0,0 +1,89 @@ +"""QualiCharge prefect indicators tests: usage. + +U9: energy by power level. +""" + +from datetime import datetime + +import pytest # type: ignore +from sqlalchemy import text + +from indicators.models import IndicatorPeriod, IndicatorTimeSpan, Level # type: ignore +from indicators.usage import u9 # type: ignore + +from ..param_tests import ( + PARAM_FLOW, + PARAM_VALUE, + PARAMETERS_CHUNK, +) + +# expected result for level [city, epci, dpt, reg] +N_LEVEL = [1150, 10235, 5489, 34639] +N_LEVEL_NATIONAL = 88135 + +TIMESPAN = IndicatorTimeSpan(start=datetime(2024, 12, 24), period=IndicatorPeriod.DAY) +PARAMETERS_FLOW = [prm + (lvl,) for prm, lvl in zip(PARAM_FLOW, N_LEVEL, strict=True)] +PARAMETERS_VALUE = [prm + (lvl,) for prm, lvl in zip(PARAM_VALUE, N_LEVEL, strict=True)] + + +@pytest.mark.parametrize("level,query,expected", PARAMETERS_VALUE) +def test_task_get_values_for_target(db_connection, level, query, expected): + """Test the `get_values_for_target` task.""" + result = db_connection.execute(text(query)) + indexes = list(result.scalars().all()) + values = u9.get_values_for_targets.fn(db_connection, level, TIMESPAN, indexes) + assert len(set(values["level_id"])) == len(indexes) + assert int(values["value"].sum()) == expected + + +def test_task_get_values_for_target_unexpected_level(db_connection): + """Test the `get_values_for_target` task (unknown level).""" + with 
pytest.raises(NotImplementedError, match="Unsupported level"): + u9.get_values_for_targets.fn(db_connection, Level.NATIONAL, TIMESPAN, []) + + +@pytest.mark.parametrize("level,query,targets,expected", PARAMETERS_FLOW) +def test_flow_u9_for_level(db_connection, level, query, targets, expected): + """Test the `u9_for_level` flow.""" + indicators = u9.u9_for_level(level, TIMESPAN, chunk_size=1000) + # assert len(indicators) == db_connection.execute(text(query)).scalars().one() + assert ( + int(indicators.loc[indicators["target"].isin(targets), "value"].sum()) + == expected + ) + + +@pytest.mark.parametrize("chunk_size", PARAMETERS_CHUNK) +def test_flow_u9_for_level_with_various_chunk_sizes(chunk_size): + """Test the `u9_for_level` flow with various chunk sizes.""" + level, query, targets, expected = PARAMETERS_FLOW[2] + indicators = u9.u9_for_level(level, TIMESPAN, chunk_size=chunk_size) + # assert len(indicators) == N_DPTS + assert ( + int(indicators.loc[indicators["target"].isin(targets), "value"].sum()) + == expected + ) + + +def test_flow_u9_national(db_connection): + """Test the `u9_national` flow.""" + indicators = u9.u9_national(TIMESPAN) + assert int(indicators["value"].sum()) == N_LEVEL_NATIONAL + + +def test_flow_u9_calculate(db_connection): + """Test the `calculate` flow.""" + all_levels = [ + Level.NATIONAL, + Level.REGION, + Level.DEPARTMENT, + Level.CITY, + Level.EPCI, + ] + indicators = u9.calculate( + TIMESPAN, all_levels, create_artifact=True, format_pd=True + ) + assert list(indicators["level"].unique()) == all_levels + + +# query used to get N_LEVEL