Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update/integration iqair devices [WIP] #3950

118 changes: 113 additions & 5 deletions src/workflows/airqo_etl_utils/airqo_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import simplejson
import urllib3
from urllib3.util.retry import Retry
from typing import List, Dict, Any, Union, Generator, Tuple
from typing import List, Dict, Any, Union, Generator, Tuple, Optional

from .config import configuration
from .constants import DeviceCategory, Tenant
Expand Down Expand Up @@ -178,7 +178,6 @@ def get_devices(
"""
params = {"tenant": str(Tenant.AIRQO), "category": str(device_category)}
if configuration.ENVIRONMENT == "production":
# Query for active devices only when in production
params["active"] = "yes"

if tenant != Tenant.ALL:
Expand All @@ -200,15 +199,124 @@ def get_devices(
DeviceCategory.from_str(device.pop("category", None))
),
"tenant": device.get("network"),
"device_manufacturer": Tenant.from_str(
device.pop("network")
).device_manufacturer(),
"device_manufacturer": device.get("network", "airqo"),
**device,
}
for device in response.get("devices", [])
]
return devices

def get_networks(
self, net_status: str = "active"
) -> Tuple[List[Dict[str, Any]], Optional[str]]:
"""
Retrieve a list of networks.
Args:
net_status (str): The status of networks to retrieve. Defaults to "active".
Returns:
Tuple[List[Dict[str, Any]], Optional[str]]:
- List of networks (dictionaries) retrieved from the API.
- Optional error message if an exception occurs.
"""
params = {}

params = {}
networks: List[Dict[str, Any]] = []
exception_message: Optional[str] = None

if configuration.ENVIRONMENT == "production":
params["net_status"] = net_status

try:
response = self.__request("users/networks", params)
networks = response.get("networks", [])
except Exception as e:
exception_message = f"Failed to fetch networks: {e}"
logger.exception(exception_message)

return networks, exception_message

def get_devices_by_network(
self, device_category: DeviceCategory = DeviceCategory.LOW_COST
) -> List[Dict[str, Any]]:
"""
Retrieve devices by network based on the specified device category.
Args: device_category (DeviceCategory, optional): The category of devices to retrieve. Defaults to `DeviceCategory.LOW_COST`.
Returns:
List[Dict[str, Any]]: A List of dictionaries containing the details of the devices. The dictionary has the following structure.
[
{
"_id": str,
"visibility": bool,
"mobility": bool,
"height": int,
"device_codes": List[str]
"status": str,
"isPrimaryInLocation": bool,
"nextMaintenance": date(str),
"category": str,
"isActive": bool,
"long_name": str,
"network": str,
"alias": str",
"name": str,
"createdAt": date(str),
"description": str,
"latitude": float,
"longitude": float,
"approximate_distance_in_km": float,
"bearing_in_radians": float,
"deployment_date": date(str),
"mountType": str,
"powerType": str,
"recall_date": date(str),
"previous_sites": List[Dict[str, Any]],
"cohorts": List,
"site": Dict[str, Any],
"device_number": int
},
]
"""
devices: List[Dict[str, Any]] = []
networks, error = self.get_networks()

if error:
logger.error(f"Error while fetching networks: {error}")
return devices

params = {"category": str(device_category)}
if configuration.ENVIRONMENT == "production":
params["active"] = "yes"

for network in networks:
network_name = network.get("net_name", "airqo")
params["network"] = network_name
try:
response = self.__request("devices/summary", params)
devices.extend(
[
{
"site_id": device.get("site", {}).get("_id", None),
"site_location": device.pop("site", {}).get(
"location_name", None
),
"device_category": device.pop("category", None),
"device_manufacturer": network_name,
**device,
}
Comment on lines +303 to +310
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Avoid using pop when extracting values to prevent unintended side effects

Using device.pop("site", {}).get(...) modifies the device dictionary in place, which could lead to unexpected behavior if device is used elsewhere after this operation. Consider using device.get("site", {}).get(...) instead to safely access the values without modifying the original dictionary.

Apply this diff to replace pop with get:

             {
                 "site_id": device.get("site", {}).get("_id", None),
-                "site_location": device.pop("site", {}).get(
+                "site_location": device.get("site", {}).get(
                     "location_name", None
                 ),
-                "device_category": device.pop("category", None),
+                "device_category": device.get("category", None),
                 "device_manufacturer": network_name,
                 **device,
             }

Similarly, review other instances where pop is used when extracting values to prevent unintended side effects.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
"site_id": device.get("site", {}).get("_id", None),
"site_location": device.pop("site", {}).get(
"location_name", None
),
"device_category": device.pop("category", None),
"device_manufacturer": network_name,
**device,
}
"site_id": device.get("site", {}).get("_id", None),
"site_location": device.get("site", {}).get(
"location_name", None
),
"device_category": device.get("category", None),
"device_manufacturer": network_name,
**device,
}

for device in response.get("devices", [])
]
)
except Exception as e:
logger.exception(f"Failed to fetch devices on {network_name}: {e}")
continue

return devices

def get_thingspeak_read_keys(
self, devices: pd.DataFrame, return_type: str = "all"
) -> Union[Dict[int, str], Generator[Tuple[int, str], None, None]]:
Expand Down
7 changes: 4 additions & 3 deletions src/workflows/airqo_etl_utils/airqo_gx_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import os
from pathlib import Path
import numpy as np
import pandas as pd

from .airqo_gx_metrics import AirQoGxExpectations
from .config import configuration
Expand Down Expand Up @@ -368,7 +369,7 @@ def digest_validation_results(
"expectation_suite_name"
]
run_result = validation_result["validation_result"]["success"]
# Not being used for at the moment
# Not being used at the moment
# local_site = validation_result["actions_results"]["update_data_docs"][
# "local_site"
# ]
Expand All @@ -378,13 +379,13 @@ def digest_validation_results(
# Type ExpectationConfig
expectation_type = result["expectation_config"]["expectation_type"]
partial_unexpected_list = [
"null" if np.isnan(x) else x
"null" if pd.isna(x) else x
for x in result["result"].get("partial_unexpected_list", [])
]

partial_unexpected_counts = [
{
"value": "null" if np.isnan(item["value"]) else item["value"],
"value": "null" if pd.isna(item["value"]) else item["value"],
"count": item["count"],
}
for item in result["result"].get("partial_unexpected_counts", [])
Expand Down
Loading
Loading