stream combined events (over samples) to disk
thomas-fred committed Oct 16, 2023
1 parent eff6a2b commit 0a26ae5
Showing 1 changed file with 38 additions and 4 deletions.
42 changes: 38 additions & 4 deletions workflow/rules/exposure/wind_fields.smk
@@ -251,15 +251,49 @@ rule concat_wind_fields_over_sample:
     run:
         import logging
 
+        import dask
         import xarray as xr
 
+        from open_gira.io import netcdf_packing_parameters
+
         logging.basicConfig(format="%(asctime)s %(process)d %(filename)s %(message)s", level=logging.INFO)
 
         logging.info("Reading wind fields from each sample")
-        all_samples = xr.concat([xr.open_dataset(path) for path in input.sample_paths], dim="event_id")
-
-        logging.info("Writing wind fields (all samples pooled) to disk")
-        all_samples.to_netcdf(output.concat)
+        # concatenated xarray dataset is chunked by input file
+        # N.B. this is lazily loaded
+        all_samples: xr.Dataset = xr.open_mfdataset(
+            input.sample_paths,
+            chunks={"max_wind_speed": len(input.sample_paths)},
+        )
+
+        logging.info("Computing packing factors for all samples")
+        # we use dask to allow for chunked calculation of data min/max and to stream to disk
+        # use the synchronous scheduler to limit dask to a single process (reducing memory footprint)
+        scheduler = "synchronous"
+
+        # compute packing factors for all data, need global min and max
+        # the implementation below reads all the data chunks twice, once for min and once for max
+        # would be nice to avoid this duplication
+        scale_factor, add_offset, fill_value = netcdf_packing_parameters(
+            all_samples.max_wind_speed.min().compute(scheduler=scheduler).item(),
+            all_samples.max_wind_speed.max().compute(scheduler=scheduler).item(),
+            16
+        )
+
+        logging.info("Writing pooled wind fields to disk")
+        serialisation_task: dask.delayed.Delayed = all_samples.to_netcdf(
+            output.concat,
+            encoding={
+                "max_wind_speed": {
+                    'dtype': 'int16',
+                    'scale_factor': scale_factor,
+                    'add_offset': add_offset,
+                    '_FillValue': fill_value
+                }
+            },
+            compute=False
+        )
+        serialisation_task.compute(scheduler=scheduler)
 
 """
 To test:
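Note: `netcdf_packing_parameters` lives in `open_gira.io` and is not part of this diff. For context, a minimal sketch of the CF-convention linear packing it presumably computes for signed n-bit integers, where unpacked = packed * scale_factor + add_offset (the real open_gira implementation may differ):

```python
def netcdf_packing_parameters(data_min: float, data_max: float, n_bits: int):
    """Hypothetical sketch: scale/offset for packing floats into signed n-bit ints."""
    # reserve the most negative integer to represent missing data
    fill_value = -(2 ** (n_bits - 1))
    # spread the data range over the remaining 2**n_bits - 2 representable values
    scale_factor = (data_max - data_min) / (2 ** n_bits - 2)
    # centre the data range on zero in packed space
    add_offset = (data_max + data_min) / 2
    return scale_factor, add_offset, fill_value
```

With the 16 bits requested in the rule, `max_wind_speed` is stored to a precision of roughly (max - min) / 65534, at half the size of float32 on disk.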

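On the duplication flagged in the code comment ("reads all the data chunks twice"): one option is to hand both aggregations to a single `dask.compute` call, which merges the two task graphs so each chunk is only read from disk once. A sketch under the same assumptions as the rule (lazily opened dataset, synchronous scheduler); `sample_paths` is a stand-in for the rule's `input.sample_paths`:

```python
import dask
import xarray as xr

from open_gira.io import netcdf_packing_parameters

sample_paths = [...]  # stand-in for input.sample_paths
all_samples = xr.open_mfdataset(sample_paths)

# one graph evaluation, shared chunk reads for both reductions
data_min, data_max = dask.compute(
    all_samples.max_wind_speed.min(),
    all_samples.max_wind_speed.max(),
    scheduler="synchronous",
)
scale_factor, add_offset, fill_value = netcdf_packing_parameters(
    data_min.item(), data_max.item(), 16
)
```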