Skip to content

Commit

Permalink
Merge pull request #3993 from NicholasTurner23/update/Integration-iqair-devices
Browse files Browse the repository at this point in the history

Update/integration iqair devices
  • Loading branch information
Baalmart authored Dec 4, 2024
2 parents 4718807 + cd0063a commit f43cad7
Show file tree
Hide file tree
Showing 8 changed files with 34 additions and 25 deletions.
3 changes: 1 addition & 2 deletions src/workflows/airqo_etl_utils/airqo_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ def get_devices(
device_category: DeviceCategory = DeviceCategory.NONE,
) -> List[Dict[str, Any]]:
"""
Retrieve devices given a tenant and device category.
Retrieve devices given a network and device category.
Args:
- network (str): An Enum that represents site ownership.
Expand Down Expand Up @@ -198,7 +198,6 @@ def get_devices(
"device_category": str(
DeviceCategory.from_str(device.pop("category", None))
),
"network": device.get("network"),
"device_manufacturer": device.get("network", "airqo"),
**device,
}
Expand Down
13 changes: 7 additions & 6 deletions src/workflows/airqo_etl_utils/airqo_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def extract_uncalibrated_data(start_date_time, end_date_time) -> pd.DataFrame:
null_cols=["pm2_5_calibrated_value"],
start_date_time=start_date_time,
end_date_time=end_date_time,
tenant=Tenant.AIRQO,
network=str(Tenant.AIRQO),
)

return DataValidationUtils.remove_outliers(hourly_uncalibrated_data)
Expand All @@ -79,7 +79,7 @@ def extract_data_from_bigquery(
table=table,
start_date_time=start_date_time,
end_date_time=end_date_time,
tenant=Tenant.AIRQO,
network=str(Tenant.AIRQO),
)

return DataValidationUtils.remove_outliers(raw_data)
Expand Down Expand Up @@ -117,7 +117,10 @@ def remove_duplicates(data: pd.DataFrame) -> pd.DataFrame:

@staticmethod
def extract_aggregated_raw_data(
start_date_time: str, end_date_time: str, dynamic_query: bool = False
start_date_time: str,
end_date_time: str,
network: str = None,
dynamic_query: bool = False,
) -> pd.DataFrame:
"""
Retrieves raw pm2.5 sensor data from bigquery and computes averages for the numeric columns grouped by device_number, device_id and site_id
Expand All @@ -128,9 +131,7 @@ def extract_aggregated_raw_data(
start_date_time=start_date_time,
end_date_time=end_date_time,
table=bigquery_api.raw_measurements_table,
network=str(
Tenant.AIRQO
), # TODO Replace tenant implementation with network implementation
network=network,
dynamic_query=dynamic_query,
)

