Skip to content
This repository has been archived by the owner on Jun 11, 2024. It is now read-only.

Excarta WIP processing and analysis #32

Draft
wants to merge 20 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,271 changes: 2,271 additions & 0 deletions notebooks/excarta_temp_proc_analysis.ipynb

Large diffs are not rendered by default.

101 changes: 101 additions & 0 deletions nwp/excarta/excarta_processing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import argparse
import os
import pathlib
from datetime import datetime

import gcsfs
import numpy as np
import xarray as xr


def _parse_args():
parser = argparse.ArgumentParser()
parser.add_argument("output", type=pathlib.Path, help="Output zarr file")
parser.add_argument("year", type=int, help="Year to process")
parser.add_argument(
"--force",
action="store_true",
help="Overwrite the output file if it already exists.",
)

return parser.parse_args()


def extract_files(args):
    """Open every Excarta zarr store for ``args.year`` from GCS.

    Each store's filename stem encodes its forecast init time as
    ``YYYYMMDDHH``; that timestamp is attached as a ``ts`` coordinate and
    the ``time`` dimension is renamed to a forecast ``step`` expressed in
    hours since init.

    Returns a list of xarray Datasets, one per init time.
    """
    # GCS filesystem scoped to the public Excarta bucket.
    gcs = gcsfs.GCSFileSystem(project="excarta")
    path = f"gs://excarta-public-us/pilots/ocf/{args.year}/"
    files = gcs.ls(path)
    datasets = []

    for file in files:
        filename = os.path.basename(file)

        # Filename stem is the init time, e.g. "2021010100.zarr" -> "2021010100".
        date_part = filename.split(".")[0]
        date = datetime.strptime(date_part, "%Y%m%d%H")
        date_np = np.datetime64(date)

        # Load the zarr store and tag it with its init time.
        ds = xr.open_zarr(gcs.get_mapper(file), consolidated=True)
        ds = ds.assign_coords(ts=date_np)

        # Forecast lead time in hours relative to the init time.
        step_values = (ds["datetimes"].values - date_np) / np.timedelta64(1, "h")
        ds = ds.assign_coords(time=step_values)
        ds = ds.rename({"time": "step"})

        # Promote the location index and lat/lon to coordinates.
        ds = ds.assign_coords(locidx=ds["locidx"])
        ds = ds.set_coords(["latitude", "longitude"])

        datasets.append(ds)

    return datasets


def merged_zarrs(ds):
    """Merge per-init-time Datasets into one Dataset along ``ts``.

    Parameters
    ----------
    ds : list of xarray.Dataset
        Datasets produced by :func:`extract_files`, one per init time.

    Returns
    -------
    xarray.Dataset
        A single Dataset with every data variable stacked into a new
        ``variable`` dimension under one ``value`` array, sorted by
        ``step`` and ``ts``, with ``step`` encoded as ``timedelta64[h]``.
    """
    ds_merged = xr.concat(ds, dim="ts")
    # 'datetimes' is redundant once 'ts' + 'step' encode the valid time.
    ds_merged = ds_merged.drop_vars("datetimes")

    # Stack all data variables into a single 'value' array along a new
    # 'variable' dimension. Materialize the names as a list so the
    # coordinate holds plain strings rather than a data_vars mapping view.
    var_names = list(ds_merged.data_vars)
    d2 = xr.concat([ds_merged[v] for v in var_names], dim="variable")
    d2 = d2.assign_coords(variable=("variable", var_names))
    ds_merged = xr.Dataset(dict(value=d2))
    ds_merged = ds_merged.sortby("step")
    ds_merged = ds_merged.sortby("ts")

    # Encode lead time as real timedeltas (hours) instead of raw floats.
    ds_merged["step"] = (
        "step",
        np.array(ds_merged["step"].values, dtype="timedelta64[h]"),
    )

    return ds_merged


