From 7ebd34b58cf7d37c48a653e1f4cea5d6d26ffc74 Mon Sep 17 00:00:00 2001 From: philippe thomy Date: Wed, 29 Jan 2025 16:41:35 +0100 Subject: [PATCH] add u5 --- src/prefect/CHANGELOG.md | 2 + src/prefect/indicators/infrastructure/t1.py | 1 - src/prefect/indicators/usage/u13.py | 179 ++++++++++++++++++ src/prefect/indicators/usage/u5.py | 162 ++++++++++++++++ src/prefect/prefect.yaml | 14 ++ src/prefect/tests/infrastructure/_test_i1.py | 15 +- src/prefect/tests/infrastructure/_test_i4.py | 15 +- src/prefect/tests/infrastructure/_test_i7.py | 15 +- src/prefect/tests/infrastructure/_test_t1.py | 2 + src/prefect/tests/param_tests.py | 53 ++++++ src/prefect/tests/usage/_test_u10.py | 15 +- .../tests/usage/{test_u11.py => _test_u11.py} | 15 +- src/prefect/tests/usage/_test_u12.py | 23 ++- src/prefect/tests/usage/_test_u13.py | 127 +++++++++++++ src/prefect/tests/usage/_test_u9.py | 2 + src/prefect/tests/usage/test_u5.py | 97 ++++++++++ 16 files changed, 672 insertions(+), 65 deletions(-) create mode 100644 src/prefect/indicators/usage/u13.py create mode 100644 src/prefect/indicators/usage/u5.py create mode 100644 src/prefect/tests/param_tests.py rename src/prefect/tests/usage/{test_u11.py => _test_u11.py} (91%) create mode 100644 src/prefect/tests/usage/_test_u13.py create mode 100644 src/prefect/tests/usage/test_u5.py diff --git a/src/prefect/CHANGELOG.md b/src/prefect/CHANGELOG.md index 89b03b1e..0365c7b3 100644 --- a/src/prefect/CHANGELOG.md +++ b/src/prefect/CHANGELOG.md @@ -12,9 +12,11 @@ and this project adheres to - Implement i4 workflow - Implement i7 workflow +- Implement u5 workflow - Implement u9 workflow - Implement u10 workflow - Implement u11 workflow - Implement u12 workflow +- Implement u13 workflow [unreleased]: https://github.com/MTES-MCT/qualicharge/ diff --git a/src/prefect/indicators/infrastructure/t1.py b/src/prefect/indicators/infrastructure/t1.py index 0ca37ec3..e469f7bc 100644 --- a/src/prefect/indicators/infrastructure/t1.py +++ 
b/src/prefect/indicators/infrastructure/t1.py @@ -129,7 +129,6 @@ def t1_national(timespan: IndicatorTimeSpan) -> pd.DataFrame: query_template = Template(QUERY_NATIONAL_TEMPLATE) query_params = POWER_RANGE_CTE with engine.connect() as connection: - # result = pd.read_sql_query(QUERY_NATIONAL, con=connection) res = pd.read_sql_query(query_template.substitute(query_params), con=connection) indicators = { "target": None, diff --git a/src/prefect/indicators/usage/u13.py b/src/prefect/indicators/usage/u13.py new file mode 100644 index 00000000..f2134258 --- /dev/null +++ b/src/prefect/indicators/usage/u13.py @@ -0,0 +1,179 @@ +"""QualiCharge prefect indicators: usage. + +U13: cumulative power of POC in operation. +""" + +from string import Template +from typing import List +from uuid import UUID + +import numpy as np +import pandas as pd # type: ignore +from prefect import flow, runtime, task +from prefect.futures import wait +from prefect.task_runners import ThreadPoolTaskRunner +from sqlalchemy.engine import Connection + +from ..conf import settings +from ..models import Indicator, IndicatorTimeSpan, Level +from ..utils import ( + POWER_RANGE_CTE, + export_indic, + get_database_engine, + get_num_for_level_query_params, + get_targets_for_level, + get_timespan_filter_query_params, +) + +POWER_POC_IN_OPERATION_TEMPLATE = """ + WITH + statusf AS ( + SELECT + point_de_charge_id + FROM + status + WHERE + $timespan + GROUP BY + point_de_charge_id + ) + SELECT + sum(puissance_nominale) AS value, + $level_id AS level_id + FROM + statusf + INNER JOIN PointDeCharge ON statusf.point_de_charge_id = PointDeCharge.id + LEFT JOIN Station ON station_id = Station.id + LEFT JOIN Localisation ON localisation_id = Localisation.id + LEFT JOIN City ON City.code = code_insee_commune + $join_extras + WHERE + $level_id IN ($indexes) + GROUP BY + $level_id + """ +QUERY_NATIONAL_TEMPLATE = """ + WITH + statusf AS ( + SELECT + point_de_charge_id + FROM + status + WHERE + $timespan + GROUP BY + 
point_de_charge_id + ) + SELECT + sum(puissance_nominale) AS value + FROM + statusf + INNER JOIN PointDeCharge ON statusf.point_de_charge_id = PointDeCharge.id + """ + + +@task(task_run_name="values-for-target-{level:02d}") +def get_values_for_targets( + connection: Connection, + level: Level, + timespan: IndicatorTimeSpan, + indexes: List[UUID], +) -> pd.DataFrame: + """Fetch sessions given input level, timestamp and target index.""" + query_template = Template(POWER_POC_IN_OPERATION_TEMPLATE) + query_params = {"indexes": ",".join(f"'{i}'" for i in map(str, indexes))} + query_params |= POWER_RANGE_CTE + query_params |= get_num_for_level_query_params(level) + query_params |= get_timespan_filter_query_params(timespan, session=False) + return pd.read_sql_query(query_template.substitute(query_params), con=connection) + + +@flow( + task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS), + flow_run_name="u13-{timespan.period.value}-{level:02d}-{timespan.start:%y-%m-%d}", +) +def u13_for_level( + level: Level, + timespan: IndicatorTimeSpan, + chunk_size=settings.DEFAULT_CHUNK_SIZE, +) -> pd.DataFrame: + """Calculate u13 for a level and a timestamp.""" + if level == Level.NATIONAL: + return u13_national(timespan) + engine = get_database_engine() + with engine.connect() as connection: + targets = get_targets_for_level(connection, level) + ids = targets["id"] + chunks = ( + np.array_split(ids, int(len(ids) / chunk_size)) + if len(ids) > chunk_size + else [ids.to_numpy()] + ) + futures = [ + get_values_for_targets.submit(connection, level, timespan, chunk) # type: ignore[call-overload] + for chunk in chunks + ] + wait(futures) + + # Concatenate results and serialize indicators + results = pd.concat([future.result() for future in futures], ignore_index=True) + merged = targets.merge(results, how="left", left_on="id", right_on="level_id") + + # Build result DataFrame + indicators = { + "target": merged["code"], + "value": merged["value"].fillna(0), + 
"code": "u13", + "level": level, + "period": timespan.period, + "timestamp": timespan.start.isoformat(), + "category": None, + "extras": None, + } + return pd.DataFrame(indicators) + + +@flow( + task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS), + flow_run_name="u13-{timespan.period.value}-00-{timespan.start:%y-%m-%d}", +) +def u13_national(timespan: IndicatorTimeSpan) -> pd.DataFrame: + """Calculate u13 at the national level.""" + engine = get_database_engine() + query_template = Template(QUERY_NATIONAL_TEMPLATE) + query_params = get_timespan_filter_query_params(timespan, session=False) + query_params |= POWER_RANGE_CTE + with engine.connect() as connection: + res = pd.read_sql_query(query_template.substitute(query_params), con=connection) + indicators = { + "target": None, + "value": res["value"].fillna(0), + "code": "u13", + "level": Level.NATIONAL, + "period": timespan.period, + "timestamp": timespan.start.isoformat(), + "category": None, + "extras": None, + } + return pd.DataFrame(indicators) + + +@flow( + task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS), + flow_run_name="meta-u13-{timespan.period.value}", +) +def calculate( + timespan: IndicatorTimeSpan, + levels: List[Level] = [Level.NATIONAL, Level.REGION], + create_artifact: bool = False, + chunk_size: int = 1000, + format_pd: bool = False, +) -> List[Indicator]: + """Run all u13 subflows.""" + subflows_results = [ + u13_for_level(level, timespan, chunk_size=chunk_size) for level in levels + ] + indicators = pd.concat(subflows_results, ignore_index=True) + description = f"u13 report at {timespan.start} (period: {timespan.period.value})" + flow_name = runtime.flow_run.name + return export_indic(indicators, create_artifact, flow_name, description, format_pd) diff --git a/src/prefect/indicators/usage/u5.py b/src/prefect/indicators/usage/u5.py new file mode 100644 index 00000000..471ed35c --- /dev/null +++ b/src/prefect/indicators/usage/u5.py @@ -0,0 
+1,162 @@ +"""QualiCharge prefect indicators: usage. + +U5: Hourly distribution of sessions (number). +""" + +from string import Template +from typing import List +from uuid import UUID + +import numpy as np +import pandas as pd # type: ignore +from prefect import flow, runtime, task +from prefect.futures import wait +from prefect.task_runners import ThreadPoolTaskRunner +from sqlalchemy.engine import Connection + +from ..conf import settings +from ..models import Indicator, IndicatorTimeSpan, Level +from ..utils import ( + export_indic, + get_database_engine, + get_num_for_level_query_params, + get_targets_for_level, + get_timespan_filter_query_params, +) + +HOURLY_SESSIONS_QUERY_TEMPLATE = """ + SELECT + count(Session.id) AS value, + extract(HOUR FROM start) AS category, + $level_id AS level_id + FROM + Session + INNER JOIN PointDeCharge ON point_de_charge_id = PointDeCharge.id + LEFT JOIN Station ON station_id = Station.id + LEFT JOIN Localisation ON localisation_id = Localisation.id + LEFT JOIN City ON City.code = code_insee_commune + $join_extras + WHERE + $timespan + AND $level_id IN ($indexes) + GROUP BY + category, + $level_id + """ + +QUERY_NATIONAL_TEMPLATE = """ + SELECT + count(Session.id) AS value, + extract(HOUR FROM start) AS category + FROM + SESSION + WHERE + $timespan + GROUP BY + category + """ + + +@task(task_run_name="values-for-target-{level:02d}") +def get_values_for_targets( + connection: Connection, + level: Level, + timespan: IndicatorTimeSpan, + indexes: List[UUID], +) -> pd.DataFrame: + """Fetch sessions given input level, timestamp and target index.""" + query_template = Template(HOURLY_SESSIONS_QUERY_TEMPLATE) + query_params = {"indexes": ",".join(f"'{i}'" for i in map(str, indexes))} + query_params |= get_num_for_level_query_params(level) + query_params |= get_timespan_filter_query_params(timespan, session=True) + return pd.read_sql_query(query_template.substitute(query_params), con=connection) + + +@flow( + 
task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS), + flow_run_name="u5-{timespan.period.value}-{level:02d}-{timespan.start:%y-%m-%d}", +) +def u5_for_level( + level: Level, + timespan: IndicatorTimeSpan, + chunk_size=settings.DEFAULT_CHUNK_SIZE, +) -> pd.DataFrame: + """Calculate u5 for a level and a timestamp.""" + if level == Level.NATIONAL: + return u5_national(timespan) + engine = get_database_engine() + with engine.connect() as connection: + targets = get_targets_for_level(connection, level) + ids = targets["id"] + chunks = ( + np.array_split(ids, int(len(ids) / chunk_size)) + if len(ids) > chunk_size + else [ids.to_numpy()] + ) + futures = [ + get_values_for_targets.submit(connection, level, timespan, chunk) # type: ignore[call-overload] + for chunk in chunks + ] + wait(futures) + + # Concatenate results and serialize indicators + results = pd.concat([future.result() for future in futures], ignore_index=True) + merged = targets.merge(results, how="left", left_on="id", right_on="level_id") + + # Build result DataFrame + indicators = { + "target": merged["code"], + "value": merged["value"].fillna(0), + "code": "u5", + "level": level, + "period": timespan.period, + "timestamp": timespan.start.isoformat(), + "category": merged["category"].astype("str"), + "extras": None, + } + return pd.DataFrame(indicators) + + +@flow( + task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS), + flow_run_name="u5-{timespan.period.value}-00-{timespan.start:%y-%m-%d}", +) +def u5_national(timespan: IndicatorTimeSpan) -> pd.DataFrame: + """Calculate u5 at the national level.""" + engine = get_database_engine() + query_template = Template(QUERY_NATIONAL_TEMPLATE) + query_params = get_timespan_filter_query_params(timespan, session=True) + with engine.connect() as connection: + res = pd.read_sql_query(query_template.substitute(query_params), con=connection) + indicators = { + "target": None, + "value": res["value"].fillna(0), + 
"code": "u5", + "level": Level.NATIONAL, + "period": timespan.period, + "timestamp": timespan.start.isoformat(), + "category": res["category"].astype("str"), + "extras": None, + } + return pd.DataFrame(indicators) + + +@flow( + task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS), + flow_run_name="meta-u5-{timespan.period.value}", +) +def calculate( + timespan: IndicatorTimeSpan, + levels: List[Level], + create_artifact: bool = False, + chunk_size: int = 1000, + format_pd: bool = False, +) -> List[Indicator]: + """Run all u5 subflows.""" + subflows_results = [ + u5_for_level(level, timespan, chunk_size=chunk_size) for level in levels + ] + indicators = pd.concat(subflows_results, ignore_index=True) + description = f"u5 report at {timespan.start} (period: {timespan.period.value})" + flow_name = runtime.flow_run.name + return export_indic(indicators, create_artifact, flow_name, description, format_pd) diff --git a/src/prefect/prefect.yaml b/src/prefect/prefect.yaml index 1ad2263e..6999816c 100644 --- a/src/prefect/prefect.yaml +++ b/src/prefect/prefect.yaml @@ -59,6 +59,20 @@ deployments: work_pool: name: indicators work_queue_name: default + - name: u5-daily + entrypoint: indicators/run.py:u5_calculate + concurrency_limit: 10 + schedules: + - cron: "4 15 * * *" + timezone: "Europe/Paris" + active: true + parameters: + period: d + create_artifact: true + chunk_size: 1000 + work_pool: + name: indicators + work_queue_name: default - name: u9-daily + entrypoint: indicators/run.py:u9_calculate + concurrency_limit: 10 diff --git a/src/prefect/tests/infrastructure/_test_i1.py b/src/prefect/tests/infrastructure/_test_i1.py index 6b0a8517..4a0cf206 100644 --- a/src/prefect/tests/infrastructure/_test_i1.py +++ b/src/prefect/tests/infrastructure/_test_i1.py @@ -15,6 +15,8 @@ # expected result for level [city, epci, dpt, reg] N_LEVEL = [212, 2250, 1489, 8724] N_DPTS = 109 +N_NAT_REG_DPT_EPCI_CITY = 36465 + TIMESPAN = IndicatorTimeSpan(start=datetime.now(),
period=IndicatorPeriod.DAY) PARAMETERS_CHUNK = [10, 50, 100, 500] @@ -111,18 +113,7 @@ def test_flow_i1_national(db_connection): def test_flow_i1_calculate(db_connection): """Test the `calculate` flow.""" - result = db_connection.execute( - text( - """ - SELECT - (SELECT COUNT(*) AS region_count FROM Region), - (SELECT COUNT(*) AS department_count FROM Department), - (SELECT COUNT(*) AS epci_count FROM EPCI), - (SELECT COUNT(*) AS city_count FROM City) - """ - ) - ) - expected = sum(result.one()) + 1 + expected = N_NAT_REG_DPT_EPCI_CITY all_levels = [ Level.NATIONAL, Level.REGION, diff --git a/src/prefect/tests/infrastructure/_test_i4.py b/src/prefect/tests/infrastructure/_test_i4.py index 2194d5f6..8abd5675 100644 --- a/src/prefect/tests/infrastructure/_test_i4.py +++ b/src/prefect/tests/infrastructure/_test_i4.py @@ -15,6 +15,8 @@ # expected result N_LEVEL = [65, 1068, 419, 3786] N_DPTS = 109 +N_NAT_REG_DPT_EPCI_CITY = 36465 + TIMESPAN = IndicatorTimeSpan(start=datetime.now(), period=IndicatorPeriod.DAY) PARAMETERS_CHUNK = [10, 50, 100, 500] @@ -111,18 +113,7 @@ def test_flow_i4_national(db_connection): def test_flow_i4_calculate(db_connection): """Test the `calculate` flow.""" - result = db_connection.execute( - text( - """ - SELECT - (SELECT COUNT(*) AS region_count FROM Region), - (SELECT COUNT(*) AS department_count FROM Department), - (SELECT COUNT(*) AS epci_count FROM EPCI), - (SELECT COUNT(*) AS city_count FROM City) - """ - ) - ) - expected = sum(result.one()) + 1 + expected = N_NAT_REG_DPT_EPCI_CITY all_levels = [ Level.NATIONAL, Level.REGION, diff --git a/src/prefect/tests/infrastructure/_test_i7.py b/src/prefect/tests/infrastructure/_test_i7.py index b7938ed9..96930059 100644 --- a/src/prefect/tests/infrastructure/_test_i7.py +++ b/src/prefect/tests/infrastructure/_test_i7.py @@ -14,6 +14,8 @@ # expected result N_LEVEL = [18998, 137622, 132546, 664670] N_DPTS = 109 +N_NAT_REG_DPT_EPCI_CITY = 36465 + TIMESPAN = IndicatorTimeSpan(start=datetime.now(), 
period=IndicatorPeriod.DAY) @@ -119,18 +121,7 @@ def test_flow_i7_national(db_connection): def test_flow_i7_calculate(db_connection): """Test the `calculate` flow.""" - result = db_connection.execute( - text( - """ - SELECT - (SELECT COUNT(*) AS region_count FROM Region), - (SELECT COUNT(*) AS department_count FROM Department), - (SELECT COUNT(*) AS epci_count FROM EPCI), - (SELECT COUNT(*) AS city_count FROM City) - """ - ) - ) - expected = sum(result.one()) + 1 + expected = N_NAT_REG_DPT_EPCI_CITY all_levels = [ Level.NATIONAL, Level.REGION, diff --git a/src/prefect/tests/infrastructure/_test_t1.py b/src/prefect/tests/infrastructure/_test_t1.py index 5f038067..7ab2bfe6 100644 --- a/src/prefect/tests/infrastructure/_test_t1.py +++ b/src/prefect/tests/infrastructure/_test_t1.py @@ -14,6 +14,8 @@ # expected result N_LEVEL = [212, 2250, 1489, 8724] N_DPTS = 109 +N_NAT_REG_DPT_EPCI_CITY = 36465 + TIMESPAN = IndicatorTimeSpan(start=datetime.now(), period=IndicatorPeriod.DAY) PARAMETERS_CHUNK = [10, 50, 100, 500] diff --git a/src/prefect/tests/param_tests.py b/src/prefect/tests/param_tests.py new file mode 100644 index 00000000..dc6303c3 --- /dev/null +++ b/src/prefect/tests/param_tests.py @@ -0,0 +1,53 @@ +"""Common parameters for tests.""" + +from indicators.models import Level # type: ignore + +# number of departments +N_DPTS = 109 + +# cumulate number of national(1), regions, departments, EPCI, city +N_NAT_REG_DPT_EPCI_CITY = 36465 + +PARAMETERS_CHUNK = [10, 50, 100, 500] + +PARAM_FLOW = [ + ( + Level.CITY, + "SELECT COUNT(*) FROM City", + ["75056", "13055", "69123"], + ), + ( + Level.EPCI, + "SELECT COUNT(*) FROM EPCI", + ["200054781", "200054807", "200046977"], + ), + ( + Level.DEPARTMENT, + "SELECT COUNT(*) FROM Department", + ["59", "75", "13"], + ), + ( + Level.REGION, + "SELECT COUNT(*) FROM Region", + ["11", "84", "75"], + ), +] + +PARAM_VALUE = [ + ( + Level.CITY, + "SELECT id FROM City WHERE name IN ('Paris', 'Marseille', 'Lyon')", + ), + ( + Level.EPCI, + 
"SELECT id FROM EPCI WHERE code IN ('200054781', '200054807', '200046977')", + ), + ( + Level.DEPARTMENT, + "SELECT id FROM Department WHERE code IN ('59', '75', '13')", + ), + ( + Level.REGION, + "SELECT id FROM Region WHERE code IN ('11', '84', '75')", + ), +] diff --git a/src/prefect/tests/usage/_test_u10.py b/src/prefect/tests/usage/_test_u10.py index f1e1331a..4dcb6c78 100644 --- a/src/prefect/tests/usage/_test_u10.py +++ b/src/prefect/tests/usage/_test_u10.py @@ -14,6 +14,8 @@ # expected result for level [city, epci, dpt, reg, nat] N_LEVEL = [32, 307, 172, 1055, 2718] N_DPTS = 109 +N_NAT_REG_DPT_EPCI_CITY = 36465 + TIMESPAN = IndicatorTimeSpan(start=datetime(2024, 12, 24), period=IndicatorPeriod.DAY) PARAMETERS_CHUNK = [10, 50, 100, 500] @@ -108,18 +110,7 @@ def test_flow_u10_national(db_connection): def test_flow_u10_calculate(db_connection): """Test the `calculate` flow.""" - result = db_connection.execute( - text( - """ - SELECT - (SELECT COUNT(*) AS region_count FROM Region), - (SELECT COUNT(*) AS department_count FROM Department), - (SELECT COUNT(*) AS epci_count FROM EPCI), - (SELECT COUNT(*) AS city_count FROM City) - """ - ) - ) - expected = sum(result.one()) + 1 + expected = N_NAT_REG_DPT_EPCI_CITY all_levels = [ Level.NATIONAL, Level.REGION, diff --git a/src/prefect/tests/usage/test_u11.py b/src/prefect/tests/usage/_test_u11.py similarity index 91% rename from src/prefect/tests/usage/test_u11.py rename to src/prefect/tests/usage/_test_u11.py index cc74df33..c3655f46 100644 --- a/src/prefect/tests/usage/test_u11.py +++ b/src/prefect/tests/usage/_test_u11.py @@ -14,6 +14,8 @@ # expected result for level [city, epci, dpt, reg, nat] N_LEVEL = [32, 301, 166, 1031, 2639] N_DPTS = 109 +N_NAT_REG_DPT_EPCI_CITY = 36465 + TIMESPAN = IndicatorTimeSpan(start=datetime(2024, 12, 24), period=IndicatorPeriod.DAY) PARAMETERS_CHUNK = [10, 50, 100, 500] @@ -108,18 +110,7 @@ def test_flow_u11_national(db_connection): def test_flow_u11_calculate(db_connection): """Test 
the `calculate` flow.""" - result = db_connection.execute( - text( - """ - SELECT - (SELECT COUNT(*) AS region_count FROM Region), - (SELECT COUNT(*) AS department_count FROM Department), - (SELECT COUNT(*) AS epci_count FROM EPCI), - (SELECT COUNT(*) AS city_count FROM City) - """ - ) - ) - expected = sum(result.one()) + 1 + expected = N_NAT_REG_DPT_EPCI_CITY all_levels = [ Level.NATIONAL, Level.REGION, diff --git a/src/prefect/tests/usage/_test_u12.py b/src/prefect/tests/usage/_test_u12.py index 280df71c..dc96abae 100644 --- a/src/prefect/tests/usage/_test_u12.py +++ b/src/prefect/tests/usage/_test_u12.py @@ -14,6 +14,8 @@ # expected result for level [city, epci, dpt, reg, nat] N_LEVEL = [42, 352, 229, 1379, 3853] N_DPTS = 109 +N_NAT_REG_DPT_EPCI_CITY = 36465 + TIMESPAN = IndicatorTimeSpan(start=datetime(2024, 12, 24), period=IndicatorPeriod.DAY) PARAMETERS_CHUNK = [10, 50, 100, 500] @@ -72,7 +74,9 @@ def test_task_get_values_for_target(db_connection, level, query, expected): """Test the `get_values_for_target` task.""" result = db_connection.execute(text(query)) indexes = list(result.scalars().all()) - poc_by_power = u12.get_values_for_targets.fn(db_connection, level, TIMESPAN, indexes) + poc_by_power = u12.get_values_for_targets.fn( + db_connection, level, TIMESPAN, indexes + ) assert len(set(poc_by_power["level_id"])) == len(indexes) assert poc_by_power["value"].sum() == expected @@ -112,13 +116,24 @@ def test_flow_u12_calculate(db_connection): [ u12.u12_for_level(Level.CITY, TIMESPAN, chunk_size=1000)["value"].sum(), u12.u12_for_level(Level.EPCI, TIMESPAN, chunk_size=1000)["value"].sum(), - u12.u12_for_level(Level.DEPARTMENT, TIMESPAN, chunk_size=1000)["value"].sum(), + u12.u12_for_level(Level.DEPARTMENT, TIMESPAN, chunk_size=1000)[ + "value" + ].sum(), u12.u12_for_level(Level.REGION, TIMESPAN, chunk_size=1000)["value"].sum(), u12.u12_national(TIMESPAN)["value"].sum(), ] ) - all_levels = [Level.NATIONAL, Level.REGION, Level.DEPARTMENT, Level.CITY, Level.EPCI] 
- indicators = u12.calculate(TIMESPAN, all_levels, create_artifact=True, format_pd=True) + all_levels = [ + Level.NATIONAL, + Level.REGION, + Level.DEPARTMENT, + Level.CITY, + Level.EPCI, + ] + indicators = u12.calculate( + TIMESPAN, all_levels, create_artifact=True, format_pd=True + ) assert indicators["value"].sum() == expected + # query used to get N_LEVEL diff --git a/src/prefect/tests/usage/_test_u13.py b/src/prefect/tests/usage/_test_u13.py new file mode 100644 index 00000000..a0b35b85 --- /dev/null +++ b/src/prefect/tests/usage/_test_u13.py @@ -0,0 +1,127 @@ +"""QualiCharge prefect indicators tests: usage. + +U13: cumulative power of POC in operation. +""" + +from datetime import datetime + +import pytest # type: ignore +from sqlalchemy import text + +from indicators.models import IndicatorPeriod, IndicatorTimeSpan, Level # type: ignore +from indicators.usage import u13 # type: ignore + +# expected result for level [city, epci, dpt, reg, nat] +N_LEVEL = [3079, 25870, 25793, 114966, 326782] +N_DPTS = 109 +N_NAT_REG_DPT_EPCI_CITY = 36465 +TIMESPAN = IndicatorTimeSpan(start=datetime(2024, 12, 24), period=IndicatorPeriod.DAY) + +PARAMETERS_CHUNK = [10, 50, 100, 500] +PARAMETERS_FLOW = [ + ( + Level.CITY, + "SELECT COUNT(*) FROM City", + ["75056", "13055", "69123"], + N_LEVEL[0], + ), + ( + Level.EPCI, + "SELECT COUNT(*) FROM EPCI", + ["200054781", "200054807", "200046977"], + N_LEVEL[1], + ), + ( + Level.DEPARTMENT, + "SELECT COUNT(*) FROM Department", + ["59", "75", "13"], + N_LEVEL[2], + ), + ( + Level.REGION, + "SELECT COUNT(*) FROM Region", + ["11", "84", "75"], + N_LEVEL[3], + ), +] +PARAMETERS_GET_VALUES = [ + ( + Level.CITY, + "SELECT id FROM City WHERE name IN ('Paris', 'Marseille', 'Lyon')", + N_LEVEL[0], + ), + ( + Level.EPCI, + "SELECT id FROM EPCI WHERE code IN ('200054781', '200054807', '200046977')", + N_LEVEL[1], + ), + ( + Level.DEPARTMENT, + "SELECT id FROM Department WHERE code IN ('59', '75', '13')", + N_LEVEL[2], + ), + ( + Level.REGION, + 
"SELECT id FROM Region WHERE code IN ('11', '84', '75')", + N_LEVEL[3], + ), +] + + +@pytest.mark.parametrize("level,query,expected", PARAMETERS_GET_VALUES) +def test_task_get_values_for_target(db_connection, level, query, expected): + """Test the `get_values_for_target` task.""" + result = db_connection.execute(text(query)) + indexes = list(result.scalars().all()) + values = u13.get_values_for_targets.fn(db_connection, level, TIMESPAN, indexes) + assert len(values) == len(indexes) + assert int(values["value"].sum()) == expected + + +def test_task_get_values_for_target_unexpected_level(db_connection): + """Test the `get_values_for_target` task (unknown level).""" + with pytest.raises(NotImplementedError, match="Unsupported level"): + u13.get_values_for_targets.fn(db_connection, Level.NATIONAL, TIMESPAN, []) + + +@pytest.mark.parametrize("level,query,targets,expected", PARAMETERS_FLOW) +def test_flow_u13_for_level(db_connection, level, query, targets, expected): + """Test the `u13_for_level` flow.""" + indicators = u13.u13_for_level(level, TIMESPAN, chunk_size=1000) + # assert len(indicators) == db_connection.execute(text(query)).scalars().one() + assert ( + int(indicators.loc[indicators["target"].isin(targets), "value"].sum()) + == expected + ) + + +@pytest.mark.parametrize("chunk_size", PARAMETERS_CHUNK) +def test_flow_u13_for_level_with_various_chunk_sizes(chunk_size): + """Test the `u13_for_level` flow with various chunk sizes.""" + level, query, targets, expected = PARAMETERS_FLOW[2] + indicators = u13.u13_for_level(level, TIMESPAN, chunk_size=chunk_size) + # assert len(indicators) == N_DPTS + assert ( + int(indicators.loc[indicators["target"].isin(targets), "value"].sum()) + == expected + ) + + +def test_flow_u13_national(db_connection): + """Test the `u13_national` flow.""" + indicators = u13.u13_national(TIMESPAN) + assert int(indicators["value"].sum()) == N_LEVEL[4] + + +def test_flow_u13_calculate(db_connection): + """Test the `calculate` flow.""" + 
expected = N_NAT_REG_DPT_EPCI_CITY + all_levels = [ + Level.NATIONAL, + Level.REGION, + Level.DEPARTMENT, + Level.CITY, + Level.EPCI, + ] + indicators = u13.calculate(TIMESPAN, all_levels, create_artifact=True) + assert len(indicators) == expected diff --git a/src/prefect/tests/usage/_test_u9.py b/src/prefect/tests/usage/_test_u9.py index 4f7c3c59..5475e933 100644 --- a/src/prefect/tests/usage/_test_u9.py +++ b/src/prefect/tests/usage/_test_u9.py @@ -14,6 +14,8 @@ # expected result for level [city, epci, dpt, reg, nat] N_LEVEL = [1150, 10235, 5489, 34639, 88135] N_DPTS = 109 +N_NAT_REG_DPT_EPCI_CITY = 36465 + TIMESPAN = IndicatorTimeSpan(start=datetime(2024, 12, 24), period=IndicatorPeriod.DAY) PARAMETERS_CHUNK = [10, 50, 100, 500] diff --git a/src/prefect/tests/usage/test_u5.py b/src/prefect/tests/usage/test_u5.py new file mode 100644 index 00000000..d08e7851 --- /dev/null +++ b/src/prefect/tests/usage/test_u5.py @@ -0,0 +1,97 @@ +"""QualiCharge prefect indicators tests: usage. + +U5: Hourly distribution of sessions (number). 
+""" + +from datetime import datetime + +import pytest # type: ignore +from sqlalchemy import text + +from indicators.models import IndicatorPeriod, IndicatorTimeSpan, Level # type: ignore +from indicators.usage import u5 # type: ignore + +from ..param_tests import ( + PARAM_FLOW, + PARAM_VALUE, + PARAMETERS_CHUNK, +) + +# expected result for level [city, epci, dpt, reg] +N_LEVEL = [32, 307, 172, 1055] +N_LEVEL_NATIONAL = 2718 + +TIMESPAN = IndicatorTimeSpan(start=datetime(2024, 12, 24), period=IndicatorPeriod.DAY) +PARAMETERS_FLOW = [prm + (lvl,) for prm, lvl in zip(PARAM_FLOW, N_LEVEL, strict=True)] +PARAMETERS_VALUE = [prm + (lvl,) for prm, lvl in zip(PARAM_VALUE, N_LEVEL, strict=True)] + + +@pytest.mark.parametrize("level,query,expected", PARAMETERS_VALUE) +def test_task_get_values_for_target(db_connection, level, query, expected): + """Test the `get_values_for_target` task.""" + result = db_connection.execute(text(query)) + indexes = list(result.scalars().all()) + ses_by_hour = u5.get_values_for_targets.fn(db_connection, level, TIMESPAN, indexes) + assert len(set(ses_by_hour["level_id"])) == len(indexes) + assert ses_by_hour["value"].sum() == expected + + +def test_task_get_values_for_target_unexpected_level(db_connection): + """Test the `get_values_for_target` task (unknown level).""" + with pytest.raises(NotImplementedError, match="Unsupported level"): + u5.get_values_for_targets.fn(db_connection, Level.NATIONAL, TIMESPAN, []) + + +@pytest.mark.parametrize("level,query,targets,expected", PARAMETERS_FLOW) +def test_flow_u5_for_level(db_connection, level, query, targets, expected): + """Test the `u5_for_level` flow.""" + indicators = u5.u5_for_level(level, TIMESPAN, chunk_size=1000) + # assert len(indicators) == db_connection.execute(text(query)).scalars().one() + assert indicators.loc[indicators["target"].isin(targets), "value"].sum() == expected + + +@pytest.mark.parametrize("chunk_size", PARAMETERS_CHUNK) +def 
test_flow_u5_for_level_with_various_chunk_sizes(chunk_size): + """Test the `u5_for_level` flow with various chunk sizes.""" + level, query, targets, expected = PARAMETERS_FLOW[2] + indicators = u5.u5_for_level(level, TIMESPAN, chunk_size=chunk_size) + assert indicators.loc[indicators["target"].isin(targets), "value"].sum() == expected + + +def test_flow_u5_national(db_connection): + """Test the `u5_national` flow.""" + indicators = u5.u5_national(TIMESPAN) + assert indicators["value"].sum() == N_LEVEL_NATIONAL + + +def test_flow_u5_calculate(db_connection): + """Test the `calculate` flow.""" + expected = sum( + [ + u5.u5_for_level(Level.CITY, TIMESPAN, chunk_size=1000)["value"].sum(), + u5.u5_for_level(Level.EPCI, TIMESPAN, chunk_size=1000)["value"].sum(), + u5.u5_for_level(Level.DEPARTMENT, TIMESPAN, chunk_size=1000)["value"].sum(), + u5.u5_for_level(Level.REGION, TIMESPAN, chunk_size=1000)["value"].sum(), + u5.u5_national(TIMESPAN)["value"].sum(), + ] + ) + all_levels = [ + Level.NATIONAL, + Level.REGION, + Level.DEPARTMENT, + Level.CITY, + Level.EPCI, + ] + indicators = u5.calculate( + TIMESPAN, all_levels, create_artifact=True, format_pd=True + ) + assert indicators["value"].sum() == expected + + +# query used to get N_LEVEL +N_LEVEL_NAT = """ + +""" +N_LEVEL_3 = """ + +"""