Expand Down
30 changes: 20 additions & 10 deletions src/workflows/airqo_etl_utils/bigquery_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ def compose_query(
table: str,
start_date_time: str,
end_date_time: str,
network: str,
network: str = "all",
where_fields: dict = None,
null_cols: list = None,
columns: list = None,
Expand All @@ -536,17 +536,15 @@ def compose_query(
Exception: If an invalid column is provided in `where_fields` or `null_cols`,
or if the `query_type` is not supported.
"""
tenant = "airqo"

null_cols = [] if null_cols is None else null_cols
where_fields = {} if where_fields is None else where_fields

columns = ", ".join(map(str, columns)) if columns else " * "
where_clause = (
f" timestamp >= '{start_date_time}' and timestamp <= '{end_date_time}' "
)
if tenant != Tenant.ALL:
where_clause = f" {where_clause} and tenant = '{str(tenant)}' or network = '{str(network)}' "
where_clause = f" timestamp between '{start_date_time}' and '{end_date_time}' "

if network:
where_clause += f"AND network = '{network}' "

valid_cols = self.get_columns(table=table)

Expand Down Expand Up @@ -613,7 +611,7 @@ def query_data(
start_date_time: str,
end_date_time: str,
table: str,
network: str,
network: str = None,
dynamic_query: bool = False,
columns: list = None,
where_fields: dict = None,
Expand Down Expand Up @@ -649,7 +647,11 @@ def query_data(
)
else:
query = self.dynamic_averaging_query(
table, start_date_time, end_date_time, time_granularity=time_granularity
table,
start_date_time,
end_date_time,
network=network,
time_granularity=time_granularity,
)

dataframe = self.client.query(query=query).result().to_dataframe()
Expand All @@ -669,6 +671,7 @@ def dynamic_averaging_query(
end_date_time: str,
exclude_columns: list = None,
group_by: list = None,
network: str = "all",
time_granularity: str = "HOUR",
) -> str:
"""
Expand Down Expand Up @@ -728,11 +731,18 @@ def dynamic_averaging_query(
]
)

where_clause: str = (
f"timestamp BETWEEN '{start_date_time}' AND '{end_date_time}' "
)

if network:
where_clause += f"AND network = '{network}' "

# Include time granularity in both SELECT and GROUP BY
timestamp_trunc = f"TIMESTAMP_TRUNC(timestamp, {time_granularity.upper()}) AS {time_granularity.lower()}"
group_by_clause = ", ".join(group_by + [time_granularity.lower()])

query = f"""SELECT {", ".join(group_by)}, {timestamp_trunc}, {avg_columns} FROM `{table}` WHERE timestamp BETWEEN '{start_date_time}' AND '{end_date_time}' GROUP BY {group_by_clause} ORDER BY {time_granularity.lower()};"""
query = f"""SELECT {", ".join(group_by)}, {timestamp_trunc}, {avg_columns} FROM `{table}` WHERE {where_clause} GROUP BY {group_by_clause} ORDER BY {time_granularity.lower()};"""

return query

Expand Down
2 changes: 0 additions & 2 deletions src/workflows/airqo_etl_utils/daily_data_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ def query_hourly_data(start_date_time, end_date_time) -> pd.DataFrame:
table=bigquery_api.hourly_measurements_table,
start_date_time=start_date_time,
end_date_time=end_date_time,
tenant=Tenant.ALL,
)

return DataValidationUtils.remove_outliers(raw_data)
Expand All @@ -57,7 +56,6 @@ def query_daily_data(start_date_time, end_date_time) -> pd.DataFrame:
table=bigquery_api.daily_measurements_table,
start_date_time=start_date_time,
end_date_time=end_date_time,
tenant=Tenant.ALL,
)

return DataValidationUtils.remove_outliers(raw_data)
Expand Down
3 changes: 0 additions & 3 deletions src/workflows/airqo_etl_utils/data_warehouse_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ def extract_hourly_bam_data(
start_date_time=start_date_time,
end_date_time=end_date_time,
table=biq_query_api.bam_measurements_table,
tenant=Tenant.ALL,
)

if data.empty:
Expand All @@ -59,7 +58,6 @@ def extract_data_from_big_query(
start_date_time=start_date_time,
end_date_time=end_date_time,
table=biq_query_api.consolidated_data_table,
tenant=Tenant.ALL,
)

@staticmethod
Expand All @@ -83,7 +81,6 @@ def extract_hourly_low_cost_data(
start_date_time=start_date_time,
end_date_time=end_date_time,
table=biq_query_api.hourly_measurements_table,
tenant=Tenant.ALL,
)

if data.empty:
Expand Down
5 changes: 5 additions & 0 deletions src/workflows/airqo_etl_utils/schema/raw_measurements.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "network",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "timestamp",
"type": "TIMESTAMP",
Expand Down
2 changes: 0 additions & 2 deletions src/workflows/airqo_etl_utils/weather_data_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ def extract_hourly_weather_data(start_date_time, end_date_time) -> pd.DataFrame:
start_date_time=start_date_time,
end_date_time=end_date_time,
table=bigquery_api.hourly_weather_table,
tenant=Tenant.ALL,
)
cols = bigquery_api.get_columns(table=bigquery_api.hourly_weather_table)
return pd.DataFrame([], cols) if measurements.empty else measurements
Expand Down Expand Up @@ -79,7 +78,6 @@ def extract_raw_data_from_bigquery(start_date_time, end_date_time) -> pd.DataFra
start_date_time=start_date_time,
end_date_time=end_date_time,
table=bigquery_api.raw_weather_table,
tenant=Tenant.ALL,
)

return measurements
Expand Down
1 change: 1 addition & 0 deletions src/workflows/dags/airqo_measurements.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def extract_device_measurements(**kwargs) -> pd.DataFrame:
return AirQoDataUtils.extract_aggregated_raw_data(
start_date_time=start_date_time,
end_date_time=end_date_time,
network="airqo",
dynamic_query=True,
)

Expand Down

0 comments on commit f43cad7

Please sign in to comment.