124 changes: 65 additions & 59 deletions imap_processing/ialirt/l0/process_swapi.py
@@ -14,7 +14,7 @@
from imap_processing.ialirt.utils.time import calculate_time
from imap_processing.spice.time import met_to_ttj2000ns, met_to_utc
from imap_processing.swapi.l1.swapi_l1 import process_sweep_data
from imap_processing.swapi.l2.swapi_l2 import SWAPI_LIVETIME
from imap_processing.swapi.l2.swapi_l2 import SWAPI_LIVETIME, select_first_63_passband_energies

logger = logging.getLogger(__name__)

@@ -76,6 +76,7 @@ def optimize_pseudo_parameters(
Find the pseudo speed (u), density (n), and temperature (T) of solar wind particles.

Fit a curve to calculated count rate values as a function of energy passband.
Takes count rates (with errors) and energy passbands for a single sweep as input.

Parameters
----------
@@ -90,45 +91,50 @@
-------
solution_dict : dict
Dictionary containing the optimized speed, density, and temperature values for
each sweep included in the input count_rates array.
the sweep included in the input count_rates array.
"""
solution_dict = { # type: ignore
"pseudo_speed": [],
"pseudo_density": [],
"pseudo_temperature": [],
}

for sweep in np.arange(count_rates.shape[0]):
current_sweep_count_rates = count_rates[sweep, :]
current_sweep_count_rate_errors = count_rate_error[sweep, :]
# Find the max count rate, and use the 5 points surrounding it
max_index = np.argmax(current_sweep_count_rates)
initial_speed_guess = np.sqrt(energy_passbands[max_index]) * Consts.speed_coeff
initial_param_guess = np.array(
[
initial_speed_guess,
5 * (400 / initial_speed_guess) ** 2,
60000 * (initial_speed_guess / 400) ** 2,
]
)
sol = curve_fit(
assert count_rates.ndim == 1, "Expected a single 1-D sweep of count rates"
assert count_rates.shape == count_rate_error.shape == energy_passbands.shape, (
f"Shape mismatch: {count_rates.shape} {count_rate_error.shape} "
f"{energy_passbands.shape}"
)

current_sweep_count_rates = count_rates
current_sweep_count_rate_errors = count_rate_error
# Find the max count rate and fit the window around it (indices max-3 through max+2)
max_index = np.argmax(current_sweep_count_rates)
initial_speed_guess = np.sqrt(energy_passbands[max_index]) * Consts.speed_coeff
initial_param_guess = np.array(
[
initial_speed_guess,
5 * (400 / initial_speed_guess) ** 2,
60000 * (initial_speed_guess / 400) ** 2,
]
)

fitting_point_range = range(max_index - 3, max_index + 3)
xdata = energy_passbands.take(fitting_point_range, mode="clip")
ydata = current_sweep_count_rates.take(fitting_point_range, mode="clip")
sigma = current_sweep_count_rate_errors.take(fitting_point_range, mode="clip")

# exclude points where the count is zero (or rounded to zero)
# because the fitting algorithm cannot handle them
# (since zero count implies sigma=0)
is_valid_data = ydata > 0

if np.count_nonzero(is_valid_data) >= 3:
solution = curve_fit(
f=count_rate,
xdata=energy_passbands.take(
range(max_index - 3, max_index + 3), mode="clip"
),
ydata=current_sweep_count_rates.take(
range(max_index - 3, max_index + 3), mode="clip"
),
sigma=current_sweep_count_rate_errors.take(
range(max_index - 3, max_index + 3), mode="clip"
),
xdata=xdata[is_valid_data],
ydata=ydata[is_valid_data],
sigma=sigma[is_valid_data],
p0=initial_param_guess,
)
solution_dict["pseudo_speed"].append(sol[0][0])
solution_dict["pseudo_density"].append(sol[0][1])
solution_dict["pseudo_temperature"].append(sol[0][2])
)[0]
else:
solution = [np.nan] * 3

return solution_dict
return {
"pseudo_speed": solution[0],
"pseudo_density": solution[1],
"pseudo_temperature": solution[2]
}


def process_swapi_ialirt(
@@ -176,6 +182,7 @@ def process_swapi_ialirt(
(grouped_dataset["group"] == group)
]

# TODO: change this to the center of the interval
met_values.append(
int(grouped_dataset["met"][(grouped_dataset["group"] == group).values][0])
)
@@ -208,41 +215,40 @@ def process_swapi_ialirt(
dtype="datetime64[ns]"
)

# Find the sweep's energy data for the latest time, where sweep_id == 2
subset = calibration_lut_table[
(calibration_lut_table["timestamp"] == calibration_lut_table["timestamp"].max())
& (calibration_lut_table["Sweep #"] == 2)
]
if subset.empty:
energy_passbands = np.full(NUM_IALIRT_ENERGY_STEPS, np.nan, dtype=np.float64)
else:
subset = subset.sort_values(["timestamp", "ESA Step #"])
energy_passbands = (
subset["Energy"][:NUM_IALIRT_ENERGY_STEPS].to_numpy().astype(float)
)

solution = optimize_pseudo_parameters(
raw_coin_rate, count_rate_error, energy_passbands
)

swapi_data = []

for entry in np.arange(0, len(solution["pseudo_speed"])):
if len(met_values) != len(raw_coin_rate):
raise RuntimeError("Inconsistent number of MET times and count rate sweeps")

for entry in np.arange(0, len(raw_coin_rate)):
entry_met = int(met_values[entry])
entry_met_in_utc = met_to_utc(entry_met)
sweep_table_version = unpacked_data["swapi_version"].sel(epoch=entry_met, method="nearest").item()
energy_passbands = select_first_63_passband_energies(
calibration_table_df=calibration_lut_table,
time=entry_met_in_utc,
sweep_table_version=sweep_table_version
)
solution = optimize_pseudo_parameters(
raw_coin_rate[entry],
count_rate_error[entry],
energy_passbands
)
swapi_data.append(
{
"apid": 478,
"met": int(met_values[entry]),
"met_in_utc": met_to_utc(met_values[entry]).split(".")[0],
"met": entry_met,
"met_in_utc": entry_met_in_utc.split(".")[0],
"ttj2000ns": int(met_to_ttj2000ns(met_values[entry])),
"instrument": "swapi",
"swapi_pseudo_proton_speed": Decimal(
f"{solution['pseudo_speed'][entry]:.3f}"
f"{solution['pseudo_speed']:.3f}"
),
"swapi_pseudo_proton_density": Decimal(
f"{solution['pseudo_density'][entry]:.3f}"
f"{solution['pseudo_density']:.3f}"
),
"swapi_pseudo_proton_temperature": Decimal(
f"{solution['pseudo_temperature'][entry]:.3f}"
f"{solution['pseudo_temperature']:.3f}"
),
}
)
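For reference, a minimal standalone sketch of the per-sweep fit that optimize_pseudo_parameters now performs. The count_rate_model function and SPEED_COEFF value below are hypothetical placeholders for the module's real count_rate model and Consts.speed_coeff; only the peak windowing, zero-count exclusion, and curve_fit call mirror the diff.

import numpy as np
from scipy.optimize import curve_fit

SPEED_COEFF = 13.8  # hypothetical placeholder for Consts.speed_coeff


def count_rate_model(energy, speed, density, temperature):
    # Hypothetical Gaussian-like stand-in that peaks near the energy implied by `speed`.
    peak_energy = (speed / SPEED_COEFF) ** 2
    return density * np.exp(-((energy - peak_energy) ** 2) / (2.0 * temperature))


def fit_single_sweep(count_rates, count_rate_error, energy_passbands):
    # Window of points around the peak count rate (indices max-3 through max+2).
    max_index = int(np.argmax(count_rates))
    window = range(max_index - 3, max_index + 3)
    xdata = energy_passbands.take(window, mode="clip")
    ydata = count_rates.take(window, mode="clip")
    sigma = count_rate_error.take(window, mode="clip")

    # Zero counts imply sigma == 0, which curve_fit cannot weight, so drop them.
    valid = ydata > 0
    if np.count_nonzero(valid) < 3:
        return dict.fromkeys(
            ("pseudo_speed", "pseudo_density", "pseudo_temperature"), np.nan
        )

    initial_speed = np.sqrt(energy_passbands[max_index]) * SPEED_COEFF
    p0 = [
        initial_speed,
        5 * (400 / initial_speed) ** 2,
        60000 * (initial_speed / 400) ** 2,
    ]
    popt, _ = curve_fit(
        count_rate_model, xdata[valid], ydata[valid], sigma=sigma[valid], p0=p0
    )
    return dict(zip(("pseudo_speed", "pseudo_density", "pseudo_temperature"), popt))
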
57 changes: 29 additions & 28 deletions imap_processing/swapi/l2/swapi_l2.py
@@ -1,6 +1,7 @@
"""SWAPI L2 processing module."""

import logging

import numpy as np
import numpy.typing as npt
@@ -68,34 +69,7 @@ def solve_full_sweep_energy(
first_63_energies = []

for time, sweep_id in zip(data_time, sweep_table, strict=False):
# Find the sweep's ESA data for the given time and sweep_id
subset = esa_table_df[
(esa_table_df["timestamp"] <= time) & (esa_table_df["Sweep #"] == sweep_id)
]
if subset.empty:
# Get the earliest timestamp available
earliest_time = esa_table_df["timestamp"].min()

# Find the sweep's ESA data for the earliest time and sweep_id
earliest_subset = esa_table_df[
(esa_table_df["timestamp"] == earliest_time)
& (esa_table_df["Sweep #"] == sweep_id)
]
if earliest_subset.empty:
raise ValueError(
f"No matching ESA table entry found for sweep ID {sweep_id} "
f"at time {time}, and no entries found for earliest time "
f"{earliest_time}."
)
subset = earliest_subset

# Subset data can contain multiple 72 energy values with last 9 fine energies
# with 'Solve' value. We need to sort by time and ESA step to maintain correct
# order. Then take the last group of 72 steps values and select first 63
# values only.
subset = subset.sort_values(["timestamp", "ESA Step #"])
grouped = subset["Energy"].values.reshape(-1, NUM_ENERGY_STEPS)
first_63 = grouped[-1, :63]
first_63 = select_first_63_passband_energies(esa_table_df, time, sweep_id)
first_63_energies.append(first_63)

# Find last 9 fine energy values of all sweeps data
@@ -163,6 +137,33 @@ def solve_full_sweep_energy(
return sweeps_energy_value


def select_first_63_passband_energies(calibration_table_df, time, sweep_table_version):
"""
Select passband energies from the calibration table for a given time and sweep version.

Parameters
----------
calibration_table_df : pandas.DataFrame
Calibration table with `timestamp`, `Sweep #`, `ESA Step #`, and `Energy` columns.
Each timestamp has 72 energy steps.
time : str or datetime-like
Time of the measurement. The latest calibration at or before this time is used;
if none exists, the earliest available calibration is used.
sweep_table_version : int
Sweep table version, matched against the `Sweep #` column.

Returns
-------
numpy.ndarray
The first 63 passband energies for the selected calibration.
"""

subset = calibration_table_df[calibration_table_df["Sweep #"] == sweep_table_version]

table_timestamps = pd.to_datetime(subset["timestamp"], utc=True)
table_timestamps_before = table_timestamps[
table_timestamps <= pd.to_datetime(time, utc=True)
]
# Use the latest calibration at or before the input time; if none exists,
# fall back to the earliest calibration available.
selected_timestamp = (
table_timestamps_before.max()
if not table_timestamps_before.empty
else table_timestamps.min()
)

subset = subset[table_timestamps == selected_timestamp]

assert len(subset) == 72, (
f"{len(subset)} entries in calibration table for time {time}, "
f"sweep # {sweep_table_version}; 72 are required"
)

subset = subset.sort_values(["timestamp", "ESA Step #"])
return subset["Energy"][:63].to_numpy().astype(float)


def swapi_l2(
l1_dataset: xr.Dataset,
esa_table_df: pd.DataFrame,
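For reference, a small self-contained sketch of the calibration-timestamp selection rule that select_first_63_passband_energies applies, run against a made-up two-timestamp calibration table. Column names follow the diff; the table values, measurement time, and variable names here are illustrative only.

import numpy as np
import pandas as pd

# Made-up calibration table: two calibration timestamps x 72 ESA steps each.
calibration_lut_table = pd.DataFrame(
    {
        "timestamp": ["2025-01-01"] * 72 + ["2025-06-01"] * 72,
        "Sweep #": [2] * 144,
        "ESA Step #": list(range(72)) * 2,
        "Energy": np.linspace(100.0, 19000.0, 144),
    }
)

measurement_time = "2025-03-15T00:00:00"  # illustrative only
sweep_table_version = 2

subset = calibration_lut_table[
    calibration_lut_table["Sweep #"] == sweep_table_version
]
table_timestamps = pd.to_datetime(subset["timestamp"], utc=True)

# Latest calibration at or before the measurement time; fall back to the
# earliest available calibration if the measurement predates them all.
cutoff = pd.to_datetime(measurement_time, utc=True)
before = table_timestamps[table_timestamps <= cutoff]
selected_timestamp = before.max() if not before.empty else table_timestamps.min()

chosen = subset[table_timestamps == selected_timestamp].sort_values("ESA Step #")
energy_passbands = chosen["Energy"][:63].to_numpy().astype(float)
print(selected_timestamp, energy_passbands.shape)  # 2025-01-01 00:00:00+00:00 (63,)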