Merge pull request #2658 from cta-observatory/fix_stereo_comb_uncert
Fix uncertainty calculation in StereoMeanCombiner
maxnoe authored Dec 3, 2024
2 parents 7a26885 + 9c74e4d commit 1156181
Showing 7 changed files with 274 additions and 87 deletions.
2 changes: 2 additions & 0 deletions docs/changes/2658.bugfix.rst
@@ -0,0 +1,2 @@
Fix ``<reconstruction_property>_uncert`` calculations in ``ctapipe.reco.StereoMeanCombiner``.
Add helper functions for vectorized numpy calculations as new ``ctapipe.reco.telescope_event_handling`` module.
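
The vectorized aggregation mentioned here rests on numpy's unbuffered indexed addition (``np.add.at``), which the new module uses for the per-event sums; a minimal sketch of the idea, with illustrative values only:

    import numpy as np

    # index of the subarray event each telescope event belongs to
    tel_to_array_indices = np.array([0, 0, 0, 1])
    tel_values = np.array([1.0, 4.0, 10.0, 0.5])

    # group-wise sum over the telescope events of each subarray event
    sums = np.zeros(2)
    np.add.at(sums, tel_to_array_indices, tel_values)  # -> [15. , 0.5]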
4 changes: 2 additions & 2 deletions src/ctapipe/coordinates/tests/test_impact_distance.py
@@ -75,15 +75,15 @@ def test_shower_impact_distance(reference_location):
assert np.allclose(impact_distances, [0, 1, 2] * u.m)

# alt=0 az=0 should be aligned to x-axis (north in ground frame)
# therefore the distances should also be just the y-offset of the telecope
# therefore the distances should also be just the y-offset of the telescope
shower_geom = ReconstructedGeometryContainer(
core_x=0 * u.m, core_y=0 * u.m, alt=0 * u.deg, az=0 * u.deg
)
impact_distances = shower_impact_distance(shower_geom=shower_geom, subarray=sub)
assert np.allclose(impact_distances, [0, 1, 2] * u.m)

# alt=0 az=90 should be aligned to y-axis (east in ground frame)
# therefore the distances should also be just the x-offset of the telecope
# therefore the distances should also be just the x-offset of the telescope
shower_geom = ReconstructedGeometryContainer(
core_x=0 * u.m, core_y=0 * u.m, alt=0 * u.deg, az=90 * u.deg
)
114 changes: 44 additions & 70 deletions src/ctapipe/reco/stereo_combination.py
@@ -17,6 +17,7 @@
ReconstructedEnergyContainer,
ReconstructedGeometryContainer,
)
from .telescope_event_handling import get_subarray_index, weighted_mean_std_ufunc
from .utils import add_defaults_and_meta

_containers = {
@@ -31,38 +32,6 @@
]


def _grouped_add(tel_data, n_array_events, indices):
"""
Calculate the group-wise sum for each array event over the
corresponding telescope events. ``indices`` is an array
that gives the index of the subarray event for each telescope event,
returned by
``np.unique(tel_events[["obs_id", "event_id"]], return_inverse=True)``
"""
combined_values = np.zeros(n_array_events)
np.add.at(combined_values, indices, tel_data)
return combined_values


def _weighted_mean_ufunc(tel_values, weights, n_array_events, indices):
# avoid numerical problems by very large or small weights
weights = weights / weights.max()
sum_prediction = _grouped_add(
tel_values * weights,
n_array_events,
indices,
)
sum_of_weights = _grouped_add(
weights,
n_array_events,
indices,
)
mean = np.full(n_array_events, np.nan)
valid = sum_of_weights > 0
mean[valid] = sum_prediction[valid] / sum_of_weights[valid]
return mean


class StereoCombiner(Component):
"""
Base Class for algorithms combining telescope-wise predictions to common prediction.
@@ -299,26 +268,26 @@ def predict_table(self, mono_predictions: Table) -> Table:
prefix = f"{self.prefix}_tel"
# TODO: Integrate table quality query once its done
valid = mono_predictions[f"{prefix}_is_valid"]
valid_predictions = mono_predictions[valid]

array_events, indices, multiplicity = np.unique(
mono_predictions[["obs_id", "event_id"]],
return_inverse=True,
return_counts=True,
obs_ids, event_ids, multiplicity, tel_to_array_indices = get_subarray_index(
mono_predictions
)
stereo_table = Table(array_events)
n_array_events = len(obs_ids)
stereo_table = Table({"obs_id": obs_ids, "event_id": event_ids})
# copy metadata
for colname in ("obs_id", "event_id"):
stereo_table[colname].description = mono_predictions[colname].description

n_array_events = len(array_events)
weights = self._calculate_weights(valid_predictions)
weights = self._calculate_weights(mono_predictions[valid])

if self.property is ReconstructionProperty.PARTICLE_TYPE:
if len(valid_predictions) > 0:
mono_predictions = valid_predictions[f"{prefix}_prediction"]
stereo_predictions = _weighted_mean_ufunc(
mono_predictions, weights, n_array_events, indices[valid]
if np.count_nonzero(valid) > 0:
stereo_predictions, _ = weighted_mean_std_ufunc(
mono_predictions[f"{prefix}_prediction"],
valid,
tel_to_array_indices,
multiplicity,
weights=weights,
)
else:
stereo_predictions = np.full(n_array_events, np.nan)
@@ -328,29 +297,20 @@ def predict_table(self, mono_predictions: Table) -> Table:
stereo_table[f"{self.prefix}_goodness_of_fit"] = np.nan

elif self.property is ReconstructionProperty.ENERGY:
if len(valid_predictions) > 0:
mono_energies = valid_predictions[f"{prefix}_energy"].quantity.to_value(
if np.count_nonzero(valid) > 0:
mono_energies = mono_predictions[f"{prefix}_energy"].quantity.to_value(
u.TeV
)

if self.log_target:
mono_energies = np.log(mono_energies)

stereo_energy = _weighted_mean_ufunc(
stereo_energy, std = weighted_mean_std_ufunc(
mono_energies,
weights,
n_array_events,
indices[valid],
)
variance = _weighted_mean_ufunc(
(mono_energies - np.repeat(stereo_energy, multiplicity)[valid])
** 2,
weights,
n_array_events,
indices[valid],
valid,
tel_to_array_indices,
multiplicity,
weights=weights,
)
std = np.sqrt(variance)

if self.log_target:
stereo_energy = np.exp(stereo_energy)
std = np.exp(std)
@@ -369,22 +329,34 @@ def predict_table(self, mono_predictions: Table) -> Table:
stereo_table[f"{self.prefix}_goodness_of_fit"] = np.nan

elif self.property is ReconstructionProperty.GEOMETRY:
if len(valid_predictions) > 0:
if np.count_nonzero(valid) > 0:
coord = AltAz(
alt=valid_predictions[f"{prefix}_alt"],
az=valid_predictions[f"{prefix}_az"],
alt=mono_predictions[f"{prefix}_alt"],
az=mono_predictions[f"{prefix}_az"],
)
# https://en.wikipedia.org/wiki/Von_Mises%E2%80%93Fisher_distribution#Mean_direction
mono_x, mono_y, mono_z = coord.cartesian.get_xyz()

stereo_x = _weighted_mean_ufunc(
mono_x, weights, n_array_events, indices[valid]
stereo_x, _ = weighted_mean_std_ufunc(
mono_x,
valid,
tel_to_array_indices,
multiplicity,
weights=weights,
)
stereo_y = _weighted_mean_ufunc(
mono_y, weights, n_array_events, indices[valid]
stereo_y, _ = weighted_mean_std_ufunc(
mono_y,
valid,
tel_to_array_indices,
multiplicity,
weights=weights,
)
stereo_z = _weighted_mean_ufunc(
mono_z, weights, n_array_events, indices[valid]
stereo_z, _ = weighted_mean_std_ufunc(
mono_z,
valid,
tel_to_array_indices,
multiplicity,
weights=weights,
)

mean_cartesian = CartesianRepresentation(
@@ -425,7 +397,9 @@ def predict_table(self, mono_predictions: Table) -> Table:

tel_ids = [[] for _ in range(n_array_events)]

for index, tel_id in zip(indices[valid], valid_predictions["tel_id"]):
for index, tel_id in zip(
tel_to_array_indices[valid], mono_predictions["tel_id"][valid]
):
tel_ids[index].append(tel_id)

stereo_table[f"{self.prefix}_telescopes"] = tel_ids
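
As a plain-numpy sketch of the statistic the refactored code now reports as ``<reconstruction_property>_uncert`` for a single subarray event (the values below are illustrative, not taken from any fixture):

    import numpy as np

    values = np.array([2.0, 3.0, 7.0])    # mono predictions of one subarray event
    weights = np.array([1.0, 1.0, 2.0])   # e.g. intensity-based weights
    weights = weights / weights.max()     # same normalisation as in weighted_mean_std_ufunc

    mean = np.sum(weights * values) / np.sum(weights)
    # the uncertainty is the square root of the weighted mean of squared residuals
    std = np.sqrt(np.sum(weights * (values - mean) ** 2) / np.sum(weights))

For ``log_target=True`` the same computation runs on ``np.log(energy)`` and both results are exponentiated afterwards, as shown in the diff above.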
146 changes: 146 additions & 0 deletions src/ctapipe/reco/telescope_event_handling.py
@@ -0,0 +1,146 @@
"""Helper functions for array-event-wise aggregation of telescope events."""

import numpy as np
from numba import njit, uint64

__all__ = ["get_subarray_index", "weighted_mean_std_ufunc"]


@njit
def _get_subarray_index(obs_ids, event_ids):
n_tel_events = len(obs_ids)
idx = np.zeros(n_tel_events, dtype=uint64)
current_idx = 0
subarray_obs_index = []
subarray_event_index = []
multiplicities = []
multiplicity = 0

if n_tel_events > 0:
subarray_obs_index.append(obs_ids[0])
subarray_event_index.append(event_ids[0])
multiplicity += 1

for i in range(1, n_tel_events):
if obs_ids[i] != obs_ids[i - 1] or event_ids[i] != event_ids[i - 1]:
# append to subarray events
multiplicities.append(multiplicity)
subarray_obs_index.append(obs_ids[i])
subarray_event_index.append(event_ids[i])
# reset state
current_idx += 1
multiplicity = 0

multiplicity += 1
idx[i] = current_idx

# Append multiplicity of the last subarray event
if n_tel_events > 0:
multiplicities.append(multiplicity)

return (
np.asarray(subarray_obs_index),
np.asarray(subarray_event_index),
np.asarray(multiplicities),
idx,
)


def get_subarray_index(tel_table):
"""
Get the subarray-event-wise information from a table of telescope events.
Extract obs_ids and event_ids of all subarray events contained
in a table of telescope events, their multiplicity and an array
giving the index of the subarray event for each telescope event.
This requires the telescope events of one subarray event to be come
in a single block.
Parameters
----------
tel_table: astropy.table.Table
table with telescope events as rows
Returns
-------
Tuple(np.ndarray, np.ndarray, np.ndarray, np.ndarray)
obs_ids of subarray events, event_ids of subarray events,
multiplicity of subarray events, index of the subarray event
for each telescope event
"""
obs_idx = tel_table["obs_id"]
event_idx = tel_table["event_id"]
return _get_subarray_index(obs_idx, event_idx)


def _grouped_add(tel_data, n_array_events, indices):
"""
Calculate the group-wise sum for each array event over the
corresponding telescope events. ``indices`` is an array
that gives the index of the subarray event for each telescope event.
"""
combined_values = np.zeros(n_array_events)
np.add.at(combined_values, indices, tel_data)
return combined_values


def weighted_mean_std_ufunc(
tel_values,
valid_tel,
indices,
multiplicity,
weights=np.array([1]),
):
"""
Calculate the weighted mean and standard deviation for each array event
over the corresponding telescope events.
Parameters
----------
tel_values: np.ndarray
values for each telescope event
valid_tel: array-like
boolean mask giving the valid values of ``tel_values``
indices: np.ndarray
index of the subarray event for each telescope event, returned as
the fourth return value of ``get_subarray_index``
multiplicity: np.ndarray
multiplicity of the subarray events in the same order as the order of
subarray events in ``indices``
weights: np.ndarray
weights used for averaging (equal/no weights are used by default)
Returns
-------
Tuple(np.ndarray, np.ndarray)
weighted mean and standard deviation for each array event
"""
n_array_events = len(multiplicity)
# avoid numerical problems by very large or small weights
weights = weights / weights.max()
tel_values = tel_values[valid_tel]
indices = indices[valid_tel]

sum_prediction = _grouped_add(
tel_values * weights,
n_array_events,
indices,
)
sum_of_weights = _grouped_add(
weights,
n_array_events,
indices,
)
mean = np.full(n_array_events, np.nan)
valid = sum_of_weights > 0
mean[valid] = sum_prediction[valid] / sum_of_weights[valid]

sum_sq_residulas = _grouped_add(
(tel_values - np.repeat(mean, multiplicity)[valid_tel]) ** 2 * weights,
n_array_events,
indices,
)
variance = np.full(n_array_events, np.nan)
variance[valid] = sum_sq_residulas[valid] / sum_of_weights[valid]
return mean, np.sqrt(variance)
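
A short usage sketch of the two public helpers above; the table values are made up for illustration and the expected outputs follow from the formulas in this module:

    import numpy as np
    from astropy.table import Table
    from ctapipe.reco.telescope_event_handling import (
        get_subarray_index,
        weighted_mean_std_ufunc,
    )

    # telescope events, with the events of each subarray event in one contiguous block
    tel_events = Table(
        {
            "obs_id": [1, 1, 1, 1],
            "event_id": [1, 1, 1, 2],
            "energy": [1.0, 4.0, 10.0, 0.5],
            "valid": [True, True, True, True],
        }
    )

    obs_ids, event_ids, multiplicity, tel_to_array_indices = get_subarray_index(tel_events)
    # obs_ids -> [1, 1], event_ids -> [1, 2], multiplicity -> [3, 1],
    # tel_to_array_indices -> [0, 0, 0, 1]

    mean, std = weighted_mean_std_ufunc(
        np.asarray(tel_events["energy"]),
        np.asarray(tel_events["valid"]),
        tel_to_array_indices,
        multiplicity,
    )
    # mean -> [5.0, 0.5], std -> [~3.742, 0.0]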
10 changes: 10 additions & 0 deletions src/ctapipe/reco/tests/test_stereo_combination.py
@@ -84,8 +84,18 @@ def test_predict_mean_energy(weights, mono_table):
assert_array_equal(stereo["event_id"], np.array([1, 2, 1]))
if weights == "intensity":
assert_array_equal(stereo["dummy_energy"], [7, 0.5, np.nan] * u.TeV)
assert_allclose(
stereo["dummy_energy_uncert"].quantity,
[4.242641, 0, np.nan] * u.TeV,
atol=1e-7,
)
elif weights == "none":
assert_array_equal(stereo["dummy_energy"], [5, 0.5, np.nan] * u.TeV)
assert_allclose(
stereo["dummy_energy_uncert"].quantity,
[3.741657, 0, np.nan] * u.TeV,
atol=1e-7,
)

assert_array_equal(stereo["dummy_telescopes"][0], np.array([1, 2, 3]))
assert_array_equal(stereo["dummy_telescopes"][1], 5)