11"""Test coverage for imap_processing.hi.l1c.hi_l1c.py"""
22
3+ import io
34from collections import namedtuple
45from unittest import mock
56from unittest .mock import MagicMock
@@ -146,15 +147,16 @@ def test_empty_pset_dataset(use_fake_repoint_data_for_time):
146147 data = np .concat ((np .arange (n_energy_steps + 1 ).repeat (2 ), np .array ([255 , 255 ]))),
147148 attrs = {"FILLVAL" : 255 },
148149 )
149- n_calibration_prods = 5
150+ # Create calibration product numbers array (0, 1, 2, 3, 4)
151+ cal_prod_numbers = np .arange (5 )
150152 sensor_str = HIAPID .H90_SCI_DE .sensor
151153 l1b_met = 482373065
152154 use_fake_repoint_data_for_time (
153155 np .asarray ([l1b_met - 15 * 60 , l1b_met + 24 * 60 * 60 ])
154156 )
155157
156158 dataset = hi_l1c .empty_pset_dataset (
157- l1b_met , l1b_esa_energy_steps , n_calibration_prods , sensor_str
159+ l1b_met , l1b_esa_energy_steps , cal_prod_numbers , sensor_str
158160 )
159161
160162 assert dataset .epoch .size == 1
@@ -164,7 +166,8 @@ def test_empty_pset_dataset(use_fake_repoint_data_for_time):
164166 np .testing .assert_array_equal (
165167 dataset .esa_energy_step .data , np .arange (n_energy_steps ) + 1
166168 )
167- assert dataset .calibration_prod .size == n_calibration_prods
169+ assert dataset .calibration_prod .size == len (cal_prod_numbers )
170+ np .testing .assert_array_equal (dataset .calibration_prod .data , cal_prod_numbers )
168171
169172 # verify that attrs defined in hi_pset_epoch have overwritten default
170173 # epoch attributes
@@ -229,7 +232,7 @@ def test_pset_counts(
229232 empty_pset = hi_l1c .empty_pset_dataset (
230233 100 ,
231234 l1b_dataset .esa_energy_step ,
232- cal_config_df .cal_prod_config .number_of_products ,
235+ cal_config_df .cal_prod_config .calibration_product_numbers ,
233236 HIAPID .H90_SCI_DE .sensor ,
234237 )
235238 counts_var = hi_l1c .pset_counts (empty_pset .coords , cal_config_df , l1b_dataset )
@@ -255,7 +258,7 @@ def test_pset_counts_empty_l1b(
255258 empty_pset = hi_l1c .empty_pset_dataset (
256259 100 ,
257260 l1b_dataset .esa_energy_step ,
258- cal_config_df .cal_prod_config .number_of_products ,
261+ cal_config_df .cal_prod_config .calibration_product_numbers ,
259262 HIAPID .H90_SCI_DE .sensor ,
260263 )
261264 counts_var = hi_l1c .pset_counts (empty_pset .coords , cal_config_df , l1b_dataset )
@@ -325,6 +328,103 @@ def test_get_tof_window_mask():
325328 np .testing .assert_array_equal (expected_mask , window_mask )
326329
327330
331+ def test_empty_pset_dataset_arbitrary_cal_prod_numbers (use_fake_repoint_data_for_time ):
332+ """Test empty_pset_dataset with non-sequential calibration product numbers."""
333+ n_energy_steps = 3
334+ l1b_esa_energy_steps = xr .DataArray (
335+ data = np .concat ((np .arange (n_energy_steps + 1 ).repeat (2 ), np .array ([255 , 255 ]))),
336+ attrs = {"FILLVAL" : 255 },
337+ )
338+ # Use non-sequential calibration product numbers
339+ cal_prod_numbers = np .array ([5 , 10 , 100 ])
340+ sensor_str = HIAPID .H45_SCI_DE .sensor
341+ l1b_met = 482373065
342+ use_fake_repoint_data_for_time (
343+ np .asarray ([l1b_met - 15 * 60 , l1b_met + 24 * 60 * 60 ])
344+ )
345+
346+ dataset = hi_l1c .empty_pset_dataset (
347+ l1b_met , l1b_esa_energy_steps , cal_prod_numbers , sensor_str
348+ )
349+
350+ # Verify calibration_prod coordinate has the correct non-sequential values
351+ assert dataset .calibration_prod .size == len (cal_prod_numbers )
352+ np .testing .assert_array_equal (dataset .calibration_prod .data , cal_prod_numbers )
353+ # Verify the calibration_prod_label reflects the actual numbers
354+ expected_labels = np .array (["5" , "10" , "100" ])
355+ np .testing .assert_array_equal (dataset .calibration_prod_label .data , expected_labels )
356+
357+
358+ @pytest .mark .external_test_data
359+ def test_pset_counts_arbitrary_cal_prod_numbers (
360+ hi_l1_test_data_path , use_fake_repoint_data_for_time
361+ ):
362+ """Test pset_counts with non-sequential calibration product numbers."""
363+ # Create a test calibration product config with non-sequential numbers
364+ csv_content = """\
365+ calibration_prod,esa_energy_step,geometric_factor,coincidence_type_list,tof_ab_low,tof_ab_high,tof_ac1_low,tof_ac1_high,tof_bc1_low,tof_bc1_high,tof_c1c2_low,tof_c1c2_high
366+ 5,1,0.00055,ABC1C2,0,1023,-1023,1023,-1023,1023,0,1023
367+ 5,2,0.00085,ABC1C2,0,1023,-1023,1023,-1023,1023,0,1023
368+ 10,1,0.00055,BC1C2,0,1023,-1023,1023,-1023,1023,0,1023
369+ 10,2,0.00085,BC1C2,0,1023,-1023,1023,-1023,1023,0,1023
370+ """
371+
372+ l1b_de_path = hi_l1_test_data_path / "imap_hi_l1b_45sensor-de_20250415_v999.cdf"
373+ l1b_dataset = load_cdf (l1b_de_path )
374+
375+ cal_config_df = imap_processing .hi .utils .CalibrationProductConfig .from_csv (
376+ io .StringIO (csv_content )
377+ )
378+
379+ # Create PSET with non-sequential calibration product numbers
380+ l1b_met = 482373065
381+ use_fake_repoint_data_for_time (
382+ np .asarray ([l1b_met - 15 * 60 , l1b_met + 24 * 60 * 60 ])
383+ )
384+
385+ empty_pset = hi_l1c .empty_pset_dataset (
386+ l1b_met ,
387+ l1b_dataset .esa_energy_step ,
388+ cal_config_df .cal_prod_config .calibration_product_numbers ,
389+ HIAPID .H90_SCI_DE .sensor ,
390+ )
391+
392+ # Verify the calibration_prod coordinate has non-sequential values
393+ np .testing .assert_array_equal (empty_pset .calibration_prod .data , np .array ([5 , 10 ]))
394+
395+ # Mock get_pointing_times to avoid SPICE kernel requirements
396+ with mock .patch (
397+ "imap_processing.hi.hi_l1c.get_pointing_times" , return_value = (100 , 200 )
398+ ):
399+ counts_var = hi_l1c .pset_counts (empty_pset .coords , cal_config_df , l1b_dataset )
400+
401+ # Verify counts array has correct shape based on coordinates
402+ assert "counts" in counts_var
403+ # Shape should be (n_epoch, n_esa_energy, n_cal_prod, n_spin_bins)
404+ # where n_cal_prod is 2 (for products 5 and 10)
405+ expected_shape = (
406+ 1 ,
407+ empty_pset .esa_energy_step .size ,
408+ 2 , # Two calibration products: 5 and 10
409+ 3600 ,
410+ )
411+ assert counts_var ["counts" ].data .shape == expected_shape
412+ # Check that total number of expected counts is correct
413+ # ABC1C2 is coincidence type 15
414+ esa_1_2_mask = (l1b_dataset ["esa_step" ][l1b_dataset ["ccsds_index" ]] < 3 ).values
415+ coincidence_15_mask = (l1b_dataset ["coincidence_type" ] == 15 ).values
416+ np .testing .assert_equal (
417+ np .sum (counts_var ["counts" ].data [:, :, 0 ]),
418+ np .sum (coincidence_15_mask & esa_1_2_mask ),
419+ )
420+ # BC1C2 is coincidence type 7
421+ coincidence_7_mask = (l1b_dataset ["coincidence_type" ] == 7 ).values
422+ np .testing .assert_equal (
423+ np .sum (counts_var ["counts" ].data [:, :, 1 ]),
424+ np .sum (coincidence_7_mask & esa_1_2_mask ),
425+ )
426+
427+
328428def test_pset_backgrounds ():
329429 """Test coverage for pset_backgrounds function."""
330430 # Create some fake coordinates to use
@@ -369,7 +469,7 @@ def test_pset_exposure(
369469 attrs = {"FILLVAL" : 255 },
370470 )
371471 empty_pset = hi_l1c .empty_pset_dataset (
372- 100 , l1b_energy_steps , 2 , HIAPID .H90_SCI_DE .sensor
472+ 100 , l1b_energy_steps , np . array ([ 0 , 1 ]) , HIAPID .H90_SCI_DE .sensor
373473 )
374474 # Set the mock of find_second_de_packet_data to return a xr.Dataset
375475 # with some dummy data. ESA 1 will get binned data once, ESA 2 will get
0 commit comments