
Update fix/clean up #4066

Merged: 3 commits, Dec 13, 2024
42 changes: 42 additions & 0 deletions src/workflows/dags/dag_docs.py
@@ -74,3 +74,45 @@
 - Kafka - devices-topic
 - <a href="https://airqo.africa/" target="_blank">AirQo</a>
 """
+
+daily_measurements_clean_up_doc = """
+### AirQo daily measurements data clean up
+#### Purpose
+Clean daily device measurements in BigQuery by removing duplicates.
+#### Notes
+
+
+Data sources:
+- BigQuery: daily_device_measurements
+Data Destinations:
+- BigQuery: daily_device_measurements
+- <a href="https://airqo.africa/" target="_blank">AirQo</a>
+"""

+daily_devices_measurements_realtime_doc = """
+### AirQo daily measurements data clean up
+#### Purpose
+Aggregate daily device measurements using hourly device measurements stored in BigQuery.
+#### Notes
+
+
+Data sources:
+- BigQuery: daily_device_measurements
+Data Destinations:
+- BigQuery: daily_device_measurements
+- <a href="https://airqo.africa/" target="_blank">AirQo</a>
+"""
Comment on lines +92 to +104
Contributor

⚠️ Potential issue

Fix incorrect title and enhance documentation

There are a few issues with this documentation:

1. The title incorrectly states "data clean up" when the purpose is about aggregation.
2. The Notes section could include:
   - The aggregation method used
   - The frequency of updates
   - Any data quality checks performed

Apply this diff to fix the title:

-### AirQo daily measurements data clean up
+### AirQo daily measurements aggregation

Committable suggestion skipped: line range outside the PR's diff.
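To make the suggestion concrete, one possible shape for the corrected docstring follows. The Notes content is inferred from details visible in this PR (the cron schedule and the DailyDataUtils.average_data call); the data-quality line is a pure assumption and is marked as such.

```python
# Sketch of an enhanced docstring; inline notes flag what is assumed.
daily_devices_measurements_realtime_doc = """
### AirQo daily measurements aggregation
#### Purpose
Aggregate daily device measurements using hourly device measurements stored in BigQuery.
#### Notes
- Aggregation method: daily mean per device, via DailyDataUtils.average_data.
- Frequency: hourly, at minute 50 (schedule "50 * * * *").
- Data quality: (assumed) rows lacking a device identifier are dropped before averaging.

Data sources:
- BigQuery: daily_device_measurements
Data Destinations:
- BigQuery: daily_device_measurements
- <a href="https://airqo.africa/" target="_blank">AirQo</a>
"""
```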


+daily_devices_measurements_historical_doc = """
+### AirQo daily measurements data clean up - historical
+#### Purpose
+Aggregate daily device measurements using hourly device measurements stored in BigQuery, going back a couple of days.
+#### Notes
+
+
+Data sources:
+- BigQuery: daily_device_measurements
+Data Destinations:
+- BigQuery: daily_device_measurements
+- <a href="https://airqo.africa/" target="_blank">AirQo</a>
+"""
Comment on lines +106 to +118
Contributor

⚠️ Potential issue

Fix title and add historical processing details

Similar to the realtime documentation, this needs attention:

1. The title incorrectly mentions "data clean up".
2. The Notes section should include:
   - The historical data retention period
   - How backfilling works
   - Any special handling for historical data

Apply this diff to fix the title:

-### AirQo daily measurements data clean up - historical
+### AirQo historical daily measurements aggregation

Committable suggestion skipped: line range outside the PR's diff.
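On the backfilling point: a historical DAG like this one typically derives its lookback window relative to the run date. The helper below is a generic sketch; DateUtils' actual API is not visible in this diff, so none of these names should be read as the project's implementation.

```python
# Generic lookback-window helper; purely illustrative, not AirQo's DateUtils.
from datetime import datetime, timedelta, timezone


def lookback_window(days_back: int = 7) -> tuple[str, str]:
    """Return (start, end) ISO dates covering the last `days_back` days."""
    end = datetime.now(timezone.utc).date()
    start = end - timedelta(days=days_back)
    return start.isoformat(), end.isoformat()


print(lookback_window())  # e.g. ('2024-12-06', '2024-12-13')
```

The DAG would pass such a window to its extract step, re-reading and re-averaging only the affected days rather than the whole table.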

23 changes: 18 additions & 5 deletions src/workflows/dags/daily_measurements.py
@@ -1,19 +1,30 @@
 from airflow.decorators import dag, task
 
 from airqo_etl_utils.workflows_custom_utils import AirflowUtils
+from datetime import timedelta
+from dag_docs import (
+    daily_measurements_clean_up_doc,
+    daily_devices_measurements_realtime_doc,
+    daily_devices_measurements_historical_doc,
+)
 
 
 @dag(
     "Cleanup-Daily-Measurements",
     schedule="0 11 * * *",
+    doc_md=daily_measurements_clean_up_doc,
     default_args=AirflowUtils.dag_default_configs(),
     catchup=False,
     tags=["daily", "cleanup"],
 )
 def cleanup_measurements():
     import pandas as pd
 
-    @task()
+    @task(
+        provide_context=True,
+        retries=3,
+        retry_delay=timedelta(minutes=5),
+    )
Comment on lines +23 to +27
Contributor

⚠️ Potential issue

Add retry logic to cleanup_and_load task

While the extract task has proper retry logic, the cleanup_and_load task is missing similar error handling. Since it performs database operations, it should also have retry capabilities.

Apply this diff to add retry logic:

-    @task()
+    @task(retries=3, retry_delay=timedelta(minutes=5))
     def cleanup_and_load(data: pd.DataFrame):

Also applies to: 41-42
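For reference, the suggested change in context would look roughly like the following; the task body is elided in the PR diff, so a placeholder stands in for it.

```python
from datetime import timedelta

import pandas as pd
from airflow.decorators import task


@task(retries=3, retry_delay=timedelta(minutes=5))
def cleanup_and_load(data: pd.DataFrame):
    # Body elided in the diff. Whatever BigQuery write happens here now
    # gets up to three attempts, five minutes apart, on transient failures.
    ...
```

Retries on load tasks are cheap insurance: BigQuery writes can fail on quota or network blips, and a re-run a few minutes later often succeeds without intervention.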

     def extract(**kwargs) -> pd.DataFrame:
         from airqo_etl_utils.date import DateUtils
 
@@ -40,14 +51,15 @@ def cleanup_and_load(data: pd.DataFrame):
 @dag(
     "Realtime-Daily-Measurements",
     schedule="50 * * * *",
+    doc_md=daily_devices_measurements_realtime_doc,
     default_args=AirflowUtils.dag_default_configs(),
     catchup=False,
     tags=["realtime", "daily"],
 )
 def realtime_daily_measurements():
     import pandas as pd
 
-    @task()
+    @task(provide_context=True, retries=3, retry_delay=timedelta(minutes=5))
     def extract():
         from airqo_etl_utils.daily_data_utils import DailyDataUtils
         from airqo_etl_utils.date import date_to_str_days
@@ -69,7 +81,7 @@ def resample(data: pd.DataFrame):

         return DailyDataUtils.average_data(data=data)
 
-    @task()
+    @task(provide_context=True, retries=3, retry_delay=timedelta(minutes=5))
     def load(data: pd.DataFrame):
         from airqo_etl_utils.daily_data_utils import DailyDataUtils
 
@@ -83,14 +95,15 @@ def load(data: pd.DataFrame):
 @dag(
     "Historical-Daily-Measurements",
     schedule="0 0 * * *",
+    doc_md=daily_devices_measurements_historical_doc,
     default_args=AirflowUtils.dag_default_configs(),
     catchup=False,
     tags=["daily", "historical"],
 )
 def historical_daily_measurements():
     import pandas as pd
 
-    @task()
+    @task(provide_context=True, retries=3, retry_delay=timedelta(minutes=5))
     def extract(**kwargs):
         from airqo_etl_utils.date import DateUtils
         from airqo_etl_utils.daily_data_utils import DailyDataUtils
@@ -109,7 +122,7 @@ def resample(data: pd.DataFrame):

         return DailyDataUtils.average_data(data=data)
 
-    @task()
+    @task(retries=3, retry_delay=timedelta(minutes=5))
     def load(data: pd.DataFrame):
         from airqo_etl_utils.daily_data_utils import DailyDataUtils
 
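Taken together, the pattern this PR lands on for each DAG is module-level docs wired in through doc_md plus a retry policy on tasks that touch BigQuery. A condensed sketch follows; task bodies are placeholders and default_args is omitted, so this is the shape of the change, not a drop-in copy of the file.

```python
# Condensed sketch of the post-PR DAG shape; bodies are placeholders.
from datetime import timedelta

from airflow.decorators import dag, task

REALTIME_DOC = "### AirQo daily measurements aggregation"  # stand-in for dag_docs


@dag(
    "Realtime-Daily-Measurements",
    schedule="50 * * * *",
    doc_md=REALTIME_DOC,
    catchup=False,
    tags=["realtime", "daily"],
)
def realtime_daily_measurements():
    import pandas as pd

    @task(retries=3, retry_delay=timedelta(minutes=5))
    def extract() -> pd.DataFrame:
        ...  # pull the day's hourly measurements from BigQuery

    @task()
    def resample(data: pd.DataFrame) -> pd.DataFrame:
        ...  # average hourly rows into one row per device per day

    @task(retries=3, retry_delay=timedelta(minutes=5))
    def load(data: pd.DataFrame) -> None:
        ...  # write the daily averages back to BigQuery

    load(resample(extract()))


realtime_daily_measurements()
```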