Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update/integration iqair devices #3981

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 25 additions & 23 deletions src/workflows/airqo_etl_utils/airqo_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,21 +225,22 @@ def process_single_entry(entry: Dict[str, Any]) -> Dict[str, Any]:
logger.warning(f"Error processing field8: {e}")

# Process the remaining fields
for key, value_data in entry.items():
target_key = data_mapping.get(key)
target_value = None
if isinstance(target_key, dict):
target_value = target_key.get("value")
target_key = target_key.get("key")

if target_key and target_key not in row_data:
if isinstance(value_data, dict):
extracted_value = AirQoDataUtils._extract_nested_value(
value_data, target_value
)
else:
extracted_value = value_data
row_data[target_key] = extracted_value
if isinstance(entry, dict):
for key, value_data in entry.items():
target_key = data_mapping.get(key, None)
target_value = None
if isinstance(target_key, dict):
target_value = target_key.get("value")
target_key = target_key.get("key")

if target_key and target_key not in row_data:
if isinstance(value_data, dict):
extracted_value = AirQoDataUtils._extract_nested_value(
value_data, target_value
)
else:
extracted_value = value_data
row_data[target_key] = extracted_value
return row_data

if isinstance(data, dict):
Expand Down Expand Up @@ -433,7 +434,7 @@ def extract_devices_data_(
mapping = configuration.AIRQO_BAM_CONFIG
elif device_category == DeviceCategory.LOW_COST_GAS:
field_8_cols = list(configuration.AIRQO_LOW_COST_GAS_CONFIG.values())
mapping = configuration.AIRQO_LOW_COST_GAS_CONFIG
mapping = configuration.AIRQO_LOW_COST_GAS_FIELD_MAPPING
other_fields_cols.extend(
[
"pm2_5",
Expand All @@ -446,7 +447,9 @@ def extract_devices_data_(
]
)
else:
field_8_cols = list(configuration.AIRQO_LOW_COST_CONFIG.values())
field_8_cols = list(
configuration.AIRQO_LOW_COST_CONFIG.get("field8", {}).values()
)
other_fields_cols.extend(
["s1_pm2_5", "s1_pm10", "s2_pm2_5", "s2_pm10", "battery"]
)
Expand Down Expand Up @@ -562,7 +565,9 @@ def extract_devices_data(

devices = airqo_api.get_devices_by_network(device_category=device_category)
if not devices:
logger.exception("Failed to fetch devices.")
logger.exception(
"Failed to fetch devices. Please check if devices are deployed"
)
return devices_data

other_fields_cols: List[str] = []
Expand Down Expand Up @@ -602,6 +607,7 @@ def extract_devices_data(
)

for device in devices:
data = []
device_number = device.get("device_number", None)
read_key = device.get("readKey", None)
network = device.get("network", None)
Expand Down Expand Up @@ -652,7 +658,6 @@ def extract_devices_data(
else:
data["latitude"] = meta_data.get("latitude", None)
data["longitude"] = meta_data.get("longitude", None)

devices_data = pd.concat([devices_data, data], ignore_index=True)

if remove_outliers:
Expand Down Expand Up @@ -771,7 +776,7 @@ def format_data_for_bigquery(
) -> pd.DataFrame:
# Currently only used for BAM device measurements
data.loc[:, "timestamp"] = pd.to_datetime(data["timestamp"])
data.loc[:, "tenant"] = str(Tenant.AIRQO)

big_query_api = BigQueryApi()
if data_type == DataType.UNCLEAN_BAM_DATA:
cols = big_query_api.get_columns(
Expand All @@ -797,7 +802,6 @@ def process_raw_data_for_bigquery(data: pd.DataFrame) -> pd.DataFrame:
Makes neccessary conversions, adds missing columns and sets them to `None`
"""
data["timestamp"] = pd.to_datetime(data["timestamp"])
data["tenant"] = str(Tenant.AIRQO)
big_query_api = BigQueryApi()
cols = big_query_api.get_columns(table=big_query_api.raw_measurements_table)
return Utils.populate_missing_columns(data=data, columns=cols)
Expand All @@ -808,7 +812,6 @@ def process_aggregated_data_for_bigquery(data: pd.DataFrame) -> pd.DataFrame:
Makes neccessary conversions, adds missing columns and sets them to `None`
"""
data["timestamp"] = pd.to_datetime(data["timestamp"])
data["tenant"] = str(Tenant.AIRQO)
big_query_api = BigQueryApi()
cols = big_query_api.get_columns(table=big_query_api.hourly_measurements_table)
return Utils.populate_missing_columns(data=data, columns=cols)
Expand Down Expand Up @@ -849,7 +852,6 @@ def process_latest_data(
data["pm2_5"] = data["pm2_5"].fillna(data["pm2_5_raw_value"])
data["pm10"] = data["pm10"].fillna(data["pm10_raw_value"])

data.loc[:, "tenant"] = str(Tenant.AIRQO)
data.loc[:, "device_category"] = str(device_category)

return data
Expand Down
57 changes: 52 additions & 5 deletions src/workflows/airqo_etl_utils/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,28 @@ class Config:
12: "status",
}

AIRQO_BAM_MAPPING_NEW = {
"field8": {
0: "timestamp",
1: "realtime_conc",
2: "hourly_conc",
3: "short_time_conc",
4: "air_flow",
5: "wind_speed",
6: "wind_direction",
7: "temperature",
8: "humidity",
9: "barometric_pressure",
10: "filter_temperature",
11: "filter_humidity",
12: "status",
},
}

AIRQO_BAM_MAPPING = {
"hourly_conc": "pm2_5",
}

# 1st 6 values are from the gps
AIRQO_LOW_COST_CONFIG = {
0: "latitude",
Expand Down Expand Up @@ -199,6 +218,30 @@ class Config:
9: "humidity",
10: "vapor_pressure",
}
AIRQO_LOW_COST_GAS_FIELD_MAPPING = {
"field1": "pm2_5",
"field2": "tvoc",
"field3": "hcho",
"field4": "co2",
"field5": "intaketemperature",
"field6": "intakehumidity",
"field7": "battery",
"created_at": "timestamp",
"field8": {
0: "latitude",
1: "longitude",
2: "altitude",
3: "wind_speed", # For mobile devices (Velocity)
4: "satellites", # Number of satelites tracked
5: "hdop", # For mobile devices
6: "device_temperature", # Internal
7: "device_humidity", # Internal
8: "temperature", # Internal
9: "humidity",
10: "vapor_pressure",
},
}

AIRQO_LOW_COST_FIELD_MAPPING = {
"field1": "s1_pm2_5",
"field2": "s1_pm10",
Expand Down Expand Up @@ -271,13 +314,15 @@ class Config:

device_config_mapping = {
"bam": {
"field_8_cols": list(AIRQO_BAM_CONFIG.values()),
"mapping": {"airqo": AIRQO_BAM_CONFIG},
"field_8_cols": list(AIRQO_BAM_MAPPING_NEW.get("field8", {}).values()),
"mapping": {"airqo": AIRQO_BAM_MAPPING_NEW},
"other_fields_cols": [],
},
"gas": {
"field_8_cols": list(AIRQO_LOW_COST_GAS_CONFIG.values()),
"mapping": {"airqo": AIRQO_LOW_COST_GAS_CONFIG},
"field_8_cols": list(
AIRQO_LOW_COST_GAS_FIELD_MAPPING.get("field8", {}).values()
),
"mapping": {"airqo": AIRQO_LOW_COST_GAS_FIELD_MAPPING},
"other_fields_cols": [
"pm2_5",
"tvoc",
Expand All @@ -289,7 +334,9 @@ class Config:
],
},
"lowcost": {
"field_8_cols": list(AIRQO_LOW_COST_CONFIG.values()),
"field_8_cols": list(
AIRQO_LOW_COST_FIELD_MAPPING.get("field8", {}).values()
),
"mapping": {
"airqo": AIRQO_LOW_COST_FIELD_MAPPING,
"iqair": IQAIR_LOW_COST_FIELD_MAPPING,
Expand Down
6 changes: 2 additions & 4 deletions src/workflows/airqo_etl_utils/data_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def format_data_types(
data[col] = (
data[col]
.astype(str)
.str.replace(r"[^\w\s\.\-:]", "", regex=True)
.str.replace(r"(?<!\.\d{3})Z$", ".000Z", regex=True)
) # Negative lookbehind to add missing milliseconds if needed
data[col] = pd.to_datetime(data[col], errors="coerce")
Expand Down Expand Up @@ -209,7 +210,7 @@ def process_data_for_message_broker(

devices = AirQoDataUtils.get_devices(group_id=caller)
devices = devices[
["tenant", "device_name", "site_id", "device_latitude", "device_longitude"]
["device_name", "site_id", "device_latitude", "device_longitude"]
]

data = pd.merge(
Expand All @@ -218,9 +219,6 @@ def process_data_for_message_broker(
on=["device_name", "site_id", "tenant"],
how="left",
)

data.rename(columns={"tenant": "network"}, inplace=True)

return data

@staticmethod
Expand Down
19 changes: 12 additions & 7 deletions src/workflows/airqo_etl_utils/schema/bam_raw_measurements.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "network",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "site_id",
"type": "STRING",
Expand Down Expand Up @@ -52,7 +57,7 @@
"mode": "NULLABLE",
"description": "μg/m3."
},
{
{
"name": "air_flow",
"type": "FLOAT",
"mode": "NULLABLE",
Expand All @@ -64,12 +69,12 @@
"mode": "NULLABLE",
"description": "m/s"
},
{
{
"name": "wind_direction",
"type": "FLOAT",
"mode": "NULLABLE"
},
{
{
"name": "temperature",
"type": "FLOAT",
"mode": "NULLABLE",
Expand All @@ -81,13 +86,13 @@
"mode": "NULLABLE",
"description": "%. External Relative Humidity"
},
{
{
"name": "barometric_pressure",
"type": "FLOAT",
"mode": "NULLABLE",
"description": "mmHg"
},
{
{
"name": "filter_temperature",
"type": "FLOAT",
"mode": "NULLABLE",
Expand All @@ -99,9 +104,9 @@
"mode": "NULLABLE",
"description": "%. Filter Relative Humidity"
},
{
{
"name": "status",
"type": "INTEGER",
"mode": "NULLABLE"
}
]
]
1 change: 0 additions & 1 deletion src/workflows/dags/airqo_bam_measurements.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ def update_latest_data_topic(data: pd.DataFrame, **kwargs):
)
data = DataValidationUtils.process_data_for_message_broker(
data=data,
tenant=Tenant.AIRQO,
topic=configuration.AVERAGED_HOURLY_MEASUREMENTS_TOPIC,
caller=kwargs["dag"].dag_id + unique_str,
)
Expand Down
Loading