Skip to content

Commit

Permalink
Merge pull request #3982 from airqo-platform/staging
Browse files Browse the repository at this point in the history
move to production
  • Loading branch information
Baalmart authored Dec 3, 2024
2 parents 4bb2ac2 + 536a74c commit bf083c5
Show file tree
Hide file tree
Showing 14 changed files with 100 additions and 49 deletions.
2 changes: 1 addition & 1 deletion k8s/auth-service/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ app:
replicaCount: 3
image:
repository: eu.gcr.io/airqo-250220/airqo-auth-api
tag: prod-00731eb0-1733176039
tag: prod-4bb2ac23-1733220303
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
2 changes: 1 addition & 1 deletion k8s/auth-service/values-stage.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ app:
replicaCount: 2
image:
repository: eu.gcr.io/airqo-250220/airqo-stage-auth-api
tag: stage-d2f7c6ef-1733175996
tag: stage-2d2da4cd-1733220258
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
2 changes: 1 addition & 1 deletion k8s/device-registry/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ app:
replicaCount: 3
image:
repository: eu.gcr.io/airqo-250220/airqo-device-registry-api
tag: prod-00731eb0-1733176039
tag: prod-4bb2ac23-1733220303
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
2 changes: 1 addition & 1 deletion k8s/device-registry/values-stage.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ app:
replicaCount: 2
image:
repository: eu.gcr.io/airqo-250220/airqo-stage-device-registry-api
tag: stage-49e2a1f0-1732801503
tag: stage-2d2da4cd-1733220258
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
2 changes: 1 addition & 1 deletion k8s/exceedance/values-prod-airqo.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ app:
configmap: env-exceedance-production
image:
repository: eu.gcr.io/airqo-250220/airqo-exceedance-job
tag: prod-00731eb0-1733176039
tag: prod-4bb2ac23-1733220303
nameOverride: ''
fullnameOverride: ''
2 changes: 1 addition & 1 deletion k8s/exceedance/values-prod-kcca.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ app:
configmap: env-exceedance-production
image:
repository: eu.gcr.io/airqo-250220/kcca-exceedance-job
tag: prod-00731eb0-1733176039
tag: prod-4bb2ac23-1733220303
nameOverride: ''
fullnameOverride: ''
2 changes: 1 addition & 1 deletion k8s/predict/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ images:
predictJob: eu.gcr.io/airqo-250220/airqo-predict-job
trainJob: eu.gcr.io/airqo-250220/airqo-train-job
predictPlaces: eu.gcr.io/airqo-250220/airqo-predict-places-air-quality
tag: prod-00731eb0-1733176039
tag: prod-4bb2ac23-1733220303
api:
name: airqo-prediction-api
label: prediction-api
Expand Down
2 changes: 1 addition & 1 deletion k8s/spatial/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ app:
replicaCount: 3
image:
repository: eu.gcr.io/airqo-250220/airqo-spatial-api
tag: prod-00731eb0-1733176039
tag: prod-4bb2ac23-1733220303
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
2 changes: 1 addition & 1 deletion k8s/workflows/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ images:
initContainer: eu.gcr.io/airqo-250220/airqo-workflows-xcom
redisContainer: eu.gcr.io/airqo-250220/airqo-redis
containers: eu.gcr.io/airqo-250220/airqo-workflows
tag: prod-00731eb0-1733176039
tag: prod-4bb2ac23-1733220303
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
48 changes: 25 additions & 23 deletions src/workflows/airqo_etl_utils/airqo_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,21 +225,22 @@ def process_single_entry(entry: Dict[str, Any]) -> Dict[str, Any]:
logger.warning(f"Error processing field8: {e}")

# Process the remaining fields
for key, value_data in entry.items():
target_key = data_mapping.get(key)
target_value = None
if isinstance(target_key, dict):
target_value = target_key.get("value")
target_key = target_key.get("key")

if target_key and target_key not in row_data:
if isinstance(value_data, dict):
extracted_value = AirQoDataUtils._extract_nested_value(
value_data, target_value
)
else:
extracted_value = value_data
row_data[target_key] = extracted_value
if isinstance(entry, dict):
for key, value_data in entry.items():
target_key = data_mapping.get(key, None)
target_value = None
if isinstance(target_key, dict):
target_value = target_key.get("value")
target_key = target_key.get("key")

if target_key and target_key not in row_data:
if isinstance(value_data, dict):
extracted_value = AirQoDataUtils._extract_nested_value(
value_data, target_value
)
else:
extracted_value = value_data
row_data[target_key] = extracted_value
return row_data

