diff --git a/k8s/auth-service/values-prod.yaml b/k8s/auth-service/values-prod.yaml index 9b20ee6db7..1545b4f160 100644 --- a/k8s/auth-service/values-prod.yaml +++ b/k8s/auth-service/values-prod.yaml @@ -6,7 +6,7 @@ app: replicaCount: 3 image: repository: eu.gcr.io/airqo-250220/airqo-auth-api - tag: prod-875fce3e-1734088534 + tag: prod-7c9244bc-1734092169 nameOverride: '' fullnameOverride: '' podAnnotations: {} diff --git a/k8s/device-registry/values-prod.yaml b/k8s/device-registry/values-prod.yaml index 10e1d25849..cf4a5d2ec7 100644 --- a/k8s/device-registry/values-prod.yaml +++ b/k8s/device-registry/values-prod.yaml @@ -6,7 +6,7 @@ app: replicaCount: 3 image: repository: eu.gcr.io/airqo-250220/airqo-device-registry-api - tag: prod-875fce3e-1734088534 + tag: prod-7c9244bc-1734092169 nameOverride: '' fullnameOverride: '' podAnnotations: {} diff --git a/k8s/exceedance/values-prod-airqo.yaml b/k8s/exceedance/values-prod-airqo.yaml index e76a72c5c2..e6df26b109 100644 --- a/k8s/exceedance/values-prod-airqo.yaml +++ b/k8s/exceedance/values-prod-airqo.yaml @@ -4,6 +4,6 @@ app: configmap: env-exceedance-production image: repository: eu.gcr.io/airqo-250220/airqo-exceedance-job - tag: prod-875fce3e-1734088534 + tag: prod-7c9244bc-1734092169 nameOverride: '' fullnameOverride: '' diff --git a/k8s/exceedance/values-prod-kcca.yaml b/k8s/exceedance/values-prod-kcca.yaml index 5b8964cb9c..6c056ea15d 100644 --- a/k8s/exceedance/values-prod-kcca.yaml +++ b/k8s/exceedance/values-prod-kcca.yaml @@ -4,6 +4,6 @@ app: configmap: env-exceedance-production image: repository: eu.gcr.io/airqo-250220/kcca-exceedance-job - tag: prod-875fce3e-1734088534 + tag: prod-7c9244bc-1734092169 nameOverride: '' fullnameOverride: '' diff --git a/src/workflows/dags/dag_docs.py b/src/workflows/dags/dag_docs.py index 56f104fef7..b9d6114601 100644 --- a/src/workflows/dags/dag_docs.py +++ b/src/workflows/dags/dag_docs.py @@ -74,3 +74,45 @@ - Kafka - devices-topic - AirQo """ + +daily_measurements_clean_up_doc 
= """ +### AirQo daily measurements data clean up +#### Purpose +Clean daily device measurements in BigQuery by removing duplicates. +#### Notes + + +Data sources: +- BigQuery: daily_device_measurements +Data Destinations: +- BigQuery: daily_device_measurements +- AirQo +""" + +daily_devices_measurements_realtime_doc = """ +### AirQo daily measurements - realtime +#### Purpose +Aggregate daily device measurements using hourly device measurements stored in BigQuery. +#### Notes + + +Data sources: +- BigQuery: daily_device_measurements +Data Destinations: +- BigQuery: daily_device_measurements +- AirQo +""" + +daily_devices_measurements_historical_doc = """ +### AirQo daily measurements - historical +#### Purpose +Aggregate daily device measurements using hourly device measurements stored in BigQuery, going back a couple of days. +#### Notes + + +Data sources: +- BigQuery: daily_device_measurements +Data Destinations: +- BigQuery: daily_device_measurements +- AirQo +""" diff --git a/src/workflows/dags/daily_measurements.py b/src/workflows/dags/daily_measurements.py index 1be8edc821..c673c63855 100644 --- a/src/workflows/dags/daily_measurements.py +++ b/src/workflows/dags/daily_measurements.py @@ -1,11 +1,18 @@ from airflow.decorators import dag, task from airqo_etl_utils.workflows_custom_utils import AirflowUtils +from datetime import timedelta +from dag_docs import ( + daily_measurements_clean_up_doc, + daily_devices_measurements_realtime_doc, + daily_devices_measurements_historical_doc, +) @dag( "Cleanup-Daily-Measurements", schedule="0 11 * * *", + doc_md=daily_measurements_clean_up_doc, default_args=AirflowUtils.dag_default_configs(), catchup=False, tags=["daily", "cleanup"], @@ -13,7 +20,11 @@ def cleanup_measurements(): import pandas as pd - @task() + @task( + provide_context=True, + retries=3, + retry_delay=timedelta(minutes=5), + ) def extract(**kwargs) -> pd.DataFrame: from airqo_etl_utils.date import DateUtils @@ -40,6 +51,7 @@ def 
cleanup_and_load(data: pd.DataFrame): @dag( "Realtime-Daily-Measurements", schedule="50 * * * *", + doc_md=daily_devices_measurements_realtime_doc, default_args=AirflowUtils.dag_default_configs(), catchup=False, tags=["realtime", "daily"], @@ -47,7 +59,7 @@ def cleanup_and_load(data: pd.DataFrame): def realtime_daily_measurements(): import pandas as pd - @task() + @task(provide_context=True, retries=3, retry_delay=timedelta(minutes=5)) def extract(): from airqo_etl_utils.daily_data_utils import DailyDataUtils from airqo_etl_utils.date import date_to_str_days @@ -69,7 +81,7 @@ def resample(data: pd.DataFrame): return DailyDataUtils.average_data(data=data) - @task() + @task(provide_context=True, retries=3, retry_delay=timedelta(minutes=5)) def load(data: pd.DataFrame): from airqo_etl_utils.daily_data_utils import DailyDataUtils @@ -83,6 +95,7 @@ def load(data: pd.DataFrame): @dag( "Historical-Daily-Measurements", schedule="0 0 * * *", + doc_md=daily_devices_measurements_historical_doc, default_args=AirflowUtils.dag_default_configs(), catchup=False, tags=["daily", "historical"], @@ -90,7 +103,7 @@ def load(data: pd.DataFrame): def historical_daily_measurements(): import pandas as pd - @task() + @task(provide_context=True, retries=3, retry_delay=timedelta(minutes=5)) def extract(**kwargs): from airqo_etl_utils.date import DateUtils from airqo_etl_utils.daily_data_utils import DailyDataUtils @@ -109,7 +122,7 @@ def resample(data: pd.DataFrame): return DailyDataUtils.average_data(data=data) - @task() + @task(retries=3, retry_delay=timedelta(minutes=5)) def load(data: pd.DataFrame): from airqo_etl_utils.daily_data_utils import DailyDataUtils