Skip to content

Commit

Permalink
Merge pull request #3995 from NicholasTurner23/update/Integration-iqair-devices
Browse files Browse the repository at this point in the history

Update/integration iqair devices
  • Loading branch information
Baalmart authored Dec 4, 2024
2 parents 50c8a62 + 9b2d45b commit 369cafd
Show file tree
Hide file tree
Showing 8 changed files with 775 additions and 740 deletions.
21 changes: 14 additions & 7 deletions src/workflows/airqo_etl_utils/airqo_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -758,19 +758,26 @@ def clean_low_cost_sensor_data(
AirQoGxExpectations.from_pandas().pm2_5_low_cost_sensor_raw_data(
data
)

else:
data["timestamp"] = pd.to_datetime(data["timestamp"])
data.dropna(subset=["timestamp"], inplace=True)
data["timestamp"] = pd.to_datetime(data["timestamp"])

data.drop_duplicates(
subset=["timestamp", "device_id"], keep="first", inplace=True
)
# TODO Find an appropriate place to put this
if device_category == DeviceCategory.LOW_COST:
data["pm2_5_raw_value"] = data[["s1_pm2_5", "s2_pm2_5"]].mean(axis=1)
data["pm2_5"] = data[["s1_pm2_5", "s2_pm2_5"]].mean(axis=1)
data["pm10_raw_value"] = data[["s1_pm10", "s2_pm10"]].mean(axis=1)
data["pm10"] = data[["s1_pm10", "s2_pm10"]].mean(axis=1)
is_airqo_network = data["network"] == "airqo"

pm2_5_mean = data.loc[is_airqo_network, ["s1_pm2_5", "s2_pm2_5"]].mean(
axis=1
)
pm10_mean = data.loc[is_airqo_network, ["s1_pm10", "s2_pm10"]].mean(axis=1)

data.loc[is_airqo_network, "pm2_5_raw_value"] = pm2_5_mean
data.loc[is_airqo_network, "pm2_5"] = pm2_5_mean
data.loc[is_airqo_network, "pm10_raw_value"] = pm10_mean
data.loc[is_airqo_network, "pm10"] = pm10_mean
return data

@staticmethod
Expand Down Expand Up @@ -1033,7 +1040,7 @@ def merge_aggregated_weather_data(
@staticmethod
def extract_devices_deployment_logs() -> pd.DataFrame:
airqo_api = AirQoApi()
devices = airqo_api.get_devices(tenant=Tenant.AIRQO)
devices = airqo_api.get_devices(network=str(Tenant.AIRQO))
devices_history = pd.DataFrame()
for device in devices:
try:
Expand Down
6 changes: 2 additions & 4 deletions src/workflows/airqo_etl_utils/data_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,10 @@ def format_data_types(
data[col] = (
data[col]
.astype(str)
.str.replace(r"[^\w\s\.\-:]", "", regex=True)
.str.replace(r"[^\w\s\.\-+:]", "", regex=True)
.str.replace(r"(?<!\.\d{3})Z$", ".000Z", regex=True)
) # Negative lookbehind to add missing milliseconds if needed
data[col] = pd.to_datetime(data[col], errors="coerce")
data[col] = pd.to_datetime(data[col], errors="coerce", utc=True)

if integers:
for col in integers:
Expand Down Expand Up @@ -142,7 +142,6 @@ def remove_outliers(data: pd.DataFrame) -> pd.DataFrame:
dtype: list(set(columns) & set(data.columns))
for dtype, columns in column_types.items()
}

data = DataValidationUtils.format_data_types(
data=data,
floats=filtered_columns[ColumnDataType.FLOAT],
Expand All @@ -151,7 +150,6 @@ def remove_outliers(data: pd.DataFrame) -> pd.DataFrame:
)

validated_columns = list(chain.from_iterable(filtered_columns.values()))

for col in validated_columns:
is_airqo_network = data["network"] == "airqo"
mapped_name = configuration.AIRQO_DATA_COLUMN_NAME_MAPPING.get(col, None)
Expand Down
319 changes: 162 additions & 157 deletions src/workflows/airqo_etl_utils/schema/airqo_mobile_measurements.json
Original file line number Diff line number Diff line change
@@ -1,158 +1,163 @@
[
{
"name": "tenant",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "timestamp",
"type": "TIMESTAMP",
"mode": "NULLABLE"
},
{
"name": "device_number",
"type": "INTEGER",
"mode": "NULLABLE"
},
{
"name": "device_id",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "latitude",
"type": "FLOAT",
"mode": "NULLABLE"
},
{
"name": "longitude",
"type": "FLOAT",
"mode": "NULLABLE"
},
{
"name": "s1_pm2_5",
"type": "FLOAT",
"mode": "NULLABLE",
"description": " μg/m3."
},
{
"name": "s2_pm2_5",
"type": "FLOAT",
"mode": "NULLABLE",
"description": " μg/m3."
},
{
"name": "s1_pm10",
"type": "FLOAT",
"mode": "NULLABLE",
"description": " μg/m3."
},
{
"name": "s2_pm10",
"type": "FLOAT",
"mode": "NULLABLE",
"description": " μg/m3."
},
{
"name": "altitude",
"type": "FLOAT",
"mode": "NULLABLE"
},
{
"name": "battery",
"type": "FLOAT",
"mode": "NULLABLE"
},
{
"name": "satellites",
"type": "FLOAT",
"mode": "NULLABLE"
},
{
"name": "hdop",
"type": "FLOAT",
"mode": "NULLABLE"
},
{
"name": "device_temperature",
"type": "FLOAT",
"mode": "NULLABLE"
},
{
"name": "device_humidity",
"type": "FLOAT",
"mode": "NULLABLE"
},
{
"name": "pm2_5_calibrated_value",
"type": "FLOAT",
"mode": "NULLABLE",
"description": " μg/m3."
},
{
"name": "pm10_calibrated_value",
"type": "FLOAT",
"mode": "NULLABLE",
"description": " μg/m3."
},
{
"name": "pm2_5_raw_value",
"type": "FLOAT",
"mode": "NULLABLE",
"description": " μg/m3."
},
{
"name": "pm10_raw_value",
"type": "FLOAT",
"mode": "NULLABLE",
"description": " μg/m3."
},
{
"name": "temperature",
"type": "FLOAT",
"mode": "NULLABLE",
"description": "°C."
},
{
"name": "humidity",
"type": "FLOAT",
"mode": "NULLABLE",
"description": "%."
},
{
"name": "wind_speed",
"type": "FLOAT",
"mode": "NULLABLE",
"description": "m/s."
},
{
"name": "atmospheric_pressure",
"type": "FLOAT",
"mode": "NULLABLE",
"description": "kPa."
},
{
"name": "radiation",
"type": "FLOAT",
"mode": "NULLABLE",
"description": "W/m2."
},
{
"name": "wind_gusts",
"type": "FLOAT",
"mode": "NULLABLE",
"description": "m/s."
},
{
"name": "precipitation",
"type": "FLOAT",
"mode": "NULLABLE",
"description": "mm."
},
{
"name": "wind_direction",
"type": "FLOAT",
"mode": "NULLABLE",
"description": "degrees"
}
]
{
"name": "tenant",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "network",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "timestamp",
"type": "TIMESTAMP",
"mode": "NULLABLE"
},
{
"name": "device_number",
"type": "INTEGER",
"mode": "NULLABLE"
},
{
"name": "device_id",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "latitude",
"type": "FLOAT",
"mode": "NULLABLE"
},
{
"name": "longitude",
"type": "FLOAT",
"mode": "NULLABLE"
},
{
"name": "s1_pm2_5",
"type": "FLOAT",
"mode": "NULLABLE",
"description": " μg/m3."
},
{
"name": "s2_pm2_5",
"type": "FLOAT",
"mode": "NULLABLE",
"description": " μg/m3."
},
{
"name": "s1_pm10",
"type": "FLOAT",
"mode": "NULLABLE",
"description": " μg/m3."
},
{
"name": "s2_pm10",
"type": "FLOAT",
"mode": "NULLABLE",
"description": " μg/m3."
},
{
"name": "altitude",
"type": "FLOAT",
"mode": "NULLABLE"
},
{
"name": "battery",
"type": "FLOAT",
"mode": "NULLABLE"
},
{
"name": "satellites",
"type": "FLOAT",
"mode": "NULLABLE"
},
{
"name": "hdop",
"type": "FLOAT",
"mode": "NULLABLE"
},
{
"name": "device_temperature",
"type": "FLOAT",
"mode": "NULLABLE"
},
{
"name": "device_humidity",
"type": "FLOAT",
"mode": "NULLABLE"
},
{
"name": "pm2_5_calibrated_value",
"type": "FLOAT",
"mode": "NULLABLE",
"description": " μg/m3."
},
{
"name": "pm10_calibrated_value",
"type": "FLOAT",
"mode": "NULLABLE",
"description": " μg/m3."
},
{
"name": "pm2_5_raw_value",
"type": "FLOAT",
"mode": "NULLABLE",
"description": " μg/m3."
},
{
"name": "pm10_raw_value",
"type": "FLOAT",
"mode": "NULLABLE",
"description": " μg/m3."
},
{
"name": "temperature",
"type": "FLOAT",
"mode": "NULLABLE",
"description": "°C."
},
{
"name": "humidity",
"type": "FLOAT",
"mode": "NULLABLE",
"description": "%."
},
{
"name": "wind_speed",
"type": "FLOAT",
"mode": "NULLABLE",
"description": "m/s."
},
{
"name": "atmospheric_pressure",
"type": "FLOAT",
"mode": "NULLABLE",
"description": "kPa."
},
{
"name": "radiation",
"type": "FLOAT",
"mode": "NULLABLE",
"description": "W/m2."
},
{
"name": "wind_gusts",
"type": "FLOAT",
"mode": "NULLABLE",
"description": "m/s."
},
{
"name": "precipitation",
"type": "FLOAT",
"mode": "NULLABLE",
"description": "mm."
},
{
"name": "wind_direction",
"type": "FLOAT",
"mode": "NULLABLE",
"description": "degrees"
}
]
Loading

0 comments on commit 369cafd

Please sign in to comment.