if isinstance(data, dict):
Expand Down Expand Up @@ -433,7 +434,7 @@ def extract_devices_data_(
mapping = configuration.AIRQO_BAM_CONFIG
elif device_category == DeviceCategory.LOW_COST_GAS:
field_8_cols = list(configuration.AIRQO_LOW_COST_GAS_CONFIG.values())
mapping = configuration.AIRQO_LOW_COST_GAS_CONFIG
mapping = configuration.AIRQO_LOW_COST_GAS_FIELD_MAPPING
other_fields_cols.extend(
[
"pm2_5",
Expand All @@ -446,7 +447,9 @@ def extract_devices_data_(
]
)
else:
field_8_cols = list(configuration.AIRQO_LOW_COST_CONFIG.values())
field_8_cols = list(
configuration.AIRQO_LOW_COST_CONFIG.get("field8", {}).values()
)
other_fields_cols.extend(
["s1_pm2_5", "s1_pm10", "s2_pm2_5", "s2_pm10", "battery"]
)
Expand Down Expand Up @@ -562,7 +565,9 @@ def extract_devices_data(

devices = airqo_api.get_devices_by_network(device_category=device_category)
if not devices:
logger.exception("Failed to fetch devices.")
logger.exception(
"Failed to fetch devices. Please check if devices are deployed"
)
return devices_data

other_fields_cols: List[str] = []
Expand Down Expand Up @@ -602,6 +607,7 @@ def extract_devices_data(
)

for device in devices:
data = []
device_number = device.get("device_number", None)
read_key = device.get("readKey", None)
network = device.get("network", None)
Expand Down Expand Up @@ -652,7 +658,6 @@ def extract_devices_data(
else:
data["latitude"] = meta_data.get("latitude", None)
data["longitude"] = meta_data.get("longitude", None)

devices_data = pd.concat([devices_data, data], ignore_index=True)

if remove_outliers:
Expand Down Expand Up @@ -771,7 +776,7 @@ def format_data_for_bigquery(
) -> pd.DataFrame:
# Currently only used for BAM device measurements
data.loc[:, "timestamp"] = pd.to_datetime(data["timestamp"])
data.loc[:, "tenant"] = str(Tenant.AIRQO)

big_query_api = BigQueryApi()
if data_type == DataType.UNCLEAN_BAM_DATA:
cols = big_query_api.get_columns(
Expand All @@ -797,7 +802,6 @@ def process_raw_data_for_bigquery(data: pd.DataFrame) -> pd.DataFrame:
Makes neccessary conversions, adds missing columns and sets them to `None`
"""
data["timestamp"] = pd.to_datetime(data["timestamp"])
data["tenant"] = str(Tenant.AIRQO)
big_query_api = BigQueryApi()
cols = big_query_api.get_columns(table=big_query_api.raw_measurements_table)
return Utils.populate_missing_columns(data=data, columns=cols)
Expand All @@ -808,7 +812,6 @@ def process_aggregated_data_for_bigquery(data: pd.DataFrame) -> pd.DataFrame:
Makes neccessary conversions, adds missing columns and sets them to `None`
"""
data["timestamp"] = pd.to_datetime(data["timestamp"])
data["tenant"] = str(Tenant.AIRQO)
big_query_api = BigQueryApi()
cols = big_query_api.get_columns(table=big_query_api.hourly_measurements_table)
return Utils.populate_missing_columns(data=data, columns=cols)
Expand Down Expand Up @@ -849,7 +852,6 @@ def process_latest_data(
data["pm2_5"] = data["pm2_5"].fillna(data["pm2_5_raw_value"])
data["pm10"] = data["pm10"].fillna(data["pm10_raw_value"])

data.loc[:, "tenant"] = str(Tenant.AIRQO)
data.loc[:, "device_category"] = str(device_category)

return data
Expand Down
57 changes: 52 additions & 5 deletions src/workflows/airqo_etl_utils/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,28 @@ class Config:
12: "status",
}

AIRQO_BAM_MAPPING_NEW = {
"field8": {
0: "timestamp",
1: "realtime_conc",
2: "hourly_conc",
3: "short_time_conc",
4: "air_flow",
5: "wind_speed",
6: "wind_direction",
7: "temperature",
8: "humidity",
9: "barometric_pressure",
10: "filter_temperature",
11: "filter_humidity",
12: "status",
},
}

AIRQO_BAM_MAPPING = {
"hourly_conc": "pm2_5",
}

# 1st 6 values are from the gps
AIRQO_LOW_COST_CONFIG = {
0: "latitude",
Expand Down Expand Up @@ -199,6 +218,30 @@ class Config:
9: "humidity",
10: "vapor_pressure",
}
AIRQO_LOW_COST_GAS_FIELD_MAPPING = {
"field1": "pm2_5",
"field2": "tvoc",
"field3": "hcho",
"field4": "co2",
"field5": "intaketemperature",
"field6": "intakehumidity",
"field7": "battery",
"created_at": "timestamp",
"field8": {
0: "latitude",
1: "longitude",
2: "altitude",
3: "wind_speed", # For mobile devices (Velocity)
4: "satellites", # Number of satelites tracked
5: "hdop", # For mobile devices
6: "device_temperature", # Internal
7: "device_humidity", # Internal
8: "temperature", # Internal
9: "humidity",
10: "vapor_pressure",
},
}

AIRQO_LOW_COST_FIELD_MAPPING = {
"field1": "s1_pm2_5",
"field2": "s1_pm10",
Expand Down Expand Up @@ -271,13 +314,15 @@ class Config:

device_config_mapping = {
"bam": {
"field_8_cols": list(AIRQO_BAM_CONFIG.values()),
"mapping": {"airqo": AIRQO_BAM_CONFIG},
"field_8_cols": list(AIRQO_BAM_MAPPING_NEW.get("field8", {}).values()),
"mapping": {"airqo": AIRQO_BAM_MAPPING_NEW},
"other_fields_cols": [],
},
"gas": {
"field_8_cols": list(AIRQO_LOW_COST_GAS_CONFIG.values()),
"mapping": {"airqo": AIRQO_LOW_COST_GAS_CONFIG},
"field_8_cols": list(
AIRQO_LOW_COST_GAS_FIELD_MAPPING.get("field8", {}).values()
),
"mapping": {"airqo": AIRQO_LOW_COST_GAS_FIELD_MAPPING},
"other_fields_cols": [
"pm2_5",
"tvoc",
Expand All @@ -289,7 +334,9 @@ class Config:
],
},
"lowcost": {
"field_8_cols": list(AIRQO_LOW_COST_CONFIG.values()),
"field_8_cols": list(
AIRQO_LOW_COST_FIELD_MAPPING.get("field8", {}).values()
),
"mapping": {
"airqo": AIRQO_LOW_COST_FIELD_MAPPING,
"iqair": IQAIR_LOW_COST_FIELD_MAPPING,
Expand Down
6 changes: 2 additions & 4 deletions src/workflows/airqo_etl_utils/data_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def format_data_types(
data[col] = (
data[col]
.astype(str)
.str.replace(r"[^\w\s\.\-:]", "", regex=True)
.str.replace(r"(?<!\.\d{3})Z$", ".000Z", regex=True)
) # Negative lookbehind to add missing milliseconds if needed
data[col] = pd.to_datetime(data[col], errors="coerce")
Expand Down Expand Up @@ -209,7 +210,7 @@ def process_data_for_message_broker(

devices = AirQoDataUtils.get_devices(group_id=caller)
devices = devices[
["tenant", "device_name", "site_id", "device_latitude", "device_longitude"]
["device_name", "site_id", "device_latitude", "device_longitude"]
]

data = pd.merge(
Expand All @@ -218,9 +219,6 @@ def process_data_for_message_broker(
on=["device_name", "site_id", "tenant"],
how="left",
)

data.rename(columns={"tenant": "network"}, inplace=True)

return data

@staticmethod
Expand Down
19 changes: 12 additions & 7 deletions src/workflows/airqo_etl_utils/schema/bam_raw_measurements.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "network",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "site_id",
"type": "STRING",
Expand Down Expand Up @@ -52,7 +57,7 @@
"mode": "NULLABLE",
"description": "μg/m3."
},
{
{
"name": "air_flow",
"type": "FLOAT",
"mode": "NULLABLE",
Expand All @@ -64,12 +69,12 @@
"mode": "NULLABLE",
"description": "m/s"
},
{
{
"name": "wind_direction",
"type": "FLOAT",
"mode": "NULLABLE"
},
{
{
"name": "temperature",
"type": "FLOAT",
"mode": "NULLABLE",
Expand All @@ -81,13 +86,13 @@
"mode": "NULLABLE",
"description": "%. External Relative Humidity"
},
{
{
"name": "barometric_pressure",
"type": "FLOAT",
"mode": "NULLABLE",
"description": "mmHg"
},
{
{
"name": "filter_temperature",
"type": "FLOAT",
"mode": "NULLABLE",
Expand All @@ -99,9 +104,9 @@
"mode": "NULLABLE",
"description": "%. Filter Relative Humidity"
},
{
{
"name": "status",
"type": "INTEGER",
"mode": "NULLABLE"
}
]
]
1 change: 0 additions & 1 deletion src/workflows/dags/airqo_bam_measurements.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ def update_latest_data_topic(data: pd.DataFrame, **kwargs):
)
data = DataValidationUtils.process_data_for_message_broker(
data=data,
tenant=Tenant.AIRQO,
topic=configuration.AVERAGED_HOURLY_MEASUREMENTS_TOPIC,
caller=kwargs["dag"].dag_id + unique_str,
)
Expand Down

0 comments on commit bf083c5

Please sign in to comment.