diff --git a/k8s/auth-service/values-prod.yaml b/k8s/auth-service/values-prod.yaml
index 9b20ee6db7..1545b4f160 100644
--- a/k8s/auth-service/values-prod.yaml
+++ b/k8s/auth-service/values-prod.yaml
@@ -6,7 +6,7 @@ app:
replicaCount: 3
image:
repository: eu.gcr.io/airqo-250220/airqo-auth-api
- tag: prod-875fce3e-1734088534
+ tag: prod-7c9244bc-1734092169
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
diff --git a/k8s/device-registry/values-prod.yaml b/k8s/device-registry/values-prod.yaml
index 10e1d25849..cf4a5d2ec7 100644
--- a/k8s/device-registry/values-prod.yaml
+++ b/k8s/device-registry/values-prod.yaml
@@ -6,7 +6,7 @@ app:
replicaCount: 3
image:
repository: eu.gcr.io/airqo-250220/airqo-device-registry-api
- tag: prod-875fce3e-1734088534
+ tag: prod-7c9244bc-1734092169
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
diff --git a/k8s/exceedance/values-prod-airqo.yaml b/k8s/exceedance/values-prod-airqo.yaml
index e76a72c5c2..e6df26b109 100644
--- a/k8s/exceedance/values-prod-airqo.yaml
+++ b/k8s/exceedance/values-prod-airqo.yaml
@@ -4,6 +4,6 @@ app:
configmap: env-exceedance-production
image:
repository: eu.gcr.io/airqo-250220/airqo-exceedance-job
- tag: prod-875fce3e-1734088534
+ tag: prod-7c9244bc-1734092169
nameOverride: ''
fullnameOverride: ''
diff --git a/k8s/exceedance/values-prod-kcca.yaml b/k8s/exceedance/values-prod-kcca.yaml
index 5b8964cb9c..6c056ea15d 100644
--- a/k8s/exceedance/values-prod-kcca.yaml
+++ b/k8s/exceedance/values-prod-kcca.yaml
@@ -4,6 +4,6 @@ app:
configmap: env-exceedance-production
image:
repository: eu.gcr.io/airqo-250220/kcca-exceedance-job
- tag: prod-875fce3e-1734088534
+ tag: prod-7c9244bc-1734092169
nameOverride: ''
fullnameOverride: ''
diff --git a/src/workflows/dags/dag_docs.py b/src/workflows/dags/dag_docs.py
index 56f104fef7..b9d6114601 100644
--- a/src/workflows/dags/dag_docs.py
+++ b/src/workflows/dags/dag_docs.py
@@ -74,3 +74,45 @@
- Kafka - devices-topic
- AirQo
"""
+
+daily_measurements_clean_up_doc = """
+### AirQo daily measurements data clean up
+#### Purpose
+Clean daily devices measurements in bigquery by removing duplicates.
+#### Notes
+
+
+Data sources:
+- BigQuery: daily_device_measurements
+Data Destinations:
+- BigQuery: daily_device_measurements
+- AirQo
+"""
+
+daily_devices_measurements_realtime_doc = """
+### AirQo daily measurements data clean up
+#### Purpose
+Aggregate daily device measurements using hourly devices measurements stored in bigquery
+#### Notes
+
+
+Data sources:
+- BigQuery: daily_device_measurements
+Data Destinations:
+- BigQuery: daily_device_measurements
+- AirQo
+"""
+
+daily_devices_measurements_historical_doc = """
+### AirQo daily measurements data clean up - historical
+#### Purpose
+Aggregate daily device measurements using hourly devices measurements stored in bigquery going back a couple of days.
+#### Notes
+
+
+Data sources:
+- BigQuery: daily_device_measurements
+Data Destinations:
+- BigQuery: daily_device_measurements
+- AirQo
+"""
diff --git a/src/workflows/dags/daily_measurements.py b/src/workflows/dags/daily_measurements.py
index 1be8edc821..c673c63855 100644
--- a/src/workflows/dags/daily_measurements.py
+++ b/src/workflows/dags/daily_measurements.py
@@ -1,11 +1,18 @@
from airflow.decorators import dag, task
from airqo_etl_utils.workflows_custom_utils import AirflowUtils
+from datetime import timedelta
+from dag_docs import (
+ daily_measurements_clean_up_doc,
+ daily_devices_measurements_realtime_doc,
+ daily_devices_measurements_historical_doc,
+)
@dag(
"Cleanup-Daily-Measurements",
schedule="0 11 * * *",
+ doc_md=daily_measurements_clean_up_doc,
default_args=AirflowUtils.dag_default_configs(),
catchup=False,
tags=["daily", "cleanup"],
@@ -13,7 +20,11 @@
def cleanup_measurements():
import pandas as pd
- @task()
+ @task(
+ provide_context=True,
+ retries=3,
+ retry_delay=timedelta(minutes=5),
+ )
def extract(**kwargs) -> pd.DataFrame:
from airqo_etl_utils.date import DateUtils
@@ -40,6 +51,7 @@ def cleanup_and_load(data: pd.DataFrame):
@dag(
"Realtime-Daily-Measurements",
schedule="50 * * * *",
+ doc_md=daily_devices_measurements_realtime_doc,
default_args=AirflowUtils.dag_default_configs(),
catchup=False,
tags=["realtime", "daily"],
@@ -47,7 +59,7 @@ def cleanup_and_load(data: pd.DataFrame):
def realtime_daily_measurements():
import pandas as pd
- @task()
+ @task(provide_context=True, retries=3, retry_delay=timedelta(minutes=5))
def extract():
from airqo_etl_utils.daily_data_utils import DailyDataUtils
from airqo_etl_utils.date import date_to_str_days
@@ -69,7 +81,7 @@ def resample(data: pd.DataFrame):
return DailyDataUtils.average_data(data=data)
- @task()
+ @task(provide_context=True, retries=3, retry_delay=timedelta(minutes=5))
def load(data: pd.DataFrame):
from airqo_etl_utils.daily_data_utils import DailyDataUtils
@@ -83,6 +95,7 @@ def load(data: pd.DataFrame):
@dag(
"Historical-Daily-Measurements",
schedule="0 0 * * *",
+ doc_md=daily_devices_measurements_historical_doc,
default_args=AirflowUtils.dag_default_configs(),
catchup=False,
tags=["daily", "historical"],
@@ -90,7 +103,7 @@ def load(data: pd.DataFrame):
def historical_daily_measurements():
import pandas as pd
- @task()
+ @task(provide_context=True, retries=3, retry_delay=timedelta(minutes=5))
def extract(**kwargs):
from airqo_etl_utils.date import DateUtils
from airqo_etl_utils.daily_data_utils import DailyDataUtils
@@ -109,7 +122,7 @@ def resample(data: pd.DataFrame):
return DailyDataUtils.average_data(data=data)
- @task()
+ @task(retries=3, retry_delay=timedelta(minutes=5))
def load(data: pd.DataFrame):
from airqo_etl_utils.daily_data_utils import DailyDataUtils