def main():
    """CLI entry point: download, merge, and save one year of Excarta data."""
    args = _parse_args()

    output_path = f"{args.output}/excarta_{args.year}.zarr"

    # Honour the --force flag: refuse to clobber an existing store unless
    # explicitly asked to overwrite it.
    if pathlib.Path(output_path).exists() and not args.force:
        raise RuntimeError(f'Output file "{output_path}" already exists')

    datasets = extract_files(args)
    print("merging zarrs")
    ds_merged = merged_zarrs(datasets)
    print("zarrs merged")

    ds_merged.to_zarr(output_path)

    print(f"file saved at {output_path}")


if __name__ == "__main__":
    main()
66 changes: 66 additions & 0 deletions nwp/excarta/merge_excarta.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# import libs
import argparse
import os
import pathlib

import numpy as np
import xarray as xr


def _parse_args():
parser = argparse.ArgumentParser()
parser.add_argument("input", type=pathlib.Path, help="Path to folder containing files")
parser.add_argument(
"output",
type=pathlib.Path,
help="Output path, include the file name with .zarr ending",
)
return parser.parse_args()


def merge_zarr_files(zarr_path, merged_zarr_path):
    """Concatenate all ``*.zarr`` stores under ``zarr_path`` along ``init_time``.

    The merged dataset is sorted by ``init_time``, its integer-hour ``step``
    coordinate is converted to ``timedelta64`` values, and the result is
    written to ``merged_zarr_path``.
    """
    # Collect paths of zarr stores in the specified directory.
    zarr_files = [
        os.path.join(zarr_path, file) for file in os.listdir(zarr_path) if file.endswith(".zarr")
    ]

    # Open the datasets and store them in a list.
    # NOTE(review): relies on xarray auto-detecting the zarr backend from the
    # ".zarr" suffix; switch to open_zarr / engine="zarr" if that guess fails.
    datasets = [xr.open_dataset(file) for file in zarr_files]

    # Concatenate the datasets along the 'init_time' dimension.
    merged_ds = xr.concat(datasets, dim="init_time")
    merged_ds = merged_ds.sortby("init_time")

    # Convert the raw hour counts in 'step' into proper timedeltas.
    step_hours = merged_ds["step"].values
    step_timedelta = np.timedelta64(1, "h") * step_hours
    ds_timedelta = merged_ds.assign_coords(step=step_timedelta)

    # Save the merged dataset as a new zarr store.
    ds_timedelta.to_zarr(merged_zarr_path)


def main():
    """Entry point: merge every zarr store found under the input folder."""
    options = _parse_args()
    merge_zarr_files(options.input, options.output)


# Check if script is being run directly
if __name__ == "__main__":
    main()
144 changes: 144 additions & 0 deletions nwp/excarta/parse_excarta_monthly.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
# Low memory script
import argparse
import os
import pathlib
from datetime import datetime

import pandas as pd
import xarray as xr


def _parse_args():
parser = argparse.ArgumentParser()
parser.add_argument("output", type=pathlib.Path, help="Output zarr file")
parser.add_argument("year", type=int, help="Year to process")
parser.add_argument("month", type=int, help="Month to process")
return parser.parse_args()


def data_loader(folder_path, month_to_process):
    """
    Load every hourly forecast CSV in *folder_path* belonging to the month
    given by *month_to_process* ('YYYYMM') and convert each one directly
    into an xarray Dataset.
    """
    target = datetime.strptime(month_to_process, "%Y%m")
    column_names = [
        "DateTimeUTC",
        "LocationId",
        "Latitude",
        "Longitude",
        "dni",
        "dhi",
        "ghi",
    ]
    datasets = []

    for filename in os.listdir(folder_path):
        # Skip non-CSV entries and macOS "._" resource-fork artifacts.
        if not filename.endswith(".csv") or filename.startswith("._"):
            continue

        # The filename stem encodes the forecast init time as YYYYMMDDHH.
        init_time = datetime.strptime(filename[:-4], "%Y%m%d%H")
        if (init_time.year, init_time.month) != (target.year, target.month):
            continue

        frame = pd.read_csv(
            os.path.join(folder_path, filename),
            header=None,
            names=column_names,
            parse_dates=["DateTimeUTC"],
        )

        # Lead time in hours relative to the init time.
        frame["step"] = (frame["DateTimeUTC"] - init_time).dt.total_seconds() / 3600
        frame["init_time"] = init_time

        # Convert the dataframe to a Dataset, dropping columns that are
        # redundant once 'init_time' and 'step' are present.
        ds = xr.Dataset.from_dataframe(frame).drop_vars(["LocationId", "DateTimeUTC"])
        datasets.append(ds)

    return datasets


