
Update/integration iqair devices [WIP] #3950

Conversation

NicholasTurner23 (Contributor) commented Dec 1, 2024

Description

This PR does the following:

  1. Retrieves device updates by network.
  2. Adds extraction of device measurements by network.
  3. Adds support for custom data cleaning per network.

Related Issues

  • JIRA cards:
    • OPS-319

Summary by CodeRabbit

Release Notes

  • New Features

    • Added methods for retrieving networks and devices by network in the API.
    • Introduced a new class for querying external data sources, enhancing data retrieval capabilities.
    • New DAG for historical raw low-cost measurements added, improving data processing workflows.
  • Improvements

    • Enhanced error handling and logging across various methods.
    • Improved data validation and formatting processes, increasing robustness.
    • Expanded configuration mappings for air quality metrics, allowing for more detailed data handling.
  • Documentation

    • Updated documentation for new and existing features, providing clarity on ETL processes and data sources.

coderabbitai bot (Contributor) commented Dec 1, 2024

📝 Walkthrough

The pull request introduces significant enhancements across several modules within the AirQo ETL utilities. Key updates include the addition of new methods in the AirQoApi class for retrieving networks and devices, improvements to data handling in the AirQoDataUtils class, and the introduction of a new DataSourcesApis class for querying external APIs. Additionally, new configuration mappings for air quality metrics are added, while existing methods are refined for better error handling and documentation. Overall, these changes enhance the functionality and maintainability of the codebase.

Changes

  • src/workflows/airqo_etl_utils/airqo_api.py — Added get_networks and get_devices_by_network; updated get_devices; minor adjustments to get_maintenance_logs; enhanced error handling and docstrings.
  • src/workflows/airqo_etl_utils/airqo_gx_utils.py — Added import for pandas; replaced numpy functions with pandas equivalents in digest_validation_results; updated comments for clarity.
  • src/workflows/airqo_etl_utils/airqo_utils.py — Replaced ThingspeakApi with DataSourcesApis; added the map_and_extract_data method and the _extract_nested_value helper; updated extract_devices_data.
  • src/workflows/airqo_etl_utils/config.py — Added new dictionaries: AIRQO_LOW_COST_FIELD_MAPPING, URBANBETTER_LOW_COST_FIELD_MAPPING, IQAIR_LOW_COST_FIELD_MAPPING, device_config_mapping.
  • src/workflows/airqo_etl_utils/data_sources.py — Introduced the DataSourcesApis class with methods for querying the ThingSpeak and IQAir APIs; added robust error handling and logging.
  • src/workflows/airqo_etl_utils/data_validator.py — Enhanced the format_data_types method with detailed docstrings; refined logic for handling data types and outlier removal.
  • src/workflows/airqo_etl_utils/thingspeak_api.py — Removed the ThingspeakApi class and its query_data method.
  • src/workflows/dags/airqo_measurements.py — Added a new DAG for historical low-cost measurements; updated the extract_raw_data function to call extract_devices_data_.
  • src/workflows/dags/dag_docs.py — Added documentation for airqo_historical_raw_low_cost_measurements_doc; modified existing documentation for airqo_historical_hourly_measurements_doc.
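The _extract_nested_value helper added to airqo_utils.py is only named above. As a rough sketch of what such a helper typically does (the dot-separated path syntax and the signature are assumptions here, not the actual implementation):

```python
from typing import Any, Dict, Optional

def extract_nested_value(data: Dict[str, Any], path: str) -> Optional[Any]:
    """Walk a dot-separated key path through nested dicts, returning None on a miss."""
    current: Any = data
    for key in path.split("."):
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current

# Example: pull a PM2.5 concentration out of an IQAir-style nested payload.
payload = {"current": {"pm2_5": {"conc": 12.4, "aqius": 52}}}
print(extract_nested_value(payload, "current.pm2_5.conc"))  # 12.4
print(extract_nested_value(payload, "current.pm10.conc"))   # None
```

Returning None instead of raising keeps the extraction loop running when a device omits a field, which matches the tolerant cleaning behavior described in this PR.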

Possibly related PRs

Suggested labels

ready for review

Suggested reviewers

  • Baalmart
  • BenjaminSsempala
  • Psalmz777
  • Codebmk

🌟 In the realm of data, we boldly tread,
With networks and devices, our paths are spread.
From AirQo's heart, new methods arise,
Mapping the air, beneath open skies.
In code we trust, with clarity bright,
Enhancing our tools, to reach new heights! 🌟



coderabbitai bot (Contributor) left a comment

Actionable comments posted: 6

🧹 Outside diff range and nitpick comments (10)
src/workflows/airqo_etl_utils/airqo_gx_utils.py (1)

Line range hint 372-391: Consider enhancing method documentation

The method could benefit from improved documentation about null value handling. Consider adding type hints and docstring updates:

    def digest_validation_results(
        self, validation_result: Dict[str, Any]
    ) -> List[Dict[str, Any]]:
        """
        Extract relevant information from a validation result for storing.

        Args:
            validation_result (Dict[str, Any]): The validation result object from Great Expectations.

+       Notes:
+           - Handles null values in partial_unexpected_list and partial_unexpected_counts
+           - Converts null values to "null" string for BigQuery compatibility

        Returns:
            list of dict: Extracted information from the validation results.
        """
src/workflows/airqo_etl_utils/data_sources.py (1)

38-39: Fix typo in error message

There's a typo in the error message: "An error occured" should be "An error occurred".

Apply this diff to correct the typo:

-                logger.exception(f"An error occured: {ex}")
+                logger.exception(f"An error occurred: {ex}")
src/workflows/airqo_etl_utils/airqo_api.py (1)

225-225: Remove duplicate assignment of params

The variable params is assigned twice consecutively, which is unnecessary. Please remove the duplicate assignment.

Apply this diff to remove the redundant line:

-            params = {}
src/workflows/airqo_etl_utils/airqo_utils.py (4)

23-23: Remove unused import Optional

The typing.Optional import is not used in this file. Please remove it to clean up the code.

Apply this diff to remove the unused import:

-from typing import List, Dict, Any, Optional, Union
+from typing import List, Dict, Any, Union
🧰 Tools
🪛 Ruff (0.8.0)

23-23: typing.Optional imported but unused

Remove unused import: typing.Optional

(F401)


436-436: Remove unused variable mapping

The variable mapping is assigned but never used. Please remove this assignment to clean up the code.

Apply this diff to remove the unused variable:

             elif device_category == DeviceCategory.LOW_COST_GAS:
                 field_8_cols = list(configuration.AIRQO_LOW_COST_GAS_CONFIG.values())
-                mapping = configuration.AIRQO_LOW_COST_GAS_CONFIG
                 other_fields_cols.extend(
                     [
                         "pm2_5",
🧰 Tools
🪛 Ruff (0.8.0)

436-436: Local variable mapping is assigned to but never used

Remove assignment to unused variable mapping

(F841)


538-545: Clarify the purpose of the method extract_devices_data_

The method extract_devices_data_ has a trailing underscore, which may be confusing. If this method is intended to replace or extend extract_devices_data, please consider renaming it appropriately or deprecating the old method to prevent confusion.


577-579: Improve log message for missing configuration

When a device category's configuration is missing, the log message "Missing device category." might be misleading. Consider updating it to "Missing configuration for device category."

Apply this diff to improve the log message:

             if not config:
-                logger.warning("Missing device category.")
+                logger.warning("Missing configuration for device category.")
                 return devices_data
src/workflows/dags/dag_docs.py (1)

39-39: Fix typo in documentation

There's a typo in the documentation: "historica," should be "historical".

Apply this diff to correct the typo:

 ### AirQo historical raw low cost data ETL
 #### Purpose
-Extracts historica, raw measurements for low cost sensors going back 2 days.
+Extracts historical raw measurements for low-cost sensors going back 2 days.
 #### Notes
src/workflows/airqo_etl_utils/config.py (2)

202-222: LGTM! Consider adding docstring documentation.

The field mapping is well-structured and comprehensive. The nested structure for field8 effectively organizes GPS and environmental data.

Consider adding a docstring to document the purpose and structure of this mapping, particularly explaining the significance of field8's nested structure.


272-305: LGTM! Well-structured device configuration mapping.

The configuration effectively organizes device-specific settings and integrates network-specific mappings. This design supports easy addition of new device types and networks.

Consider implementing a validation mechanism to ensure that all required fields are present in the mappings when new device types or networks are added. This could help prevent runtime errors from missing field mappings.
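The suggested validation mechanism could look roughly like this. The required-key names below are placeholders, since the actual shape of device_config_mapping isn't shown in this review:

```python
from typing import Any, Dict, List

# Placeholder required keys; substitute whatever fields the real mapping needs.
REQUIRED_KEYS = {"field_mapping", "api"}

def validate_device_config(config_mapping: Dict[str, Dict[str, Any]]) -> List[str]:
    """Return a list of problems found; an empty list means the mapping looks complete."""
    problems: List[str] = []
    for device_type, config in config_mapping.items():
        missing = REQUIRED_KEYS - set(config)
        if missing:
            problems.append(f"{device_type}: missing {sorted(missing)}")
    return problems

# Example: one well-formed entry, one missing its field mapping.
mapping = {
    "iqair": {"field_mapping": {}, "api": "iqair"},
    "broken": {"api": "thingspeak"},
}
print(validate_device_config(mapping))  # ["broken: missing ['field_mapping']"]
```

Running such a check at import time (or in a unit test) would surface a missing mapping before any DAG run fails on it.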

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between bbcfb67 and 202f610.

📒 Files selected for processing (9)
  • src/workflows/airqo_etl_utils/airqo_api.py (2 hunks)
  • src/workflows/airqo_etl_utils/airqo_gx_utils.py (3 hunks)
  • src/workflows/airqo_etl_utils/airqo_utils.py (7 hunks)
  • src/workflows/airqo_etl_utils/config.py (2 hunks)
  • src/workflows/airqo_etl_utils/data_sources.py (1 hunks)
  • src/workflows/airqo_etl_utils/data_validator.py (2 hunks)
  • src/workflows/airqo_etl_utils/thingspeak_api.py (0 hunks)
  • src/workflows/dags/airqo_measurements.py (3 hunks)
  • src/workflows/dags/dag_docs.py (1 hunks)
💤 Files with no reviewable changes (1)
  • src/workflows/airqo_etl_utils/thingspeak_api.py
🧰 Additional context used
🪛 Ruff (0.8.0)
src/workflows/airqo_etl_utils/data_validator.py

159-159: Function definition does not bind loop variable mapped_name

(B023)


🔇 Additional comments (7)
src/workflows/airqo_etl_utils/airqo_gx_utils.py (3)

10-10: LGTM: Explicit pandas import added

The addition of the pandas import is well-placed and necessary for the explicit pd.isna() calls in the digest_validation_results method. This also makes the pandas dependency more visible, which was previously implicit through the DataFrame usage.


382-383: LGTM: Improved null value handling

The switch from np.isnan to pd.isna is a good improvement as it provides more comprehensive null value handling, covering both np.nan and None cases.

Also applies to: 388-389
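The practical difference is easy to demonstrate: np.isnan only accepts numeric input, while pd.isna also covers None, which can appear in partial_unexpected_list entries.

```python
import numpy as np
import pandas as pd

values = [1.5, np.nan, None]
print([pd.isna(v) for v in values])  # [False, True, True]

# np.isnan(None) raises TypeError, so a numpy-only check would need extra guards:
try:
    np.isnan(None)
except TypeError as e:
    print("np.isnan rejects None:", e)
```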


Line range hint 372-391: Verify validation results handling across networks

Since the PR objectives mention network-specific data cleaning, let's verify that the validation results handling is consistent across different networks.

✅ Verification successful

Let's search for validation and network-related patterns to better understand how validation results are handled across different networks. The previous search might have been too specific, so we're broadening our scope while maintaining focus on the relevant components.


The initial search revealed several relevant files, particularly in the ETL utils. Let's examine the data validator and expectations configuration to understand if there are network-specific validation rules or if the validation is network-agnostic.


Network-specific validation is properly implemented

The validation results handling in the code is consistent and network-aware. Here's why:

  • The DataValidationUtils.remove_outliers method specifically handles network-specific validation by checking is_airqo_network = data["network"] == "airqo" and applies appropriate column mapping for AirQo network data.
  • The validation expectations in airqo_gx_expectations.py are device-category specific (e.g., gaseous_low_cost_sensor_raw_data_check and pm2_5_low_cost_sensor_raw_data) rather than network-specific, ensuring consistent validation across networks for similar device types.
  • The VALID_SENSOR_RANGES dictionary in data_validator.py defines universal validation ranges that are applied appropriately based on the network context.
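A minimal sketch of how such universal range validation typically works. The signature mirrors get_valid_value as referenced elsewhere in this review, but the body and the ranges below are illustrative assumptions, not the actual values in data_validator.py:

```python
from typing import Dict, Optional, Tuple

# Illustrative ranges only; see VALID_SENSOR_RANGES in data_validator.py for the real ones.
VALID_SENSOR_RANGES: Dict[str, Tuple[float, float]] = {
    "pm2_5": (0.0, 1000.0),
    "temperature": (-40.0, 85.0),
}

def get_valid_value(column_name: str, row_value: float) -> Optional[float]:
    """Return the value if it falls inside the column's range, else None (outlier)."""
    bounds = VALID_SENSOR_RANGES.get(column_name)
    if bounds is None:
        return row_value  # columns without a configured range pass through unchanged
    low, high = bounds
    return row_value if low <= row_value <= high else None

print(get_valid_value("pm2_5", 12.4))  # 12.4
print(get_valid_value("pm2_5", -3.0))  # None
```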
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for network-specific validation configurations or expectations
rg -l "network.*expect" --type py

# Search for any network-specific data cleaning patterns
ast-grep --pattern 'def $FUNC($ARGS) {
  $$$
  network
  $$$
}'

Length of output: 101


Script:

#!/bin/bash
# Let's try a broader search for validation-related code
rg -l "validation" --type py

# Search for network-specific data handling
rg -l "network.*data" --type py

# Look for files containing both validation and network patterns
rg -l "validation.*network|network.*validation" --type py

# Search for expectation configurations
rg -A 5 "expectation_config" --type py

Length of output: 5164


Script:

#!/bin/bash
# Let's examine the data validator implementation for network-specific handling
cat src/workflows/airqo_etl_utils/data_validator.py

# Check the expectations configuration file
cat src/workflows/airqo_etl_utils/airqo_gx_expectations.py

# Look for network-specific data processing in airqo_utils
rg -A 5 "def.*clean" src/workflows/airqo_etl_utils/airqo_utils.py

Length of output: 18895

src/workflows/airqo_etl_utils/config.py (2)

223-235: LGTM! Field mapping is complete and well-structured.

The mapping covers all necessary pollutant measurements and their corresponding PI values with a consistent naming convention.


237-245: LGTM! Verify IQAir API response format.

The mapping structure aligns well with the PR objectives for IQAir integration. The nested structure for PM measurements provides good flexibility.

✅ Verification successful

Field mapping structure verified and matches IQAir API implementation

The field mapping structure is correctly implemented based on the codebase evidence:

  • The iqair method in data_sources.py retrieves data from the IQAir API
  • The mapping is used in airqo_utils.py through AirQoDataUtils.map_and_extract_data
  • Field names in IQAIR_LOW_COST_FIELD_MAPPING align with the data processing flow

The nested structure for PM measurements ({"key": "pm2_5", "value": "conc"}) provides the necessary flexibility to handle the API response format, while simpler mappings are used for environmental parameters.
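How such a mixed flat/nested mapping is typically consumed can be sketched as follows; the field names and the map_reading helper are illustrative, not the actual map_and_extract_data implementation:

```python
from typing import Any, Dict

# Hypothetical excerpt: simple fields map 1:1 to a target column name, while PM fields
# carry a nested {"key", "value"} pair pointing into a sub-dict of the API payload.
IQAIR_LOW_COST_FIELD_MAPPING: Dict[str, Any] = {
    "ts": "timestamp",
    "hm": "humidity",
    "pm25": {"key": "pm2_5", "value": "conc"},
}

def map_reading(reading: Dict[str, Any]) -> Dict[str, Any]:
    """Rename a raw API reading's fields according to the mapping above."""
    out: Dict[str, Any] = {}
    for src, target in IQAIR_LOW_COST_FIELD_MAPPING.items():
        if src not in reading:
            continue  # tolerate missing fields in the payload
        if isinstance(target, dict):
            # nested mapping: extract the "value" sub-field and store it under "key"
            out[target["key"]] = reading[src].get(target["value"])
        else:
            out[target] = reading[src]
    return out

print(map_reading({"ts": "2024-12-01T00:00:00Z", "pm25": {"conc": 12.4, "aqius": 52}}))
```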

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify if the field names match IQAir API response format
# Search for IQAir API response handling or documentation in the codebase

rg -g '!*.{json,md}' -A 5 'iqair.*response'

Length of output: 45


Script:

#!/bin/bash
# Let's try a broader search for IQAir related code and data handling

# Search for IQAir related files
echo "=== Files containing IQAir references ==="
fd -t f . | rg -i "iqair"

# Search for any IQAir related code patterns
echo -e "\n=== IQAir code patterns ==="
rg -i "iqair" -A 5 -B 5

# Look for data transformation or processing related to IQAir
echo -e "\n=== Data processing patterns ==="
ast-grep --pattern 'def $_(self, $$$):
    $$$
    iqair
    $$$'

Length of output: 5744

src/workflows/dags/airqo_measurements.py (2)

Line range hint 134-193: LGTM! Well-structured DAG implementation.

The DAG effectively implements the historical data processing pipeline with appropriate task dependencies and error handling.


Line range hint 496-500: Verify the implementation of extract_devices_data_.

The change from extract_devices_data to extract_devices_data_ aligns with the PR objectives for network-specific data extraction.

✅ Verification successful

Implementation of extract_devices_data_ verified and aligned with network-specific data extraction

The change from extract_devices_data to extract_devices_data_ is verified as a valid enhancement. The new implementation:

  • Adds network-specific data extraction capabilities (airqo and iqair networks)
  • Maintains backward compatibility with existing parameters
  • Includes proper error handling and logging
  • Implements network-specific data mapping and processing
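The network-specific dispatch described above can be sketched like this; the stub fetchers stand in for the real ThingSpeak/IQAir clients in DataSourcesApis, and the function body is an assumption, not the actual extract_devices_data_ code:

```python
from typing import Any, Callable, Dict, List

# Stub fetchers standing in for the ThingSpeak and IQAir clients.
def fetch_thingspeak(device: Dict[str, Any]) -> Dict[str, Any]:
    return {"source": "thingspeak", "device": device["name"]}

def fetch_iqair(device: Dict[str, Any]) -> Dict[str, Any]:
    return {"source": "iqair", "device": device["name"]}

EXTRACTORS: Dict[str, Callable[[Dict[str, Any]], Dict[str, Any]]] = {
    "airqo": fetch_thingspeak,
    "iqair": fetch_iqair,
}

def extract_devices_data(devices: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Dispatch each device to its network's extractor, skipping unknown networks."""
    results: List[Dict[str, Any]] = []
    for device in devices:
        extractor = EXTRACTORS.get(device.get("network", ""))
        if extractor is None:
            continue  # no configuration for this network
        results.append(extractor(device))
    return results
```

A table-driven dispatch like this keeps the addition of a new network down to one new entry plus its fetcher.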
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the implementation and usage of extract_devices_data_

# Search for the implementation
ast-grep --pattern 'def extract_devices_data_($$$)'

# Search for all usages
rg 'extract_devices_data_\(' -A 3

Length of output: 758


Script:

#!/bin/bash
# Let's check for the implementation details of extract_devices_data_ and its predecessor
ast-grep --pattern 'def extract_devices_data_($$$) {
  $$$
}'

# Also search for the old method to understand the transition
ast-grep --pattern 'def extract_devices_data($$$) {
  $$$
}'

# Let's also check for any other usages of both methods
rg 'extract_devices_data[_]?\(' --type python -A 5

Length of output: 210


Script:

#!/bin/bash
# Let's check for the implementation details of both methods without ast-grep syntax
ast-grep --pattern 'def extract_devices_data_'

# Get the content of the file containing the implementation
rg -l 'def extract_devices_data_' | xargs cat

# Search for all usages of both methods
rg 'extract_devices_data[_]?'

Length of output: 52827

Comment on lines 144 to 149
            logger.exception(
                ValueError(
                    "Device information must include 'api_code' and 'serial_number'."
                )
            )
            url = f"{base_url}/{device_id}"

⚠️ Potential issue

Improve error handling when device information is missing

The code is incorrectly using logger.exception with a ValueError instance. Instead, log an error message and raise the exception to handle it properly.

Apply this diff to fix the error handling:

             if not base_url or not device_id:
-                logger.exception(
-                    ValueError(
-                        "Device information must include 'api_code' and 'serial_number'."
-                    )
-                )
+                error_msg = "Device information must include 'api_code' and 'serial_number'."
+                logger.error(error_msg)
+                raise ValueError(error_msg)
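The recommended log-and-raise pattern, shown as a standalone sketch (the function name and parameters are illustrative):

```python
import logging

logger = logging.getLogger(__name__)

def build_device_url(base_url: str, device_id: str) -> str:
    """Validate required device fields before building the request URL."""
    if not base_url or not device_id:
        error_msg = "Device information must include 'api_code' and 'serial_number'."
        logger.error(error_msg)      # log once, without a misleading traceback
        raise ValueError(error_msg)  # and let the caller decide how to recover
    return f"{base_url}/{device_id}"

print(build_device_url("https://api.example.com/v2", "ABC123"))
```

logger.exception is reserved for except blocks, where it attaches the active traceback; calling it outside one, with a bare ValueError instance as the message, logs a confusing record and never actually raises.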

Comment on lines 104 to 125
"""
Retrieve data from the IQAir API for a specific device and resolution.

Args:
device (Dict[str, Any]): A dictionary containing device details, such as:
- api_code (str): The base URL or endpoint for the API.
- serial_number (str): The unique identifier for the device.
resolution (str): The data resolution to retrieve. Options include:
- "current": Real-time data (default).
- "instant": Instantaneous measurements.
- "hourly": Hourly aggregated data.
- "daily": Daily aggregated data.
- "monthly": Monthly aggregated data.

Returns:
pd.DataFrame: A DataFrame containing the retrieved data, or an empty DataFrame in case of errors or no data.

Raises:
ValueError: If an invalid resolution is provided or if the response data is invalid or malformed.
requests.exceptions.RequestException: For issues with the HTTP request.
Exception: For any other unexpected errors.
"""

⚠️ Potential issue

Correct the docstring return type to match the actual return value

The iqair method's docstring specifies the return type as pd.DataFrame, but the method returns a Union[List, Dict]. Please update the docstring to reflect the correct return type.

Apply this diff to correct the docstring:

@@ -118,8 +118,8 @@
     Returns:
-        pd.DataFrame: A DataFrame containing the retrieved data, or an empty DataFrame in case of errors or no data.
+        Union[List, Dict]: A list or dictionary containing the retrieved data, or `None` in case of errors or no data.
 
     Raises:
         ValueError: If an invalid resolution is provided or if the response data is invalid or malformed.

Comment on lines 50 to 69
"""
Fetch data from a ThingSpeak channel for a specific device within a time range.

Args:
device_number (int): The ThingSpeak channel ID corresponding to the device.
start_date_time (str): The start timestamp in ISO 8601 format (e.g., "YYYY-MM-DDTHH:mm:ssZ").
end_date_time (str): The end timestamp in ISO 8601 format.
read_key (str): The API key to authenticate the request for the specified channel.

Returns:
Optional[Tuple[List[Dict[str, Any]], Dict[str, Any]]]:
- A list of dictionaries containing the channel feeds data.
- A dictionary containing metadata about the channel.
- Returns `None` if no valid data is found or an error occurs.

Raises:
requests.exceptions.RequestException: For issues with the HTTP request.
ValueError: If the response data is invalid or malformed.
Exception: For any other unexpected errors.
"""

🛠️ Refactor suggestion

Update docstring to include data_available in the return value

The thingspeak method returns a tuple with a bool indicating data availability, but the docstring does not mention it. Please update the docstring to reflect the correct return type.

Apply this diff to update the docstring:

@@ -59,10 +59,11 @@
     Returns:
-        Optional[Tuple[List[Dict[str, Any]], Dict[str, Any]]]:
+        Optional[Tuple[List[Dict[str, Any]], Dict[str, Any], bool]]:
             - A list of dictionaries containing the channel feeds data.
             - A dictionary containing metadata about the channel.
+            - A boolean indicating whether data is available.
             - Returns `None` if no valid data is found or an error occurs.

Comment on lines +302 to +309
"site_id": device.get("site", {}).get("_id", None),
"site_location": device.pop("site", {}).get(
"location_name", None
),
"device_category": device.pop("category", None),
"device_manufacturer": network_name,
**device,
}

🛠️ Refactor suggestion

Avoid using pop when extracting values to prevent unintended side effects

Using device.pop("site", {}).get(...) modifies the device dictionary in place, which could lead to unexpected behavior if device is used elsewhere after this operation. Consider using device.get("site", {}).get(...) instead to safely access the values without modifying the original dictionary.

Apply this diff to replace pop with get:

             {
                 "site_id": device.get("site", {}).get("_id", None),
-                "site_location": device.pop("site", {}).get(
+                "site_location": device.get("site", {}).get(
                     "location_name", None
                 ),
-                "device_category": device.pop("category", None),
+                "device_category": device.get("category", None),
                 "device_manufacturer": network_name,
                 **device,
             }

Similarly, review other instances where pop is used when extracting values to prevent unintended side effects.
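The side effect is easy to see in isolation:

```python
device = {"site": {"_id": "s1", "location_name": "Kampala"}, "name": "dev-1"}

# pop removes "site" from the dict, so a later **device spread no longer carries it
location = device.pop("site", {}).get("location_name")
print(location, "site" in device)  # Kampala False

# get reads the same value but leaves the dict untouched
device = {"site": {"_id": "s1", "location_name": "Kampala"}, "name": "dev-1"}
location = device.get("site", {}).get("location_name")
print(location, "site" in device)  # Kampala True
```

If stripping "site" from the spread output is actually intended, that is worth an explicit comment in the code; otherwise get is the safer default.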

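As a standalone illustration of the pitfall (a minimal sketch, not the project's code — the `device` dicts below are invented for demonstration): `pop` removes the key from the source dict, so any later use of `device`, including a `**device` spread, no longer sees it, while `get` leaves the dict intact.

```python
# pop mutates the source dict; get leaves it untouched.
device = {"site": {"_id": "abc", "location_name": "Kampala"}, "category": "lowcost"}
record = {
    "site_location": device.pop("site", {}).get("location_name"),
    "device_category": device.pop("category", None),
}
# The popped keys are gone from the original dict afterwards.
print("site" in device, "category" in device)  # False False

device2 = {"site": {"_id": "abc", "location_name": "Kampala"}, "category": "lowcost"}
record2 = {
    "site_location": device2.get("site", {}).get("location_name"),
    "device_category": device2.get("category", None),
}
print("site" in device2, "category" in device2)  # True True
```

Note that if the original code relied on `pop` to keep `site` and `category` out of the subsequent `**device` spread, switching to `get` will re-include them, so the spread may need an explicit exclusion.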

logger.exception(f"An error occured: {e} - device {device['name']}")
continue
if isinstance(data, pd.DataFrame) and data.empty:
logger.exception(f"No data received from {device['name']}")
🛠️ Refactor suggestion

Use appropriate logging level instead of logger.exception

When logging that no data is received from a device, use logger.warning or logger.info instead of logger.exception, as no exception has occurred.

Apply this diff to correct the logging level:

                 if isinstance(data, pd.DataFrame) and data.empty:
-                    logger.exception(f"No data received from {device['name']}")
+                    logger.warning(f"No data received from {device['name']}")
                     continue

Committable suggestion skipped: line range outside the PR's diff.

lambda x: DataValidationUtils.get_valid_value(
column_name=name, row_value=x
column_name=mapped_name, row_value=x
⚠️ Potential issue

Fix closure over loop variable mapped_name in lambda function

Using the loop variable mapped_name inside the lambda function may cause unintended behavior due to late binding. Consider passing mapped_name as a default argument to the lambda to capture its current value.

Apply this diff to fix the issue:

             for col in validated_columns:
                 is_airqo_network = data["network"] == "airqo"
                 mapped_name = configuration.AIRQO_DATA_COLUMN_NAME_MAPPING.get(col, None)
                 data.loc[is_airqo_network, col] = data.loc[is_airqo_network, col].apply(
-                    lambda x: DataValidationUtils.get_valid_value(
+                    lambda x, mapped_name=mapped_name: DataValidationUtils.get_valid_value(
                         column_name=mapped_name, row_value=x
                     )
                 )
🧰 Tools
🪛 Ruff (0.8.0)

159-159: Function definition does not bind loop variable mapped_name

(B023)
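The late-binding behavior Ruff B023 warns about can be shown in isolation (a generic sketch with invented column names, unrelated to the project's code): closures created in a loop all share the loop variable, so deferred calls see only its final value unless it is bound as a default argument.

```python
# Every lambda closes over the same `name` variable; by the time they are
# called, the loop has finished and `name` holds its last value.
funcs = [lambda: name for name in ["pm2_5", "pm10", "no2"]]
print([f() for f in funcs])  # ['no2', 'no2', 'no2']

# Binding the variable as a default argument captures its value per iteration.
funcs = [lambda name=name: name for name in ["pm2_5", "pm10", "no2"]]
print([f() for f in funcs])  # ['pm2_5', 'pm10', 'no2']
```

In the reviewed loop the lambda is invoked immediately by `.apply(...)`, so the fix is mainly defensive, but the default-argument form is the conventional way to silence B023 and future-proof the code.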

codecov bot commented Dec 1, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 12.22%. Comparing base (bbcfb67) to head (80ff39a).
Report is 11 commits behind head on staging.

Additional details and impacted files

Impacted file tree graph

@@             Coverage Diff             @@
##           staging    #3950      +/-   ##
===========================================
+ Coverage    11.73%   12.22%   +0.48%     
===========================================
  Files          114       51      -63     
  Lines        15326     4670   -10656     
  Branches       318      318              
===========================================
- Hits          1799      571    -1228     
+ Misses       13527     4099    -9428     

see 63 files with indirect coverage changes

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Outside diff range and nitpick comments (3)
src/workflows/airqo_etl_utils/data_sources.py (1)

95-97: Use appropriate logging level for expected scenarios

logger.exception is reserved for logging actual exceptions together with their stack traces. A device having no data within a given time range is an expected scenario and should be logged with logger.warning or logger.info.

Apply this diff to use the appropriate logging level:

-            logger.exception(
+            logger.warning(
                 f"Device does not have data between {start_date_time} and {end_date_time}"
             )
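The distinction can be demonstrated in a self-contained sketch (the `fetch` helper and device names are hypothetical, not the project's API): `logger.exception` attaches the active traceback and therefore belongs inside an `except` block, while expected conditions use `warning` or `info`.

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("airqo_etl_demo")

def fetch(device_name, payload):
    # Expected condition: no data -> warning, no traceback needed.
    if not payload:
        logger.warning("No data received from %s", device_name)
        return None
    try:
        return payload["feeds"]
    except KeyError:
        # A real exception is in flight: logger.exception records its traceback.
        logger.exception("Malformed payload from %s", device_name)
        return None

fetch("aq_g5_01", {})            # logs a warning only
fetch("aq_g5_01", {"other": 1})  # logs an error with the KeyError traceback
```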
src/workflows/airqo_etl_utils/airqo_utils.py (2)

23-23: Address static analysis warnings

The code has some minor issues flagged by static analysis:

  1. Unused import: Optional from typing
  2. Unused variable: mapping in the device category handling

Apply these diffs to clean up the code:

-from typing import List, Dict, Any, Optional, Union
+from typing import List, Dict, Any, Union

-            mapping = configuration.AIRQO_LOW_COST_GAS_CONFIG

Also applies to: 436-436

🧰 Tools
🪛 Ruff (0.8.0)

23-23: typing.Optional imported but unused

Remove unused import: typing.Optional

(F401)


174-255: Enhance type hints and documentation in map_and_extract_data

The method has good structure but could benefit from more detailed type hints and documentation:

  1. Add return type hint for process_single_entry
  2. Clarify the structure of data_mapping in docstring
  3. Add examples for different data formats

Apply this diff to enhance the documentation:

     @staticmethod
     def map_and_extract_data(
         data_mapping: Dict[str, Union[str, Dict[str, List[str]]]],
         data: Union[List[Any], Dict[str, Any]],
     ) -> pd.DataFrame:
         """
         Map and extract specified fields from input data based on a provided mapping and extraction fields.
 
         Args:
-            data_mapping (Dict[str, str]): A dictionary mapping source keys to target keys.
+            data_mapping (Dict[str, Union[str, Dict[str, List[str]]]]): A dictionary mapping source keys to target keys.
                 Example: {"pm25": "pm2_5", "pm10": "pm10", "tp": "temperature"}
+                For field8 mapping: {"field8": {"0": "temperature", "1": "humidity"}}
             data (Dict[str, Any]|): Input data containing raw key-value pairs to map and extract.
                 Example:
                 {
                     "pm25": {"conc": 21, "aqius": 73, "aqicn": 30},
                     "pm10": {"conc": 37, "aqius": 34, "aqicn": 37},
                     "pr": 100836,
                     "hm": 28,
                     "tp": 39.7,
                     "ts": "2024-11-24T13:14:40.000Z"
                 }
+                Or with field8:
+                {
+                    "field8": "39.7,28",
+                    "field1": "21",
+                    "field2": "37"
+                }
 
         Returns:
             pd.Series: A pandas Series containing the mapped and extracted data.
         """
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 202f610 and 80ff39a.

📒 Files selected for processing (3)
  • src/workflows/airqo_etl_utils/airqo_api.py (2 hunks)
  • src/workflows/airqo_etl_utils/airqo_utils.py (7 hunks)
  • src/workflows/airqo_etl_utils/data_sources.py (1 hunks)
🧰 Additional context used
🪛 Ruff (0.8.0)
src/workflows/airqo_etl_utils/airqo_utils.py

23-23: typing.Optional imported but unused

Remove unused import: typing.Optional

(F401)


436-436: Local variable mapping is assigned to but never used

Remove assignment to unused variable mapping

(F841)

🔇 Additional comments (2)
src/workflows/airqo_etl_utils/data_sources.py (1)

144-146: ⚠️ Potential issue

Improve error handling for missing device information

The code uses logger.exception without an actual exception and doesn't raise an error when required device information is missing. This could lead to silent failures and undefined behavior.

Apply this diff to fix the error handling:

-                logger.exception(
-                    "Device information must include 'api_code' and 'serial_number'."
-                )
+                error_msg = "Device information must include 'api_code' and 'serial_number'."
+                logger.error(error_msg)
+                raise ValueError(error_msg)

Likely invalid or redundant comment.

src/workflows/airqo_etl_utils/airqo_api.py (1)

202-202: Verify the default device manufacturer change

The modification to default the device_manufacturer to "airqo" when network is not specified could affect existing integrations. Ensure this change aligns with the business logic and verify its impact on the system.

Run the following script to check the usage of device_manufacturer:

✅ Verification successful

Default device manufacturer change is consistent with existing patterns

The codebase analysis shows that "airqo" is consistently used as the default network value across multiple components:

  • The device model explicitly handles "airqo" network cases (src/device-registry/models/Device.js)
  • Multiple data processing utilities check for "network == 'airqo'" condition (src/workflows/airqo_etl_utils/data_validator.py, src/workflows/airqo_etl_utils/airqo_utils.py)
  • Device status and uptime services specifically filter for "airqo" network devices
  • The device_manufacturer field is optional in the device registry validators and follows the same pattern as network handling

The change to default device_manufacturer to "airqo" when network is not specified aligns with the existing business logic and system-wide patterns.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check how device_manufacturer is used across the codebase

# Search for device_manufacturer references
rg -A 5 "device_manufacturer"

# Search for specific network-related patterns
rg "network.*=.*airqo"

Length of output: 14892


try:
url = f"{self.THINGSPEAK_CHANNEL_URL}{device_number}/feeds.json?start={start_date_time}&end={end_date_time}&api_key={read_key}"
print(f"{url}")
🛠️ Refactor suggestion

Improve logging and error handling in the query_data method

Several improvements can be made to enhance the robustness of this method:

  1. Replace the print statement with proper logging
  2. Handle specific exceptions separately
  3. Log meaningful messages when returning empty DataFrame

Apply this diff to implement the improvements:

-            print(f"{url}")
+            logger.info(f"Querying data from URL: {url}")

-        except Exception as ex:
-            logger.exception(f"An error occured: {ex}")
+        except requests.exceptions.RequestException as req_err:
+            logger.error(f"Request error while fetching data: {req_err}")
+        except ValueError as val_err:
+            logger.error(f"Value error while processing data: {val_err}")
+        except Exception as ex:
+            logger.exception(f"Unexpected error occurred: {ex}")
+        
+        if data.empty:
+            logger.warning("No data retrieved from the API")

Also applies to: 38-40
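Putting the three suggestions together, a hedged sketch of the resulting shape (the `query_feeds` function, URL layout, and parameter names are illustrative, not the project's actual `query_data` signature):

```python
import logging
import requests
import pandas as pd

logger = logging.getLogger(__name__)

def query_feeds(base_url: str, channel: int, read_key: str) -> pd.DataFrame:
    data = pd.DataFrame()
    url = f"{base_url}/{channel}/feeds.json"
    try:
        logger.info("Querying data from URL: %s", url)
        response = requests.get(url, params={"api_key": read_key}, timeout=10)
        response.raise_for_status()
        data = pd.DataFrame(response.json().get("feeds", []))
    except requests.exceptions.RequestException as req_err:
        logger.error("Request error while fetching data: %s", req_err)
    except ValueError as val_err:
        logger.error("Value error while processing data: %s", val_err)
    if data.empty:
        logger.warning("No data retrieved from %s", url)
    return data
```

Passing the API key via `params` rather than interpolating it into the logged URL also keeps the key out of the logs.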


@Baalmart Baalmart left a comment


@Baalmart Baalmart merged commit e39cc95 into airqo-platform:staging Dec 1, 2024
48 checks passed
@Baalmart Baalmart mentioned this pull request Dec 1, 2024
1 task
This was referenced Dec 11, 2024