Update/integration iqair devices [WIP] #3950
Conversation
Updates from airqo staging
📝 Walkthrough

The pull request introduces significant enhancements across several modules within the AirQo ETL utilities. Key updates include the addition of new methods in the

Changes
Actionable comments posted: 6
🧹 Outside diff range and nitpick comments (10)
src/workflows/airqo_etl_utils/airqo_gx_utils.py (1)
Line range hint 372-391: Consider enhancing method documentation

The method could benefit from improved documentation about null value handling. Consider adding type hints and docstring updates:

```diff
 def digest_validation_results(
     self, validation_result: Dict[str, Any]
 ) -> List[Dict[str, Any]]:
     """
     Extract relevant information from a validation result for storing.

     Args:
         validation_result (Dict[str, Any]): The validation result object from Great Expectations.

+    Notes:
+        - Handles null values in partial_unexpected_list and partial_unexpected_counts
+        - Converts null values to "null" string for BigQuery compatibility

     Returns:
         list of dict: Extracted information from the validation results.
     """
```

src/workflows/airqo_etl_utils/data_sources.py (1)
38-39: Fix typo in error message

There's a typo in the error message: "An error occured" should be "An error occurred".

Apply this diff to correct the typo:

```diff
- logger.exception(f"An error occured: {ex}")
+ logger.exception(f"An error occurred: {ex}")
```

src/workflows/airqo_etl_utils/airqo_api.py (1)
225-225: Remove duplicate assignment of `params`

The variable `params` is assigned twice consecutively, which is unnecessary. Please remove the duplicate assignment.

Apply this diff to remove the redundant line:

```diff
- params = {}
```
23-23: Remove unused import `Optional`

The `typing.Optional` import is not used in this file. Please remove it to clean up the code.

Apply this diff to remove the unused import:

```diff
-from typing import List, Dict, Any, Optional, Union
+from typing import List, Dict, Any, Union
```

🧰 Tools
🪛 Ruff (0.8.0)

23-23: `typing.Optional` imported but unused

Remove unused import: `typing.Optional` (F401)
436-436: Remove unused variable `mapping`

The variable `mapping` is assigned but never used. Please remove this assignment to clean up the code.

Apply this diff to remove the unused variable:

```diff
 elif device_category == DeviceCategory.LOW_COST_GAS:
     field_8_cols = list(configuration.AIRQO_LOW_COST_GAS_CONFIG.values())
-    mapping = configuration.AIRQO_LOW_COST_GAS_CONFIG
     other_fields_cols.extend(
         [
             "pm2_5",
```

🧰 Tools
🪛 Ruff (0.8.0)

436-436: Local variable `mapping` is assigned to but never used

Remove assignment to unused variable `mapping` (F841)
538-545: Clarify the purpose of the method `extract_devices_data_`

The method `extract_devices_data_` has a trailing underscore, which may be confusing. If this method is intended to replace or extend `extract_devices_data`, please consider renaming it appropriately or deprecating the old method to prevent confusion.
577-579: Improve log message for missing configuration

When a device category's configuration is missing, the log message "Missing device category." might be misleading. Consider updating it to "Missing configuration for device category."

Apply this diff to improve the log message:

```diff
 if not config:
-    logger.warning("Missing device category.")
+    logger.warning("Missing configuration for device category.")
     return devices_data
```

src/workflows/dags/dag_docs.py (1)
39-39: Fix typo in documentation

There's a typo in the documentation: "historica," should be "historical".

Apply this diff to correct the typo:

```diff
 ### AirQo historical raw low cost data ETL
 #### Purpose
-Extracts historica, raw measurements for low cost sensors going back 2 days.
+Extracts historical raw measurements for low-cost sensors going back 2 days.
 #### Notes
```

src/workflows/airqo_etl_utils/config.py (2)
202-222: LGTM! Consider adding docstring documentation.

The field mapping is well-structured and comprehensive. The nested structure for field8 effectively organizes GPS and environmental data.

Consider adding a docstring to document the purpose and structure of this mapping, particularly explaining the significance of field8's nested structure.
272-305: LGTM! Well-structured device configuration mapping.

The configuration effectively organizes device-specific settings and integrates network-specific mappings. This design supports easy addition of new device types and networks.

Consider implementing a validation mechanism to ensure that all required fields are present in the mappings when new device types or networks are added. This could help prevent runtime errors from missing field mappings.
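One possible shape for such a validation mechanism, as a hypothetical sketch: the required field names and config structure below are invented for illustration and are not taken from `config.py`.

```python
# Hypothetical sketch of the suggested config validation; names are illustrative.
REQUIRED_FIELDS = {"mapping", "other_fields_cols"}

def validate_device_configs(configs: dict) -> list:
    """Return (device_type, missing_fields) pairs for incomplete configs."""
    problems = []
    for device_type, settings in configs.items():
        missing = sorted(REQUIRED_FIELDS - settings.keys())
        if missing:
            problems.append((device_type, missing))
    return problems

configs = {
    "low_cost": {"mapping": {}, "other_fields_cols": []},
    "low_cost_gas": {"mapping": {}},  # other_fields_cols accidentally omitted
}
print(validate_device_configs(configs))  # [('low_cost_gas', ['other_fields_cols'])]
```

A check like this could run at import time so a missing field fails fast rather than surfacing later as a KeyError mid-pipeline.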
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (9)
src/workflows/airqo_etl_utils/airqo_api.py
(2 hunks)src/workflows/airqo_etl_utils/airqo_gx_utils.py
(3 hunks)src/workflows/airqo_etl_utils/airqo_utils.py
(7 hunks)src/workflows/airqo_etl_utils/config.py
(2 hunks)src/workflows/airqo_etl_utils/data_sources.py
(1 hunks)src/workflows/airqo_etl_utils/data_validator.py
(2 hunks)src/workflows/airqo_etl_utils/thingspeak_api.py
(0 hunks)src/workflows/dags/airqo_measurements.py
(3 hunks)src/workflows/dags/dag_docs.py
(1 hunks)
💤 Files with no reviewable changes (1)
- src/workflows/airqo_etl_utils/thingspeak_api.py
🧰 Additional context used
🪛 Ruff (0.8.0)
src/workflows/airqo_etl_utils/data_validator.py
159-159: Function definition does not bind loop variable mapped_name
(B023)
src/workflows/airqo_etl_utils/airqo_utils.py
23-23: typing.Optional
imported but unused
Remove unused import: typing.Optional
(F401)
436-436: Local variable mapping
is assigned to but never used
Remove assignment to unused variable mapping
(F841)
🔇 Additional comments (7)
src/workflows/airqo_etl_utils/airqo_gx_utils.py (3)
10-10: LGTM: Explicit pandas import added
The addition of the pandas import is well-placed and necessary for the explicit pd.isna() calls in the digest_validation_results method. This also makes the pandas dependency more visible, which was previously implicit through the DataFrame usage.
382-383: LGTM: Improved null value handling

The switch from `np.isnan` to `pd.isna` is a good improvement as it provides more comprehensive null value handling, covering both `np.nan` and `None` cases.

Also applies to: 388-389
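The behavioral difference between the two calls is easy to verify in isolation:

```python
import numpy as np
import pandas as pd

values = [1.5, np.nan, None, "null"]

# pd.isna treats np.nan and None uniformly, and returns False for strings.
print([pd.isna(v) for v in values])  # [False, True, True, False]

# np.isnan, by contrast, raises a TypeError on None (and on strings).
try:
    np.isnan(None)
except TypeError as err:
    print(f"np.isnan rejected None: {err}")
```

This is why `pd.isna` is the safer choice when a column may mix `np.nan` and `None`, as validation results coming back from Great Expectations can.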
Line range hint 372-391: Verify validation results handling across networks
Since the PR objectives mention network-specific data cleaning, let's verify that the validation results handling is consistent across different networks.
✅ Verification successful
Let's search for validation and network-related patterns to better understand how validation results are handled across different networks. The previous search might have been too specific, so we're broadening our scope while maintaining focus on the relevant components.
The initial search revealed several relevant files, particularly in the ETL utils. Let's examine the data validator and expectations configuration to understand if there are network-specific validation rules or if the validation is network-agnostic.
Network-specific validation is properly implemented

The validation results handling in the code is consistent and network-aware. Here's why:

- The `DataValidationUtils.remove_outliers` method specifically handles network-specific validation by checking `is_airqo_network = data["network"] == "airqo"` and applies appropriate column mapping for AirQo network data.
- The validation expectations in `airqo_gx_expectations.py` are device-category specific (e.g., `gaseous_low_cost_sensor_raw_data_check` and `pm2_5_low_cost_sensor_raw_data`) rather than network-specific, ensuring consistent validation across networks for similar device types.
- The `VALID_SENSOR_RANGES` dictionary in `data_validator.py` defines universal validation ranges that are applied appropriately based on the network context.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for network-specific validation configurations or expectations
rg -l "network.*expect" --type py
# Search for any network-specific data cleaning patterns
ast-grep --pattern 'def $FUNC($ARGS) {
$$$
network
$$$
}'
Length of output: 101
Script:
#!/bin/bash
# Let's try a broader search for validation-related code
rg -l "validation" --type py
# Search for network-specific data handling
rg -l "network.*data" --type py
# Look for files containing both validation and network patterns
rg -l "validation.*network|network.*validation" --type py
# Search for expectation configurations
rg -A 5 "expectation_config" --type py
Length of output: 5164
Script:
#!/bin/bash
# Let's examine the data validator implementation for network-specific handling
cat src/workflows/airqo_etl_utils/data_validator.py
# Check the expectations configuration file
cat src/workflows/airqo_etl_utils/airqo_gx_expectations.py
# Look for network-specific data processing in airqo_utils
rg -A 5 "def.*clean" src/workflows/airqo_etl_utils/airqo_utils.py
Length of output: 18895
src/workflows/airqo_etl_utils/config.py (2)
223-235: LGTM! Field mapping is complete and well-structured.
The mapping covers all necessary pollutant measurements and their corresponding PI values with a consistent naming convention.
237-245: LGTM! Verify IQAir API response format.
The mapping structure aligns well with the PR objectives for IQAir integration. The nested structure for PM measurements provides good flexibility.
✅ Verification successful
Field mapping structure verified and matches IQAir API implementation
The field mapping structure is correctly implemented based on the codebase evidence:
- The `iqair` method in `data_sources.py` retrieves data from the IQAir API.
- The mapping is used in `airqo_utils.py` through `AirQoDataUtils.map_and_extract_data`.
- Field names in `IQAIR_LOW_COST_FIELD_MAPPING` align with the data processing flow.

The nested structure for PM measurements (`{"key": "pm2_5", "value": "conc"}`) provides the necessary flexibility to handle the API response format, while simpler mappings are used for environmental parameters.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify if the field names match IQAir API response format
# Search for IQAir API response handling or documentation in the codebase
rg -g '!*.{json,md}' -A 5 'iqair.*response'
Length of output: 45
Script:
#!/bin/bash
# Let's try a broader search for IQAir related code and data handling
# Search for IQAir related files
echo "=== Files containing IQAir references ==="
fd -t f . | rg -i "iqair"
# Search for any IQAir related code patterns
echo -e "\n=== IQAir code patterns ==="
rg -i "iqair" -A 5 -B 5
# Look for data transformation or processing related to IQAir
echo -e "\n=== Data processing patterns ==="
ast-grep --pattern 'def $_(self, $$$):
$$$
iqair
$$$'
Length of output: 5744
src/workflows/dags/airqo_measurements.py (2)
Line range hint 134-193: LGTM! Well-structured DAG implementation.
The DAG effectively implements the historical data processing pipeline with appropriate task dependencies and error handling.
Line range hint 496-500: Verify the implementation of extract_devices_data_.

The change from `extract_devices_data` to `extract_devices_data_` aligns with the PR objectives for network-specific data extraction.

✅ Verification successful

Implementation of extract_devices_data_ verified and aligned with network-specific data extraction

The change from `extract_devices_data` to `extract_devices_data_` is verified as a valid enhancement. The new implementation:
- Adds network-specific data extraction capabilities (airqo and iqair networks)
- Maintains backward compatibility with existing parameters
- Includes proper error handling and logging
- Implements network-specific data mapping and processing
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify the implementation and usage of extract_devices_data_
# Search for the implementation
ast-grep --pattern 'def extract_devices_data_($$$)'
# Search for all usages
rg 'extract_devices_data_\(' -A 3
Length of output: 758
Script:
#!/bin/bash
# Let's check for the implementation details of extract_devices_data_ and its predecessor
ast-grep --pattern 'def extract_devices_data_($$$) {
$$$
}'
# Also search for the old method to understand the transition
ast-grep --pattern 'def extract_devices_data($$$) {
$$$
}'
# Let's also check for any other usages of both methods
rg 'extract_devices_data[_]?\(' --type python -A 5
Length of output: 210
Script:
#!/bin/bash
# Let's check for the implementation details of both methods without ast-grep syntax
ast-grep --pattern 'def extract_devices_data_'
# Get the content of the file containing the implementation
rg -l 'def extract_devices_data_' | xargs cat
# Search for all usages of both methods
rg 'extract_devices_data[_]?'
Length of output: 52827
```python
logger.exception(
    ValueError(
        "Device information must include 'api_code' and 'serial_number'."
    )
)
url = f"{base_url}/{device_id}"
```
Improve error handling when device information is missing

The code is incorrectly using `logger.exception` with a `ValueError` instance. Instead, log an error message and raise the exception to handle it properly.

Apply this diff to fix the error handling:

```diff
 if not base_url or not device_id:
-    logger.exception(
-        ValueError(
-            "Device information must include 'api_code' and 'serial_number'."
-        )
-    )
+    error_msg = "Device information must include 'api_code' and 'serial_number'."
+    logger.error(error_msg)
+    raise ValueError(error_msg)
```
📝 Committable suggestion

‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```python
if not base_url or not device_id:
    error_msg = "Device information must include 'api_code' and 'serial_number'."
    logger.error(error_msg)
    raise ValueError(error_msg)
url = f"{base_url}/{device_id}"
```
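The distinction the review draws can be reproduced in isolation: `logger.exception` only attaches a meaningful traceback inside an `except` block, and logging an exception instance does not stop execution. The function names below are illustrative, not the actual `data_sources.py` code.

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def build_url_bad(base_url, device_id):
    if not base_url or not device_id:
        # Logs the ValueError's repr with no useful traceback (we are not in an
        # except block), then falls through and builds a bogus URL anyway.
        logger.exception(ValueError("missing device info"))
    return f"{base_url}/{device_id}"

def build_url_good(base_url, device_id):
    if not base_url or not device_id:
        error_msg = "Device information must include 'api_code' and 'serial_number'."
        logger.error(error_msg)      # plain error record, no misleading traceback
        raise ValueError(error_msg)  # stop the caller instead of continuing
    return f"{base_url}/{device_id}"

print(build_url_bad(None, None))  # "None/None" - the silent-failure case
try:
    build_url_good(None, None)
except ValueError as err:
    print(f"caught: {err}")
```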
""" | ||
Retrieve data from the IQAir API for a specific device and resolution. | ||
|
||
Args: | ||
device (Dict[str, Any]): A dictionary containing device details, such as: | ||
- api_code (str): The base URL or endpoint for the API. | ||
- serial_number (str): The unique identifier for the device. | ||
resolution (str): The data resolution to retrieve. Options include: | ||
- "current": Real-time data (default). | ||
- "instant": Instantaneous measurements. | ||
- "hourly": Hourly aggregated data. | ||
- "daily": Daily aggregated data. | ||
- "monthly": Monthly aggregated data. | ||
|
||
Returns: | ||
pd.DataFrame: A DataFrame containing the retrieved data, or an empty DataFrame in case of errors or no data. | ||
|
||
Raises: | ||
ValueError: If an invalid resolution is provided or if the response data is invalid or malformed. | ||
requests.exceptions.RequestException: For issues with the HTTP request. | ||
Exception: For any other unexpected errors. | ||
""" |
Correct the docstring return type to match the actual return value

The `iqair` method's docstring specifies the return type as `pd.DataFrame`, but the method returns a `Union[List, Dict]`. Please update the docstring to reflect the correct return type.

Apply this diff to correct the docstring:

```diff
@@ -118,8 +118,8 @@
 Returns:
-    pd.DataFrame: A DataFrame containing the retrieved data, or an empty DataFrame in case of errors or no data.
+    Union[List, Dict]: A list or dictionary containing the retrieved data, or `None` in case of errors or no data.

 Raises:
     ValueError: If an invalid resolution is provided or if the response data is invalid or malformed.
```
📝 Committable suggestion

‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```python
"""
Retrieve data from the IQAir API for a specific device and resolution.

Args:
    device (Dict[str, Any]): A dictionary containing device details, such as:
        - api_code (str): The base URL or endpoint for the API.
        - serial_number (str): The unique identifier for the device.
    resolution (str): The data resolution to retrieve. Options include:
        - "current": Real-time data (default).
        - "instant": Instantaneous measurements.
        - "hourly": Hourly aggregated data.
        - "daily": Daily aggregated data.
        - "monthly": Monthly aggregated data.

Returns:
    Union[List, Dict]: A list or dictionary containing the retrieved data, or `None` in case of errors or no data.

Raises:
    ValueError: If an invalid resolution is provided or if the response data is invalid or malformed.
    requests.exceptions.RequestException: For issues with the HTTP request.
    Exception: For any other unexpected errors.
"""
```
""" | ||
Fetch data from a ThingSpeak channel for a specific device within a time range. | ||
|
||
Args: | ||
device_number (int): The ThingSpeak channel ID corresponding to the device. | ||
start_date_time (str): The start timestamp in ISO 8601 format (e.g., "YYYY-MM-DDTHH:mm:ssZ"). | ||
end_date_time (str): The end timestamp in ISO 8601 format. | ||
read_key (str): The API key to authenticate the request for the specified channel. | ||
|
||
Returns: | ||
Optional[Tuple[List[Dict[str, Any]], Dict[str, Any]]]: | ||
- A list of dictionaries containing the channel feeds data. | ||
- A dictionary containing metadata about the channel. | ||
- Returns `None` if no valid data is found or an error occurs. | ||
|
||
Raises: | ||
requests.exceptions.RequestException: For issues with the HTTP request. | ||
ValueError: If the response data is invalid or malformed. | ||
Exception: For any other unexpected errors. | ||
""" |
🛠️ Refactor suggestion

Update docstring to include `data_available` in the return value

The `thingspeak` method returns a tuple with a `bool` indicating data availability, but the docstring does not mention it. Please update the docstring to reflect the correct return type.

Apply this diff to update the docstring:

```diff
@@ -59,10 +59,11 @@
 Returns:
-    Optional[Tuple[List[Dict[str, Any]], Dict[str, Any]]]:
+    Optional[Tuple[List[Dict[str, Any]], Dict[str, Any], bool]]:
         - A list of dictionaries containing the channel feeds data.
         - A dictionary containing metadata about the channel.
+        - A boolean indicating whether data is available.
         - Returns `None` if no valid data is found or an error occurs.
```
📝 Committable suggestion

‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```python
"""
Fetch data from a ThingSpeak channel for a specific device within a time range.

Args:
    device_number (int): The ThingSpeak channel ID corresponding to the device.
    start_date_time (str): The start timestamp in ISO 8601 format (e.g., "YYYY-MM-DDTHH:mm:ssZ").
    end_date_time (str): The end timestamp in ISO 8601 format.
    read_key (str): The API key to authenticate the request for the specified channel.

Returns:
    Optional[Tuple[List[Dict[str, Any]], Dict[str, Any], bool]]:
        - A list of dictionaries containing the channel feeds data.
        - A dictionary containing metadata about the channel.
        - A boolean indicating whether data is available.
        - Returns `None` if no valid data is found or an error occurs.

Raises:
    requests.exceptions.RequestException: For issues with the HTTP request.
    ValueError: If the response data is invalid or malformed.
    Exception: For any other unexpected errors.
"""
```
"site_id": device.get("site", {}).get("_id", None), | ||
"site_location": device.pop("site", {}).get( | ||
"location_name", None | ||
), | ||
"device_category": device.pop("category", None), | ||
"device_manufacturer": network_name, | ||
**device, | ||
} |
🛠️ Refactor suggestion

Avoid using `pop` when extracting values to prevent unintended side effects

Using `device.pop("site", {}).get(...)` modifies the `device` dictionary in place, which could lead to unexpected behavior if `device` is used elsewhere after this operation. Consider using `device.get("site", {}).get(...)` instead to safely access the values without modifying the original dictionary.

Apply this diff to replace `pop` with `get`:

```diff
 {
     "site_id": device.get("site", {}).get("_id", None),
-    "site_location": device.pop("site", {}).get(
+    "site_location": device.get("site", {}).get(
         "location_name", None
     ),
-    "device_category": device.pop("category", None),
+    "device_category": device.get("category", None),
     "device_manufacturer": network_name,
     **device,
 }
```

Similarly, review other instances where `pop` is used when extracting values to prevent unintended side effects.
📝 Committable suggestion

‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```python
"site_id": device.get("site", {}).get("_id", None),
"site_location": device.get("site", {}).get(
    "location_name", None
),
"device_category": device.get("category", None),
"device_manufacturer": network_name,
**device,
}
```
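The `pop` vs `get` difference is easy to demonstrate in isolation. One caveat worth weighing: if the original code relied on `pop` to keep `site` and `category` out of the trailing `**device` spread, switching to `get` changes that behavior, so the surrounding dict construction should be re-checked after applying the suggestion.

```python
device = {"site": {"_id": "s1", "location_name": "Kampala"}, "category": "lowcost"}

# pop removes the key, so any later lookup (or a **device spread) no longer sees it.
site = device.pop("site", {})
print("site" in device)  # False - the original dict was mutated

device2 = {"site": {"_id": "s1", "location_name": "Kampala"}, "category": "lowcost"}

# get reads the same values but leaves the dict intact.
site_id = device2.get("site", {}).get("_id")
site_location = device2.get("site", {}).get("location_name")
print("site" in device2, site_id, site_location)  # True s1 Kampala
```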
logger.exception(f"An error occured: {e} - device {device['name']}") | ||
continue | ||
if isinstance(data, pd.DataFrame) and data.empty: | ||
logger.exception(f"No data received from {device['name']}") |
🛠️ Refactor suggestion

Use appropriate logging level instead of `logger.exception`

When logging that no data is received from a device, use `logger.warning` or `logger.info` instead of `logger.exception`, as no exception has occurred.

Apply this diff to correct the logging level:

```diff
 if isinstance(data, pd.DataFrame) and data.empty:
-    logger.exception(f"No data received from {device['name']}")
+    logger.warning(f"No data received from {device['name']}")
     continue
```
Committable suggestion skipped: line range outside the PR's diff.
```diff
 lambda x: DataValidationUtils.get_valid_value(
-    column_name=name, row_value=x
+    column_name=mapped_name, row_value=x
```
Fix closure over loop variable `mapped_name` in lambda function

Using the loop variable `mapped_name` inside the lambda function may cause unintended behavior due to late binding. Consider passing `mapped_name` as a default argument to the lambda to capture its current value.

Apply this diff to fix the issue:

```diff
 for col in validated_columns:
     is_airqo_network = data["network"] == "airqo"
     mapped_name = configuration.AIRQO_DATA_COLUMN_NAME_MAPPING.get(col, None)
     data.loc[is_airqo_network, col] = data.loc[is_airqo_network, col].apply(
-        lambda x: DataValidationUtils.get_valid_value(
+        lambda x, mapped_name=mapped_name: DataValidationUtils.get_valid_value(
             column_name=mapped_name, row_value=x
         )
     )
```
📝 Committable suggestion

‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```python
for col in validated_columns:
    is_airqo_network = data["network"] == "airqo"
    mapped_name = configuration.AIRQO_DATA_COLUMN_NAME_MAPPING.get(col, None)
    data.loc[is_airqo_network, col] = data.loc[is_airqo_network, col].apply(
        lambda x, mapped_name=mapped_name: DataValidationUtils.get_valid_value(
            column_name=mapped_name, row_value=x
        )
    )
```
🧰 Tools
🪛 Ruff (0.8.0)
159-159: Function definition does not bind loop variable mapped_name
(B023)
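The late-binding pitfall behind Ruff's B023 warning, and the default-argument fix, can be reproduced in a few lines of plain Python:

```python
# A lambda looks up a free variable when it is *called*, not when it is defined,
# so every lambda created in the loop ends up seeing the loop's final value.
late = []
for name in ["pm2_5", "pm10"]:
    late.append(lambda x: f"{name}:{x}")
print([f(1) for f in late])   # ['pm10:1', 'pm10:1'] - both see the last value

# Binding the loop variable as a default argument snapshots it per iteration.
bound = []
for name in ["pm2_5", "pm10"]:
    bound.append(lambda x, name=name: f"{name}:{x}")
print([f(1) for f in bound])  # ['pm2_5:1', 'pm10:1']
```

In the snippet under review the lambdas are applied immediately inside each iteration, so the bug may not bite today, but the default-argument form keeps the code safe if the apply is ever deferred.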
Codecov Report

All modified and coverable lines are covered by tests ✅

Additional details and impacted files:

```
@@            Coverage Diff             @@
##           staging    #3950      +/-   ##
===========================================
+ Coverage    11.73%   12.22%   +0.48%
===========================================
  Files          114       51      -63
  Lines        15326     4670   -10656
  Branches       318      318
===========================================
- Hits          1799      571    -1228
+ Misses       13527     4099    -9428
```
Actionable comments posted: 1
🧹 Outside diff range and nitpick comments (3)
src/workflows/airqo_etl_utils/data_sources.py (1)
95-97: Use appropriate logging level for expected scenarios

Using `logger.exception` is typically reserved for logging actual exceptions with their stack traces. When logging that a device doesn't have data within a time range, this is an expected scenario that should use `logger.warning` or `logger.info`.

Apply this diff to use the appropriate logging level:

```diff
- logger.exception(
+ logger.warning(
      f"Device does not have data between {start_date_time} and {end_date_time}"
  )
```

src/workflows/airqo_etl_utils/airqo_utils.py (2)
23-23: Address static analysis warnings

The code has some minor issues flagged by static analysis:

- Unused import: `Optional` from typing
- Unused variable: `mapping` in the device category handling

Apply these diffs to clean up the code:

```diff
-from typing import List, Dict, Any, Optional, Union
+from typing import List, Dict, Any, Union
```

```diff
-    mapping = configuration.AIRQO_LOW_COST_GAS_CONFIG
```

Also applies to: 436-436
🧰 Tools
🪛 Ruff (0.8.0)
23-23: `typing.Optional` imported but unused

Remove unused import: `typing.Optional` (F401)
174-255: Enhance type hints and documentation in map_and_extract_data

The method has good structure but could benefit from more detailed type hints and documentation:

- Add return type hint for process_single_entry
- Clarify the structure of data_mapping in docstring
- Add examples for different data formats

Apply this diff to enhance the documentation:

```diff
 @staticmethod
 def map_and_extract_data(
     data_mapping: Dict[str, Union[str, Dict[str, List[str]]]],
     data: Union[List[Any], Dict[str, Any]],
 ) -> pd.DataFrame:
     """
     Map and extract specified fields from input data based on a provided mapping and extraction fields.

     Args:
-        data_mapping (Dict[str, str]): A dictionary mapping source keys to target keys.
+        data_mapping (Dict[str, Union[str, Dict[str, List[str]]]]): A dictionary mapping source keys to target keys.
             Example: {"pm25": "pm2_5", "pm10": "pm10", "tp": "temperature"}
+            For field8 mapping: {"field8": {"0": "temperature", "1": "humidity"}}
         data (Dict[str, Any]|): Input data containing raw key-value pairs to map and extract.
             Example:
                 {
                     "pm25": {"conc": 21, "aqius": 73, "aqicn": 30},
                     "pm10": {"conc": 37, "aqius": 34, "aqicn": 37},
                     "pr": 100836,
                     "hm": 28,
                     "tp": 39.7,
                     "ts": "2024-11-24T13:14:40.000Z"
                 }
+            Or with field8:
+                {
+                    "field8": "39.7,28",
+                    "field1": "21",
+                    "field2": "37"
+                }

     Returns:
         pd.Series: A pandas Series containing the mapped and extracted data.
     """
```
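To make the nested-mapping convention concrete, here is a small self-contained sketch of the extraction logic the docstring describes; `extract_entry` is an illustrative helper, not the actual `AirQoDataUtils` implementation:

```python
from typing import Any, Dict

def extract_entry(data_mapping: Dict[str, Any], entry: Dict[str, Any]) -> Dict[str, Any]:
    """Map raw API keys to target column names, drilling into nested dicts."""
    row = {}
    for source_key, target in data_mapping.items():
        raw = entry.get(source_key)
        if raw is None:
            continue
        if isinstance(target, dict):
            # Nested mapping, e.g. {"key": "pm2_5", "value": "conc"}: keep one
            # sub-field of a nested measurement dict under the target name.
            row[target["key"]] = raw.get(target["value"]) if isinstance(raw, dict) else None
        else:
            row[target] = raw
    return row

mapping = {
    "pm25": {"key": "pm2_5", "value": "conc"},
    "hm": "humidity",
    "tp": "temperature",
}
entry = {"pm25": {"conc": 21, "aqius": 73, "aqicn": 30}, "hm": 28, "tp": 39.7}
print(extract_entry(mapping, entry))  # {'pm2_5': 21, 'humidity': 28, 'temperature': 39.7}
```

Keeping this per-entry helper pure makes the surrounding DataFrame construction a simple list comprehension over the API payload.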
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📒 Files selected for processing (3)

- src/workflows/airqo_etl_utils/airqo_api.py (2 hunks)
- src/workflows/airqo_etl_utils/airqo_utils.py (7 hunks)
- src/workflows/airqo_etl_utils/data_sources.py (1 hunks)
🧰 Additional context used
🪛 Ruff (0.8.0)
src/workflows/airqo_etl_utils/airqo_utils.py
23-23: typing.Optional
imported but unused
Remove unused import: typing.Optional
(F401)
436-436: Local variable mapping
is assigned to but never used
Remove assignment to unused variable mapping
(F841)
🔇 Additional comments (2)
src/workflows/airqo_etl_utils/data_sources.py (1)
144-146: Improve error handling for missing device information

The code uses `logger.exception` without an actual exception and doesn't raise an error when required device information is missing. This could lead to silent failures and undefined behavior.

Apply this diff to fix the error handling:

```diff
- logger.exception(
-     "Device information must include 'api_code' and 'serial_number'."
- )
+ error_msg = "Device information must include 'api_code' and 'serial_number'."
+ logger.error(error_msg)
+ raise ValueError(error_msg)
```

Likely invalid or redundant comment.
src/workflows/airqo_etl_utils/airqo_api.py (1)
202-202: Verify the default device manufacturer change
The modification to default the device_manufacturer to "airqo" when network is not specified could affect existing integrations. Ensure this change aligns with the business logic and verify its impact on the system.
Run the following script to check the usage of device_manufacturer:
✅ Verification successful
Default device manufacturer change is consistent with existing patterns
The codebase analysis shows that "airqo" is consistently used as the default network value across multiple components:
- The device model explicitly handles "airqo" network cases (src/device-registry/models/Device.js)
- Multiple data processing utilities check for the "network == 'airqo'" condition (src/workflows/airqo_etl_utils/data_validator.py, src/workflows/airqo_etl_utils/airqo_utils.py)
- Device status and uptime services specifically filter for "airqo" network devices
- The `device_manufacturer` field is optional in the device registry validators and follows the same pattern as network handling

The change to default `device_manufacturer` to "airqo" when network is not specified aligns with the existing business logic and system-wide patterns.
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check how device_manufacturer is used across the codebase
# Search for device_manufacturer references
rg -A 5 "device_manufacturer"
# Search for specific network-related patterns
rg "network.*=.*airqo"
Length of output: 14892
```python
try:
    url = f"{self.THINGSPEAK_CHANNEL_URL}{device_number}/feeds.json?start={start_date_time}&end={end_date_time}&api_key={read_key}"
    print(f"{url}")
```
🛠️ Refactor suggestion

Improve logging and error handling in the query_data method

Several improvements can be made to enhance the robustness of this method:

- Replace the print statement with proper logging
- Handle specific exceptions separately
- Log meaningful messages when returning an empty DataFrame

Apply this diff to implement the improvements:

```diff
- print(f"{url}")
+ logger.info(f"Querying data from URL: {url}")

- except Exception as ex:
-     logger.exception(f"An error occured: {ex}")
+ except requests.exceptions.RequestException as req_err:
+     logger.error(f"Request error while fetching data: {req_err}")
+ except ValueError as val_err:
+     logger.error(f"Value error while processing data: {val_err}")
+ except Exception as ex:
+     logger.exception(f"Unexpected error occurred: {ex}")
+
+ if data.empty:
+     logger.warning("No data retrieved from the API")
```

Also applies to: 38-40
thanks @NicholasTurner23
Description
This PR does the following:
Related Issues
Summary by CodeRabbit
Release Notes
New Features
Improvements
Documentation