diff --git a/k8s/auth-service/values-prod.yaml b/k8s/auth-service/values-prod.yaml index 6cbf4898a0..60cc544fea 100644 --- a/k8s/auth-service/values-prod.yaml +++ b/k8s/auth-service/values-prod.yaml @@ -6,7 +6,7 @@ app: replicaCount: 3 image: repository: eu.gcr.io/airqo-250220/airqo-auth-api - tag: prod-00731eb0-1733176039 + tag: prod-4bb2ac23-1733220303 nameOverride: '' fullnameOverride: '' podAnnotations: {} diff --git a/k8s/auth-service/values-stage.yaml b/k8s/auth-service/values-stage.yaml index 053c45ebf8..196d2991bf 100644 --- a/k8s/auth-service/values-stage.yaml +++ b/k8s/auth-service/values-stage.yaml @@ -6,7 +6,7 @@ app: replicaCount: 2 image: repository: eu.gcr.io/airqo-250220/airqo-stage-auth-api - tag: stage-d2f7c6ef-1733175996 + tag: stage-2d2da4cd-1733220258 nameOverride: '' fullnameOverride: '' podAnnotations: {} diff --git a/k8s/device-registry/values-prod.yaml b/k8s/device-registry/values-prod.yaml index 92b3dcc95d..40ffc485ac 100644 --- a/k8s/device-registry/values-prod.yaml +++ b/k8s/device-registry/values-prod.yaml @@ -6,7 +6,7 @@ app: replicaCount: 3 image: repository: eu.gcr.io/airqo-250220/airqo-device-registry-api - tag: prod-00731eb0-1733176039 + tag: prod-4bb2ac23-1733220303 nameOverride: '' fullnameOverride: '' podAnnotations: {} diff --git a/k8s/device-registry/values-stage.yaml b/k8s/device-registry/values-stage.yaml index be119c8d77..29d1fd9ce9 100644 --- a/k8s/device-registry/values-stage.yaml +++ b/k8s/device-registry/values-stage.yaml @@ -6,7 +6,7 @@ app: replicaCount: 2 image: repository: eu.gcr.io/airqo-250220/airqo-stage-device-registry-api - tag: stage-49e2a1f0-1732801503 + tag: stage-2d2da4cd-1733220258 nameOverride: '' fullnameOverride: '' podAnnotations: {} diff --git a/k8s/exceedance/values-prod-airqo.yaml b/k8s/exceedance/values-prod-airqo.yaml index 7c9b78b612..242d0e3a52 100644 --- a/k8s/exceedance/values-prod-airqo.yaml +++ b/k8s/exceedance/values-prod-airqo.yaml @@ -4,6 +4,6 @@ app: configmap: env-exceedance-production image: repository: eu.gcr.io/airqo-250220/airqo-exceedance-job - tag: prod-00731eb0-1733176039 + tag: prod-4bb2ac23-1733220303 nameOverride: '' fullnameOverride: '' diff --git a/k8s/exceedance/values-prod-kcca.yaml b/k8s/exceedance/values-prod-kcca.yaml index e4c48f7b3a..c4ca1f0aa3 100644 --- a/k8s/exceedance/values-prod-kcca.yaml +++ b/k8s/exceedance/values-prod-kcca.yaml @@ -4,6 +4,6 @@ app: configmap: env-exceedance-production image: repository: eu.gcr.io/airqo-250220/kcca-exceedance-job - tag: prod-00731eb0-1733176039 + tag: prod-4bb2ac23-1733220303 nameOverride: '' fullnameOverride: '' diff --git a/k8s/predict/values-prod.yaml b/k8s/predict/values-prod.yaml index 085d151b55..15bd6dc49b 100644 --- a/k8s/predict/values-prod.yaml +++ b/k8s/predict/values-prod.yaml @@ -7,7 +7,7 @@ images: predictJob: eu.gcr.io/airqo-250220/airqo-predict-job trainJob: eu.gcr.io/airqo-250220/airqo-train-job predictPlaces: eu.gcr.io/airqo-250220/airqo-predict-places-air-quality - tag: prod-00731eb0-1733176039 + tag: prod-4bb2ac23-1733220303 api: name: airqo-prediction-api label: prediction-api diff --git a/k8s/spatial/values-prod.yaml b/k8s/spatial/values-prod.yaml index f87948b893..88255d30da 100644 --- a/k8s/spatial/values-prod.yaml +++ b/k8s/spatial/values-prod.yaml @@ -6,7 +6,7 @@ app: replicaCount: 3 image: repository: eu.gcr.io/airqo-250220/airqo-spatial-api - tag: prod-00731eb0-1733176039 + tag: prod-4bb2ac23-1733220303 nameOverride: '' fullnameOverride: '' podAnnotations: {} diff --git a/k8s/workflows/values-prod.yaml b/k8s/workflows/values-prod.yaml index b843d539e1..e28b0776db 100644 --- a/k8s/workflows/values-prod.yaml +++ b/k8s/workflows/values-prod.yaml @@ -10,7 +10,7 @@ images: initContainer: eu.gcr.io/airqo-250220/airqo-workflows-xcom redisContainer: eu.gcr.io/airqo-250220/airqo-redis containers: eu.gcr.io/airqo-250220/airqo-workflows - tag: prod-00731eb0-1733176039 + tag: prod-4bb2ac23-1733220303 nameOverride: '' fullnameOverride: '' podAnnotations: {} diff --git a/src/workflows/airqo_etl_utils/airqo_utils.py b/src/workflows/airqo_etl_utils/airqo_utils.py index 1524fa5af9..864df891fa 100644 --- a/src/workflows/airqo_etl_utils/airqo_utils.py +++ b/src/workflows/airqo_etl_utils/airqo_utils.py @@ -225,21 +225,22 @@ def process_single_entry(entry: Dict[str, Any]) -> Dict[str, Any]: logger.warning(f"Error processing field8: {e}") # Process the remaining fields - for key, value_data in entry.items(): - target_key = data_mapping.get(key) - target_value = None - if isinstance(target_key, dict): - target_value = target_key.get("value") - target_key = target_key.get("key") - - if target_key and target_key not in row_data: - if isinstance(value_data, dict): - extracted_value = AirQoDataUtils._extract_nested_value( - value_data, target_value - ) - else: - extracted_value = value_data - row_data[target_key] = extracted_value + if isinstance(entry, dict): + for key, value_data in entry.items(): + target_key = data_mapping.get(key, None) + target_value = None + if isinstance(target_key, dict): + target_value = target_key.get("value") + target_key = target_key.get("key") + + if target_key and target_key not in row_data: + if isinstance(value_data, dict): + extracted_value = AirQoDataUtils._extract_nested_value( + value_data, target_value + ) + else: + extracted_value = value_data + row_data[target_key] = extracted_value return row_data if isinstance(data, dict): @@ -433,7 +434,7 @@ def extract_devices_data_( mapping = configuration.AIRQO_BAM_CONFIG elif device_category == DeviceCategory.LOW_COST_GAS: field_8_cols = list(configuration.AIRQO_LOW_COST_GAS_CONFIG.values()) - mapping = configuration.AIRQO_LOW_COST_GAS_CONFIG + mapping = configuration.AIRQO_LOW_COST_GAS_FIELD_MAPPING other_fields_cols.extend( [ "pm2_5", @@ -446,7 +447,9 @@ def extract_devices_data_( ] ) else: - field_8_cols = list(configuration.AIRQO_LOW_COST_CONFIG.values()) + field_8_cols = list( + configuration.AIRQO_LOW_COST_CONFIG.get("field8", {}).values() + ) other_fields_cols.extend( ["s1_pm2_5", "s1_pm10", "s2_pm2_5", "s2_pm10", "battery"] ) @@ -562,7 +565,9 @@ def extract_devices_data( devices = airqo_api.get_devices_by_network(device_category=device_category) if not devices: - logger.exception("Failed to fetch devices.") + logger.exception( + "Failed to fetch devices. Please check if devices are deployed" + ) return devices_data other_fields_cols: List[str] = [] @@ -602,6 +607,7 @@ def extract_devices_data( ) for device in devices: + data = [] device_number = device.get("device_number", None) read_key = device.get("readKey", None) network = device.get("network", None) @@ -652,7 +658,6 @@ def extract_devices_data( else: data["latitude"] = meta_data.get("latitude", None) data["longitude"] = meta_data.get("longitude", None) - devices_data = pd.concat([devices_data, data], ignore_index=True) if remove_outliers: @@ -771,7 +776,7 @@ def format_data_for_bigquery( ) -> pd.DataFrame: # Currently only used for BAM device measurements data.loc[:, "timestamp"] = pd.to_datetime(data["timestamp"]) - data.loc[:, "tenant"] = str(Tenant.AIRQO) + big_query_api = BigQueryApi() if data_type == DataType.UNCLEAN_BAM_DATA: cols = big_query_api.get_columns( @@ -797,7 +802,6 @@ def process_raw_data_for_bigquery(data: pd.DataFrame) -> pd.DataFrame: Makes neccessary conversions, adds missing columns and sets them to `None` """ data["timestamp"] = pd.to_datetime(data["timestamp"]) - data["tenant"] = str(Tenant.AIRQO) big_query_api = BigQueryApi() cols = big_query_api.get_columns(table=big_query_api.raw_measurements_table) return Utils.populate_missing_columns(data=data, columns=cols) @@ -808,7 +812,6 @@ def process_aggregated_data_for_bigquery(data: pd.DataFrame) -> pd.DataFrame: Makes neccessary conversions, adds missing columns and sets them to `None` """ data["timestamp"] = pd.to_datetime(data["timestamp"]) - data["tenant"] = str(Tenant.AIRQO) big_query_api = BigQueryApi() cols = big_query_api.get_columns(table=big_query_api.hourly_measurements_table) return Utils.populate_missing_columns(data=data, columns=cols) @@ -849,7 +852,6 @@ def process_latest_data( data["pm2_5"] = data["pm2_5"].fillna(data["pm2_5_raw_value"]) data["pm10"] = data["pm10"].fillna(data["pm10_raw_value"]) - data.loc[:, "tenant"] = str(Tenant.AIRQO) data.loc[:, "device_category"] = str(device_category) return data diff --git a/src/workflows/airqo_etl_utils/config.py b/src/workflows/airqo_etl_utils/config.py index ca8540c8b9..a343d1bd05 100644 --- a/src/workflows/airqo_etl_utils/config.py +++ b/src/workflows/airqo_etl_utils/config.py @@ -168,9 +168,28 @@ class Config: 12: "status", } + AIRQO_BAM_MAPPING_NEW = { + "field8": { + 0: "timestamp", + 1: "realtime_conc", + 2: "hourly_conc", + 3: "short_time_conc", + 4: "air_flow", + 5: "wind_speed", + 6: "wind_direction", + 7: "temperature", + 8: "humidity", + 9: "barometric_pressure", + 10: "filter_temperature", + 11: "filter_humidity", + 12: "status", + }, + } + AIRQO_BAM_MAPPING = { "hourly_conc": "pm2_5", } + # 1st 6 values are from the gps AIRQO_LOW_COST_CONFIG = { 0: "latitude", @@ -199,6 +218,30 @@ class Config: 9: "humidity", 10: "vapor_pressure", } + AIRQO_LOW_COST_GAS_FIELD_MAPPING = { + "field1": "pm2_5", + "field2": "tvoc", + "field3": "hcho", + "field4": "co2", + "field5": "intaketemperature", + "field6": "intakehumidity", + "field7": "battery", + "created_at": "timestamp", + "field8": { + 0: "latitude", + 1: "longitude", + 2: "altitude", + 3: "wind_speed", # For mobile devices (Velocity) + 4: "satellites", # Number of satelites tracked + 5: "hdop", # For mobile devices + 6: "device_temperature", # Internal + 7: "device_humidity", # Internal + 8: "temperature", # Internal + 9: "humidity", + 10: "vapor_pressure", + }, + } + AIRQO_LOW_COST_FIELD_MAPPING = { "field1": "s1_pm2_5", "field2": "s1_pm10", @@ -271,13 +314,15 @@ class Config: device_config_mapping = { "bam": { - "field_8_cols": list(AIRQO_BAM_CONFIG.values()), - "mapping": {"airqo": AIRQO_BAM_CONFIG}, + "field_8_cols": list(AIRQO_BAM_MAPPING_NEW.get("field8", {}).values()), + "mapping": {"airqo": AIRQO_BAM_MAPPING_NEW}, "other_fields_cols": [], }, "gas": { - "field_8_cols": list(AIRQO_LOW_COST_GAS_CONFIG.values()), - "mapping": {"airqo": AIRQO_LOW_COST_GAS_CONFIG}, + "field_8_cols": list( + AIRQO_LOW_COST_GAS_FIELD_MAPPING.get("field8", {}).values() + ), + "mapping": {"airqo": AIRQO_LOW_COST_GAS_FIELD_MAPPING}, "other_fields_cols": [ "pm2_5", "tvoc", @@ -289,7 +334,9 @@ class Config: ], }, "lowcost": { - "field_8_cols": list(AIRQO_LOW_COST_CONFIG.values()), + "field_8_cols": list( + AIRQO_LOW_COST_FIELD_MAPPING.get("field8", {}).values() + ), "mapping": { "airqo": AIRQO_LOW_COST_FIELD_MAPPING, "iqair": IQAIR_LOW_COST_FIELD_MAPPING, diff --git a/src/workflows/airqo_etl_utils/data_validator.py b/src/workflows/airqo_etl_utils/data_validator.py index 10dfd64344..623cb6f703 100644 --- a/src/workflows/airqo_etl_utils/data_validator.py +++ b/src/workflows/airqo_etl_utils/data_validator.py @@ -72,6 +72,7 @@ def format_data_types( data[col] = ( data[col] .astype(str) + .str.replace(r"[^\w\s\.\-:]", "", regex=True) .str.replace(r"(?