From 1803ed3bd61d1c9fc6ca8eaaee51ce829b886242 Mon Sep 17 00:00:00 2001 From: NicholasTurner23 Date: Mon, 11 Nov 2024 15:36:36 +0300 Subject: [PATCH 1/6] Update analytics api resources --- k8s/analytics/values-prod.yaml | 8 ++++---- k8s/analytics/values-stage.yaml | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/k8s/analytics/values-prod.yaml b/k8s/analytics/values-prod.yaml index 511e545c4b..e648ebd926 100644 --- a/k8s/analytics/values-prod.yaml +++ b/k8s/analytics/values-prod.yaml @@ -1,6 +1,6 @@ namespace: production -nameOverride: '' -fullnameOverride: '' +nameOverride: "" +fullnameOverride: "" images: repositories: api: eu.gcr.io/airqo-250220/airqo-analytics-api @@ -17,8 +17,8 @@ api: podAnnotations: {} resources: limits: - cpu: 100m - memory: 600Mi + cpu: 1000m + memory: 2000Mi requests: cpu: 10m memory: 250Mi diff --git a/k8s/analytics/values-stage.yaml b/k8s/analytics/values-stage.yaml index 6f8c019008..ad2ffdf0e1 100644 --- a/k8s/analytics/values-stage.yaml +++ b/k8s/analytics/values-stage.yaml @@ -1,6 +1,6 @@ namespace: staging -nameOverride: '' -fullnameOverride: '' +nameOverride: "" +fullnameOverride: "" images: repositories: api: eu.gcr.io/airqo-250220/airqo-stage-analytics-api @@ -17,8 +17,8 @@ api: podAnnotations: {} resources: limits: - cpu: 100m - memory: 600Mi + cpu: 500m + memory: 1000Mi requests: cpu: 10m memory: 250Mi From 37f33399c303fafada7274473949b5e9f903c872 Mon Sep 17 00:00:00 2001 From: NicholasTurner23 Date: Thu, 14 Nov 2024 22:51:16 +0300 Subject: [PATCH 2/6] Modularise for readability and easy modification --- src/analytics/api/models/events.py | 408 +++++++++++++++++------------ 1 file changed, 246 insertions(+), 162 deletions(-) diff --git a/src/analytics/api/models/events.py b/src/analytics/api/models/events.py index f527889cc8..117575a1f0 100644 --- a/src/analytics/api/models/events.py +++ b/src/analytics/api/models/events.py @@ -15,6 +15,11 @@ class EventsModel(BasePyMongoModel): + """ + This class manages data retrieval and query construction for events data, integrating device, site, + and airqloud information for specified pollutants and weather fields. + """ + BIGQUERY_AIRQLOUDS_SITES = f"`{CONFIGURATIONS.BIGQUERY_AIRQLOUDS_SITES}`" BIGQUERY_AIRQLOUDS = f"`{CONFIGURATIONS.BIGQUERY_AIRQLOUDS}`" BIGQUERY_GRIDS = f"`{CONFIGURATIONS.BIGQUERY_GRIDS}`" @@ -37,9 +42,199 @@ class EventsModel(BasePyMongoModel): DEVICES_SUMMARY_TABLE = CONFIGURATIONS.DEVICES_SUMMARY_TABLE def __init__(self, tenant): + """ + Initializes the EventsModel with default settings and mappings for limit thresholds, + and specifies collections and BigQuery table references. + + Args: + tenant (str): The tenant identifier for managing database collections. 
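+
+        Example (illustrative usage only, assuming "airqo" is a configured tenant):
+            >>> model = EventsModel("airqo")
+            >>> model.limit_mapper["pm2_5"]
+            500.5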
+ """ self.limit_mapper = {"pm2_5": 500.5, "pm10": 604.5, "no2": 2049} + self.sites_table = self.BIGQUERY_SITES + self.airqlouds_sites_table = self.BIGQUERY_AIRQLOUDS_SITES + self.devices_table = self.BIGQUERY_DEVICES + self.airqlouds_table = self.BIGQUERY_AIRQLOUDS super().__init__(tenant, collection_name="events") + @property + def device_info_query(self): + """Generates a device information query including site_id, tenant, and approximate location details.""" + return ( + f"{self.devices_table}.site_id AS site_id, " + f"{self.devices_table}.tenant AS tenant, " + f"{self.devices_table}.approximate_latitude AS device_latitude, " + f"{self.devices_table}.approximate_longitude AS device_longitude" + ) + + @property + def device_info_query_airqloud(self): + """Generates a device information query specifically for airqlouds, excluding the site_id.""" + return ( + f"{self.devices_table}.tenant AS tenant, " + f"{self.devices_table}.approximate_latitude AS device_latitude, " + f"{self.devices_table}.approximate_longitude AS device_longitude" + ) + + @property + def site_info_query(self): + """Generates a site information query to retrieve site name and approximate location details.""" + return ( + f"{self.sites_table}.name AS site_name, " + f"{self.sites_table}.approximate_latitude AS site_latitude, " + f"{self.sites_table}.approximate_longitude AS site_longitude" + ) + + @property + def airqloud_info_query(self): + """Generates an Airqloud information query to retrieve the airqloud name.""" + return f"{self.airqlouds_table}.name AS airqloud_name" + + def add_device_join(self, data_query, filter_clause=""): + """ + Joins device information with a given data query based on device_name. + + Args: + data_query (str): The data query to join with device information. + filter_clause (str): Optional SQL filter clause. + + Returns: + str: Modified query with device join. + """ + return ( + f"SELECT {self.device_info_query}, data.* " + f"FROM {self.devices_table} " + f"RIGHT JOIN ({data_query}) data ON data.device_name = {self.devices_table}.device_id " + f"{filter_clause}" + ) + + def add_device_join_to_airqlouds(self, data_query, filter_clause=""): + """ + Joins device information with airqloud data based on site_id. + + Args: + data_query (str): The data query to join with airqloud device information. + filter_clause (str): Optional SQL filter clause. + + Returns: + str: Modified query with device-airqloud join. + """ + return ( + f"SELECT {self.device_info_query_airqloud}, data.* " + f"FROM {self.devices_table} " + f"RIGHT JOIN ({data_query}) data ON data.site_id = {self.devices_table}.site_id " + f"{filter_clause}" + ) + + def add_site_join(self, data_query): + """ + Joins site information with the given data query based on site_id. + + Args: + data_query (str): The data query to join with site information. + + Returns: + str: Modified query with site join. + """ + return ( + f"SELECT {self.site_info_query}, data.* " + f"FROM {self.sites_table} " + f"RIGHT JOIN ({data_query}) data ON data.site_id = {self.sites_table}.id " + ) + + def add_airqloud_join(self, data_query): + """ + Joins Airqloud information with the provided data query based on airqloud_id. + + Args: + data_query (str): The data query to join with Airqloud information. + + Returns: + str: Modified query with Airqloud join. 
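+
+        Example (illustrative only; shows the rough shape of the SQL this helper
+        produces, with angle-bracketed placeholders for the configured tables):
+            SELECT <airqlouds_table>.name AS airqloud_name, data.*
+            FROM <airqlouds_table>
+            RIGHT JOIN (<data_query>) data
+                ON data.airqloud_id = <airqlouds_table>.id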
+ """ + return ( + f"SELECT {self.airqloud_info_query}, data.* " + f"FROM {self.airqlouds_table} " + f"RIGHT JOIN ({data_query}) data ON data.airqloud_id = {self.airqlouds_table}.id " + ) + + def build_query( + self, + data_table, + filter_type, + filter_value, + pollutants_query, + bam_pollutants_query, + start_date, + end_date, + frequency=None, + ): + """ + Builds a SQL query to retrieve pollutant and weather data with associated device or site information. + + Args: + data_table (str): The table name containing the main data records. + filter_type (str): Type of filter (e.g., devices, sites, airqlouds). + filter_value (list): Filter values corresponding to the filter type. + pollutants_query (str): Query for pollutant data. + bam_pollutants_query (str): Query for BAM pollutant data. + start_date (str): Start date for data retrieval. + end_date (str): End date for data retrieval. + frequency (str): Optional frequency filter. + + Returns: + str: Final constructed SQL query. + """ + if filter_type in ["devices", "device_ids", "device_names"]: + query = ( + f"{pollutants_query}, {self.device_info_query}, {self.devices_table}.name AS device_name, " + f"FROM {data_table} " + f"JOIN {self.devices_table} ON {self.devices_table}.device_id = {data_table}.device_id " + f"WHERE {data_table}.timestamp BETWEEN '{start_date}' AND '{end_date}' " + f"AND {self.devices_table}.device_id IN UNNEST({filter_value}) " + ) + query = self.add_site_join(query) + + if frequency == "hourly": + bam_query = ( + f"{bam_pollutants_query}, {self.device_info_query}, {self.devices_table}.name AS device_name, " + f"FROM {self.BIGQUERY_BAM_DATA} " + f"JOIN {self.devices_table} ON {self.devices_table}.device_id = {self.BIGQUERY_BAM_DATA}.device_id " + f"WHERE {self.BIGQUERY_BAM_DATA}.timestamp BETWEEN '{start_date}' AND '{end_date}' " + f"AND {self.devices_table}.device_id IN UNNEST({filter_value}) " + ) + bam_query = self.add_site_join(bam_query) + query = f"{query} UNION ALL {bam_query}" + + elif filter_type in ["sites", "site_names", "site_ids"]: + query = ( + f"{pollutants_query}, {self.site_info_query}, {data_table}.device_id AS device_name " + f"FROM {data_table} " + f"JOIN {self.sites_table} ON {self.sites_table}.id = {data_table}.site_id " + f"WHERE {data_table}.timestamp BETWEEN '{start_date}' AND '{end_date}' " + f"AND {self.sites_table}.id IN UNNEST(@filter_value) " + ) + query = self.add_device_join(query) + elif filter_type == "airqlouds": + meta_data_query = ( + f"SELECT {self.airqlouds_sites_table}.airqloud_id, " + f"{self.airqlouds_sites_table}.site_id AS site_id " + f"FROM {self.airqlouds_sites_table} " + f"WHERE {self.airqlouds_sites_table}.airqloud_id IN UNNEST(@filter_value) " + ) + + meta_data_query = self.add_airqloud_join(meta_data_query) + meta_data_query = self.add_site_join(meta_data_query) + meta_data_query = self.add_device_join_to_airqlouds(meta_data_query) + + query = ( + f"{pollutants_query}, {data_table}.device_id AS device_name, meta_data.* " + f"FROM {data_table} " + f"RIGHT JOIN ({meta_data_query}) meta_data ON meta_data.site_id = {data_table}.site_id " + f"WHERE {data_table}.timestamp BETWEEN '{start_date}' AND '{end_date}' " + f"ORDER BY {data_table}.timestamp " + ) + return query + @classmethod @cache.memoize() def download_from_bigquery( @@ -50,16 +245,27 @@ def download_from_bigquery( end_date, frequency, pollutants, + data_type, weather_fields, ) -> pd.DataFrame: + """ + Retrieves data from BigQuery with specified filters, frequency, pollutants, and weather fields. 
+ + Args: + filter_type (str): Type of filter to apply (e.g., 'devices', 'sites', 'airqlouds'). + filter_value (list): Filter values (IDs or names) for the selected filter type. + start_date (str): Start date for the data query. + end_date (str): End date for the data query. + frequency (str): Data frequency (e.g., 'raw', 'daily', 'hourly'). + pollutants (list): List of pollutants to include in the data. + data_type (str): Type of data ('raw' or 'aggregated'). + weather_fields (list): List of weather fields to retrieve. + + Returns: + pd.DataFrame: Retrieved data in DataFrame format, with duplicates removed and sorted by timestamp. + """ decimal_places = cls.DATA_EXPORT_DECIMAL_PLACES - # Data sources - sites_table = cls.BIGQUERY_SITES - airqlouds_sites_table = cls.BIGQUERY_AIRQLOUDS_SITES - devices_table = cls.BIGQUERY_DEVICES - airqlouds_table = cls.BIGQUERY_AIRQLOUDS - sorting_cols = ["site_id", "datetime", "device_name"] # Define table mapping for dynamic selection based on frequency @@ -76,8 +282,14 @@ def download_from_bigquery( bam_pollutant_columns = [] weather_columns = [] for pollutant in pollutants: - pollutant_mapping = BIGQUERY_FREQUENCY_MAPPER.get(frequency).get( - pollutant, [] + + if pollutant == "raw": + key = pollutant + else: + key = f"{pollutant}_{data_type}" + + pollutant_mapping = BIGQUERY_FREQUENCY_MAPPER.get(frequency, {}).get( + key, [] ) pollutant_columns.extend( [ @@ -86,170 +298,42 @@ def download_from_bigquery( ] ) - if pollutant == "pm2_5": + if pollutant in {"pm2_5", "pm10", "no2"}: bam_pollutant_columns.extend( - ["pm2_5 as pm2_5_raw_value", "pm2_5 as pm2_5_calibrated_value"] - ) - elif pollutant == "pm10": - bam_pollutant_columns.extend( - ["pm10 as pm10_raw_value", "pm10 as pm10_calibrated_value"] - ) - elif pollutant == "no2": - bam_pollutant_columns.extend( - ["no2 as no2_raw_value", "no2 as no2_calibrated_value"] + [f"ROUND({pollutant}, {decimal_places}) AS {key}_value"] ) - if weather_fields is not None: + if weather_fields: for field in weather_fields: - weather_mapping = WEATHER_FIELDS_MAPPER.get(field, None) - weather_columns.extend( - [ + weather_mapping = WEATHER_FIELDS_MAPPER.get(field) + if weather_mapping: + weather_columns.append( f"ROUND({data_table}.{weather_mapping}, {decimal_places}) AS {weather_mapping}" - ] - ) + ) + + selected_columns = set(pollutant_columns + weather_columns) pollutants_query = ( - f" SELECT {', '.join(map(str, set(pollutant_columns + weather_columns)))} ," - f" FORMAT_DATETIME('%Y-%m-%d %H:%M:%S', {data_table}.timestamp) AS datetime " + f"SELECT {', '.join(selected_columns)}, " + f"FORMAT_DATETIME('%Y-%m-%d %H:%M:%S', {data_table}.timestamp) AS datetime " ) + + bam_selected_columns = set(bam_pollutant_columns) bam_pollutants_query = ( - f" SELECT {', '.join(map(str, set(bam_pollutant_columns)))} ," - f" FORMAT_DATETIME('%Y-%m-%d %H:%M:%S', {cls.BIGQUERY_BAM_DATA}.timestamp) AS datetime " + f"SELECT {', '.join(bam_selected_columns)}, " + f"FORMAT_DATETIME('%Y-%m-%d %H:%M:%S', {cls.BIGQUERY_BAM_DATA}.timestamp) AS datetime " ) - if filter_type == "devices": - # Adding device information, start and end times - query = ( - f" {pollutants_query} , " - f" {devices_table}.device_id AS device_name , " - f" {devices_table}.site_id AS site_id , " - f" {devices_table}.tenant AS tenant , " - f" {devices_table}.approximate_latitude AS device_latitude , " - f" {devices_table}.approximate_longitude AS device_longitude , " - f" FROM {data_table} " - f" JOIN {devices_table} ON {devices_table}.device_id = {data_table}.device_id " - f" 
WHERE {data_table}.timestamp >= '{start_date}' " - f" AND {data_table}.timestamp <= '{end_date}' " - f" AND {devices_table}.device_id IN UNNEST(@filter_value) " - ) - - bam_query = ( - f" {bam_pollutants_query} , " - f" {devices_table}.device_id AS device_name , " - f" {devices_table}.site_id AS site_id , " - f" {devices_table}.tenant AS tenant , " - f" {devices_table}.approximate_latitude AS device_latitude , " - f" {devices_table}.approximate_longitude AS device_longitude , " - f" FROM {cls.BIGQUERY_BAM_DATA} " - f" JOIN {devices_table} ON {devices_table}.device_id = {cls.BIGQUERY_BAM_DATA}.device_id " - f" WHERE {cls.BIGQUERY_BAM_DATA}.timestamp >= '{start_date}' " - f" AND {cls.BIGQUERY_BAM_DATA}.timestamp <= '{end_date}' " - f" AND {devices_table}.device_id IN UNNEST(@filter_value) " - ) - - # Adding site information - query = ( - f" SELECT " - f" {sites_table}.name AS site_name , " - f" {sites_table}.approximate_latitude AS site_latitude , " - f" {sites_table}.approximate_longitude AS site_longitude , " - f" data.* " - f" FROM {sites_table} " - f" RIGHT JOIN ({query}) data ON data.site_id = {sites_table}.id " - ) - - bam_query = ( - f" SELECT " - f" {sites_table}.name AS site_name , " - f" {sites_table}.approximate_latitude AS site_latitude , " - f" {sites_table}.approximate_longitude AS site_longitude , " - f" data.* " - f" FROM {sites_table} " - f" RIGHT JOIN ({bam_query}) data ON data.site_id = {sites_table}.id " - ) - - if frequency == "hourly": - query = f"{query} UNION ALL {bam_query}" - - elif filter_type == "sites": - # Adding site information, start and end times - query = ( - f" {pollutants_query} , " - f" {sites_table}.tenant AS tenant , " - f" {sites_table}.id AS site_id , " - f" {sites_table}.name AS site_name , " - f" {sites_table}.approximate_latitude AS site_latitude , " - f" {sites_table}.approximate_longitude AS site_longitude , " - f" {data_table}.device_id AS device_name , " - f" FROM {data_table} " - f" JOIN {sites_table} ON {sites_table}.id = {data_table}.site_id " - f" WHERE {data_table}.timestamp >= '{start_date}' " - f" AND {data_table}.timestamp <= '{end_date}' " - f" AND {sites_table}.id IN UNNEST(@filter_value) " - ) - - # Adding device information - query = ( - f" SELECT " - f" {devices_table}.approximate_latitude AS device_latitude , " - f" {devices_table}.approximate_longitude AS device_longitude , " - # f" {devices_table}.device_id AS device_name , " #this column creates a duplicate column - f" data.* " - f" FROM {devices_table} " - f" RIGHT JOIN ({query}) data ON data.device_name = {devices_table}.device_id " - ) - elif filter_type == "airqlouds": - sorting_cols = ["airqloud_id", "site_id", "datetime", "device_name"] - - meta_data_query = ( - f" SELECT {airqlouds_sites_table}.tenant , " - f" {airqlouds_sites_table}.airqloud_id , " - f" {airqlouds_sites_table}.site_id , " - f" FROM {airqlouds_sites_table} " - f" WHERE {airqlouds_sites_table}.airqloud_id IN UNNEST(@filter_value) " - ) - - # Adding airqloud information - meta_data_query = ( - f" SELECT " - f" {airqlouds_table}.name AS airqloud_name , " - f" meta_data.* " - f" FROM {airqlouds_table} " - f" RIGHT JOIN ({meta_data_query}) meta_data ON meta_data.airqloud_id = {airqlouds_table}.id " - ) - - # Adding site information - meta_data_query = ( - f" SELECT " - f" {sites_table}.approximate_latitude AS site_latitude , " - f" {sites_table}.approximate_longitude AS site_longitude , " - f" {sites_table}.name AS site_name , " - f" meta_data.* " - f" FROM {sites_table} " - f" RIGHT JOIN ({meta_data_query}) 
meta_data ON meta_data.site_id = {sites_table}.id "
-            )
-
-            # Adding device information
-            meta_data_query = (
-                f" SELECT "
-                f" {devices_table}.approximate_latitude AS device_latitude , "
-                f" {devices_table}.approximate_longitude AS device_longitude , "
-                f" {devices_table}.device_id AS device_name , "
-                f" meta_data.* "
-                f" FROM {devices_table} "
-                f" RIGHT JOIN ({meta_data_query}) meta_data ON meta_data.site_id = {devices_table}.site_id "
-            )
-
-            # Adding start and end times
-            query = (
-                f" {pollutants_query} , "
-                f" meta_data.* "
-                f" FROM {data_table} "
-                f" RIGHT JOIN ({meta_data_query}) meta_data ON meta_data.site_id = {data_table}.site_id "
-                f" WHERE {data_table}.timestamp >= '{start_date}' "
-                f" AND {data_table}.timestamp <= '{end_date}' "
-                f" ORDER BY {data_table}.timestamp "
-            )
-
+        instance = cls("build_query")
+        query = instance.build_query(
+            data_table,
+            filter_type,
+            filter_value,
+            pollutants_query,
+            bam_pollutants_query,
+            start_date,
+            end_date,
+            frequency=frequency,
+        )
         job_config = bigquery.QueryJobConfig(
             query_parameters=[
                 bigquery.ArrayQueryParameter("filter_value", "STRING", filter_value),

From 77cdb2354cea1524ab3176a83c00de24207bc109 Mon Sep 17 00:00:00 2001
From: NicholasTurner23
Date: Thu, 14 Nov 2024 22:52:04 +0300
Subject: [PATCH 3/6] Improve error handling

---
 src/analytics/api/utils/data_formatters.py | 41 +++++++++++++---------
 1 file changed, 25 insertions(+), 16 deletions(-)

diff --git a/src/analytics/api/utils/data_formatters.py b/src/analytics/api/utils/data_formatters.py
index e609953e5e..bace9559de 100644
--- a/src/analytics/api/utils/data_formatters.py
+++ b/src/analytics/api/utils/data_formatters.py
@@ -1,6 +1,6 @@
 from enum import Enum
 from typing import Any, List, Union, Dict
-import logging
+from werkzeug.exceptions import BadRequest
 
 import pandas as pd
 import requests
@@ -15,7 +15,8 @@
     AQCSV_DATA_STATUS_MAPPER,
 )
 from api.utils.http import AirQoRequests
-from config import Config
+
+import logging
 
 logger = logging.getLogger(__name__)
@@ -294,7 +295,7 @@ def device_category_to_str(device_category: str) -> str:
     return ""
 
 
-def filter_non_private_sites(sites: List[str]) -> Dict[str, Any]:
+def filter_non_private_sites(filter_type: str, sites: List[str]) -> Dict[str, Any]:
     """
     Filters out private site IDs from a provided array of site IDs.
 
@@ -313,17 +314,21 @@ def filter_non_private_sites(sites: List[str]) -> Dict[str, Any]:
     try:
         airqo_requests = AirQoRequests()
         response = airqo_requests.request(
-            endpoint=endpoint, body={"sites": sites}, method="post"
+            endpoint=endpoint, body={filter_type: sites}, method="post"
         )
-        if response and response.get("status", None) == "success":
-            return response.get("data")
+        if response and response.get("status") == "success":
+            return airqo_requests.create_response(
+                message="Successfully returned data.",
+                data=response.get("data"),
+                success=True,
+            )
         else:
-            raise RuntimeError(response.get("message"))
-    except RuntimeError as rex:
-        raise RuntimeError(f"Error while filtering non private sites {rex}")
+            return airqo_requests.create_response(response, success=False)
+    except Exception as rex:
+        logger.exception(f"Error while filtering non private sites {rex}")
 
 
-def filter_non_private_devices(devices: List[str]) -> Dict[str, Any]:
+def filter_non_private_devices(filter_type: str, devices: List[str]) -> Dict[str, Any]:
     """
     Filters out private device IDs from a provided array of device IDs.
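Both filter helpers now resolve to the AirQoRequests.create_response envelope instead
of returning the raw payload, so call sites check "status" before unpacking "data".
A minimal sketch of the consuming pattern (hypothetical IDs; the real call site is
_get_validated_filter in api/views/data.py, updated in a later patch):

    result = filter_non_private_sites("sites", ["site-a", "site-b"])
    if result and result.get("status") == "success":
        accessible_sites = result.get("data", {}).get("sites", [])
    else:
        error_message = result.get("message", "Validation failed")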
@@ -341,11 +346,15 @@ def filter_non_private_devices(devices: List[str]) -> Dict[str, Any]: try: airqo_requests = AirQoRequests() response = airqo_requests.request( - endpoint=endpoint, body={"devices": devices}, method="post" + endpoint=endpoint, body={filter_type: devices}, method="post" ) - if response and response.get("status", None) == "success": - return response.get("data") + if response and response.get("status") == "success": + return airqo_requests.create_response( + message="Successfully returned data.", + data=response.get("data"), + success=True, + ) else: - raise RuntimeError(response.get("message")) - except RuntimeError as rex: - raise RuntimeError(f"Error while filtering non private devices {rex}") + return airqo_requests.create_response(response, success=False) + except Exception as rex: + logger.exception(f"Error while filtering non private devices {rex}") From 0327bbcdb7c41c987cb89c474fec06a003334609 Mon Sep 17 00:00:00 2001 From: NicholasTurner23 Date: Thu, 14 Nov 2024 22:54:59 +0300 Subject: [PATCH 4/6] Clean up, Improve readability, improve error handling --- src/analytics/api/utils/http.py | 2 +- src/analytics/api/utils/pollutants/pm_25.py | 18 +++-- src/analytics/api/views/data.py | 75 +++++++++++++-------- 3 files changed, 61 insertions(+), 34 deletions(-) diff --git a/src/analytics/api/utils/http.py b/src/analytics/api/utils/http.py index 4dde800778..d858ad74c8 100644 --- a/src/analytics/api/utils/http.py +++ b/src/analytics/api/utils/http.py @@ -163,7 +163,7 @@ def request(self, endpoint, params=None, body=None, method="get", base_url=None) success=True, ) else: - return self.create_response(f"Error: {response.status}", success=False) + return self.create_response(f"Error: {response.data}", success=False) except urllib3.exceptions.HTTPError as ex: logger.exception(f"HTTPError: {ex}") diff --git a/src/analytics/api/utils/pollutants/pm_25.py b/src/analytics/api/utils/pollutants/pm_25.py index e585d5abd7..61e0387673 100644 --- a/src/analytics/api/utils/pollutants/pm_25.py +++ b/src/analytics/api/utils/pollutants/pm_25.py @@ -61,14 +61,20 @@ "no2": ["no2"], }, "daily": { - "pm2_5": ["pm2_5_calibrated_value", "pm2_5_raw_value"], - "pm10": ["pm10_calibrated_value", "pm10_raw_value"], - "no2": ["no2_calibrated_value", "no2_raw_value"], + "pm2_5_calibrated": ["pm2_5_calibrated_value"], + "pm2_5_raw": ["pm2_5_raw_value"], + "pm10_calibrated": ["pm10_calibrated_value"], + "pm10_raw": ["pm10_raw_value"], + "no2_calibrated": ["no2_calibrated_value"], + "no2_raw": ["no2_raw_value"], }, "hourly": { - "pm2_5": ["pm2_5_calibrated_value", "pm2_5_raw_value"], - "pm10": ["pm10_calibrated_value", "pm10_raw_value"], - "no2": ["no2_calibrated_value", "no2_raw_value"], + "pm2_5_calibrated": ["pm2_5_calibrated_value"], + "pm2_5_raw": ["pm2_5_raw_value"], + "pm10_calibrated": ["pm10_calibrated_value"], + "pm10_raw": ["pm10_raw_value"], + "no2_calibrated": ["no2_calibrated_value"], + "no2_raw": ["no2_raw_value"], }, } diff --git a/src/analytics/api/views/data.py b/src/analytics/api/views/data.py index 528bb228e4..40963d01ca 100644 --- a/src/analytics/api/views/data.py +++ b/src/analytics/api/views/data.py @@ -1,6 +1,7 @@ import datetime import traceback -import logging +from typing import List + import flask_excel as excel import pandas as pd @@ -33,6 +34,7 @@ from api.utils.http import AirQoRequests from api.utils.request_validators import validate_request_json, validate_request_params from main import rest_api_v2 +import logging logger = logging.getLogger(__name__) @@ -63,7 +65,11 @@ 
class DataExportResource(Resource):
         "outputFormat|optional:str",
         "pollutants|optional:list",
         "sites|optional:list",
+        "site_ids|optional:list",
+        "site_names|optional:list",
+        "device_ids|optional:list",
         "devices|optional:list",
+        "device_names|optional:list",
         "airqlouds|optional:list",
         "datatype|optional:str",
         "minimum|optional:bool",
@@ -89,14 +95,14 @@ def post(self):
         start_date = json_data["startDateTime"]
         end_date = json_data["endDateTime"]
 
         try:
-            filter_type, filter_value = self._get_validated_filter(json_data)
-        except ValueError as e:
-            return (
-                AirQoRequests.create_response(f"An error occured: {e}", success=False),
-                AirQoRequests.Status.HTTP_400_BAD_REQUEST,
+            filter_type, filter_value, error_message = self._get_validated_filter(
+                json_data
             )
+            if error_message:
+                return error_message, AirQoRequests.Status.HTTP_400_BAD_REQUEST
+        except Exception as e:
+            logger.exception(f"An error has occurred: {e}")
 
         try:
             frequency = self._get_valid_option(
@@ -144,6 +150,7 @@ def post(self):
                 end_date=end_date,
                 frequency=frequency,
                 pollutants=pollutants,
+                data_type=data_type,
                 weather_fields=weather_fields,
             )
 
@@ -183,11 +190,10 @@ def post(self):
                 records, "csv", file_name=f"{frequency}-air-quality{postfix}data"
             )
         except Exception as ex:
-            print(ex)
-            traceback.print_exc()
+            logger.exception(f"An error occurred: {ex}")
             return (
                 AirQoRequests.create_response(
-                    f"An Error occurred while processing your request. Please contact support",
+                    f"An error occurred while processing your request. Please contact support. {ex}",
                     success=False,
                 ),
                 AirQoRequests.Status.HTTP_500_INTERNAL_SERVER_ERROR,
             )
 
     def _get_validated_filter(self, json_data):
         """
-        Ensures that only one of 'airqlouds', 'sites', or 'devices' is provided in the request.
-        Calls filter_non_private_* only after confirming exclusivity.
+        Validates that exactly one of 'airqlouds', 'sites', or 'devices' is provided in the request,
+        and applies filtering if necessary.
 
         Args:
             json_data (dict): JSON payload from the request.
 
@@ -207,31 +213,46 @@ def _get_validated_filter(self, json_data):
         Raises:
             ValueError: If more than one or none of the filters are provided.
         """
-        provided_filters = [
-            key for key in ["sites", "devices", "airqlouds"] if json_data.get(key)
+        error_message: str = ""
+        validated_data: List[str] = None
+
+        # TODO Liaise with device registry to clean up this makeshift implementation
+        devices = ["devices", "device_ids", "device_names"]
+        sites = ["sites", "site_names", "site_ids"]
+
+        valid_filters = [
+            "sites",
+            "site_names",
+            "site_ids",
+            "devices",
+            "device_ids",
+            "airqlouds",
+            "device_names",
        ]
-
+        provided_filters = [key for key in valid_filters if json_data.get(key)]
         if len(provided_filters) != 1:
             raise ValueError(
-                "Specify exactly one of 'airqlouds', 'sites', or 'devices' in the request body."
+                "Specify exactly one of 'airqlouds', 'sites', 'device_names', or 'devices' in the request body."
) - filter_type = provided_filters[0] filter_value = json_data.get(filter_type) - if filter_type == "sites": - validated_value = filter_non_private_sites(sites=filter_value).get( - "sites", [] - ) - elif filter_type == "devices": - validated_value = filter_non_private_devices(devices=filter_value).get( - "devices", [] + if filter_type in sites: + validated_value = filter_non_private_sites(filter_type, filter_value) + elif filter_type in devices: + validated_value = filter_non_private_devices(filter_type, filter_value) + else: + return filter_type, filter_value, None + + if validated_value and validated_value.get("status") == "success": + # TODO This should be cleaned up. + validated_data = validated_value.get("data", {}).get( + "sites" if filter_type in sites else "devices", [] ) else: - # No additional processing is needed for 'airqlouds' - validated_value = filter_value + error_message = validated_value.get("message", "Validation failed") - return filter_type, validated_value + return filter_type, validated_data, error_message def _get_valid_option(self, option, valid_options, option_name): """ From c52e05885defcaf78a3d6de984553f4eceb9b841 Mon Sep 17 00:00:00 2001 From: NicholasTurner23 Date: Tue, 19 Nov 2024 19:44:47 +0300 Subject: [PATCH 5/6] Update pollutant mapping for more dynamic queries --- src/analytics/api/utils/pollutants/pm_25.py | 36 ++++++++++----------- 1 file changed, 17 insertions(+), 19 deletions(-) diff --git a/src/analytics/api/utils/pollutants/pm_25.py b/src/analytics/api/utils/pollutants/pm_25.py index 61e0387673..6e3f2a986b 100644 --- a/src/analytics/api/utils/pollutants/pm_25.py +++ b/src/analytics/api/utils/pollutants/pm_25.py @@ -54,28 +54,26 @@ "no2": ["no2_calibrated_value", "no2_raw_value"], } +COMMON_POLLUTANT_MAPPING = { + "pm2_5_calibrated": ["pm2_5_calibrated_value"], + "pm2_5_raw": ["pm2_5_raw_value"], + "pm10_calibrated": ["pm10_calibrated_value"], + "pm10_raw": ["pm10_raw_value"], + "no2_calibrated": ["no2_calibrated_value"], + "no2_raw": ["no2_raw_value"], +} + BIGQUERY_FREQUENCY_MAPPER = { "raw": { "pm2_5": ["pm2_5", "s1_pm2_5", "s2_pm2_5"], "pm10": ["pm10", "s1_pm10", "s2_pm10"], "no2": ["no2"], }, - "daily": { - "pm2_5_calibrated": ["pm2_5_calibrated_value"], - "pm2_5_raw": ["pm2_5_raw_value"], - "pm10_calibrated": ["pm10_calibrated_value"], - "pm10_raw": ["pm10_raw_value"], - "no2_calibrated": ["no2_calibrated_value"], - "no2_raw": ["no2_raw_value"], - }, - "hourly": { - "pm2_5_calibrated": ["pm2_5_calibrated_value"], - "pm2_5_raw": ["pm2_5_raw_value"], - "pm10_calibrated": ["pm10_calibrated_value"], - "pm10_raw": ["pm10_raw_value"], - "no2_calibrated": ["no2_calibrated_value"], - "no2_raw": ["no2_raw_value"], - }, + "daily": COMMON_POLLUTANT_MAPPING, + "hourly": COMMON_POLLUTANT_MAPPING, + "weekly": COMMON_POLLUTANT_MAPPING, + "monthly": COMMON_POLLUTANT_MAPPING, + "yearly": COMMON_POLLUTANT_MAPPING, } PM_COLOR_CATEGORY = { @@ -119,9 +117,9 @@ } WEATHER_FIELDS_MAPPER = { - "temperature": "device_temperature", - "humidity": "device_humidity", - "wind_speed": "wind_speed", + "temperature": ["device_temperature"], + "humidity": ["device_humidity"], + "wind_speed": ["wind_speed"], } From 8bd88dc817a86d0f8cb0105c4228e2f7b4a5aa80 Mon Sep 17 00:00:00 2001 From: NicholasTurner23 Date: Tue, 19 Nov 2024 19:47:20 +0300 Subject: [PATCH 6/6] More modularity for dynamic queries --- src/analytics/api/models/events.py | 321 ++++++++++++++++++++++------- src/analytics/api/views/data.py | 8 +- 2 files changed, 250 insertions(+), 79 deletions(-) diff --git 
a/src/analytics/api/models/events.py b/src/analytics/api/models/events.py index 117575a1f0..77c6b469d9 100644 --- a/src/analytics/api/models/events.py +++ b/src/analytics/api/models/events.py @@ -61,28 +61,18 @@ def device_info_query(self): """Generates a device information query including site_id, tenant, and approximate location details.""" return ( f"{self.devices_table}.site_id AS site_id, " - f"{self.devices_table}.tenant AS tenant, " - f"{self.devices_table}.approximate_latitude AS device_latitude, " - f"{self.devices_table}.approximate_longitude AS device_longitude" + f"{self.devices_table}.tenant AS tenant " ) @property def device_info_query_airqloud(self): """Generates a device information query specifically for airqlouds, excluding the site_id.""" - return ( - f"{self.devices_table}.tenant AS tenant, " - f"{self.devices_table}.approximate_latitude AS device_latitude, " - f"{self.devices_table}.approximate_longitude AS device_longitude" - ) + return f"{self.devices_table}.tenant AS tenant " @property def site_info_query(self): """Generates a site information query to retrieve site name and approximate location details.""" - return ( - f"{self.sites_table}.name AS site_name, " - f"{self.sites_table}.approximate_latitude AS site_latitude, " - f"{self.sites_table}.approximate_longitude AS site_longitude" - ) + return f"{self.sites_table}.name AS site_name " @property def airqloud_info_query(self): @@ -157,6 +147,163 @@ def add_airqloud_join(self, data_query): f"RIGHT JOIN ({data_query}) data ON data.airqloud_id = {self.airqlouds_table}.id " ) + def get_time_grouping(self, frequency): + """ + Determines the appropriate time grouping fields based on the frequency. + + Args: + frequency (str): Frequency like 'raw', 'daily', 'hourly', 'weekly', etc. + + Returns: + str: The time grouping clause for the SQL query. + """ + grouping_map = { + "weekly": "TIMESTAMP_TRUNC(timestamp, WEEK(MONDAY)) AS week", + "monthly": "TIMESTAMP_TRUNC(timestamp, MONTH) AS month", + "yearly": "EXTRACT(YEAR FROM timestamp) AS year", + } + + return grouping_map.get(frequency, "timestamp") + + def get_device_query( + self, + data_table, + filter_value, + pollutants_query, + bam_pollutants_query, + time_grouping, + start_date, + end_date, + frequency, + ): + """ + Constructs a SQL query to retrieve data for specific devices. + + Args: + data_table (str): The name of the data table containing measurements. + filter_value (str): The list of device IDs to filter by. + pollutants_query (str): The SQL query for standard pollutants. + bam_pollutants_query (str): The SQL query for BAM pollutants. + time_grouping (str): The time grouping clause based on frequency. + start_date (str): The start date for the query range. + end_date (str): The end date for the query range. + frequency (str): The frequency of the data (e.g., 'raw', 'daily', 'weekly'). + + Returns: + str: The SQL query string to retrieve device-specific data, + including BAM data if applicable. 
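+
+        Example (illustrative shape of the generated SQL; angle-bracketed names
+        stand in for the configured tables and computed fragments):
+            SELECT <pollutant columns>, <time grouping>, site_id, tenant,
+                   <devices_table>.name AS device_name
+            FROM <data_table>
+            JOIN <devices_table> ON <devices_table>.device_id = <data_table>.device_id
+            WHERE timestamp BETWEEN '<start_date>' AND '<end_date>'
+                AND device_id IN UNNEST(@filter_value)
+            -- plus a UNION ALL of the equivalent BAM selection for hourly and coarser frequencies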
+ """ + query = ( + f"{pollutants_query}, {time_grouping}, {self.device_info_query}, {self.devices_table}.name AS device_name " + f"FROM {data_table} " + f"JOIN {self.devices_table} ON {self.devices_table}.device_id = {data_table}.device_id " + f"WHERE {data_table}.timestamp BETWEEN '{start_date}' AND '{end_date}' " + f"AND {self.devices_table}.device_id IN UNNEST(@filter_value) " + ) + if frequency in ["weekly", "monthly", "yearly"]: + query += " GROUP BY ALL" + + query = self.add_site_join(query) + if frequency in ["hourly", "weekly", "monthly", "yearly"]: + bam_query = ( + f"{bam_pollutants_query}, {time_grouping}, {self.device_info_query}, {self.devices_table}.name AS device_name " + f"FROM {self.BIGQUERY_BAM_DATA} " + f"JOIN {self.devices_table} ON {self.devices_table}.device_id = {self.BIGQUERY_BAM_DATA}.device_id " + f"WHERE {self.BIGQUERY_BAM_DATA}.timestamp BETWEEN '{start_date}' AND '{end_date}' " + f"AND {self.devices_table}.device_id IN UNNEST(@filter_value) " + ) + if frequency in ["weekly", "monthly", "yearly"]: + bam_query += " GROUP BY ALL" + bam_query = self.add_site_join(bam_query) + query = f"{query} UNION ALL {bam_query}" + + return query + + def get_site_query( + self, + data_table, + filter_value, + pollutants_query, + time_grouping, + start_date, + end_date, + frequency, + ): + """ + Constructs a SQL query to retrieve data for specific sites. + + Args: + data_table (str): The name of the data table containing measurements. + filter_value (str): The list of site IDs to filter by. + pollutants_query (str): The SQL query for pollutants. + time_grouping (str): The time grouping clause based on frequency. + start_date (str): The start date for the query range. + end_date (str): The end date for the query range. + frequency (str): The frequency of the data (e.g., 'raw', 'daily', 'weekly'). + + Returns: + str: The SQL query string to retrieve site-specific data. + """ + query = ( + f"{pollutants_query}, {time_grouping}, {self.site_info_query}, {data_table}.device_id AS device_name " + f"FROM {data_table} " + f"JOIN {self.sites_table} ON {self.sites_table}.id = {data_table}.site_id " + f"WHERE {data_table}.timestamp BETWEEN '{start_date}' AND '{end_date}' " + f"AND {self.sites_table}.id IN UNNEST(@filter_value) " + ) + if frequency in ["weekly", "monthly", "yearly"]: + query += " GROUP BY ALL" + return self.add_device_join(query) + + def get_airqloud_query( + self, + data_table, + filter_value, + pollutants_query, + time_grouping, + start_date, + end_date, + frequency, + ): + """ + Constructs a SQL query to retrieve data for specific AirQlouds. + + Args: + data_table (str): The name of the data table containing measurements. + filter_value (str): The list of AirQloud IDs to filter by. + pollutants_query (str): The SQL query for pollutants. + time_grouping (str): The time grouping clause based on frequency. + start_date (str): The start date for the query range. + end_date (str): The end date for the query range. + frequency (str): The frequency of the data (e.g., 'raw', 'daily', 'weekly'). + + Returns: + str: The SQL query string to retrieve AirQloud-specific data. 
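+
+        Example (illustrative only; the metadata subquery resolves
+        airqloud -> site -> device before measurements are attached):
+            SELECT <pollutant columns>, <time grouping>, device_id AS device_name, meta_data.*
+            FROM <data_table>
+            RIGHT JOIN (<airqloud/site/device metadata>) meta_data
+                ON meta_data.site_id = <data_table>.site_id
+            WHERE timestamp BETWEEN '<start_date>' AND '<end_date>'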
+ """ + meta_data_query = ( + f"SELECT {self.airqlouds_sites_table}.airqloud_id, " + f"{self.airqlouds_sites_table}.site_id AS site_id " + f"FROM {self.airqlouds_sites_table} " + f"WHERE {self.airqlouds_sites_table}.airqloud_id IN UNNEST(@filter_value) " + ) + meta_data_query = self.add_airqloud_join(meta_data_query) + meta_data_query = self.add_site_join(meta_data_query) + meta_data_query = self.add_device_join_to_airqlouds(meta_data_query) + + query = ( + f"{pollutants_query}, {time_grouping}, {data_table}.device_id AS device_name, meta_data.* " + f"FROM {data_table} " + f"RIGHT JOIN ({meta_data_query}) meta_data ON meta_data.site_id = {data_table}.site_id " + f"WHERE {data_table}.timestamp BETWEEN '{start_date}' AND '{end_date}' " + ) + order_by_clause = ( + f"ORDER BY {data_table}.timestamp" + if frequency not in ["weekly", "monthly", "yearly"] + else "GROUP BY ALL" + ) + + return query + order_by_clause + def build_query( self, data_table, @@ -184,56 +331,61 @@ def build_query( Returns: str: Final constructed SQL query. """ - if filter_type in ["devices", "device_ids", "device_names"]: - query = ( - f"{pollutants_query}, {self.device_info_query}, {self.devices_table}.name AS device_name, " - f"FROM {data_table} " - f"JOIN {self.devices_table} ON {self.devices_table}.device_id = {data_table}.device_id " - f"WHERE {data_table}.timestamp BETWEEN '{start_date}' AND '{end_date}' " - f"AND {self.devices_table}.device_id IN UNNEST({filter_value}) " + time_grouping = self.get_time_grouping(frequency) + + # TODO Find a better way to do this. + if frequency in ["weekly", "monthly", "yearly"]: + # Drop datetime alias + pollutants_query = pollutants_query.replace( + f", FORMAT_DATETIME('%Y-%m-%d %H:%M:%S', {data_table}.timestamp) AS datetime", + "", + ) + bam_pollutants_query = bam_pollutants_query.replace( + f", FORMAT_DATETIME('%Y-%m-%d %H:%M:%S', {self.BIGQUERY_BAM_DATA}.timestamp) AS datetime", + "", ) - query = self.add_site_join(query) - - if frequency == "hourly": - bam_query = ( - f"{bam_pollutants_query}, {self.device_info_query}, {self.devices_table}.name AS device_name, " - f"FROM {self.BIGQUERY_BAM_DATA} " - f"JOIN {self.devices_table} ON {self.devices_table}.device_id = {self.BIGQUERY_BAM_DATA}.device_id " - f"WHERE {self.BIGQUERY_BAM_DATA}.timestamp BETWEEN '{start_date}' AND '{end_date}' " - f"AND {self.devices_table}.device_id IN UNNEST({filter_value}) " - ) - bam_query = self.add_site_join(bam_query) - query = f"{query} UNION ALL {bam_query}" + if filter_type in ["devices", "device_ids", "device_names"]: + return self.get_device_query( + data_table, + filter_value, + pollutants_query, + bam_pollutants_query, + time_grouping, + start_date, + end_date, + frequency, + ) elif filter_type in ["sites", "site_names", "site_ids"]: - query = ( - f"{pollutants_query}, {self.site_info_query}, {data_table}.device_id AS device_name " - f"FROM {data_table} " - f"JOIN {self.sites_table} ON {self.sites_table}.id = {data_table}.site_id " - f"WHERE {data_table}.timestamp BETWEEN '{start_date}' AND '{end_date}' " - f"AND {self.sites_table}.id IN UNNEST(@filter_value) " + return self.get_site_query( + data_table, + filter_value, + pollutants_query, + time_grouping, + start_date, + end_date, ) - query = self.add_device_join(query) elif filter_type == "airqlouds": - meta_data_query = ( - f"SELECT {self.airqlouds_sites_table}.airqloud_id, " - f"{self.airqlouds_sites_table}.site_id AS site_id " - f"FROM {self.airqlouds_sites_table} " - f"WHERE {self.airqlouds_sites_table}.airqloud_id IN 
UNNEST(@filter_value) "
+        )
+        meta_data_query = self.add_airqloud_join(meta_data_query)
+        meta_data_query = self.add_site_join(meta_data_query)
+        meta_data_query = self.add_device_join_to_airqlouds(meta_data_query)
+
+        query = (
+            f"{pollutants_query}, {time_grouping}, {data_table}.device_id AS device_name, meta_data.* "
+            f"FROM {data_table} "
+            f"RIGHT JOIN ({meta_data_query}) meta_data ON meta_data.site_id = {data_table}.site_id "
+            f"WHERE {data_table}.timestamp BETWEEN '{start_date}' AND '{end_date}' "
+        )
+        order_by_clause = (
+            f"ORDER BY {data_table}.timestamp"
+            if frequency not in ["weekly", "monthly", "yearly"]
+            else "GROUP BY ALL"
+        )
+
+        return query + order_by_clause
+
     def build_query(
         self,
         data_table,
@@ -184,56 +331,61 @@ def build_query(
         Returns:
             str: Final constructed SQL query.
         """
-        if filter_type in ["devices", "device_ids", "device_names"]:
-            query = (
-                f"{pollutants_query}, {self.device_info_query}, {self.devices_table}.name AS device_name, "
-                f"FROM {data_table} "
-                f"JOIN {self.devices_table} ON {self.devices_table}.device_id = {data_table}.device_id "
-                f"WHERE {data_table}.timestamp BETWEEN '{start_date}' AND '{end_date}' "
-                f"AND {self.devices_table}.device_id IN UNNEST({filter_value}) "
+        time_grouping = self.get_time_grouping(frequency)
+
+        # TODO Find a better way to do this.
+        if frequency in ["weekly", "monthly", "yearly"]:
+            # Drop datetime alias
+            pollutants_query = pollutants_query.replace(
+                f", FORMAT_DATETIME('%Y-%m-%d %H:%M:%S', {data_table}.timestamp) AS datetime",
+                "",
+            )
+            bam_pollutants_query = bam_pollutants_query.replace(
+                f", FORMAT_DATETIME('%Y-%m-%d %H:%M:%S', {self.BIGQUERY_BAM_DATA}.timestamp) AS datetime",
+                "",
             )
-            query = self.add_site_join(query)
-
-            if frequency == "hourly":
-                bam_query = (
-                    f"{bam_pollutants_query}, {self.device_info_query}, {self.devices_table}.name AS device_name, "
-                    f"FROM {self.BIGQUERY_BAM_DATA} "
-                    f"JOIN {self.devices_table} ON {self.devices_table}.device_id = {self.BIGQUERY_BAM_DATA}.device_id "
-                    f"WHERE {self.BIGQUERY_BAM_DATA}.timestamp BETWEEN '{start_date}' AND '{end_date}' "
-                    f"AND {self.devices_table}.device_id IN UNNEST({filter_value}) "
-                )
-                bam_query = self.add_site_join(bam_query)
-                query = f"{query} UNION ALL {bam_query}"
+        if filter_type in ["devices", "device_ids", "device_names"]:
+            return self.get_device_query(
+                data_table,
+                filter_value,
+                pollutants_query,
+                bam_pollutants_query,
+                time_grouping,
+                start_date,
+                end_date,
+                frequency,
+            )
 
         elif filter_type in ["sites", "site_names", "site_ids"]:
-            query = (
-                f"{pollutants_query}, {self.site_info_query}, {data_table}.device_id AS device_name "
-                f"FROM {data_table} "
-                f"JOIN {self.sites_table} ON {self.sites_table}.id = {data_table}.site_id "
-                f"WHERE {data_table}.timestamp BETWEEN '{start_date}' AND '{end_date}' "
-                f"AND {self.sites_table}.id IN UNNEST(@filter_value) "
+            return self.get_site_query(
+                data_table,
+                filter_value,
+                pollutants_query,
+                time_grouping,
+                start_date,
+                end_date,
+                frequency,
             )
-            query = self.add_device_join(query)
         elif filter_type == "airqlouds":
-            meta_data_query = (
-                f"SELECT {self.airqlouds_sites_table}.airqloud_id, "
-                f"{self.airqlouds_sites_table}.site_id AS site_id "
-                f"FROM {self.airqlouds_sites_table} "
-                f"WHERE {self.airqlouds_sites_table}.airqloud_id IN UNNEST(@filter_value) "
+            return self.get_airqloud_query(
+                data_table,
+                filter_value,
+                pollutants_query,
+                time_grouping,
+                start_date,
+                end_date,
+                frequency,
             )
+        else:
+            raise ValueError("Invalid filter type")
-
-            meta_data_query = self.add_airqloud_join(meta_data_query)
-            meta_data_query = self.add_site_join(meta_data_query)
-            meta_data_query = self.add_device_join_to_airqlouds(meta_data_query)
-
-            query = (
-                f"{pollutants_query}, {data_table}.device_id AS device_name, meta_data.* "
-                f"FROM {data_table} "
-                f"RIGHT JOIN ({meta_data_query}) meta_data ON meta_data.site_id = {data_table}.site_id "
-                f"WHERE {data_table}.timestamp BETWEEN '{start_date}' AND '{end_date}' "
-                f"ORDER BY {data_table}.timestamp "
-            )
-        return query
+
+    def get_columns(cls, mapping, frequency, data_type, decimal_places, data_table):
+        if frequency in ["weekly", "monthly", "yearly"]:
+            return [
+                f"ROUND(AVG({data_table}.{col}), {decimal_places}) AS {col}"
+                for col in mapping
+            ]
+        return [
+            f"ROUND({data_table}.{col}, {decimal_places}) AS {col}" for col in mapping
+        ]
 
     @classmethod
     @cache.memoize()
@@ -266,14 +418,15 @@ def download_from_bigquery(
         """
         decimal_places = cls.DATA_EXPORT_DECIMAL_PLACES
 
-        sorting_cols = ["site_id", "datetime", "device_name"]
+        sorting_cols = ["site_id", "device_name"]
 
-        # Define table mapping for dynamic selection based on frequency
         data_table = {
             "raw": cls.BIGQUERY_RAW_DATA,
             "daily": cls.BIGQUERY_DAILY_DATA,
             "hourly": cls.BIGQUERY_HOURLY_DATA,
-        }.get(frequency)
+        }.get(
+            frequency, cls.BIGQUERY_HOURLY_DATA
+        )  # Return hourly if the frequency is weekly, monthly, or yearly. Validation of frequency is done in data.py
 
         if not data_table:
             raise ValueError("Invalid frequency")
@@ -292,23 +445,40 @@ def download_from_bigquery(
                 key, []
             )
             pollutant_columns.extend(
-                [
-                    f"ROUND({data_table}.{mapping}, {decimal_places}) AS {mapping}"
-                    for mapping in pollutant_mapping
-                ]
+                cls.get_columns(
+                    cls,
+                    pollutant_mapping,
+                    frequency,
+                    data_type,
+                    decimal_places,
+                    data_table,
+                )
             )
 
+            # TODO Clean up by using the `get_columns` helper method
             if pollutant in {"pm2_5", "pm10", "no2"}:
-                bam_pollutant_columns.extend(
-                    [f"ROUND({pollutant}, {decimal_places}) AS {key}_value"]
-                )
+                if frequency in ["weekly", "monthly", "yearly"]:
+                    bam_pollutant_columns.extend(
+                        [f"ROUND(AVG({pollutant}), {decimal_places}) AS {key}_value"]
+                    )
+                else:
+                    bam_pollutant_columns.extend(
+                        [f"ROUND({pollutant}, {decimal_places}) AS {key}_value"]
+                    )
+        # TODO Fix query when weather data is included. Currently failing
         if weather_fields:
             for field in weather_fields:
                 weather_mapping = WEATHER_FIELDS_MAPPER.get(field)
                 if weather_mapping:
-                    weather_columns.append(
-                        f"ROUND({data_table}.{weather_mapping}, {decimal_places}) AS {weather_mapping}"
+                    weather_columns.extend(
+                        cls.get_columns(
+                            cls,
+                            weather_mapping,
+                            frequency,
+                            data_type,
+                            decimal_places,
+                            data_table,
+                        )
                     )
 
         selected_columns = set(pollutant_columns + weather_columns)
@@ -322,7 +492,6 @@ def download_from_bigquery(
             f"SELECT {', '.join(bam_selected_columns)}, "
             f"FORMAT_DATETIME('%Y-%m-%d %H:%M:%S', {cls.BIGQUERY_BAM_DATA}.timestamp) AS datetime "
         )
-
         instance = cls("build_query")
         query = instance.build_query(
             data_table,
@@ -353,9 +522,15 @@ def download_from_bigquery(
         if len(dataframe) == 0:
             return dataframe
 
-        dataframe.drop_duplicates(
-            subset=["datetime", "device_name"], inplace=True, keep="first"
-        )
+        drop_columns = ["device_name"]
+        if frequency in ["weekly", "monthly", "yearly"]:
+            drop_columns.append(frequency[:-2])
+            sorting_cols.append(frequency[:-2])
+        else:
+            drop_columns.append("datetime")
+            sorting_cols.append("datetime")
+
+        dataframe.drop_duplicates(subset=drop_columns, inplace=True, keep="first")
         dataframe.sort_values(sorting_cols, ascending=True, inplace=True)
         dataframe["frequency"] = frequency
         dataframe = dataframe.replace(np.nan, None)
diff --git a/src/analytics/api/views/data.py b/src/analytics/api/views/data.py
index 40963d01ca..caf4d6439d 100644
--- a/src/analytics/api/views/data.py
+++ b/src/analytics/api/views/data.py
@@ -2,7 +2,6 @@
 import datetime
 import traceback
 from typing import List
-
 import flask_excel as excel
 import pandas as pd
 from flasgger import swag_from
@@ -88,7 +87,7 @@ def post(self):
             "download_types": ["csv", "json"],
             "data_types": ["calibrated", "raw"],
             "output_formats": ["airqo-standard", "aqcsv"],
-            "frequencies": ["hourly", "daily", "raw"],
+            "frequencies": ["hourly", "daily", "raw", "weekly", "monthly", "yearly"],
         }
 
         json_data = request.get_json()
@@ -160,13 +159,10 @@ def post(self):
                 AirQoRequests.Status.HTTP_404_NOT_FOUND,
             )
         if minimum_output:
+            # Drop unnecessary columns
             data_frame.drop(
                 columns=[
-                    "device_latitude",
-                    "device_longitude",
                     "site_id",
-                    "site_latitude",
-                    "site_longitude",
                 ],
                 inplace=True,
             )
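With the new aggregation frequencies wired through, a data-export request can ask
for weekly, monthly, or yearly aggregates directly. A minimal sketch of such a
request (the host and route are assumptions; the field names follow the request
validators and valid_options declared above):

    import requests

    payload = {
        "startDateTime": "2024-01-01T00:00:00Z",
        "endDateTime": "2024-06-30T23:59:59Z",
        "sites": ["site-a"],  # exactly one of the site/device/airqloud filters
        "pollutants": ["pm2_5"],
        "frequency": "monthly",  # now also accepts weekly/monthly/yearly
        "datatype": "calibrated",
        "downloadType": "json",
        "outputFormat": "airqo-standard",
    }
    response = requests.post(
        "https://<analytics-host>/api/v2/analytics/data-download",  # hypothetical URL
        json=payload,
    )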