diff --git a/python/lsst/pipe/tasks/finalizeCharacterization.py b/python/lsst/pipe/tasks/finalizeCharacterization.py index d4f80d699..6416b6f93 100644 --- a/python/lsst/pipe/tasks/finalizeCharacterization.py +++ b/python/lsst/pipe/tasks/finalizeCharacterization.py @@ -35,8 +35,10 @@ ] import astropy.table +import astropy.units as u import numpy as np import esutil +from smatch.matcher import Matcher import lsst.pex.config as pexConfig @@ -81,6 +83,21 @@ class FinalizeCharacterizationConnectionsBase( deferLoad=True, multiple=True, ) + fgcm_standard_star = pipeBase.connectionTypes.Input( + doc=('Catalog of fgcm for color corrections, and indexes to the ' + 'isolated_star_cats catalogs.'), + name='fgcm_standard_star', + storageClass='ArrowAstropy', + dimensions=('instrument', 'tract', 'skymap'), + deferLoad=True, + multiple=True, + ) + + def __init__(self, *, config=None): + super().__init__(config=config) + + if not self.config.psf_determiner['piff'].useColor: + self.inputs.remove("fgcm_standard_star") class FinalizeCharacterizationConnections( @@ -327,6 +344,9 @@ def __init__(self, initInputs=None, **kwargs): # Only log warning and fatal errors from the source_selector self.source_selector.log.setLevel(self.source_selector.log.WARN) + self.isPsfDeterminerPiff = False + if isinstance(self.psf_determiner, lsst.meas.extensions.piff.piffPsfDeterminer.PiffPsfDeterminerTask): + self.isPsfDeterminerPiff = True def _make_output_schema_mapper(self, input_schema): """Make the schema mapper from the input schema to the output schema. @@ -401,6 +421,17 @@ def _make_output_schema_mapper(self, input_schema): type=np.int32, doc='Detector number for the sources.', ) + output_schema.addField( + 'psfColorValue', + type=np.float32, + doc="Color used in PSF fit." + ) + output_schema.addField( + 'psfColorType', + type=str, + size=10, + doc="Color used in PSF fit." + ) alias_map = input_schema.getAliasMap() alias_map_output = afwTable.AliasMap() @@ -434,6 +465,18 @@ def _make_selection_schema_mapper(self, input_schema): selection_schema.setAliasMap(input_schema.getAliasMap()) + selection_schema.addField( + 'psfColorValue', + type=np.float32, + doc="Color used in PSF fit." + ) + selection_schema.addField( + 'psfColorType', + type=str, + size=10, + doc="Color used in PSF fit." + ) + return mapper, selection_schema def concat_isolated_star_cats(self, band, isolated_star_cat_dict, isolated_star_source_dict): @@ -542,7 +585,31 @@ def concat_isolated_star_cats(self, band, isolated_star_cat_dict, isolated_star_ return isolated_table, isolated_source_table - def compute_psf_and_ap_corr_map(self, visit, detector, exposure, src, isolated_source_table): + def add_src_colors(self, srcCat, fgcmCat, band): + + if self.isPsfDeterminerPiff and fgcmCat is not None: + + raSrc = (srcCat['coord_ra'] * u.radian).to(u.degree).value + decSrc = (srcCat['coord_dec'] * u.radian).to(u.degree).value + + with Matcher(raSrc, decSrc) as matcher: + idx, idxSrcCat, idxColorCat, d = matcher.query_radius( + fgcmCat["ra"], + fgcmCat["dec"], + 1. / 3600.0, + return_indices=True, + ) + + magStr1 = self.psf_determiner.config.color[band][0] + magStr2 = self.psf_determiner.config.color[band][2] + colors = fgcmCat[f'mag_{magStr1}'] - fgcmCat[f'mag_{magStr2}'] + + for idSrc, idColor in zip(idxSrcCat, idxColorCat): + srcCat[idSrc]['psfColorValue'] = colors[idColor] + srcCat[idSrc]['psfColorType'] = f"{magStr1}-{magStr2}" + + def compute_psf_and_ap_corr_map(self, visit, detector, exposure, src, + isolated_source_table, fgcm_standard_star_cat): """Compute psf model and aperture correction map for a single exposure. Parameters @@ -622,6 +689,12 @@ def compute_psf_and_ap_corr_map(self, visit, detector, exposure, src, isolated_s # We need to copy over the calib_psf flags because they were not in the mapper measured_src['calib_psf_candidate'] = selected_src['calib_psf_candidate'] measured_src['calib_psf_reserved'] = selected_src['calib_psf_reserved'] + if hasattr(exposure.filter, "bandLabel"): + band = exposure.filter.bandLabel + else: + band = None + self.add_src_colors(selected_src, fgcm_standard_star_cat, band) + self.add_src_colors(measured_src, fgcm_standard_star_cat, band) # Select the psf candidates from the selection catalog try: @@ -639,6 +712,7 @@ def compute_psf_and_ap_corr_map(self, visit, detector, exposure, src, isolated_s in zip(psf_selection_result.psfCandidates, ~psf_cand_cat['calib_psf_reserved']) if use] flag_key = psf_cand_cat.schema['calib_psf_used'].asKey() + try: psf, cell_set = self.psf_determiner.determinePsf(exposure, psf_determiner_list, @@ -649,7 +723,7 @@ def compute_psf_and_ap_corr_map(self, visit, detector, exposure, src, isolated_s visit, detector, e) return None, None, measured_src # Verify that the PSF is usable by downstream tasks - sigma = psf.computeShape(psf.getAveragePosition()).getDeterminantRadius() + sigma = psf.computeShape(psf.getAveragePosition(), psf.getAverageColor()).getDeterminantRadius() if np.isnan(sigma): self.log.warning('Failed to determine psf for visit %d, detector %d: ' 'Computed final PSF size is NAN.', @@ -727,11 +801,20 @@ def runQuantum(self, butlerQC, inputRefs, outputRefs): isolated_star_source_dict = {tract: isolated_star_source_dict_temp[tract] for tract in sorted(isolated_star_source_dict_temp.keys())} + if self.config.psf_determiner['piff'].useColor: + fgcm_standard_star_dict_temp = {handle.dataId['tract']: handle + for handle in input_handle_dict['fgcm_standard_star']} + fgcm_standard_star_dict = {tract: fgcm_standard_star_dict_temp[tract] for + tract in sorted(fgcm_standard_star_dict_temp.keys())} + else: + fgcm_standard_star_dict = None + struct = self.run( visit, band, isolated_star_cat_dict, isolated_star_source_dict, + fgcm_standard_star_dict, src_dict, calexp_dict, ) @@ -739,7 +822,8 @@ def runQuantum(self, butlerQC, inputRefs, outputRefs): butlerQC.put(struct.psf_ap_corr_cat, outputRefs.finalized_psf_ap_corr_cat) butlerQC.put(struct.output_table, outputRefs.finalized_src_table) - def run(self, visit, band, isolated_star_cat_dict, isolated_star_source_dict, src_dict, calexp_dict): + def run(self, visit, band, isolated_star_cat_dict, isolated_star_source_dict, + fgcm_standard_star_dict, src_dict, calexp_dict): """ Run the FinalizeCharacterizationTask. @@ -807,6 +891,19 @@ def run(self, visit, band, isolated_star_cat_dict, isolated_star_source_dict, sr measured_src_tables = [] measured_src_table = None + if fgcm_standard_star_dict is not None: + + fgcm_standard_star_cat = [] + + for tract in fgcm_standard_star_dict: + astropy_fgcm = fgcm_standard_star_dict[tract].get() + table_fgcm = np.asarray(astropy_fgcm) + fgcm_standard_star_cat.append(table_fgcm) + + fgcm_standard_star_cat = np.concatenate(fgcm_standard_star_cat) + else: + fgcm_standard_star_cat = None + self.log.info("Running finalizeCharacterization on %d detectors.", len(detector_keys)) for detector in detector_keys: self.log.info("Starting finalizeCharacterization on detector ID %d.", detector) @@ -818,7 +915,8 @@ def run(self, visit, band, isolated_star_cat_dict, isolated_star_source_dict, sr detector, exposure, src, - isolated_source_table + isolated_source_table, + fgcm_standard_star_cat, ) # And now we package it together... @@ -870,12 +968,21 @@ def runQuantum(self, butlerQC, inputRefs, outputRefs): isolated_star_source_dict = {tract: isolated_star_source_dict_temp[tract] for tract in sorted(isolated_star_source_dict_temp.keys())} + if self.config.psf_determiner['piff'].useColor: + fgcm_standard_star_dict_temp = {handle.dataId['tract']: handle + for handle in input_handle_dict['fgcm_standard_star']} + fgcm_standard_star_dict = {tract: fgcm_standard_star_dict_temp[tract] for + tract in sorted(fgcm_standard_star_dict_temp.keys())} + else: + fgcm_standard_star_dict = None + struct = self.run( visit, band, detector, isolated_star_cat_dict, isolated_star_source_dict, + fgcm_standard_star_dict, input_handle_dict['src'], input_handle_dict['calexp'], ) @@ -889,7 +996,8 @@ def runQuantum(self, butlerQC, inputRefs, outputRefs): outputRefs.finalized_src_detector_table, ) - def run(self, visit, band, detector, isolated_star_cat_dict, isolated_star_source_dict, src, exposure): + def run(self, visit, band, detector, isolated_star_cat_dict, isolated_star_source_dict, + fgcm_standard_star_dict, src, exposure): """ Run the FinalizeCharacterizationDetectorTask. @@ -905,6 +1013,8 @@ def run(self, visit, band, detector, isolated_star_cat_dict, isolated_star_sourc Per-tract dict of isolated star catalog handles. isolated_star_source_dict : `dict` Per-tract dict of isolated star source catalog handles. + fgcm_standard_star_dict : `dict` + Per-tract dict of fgcm photometry catalog handles. src : `lsst.afw.table.SourceCatalog` Src catalog. exposure : `lsst.afw.image.Exposure` @@ -925,7 +1035,7 @@ def run(self, visit, band, detector, isolated_star_cat_dict, isolated_star_sourc _, isolated_source_table = self.concat_isolated_star_cats( band, isolated_star_cat_dict, - isolated_star_source_dict + isolated_star_source_dict, ) if isolated_source_table is None: @@ -943,12 +1053,25 @@ def run(self, visit, band, detector, isolated_star_cat_dict, isolated_star_sourc self.log.info("Starting finalizeCharacterization on detector ID %d.", detector) + if fgcm_standard_star_dict is not None: + fgcm_standard_star_cat = [] + + for tract in fgcm_standard_star_dict: + astropy_fgcm = fgcm_standard_star_dict[tract].get() + table_fgcm = np.asarray(astropy_fgcm) + fgcm_standard_star_cat.append(table_fgcm) + + fgcm_standard_star_cat = np.concatenate(fgcm_standard_star_cat) + else: + fgcm_standard_star_cat = None + psf, ap_corr_map, measured_src = self.compute_psf_and_ap_corr_map( visit, detector, exposure, src, isolated_source_table, + fgcm_standard_star_cat, ) # And now we package it together... diff --git a/tests/test_finalizeCharacterization.py b/tests/test_finalizeCharacterization.py index 2f09ab79b..176d52be1 100644 --- a/tests/test_finalizeCharacterization.py +++ b/tests/test_finalizeCharacterization.py @@ -73,11 +73,13 @@ def compute_psf_and_ap_corr_map( exposure, src, isolated_src_table, + fgcm_standard_star_cat, use_super=False, ): """A mocked version of this method.""" if use_super: - return super().compute_psf_and_ap_corr_map(visit, detector, exposure, src, isolated_src_table) + return super().compute_psf_and_ap_corr_map(visit, detector, exposure, src, + isolated_src_table, fgcm_standard_star_cat) return _make_dummy_psf_and_ap_corr_map() @@ -98,6 +100,7 @@ def compute_psf_and_ap_corr_map( exposure, src, isolated_src_table, + fgcm_standard_star_cat, use_super=False, ): """A mocked version of this method.""" @@ -125,7 +128,7 @@ def setUp(self): config=config_det, ) - self.isolated_star_cat_dict, self.isolated_star_source_dict = self._make_isocats() + self.isolated_star_cat_dict, self.isolated_star_source_dict, self.fgcm_cat = self._make_isocats() def _make_isocats(self): """Make test isolated star catalogs. @@ -153,8 +156,16 @@ def _make_isocats(self): dtype_source = [('sourceId', 'i8'), ('obj_index', 'i4')] + dtype_fgcm = [ + ('ra', 'f8'), + ('dec', 'f8'), + ('mag_g', 'f8'), + ('mag_i', 'f8'), + ] + isolated_star_cat_dict = {} isolated_star_source_dict = {} + fgcm_cat_dict = {} np.random.seed(12345) @@ -212,14 +223,22 @@ def _make_isocats(self): counter += nsource_per_band_per_star + fgcm_cat = np.zeros(nstar, dtype=dtype_fgcm) + fgcm_cat['ra'] = ra + fgcm_cat['dec'] = dec + fgcm_cat['mag_g'] = np.random.uniform(low=16, high=20, size=nstar) + fgcm_cat['mag_i'] = np.random.uniform(low=16, high=20, size=nstar) + source_cat = np.concatenate(source_cats) isolated_star_cat_dict[tract] = pipeBase.InMemoryDatasetHandle(astropy.table.Table(cat), storageClass="ArrowAstropy") isolated_star_source_dict[tract] = pipeBase.InMemoryDatasetHandle(astropy.table.Table(source_cat), storageClass="ArrowAstropy") + fgcm_cat_dict[tract] = pipeBase.InMemoryDatasetHandle(astropy.table.Table(fgcm_cat), + storageClass="ArrowAstropy") - return isolated_star_cat_dict, isolated_star_source_dict + return isolated_star_cat_dict, isolated_star_source_dict, fgcm_cat_dict def test_concat_isolated_star_cats(self): """Test concatenation and reservation of the isolated star catalogs. @@ -290,6 +309,7 @@ def test_compute_psf_and_ap_corr_map_no_sources(self): detector = 0 exposure = None isolated_source_table = None + fgcm_cat = None with self.assertLogs(level=logging.WARNING) as cm: psf, ap_corr_map, measured_src = self.finalizeCharacterizationTask.compute_psf_and_ap_corr_map( visit, @@ -297,6 +317,7 @@ def test_compute_psf_and_ap_corr_map_no_sources(self): exposure, src, isolated_source_table, + fgcm_cat, use_super=True, ) self.assertIn( @@ -333,6 +354,7 @@ def test_run_visit(self): band, self.isolated_star_cat_dict, self.isolated_star_source_dict, + self.fgcm_cat, src_dict, calexp_dict, ) @@ -369,6 +391,7 @@ def test_run_detectors(self): detector0, self.isolated_star_cat_dict, self.isolated_star_source_dict, + self.fgcm_cat, src0, calexp0, ) @@ -379,6 +402,7 @@ def test_run_detectors(self): detector1, self.isolated_star_cat_dict, self.isolated_star_source_dict, + self.fgcm_cat, src1, calexp1, )