move to production #3994
Merged · 20 commits · Dec 4, 2024
Commits
1819f05
Update auth service production image tag to prod-1ea1da62-1733249735
github-actions[bot] Dec 3, 2024
ff9d784
Update device registry production image tag to prod-1ea1da62-1733249735
github-actions[bot] Dec 3, 2024
1557b8b
Update workflows prod image tag to prod-1ea1da62-1733249735
github-actions[bot] Dec 3, 2024
ad8bfb6
Update predict production image tag to prod-1ea1da62-1733249735
github-actions[bot] Dec 3, 2024
74c0ef3
Update workflows staging image tag to stage-e06dd73e-1733249810
github-actions[bot] Dec 3, 2024
c415837
Update AirQo exceedance production image tag to prod-03a25cf1-1733249950
github-actions[bot] Dec 3, 2024
209fbca
Update KCCA exceedance production image tag to prod-03a25cf1-1733249950
github-actions[bot] Dec 3, 2024
dc7eea6
Update device registry production image tag to prod-03a25cf1-1733249950
github-actions[bot] Dec 3, 2024
34c38f9
Update workflows prod image tag to prod-03a25cf1-1733249950
github-actions[bot] Dec 3, 2024
4718807
Update spatial production image tag to prod-1ea1da62-1733249735
github-actions[bot] Dec 3, 2024
9a4b4a7
Update bigquery builder to dynamically include networks
NicholasTurner23 Dec 4, 2024
aa2dfeb
Update tenants/networks
NicholasTurner23 Dec 4, 2024
cd0063a
Merge branch 'staging' into update/Integration-iqair-devices
NicholasTurner23 Dec 4, 2024
f43cad7
Merge pull request #3993 from NicholasTurner23/update/Integration-iqa…
Baalmart Dec 4, 2024
f8ee1cf
Add networks to schemas
NicholasTurner23 Dec 4, 2024
50c8a62
Update workflows staging image tag to stage-f43cad79-1733305404
github-actions[bot] Dec 4, 2024
647c05a
Clean up to remove repeated operations
NicholasTurner23 Dec 4, 2024
ba74197
Cleanup datetime conversion
NicholasTurner23 Dec 4, 2024
9b2d45b
Merge branch 'staging' into update/Integration-iqair-devices
NicholasTurner23 Dec 4, 2024
369cafd
Merge pull request #3995 from NicholasTurner23/update/Integration-iqa…
Baalmart Dec 4, 2024
2 changes: 1 addition & 1 deletion k8s/auth-service/values-prod.yaml
@@ -6,7 +6,7 @@ app:
   replicaCount: 3
   image:
     repository: eu.gcr.io/airqo-250220/airqo-auth-api
-    tag: prod-da0db8f0-1733248410
+    tag: prod-1ea1da62-1733249735
   nameOverride: ''
   fullnameOverride: ''
   podAnnotations: {}
2 changes: 1 addition & 1 deletion k8s/device-registry/values-prod.yaml
@@ -6,7 +6,7 @@ app:
   replicaCount: 3
   image:
     repository: eu.gcr.io/airqo-250220/airqo-device-registry-api
-    tag: prod-da0db8f0-1733248410
+    tag: prod-03a25cf1-1733249950
   nameOverride: ''
   fullnameOverride: ''
   podAnnotations: {}
2 changes: 1 addition & 1 deletion k8s/exceedance/values-prod-airqo.yaml
@@ -4,6 +4,6 @@ app:
   configmap: env-exceedance-production
   image:
     repository: eu.gcr.io/airqo-250220/airqo-exceedance-job
-    tag: prod-1ea1da62-1733249735
+    tag: prod-03a25cf1-1733249950
   nameOverride: ''
   fullnameOverride: ''
2 changes: 1 addition & 1 deletion k8s/exceedance/values-prod-kcca.yaml
@@ -4,6 +4,6 @@ app:
   configmap: env-exceedance-production
   image:
     repository: eu.gcr.io/airqo-250220/kcca-exceedance-job
-    tag: prod-1ea1da62-1733249735
+    tag: prod-03a25cf1-1733249950
   nameOverride: ''
   fullnameOverride: ''
2 changes: 1 addition & 1 deletion k8s/predict/values-prod.yaml
@@ -7,7 +7,7 @@ images:
   predictJob: eu.gcr.io/airqo-250220/airqo-predict-job
   trainJob: eu.gcr.io/airqo-250220/airqo-train-job
   predictPlaces: eu.gcr.io/airqo-250220/airqo-predict-places-air-quality
-  tag: prod-da0db8f0-1733248410
+  tag: prod-1ea1da62-1733249735
 api:
   name: airqo-prediction-api
   label: prediction-api
2 changes: 1 addition & 1 deletion k8s/spatial/values-prod.yaml
@@ -6,7 +6,7 @@ app:
   replicaCount: 3
   image:
     repository: eu.gcr.io/airqo-250220/airqo-spatial-api
-    tag: prod-da0db8f0-1733248410
+    tag: prod-1ea1da62-1733249735
   nameOverride: ''
   fullnameOverride: ''
   podAnnotations: {}
2 changes: 1 addition & 1 deletion k8s/workflows/values-prod.yaml
@@ -10,7 +10,7 @@ images:
   initContainer: eu.gcr.io/airqo-250220/airqo-workflows-xcom
   redisContainer: eu.gcr.io/airqo-250220/airqo-redis
   containers: eu.gcr.io/airqo-250220/airqo-workflows
-  tag: prod-da0db8f0-1733248410
+  tag: prod-03a25cf1-1733249950
 nameOverride: ''
 fullnameOverride: ''
 podAnnotations: {}
2 changes: 1 addition & 1 deletion k8s/workflows/values-stage.yaml
@@ -10,7 +10,7 @@ images:
   initContainer: eu.gcr.io/airqo-250220/airqo-stage-workflows-xcom
   redisContainer: eu.gcr.io/airqo-250220/airqo-stage-redis
   containers: eu.gcr.io/airqo-250220/airqo-stage-workflows
-  tag: stage-7f60b036-1733224934
+  tag: stage-f43cad79-1733305404
 nameOverride: ''
 fullnameOverride: ''
 podAnnotations: {}
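The tag bumps above are mechanical: the `github-actions[bot]` commits rewrite one `tag:` line per Helm values file. A minimal sketch of that substitution (the file content and helper are hypothetical, not the actual workflow code):

```python
import re

# Hypothetical values-prod.yaml fragment; the real files live under k8s/.
values_yaml = """\
image:
  repository: eu.gcr.io/airqo-250220/airqo-auth-api
  tag: prod-da0db8f0-1733248410
"""

new_tag = "prod-1ea1da62-1733249735"

# Replace whatever follows "tag:" on its line, keeping the indentation.
updated = re.sub(r"(?m)^(\s*tag:\s*).*$", rf"\g<1>{new_tag}", values_yaml)

print(new_tag in updated)          # True
print("prod-da0db8f0" in updated)  # False
```

The `prod-<short-sha>-<unix-timestamp>` tag format makes each deploy traceable to a commit and a build time.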
3 changes: 1 addition & 2 deletions src/workflows/airqo_etl_utils/airqo_api.py
@@ -135,7 +135,7 @@ def get_devices(
         device_category: DeviceCategory = DeviceCategory.NONE,
     ) -> List[Dict[str, Any]]:
         """
-        Retrieve devices given a tenant and device category.
+        Retrieve devices given a network and device category.

         Args:
             - network (str): An Enum that represents site ownership.
@@ -198,7 +198,6 @@ def get_devices(
                     "device_category": str(
                         DeviceCategory.from_str(device.pop("category", None))
                     ),
-                    "network": device.get("network"),
                     "device_manufacturer": device.get("network", "airqo"),
                     **device,
                 }
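The deleted `"network"` entry was redundant: in a dict literal, a later `**` spread overwrites earlier duplicate keys, so `**device` re-supplies `network` anyway. A small sketch with a hypothetical device record:

```python
# Hypothetical device payload; field names mirror the PR, values are made up.
device = {"name": "aq_01", "network": "iqair"}

merged = {
    "network": device.get("network"),  # redundant: **device overwrites this key
    "device_manufacturer": device.get("network", "airqo"),
    **device,
}

print(merged["network"])  # iqair — same result with or without the explicit entry
```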
34 changes: 21 additions & 13 deletions src/workflows/airqo_etl_utils/airqo_utils.py
@@ -59,7 +59,7 @@ def extract_uncalibrated_data(start_date_time, end_date_time) -> pd.DataFrame:
             null_cols=["pm2_5_calibrated_value"],
             start_date_time=start_date_time,
             end_date_time=end_date_time,
-            tenant=Tenant.AIRQO,
+            network=str(Tenant.AIRQO),
         )

         return DataValidationUtils.remove_outliers(hourly_uncalibrated_data)
@@ -79,7 +79,7 @@ def extract_data_from_bigquery(
             table=table,
             start_date_time=start_date_time,
             end_date_time=end_date_time,
-            tenant=Tenant.AIRQO,
+            network=str(Tenant.AIRQO),
         )

         return DataValidationUtils.remove_outliers(raw_data)
@@ -117,7 +117,10 @@ def remove_duplicates(data: pd.DataFrame) -> pd.DataFrame:

     @staticmethod
     def extract_aggregated_raw_data(
-        start_date_time: str, end_date_time: str, dynamic_query: bool = False
+        start_date_time: str,
+        end_date_time: str,
+        network: str = None,
+        dynamic_query: bool = False,
     ) -> pd.DataFrame:
         """
         Retrieves raw pm2.5 sensor data from bigquery and computes averages for the numeric columns grouped by device_number, device_id and site_id
@@ -128,9 +131,7 @@
             start_date_time=start_date_time,
             end_date_time=end_date_time,
             table=bigquery_api.raw_measurements_table,
-            network=str(
-                Tenant.AIRQO
-            ),  # TODO Replace tenant implementation with network implementation
+            network=network,
             dynamic_query=dynamic_query,
         )
@@ -757,19 +758,26 @@ def clean_low_cost_sensor_data(
                 AirQoGxExpectations.from_pandas().pm2_5_low_cost_sensor_raw_data(
                     data
                 )
         else:
-            data["timestamp"] = pd.to_datetime(data["timestamp"])
+            data.dropna(subset=["timestamp"], inplace=True)
+            data["timestamp"] = pd.to_datetime(data["timestamp"])

         data.drop_duplicates(
             subset=["timestamp", "device_id"], keep="first", inplace=True
         )
         # TODO Find an appropriate place to put this
         if device_category == DeviceCategory.LOW_COST:
-            data["pm2_5_raw_value"] = data[["s1_pm2_5", "s2_pm2_5"]].mean(axis=1)
-            data["pm2_5"] = data[["s1_pm2_5", "s2_pm2_5"]].mean(axis=1)
-            data["pm10_raw_value"] = data[["s1_pm10", "s2_pm10"]].mean(axis=1)
-            data["pm10"] = data[["s1_pm10", "s2_pm10"]].mean(axis=1)
+            is_airqo_network = data["network"] == "airqo"
+
+            pm2_5_mean = data.loc[is_airqo_network, ["s1_pm2_5", "s2_pm2_5"]].mean(
+                axis=1
+            )
+            pm10_mean = data.loc[is_airqo_network, ["s1_pm10", "s2_pm10"]].mean(axis=1)
+
+            data.loc[is_airqo_network, "pm2_5_raw_value"] = pm2_5_mean
+            data.loc[is_airqo_network, "pm2_5"] = pm2_5_mean
+            data.loc[is_airqo_network, "pm10_raw_value"] = pm10_mean
+            data.loc[is_airqo_network, "pm10"] = pm10_mean
         return data

@@ -1032,7 +1040,7 @@ def merge_aggregated_weather_data(
     @staticmethod
     def extract_devices_deployment_logs() -> pd.DataFrame:
         airqo_api = AirQoApi()
-        devices = airqo_api.get_devices(tenant=Tenant.AIRQO)
+        devices = airqo_api.get_devices(network=str(Tenant.AIRQO))
         devices_history = pd.DataFrame()
         for device in devices:
             try:
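The key behavioral change in `clean_low_cost_sensor_data` is that dual-sensor averaging now applies only to rows on the `airqo` network, via a boolean mask and `.loc`, instead of overwriting every row. A self-contained sketch with hypothetical readings:

```python
import pandas as pd

# Hypothetical readings: two AirQo rows with dual PM2.5 sensors, one
# iqair row whose pm2_5 is already populated and must not be touched.
data = pd.DataFrame(
    {
        "network": ["airqo", "iqair", "airqo"],
        "s1_pm2_5": [10.0, 55.0, 20.0],
        "s2_pm2_5": [14.0, 57.0, 22.0],
        "pm2_5": [None, 56.0, None],
    }
)

# Mask rows belonging to the AirQo network, average the two sensors
# row-wise, and write the result back only for those rows.
is_airqo_network = data["network"] == "airqo"
pm2_5_mean = data.loc[is_airqo_network, ["s1_pm2_5", "s2_pm2_5"]].mean(axis=1)
data.loc[is_airqo_network, "pm2_5"] = pm2_5_mean

print(data["pm2_5"].tolist())  # [12.0, 56.0, 21.0]
```

Because `pm2_5_mean` keeps the masked index, the `.loc` assignment aligns by label and the iqair row's existing value survives.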
30 changes: 20 additions & 10 deletions src/workflows/airqo_etl_utils/bigquery_api.py
@@ -509,7 +509,7 @@ def compose_query(
         table: str,
         start_date_time: str,
         end_date_time: str,
-        network: str,
+        network: str = "all",
         where_fields: dict = None,
         null_cols: list = None,
         columns: list = None,
@@ -536,17 +536,15 @@
             Exception: If an invalid column is provided in `where_fields` or `null_cols`,
                 or if the `query_type` is not supported.
         """
-        tenant = "airqo"
-
         null_cols = [] if null_cols is None else null_cols
         where_fields = {} if where_fields is None else where_fields

         columns = ", ".join(map(str, columns)) if columns else " * "
-        where_clause = (
-            f" timestamp >= '{start_date_time}' and timestamp <= '{end_date_time}' "
-        )
-        if tenant != Tenant.ALL:
-            where_clause = f" {where_clause} and tenant = '{str(tenant)}' or network = '{str(network)}' "
+        where_clause = f" timestamp between '{start_date_time}' and '{end_date_time}' "
+
+        if network:
+            where_clause += f"AND network = '{network}' "

         valid_cols = self.get_columns(table=table)

@@ -613,7 +611,7 @@ def query_data(
         start_date_time: str,
         end_date_time: str,
         table: str,
-        network: str,
+        network: str = None,
         dynamic_query: bool = False,
         columns: list = None,
         where_fields: dict = None,
@@ -649,7 +647,11 @@
             )
         else:
             query = self.dynamic_averaging_query(
-                table, start_date_time, end_date_time, time_granularity=time_granularity
+                table,
+                start_date_time,
+                end_date_time,
+                network=network,
+                time_granularity=time_granularity,
             )

         dataframe = self.client.query(query=query).result().to_dataframe()
@@ -669,6 +671,7 @@ def dynamic_averaging_query(
         end_date_time: str,
         exclude_columns: list = None,
         group_by: list = None,
+        network: str = "all",
         time_granularity: str = "HOUR",
     ) -> str:
         """
@@ -728,11 +731,18 @@
             ]
         )

+        where_clause: str = (
+            f"timestamp BETWEEN '{start_date_time}' AND '{end_date_time}' "
+        )
+
+        if network:
+            where_clause += f"AND network = '{network}' "
+
         # Include time granularity in both SELECT and GROUP BY
         timestamp_trunc = f"TIMESTAMP_TRUNC(timestamp, {time_granularity.upper()}) AS {time_granularity.lower()}"
         group_by_clause = ", ".join(group_by + [time_granularity.lower()])

-        query = f"""SELECT {", ".join(group_by)}, {timestamp_trunc}, {avg_columns} FROM `{table}` WHERE timestamp BETWEEN '{start_date_time}' AND '{end_date_time}' GROUP BY {group_by_clause} ORDER BY {time_granularity.lower()};"""
+        query = f"""SELECT {", ".join(group_by)}, {timestamp_trunc}, {avg_columns} FROM `{table}` WHERE {where_clause} GROUP BY {group_by_clause} ORDER BY {time_granularity.lower()};"""

         return query
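The deleted `compose_query` clause was not just dead `tenant` code: it had an operator-precedence bug. In SQL, as in Python, `AND` binds tighter than `OR`, so `timestamp >= a AND timestamp <= b AND tenant = 'airqo' OR network = 'x'` matches any row with that network regardless of the time window. A sketch of the precedence effect (hypothetical boolean stand-ins for the SQL predicates):

```python
# Stand-ins for the SQL predicates; names are illustrative only.
def old_clause(in_window: bool, tenant_ok: bool, network_ok: bool) -> bool:
    # AND binds tighter than OR, so network_ok alone satisfies the clause.
    return in_window and tenant_ok or network_ok

def new_clause(in_window: bool, network_ok: bool) -> bool:
    # The rewritten clause always enforces the time window.
    return in_window and network_ok

# A row outside the time window but on the requested network:
print(old_clause(False, False, True))  # True  — bug: the window is bypassed
print(new_clause(False, True))         # False — fixed
```

The string-formatted `WHERE` clauses here are still interpolation-based; parameterized queries would be the safer long-term shape, but that is outside this PR's scope.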
2 changes: 0 additions & 2 deletions src/workflows/airqo_etl_utils/daily_data_utils.py
@@ -44,7 +44,6 @@ def query_hourly_data(start_date_time, end_date_time) -> pd.DataFrame:
             table=bigquery_api.hourly_measurements_table,
             start_date_time=start_date_time,
             end_date_time=end_date_time,
-            tenant=Tenant.ALL,
         )

         return DataValidationUtils.remove_outliers(raw_data)
@@ -57,7 +56,6 @@ def query_daily_data(start_date_time, end_date_time) -> pd.DataFrame:
             table=bigquery_api.daily_measurements_table,
             start_date_time=start_date_time,
             end_date_time=end_date_time,
-            tenant=Tenant.ALL,
         )

         return DataValidationUtils.remove_outliers(raw_data)
6 changes: 2 additions & 4 deletions src/workflows/airqo_etl_utils/data_validator.py
@@ -72,10 +72,10 @@ def format_data_types(
                 data[col] = (
                     data[col]
                     .astype(str)
-                    .str.replace(r"[^\w\s\.\-:]", "", regex=True)
+                    .str.replace(r"[^\w\s\.\-+:]", "", regex=True)
                     .str.replace(r"(?<!\.\d{3})Z$", ".000Z", regex=True)
                 )  # Negative lookbehind to add missing milliseconds if needed
-                data[col] = pd.to_datetime(data[col], errors="coerce")
+                data[col] = pd.to_datetime(data[col], errors="coerce", utc=True)

         if integers:
             for col in integers:
@@ -142,7 +142,6 @@ def remove_outliers(data: pd.DataFrame) -> pd.DataFrame:
             dtype: list(set(columns) & set(data.columns))
             for dtype, columns in column_types.items()
         }
-
         data = DataValidationUtils.format_data_types(
             data=data,
             floats=filtered_columns[ColumnDataType.FLOAT],
@@ -151,7 +150,6 @@
         )

         validated_columns = list(chain.from_iterable(filtered_columns.values()))
-
         for col in validated_columns:
             is_airqo_network = data["network"] == "airqo"
             mapped_name = configuration.AIRQO_DATA_COLUMN_NAME_MAPPING.get(col, None)
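The two `data_validator.py` fixes work together: adding `+` to the character class stops the cleanup regex from stripping timezone offsets like `+03:00`, and `utc=True` lets pandas normalize mixed-offset strings into a single tz-aware column. A sketch with hypothetical timestamps:

```python
import pandas as pd

# Hypothetical mixed-offset timestamps; one uses a "+03:00" offset that the
# old regex (without "+" in the class) would have mangled into a bare time.
raw = pd.Series(["2024-12-04T10:00:00+03:00", "2024-12-04T07:00:00Z"])

# Same cleanup pattern as the PR: strip everything except word chars,
# whitespace, ".", "-", "+", and ":".
cleaned = raw.str.replace(r"[^\w\s\.\-+:]", "", regex=True)
parsed = pd.to_datetime(cleaned, errors="coerce", utc=True)

print(parsed.iloc[0] == parsed.iloc[1])  # True — both resolve to 07:00 UTC
```

Without `utc=True`, parsing strings with differing offsets yields an object-dtype column (or raises in newer pandas), which breaks downstream timestamp arithmetic.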
3 changes: 0 additions & 3 deletions src/workflows/airqo_etl_utils/data_warehouse_utils.py
@@ -33,7 +33,6 @@ def extract_hourly_bam_data(
             start_date_time=start_date_time,
             end_date_time=end_date_time,
             table=biq_query_api.bam_measurements_table,
-            tenant=Tenant.ALL,
         )

         if data.empty:
@@ -59,7 +58,6 @@ def extract_data_from_big_query(
             start_date_time=start_date_time,
             end_date_time=end_date_time,
             table=biq_query_api.consolidated_data_table,
-            tenant=Tenant.ALL,
         )

     @staticmethod
@@ -83,7 +81,6 @@ def extract_hourly_low_cost_data(
             start_date_time=start_date_time,
             end_date_time=end_date_time,
             table=biq_query_api.hourly_measurements_table,
-            tenant=Tenant.ALL,
         )

         if data.empty: