Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update fix/clean up #4058

Merged
merged 5 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 56 additions & 46 deletions src/analytics/api/models/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ class EventsModel(BasePyMongoModel):
BIGQUERY_COHORTS = f"`{CONFIGURATIONS.BIGQUERY_COHORTS}`"
BIGQUERY_COHORTS_DEVICES = f"`{CONFIGURATIONS.BIGQUERY_COHORTS_DEVICES}`"
BIGQUERY_SITES = f"`{CONFIGURATIONS.BIGQUERY_SITES}`"
BIGQUERY_SITES_SITES = f"`{CONFIGURATIONS.BIGQUERY_SITES_SITES}`"
BIGQUERY_DEVICES = f"`{CONFIGURATIONS.BIGQUERY_DEVICES}`"
BIGQUERY_DEVICES_DEVICES = f"`{CONFIGURATIONS.BIGQUERY_DEVICES_DEVICES}`"
DATA_EXPORT_DECIMAL_PLACES = CONFIGURATIONS.DATA_EXPORT_DECIMAL_PLACES

BIGQUERY_EVENTS = CONFIGURATIONS.BIGQUERY_EVENTS
Expand All @@ -51,28 +53,30 @@ def __init__(self, tenant):
"""
self.limit_mapper = {"pm2_5": 500.5, "pm10": 604.5, "no2": 2049}
self.sites_table = self.BIGQUERY_SITES
self.sites_sites_table = self.BIGQUERY_SITES_SITES
self.airqlouds_sites_table = self.BIGQUERY_AIRQLOUDS_SITES
self.devices_table = self.BIGQUERY_DEVICES
self.devices_devices_table = self.BIGQUERY_DEVICES_DEVICES
self.airqlouds_table = self.BIGQUERY_AIRQLOUDS
super().__init__(tenant, collection_name="events")

@property
def device_info_query(self):
"""Generates a device information query including site_id, tenant, and approximate location details."""
"""Generates a device information query including site_id, network, and approximate location details."""
return (
f"{self.devices_table}.site_id AS site_id, "
f"{self.devices_table}.tenant AS tenant "
f"{self.devices_devices_table}.site_id AS site_id, "
f"{self.devices_devices_table}.network AS network "
)

@property
def device_info_query_airqloud(self):
"""Generates a device information query specifically for airqlouds, excluding the site_id."""
return f"{self.devices_table}.tenant AS tenant "
return f"{self.devices_devices_table}.network AS network "

@property
def site_info_query(self):
"""Generates a site information query to retrieve site name and approximate location details."""
return f"{self.sites_table}.name AS site_name "
return f"{self.sites_sites_table}.name AS site_name "

@property
def airqloud_info_query(self):
Expand All @@ -92,8 +96,8 @@ def add_device_join(self, data_query, filter_clause=""):
"""
return (
f"SELECT {self.device_info_query}, data.* "
f"FROM {self.devices_table} "
f"RIGHT JOIN ({data_query}) data ON data.device_name = {self.devices_table}.device_id "
f"FROM {self.devices_devices_table} "
f"RIGHT JOIN ({data_query}) data ON data.device_name = {self.devices_devices_table}.device_id "
f"{filter_clause}"
)

Expand All @@ -110,8 +114,8 @@ def add_device_join_to_airqlouds(self, data_query, filter_clause=""):
"""
return (
f"SELECT {self.device_info_query_airqloud}, data.* "
f"FROM {self.devices_table} "
f"RIGHT JOIN ({data_query}) data ON data.site_id = {self.devices_table}.site_id "
f"FROM {self.devices_devices_table} "
f"RIGHT JOIN ({data_query}) data ON data.site_id = {self.devices_devices_table}.site_id "
f"{filter_clause}"
)

Expand All @@ -127,8 +131,8 @@ def add_site_join(self, data_query):
"""
return (
f"SELECT {self.site_info_query}, data.* "
f"FROM {self.sites_table} "
f"RIGHT JOIN ({data_query}) data ON data.site_id = {self.sites_table}.id "
f"FROM {self.sites_sites_table} "
f"RIGHT JOIN ({data_query}) data ON data.site_id = {self.sites_sites_table}.id "
)

