-
Notifications
You must be signed in to change notification settings - Fork 22
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Update fix/clean up #4066
Update fix/clean up #4066
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -74,3 +74,45 @@ | |
- Kafka - devices-topic | ||
- <a href="https://airqo.africa/" target="_blank">AirQo</a> | ||
""" | ||
|
||
daily_measurements_clean_up_doc = """ | ||
### AirQo daily measurements data clean up | ||
#### Purpose | ||
Clean daily devices measurements in bigquery by removing duplicates. | ||
#### Notes | ||
|
||
|
||
Data sources: | ||
- BigQuery: daily_device_measurements | ||
Data Destinations: | ||
- BigQuery: daily_device_measurements | ||
- <a href="https://airqo.africa/" target="_blank">AirQo</a> | ||
""" | ||
|
||
daily_devices_measurements_realtime_doc = """ | ||
### AirQo daily measurements data clean up | ||
#### Purpose | ||
Aggregate daily device measurements using hourly devices measurements stored in bigquery | ||
#### Notes | ||
|
||
|
||
Data sources: | ||
- BigQuery: daily_device_measurements | ||
Data Destinations: | ||
- BigQuery: daily_device_measurements | ||
- <a href="https://airqo.africa/" target="_blank">AirQo</a> | ||
""" | ||
|
||
daily_devices_measurements_historical_doc = """ | ||
### AirQo daily measurements data clean up - historical | ||
#### Purpose | ||
Aggregate daily device measurements using hourly devices measurements stored in bigquery going back a couple of days. | ||
#### Notes | ||
|
||
|
||
Data sources: | ||
- BigQuery: daily_device_measurements | ||
Data Destinations: | ||
- BigQuery: daily_device_measurements | ||
- <a href="https://airqo.africa/" target="_blank">AirQo</a> | ||
""" | ||
Comment on lines
+106
to
+118
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fix title and add historical processing details Similar to the realtime documentation, this needs attention:
Apply this diff to fix the title: -### AirQo daily measurements data clean up - historical
+### AirQo historical daily measurements aggregation
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,19 +1,30 @@ | ||
from airflow.decorators import dag, task | ||
|
||
from airqo_etl_utils.workflows_custom_utils import AirflowUtils | ||
from datetime import timedelta | ||
from dag_docs import ( | ||
daily_measurements_clean_up_doc, | ||
daily_devices_measurements_realtime_doc, | ||
daily_devices_measurements_historical_doc, | ||
) | ||
|
||
|
||
@dag( | ||
"Cleanup-Daily-Measurements", | ||
schedule="0 11 * * *", | ||
doc_md=daily_measurements_clean_up_doc, | ||
default_args=AirflowUtils.dag_default_configs(), | ||
catchup=False, | ||
tags=["daily", "cleanup"], | ||
) | ||
def cleanup_measurements(): | ||
import pandas as pd | ||
|
||
@task() | ||
@task( | ||
provide_context=True, | ||
retries=3, | ||
retry_delay=timedelta(minutes=5), | ||
) | ||
Comment on lines
+23
to
+27
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add retry logic to cleanup_and_load task While the extract task has proper retry logic, the cleanup_and_load task is missing similar error handling. Since it performs database operations, it should also have retry capabilities. Apply this diff to add retry logic: - @task()
+ @task(retries=3, retry_delay=timedelta(minutes=5))
def cleanup_and_load(data: pd.DataFrame): Also applies to: 41-42 |
||
def extract(**kwargs) -> pd.DataFrame: | ||
from airqo_etl_utils.date import DateUtils | ||
|
||
|
@@ -40,14 +51,15 @@ def cleanup_and_load(data: pd.DataFrame): | |
@dag( | ||
"Realtime-Daily-Measurements", | ||
schedule="50 * * * *", | ||
doc_md=daily_devices_measurements_realtime_doc, | ||
default_args=AirflowUtils.dag_default_configs(), | ||
catchup=False, | ||
tags=["realtime", "daily"], | ||
) | ||
def realtime_daily_measurements(): | ||
import pandas as pd | ||
|
||
@task() | ||
@task(provide_context=True, retries=3, retry_delay=timedelta(minutes=5)) | ||
def extract(): | ||
from airqo_etl_utils.daily_data_utils import DailyDataUtils | ||
from airqo_etl_utils.date import date_to_str_days | ||
|
@@ -69,7 +81,7 @@ def resample(data: pd.DataFrame): | |
|
||
return DailyDataUtils.average_data(data=data) | ||
|
||
@task() | ||
@task(provide_context=True, retries=3, retry_delay=timedelta(minutes=5)) | ||
def load(data: pd.DataFrame): | ||
from airqo_etl_utils.daily_data_utils import DailyDataUtils | ||
|
||
|
@@ -83,14 +95,15 @@ def load(data: pd.DataFrame): | |
@dag( | ||
"Historical-Daily-Measurements", | ||
schedule="0 0 * * *", | ||
doc_md=daily_devices_measurements_historical_doc, | ||
default_args=AirflowUtils.dag_default_configs(), | ||
catchup=False, | ||
tags=["daily", "historical"], | ||
) | ||
def historical_daily_measurements(): | ||
import pandas as pd | ||
|
||
@task() | ||
@task(provide_context=True, retries=3, retry_delay=timedelta(minutes=5)) | ||
def extract(**kwargs): | ||
from airqo_etl_utils.date import DateUtils | ||
from airqo_etl_utils.daily_data_utils import DailyDataUtils | ||
|
@@ -109,7 +122,7 @@ def resample(data: pd.DataFrame): | |
|
||
return DailyDataUtils.average_data(data=data) | ||
|
||
@task() | ||
@task(retries=3, retry_delay=timedelta(minutes=5)) | ||
def load(data: pd.DataFrame): | ||
from airqo_etl_utils.daily_data_utils import DailyDataUtils | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix incorrect title and enhance documentation
There are a few issues with this documentation:
Apply this diff to fix the title: