Skip to content

Commit

Permalink
More modularity for dynamic queries
Browse files Browse the repository at this point in the history
  • Loading branch information
NicholasTurner23 committed Nov 19, 2024
1 parent c52e058 commit 8bd88dc
Show file tree
Hide file tree
Showing 2 changed files with 250 additions and 79 deletions.
321 changes: 248 additions & 73 deletions src/analytics/api/models/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,28 +61,18 @@ def device_info_query(self):
"""Generates a device information query including site_id, tenant, and approximate location details."""
return (
f"{self.devices_table}.site_id AS site_id, "
f"{self.devices_table}.tenant AS tenant, "
f"{self.devices_table}.approximate_latitude AS device_latitude, "
f"{self.devices_table}.approximate_longitude AS device_longitude"
f"{self.devices_table}.tenant AS tenant "
)

@property
def device_info_query_airqloud(self):
"""Generates a device information query specifically for airqlouds, excluding the site_id."""
return (
f"{self.devices_table}.tenant AS tenant, "
f"{self.devices_table}.approximate_latitude AS device_latitude, "
f"{self.devices_table}.approximate_longitude AS device_longitude"
)
return f"{self.devices_table}.tenant AS tenant "

@property
def site_info_query(self):
"""Generates a site information query to retrieve site name and approximate location details."""
return (
f"{self.sites_table}.name AS site_name, "
f"{self.sites_table}.approximate_latitude AS site_latitude, "
f"{self.sites_table}.approximate_longitude AS site_longitude"
)
return f"{self.sites_table}.name AS site_name "

@property
def airqloud_info_query(self):
Expand Down Expand Up @@ -157,6 +147,163 @@ def add_airqloud_join(self, data_query):
f"RIGHT JOIN ({data_query}) data ON data.airqloud_id = {self.airqlouds_table}.id "
)

def get_time_grouping(self, frequency):
"""
Determines the appropriate time grouping fields based on the frequency.
Args:
frequency (str): Frequency like 'raw', 'daily', 'hourly', 'weekly', etc.
Returns:
str: The time grouping clause for the SQL query.
"""
grouping_map = {
"weekly": "TIMESTAMP_TRUNC(timestamp, WEEK(MONDAY)) AS week",
"monthly": "TIMESTAMP_TRUNC(timestamp, MONTH) AS month",
"yearly": "EXTRACT(YEAR FROM timestamp) AS year",
}

return grouping_map.get(frequency, "timestamp")

def get_device_query(
self,
data_table,
filter_value,
pollutants_query,
bam_pollutants_query,
time_grouping,
start_date,
end_date,
frequency,
):
"""
Constructs a SQL query to retrieve data for specific devices.
Args:
data_table (str): The name of the data table containing measurements.
filter_value (str): The list of device IDs to filter by.
pollutants_query (str): The SQL query for standard pollutants.
bam_pollutants_query (str): The SQL query for BAM pollutants.
time_grouping (str): The time grouping clause based on frequency.
start_date (str): The start date for the query range.
end_date (str): The end date for the query range.
frequency (str): The frequency of the data (e.g., 'raw', 'daily', 'weekly').
Returns:
str: The SQL query string to retrieve device-specific data,
including BAM data if applicable.
"""
query = (
f"{pollutants_query}, {time_grouping}, {self.device_info_query}, {self.devices_table}.name AS device_name "
f"FROM {data_table} "
f"JOIN {self.devices_table} ON {self.devices_table}.device_id = {data_table}.device_id "
f"WHERE {data_table}.timestamp BETWEEN '{start_date}' AND '{end_date}' "
f"AND {self.devices_table}.device_id IN UNNEST(@filter_value) "
)
if frequency in ["weekly", "monthly", "yearly"]:
query += " GROUP BY ALL"

