diff --git a/k8s/analytics/values-prod.yaml b/k8s/analytics/values-prod.yaml
index 74304224cb..5ac058bce7 100644
--- a/k8s/analytics/values-prod.yaml
+++ b/k8s/analytics/values-prod.yaml
@@ -1,6 +1,6 @@
 namespace: production
-nameOverride: ''
-fullnameOverride: ''
+nameOverride: ""
+fullnameOverride: ""
 images:
   repositories:
     api: eu.gcr.io/airqo-250220/airqo-analytics-api
@@ -17,8 +17,8 @@ api:
   podAnnotations: {}
   resources:
     limits:
-      cpu: 100m
-      memory: 600Mi
+      cpu: 1000m
+      memory: 2000Mi
     requests:
       cpu: 10m
       memory: 250Mi
diff --git a/k8s/analytics/values-stage.yaml b/k8s/analytics/values-stage.yaml
index c4dca0a5e7..763969918f 100644
--- a/k8s/analytics/values-stage.yaml
+++ b/k8s/analytics/values-stage.yaml
@@ -1,6 +1,6 @@
 namespace: staging
-nameOverride: ''
-fullnameOverride: ''
+nameOverride: ""
+fullnameOverride: ""
 images:
   repositories:
     api: eu.gcr.io/airqo-250220/airqo-stage-analytics-api
@@ -17,8 +17,8 @@ api:
   podAnnotations: {}
   resources:
     limits:
-      cpu: 100m
-      memory: 600Mi
+      cpu: 500m
+      memory: 1000Mi
     requests:
       cpu: 10m
       memory: 250Mi
diff --git a/src/analytics/api/models/events.py b/src/analytics/api/models/events.py
index f527889cc8..77c6b469d9 100644
--- a/src/analytics/api/models/events.py
+++ b/src/analytics/api/models/events.py
@@ -15,6 +15,11 @@


 class EventsModel(BasePyMongoModel):
+    """
+    This class manages data retrieval and query construction for events data, integrating device, site,
+    and airqloud information for specified pollutants and weather fields.
+    """
+
     BIGQUERY_AIRQLOUDS_SITES = f"`{CONFIGURATIONS.BIGQUERY_AIRQLOUDS_SITES}`"
     BIGQUERY_AIRQLOUDS = f"`{CONFIGURATIONS.BIGQUERY_AIRQLOUDS}`"
     BIGQUERY_GRIDS = f"`{CONFIGURATIONS.BIGQUERY_GRIDS}`"
@@ -37,9 +42,351 @@ class EventsModel(BasePyMongoModel):
     DEVICES_SUMMARY_TABLE = CONFIGURATIONS.DEVICES_SUMMARY_TABLE

     def __init__(self, tenant):
+        """
+        Initializes the EventsModel with default settings and mappings for limit thresholds,
+        and specifies collections and BigQuery table references.
+
+        Args:
+            tenant (str): The tenant identifier for managing database collections.
+        """
         self.limit_mapper = {"pm2_5": 500.5, "pm10": 604.5, "no2": 2049}
+        self.sites_table = self.BIGQUERY_SITES
+        self.airqlouds_sites_table = self.BIGQUERY_AIRQLOUDS_SITES
+        self.devices_table = self.BIGQUERY_DEVICES
+        self.airqlouds_table = self.BIGQUERY_AIRQLOUDS
         super().__init__(tenant, collection_name="events")

+    @property
+    def device_info_query(self):
+        """Generates a device information query that selects the device's site_id and tenant."""
+        return (
+            f"{self.devices_table}.site_id AS site_id, "
+            f"{self.devices_table}.tenant AS tenant "
+        )
+
+    @property
+    def device_info_query_airqloud(self):
+        """Generates a device information query specifically for airqlouds, excluding the site_id."""
+        return f"{self.devices_table}.tenant AS tenant "
+
+    @property
+    def site_info_query(self):
+        """Generates a site information query to retrieve the site name."""
+        return f"{self.sites_table}.name AS site_name "
+
+    @property
+    def airqloud_info_query(self):
+        """Generates an airqloud information query to retrieve the airqloud name."""
+        return f"{self.airqlouds_table}.name AS airqloud_name"
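Editor's note: the SELECT-fragment properties above are composed into full statements by the join helpers that follow. A minimal, self-contained sketch of that composition pattern; the table names and inner query here are placeholders, not the real `CONFIGURATIONS` values:

```python
# Illustrative only: mirrors how device_info_query and add_device_join compose.
class QueryFragments:
    devices_table = "`project.dataset.devices`"  # stand-in table name

    @property
    def device_info_query(self):
        return (
            f"{self.devices_table}.site_id AS site_id, "
            f"{self.devices_table}.tenant AS tenant "
        )


q = QueryFragments()
inner = "SELECT * FROM `project.dataset.hourly` WHERE device_id = 'aq_001'"
joined = (
    f"SELECT {q.device_info_query}, data.* "
    f"FROM {q.devices_table} "
    f"RIGHT JOIN ({inner}) data ON data.device_name = {q.devices_table}.device_id"
)
print(joined)
```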
+ """ + return ( + f"SELECT {self.device_info_query}, data.* " + f"FROM {self.devices_table} " + f"RIGHT JOIN ({data_query}) data ON data.device_name = {self.devices_table}.device_id " + f"{filter_clause}" + ) + + def add_device_join_to_airqlouds(self, data_query, filter_clause=""): + """ + Joins device information with airqloud data based on site_id. + + Args: + data_query (str): The data query to join with airqloud device information. + filter_clause (str): Optional SQL filter clause. + + Returns: + str: Modified query with device-airqloud join. + """ + return ( + f"SELECT {self.device_info_query_airqloud}, data.* " + f"FROM {self.devices_table} " + f"RIGHT JOIN ({data_query}) data ON data.site_id = {self.devices_table}.site_id " + f"{filter_clause}" + ) + + def add_site_join(self, data_query): + """ + Joins site information with the given data query based on site_id. + + Args: + data_query (str): The data query to join with site information. + + Returns: + str: Modified query with site join. + """ + return ( + f"SELECT {self.site_info_query}, data.* " + f"FROM {self.sites_table} " + f"RIGHT JOIN ({data_query}) data ON data.site_id = {self.sites_table}.id " + ) + + def add_airqloud_join(self, data_query): + """ + Joins Airqloud information with the provided data query based on airqloud_id. + + Args: + data_query (str): The data query to join with Airqloud information. + + Returns: + str: Modified query with Airqloud join. + """ + return ( + f"SELECT {self.airqloud_info_query}, data.* " + f"FROM {self.airqlouds_table} " + f"RIGHT JOIN ({data_query}) data ON data.airqloud_id = {self.airqlouds_table}.id " + ) + + def get_time_grouping(self, frequency): + """ + Determines the appropriate time grouping fields based on the frequency. + + Args: + frequency (str): Frequency like 'raw', 'daily', 'hourly', 'weekly', etc. + + Returns: + str: The time grouping clause for the SQL query. + """ + grouping_map = { + "weekly": "TIMESTAMP_TRUNC(timestamp, WEEK(MONDAY)) AS week", + "monthly": "TIMESTAMP_TRUNC(timestamp, MONTH) AS month", + "yearly": "EXTRACT(YEAR FROM timestamp) AS year", + } + + return grouping_map.get(frequency, "timestamp") + + def get_device_query( + self, + data_table, + filter_value, + pollutants_query, + bam_pollutants_query, + time_grouping, + start_date, + end_date, + frequency, + ): + """ + Constructs a SQL query to retrieve data for specific devices. + + Args: + data_table (str): The name of the data table containing measurements. + filter_value (str): The list of device IDs to filter by. + pollutants_query (str): The SQL query for standard pollutants. + bam_pollutants_query (str): The SQL query for BAM pollutants. + time_grouping (str): The time grouping clause based on frequency. + start_date (str): The start date for the query range. + end_date (str): The end date for the query range. + frequency (str): The frequency of the data (e.g., 'raw', 'daily', 'weekly'). + + Returns: + str: The SQL query string to retrieve device-specific data, + including BAM data if applicable. 
+ """ + query = ( + f"{pollutants_query}, {time_grouping}, {self.device_info_query}, {self.devices_table}.name AS device_name " + f"FROM {data_table} " + f"JOIN {self.devices_table} ON {self.devices_table}.device_id = {data_table}.device_id " + f"WHERE {data_table}.timestamp BETWEEN '{start_date}' AND '{end_date}' " + f"AND {self.devices_table}.device_id IN UNNEST(@filter_value) " + ) + if frequency in ["weekly", "monthly", "yearly"]: + query += " GROUP BY ALL" + + query = self.add_site_join(query) + if frequency in ["hourly", "weekly", "monthly", "yearly"]: + bam_query = ( + f"{bam_pollutants_query}, {time_grouping}, {self.device_info_query}, {self.devices_table}.name AS device_name " + f"FROM {self.BIGQUERY_BAM_DATA} " + f"JOIN {self.devices_table} ON {self.devices_table}.device_id = {self.BIGQUERY_BAM_DATA}.device_id " + f"WHERE {self.BIGQUERY_BAM_DATA}.timestamp BETWEEN '{start_date}' AND '{end_date}' " + f"AND {self.devices_table}.device_id IN UNNEST(@filter_value) " + ) + if frequency in ["weekly", "monthly", "yearly"]: + bam_query += " GROUP BY ALL" + bam_query = self.add_site_join(bam_query) + query = f"{query} UNION ALL {bam_query}" + + return query + + def get_site_query( + self, + data_table, + filter_value, + pollutants_query, + time_grouping, + start_date, + end_date, + frequency, + ): + """ + Constructs a SQL query to retrieve data for specific sites. + + Args: + data_table (str): The name of the data table containing measurements. + filter_value (str): The list of site IDs to filter by. + pollutants_query (str): The SQL query for pollutants. + time_grouping (str): The time grouping clause based on frequency. + start_date (str): The start date for the query range. + end_date (str): The end date for the query range. + frequency (str): The frequency of the data (e.g., 'raw', 'daily', 'weekly'). + + Returns: + str: The SQL query string to retrieve site-specific data. + """ + query = ( + f"{pollutants_query}, {time_grouping}, {self.site_info_query}, {data_table}.device_id AS device_name " + f"FROM {data_table} " + f"JOIN {self.sites_table} ON {self.sites_table}.id = {data_table}.site_id " + f"WHERE {data_table}.timestamp BETWEEN '{start_date}' AND '{end_date}' " + f"AND {self.sites_table}.id IN UNNEST(@filter_value) " + ) + if frequency in ["weekly", "monthly", "yearly"]: + query += " GROUP BY ALL" + return self.add_device_join(query) + + def get_airqloud_query( + self, + data_table, + filter_value, + pollutants_query, + time_grouping, + start_date, + end_date, + frequency, + ): + """ + Constructs a SQL query to retrieve data for specific AirQlouds. + + Args: + data_table (str): The name of the data table containing measurements. + filter_value (str): The list of AirQloud IDs to filter by. + pollutants_query (str): The SQL query for pollutants. + time_grouping (str): The time grouping clause based on frequency. + start_date (str): The start date for the query range. + end_date (str): The end date for the query range. + frequency (str): The frequency of the data (e.g., 'raw', 'daily', 'weekly'). + + Returns: + str: The SQL query string to retrieve AirQloud-specific data. 
+ """ + meta_data_query = ( + f"SELECT {self.airqlouds_sites_table}.airqloud_id, " + f"{self.airqlouds_sites_table}.site_id AS site_id " + f"FROM {self.airqlouds_sites_table} " + f"WHERE {self.airqlouds_sites_table}.airqloud_id IN UNNEST(@filter_value) " + ) + meta_data_query = self.add_airqloud_join(meta_data_query) + meta_data_query = self.add_site_join(meta_data_query) + meta_data_query = self.add_device_join_to_airqlouds(meta_data_query) + + query = ( + f"{pollutants_query}, {time_grouping}, {data_table}.device_id AS device_name, meta_data.* " + f"FROM {data_table} " + f"RIGHT JOIN ({meta_data_query}) meta_data ON meta_data.site_id = {data_table}.site_id " + f"WHERE {data_table}.timestamp BETWEEN '{start_date}' AND '{end_date}' " + ) + order_by_clause = ( + f"ORDER BY {data_table}.timestamp" + if frequency not in ["weekly", "monthly", "yearly"] + else "GROUP BY ALL" + ) + + return query + order_by_clause + + def build_query( + self, + data_table, + filter_type, + filter_value, + pollutants_query, + bam_pollutants_query, + start_date, + end_date, + frequency=None, + ): + """ + Builds a SQL query to retrieve pollutant and weather data with associated device or site information. + + Args: + data_table (str): The table name containing the main data records. + filter_type (str): Type of filter (e.g., devices, sites, airqlouds). + filter_value (list): Filter values corresponding to the filter type. + pollutants_query (str): Query for pollutant data. + bam_pollutants_query (str): Query for BAM pollutant data. + start_date (str): Start date for data retrieval. + end_date (str): End date for data retrieval. + frequency (str): Optional frequency filter. + + Returns: + str: Final constructed SQL query. + """ + time_grouping = self.get_time_grouping(frequency) + + # TODO Find a better way to do this. + if frequency in ["weekly", "monthly", "yearly"]: + # Drop datetime alias + pollutants_query = pollutants_query.replace( + f", FORMAT_DATETIME('%Y-%m-%d %H:%M:%S', {data_table}.timestamp) AS datetime", + "", + ) + bam_pollutants_query = bam_pollutants_query.replace( + f", FORMAT_DATETIME('%Y-%m-%d %H:%M:%S', {self.BIGQUERY_BAM_DATA}.timestamp) AS datetime", + "", + ) + + if filter_type in ["devices", "device_ids", "device_names"]: + return self.get_device_query( + data_table, + filter_value, + pollutants_query, + bam_pollutants_query, + time_grouping, + start_date, + end_date, + frequency, + ) + elif filter_type in ["sites", "site_names", "site_ids"]: + return self.get_site_query( + data_table, + filter_value, + pollutants_query, + time_grouping, + start_date, + end_date, + ) + elif filter_type == "airqlouds": + return self.get_airqloud_query( + data_table, + filter_value, + pollutants_query, + time_grouping, + start_date, + end_date, + ) + else: + raise ValueError("Invalid filter type") + + def get_columns(cls, mapping, frequency, data_type, decimal_places, data_table): + if frequency in ["weekly", "monthly", "yearly"]: + return [ + f"ROUND(AVG({data_table}.{col}), {decimal_places}) AS {col}" + for col in mapping + ] + return [ + f"ROUND({data_table}.{col}, {decimal_places}) AS {col}" for col in mapping + ] + @classmethod @cache.memoize() def download_from_bigquery( @@ -50,24 +397,36 @@ def download_from_bigquery( end_date, frequency, pollutants, + data_type, weather_fields, ) -> pd.DataFrame: + """ + Retrieves data from BigQuery with specified filters, frequency, pollutants, and weather fields. + + Args: + filter_type (str): Type of filter to apply (e.g., 'devices', 'sites', 'airqlouds'). 
     @classmethod
     @cache.memoize()
     def download_from_bigquery(
@@ -50,24 +397,36 @@ def download_from_bigquery(
         end_date,
         frequency,
         pollutants,
+        data_type,
         weather_fields,
     ) -> pd.DataFrame:
+        """
+        Retrieves data from BigQuery with specified filters, frequency, pollutants, and weather fields.
+
+        Args:
+            filter_type (str): Type of filter to apply (e.g., 'devices', 'sites', 'airqlouds').
+            filter_value (list): Filter values (IDs or names) for the selected filter type.
+            start_date (str): Start date for the data query.
+            end_date (str): End date for the data query.
+            frequency (str): Data frequency (e.g., 'raw', 'daily', 'hourly').
+            pollutants (list): List of pollutants to include in the data.
+            data_type (str): Type of data ('raw' or 'calibrated').
+            weather_fields (list): List of weather fields to retrieve.
+
+        Returns:
+            pd.DataFrame: Retrieved data in DataFrame format, with duplicates removed and sorted by timestamp.
+        """
         decimal_places = cls.DATA_EXPORT_DECIMAL_PLACES

-        # Data sources
-        sites_table = cls.BIGQUERY_SITES
-        airqlouds_sites_table = cls.BIGQUERY_AIRQLOUDS_SITES
-        devices_table = cls.BIGQUERY_DEVICES
-        airqlouds_table = cls.BIGQUERY_AIRQLOUDS
-
-        sorting_cols = ["site_id", "datetime", "device_name"]
+        sorting_cols = ["site_id", "device_name"]

-        # Define table mapping for dynamic selection based on frequency
         data_table = {
             "raw": cls.BIGQUERY_RAW_DATA,
             "daily": cls.BIGQUERY_DAILY_DATA,
             "hourly": cls.BIGQUERY_HOURLY_DATA,
-        }.get(frequency)
+        }.get(
+            frequency, cls.BIGQUERY_HOURLY_DATA
+        )  # Fall back to hourly when the frequency is weekly, monthly, or yearly. Frequency validation is done in data.py.

         if not data_table:
             raise ValueError("Invalid frequency")
@@ -76,180 +435,74 @@ def download_from_bigquery(
         bam_pollutant_columns = []
         weather_columns = []
         for pollutant in pollutants:
-            pollutant_mapping = BIGQUERY_FREQUENCY_MAPPER.get(frequency).get(
-                pollutant, []
-            )
+
+            if pollutant == "raw":
+                key = pollutant
+            else:
+                key = f"{pollutant}_{data_type}"
+
+            pollutant_mapping = BIGQUERY_FREQUENCY_MAPPER.get(frequency, {}).get(
+                key, []
+            )
             pollutant_columns.extend(
-                [
-                    f"ROUND({data_table}.{mapping}, {decimal_places}) AS {mapping}"
-                    for mapping in pollutant_mapping
-                ]
+                cls.get_columns(
+                    pollutant_mapping,
+                    frequency,
+                    data_type,
+                    decimal_places,
+                    data_table,
+                )
             )

-            if pollutant == "pm2_5":
-                bam_pollutant_columns.extend(
-                    ["pm2_5 as pm2_5_raw_value", "pm2_5 as pm2_5_calibrated_value"]
-                )
-            elif pollutant == "pm10":
-                bam_pollutant_columns.extend(
-                    ["pm10 as pm10_raw_value", "pm10 as pm10_calibrated_value"]
-                )
-            elif pollutant == "no2":
-                bam_pollutant_columns.extend(
-                    ["no2 as no2_raw_value", "no2 as no2_calibrated_value"]
-                )
+            # TODO Clean up by using the `get_columns` helper method
+            if pollutant in {"pm2_5", "pm10", "no2"}:
+                if frequency in ["weekly", "monthly", "yearly"]:
+                    bam_pollutant_columns.extend(
+                        [f"ROUND(AVG({pollutant}), {decimal_places}) AS {key}_value"]
+                    )
+                else:
+                    bam_pollutant_columns.extend(
+                        [f"ROUND({pollutant}, {decimal_places}) AS {key}_value"]
+                    )

-        if weather_fields is not None:
-            for field in weather_fields:
-                weather_mapping = WEATHER_FIELDS_MAPPER.get(field, None)
-                weather_columns.extend(
-                    [
-                        f"ROUND({data_table}.{weather_mapping}, {decimal_places}) AS {weather_mapping}"
-                    ]
-                )
+        # TODO Fix query when weather data is included. Currently failing.
+        if weather_fields:
+            for field in weather_fields:
+                weather_mapping = WEATHER_FIELDS_MAPPER.get(field)
+                if weather_mapping:
+                    weather_columns.extend(
+                        cls.get_columns(
+                            weather_mapping,
+                            frequency,
+                            data_type,
+                            decimal_places,
+                            data_table,
+                        )
+                    )
+
+        selected_columns = set(pollutant_columns + weather_columns)
         pollutants_query = (
-            f" SELECT {', '.join(map(str, set(pollutant_columns + weather_columns)))} ,"
-            f" FORMAT_DATETIME('%Y-%m-%d %H:%M:%S', {data_table}.timestamp) AS datetime "
+            f"SELECT {', '.join(selected_columns)}, "
+            f"FORMAT_DATETIME('%Y-%m-%d %H:%M:%S', {data_table}.timestamp) AS datetime "
         )
+
+        bam_selected_columns = set(bam_pollutant_columns)
         bam_pollutants_query = (
-            f" SELECT {', '.join(map(str, set(bam_pollutant_columns)))} ,"
-            f" FORMAT_DATETIME('%Y-%m-%d %H:%M:%S', {cls.BIGQUERY_BAM_DATA}.timestamp) AS datetime "
+            f"SELECT {', '.join(bam_selected_columns)}, "
+            f"FORMAT_DATETIME('%Y-%m-%d %H:%M:%S', {cls.BIGQUERY_BAM_DATA}.timestamp) AS datetime "
         )

-        if filter_type == "devices":
-            # Adding device information, start and end times
-            query = (
-                f" {pollutants_query} , "
-                f" {devices_table}.device_id AS device_name , "
-                f" {devices_table}.site_id AS site_id , "
-                f" {devices_table}.tenant AS tenant , "
-                f" {devices_table}.approximate_latitude AS device_latitude , "
-                f" {devices_table}.approximate_longitude AS device_longitude , "
-                f" FROM {data_table} "
-                f" JOIN {devices_table} ON {devices_table}.device_id = {data_table}.device_id "
-                f" WHERE {data_table}.timestamp >= '{start_date}' "
-                f" AND {data_table}.timestamp <= '{end_date}' "
-                f" AND {devices_table}.device_id IN UNNEST(@filter_value) "
-            )
-
-            bam_query = (
-                f" {bam_pollutants_query} , "
-                f" {devices_table}.device_id AS device_name , "
-                f" {devices_table}.site_id AS site_id , "
-                f" {devices_table}.tenant AS tenant , "
-                f" {devices_table}.approximate_latitude AS device_latitude , "
-                f" {devices_table}.approximate_longitude AS device_longitude , "
-                f" FROM {cls.BIGQUERY_BAM_DATA} "
-                f" JOIN {devices_table} ON {devices_table}.device_id = {cls.BIGQUERY_BAM_DATA}.device_id "
-                f" WHERE {cls.BIGQUERY_BAM_DATA}.timestamp >= '{start_date}' "
-                f" AND {cls.BIGQUERY_BAM_DATA}.timestamp <= '{end_date}' "
-                f" AND {devices_table}.device_id IN UNNEST(@filter_value) "
-            )
-
-            # Adding site information
-            query = (
-                f" SELECT "
-                f" {sites_table}.name AS site_name , "
-                f" {sites_table}.approximate_latitude AS site_latitude , "
-                f" {sites_table}.approximate_longitude AS site_longitude , "
-                f" data.* "
-                f" FROM {sites_table} "
-                f" RIGHT JOIN ({query}) data ON data.site_id = {sites_table}.id "
-            )
-
-            bam_query = (
-                f" SELECT "
-                f" {sites_table}.name AS site_name , "
-                f" {sites_table}.approximate_latitude AS site_latitude , "
-                f" {sites_table}.approximate_longitude AS site_longitude , "
-                f" data.* "
-                f" FROM {sites_table} "
-                f" RIGHT JOIN ({bam_query}) data ON data.site_id = {sites_table}.id "
-            )
-
-            if frequency == "hourly":
-                query = f"{query} UNION ALL {bam_query}"
-
-        elif filter_type == "sites":
-            # Adding site information, start and end times
-            query = (
-                f" {pollutants_query} , "
+        instance = cls("build_query")
+        query = instance.build_query(
+            data_table,
+            filter_type,
+            filter_value,
+            pollutants_query,
+            bam_pollutants_query,
+            start_date,
+            end_date,
+            frequency=frequency,
+        )
, " - f" {sites_table}.tenant AS tenant , " - f" {sites_table}.id AS site_id , " - f" {sites_table}.name AS site_name , " - f" {sites_table}.approximate_latitude AS site_latitude , " - f" {sites_table}.approximate_longitude AS site_longitude , " - f" {data_table}.device_id AS device_name , " - f" FROM {data_table} " - f" JOIN {sites_table} ON {sites_table}.id = {data_table}.site_id " - f" WHERE {data_table}.timestamp >= '{start_date}' " - f" AND {data_table}.timestamp <= '{end_date}' " - f" AND {sites_table}.id IN UNNEST(@filter_value) " - ) - - # Adding device information - query = ( - f" SELECT " - f" {devices_table}.approximate_latitude AS device_latitude , " - f" {devices_table}.approximate_longitude AS device_longitude , " - # f" {devices_table}.device_id AS device_name , " #this column creates a duplicate column - f" data.* " - f" FROM {devices_table} " - f" RIGHT JOIN ({query}) data ON data.device_name = {devices_table}.device_id " - ) - elif filter_type == "airqlouds": - sorting_cols = ["airqloud_id", "site_id", "datetime", "device_name"] - - meta_data_query = ( - f" SELECT {airqlouds_sites_table}.tenant , " - f" {airqlouds_sites_table}.airqloud_id , " - f" {airqlouds_sites_table}.site_id , " - f" FROM {airqlouds_sites_table} " - f" WHERE {airqlouds_sites_table}.airqloud_id IN UNNEST(@filter_value) " - ) - - # Adding airqloud information - meta_data_query = ( - f" SELECT " - f" {airqlouds_table}.name AS airqloud_name , " - f" meta_data.* " - f" FROM {airqlouds_table} " - f" RIGHT JOIN ({meta_data_query}) meta_data ON meta_data.airqloud_id = {airqlouds_table}.id " - ) - - # Adding site information - meta_data_query = ( - f" SELECT " - f" {sites_table}.approximate_latitude AS site_latitude , " - f" {sites_table}.approximate_longitude AS site_longitude , " - f" {sites_table}.name AS site_name , " - f" meta_data.* " - f" FROM {sites_table} " - f" RIGHT JOIN ({meta_data_query}) meta_data ON meta_data.site_id = {sites_table}.id " - ) - - # Adding device information - meta_data_query = ( - f" SELECT " - f" {devices_table}.approximate_latitude AS device_latitude , " - f" {devices_table}.approximate_longitude AS device_longitude , " - f" {devices_table}.device_id AS device_name , " - f" meta_data.* " - f" FROM {devices_table} " - f" RIGHT JOIN ({meta_data_query}) meta_data ON meta_data.site_id = {devices_table}.site_id " - ) - - # Adding start and end times - query = ( - f" {pollutants_query} , " - f" meta_data.* " - f" FROM {data_table} " - f" RIGHT JOIN ({meta_data_query}) meta_data ON meta_data.site_id = {data_table}.site_id " - f" WHERE {data_table}.timestamp >= '{start_date}' " - f" AND {data_table}.timestamp <= '{end_date}' " - f" ORDER BY {data_table}.timestamp " - ) - job_config = bigquery.QueryJobConfig( query_parameters=[ bigquery.ArrayQueryParameter("filter_value", "STRING", filter_value), @@ -269,9 +522,15 @@ def download_from_bigquery( if len(dataframe) == 0: return dataframe - dataframe.drop_duplicates( - subset=["datetime", "device_name"], inplace=True, keep="first" - ) + drop_columns = ["device_name"] + if frequency in ["weekly", "monthly", "yearly"]: + drop_columns.append(frequency[:-2]) + sorting_cols.append(frequency[:-2]) + else: + drop_columns.append("datetime") + sorting_cols.append("datetime") + + dataframe.drop_duplicates(subset=drop_columns, inplace=True, keep="first") dataframe.sort_values(sorting_cols, ascending=True, inplace=True) dataframe["frequency"] = frequency dataframe = dataframe.replace(np.nan, None) diff --git a/src/analytics/api/utils/data_formatters.py 
diff --git a/src/analytics/api/utils/data_formatters.py b/src/analytics/api/utils/data_formatters.py
index e609953e5e..bace9559de 100644
--- a/src/analytics/api/utils/data_formatters.py
+++ b/src/analytics/api/utils/data_formatters.py
@@ -1,6 +1,6 @@
 from enum import Enum
 from typing import Any, List, Union, Dict
-import logging
+from werkzeug.exceptions import BadRequest

 import pandas as pd
 import requests
@@ -15,7 +15,8 @@
     AQCSV_DATA_STATUS_MAPPER,
 )
 from api.utils.http import AirQoRequests
-from config import Config
+
+import logging

 logger = logging.getLogger(__name__)
@@ -294,7 +295,7 @@ def device_category_to_str(device_category: str) -> str:
     return ""


-def filter_non_private_sites(sites: List[str]) -> Dict[str, Any]:
+def filter_non_private_sites(filter_type: str, sites: List[str]) -> Dict[str, Any]:
     """
     Filters out private site IDs from a provided array of site IDs.

@@ -313,17 +314,21 @@
     try:
         airqo_requests = AirQoRequests()
         response = airqo_requests.request(
-            endpoint=endpoint, body={"sites": sites}, method="post"
+            endpoint=endpoint, body={filter_type: sites}, method="post"
         )
-        if response and response.get("status", None) == "success":
-            return response.get("data")
+        if response and response.get("status") == "success":
+            return airqo_requests.create_response(
+                message="Successfully returned data.",
+                data=response.get("data"),
+                success=True,
+            )
         else:
-            raise RuntimeError(response.get("message"))
-    except RuntimeError as rex:
-        raise RuntimeError(f"Error while filtering non private sites {rex}")
+            return airqo_requests.create_response(response, success=False)
+    except Exception as rex:
+        logger.exception(f"Error while filtering non private sites {rex}")


-def filter_non_private_devices(devices: List[str]) -> Dict[str, Any]:
+def filter_non_private_devices(filter_type: str, devices: List[str]) -> Dict[str, Any]:
     """
-    FilterS out private device IDs from a provided array of device IDs.
+    Filters out private device IDs from a provided array of device IDs.
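Editor's note: these helpers now return the AirQoRequests response envelope instead of a bare payload. A hypothetical success value, with the consuming pattern used in data.py; the exact envelope shape is assumed from the calls above, not confirmed by this diff:

```python
result = {
    "status": "success",
    "message": "Successfully returned data.",
    "data": {"sites": ["site-a", "site-b"]},
}
if result and result.get("status") == "success":
    sites = result.get("data", {}).get("sites", [])  # ['site-a', 'site-b']
```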
@@ -341,11 +346,15 @@ def filter_non_private_devices(filter_type: str, devices: List[str]) -> Dict[st
     try:
         airqo_requests = AirQoRequests()
         response = airqo_requests.request(
-            endpoint=endpoint, body={"devices": devices}, method="post"
+            endpoint=endpoint, body={filter_type: devices}, method="post"
         )
-        if response and response.get("status", None) == "success":
-            return response.get("data")
+        if response and response.get("status") == "success":
+            return airqo_requests.create_response(
+                message="Successfully returned data.",
+                data=response.get("data"),
+                success=True,
+            )
         else:
-            raise RuntimeError(response.get("message"))
-    except RuntimeError as rex:
-        raise RuntimeError(f"Error while filtering non private devices {rex}")
+            return airqo_requests.create_response(response, success=False)
+    except Exception as rex:
+        logger.exception(f"Error while filtering non private devices {rex}")
diff --git a/src/analytics/api/utils/http.py b/src/analytics/api/utils/http.py
index 4dde800778..d858ad74c8 100644
--- a/src/analytics/api/utils/http.py
+++ b/src/analytics/api/utils/http.py
@@ -163,7 +163,7 @@ def request(self, endpoint, params=None, body=None, method="get", base_url=None)
                     success=True,
                 )
             else:
-                return self.create_response(f"Error: {response.status}", success=False)
+                return self.create_response(f"Error: {response.data}", success=False)

         except urllib3.exceptions.HTTPError as ex:
             logger.exception(f"HTTPError: {ex}")
diff --git a/src/analytics/api/utils/pollutants/pm_25.py b/src/analytics/api/utils/pollutants/pm_25.py
index e585d5abd7..6e3f2a986b 100644
--- a/src/analytics/api/utils/pollutants/pm_25.py
+++ b/src/analytics/api/utils/pollutants/pm_25.py
@@ -54,22 +54,26 @@
     "no2": ["no2_calibrated_value", "no2_raw_value"],
 }

+COMMON_POLLUTANT_MAPPING = {
+    "pm2_5_calibrated": ["pm2_5_calibrated_value"],
+    "pm2_5_raw": ["pm2_5_raw_value"],
+    "pm10_calibrated": ["pm10_calibrated_value"],
+    "pm10_raw": ["pm10_raw_value"],
+    "no2_calibrated": ["no2_calibrated_value"],
+    "no2_raw": ["no2_raw_value"],
+}
+
 BIGQUERY_FREQUENCY_MAPPER = {
     "raw": {
         "pm2_5": ["pm2_5", "s1_pm2_5", "s2_pm2_5"],
         "pm10": ["pm10", "s1_pm10", "s2_pm10"],
         "no2": ["no2"],
     },
-    "daily": {
-        "pm2_5": ["pm2_5_calibrated_value", "pm2_5_raw_value"],
-        "pm10": ["pm10_calibrated_value", "pm10_raw_value"],
-        "no2": ["no2_calibrated_value", "no2_raw_value"],
-    },
-    "hourly": {
-        "pm2_5": ["pm2_5_calibrated_value", "pm2_5_raw_value"],
-        "pm10": ["pm10_calibrated_value", "pm10_raw_value"],
-        "no2": ["no2_calibrated_value", "no2_raw_value"],
-    },
+    "daily": COMMON_POLLUTANT_MAPPING,
+    "hourly": COMMON_POLLUTANT_MAPPING,
+    "weekly": COMMON_POLLUTANT_MAPPING,
+    "monthly": COMMON_POLLUTANT_MAPPING,
+    "yearly": COMMON_POLLUTANT_MAPPING,
 }

 PM_COLOR_CATEGORY = {
@@ -113,9 +117,9 @@
 }

 WEATHER_FIELDS_MAPPER = {
-    "temperature": "device_temperature",
-    "humidity": "device_humidity",
-    "wind_speed": "wind_speed",
+    "temperature": ["device_temperature"],
+    "humidity": ["device_humidity"],
+    "wind_speed": ["wind_speed"],
 }
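Editor's note: with the shared COMMON_POLLUTANT_MAPPING, the lookup key built in events.py is `f"{pollutant}_{data_type}"` for every non-raw frequency. A trimmed-down check of the lookup:

```python
COMMON_POLLUTANT_MAPPING = {
    "pm2_5_calibrated": ["pm2_5_calibrated_value"],
    "pm2_5_raw": ["pm2_5_raw_value"],
}
BIGQUERY_FREQUENCY_MAPPER = {
    "hourly": COMMON_POLLUTANT_MAPPING,
    "weekly": COMMON_POLLUTANT_MAPPING,
}

pollutant, data_type = "pm2_5", "calibrated"
key = pollutant if pollutant == "raw" else f"{pollutant}_{data_type}"
print(BIGQUERY_FREQUENCY_MAPPER.get("weekly", {}).get(key, []))
# ['pm2_5_calibrated_value']
```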
diff --git a/src/analytics/api/views/data.py b/src/analytics/api/views/data.py
index 528bb228e4..caf4d6439d 100644
--- a/src/analytics/api/views/data.py
+++ b/src/analytics/api/views/data.py
@@ -1,6 +1,6 @@
 import datetime
 import traceback
-import logging
+from typing import List

 import flask_excel as excel
 import pandas as pd
@@ -33,6 +33,7 @@
 from api.utils.http import AirQoRequests
 from api.utils.request_validators import validate_request_json, validate_request_params
 from main import rest_api_v2
+import logging

 logger = logging.getLogger(__name__)
@@ -63,7 +64,11 @@ class DataExportResource(Resource):
         "outputFormat|optional:str",
         "pollutants|optional:list",
         "sites|optional:list",
+        "site_ids|optional:list",
+        "site_names|optional:list",
+        "device_ids|optional:list",
         "devices|optional:list",
+        "device_names|optional:list",
         "airqlouds|optional:list",
         "datatype|optional:str",
         "minimum|optional:bool",
@@ -82,21 +87,21 @@ def post(self):
             "download_types": ["csv", "json"],
             "data_types": ["calibrated", "raw"],
             "output_formats": ["airqo-standard", "aqcsv"],
-            "frequencies": ["hourly", "daily", "raw"],
+            "frequencies": ["hourly", "daily", "raw", "weekly", "monthly", "yearly"],
         }

         json_data = request.get_json()

         start_date = json_data["startDateTime"]
         end_date = json_data["endDateTime"]

         try:
-            filter_type, filter_value = self._get_validated_filter(json_data)
-        except ValueError as e:
-            return (
-                AirQoRequests.create_response(f"An error occured: {e}", success=False),
-                AirQoRequests.Status.HTTP_400_BAD_REQUEST,
-            )
+            filter_type, filter_value, error_message = self._get_validated_filter(
+                json_data
+            )
+            if error_message:
+                return error_message, AirQoRequests.Status.HTTP_400_BAD_REQUEST
+        except Exception as e:
+            logger.exception(f"An error has occurred: {e}")
+            return (
+                AirQoRequests.create_response(f"An error occurred: {e}", success=False),
+                AirQoRequests.Status.HTTP_400_BAD_REQUEST,
+            )

         try:
             frequency = self._get_valid_option(
@@ -144,6 +149,7 @@ def post(self):
                 end_date=end_date,
                 frequency=frequency,
                 pollutants=pollutants,
+                data_type=data_type,
                 weather_fields=weather_fields,
             )

@@ -153,13 +159,10 @@ def post(self):
                 AirQoRequests.Status.HTTP_404_NOT_FOUND,
             )
         if minimum_output:
+            # Drop unnecessary columns
             data_frame.drop(
                 columns=[
-                    "device_latitude",
-                    "device_longitude",
                     "site_id",
-                    "site_latitude",
-                    "site_longitude",
                 ],
                 inplace=True,
             )
@@ -183,11 +186,10 @@ def post(self):
                 records, "csv", file_name=f"{frequency}-air-quality{postfix}data"
             )
         except Exception as ex:
-            print(ex)
-            traceback.print_exc()
+            logger.exception(f"An error occurred: {ex}")
             return (
                 AirQoRequests.create_response(
-                    f"An Error occurred while processing your request. Please contact support",
+                    f"An error occurred while processing your request. Please contact support. {ex}",
                     success=False,
                 ),
                 AirQoRequests.Status.HTTP_500_INTERNAL_SERVER_ERROR,
             )

     def _get_validated_filter(self, json_data):
         """
-        Ensures that only one of 'airqlouds', 'sites', or 'devices' is provided in the request.
-        Calls filter_non_private_* only after confirming exclusivity.
+        Validates that exactly one supported filter key ('airqlouds', 'sites', 'site_ids',
+        'site_names', 'devices', 'device_ids', or 'device_names') is provided in the
+        request, and applies non-private filtering where necessary.

         Args:
             json_data (dict): JSON payload from the request.

         Returns:
+            tuple: (filter_type, validated_data, error_message)

         Raises:
             ValueError: If more than one or none of the filters are provided.
         """
-        provided_filters = [
-            key for key in ["sites", "devices", "airqlouds"] if json_data.get(key)
+        error_message: str = ""
+        validated_data: List[str] = None
+
+        # TODO Liaise with device registry to clean up this makeshift implementation
+        devices = ["devices", "device_ids", "device_names"]
+        sites = ["sites", "site_names", "site_ids"]
+
+        valid_filters = [
+            "sites",
+            "site_names",
+            "site_ids",
+            "devices",
+            "device_ids",
+            "airqlouds",
+            "device_names",
         ]
-
+        provided_filters = [key for key in valid_filters if json_data.get(key)]
         if len(provided_filters) != 1:
             raise ValueError(
-                "Specify exactly one of 'airqlouds', 'sites', or 'devices' in the request body."
+                "Specify exactly one of 'airqlouds', 'sites', 'site_ids', 'site_names', "
+                "'devices', 'device_ids', or 'device_names' in the request body."
             )
-
         filter_type = provided_filters[0]
         filter_value = json_data.get(filter_type)

-        if filter_type == "sites":
-            validated_value = filter_non_private_sites(sites=filter_value).get(
-                "sites", []
-            )
-        elif filter_type == "devices":
-            validated_value = filter_non_private_devices(devices=filter_value).get(
-                "devices", []
-            )
+        if filter_type in sites:
+            validated_value = filter_non_private_sites(filter_type, filter_value)
+        elif filter_type in devices:
+            validated_value = filter_non_private_devices(filter_type, filter_value)
         else:
-            # No additional processing is needed for 'airqlouds'
-            validated_value = filter_value
+            # No additional processing is needed for 'airqlouds'
+            return filter_type, filter_value, None

-        return filter_type, validated_value
+        if validated_value and validated_value.get("status") == "success":
+            # TODO This should be cleaned up.
+            validated_data = validated_value.get("data", {}).get(
+                "sites" if filter_type in sites else "devices", []
+            )
+        else:
+            error_message = (validated_value or {}).get("message", "Validation failed")
+
+        return filter_type, validated_data, error_message

     def _get_valid_option(self, option, valid_options, option_name):
         """
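Editor's note: an end-to-end sketch of calling the updated endpoint with one of the new filter keys and an aggregate frequency. The URL and some payload keys are assumptions based on the request validator above, not confirmed by this diff:

```python
import requests

payload = {
    "startDateTime": "2024-01-01T00:00:00Z",
    "endDateTime": "2024-03-31T23:59:59Z",
    "device_names": ["aq_001", "aq_002"],  # exactly one filter key may be set
    "frequency": "weekly",                 # newly supported aggregate frequency
    "datatype": "calibrated",
    "pollutants": ["pm2_5"],
    "downloadType": "json",                # assumed field name
}
response = requests.post(
    "https://api.example.com/api/v2/analytics/data-download",  # hypothetical URL
    json=payload,
    timeout=60,
)
print(response.status_code)
```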