diff --git a/src/workflows/airqo_etl_utils/airqo_api.py b/src/workflows/airqo_etl_utils/airqo_api.py
index 81b5808818..b5c8f38d3a 100644
--- a/src/workflows/airqo_etl_utils/airqo_api.py
+++ b/src/workflows/airqo_etl_utils/airqo_api.py
@@ -5,7 +5,7 @@
 import simplejson
 import urllib3
 from urllib3.util.retry import Retry
-from typing import List, Dict, Any, Union, Generator, Tuple
+from typing import List, Dict, Any, Union, Generator, Tuple, Optional
 
 from .config import configuration
 from .constants import DeviceCategory, Tenant
@@ -178,7 +178,6 @@ def get_devices(
         """
         params = {"tenant": str(Tenant.AIRQO), "category": str(device_category)}
         if configuration.ENVIRONMENT == "production":
-            # Query for active devices only when in production
            params["active"] = "yes"
 
         if tenant != Tenant.ALL:
@@ -200,15 +199,124 @@ def get_devices(
                     DeviceCategory.from_str(device.pop("category", None))
                 ),
                 "tenant": device.get("network"),
-                "device_manufacturer": Tenant.from_str(
-                    device.pop("network")
-                ).device_manufacturer(),
+                "device_manufacturer": device.get("network", "airqo"),
                 **device,
             }
             for device in response.get("devices", [])
         ]
         return devices
 
+    def get_networks(
+        self, net_status: str = "active"
+    ) -> Tuple[List[Dict[str, Any]], Optional[str]]:
+        """
+        Retrieve a list of networks.
+
+        Args:
+            net_status (str): The status of networks to retrieve. Defaults to "active".
+
+        Returns:
+            Tuple[List[Dict[str, Any]], Optional[str]]:
+                - List of networks (dictionaries) retrieved from the API.
+                - Optional error message if an exception occurs.
+        """
+        params: Dict[str, str] = {}
+        networks: List[Dict[str, Any]] = []
+        exception_message: Optional[str] = None
+
+        if configuration.ENVIRONMENT == "production":
+            params["net_status"] = net_status
+
+        try:
+            response = self.__request("users/networks", params)
+            networks = response.get("networks", [])
+        except Exception as e:
+            exception_message = f"Failed to fetch networks: {e}"
+            logger.exception(exception_message)
+
+        return networks, exception_message
+
+    def get_devices_by_network(
+        self, device_category: DeviceCategory = DeviceCategory.LOW_COST
+    ) -> List[Dict[str, Any]]:
+        """
+        Retrieve devices by network based on the specified device category.
+
+        Args:
+            device_category (DeviceCategory, optional): The category of devices to retrieve. Defaults to `DeviceCategory.LOW_COST`.
+
+        Returns:
+            List[Dict[str, Any]]: A list of dictionaries containing the details of the devices. Each dictionary has the following structure:
+            [
+                {
+                    "_id": str,
+                    "visibility": bool,
+                    "mobility": bool,
+                    "height": int,
+                    "device_codes": List[str],
+                    "status": str,
+                    "isPrimaryInLocation": bool,
+                    "nextMaintenance": date(str),
+                    "category": str,
+                    "isActive": bool,
+                    "long_name": str,
+                    "network": str,
+                    "alias": str,
+                    "name": str,
+                    "createdAt": date(str),
+                    "description": str,
+                    "latitude": float,
+                    "longitude": float,
+                    "approximate_distance_in_km": float,
+                    "bearing_in_radians": float,
+                    "deployment_date": date(str),
+                    "mountType": str,
+                    "powerType": str,
+                    "recall_date": date(str),
+                    "previous_sites": List[Dict[str, Any]],
+                    "cohorts": List,
+                    "site": Dict[str, Any],
+                    "device_number": int
+                },
+            ]
+        """
+        devices: List[Dict[str, Any]] = []
+        networks, error = self.get_networks()
+
+        if error:
+            logger.error(f"Error while fetching networks: {error}")
+            return devices
+
+        params = {"category": str(device_category)}
+        if configuration.ENVIRONMENT == "production":
+            params["active"] = "yes"
+
+        for network in networks:
+            network_name = network.get("net_name", "airqo")
+            params["network"] = network_name
+            try:
+                response = self.__request("devices/summary", params)
+                devices.extend(
+                    [
+                        {
+                            "site_id": device.get("site", {}).get("_id", None),
+                            "site_location": device.pop("site", {}).get(
+                                "location_name", None
+                            ),
+                            "device_category": device.pop("category", None),
+                            "device_manufacturer": network_name,
+                            **device,
+                        }
+                        for device in response.get("devices", [])
+                    ]
+                )
+            except Exception as e:
+                logger.exception(f"Failed to fetch devices on {network_name}: {e}")
+                continue
+
+        return devices
+
     def get_thingspeak_read_keys(
         self, devices: pd.DataFrame, return_type: str = "all"
     ) -> Union[Dict[int, str], Generator[Tuple[int, str], None, None]]:
diff --git a/src/workflows/airqo_etl_utils/airqo_gx_utils.py b/src/workflows/airqo_etl_utils/airqo_gx_utils.py
index 080562654e..ad76f24bbd 100644
--- a/src/workflows/airqo_etl_utils/airqo_gx_utils.py
+++ b/src/workflows/airqo_etl_utils/airqo_gx_utils.py
@@ -7,6 +7,7 @@
 import os
 from pathlib import Path
 import numpy as np
+import pandas as pd
 
 from .airqo_gx_metrics import AirQoGxExpectations
 from .config import configuration
@@ -368,7 +369,7 @@ def digest_validation_results(
                 "expectation_suite_name"
             ]
             run_result = validation_result["validation_result"]["success"]
-            # Not being used for at the moment
+            # Not being used at the moment
             # local_site = validation_result["actions_results"]["update_data_docs"][
             #     "local_site"
             # ]
@@ -378,13 +379,13 @@ def digest_validation_results(
                 # Type ExpectationConfig
                 expectation_type = result["expectation_config"]["expectation_type"]
                 partial_unexpected_list = [
-                    "null" if np.isnan(x) else x
+                    "null" if pd.isna(x) else x
                     for x in result["result"].get("partial_unexpected_list", [])
                 ]
 
                 partial_unexpected_counts = [
                     {
-                        "value": "null" if np.isnan(item["value"]) else item["value"],
+                        "value": "null" if pd.isna(item["value"]) else item["value"],
                         "count": item["count"],
                     }
                     for item in result["result"].get("partial_unexpected_counts", [])
diff --git a/src/workflows/airqo_etl_utils/airqo_utils.py b/src/workflows/airqo_etl_utils/airqo_utils.py
index abbbc0ec52..c0520d2260 100644
--- a/src/workflows/airqo_etl_utils/airqo_utils.py
+++ b/src/workflows/airqo_etl_utils/airqo_utils.py
@@ -1,4 +1,3 @@
-import traceback
 from datetime import datetime, timezone
 
 import numpy as np
@@ -18,10 +17,10 @@
 from .data_validator import DataValidationUtils
 from .date import date_to_str
 from .ml_utils import GCSUtils
-from .thingspeak_api import ThingspeakApi
+from .data_sources import DataSourcesApis
 from .utils import Utils
 from .weather_data_utils import WeatherDataUtils
-from typing import List, Dict, Any
+from typing import List, Dict, Any, Optional, Union
 
 from .airqo_gx_expectations import AirQoGxExpectations
 import logging
@@ -172,6 +171,101 @@ def flatten_field_8(device_category: DeviceCategory, field_8: str = None):
 
         return series
 
+    @staticmethod
+    def map_and_extract_data(
+        data_mapping: Dict[str, Union[str, Dict[str, List[str]]]],
+        data: Union[List[Any], Dict[str, Any]],
+    ) -> pd.DataFrame:
+        """
+        Map and extract specified fields from input data based on a provided mapping and extraction fields.
+
+        Args:
+            data_mapping (Dict[str, Union[str, Dict[str, List[str]]]]): A dictionary mapping source keys to target keys.
+                Example: {"pm25": "pm2_5", "pm10": "pm10", "tp": "temperature"}
+            data (Union[List[Any], Dict[str, Any]]): Input data containing raw key-value pairs to map and extract.
+                Example:
+                {
+                    "pm25": {"conc": 21, "aqius": 73, "aqicn": 30},
+                    "pm10": {"conc": 37, "aqius": 34, "aqicn": 37},
+                    "pr": 100836,
+                    "hm": 28,
+                    "tp": 39.7,
+                    "ts": "2024-11-24T13:14:40.000Z"
+                }
+
+        Returns:
+            pd.DataFrame: A pandas DataFrame containing the mapped and extracted data, one row per input entry.
+        """
+
+        def process_single_entry(entry: Dict[str, Any]) -> Dict[str, Any]:
+            """
+            Process a single dictionary entry and map its data based on the mapping.
+
+            Args:
+                entry (Dict[str, Any]): A single data entry.
+
+            Returns:
+                Dict[str, Any]: A dictionary with the mapped data.
+            """
+            row_data = {}
+
+            # Process 'field8' mapping
+            if "field8" in entry and isinstance(entry["field8"], str):
+                field8_mapping = data_mapping.get("field8")
+                try:
+                    field8_values: List[str] = entry.pop("field8").split(",")
+                    for index, target_key in field8_mapping.items():
+                        if target_key not in row_data:
+                            row_data[target_key] = (
+                                field8_values[index]
+                                if index < len(field8_values)
+                                else None
+                            )
+                except (ValueError, TypeError, AttributeError) as e:
+                    logger.warning(f"Error processing field8: {e}")
+
+            # Process the remaining fields
+            for key, value_data in entry.items():
+                target_key = data_mapping.get(key)
+                target_value = None
+                if isinstance(target_key, dict):
+                    target_value = target_key.get("value")
+                    target_key = target_key.get("key")
+
+                if target_key and target_key not in row_data:
+                    if isinstance(value_data, dict):
+                        extracted_value = AirQoDataUtils._extract_nested_value(
+                            value_data, target_value
+                        )
+                    else:
+                        extracted_value = value_data
+                    row_data[target_key] = extracted_value
+            return row_data
+
+        if isinstance(data, dict):
+            data = [data]
+        elif not isinstance(data, list):
+            raise ValueError(
+                f"Invalid data format. Expected a dictionary or a list of dictionaries, got {type(data)}"
+            )
+
+        processed_rows = [process_single_entry(entry) for entry in data]
+
+        return pd.DataFrame(processed_rows)
+
+    @staticmethod
+    def _extract_nested_value(data: Dict[str, Any], key: str) -> Any:
+        """
+        Helper function to extract a nested value from a dictionary.
+
+        Args:
+            data (Dict[str, Any]): The input dictionary containing nested data.
+            key (str): The key to extract the value for.
+
+        Returns:
+            Any: The extracted value or None if not found.
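+
+        Example (illustrative, using the sample IQAir payload shown in
+        `map_and_extract_data` above):
+            >>> AirQoDataUtils._extract_nested_value(
+            ...     {"conc": 21, "aqius": 73, "aqicn": 30}, "conc"
+            ... )
+            21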
+        """
+        return data.get(key)
+
     @staticmethod
     def flatten_meta_data(meta_data: list) -> list:
         data = []
@@ -323,7 +417,7 @@ def extract_devices_data(
         """
 
         airqo_api = AirQoApi()
-        thingspeak_api = ThingspeakApi()
+        data_source_api = DataSourcesApis()
         devices = airqo_api.get_devices(
             tenant=Tenant.AIRQO, device_category=device_category
         )
@@ -336,8 +430,10 @@
 
         if device_category == DeviceCategory.BAM:
             field_8_cols = list(configuration.AIRQO_BAM_CONFIG.values())
+            mapping = configuration.AIRQO_BAM_CONFIG
         elif device_category == DeviceCategory.LOW_COST_GAS:
             field_8_cols = list(configuration.AIRQO_LOW_COST_GAS_CONFIG.values())
+            mapping = configuration.AIRQO_LOW_COST_GAS_CONFIG
             other_fields_cols.extend(
                 [
                     "pm2_5",
@@ -385,16 +481,19 @@
                 continue
 
             for start, end in dates:
-                data = thingspeak_api.query_data(
-                    device_number=device_number,
-                    start_date_time=start,
-                    end_date_time=end,
-                    read_key=read_key,
-                )
-                if data.empty:
-                    logger.exception(
-                        f"Device does not have data between {start} and {end}"
-                    )
+                data, meta_data, data_available = data_source_api.thingspeak(
+                    device_number=device_number,
+                    start_date_time=start,
+                    end_date_time=end,
+                    read_key=read_key,
+                )
+                if not data_available:
                     continue
 
+                # thingspeak() returns the feeds as a list of dicts; keep this
+                # legacy path DataFrame-based.
+                data = pd.DataFrame(data)
+
                 if "field8" not in data.columns.to_list():
@@ -408,7 +507,6 @@
                     )
                 )
 
-            meta_data = data.attrs.pop("meta_data", {})
             data["device_number"] = device_number
             data["device_id"] = devices.loc[device_number].device_id
             data["site_id"] = devices.loc[device_number].site_id
@@ -437,6 +535,135 @@
 
         return devices_data
 
+    @staticmethod
+    def extract_devices_data_(
+        start_date_time: str,
+        end_date_time: str,
+        device_category: DeviceCategory,
+        device_numbers: list = None,
+        remove_outliers: bool = True,
+    ) -> pd.DataFrame:
+        """
+        Extracts sensor measurements from AirQo devices recorded between specified date and time ranges.
+
+        Retrieves sensor data from the ThingSpeak API for devices belonging to the specified device category (BAM or low-cost sensors).
+        Optionally filters data by specific device numbers and removes outliers if requested.
+
+        Args:
+            start_date_time (str): Start date and time (ISO 8601 format) for data extraction.
+            end_date_time (str): End date and time (ISO 8601 format) for data extraction.
+            device_category (DeviceCategory): Category of devices to extract data from (BAM or low-cost sensors).
+            device_numbers (list, optional): List of device numbers whose data to extract. Defaults to None (all devices).
+            remove_outliers (bool, optional): If True, removes outliers from the extracted data. Defaults to True.
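+
+        Returns:
+            pd.DataFrame: Raw device measurements for the requested period; empty if devices or the category configuration cannot be resolved.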
+        """
+        devices_data = pd.DataFrame()
+        airqo_api = AirQoApi()
+        data_source_api = DataSourcesApis()
+
+        devices = airqo_api.get_devices_by_network(device_category=device_category)
+        if not devices:
+            logger.error("Failed to fetch devices.")
+            return devices_data
+
+        other_fields_cols: List[str] = []
+        network: str = None
+        devices = (
+            [x for x in devices if x["device_number"] in device_numbers]
+            if device_numbers
+            else devices
+        )
+
+        config = configuration.device_config_mapping.get(str(device_category), None)
+        if not config:
+            logger.warning("Missing device category.")
+            return devices_data
+
+        field_8_cols = config["field_8_cols"]
+        other_fields_cols = config["other_fields_cols"]
+        data_columns = list(
+            set(
+                [
+                    "device_number",
+                    "device_id",
+                    "site_id",
+                    "latitude",
+                    "longitude",
+                    "timestamp",
+                    *field_8_cols,
+                    *other_fields_cols,
+                ]
+            )
+        )
+
+        dates = Utils.query_dates_array(
+            start_date_time=start_date_time,
+            end_date_time=end_date_time,
+            data_source=DataSource.THINGSPEAK,
+        )
+
+        for device in devices:
+            device_number = device.get("device_number", None)
+            read_key = device.get("readKey", None)
+            network = device.get("network", None)
+            meta_data = {}
+
+            if device_number and read_key is None:
+                logger.warning(f"{device_number} does not have a read key")
+                continue
+            data = []
+            if device_number and network == "airqo":
+                for start, end in dates:
+                    data_, meta_data, data_available = data_source_api.thingspeak(
+                        device_number=device_number,
+                        start_date_time=start,
+                        end_date_time=end,
+                        read_key=read_key,
+                    )
+                    if data_available:
+                        data.extend(data_)
+                mapping = config["mapping"][network]
+                data = AirQoDataUtils.map_and_extract_data(mapping, data)
+            elif network == "iqair":
+                mapping = config["mapping"][network]
+                try:
+                    data = AirQoDataUtils.map_and_extract_data(
+                        mapping, data_source_api.iqair(device)
+                    )
+                except Exception as e:
+                    logger.exception(f"An error occurred: {e} - device {device['name']}")
+                    continue
+            if isinstance(data, pd.DataFrame) and data.empty:
+                logger.warning(f"No data received from {device['name']}")
+                continue
+
+            if isinstance(data, pd.DataFrame) and not data.empty:
+                data = DataValidationUtils.fill_missing_columns(
+                    data=data, cols=data_columns
+                )
+                data["device_number"] = device_number
+                data["device_id"] = device["name"]
+                data["site_id"] = device["site_id"]
+                data["network"] = network
+
+                # TODO Clean up long,lat assignment.
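+                # Categories listed in Device_Field_Mapping take coordinates
+                # from the device record; others fall back to the channel
+                # metadata returned by the data source (may be empty for iqair).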
+                if device_category in AirQoDataUtils.Device_Field_Mapping:
+                    data["latitude"] = device["latitude"]
+                    data["longitude"] = device["longitude"]
+                else:
+                    data["latitude"] = (meta_data or {}).get("latitude", None)
+                    data["longitude"] = (meta_data or {}).get("longitude", None)
+
+                devices_data = pd.concat([devices_data, data], ignore_index=True)
+
+        if remove_outliers:
+            if "vapor_pressure" in devices_data.columns.to_list():
+                is_airqo_network = devices_data["network"] == "airqo"
+                devices_data.loc[is_airqo_network, "vapor_pressure"] = devices_data.loc[
+                    is_airqo_network, "vapor_pressure"
+                ].apply(DataValidationUtils.convert_pressure_values)
+            devices_data = DataValidationUtils.remove_outliers(devices_data)
+
+        return devices_data
+
     @staticmethod
     def aggregate_low_cost_sensors_data(data: pd.DataFrame) -> pd.DataFrame:
         """
@@ -527,7 +754,7 @@ def clean_low_cost_sensor_data(
         data.dropna(subset=["timestamp"], inplace=True)
         data["timestamp"] = pd.to_datetime(data["timestamp"])
         data.drop_duplicates(
-            subset=["timestamp", "device_number"], keep="first", inplace=True
+            subset=["timestamp", "device_id"], keep="first", inplace=True
         )
         # TODO Find an appropriate place to put this
         if device_category == DeviceCategory.LOW_COST:
diff --git a/src/workflows/airqo_etl_utils/config.py b/src/workflows/airqo_etl_utils/config.py
index cfbaeb9386..ca8540c8b9 100644
--- a/src/workflows/airqo_etl_utils/config.py
+++ b/src/workflows/airqo_etl_utils/config.py
@@ -199,6 +199,51 @@ class Config:
         9: "humidity",
         10: "vapor_pressure",
     }
+    AIRQO_LOW_COST_FIELD_MAPPING = {
+        "field1": "s1_pm2_5",
+        "field2": "s1_pm10",
+        "field3": "s2_pm2_5",
+        "field4": "s2_pm10",
+        "field7": "battery",
+        "created_at": "timestamp",
+        "field8": {
+            0: "latitude",
+            1: "longitude",
+            2: "altitude",
+            3: "wind_speed",  # For mobile devices (velocity)
+            4: "satellites",  # Number of satellites tracked
+            5: "hdop",  # For mobile devices
+            6: "device_temperature",  # Internal
+            7: "device_humidity",  # Internal
+            8: "temperature",  # Ambient
+            9: "humidity",  # Ambient
+            10: "vapor_pressure",
+        },
+    }
+    URBANBETTER_LOW_COST_FIELD_MAPPING = {
+        "pollutants.no2.value": "no2",
+        "pollutants.voc.value": "voc",
+        "pollutants.pm25.value": "pm2_5",
+        "pollutants.pm10.value": "pm10",
+        "pollutants.pm1.value": "pm1",
+        "pollutants.no2.pi": "no2_pi",
+        "pollutants.voc.pi": "voc_pi",
+        "pollutants.pm25.pi": "pm2_5_pi",
+        "pollutants.pm10.pi": "pm10_pi",
+        "pollutants.pm1.pi": "pm1_pi",
+        "date": "timestamp",
+    }
+
+    IQAIR_LOW_COST_FIELD_MAPPING = {
+        "pm25": {"key": "pm2_5", "value": "conc"},
+        "pm10": {"key": "pm10", "value": "conc"},
+        "pm1": {"key": "pm1", "value": "conc"},
+        "pr": "pressure",
+        "hm": "humidity",
+        "tp": "temperature",
+        "ts": "timestamp",
+    }
+
     AIRQO_DATA_COLUMN_NAME_MAPPING = {
         "pm2_5": "pm2_5",
         "s1_pm2_5": "pm2_5",
@@ -224,6 +269,41 @@ class Config:
         "pm1_pi": "pm1",
     }
 
+    device_config_mapping = {
+        "bam": {
+            "field_8_cols": list(AIRQO_BAM_CONFIG.values()),
+            "mapping": {"airqo": AIRQO_BAM_CONFIG},
+            "other_fields_cols": [],
+        },
+        "gas": {
+            "field_8_cols": list(AIRQO_LOW_COST_GAS_CONFIG.values()),
+            "mapping": {"airqo": AIRQO_LOW_COST_GAS_CONFIG},
+            "other_fields_cols": [
+                "pm2_5",
+                "tvoc",
+                "hcho",
+                "co2",
+                "intaketemperature",
+                "intakehumidity",
+                "battery",
+            ],
+        },
+        "lowcost": {
+            "field_8_cols": list(AIRQO_LOW_COST_CONFIG.values()),
+            "mapping": {
+                "airqo": AIRQO_LOW_COST_FIELD_MAPPING,
+                "iqair": IQAIR_LOW_COST_FIELD_MAPPING,
+            },
+            "other_fields_cols": [
+                "s1_pm2_5",
+                "s1_pm10",
+                "s2_pm2_5",
+                "s2_pm10",
+                "battery",
+            ],
+        },
+    }
+
     # Schema files mapping
     SCHEMA_FILE_MAPPING = {
         BIGQUERY_HOURLY_EVENTS_TABLE: "measurements.json",
diff --git a/src/workflows/airqo_etl_utils/data_sources.py b/src/workflows/airqo_etl_utils/data_sources.py
new file mode 100644
index 0000000000..edce7897a2
--- /dev/null
+++ b/src/workflows/airqo_etl_utils/data_sources.py
@@ -0,0 +1,167 @@
+import json
+
+import pandas as pd
+import requests
+
+from .config import configuration
+import logging
+from typing import Any, Dict, List, Union, Tuple, Optional
+
+logger = logging.getLogger(__name__)
+
+
+class DataSourcesApis:
+    def __init__(self):
+        self.THINGSPEAK_CHANNEL_URL = configuration.THINGSPEAK_CHANNEL_URL
+
+    def query_data(
+        self,
+        device_number: int,
+        start_date_time: str,
+        end_date_time: str,
+        read_key: str,
+    ) -> pd.DataFrame:
+        data = pd.DataFrame([])
+
+        try:
+            url = f"{self.THINGSPEAK_CHANNEL_URL}{device_number}/feeds.json?start={start_date_time}&end={end_date_time}&api_key={read_key}"
+            logger.debug(f"Fetching data from URL: {url}")
+
+            response = json.loads(
+                requests.get(url, timeout=100.0).content.decode("utf-8")
+            )
+
+            if (response != -1) and ("feeds" in response):
+                data = pd.DataFrame(response["feeds"])
+                data.attrs["meta_data"] = response["channel"]
+
+        except Exception as ex:
+            logger.exception(f"An error occurred: {ex}")
+
+        return data
+
+    def thingspeak(
+        self,
+        device_number: int,
+        start_date_time: str,
+        end_date_time: str,
+        read_key: str,
+    ) -> Tuple[Optional[List[Dict[str, Any]]], Optional[Dict[str, Any]], bool]:
+        """
+        Fetch data from a ThingSpeak channel for a specific device within a time range.
+
+        Args:
+            device_number (int): The ThingSpeak channel ID corresponding to the device.
+            start_date_time (str): The start timestamp in ISO 8601 format (e.g., "YYYY-MM-DDTHH:mm:ssZ").
+            end_date_time (str): The end timestamp in ISO 8601 format.
+            read_key (str): The API key to authenticate the request for the specified channel.
+
+        Returns:
+            Tuple[Optional[List[Dict[str, Any]]], Optional[Dict[str, Any]], bool]:
+                - A list of dictionaries containing the channel feeds data, or `None` if no valid data is found or an error occurs.
+                - A dictionary containing metadata about the channel, or `None` on failure.
+                - A boolean indicating whether any data was retrieved.
+
+        Note:
+            HTTP, parsing, and other unexpected errors are caught and logged rather than raised.
+        """
+
+        data: Optional[List[Dict[str, Any]]] = None
+        meta_data: Optional[Dict[str, Any]] = None
+        data_available: bool = True
+        try:
+            url = f"{self.THINGSPEAK_CHANNEL_URL}{device_number}/feeds.json?start={start_date_time}&end={end_date_time}&api_key={read_key}"
+            logger.info(f"Fetching data from URL: {url}")
+
+            response_data = json.loads(
+                requests.get(url, timeout=100.0).content.decode("utf-8")
+            )
+
+            if (response_data != -1) and ("feeds" in response_data):
+                data = response_data.get("feeds", {})
+                meta_data = response_data.get("channel", {})
+
+        except requests.exceptions.RequestException as req_err:
+            logger.error(f"Request error while fetching ThingSpeak data: {req_err}")
+        except ValueError as val_err:
+            logger.error(f"Value error: {val_err}")
+        except Exception as ex:
+            logger.exception(f"An unexpected error occurred: {ex}")
+
+        if not data:
+            data_available = False
+            logger.warning(
+                f"Device does not have data between {start_date_time} and {end_date_time}"
+            )
+
+        return data, meta_data, data_available
+
+    def iqair(
+        self, device: Dict[str, Any], resolution: str = "current"
+    ) -> Union[List, Dict, None]:
+        """
+        Retrieve data from the IQAir API for a specific device and resolution.
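+
+        Example (illustrative values only):
+            data = DataSourcesApis().iqair(
+                {"api_code": "https://api.example.com/v1/devices", "serial_number": "ABC123"},
+                resolution="hourly",
+            )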
+
+        Args:
+            device (Dict[str, Any]): A dictionary containing device details, such as:
+                - api_code (str): The base URL or endpoint for the API.
+                - serial_number (str): The unique identifier for the device.
+            resolution (str): The data resolution to retrieve. Options include:
+                - "current": Real-time data (default).
+                - "instant": Instantaneous measurements.
+                - "hourly": Hourly aggregated data.
+                - "daily": Daily aggregated data.
+                - "monthly": Monthly aggregated data.
+
+        Returns:
+            Union[List, Dict, None]: A list or dictionary containing the retrieved data, or `None` in case of errors or no data.
+
+        Raises:
+            ValueError: If an invalid resolution is provided. HTTP and parsing errors are caught and logged instead of raised.
+        """
+
+        valid_resolutions = {"current", "instant", "hourly", "daily", "monthly"}
+        historical_resolutions = {"instant", "hourly", "daily", "monthly"}
+
+        if resolution not in valid_resolutions:
+            raise ValueError(
+                f"Invalid resolution '{resolution}'. Choose from {valid_resolutions}."
+            )
+
+        # Determine the appropriate API resolution path
+        api_resolution = (
+            "historical" if resolution in historical_resolutions else resolution
+        )
+        data = None
+        try:
+            base_url = device.get("api_code")
+            device_id = device.get("serial_number")
+            if not base_url or not device_id:
+                logger.error(
+                    "Device information must include 'api_code' and 'serial_number'."
+                )
+                return None
+
+            url = f"{base_url}/{device_id}"
+            logger.info(f"Fetching data from URL: {url}")
+
+            response = requests.get(url, timeout=10)
+            response.raise_for_status()  # Raise HTTPError for bad responses (4xx or 5xx)
+            response_data = response.json()
+
+            if api_resolution in response_data:
+                if resolution == "current":
+                    data = response_data.get("current")
+                else:
+                    historical_data = response_data.get("historical", {})
+                    data = historical_data.get(resolution, [])
+        except requests.exceptions.RequestException as req_err:
+            logger.error(f"Request error while fetching IQAir data: {req_err}")
+        except ValueError as val_err:
+            logger.error(f"Value error: {val_err}")
+        except Exception as ex:
+            logger.exception(f"An unexpected error occurred: {ex}")
+        return data
diff --git a/src/workflows/airqo_etl_utils/data_validator.py b/src/workflows/airqo_etl_utils/data_validator.py
index 5b93ca21f9..273faab0b0 100644
--- a/src/workflows/airqo_etl_utils/data_validator.py
+++ b/src/workflows/airqo_etl_utils/data_validator.py
@@ -40,23 +40,57 @@ def format_data_types(
         integers: list = None,
         timestamps: list = None,
     ) -> pd.DataFrame:
-        floats = [] if floats is None else floats
-        integers = [] if integers is None else integers
-        timestamps = [] if timestamps is None else timestamps
+        """
+        Formats specified columns in a DataFrame to desired data types: float, integer, and datetime.
+
+        Args:
+            data (pd.DataFrame): The input DataFrame containing the data to be formatted.
+            floats (list, optional): List of column names to be converted to floats. Defaults to an empty list.
+            integers (list, optional): List of column names to be converted to integers. Defaults to an empty list.
+            timestamps (list, optional): List of column names to be converted to datetime. Defaults to an empty list.
+
+        Returns:
+            pd.DataFrame: A DataFrame with the specified columns formatted to their respective data types.
+
+        Notes:
+        ------
+        - Columns specified in `floats` are converted to floats. Invalid values are coerced to NaN.
+        - Columns specified in `integers` are stripped of non-numeric characters; invalid values are replaced with -1.
+        - Columns specified in `timestamps` are converted to datetime. Invalid timestamps are coerced to NaT.
+        - The function modifies the input DataFrame in place and returns it.
+        """
+
+        floats = floats or []
+        integers = integers or []
+        timestamps = timestamps or []
+
+        if floats:
+            data[floats] = data[floats].apply(pd.to_numeric, errors="coerce")
 
-        # This drops rows that have data that cannot be converted
-        data[floats] = data[floats].apply(pd.to_numeric, errors="coerce")
-        data[timestamps] = data[timestamps].apply(pd.to_datetime, errors="coerce")
+        if timestamps:
+            for col in timestamps:
+                data[col] = pd.to_datetime(data[col], errors="coerce")
+
+        if integers:
+            for col in integers:
+                data[col] = (
+                    pd.to_numeric(data[col], errors="coerce").fillna(-1).astype(int)
+                )
+
+        return data
 
     @staticmethod
     def get_valid_value(column_name: str, row_value: Any) -> Any:
@@ ... @@ class DataValidationUtils:
     @staticmethod
     def remove_outliers(data: pd.DataFrame) -> pd.DataFrame:
+        """
+        Cleans and validates data in a DataFrame by formatting columns to their proper types and removing or correcting outliers based on predefined validation rules.
+
+        Args:
+            data (pd.DataFrame): Input DataFrame containing the raw data to clean.
+
+        Returns:
+            pd.DataFrame: A DataFrame with outliers removed or corrected and data formatted to their respective types (float, integer, timestamp).
+        """
         big_query_api = BigQueryApi()
-        float_columns = set(
-            big_query_api.get_columns(table="all", column_type=[ColumnDataType.FLOAT])
-        )
-        integer_columns = set(
-            big_query_api.get_columns(table="all", column_type=[ColumnDataType.INTEGER])
-        )
-        timestamp_columns = set(
-            big_query_api.get_columns(
+        column_types = {
+            ColumnDataType.FLOAT: big_query_api.get_columns(
+                table="all", column_type=[ColumnDataType.FLOAT]
+            ),
+            ColumnDataType.INTEGER: big_query_api.get_columns(
+                table="all", column_type=[ColumnDataType.INTEGER]
+            ),
+            ColumnDataType.TIMESTAMP: big_query_api.get_columns(
                 table="all", column_type=[ColumnDataType.TIMESTAMP]
-            )
-        )
+            ),
+        }
 
-        float_columns = list(float_columns & set(data.columns))
-        integer_columns = list(integer_columns & set(data.columns))
-        timestamp_columns = list(timestamp_columns & set(data.columns))
+        filtered_columns = {
+            dtype: list(set(columns) & set(data.columns))
+            for dtype, columns in column_types.items()
+        }
 
         data = DataValidationUtils.format_data_types(
             data=data,
-            floats=float_columns,
-            integers=integer_columns,
-            timestamps=timestamp_columns,
+            floats=filtered_columns[ColumnDataType.FLOAT],
+            integers=filtered_columns[ColumnDataType.INTEGER],
+            timestamps=filtered_columns[ColumnDataType.TIMESTAMP],
         )
 
-        columns = list(chain(float_columns, integer_columns, timestamp_columns))
+        validated_columns = list(chain.from_iterable(filtered_columns.values()))
 
-        for col in columns:
-            name = configuration.AIRQO_DATA_COLUMN_NAME_MAPPING.get(col, None)
-            data.loc[:, col] = data[col].apply(
+        is_airqo_network = data["network"] == "airqo"
+        for col in validated_columns:
+            mapped_name = configuration.AIRQO_DATA_COLUMN_NAME_MAPPING.get(col, None)
+            data.loc[is_airqo_network, col] = data.loc[is_airqo_network, col].apply(
                 lambda x: DataValidationUtils.get_valid_value(
-                    column_name=name, row_value=x
+                    column_name=mapped_name, row_value=x
                 )
             )
-
         return data
 
     @staticmethod
diff --git a/src/workflows/airqo_etl_utils/thingspeak_api.py b/src/workflows/airqo_etl_utils/thingspeak_api.py
deleted file mode 100644
index f7354d842f..0000000000
--- a/src/workflows/airqo_etl_utils/thingspeak_api.py
+++ /dev/null
@@ -1,40 +0,0 @@
-import json
-
-import pandas as pd
-import requests
-
-from .config import configuration
-import logging
-
-logger = logging.getLogger(__name__)
-
-
-class ThingspeakApi:
-    def __init__(self):
-        self.THINGSPEAK_CHANNEL_URL = configuration.THINGSPEAK_CHANNEL_URL
-
-    def query_data(
-        self,
-        device_number: int,
-        start_date_time: str,
-        end_date_time: str,
-        read_key: str,
-    ) -> pd.DataFrame:
-        data = pd.DataFrame([])
-
-        try:
-            url = f"{self.THINGSPEAK_CHANNEL_URL}{device_number}/feeds.json?start={start_date_time}&end={end_date_time}&api_key={read_key}"
-            print(f"{url}")
-
-            response = json.loads(
-                requests.get(url, timeout=100.0).content.decode("utf-8")
-            )
-
-            if (response != -1) and ("feeds" in response):
-                data = pd.DataFrame(response["feeds"])
-                data.attrs["meta_data"] = response["channel"]
-
-        except Exception as ex:
-            logger.exception(f"An error occured: {ex}")
-
-        return data
diff --git a/src/workflows/dags/airqo_measurements.py b/src/workflows/dags/airqo_measurements.py
index 31d9bd6889..7bc76e0ce9 100644
--- a/src/workflows/dags/airqo_measurements.py
+++ b/src/workflows/dags/airqo_measurements.py
@@ -7,6 +7,7 @@
     airqo_realtime_low_cost_measurements_doc,
     airqo_historical_hourly_measurements_doc,
     airqo_gaseous_realtime_low_cost_data_doc,
+    airqo_historical_raw_low_cost_measurements_doc,
 )
 from task_docs import (
     extract_raw_airqo_data_doc,
@@ -130,6 +131,7 @@ def send_hourly_measurements_to_message_broker(
 
 @dag(
     "AirQo-Historical-Raw-Low-Cost-Measurements",
+    doc_md=airqo_historical_raw_low_cost_measurements_doc,
     schedule="0 4 * * *",
     catchup=False,
     tags=["airqo", "raw", "historical", "low cost"],
@@ -491,7 +493,7 @@ def extract_raw_data(**kwargs):
         start_date_time = date_to_str_hours(hour_of_day)
         end_date_time = datetime.strftime(hour_of_day, "%Y-%m-%dT%H:59:59Z")
 
-        return AirQoDataUtils.extract_devices_data(
+        return AirQoDataUtils.extract_devices_data_(
             start_date_time=start_date_time,
             end_date_time=end_date_time,
             device_category=DeviceCategory.LOW_COST,
diff --git a/src/workflows/dags/dag_docs.py b/src/workflows/dags/dag_docs.py
index 52ba94e1ad..56f104fef7 100644
--- a/src/workflows/dags/dag_docs.py
+++ b/src/workflows/dags/dag_docs.py
@@ -27,13 +27,26 @@
 - Bigquery:raw_data.device_measurements
 - Bigquery:averaged_data.hourly_weather_data
 Data Destinations:
-- Bigquery(stage):averaged_data_stage.hourly_device_measurements
 - Bigquery(prod):averaged_data.hourly_device_measurements
 - API(devices/events):
 - Kafka(hourly-measurements-topic):
 - AirQo
 """
 
+airqo_historical_raw_low_cost_measurements_doc = """
+### AirQo historical raw low cost data ETL
+#### Purpose
+Extracts historical raw measurements for low cost sensors, going back 2 days.
+#### Notes
+Data sources:
+- AirQo API - devices
+- ThingSpeak - measurements
+Data Destinations:
+- Bigquery:raw_data.device_measurements
+- API(events/measurements):
+- AirQo
+"""
+
 airqo_gaseous_realtime_low_cost_data_doc = """
 ### AirQo Gaseous low cost sensors hourly ETL
 #### Purpose