query = self.add_site_join(query)
if frequency in ["hourly", "weekly", "monthly", "yearly"]:
bam_query = (
f"{bam_pollutants_query}, {time_grouping}, {self.device_info_query}, {self.devices_table}.name AS device_name "
f"FROM {self.BIGQUERY_BAM_DATA} "
f"JOIN {self.devices_table} ON {self.devices_table}.device_id = {self.BIGQUERY_BAM_DATA}.device_id "
f"WHERE {self.BIGQUERY_BAM_DATA}.timestamp BETWEEN '{start_date}' AND '{end_date}' "
f"AND {self.devices_table}.device_id IN UNNEST(@filter_value) "
)
if frequency in ["weekly", "monthly", "yearly"]:
bam_query += " GROUP BY ALL"
bam_query = self.add_site_join(bam_query)
query = f"{query} UNION ALL {bam_query}"

return query

def get_site_query(
self,
data_table,
filter_value,
pollutants_query,
time_grouping,
start_date,
end_date,
frequency,
):
"""
Constructs a SQL query to retrieve data for specific sites.
Args:
data_table (str): The name of the data table containing measurements.
filter_value (str): The list of site IDs to filter by.
pollutants_query (str): The SQL query for pollutants.
time_grouping (str): The time grouping clause based on frequency.
start_date (str): The start date for the query range.
end_date (str): The end date for the query range.
frequency (str): The frequency of the data (e.g., 'raw', 'daily', 'weekly').
Returns:
str: The SQL query string to retrieve site-specific data.
"""
query = (
f"{pollutants_query}, {time_grouping}, {self.site_info_query}, {data_table}.device_id AS device_name "
f"FROM {data_table} "
f"JOIN {self.sites_table} ON {self.sites_table}.id = {data_table}.site_id "
f"WHERE {data_table}.timestamp BETWEEN '{start_date}' AND '{end_date}' "
f"AND {self.sites_table}.id IN UNNEST(@filter_value) "
)
if frequency in ["weekly", "monthly", "yearly"]:
query += " GROUP BY ALL"
return self.add_device_join(query)

def get_airqloud_query(
self,
data_table,
filter_value,
pollutants_query,
time_grouping,
start_date,
end_date,
frequency,
):
"""
Constructs a SQL query to retrieve data for specific AirQlouds.
Args:
data_table (str): The name of the data table containing measurements.
filter_value (str): The list of AirQloud IDs to filter by.
pollutants_query (str): The SQL query for pollutants.
time_grouping (str): The time grouping clause based on frequency.
start_date (str): The start date for the query range.
end_date (str): The end date for the query range.
frequency (str): The frequency of the data (e.g., 'raw', 'daily', 'weekly').
Returns:
str: The SQL query string to retrieve AirQloud-specific data.
"""
meta_data_query = (
f"SELECT {self.airqlouds_sites_table}.airqloud_id, "
f"{self.airqlouds_sites_table}.site_id AS site_id "
f"FROM {self.airqlouds_sites_table} "
f"WHERE {self.airqlouds_sites_table}.airqloud_id IN UNNEST(@filter_value) "
)
meta_data_query = self.add_airqloud_join(meta_data_query)
meta_data_query = self.add_site_join(meta_data_query)
meta_data_query = self.add_device_join_to_airqlouds(meta_data_query)

query = (
f"{pollutants_query}, {time_grouping}, {data_table}.device_id AS device_name, meta_data.* "
f"FROM {data_table} "
f"RIGHT JOIN ({meta_data_query}) meta_data ON meta_data.site_id = {data_table}.site_id "
f"WHERE {data_table}.timestamp BETWEEN '{start_date}' AND '{end_date}' "
)
order_by_clause = (
f"ORDER BY {data_table}.timestamp"
if frequency not in ["weekly", "monthly", "yearly"]
else "GROUP BY ALL"
)

return query + order_by_clause

