Skip to content

Commit c0cf056

Browse files
authored
Merge pull request #4042 from NicholasTurner23/refactor-fix-update/code-clean-up
Refactor fix update/code clean up
2 parents 741573b + 88f3ffe commit c0cf056

12 files changed

+134
-70
lines changed

src/workflows/airqo_etl_utils/airqo_api.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -719,7 +719,7 @@ def get_cohorts(self, tenant: Tenant = Tenant.ALL) -> List[Dict[str, Any]]:
719719
for cohort in response.get("cohorts", [])
720720
]
721721

722-
def get_sites(self, tenant: Tenant = Tenant.ALL) -> List[Dict[str, Any]]:
722+
def get_sites(self, network: str = "all") -> List[Dict[str, Any]]:
723723
"""
724724
Retrieve sites given a network.
725725
@@ -766,19 +766,17 @@ def get_sites(self, tenant: Tenant = Tenant.ALL) -> List[Dict[str, Any]]:
766766
},
767767
]
768768
"""
769-
query_params = {"tenant": str(Tenant.AIRQO)}
769+
query_params = {}
770770

771-
if tenant != Tenant.ALL:
772-
query_params["network"] = str(tenant)
771+
if network != "all":
772+
query_params["network"] = network
773773

774774
response = self.__request("devices/sites", query_params)
775775

