feature/chunk_storms_by_sample #153

Merged: 7 commits, Oct 16, 2023
2 changes: 1 addition & 1 deletion tests/integration/test_estimate_wind_fields.py
@@ -5,6 +5,6 @@ def test_estimate_wind_fields():
runner.run_snakemake_test(
"estimate_wind_fields",
(
"results/power/by_country/PRI/storms/IBTrACS/max_wind_field.nc",
"results/power/by_country/PRI/storms/IBTrACS/0/max_wind_field.nc",
)
)
2 changes: 1 addition & 1 deletion workflow/rules/exposure/electricity_grid/intersection.smk
@@ -134,7 +134,7 @@ rule electricity_grid_damages:
"""
input:
grid_splits = rules.rasterise_electricity_grid.output.geoparquet,
wind_speeds = rules.estimate_wind_fields.output.wind_speeds,
wind_speeds = rules.concat_wind_fields_over_sample.output.concat,
grid_edges = rules.create_power_network.output.edges,
grid_nodes = rules.create_power_network.output.nodes,
output:
123 changes: 110 additions & 13 deletions workflow/rules/exposure/wind_fields.smk
@@ -156,11 +156,29 @@ snakemake -c1 results/power/by_country/CUB/surface_roughness.tiff
"""


rule create_downscaling_factors:
"""
Use surface roughness values to calculate factors for scaling from gradient-level to surface-level winds.
"""
input:
surface_roughness=rules.create_surface_roughness_raster.output.surface_roughness,
output:
downscale_factors="{OUTPUT_DIR}/power/by_country/{COUNTRY_ISO_A3}/storms/downscale_factors.npy",
downscale_factors_plot="{OUTPUT_DIR}/power/by_country/{COUNTRY_ISO_A3}/storms/downscale_factors.png",
script:
"../../scripts/intersect/wind_downscaling_factors.py"

"""
To test:
snakemake -c1 results/power/by_country/PRI/storms/downscale_factors.npy
"""


def storm_tracks_by_country(wildcards) -> str:
"""Return path to storm tracks"""
# parent dataset of storm set e.g. IBTrACS for IBTrACS_maria-2017
storm_dataset = wildcards.STORM_SET.split("_")[0]
return f"{wildcards.OUTPUT_DIR}/power/by_country/{wildcards.COUNTRY_ISO_A3}/storms/{storm_dataset}/tracks.geoparquet"
return f"{wildcards.OUTPUT_DIR}/power/by_country/{wildcards.COUNTRY_ISO_A3}/storms/{storm_dataset}/{wildcards.SAMPLE}/tracks.geoparquet"


def read_storm_set(wildcards) -> list[str]:
@@ -185,24 +203,101 @@ rule estimate_wind_fields:
input:
storm_file=storm_tracks_by_country,
wind_grid="{OUTPUT_DIR}/power/by_country/{COUNTRY_ISO_A3}/storms/wind_grid.tiff",
surface_roughness=rules.create_surface_roughness_raster.output.surface_roughness,
downscaling_factors=rules.create_downscaling_factors.output.downscale_factors,
params:
storm_set=read_storm_set,
# include failure_thresholds as a param (despite not being used elsewhere in the
# rule) to trigger re-runs when this configuration option changes
failure_thresholds=config["transmission_windspeed_failure"]
threads: threads_for_country
resources:
# 2GB RAM per CPU
mem_mb = lambda wildcards: threads_for_country(wildcards) * 2_048
output:
# enable or disable plotting in the config file
plot_dir=directory("{OUTPUT_DIR}/power/by_country/{COUNTRY_ISO_A3}/storms/{STORM_SET}/plots/"),
wind_speeds="{OUTPUT_DIR}/power/by_country/{COUNTRY_ISO_A3}/storms/{STORM_SET}/max_wind_field.nc",
downscale_factors_plot="{OUTPUT_DIR}/power/by_country/{COUNTRY_ISO_A3}/storms/{STORM_SET}/downscale_factors.png",
plot_dir=directory("{OUTPUT_DIR}/power/by_country/{COUNTRY_ISO_A3}/storms/{STORM_SET}/{SAMPLE}/plots/"),
wind_speeds="{OUTPUT_DIR}/power/by_country/{COUNTRY_ISO_A3}/storms/{STORM_SET}/{SAMPLE}/max_wind_field.nc",
script:
"../../scripts/intersect/estimate_wind_fields.py"

"""
To test:
snakemake -c1 results/power/by_country/PRI/storms/IBTrACS/max_wind_field.nc
snakemake -c1 results/power/by_country/PRI/storms/IBTrACS/0/max_wind_field.nc
"""


def wind_field_paths_all_samples(wildcards) -> list[str]:
"""
Return a list of paths for every sample
"""
dataset_name = wildcards.STORM_SET.split("-")[0]
return expand(
"{OUTPUT_DIR}/power/by_country/{COUNTRY_ISO_A3}/storms/{STORM_SET}/{SAMPLE}/max_wind_field.nc",
OUTPUT_DIR=wildcards.OUTPUT_DIR,
COUNTRY_ISO_A3=wildcards.COUNTRY_ISO_A3,
STORM_SET=wildcards.STORM_SET,
SAMPLE=range(0, SAMPLES_PER_TRACKSET[dataset_name])
)


rule concat_wind_fields_over_sample:
"""
Take wind fields expanded over the SAMPLE wildcard and stack them into a single netCDF file.
"""
input:
sample_paths=wind_field_paths_all_samples
output:
concat="{OUTPUT_DIR}/power/by_country/{COUNTRY_ISO_A3}/storms/{STORM_SET}/max_wind_field.nc",
run:
import logging

import dask
import xarray as xr

from open_gira.io import netcdf_packing_parameters

logging.basicConfig(format="%(asctime)s %(process)d %(filename)s %(message)s", level=logging.INFO)

logging.info("Reading wind fields from each sample")
# concatenated xarray dataset is chunked by input file
# N.B. this is lazily loaded
all_samples: xr.Dataset = xr.open_mfdataset(
input.sample_paths,
chunks={"max_wind_speed": len(input.sample_paths)},
)

logging.info("Computing packing factors for all samples")
# we used dask to allow for chunked calculation of data min/max and to stream to disk
# use the synchronous scheduler to limit dask to a single process (reducing memory footprint)
scheduler = "synchronous"

# compute packing factors for all data, need global min and max
# the implementation below reads all the data chunks twice, once for min and once for max
# would be nice to avoid this duplication
scale_factor, add_offset, fill_value = netcdf_packing_parameters(
all_samples.max_wind_speed.min().compute(scheduler=scheduler).item(),
all_samples.max_wind_speed.max().compute(scheduler=scheduler).item(),
16
)

logging.info("Writing pooled wind fields to disk")
serialisation_task: dask.delayed.Delayed = all_samples.to_netcdf(
output.concat,
encoding={
"max_wind_speed": {
'dtype': 'int16',
'scale_factor': scale_factor,
'add_offset': add_offset,
'_FillValue': fill_value
}
},
compute=False
)
serialisation_task.compute(scheduler=scheduler)

"""
To test:
snakemake -c1 results/power/by_country/PRI/storms/STORM-constant/max_wind_field.nc
"""


@@ -216,9 +311,10 @@ def wind_fields_by_country_for_storm(wildcards):
country_set_by_storm = cached_json_file_read(json_file)

return expand(
"results/power/by_country/{COUNTRY_ISO_A3}/storms/{STORM_SET}/max_wind_field.nc",
"results/power/by_country/{COUNTRY_ISO_A3}/storms/{STORM_SET}/{SAMPLE}/max_wind_field.nc",
COUNTRY_ISO_A3=country_set_by_storm[wildcards.STORM_ID], # list of str
STORM_SET=wildcards.STORM_SET, # str
SAMPLE=wildcards.SAMPLE # str
)


@@ -229,7 +325,7 @@ rule merge_wind_fields_of_storm:
input:
wind_fields = wind_fields_by_country_for_storm
output:
merged = "{OUTPUT_DIR}/power/by_storm_set/{STORM_SET}/by_storm/{STORM_ID}/wind_field.nc",
merged = "{OUTPUT_DIR}/power/by_storm_set/{STORM_SET}/{SAMPLE}/by_storm/{STORM_ID}/wind_field.nc",
run:
import logging
import os
@@ -298,7 +394,7 @@ rule merge_wind_fields_of_storm:

"""
Test with:
snakemake -c1 results/power/by_storm_set/IBTrACS/by_storm/2017260N12310/wind_field.nc
snakemake -c1 results/power/by_storm_set/IBTrACS/0/by_storm/2017260N12310/wind_field.nc
"""


@@ -315,9 +411,10 @@ def merged_wind_fields_for_all_storms_in_storm_set(wildcards):
storms = list(country_set_by_storm.keys())

return expand(
"results/power/by_storm_set/{STORM_SET}/by_storm/{STORM_ID}/wind_field.nc",
"results/power/by_storm_set/{STORM_SET}/{SAMPLE}/by_storm/{STORM_ID}/wind_field.nc",
STORM_SET=wildcards.STORM_SET, # str
STORM_ID=storms # list of str
STORM_ID=storms, # list of str
SAMPLE=wildcards.SAMPLE # str
)


@@ -329,7 +426,7 @@ rule merged_wind_fields_for_storm_set:
input:
wind_fields = merged_wind_fields_for_all_storms_in_storm_set
output:
completion_flag = "{OUTPUT_DIR}/power/by_storm_set/{STORM_SET}/wind_fields.txt"
completion_flag = "{OUTPUT_DIR}/power/by_storm_set/{STORM_SET}/{SAMPLE}/wind_fields.txt"
shell:
"""
# one output file per line
@@ -338,5 +435,5 @@ rule merged_wind_fields_for_storm_set:

"""
Test with:
snakemake -c1 -- results/power/by_storm_set/IBTrACS/wind_fields.txt
snakemake -c1 -- results/power/by_storm_set/IBTrACS/0/wind_fields.txt
"""
8 changes: 4 additions & 4 deletions workflow/rules/preprocess/IBTrACS.smk
@@ -2,13 +2,13 @@ rule parse_ibtracs:
input:
ibtracs_csv = "results/input/IBTrACS/raw/v4.csv"
output:
ibtracs_parquet = "results/storm_tracks/IBTrACS/tracks.geoparquet"
ibtracs_parquet = "results/storm_tracks/IBTrACS/0/tracks.geoparquet"
script:
"../../scripts/preprocess/parse_IBTrACS.py"

"""
To test:
snakemake -c1 results/storm_tracks/IBTrACS/tracks.geoparquet
snakemake -c1 results/storm_tracks/IBTrACS/0/tracks.geoparquet
"""


@@ -17,11 +17,11 @@ rule slice_ibtracs:
global_tracks=rules.parse_ibtracs.output.ibtracs_parquet,
grid_hull="{OUTPUT_DIR}/power/by_country/{COUNTRY_ISO_A3}/network/convex_hull.json"
output:
sliced_tracks="{OUTPUT_DIR}/power/by_country/{COUNTRY_ISO_A3}/storms/IBTrACS/tracks.geoparquet",
sliced_tracks="{OUTPUT_DIR}/power/by_country/{COUNTRY_ISO_A3}/storms/IBTrACS/0/tracks.geoparquet",
script:
"../../scripts/preprocess/slice_storm_tracks.py"

"""
To test:
snakemake -c1 results/power/by_country/PRI/storms/IBTrACS/tracks.geoparquet
snakemake -c1 results/power/by_country/PRI/storms/IBTrACS/0/tracks.geoparquet
"""
8 changes: 4 additions & 4 deletions workflow/rules/preprocess/IRIS.smk
@@ -2,13 +2,13 @@ rule parse_IRIS:
input:
csv_dir="{OUTPUT_DIR}/input/IRIS/iris-data/event_sets/{IRIS_SCENARIO}/"
output:
parquet="{OUTPUT_DIR}/storm_tracks/IRIS-{IRIS_SCENARIO}/tracks.geoparquet"
parquet="{OUTPUT_DIR}/storm_tracks/IRIS-{IRIS_SCENARIO}/{SAMPLE}/tracks.geoparquet"
script:
"../../scripts/preprocess/parse_IRIS.py"

"""
Test with:
snakemake -c1 results/storm_tracks/IRIS_SSP1-2050/tracks.geoparquet
snakemake -c1 results/storm_tracks/IRIS-SSP1-2050/0/tracks.geoparquet
"""


@@ -17,13 +17,13 @@ rule slice_IRIS:
global_tracks=rules.parse_IRIS.output.parquet,
grid_hull="{OUTPUT_DIR}/power/by_country/{COUNTRY_ISO_A3}/network/convex_hull.json"
output:
sliced_tracks="{OUTPUT_DIR}/power/by_country/{COUNTRY_ISO_A3}/storms/IRIS-{IRIS_SCENARIO}/tracks.geoparquet",
sliced_tracks="{OUTPUT_DIR}/power/by_country/{COUNTRY_ISO_A3}/storms/IRIS-{IRIS_SCENARIO}/{SAMPLE}/tracks.geoparquet",
resources:
mem_mb=10000 # the global tracks file is fairly chunky
script:
"../../scripts/preprocess/slice_storm_tracks.py"

"""
To test:
snakemake -c1 results/power/by_country/PRI/storms/IRIS-PRESENT/tracks.geoparquet
snakemake -c1 results/power/by_country/PRI/storms/IRIS-PRESENT/0/tracks.geoparquet
"""
12 changes: 6 additions & 6 deletions workflow/rules/preprocess/STORM.smk
@@ -2,28 +2,28 @@ rule parse_storm:
input:
csv_dir="{OUTPUT_DIR}/input/STORM/events/{STORM_MODEL}/raw"
output:
parquet="{OUTPUT_DIR}/storm_tracks/STORM-{STORM_MODEL}/tracks.geoparquet"
parquet="{OUTPUT_DIR}/storm_tracks/STORM-{STORM_MODEL}/{SAMPLE}/tracks.geoparquet"
script:
"../../scripts/preprocess/parse_STORM.py"

"""
Test with:
snakemake -c1 results/storm_tracks/STORM-constant/tracks.geoparquet
snakemake -c1 results/storm_tracks/STORM-constant/0/tracks.geoparquet
"""


rule slice_storm:
input:
global_tracks=rules.parse_storm.output.parquet,
global_tracks="{OUTPUT_DIR}/storm_tracks/STORM-{STORM_MODEL}/{SAMPLE}/tracks.geoparquet",
grid_hull="{OUTPUT_DIR}/power/by_country/{COUNTRY_ISO_A3}/network/convex_hull.json"
output:
sliced_tracks="{OUTPUT_DIR}/power/by_country/{COUNTRY_ISO_A3}/storms/STORM-{STORM_MODEL}/tracks.geoparquet",
sliced_tracks="{OUTPUT_DIR}/power/by_country/{COUNTRY_ISO_A3}/storms/STORM-{STORM_MODEL}/{SAMPLE}/tracks.geoparquet",
resources:
mem_mb=10000 # the global tracks file is fairly chunky
script:
"../../scripts/preprocess/slice_storm_tracks.py"

"""
To test:
snakemake -c1 results/power/by_country/PRI/storms/STORM-constant/tracks.geoparquet
"""
snakemake -c1 results/power/by_country/PRI/storms/STORM-constant/0/tracks.geoparquet
"""
2 changes: 1 addition & 1 deletion workflow/rules/preprocess/create_network.smk
@@ -240,7 +240,7 @@ def threads_for_country(wildcards) -> int:

ranked["threads"] = logistic_min(
ranked.index, # input to transform
workflow.cores, # maximum (roughly)
0.8 * workflow.cores, # maximum (roughly)
-2, # minimum (roughly)
0.02, # steepness of sigmoid
0.8 * len(ranked.index) # location of sigmoid centre on input axis
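For context on the changed parameter above, a hedged sketch of the shape logistic_min likely has, inferred only from the argument comments in the call; the real definition lives elsewhere in the workflow and may differ:

import numpy as np

def logistic_min(x, maximum, minimum, steepness, centre):
    """
    Logistic curve that decays from roughly `maximum` (x well below `centre`)
    towards roughly `minimum` (x well above `centre`).
    """
    return minimum + (maximum - minimum) / (1 + np.exp(steepness * (np.asarray(x) - centre)))

Under this reading, the change from workflow.cores to 0.8 * workflow.cores caps the most demanding countries at roughly 80% of the available cores rather than all of them.
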
44 changes: 39 additions & 5 deletions workflow/rules/preprocess/join_data.smk
@@ -1,8 +1,8 @@
# Take split .geoparquet files and output a single, unified .geoparquet file
# for each dataset-hazard combination


rule join_data:
"""
Take split .geoparquet files and output a single, unified .geoparquet file
for each dataset-hazard combination
"""
input:
lambda wildcards: expand(
os.path.join(
@@ -20,8 +20,42 @@ rule join_data:
script:
"../../scripts/join_data.py"


"""
Test with:
snakemake --cores all results/tanzania-mini_filter-road_hazard-aqueduct-river.geoparquet
"""


def all_storm_tracks_by_sample(wildcards) -> list[str]:
"""
Return a list of every per-sample tracks file for a given STORM_SET.
"""
dataset_name = wildcards.STORM_SET.split("-")[0]
return expand(
"{OUTPUT_DIR}/storm_tracks/{STORM_SET}/{SAMPLE}/tracks.geoparquet",
OUTPUT_DIR=wildcards.OUTPUT_DIR,
STORM_SET=wildcards.STORM_SET,
SAMPLE=range(0, SAMPLES_PER_TRACKSET[dataset_name])
)


rule concat_storm_tracks:
"""
Concatenate all samples of tracks together; useful when we want to e.g. plot a track atlas.
"""
input:
by_sample=all_storm_tracks_by_sample
output:
tracks_from_all_samples="{OUTPUT_DIR}/storm_tracks/{STORM_SET}/tracks.geoparquet",
run:
import geopandas as gpd
import pandas as pd

data = [gpd.read_parquet(path) for path in input.by_sample]
concat = gpd.GeoDataFrame(pd.concat(data))
concat.to_parquet(output.tracks_from_all_samples)

"""
Test with:
snakemake -c1 -- results/storm_tracks/STORM-constant/tracks.geoparquet
"""
7 changes: 7 additions & 0 deletions workflow/rules/storm_workflow_global_variables.smk
@@ -32,6 +32,13 @@ def country_codes() -> List[str]:
# list of ISO A3 country codes
COUNTRY_CODES = country_codes()

# how many samples is each storm track dataset split into?
SAMPLES_PER_TRACKSET = {
"IBTrACS": 1,
"STORM": 10,
"IRIS": 10,
}

STORM_RPS = (
list(range(10, 100, 10))
+ list(range(100, 1000, 100))
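To illustrate how this mapping drives the new per-sample path layout, a sketch using SAMPLES_PER_TRACKSET as defined above; the path pattern follows the per-sample layout used in the track and wind field rules:

from snakemake.io import expand  # expand is available without this import inside a Snakefile

# parent dataset of a storm set, e.g. "STORM" for "STORM-constant"
dataset = "STORM-constant".split("-")[0]

paths = expand(
    "results/storm_tracks/{STORM_SET}/{SAMPLE}/tracks.geoparquet",
    STORM_SET="STORM-constant",
    SAMPLE=range(0, SAMPLES_PER_TRACKSET[dataset]),
)
# -> results/storm_tracks/STORM-constant/0/tracks.geoparquet
#    ...
#    results/storm_tracks/STORM-constant/9/tracks.geoparquet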