Skip to content

DM-45569: Add color from fgcm for fitting a PSF color term in Piff. #1111

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 132 additions & 6 deletions python/lsst/pipe/tasks/finalizeCharacterization.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@
]

import astropy.table
import astropy.units as u
import numpy as np
import esutil
from smatch.matcher import Matcher


import lsst.pex.config as pexConfig
Expand Down Expand Up @@ -81,6 +83,22 @@ class FinalizeCharacterizationConnectionsBase(
deferLoad=True,
multiple=True,
)
fgcm_standard_star = pipeBase.connectionTypes.Input(
doc=('Catalog of fgcm for color corrections, and indexes to the '
'isolated_star_cats catalogs.'),
name='fgcm_standard_star',
storageClass='ArrowAstropy',
dimensions=('instrument', 'tract', 'skymap'),
deferLoad=True,
multiple=True,
)

def __init__(self, *, config=None):
super().__init__(config=config)
if config is None:
return None
if not self.config.psf_determiner['piff'].useColor:
self.inputs.remove("fgcm_standard_star")


class FinalizeCharacterizationConnections(
Expand Down Expand Up @@ -327,6 +345,9 @@ def __init__(self, initInputs=None, **kwargs):

# Only log warning and fatal errors from the source_selector
self.source_selector.log.setLevel(self.source_selector.log.WARN)
self.isPsfDeterminerPiff = False
if isinstance(self.psf_determiner, lsst.meas.extensions.piff.piffPsfDeterminer.PiffPsfDeterminerTask):
self.isPsfDeterminerPiff = True

def _make_output_schema_mapper(self, input_schema):
"""Make the schema mapper from the input schema to the output schema.
Expand Down Expand Up @@ -401,6 +422,17 @@ def _make_output_schema_mapper(self, input_schema):
type=np.int32,
doc='Detector number for the sources.',
)
output_schema.addField(
'psf_color_value',
type=np.float32,
doc="Color used in PSF fit."
)
output_schema.addField(
'psf_color_type',
type=str,
size=10,
doc="Color used in PSF fit."
)

alias_map = input_schema.getAliasMap()
alias_map_output = afwTable.AliasMap()
Expand Down Expand Up @@ -434,6 +466,18 @@ def _make_selection_schema_mapper(self, input_schema):

selection_schema.setAliasMap(input_schema.getAliasMap())

selection_schema.addField(
'psf_color_value',
type=np.float32,
doc="Color used in PSF fit."
)
selection_schema.addField(
'psf_color_type',
type=str,
size=10,
doc="Color used in PSF fit."
)

return mapper, selection_schema

def concat_isolated_star_cats(self, band, isolated_star_cat_dict, isolated_star_source_dict):
Expand Down Expand Up @@ -542,7 +586,31 @@ def concat_isolated_star_cats(self, band, isolated_star_cat_dict, isolated_star_

return isolated_table, isolated_source_table

def compute_psf_and_ap_corr_map(self, visit, detector, exposure, src, isolated_source_table):
def add_src_colors(self, srcCat, fgcmCat, band):

if self.isPsfDeterminerPiff and fgcmCat is not None:

raSrc = (srcCat['coord_ra'] * u.radian).to(u.degree).value
decSrc = (srcCat['coord_dec'] * u.radian).to(u.degree).value

with Matcher(raSrc, decSrc) as matcher:
idx, idxSrcCat, idxColorCat, d = matcher.query_radius(
fgcmCat["ra"],
fgcmCat["dec"],
1. / 3600.0,
return_indices=True,
)

magStr1 = self.psf_determiner.config.color[band][0]
magStr2 = self.psf_determiner.config.color[band][2]
colors = fgcmCat[f'mag_{magStr1}'] - fgcmCat[f'mag_{magStr2}']

for idSrc, idColor in zip(idxSrcCat, idxColorCat):
srcCat[idSrc]['psf_color_value'] = colors[idColor]
srcCat[idSrc]['psf_color_type'] = f"{magStr1}-{magStr2}"

def compute_psf_and_ap_corr_map(self, visit, detector, exposure, src,
isolated_source_table, fgcm_standard_star_cat):
"""Compute psf model and aperture correction map for a single exposure.

Parameters
Expand Down Expand Up @@ -622,6 +690,12 @@ def compute_psf_and_ap_corr_map(self, visit, detector, exposure, src, isolated_s
# We need to copy over the calib_psf flags because they were not in the mapper
measured_src['calib_psf_candidate'] = selected_src['calib_psf_candidate']
measured_src['calib_psf_reserved'] = selected_src['calib_psf_reserved']
if exposure.filter.hasBandLabel():
band = exposure.filter.bandLabel
else:
band = None
self.add_src_colors(selected_src, fgcm_standard_star_cat, band)
self.add_src_colors(measured_src, fgcm_standard_star_cat, band)

# Select the psf candidates from the selection catalog
try:
Expand All @@ -639,6 +713,7 @@ def compute_psf_and_ap_corr_map(self, visit, detector, exposure, src, isolated_s
in zip(psf_selection_result.psfCandidates,
~psf_cand_cat['calib_psf_reserved']) if use]
flag_key = psf_cand_cat.schema['calib_psf_used'].asKey()

try:
psf, cell_set = self.psf_determiner.determinePsf(exposure,
psf_determiner_list,
Expand All @@ -649,7 +724,7 @@ def compute_psf_and_ap_corr_map(self, visit, detector, exposure, src, isolated_s
visit, detector, e)
return None, None, measured_src
# Verify that the PSF is usable by downstream tasks
sigma = psf.computeShape(psf.getAveragePosition()).getDeterminantRadius()
sigma = psf.computeShape(psf.getAveragePosition(), psf.getAverageColor()).getDeterminantRadius()
if np.isnan(sigma):
self.log.warning('Failed to determine psf for visit %d, detector %d: '
'Computed final PSF size is NAN.',
Expand Down Expand Up @@ -727,19 +802,29 @@ def runQuantum(self, butlerQC, inputRefs, outputRefs):
isolated_star_source_dict = {tract: isolated_star_source_dict_temp[tract] for
tract in sorted(isolated_star_source_dict_temp.keys())}

if self.config.psf_determiner['piff'].useColor:
fgcm_standard_star_dict_temp = {handle.dataId['tract']: handle
for handle in input_handle_dict['fgcm_standard_star']}
fgcm_standard_star_dict = {tract: fgcm_standard_star_dict_temp[tract] for
tract in sorted(fgcm_standard_star_dict_temp.keys())}
else:
fgcm_standard_star_dict = None

struct = self.run(
visit,
band,
isolated_star_cat_dict,
isolated_star_source_dict,
src_dict,
calexp_dict,
fgcm_standard_star_dict=fgcm_standard_star_dict,
)

butlerQC.put(struct.psf_ap_corr_cat, outputRefs.finalized_psf_ap_corr_cat)
butlerQC.put(struct.output_table, outputRefs.finalized_src_table)

def run(self, visit, band, isolated_star_cat_dict, isolated_star_source_dict, src_dict, calexp_dict):
def run(self, visit, band, isolated_star_cat_dict, isolated_star_source_dict,
src_dict, calexp_dict, fgcm_standard_star_dict=None):
"""
Run the FinalizeCharacterizationTask.

Expand All @@ -757,6 +842,8 @@ def run(self, visit, band, isolated_star_cat_dict, isolated_star_source_dict, sr
Per-detector dict of src catalog handles.
calexp_dict : `dict`
Per-detector dict of calibrated exposure handles.
fgcm_standard_star_dict : `dict`
Per tract dict of fgcm isolated stars.

Returns
-------
Expand Down Expand Up @@ -807,6 +894,19 @@ def run(self, visit, band, isolated_star_cat_dict, isolated_star_source_dict, sr
measured_src_tables = []
measured_src_table = None

if fgcm_standard_star_dict is not None:

fgcm_standard_star_cat = []

for tract in fgcm_standard_star_dict:
astropy_fgcm = fgcm_standard_star_dict[tract].get()
table_fgcm = np.asarray(astropy_fgcm)
fgcm_standard_star_cat.append(table_fgcm)

fgcm_standard_star_cat = np.concatenate(fgcm_standard_star_cat)
else:
fgcm_standard_star_cat = None

self.log.info("Running finalizeCharacterization on %d detectors.", len(detector_keys))
for detector in detector_keys:
self.log.info("Starting finalizeCharacterization on detector ID %d.", detector)
Expand All @@ -818,7 +918,8 @@ def run(self, visit, band, isolated_star_cat_dict, isolated_star_source_dict, sr
detector,
exposure,
src,
isolated_source_table
isolated_source_table,
fgcm_standard_star_cat,
)

# And now we package it together...
Expand Down Expand Up @@ -870,6 +971,14 @@ def runQuantum(self, butlerQC, inputRefs, outputRefs):
isolated_star_source_dict = {tract: isolated_star_source_dict_temp[tract] for
tract in sorted(isolated_star_source_dict_temp.keys())}

if self.config.psf_determiner['piff'].useColor:
fgcm_standard_star_dict_temp = {handle.dataId['tract']: handle
for handle in input_handle_dict['fgcm_standard_star']}
fgcm_standard_star_dict = {tract: fgcm_standard_star_dict_temp[tract] for
tract in sorted(fgcm_standard_star_dict_temp.keys())}
else:
fgcm_standard_star_dict = None

struct = self.run(
visit,
band,
Expand All @@ -878,6 +987,7 @@ def runQuantum(self, butlerQC, inputRefs, outputRefs):
isolated_star_source_dict,
input_handle_dict['src'],
input_handle_dict['calexp'],
fgcm_standard_star_dict=fgcm_standard_star_dict,
)

butlerQC.put(
Expand All @@ -889,7 +999,8 @@ def runQuantum(self, butlerQC, inputRefs, outputRefs):
outputRefs.finalized_src_detector_table,
)

def run(self, visit, band, detector, isolated_star_cat_dict, isolated_star_source_dict, src, exposure):
def run(self, visit, band, detector, isolated_star_cat_dict, isolated_star_source_dict,
src, exposure, fgcm_standard_star_dict=None):
"""
Run the FinalizeCharacterizationDetectorTask.

Expand All @@ -909,6 +1020,8 @@ def run(self, visit, band, detector, isolated_star_cat_dict, isolated_star_sourc
Src catalog.
exposure : `lsst.afw.image.Exposure`
Calexp exposure.
fgcm_standard_star_dict : `dict`
Per tract dict of fgcm isolated stars.

Returns
-------
Expand All @@ -925,7 +1038,7 @@ def run(self, visit, band, detector, isolated_star_cat_dict, isolated_star_sourc
_, isolated_source_table = self.concat_isolated_star_cats(
band,
isolated_star_cat_dict,
isolated_star_source_dict
isolated_star_source_dict,
)

if isolated_source_table is None:
Expand All @@ -943,12 +1056,25 @@ def run(self, visit, band, detector, isolated_star_cat_dict, isolated_star_sourc

self.log.info("Starting finalizeCharacterization on detector ID %d.", detector)

if fgcm_standard_star_dict is not None:
fgcm_standard_star_cat = []

for tract in fgcm_standard_star_dict:
astropy_fgcm = fgcm_standard_star_dict[tract].get()
table_fgcm = np.asarray(astropy_fgcm)
fgcm_standard_star_cat.append(table_fgcm)

fgcm_standard_star_cat = np.concatenate(fgcm_standard_star_cat)
else:
fgcm_standard_star_cat = None

psf, ap_corr_map, measured_src = self.compute_psf_and_ap_corr_map(
visit,
detector,
exposure,
src,
isolated_source_table,
fgcm_standard_star_cat,
)

# And now we package it together...
Expand Down
Loading