def build_query(
self,
data_table,
Expand Down Expand Up @@ -184,56 +331,61 @@ def build_query(
Returns:
str: Final constructed SQL query.
"""
if filter_type in ["devices", "device_ids", "device_names"]:
query = (
f"{pollutants_query}, {self.device_info_query}, {self.devices_table}.name AS device_name, "
f"FROM {data_table} "
f"JOIN {self.devices_table} ON {self.devices_table}.device_id = {data_table}.device_id "
f"WHERE {data_table}.timestamp BETWEEN '{start_date}' AND '{end_date}' "
f"AND {self.devices_table}.device_id IN UNNEST({filter_value}) "
time_grouping = self.get_time_grouping(frequency)

# TODO Find a better way to do this.
if frequency in ["weekly", "monthly", "yearly"]:
# Drop datetime alias
pollutants_query = pollutants_query.replace(
f", FORMAT_DATETIME('%Y-%m-%d %H:%M:%S', {data_table}.timestamp) AS datetime",
"",
)
bam_pollutants_query = bam_pollutants_query.replace(
f", FORMAT_DATETIME('%Y-%m-%d %H:%M:%S', {self.BIGQUERY_BAM_DATA}.timestamp) AS datetime",
"",
)
query = self.add_site_join(query)

if frequency == "hourly":
bam_query = (
f"{bam_pollutants_query}, {self.device_info_query}, {self.devices_table}.name AS device_name, "
f"FROM {self.BIGQUERY_BAM_DATA} "
f"JOIN {self.devices_table} ON {self.devices_table}.device_id = {self.BIGQUERY_BAM_DATA}.device_id "
f"WHERE {self.BIGQUERY_BAM_DATA}.timestamp BETWEEN '{start_date}' AND '{end_date}' "
f"AND {self.devices_table}.device_id IN UNNEST({filter_value}) "
)
bam_query = self.add_site_join(bam_query)
query = f"{query} UNION ALL {bam_query}"

if filter_type in ["devices", "device_ids", "device_names"]:
return self.get_device_query(
data_table,
filter_value,
pollutants_query,
bam_pollutants_query,
time_grouping,
start_date,
end_date,
frequency,
)
elif filter_type in ["sites", "site_names", "site_ids"]:
query = (
f"{pollutants_query}, {self.site_info_query}, {data_table}.device_id AS device_name "
f"FROM {data_table} "
f"JOIN {self.sites_table} ON {self.sites_table}.id = {data_table}.site_id "
f"WHERE {data_table}.timestamp BETWEEN '{start_date}' AND '{end_date}' "
f"AND {self.sites_table}.id IN UNNEST(@filter_value) "
return self.get_site_query(
data_table,
filter_value,
pollutants_query,
time_grouping,
start_date,
end_date,
)
query = self.add_device_join(query)
elif filter_type == "airqlouds":
meta_data_query = (
f"SELECT {self.airqlouds_sites_table}.airqloud_id, "
f"{self.airqlouds_sites_table}.site_id AS site_id "
f"FROM {self.airqlouds_sites_table} "
f"WHERE {self.airqlouds_sites_table}.airqloud_id IN UNNEST(@filter_value) "
return self.get_airqloud_query(
data_table,
filter_value,
pollutants_query,
time_grouping,
start_date,
end_date,
)
else:
raise ValueError("Invalid filter type")

meta_data_query = self.add_airqloud_join(meta_data_query)
meta_data_query = self.add_site_join(meta_data_query)
meta_data_query = self.add_device_join_to_airqlouds(meta_data_query)

query = (
f"{pollutants_query}, {data_table}.device_id AS device_name, meta_data.* "
f"FROM {data_table} "
f"RIGHT JOIN ({meta_data_query}) meta_data ON meta_data.site_id = {data_table}.site_id "
f"WHERE {data_table}.timestamp BETWEEN '{start_date}' AND '{end_date}' "
f"ORDER BY {data_table}.timestamp "
)
return query
def get_columns(cls, mapping, frequency, data_type, decimal_places, data_table):
if frequency in ["weekly", "monthly", "yearly"]:
return [
f"ROUND(AVG({data_table}.{col}), {decimal_places}) AS {col}"
for col in mapping
]
return [
f"ROUND({data_table}.{col}, {decimal_places}) AS {col}" for col in mapping
]

@classmethod
@cache.memoize()
Expand Down Expand Up @@ -266,14 +418,15 @@ def download_from_bigquery(
"""
decimal_places = cls.DATA_EXPORT_DECIMAL_PLACES

sorting_cols = ["site_id", "datetime", "device_name"]
sorting_cols = ["site_id", "device_name"]

# Define table mapping for dynamic selection based on frequency
data_table = {
"raw": cls.BIGQUERY_RAW_DATA,
"daily": cls.BIGQUERY_DAILY_DATA,
"hourly": cls.BIGQUERY_HOURLY_DATA,
}.get(frequency)
}.get(
frequency, cls.BIGQUERY_HOURLY_DATA
) # Return hourly if the frequency is weekly, monthly yearly. Validation of frequency is done in data.py

if not data_table:
raise ValueError("Invalid frequency")
Expand All @@ -292,23 +445,40 @@ def download_from_bigquery(
key, []
)
pollutant_columns.extend(
[
f"ROUND({data_table}.{mapping}, {decimal_places}) AS {mapping}"
for mapping in pollutant_mapping
]
cls.get_columns(
cls,
pollutant_mapping,
frequency,
data_type,
decimal_places,
data_table,
)
)

# TODO Clean up by use using `get_columns` helper method
if pollutant in {"pm2_5", "pm10", "no2"}:
bam_pollutant_columns.extend(
[f"ROUND({pollutant}, {decimal_places}) AS {key}_value"]
)

if frequency in ["weekly", "monthly", "yearly"]:
bam_pollutant_columns.extend(
[f"ROUND(AVG({pollutant}), {decimal_places}) AS {key}_value"]
)
else:
bam_pollutant_columns.extend(
[f"ROUND({pollutant}, {decimal_places}) AS {key}_value"]
)
# TODO Fix query when weather data is included. Currently failing
if weather_fields:
for field in weather_fields:
weather_mapping = WEATHER_FIELDS_MAPPER.get(field)
if weather_mapping:
weather_columns.append(
f"ROUND({data_table}.{weather_mapping}, {decimal_places}) AS {weather_mapping}"
weather_columns.extend(
cls.get_columns(
cls,
weather_mapping,
frequency,
data_type,
decimal_places,
data_table,
)
)

selected_columns = set(pollutant_columns + weather_columns)
Expand All @@ -322,7 +492,6 @@ def download_from_bigquery(
f"SELECT {', '.join(bam_selected_columns)}, "
f"FORMAT_DATETIME('%Y-%m-%d %H:%M:%S', {cls.BIGQUERY_BAM_DATA}.timestamp) AS datetime "
)

instance = cls("build_query")
query = instance.build_query(
data_table,
Expand Down Expand Up @@ -353,9 +522,15 @@ def download_from_bigquery(
if len(dataframe) == 0:
return dataframe

dataframe.drop_duplicates(
subset=["datetime", "device_name"], inplace=True, keep="first"
)
drop_columns = ["device_name"]
if frequency in ["weekly", "monthly", "yearly"]:
drop_columns.append(frequency[:-2])
sorting_cols.append(frequency[:-2])
else:
drop_columns.append("datetime")
sorting_cols.append("datetime")

dataframe.drop_duplicates(subset=drop_columns, inplace=True, keep="first")
dataframe.sort_values(sorting_cols, ascending=True, inplace=True)
dataframe["frequency"] = frequency
dataframe = dataframe.replace(np.nan, None)
Expand Down
Loading

0 comments on commit 8bd88dc

Please sign in to comment.