From d4292bb87b53d121a8b80f9e3f07f86cf3476822 Mon Sep 17 00:00:00 2001 From: jshipton Date: Thu, 25 Apr 2024 17:44:31 +0100 Subject: [PATCH 01/10] works towards getting io working with ensemble parallelism --- gusto/configuration.py | 2 +- gusto/io.py | 23 ++++++++++++++++++----- gusto/timeloop.py | 10 ++++++---- 3 files changed, 25 insertions(+), 10 deletions(-) diff --git a/gusto/configuration.py b/gusto/configuration.py index 5aebf5a33..b0de48cb5 100644 --- a/gusto/configuration.py +++ b/gusto/configuration.py @@ -61,7 +61,7 @@ def __setattr__(self, name, value): When attributes are provided as floats or integers, these are converted to Firedrake :class:`Constant` objects, other than a handful of special - integers (dumpfreq, pddumpfreq, chkptfreq and log_level). + integers (dumpfreq, pddumpfreq and chkptfreq). Args: name: the attribute's name. diff --git a/gusto/io.py b/gusto/io.py index 40352085f..78d7e5b64 100644 --- a/gusto/io.py +++ b/gusto/io.py @@ -354,7 +354,7 @@ def setup_diagnostics(self, state_fields): if fname in state_fields.to_dump: self.diagnostics.register(fname) - def setup_dump(self, state_fields, t, pick_up=False): + def setup_dump(self, state_fields, t, ensemble, pick_up=False): """ Sets up a series of things used for outputting. @@ -377,6 +377,16 @@ def setup_dump(self, state_fields, t, pick_up=False): raise_parallel_exception = 0 error = None + if ensemble is not None: + ens_comm = ensemble.ensemble_comm + comm = ensemble.comm + create_dir = ens_comm.rank + comm.rank == 0 + create_files = ens_comm.rank == 0 + else: + comm = self.mesh.comm + create_dir = comm.Get_rank() == 0 + create_files = True + if any([self.output.dump_vtus, self.output.dump_nc, self.output.dumplist_latlon, self.output.dump_diagnostics, self.output.point_data, self.output.checkpoint and not pick_up]): @@ -385,7 +395,7 @@ def setup_dump(self, state_fields, t, pick_up=False): running_tests = '--running-tests' in sys.argv or "pytest" in self.output.dirname # Raising exceptions needs to be done in parallel - if self.mesh.comm.Get_rank() == 0: + if create_dir == 0: # Create results directory if it doesn't already exist if not path.exists(self.dumpdir): try: @@ -400,7 +410,10 @@ def setup_dump(self, state_fields, t, pick_up=False): # Gather errors from each rank and raise appropriate error everywhere # This allreduce also ensures that all ranks are in sync wrt the results dir - raise_exception = self.mesh.comm.allreduce(raise_parallel_exception, op=MPI.MAX) + raise_exception = comm.allreduce(raise_parallel_exception, op=MPI.MAX) + if ensemble is not None: + raise_exception = ens_comm.allreduce(raise_parallel_exception, op=MPI.MAX) + if raise_exception == 1: raise GustoIOError(f'results directory {self.dumpdir} already exists') elif raise_exception == 2: @@ -421,12 +434,12 @@ def setup_dump(self, state_fields, t, pick_up=False): if pick_up: next(self.dumpcount) - if self.output.dump_vtus: + if self.output.dump_vtus and create_files: # setup pvd output file outfile_pvd = path.join(self.dumpdir, "field_output.pvd") self.pvd_dumpfile = VTKFile( outfile_pvd, project_output=self.output.project_fields, - comm=self.mesh.comm) + comm=comm) if self.output.dump_nc: self.nc_filename = path.join(self.dumpdir, "field_output.nc") diff --git a/gusto/timeloop.py b/gusto/timeloop.py index 717b9002e..6a3a89af1 100644 --- a/gusto/timeloop.py +++ b/gusto/timeloop.py @@ -24,7 +24,7 @@ class BaseTimestepper(object, metaclass=ABCMeta): """Base class for timesteppers.""" - def __init__(self, equation, io): + def __init__(self, equation, io, ensemble=None): """ Args: equation (:class:`PrognosticEquation`): the prognostic equation. @@ -33,6 +33,7 @@ def __init__(self, equation, io): self.equation = equation self.io = io + self.ensemble = ensemble self.dt = self.equation.domain.dt self.t = self.equation.domain.t self.reference_profiles_initialised = False @@ -177,7 +178,7 @@ def run(self, t, tmax, pick_up=False): # Set up dump, which may also include an initial dump with timed_stage("Dump output"): - self.io.setup_dump(self.fields, t, pick_up) + self.io.setup_dump(self.fields, t, self.ensemble, pick_up) self.t.assign(t) @@ -249,7 +250,7 @@ class Timestepper(BaseTimestepper): """ def __init__(self, equation, scheme, io, spatial_methods=None, - physics_parametrisations=None): + physics_parametrisations=None, ensemble=None): """ Args: equation (:class:`PrognosticEquation`): the prognostic equation @@ -284,7 +285,7 @@ def __init__(self, equation, scheme, io, spatial_methods=None, else: self.physics_parametrisations = [] - super().__init__(equation=equation, io=io) + super().__init__(equation=equation, io=io, ensemble=ensemble) @property def transporting_velocity(self): @@ -716,6 +717,7 @@ def timestep(self): with timed_stage("Apply forcing terms"): logger.info('SIQN: Explicit forcing') + # Put explicit forcing into xstar self.forcing.apply(x_after_slow, xn, xstar(self.field_name), "explicit") From 67effde2863423b6a3e45f2e6ab7d250c308ad5f Mon Sep 17 00:00:00 2001 From: jshipton Date: Fri, 26 Apr 2024 15:38:13 +0100 Subject: [PATCH 02/10] some changes to io for ensemble parallelism --- gusto/io.py | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/gusto/io.py b/gusto/io.py index 78d7e5b64..72258e8b2 100644 --- a/gusto/io.py +++ b/gusto/io.py @@ -140,7 +140,7 @@ def dump(self, field_creator, t): class DiagnosticsOutput(object): """Object for outputting global diagnostic data.""" - def __init__(self, filename, diagnostics, description, comm, create=True): + def __init__(self, filename, diagnostics, description, ensemble, create=True): """ Args: filename (str): name of file to output to. @@ -154,10 +154,10 @@ def __init__(self, filename, diagnostics, description, comm, create=True): """ self.filename = filename self.diagnostics = diagnostics - self.comm = comm + self.ensemble = ensemble if not create: return - if self.comm.rank == 0: + if ensemble.ensemble_comm.rank == 0 and ensemble.comm.rank == 0: with Dataset(filename, "w") as dataset: dataset.description = "Diagnostics data for simulation {desc}".format(desc=description) dataset.history = "Created {t}".format(t=time.ctime()) @@ -185,7 +185,7 @@ def dump(self, state_fields, t): diagnostic = getattr(self.diagnostics, dname) diagnostics.append((fname, dname, diagnostic(field))) - if self.comm.rank == 0: + if self.ensemble.ensemble_comm.rank == 0 and self.ensemble.comm.rank == 0: with Dataset(self.filename, "a") as dataset: idx = dataset.dimensions["time"].size dataset.variables["time"][idx:idx + 1] = t @@ -386,6 +386,7 @@ def setup_dump(self, state_fields, t, ensemble, pick_up=False): comm = self.mesh.comm create_dir = comm.Get_rank() == 0 create_files = True + self.ensemble = ensemble if any([self.output.dump_vtus, self.output.dump_nc, self.output.dumplist_latlon, self.output.dump_diagnostics, @@ -395,7 +396,7 @@ def setup_dump(self, state_fields, t, ensemble, pick_up=False): running_tests = '--running-tests' in sys.argv or "pytest" in self.output.dirname # Raising exceptions needs to be done in parallel - if create_dir == 0: + if create_dir: # Create results directory if it doesn't already exist if not path.exists(self.dumpdir): try: @@ -412,7 +413,7 @@ def setup_dump(self, state_fields, t, ensemble, pick_up=False): # This allreduce also ensures that all ranks are in sync wrt the results dir raise_exception = comm.allreduce(raise_parallel_exception, op=MPI.MAX) if ensemble is not None: - raise_exception = ens_comm.allreduce(raise_parallel_exception, op=MPI.MAX) + raise_exception = ens_comm.allreduce(raise_exception, op=MPI.MAX) if raise_exception == 1: raise GustoIOError(f'results directory {self.dumpdir} already exists') @@ -466,10 +467,11 @@ def setup_dump(self, state_fields, t, ensemble, pick_up=False): # setup the latlon coordinate mesh and make output file if len(self.output.dumplist_latlon) > 0: mesh_ll = get_flat_latlon_mesh(self.mesh) - outfile_ll = path.join(self.dumpdir, "field_output_latlon.pvd") - self.dumpfile_ll = VTKFile(outfile_ll, - project_output=self.output.project_fields, - comm=self.mesh.comm) + if create_files: + outfile_ll = path.join(self.dumpdir, "field_output_latlon.pvd") + self.dumpfile_ll = VTKFile(outfile_ll, + project_output=self.output.project_fields, + comm=comm) # make functions on latlon mesh, as specified by dumplist_latlon self.to_dump_latlon = [] @@ -485,11 +487,11 @@ def setup_dump(self, state_fields, t, ensemble, pick_up=False): # already exist, in which case we just need the filenames if self.output.dump_diagnostics: diagnostics_filename = self.dumpdir+"/diagnostics.nc" - to_create = not (path.isfile(diagnostics_filename) and pick_up) + to_create = not (path.isfile(diagnostics_filename) and pick_up) and create_files self.diagnostic_output = DiagnosticsOutput(diagnostics_filename, self.diagnostics, self.output.dirname, - self.mesh.comm, + ensemble, create=to_create) # if picking-up, don't do initial dump From d239a618d1c5356c160c3343005818b0503793db Mon Sep 17 00:00:00 2001 From: jshipton Date: Fri, 26 Apr 2024 16:41:34 +0100 Subject: [PATCH 03/10] attempt to test parallel io --- integration-tests/model/test_parallel_io.py | 36 +++++++++++++++++++++ 1 file changed, 36 insertions(+) create mode 100644 integration-tests/model/test_parallel_io.py diff --git a/integration-tests/model/test_parallel_io.py b/integration-tests/model/test_parallel_io.py new file mode 100644 index 000000000..27a48447f --- /dev/null +++ b/integration-tests/model/test_parallel_io.py @@ -0,0 +1,36 @@ +from firedrake import Ensemble, COMM_WORLD, PeriodicUnitSquareMesh +from gusto import * +import pytest + + +@pytest.mark.parallel(nprocs=4) +@pytest.mark.parametrize("spatial_parallelism", [True, False]) +def test_parallel_io(tmpdir): + + if spatial_parallelism: + ensemble = Ensemble(COMM_WORLD, 2) + else: + ensemble = Ensemble(COMM_WORLD, 1) + + mesh = PeriodicUnitSquareMesh(10, comm=ensemble.comm) + dt = 0.1 + domain = Domain(mesh, dt, "BDM", 1) + + # Equation + diffusion_params = DiffusionParameters(kappa=0.75, mu=5) + V = domain.spaces("DG") + + equation = AdvectionDiffusionEquation(domain, V, "f", + diffusion_parameters=diffusion_params) + spatial_methods = [DGUpwind(equation, "f"), + InteriorPenaltyDiffusion(equation, "f", diffusion_params)] + + # I/O + output = OutputParameters(dirname=str(tmpdir)) + io = IO(domain, output) + + # Time stepper + stepper = Timestepper(equation, SSPRK3(domain), io, spatial_methods, + ensemble=ensemble) + + stepper.run(0, 3*dt) From e9600257ef3ae8988cf495bfab2cbc27244fd411 Mon Sep 17 00:00:00 2001 From: jshipton Date: Fri, 26 Apr 2024 16:53:02 +0100 Subject: [PATCH 04/10] minor doc fix --- gusto/equations.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/gusto/equations.py b/gusto/equations.py index c07aed867..52e8db012 100644 --- a/gusto/equations.py +++ b/gusto/equations.py @@ -172,7 +172,8 @@ def __init__(self, domain, function_space, field_name, Vu=None, equation's prognostic is defined on. field_name (str): name of the prognostic field. Vu (:class:`FunctionSpace`, optional): the function space for the - velocity field. If this is Defaults to None. + velocity field. Defaults to None in which case use the + HDiv space. diffusion_parameters (:class:`DiffusionParameters`, optional): parameters describing the diffusion to be applied. """ From 80f903bac8bf765cccc360855abb6d05f3adeb7d Mon Sep 17 00:00:00 2001 From: jshipton Date: Fri, 26 Apr 2024 16:56:35 +0100 Subject: [PATCH 05/10] minor test fix --- integration-tests/model/test_parallel_io.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/integration-tests/model/test_parallel_io.py b/integration-tests/model/test_parallel_io.py index 27a48447f..17a29033d 100644 --- a/integration-tests/model/test_parallel_io.py +++ b/integration-tests/model/test_parallel_io.py @@ -5,14 +5,14 @@ @pytest.mark.parallel(nprocs=4) @pytest.mark.parametrize("spatial_parallelism", [True, False]) -def test_parallel_io(tmpdir): +def test_parallel_io(tmpdir, spatial_parallelism): if spatial_parallelism: ensemble = Ensemble(COMM_WORLD, 2) else: ensemble = Ensemble(COMM_WORLD, 1) - mesh = PeriodicUnitSquareMesh(10, comm=ensemble.comm) + mesh = PeriodicUnitSquareMesh(10, 10, comm=ensemble.comm) dt = 0.1 domain = Domain(mesh, dt, "BDM", 1) From c035ddc9356c1205685516a110cacc7f5628db41 Mon Sep 17 00:00:00 2001 From: jshipton Date: Fri, 26 Apr 2024 17:00:11 +0100 Subject: [PATCH 06/10] easier to test with shallow water --- integration-tests/model/test_parallel_io.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/integration-tests/model/test_parallel_io.py b/integration-tests/model/test_parallel_io.py index 17a29033d..85b3dabb0 100644 --- a/integration-tests/model/test_parallel_io.py +++ b/integration-tests/model/test_parallel_io.py @@ -17,19 +17,15 @@ def test_parallel_io(tmpdir, spatial_parallelism): domain = Domain(mesh, dt, "BDM", 1) # Equation - diffusion_params = DiffusionParameters(kappa=0.75, mu=5) - V = domain.spaces("DG") - - equation = AdvectionDiffusionEquation(domain, V, "f", - diffusion_parameters=diffusion_params) - spatial_methods = [DGUpwind(equation, "f"), - InteriorPenaltyDiffusion(equation, "f", diffusion_params)] + parameters = ShallowWaterParameters(H=100) + equation = ShallowWaterEquations(domain, parameters) # I/O output = OutputParameters(dirname=str(tmpdir)) io = IO(domain, output) # Time stepper + spatial_methods = [DGUpwind(equation, "u"), DGUpwind(equation, "D")] stepper = Timestepper(equation, SSPRK3(domain), io, spatial_methods, ensemble=ensemble) From 5d94b95ffd38ea945e57972b59b2e5f1a3dceace Mon Sep 17 00:00:00 2001 From: jshipton Date: Fri, 26 Apr 2024 20:16:50 +0100 Subject: [PATCH 07/10] oops, forgot not everyone wants space-time parallelism --- gusto/io.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/gusto/io.py b/gusto/io.py index 72258e8b2..dac67ae80 100644 --- a/gusto/io.py +++ b/gusto/io.py @@ -140,7 +140,8 @@ def dump(self, field_creator, t): class DiagnosticsOutput(object): """Object for outputting global diagnostic data.""" - def __init__(self, filename, diagnostics, description, ensemble, create=True): + def __init__(self, filename, diagnostics, description, comm, + ensemble_comm=None, create=True): """ Args: filename (str): name of file to output to. @@ -154,10 +155,15 @@ def __init__(self, filename, diagnostics, description, ensemble, create=True): """ self.filename = filename self.diagnostics = diagnostics - self.ensemble = ensemble + self.comm = comm + self.ensemble_comm = ensemble_comm if not create: return - if ensemble.ensemble_comm.rank == 0 and ensemble.comm.rank == 0: + if ensemble_comm is not None: + self.create = ensemble_comm.rank == 0 and comm.rank == 0 + else: + self.create = comm.rank == 0 + if self.create: with Dataset(filename, "w") as dataset: dataset.description = "Diagnostics data for simulation {desc}".format(desc=description) dataset.history = "Created {t}".format(t=time.ctime()) @@ -185,7 +191,7 @@ def dump(self, state_fields, t): diagnostic = getattr(self.diagnostics, dname) diagnostics.append((fname, dname, diagnostic(field))) - if self.ensemble.ensemble_comm.rank == 0 and self.ensemble.comm.rank == 0: + if self.create: with Dataset(self.filename, "a") as dataset: idx = dataset.dimensions["time"].size dataset.variables["time"][idx:idx + 1] = t @@ -383,6 +389,7 @@ def setup_dump(self, state_fields, t, ensemble, pick_up=False): create_dir = ens_comm.rank + comm.rank == 0 create_files = ens_comm.rank == 0 else: + ens_comm = None comm = self.mesh.comm create_dir = comm.Get_rank() == 0 create_files = True @@ -491,7 +498,8 @@ def setup_dump(self, state_fields, t, ensemble, pick_up=False): self.diagnostic_output = DiagnosticsOutput(diagnostics_filename, self.diagnostics, self.output.dirname, - ensemble, + comm=comm, + ensemble_comm=ens_comm, create=to_create) # if picking-up, don't do initial dump From cff27bdc2770a77e230942299463e9f159de7c5c Mon Sep 17 00:00:00 2001 From: jshipton Date: Fri, 26 Apr 2024 22:09:42 +0100 Subject: [PATCH 08/10] fix for checkpointing and diagnostics --- gusto/io.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/gusto/io.py b/gusto/io.py index dac67ae80..33c54dab4 100644 --- a/gusto/io.py +++ b/gusto/io.py @@ -157,13 +157,13 @@ def __init__(self, filename, diagnostics, description, comm, self.diagnostics = diagnostics self.comm = comm self.ensemble_comm = ensemble_comm - if not create: - return if ensemble_comm is not None: - self.create = ensemble_comm.rank == 0 and comm.rank == 0 + self.write_to_file = ensemble_comm.rank == 0 and comm.rank == 0 else: - self.create = comm.rank == 0 - if self.create: + self.write_to_file = comm.rank == 0 + if not create: + return + if self.write_to_file: with Dataset(filename, "w") as dataset: dataset.description = "Diagnostics data for simulation {desc}".format(desc=description) dataset.history = "Created {t}".format(t=time.ctime()) @@ -191,7 +191,7 @@ def dump(self, state_fields, t): diagnostic = getattr(self.diagnostics, dname) diagnostics.append((fname, dname, diagnostic(field))) - if self.create: + if self.write_to_file: with Dataset(self.filename, "a") as dataset: idx = dataset.dimensions["time"].size dataset.variables["time"][idx:idx + 1] = t From afb235bbd2001de0d59466178dc83ceae2350e89 Mon Sep 17 00:00:00 2001 From: jshipton Date: Wed, 1 May 2024 10:52:55 +0100 Subject: [PATCH 09/10] check for ensemble rank before writing to pvd --- gusto/io.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gusto/io.py b/gusto/io.py index 33c54dab4..0a9eb2b8d 100644 --- a/gusto/io.py +++ b/gusto/io.py @@ -726,7 +726,7 @@ def dump(self, state_fields, t, step, initial_steps=None): # dump fields self.write_nc_dump(t) - if output.dump_vtus: + if output.dump_vtus and self.ensemble.ensemble_comm.rank == 0: # dump fields self.pvd_dumpfile.write(*self.to_dump) From e76ffe6001ae8153b8f3312ab8f19ed0fb31085b Mon Sep 17 00:00:00 2001 From: jshipton Date: Wed, 1 May 2024 11:08:48 +0100 Subject: [PATCH 10/10] oops, forgot not everyone does time-parallelism again --- gusto/io.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/gusto/io.py b/gusto/io.py index 0a9eb2b8d..7dd841300 100644 --- a/gusto/io.py +++ b/gusto/io.py @@ -688,6 +688,10 @@ def dump(self, state_fields, t, step, initial_steps=None): completed by a multi-level time scheme. Defaults to None. """ output = self.output + if self.ensemble is not None: + write_file = self.ensemble.ensemble_comm.rank == 0 + else: + write_file = True # Diagnostics: # Compute diagnostic fields @@ -726,7 +730,7 @@ def dump(self, state_fields, t, step, initial_steps=None): # dump fields self.write_nc_dump(t) - if output.dump_vtus and self.ensemble.ensemble_comm.rank == 0: + if output.dump_vtus and write_file: # dump fields self.pvd_dumpfile.write(*self.to_dump)