Skip to content

Commit d7d22ab

Browse files
committed
planning now MPI-parallelized
1 parent c705036 commit d7d22ab

File tree

2 files changed

+300
-228
lines changed

2 files changed

+300
-228
lines changed

bin/mpi-fastspecfit

Lines changed: 97 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ MPI wrapper for fastphot and fastspec.
55
"""
66
import os, time
77
import numpy as np
8+
from astropy.table import Table
89

910
from fastspecfit.logger import log
1011

@@ -19,24 +20,34 @@ def get_size(comm, mp=1):
1920
def run_fastspecfit(args, comm=None, fastphot=False, specprod_dir=None, makeqa=False,
2021
sample=None, input_redshifts=False, outdir_data='.', templates=None,
2122
templateversion=None, fphotodir=None, fphotofile=None):
23+
"""Main wrapper to run fastspec, fastphot, or fastqa.
2224
25+
Top-level MPI paraellelization is over (e.g., healpix) files, but there is
26+
another level of parallelization which makes use of subcommunicators.
27+
28+
For example, calling `mpi-fastspecfit` with 8 MPI tasks and --mp=4 will
29+
result in two (8/4) healpix files being processed simultaneously
30+
(specifically, by ranks 0 and 4) and then a further level of
31+
parallelization over the objects in each of those files specifically, but
32+
subranks (0, 1, 2, 3) and (4, 5, 6, 7), respectively.
33+
34+
"""
2335
import sys
2436
from desispec.parallel import stdouterr_redirected
2537
from fastspecfit.mpi import plan
2638

2739
if comm:
2840
rank = comm.rank
29-
#size = comm.size
30-
size = get_size(comm, mp=args.mp)
41+
size = comm.size
42+
#size = get_size(comm, mp=args.mp)
3143

3244
# Split the MPI.COMM_WORLD communicator into size // args.mp
3345
# subcommunicators so we can parallelize over objects in
3446
# fastspecfit.fastspec (or fastspecfit.fastphot).
35-
colors = np.arange(size) // args.mp
47+
colors = np.arange(comm.size) // args.mp
3648
subcomm = comm.Split(color=rank // args.mp, key=rank)
3749
else:
38-
rank = 0
39-
size = 1
50+
rank, size = 0, 1
4051
colors = [0]
4152
subcomm = None
4253
print(comm.rank, comm.size, subcomm.rank, subcomm.size, size)
@@ -45,13 +56,13 @@ def run_fastspecfit(args, comm=None, fastphot=False, specprod_dir=None, makeqa=F
4556
if rank == 0:
4657
if sample is not None:
4758
_, redrockfiles, outfiles, groups, ntargets = plan(
48-
size=size, specprod=args.specprod, specprod_dir=specprod_dir,
59+
comm=comm, specprod=args.specprod, specprod_dir=specprod_dir,
4960
sample=sample, coadd_type='healpix', makeqa=args.makeqa,
5061
mp=args.mp, fastphot=args.fastphot, outdir_data=outdir_data,
5162
overwrite=args.overwrite)
5263
else:
5364
_, redrockfiles, outfiles, groups, ntargets = plan(
54-
size=size, specprod=args.specprod, specprod_dir=specprod_dir,
65+
comm=comm, specprod=args.specprod, specprod_dir=specprod_dir,
5566
coadd_type=args.coadd_type, survey=args.survey, program=args.program,
5667
healpix=args.healpix, tile=args.tile, night=args.night,
5768
makeqa=args.makeqa, mp=args.mp, fastphot=fastphot, outdir_data=outdir_data,
@@ -60,6 +71,10 @@ def run_fastspecfit(args, comm=None, fastphot=False, specprod_dir=None, makeqa=F
6071
else:
6172
redrockfiles, outfiles, groups, ntargets = [], [], [], []
6273

74+
print(rank, redrockfiles, ntargets, groups)
75+
76+
return
77+
6378
#if comm:
6479
# groups = comm.bcast(groups, root=0)
6580
# redrockfiles = comm.bcast(redrockfiles, root=0)
@@ -303,12 +318,17 @@ def main():
303318
import multiprocessing
304319
multiprocessing.set_start_method('spawn')
305320

306-
# Rank 0 is responsible for planning and merging.
307-
if rank == 0:
308-
# check the input samplefile
309-
if args.samplefile is not None:
321+
# If an input samplefile is provided, read and broadcast it.
322+
if args.samplefile is None:
323+
sample = None
324+
else:
325+
if args.coadd_type != 'healpix':
326+
errmsg = 'Input --samplefile is only currently compatible with --coadd-type="healpix"'
327+
log.critical(errmsg)
328+
raise NotImplementedError(errmsg)
329+
330+
if rank == 0:
310331
import fitsio
311-
from astropy.table import Table
312332
if not os.path.isfile(args.samplefile):
313333
log.warning(f'{args.samplefile} does not exist.')
314334
return
@@ -324,80 +344,79 @@ def main():
324344
'{SURVEY,PROGRAM,HEALPIX,TARGETID}'
325345
log.critical(errmsg)
326346
raise ValueError(errmsg)
347+
else:
348+
sample = Table()
327349

328-
if args.samplefile is None and args.coadd_type == 'healpix':
329-
args.survey = args.survey.split(',')
330-
args.program = args.program.split(',')
331-
if args.healpix is not None:
332-
args.healpix = args.healpix.split(',')
350+
sample = comm.bcast(sample, root=0)
333351

334-
# merge
335-
if args.mergeall_main or args.mergeall_sv or args.mergeall_special:
336-
args.mergeall = True
352+
# Parse some of the inputs.
353+
if args.samplefile is None and args.coadd_type == 'healpix':
354+
args.survey = args.survey.split(',')
355+
args.program = args.program.split(',')
356+
if args.healpix is not None:
357+
args.healpix = args.healpix.split(',')
337358

338-
if args.merge or args.mergeall:
339-
from fastspecfit.mpi import merge_fastspecfit
359+
# FIXME - merging business is a mess
360+
if args.mergeall_main or args.mergeall_sv or args.mergeall_special:
361+
args.mergeall = True
340362

341-
# convenience code to make the super-merge catalogs, e.g., fastspec-iron-{main,special,sv}.fits
342-
if args.fastphot:
343-
fastprefix = 'fastphot'
344-
else:
345-
fastprefix = 'fastspec'
363+
if args.merge or args.mergeall:
364+
from fastspecfit.mpi import merge_fastspecfit
346365

347-
if args.mergeall_main or args.mergeall_sv or args.mergeall_special:
348-
from glob import glob
349-
if args.mergedir is None:
350-
mergedir = os.path.join(outdir_data, args.specprod, 'catalogs')
351-
else:
352-
mergedir = args.mergedir
353-
354-
if args.mergeall_main:
355-
args.merge_suffix = f'{args.specprod}-main'
356-
fastfiles_to_merge = sorted(glob(os.path.join(mergedir, f'{fastprefix}-{args.specprod}-main-*.fits')))
357-
elif args.mergeall_special:
358-
args.merge_suffix = f'{args.specprod}-special'
359-
fastfiles_to_merge = sorted(glob(os.path.join(mergedir, f'{fastprefix}-{args.specprod}-special-*.fits')))
360-
elif args.mergeall_sv:
361-
args.merge_suffix = f'{args.specprod}-sv'
362-
fastfiles_to_merge = sorted(list(set(glob(os.path.join(mergedir, f'{fastprefix}-{args.specprod}-*.fits'))) -
363-
set(glob(os.path.join(mergedir, f'{fastprefix}-{args.specprod}-main.fits'))) -
364-
set(glob(os.path.join(mergedir, f'{fastprefix}-{args.specprod}-special.fits'))) -
365-
set(glob(os.path.join(mergedir, f'{fastprefix}-{args.specprod}-special-*.fits'))) -
366-
set(glob(os.path.join(mergedir, f'{fastprefix}-{args.specprod}-main-*.fits'))) -
367-
set(glob(os.path.join(mergedir, f'{fastprefix}-{args.specprod}-sv.fits')))))
368-
else:
369-
fastfiles_to_merge = None
370-
else:
371-
fastfiles_to_merge = None
366+
# convenience code to make the super-merge catalogs, e.g., fastspec-iron-{main,special,sv}.fits
367+
if args.fastphot:
368+
fastprefix = 'fastphot'
369+
else:
370+
fastprefix = 'fastspec'
372371

373-
if args.samplefile is not None:
374-
merge_fastspecfit(specprod=args.specprod, specprod_dir=specprod_dir, coadd_type='healpix',
375-
sample=sample, merge_suffix=args.merge_suffix,
376-
outdir_data=outdir_data, fastfiles_to_merge=fastfiles_to_merge,
377-
outsuffix=args.merge_suffix, mergedir=args.mergedir, overwrite=args.overwrite,
378-
verbose=args.verbose, fastphot=args.fastphot, supermerge=args.mergeall, mp=args.mp)
372+
if args.mergeall_main or args.mergeall_sv or args.mergeall_special:
373+
from glob import glob
374+
if args.mergedir is None:
375+
mergedir = os.path.join(outdir_data, args.specprod, 'catalogs')
379376
else:
380-
merge_fastspecfit(specprod=args.specprod, specprod_dir=specprod_dir, coadd_type=args.coadd_type,
381-
survey=args.survey, program=args.program, healpix=args.healpix,
382-
tile=args.tile, night=args.night, outdir_data=outdir_data,
383-
fastfiles_to_merge=fastfiles_to_merge, verbose=args.verbose,
384-
outsuffix=args.merge_suffix, mergedir=args.mergedir, overwrite=args.overwrite,
385-
fastphot=args.fastphot, supermerge=args.mergeall, mp=args.mp)
386-
387-
388-
if args.plan and args.makeqa is False:
389-
if rank == 0:
390-
if args.samplefile is not None:
391-
plan(size=size, specprod=args.specprod, specprod_dir=specprod_dir,
392-
sample=sample, coadd_type='healpix', makeqa=args.makeqa,
393-
mp=args.mp, fastphot=args.fastphot,
394-
outdir_data=outdir_data, overwrite=args.overwrite)
377+
mergedir = args.mergedir
378+
379+
if args.mergeall_main:
380+
args.merge_suffix = f'{args.specprod}-main'
381+
fastfiles_to_merge = sorted(glob(os.path.join(mergedir, f'{fastprefix}-{args.specprod}-main-*.fits')))
382+
elif args.mergeall_special:
383+
args.merge_suffix = f'{args.specprod}-special'
384+
fastfiles_to_merge = sorted(glob(os.path.join(mergedir, f'{fastprefix}-{args.specprod}-special-*.fits')))
385+
elif args.mergeall_sv:
386+
args.merge_suffix = f'{args.specprod}-sv'
387+
fastfiles_to_merge = sorted(list(set(glob(os.path.join(mergedir, f'{fastprefix}-{args.specprod}-*.fits'))) -
388+
set(glob(os.path.join(mergedir, f'{fastprefix}-{args.specprod}-main.fits'))) -
389+
set(glob(os.path.join(mergedir, f'{fastprefix}-{args.specprod}-special.fits'))) -
390+
set(glob(os.path.join(mergedir, f'{fastprefix}-{args.specprod}-special-*.fits'))) -
391+
set(glob(os.path.join(mergedir, f'{fastprefix}-{args.specprod}-main-*.fits'))) -
392+
set(glob(os.path.join(mergedir, f'{fastprefix}-{args.specprod}-sv.fits')))))
395393
else:
396-
plan(size=size, specprod=args.specprod, specprod_dir=specprod_dir,
397-
coadd_type=args.coadd_type, survey=args.survey, program=args.program,
398-
healpix=args.healpix, tile=args.tile, night=args.night,
399-
makeqa=args.makeqa, mp=args.mp, fastphot=args.fastphot,
400-
outdir_data=outdir_data, overwrite=args.overwrite)
394+
fastfiles_to_merge = None
395+
else:
396+
fastfiles_to_merge = None
397+
398+
if args.samplefile is not None:
399+
merge_fastspecfit(specprod=args.specprod, specprod_dir=specprod_dir, coadd_type='healpix',
400+
sample=sample, merge_suffix=args.merge_suffix,
401+
outdir_data=outdir_data, fastfiles_to_merge=fastfiles_to_merge,
402+
outsuffix=args.merge_suffix, mergedir=args.mergedir, overwrite=args.overwrite,
403+
verbose=args.verbose, fastphot=args.fastphot, supermerge=args.mergeall, mp=args.mp)
404+
else:
405+
merge_fastspecfit(specprod=args.specprod, specprod_dir=specprod_dir, coadd_type=args.coadd_type,
406+
survey=args.survey, program=args.program, healpix=args.healpix,
407+
tile=args.tile, night=args.night, outdir_data=outdir_data,
408+
fastfiles_to_merge=fastfiles_to_merge, verbose=args.verbose,
409+
outsuffix=args.merge_suffix, mergedir=args.mergedir, overwrite=args.overwrite,
410+
fastphot=args.fastphot, supermerge=args.mergeall, mp=args.mp)
411+
412+
413+
if args.plan:
414+
plan(comm=comm, specprod=args.specprod, specprod_dir=specprod_dir,
415+
coadd_type=args.coadd_type, survey=args.survey, program=args.program,
416+
healpix=args.healpix, tile=args.tile, night=args.night,
417+
makeqa=args.makeqa, mp=args.mp, fastphot=args.fastphot,
418+
outdir_data=outdir_data, overwrite=args.overwrite,
419+
sample=sample)
401420
else:
402421
run_fastspecfit(args, comm=comm, fastphot=args.fastphot, specprod_dir=specprod_dir,
403422
makeqa=args.makeqa, outdir_data=outdir_data, sample=sample,

0 commit comments

Comments
 (0)