Skip to content

Commit

Permalink
Merge pull request #3950 from NicholasTurner23/update/Integration-iqa…
Browse files Browse the repository at this point in the history
…ir-devices

Update/integration iqair devices [WIP]
  • Loading branch information
Baalmart authored Dec 1, 2024
2 parents bbcfb67 + 80ff39a commit e39cc95
Show file tree
Hide file tree
Showing 9 changed files with 698 additions and 96 deletions.
118 changes: 113 additions & 5 deletions src/workflows/airqo_etl_utils/airqo_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import simplejson
import urllib3
from urllib3.util.retry import Retry
from typing import List, Dict, Any, Union, Generator, Tuple
from typing import List, Dict, Any, Union, Generator, Tuple, Optional

from .config import configuration
from .constants import DeviceCategory, Tenant
Expand Down Expand Up @@ -178,7 +178,6 @@ def get_devices(
"""
params = {"tenant": str(Tenant.AIRQO), "category": str(device_category)}
if configuration.ENVIRONMENT == "production":
# Query for active devices only when in production
params["active"] = "yes"

if tenant != Tenant.ALL:
Expand All @@ -200,15 +199,124 @@ def get_devices(
DeviceCategory.from_str(device.pop("category", None))
),
"tenant": device.get("network"),
"device_manufacturer": Tenant.from_str(
device.pop("network")
).device_manufacturer(),
"device_manufacturer": device.get("network", "airqo"),
**device,
}
for device in response.get("devices", [])
]
return devices

def get_networks(
self, net_status: str = "active"
) -> Tuple[List[Dict[str, Any]], Optional[str]]:
"""
Retrieve a list of networks.
Args:
net_status (str): The status of networks to retrieve. Defaults to "active".
Returns:
Tuple[List[Dict[str, Any]], Optional[str]]:
- List of networks (dictionaries) retrieved from the API.
- Optional error message if an exception occurs.
"""
params = {}

params = {}
networks: List[Dict[str, Any]] = []
exception_message: Optional[str] = None

if configuration.ENVIRONMENT == "production":
params["net_status"] = net_status

try:
response = self.__request("users/networks", params)
networks = response.get("networks", [])
except Exception as e:
exception_message = f"Failed to fetch networks: {e}"
logger.exception(exception_message)

return networks, exception_message

def get_devices_by_network(
self, device_category: DeviceCategory = DeviceCategory.LOW_COST
) -> List[Dict[str, Any]]:
"""
Retrieve devices by network based on the specified device category.
Args: device_category (DeviceCategory, optional): The category of devices to retrieve. Defaults to `DeviceCategory.LOW_COST`.
Returns:
List[Dict[str, Any]]: A List of dictionaries containing the details of the devices. The dictionary has the following structure.
[
{
"_id": str,
"visibility": bool,
"mobility": bool,
"height": int,
"device_codes": List[str]
"status": str,
"isPrimaryInLocation": bool,
"nextMaintenance": date(str),
"category": str,
"isActive": bool,
"long_name": str,
"network": str,
"alias": str",
"name": str,
"createdAt": date(str),
"description": str,
"latitude": float,
"longitude": float,
"approximate_distance_in_km": float,
"bearing_in_radians": float,
"deployment_date": date(str),
"mountType": str,
"powerType": str,
"recall_date": date(str),
"previous_sites": List[Dict[str, Any]],
"cohorts": List,
"site": Dict[str, Any],
"device_number": int
},
]
"""
devices: List[Dict[str, Any]] = []
networks, error = self.get_networks()

if error:
logger.error(f"Error while fetching networks: {error}")
return devices

params = {"category": str(device_category)}
if configuration.ENVIRONMENT == "production":
params["active"] = "yes"

for network in networks:
network_name = network.get("net_name", "airqo")
params["network"] = network_name
try:
response = self.__request("devices/summary", params)
devices.extend(
[
{
"site_id": device.get("site", {}).get("_id", None),
"site_location": device.pop("site", {}).get(
"location_name", None
),
"device_category": device.pop("category", None),
"device_manufacturer": network_name,
**device,
}
for device in response.get("devices", [])
]
)
except Exception as e:
logger.exception(f"Failed to fetch devices on {network_name}: {e}")
continue

return devices

def get_thingspeak_read_keys(
self, devices: pd.DataFrame, return_type: str = "all"
) -> Union[Dict[int, str], Generator[Tuple[int, str], None, None]]:
Expand Down
7 changes: 4 additions & 3 deletions src/workflows/airqo_etl_utils/airqo_gx_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import os
from pathlib import Path
import numpy as np
import pandas as pd

from .airqo_gx_metrics import AirQoGxExpectations
from .config import configuration
Expand Down Expand Up @@ -368,7 +369,7 @@ def digest_validation_results(
"expectation_suite_name"
]
run_result = validation_result["validation_result"]["success"]
# Not being used for at the moment
# Not being used at the moment
# local_site = validation_result["actions_results"]["update_data_docs"][
# "local_site"
# ]
Expand All @@ -378,13 +379,13 @@ def digest_validation_results(
# Type ExpectationConfig
expectation_type = result["expectation_config"]["expectation_type"]
partial_unexpected_list = [
"null" if np.isnan(x) else x
"null" if pd.isna(x) else x
for x in result["result"].get("partial_unexpected_list", [])
]

partial_unexpected_counts = [
{
"value": "null" if np.isnan(item["value"]) else item["value"],
"value": "null" if pd.isna(item["value"]) else item["value"],
"count": item["count"],
}
for item in result["result"].get("partial_unexpected_counts", [])
Expand Down
Loading

0 comments on commit e39cc95

Please sign in to comment.