From d1cd12c53ee5ab2a216e0790d1bb59488f9a06ce Mon Sep 17 00:00:00 2001 From: Bryson Cale Date: Thu, 4 Jul 2024 12:54:25 -0700 Subject: [PATCH] Minor refactoring of parse_subarray_map_step and test, not completed --- .../parse_subarray_map_step.py | 37 ++++++++----- .../tests/test_parse_subarray_map.py | 55 +++++++++++++++++++ 2 files changed, 78 insertions(+), 14 deletions(-) create mode 100644 liger_iris_pipeline/tests/test_parse_subarray_map.py diff --git a/liger_iris_pipeline/parse_subarray_map/parse_subarray_map_step.py b/liger_iris_pipeline/parse_subarray_map/parse_subarray_map_step.py index 62e05ad..db324c6 100644 --- a/liger_iris_pipeline/parse_subarray_map/parse_subarray_map_step.py +++ b/liger_iris_pipeline/parse_subarray_map/parse_subarray_map_step.py @@ -1,7 +1,8 @@ import numpy as np from jwst.stpipe import Step -from jwst import datamodels +from .. import datamodels +import stdatamodels __all__ = ["ParseSubarrayMapStep"] @@ -34,18 +35,26 @@ class ParseSubarrayMapStep(Step): def process(self, input): - with datamodels.open(input) as input_model: - - if "subarr_map" in input_model: - self.log.info("Parsing the SUBARR_MAP extension") - result = input_model.copy() - for each in parse_subarray_map(result["subarr_map"]): - result.meta.subarray_map.append(each) - result.dq[result["subarr_map"] != 0] = np.bitwise_or( - result.dq[result["subarr_map"] != 0], 2 ** SUBARRAY_DQ_BIT - ) - else: - self.log.info("No SUBARR_MAP extension found") - result = input_model + if isinstance(input, str): + input_model = datamodels.open(input) + else: + input_model = input + + if "subarr_map" in input_model: + + self.log.info("Parsing the SUBARR_MAP extension") + + result = input_model.copy() + + for each in parse_subarray_map(result["subarr_map"]): + result.meta.subarray_map.append(each) + + result.dq[result["subarr_map"] != 0] = np.bitwise_or( + result.dq[result["subarr_map"] != 0], + 2 ** SUBARRAY_DQ_BIT + ) + else: + self.log.info("No SUBARR_MAP extension found") + result = input_model return result diff --git a/liger_iris_pipeline/tests/test_parse_subarray_map.py b/liger_iris_pipeline/tests/test_parse_subarray_map.py new file mode 100644 index 0000000..57df543 --- /dev/null +++ b/liger_iris_pipeline/tests/test_parse_subarray_map.py @@ -0,0 +1,55 @@ +# Imports +import liger_iris_pipeline +liger_iris_pipeline.monkeypatch_jwst_datamodels() +from liger_iris_pipeline.parse_subarray_map.parse_subarray_map_step import parse_subarray_map +from liger_iris_pipeline import ParseSubarrayMapStep +import numpy as np + +def set_subarray_mask(mask_array, subarray_index, xstart, ystart, xsize, ysize): + xstart = xstart - 1 + ystart = ystart - 1 + mask_array[ystart:ystart + ysize, xstart:xstart + xsize] = subarray_index + +def test_parse_subarray_map(): + subarray_maps_metadata = [ + {"xstart" : 80, "ystart" : 70, "xsize" : 10, "ysize" : 10}, + {"xstart" : 10, "ystart" : 20, "xsize" : 20, "ysize" : 20} + ] + + subarr_map = np.zeros((100,100), dtype=np.int16) + for i, shape in enumerate(subarray_maps_metadata): + set_subarray_mask(subarr_map, subarray_index=i+1, **shape) + + parse_subarray_map_output = parse_subarray_map(subarr_map) + + # Test parse_subarray_map + assert subarray_maps_metadata == parse_subarray_map_output + + # Create toy data + image = liger_iris_pipeline.datamodels.LigerIrisImageModel(data=np.zeros((100, 100))) + image.dq[25, 25] = 16 + image.dq[26, 26] = 1 + image["subarr_map"] = subarr_map + + # Test the step class + step = ParseSubarrayMapStep() + output = step.run(image) + + # Test + for each_parsed, each_input in zip(output.subarr_map, subarray_maps_metadata): + assert each_parsed.instance == each_input + + # If a pixel is already flagged as subarray, don't mess it up + assert output.dq[25, 25] == 16 + + # conserve existing flags + assert output.dq[26, 26] == 17 + + # Test + np.testing.assert_array_equal( + np.bitwise_and(output.dq, int(2**4)) > 0, + subarray_map != 0 + ) + + +test_parse_subarray_map() \ No newline at end of file