Skip to content

Commit

Permalink
begin a pure-MPI implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
moustakas committed Dec 8, 2024
1 parent 3e800a5 commit 889951d
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 18 deletions.
19 changes: 13 additions & 6 deletions bin/mpi-fastspecfit
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def run_fastspecfit(args, comm=None, fastphot=False, specprod_dir=None,
_, zbestfiles, outfiles, groups, ntargets = plan(
size=size, specprod=args.specprod, specprod_dir=specprod_dir,
sample=sample, coadd_type='healpix', makeqa=args.makeqa,
mp=args.mp, fastphot=args.fastphot,
mp=args.mp, fastphot=args.fastphot,
outdir_data=outdir_data, overwrite=args.overwrite)
else:
sample = None
Expand Down Expand Up @@ -117,6 +117,9 @@ def run_fastspecfit(args, comm=None, fastphot=False, specprod_dir=None,
if args.fphotofile:
cmdargs += f' --fphotofile={args.fphotofile}'

if args.verbose:
cmdargs += ' --verbose'

if args.ntargets is not None:
cmdargs += f' --ntargets={args.ntargets}'

Expand Down Expand Up @@ -162,12 +165,18 @@ def run_fastspecfit(args, comm=None, fastphot=False, specprod_dir=None,
outdir = os.path.dirname(logfile)
if not os.path.isdir(outdir):
os.makedirs(outdir, exist_ok=True)

## pure-MPI
#if comm is not None:
# subcomm = comm.Split(color=rank)
#else:
# subcomm = None

if args.nolog:
fast(args=cmdargs.split())
fast(args=cmdargs.split())#, comm=subcomm)
else:
cmdargs += ' --verbose'
with stdouterr_redirected(to=logfile, overwrite=args.overwrite):
fast(args=cmdargs.split())
fast(args=cmdargs.split())#, comm=subcomm)

dt1 = time.time() - t1
log.info(f' rank {rank} done in {dt1:.2f} sec')
Expand Down Expand Up @@ -360,13 +369,11 @@ def main():
mp=args.mp, fastphot=args.fastphot,
outdir_data=outdir_data, overwrite=args.overwrite)
else:
print(f'I am rank {rank}')
plan(size=size, specprod=args.specprod, specprod_dir=specprod_dir,
coadd_type=args.coadd_type, survey=args.survey, program=args.program,
healpix=args.healpix, tile=args.tile, night=args.night,
makeqa=args.makeqa, mp=args.mp, fastphot=args.fastphot,
outdir_data=outdir_data, overwrite=args.overwrite)
print(f'I am rank {rank}')
else:
run_fastspecfit(args, comm=comm, fastphot=args.fastphot, specprod_dir=specprod_dir,
makeqa=args.makeqa, outdir_data=outdir_data,
Expand Down
13 changes: 7 additions & 6 deletions py/fastspecfit/fastspecfit.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,15 +159,16 @@ def fastspec(fastphot=False, fitstack=False, args=None, comm=None, verbose=False

sc_data.initialize(**init_sc_args)

# If multiprocessing, create a pool of worker processes and initialize
# single-copy objects in each worker.
if args.mp > 1 and not 'NERSC_HOST' in os.environ:
import multiprocessing
multiprocessing.set_start_method('fork')
## If multiprocessing, create a pool of worker processes and initialize
## single-copy objects in each worker.
#if args.mp > 1 and not 'NERSC_HOST' in os.environ:
# import multiprocessing
# multiprocessing.set_start_method('fork')

t0 = time.time()
mp_pool = MPPool(args.mp, initializer=sc_data.initialize,
init_argdict=init_sc_args)
init_argdict=init_sc_args, comm=comm)

log.debug(f'Caching took {time.time()-t0:.5f} seconds.')

log.info(f'Cached stellar templates {sc_data.templates.file}')
Expand Down
11 changes: 5 additions & 6 deletions py/fastspecfit/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class MPPool(object):
rather than a list of positional arguments.
"""
def __init__(self, nworkers, initializer=None, init_argdict=None):
def __init__(self, nworkers, initializer=None, init_argdict=None, comm=None):
"""
create a pool with nworkers workers, using the current
process if nworkers is 1. If initiializer is not None,
Expand All @@ -65,11 +65,10 @@ def __init__(self, nworkers, initializer=None, init_argdict=None):
initfunc = None if initializer is None else self.apply_to_dict

if nworkers > 1:
#try:
# from mpi4py.futures import MPIPoolExecutor as Pool
#except:
# from multiprocessing import Pool
from multiprocessing import Pool
if comm is not None:
from mpi4py.futures import MPIPoolExecutor as Pool
else:
from multiprocessing import Pool
self.pool = Pool(nworkers,
initializer=initfunc,
initargs=(initializer, init_argdict,))
Expand Down

0 comments on commit 889951d

Please sign in to comment.