From 020adb4df19dc429a8796430700231ca3636fadf Mon Sep 17 00:00:00 2001 From: NicholasTurner23 Date: Fri, 13 Dec 2024 15:04:06 +0300 Subject: [PATCH 1/6] Add docs for the daily measurements dags --- src/workflows/dags/dag_docs.py | 42 ++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/src/workflows/dags/dag_docs.py b/src/workflows/dags/dag_docs.py index 56f104fef7..b9d6114601 100644 --- a/src/workflows/dags/dag_docs.py +++ b/src/workflows/dags/dag_docs.py @@ -74,3 +74,45 @@ - Kafka - devices-topic - AirQo """ + +daily_measurements_clean_up_doc = """ +### AirQo daily measurements data clean up +#### Purpose +Clean daily devices measurements in bigquery by removing duplicates. +#### Notes + + +Data sources: +- BigQuery: daily_device_measurements +Data Destinations: +- BigQuery: daily_device_measurements +- AirQo +""" + +daily_devices_measurements_realtime_doc = """ +### AirQo daily measurements data clean up +#### Purpose +Aggregate daily device measurements using hourly devices measurements stored in bigquery +#### Notes + + +Data sources: +- BigQuery: daily_device_measurements +Data Destinations: +- BigQuery: daily_device_measurements +- AirQo +""" + +daily_devices_measurements_historical_doc = """ +### AirQo daily measurements data clean up - historical +#### Purpose +Aggregate daily device measurements using hourly devices measurements stored in bigquery going back a couple of days. +#### Notes + + +Data sources: +- BigQuery: daily_device_measurements +Data Destinations: +- BigQuery: daily_device_measurements +- AirQo +""" From 516e471a848c04b848ce57251b47b3d7a7f3bcf4 Mon Sep 17 00:00:00 2001 From: NicholasTurner23 Date: Fri, 13 Dec 2024 15:05:31 +0300 Subject: [PATCH 2/6] Enable data retries using the orginally scheduled date --- src/workflows/dags/daily_measurements.py | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/src/workflows/dags/daily_measurements.py b/src/workflows/dags/daily_measurements.py index 1be8edc821..c673c63855 100644 --- a/src/workflows/dags/daily_measurements.py +++ b/src/workflows/dags/daily_measurements.py @@ -1,11 +1,18 @@ from airflow.decorators import dag, task from airqo_etl_utils.workflows_custom_utils import AirflowUtils +from datetime import timedelta +from dag_docs import ( + daily_measurements_clean_up_doc, + daily_devices_measurements_realtime_doc, + daily_devices_measurements_historical_doc, +) @dag( "Cleanup-Daily-Measurements", schedule="0 11 * * *", + doc_md=daily_measurements_clean_up_doc, default_args=AirflowUtils.dag_default_configs(), catchup=False, tags=["daily", "cleanup"], @@ -13,7 +20,11 @@ def cleanup_measurements(): import pandas as pd - @task() + @task( + provide_context=True, + retries=3, + retry_delay=timedelta(minutes=5), + ) def extract(**kwargs) -> pd.DataFrame: from airqo_etl_utils.date import DateUtils @@ -40,6 +51,7 @@ def cleanup_and_load(data: pd.DataFrame): @dag( "Realtime-Daily-Measurements", schedule="50 * * * *", + doc_md=daily_devices_measurements_realtime_doc, default_args=AirflowUtils.dag_default_configs(), catchup=False, tags=["realtime", "daily"], @@ -47,7 +59,7 @@ def cleanup_and_load(data: pd.DataFrame): def realtime_daily_measurements(): import pandas as pd - @task() + @task(provide_context=True, retries=3, retry_delay=timedelta(minutes=5)) def extract(): from airqo_etl_utils.daily_data_utils import DailyDataUtils from airqo_etl_utils.date import date_to_str_days @@ -69,7 +81,7 @@ def resample(data: pd.DataFrame): return DailyDataUtils.average_data(data=data) - @task() + @task(provide_context=True, retries=3, retry_delay=timedelta(minutes=5)) def load(data: pd.DataFrame): from airqo_etl_utils.daily_data_utils import DailyDataUtils @@ -83,6 +95,7 @@ def load(data: pd.DataFrame): @dag( "Historical-Daily-Measurements", schedule="0 0 * * *", + doc_md=daily_devices_measurements_historical_doc, default_args=AirflowUtils.dag_default_configs(), catchup=False, tags=["daily", "historical"], @@ -90,7 +103,7 @@ def load(data: pd.DataFrame): def historical_daily_measurements(): import pandas as pd - @task() + @task(provide_context=True, retries=3, retry_delay=timedelta(minutes=5)) def extract(**kwargs): from airqo_etl_utils.date import DateUtils from airqo_etl_utils.daily_data_utils import DailyDataUtils @@ -109,7 +122,7 @@ def resample(data: pd.DataFrame): return DailyDataUtils.average_data(data=data) - @task() + @task(retries=3, retry_delay=timedelta(minutes=5)) def load(data: pd.DataFrame): from airqo_etl_utils.daily_data_utils import DailyDataUtils From 3640d9dd22a3597883a3b3618f3a8956b9bb9de9 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Fri, 13 Dec 2024 15:16:49 +0300 Subject: [PATCH 3/6] Update AirQo exceedance production image tag to prod-7c9244bc-1734092169 --- k8s/exceedance/values-prod-airqo.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/k8s/exceedance/values-prod-airqo.yaml b/k8s/exceedance/values-prod-airqo.yaml index e76a72c5c2..e6df26b109 100644 --- a/k8s/exceedance/values-prod-airqo.yaml +++ b/k8s/exceedance/values-prod-airqo.yaml @@ -4,6 +4,6 @@ app: configmap: env-exceedance-production image: repository: eu.gcr.io/airqo-250220/airqo-exceedance-job - tag: prod-875fce3e-1734088534 + tag: prod-7c9244bc-1734092169 nameOverride: '' fullnameOverride: '' From b3650e23338b182615e71b4b5d75a9798c083d7d Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Fri, 13 Dec 2024 15:16:57 +0300 Subject: [PATCH 4/6] Update KCCA exceedance production image tag to prod-7c9244bc-1734092169 --- k8s/exceedance/values-prod-kcca.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/k8s/exceedance/values-prod-kcca.yaml b/k8s/exceedance/values-prod-kcca.yaml index 5b8964cb9c..6c056ea15d 100644 --- a/k8s/exceedance/values-prod-kcca.yaml +++ b/k8s/exceedance/values-prod-kcca.yaml @@ -4,6 +4,6 @@ app: configmap: env-exceedance-production image: repository: eu.gcr.io/airqo-250220/kcca-exceedance-job - tag: prod-875fce3e-1734088534 + tag: prod-7c9244bc-1734092169 nameOverride: '' fullnameOverride: '' From f68c62c4093993ddcb1175e4e70cf4aa56d0615f Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Fri, 13 Dec 2024 15:17:36 +0300 Subject: [PATCH 5/6] Update auth service production image tag to prod-7c9244bc-1734092169 --- k8s/auth-service/values-prod.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/k8s/auth-service/values-prod.yaml b/k8s/auth-service/values-prod.yaml index 9b20ee6db7..1545b4f160 100644 --- a/k8s/auth-service/values-prod.yaml +++ b/k8s/auth-service/values-prod.yaml @@ -6,7 +6,7 @@ app: replicaCount: 3 image: repository: eu.gcr.io/airqo-250220/airqo-auth-api - tag: prod-875fce3e-1734088534 + tag: prod-7c9244bc-1734092169 nameOverride: '' fullnameOverride: '' podAnnotations: {} From fdcad8e3044abfc0b267cc31778454347e9ca5fe Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Fri, 13 Dec 2024 15:17:55 +0300 Subject: [PATCH 6/6] Update device registry production image tag to prod-7c9244bc-1734092169 --- k8s/device-registry/values-prod.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/k8s/device-registry/values-prod.yaml b/k8s/device-registry/values-prod.yaml index 10e1d25849..cf4a5d2ec7 100644 --- a/k8s/device-registry/values-prod.yaml +++ b/k8s/device-registry/values-prod.yaml @@ -6,7 +6,7 @@ app: replicaCount: 3 image: repository: eu.gcr.io/airqo-250220/airqo-device-registry-api - tag: prod-875fce3e-1734088534 + tag: prod-7c9244bc-1734092169 nameOverride: '' fullnameOverride: '' podAnnotations: {}