Dieses Dokument beschreibt die docker-compose.yml-Datei, die zur Definition und Ausführung der Multi-Container-Anwendung verwendet wird.
Diese docker-compose.yml-Datei definiert eine Multi-Container-Anwendung, die ein Umweltmonitoring-System mit einem Backend, Frontend, Datenbank, Redis-Cache, Prefect-Server, Prefect-Worker und einem Datenbank-Explorer bereitstellt, alles hinter einem Caddy-Proxy.
3.8
- Beschreibung: Der PostgreSQL-Datenbankdienst mit TimescaleDB-Erweiterung.
- Image:
timescale/timescaledb:latest-pg16 - Container Name:
timescale_db_umw - Umgebungsvariablen: Konfiguriert den PostgreSQL-Benutzer (
POSTGRES_USER), das Passwort (POSTGRES_PASSWORD) und den Datenbanknamen (POSTGRES_DB) mittels Umgebungsvariablen oder Standardwerten. - Volumes: Persistiert Datenbankdaten in
postgres_dataund führt Initialisierungsskripte aus dem./init_scripts-Verzeichnis aus. - Ports: Exponiert Port
5432des Containers nach außen. - Restart Policy: Startet immer neu, es sei denn, er wird explizit gestoppt.
- Healthcheck: Überprüft die Datenbankverbindung mit
pg_isready.
- Beschreibung: Der Redis-Cache-Dienst für FastAPI-Caching und andere Zwecke.
- Image:
redis:latest - Container Name:
redis_cache_umw - Restart Policy: Startet immer neu, es sei denn, er wird explizit gestoppt.
- Volumes: Persistiert Redis-Daten in
redis_data. - Ports: Exponiert Port
6379des Containers nach außen.
- Beschreibung: Der FastAPI-Backend-Dienst.
- Container Name:
backend_fastapi_umw - Build Kontext: Baut das Docker-Image aus dem aktuellen Verzeichnis mit dem Dockerfile in
./services/backend/Dockerfile. - Volumes: Bindet lokale Verzeichnisse (
./services/backend,./shared,./host_models_dir) in den Container ein. - Command: Führt die FastAPI-Anwendung mit
uvicornim Reload-Modus aus. - Umgebungsvariablen: Setzt
PREFECT_API_URLzur Kommunikation mit dem Prefect-Server. - Working Directory: Setzt das Arbeitsverzeichnis im Container auf
/app/backend. - Dependencies: Startet, nachdem
db,redis,prefectundprefect-workergestartet und als "healthy" (fürdb) oder "started" (für andere) befunden wurden. - Restart Policy: Startet immer neu, es sei denn, er wird explizit gestoppt.
- Ports: Exponiert Port
8000des Containers nach außen. - Healthcheck: Überprüft die Erreichbarkeit des
/api/health-Endpunkts des Backends.
- Beschreibung: Der Dash-Frontend-Dienst.
- Container Name:
frontend_dash_umw - Build Kontext: Baut das Docker-Image aus dem aktuellen Verzeichnis mit dem Dockerfile in
./services/frontend/Dockerfile. - Volumes: Bindet lokale Verzeichnisse (
./services/frontend,./shared) in den Container ein. - Command: Führt die Dash-Anwendung mit
uvicornaus. - Umgebungsvariablen: Setzt
BACKEND_API_URLzur Kommunikation mit dem Backend-Dienst. - Working Directory: Setzt das Arbeitsverzeichnis im Container auf
/app/frontend. - Dependencies: Startet, nachdem das
backend-Service als "healthy" befunden wurde. - Restart Policy: Startet immer neu, es sei denn, er wird explizit gestoppt.
- Ports: Exponiert Port
8050des Containers nach außen.
- Beschreibung: Der Prefect-Server-Dienst, der die Orchestrierung von Workflows verwaltet.
- Container Name:
prefect_server_umw - Image:
prefecthq/prefect:3-latest - Command: Startet den Prefect-Server.
- Umgebungsvariablen: Konfiguriert die Datenbankverbindung für den Prefect-Server und die API-URL.
- Dependencies: Startet, nachdem der
db-Dienst als "healthy" befunden wurde. - Restart Policy: Startet immer neu, es sei denn, er wird explizit gestoppt.
- Ports: Exponiert Port
4200des Containers nach außen (für das Prefect UI und die API).
- Beschreibung: Der Prefect-Worker-Dienst, der Prefect Flows ausführt.
- Container Name:
prefect_worker_umw - Build Kontext: Baut das Docker-Image aus dem aktuellen Verzeichnis mit dem Dockerfile in
./services/ml_service/Dockerfile. - Volumes: Bindet lokale Verzeichnisse (
./services/ml_service,./shared,./host_models_dir) in den Container ein. - Working Directory: Setzt das Arbeitsverzeichnis im Container auf
/app/ml_service. - Command: Führt das
run_worker.py-Skript aus, das die Deployments registriert und den Worker startet. - Umgebungsvariablen: Setzt
PREFECT_API_URLundDATABASE_URL_PREFECTfür die Worker-Konfiguration. - Dependencies: Startet, nachdem
dbals "healthy" undprefectals "started" befunden wurde. - Restart Policy: Startet immer neu, es sei denn, er wird explizit gestoppt.
- Ports: Exponiert Port
8001des Containers nach außen.
- Beschreibung: Ein Web-basierter Datenbank-Explorer (pgweb) für PostgreSQL.
- Container Name:
pgweb_explorer_umw - Image:
sosedoff/pgweb:latest - Command: Startet
pgwebund konfiguriert die Verbindung zurdb-Datenbank. - Umgebungsvariablen: Setzt Datenbank-Anmeldedaten, die vom
commandbenötigt werden. - Dependencies: Startet, nachdem der
db-Dienst als "healthy" befunden wurde. - Restart Policy: Startet immer neu, es sei denn, er wird explizit gestoppt.
- Ports: Exponiert Port
8081des Containers nach außen.
- Beschreibung: Ein Reverse-Proxy und Webserver (Caddy).
- Container Name:
caddy_proxy_umw - Image:
caddy:latest - Restart Policy: Startet immer neu, es sei denn, er wird explizit gestoppt.
- Ports: Exponiert Port
3000des Containers nach außen. - Volumes: Bindet die lokale
Caddyfileund persistente Daten/Konfigurationen für Caddy ein. - Dependencies: Stellt sicher, dass
backend,prefectunddb-explorergestartet sind, bevor Caddy startet.
postgres_data: Persistentes Volume für PostgreSQL-Daten.redis_data: Persistentes Volume für Redis-Daten.caddy_data: Persistentes Volume für Caddy-Daten.caddy_config: Persistentes Volume für Caddy-Konfigurationen.
Diese Datei ist der zentrale Einstiegspunkt für die FastAPI-Anwendung. Sie konfiguriert die Anwendung, bindet API-Router ein, initialisiert Dienste wie den Redis-Cache und den Prefect Scheduler und handhabt den Anwendungslebenszyklus.
Die main.py-Datei startet die FastAPI-Anwendung für das Umweltmonitoring-Backend. Sie ist verantwortlich für:
- Anwendungslebenszyklus (
lifespan): Führt Startup-Logik aus, wie die initiale Datenaufnahme über einen Prefect Flow und die Initialisierung des Redis-Caches. Stoppt den Scheduler beim Herunterfahren. - Routing: Bindet API-Router für Sensoren und Vorhersagen ein.
- Gesundheitscheck: Stellt einen einfachen Gesundheitscheck-Endpunkt bereit.
- Statische Dateien: Dient statische Dateien (z.B. Favicon) aus.
FastAPI,APIRouter,status: FastAPI-Kernkomponenten.logging: Für die Protokollierung von Ereignissen.asyncio: Für asynchrone Operationen, insbesondere imlifespan-Kontext.contextlib.asynccontextmanager: Für denlifespan-Manager.starlette.concurrency.run_in_threadpool: Für das Ausführen blockierender I/O in einem Thread-Pool (wird hier deklariert, aber nicht direkt aufgerufen).apscheduler.schedulers.asyncio.AsyncIOScheduler,apscheduler.triggers.interval.IntervalTrigger: Für die Zeitplanung von Aufgaben (der Scheduler wird initialisiert, aber aktuell nicht verwendet, um Aufgaben hinzuzufügen).datetime.timedelta: Für Zeitintervalle (verwendet mit APScheduler).fastapi.staticfiles.StaticFiles,starlette.responses.FileResponse: Zum Servieren statischer Dateien.utils.db_session.SessionLocal: Für den Zugriff auf die Datenbank-Session.core.config.settings: Anwendungseinstellungen (z.B. Datenbank-Konfiguration, Redis-Host, Sensor-Box-ID).shared.crud.crud_sensor: CRUD-Operationen für Sensor-Datenbankmodelle.api.v1.endpoints.sensors,api.v1.endpoints.predictions: Router für die API-Endpunkte der Version 1.fastapi_cache.FastAPICache,fastapi_cache.backends.redis.RedisBackend,redis.asyncio as aioredis: Für die Konfiguration des FastAPI-Caches mit Redis.prefect.deployments.run_deployment,prefect.exceptions.ObjectNotFound: Zum Starten von Prefect Deployments.
logger: Ein Logger-Objekt für die Anwendung.BASE_DIR,project_root,STATIC_DIR,FAVICON_PATH: Pfad-Konfigurationen für statische Dateien.scheduler: Eine Instanz vonAsyncIOScheduler.
Der lifespan-Kontextmanager handhabt Aufgaben, die beim Starten und Herunterfahren der FastAPI-Anwendung ausgeführt werden sollen.
- Startup-Logik:
- Datenbank-Check: Überprüft, ob die konfigurierte
SENSOR_BOX_IDbereits in der Datenbank existiert. - Initiale Datenaufnahme (Prefect Flow): Wenn die SensorBox nicht in der Datenbank gefunden wird, versucht der Lebenszyklus, einen Prefect Flow namens
"data_ingestion_flow/timeseries-data-ingestion"zu starten.- Es gibt eine Wiederholungslogik mit maximal 10 Versuchen und einer Verzögerung von 3 Sekunden, um auf das Deployment im Prefect Server zu warten.
- Parameter wie
box_id,initial_fetch_daysundfetch_chunk_dayswerden an den Flow übergeben.
- Redis-Cache-Initialisierung: Stellt eine Verbindung zu Redis her (
aioredis.from_url) und initialisiertFastAPICachemit dem Redis-Backend.
- Datenbank-Check: Überprüft, ob die konfigurierte
- Shutdown-Logik:
- Stoppt den
AsyncIOScheduler, falls er läuft.
- Stoppt den
- Die FastAPI-Anwendung wird mit einem Titel (
"Umweltmonitoring Backend") und einer Version ("0.1.0") initialisiert. - Der
lifespan-Kontextmanager wird an die App gebunden.
/favicon.ico: Ein Endpunkt, der das Favicon aus demSTATIC_DIRdient./assets: Ein Mount-Punkt für statische Dateien imSTATIC_DIR./api/health: Ein einfacher Gesundheitscheck-Endpunkt, der{"status": "ok"}zurückgibt.- API-Router:
sensors_routerwird unter/api/v1eingebunden, mit dem Tag"sensors".predictions_routerwird unter/api/v1eingebunden, mit dem Tag"predictions".health_routerwird unter/apieingebunden, mit dem Tag"health-check".
- Der
lifespan-Manager fängt kritische Fehler während des Startups ab, loggt sie und fährt den Scheduler herunter, bevor er eineRuntimeError-Ausnahme auslöst, um das Scheitern des Starts zu signalisieren. - Die Wiederholungslogik für den Prefect Flow Run fängt
ObjectNotFound(wenn das Deployment nicht sofort verfügbar ist) und andere generischeExceptions ab.
Diese Datei konfiguriert die SQLAlchemy-Datenbank-Engine und stellt eine Funktion zum Abrufen einer Datenbank-Session bereit.
Die db_session.py-Datei ist für die Einrichtung der Datenbankverbindung für die FastAPI-Anwendung verantwortlich. Sie verwendet die in core.config.settings definierten Datenbank-URLs, um eine SQLAlchemy Engine zu erstellen, und konfiguriert eine Session-Fabrik (SessionLocal). Außerdem bietet sie eine Dependency-Injektions-Funktion (get_db) für FastAPI, um Datenbank-Sessions sicher zu verwalten.
sqlalchemy.create_engine: Zum Erstellen der Datenbank-Engine.sqlalchemy.orm.sessionmaker: Zum Erstellen einer Session-Fabrik.core.config.settings: Die Anwendungseinstellungen, die dieDATABASE_URLenthalten.
engine: Die SQLAlchemy Engine-Instanz, die beim Laden des Moduls erstellt wird. Sie ist mitpool_pre_ping=Truekonfiguriert, um die Konnektivität des Pools zu überprüfen.SessionLocal: Eine SQLAlchemy Session-Fabrik, die an dieenginegebunden ist. Sie ist so konfiguriert, dass sieautocommit=Falseundautoflush=Falseverwendet, was manuelle Commits und Flushes erfordert und Transaktionen expliziter macht.
get_db():- Beschreibung: Eine Generatorfunktion, die als FastAPI-Dependency verwendet wird. Sie stellt eine frische Datenbank-Session für jeden Request bereit und stellt sicher, dass die Session nach Abschluss des Requests (oder im Fehlerfall) ordnungsgemäß geschlossen wird.
- Logik:
- Erstellt eine neue Session aus
SessionLocal. yieldt die Session, wodurch der Request-Handler ausgeführt werden kann.- Im
finally-Block wirddb.close()aufgerufen, um die Session zu schließen und Ressourcen freizugeben, unabhängig davon, ob der Request erfolgreich war oder ein Fehler auftrat.
- Erstellt eine neue Session aus
- Zentralisierte Datenbank-Initialisierung.
- Bereitstellung einer thread-sicheren und request-spezifischen Datenbank-Session für FastAPI-Endpunkte.
- Sicherstellung des ordnungsgemäßen Ressourcenmanagements der Datenbankverbindungen.
Diese Datei enthält mehrere Hilfsfunktionen, die dynamische Cache-Schlüssel für verschiedene API-Endpunkte der FastAPI-Anwendung generieren.
Die keybuilder.py-Datei stellt eine Sammlung von Funktionen bereit, die darauf ausgelegt sind, konsistente und eindeutige Cache-Schlüssel zu erstellen. Diese Schlüssel werden von fastapi-cache verwendet, um API-Antworten zu speichern und wiederherzustellen, wodurch die Notwendigkeit reduziert wird, wiederholt teure Operationen wie Datenbankabfragen oder externe API-Aufrufe durchzuführen. Jeder Schlüssel berücksichtigt die relevanten Parameter des API-Endpunkts, um sicherzustellen, dass unterschiedliche Anfragen unterschiedliche Cache-Einträge erhalten.
_format_datetime(dt: datetime | None) -> str: Eine private Hilfsfunktion, die eindatetime-Objekt in einen ISO-formatierten String umwandelt oder "None" zurückgibt, wenn das ObjektNoneist. Dies gewährleistet eine konsistente Datumsformatierung in den Cache-Schlüsseln.
Alle Schlüssel-Builder-Funktionen folgen einem ähnlichen Muster:
- Sie nehmen
func,*args,**kwargsals Eingabe an.funcist die gecachte Funktion (der API-Endpunkt), undkwargsenthält die Parameter des Endpunkts. - Ein
prefixwird basierend auf dem Modul und Namen der Funktion generiert ({func.__module__}:{func.__name__}). - Die spezifischen Endpunkt-Parameter werden aus
kwargs.get('kwargs', {})extrahiert. - Der
cache_keywird durch Konkatenierung des Präfixes und der relevanten Parameter-Werte erstellt. - Der generierte Schlüssel wird zu Debug-Zwecken geloggt.
Hier sind die spezifischen Schlüssel-Builder:
-
aggregate_key_builder(func, *args, **kwargs) -> str:- Zweck: Erstellt einen Cache-Schlüssel für aggregierte Sensordaten.
- Parameter im Schlüssel:
sensor_id,from_date,to_date,interval,aggregation_type,smoothing_window,interpolation_method.
-
list_sensors_key_builder(func, *args, **kwargs) -> str:- Zweck: Erstellt einen Cache-Schlüssel für die Auflistung von Sensoren.
- Parameter im Schlüssel:
skip,limit,name_filter.
-
box_detail_key_builder(func, *args, **kwargs) -> str:- Zweck: Erstellt einen Cache-Schlüssel für die Details einer Sensorbox.
- Parameter im Schlüssel:
box_id.
-
sensors_for_box_key_builder(func, *args, **kwargs) -> str:- Zweck: Erstellt einen Cache-Schlüssel für Sensoren, die zu einer bestimmten Box gehören.
- Parameter im Schlüssel:
box_id,skip,limit.
-
summary_stats_key_builder(func, *args, **kwargs) -> str:- Zweck: Erstellt einen Cache-Schlüssel für zusammenfassende Statistiken.
- Parameter im Schlüssel:
sensor_id,from_date,to_date.
-
latest_data_key_builder(func, *args, **kwargs) -> str:- Zweck: Erstellt einen Cache-Schlüssel für die neuesten Sensordaten.
- Parameter im Schlüssel:
sensor_id.
-
raw_data_key_builder(func, *args, **kwargs) -> str:- Zweck: Erstellt einen Cache-Schlüssel für Rohsensordaten.
- Parameter im Schlüssel:
sensor_id,from_date,to_date,skip,limit.
- Caching-Effizienz: Ermöglicht
fastapi-cache, die Antworten von Endpunkten basierend auf allen relevanten Anfragelparametern eindeutig zu identifizieren und zwischenzuspeichern. - Reduzierung der Last: Minimiert die Notwendigkeit, wiederholt auf die Datenbank oder externe APIs zuzugreifen, wenn dieselbe Anfrage erneut gestellt wird.
- Wartbarkeit: Zentralisiert die Logik zur Schlüsselgenerierung, was die Verwaltung und Fehlerbehebung von Caching-Problemen erleichtert.
Diese Datei enthält eine Funktion zur Durchführung von Feature Engineering und zur Vorbereitung von historischen Sensordaten für die Vorhersage.
create_features_for_prediction(historical_df: pd.DataFrame) -> pd.DataFrame
Die Funktion create_features_for_prediction nimmt einen DataFrame mit historischen Temperaturdaten entgegen und wendet eine Reihe von Feature-Engineering-Schritten an. Ziel ist es, ein Feature-Set zu erstellen, das für die Eingabe in ein Machine-Learning-Vorhersagemodell geeignet ist. Die Funktion gibt die Features für den letzten verfügbaren Zeitpunkt zurück, was typisch für Inferenz-Szenarien ist.
historical_df(pd.DataFrame): Ein Pandas DataFrame, das stündliche historische Temperaturdaten enthält. Es wird erwartet, dass es einenDatetimeIndexund eine Spalte namens'temperatur'hat.
pd.DataFrame: Ein DataFrame mit einer einzigen Zeile, die alle generierten Features für den letzten verfügbaren Zeitstempel in den Eingabedaten enthält.
TIMEZONE(str): Die Zeitzone, in die die Daten konvertiert werden sollen (Standard:"Europe/London").
- Zeitzonen-Korrektur: Stellt sicher, dass der Index des Eingabe-DataFrames zeitzonen-bewusst ist und in die konfigurierte
TIMEZONEkonvertiert wird. - Externe Features: Ruft Solar-Features (
get_solar_features) und Wetter-Features (get_weather_features) für den relevanten Zeitbereich ab und verbindet diese mit dem Haupt-DataFrame. - Zeit-Features: Erstellt
hour_sinundhour_cosaus der Stunde des Tages, um die zyklische Natur der Zeit zu erfassen. - Lag-Features: Erzeugt verzögerte Versionen der
'temperatur'-Spalte (z.B. 1, 2, 3, 24 Stunden zurück) und ausgewählter externer Features (Wettertemperatur, Globalstrahlung, Bewölkung). - Rolling Window Features: Berechnet gleitende Durchschnitte und Standardabweichungen der Temperatur über verschiedene Zeitfenster (z.B. 3, 6, 24, 48, 72, 168 Stunden). Hierbei wird
shift(1)verwendet, um sicherzustellen, dass die gleitenden Statistiken keine zukünftigen Daten enthalten. - Temperaturdifferenzen: Berechnet die Differenz der Temperatur über verschiedene Zeitintervalle (z.B. 1, 3, 6, 12, 24 Stunden).
- Imputation: Führt eine lineare Interpolation durch, um fehlende Werte (z.B. durch Joins oder
shift-Operationen) aufzufüllen. Anschließend werden verbleibende NaN-Werte (z.B. am Anfang des DataFrames) mit der nächstfolgenden gültigen Beobachtung gefüllt (bfill). - Spaltenbereinigung: Entfernt die ursprüngliche
'temperatur'-Spalte, da sie nicht Teil des Feature-Sets sein soll. - Letzte Zeile Extraktion: Gibt die letzte Zeile des verarbeiteten DataFrames zurück, die alle Features für den neuesten verfügbaren Zeitpunkt enthält.
utils.feature_enhancer.get_solar_features: Zum Abrufen von Solar-Features.utils.feature_enhancer.get_weather_features: Zum Abrufen von Wetter-Features.
- Bereitstellung eines konsistenten Feature-Engineering-Prozesses für die Vorhersagephase.
- Sicherstellung, dass die Eingabedaten für das ML-Modell die gleiche Struktur und Qualität wie die Trainingsdaten aufweisen.
Diese Datei enthält Funktionen zur Berechnung zusätzlicher Features aus geografischen und Wetterdaten, die zur Verbesserung von Machine-Learning-Modellen dienen.
LATITUDE(float): Breitengrad des Standorts (Standard:52.019364).LONGITUDE(float): Längengrad des Standorts (Standard:-1.73893).TIMEZONE(str): Zeitzone des Standorts (Standard:"Europe/London").
get_solar_features(df_index: pd.DatetimeIndex) -> pd.DataFrameget_weather_features(start_date: str, end_date: str) -> Optional[pd.DataFrame]
Berechnet Sonnenstand-Features (Höhe über dem Horizont und Azimut) für jeden Zeitstempel in einem gegebenen Pandas DatetimeIndex unter Verwendung der pvlib-Bibliothek und den konfigurierten Geokoordinaten. Die Werte werden in ein für ML-Modelle geeignetes Format transformiert (z.B. Sinus/Kosinus-Transformation für zyklische Werte).
Ruft historische Wetterdaten (Luftfeuchtigkeit, Bewölkung, Windgeschwindigkeit, Globalstrahlung) von der Open-Meteo Archive API für einen angegebenen Datumsbereich ab. Die abgerufenen Daten werden in ein Pandas DataFrame konvertiert und für die Nutzung in ML-Modellen vorbereitet (z.B. Umbenennung von Spalten, Zeitzonen-Handhabung).
- Erstellt ein
pvlib.location.Location-Objekt mit den Geodaten. - Verwendet
location.get_solarposition()um die Sonnenposition für jeden Zeitstempel imdf_indexzu berechnen. - Wählt relevante Spalten aus (
apparent_elevation,azimuth) und benennt sie um. - Transformiert
solar_elevationmit der Sinusfunktion undsolar_azimuthin Sinus-/Kosinus-Paare, um zyklische Abhängigkeiten für Modelle besser abzubilden und die Werte zu normalisieren.
- Definiert die API-URL für Open-Meteo und die benötigten Wettervariablen (
hourlyparameters). - Führt einen HTTP GET-Request an die Open-Meteo API aus und übergibt die Datumsbereiche und Parameter.
- Überprüft den HTTP-Statuscode (
raise_for_status). - Konvertiert die JSON-Antwort in ein Pandas DataFrame.
- Setzt den 'time'-Index und konvertiert ihn in die konfigurierte Zeitzone (
TIMEZONE), wobei Besonderheiten wie Sommer-/Winterzeitübergänge berücksichtigt werden. - Benennt die Spalten für eine bessere Lesbarkeit und Konsistenz um.
get_weather_featuresfängtrequests.exceptions.RequestExceptionab und gibtNonezurück, falls ein Fehler beim Abrufen der Wetterdaten auftritt.
Diese Datei definiert das SQLAlchemy-Modell TrainedModel, das zur Speicherung von Metadaten und Metriken trainierter Zeitreihen-Vorhersagemodelle in einer Datenbank verwendet wird.
TrainedModel
Das TrainedModel-Modell ist eine SQLAlchemy-Klasse, die die Struktur der trained_models-Tabelle in der Datenbank abbildet. Sie dient dazu, wichtige Informationen über jedes trainierte ML-Modell zu persistieren, einschließlich seiner Identifikation, des Speicherpfads, der Trainingsmetriken und der Leistungsmetriken aus der Validierung. Das Modell unterstützt automatische Versionierung über die version_id-Spalte.
trained_models
id(Integer, Primary Key): Eindeutiger Bezeichner des Modelleintrags.model_name(String(255), Not Null, Index): Der Name des Modells.forecast_horizon_hours(Integer, Not Null, Index): Der Vorhersagehorizont in Stunden, für den dieses Modell trainiert wurde.model_path(String(512), Not Null, Unique): Der Dateipfad, unter dem das trainierte Modell gespeichert ist. Dies muss eindeutig sein.version_id(Integer, Not Null): Die Versionsnummer des Modells. Diese Spalte wird für die automatische Optimistic Concurrency Control (OCC) von SQLAlchemy verwendet.last_trained_at(DateTime, Timezone True, Server Default:func.now(), On Update:func.now()): Zeitstempel des letzten Trainings oder der letzten Aktualisierung des Modelleintrags.training_duration_seconds(Float, Nullable): Die Dauer des Trainingsprozesses in Sekunden.val_mae(Float, Nullable): Mean Absolute Error (MAE) aus der Validierung.val_rmse(Float, Nullable): Root Mean Squared Error (RMSE) aus der Validierung.val_mape(Float, Nullable): Mean Absolute Percentage Error (MAPE) aus der Validierung.val_r2(Float, Nullable): R-squared (Bestimmtheitsmaß) aus der Validierung.
- Automatische Versionierung: Die
version_id-Spalte ist als Versionszähler für SQLAlchemy konfiguriert (__mapper_args__ = {'version_id_col': version_id}). Dies ermöglicht Optimistic Concurrency Control, um Dateninkonsistenzen bei gleichzeitigen Schreibzugriffen zu vermeiden. - Zeitstempel-Automatisierung:
last_trained_atwird beim Erstellen und jeder Aktualisierung des Eintrags automatisch mit dem aktuellen Zeitstempel versehen. - Indizes:
model_nameundforecast_horizon_hourssind indiziert, um effiziente Abfragen zu ermöglichen. - Eindeutiger Modellpfad:
model_pathmuss eindeutig sein, um sicherzustellen, dass jeder Modelleintrag auf einen spezifischen, einmaligen Dateipfad verweist.
Diese Datei definiert die Anwendungseinstellungen unter Verwendung von Pydantic-Settings und lädt Umgebungsvariablen für die Datenbank- und Redis-Konfiguration.
Settings
Die Settings-Klasse erbt von Pydantic's BaseSettings und wird verwendet, um Umgebungsvariablen und Standardwerte für die Anwendungskonfiguration zu verwalten. Sie ist speziell für Datenbank-, Redis- und Prefect-Verbindungsdetails konfiguriert und generiert automatisch die DATABASE_URL und MAINTENANCE_DATABASE_URL aus den einzelnen Komponenten.
DB_USER(str): Datenbank-Benutzername.DB_PASSWORD(str): Datenbank-Passwort.DB_HOST(str): Datenbank-Host.DB_PORT(str, Standard:"5432"): Datenbank-Port.DB_NAME(str): Name der Hauptdatenbank.DATABASE_URL(str | None): Die vollständige Datenbank-Verbindungs-URL für die Hauptdatenbank. Wird automatisch generiert, wenn nicht explizit gesetzt.MAINTENANCE_DATABASE_URL(str | None): Die Datenbank-Verbindungs-URL für die Maintenance-Datenbank (normalerweisepostgres). Wird automatisch generiert, wenn nicht explizit gesetzt.REDIS_HOST(str, Standard:"localhost"): Host des Redis-Servers.REDIS_PORT(int, Standard:6379): Port des Redis-Servers.PREFECT_DB_NAME(str): Name der Prefect-Datenbank.PREFECT_UI_SERVE_BASE(str, Standard:"/prefect"): Basis-URL, unter der das Prefect UI bereitgestellt wird.SENSOR_BOX_ID(str): ID der Sensorbox, die für die initiale Datenaufnahme verwendet wird.INITIAL_TIME_WINDOW_IN_DAYS(int, Standard:365): Initiales Zeitfenster in Tagen für den Datenabruf einer neuen Box.FETCH_TIME_WINDOW_DAYS(int, Standard:4): Zeitfenster in Tagen für den chunk-weisen Datenabruf.
- Pydantic-Settings: Die Klasse erbt von
BaseSettings, was das Laden von Umgebungsvariablen (und optional aus.env-Dateien) automatisiert. - URL-Generierung: Im
__init__-Method wird dieDATABASE_URLundMAINTENANCE_DATABASE_URLdynamisch aus den einzelnen Komponenten (DB_USER,DB_PASSWORD, etc.) erstellt, wobei das Passwort URL-sicher kodiert wird (quote_plus). .env-Datei Unterstützung:model_config = SettingsConfigDict(env_file='../.env', env_file_encoding='utf-8', extra='ignore')konfiguriert Pydantic, um Umgebungsvariablen aus einer.env-Datei zu laden. Der Pfad../.envdeutet an, dass die.env-Datei eine Ebene über dem aktuellencore-Ordner erwartet wird (z.B. imbackend-Ordner oder im Projekt-Root).- Globale Instanz: Eine Instanz der
Settings-Klasse (settings = Settings()) wird direkt in der Datei erstellt, um den einfachen Zugriff auf die Konfigurationseinstellungen zu ermöglichen.
Diese Datei definiert die API-Endpunkte für den Zugriff auf Sensoren und deren Daten. Sie bietet Funktionen zum Abrufen von Sensorboxen, einzelnen Sensoren, Rohdaten, aggregierten Daten und statistischen Zusammenfassungen.
Die sensors.py-Datei enthält FastAPI-Router-Definitionen, die es Clients ermöglichen, Informationen über Sensorboxen und Sensordaten abzurufen. Die Endpunkte nutzen Caching (fastapi-cache) zur Performance-Optimierung und interagieren mit der Datenbank über CRUD-Operationen.
logging: Für die Protokollierung von Informationen.fastapi: FastAPI-Core-Komponenten.fastapi_cache.decorator.cache: Decorator für das Caching von Endpunkt-Antworten.sqlalchemy.orm.Session: Für die Datenbank-Session.datetime: Für Datums-/Zeit-Manipulationen.utils.db_session.get_db: Dependency-Funktion zum Abrufen der Datenbank-Session.shared.crud.crud_sensor: CRUD-Operationen für Sensoren und Sensordaten.shared.schemas.sensor as sensor_schema: Pydantic-Schemas für Sensor-bezogene Datenmodelle.utils.keybuilder: Hilfsfunktionen zum Generieren von Cache-Schlüsseln.
router = APIRouter(): Eine Instanz von APIRouter zur Gruppierung der Sensor-Endpunkte.
- Zweck: Ruft eine Liste aller Sensorboxen ab.
- Parameter:
skip(int, optional),limit(int, optional). - Response Model:
List[sensor_schema.SensorBox] - Caching: Ja, mit
list_sensors_key_builder, expire 900 Sekunden.
- Zweck: Ruft Details einer spezifischen Sensorbox ab.
- Parameter:
box_id(str, Pfadparameter). - Response Model:
sensor_schema.SensorBox - Caching: Ja, mit
box_detail_key_builder, expire 900 Sekunden. - Fehler:
404 Not Found, wenn die Box nicht existiert.
- Zweck: Ruft alle Sensoren ab, die zu einer spezifischen Sensorbox gehören.
- Parameter:
box_id(str, Pfadparameter),skip(int, optional),limit(int, optional). - Response Model:
List[sensor_schema.Sensor] - Caching: Ja, mit
sensors_for_box_key_builder, expire 900 Sekunden. - Fehler:
404 Not Found, wenn die Box nicht existiert.
- Zweck: Ruft Rohdatenpunkte für einen spezifischen Sensor ab.
- Parameter:
sensor_id(str, Pfadparameter),from_date(datetime, optional),to_date(datetime, optional),skip(int, optional),limit(int, optional). - Response Model:
List[sensor_schema.SensorData] - Caching: Ja, mit
raw_data_key_builder, expire 900 Sekunden. - Fehler:
404 Not Found, wenn der Sensor nicht existiert.
- Zweck: Ruft tägliche Zusammenfassungen (Min, Max, Avg, Count) für einen spezifischen Sensor in einem Zeitraum ab.
- Parameter:
sensor_id(str, Pfadparameter),from_date(datetime, Query-Parameter, erforderlich),to_date(datetime, Query-Parameter, erforderlich). - Response Model:
sensor_schema.SensorDataDailySummaries - Caching: Ja, mit
summary_stats_key_builder, expire 900 Sekunden. - Fehler:
404 Not Found, wenn der Sensor nicht existiert.
- Zweck: Ruft statistische Kennzahlen (Avg, Min, Max, Count, StdDev) für einen spezifischen Sensor in einem Zeitraum ab.
- Parameter:
sensor_id(str, Pfadparameter),from_date(datetime, Query-Parameter, erforderlich),to_date(datetime, Query-Parameter, erforderlich). - Response Model:
sensor_schema.SensorDataStatistics - Caching: Ja, mit
summary_stats_key_builder, expire 900 Sekunden. - Fehler:
404 Not Found, wenn der Sensor nicht existiert.
- Zweck: Ruft aggregierte Daten für einen spezifischen Sensor mit flexiblem Intervall und Aggregationstyp ab. Unterstützt optional Glättung und Interpolation.
- Parameter:
sensor_id(str, Pfadparameter),from_date(datetime, Query-Parameter, erforderlich),to_date(datetime, Query-Parameter, erforderlich),interval(str, erforderlich),aggregation_type(str, erforderlich),smoothing_window(int, optional),interpolation_method(str, optional). - Response Model:
sensor_schema.SensorDataAggregatedResponse - Caching: Ja, mit
aggregate_key_builder, expire 900 Sekunden. - Fehler:
404 Not Found, wenn der Sensor nicht existiert;400 Bad Requestbei ungültigen Aggregationsparametern. - Besonderheit: Loggt, wenn ein Fallback auf Rohdatenaggregation erfolgt, falls keine kontinuierliche Aggregation verwendet werden kann.
Diese Datei definiert die API-Endpunkte für den Zugriff auf trainierte ML-Modelle und zur Generierung von Temperaturvorhersagen.
Die predictions.py-Datei enthält FastAPI-Router-Definitionen, die es Clients ermöglichen, Informationen über trainierte Vorhersagemodelle abzurufen und aktuelle Temperaturvorhersagen zu generieren. Die Vorhersagefunktion integriert den Abruf historischer Daten, Feature Engineering und die Anwendung der trainierten Modelle.
logging: Für die Protokollierung.os,joblib: Für Dateisystemoperationen und das Laden von Modellen.datetime,timedelta,timezone: Für Datums-/Zeit-Manipulationen.typing: Für Type-Hints.fastapi: FastAPI-Core-Komponenten.pydantic: Für Datenvalidierungsmodelle.sqlalchemy.orm.Session: Für die Datenbank-Session.pandas,numpy: Für Datenmanipulation.custom_types.prediction.TrainedModel: Das SQLAlchemy-Modell für trainierte Modelle.utils.db_session.get_db: Dependency-Funktion zum Abrufen der Datenbank-Session.utils.feature_enhancer: Für Solar- und Wetter-Features.utils.data_transformations.create_features_for_prediction: Zum Erstellen von Features für die Vorhersage.shared.crud.crud_sensor: CRUD-Operationen für Sensordaten.
MODEL_PATH(str): Der Basispfad, unter dem die trainierten Modelle gespeichert sind (/app/backend/models).FORECAST_HORIZON(int): Die Anzahl der Stunden, für die eine Vorhersage generiert werden soll (48).TEMPERATURE_SENSOR_ID(str): Die ID des Temperatursensors, für den Vorhersagen erstellt werden ("5faeb5589b2df8001b980307").TIMEZONE(str): Die Zeitzone für Datums-/Zeit-Konvertierungen ("Europe/London").
ModelResponse: Beschreibt die Struktur der Daten, die für ein einzelnes trainiertes Modell zurückgegeben werden.PredictionPoint: Beschreibt einen einzelnen Datenpunkt in der Vorhersage (Zeitstempel, Wert, Typ: historisch/vorhergesagt).PredictionResponse: Beschreibt die Gesamtstruktur der Vorhersage-Antwort (Liste vonPredictionPoints,last_updated-Zeitstempel, Nachricht).
loaded_models = {"models": {}, "timestamp": None}: Ein globales Dictionary zum Zwischenspeichern der geladenen Modelle, um wiederholtes Laden vom Dateisystem zu vermeiden.
load_prediction_models(db: Session) -> Dict[int, Any]:- Zweck: Lädt alle trainierten Modelle aus der Datenbank und den zugehörigen
.joblib-Dateien vom Dateisystem. - Logik: Prüft, ob Modelle bereits im Cache sind und ob der Cache noch gültig ist (gültig für 15 Minuten). Wenn nicht, werden alle
TrainedModel-Einträge aus der Datenbank abgefragt. Für jeden Datenbankeintrag wird der Dateipfad korrigiert (z.B. von einem Trainingspfad zu einem Backend-Pfad) und das Modell mitjoblib.load()geladen. Geladene Modelle werden imloaded_models-Cache gespeichert.
- Zweck: Lädt alle trainierten Modelle aus der Datenbank und den zugehörigen
- Zweck: Gibt eine Liste aller trainierten Modelle und ihrer Metriken aus der Datenbank zurück.
- Response Model:
List[ModelResponse] - Logik: Fragt alle
TrainedModel-Einträge ab und gibt sie sortiert nach Vorhersagehorizont zurück.
- Zweck: Erstellt und liefert eine neue Temperaturvorhersage, kombiniert mit historischen Daten für einen Plot.
- Response (Schema): Gibt ein Dictionary zurück, das den
PredictionResponseModellen ähnelt, aber Zeitstempel als ISO-Strings enthält. - Logik:
- Modellladen: Lädt die Vorhersagemodelle über
load_prediction_models. - Historische Daten abrufen: Ruft die neuesten historischen Temperatursensordaten (letzten 31 Tage) aus der Datenbank über
crud_sensor.sensor_data.get_aggregated_data_by_sensor_idab. Die Daten werden stündlich aggregiert (1h,avg). - Datenaufbereitung: Konvertiert die abgerufenen Daten in ein Pandas DataFrame, setzt den Zeitstempel als Index und sortiert/interpoliert fehlende Werte.
- Feature Engineering: Verwendet
create_features_for_prediction, um die Features für den letzten verfügbaren historischen Zeitpunkt zu generieren. - Vorhersagegenerierung: Iteriert über den
FORECAST_HORIZON. Für jede Stunde wird das entsprechende Modell (models.get(h)) verwendet, um eine Vorhersage zu treffen. Wenn ein Modell oder die Vorhersage fehlschlägt, wirdNoneals Wert gesetzt. Die Zeitstempel der Vorhersagepunkte werden korrekt als UTC-datetimes berechnet und dann in ISO-Strings umgewandelt. - Kombination: Kombiniert die historischen Datenpunkte und die generierten Vorhersagepunkte zu einer einzigen Liste.
- Antwort: Gibt ein Dictionary zurück, das diese kombinierte Liste (
plot_data), den Zeitstempel der letzten Aktualisierung (last_updated) und eine Erfolgsmeldung enthält.
- Modellladen: Lädt die Vorhersagemodelle über
- Fehlerbehandlung: Fängt
HTTPExceptions (z.B. wenn keine Modelle oder historischen Daten gefunden werden) ab und reicht sie weiter. Fängt andere unerwartete Fehler ab, loggt sie detailliert (inkl. Traceback) und gibt eine generische500 Internal Server Error-Antwort zurück. - Zeitstempel-Handling: Stellt sicher, dass alle Zeitstempel konsistent in UTC (
datetime.now(timezone.utc)) sind und im ISO 8601-Format für die API-Antwort konvertiert werden.
Diese Datei definiert das SQLAlchemy-Modell TrainedModel, das zur Speicherung von Metadaten und Metriken trainierter Zeitreihen-Vorhersagemodelle in einer Datenbank verwendet wird.
TrainedModel
Das TrainedModel-Modell ist eine SQLAlchemy-Klasse, die die Struktur der trained_models-Tabelle in der Datenbank abbildet. Sie dient dazu, wichtige Informationen über jedes trainierte ML-Modell zu persistieren, einschließlich seiner Identifikation, des Speicherpfads, der Trainingsmetriken und der Leistungsmetriken aus der Validierung. Das Modell unterstützt automatische Versionierung über die version_id-Spalte.
trained_models
id(Integer, Primary Key): Eindeutiger Bezeichner des Modelleintrags.model_name(String(255), Not Null, Index): Der Name des Modells.forecast_horizon_hours(Integer, Not Null, Index): Der Vorhersagehorizont in Stunden, für den dieses Modell trainiert wurde.model_path(String(512), Not Null, Unique): Der Dateipfad, unter dem das trainierte Modell gespeichert ist. Dies muss eindeutig sein.version_id(Integer, Not Null): Die Versionsnummer des Modells. Diese Spalte wird für die automatische Optimistic Concurrency Control (OCC) von SQLAlchemy verwendet.last_trained_at(DateTime, Timezone True, Server Default:func.now(), On Update:func.now()): Zeitstempel des letzten Trainings oder der letzten Aktualisierung des Modelleintrags.training_duration_seconds(Float, Nullable): Die Dauer des Trainingsprozesses in Sekunden.val_mae(Float, Nullable): Mean Absolute Error (MAE) aus der Validierung.val_rmse(Float, Nullable): Root Mean Squared Error (RMSE) aus der Validierung.val_mape(Float, Nullable): Mean Absolute Percentage Error (MAPE) aus der Validierung.val_r2(Float, Nullable): R-squared (Bestimmtheitsmaß) aus der Validierung.
- Automatische Versionierung: Die
version_id-Spalte ist als Versionszähler für SQLAlchemy konfiguriert (__mapper_args__ = {'version_id_col': version_id}). Dies ermöglicht Optimistic Concurrency Control, um Dateninkonsistenzen bei gleichzeitigen Schreibzugriffen zu vermeiden. - Zeitstempel-Automatisierung:
last_trained_atwird beim Erstellen und jeder Aktualisierung des Eintrags automatisch mit dem aktuellen Zeitstempel versehen. - Indizes:
model_nameundforecast_horizon_hourssind indiziert, um effiziente Abfragen zu ermöglichen. - Eindeutiger Modellpfad:
model_pathmuss eindeutig sein, um sicherzustellen, dass jeder Modelleintrag auf einen spezifischen, einmaligen Dateipfad verweist.
Dieser Flow ist verantwortlich für die Aufnahme von Sensordaten von einer externen API und deren Speicherung in einer Datenbank.
data_ingestion_flow
Der data_ingestion_flow ruft Metadaten und Sensordaten für eine bestimmte Box-ID ab, synchronisiert diese mit der Datenbank und speichert die Messdaten in Chunks. Er verwendet Dask als Task-Runner für parallele Datenabrufe.
box_id(str): Die ID der Box, für die Daten abgerufen werden sollen.initial_fetch_days(int, optional, Standard:365): Die Anzahl der Tage historischer Daten, die beim ersten Abruf geholt werden sollen.fetch_chunk_days(int, optional, Standard:4): Die Größe der Zeit-Chunks (in Tagen) für den Datenabruf.
fetch_box_metadata: Ruft Metadaten für die angegebene Box ab.sync_box_and_sensors_in_db: Synchronisiert Box- und Sensordaten in der Datenbank.determine_fetch_window: Bestimmt das Zeitfenster für den Datenabruf basierend auf dem aktuellen DB-Status und dem letzten Messzeitpunkt der API.fetch_store_sensor_chunk: Ruft Sensordaten für einen bestimmten Sensor innerhalb eines Zeit-Chunks ab und speichert sie.update_final_box_status: Aktualisiert den letzten Datenabruf-Zeitstempel der Box in der Datenbank.
- Die Metadaten der angegebenen
box_idwerden abgerufen. - Die Box- und Sensordaten werden mit der Datenbank synchronisiert, und der aktuelle Datenbankstatus wird ermittelt.
- Das Abruf-Zeitfenster (
from_date,to_date) wird bestimmt. Wenn keine Datenaktualisierung notwendig ist, wird der Flow beendet. - Für jeden Sensor, der mit der Box verbunden ist, werden Daten in definierten
fetch_chunk_days-Blöcken parallel abgerufen und gespeichert. - Nach erfolgreichem Abschluss aller Chunks wird der finale
last_data_fetched-Status der Box in der Datenbank aktualisiert.
- Verwendet
DaskTaskRunnerfür die Parallelisierung derfetch_store_sensor_chunk-Tasks. - Erstellt ein Markdown-Artefakt, das den Status des Datenabrufs zusammenfasst.
- Loggt detaillierte Informationen über den Fortschritt und eventuelle Fehler.
Dieser Flow orchestriert den gesamten Machine-Learning-Trainingsprozess von der Datenbeschaffung bis zur Modellvalidierung und Speicherung.
train_all_models
Der train_all_models-Flow lädt Sensordaten, erstellt daraus ML-Features und trainiert separate Modelle für verschiedene Vorhersagehorizonte. Es beinhaltet eine Validierungsphase und speichert die final trainierten Modelle sowie deren Metriken in einer Datenbank.
- Keine direkten Flow-Parameter, Konfiguration über Konstanten und importierte
settings.
FORECAST_TIME_WINDOW(int): Die maximale Anzahl von Stunden, für die Vorhersagen generiert werden (Standard:48).MODEL_PATH(Path): Der Basispfad, unter dem die trainierten Modelle gespeichert werden (Standard:/app/ml_service/models).
initialize_database: Stellt sicher, dass die Datenbanktabelle für Modelle existiert.fetch_sensor_data_for_ml: Ruft Sensordaten für ML-Trainingszwecke ab.create_ml_features: Erzeugt Features (z.B. Lag-Features, gleitende Durchschnitte, Zeit-Features) und teilt die Daten in Trainings- und Validierungssätze.train_single_model: Trainiert ein einzelnes Modell für einen spezifischen Vorhersagehorizont.generate_validation_flow(Sub-Flow): Führt eine Validierung der Modelle durch und erstellt einen Plot der Vorhersagen im Vergleich zu den tatsächlichen Werten._update_or_create_model_in_db: Aktualisiert oder erstellt einen Eintrag für ein Modell in der Datenbank mit seinen Metriken._create_beautiful_markdown: Hilfsfunktion zur Erstellung eines formatierten Markdown-Berichts der Trainingsmetriken.
- Datenbank-Initialisierung: Stellt sicher, dass die
models-Tabelle in der Datenbank existiert. - Datenbeschaffung: Ruft Rohdaten von Sensoren ab (standardmäßig für die letzten 16 Wochen). Ein Beispiel der Rohdaten wird als Markdown-Artefakt gespeichert.
- Feature Engineering: Erzeugt ML-Features aus den Rohdaten und teilt sie in
X_train,X_val,Y_targets_trainundY_targetsauf. Ein Beispiel der Features und Targets wird als Markdown-Artefakt gespeichert. - Erstes Modelltraining (für Validierung): Modelle werden auf einem Teil der Trainingsdaten (
X_train,Y_targets_train) für jeden Vorhersagehorizont bisFORECAST_TIME_WINDOWtrainiert. Die Modelle werden im Speicher gehalten. - Validierung: Der
generate_validation_flowwird als Sub-Flow aufgerufen, um Vorhersagen auf dem Validierungsdatensatz (X_val,y_val) zu generieren und einen Plot zu erstellen, der als Artefakt gespeichert wird. - Zweites Modelltraining (für Forecasting): Modelle werden auf dem gesamten verfügbaren Trainingsdatensatz (
X,Y_targets) für jeden Vorhersagehorizont erneut trainiert. Diese Modelle werden auf die Festplatte gespeichert. - Ergebnispersistenz: Die Metriken und Pfade der final trainierten Modelle werden in der Datenbank aktualisiert oder neu erstellt.
- Artefakt-Erstellung: Ein Markdown-Artefakt mit einer Zusammenfassung der Trainingsmetriken für alle Horizonte wird erstellt.
- Verwendet
DaskTaskRunnermit 3 Workern für paralleles Modelltraining. - Führt zwei separate Trainingsrunden durch: eine für die Validierung und eine für die spätere Verwendung im Forecasting.
- Speichert relevante Zwischenschritte und Endergebnisse als Prefect-Artefakte (z.B. Datenproben, Validierungsplots, Trainingsmetriken).
- Umfassende Fehlerbehandlung und Logging.
Dieser Flow dient dazu, aktuelle Temperaturvorhersagen zu generieren und visuell darzustellen.
Generate Forecast and Plot
Der generate_forecast_flow lädt zuvor trainierte Modelle, ruft die aktuellsten Daten ab, erstellt Vorhersagen für die nächsten FORECAST_TIME_WINDOW Stunden und visualisiert diese zusammen mit historischen Daten in einem Plot, der als Markdown-Artefakt gespeichert wird.
- Keine Parameter, da der Flow aktuelle Daten abruft und Vorhersagen generiert.
FORECAST_TIME_WINDOW(int): Die Anzahl der Stunden, für die eine Vorhersage generiert werden soll (Standard:48).MODEL_PATH(str): Der Pfad, unter dem die trainierten Modelle gespeichert sind (Standard:./models).
load_all_trained_models_task: Lädt alle trainierten Modelle aus dem angegebenen Pfad.get_latest_features_for_prediction_task: Ruft die neuesten Sensordaten ab und bereitet die Features für die Vorhersage vor, inklusive historischer Daten für den Plot.generate_all_predictions_task: Generiert Vorhersagen für alle Horizonte unter Verwendung der geladenen Modelle und der aktuellen Features.create_forecast_plot_task: Erstellt ein Bild des Vorhersage-Plots, das historische Daten und die generierte Vorhersage zeigt.create_markdown_artifact: Speichert den generierten Plot als Base64-kodiertes Bild in einem Markdown-Artefakt.
- Modellladen: Alle für die Vorhersage benötigten Modelle werden vom
MODEL_PATHgeladen. Falls keine Modelle gefunden werden, wird ein Fehler gemeldet und der Flow beendet. - Feature-Vorbereitung: Die aktuellsten Sensordaten werden abgerufen und zu Features für die Vorhersage transformiert. Es werden auch historische Daten für die Visualisierung gesammelt.
- Vorhersagegenerierung: Unter Verwendung der geladenen Modelle und der aktuellen Features werden Vorhersagen für die nächsten
FORECAST_TIME_WINDOWStunden generiert. - Plot-Erstellung: Ein Plot, der die historischen Daten und die generierte Vorhersage kombiniert, wird erstellt.
- Artefakt-Speicherung: Der Plot wird in ein Base64-kodiertes Bild umgewandelt und zusammen mit beschreibendem Text als Markdown-Artefakt in Prefect gespeichert.
- Überprüft, ob Modelle geladen werden konnten, bevor die Vorhersage fortgesetzt wird.
- Der generierte Plot ist direkt im Prefect UI als Artefakt sichtbar.
- Verwendet
base64Encoding, um das Bild direkt in das Markdown-Artefakt einzubetten.
Diese Datei enthält einen Prefect Flow zur Durchführung der Validierung von trainierten Machine-Learning-Modellen und zur Erstellung eines Berichts der Validierungsmetriken.
Generate Validation, Plot
Der generate_validation_flow nimmt Validierungs-Features (X_val), die zugehörigen tatsächlichen Werte (y_val) und ein Dictionary von trainierten Modellen entgegen. Er iteriert über die Validierungsdaten, generiert Vorhersagen für jeden Horizont und vergleicht diese mit den tatsächlichen Werten. Anschließend berechnet er wichtige Metriken wie Mean Absolute Error (MAE), Mean Absolute Percentage Error (MAEP) und R-squared (R2) für jeden Vorhersagehorizont. Die Ergebnisse werden in einem Markdown-Artefakt für Prefect visualisiert.
X_val(Union[np.ndarray, pd.DataFrame, pd.Series]): Die Feature-Daten für die Validierung.y_val(Union[np.ndarray, pd.DataFrame, pd.Series]): Die tatsächlichen Zielwerte für die Validierung. Muss eine Pandas Series oder ein DataFrame mit genau einer Spalte sein.trained_models(Dict[int, Any]): Ein Dictionary, bei dem die Schlüssel die Vorhersagehorizonte in Stunden und die Werte die geladenen, trainierten Modelle sind.
pd.DataFrame: Ein Pandas DataFrame, das die berechneten Metriken (MAE, MAEP, R2) pro Vorhersagehorizont enthält.
MODEL_PATH(str): (Wird im Flow selbst nicht direkt verwendet, aber im Kontext der Datei deklariert) Basispfad für Modelle (Standard:./models).FORECAST_TIME_WINDOW(int): Die maximale Anzahl von Stunden, für die Vorhersagen und Metriken generiert werden (Standard:48).
calculate_robust_maep(y_true, y_pred): Eine Funktion, die einen robusten Mean Absolute Percentage Error (MAEP) berechnet. Sie filtertNaN-Werte und vermeidet Division durch Null durch Hinzufügen eines kleinen Offsets zum Nenner.
generate_all_predictions_task: Ein Task, der Vorhersagen für gegebene Features und Modelle generiert.
- Eingabevalidierung: Überprüft, ob
X_valodery_valleer sind oder ob keine Modelle übergeben wurden. Stellt sicher, dassy_valeine Pandas Series ist. - Iterative Vorhersage: Der Flow iteriert über jede Zeile (jeden potenziellen Startzeitpunkt) in
X_val.- Für jede Zeile wird
generate_all_predictions_taskaufgerufen, um Vorhersagen für alle Horizonte ab diesem Startzeitpunkt zu generieren. - Die generierten Vorhersagen werden mit den entsprechenden tatsächlichen Werten aus
y_valfür jeden Horizont abgeglichen. Nur gültige (nicht-NaN) Paare von Vorhersage und tatsächlichem Wert werden gesammelt.
- Für jede Zeile wird
- Metrikberechnung: Für jeden Vorhersagehorizont (von 1 bis
FORECAST_TIME_WINDOW):- Die gesammelten tatsächlichen Werte (
all_horizon_actuals[h]) und vorhergesagten Werte (all_horizon_predictions[h]) werden verwendet. mean_absolute_errorundr2_scorewerden vonsklearn.metricsverwendet.calculate_robust_maepwird aufgerufen, um den MAEP zu berechnen.- Besondere Handhabung für den R2-Score, wenn die Standardabweichung der tatsächlichen Werte Null ist.
- Die berechneten Metriken (MAE, MAEP, R2) werden in einem Dictionary
metrics_by_horizongespeichert.
- Die gesammelten tatsächlichen Werte (
- Markdown-Artefakt: Ein Markdown-String wird erstellt, der eine formatierte Tabelle der Metriken pro Horizont enthält.
- Artefakt-Veröffentlichung: Das Markdown-Artefakt wird über
create_markdown_artifactin Prefect veröffentlicht.
- Gibt leere DataFrames zurück, wenn
X_val,y_valodertrained_modelsleer sind. - Fängt
TypeErrorab, wenny_valnicht das erwartete Format hat. - Fängt Fehler bei der Metrikberechnung ab und setzt die Metriken auf
np.nan.
Diese Datei enthält den Task zum Generieren von Vorhersagen für verschiedene Horizonte unter Verwendung trainierter Modelle.
Generate All Predictions
Der generate_all_predictions_task nimmt ein DataFrame mit den neuesten Features und ein Dictionary von trainierten Modellen entgegen. Er iteriert über einen definierten Vorhersagehorizont und generiert für jede Stunde eine separate Vorhersage mithilfe des entsprechenden Modells. Die Ergebnisse werden in einem Pandas DataFrame zusammengefasst.
current_features_df(pd.DataFrame): Ein DataFrame, das die aktuellsten Features enthält, auf denen die Vorhersagen basieren sollen.trained_models(Dict[int, Any]): Ein Dictionary, bei dem die Schlüssel die Vorhersagehorizonte in Stunden (z.B. 1 für t+1h, 2 für t+2h) und die Werte die geladenen, trainierten Modelle sind.forecast_window(int): Die maximale Anzahl von Stunden, für die Vorhersagen generiert werden sollen.prediction_start_time(pd.Timestamp): Der Startzeitpunkt, ab dem die Vorhersagen generiert werden sollen. Die Zeitstempel im Ausgabe-DataFrame werden basierend auf diesem Wert und dem Horizont berechnet.
pd.DataFrame: Ein DataFrame mit zwei Spalten:forecast_timestamp(dem Zeitstempel der Vorhersage) undpredicted_temp(dem vorhergesagten Temperaturwert). Derforecast_timestampwird als Index gesetzt.
- Es wird geprüft, ob das
current_features_dfleer ist oder keine trainierten Modelle übergeben wurden. Wenn dies der Fall ist, wird einValueErrorausgelöst. - Für jede Stunde im
forecast_window(von 1 bisforecast_window) wird versucht, das entsprechende Modell aus demtrained_models-Dictionary abzurufen. - Falls ein Modell für den aktuellen Horizont vorhanden ist, wird
model.predict()aufgerufen, um eine Vorhersage zu erhalten. - Die vorhergesagten Werte und die entsprechenden Zeitstempel werden gesammelt. Falls ein Fehler bei der Vorhersage auftritt oder kein Modell gefunden wird, wird
pd.NA(odernp.nan) als Platzhalter verwendet. - Abschließend wird ein DataFrame aus den gesammelten Vorhersagen und Zeitstempeln erstellt und der Zeitstempel als Index gesetzt.
- Fängt
ValueErrorab, wenn Eingabe-DataFrames leer sind oder keine Modelle vorhanden sind. - Fängt allgemeine
Exceptions während der Modellvorhersage ab und setzt den Vorhersagewert aufNaN. - Gibt Warnungen aus, wenn für einen bestimmten Horizont kein Modell gefunden wird.
Diese Datei definiert den Task zum Trainieren eines einzelnen Machine-Learning-Modells, spezifisch eines LightGBM Regressors, inklusive Hyperparameter-Optimierung und Metrikberechnung.
Train Single Forecasting Model with LGBM
Der train_single_model-Task trainiert ein LightGBM Regressionsmodell für einen bestimmten Vorhersagehorizont. Er führt eine Kreuzvalidierung mittels TimeSeriesSplit durch, optimiert Hyperparameter mittels GridSearchCV und berechnet wichtige Validierungsmetriken (MAE, RMSE, MAPE, R2). Das beste Modell wird gespeichert und ein Dictionary mit Trainingsergebnissen zurückgegeben.
X_train_df(pd.DataFrame): DataFrame mit den Trainings-Features.y_train_series(pd.Series): Serie mit den Zielwerten für das Training.horizon_hours(int): Der Vorhersagehorizont in Stunden, für den dieses Modell trainiert wird.base_save_path(str): Der Basispfad, unter dem das trainierte Modell gespeichert werden soll.return_model_object(Optional[bool], Standard:True): WennTrue, wird neben dem Ergebnis-Dictionary auch das trainierte Modellobjekt zurückgegeben.tscv_n_splits(Optional[int], Standard:3): Die Anzahl der Splits für dieTimeSeriesSplit-Kreuzvalidierung.
Union[Tuple[Dict[str, Any], lgb.LGBMRegressor], Dict[str, Any]]: Ein Dictionary mit Trainingsmetriken und Metadaten. Optional wird zusätzlich das trainiertelgb.LGBMRegressor-Objekt zurückgegeben, wennreturn_model_objectaufTruegesetzt ist. Im Fehlerfall wird ein Dictionary mit derforecast_horizon_hoursund der Fehlermeldung zurückgegeben.
- Ein
LGBMRegressor-Modell wird initialisiert und eineTimeSeriesSplit-Instanz für die Kreuzvalidierung erstellt. - Ein
param_gridfürGridSearchCVwird definiert, um die besten Hyperparameter zu finden. GridSearchCVwird mit den definierten Parametern und Metriken (neg_rmse,neg_mae,neg_mape,r2) konfiguriert und auf den Trainingsdaten (X_train_df,y_train_series) gefittet.- Nach dem Training werden die besten Parameter und das beste Estimator-Modell extrahiert.
- Die Validierungsmetriken (MAE, RMSE, MAPE, R2) werden manuell über alle Folds der Kreuzvalidierung berechnet, um eine konsistente Metrik zu gewährleisten.
- Das trainierte Modell (
best_estimator) wird imbase_save_pathunter einem spezifischen Dateinamen (temp_forecast_lgbm_model_h{horizon_hours}.joblib) gespeichert. - Ein Ergebnis-Dictionary mit allen relevanten Metadaten und Metriken wird erstellt und zurückgegeben.
- Fängt allgemeine
Exceptions während des Trainingsprozesses ab und gibt ein Fehler-Dictionary zurück. - Loggt Warnungen, wenn MAE größer als RMSE ist, was auf Datenqualitätsprobleme hindeuten kann.
- Nutzt Prefect-eigene Retries, um bei temporären Problemen das Training erneut zu versuchen.
Diese Datei enthält Tasks zur Synchronisierung von Sensorbox- und Sensordaten sowie zur Aktualisierung des finalen Status des Datenabrufs in der Datenbank.
Sync Box and Sensors in DBUpdate Final Box Status
Dieser Task ist dafür zuständig, die Metadaten einer Sensorbox und ihrer zugehörigen Sensoren in der Datenbank abzugleichen. Er prüft, ob eine Box bereits existiert, erstellt sie gegebenenfalls und synchronisiert dann die Liste ihrer Sensoren. Zudem wird das Feld lastMeasurementAt der Box in der Datenbank aktualisiert.
Dieser Task aktualisiert den last_data_fetched-Zeitstempel einer Sensorbox in der Datenbank. Er berücksichtigt die Ergebnisse der einzelnen Datenabruf-Chunks, um sicherzustellen, dass der Zeitstempel nur bis zum letzten erfolgreich verarbeiteten Datenpunkt aktualisiert wird.
box_metadata(Dict[str, Any]): Ein Dictionary, das die Metadaten der Sensorbox von der API enthält.initial_fetch_days(int): Die Anzahl der Tage, die zurückgerechnet werden sollen, um den initialenlast_data_fetched-Zeitstempel für eine neue Box zu bestimmen.
box_id(str): Die ID der Sensorbox, deren Status aktualisiert werden soll.overall_to_date(datetime): Das Enddatum des gesamten Abrufzeitfensters, das für die Aktualisierung verwendet werden soll, wenn alle Chunks erfolgreich waren.fetch_results(List[Dict[str, Any]]): Eine Liste von Dictionaries, die die Ergebnisse der einzelnenfetch_store_sensor_chunk-Tasks enthalten. Jedes Ergebnis-Dictionary solltesuccess(bool) undlast_timestamp_in_chunk(datetime) enthalten.
Dict[str, Any]: Ein Dictionary, das den aktuellen Zustand der Box in der Datenbank widerspiegelt, einschließlichbox_id,db_last_measurement_at,db_last_data_fetchedundsensor_ids.
None
- Eine Datenbank-Session wird geöffnet.
- Es wird versucht, die Sensorbox anhand der
box_idaus der Datenbank abzurufen. - Wenn die Box nicht existiert: Eine neue Box wird erstellt. Der
last_data_fetched-Zeitstempel wird initial aufinitial_fetch_daysvor demlastMeasurementAtder API gesetzt. Pydantic-Schemas werden zur Validierung der Daten vor der Erstellung verwendet. - Sensorsynchronisation: Bestehende Sensoren für die Box werden erfasst. Für jeden Sensor in den API-Metadaten wird geprüft, ob er bereits in der DB existiert. Nicht vorhandene Sensoren werden erstellt und mit der Box verknüpft.
- Das
lastMeasurementAt-Feld der Box in der Datenbank wird aktualisiert, falls der Wert aus den API-Metadaten neuer ist. - Ein Status-Dictionary mit relevanten DB-Informationen (inkl. aller synchronisierten
sensor_ids) wird zurückgegeben.
- Der Task iteriert durch die
fetch_results, um den spätesten erfolgreichen Zeitstempel (latest_successful_ts) in den abgerufenen Chunks zu finden. - Eine Datenbank-Session wird geöffnet und die Sensorbox abgerufen.
- Wenn alle Chunks erfolgreich waren:
last_data_fetchedwird auf dasoverall_to_datedes Flows gesetzt. - Wenn Fehler aufgetreten sind:
last_data_fetchedwird auf denlatest_successful_ts(oder den vorherigen Wert, falls kein neuerer Erfolg vorliegt) gesetzt, um den Fortschritt bis zum Fehlerpunkt zu speichern. - Das
last_data_fetched-Feld der Box wird in der Datenbank aktualisiert, falls ein gültiger und neuerer Zeitstempel ermittelt wurde.
- Beide Tasks nutzen Prefect-Logging und fangen
SQLAlchemyErrors sowie allgemeineExceptions ab, um detaillierte Fehlermeldungen zu protokollieren und den Fehler weiterzuleiten. sync_box_and_sensors_in_dbprüft auf fehlende Box-IDs in Metadaten und Validierungsfehler bei der Payload-Erstellung.update_final_box_statushandhabt Fälle, in denen keine Fetch-Ergebnisse vorliegen oder die Box nicht in der DB gefunden wird.
Diese Datei enthält Tasks zum Abrufen von Metadaten für Sensorboxen und von Messdaten für Sensoren von der OpenSenseMap API.
Fetch OpenSenseMap Box MetadataFetch and Store Sensor ChunkGet Sensor Data for ML
Ruft die Metadaten einer bestimmten Sensorbox von der OpenSenseMap API ab.
Holt Messdaten für einen spezifischen Sensor innerhalb eines definierten Zeitbereichs (Chunk) von der OpenSenseMap API, parst diese und speichert sie in der Datenbank.
Ruft aggregierte stündliche Sensordaten für einen längeren Zeitraum (z.B. mehrere Wochen) von einer Backend-API ab und bereitet sie als Pandas DataFrame für Machine-Learning-Zwecke auf.
box_id(str): Die eindeutige ID der Sensorbox.
sensor_id(str): Die ID des Sensors, dessen Daten abgerufen werden sollen.box_id(str): Die ID der Box, zu der der Sensor gehört.chunk_from_date(datetime): Der Startzeitpunkt (inklusive) des Daten-Chunks.chunk_to_date(datetime): Der Endzeitpunkt (exklusive) des Daten-Chunks.
weeks(int, Standard:8): Die Anzahl der Wochen historischer Daten, die abgerufen werden sollen.
Dict[str, Any]: Ein Dictionary, das die JSON-Antwort der API mit den Metadaten der Box enthält.
Dict[str, Any]: Ein Ergebnis-Dictionary, dassensor_id,chunk_from,chunk_to,success(bool),points_fetched(Anzahl der gespeicherten Punkte) undlast_timestamp_in_chunk(der späteste Zeitstempel im Chunk) enthält.
pd.DataFrame: Ein Pandas DataFrame mit den Spaltenmeasurement_timestamp(als Index) undtemperatur, sortiert nach Zeitstempel.
- Baut die API-URL für die spezifische Box-ID zusammen.
- Führt einen GET-Request an die OpenSenseMap API aus.
- Überprüft den HTTP-Statuscode und parst die JSON-Antwort.
- Initialisiert ein Ergebnis-Dictionary und formatiert die
chunk_from_dateundchunk_to_datefür den API-Aufruf. - Sendet einen GET-Request an die OpenSenseMap API, um die Sensordaten für den definierten Zeitbereich abzurufen.
- Iteriert durch die erhaltenen Messwerte: parst den Zeitstempel und den Wert, konvertiert den Wert zu Float und stellt sicher, dass der Zeitstempel UTC ist. Erstellt Pydantic-Schemas für jeden Datenpunkt.
- Verwendet
crud_sensor.sensor_data.create_multium die validierten Datenpunkte in einem Bulk-Insert in der Datenbank zu speichern. - Aktualisiert das Ergebnis-Dictionary mit der Anzahl der gespeicherten Punkte und dem spätesten Zeitstempel im Chunk.
- Bestimmt den Start- und Endzeitpunkt für den Datenabruf basierend auf der aktuellen Zeit und dem
weeks-Parameter. - Formuliert die API-Anfrage an den Backend-Aggregations-Endpunkt für stündliche Durchschnittswerte.
- Führt den GET-Request aus und verarbeitet die JSON-Antwort.
- Wandelt die aggregierten Daten in ein Pandas DataFrame um, benennt Spalten um und setzt den Zeitstempel als Index.
- Alle Tasks implementieren spezifische Fehlerbehandlung für
requests.exceptions(HTTPError, ConnectionError, Timeout, RequestException) undJSONDecodeError. fetch_store_sensor_chunkfängt auchValueError,TypeErrorundKeyErrorwährend des Parsens einzelner Messwerte ab und loggt Warnungen.fetch_store_sensor_chunkundfetch_sensor_data_for_mlnutzen Prefect-eigene Retries, um bei temporären Netzwerkproblemen erneut zu versuchen.- Bei fehlenden Box-IDs oder leeren Daten wird ein
ValueErrorausgelöst.
Diese Datei enthält den Task zum Erstellen von Machine-Learning-Features und Zielvariablen aus stündlichen Temperaturdaten.
Create ML Features and Targets
Der create_ml_features-Task transformiert ein DataFrame mit stundengenauen Temperaturdaten in ein Set von Features und entsprechenden Zielvariablen, die für das Training von Zeitreihen-Vorhersagemodellen geeignet sind. Dies beinhaltet die Generierung von Zeit-basierten Features (Sin/Cos der Stunde), Lag-Features der Temperatur und anderer relevanter Spalten, Rolling-Window-Statistiken sowie die Erstellung von Target-Variablen für verschiedene Vorhersagehorizonte.
df_hourly(pd.DataFrame): Ein Pandas DataFrame mit stündlichen Temperaturdaten. Es muss einenDatetimeIndexhaben und eine Spalte namenstemperaturenthalten.
Dict[str, pd.DataFrame]: Ein Dictionary, das die folgenden Pandas DataFrames enthält:X_train: Features für den Trainingsdatensatz.X_val: Features für den Validierungsdatensatz.X: Alle Features (vor der Aufteilung in Train/Val).Y_targets_train: Zielvariablen für den Trainingsdatensatz.Y_targets: Alle Zielvariablen (vor der Aufteilung in Train/Val).original_features_df: Das DataFrame, das alle ursprünglichen und generierten Features sowie Targets enthält, bevor NaNs entfernt wurden.
FORECAST_TIME_WINDOW(int): Die maximale Anzahl von Stunden, für die Vorhersage-Targets erstellt werden (Standard:48).LATITUDE(float): Breitengrad für die Generierung von Solar-Features (Standard:52.019364).LONGITUDE(float): Längengrad für die Generierung von Solar-Features (Standard:-1.73893).TIMEZONE(str): Zeitzone für die Datums-/Zeitberechnungen (Standard:"Europe/London").
- Validierung: Überprüft, ob der Eingabe-DataFrame einen
DatetimeIndexund einetemperatur-Spalte hat. - Externe Features: Ruft Solar-Features (z.B. Sonneneinstrahlung) und Wetter-Features (z.B. Wettertemperatur, Bewölkung) ab und verbindet sie mit dem Haupt-DataFrame.
- Zeit-Features: Erstellt Sinus- und Kosinus-Transformationen der Stunden, um die zyklische Natur der Zeit abzubilden.
- Lag-Features: Erzeugt verzögerte (gelaggte) Versionen der
temperatur-Spalte und ausgewählter externer Features für verschiedene Zeitverzögerungen (z.B. 1h, 24h). - Rolling Window Features: Berechnet gleitende Durchschnitte und Standardabweichungen der Temperatur über verschiedene Fenstergrößen (z.B. 3h, 24h).
- Temperaturdifferenzen: Erstellt Features, die die Temperaturänderung über verschiedene Zeitintervalle darstellen.
- Target-Variablen: Erzeugt
target_temp_plus_Xh-Spalten, indem dietemperatur-Spalte umXStunden in die Zukunft verschoben wird. Dies sind die Werte, die das Modell vorhersagen soll. - NaN-Entfernung: Zeilen, die aufgrund der Feature-Generierung (z.B. durch Shifting)
NaN-Werte enthalten, werden entfernt. - Datenaufteilung: Die Daten werden in Feature- (
X) und Target-DataFrames (Y_targets) unterteilt. Zusätzlich werdenX_train,X_valundY_targets_trainbasierend auf einem festgelegten Zeitraum für die Validierung erstellt.
- Löst
ValueErroraus, wenn der Eingabe-DataFrame nicht die erwartete Struktur (DatetimeIndex, 'temperatur'-Spalte) hat. - Gibt eine Warnung aus, wenn nach dem Feature Engineering und der NaN-Entfernung keine Daten mehr übrig sind.
Diese Datei enthält den Task zum Laden von zuvor trainierten Machine-Learning-Modellen aus dem Dateisystem.
Load All Trained Models
Der load_all_trained_models_task durchsucht ein angegebenes Verzeichnis nach serialisierten Modellobjekten für verschiedene Vorhersagehorizonte. Er lädt jedes gefundene Modell und speichert es in einem Dictionary, wobei der Horizont als Schlüssel dient.
model_base_path(str): Der Basispfad des Verzeichnisses, in dem die Modelle gespeichert sind.forecast_window(int): Die maximale Anzahl von Stunden, für die Modelle geladen werden sollen (definiert den Bereich der Horizonte).
Dict[int, Any]: Ein Dictionary, bei dem die Schlüssel die Vorhersagehorizonte (z.B. 1, 2, ...,forecast_window) und die Werte die geladenen Modellobjekte sind. Falls ein Modell für einen Horizont nicht gefunden oder nicht geladen werden kann, ist der Wert für diesen HorizontNoneim Dictionary.
FORECAST_TIME_WINDOW(int): Standard-Vorhersagefenster, wenn nicht anders angegeben (Standard:12).MODEL_PATH(str): Standardpfad, unter dem die Modelle gespeichert sind (Standard:"./models").
- Initialisiert ein leeres Dictionary
trained_models. - Iteriert von Horizont
1bisforecast_window. - Für jeden Horizont wird ein erwarteter Dateiname (
temp_forecast_lgbm_model_h{h}.joblib) konstruiert. - Es wird geprüft, ob die Modelldatei existiert. Wenn ja, wird
joblib.load()verwendet, um das Modell zu laden, und es wird imtrained_models-Dictionary gespeichert. - Wenn die Datei nicht existiert oder ein Fehler beim Laden auftritt, wird eine Warnung/Fehlermeldung protokolliert und der Wert für diesen Horizont im Dictionary auf
Nonegesetzt. - Nach dem Durchlaufen aller Horizonte wird geprüft, ob überhaupt Modelle erfolgreich geladen wurden. Wenn nicht, wird ein
FileNotFoundErrorausgelöst.
- Fängt
FileNotFoundErrorab, wenn keine Modelle geladen werden konnten. - Fängt allgemeine
Exceptions beim Laden einzelner Modelle ab und loggt detaillierte Fehlermeldungen.
Diese Datei enthält einen Task zur Erstellung eines Plots, der historische Daten und Vorhersagedaten kombiniert, und gibt diesen Plot als Bytes zurück.
Create Forecast Plot
Der create_forecast_plot_task generiert eine Visualisierung, die historische Temperaturdaten und eine entsprechende Temperaturvorhersage in einem einzigen Diagramm darstellt. Der erstellte Plot wird als PNG-Bild in Bytes zurückgegeben, was nützlich für die Speicherung als Artefakt oder die direkte Anzeige ist.
historical_data_df(pd.DataFrame): Ein DataFrame mit historischen Temperaturdaten. Es sollte einenDatetimeIndexund eine Spaltetemperaturenthalten.forecast_df(pd.DataFrame): Ein DataFrame mit vorhergesagten Temperaturdaten. Es sollte einenDatetimeIndexund eine Spaltepredicted_tempenthalten.
bytes: Die Bytes des generierten Plots im PNG-Format.
- Eine neue Matplotlib-Figur wird erstellt.
- Wenn
historical_data_dfnicht leer ist, werden die historischen Temperaturen geplottet. - Wenn
forecast_dfnicht leer ist, werden die vorhergesagten Temperaturen geplottet, typischerweise mit einer gestrichelten Linie zur Unterscheidung. - Titel, Achsenbeschriftungen, Legende und Gitterlinien werden hinzugefügt.
- Der Plot wird in ein
io.BytesIO-Objekt im PNG-Format gespeichert. - Das
BytesIO-Objekt wird gelesen, um die Bild-Bytes zu erhalten, und anschließend geschlossen.
- Verwendet
matplotlib.pyplotfür die Plot-Erstellung. - Speichert den Plot direkt in einem In-Memory-Buffer (
io.BytesIO), um ihn als Bytes zurückzugeben, anstatt ihn auf der Festplatte zu speichern. - Sorgt dafür, dass die Matplotlib-Figur nach dem Speichern geschlossen wird (
plt.close()), um Speicherlecks zu vermeiden.
Diese Datei enthält einen Task, der die neuesten Sensordaten abruft und daraus die für die Vorhersage benötigten Features sowie historische Daten für die Visualisierung generiert.
Get Latest Features for Prediction
Der get_latest_features_for_prediction_task orchestriert den Abruf aktueller Sensordaten und deren Transformation in ein Format, das für die Modellanwendung geeignet ist. Er liefert die aktuellste Feature-Zeile für die Vorhersage sowie einen Teil der historischen Daten, der für die Visualisierung der Vorhersage im Kontext der Vergangenheit nützlich ist.
fetch_data_task_fn: Eine Funktion oder ein Prefect Task, der für den Abruf der rohen Sensordaten verantwortlich ist (z.B.fetch_sensor_data_for_ml).create_features_task_fn: Eine Funktion oder ein Prefect Task, der für die Erstellung von ML-Features aus den rohen Daten verantwortlich ist (z.B.create_ml_features).lookback_days_for_plot(int, Standard:7): Die Anzahl der Tage historischer Daten, die für den Plot zurückgegeben werden sollen.
Tuple[pd.DataFrame, pd.DataFrame, pd.Timestamp]: Ein Tupel, das Folgendes enthält:- Das neueste Feature-DataFrame (
latest_X_features), eine einzelne Zeile, die für die aktuelle Vorhersage verwendet wird. - Ein DataFrame mit historischen Daten (
historical_data_for_plot), das für die Visualisierung der Vorhersage im Kontext der Vergangenheit nützlich ist. - Der
prediction_start_base_time, der den Zeitstempel des letzten bekannten Datenpunkts darstellt und als Basis für die Vorhersagestempel dient.
- Das neueste Feature-DataFrame (
- Ruft die neuesten stündlichen Daten über die
fetch_data_task_fnab. - Ermittelt den aktuellsten Zeitstempel in den abgerufenen Daten (
current_time_for_features). - Ruft die
create_features_task_fnauf, um die ML-Features aus den aktuellen Daten zu generieren. - Extrahiert die neueste Zeile aus dem generierten Feature-DataFrame (
X_all_recent), die die Eingabe für die Vorhersage darstellt. Falls dercurrent_time_for_featuresnicht exakt im Index der Features gefunden wird (was durch NaN-Entfernung passieren kann), wird die allerletzte Zeile verwendet. - Filtert die
historical_data_for_plotaus denrecent_hourly_databasierend auf demlookback_days_for_plot-Parameter.
- Löst einen
ValueErroraus, wennfetch_data_task_fnkeine Daten zurückgibt oder wenn nach der Feature-Erstellung das Feature-DataFrame leer ist.
Diese Datei enthält die Logik zur Initialisierung des Datenbankschemas, insbesondere zur Überprüfung und Erstellung der trained_models-Tabelle.
initialize_database()
Die Funktion initialize_database prüft, ob die Datenbanktabelle trained_models bereits existiert. Falls die Tabelle nicht gefunden wird, erstellt sie alle Tabellen, die in der Base.metadata der SQLAlchemy-Deklaration definiert sind. Dies gewährleistet, dass das Datenbankschema vor der Nutzung korrekt eingerichtet ist.
- Ruft eine Instanz der SQLAlchemy-Engine über
get_engine_instance()ab. - Verwendet
sqlalchemy.inspect, um den Zustand der Datenbank zu überprüfen. - Ermittelt den Tabellennamen der
TrainedModel-Klasse. - Wenn die Tabelle nicht existiert (
inspector.has_tablegibtFalsezurück), werden alle inBase.metadataregistrierten Tabellen (TrainedModelist dort registriert) in der Datenbank erstellt. - Andernfalls, wenn die Tabelle bereits existiert, wird die Erstellung übersprungen.
utils.db_utils.get_engine_instance: Zum Abrufen der SQLAlchemy Engine.custom_types.prediction.Base,custom_types.prediction.TrainedModel: Das SQLAlchemy Base-Objekt und das Modell für die zu erstellende Tabelle.
Diese Datei enthält eine Hilfsfunktion zum Erstellen eines formatierten Markdown-Berichts aus den Trainingsergebnissen von ML-Modellen.
_create_beautiful_markdown(training_results: List[Dict[str, Any]], forecast_window: int) -> str
Die Funktion _create_beautiful_markdown generiert einen übersichtlichen Markdown-String, der eine Zusammenfassung der Modelltrainingsergebnisse enthält. Sie unterteilt die Ergebnisse in erfolgreich trainierte und fehlgeschlagene Modelle und stellt die Metriken der erfolgreichen Modelle in einer formatierten Tabelle dar.
training_results(List[Dict[str, Any]]): Eine Liste von Dictionaries, wobei jedes Dictionary die Trainingsergebnisse und Metriken für ein einzelnes Modell enthält (wie sie z.B. vonml_training.pyzurückgegeben werden).forecast_window(int): Die maximale Anzahl der Horizonte, für die Modelle trainiert werden sollten. Dies wird für die Zusammenfassung verwendet.
str: Ein Markdown-formatierter String, der den Bericht enthält.
- Leere Ergebnisse: Prüft, ob
training_resultsleer ist. Falls ja, wird ein einfacher Markdown-Text zurückgegeben. - Datenaufbereitung: Konvertiert die Liste der Dictionaries in ein Pandas DataFrame und filtert die ersten 48 Zeilen heraus (vermutlich, da die ersten 48 Horizonte im Kontext der Validierung im Training anders behandelt werden könnten).
- Zusammenfassungsabschnitt: Zählt die Anzahl der erfolgreichen und fehlgeschlagenen Trainings und erstellt einen einleitenden Markdown-Text.
- Erfolgreiche Trainings-Tabelle:
- Filtert die erfolgreichen Trainings aus dem DataFrame.
- Wählt relevante Spalten aus und benennt sie für die Anzeige um.
- Konvertiert
val_mapevon einem Dezimalwert in einen Prozentwert. - Formatiert die numerischen Spalten (
RMSE,MAE,MAPE,R²,Dauer) auf eine bestimmte Anzahl von Dezimalstellen. - Erstellt eine Markdown-Tabelle aus dem vorbereiteten DataFrame.
- Fehlgeschlagene Trainings-Tabelle:
- Filtert die fehlgeschlagenen Trainings aus dem DataFrame.
- Wählt die Spalten für Horizont und Fehlermeldung aus und benennt sie um.
- Erstellt eine Markdown-Tabelle.
- Zusammensetzen: Fügt alle erstellten Abschnitte (Titel, Zusammenfassung, erfolgreiche Tabelle, fehlgeschlagene Tabelle) zu einem finalen Markdown-String zusammen.
Diese Datei stellt Dienstprogramme für die Datenbankverbindung und -session-Verwaltung mit SQLAlchemy bereit.
DATABASE_URL(str): Die URL der Datenbank, aus den Einstellungen geladen.engine: Die SQLAlchemy Engine-Instanz. Wird einmalig beim Laden des Moduls initialisiert.SessionLocal: Die SQLAlchemy Session-Fabrik. Wird einmalig beim Laden des Moduls initialisiert.
get_db_session() -> Iterator[Session | None]get_engine_instance()
Beim Laden des Moduls versucht die Datei, eine SQLAlchemy Engine und eine Session-Fabrik (SessionLocal) zu erstellen. Die Engine wird mit Connection-Pooling-Parametern (pool_pre_ping, pool_size, max_overflow) konfiguriert, um eine effiziente und robuste Datenbankverbindung zu gewährleisten. Ein kurzer Verbindungstest wird durchgeführt.
Dies ist ein Context Manager, der eine SQLAlchemy Session bereitstellt. Er handhabt das Öffnen, Committen, Rollbacken und Schließen der Datenbank-Session. Er ist dafür vorgesehen, in Prefect Tasks verwendet zu werden, um Datenbankoperationen sicher auszuführen.
Diese Funktion gibt die global initialisierte SQLAlchemy Engine-Instanz zurück.
- Prüft, ob
DATABASE_URLgesetzt ist. - Wenn ja, wird
create_engineaufgerufen, um die Engine mit spezifischen Pooling-Optionen zu erstellen. sessionmakerwird verwendet, umSessionLocalals konfigurierte Session-Fabrik zu erstellen.- Ein kurzer Test der Verbindung wird durchgeführt.
- Bei Fehlern während der Initialisierung werden
engineundSessionLocalaufNonegesetzt und der Fehler geloggt.
- Überprüft, ob
SessionLocalinitialisiert wurde. Falls nicht, wird ein Fehler geloggt undNonezurückgegeben. - Erstellt eine neue Session von
SessionLocal. - Verwendet einen
try...except...finally-Block:yield session: Gibt die Session an den aufrufenden Code zurück.session.commit(): Nach erfolgreicher Ausführung des Blocks werden Änderungen festgeschrieben.session.rollback(): Im Fehlerfall werden Änderungen rückgängig gemacht.session.close(): Stellt sicher, dass die Session immer geschlossen wird, unabhängig vom Erfolg oder Misserfolg.
- Prüft, ob die
engineerfolgreich initialisiert wurde. - Gibt die
engine-Instanz zurück.
- Fängt Fehler während der
engine-Erstellung ab und loggt diese. - Der
get_db_session-Context Manager fängt alle Ausnahmen ab, führt einen Rollback durch, loggt den Fehler und gibt ihn weiter. get_engine_instancelöst einenRuntimeErroraus, wenn die Engine nicht initialisiert werden konnte.
Diese Datei enthält Funktionen zur Berechnung zusätzlicher Features aus geografischen und Wetterdaten, die zur Verbesserung von Machine-Learning-Modellen dienen.
LATITUDE(float): Breitengrad des Standorts (Standard:52.019364).LONGITUDE(float): Längengrad des Standorts (Standard:-1.73893).TIMEZONE(str): Zeitzone des Standorts (Standard:"Europe/London").
get_solar_features(df_index: pd.DatetimeIndex) -> pd.DataFrameget_weather_features(start_date: str, end_date: str) -> Optional[pd.DataFrame]
Berechnet Sonnenstand-Features (Höhe über dem Horizont und Azimut) für jeden Zeitstempel in einem gegebenen Pandas DatetimeIndex unter Verwendung der pvlib-Bibliothek und den konfigurierten Geokoordinaten. Die Werte werden in ein für ML-Modelle geeignetes Format transformiert (z.B. Sinus/Kosinus-Transformation für zyklische Werte).
Ruft historische Wetterdaten (Luftfeuchtigkeit, Bewölkung, Windgeschwindigkeit, Globalstrahlung) von der Open-Meteo Archive API für einen angegebenen Datumsbereich ab. Die abgerufenen Daten werden in ein Pandas DataFrame konvertiert und für die Nutzung in ML-Modellen vorbereitet (z.B. Umbenennung von Spalten, Zeitzonen-Handhabung).
- Erstellt ein
pvlib.location.Location-Objekt mitLATITUDE,LONGITUDEundTIMEZONE. - Verwendet
location.get_solarposition()um die Sonnenposition für jeden Zeitstempel imdf_indexzu berechnen. - Wählt relevante Spalten aus (
apparent_elevation,azimuth) und benennt sie um. - Transformiert
solar_elevationmit der Sinusfunktion undsolar_azimuthin Sinus-/Kosinus-Paare, um zyklische Abhängigkeiten für Modelle besser abzubilden und die Werte zu normalisieren.
- Definiert die API-URL für Open-Meteo und die benötigten Wettervariablen (
hourlyparameters). - Führt einen HTTP GET-Request an die Open-Meteo API aus und übergibt die Datumsbereiche und Parameter.
- Überprüft den HTTP-Statuscode (
raise_for_status). - Konvertiert die JSON-Antwort in ein Pandas DataFrame.
- Setzt den 'time'-Index und konvertiert ihn in die konfigurierte Zeitzone (
TIMEZONE), wobei Besonderheiten wie Sommer-/Winterzeitübergänge berücksichtigt werden. - Benennt die Spalten für eine bessere Lesbarkeit und Konsistenz um.
get_weather_featuresfängtrequests.exceptions.RequestExceptionab und gibtNonezurück, falls ein Fehler beim Abrufen der Wetterdaten auftritt.
Diese Datei definiert die Anwendungseinstellungen unter Verwendung von Pydantic-Settings und lädt Umgebungsvariablen für die Datenbankkonfiguration.
Settings
Die Settings-Klasse erbt von Pydantic's BaseSettings und wird verwendet, um Umgebungsvariablen und Standardwerte für die Anwendungskonfiguration zu verwalten. Sie ist speziell für Datenbankverbindungsdetails konfiguriert und generiert automatisch die DATABASE_URL und MAINTENANCE_DATABASE_URL aus den einzelnen Komponenten.
DB_USER(str): Datenbank-Benutzername.DB_PASSWORD(str): Datenbank-Passwort.DB_HOST(str): Datenbank-Host.DB_PORT(str, Standard:"5432"): Datenbank-Port.DB_NAME(str): Name der Hauptdatenbank.DATABASE_URL(str | None): Die vollständige Datenbank-Verbindungs-URL für die Hauptdatenbank. Wird automatisch generiert, wenn nicht explizit gesetzt.MAINTENANCE_DATABASE_URL(str | None): Die Datenbank-Verbindungs-URL für die Maintenance-Datenbank (normalerweisepostgres). Wird automatisch generiert, wenn nicht explizit gesetzt.INITIAL_TIME_WINDOW_IN_DAYS(int, Standard:365): Initiales Zeitfenster in Tagen für den Datenabruf.FETCH_TIME_WINDOW_DAYS(int, Standard:2): Zeitfenster in Tagen für den chunk-weisen Datenabruf.
- Pydantic-Settings: Die Klasse erbt von
BaseSettings, was das Laden von Umgebungsvariablen (und optional aus.env-Dateien) automatisiert. - URL-Generierung: Im
__init__-Method wird dieDATABASE_URLundMAINTENANCE_DATABASE_URLdynamisch aus den einzelnen Komponenten (DB_USER,DB_PASSWORD, etc.) erstellt, wobei das Passwort URL-sicher kodiert wird. .env-Datei Unterstützung:model_config = SettingsConfigDict(env_file='.env', env_file_encoding='utf-8', extra='ignore')konfiguriert Pydantic, um Umgebungsvariablen aus einer.env-Datei zu laden.- Globale Instanz: Eine Instanz der
Settings-Klasse (settings = Settings()) wird direkt in der Datei erstellt, um den Zugriff auf die Konfigurationseinstellungen zu erleichtern.
Diese Datei enthält einen Task, der das Zeitfenster für den Abruf von Sensordaten bestimmt.
Determine Fetch Time Window
Der determine_fetch_window-Task berechnet das Start- (from_date) und End-Datum (to_date) für den Datenabruf von Sensoren. Die Logik basiert auf dem lastMeasurementAt der API und dem db_last_data_fetched-Zeitstempel aus der Datenbank. Er stellt sicher, dass nur neue Daten abgerufen werden und verhindert redundante Abrufe.
db_box_state(Dict[str, Any]): Ein Dictionary, das den aktuellen Status der Sensorbox in der Datenbank enthält. Es solltebox_id,db_last_measurement_atunddb_last_data_fetchedenthalten.api_last_measurement_str(str | None): Der String des letzten Messzeitpunkts, wie von der API gemeldet.
Tuple[datetime | None, datetime | None]: Ein Tupel, das den Start- und Endzeitpunkt des Abruf-Zeitfensters enthält. Wenn keine neuen Daten abgerufen werden müssen, wird(None, None)zurückgegeben.
- Bestimmung des Enddatums (
to_date):- Vergleicht den
api_last_measurement_dtmit der aktuellen UTC-Zeit (now_utc). - Das
target_to_dateist entweder der API-Zeitstempel odernow_utc. - Das
actual_to_dateist das Minimum austarget_to_dateundnow_utc, um sicherzustellen, dass nicht in die Zukunft abgefragt wird. - Stellt sicher, dass das
actual_to_dateZeitzonen-aware (UTC) ist.
- Vergleicht den
- Bestimmung des Startdatums (
from_date):- Verwendet den
db_last_data_fetched-Wert ausdb_box_state. - Wandelt diesen Wert in ein Zeitzonen-aware
datetime-Objekt um.
- Verwendet den
- Prüfung auf Notwendigkeit des Abrufs:
- Wenn
actual_from_dateexistiert und größer oder gleichactual_to_dateist, sind die Daten aktuell, und es wird(None, None)zurückgegeben. - Andernfalls wird das berechnete
actual_from_dateundactual_to_datezurückgegeben.
- Wenn
- Protokolliert Warnungen, wenn
db_last_data_fetchedeinen unerwarteten Typ hat. - Stellt sicher, dass alle Datums-/Zeitobjekte im UTC-Format vorliegen, um Konsistenz zu gewährleisten.
Diese Datei enthält eine Hilfsfunktion zum Aktualisieren oder Erstellen von Modelleinträgen in der Datenbank.
_update_or_create_model_in_db(db: Session, result: dict, logger)
Die Funktion _update_or_create_model_in_db implementiert eine Upsert-Logik für Modelleinträge in der Datenbank. Sie sucht nach einem bestehenden TrainedModel-Eintrag für einen spezifischen Vorhersagehorizont. Wenn ein Eintrag gefunden wird, werden dessen Metadaten und Metriken aktualisiert. Andernfalls wird ein neuer TrainedModel-Eintrag in der Datenbank erstellt.
db(Session): Die SQLAlchemy-Datenbanksession.result(dict): Ein Dictionary, das die Trainingsergebnisse und Metadaten eines Modells enthält (z.B. den Horizont, Pfad, Metriken).logger: Ein Logger-Objekt (z.B. von Prefect'sget_run_logger()) zum Protokollieren von Informationen und Fehlern.
- Extrahiert den
forecast_horizon_hoursaus demresult-Dictionary. - Führt eine Datenbankabfrage aus, um zu prüfen, ob bereits ein
TrainedModel-Eintrag für diesen Horizont existiert. - UPDATE-Pfad: Wenn ein bestehendes Modell gefunden wird:
- Die Attribute des
db_model-Objekts (z.B.model_path,version_id,training_duration_seconds, Validierungsmetriken) werden mit den Werten aus demresult-Dictionary aktualisiert. - Der
last_trained_at-Zeitstempel wird automatisch durch dieonupdate-Konfiguration desTrainedModel-Modells in der Datenbank aktualisiert.
- Die Attribute des
- INSERT-Pfad: Wenn kein bestehendes Modell gefunden wird:
- Ein neues
TrainedModel-Objekt wird erstellt, wobei alle relevanten Informationen aus demresult-Dictionary verwendet werden. - Das neue Modell wird der Datenbank-Session hinzugefügt (
db.add(new_model)).
- Ein neues
- Protokolliert einen Fehler, wenn das
result-Dictionary den Schlüsselforecast_horizon_hoursnicht enthält.
Diese Datei enthält eine Hilfsfunktion zum sicheren Parsen von Datums-/Zeitstrings aus API-Antworten in zeitzonen-bewusste datetime-Objekte.
parse_api_datetime(date_str: str | None) -> datetime | None
Die Funktion parse_api_datetime versucht, einen Datums-/Zeitstring, der oft im ISO 8601-Format mit einem 'Z' für UTC (Zulu Time) endet, sicher in ein zeitzonen-bewusstes datetime-Objekt umzuwandeln. Sie stellt sicher, dass das resultierende datetime-Objekt immer auf UTC normalisiert ist.
date_str(str | None): Der Eingabe-String, der das Datum und die Zeit enthält. KannNonesein.
datetime | None: Ein zeitzonen-bewusstesdatetime-Objekt, das auf UTC normalisiert ist, oderNone, falls der Eingabe-String leer, ungültig oder nicht parsenbar ist.
- Leere Eingabe: Wenn
date_strNoneoder ein leerer String ist, wirdNonezurückgegeben. - 'Z' Handhabung: Wenn der String mit 'Z' endet (was UTC anzeigt), wird das 'Z' durch
+00:00ersetzt, um es mitdatetime.fromisoformatkompatibel zu machen. - Parsing:
datetime.fromisoformatwird verwendet, um den String zu parsen. - Zeitzonen-Normalisierung:
- Wenn das geparste
datetime-Objekt keine Zeitzoneninformationen hat (dt.tzinfo is None), wird es als UTC angenommen. - Anschließend wird das Objekt in die UTC-Zeitzone konvertiert (
dt.astimezone(timezone.utc)), um Konsistenz zu gewährleisten.
- Wenn das geparste
- Fehlerbehandlung: Ein
try...except-Block fängtValueErroroderTypeErrorab, die während des Parsens auftreten können, und gibt in diesem FallNonezurück.
Diese Datei ist die zentrale Konfigurationsdatei für Prefect-Deployments in diesem Projekt. Sie definiert Metadaten des Projekts und dient als Blaupause für die Bereitstellung von Flows.
Die prefect.yaml-Datei ist eine standardmäßige Prefect-Konfigurationsdatei, die zur Verwaltung und Bereitstellung von Flows verwendet wird. Sie enthält grundlegende Projektinformationen und Platzhalter für Build-, Push-, Pull- und Deployment-Konfigurationen. In der vorliegenden Form ist sie ein Ausgangspunkt für die Definition von Deployments, die später durch Skripte oder manuelle Anpassungen gefüllt werden können.
name: Der Name des Prefect-Projekts (vorstandsvorlagen).prefect-version: Die erforderliche Prefect-Version (2.20.0).build: Abschnitt für Build-bezogene Konfigurationen (z.B. Docker-Images). Aktuell aufnullgesetzt.push: Abschnitt für Push-bezogene Konfigurationen (z.B. Upload zu Remote-Speichern). Aktuell aufnullgesetzt.pull: Abschnitt für Pull-bezogene Konfigurationen (z.B. Klonen des Projekts von Remote-Speichern). Aktuell aufnullgesetzt.deployments: Eine Liste von Deployment-Definitionen. Im aktuellen Zustand ist ein leeres Deployment-Template vorhanden, das Felder wiename,flow_name,entrypoint,scheduleundwork_poolenthält, die später gefüllt werden können.
- Dient als zentrale, versionierte Konfigurationsdatei für Prefect-Deployments.
- Kann manuell bearbeitet oder durch Prefect CLI-Befehle (
prefect deploy) verwendet werden, um Deployments zu erstellen und zu verwalten. - Bietet einen Überblick über die Deployments, die Teil des Projekts sind.
Dieses Skript ist für die Einrichtung von Prefect Work Pools und Deployments über die Prefect API sowie für das Starten eines Prefect Process Workers zuständig.
Das run_worker.py-Skript automatisiert den Prozess des Setups der Prefect-Infrastruktur. Es interagiert direkt mit dem Prefect Server, um einen Work Pool zu erstellen (falls er nicht existiert) und mehrere Deployments für verschiedene Flows zu registrieren. Nach dem erfolgreichen Setup startet es einen lokalen ProcessWorker, der die Ausführung der geplanten Flow Runs überwacht und startet.
WORK_POOL_NAME(str): Name des zu erstellenden oder zu verwendenden Work Pools ("timeseries").DEPLOYMENT_NAME(str): Name des Haupt-Deployments für die Datenaufnahme ("timeseries-data-ingestion").FLOW_SCRIPT_PATH(Path): Pfad zum Flow-Skript (./flows/data_ingestion.py).FLOW_FUNCTION_NAME(str): Name der Flow-Funktion innerhalb des Skripts ("data_ingestion_flow").FLOW_ENTRYPOINT(str): Kombinierter Entrypoint für das Haupt-Deployment (./flows/data_ingestion.py:data_ingestion_flow).APP_BASE_PATH(Path): Der Basispfad der Anwendung im Container (/app/ml_service/).DEFAULT_BOX_ID(str): Standard-Box-ID für das Datenaufnahme-Deployment ("5faeb5589b2df8001b980304").INITIAL_FETCH_DAYS(int): Anzahl der Tage für den initialen Datenabruf (365).CHUNK_DAYS(int): Größe der Daten-Chunks in Tagen (4).INTERVAL_SECONDS(int): Intervall für die geplante Ausführung des Datenaufnahme-Flows (180Sekunden).
create_or_get_work_pool(client, name: str): Eine asynchrone Funktion, die überprüft, ob ein Work Pool mit dem gegebenen Namen existiert. Falls nicht, wird ein neuer Work Pool vom Typ "process" erstellt.main(): Die Haupt-Asynchronous-Funktion, die den gesamten Setup- und Startprozess steuert.
- Prefect Client: Stellt eine asynchrone Verbindung zum Prefect Server her.
- Work Pool Setup: Ruft
create_or_get_work_poolauf, um sicherzustellen, dass der Work Pooltimeseriesvorhanden ist. - Deployment:
timeseries-data-ingestion:- Definiert Parameter und einen Zeitplan (alle
180Sekunden) für dasdata_ingestion_flow. - Verwendet einen direkten HTTP POST-Request an die Prefect API, um das Deployment zu erstellen. Hierbei wird der
FLOW_ENTRYPOINT,APP_BASE_PATH, Parameter, Zeitpläne, Tags (ingestion,opensensemap,scheduled) und eine Beschreibung übermittelt.
- Definiert Parameter und einen Zeitplan (alle
- Deployment:
ml_training_temperature:- Definiert Tags (
ml_training) und eine Beschreibung für dastrain_all_models-Flow. - Erstellt dieses Deployment ebenfalls über einen HTTP POST-Request an die Prefect API.
- Definiert Tags (
- Deployment:
create_forecast:- Definiert Tags (
forecast) und eine Beschreibung für dasgenerate_forecast_flow. - Erstellt auch dieses Deployment über einen HTTP POST-Request an die Prefect API.
- Definiert Tags (
- Worker-Start: Initialisiert und startet einen
ProcessWorker, der an den Work Pooltimeseriesgebunden ist. Dieser Worker ist dann bereit, Flow Runs auszuführen, die Prefect für diesen Work Pool plant. - Fehlerbehandlung: Fängt
KeyboardInterruptab, um einen sauberen Exit des Workers zu ermöglichen, und loggt andere unerwartete Fehler.
- Verwendet
asynciofür die asynchrone Interaktion mit der Prefect API. - Nutzt
requestsfür direkte HTTP-Aufrufe an die Prefect API, um Deployments zu erstellen. - Das Skript ist idempotent für die Work Pool-Erstellung; es wird nur erstellt, wenn es nicht existiert.
- Die Pfade sind so konfiguriert, dass sie sowohl in der Entwicklungsumgebung als auch in Docker-Containern (
/app/ml_service/) funktionieren.