def load_data_from_all_years(parent_folder_path, month_to_process):
    """Load all datasets for the 'YYYYMM' month from its year subfolder."""
    # The year subdirectory name is the first four characters of 'YYYYMM'.
    year_folder = os.path.join(parent_folder_path, str(int(month_to_process[:4])))
    collected = []
    collected.extend(data_loader(year_folder, month_to_process))
    return collected


def pdtocdf(datasets):
    """Merge the per-file Datasets and reshape them into one 'value' array.

    Parameters
    ----------
    datasets : list of xarray.Dataset
        Datasets from :func:`data_loader`, each indexed by a flat 'index' dim.

    Returns
    -------
    xarray.Dataset
        Dataset with a single 'value' variable stacked along a new
        'variable' dimension, a MultiIndex (init_time, step, y, x) on
        'index', sorted by 'step' then 'init_time'.
    """
    datasets = [
        ds.set_index(index=["init_time", "step", "Latitude", "Longitude"]) for ds in datasets
    ]

    ds = xr.concat(datasets, dim="index")

    # Subtract one hour from the init_time dimension.
    # NOTE(review): presumably aligns Excarta's timestamp convention with the
    # rest of the pipeline — confirm against the data provider's docs.
    ds["init_time"] = ds["init_time"] - pd.Timedelta(hours=1)

    ds = ds.rename({"Latitude": "y", "Longitude": "x"})

    # Stack all data variables into one 'value' array along a 'variable'
    # dimension. Materialize the names as a list so the coordinate holds
    # plain strings rather than a data_vars mapping view.
    var_names = list(ds.data_vars)
    d2 = xr.concat([ds[v] for v in var_names], dim="variable")
    d2 = d2.assign_coords(variable=("variable", var_names))
    ds = xr.Dataset(dict(value=d2))
    ds = ds.sortby("step")
    ds = ds.sortby("init_time")

    return ds


def main():
    """CLI entry point: build one month of Excarta data and save it to zarr."""
    args = _parse_args()

    # Output store name embeds the processed month, e.g. <output>202107.zarr.
    output_name = f"{args.output}{args.year}{args.month:02d}.zarr"

    # Guard against clobbering the store we are actually about to write
    # (previously this checked args.output, which is never the real target).
    if pathlib.Path(output_name).exists():
        raise RuntimeError(f'Output file "{output_name}" already exist')

    # NOTE(review): hard-coded source directory — consider making it a CLI arg.
    PATH = "/mnt/storage_b/data/ocf/solar_pv_nowcasting/experimental/Excarta/sr_UK_Malta_full/solar_data"
    # Combine year and month into the 'YYYYMM' format data_loader expects.
    month_to_process = f"{args.year}{args.month:02d}"
    datasets = load_data_from_all_years(PATH, month_to_process)
    ds = pdtocdf(datasets)

    print(ds.dims)
    print(ds.coords)
    print(ds)

    ds = ds.unstack("index")

    # Selecting data at just a single point for Malta.
    # TODO: change this to a spatial slice in the future.
    ds_filt = ds.sel(x=14, y=36)

    print(ds_filt)

    ds_filt.to_zarr(output_name)


# Check if script is being run directly
if __name__ == "__main__":
    main()
Loading
Loading