From 8bd88dc817a86d0f8cb0105c4228e2f7b4a5aa80 Mon Sep 17 00:00:00 2001
From: NicholasTurner23
Date: Tue, 19 Nov 2024 19:47:20 +0300
Subject: [PATCH] More modularity for dynamic queries

---
 src/analytics/api/models/events.py | 321 ++++++++++++++++++++++-------
 src/analytics/api/views/data.py    |   8 +-
 2 files changed, 250 insertions(+), 79 deletions(-)

diff --git a/src/analytics/api/models/events.py b/src/analytics/api/models/events.py
index 117575a1f0..77c6b469d9 100644
--- a/src/analytics/api/models/events.py
+++ b/src/analytics/api/models/events.py
@@ -61,28 +61,18 @@ def device_info_query(self):
-        """Generates a device information query including site_id, tenant, and approximate location details."""
+        """Generates a device information query including site_id and tenant."""
         return (
             f"{self.devices_table}.site_id AS site_id, "
-            f"{self.devices_table}.tenant AS tenant, "
-            f"{self.devices_table}.approximate_latitude AS device_latitude, "
-            f"{self.devices_table}.approximate_longitude AS device_longitude"
+            f"{self.devices_table}.tenant AS tenant "
         )
 
     @property
     def device_info_query_airqloud(self):
-        """Generates a device information query specifically for airqlouds, excluding the site_id."""
-        return (
-            f"{self.devices_table}.tenant AS tenant, "
-            f"{self.devices_table}.approximate_latitude AS device_latitude, "
-            f"{self.devices_table}.approximate_longitude AS device_longitude"
-        )
+        """Generates a device information query for airqlouds, returning only the tenant (site_id excluded)."""
+        return f"{self.devices_table}.tenant AS tenant "
 
     @property
     def site_info_query(self):
-        """Generates a site information query to retrieve site name and approximate location details."""
-        return (
-            f"{self.sites_table}.name AS site_name, "
-            f"{self.sites_table}.approximate_latitude AS site_latitude, "
-            f"{self.sites_table}.approximate_longitude AS site_longitude"
-        )
+        """Generates a site information query to retrieve the site name."""
+        return f"{self.sites_table}.name AS site_name "
 
     @property
     def airqloud_info_query(self):
@@ -157,6 +147,163 @@ def add_airqloud_join(self, data_query):
             f"RIGHT JOIN ({data_query}) data ON data.airqloud_id = {self.airqlouds_table}.id "
         )
 
+    def get_time_grouping(self, frequency):
+        """
+        Determines the appropriate time grouping field based on the frequency.
+
+        Args:
+            frequency (str): Frequency such as 'raw', 'hourly', 'daily', 'weekly', 'monthly', or 'yearly'.
+
+        Returns:
+            str: The time grouping clause for the SQL query.
+        """
+        grouping_map = {
+            "weekly": "TIMESTAMP_TRUNC(timestamp, WEEK(MONDAY)) AS week",
+            "monthly": "TIMESTAMP_TRUNC(timestamp, MONTH) AS month",
+            "yearly": "EXTRACT(YEAR FROM timestamp) AS year",
+        }
+
+        return grouping_map.get(frequency, "timestamp")
+
+    def get_device_query(
+        self,
+        data_table,
+        filter_value,
+        pollutants_query,
+        bam_pollutants_query,
+        time_grouping,
+        start_date,
+        end_date,
+        frequency,
+    ):
+        """
+        Constructs a SQL query to retrieve data for specific devices.
+
+        Args:
+            data_table (str): The name of the data table containing measurements.
+            filter_value (list): The list of device IDs to filter by.
+            pollutants_query (str): The SELECT clause for standard pollutants.
+            bam_pollutants_query (str): The SELECT clause for BAM pollutants.
+            time_grouping (str): The time grouping clause based on frequency.
+            start_date (str): The start date for the query range.
+            end_date (str): The end date for the query range.
+            frequency (str): The frequency of the data (e.g., 'raw', 'daily', 'weekly').
+
+        Returns:
+            str: The SQL query string to retrieve device-specific data,
+            including BAM data if applicable.
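+
+        Example:
+            A rough sketch of the generated SQL before the site join is
+            applied. Table names here are placeholders, not the actual
+            configured tables of this instance:
+
+                SELECT <pollutant columns>, timestamp, <device info>, devices.name AS device_name
+                FROM <data_table>
+                JOIN devices ON devices.device_id = <data_table>.device_id
+                WHERE <data_table>.timestamp BETWEEN '<start>' AND '<end>'
+                AND devices.device_id IN UNNEST(@filter_value)
+
+            For weekly/monthly/yearly frequencies a GROUP BY ALL is appended,
+            and for hourly and aggregated frequencies the BAM variant of the
+            same query is attached with UNION ALL.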
+ """ + query = ( + f"{pollutants_query}, {time_grouping}, {self.device_info_query}, {self.devices_table}.name AS device_name " + f"FROM {data_table} " + f"JOIN {self.devices_table} ON {self.devices_table}.device_id = {data_table}.device_id " + f"WHERE {data_table}.timestamp BETWEEN '{start_date}' AND '{end_date}' " + f"AND {self.devices_table}.device_id IN UNNEST(@filter_value) " + ) + if frequency in ["weekly", "monthly", "yearly"]: + query += " GROUP BY ALL" + + query = self.add_site_join(query) + if frequency in ["hourly", "weekly", "monthly", "yearly"]: + bam_query = ( + f"{bam_pollutants_query}, {time_grouping}, {self.device_info_query}, {self.devices_table}.name AS device_name " + f"FROM {self.BIGQUERY_BAM_DATA} " + f"JOIN {self.devices_table} ON {self.devices_table}.device_id = {self.BIGQUERY_BAM_DATA}.device_id " + f"WHERE {self.BIGQUERY_BAM_DATA}.timestamp BETWEEN '{start_date}' AND '{end_date}' " + f"AND {self.devices_table}.device_id IN UNNEST(@filter_value) " + ) + if frequency in ["weekly", "monthly", "yearly"]: + bam_query += " GROUP BY ALL" + bam_query = self.add_site_join(bam_query) + query = f"{query} UNION ALL {bam_query}" + + return query + + def get_site_query( + self, + data_table, + filter_value, + pollutants_query, + time_grouping, + start_date, + end_date, + frequency, + ): + """ + Constructs a SQL query to retrieve data for specific sites. + + Args: + data_table (str): The name of the data table containing measurements. + filter_value (str): The list of site IDs to filter by. + pollutants_query (str): The SQL query for pollutants. + time_grouping (str): The time grouping clause based on frequency. + start_date (str): The start date for the query range. + end_date (str): The end date for the query range. + frequency (str): The frequency of the data (e.g., 'raw', 'daily', 'weekly'). + + Returns: + str: The SQL query string to retrieve site-specific data. + """ + query = ( + f"{pollutants_query}, {time_grouping}, {self.site_info_query}, {data_table}.device_id AS device_name " + f"FROM {data_table} " + f"JOIN {self.sites_table} ON {self.sites_table}.id = {data_table}.site_id " + f"WHERE {data_table}.timestamp BETWEEN '{start_date}' AND '{end_date}' " + f"AND {self.sites_table}.id IN UNNEST(@filter_value) " + ) + if frequency in ["weekly", "monthly", "yearly"]: + query += " GROUP BY ALL" + return self.add_device_join(query) + + def get_airqloud_query( + self, + data_table, + filter_value, + pollutants_query, + time_grouping, + start_date, + end_date, + frequency, + ): + """ + Constructs a SQL query to retrieve data for specific AirQlouds. + + Args: + data_table (str): The name of the data table containing measurements. + filter_value (str): The list of AirQloud IDs to filter by. + pollutants_query (str): The SQL query for pollutants. + time_grouping (str): The time grouping clause based on frequency. + start_date (str): The start date for the query range. + end_date (str): The end date for the query range. + frequency (str): The frequency of the data (e.g., 'raw', 'daily', 'weekly'). + + Returns: + str: The SQL query string to retrieve AirQloud-specific data. 
+ """ + meta_data_query = ( + f"SELECT {self.airqlouds_sites_table}.airqloud_id, " + f"{self.airqlouds_sites_table}.site_id AS site_id " + f"FROM {self.airqlouds_sites_table} " + f"WHERE {self.airqlouds_sites_table}.airqloud_id IN UNNEST(@filter_value) " + ) + meta_data_query = self.add_airqloud_join(meta_data_query) + meta_data_query = self.add_site_join(meta_data_query) + meta_data_query = self.add_device_join_to_airqlouds(meta_data_query) + + query = ( + f"{pollutants_query}, {time_grouping}, {data_table}.device_id AS device_name, meta_data.* " + f"FROM {data_table} " + f"RIGHT JOIN ({meta_data_query}) meta_data ON meta_data.site_id = {data_table}.site_id " + f"WHERE {data_table}.timestamp BETWEEN '{start_date}' AND '{end_date}' " + ) + order_by_clause = ( + f"ORDER BY {data_table}.timestamp" + if frequency not in ["weekly", "monthly", "yearly"] + else "GROUP BY ALL" + ) + + return query + order_by_clause + def build_query( self, data_table, @@ -184,56 +331,61 @@ def build_query( Returns: str: Final constructed SQL query. """ - if filter_type in ["devices", "device_ids", "device_names"]: - query = ( - f"{pollutants_query}, {self.device_info_query}, {self.devices_table}.name AS device_name, " - f"FROM {data_table} " - f"JOIN {self.devices_table} ON {self.devices_table}.device_id = {data_table}.device_id " - f"WHERE {data_table}.timestamp BETWEEN '{start_date}' AND '{end_date}' " - f"AND {self.devices_table}.device_id IN UNNEST({filter_value}) " + time_grouping = self.get_time_grouping(frequency) + + # TODO Find a better way to do this. + if frequency in ["weekly", "monthly", "yearly"]: + # Drop datetime alias + pollutants_query = pollutants_query.replace( + f", FORMAT_DATETIME('%Y-%m-%d %H:%M:%S', {data_table}.timestamp) AS datetime", + "", + ) + bam_pollutants_query = bam_pollutants_query.replace( + f", FORMAT_DATETIME('%Y-%m-%d %H:%M:%S', {self.BIGQUERY_BAM_DATA}.timestamp) AS datetime", + "", ) - query = self.add_site_join(query) - - if frequency == "hourly": - bam_query = ( - f"{bam_pollutants_query}, {self.device_info_query}, {self.devices_table}.name AS device_name, " - f"FROM {self.BIGQUERY_BAM_DATA} " - f"JOIN {self.devices_table} ON {self.devices_table}.device_id = {self.BIGQUERY_BAM_DATA}.device_id " - f"WHERE {self.BIGQUERY_BAM_DATA}.timestamp BETWEEN '{start_date}' AND '{end_date}' " - f"AND {self.devices_table}.device_id IN UNNEST({filter_value}) " - ) - bam_query = self.add_site_join(bam_query) - query = f"{query} UNION ALL {bam_query}" + if filter_type in ["devices", "device_ids", "device_names"]: + return self.get_device_query( + data_table, + filter_value, + pollutants_query, + bam_pollutants_query, + time_grouping, + start_date, + end_date, + frequency, + ) elif filter_type in ["sites", "site_names", "site_ids"]: - query = ( - f"{pollutants_query}, {self.site_info_query}, {data_table}.device_id AS device_name " - f"FROM {data_table} " - f"JOIN {self.sites_table} ON {self.sites_table}.id = {data_table}.site_id " - f"WHERE {data_table}.timestamp BETWEEN '{start_date}' AND '{end_date}' " - f"AND {self.sites_table}.id IN UNNEST(@filter_value) " + return self.get_site_query( + data_table, + filter_value, + pollutants_query, + time_grouping, + start_date, + end_date, ) - query = self.add_device_join(query) elif filter_type == "airqlouds": - meta_data_query = ( - f"SELECT {self.airqlouds_sites_table}.airqloud_id, " - f"{self.airqlouds_sites_table}.site_id AS site_id " - f"FROM {self.airqlouds_sites_table} " - f"WHERE {self.airqlouds_sites_table}.airqloud_id IN 
+        """
+        meta_data_query = (
+            f"SELECT {self.airqlouds_sites_table}.airqloud_id, "
+            f"{self.airqlouds_sites_table}.site_id AS site_id "
+            f"FROM {self.airqlouds_sites_table} "
+            f"WHERE {self.airqlouds_sites_table}.airqloud_id IN UNNEST(@filter_value) "
+        )
+        meta_data_query = self.add_airqloud_join(meta_data_query)
+        meta_data_query = self.add_site_join(meta_data_query)
+        meta_data_query = self.add_device_join_to_airqlouds(meta_data_query)
+
+        query = (
+            f"{pollutants_query}, {time_grouping}, {data_table}.device_id AS device_name, meta_data.* "
+            f"FROM {data_table} "
+            f"RIGHT JOIN ({meta_data_query}) meta_data ON meta_data.site_id = {data_table}.site_id "
+            f"WHERE {data_table}.timestamp BETWEEN '{start_date}' AND '{end_date}' "
+        )
+        # Aggregated frequencies need a GROUP BY for the averaged columns;
+        # the other frequencies are simply ordered by timestamp.
+        trailing_clause = (
+            "GROUP BY ALL"
+            if frequency in ["weekly", "monthly", "yearly"]
+            else f"ORDER BY {data_table}.timestamp"
+        )
+
+        return query + trailing_clause
+
     def build_query(
         self,
         data_table,
@@ -184,56 +331,61 @@ def build_query(
         Returns:
             str: Final constructed SQL query.
         """
-        if filter_type in ["devices", "device_ids", "device_names"]:
-            query = (
-                f"{pollutants_query}, {self.device_info_query}, {self.devices_table}.name AS device_name, "
-                f"FROM {data_table} "
-                f"JOIN {self.devices_table} ON {self.devices_table}.device_id = {data_table}.device_id "
-                f"WHERE {data_table}.timestamp BETWEEN '{start_date}' AND '{end_date}' "
-                f"AND {self.devices_table}.device_id IN UNNEST({filter_value}) "
-            )
-            query = self.add_site_join(query)
-
-            if frequency == "hourly":
-                bam_query = (
-                    f"{bam_pollutants_query}, {self.device_info_query}, {self.devices_table}.name AS device_name, "
-                    f"FROM {self.BIGQUERY_BAM_DATA} "
-                    f"JOIN {self.devices_table} ON {self.devices_table}.device_id = {self.BIGQUERY_BAM_DATA}.device_id "
-                    f"WHERE {self.BIGQUERY_BAM_DATA}.timestamp BETWEEN '{start_date}' AND '{end_date}' "
-                    f"AND {self.devices_table}.device_id IN UNNEST({filter_value}) "
-                )
-                bam_query = self.add_site_join(bam_query)
-                query = f"{query} UNION ALL {bam_query}"
+        time_grouping = self.get_time_grouping(frequency)
+
+        # TODO: Find a better way to do this.
+        if frequency in ["weekly", "monthly", "yearly"]:
+            # Drop the datetime alias; aggregated frequencies use the
+            # week/month/year column from get_time_grouping instead.
+            pollutants_query = pollutants_query.replace(
+                f", FORMAT_DATETIME('%Y-%m-%d %H:%M:%S', {data_table}.timestamp) AS datetime",
+                "",
+            )
+            bam_pollutants_query = bam_pollutants_query.replace(
+                f", FORMAT_DATETIME('%Y-%m-%d %H:%M:%S', {self.BIGQUERY_BAM_DATA}.timestamp) AS datetime",
+                "",
+            )
 
+        if filter_type in ["devices", "device_ids", "device_names"]:
+            return self.get_device_query(
+                data_table,
+                filter_value,
+                pollutants_query,
+                bam_pollutants_query,
+                time_grouping,
+                start_date,
+                end_date,
+                frequency,
+            )
         elif filter_type in ["sites", "site_names", "site_ids"]:
-            query = (
-                f"{pollutants_query}, {self.site_info_query}, {data_table}.device_id AS device_name "
-                f"FROM {data_table} "
-                f"JOIN {self.sites_table} ON {self.sites_table}.id = {data_table}.site_id "
-                f"WHERE {data_table}.timestamp BETWEEN '{start_date}' AND '{end_date}' "
-                f"AND {self.sites_table}.id IN UNNEST(@filter_value) "
-            )
-            query = self.add_device_join(query)
-
+            return self.get_site_query(
+                data_table,
+                filter_value,
+                pollutants_query,
+                time_grouping,
+                start_date,
+                end_date,
+                frequency,
+            )
         elif filter_type == "airqlouds":
-            meta_data_query = (
-                f"SELECT {self.airqlouds_sites_table}.airqloud_id, "
-                f"{self.airqlouds_sites_table}.site_id AS site_id "
-                f"FROM {self.airqlouds_sites_table} "
-                f"WHERE {self.airqlouds_sites_table}.airqloud_id IN UNNEST(@filter_value) "
-            )
-
-            meta_data_query = self.add_airqloud_join(meta_data_query)
-            meta_data_query = self.add_site_join(meta_data_query)
-            meta_data_query = self.add_device_join_to_airqlouds(meta_data_query)
-
-            query = (
-                f"{pollutants_query}, {data_table}.device_id AS device_name, meta_data.* "
-                f"FROM {data_table} "
-                f"RIGHT JOIN ({meta_data_query}) meta_data ON meta_data.site_id = {data_table}.site_id "
-                f"WHERE {data_table}.timestamp BETWEEN '{start_date}' AND '{end_date}' "
-                f"ORDER BY {data_table}.timestamp "
-            )
-        return query
+            return self.get_airqloud_query(
+                data_table,
+                filter_value,
+                pollutants_query,
+                time_grouping,
+                start_date,
+                end_date,
+                frequency,
+            )
+        else:
+            raise ValueError("Invalid filter type")
 
+    @classmethod
+    def get_columns(cls, mapping, frequency, data_type, decimal_places, data_table):
+        """Builds rounded column expressions, averaging them when the frequency is aggregated."""
+        if frequency in ["weekly", "monthly", "yearly"]:
+            return [
+                f"ROUND(AVG({data_table}.{col}), {decimal_places}) AS {col}"
+                for col in mapping
+            ]
+        return [
+            f"ROUND({data_table}.{col}, {decimal_places}) AS {col}" for col in mapping
+        ]
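+    # Illustrative example (hypothetical values): get_columns(["pm2_5"],
+    # "weekly", "calibrated", 2, "data") returns
+    # ["ROUND(AVG(data.pm2_5), 2) AS pm2_5"], while a "raw" or "hourly"
+    # frequency returns ["ROUND(data.pm2_5, 2) AS pm2_5"].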
 
     @classmethod
     @cache.memoize()
@@ -266,14 +418,15 @@ def download_from_bigquery(
         """
         decimal_places = cls.DATA_EXPORT_DECIMAL_PLACES
 
-        sorting_cols = ["site_id", "datetime", "device_name"]
+        sorting_cols = ["site_id", "device_name"]
 
-        # Define table mapping for dynamic selection based on frequency
         data_table = {
            "raw": cls.BIGQUERY_RAW_DATA,
            "daily": cls.BIGQUERY_DAILY_DATA,
            "hourly": cls.BIGQUERY_HOURLY_DATA,
-        }.get(frequency)
+        }.get(
+            frequency, cls.BIGQUERY_HOURLY_DATA
+        )  # Fall back to hourly data when the frequency is weekly, monthly, or yearly. Frequency validation is done in data.py.
 
         if not data_table:
             raise ValueError("Invalid frequency")
@@ -292,23 +445,40 @@ def download_from_bigquery(
                     key, []
                 )
                 pollutant_columns.extend(
-                    [
-                        f"ROUND({data_table}.{mapping}, {decimal_places}) AS {mapping}"
-                        for mapping in pollutant_mapping
-                    ]
+                    cls.get_columns(
+                        pollutant_mapping,
+                        frequency,
+                        data_type,
+                        decimal_places,
+                        data_table,
+                    )
                 )
 
+                # TODO: Clean up by using the `get_columns` helper method.
                 if pollutant in {"pm2_5", "pm10", "no2"}:
-                    bam_pollutant_columns.extend(
-                        [f"ROUND({pollutant}, {decimal_places}) AS {key}_value"]
-                    )
+                    if frequency in ["weekly", "monthly", "yearly"]:
+                        bam_pollutant_columns.extend(
+                            [f"ROUND(AVG({pollutant}), {decimal_places}) AS {key}_value"]
+                        )
+                    else:
+                        bam_pollutant_columns.extend(
+                            [f"ROUND({pollutant}, {decimal_places}) AS {key}_value"]
+                        )
+
+        # TODO: Fix query when weather data is included. Currently failing.
         if weather_fields:
             for field in weather_fields:
                 weather_mapping = WEATHER_FIELDS_MAPPER.get(field)
                 if weather_mapping:
-                    weather_columns.append(
-                        f"ROUND({data_table}.{weather_mapping}, {decimal_places}) AS {weather_mapping}"
+                    weather_columns.extend(
+                        cls.get_columns(
+                            [weather_mapping],
+                            frequency,
+                            data_type,
+                            decimal_places,
+                            data_table,
+                        )
                     )
 
         selected_columns = set(pollutant_columns + weather_columns)
@@ -322,7 +492,6 @@ def download_from_bigquery(
             f"SELECT {', '.join(bam_selected_columns)}, "
             f"FORMAT_DATETIME('%Y-%m-%d %H:%M:%S', {cls.BIGQUERY_BAM_DATA}.timestamp) AS datetime "
         )
-
         instance = cls("build_query")
         query = instance.build_query(
@@ -353,9 +522,15 @@ def download_from_bigquery(
         if len(dataframe) == 0:
             return dataframe
 
-        dataframe.drop_duplicates(
-            subset=["datetime", "device_name"], inplace=True, keep="first"
-        )
+        drop_columns = ["device_name"]
+        if frequency in ["weekly", "monthly", "yearly"]:
+            # e.g. "weekly"[:-2] -> "week", matching the alias produced by get_time_grouping.
+            drop_columns.append(frequency[:-2])
+            sorting_cols.append(frequency[:-2])
+        else:
+            drop_columns.append("datetime")
+            sorting_cols.append("datetime")
+
+        dataframe.drop_duplicates(subset=drop_columns, inplace=True, keep="first")
         dataframe.sort_values(sorting_cols, ascending=True, inplace=True)
         dataframe["frequency"] = frequency
         dataframe = dataframe.replace(np.nan, None)
diff --git a/src/analytics/api/views/data.py b/src/analytics/api/views/data.py
index 40963d01ca..caf4d6439d 100644
--- a/src/analytics/api/views/data.py
+++ b/src/analytics/api/views/data.py
@@ -2,7 +2,6 @@
 import traceback
 from typing import List
-
 import flask_excel as excel
 import pandas as pd
 from flasgger import swag_from
@@ -88,7 +87,7 @@ def post(self):
             "download_types": ["csv", "json"],
             "data_types": ["calibrated", "raw"],
             "output_formats": ["airqo-standard", "aqcsv"],
-            "frequencies": ["hourly", "daily", "raw"],
+            "frequencies": ["hourly", "daily", "raw", "weekly", "monthly", "yearly"],
         }
 
         json_data = request.get_json()
@@ -160,13 +159,10 @@ def post(self):
                 AirQoRequests.Status.HTTP_404_NOT_FOUND,
             )
         if minimum_output:
+            # Drop unnecessary columns before returning the response.
             data_frame.drop(
                 columns=[
-                    "device_latitude",
-                    "device_longitude",
                     "site_id",
-                    "site_latitude",
-                    "site_longitude",
                 ],
                 inplace=True,
             )
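+            # Note (illustrative): with the new aggregated frequencies
+            # ("weekly", "monthly", "yearly"), the exported frame carries a
+            # "week"/"month"/"year" column in place of "datetime", as built
+            # in events.py above.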