
Commit

add c2
philippe thomy authored and loco-philippe committed Feb 1, 2025
1 parent 5e505bb commit 3747c3b
Showing 5 changed files with 307 additions and 7 deletions.
1 change: 1 addition & 0 deletions src/prefect/CHANGELOG.md
@@ -13,6 +13,7 @@ and this project adheres to
- Implement i4 workflow
- Implement i7 workflow
- Implement c1 workflow
- Implement c2 workflow
- Implement u5 workflow
- Implement u6 workflow
- Implement u9 workflow
161 changes: 161 additions & 0 deletions src/prefect/indicators/usage/c2.py
@@ -0,0 +1,161 @@
"""QualiCharge prefect indicators: usage.
C2: Energy cumulate by operator.
"""

from string import Template
from typing import List
from uuid import UUID

import numpy as np
import pandas as pd # type: ignore
from prefect import flow, runtime, task
from prefect.futures import wait
from prefect.task_runners import ThreadPoolTaskRunner
from sqlalchemy.engine import Connection

from ..conf import settings
from ..models import Indicator, IndicatorTimeSpan, Level
from ..utils import (
    export_indic,
    get_database_engine,
    get_num_for_level_query_params,
    get_targets_for_level,
    get_timespan_filter_query_params,
)

SESSIONS_BY_OPERATOR_QUERY_TEMPLATE = """
SELECT
    sum(energy) / 1000.0 AS value,
    nom_operateur AS category,
    $level_id AS level_id
FROM
    Session
    INNER JOIN statique ON point_de_charge_id = pdc_id
    LEFT JOIN City ON City.code = code_insee_commune
    $join_extras
WHERE
    $timespan
    AND $level_id IN ($indexes)
GROUP BY
    category,
    $level_id
"""

QUERY_NATIONAL_TEMPLATE = """
SELECT
    sum(energy) / 1000.0 AS value,
    nom_operateur AS category
FROM
    SESSION
    INNER JOIN statique ON point_de_charge_id = pdc_id
WHERE
    $timespan
GROUP BY
    category
"""


@task(task_run_name="values-for-target-{level:02d}")
def get_values_for_targets(
    connection: Connection,
    level: Level,
    timespan: IndicatorTimeSpan,
    indexes: List[UUID],
) -> pd.DataFrame:
    """Fetch energy totals by operator for the given level, timespan and target indexes."""
    query_template = Template(SESSIONS_BY_OPERATOR_QUERY_TEMPLATE)
    query_params = {"indexes": ",".join(f"'{i}'" for i in map(str, indexes))}
    query_params |= get_num_for_level_query_params(level)
    query_params |= get_timespan_filter_query_params(timespan, session=True)
    return pd.read_sql_query(query_template.substitute(query_params), con=connection)
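# Note: tests can invoke the wrapped function synchronously as
# get_values_for_targets.fn(connection, level, timespan, indexes), bypassing
# the Prefect task runner -- the pattern used by the c1 tests in this commit.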


@flow(
    task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS),
    flow_run_name="c2-{timespan.period.value}-{level:02d}-{timespan.start:%y-%m-%d}",
)
def c2_for_level(
    level: Level,
    timespan: IndicatorTimeSpan,
    chunk_size=settings.DEFAULT_CHUNK_SIZE,
) -> pd.DataFrame:
    """Calculate c2 for a level and a timespan."""
    if level == Level.NATIONAL:
        return c2_national(timespan)
    engine = get_database_engine()
    with engine.connect() as connection:
        targets = get_targets_for_level(connection, level)
        ids = targets["id"]
        # Split ids into int(len / chunk_size) chunks so each submitted task
        # queries a bounded list of targets (chunks may slightly exceed chunk_size)
        chunks = (
            np.array_split(ids, int(len(ids) / chunk_size))
            if len(ids) > chunk_size
            else [ids.to_numpy()]
        )
        futures = [
            get_values_for_targets.submit(connection, level, timespan, chunk)  # type: ignore[call-overload]
            for chunk in chunks
        ]
        wait(futures)

    # Concatenate results and serialize indicators
    results = pd.concat([future.result() for future in futures], ignore_index=True)
    merged = targets.merge(results, how="left", left_on="id", right_on="level_id")

    # Build result DataFrame
    indicators = {
        "target": merged["code"],
        "value": merged["value"].fillna(0),
        "code": "c2",
        "level": level,
        "period": timespan.period,
        "timestamp": timespan.start.isoformat(),
        "category": merged["category"].astype("str"),
        "extras": None,
    }
    return pd.DataFrame(indicators)


@flow(
    task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS),
    flow_run_name="c2-{timespan.period.value}-00-{timespan.start:%y-%m-%d}",
)
def c2_national(timespan: IndicatorTimeSpan) -> pd.DataFrame:
    """Calculate c2 at the national level."""
    engine = get_database_engine()
    query_template = Template(QUERY_NATIONAL_TEMPLATE)
    query_params = get_timespan_filter_query_params(timespan, session=True)
    with engine.connect() as connection:
        res = pd.read_sql_query(query_template.substitute(query_params), con=connection)
    indicators = {
        "target": None,
        "value": res["value"].fillna(0),
        "code": "c2",
        "level": Level.NATIONAL,
        "period": timespan.period,
        "timestamp": timespan.start.isoformat(),
        "category": res["category"].astype("str"),
        "extras": None,
    }
    return pd.DataFrame(indicators)


@flow(
    task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS),
    flow_run_name="meta-c2-{timespan.period.value}",
)
def calculate(
    timespan: IndicatorTimeSpan,
    levels: List[Level],
    create_artifact: bool = False,
    chunk_size: int = 1000,
    format_pd: bool = False,
) -> List[Indicator]:
    """Run all c2 subflows."""
    subflows_results = [
        c2_for_level(level, timespan, chunk_size=chunk_size) for level in levels
    ]
    indicators = pd.concat(subflows_results, ignore_index=True)
    description = f"c2 report at {timespan.start} (period: {timespan.period.value})"
    flow_name = runtime.flow_run.name
    return export_indic(indicators, create_artifact, flow_name, description, format_pd)
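
For context, here is a minimal sketch of invoking the new meta flow directly, outside a deployment. The IndicatorPeriod enum and the IndicatorTimeSpan constructor are assumptions inferred from how timespan.period and timespan.start are used above, and the import paths mirror the prefect.yaml entrypoints; all of these may differ from the actual models.

    # Hypothetical usage sketch -- constructor and enum names are assumed.
    from datetime import datetime

    from indicators.models import IndicatorPeriod, IndicatorTimeSpan, Level
    from indicators.usage.c2 import calculate

    # Daily timespan starting Feb 1, 2025 (period member name is assumed)
    timespan = IndicatorTimeSpan(start=datetime(2025, 2, 1), period=IndicatorPeriod.DAY)
    indicators = calculate(timespan, levels=[Level.NATIONAL], create_artifact=False)
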
32 changes: 29 additions & 3 deletions src/prefect/prefect.yaml
@@ -17,6 +17,7 @@ deployments:
    work_pool:
      name: indicators
      work_queue_name: default

  - name: i4-daily
    entrypoint: indicators/run.py:i4_calculate
    concurrency_limit: 10
@@ -31,6 +32,7 @@
    work_pool:
      name: indicators
      work_queue_name: default

  - name: i7-daily
    entrypoint: indicators/run.py:i7_calculate
    concurrency_limit: 10
@@ -45,6 +47,7 @@
    work_pool:
      name: indicators
      work_queue_name: default

  - name: t1-daily
    entrypoint: indicators/run.py:t1_calculate
    concurrency_limit: 10
@@ -59,6 +62,7 @@
    work_pool:
      name: indicators
      work_queue_name: default

  - name: c1-daily
    entrypoint: indicators/run.py:c1_calculate
    concurrency_limit: 10
@@ -72,7 +76,23 @@
      chunk_size: 1000
    work_pool:
      name: indicators
      work_queue_name: default

  - name: c2-daily
    entrypoint: indicators/run.py:c2_calculate
    concurrency_limit: 10
    schedules:
      - cron: "0 15 * * *"
        timezone: "Europe/Paris"
        active: true
    parameters:
      period: d
      create_artifact: true
      chunk_size: 1000
    work_pool:
      name: indicators
      work_queue_name: default

  - name: u5-daily
    entrypoint: indicators/run.py:u5_calculate
    concurrency_limit: 10
@@ -86,7 +106,8 @@
      chunk_size: 1000
    work_pool:
      name: indicators
      work_queue_name: default

  - name: u6-daily
    entrypoint: indicators/run.py:u6_calculate
    concurrency_limit: 10
@@ -100,7 +121,8 @@
      chunk_size: 1000
    work_pool:
      name: indicators
      work_queue_name: default

  - name: u9-daily
    entrypoint: indicators/run.py:u9_calculate
    concurrency_limit: 10
@@ -115,6 +137,7 @@
    work_pool:
      name: indicators
      work_queue_name: default

  - name: u10-daily
    entrypoint: indicators/run.py:u10_calculate
    concurrency_limit: 10
@@ -129,6 +152,7 @@
    work_pool:
      name: indicators
      work_queue_name: default

  - name: u11-daily
    entrypoint: indicators/run.py:u11_calculate
    concurrency_limit: 10
@@ -143,6 +167,7 @@
    work_pool:
      name: indicators
      work_queue_name: default

  - name: u12-daily
    entrypoint: indicators/run.py:u12_calculate
    concurrency_limit: 10
@@ -157,6 +182,7 @@
    work_pool:
      name: indicators
      work_queue_name: default

  - name: u13-daily
    entrypoint: indicators/run.py:u13_calculate
    concurrency_limit: 10
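
Once this file is applied, the new c2-daily deployment can also be triggered ad hoc from Python with Prefect's run_deployment helper. A sketch, assuming Prefect derives the flow name "c2-calculate" from the c2_calculate entrypoint function:

    from prefect.deployments import run_deployment

    # Deployments are addressed as "<flow-name>/<deployment-name>"; the flow
    # name "c2-calculate" is an assumption based on the entrypoint above.
    run_deployment(
        name="c2-calculate/c2-daily",
        parameters={"period": "d", "create_artifact": True, "chunk_size": 1000},
    )
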
@@ -31,9 +31,9 @@ def test_task_get_values_for_target(db_connection, level, query, expected):
    """Test the `get_values_for_target` task."""
    result = db_connection.execute(text(query))
    indexes = list(result.scalars().all())
-    ses_by_hour = c1.get_values_for_targets.fn(db_connection, level, TIMESPAN, indexes)
-    assert len(set(ses_by_hour["level_id"])) == len(indexes)
-    assert ses_by_hour["value"].sum() == expected
+    values = c1.get_values_for_targets.fn(db_connection, level, TIMESPAN, indexes)
+    assert len(set(values["level_id"])) == len(indexes)
+    assert values["value"].sum() == expected


def test_task_get_values_for_target_unexpected_level(db_connection):
@@ -82,7 +82,7 @@ def test_flow_c1_calculate(db_connection):
# query used to get N_LEVEL
N_LEVEL_NAT = """
SELECT
-    count(*) AS value,
+    count(*) AS value
FROM
    SESSION
    INNER JOIN statique ON point_de_charge_id = pdc_id
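
The fifth changed file did not render in this view. For orientation, a c2 analogue of the c1 task test above might look like the following sketch; the module import, target query, and fixtures are assumptions patterned on the c1 test, not the actual file contents.

    from sqlalchemy import text

    from indicators.models import Level
    from indicators.usage import c2  # import path assumed

    def test_task_get_values_for_targets_c2(db_connection):
        """Hypothetical sketch mirroring the c1 test pattern above."""
        # Level.REGION, the target query, and TIMESPAN (a module-level
        # fixture in the existing tests) are all assumed here.
        result = db_connection.execute(text("SELECT id FROM Region"))
        indexes = list(result.scalars().all())
        values = c2.get_values_for_targets.fn(db_connection, Level.REGION, TIMESPAN, indexes)
        # c2 can return several operator rows per target, so the set of
        # returned level_ids is at most the number of requested targets
        assert len(set(values["level_id"])) <= len(indexes)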