def add_airqloud_join(self, data_query):
Expand Down Expand Up @@ -194,23 +198,23 @@ def get_device_query(
including BAM data if applicable.
"""
query = (
f"{pollutants_query}, {time_grouping}, {self.device_info_query}, {self.devices_table}.name AS device_name "
f"{pollutants_query}, {time_grouping}, {self.device_info_query}, {self.devices_devices_table}.name AS device_name "
f"FROM {data_table} "
f"JOIN {self.devices_table} ON {self.devices_table}.device_id = {data_table}.device_id "
f"JOIN {self.devices_devices_table} ON {self.devices_devices_table}.device_id = {data_table}.device_id "
f"WHERE {data_table}.timestamp BETWEEN '{start_date}' AND '{end_date}' "
f"AND {self.devices_table}.device_id IN UNNEST(@filter_value) "
f"AND {self.devices_devices_table}.device_id IN UNNEST(@filter_value) "
)
if frequency in ["weekly", "monthly", "yearly"]:
query += " GROUP BY ALL"

query = self.add_site_join(query)
if frequency in ["hourly", "weekly", "monthly", "yearly"]:
bam_query = (
f"{bam_pollutants_query}, {time_grouping}, {self.device_info_query}, {self.devices_table}.name AS device_name "
f"{bam_pollutants_query}, {time_grouping}, {self.device_info_query}, {self.devices_devices_table}.name AS device_name "
f"FROM {self.BIGQUERY_BAM_DATA} "
f"JOIN {self.devices_table} ON {self.devices_table}.device_id = {self.BIGQUERY_BAM_DATA}.device_id "
f"JOIN {self.devices_devices_table} ON {self.devices_devices_table}.device_id = {self.BIGQUERY_BAM_DATA}.device_id "
f"WHERE {self.BIGQUERY_BAM_DATA}.timestamp BETWEEN '{start_date}' AND '{end_date}' "
f"AND {self.devices_table}.device_id IN UNNEST(@filter_value) "
f"AND {self.devices_devices_table}.device_id IN UNNEST(@filter_value) "
)
if frequency in ["weekly", "monthly", "yearly"]:
bam_query += " GROUP BY ALL"
Expand Down Expand Up @@ -247,9 +251,9 @@ def get_site_query(
query = (
f"{pollutants_query}, {time_grouping}, {self.site_info_query}, {data_table}.device_id AS device_name "
f"FROM {data_table} "
f"JOIN {self.sites_table} ON {self.sites_table}.id = {data_table}.site_id "
f"JOIN {self.sites_sites_table} ON {self.sites_sites_table}.id = {data_table}.site_id "
f"WHERE {data_table}.timestamp BETWEEN '{start_date}' AND '{end_date}' "
f"AND {self.sites_table}.id IN UNNEST(@filter_value) "
f"AND {self.sites_sites_table}.id IN UNNEST(@filter_value) "
)
if frequency in ["weekly", "monthly", "yearly"]:
query += " GROUP BY ALL"
Expand Down Expand Up @@ -538,7 +542,7 @@ def download_from_bigquery(
else:
drop_columns.append("datetime")
sorting_cols.append("datetime")

dataframe.to_csv("raw_data50.csv")
if data_type == "raw":
cls.simple_data_cleaning(dataframe)

Expand All @@ -551,39 +555,45 @@ def download_from_bigquery(
@classmethod
def simple_data_cleaning(cls, data: pd.DataFrame) -> pd.DataFrame:
"""
Perform data cleaning on a pandas DataFrame to handle specific conditions
related to "pm2_5" and "pm2_5_raw_value" columns.

The cleaning process includes:
1. Ensuring correct numeric data types for "pm2_5" and "pm2_5_raw_value".
2. Removing "pm2_5" values where "pm2_5_raw_value" has data.
3. Dropping the "pm2_5_raw_value" column if it has no data at all.
4. Retaining "pm2_5" values where "pm2_5_raw_value" has no data, and removing
"pm2_5" values where "pm2_5_raw_value" has data.
5. Dropping any column (including "pm2_5" and "pm2_5_raw_value") if it is
entirely empty.

Args:
cls: Class reference (used in classmethods).
data (pd.DataFrame): Input pandas DataFrame with "pm2_5" and
"pm2_5_raw_value" columns.

Returns:
pd.DataFrame: Cleaned DataFrame with updates applied in place.
Perform data cleaning on a pandas DataFrame to handle specific conditions
related to "pm2_5" and "pm2_5_raw_value" columns.

The cleaning process includes:
1. Ensuring correct numeric data types for "pm2_5" and "pm2_5_raw_value".
2. Removing "pm2_5" values where "pm2_5_raw_value" values are not 0s.
3. Dropping the "pm2_5_raw_value" column if all its values are 0s.
4. Retaining "pm2_5" values where "pm2_5_raw_value" values are all 0s.
5. Dropping any column (including "pm2_5" and "pm2_5_raw_value") if all values are 0s.

Args:
cls: Class reference (used in classmethods).
data (pd.DataFrame): Input pandas DataFrame with "pm2_5" and
"pm2_5_raw_value" columns.

Returns:
pd.DataFrame: Cleaned DataFrame with updates applied in place.

Raises:
ValueError: If "pm2_5" or "pm2_5_raw_value" columns are missing.
"""
data["pm2_5_raw_value"] = pd.to_numeric(
data["pm2_5_raw_value"], errors="coerce"
required_columns = ["pm2_5", "pm2_5_raw_value"]

missing_columns = [col for col in required_columns if col not in data.columns]
if missing_columns:
raise ValueError(f"Missing required columns: {missing_columns}")

numeric_columns = ["pm2_5", "pm2_5_raw_value"]
data[numeric_columns] = data[numeric_columns].apply(
pd.to_numeric, errors="coerce"
)
data["pm2_5"] = pd.to_numeric(data["pm2_5"], errors="coerce")

data.loc[~data["pm2_5_raw_value"].isna(), "pm2_5"] = np.nan
data.loc[data["pm2_5_raw_value"] != 0, "pm2_5"] = np.nan

if data["pm2_5_raw_value"].isna().all():
if (data["pm2_5_raw_value"] == 0).all():
data.drop(columns=["pm2_5_raw_value"], inplace=True)

data["pm2_5"] = data["pm2_5"].where(data["pm2_5_raw_value"].isna(), np.nan)

zero_columns = data.loc[:, (data == 0).all()].columns
data.drop(columns=zero_columns, inplace=True)
data.dropna(how="all", axis=1, inplace=True)

return data
Expand Down
2 changes: 2 additions & 0 deletions src/analytics/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ class Config:
CACHE_REDIS_URL = f"redis://{env_var('REDIS_SERVER')}:{env_var('REDIS_PORT')}"

BIGQUERY_DEVICES = env_var("BIGQUERY_DEVICES")
BIGQUERY_DEVICES_DEVICES = env_var("BIGQUERY_DEVICES_DEVICES")
BIGQUERY_SITES = env_var("BIGQUERY_SITES")
BIGQUERY_SITES_SITES = env_var("BIGQUERY_SITES_SITES")
BIGQUERY_AIRQLOUDS_SITES = env_var("BIGQUERY_AIRQLOUDS_SITES")
BIGQUERY_AIRQLOUDS = env_var("BIGQUERY_AIRQLOUDS")
DATA_EXPORT_DECIMAL_PLACES = os.getenv("DATA_EXPORT_DECIMAL_PLACES", 2)
Expand Down
2 changes: 2 additions & 0 deletions src/workflows/airqo_etl_utils/bigquery_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def __init__(self):
self.raw_weather_table = configuration.BIGQUERY_RAW_WEATHER_TABLE
self.consolidated_data_table = configuration.BIGQUERY_ANALYTICS_TABLE
self.sites_table = configuration.BIGQUERY_SITES_TABLE
self.sites_sites_table = configuration.BIGQUERY_SITES_SITES_TABLE
self.airqlouds_table = configuration.BIGQUERY_AIRQLOUDS_TABLE
self.airqlouds_sites_table = configuration.BIGQUERY_AIRQLOUDS_SITES_TABLE
self.grids_table = configuration.BIGQUERY_GRIDS_TABLE
Expand All @@ -54,6 +55,7 @@ def __init__(self):
self.cohorts_devices_table = configuration.BIGQUERY_COHORTS_DEVICES_TABLE
self.sites_meta_data_table = configuration.BIGQUERY_SITES_META_DATA_TABLE
self.devices_table = configuration.BIGQUERY_DEVICES_TABLE
self.devices_devices_table = configuration.BIGQUERY_DEVICES_DEVICES_TABLE
self.devices_summary_table = configuration.BIGQUERY_DEVICES_SUMMARY_TABLE
self.openweathermap_table = configuration.BIGQUERY_OPENWEATHERMAP_TABLE
self.satellite_data_table = configuration.BIGQUERY_SATELLITE_DATA_TABLE
Expand Down
4 changes: 4 additions & 0 deletions src/workflows/airqo_etl_utils/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,10 @@ class Config:

# Meta data
BIGQUERY_DEVICES_TABLE = os.getenv("BIGQUERY_DEVICES_TABLE")
BIGQUERY_DEVICES_DEVICES_TABLE = os.getenv("BIGQUERY_DEVICES_DEVICES_TABLE")
BIGQUERY_DEVICES_DATA_TABLE = os.getenv("BIGQUERY_DEVICES_DATA_TABLE")
BIGQUERY_SITES_TABLE = os.getenv("BIGQUERY_SITES_TABLE")
BIGQUERY_SITES_SITES_TABLE = os.getenv("BIGQUERY_SITES_SITES_TABLE")
BIGQUERY_SITES_META_DATA_TABLE = os.getenv("BIGQUERY_SITES_META_DATA_TABLE")
BIGQUERY_AIRQLOUDS_TABLE = os.getenv("BIGQUERY_AIRQLOUDS_TABLE")
BIGQUERY_AIRQLOUDS_SITES_TABLE = os.getenv("BIGQUERY_AIRQLOUDS_SITES_TABLE")
Expand Down Expand Up @@ -371,9 +373,11 @@ class Config:
BIGQUERY_GRIDS_SITES_TABLE: "grids_sites.json",
BIGQUERY_COHORTS_DEVICES_TABLE: "cohorts_devices.json",
BIGQUERY_SITES_TABLE: "sites.json",
BIGQUERY_SITES_SITES_TABLE: "sites.json",
BIGQUERY_SITES_META_DATA_TABLE: "sites_meta_data.json",
SENSOR_POSITIONS_TABLE: "sensor_positions.json",
BIGQUERY_DEVICES_TABLE: "devices.json",
BIGQUERY_DEVICES_DEVICES_TABLE: "devices.json",
BIGQUERY_CLEAN_RAW_MOBILE_EVENTS_TABLE: "mobile_measurements.json",
BIGQUERY_UNCLEAN_RAW_MOBILE_EVENTS_TABLE: "mobile_measurements.json",
BIGQUERY_AIRQO_MOBILE_EVENTS_TABLE: "airqo_mobile_measurements.json",
Expand Down
3 changes: 1 addition & 2 deletions src/workflows/airqo_etl_utils/meta_data_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ def extract_sites_from_api(network: str = "all") -> pd.DataFrame:
"approximate_latitude",
"approximate_longitude",
"name",
"location",
"search_name",
"location_name",
"description",
Expand All @@ -147,7 +146,7 @@ def extract_sites_from_api(network: str = "all") -> pd.DataFrame:
},
inplace=True,
)

dataframe["last_updated"] = datetime.now(timezone.utc)
dataframe = DataValidationUtils.remove_outliers(dataframe)

return dataframe
Expand Down
4 changes: 2 additions & 2 deletions src/workflows/airqo_etl_utils/schema/devices.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
{
"name": "device_id",
"type": "STRING",
"mode": "NULLABLE"
"mode": "REQUIRED"
},
{
"name": "device_number",
Expand Down Expand Up @@ -52,6 +52,6 @@
{
"name": "last_updated",
"type": "TIMESTAMP",
"mode": "NULLABLE"
"mode": "REQUIRED"
}
]
12 changes: 1 addition & 11 deletions src/workflows/airqo_etl_utils/schema/sites.json
Original file line number Diff line number Diff line change
@@ -1,9 +1,4 @@
[
{
"name": "tenant",
"type": "STRING",
"mode": "REQUIRED"
},
{
"name": "network",
"type": "STRING",
Expand Down Expand Up @@ -39,11 +34,6 @@
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "location",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "display_name",
"type": "STRING",
Expand Down Expand Up @@ -76,7 +66,7 @@
},
{
"name": "last_updated",
"type": "DATE",
"type": "TIMESTAMP",
"mode": "NULLABLE"
}
]
4 changes: 2 additions & 2 deletions src/workflows/dags/meta_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def load_sites(data: pd.DataFrame):
big_query_api = BigQueryApi()
big_query_api.update_sites_and_devices(
dataframe=data,
table=big_query_api.sites_table,
table=big_query_api.sites_sites_table,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Inconsistent table references across DAGs

While the first DAG has been updated to use the new table references (sites_sites_table and devices_devices_table), the second DAG "Update-BigQuery-Sites-Grids-And-Devices" still uses the old references (sites_table and devices_table). This inconsistency could lead to data synchronization issues.

Apply this diff to update the second DAG:

    def load_sites(data: pd.DataFrame):
        from airqo_etl_utils.bigquery_api import BigQueryApi

        big_query_api = BigQueryApi()
        big_query_api.update_sites_and_devices(
            dataframe=data,
-           table=big_query_api.sites_table,
+           table=big_query_api.sites_sites_table,
            component="sites",
        )

    def load_devices(data: pd.DataFrame):
        from airqo_etl_utils.bigquery_api import BigQueryApi

        big_query_api = BigQueryApi()
        big_query_api.update_sites_and_devices(
            dataframe=data,
-           table=big_query_api.devices_table,
+           table=big_query_api.devices_devices_table,
            component="devices",
        )

Also applies to: 78-78

component="sites",
)

Expand Down Expand Up @@ -75,7 +75,7 @@ def load_devices(data: pd.DataFrame):
big_query_api = BigQueryApi()
big_query_api.update_sites_and_devices(
dataframe=data,
table=big_query_api.devices_table,
table=big_query_api.devices_devices_table,
component="devices",
)

Expand Down
Loading