776776
return [
777777
{
778778
**site,
779779
"site_id": site.get("_id", None),
780-
"tenant": site.get("network", site.get("tenant", None)),
781-
"location": site.get("location", None),
782780
"approximate_latitude": site.get(
783781
"approximate_latitude", site.get("latitude", None)
784782
),

src/workflows/airqo_etl_utils/airqo_utils.py

Lines changed: 30 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -554,7 +554,6 @@ def aggregate_low_cost_sensors_data(data: pd.DataFrame) -> pd.DataFrame:
554554
numeric_columns = data.select_dtypes(include=["number"]).columns
555555
numeric_columns = numeric_columns.difference(["device_number"])
556556
data_for_aggregation = data[["timestamp", "device_id"] + list(numeric_columns)]
557-
558557
aggregated = (
559558
data_for_aggregation.groupby("device_id")
560559
.apply(lambda group: group.resample("1H", on="timestamp").mean())
@@ -744,20 +743,19 @@ def process_data_for_api(data: pd.DataFrame, frequency: Frequency) -> list:
744743
devices = airqo_api.get_devices()
745744

746745
device_lookup = {
747-
device["device_number"]: device
748-
for device in devices
749-
if device.get("device_number")
746+
device["device_id"]: device for device in devices if device.get("device_id")
750747
}
751748

752749
for _, row in data.iterrows():
753750
try:
754751
device_number = row["device_number"]
752+
device_id = row["device_id"]
755753

756754
# Get device details from the lookup dictionary
757-
device_details = device_lookup.get(device_number)
755+
device_details = device_lookup.get(device_id)
758756
if not device_details:
759757
logger.exception(
760-
f"Device number {device_number} not found in device list."
758+
f"Device id {device_id} not found in device list."
761759
)
762760
continue
763761

@@ -766,7 +764,7 @@ def process_data_for_api(data: pd.DataFrame, frequency: Frequency) -> list:
766764
"device_id": device_details["_id"],
767765
"site_id": row["site_id"],
768766
"device_number": device_number,
769-
"tenant": str(Tenant.AIRQO),
767+
"network": device_details["network"],
770768
"location": {
771769
"latitude": {"value": row["latitude"]},
772770
"longitude": {"value": row["longitude"]},
@@ -832,7 +830,7 @@ def merge_aggregated_weather_data(
832830
airqo_api = AirQoApi()
833831
sites: List[Dict[str, Any]] = []
834832

835-
for site in airqo_api.get_sites(tenant=Tenant.AIRQO):
833+
for site in airqo_api.get_sites(network="airqo"):
836834
sites.extend(
837835
[
838836
{
@@ -894,7 +892,8 @@ def merge_aggregated_weather_data(
894892
numeric_columns = measurements.select_dtypes(include=["number"]).columns
895893
numeric_columns = numeric_columns.difference(["device_number"])
896894
numeric_counts = measurements[numeric_columns].notna().sum(axis=1)
897-
measurements = measurements[numeric_counts >= 1]
895+
# Rows with more than one numeric value
896+
measurements = measurements[numeric_counts > 1]
898897
return measurements
899898

900899
@staticmethod
@@ -1012,12 +1011,10 @@ def calibrate_data(data: pd.DataFrame) -> pd.DataFrame:
10121011

10131012
data["timestamp"] = pd.to_datetime(data["timestamp"])
10141013
sites = AirQoApi().get_sites()
1015-
sites_df = pd.DataFrame(sites, columns=["_id", "city"]).rename(
1016-
columns={"_id": "site_id"}
1017-
)
1014+
sites_df = pd.DataFrame(sites, columns=["site_id", "city"])
1015+
10181016
data = pd.merge(data, sites_df, on="site_id", how="left")
10191017
data.dropna(subset=["device_id", "timestamp"], inplace=True)
1020-
10211018
columns_to_fill = [
10221019
"s1_pm2_5",
10231020
"s1_pm10",
@@ -1027,9 +1024,9 @@ def calibrate_data(data: pd.DataFrame) -> pd.DataFrame:
10271024
"humidity",
10281025
]
10291026

1030-
data[columns_to_fill] = data[columns_to_fill].fillna(0)
10311027
# TODO: Need to opt for a different approach eg forward fill, can't do here as df only has data of last 1 hour. Perhaps use raw data only?
10321028
# May have to rewrite entire pipeline flow
1029+
data[columns_to_fill] = data[columns_to_fill].fillna(0)
10331030

10341031
# additional input columns for calibration
10351032
data["avg_pm2_5"] = data[["s1_pm2_5", "s2_pm2_5"]].mean(axis=1).round(2)
@@ -1052,9 +1049,12 @@ def calibrate_data(data: pd.DataFrame) -> pd.DataFrame:
10521049
"pm2_5_pm10_mod",
10531050
]
10541051
data[input_variables] = data[input_variables].replace([np.inf, -np.inf], 0)
1055-
data.dropna(subset=input_variables, inplace=True)
10561052

1057-
grouped_df = data.groupby("city", dropna=False)
1053+
# Explicitly filter data to calibrate.
1054+
to_calibrate = data["network"] == "airqo"
1055+
data_to_calibrate = data.loc[to_calibrate]
1056+
data_to_calibrate.dropna(subset=input_variables, inplace=True)
1057+
grouped_df = data_to_calibrate.groupby("city", dropna=False)
10581058

10591059
rf_model = GCSUtils.get_trained_model_from_gcs(
10601060
project_name=project_id,
@@ -1071,6 +1071,8 @@ def calibrate_data(data: pd.DataFrame) -> pd.DataFrame:
10711071
),
10721072
)
10731073
for city, group in grouped_df:
1074+
# What was the intention of this?
1075+
# If the condition below fails, rf_model and lasso_model fall back to the ones previously used, i.e. those set as defaults outside the for loop.
10741076
if str(city).lower() in [c.value.lower() for c in CityModel]:
10751077
try:
10761078
rf_model = GCSUtils.get_trained_model_from_gcs(
@@ -1087,6 +1089,7 @@ def calibrate_data(data: pd.DataFrame) -> pd.DataFrame:
10871089
)
10881090
except Exception as ex:
10891091
logger.exception(f"Error getting model: {ex}")
1092+
continue
10901093
group["pm2_5_calibrated_value"] = rf_model.predict(group[input_variables])
10911094
group["pm10_calibrated_value"] = lasso_model.predict(group[input_variables])
10921095

@@ -1100,15 +1103,20 @@ def calibrate_data(data: pd.DataFrame) -> pd.DataFrame:
11001103
data["pm2_5_raw_value"] = data[["s1_pm2_5", "s2_pm2_5"]].mean(axis=1)
11011104
data["pm10_raw_value"] = data[["s1_pm10", "s2_pm10"]].mean(axis=1)
11021105
if "pm2_5_calibrated_value" in data.columns:
1103-
data["pm2_5"] = data["pm2_5_calibrated_value"]
1106+
data.loc[to_calibrate, "pm2_5"] = data.loc[
1107+
to_calibrate, "pm2_5_calibrated_value"
1108+
]
11041109
else:
1105-
data["pm2_5_calibrated_value"] = None
1106-
data["pm2_5"] = None
1110+
data.loc[to_calibrate, "pm2_5_calibrated_value"] = None
1111+
data.loc[to_calibrate, "pm2_5"] = None
11071112
if "pm10_calibrated_value" in data.columns:
1108-
data["pm10"] = data["pm10_calibrated_value"]
1113+
data.loc[to_calibrate, "pm10"] = data.loc[
1114+
to_calibrate, "pm10_calibrated_value"
1115+
]
11091116
else:
1110-
data["pm10_calibrated_value"] = None
1111-
data["pm10"] = None
1117+
data.loc[to_calibrate, "pm10_calibrated_value"] = None
1118+
data.loc[to_calibrate, "pm10"] = None
1119+
11121120
data["pm2_5"] = data["pm2_5"].fillna(data["pm2_5_raw_value"])
11131121
data["pm10"] = data["pm10"].fillna(data["pm10_raw_value"])
11141122

src/workflows/airqo_etl_utils/data_validator.py

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -188,8 +188,8 @@ def process_for_big_query(dataframe: pd.DataFrame, table: str) -> pd.DataFrame:
188188
@staticmethod
189189
def process_data_for_message_broker(
190190
data: pd.DataFrame,
191-
topic: str,
192191
caller: str,
192+
topic: str = None,
193193
frequency: Frequency = Frequency.HOURLY,
194194
) -> pd.DataFrame:
195195
"""
@@ -214,16 +214,31 @@ def process_data_for_message_broker(
214214
data.rename(columns={"device_id": "device_name"}, inplace=True)
215215

216216
devices = AirQoDataUtils.get_devices(group_id=caller)
217-
devices = devices[
218-
["device_name", "site_id", "device_latitude", "device_longitude", "network"]
219-
]
220-
221-
data = pd.merge(
222-
left=data,
223-
right=devices,
224-
on=["device_name", "site_id", "network"],
225-
how="left",
226-
)
217+
try:
218+
devices = devices[
219+
[
220+
"device_name",
221+
"site_id",
222+
"device_latitude",
223+
"device_longitude",
224+
"network",
225+
]
226+
]
227+
228+
data = pd.merge(
229+
left=data,
230+
right=devices,
231+
on=["device_name", "site_id", "network"],
232+
how="left",
233+
)
234+
except KeyError as e:
235+
logger.exception(
236+
f"KeyError: The key(s) '{e.args}' are not available in the returned devices data."
237+
)
238+
return None
239+
except Exception as e:
240+
logger.exception(f"An error occurred: {e}")
241+
return None
227242
return data
228243

229244
@staticmethod

src/workflows/airqo_etl_utils/data_warehouse_utils.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -106,14 +106,14 @@ def extract_hourly_weather_data(
106106
)
107107

108108
@staticmethod
109-
def extract_sites_meta_data(tenant: Tenant = Tenant.ALL) -> pd.DataFrame:
109+
def extract_sites_meta_data(network: str = "all") -> pd.DataFrame:
110110
airqo_api = AirQoApi()
111-
sites = airqo_api.get_sites(tenant=tenant)
111+
sites = airqo_api.get_sites(network=network)
112112
sites = pd.DataFrame(sites)
113113
sites.rename(
114114
columns={
115-
"latitude": "site_latitude",
116-
"longitude": "site_longitude",
115+
"approximate_latitude": "site_latitude",
116+
"approximate_longitude": "site_longitude",
117117
"description": "site_description",
118118
"altitude": "site_altitude",
119119
"name": "site_name",

src/workflows/airqo_etl_utils/meta_data_utils.py

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -119,8 +119,8 @@ def merge_cohorts_and_devices(data: pd.DataFrame) -> pd.DataFrame:
119119
return pd.DataFrame(merged_data)
120120

121121
@staticmethod
122-
def extract_sites_from_api(tenant: Tenant = Tenant.ALL) -> pd.DataFrame:
123-
sites = AirQoApi().get_sites(tenant=tenant)
122+
def extract_sites_from_api(network: str = "all") -> pd.DataFrame:
123+
sites = AirQoApi().get_sites(network=network)
124124
dataframe = pd.json_normalize(sites)
125125
dataframe = dataframe[
126126
[
@@ -155,8 +155,8 @@ def extract_sites_from_api(tenant: Tenant = Tenant.ALL) -> pd.DataFrame:
155155
return dataframe
156156

157157
@staticmethod
158-
def extract_sites_meta_data_from_api(tenant: Tenant = Tenant.ALL) -> pd.DataFrame:
159-
sites = AirQoApi().get_sites(tenant=tenant)
158+
def extract_sites_meta_data_from_api(network: str = "all") -> pd.DataFrame:
159+
sites = AirQoApi().get_sites(network=network)
160160
dataframe = pd.json_normalize(sites)
161161
big_query_api = BigQueryApi()
162162
cols = big_query_api.get_columns(table=big_query_api.sites_meta_data_table)
@@ -167,15 +167,15 @@ def extract_sites_meta_data_from_api(tenant: Tenant = Tenant.ALL) -> pd.DataFram
167167
return dataframe
168168

169169
@staticmethod
170-
def update_nearest_weather_stations(tenant: Tenant) -> None:
170+
def update_nearest_weather_stations(network: str) -> None:
171171
airqo_api = AirQoApi()
172-
sites = airqo_api.get_sites(tenant=tenant)
172+
sites = airqo_api.get_sites(network=network)
173173
sites_data = [
174174
{
175175
"site_id": site.get("site_id", None),
176-
"tenant": site.get("tenant", None),
177-
"latitude": site.get("latitude", None),
178-
"longitude": site.get("longitude", None),
176+
"network": site.get("network", None),
177+
"latitude": site.get("approximate_latitude", None),
178+
"longitude": site.get("approximate_longitude", None),
179179
}
180180
for site in sites
181181
]
@@ -184,24 +184,24 @@ def update_nearest_weather_stations(tenant: Tenant) -> None:
184184
updated_sites = [
185185
{
186186
"site_id": site.get("site_id"),
187-
"tenant": site.get("tenant"),
187+
"network": site.get("network"),
188188
"weather_stations": site.get("weather_stations"),
189189
}
190190
for site in updated_sites
191191
]
192192
airqo_api.update_sites(updated_sites)
193193

194194
@staticmethod
195-
def update_sites_distance_measures(tenant: Tenant) -> None:
195+
def update_sites_distance_measures(network: str) -> None:
196196
airqo_api = AirQoApi()
197-
sites = airqo_api.get_sites(tenant=tenant)
197+
sites = airqo_api.get_sites(network=network)
198198
updated_sites = []
199199
for site in sites:
200200
record = {
201201
"site_id": site.get("site_id", None),
202-
"tenant": site.get("tenant", None),
203-
"latitude": site.get("latitude", None),
204-
"longitude": site.get("longitude", None),
202+
"network": site.get("network", None),
203+
"latitude": site.get("approximate_latitude", None),
204+
"longitude": site.get("approximate_longitude", None),
205205
}
206206
meta_data = airqo_api.get_meta_data(
207207
latitude=record.get("latitude"),
@@ -212,7 +212,7 @@ def update_sites_distance_measures(tenant: Tenant) -> None:
212212
updated_sites.append(
213213
{
214214
**meta_data,
215-
**{"site_id": record["site_id"], "tenant": record["tenant"]},
215+
**{"site_id": record["site_id"], "network": record["network"]},
216216
}
217217
)
218218

src/workflows/dags/airnow.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from airqo_etl_utils.workflows_custom_utils import AirflowUtils
44
from datetime import timedelta
55
from airqo_etl_utils.config import configuration
6+
from airflow.exceptions import AirflowFailException
67

78

89
# Historical Data DAG
@@ -48,9 +49,14 @@ def send_to_message_broker(data: pd.DataFrame, **kwargs):
4849

4950
data = DataValidationUtils.process_data_for_message_broker(
5051
data=data,
51-
topic=configuration.HOURLY_MEASUREMENTS_TOPIC,
5252
caller=kwargs["dag"].dag_id + unique_str,
53+
topic=configuration.HOURLY_MEASUREMENTS_TOPIC,
5354
)
55+
if not data:
56+
raise AirflowFailException(
57+
"Processing for message broker failed. Please check if kafka is up and running."
58+
)
59+
5460
broker = MessageBrokerUtils()
5561
broker.publish_to_topic(
5662
topic=configuration.HOURLY_MEASUREMENTS_TOPIC, data=data
@@ -130,9 +136,15 @@ def send_to_message_broker(data: pd.DataFrame, **kwargs):
130136

131137
data = DataValidationUtils.process_data_for_message_broker(
132138
data=data,
133-
topic=configuration.HOURLY_MEASUREMENTS_TOPIC,
134139
caller=kwargs["dag"].dag_id + unique_str,
140+
topic=configuration.HOURLY_MEASUREMENTS_TOPIC,
135141
)
142+
143+
if not data:
144+
raise AirflowFailException(
145+
"Processing for message broker failed. Please check if kafka is up and running."
146+
)
147+
136148
broker = MessageBrokerUtils()
137149
broker.publish_to_topic(
138150
topic=configuration.HOURLY_MEASUREMENTS_TOPIC, data=data

src/workflows/dags/airqo_automated_tweets.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ def create_forecast_tweets():
1818
def retrieve_sites():
1919
from airqo_etl_utils.airqo_api import AirQoApi
2020

21-
return AirQoApi().get_sites(tenant=Tenant.AIRQO)
21+
return AirQoApi().get_sites(network="airqo")
2222

2323
@task()
2424
def select_forecast_sites(sites):

0 commit comments

Comments
 (0)