From 293aec8d0dc3f9f2fb12ab0560bd1f1398d652fb Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 5 Dec 2024 11:19:30 +0300 Subject: [PATCH 01/13] Update AirQo exceedance production image tag to prod-a7e2441e-1733386719 --- k8s/exceedance/values-prod-airqo.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/k8s/exceedance/values-prod-airqo.yaml b/k8s/exceedance/values-prod-airqo.yaml index 87510329f1..eaece32bef 100644 --- a/k8s/exceedance/values-prod-airqo.yaml +++ b/k8s/exceedance/values-prod-airqo.yaml @@ -4,6 +4,6 @@ app: configmap: env-exceedance-production image: repository: eu.gcr.io/airqo-250220/airqo-exceedance-job - tag: prod-1c7e9a8e-1733346549 + tag: prod-a7e2441e-1733386719 nameOverride: '' fullnameOverride: '' From 3333d5be01a779b76698968fda6f6e1858b75b82 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 5 Dec 2024 11:19:37 +0300 Subject: [PATCH 02/13] Update auth service staging image tag to stage-73463ae7-1733386665 --- k8s/auth-service/values-stage.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/k8s/auth-service/values-stage.yaml b/k8s/auth-service/values-stage.yaml index 2eac62b202..af9a73e7ed 100644 --- a/k8s/auth-service/values-stage.yaml +++ b/k8s/auth-service/values-stage.yaml @@ -6,7 +6,7 @@ app: replicaCount: 2 image: repository: eu.gcr.io/airqo-250220/airqo-stage-auth-api - tag: stage-1838aa7a-1733249623 + tag: stage-73463ae7-1733386665 nameOverride: '' fullnameOverride: '' podAnnotations: {} From ff98911bb28b0de6cf0ca606e784af6948732c78 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 5 Dec 2024 11:19:39 +0300 Subject: [PATCH 03/13] Update KCCA exceedance production image tag to prod-a7e2441e-1733386719 --- k8s/exceedance/values-prod-kcca.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/k8s/exceedance/values-prod-kcca.yaml b/k8s/exceedance/values-prod-kcca.yaml index edace9fb16..94f080cd66 100644 --- a/k8s/exceedance/values-prod-kcca.yaml +++ b/k8s/exceedance/values-prod-kcca.yaml @@ -4,6 +4,6 @@ app: configmap: env-exceedance-production image: repository: eu.gcr.io/airqo-250220/kcca-exceedance-job - tag: prod-1c7e9a8e-1733346549 + tag: prod-a7e2441e-1733386719 nameOverride: '' fullnameOverride: '' From 934fcd9f34ae3b46cb4c7385dbd4c0644d3e1ebf Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 5 Dec 2024 11:20:30 +0300 Subject: [PATCH 04/13] Update auth service production image tag to prod-a7e2441e-1733386719 --- k8s/auth-service/values-prod.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/k8s/auth-service/values-prod.yaml b/k8s/auth-service/values-prod.yaml index 7a54b2c9dd..882d47a2a5 100644 --- a/k8s/auth-service/values-prod.yaml +++ b/k8s/auth-service/values-prod.yaml @@ -6,7 +6,7 @@ app: replicaCount: 3 image: repository: eu.gcr.io/airqo-250220/airqo-auth-api - tag: prod-1c7e9a8e-1733346549 + tag: prod-a7e2441e-1733386719 nameOverride: '' fullnameOverride: '' podAnnotations: {} From 207a18b10df8f757d7ef64e7b25fd24b742efe5a Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 5 Dec 2024 11:21:14 +0300 Subject: [PATCH 05/13] Update workflows prod image tag to prod-a7e2441e-1733386719 --- k8s/workflows/values-prod.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/k8s/workflows/values-prod.yaml b/k8s/workflows/values-prod.yaml index ae1a77e8a8..b9f22e61ca 100644 --- a/k8s/workflows/values-prod.yaml +++ b/k8s/workflows/values-prod.yaml @@ -10,7 +10,7 @@ images: initContainer: eu.gcr.io/airqo-250220/airqo-workflows-xcom redisContainer: eu.gcr.io/airqo-250220/airqo-redis containers: eu.gcr.io/airqo-250220/airqo-workflows - tag: prod-1c7e9a8e-1733346549 + tag: prod-a7e2441e-1733386719 nameOverride: '' fullnameOverride: '' podAnnotations: {} From b59a0c19113951beaf87cdd80ac43f46022f97fb Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 5 Dec 2024 11:22:09 +0300 Subject: [PATCH 06/13] Update predict production image tag to prod-a7e2441e-1733386719 --- k8s/predict/values-prod.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/k8s/predict/values-prod.yaml b/k8s/predict/values-prod.yaml index 958daaea45..a04a12b572 100644 --- a/k8s/predict/values-prod.yaml +++ b/k8s/predict/values-prod.yaml @@ -7,7 +7,7 @@ images: predictJob: eu.gcr.io/airqo-250220/airqo-predict-job trainJob: eu.gcr.io/airqo-250220/airqo-train-job predictPlaces: eu.gcr.io/airqo-250220/airqo-predict-places-air-quality - tag: prod-1c7e9a8e-1733346549 + tag: prod-a7e2441e-1733386719 api: name: airqo-prediction-api label: prediction-api From b49a82b91e4724c49cb95b79976588a7dea99f53 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 5 Dec 2024 11:27:27 +0300 Subject: [PATCH 07/13] Update spatial production image tag to prod-a7e2441e-1733386719 --- k8s/spatial/values-prod.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/k8s/spatial/values-prod.yaml b/k8s/spatial/values-prod.yaml index cb8457f8b3..7a402d0235 100644 --- a/k8s/spatial/values-prod.yaml +++ b/k8s/spatial/values-prod.yaml @@ -6,7 +6,7 @@ app: replicaCount: 3 image: repository: eu.gcr.io/airqo-250220/airqo-spatial-api - tag: prod-1c7e9a8e-1733346549 + tag: prod-a7e2441e-1733386719 nameOverride: '' fullnameOverride: '' podAnnotations: {} From f599d46e17c0924ae96ac03d8b653e11d69005e3 Mon Sep 17 00:00:00 2001 From: NicholasTurner23 Date: Thu, 5 Dec 2024 14:02:34 +0300 Subject: [PATCH 08/13] Harmonize data resolutions/frequencies --- src/workflows/airqo_etl_utils/config.py | 4 ++++ src/workflows/airqo_etl_utils/data_sources.py | 6 ++++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/src/workflows/airqo_etl_utils/config.py b/src/workflows/airqo_etl_utils/config.py index a343d1bd05..f583a5088b 100644 --- a/src/workflows/airqo_etl_utils/config.py +++ b/src/workflows/airqo_etl_utils/config.py @@ -287,6 +287,10 @@ class Config: "ts": "timestamp", } + DATA_RESOLUTION_MAPPING = { + "iqair": {"hourly": "instant", "raw": "instant", "current": "current"} + } + AIRQO_DATA_COLUMN_NAME_MAPPING = { "pm2_5": "pm2_5", "s1_pm2_5": "pm2_5", diff --git a/src/workflows/airqo_etl_utils/data_sources.py b/src/workflows/airqo_etl_utils/data_sources.py index edce7897a2..54b683ea46 100644 --- a/src/workflows/airqo_etl_utils/data_sources.py +++ b/src/workflows/airqo_etl_utils/data_sources.py @@ -99,7 +99,7 @@ def thingspeak( return data, meta_data, data_available def iqair( - self, device: Dict[str, Any], resolution: str = "current" + self, device: Dict[str, Any], resolution: str = "instant" ) -> Union[List, Dict]: """ Retrieve data from the IQAir API for a specific device and resolution. @@ -123,7 +123,9 @@ def iqair( requests.exceptions.RequestException: For issues with the HTTP request. Exception: For any other unexpected errors. """ - + resolution = configuration.DATA_RESOLUTION_MAPPING.get("iqair").get( + resolution, "instant" + ) valid_resolutions = {"current", "instant", "hourly", "daily", "monthly"} historical_resolutions = {"instant", "hourly", "daily", "monthly"} From f502aaa4343958eaee3273a24a0a3211b6adc30e Mon Sep 17 00:00:00 2001 From: NicholasTurner23 Date: Thu, 5 Dec 2024 15:28:28 +0300 Subject: [PATCH 09/13] Clean up low cost sensor cleaning --- src/workflows/airqo_etl_utils/airqo_utils.py | 57 ++++++++++---------- 1 file changed, 30 insertions(+), 27 deletions(-) diff --git a/src/workflows/airqo_etl_utils/airqo_utils.py b/src/workflows/airqo_etl_utils/airqo_utils.py index 44fe7dcca6..4db973424f 100644 --- a/src/workflows/airqo_etl_utils/airqo_utils.py +++ b/src/workflows/airqo_etl_utils/airqo_utils.py @@ -284,7 +284,7 @@ def flatten_meta_data(meta_data: list) -> list: @staticmethod def extract_mobile_low_cost_sensors_data( - meta_data: list, + meta_data: list, resolution: Frequency ) -> pd.DataFrame: data = pd.DataFrame() @@ -294,6 +294,7 @@ def extract_mobile_low_cost_sensors_data( start_date_time=value.get("start_date_time"), end_date_time=value.get("end_date_time"), device_numbers=[value.get("device_number")], + resolution=resolution, device_category=DeviceCategory.LOW_COST, ) if measurements.empty: @@ -546,6 +547,7 @@ def extract_devices_data( start_date_time: str, end_date_time: str, device_category: DeviceCategory, + resolution: Frequency = Frequency.RAW, device_numbers: list = None, remove_outliers: bool = True, ) -> pd.DataFrame: @@ -636,7 +638,7 @@ def extract_devices_data( mapping = config["mapping"][network] try: data = AirQoDataUtils.map_and_extract_data( - mapping, data_source_api.iqair(device) + mapping, data_source_api.iqair(device, resolution=resolution) ) except Exception as e: logger.exception(f"An error occured: {e} - device {device['name']}") @@ -676,38 +678,35 @@ def extract_devices_data( @staticmethod def aggregate_low_cost_sensors_data(data: pd.DataFrame) -> pd.DataFrame: """ - Resamples and avergages out the numeric type fields on an hourly basis. + Resamples and averages out the numeric type fields on an hourly basis. Args: - data(pandas.DataFrame): A pandas DataFrame object containing cleaned/converted(numeric) data. + data(pandas.DataFrame): A pandas DataFrame object containing cleaned/converted (numeric) data. Returns: A pandas DataFrame object containing hourly averages of data. """ + data["timestamp"] = pd.to_datetime(data["timestamp"]) - averages_list: List[pd.DataFrame] = [] - for _, device_group in data.groupby("device_number"): - site_id = device_group.iloc[0]["site_id"] - device_id = device_group.iloc[0]["device_id"] - device_number = device_group.iloc[0]["device_number"] - - del device_group["site_id"] - del device_group["device_id"] - del device_group["device_number"] - try: - averages = device_group.resample("1H", on="timestamp").mean() - except ValueError as value_error: - logger.exception(f"Error: {value_error}") - logger.info(device_group) - continue - averages["timestamp"] = averages.index - averages["device_id"] = device_id - averages["site_id"] = site_id - averages["device_number"] = device_number - averages_list.append(averages) - aggregated_data = pd.concat(averages_list, ignore_index=True) - return aggregated_data + group_metadata = ( + data[["device_id", "site_id", "device_number", "network"]] + .drop_duplicates("device_id") + .set_index("device_id") + ) + numeric_columns = data.select_dtypes(include=["number"]).columns + numeric_columns = numeric_columns.difference(["device_number"]) + data_for_aggregation = data[["timestamp", "device_id"] + list(numeric_columns)] + + aggregated = ( + data_for_aggregation.groupby("device_id") + .apply(lambda group: group.resample("1H", on="timestamp").mean()) + .reset_index() + ) + + aggregated = aggregated.merge(group_metadata, on="device_id", how="left") + + return aggregated @staticmethod def clean_bam_data(data: pd.DataFrame) -> pd.DataFrame: @@ -1035,6 +1034,10 @@ def merge_aggregated_weather_data( ) del measurements[f"device_reading_{col}_col"] + numeric_columns = measurements.select_dtypes(include=["number"]).columns + numeric_columns = numeric_columns.difference(["device_number"]) + numeric_counts = measurements[numeric_columns].notna().sum(axis=1) + measurements = measurements[numeric_counts >= 1] return measurements @staticmethod @@ -1156,7 +1159,7 @@ def calibrate_data(data: pd.DataFrame) -> pd.DataFrame: columns={"_id": "site_id"} ) data = pd.merge(data, sites_df, on="site_id", how="left") - data.dropna(subset=["device_number", "timestamp"], inplace=True) + data.dropna(subset=["device_id", "timestamp"], inplace=True) columns_to_fill = [ "s1_pm2_5", From 13fd5fc852b3c8211f18e0b354609a829b30192e Mon Sep 17 00:00:00 2001 From: NicholasTurner23 Date: Thu, 5 Dec 2024 15:32:40 +0300 Subject: [PATCH 10/13] Remove old implementation of extracting device data --- src/workflows/airqo_etl_utils/airqo_utils.py | 143 ------------------- 1 file changed, 143 deletions(-) diff --git a/src/workflows/airqo_etl_utils/airqo_utils.py b/src/workflows/airqo_etl_utils/airqo_utils.py index 4db973424f..8e24b25074 100644 --- a/src/workflows/airqo_etl_utils/airqo_utils.py +++ b/src/workflows/airqo_etl_utils/airqo_utils.py @@ -399,149 +399,6 @@ def restructure_airqo_mobile_data_for_bigquery(data: pd.DataFrame) -> pd.DataFra ) return Utils.populate_missing_columns(data=data, columns=cols) - @staticmethod - def extract_devices_data_( - start_date_time: str, - end_date_time: str, - device_category: DeviceCategory, - device_numbers: list = None, - remove_outliers: bool = True, - ) -> pd.DataFrame: - """ - Extracts sensor measurements from AirQo devices recorded between specified date and time ranges. - - Retrieves sensor data from Thingspeak API for devices belonging to the specified device category (BAM or low-cost sensors). - Optionally filters data by specific device numbers and removes outliers if requested. - - Args: - start_date_time (str): Start date and time (ISO 8601 format) for data extraction. - end_date_time (str): End date and time (ISO 8601 format) for data extraction. - device_category (DeviceCategory): Category of devices to extract data from (BAM or low-cost sensors). - device_numbers (list, optional): List of device numbers whose data to extract. Defaults to None (all devices). - remove_outliers (bool, optional): If True, removes outliers from the extracted data. Defaults to True. - """ - - airqo_api = AirQoApi() - data_source_api = DataSourcesApis() - devices = airqo_api.get_devices( - network=str(Tenant.AIRQO), device_category=device_category - ) - other_fields_cols: List[str] = [] - devices = ( - [x for x in devices if x["device_number"] in device_numbers] - if device_numbers - else devices - ) - - if device_category == DeviceCategory.BAM: - field_8_cols = list(configuration.AIRQO_BAM_CONFIG.values()) - mapping = configuration.AIRQO_BAM_CONFIG - elif device_category == DeviceCategory.LOW_COST_GAS: - field_8_cols = list(configuration.AIRQO_LOW_COST_GAS_CONFIG.values()) - mapping = configuration.AIRQO_LOW_COST_GAS_FIELD_MAPPING - other_fields_cols.extend( - [ - "pm2_5", - "tvoc", - "hcho", - "co2", - "intaketemperature", - "intakehumidity", - "battery", - ] - ) - else: - field_8_cols = list( - configuration.AIRQO_LOW_COST_CONFIG.get("field8", {}).values() - ) - other_fields_cols.extend( - ["s1_pm2_5", "s1_pm10", "s2_pm2_5", "s2_pm10", "battery"] - ) - - data_columns = [ - "device_number", - "device_id", - "site_id", - "latitude", - "longitude", - "timestamp", - *field_8_cols, - *other_fields_cols, - ] - data_columns = list(set(data_columns)) - - devices_data = pd.DataFrame() - dates = Utils.query_dates_array( - start_date_time=start_date_time, - end_date_time=end_date_time, - data_source=DataSource.THINGSPEAK, - ) - - devices = pd.DataFrame(devices) - devices.set_index("device_number", inplace=True) - - for device_number, read_key in airqo_api.get_thingspeak_read_keys( - devices[["readKey"]], return_type="yield" - ): - if read_key is None or device_number is None: - logger.exception(f"{device_number} does not have a read key") - continue - - for start, end in dates: - data = data_source_api.thingspeak( - device_number=device_number, - start_date_time=start, - end_date_time=end, - read_key=read_key, - ) - data, meta_data, data_available = data_source_api.thingspeak( - device_number=device_number, - start_date_time=start, - end_date_time=end, - read_key=read_key, - ) - if not data_available: - continue - - if "field8" not in data.columns.to_list(): - data = DataValidationUtils.fill_missing_columns( - data=data, cols=data_columns - ) - else: - data[field_8_cols] = data["field8"].apply( - lambda x: AirQoDataUtils.flatten_field_8( - device_category=device_category, field_8=x - ) - ) - - data["device_number"] = device_number - data["device_id"] = devices.loc[device_number].device_id - data["site_id"] = devices.loc[device_number].site_id - - if device_category in AirQoDataUtils.Device_Field_Mapping: - data["latitude"] = devices.loc[device_number].latitude - data["longitude"] = devices.loc[device_number].longitude - data.rename( - columns=AirQoDataUtils.Device_Field_Mapping[device_category], - inplace=True, - ) - else: - data["latitude"] = meta_data.get("latitude", None) - data["longitude"] = meta_data.get("longitude", None) - - devices_data = pd.concat( - [devices_data, data[data_columns]], ignore_index=True - ) - - if remove_outliers: - if "vapor_pressure" in devices_data.columns.to_list(): - devices_data.loc[:, "vapor_pressure"] = devices_data[ - "vapor_pressure" - ].apply(DataValidationUtils.convert_pressure_values) - devices_data = DataValidationUtils.remove_outliers(devices_data) - - return devices_data - @staticmethod def extract_devices_data( start_date_time: str, From 004372e1f771c20a81f4a8aff22d25c0edaf1d2a Mon Sep 17 00:00:00 2001 From: NicholasTurner23 Date: Thu, 5 Dec 2024 16:06:38 +0300 Subject: [PATCH 11/13] Tenant clean up --- src/workflows/airqo_etl_utils/airqo_api.py | 52 ---------------------- 1 file changed, 52 deletions(-) diff --git a/src/workflows/airqo_etl_utils/airqo_api.py b/src/workflows/airqo_etl_utils/airqo_api.py index b0866fc26b..598a026abd 100644 --- a/src/workflows/airqo_etl_utils/airqo_api.py +++ b/src/workflows/airqo_etl_utils/airqo_api.py @@ -798,58 +798,6 @@ def update_sites(self, updated_sites): params = {"tenant": str(Tenant.AIRQO), "id": site.pop("site_id")} self.__request("devices/sites", params, site, "put") - def get_tenants(self, data_source: str) -> List[Dict[str, Any]]: - """ - Retrieve tenants given a data source. - - Args: - data_source: The source of the tenant's data. - - Returns: - List[Dict[str, Any]]: A list of dictionaries with tenant details. - - [ - { - "_id": str, - "net_status": str, - "net_email": str, - "net_phoneNumber": int, - "net_category": str, - "net_name": str, - "net_description": str, - "net_website": str, - "net_acronym": str, - "net_api_key": str, - "net_data_source": str, - "createdAt": str, - "net_users": List[Dict[str,Any]], - "net_permissions": List[Dict[str,Any]], - "net_roles": List[Dict[str,Any]], - "net_groups": List[Dict[str,Any]], - "net_departments": List[Dict[str,Any]], - "network_id": str, - "network": str, - "data_source": str, - "api_key": str" - }, - ] - """ - response = self.__request("users/networks") - - return [ - { - **network, - **{ - "network_id": network.get("_id", None), - "network": network.get("net_name", None), - "data_source": network.get("net_data_source", None), - "api_key": network.get("net_api_key", None), - }, - } - for network in response.get("networks", []) - if network.get("net_data_source") == str(data_source) - ] - def __request(self, endpoint, params=None, body=None, method="get", base_url=None): """ Executes API request and returns the response. From 27f259d86c6bd0323b85e19e860d1af1d73d11cb Mon Sep 17 00:00:00 2001 From: NicholasTurner23 Date: Thu, 5 Dec 2024 16:57:33 +0300 Subject: [PATCH 12/13] Clean up --- src/workflows/airqo_etl_utils/airnow_utils.py | 6 ++- src/workflows/airqo_etl_utils/constants.py | 37 ++++++++++++++----- .../airqo_etl_utils/data_warehouse_utils.py | 2 +- src/workflows/dags/airqo_bam_measurements.py | 5 +++ src/workflows/dags/airqo_measurements.py | 3 ++ .../dags/airqo_mobile_measurements.py | 5 ++- 6 files changed, 44 insertions(+), 14 deletions(-) diff --git a/src/workflows/airqo_etl_utils/airnow_utils.py b/src/workflows/airqo_etl_utils/airnow_utils.py index 8ee23b340d..d1db3b0f11 100644 --- a/src/workflows/airqo_etl_utils/airnow_utils.py +++ b/src/workflows/airqo_etl_utils/airnow_utils.py @@ -52,7 +52,8 @@ def query_bam_data( @staticmethod def extract_bam_data(start_date_time: str, end_date_time: str) -> pd.DataFrame: - tenants = AirQoApi().get_tenants(DataSource.AIRNOW) + # TODO Update if being used. + tenants = AirQoApi().get_networks(DataSource.AIRNOW) bam_data = pd.DataFrame() dates = Utils.query_dates_array( start_date_time=start_date_time, @@ -65,11 +66,12 @@ def extract_bam_data(start_date_time: str, end_date_time: str) -> pd.DataFrame: network_data = pd.DataFrame() for start, end in dates: + continue query_data = AirnowDataUtils.query_bam_data( api_key=network_api_key, start_date_time=start, end_date_time=end ) network_data = pd.concat([network_data, query_data], ignore_index=True) - + continue network_data["tenant"] = tenant["network"] bam_data = pd.concat([bam_data, network_data], ignore_index=True) diff --git a/src/workflows/airqo_etl_utils/constants.py b/src/workflows/airqo_etl_utils/constants.py index 79ca4497c0..90b45245ab 100644 --- a/src/workflows/airqo_etl_utils/constants.py +++ b/src/workflows/airqo_etl_utils/constants.py @@ -34,24 +34,41 @@ def from_str(string: str): class Frequency(Enum): """ - LOW_COST -> Raw data returned from the devices + RAW -> Raw current data returned from devices + RAW-LOW-COST -> Raw data returned from the low-cost devices HOURLY -> Aggregated hourly data DAILY -> Aggregated daily data + WEEKLY -> Aggregated weekly data + MONTHLY -> Aggregated monthly data + YEARLY -> Aggregated yearly data + HISTORICAL -> Raw data returned from the devices """ - RAW = 1 + RAW = 0 + RAW_LOW_COST = 1 HOURLY = 2 DAILY = 3 + WEEKLY = 4 + MONTHLY = 5 + YEARLY = 6 + HISTORICAL = 7 def __str__(self) -> str: - if self == self.RAW: - return "raw" - elif self == self.HOURLY: - return "hourly" - elif self == self.DAILY: - return "daily" - else: - return "" + match self: + case self.RAW | self.RAW_LOW_COST: + return "raw" + case self.HOURLY: + return "hourly" + case self.DAILY: + return "daily" + case self.WEEKLY: + return "weekly" + case self.MONTHLY: + return "monthly" + case self.YEARLY: + return "yearly" + case _: + return "historical" class Attachments(Enum): diff --git a/src/workflows/airqo_etl_utils/data_warehouse_utils.py b/src/workflows/airqo_etl_utils/data_warehouse_utils.py index 2d3a16c28c..22e6a6ff06 100644 --- a/src/workflows/airqo_etl_utils/data_warehouse_utils.py +++ b/src/workflows/airqo_etl_utils/data_warehouse_utils.py @@ -94,7 +94,7 @@ def extract_hourly_low_cost_data( }, inplace=True, ) - data.loc[:, "device_category"] = str(DeviceCategory.LOW_COST) + data["device_category"] = str(DeviceCategory.LOW_COST) return DataWarehouseUtils.filter_valid_columns(data) @staticmethod diff --git a/src/workflows/dags/airqo_bam_measurements.py b/src/workflows/dags/airqo_bam_measurements.py index a13372723d..9cd0ca5636 100644 --- a/src/workflows/dags/airqo_bam_measurements.py +++ b/src/workflows/dags/airqo_bam_measurements.py @@ -21,6 +21,8 @@ start_date=days_ago(1), ) def airqo_bam_historical_measurements(): + from airqo_etl_utils.constants import Frequency + @task(provide_context=True, retries=3, retry_delay=timedelta(minutes=5)) def extract_bam_data(**kwargs) -> pd.DataFrame: start_date_time, end_date_time = DateUtils.get_dag_date_time_values( @@ -30,6 +32,7 @@ def extract_bam_data(**kwargs) -> pd.DataFrame: start_date_time=start_date_time, end_date_time=end_date_time, device_category=DeviceCategory.BAM, + resolution=Frequency.HISTORICAL, ) @task(retries=3, retry_delay=timedelta(minutes=5)) @@ -76,6 +79,7 @@ def save_clean_bam_data(data: pd.DataFrame): ) def airqo_bam_realtime_measurements(): import pandas as pd + from airqo_etl_utils.constants import Frequency @task(provide_context=True, retries=3, retry_delay=timedelta(minutes=5)) def extract_bam_data(**kwargs): @@ -89,6 +93,7 @@ def extract_bam_data(**kwargs): start_date_time=start_date_time, end_date_time=end_date_time, device_category=DeviceCategory.BAM, + resolution=Frequency.RAW, ) @task(retries=3, retry_delay=timedelta(minutes=5)) diff --git a/src/workflows/dags/airqo_measurements.py b/src/workflows/dags/airqo_measurements.py index 4804193fea..058222d21e 100644 --- a/src/workflows/dags/airqo_measurements.py +++ b/src/workflows/dags/airqo_measurements.py @@ -153,6 +153,7 @@ def extract_raw_data(**kwargs): start_date_time=start_date_time, end_date_time=end_date_time, device_category=DeviceCategory.LOW_COST, + resolution=Frequency.DAILY, ) @task() @@ -311,6 +312,7 @@ def extract_raw_data(**kwargs) -> pd.DataFrame: start_date_time=start_date_time, end_date_time=end_date_time, device_category=DeviceCategory.LOW_COST, + resolution=Frequency.RAW_LOW_COST, ) @task() @@ -456,6 +458,7 @@ def update_latest_data_topic(data: pd.DataFrame, **kwargs): merged_data = merge_data( averaged_hourly_data=averaged_airqo_data, weather_data=extracted_weather_data ) + # TODO Should calibrated data be merge with non airqo data after calibration calibrated_data = calibrate(merged_data) send_hourly_measurements_to_api(calibrated_data) send_hourly_measurements_to_bigquery(calibrated_data) diff --git a/src/workflows/dags/airqo_mobile_measurements.py b/src/workflows/dags/airqo_mobile_measurements.py index 09d202b066..5c599bab73 100644 --- a/src/workflows/dags/airqo_mobile_measurements.py +++ b/src/workflows/dags/airqo_mobile_measurements.py @@ -13,6 +13,7 @@ ) def airqo_mobile_devices_measurements(): import pandas as pd + from airqo_etl_utils.constants import Frequency @task() def extract_raw_data(**kwargs): @@ -21,7 +22,9 @@ def extract_raw_data(**kwargs): dag_run = kwargs.get("dag_run") meta_data = AirQoDataUtils.flatten_meta_data(dag_run.conf["meta_data"]) - return AirQoDataUtils.extract_mobile_low_cost_sensors_data(meta_data=meta_data) + return AirQoDataUtils.extract_mobile_low_cost_sensors_data( + meta_data, Frequency.HOURLY + ) @task() def aggregate_raw_data(raw_data: pd.DataFrame): From d1496e58cd1b86cdf96512494ef3199567aac7d3 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 5 Dec 2024 17:15:09 +0300 Subject: [PATCH 13/13] Update workflows staging image tag to stage-635379ef-1733407940 --- k8s/workflows/values-stage.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/k8s/workflows/values-stage.yaml b/k8s/workflows/values-stage.yaml index 9cfc802532..f291feb84f 100644 --- a/k8s/workflows/values-stage.yaml +++ b/k8s/workflows/values-stage.yaml @@ -10,7 +10,7 @@ images: initContainer: eu.gcr.io/airqo-250220/airqo-stage-workflows-xcom redisContainer: eu.gcr.io/airqo-250220/airqo-stage-redis containers: eu.gcr.io/airqo-250220/airqo-stage-workflows - tag: stage-76ff52fe-1733336066 + tag: stage-635379ef-1733407940 nameOverride: '' fullnameOverride: '' podAnnotations: {}