Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move to production #4067

Merged
merged 8 commits into from
Dec 13, 2024
2 changes: 1 addition & 1 deletion k8s/auth-service/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ app:
replicaCount: 3
image:
repository: eu.gcr.io/airqo-250220/airqo-auth-api
tag: prod-875fce3e-1734088534
tag: prod-7c9244bc-1734092169
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
2 changes: 1 addition & 1 deletion k8s/device-registry/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ app:
replicaCount: 3
image:
repository: eu.gcr.io/airqo-250220/airqo-device-registry-api
tag: prod-875fce3e-1734088534
tag: prod-7c9244bc-1734092169
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
2 changes: 1 addition & 1 deletion k8s/exceedance/values-prod-airqo.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ app:
configmap: env-exceedance-production
image:
repository: eu.gcr.io/airqo-250220/airqo-exceedance-job
tag: prod-875fce3e-1734088534
tag: prod-7c9244bc-1734092169
nameOverride: ''
fullnameOverride: ''
2 changes: 1 addition & 1 deletion k8s/exceedance/values-prod-kcca.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ app:
configmap: env-exceedance-production
image:
repository: eu.gcr.io/airqo-250220/kcca-exceedance-job
tag: prod-875fce3e-1734088534
tag: prod-7c9244bc-1734092169
nameOverride: ''
fullnameOverride: ''
42 changes: 42 additions & 0 deletions src/workflows/dags/dag_docs.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,45 @@
- Kafka - devices-topic
- <a href="https://airqo.africa/" target="_blank">AirQo</a>
"""

daily_measurements_clean_up_doc = """
### AirQo daily measurements data clean up
#### Purpose
Clean daily devices measurements in bigquery by removing duplicates.
#### Notes


Data sources:
- BigQuery: daily_device_measurements
Data Destinations:
- BigQuery: daily_device_measurements
- <a href="https://airqo.africa/" target="_blank">AirQo</a>
"""

daily_devices_measurements_realtime_doc = """
### AirQo daily measurements data clean up
#### Purpose
Aggregate daily device measurements using hourly devices measurements stored in bigquery
#### Notes


Data sources:
- BigQuery: daily_device_measurements
Data Destinations:
- BigQuery: daily_device_measurements
- <a href="https://airqo.africa/" target="_blank">AirQo</a>
"""

daily_devices_measurements_historical_doc = """
### AirQo daily measurements data clean up - historical
#### Purpose
Aggregate daily device measurements using hourly devices measurements stored in bigquery going back a couple of days.
#### Notes


Data sources:
- BigQuery: daily_device_measurements
Data Destinations:
- BigQuery: daily_device_measurements
- <a href="https://airqo.africa/" target="_blank">AirQo</a>
"""
23 changes: 18 additions & 5 deletions src/workflows/dags/daily_measurements.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,30 @@
from airflow.decorators import dag, task

from airqo_etl_utils.workflows_custom_utils import AirflowUtils
from datetime import timedelta
from dag_docs import (
daily_measurements_clean_up_doc,
daily_devices_measurements_realtime_doc,
daily_devices_measurements_historical_doc,
)


@dag(
"Cleanup-Daily-Measurements",
schedule="0 11 * * *",
doc_md=daily_measurements_clean_up_doc,
default_args=AirflowUtils.dag_default_configs(),
catchup=False,
tags=["daily", "cleanup"],
)
def cleanup_measurements():
import pandas as pd

@task()
@task(
provide_context=True,
retries=3,
retry_delay=timedelta(minutes=5),
)
def extract(**kwargs) -> pd.DataFrame:
from airqo_etl_utils.date import DateUtils

Expand All @@ -40,14 +51,15 @@ def cleanup_and_load(data: pd.DataFrame):
@dag(
"Realtime-Daily-Measurements",
schedule="50 * * * *",
doc_md=daily_devices_measurements_realtime_doc,
default_args=AirflowUtils.dag_default_configs(),
catchup=False,
tags=["realtime", "daily"],
)
def realtime_daily_measurements():
import pandas as pd

@task()
@task(provide_context=True, retries=3, retry_delay=timedelta(minutes=5))
def extract():
from airqo_etl_utils.daily_data_utils import DailyDataUtils
from airqo_etl_utils.date import date_to_str_days
Expand All @@ -69,7 +81,7 @@ def resample(data: pd.DataFrame):

return DailyDataUtils.average_data(data=data)

@task()
@task(provide_context=True, retries=3, retry_delay=timedelta(minutes=5))
def load(data: pd.DataFrame):
from airqo_etl_utils.daily_data_utils import DailyDataUtils

Expand All @@ -83,14 +95,15 @@ def load(data: pd.DataFrame):
@dag(
"Historical-Daily-Measurements",
schedule="0 0 * * *",
doc_md=daily_devices_measurements_historical_doc,
default_args=AirflowUtils.dag_default_configs(),
catchup=False,
tags=["daily", "historical"],
)
def historical_daily_measurements():
import pandas as pd

@task()
@task(provide_context=True, retries=3, retry_delay=timedelta(minutes=5))
def extract(**kwargs):
from airqo_etl_utils.date import DateUtils
from airqo_etl_utils.daily_data_utils import DailyDataUtils
Expand All @@ -109,7 +122,7 @@ def resample(data: pd.DataFrame):

return DailyDataUtils.average_data(data=data)

@task()
@task(retries=3, retry_delay=timedelta(minutes=5))
def load(data: pd.DataFrame):
from airqo_etl_utils.daily_data_utils import DailyDataUtils

Expand Down
Loading