diff --git a/README.rst b/README.rst index 8794a50..950f8cc 100644 --- a/README.rst +++ b/README.rst @@ -40,7 +40,7 @@ Documentation can be found at `http://pycrires.readthedocs.io `_ when *pycrires* is used in a publication and `Landman et al. (2023) `_ specifically when using the dedicated routines for spatially-resolved sources. +Please cite `Stolker & Landman (2023) `_ when *pycrires* is used in a publication and `Landman et al. (2023) `_ specifically when using the dedicated routines for spatially-resolved sources. Contributing ------------ diff --git a/docs/about.rst b/docs/about.rst index f1acdf0..232fda9 100644 --- a/docs/about.rst +++ b/docs/about.rst @@ -11,7 +11,7 @@ Contact Attribution ----------- -Please cite `Stolker & Landman (2023) `_ when *pycrires* is used in a publication and `Landman et al. (2023) `_ specifically when using the dedicated routines for spatially-resolved sources. +Please cite `Stolker & Landman (2023) `_ when *pycrires* is used in a publication and `Landman et al. (2023) `_ specifically when using the dedicated routines for spatially-resolved sources. Contributing ------------ diff --git a/docs/running.rst b/docs/running.rst index 4fbe656..375fb61 100644 --- a/docs/running.rst +++ b/docs/running.rst @@ -33,6 +33,9 @@ Below, there is an full example for reducing and calibrating the `CRIRES+ None: np.savetxt(out_file, transm_spec, header=header) @typechecked - def cal_dark(self, verbose: bool = True) -> None: + def cal_dark(self, verbose: bool = True, create_sof: bool = True) -> None: """ Method for running ``cr2res_cal_dark``. @@ -1389,6 +1389,13 @@ def cal_dark(self, verbose: bool = True) -> None: ---------- verbose : bool Print output produced by ``esorex``. + create_sof : bool + Create a new SOF file. Setting the argument to ``True`` + will overwrite the SOF file if already present. Setting + the argument to ``False`` will allow for manually + adjusting an existing SOF file if the routine had + already been previously executed. + Returns ------- @@ -1398,6 +1405,9 @@ def cal_dark(self, verbose: bool = True) -> None: self._print_section("Create master DARK", recipe_name="cr2res_cal_dark") + print(f"Verbose: {verbose}") + print(f"Create SOF: {create_sof}") + indices = self.header_data["DPR.TYPE"] == "DARK" # Create output folder @@ -1414,27 +1424,40 @@ def cal_dark(self, verbose: bool = True) -> None: unique_dit.add(item) if len(unique_dit) == 0: - print("Unique DIT values: none") + print("\nUnique DIT values: none") else: - print(f"Unique DIT values: {unique_dit}\n") + print(f"\nUnique DIT values: {unique_dit}") # Create SOF file - print("Creating SOF file:") - sof_file = Path(output_dir / "files.sof") - with open(sof_file, "w", encoding="utf-8") as sof_open: - for item in self.header_data[indices]["ORIGFILE"]: - sof_open.write(f"{self.path}/raw/{item} DARK\n") - self._update_files("DARK", f"{self.path}/raw/{item}") + if not create_sof and not sof_file.exists(): + warnings.warn(f"The SOF file is not found at '{sof_file}' " + "while 'create_sof' is set to False. " + "Probably 'cal_dark' has not been " + "previously executed so forcing " + "'create_sof' to True.") - # Check if any dark frames were found + create_sof = True - if "DARK" not in self.file_dict: - raise RuntimeError( - "The 'raw' folder does not contain any DPR.TYPE=DARK files." - ) + if create_sof: + print("\nCreating SOF file:") + + with open(sof_file, "w", encoding="utf-8") as sof_open: + for item in self.header_data[indices]["ORIGFILE"]: + sof_open.write(f"{self.path}/raw/{item} DARK\n") + self._update_files("DARK", f"{self.path}/raw/{item}") + + # Check if any dark frames were found + + if "DARK" not in self.file_dict: + raise RuntimeError( + "The 'raw' folder does not contain any DPR.TYPE=DARK files." + ) + + else: + print(f"\nFound SOF file: {sof_file}") # Create EsoRex configuration file if not found @@ -1495,7 +1518,7 @@ def cal_dark(self, verbose: bool = True) -> None: json.dump(self.file_dict, json_file, indent=4) @typechecked - def cal_flat(self, verbose: bool = True) -> None: + def cal_flat(self, verbose: bool = True, create_sof: bool = True) -> None: """ Method for running ``cr2res_cal_flat``. @@ -1503,6 +1526,12 @@ def cal_flat(self, verbose: bool = True) -> None: ---------- verbose : bool Print output produced by ``esorex``. + create_sof : bool + Create a new SOF file. Setting the argument to ``True`` + will overwrite the SOF file if already present. Setting + the argument to ``False`` will allow for manually + adjusting an existing SOF file if the routine had + already been previously executed. Returns ------- @@ -1512,6 +1541,9 @@ def cal_flat(self, verbose: bool = True) -> None: self._print_section("Create master FLAT", recipe_name="cr2res_cal_flat") + print(f"Verbose: {verbose}") + print(f"Create SOF: {create_sof}") + indices = self.header_data["DPR.TYPE"] == "FLAT" # Create output folder @@ -1543,9 +1575,9 @@ def cal_flat(self, verbose: bool = True) -> None: unique_dit.add(item) if len(unique_dit) == 0: - print("Unique DIT values: none") + print("\nUnique DIT values: none") else: - print(f"Unique DIT values: {unique_dit}\n") + print(f"\nUnique DIT values: {unique_dit}") # Wavelength setting @@ -1558,58 +1590,71 @@ def cal_flat(self, verbose: bool = True) -> None: # Iterate over different DIT values for FLAT for dit_item in unique_dit: - print(f"Creating SOF file for DIT={dit_item}:") + sof_file = Path(self.path / f"calib/cal_flat/files_dit{fit_item}.sof") - sof_file = Path(self.path / "calib/cal_flat/files.sof") + if not create_sof and not sof_file.exists(): + warnings.warn(f"The SOF file is not found at '{sof_file}' " + "while 'create_sof' is set to False. " + "Probably 'cal_dark' has not been " + "previously executed so forcing " + "'create_sof' to True.") - with open(sof_file, "w", encoding="utf-8") as sof_open: - for item in self.header_data[indices]["ORIGFILE"]: - index = self.header_data.index[ - self.header_data["ORIGFILE"] == item - ][0] - flat_dit = self.header_data.iloc[index]["DET.SEQ1.DIT"] + create_sof = True - if flat_dit == dit_item: - file_path = f"{self.path}/raw/{item}" - sof_open.write(f"{file_path} FLAT\n") - self._update_files("FLAT", file_path) + if create_sof: + print(f"\nCreating SOF file for DIT={dit_item}:") - # Find master dark + with open(sof_file, "w", encoding="utf-8") as sof_open: + for item in self.header_data[indices]["ORIGFILE"]: + index = self.header_data.index[ + self.header_data["ORIGFILE"] == item + ][0] + flat_dit = self.header_data.iloc[index]["DET.SEQ1.DIT"] - file_found = False + if flat_dit == dit_item: + file_path = f"{self.path}/raw/{item}" + sof_open.write(f"{file_path} FLAT\n") + self._update_files("FLAT", file_path) - if "CAL_DARK_MASTER" in self.file_dict: - for key, value in self.file_dict["CAL_DARK_MASTER"].items(): - if not file_found and value["DIT"] == dit_item: - file_name = key.split("/")[-2:] - print( - f" - calib/{file_name[-2]}/{file_name[-1]} CAL_DARK_MASTER" - ) - sof_open.write(f"{key} CAL_DARK_MASTER\n") - file_found = True + # Find master dark - if not file_found: - self._download_archive("DARK", dit_item) + file_found = False - # Find bad pixel map + if "CAL_DARK_MASTER" in self.file_dict: + for key, value in self.file_dict["CAL_DARK_MASTER"].items(): + if not file_found and value["DIT"] == dit_item: + file_name = key.split("/")[-2:] + print( + f" - calib/{file_name[-2]}/{file_name[-1]} CAL_DARK_MASTER" + ) + sof_open.write(f"{key} CAL_DARK_MASTER\n") + file_found = True - file_found = False + if not file_found: + self._download_archive("DARK", dit_item) - if "CAL_DARK_BPM" in self.file_dict: - bpm_file = self.select_bpm(wlen_id, dit_item) + # Find bad pixel map - if bpm_file is not None: - file_name = bpm_file.split("/")[-2:] - print( - f" - calib/{file_name[-2]}/{file_name[-1]} CAL_DARK_BPM" + file_found = False + + if "CAL_DARK_BPM" in self.file_dict: + bpm_file = self.select_bpm(wlen_id, dit_item) + + if bpm_file is not None: + file_name = bpm_file.split("/")[-2:] + print( + f" - calib/{file_name[-2]}/{file_name[-1]} CAL_DARK_BPM" + ) + sof_open.write(f"{bpm_file} CAL_DARK_BPM\n") + file_found = True + + if not file_found: + warnings.warn( + f"There is not a bad pixel map " f"with DIT = {dit_item} s." ) - sof_open.write(f"{bpm_file} CAL_DARK_BPM\n") - file_found = True - if not file_found: - warnings.warn( - f"There is not a bad pixel map " f"with DIT = {dit_item} s." - ) + else: + print(f"\nFound SOF file: {sof_file}") # Create EsoRex configuration file if not found @@ -1638,11 +1683,11 @@ def cal_flat(self, verbose: bool = True) -> None: subprocess.run(esorex, cwd=output_dir, stdout=stdout, check=True) if not verbose: - print(" [DONE]\n") + print(" [DONE]") # Update file dictionary with master flat - print("Output files:") + print("\nOutput files:") fits_files = Path(self.path / "calib").glob( "cr2res_cal_flat_*master_flat.fits" @@ -1668,7 +1713,7 @@ def cal_flat(self, verbose: bool = True) -> None: json.dump(self.file_dict, json_file, indent=4) @typechecked - def cal_detlin(self, verbose: bool = True) -> None: + def cal_detlin(self, verbose: bool = True, create_sof: bool = True) -> None: """ Method for running ``cr2res_cal_detlin``. @@ -1676,6 +1721,12 @@ def cal_detlin(self, verbose: bool = True) -> None: ---------- verbose : bool Print output produced by ``esorex``. + create_sof : bool + Create a new SOF file. Setting the argument to ``True`` + will overwrite the SOF file if already present. Setting + the argument to ``False`` will allow for manually + adjusting an existing SOF file if the routine had + already been previously executed. Returns ------- @@ -1687,6 +1738,9 @@ def cal_detlin(self, verbose: bool = True) -> None: "Determine detector linearity", recipe_name="cr2res_cal_detlin" ) + print(f"Verbose: {verbose}") + print(f"Create SOF: {create_sof}") + indices = self.header_data["DPR.TYPE"] == "FLAT,LAMP,DETCHECK" # Create output folder @@ -1709,14 +1763,27 @@ def cal_detlin(self, verbose: bool = True) -> None: # Create SOF file - print("Creating SOF file:") - sof_file = Path(output_dir / "files.sof") - with open(sof_file, "w", encoding="utf-8") as sof_open: - for item in self.header_data[indices]["ORIGFILE"]: - sof_open.write(f"{self.path}/raw/{item} DETLIN_LAMP\n") - self._update_files("DETLIN_LAMP", f"{self.path}/raw/{item}") + if not create_sof and not sof_file.exists(): + warnings.warn(f"The SOF file is not found at '{sof_file}' " + "while 'create_sof' is set to False. " + "Probably 'cal_dark' has not been " + "previously executed so forcing " + "'create_sof' to True.") + + create_sof = True + + if create_sof: + print("Creating SOF file:") + + with open(sof_file, "w", encoding="utf-8") as sof_open: + for item in self.header_data[indices]["ORIGFILE"]: + sof_open.write(f"{self.path}/raw/{item} DETLIN_LAMP\n") + self._update_files("DETLIN_LAMP", f"{self.path}/raw/{item}") + + else: + print(f"\nFound SOF file: {sof_file}") # Check if any dark frames were found @@ -1779,7 +1846,7 @@ def cal_detlin(self, verbose: bool = True) -> None: json.dump(self.file_dict, json_file, indent=4) @typechecked - def cal_wave(self, verbose: bool = True) -> None: + def cal_wave(self, verbose: bool = True, create_sof: bool = True) -> None: """ Method for running ``cr2res_cal_wave``. @@ -1787,6 +1854,12 @@ def cal_wave(self, verbose: bool = True) -> None: ---------- verbose : bool Print output produced by ``esorex``. + create_sof : bool + Create a new SOF file. Setting the argument to ``True`` + will overwrite the SOF file if already present. Setting + the argument to ``False`` will allow for manually + adjusting an existing SOF file if the routine had + already been previously executed. Returns ------- @@ -1796,6 +1869,9 @@ def cal_wave(self, verbose: bool = True) -> None: self._print_section("Wavelength calibration", recipe_name="cr2res_cal_wave") + print(f"Verbose: {verbose}") + print(f"Create SOF: {create_sof}") + # Create output folder output_dir = self.calib_folder / "cal_wave" @@ -1805,115 +1881,128 @@ def cal_wave(self, verbose: bool = True) -> None: # Create SOF file - print("Creating SOF file:") - sof_file = Path(self.path / "calib/cal_wave/files.sof") - with open(sof_file, "w", encoding="utf-8") as sof_open: - # Uranium-Neon lamp (UNE) frames + if not create_sof and not sof_file.exists(): + warnings.warn(f"The SOF file is not found at '{sof_file}' " + "while 'create_sof' is set to False. " + "Probably 'cal_dark' has not been " + "previously executed so forcing " + "'create_sof' to True.") - indices = self.header_data["DPR.TYPE"] == "WAVE,UNE" + create_sof = True - une_found = False + if create_sof: + print("Creating SOF file:") - if sum(indices) == 0: - warnings.warn( - "The 'raw' folder does not contain a DPR.TYPE=WAVE,UNE file." - ) + with open(sof_file, "w", encoding="utf-8") as sof_open: + # Uranium-Neon lamp (UNE) frames - elif sum(indices) > 1: - raise RuntimeError( - "More than one WAVE,UNE file " - "Please only provided a single " - "WAVE,UNE file." - ) + indices = self.header_data["DPR.TYPE"] == "WAVE,UNE" - else: - une_found = True + une_found = False - for item in self.header_data[indices]["ORIGFILE"]: - file_path = f"{self.path}/raw/{item}" - sof_open.write(f"{file_path} WAVE_UNE\n") - self._update_files("WAVE_UNE", file_path) + if sum(indices) == 0: + warnings.warn( + "The 'raw' folder does not contain a DPR.TYPE=WAVE,UNE file." + ) - # Fabry Pérot Etalon (FPET) frames + elif sum(indices) > 1: + raise RuntimeError( + "More than one WAVE,UNE file " + "Please only provided a single " + "WAVE,UNE file." + ) - indices = self.header_data["DPR.TYPE"] == "WAVE,FPET" + else: + une_found = True - fpet_found = False + for item in self.header_data[indices]["ORIGFILE"]: + file_path = f"{self.path}/raw/{item}" + sof_open.write(f"{file_path} WAVE_UNE\n") + self._update_files("WAVE_UNE", file_path) - if sum(indices) == 0: - indices = self.header_data["OBJECT"] == "WAVE,FPET" + # Fabry Pérot Etalon (FPET) frames - if sum(indices) == 0: - warnings.warn( - "The 'raw' folder does not contain a DPR.TYPE=WAVE,FPET file." - ) + indices = self.header_data["DPR.TYPE"] == "WAVE,FPET" - elif sum(indices) > 1: - raise RuntimeError( - "More than one WAVE,FPET file " - "Please only provided a single " - "WAVE,FPET file." - ) + fpet_found = False - else: - fpet_found = True + if sum(indices) == 0: + indices = self.header_data["OBJECT"] == "WAVE,FPET" - for item in self.header_data[indices]["ORIGFILE"]: - file_path = f"{self.path}/raw/{item}" - sof_open.write(f"{file_path} WAVE_FPET\n") - self._update_files("WAVE_FPET", file_path) + if sum(indices) == 0: + warnings.warn( + "The 'raw' folder does not contain a DPR.TYPE=WAVE,FPET file." + ) - # Find TraceWave file + elif sum(indices) > 1: + raise RuntimeError( + "More than one WAVE,FPET file " + "Please only provided a single " + "WAVE,FPET file." + ) - file_found = False + else: + fpet_found = True - if "UTIL_TRACE_TW" in self.file_dict: - for key in self.file_dict["UTIL_TRACE_TW"]: - if not file_found: - file_name = key.split("/")[-2:] - print( - f" - calib/{file_name[-2]}/{file_name[-1]} UTIL_TRACE_TW" - ) - sof_open.write(f"{key} UTIL_TRACE_TW\n") - file_found = True + for item in self.header_data[indices]["ORIGFILE"]: + file_path = f"{self.path}/raw/{item}" + sof_open.write(f"{file_path} WAVE_FPET\n") + self._update_files("WAVE_FPET", file_path) - if "CAL_WAVE_TW" in self.file_dict: - for key in self.file_dict["CAL_WAVE_TW"]: - if not file_found: - file_name = key.split("/")[-2:] - print(f" - calib/{file_name[-2]}/{file_name[-1]} CAL_WAVE_TW") - sof_open.write(f"{key} CAL_WAVE_TW\n") - file_found = True + # Find TraceWave file - if "CAL_FLAT_TW" in self.file_dict: - for key in self.file_dict["CAL_FLAT_TW"]: - if not file_found: - file_name = key.split("/")[-2:] - print(f" - calib/{file_name[-2]}/{file_name[-1]} CAL_FLAT_TW") - sof_open.write(f"{key} CAL_FLAT_TW\n") - file_found = True + file_found = False - if not file_found: - raise RuntimeError("Could not find a TraceWave file.") + if "UTIL_TRACE_TW" in self.file_dict: + for key in self.file_dict["UTIL_TRACE_TW"]: + if not file_found: + file_name = key.split("/")[-2:] + print( + f" - calib/{file_name[-2]}/{file_name[-1]} UTIL_TRACE_TW" + ) + sof_open.write(f"{key} UTIL_TRACE_TW\n") + file_found = True - # Find emission lines file + if "CAL_WAVE_TW" in self.file_dict: + for key in self.file_dict["CAL_WAVE_TW"]: + if not file_found: + file_name = key.split("/")[-2:] + print(f" - calib/{file_name[-2]}/{file_name[-1]} CAL_WAVE_TW") + sof_open.write(f"{key} CAL_WAVE_TW\n") + file_found = True - file_found = False + if "CAL_FLAT_TW" in self.file_dict: + for key in self.file_dict["CAL_FLAT_TW"]: + if not file_found: + file_name = key.split("/")[-2:] + print(f" - calib/{file_name[-2]}/{file_name[-1]} CAL_FLAT_TW") + sof_open.write(f"{key} CAL_FLAT_TW\n") + file_found = True - if "EMISSION_LINES" in self.file_dict: - for key in self.file_dict["EMISSION_LINES"]: - if not file_found: - file_name = key.split("/")[-2:] - print( - f" - calib/{file_name[-2]}/{file_name[-1]} EMISSION_LINES" - ) - sof_open.write(f"{key} EMISSION_LINES\n") - file_found = True + if not file_found: + raise RuntimeError("Could not find a TraceWave file.") - if not file_found: - raise RuntimeError("Could not find an emission lines file.") + # Find emission lines file + + file_found = False + + if "EMISSION_LINES" in self.file_dict: + for key in self.file_dict["EMISSION_LINES"]: + if not file_found: + file_name = key.split("/")[-2:] + print( + f" - calib/{file_name[-2]}/{file_name[-1]} EMISSION_LINES" + ) + sof_open.write(f"{key} EMISSION_LINES\n") + file_found = True + + if not file_found: + raise RuntimeError("Could not find an emission lines file.") + + else: + print(f"\nFound SOF file: {sof_file}") # Create EsoRex configuration file if not found @@ -1970,7 +2059,7 @@ def cal_wave(self, verbose: bool = True) -> None: json.dump(self.file_dict, json_file, indent=4) @typechecked - def util_calib(self, calib_type: str, verbose: bool = True) -> None: + def util_calib(self, calib_type: str, verbose: bool = True, create_sof: bool = True) -> None: """ Method for running ``cr2res_util_calib``. @@ -1980,6 +2069,12 @@ def util_calib(self, calib_type: str, verbose: bool = True) -> None: Calibration type ("flat", "une", "fpet", "nodding"). verbose : bool Print output produced by ``esorex``. + create_sof : bool + Create a new SOF file. Setting the argument to ``True`` + will overwrite the SOF file if already present. Setting + the argument to ``False`` will allow for manually + adjusting an existing SOF file if the routine had + already been previously executed. Returns ------- @@ -2004,6 +2099,10 @@ def util_calib(self, calib_type: str, verbose: bool = True) -> None: else: raise RuntimeError("The argument of 'calib_type' is not recognized.") + print(f"Calibration type: {calib_type}") + print(f"Verbose: {verbose}") + print(f"Create SOF: {create_sof}") + # Create output folder output_dir = self.calib_folder / f"util_calib_{calib_type}" @@ -2057,9 +2156,9 @@ def util_calib(self, calib_type: str, verbose: bool = True) -> None: unique_dit.add(item) if len(unique_dit) == 0: - print("Unique DIT values: none") + print("\nUnique DIT values: none") else: - print(f"Unique DIT values: {unique_dit}\n") + print(f"\nUnique DIT values: {unique_dit}") unique_dit = list(unique_dit) @@ -2082,125 +2181,144 @@ def util_calib(self, calib_type: str, verbose: bool = True) -> None: science_idx = np.where(self.header_data["DPR.CATG"] == "SCIENCE")[0] wlen_id = self.header_data["INS.WLEN.ID"][science_idx[0]] - print(f"Creating SOF file for DIT={dit_item}:") + # Create SOF file - sof_file = Path(self.path / f"calib/util_calib_{calib_type}/files.sof") + sof_file = Path(self.path / f"calib/util_calib_{calib_type}/files_dit{dit_item:.2f}.sof") - with open(sof_file, "w", encoding="utf-8") as sof_open: - for item in self.header_data[indices]["ORIGFILE"]: - index = self.header_data.index[self.header_data["ORIGFILE"] == item][0] - calib_dit = self.header_data.iloc[index]["DET.SEQ1.DIT"] + if not create_sof and not sof_file.exists(): + warnings.warn(f"The SOF file is not found at '{sof_file}' " + "while 'create_sof' is set to False. " + "Probably 'util_calib' has not been " + "previously executed so forcing " + "'create_sof' to True.") - if calib_dit == dit_item: - file_path = f"{self.path}/raw/{item}" - header_tmp = fits.getheader(file_path) + create_sof = True - if calib_type == "flat": - sof_open.write(f"{file_path} FLAT\n") - self._update_files("FLAT", file_path) + if create_sof: + print(f"\nCreating SOF file for DIT={dit_item}:") - elif calib_type == "une": - sof_open.write(f"{file_path} WAVE_UNE\n") - self._update_files("WAVE_UNE", file_path) + with open(sof_file, "w", encoding="utf-8") as sof_open: + for item in self.header_data[indices]["ORIGFILE"]: + index = self.header_data.index[self.header_data["ORIGFILE"] == item][0] + calib_dit = self.header_data.iloc[index]["DET.SEQ1.DIT"] - elif calib_type == "fpet": - sof_open.write(f"{file_path} WAVE_FPET\n") - self._update_files("WAVE_FPET", file_path) + if calib_dit == dit_item: + file_path = f"{self.path}/raw/{item}" + header_tmp = fits.getheader(file_path) - elif calib_type == "nodding": - if "ESO DPR TECH" in header_tmp: - if header_tmp["ESO DPR TECH"] == "SPECTRUM,NODDING,OTHER": - sof_open.write(f"{file_path} OBS_NODDING_OTHER\n") - self._update_files("OBS_NODDING_OTHER", file_path) + if calib_type == "flat": + sof_open.write(f"{file_path} FLAT\n") + self._update_files("FLAT", file_path) - elif ( - header_tmp["ESO DPR TECH"] == "SPECTRUM,NODDING,JITTER" - ): - sof_open.write(f"{file_path} OBS_NODDING_JITTER\n") - self._update_files("OBS_NODDING_JITTER", file_path) + elif calib_type == "une": + sof_open.write(f"{file_path} WAVE_UNE\n") + self._update_files("WAVE_UNE", file_path) - # Find master dark + elif calib_type == "fpet": + sof_open.write(f"{file_path} WAVE_FPET\n") + self._update_files("WAVE_FPET", file_path) - file_found = False + elif calib_type == "nodding": + if "ESO DPR TECH" in header_tmp: + if header_tmp["ESO DPR TECH"] == "SPECTRUM,NODDING,OTHER": + sof_open.write(f"{file_path} OBS_NODDING_OTHER\n") + self._update_files("OBS_NODDING_OTHER", file_path) - if "CAL_DARK_MASTER" in self.file_dict: - for key, value in self.file_dict["CAL_DARK_MASTER"].items(): - if not file_found and value["DIT"] == dit_item: - file_name = key.split("/")[-2:] - print( - f" - calib/{file_name[-2]}/{file_name[-1]} CAL_DARK_MASTER" - ) - sof_open.write(f"{key} CAL_DARK_MASTER\n") + elif ( + header_tmp["ESO DPR TECH"] == "SPECTRUM,NODDING,JITTER" + ): + sof_open.write(f"{file_path} OBS_NODDING_JITTER\n") + self._update_files("OBS_NODDING_JITTER", file_path) + + # Find master dark + + file_found = False + + if "CAL_DARK_MASTER" in self.file_dict: + for key, value in self.file_dict["CAL_DARK_MASTER"].items(): + if not file_found and value["DIT"] == dit_item: + file_name = key.split("/")[-2:] + print( + f" - calib/{file_name[-2]}/{file_name[-1]} CAL_DARK_MASTER" + ) + sof_open.write(f"{key} CAL_DARK_MASTER\n") + file_found = True + + if not file_found: + self._download_archive("DARK", dit_item) + + # Find bad pixel map + + file_found = False + + if "CAL_DARK_BPM" in self.file_dict: + bpm_file = self.select_bpm(wlen_id, dit_item) + + if bpm_file is not None: + file_name = bpm_file.split("/")[-2:] + print(f" - calib/{file_name[-2]}/{file_name[-1]} CAL_DARK_BPM") + sof_open.write(f"{bpm_file} CAL_DARK_BPM\n") file_found = True - if not file_found: - self._download_archive("DARK", dit_item) + if not file_found: + warnings.warn(f"There is not a bad pixel map with DIT = {dit_item} s.") - # Find bad pixel map + # Find UTIL_MASTER_FLAT or CAL_FLAT_MASTER file + # when DPR.TYPE is WAVE,UNE or WAVE,FPET - file_found = False + if calib_type in ["une", "fpet"]: + file_found = False - if "CAL_DARK_BPM" in self.file_dict: - bpm_file = self.select_bpm(wlen_id, dit_item) + if "UTIL_MASTER_FLAT" in self.file_dict: + for key, value in self.file_dict["UTIL_MASTER_FLAT"].items(): + if not file_found: + # The FLAT correction is not applied when using + # the UTIL_MASTER_FLAT keyword in the SOF file + file_name = key.split("/")[-2:] + print( + # f" - calib/{file_name[-2]}/{file_name[-1]} UTIL_MASTER_FLAT" + f" - calib/{file_name[-2]}/{file_name[-1]} CAL_FLAT_MASTER" + ) + # sof_open.write(f"{key} UTIL_MASTER_FLAT\n") + sof_open.write(f"{key} CAL_FLAT_MASTER\n") + file_found = True - if bpm_file is not None: - file_name = bpm_file.split("/")[-2:] - print(f" - calib/{file_name[-2]}/{file_name[-1]} CAL_DARK_BPM") - sof_open.write(f"{bpm_file} CAL_DARK_BPM\n") - file_found = True + if "CAL_FLAT_MASTER" in self.file_dict: + for key, value in self.file_dict["CAL_FLAT_MASTER"].items(): + if not file_found: + file_name = key.split("/")[-2:] + print( + f" - calib/{file_name[-2]}/{file_name[-1]} CAL_FLAT_MASTER" + ) + sof_open.write(f"{key} CAL_FLAT_MASTER\n") + file_found = True - if not file_found: - warnings.warn(f"There is not a bad pixel map with DIT = {dit_item} s.") + if not file_found: + warnings.warn( + "The CAL_FLAT_MASTER file is not found in " + "the 'calib' folder so continuing without " + "applying a master flat." + ) - # Find UTIL_MASTER_FLAT or CAL_FLAT_MASTER file - # when DPR.TYPE is WAVE,UNE or WAVE,FPET + # Find CAL_DETLIN_COEFFS file - if calib_type in ["une", "fpet"]: file_found = False - if "UTIL_MASTER_FLAT" in self.file_dict: - for key, value in self.file_dict["UTIL_MASTER_FLAT"].items(): - if not file_found: - file_name = key.split("/")[-2:] - print( - f" - calib/{file_name[-2]}/{file_name[-1]} UTIL_MASTER_FLAT" - ) - sof_open.write(f"{key} UTIL_MASTER_FLAT\n") - file_found = True - - if "CAL_FLAT_MASTER" in self.file_dict: - for key, value in self.file_dict["CAL_FLAT_MASTER"].items(): + if "CAL_DETLIN_COEFFS" in self.file_dict: + for key, value in self.file_dict["CAL_DETLIN_COEFFS"].items(): if not file_found: file_name = key.split("/")[-2:] print( - f" - calib/{file_name[-2]}/{file_name[-1]} CAL_FLAT_MASTER" + f" - calib/{file_name[-2]}/{file_name[-1]} CAL_DETLIN_COEFFS" ) - sof_open.write(f"{key} CAL_FLAT_MASTER\n") + sof_open.write(f"{key} CAL_DETLIN_COEFFS\n") file_found = True if not file_found: - warnings.warn( - "The CAL_FLAT_MASTER file is not found in " - "the 'calib' folder so continuing without " - "applying a master flat." - ) - - # Find CAL_DETLIN_COEFFS file - - file_found = False - - if "CAL_DETLIN_COEFFS" in self.file_dict: - for key, value in self.file_dict["CAL_DETLIN_COEFFS"].items(): - if not file_found: - file_name = key.split("/")[-2:] - print( - f" - calib/{file_name[-2]}/{file_name[-1]} CAL_DETLIN_COEFFS" - ) - sof_open.write(f"{key} CAL_DETLIN_COEFFS\n") - file_found = True + warnings.warn("Could not find CAL_DETLIN_COEFFS.") - if not file_found: - warnings.warn("Could not find CAL_DETLIN_COEFFS.") + else: + print(f"\nFound SOF file: {sof_file}") # Create EsoRex configuration file if not found @@ -2229,11 +2347,11 @@ def util_calib(self, calib_type: str, verbose: bool = True) -> None: subprocess.run(esorex, cwd=output_dir, stdout=stdout, check=True) if not verbose: - print(" [DONE]\n") + print(" [DONE]") # Update file dictionary with UTIL_CALIB file - print("Output files:") + print("\nOutput files:") if calib_type == "nodding": fits_files = sorted( @@ -2261,7 +2379,7 @@ def util_calib(self, calib_type: str, verbose: bool = True) -> None: json.dump(self.file_dict, json_file, indent=4) @typechecked - def util_trace(self, plot_trace: bool = False, verbose: bool = True) -> None: + def util_trace(self, plot_trace: bool = False, verbose: bool = True, create_sof: bool = True) -> None: """ Method for running ``cr2res_util_trace``. @@ -2271,6 +2389,12 @@ def util_trace(self, plot_trace: bool = False, verbose: bool = True) -> None: Plot the traces of the spectral orders on the raw data. verbose : bool Print output produced by ``esorex``. + create_sof : bool + Create a new SOF file. Setting the argument to ``True`` + will overwrite the SOF file if already present. Setting + the argument to ``False`` will allow for manually + adjusting an existing SOF file if the routine had + already been previously executed. Returns ------- @@ -2282,6 +2406,10 @@ def util_trace(self, plot_trace: bool = False, verbose: bool = True) -> None: "Trace the spectral orders", recipe_name="cr2res_util_trace" ) + print(f"Plot trace: {plot_trace}") + print(f"Verbose: {verbose}") + print(f"Create SOF: {create_sof}") + # Create output folder output_dir = self.calib_folder / "util_trace" @@ -2291,38 +2419,51 @@ def util_trace(self, plot_trace: bool = False, verbose: bool = True) -> None: # Create SOF file - print("Creating SOF file:") - sof_file = Path(self.path / "calib/util_trace/files.sof") - # Find UTIL_CALIB file + if not create_sof and not sof_file.exists(): + warnings.warn(f"The SOF file is not found at '{sof_file}' " + "while 'create_sof' is set to False. " + "Probably 'util_trace' has not been " + "previously executed so forcing " + "'create_sof' to True.") + + create_sof = True file_found = False - if "UTIL_CALIB" in self.file_dict: - for key in self.file_dict["UTIL_CALIB"]: - if not file_found: - with open(sof_file, "w", encoding="utf-8") as sof_open: - file_name = key.split("/")[-2:] - print(f" - calib/{file_name[-2]}/{file_name[-1]} UTIL_CALIB") - sof_open.write(f"{key} UTIL_CALIB\n") - file_found = True + if create_sof: + print("\nCreating SOF file:") - # Otherwise use raw FLAT files + # Find UTIL_CALIB file - if not file_found: - indices = self.header_data["DPR.TYPE"] == "FLAT" + if "UTIL_CALIB" in self.file_dict: + for key in self.file_dict["UTIL_CALIB"]: + if not file_found: + with open(sof_file, "w", encoding="utf-8") as sof_open: + file_name = key.split("/")[-2:] + print(f" - calib/{file_name[-2]}/{file_name[-1]} UTIL_CALIB") + sof_open.write(f"{key} UTIL_CALIB\n") + file_found = True - with open(sof_file, "w", encoding="utf-8") as sof_open: - for item in self.header_data[indices]["ORIGFILE"]: - print(f" - raw/{item} FLAT") - sof_open.write(f"{self.path}/raw/{item} FLAT\n") + # Otherwise use raw FLAT files - warnings.warn( - "There is not a UTIL_CALIB file so using raw FLAT " - "files instead. To use the UTIL_CALIB file, please " - "first run the util_calib method." - ) + if not file_found: + indices = self.header_data["DPR.TYPE"] == "FLAT" + + with open(sof_file, "w", encoding="utf-8") as sof_open: + for item in self.header_data[indices]["ORIGFILE"]: + print(f" - raw/{item} FLAT") + sof_open.write(f"{self.path}/raw/{item} FLAT\n") + + warnings.warn( + "There is not a UTIL_CALIB file so using raw FLAT " + "files instead. To use the UTIL_CALIB file, please " + "first run the util_calib method." + ) + + else: + print(f"\nFound SOF file: {sof_file}") # Create EsoRex configuration file if not found @@ -2351,13 +2492,18 @@ def util_trace(self, plot_trace: bool = False, verbose: bool = True) -> None: subprocess.run(esorex, cwd=output_dir, stdout=stdout, check=True) if not verbose: - print(" [DONE]\n") + print(" [DONE]") # Update file dictionary with UTIL_TRACE_TW file - print("Output files:") + print("\nOutput files:") - if file_found: + fits_file = ( + f"{self.path}/calib/util_trace/" + + "cr2res_util_calib_calibrated_collapsed_tw.fits" + ) + + if Path(fits_file).exists(): fits_file = ( f"{self.path}/calib/util_trace/" + "cr2res_util_calib_calibrated_collapsed_tw.fits" @@ -2382,7 +2528,7 @@ def util_trace(self, plot_trace: bool = False, verbose: bool = True) -> None: json.dump(self.file_dict, json_file, indent=4) @typechecked - def util_slit_curv(self, plot_trace: bool = False, verbose: bool = True) -> None: + def util_slit_curv(self, plot_trace: bool = False, verbose: bool = True, create_sof: bool = True) -> None: """ Method for running ``cr2res_util_slit_curv``. @@ -2392,6 +2538,12 @@ def util_slit_curv(self, plot_trace: bool = False, verbose: bool = True) -> None Plot the traces of the spectral orders on the raw data. verbose : bool Print output produced by ``esorex``. + create_sof : bool + Create a new SOF file. Setting the argument to ``True`` + will overwrite the SOF file if already present. Setting + the argument to ``False`` will allow for manually + adjusting an existing SOF file if the routine had + already been previously executed. Returns ------- @@ -2403,6 +2555,10 @@ def util_slit_curv(self, plot_trace: bool = False, verbose: bool = True) -> None "Compute slit curvature", recipe_name="cr2res_util_slit_curv" ) + print(f"Plot trace: {plot_trace}") + print(f"Verbose: {verbose}") + print(f"Create SOF: {create_sof}") + # Create output folder output_dir = self.calib_folder / "util_slit_curv" @@ -2412,56 +2568,69 @@ def util_slit_curv(self, plot_trace: bool = False, verbose: bool = True) -> None # Create SOF file - print("Creating SOF file:") - sof_file = Path(self.path / "calib/util_slit_curv/files.sof") - # Find Fabry Pérot Etalon (FPET) files + if not create_sof and not sof_file.exists(): + warnings.warn(f"The SOF file is not found at '{sof_file}' " + "while 'create_sof' is set to False. " + "Probably 'util_slit_curv' has not been " + "previously executed so forcing " + "'create_sof' to True.") - indices = self.header_data["DPR.TYPE"] == "WAVE,FPET" + create_sof = True - if sum(indices) == 0: - indices = self.header_data["OBJECT"] == "WAVE,FPET" + if create_sof: + print("\nCreating SOF file:") - if sum(indices) == 0: - raise RuntimeError( - "The 'raw' folder does not contain a DPR.TYPE=WAVE,FPET file." - ) + # Find Fabry Pérot Etalon (FPET) files - if sum(indices) > 1: - raise RuntimeError( - "More than one WAVE,FPET file is present in the " - "'raw' folder. Please only provided a single " - "WAVE,FPET file." - ) + indices = self.header_data["DPR.TYPE"] == "WAVE,FPET" - with open(sof_file, "w", encoding="utf-8") as sof_open: - for item in self.header_data[indices]["ORIGFILE"]: - file_path = f"{self.path}/raw/{item}" - sof_open.write(f"{file_path} WAVE_FPET\n") - self._update_files("WAVE_FPET", file_path) + if sum(indices) == 0: + indices = self.header_data["OBJECT"] == "WAVE,FPET" - # Find UTIL_TRACE_TW file + if sum(indices) == 0: + raise RuntimeError( + "The 'raw' folder does not contain a DPR.TYPE=WAVE,FPET file." + ) - file_found = False + if sum(indices) > 1: + raise RuntimeError( + "More than one WAVE,FPET file is present in the " + "'raw' folder. Please only provided a single " + "WAVE,FPET file." + ) - if "UTIL_TRACE_TW" in self.file_dict: - for key in self.file_dict["UTIL_TRACE_TW"]: - if not file_found: - with open(sof_file, "a", encoding="utf-8") as sof_open: - file_name = key.split("/")[-2:] - print( - f" - calib/{file_name[-2]}/{file_name[-1]} UTIL_TRACE_TW" - ) - sof_open.write(f"{key} UTIL_TRACE_TW\n") - file_found = True + with open(sof_file, "w", encoding="utf-8") as sof_open: + for item in self.header_data[indices]["ORIGFILE"]: + file_path = f"{self.path}/raw/{item}" + sof_open.write(f"{file_path} WAVE_FPET\n") + self._update_files("WAVE_FPET", file_path) + + # Find UTIL_TRACE_TW file + + file_found = False + + if "UTIL_TRACE_TW" in self.file_dict: + for key in self.file_dict["UTIL_TRACE_TW"]: + if not file_found: + with open(sof_file, "a", encoding="utf-8") as sof_open: + file_name = key.split("/")[-2:] + print( + f" - calib/{file_name[-2]}/{file_name[-1]} UTIL_TRACE_TW" + ) + sof_open.write(f"{key} UTIL_TRACE_TW\n") + file_found = True + + else: + raise RuntimeError( + "The UTIL_TRACE_TW file is not found in " + "the 'calib' folder. Please first run " + "the util_trace method." + ) else: - raise RuntimeError( - "The UTIL_TRACE_TW file is not found in " - "the 'calib' folder. Please first run " - "the util_trace method." - ) + print(f"\nFound SOF file: {sof_file}") # Create EsoRex configuration file if not found @@ -2490,11 +2659,11 @@ def util_slit_curv(self, plot_trace: bool = False, verbose: bool = True) -> None subprocess.run(esorex, cwd=output_dir, stdout=stdout, check=True) if not verbose: - print(" [DONE]\n") + print(" [DONE]") # Update file dictionary with UTIL_SLIT_CURV_TW file - print("Output files:") + print("\nOutput files:") fits_file = f"{output_dir}/cr2res_util_calib_calibrated_collapsed_tw_tw.fits" @@ -2518,7 +2687,7 @@ def util_slit_curv(self, plot_trace: bool = False, verbose: bool = True) -> None json.dump(self.file_dict, json_file, indent=4) @typechecked - def util_extract(self, calib_type: str, verbose: bool = True) -> None: + def util_extract(self, calib_type: str, verbose: bool = True, create_sof: bool = True) -> None: """ Method for running ``cr2res_util_extract``. @@ -2528,6 +2697,12 @@ def util_extract(self, calib_type: str, verbose: bool = True) -> None: Calibration type ("flat", "une", or "fpet"). verbose : bool Print output produced by ``esorex``. + create_sof : bool + Create a new SOF file. Setting the argument to ``True`` + will overwrite the SOF file if already present. Setting + the argument to ``False`` will allow for manually + adjusting an existing SOF file if the routine had + already been previously executed. Returns ------- @@ -2553,6 +2728,10 @@ def util_extract(self, calib_type: str, verbose: bool = True) -> None: else: raise RuntimeError("The argument of 'calib_type' is not recognized.") + print(f"Calibration type: {calib_type}") + print(f"Verbose: {verbose}") + print(f"Create SOF: {create_sof}") + # Create output folder output_dir = self.calib_folder / f"util_extract_{calib_type}" @@ -2562,91 +2741,104 @@ def util_extract(self, calib_type: str, verbose: bool = True) -> None: # Create SOF file - print("Creating SOF file:") - sof_file = Path(self.path / f"calib/util_extract_{calib_type}/files.sof") - # Find UTIL_CALIB file + if not create_sof and not sof_file.exists(): + warnings.warn(f"The SOF file is not found at '{sof_file}' " + "while 'create_sof' is set to False. " + "Probably 'util_extract' has not been " + "previously executed so forcing " + "'create_sof' to True.") - file_found = False + create_sof = True + + if create_sof: + print("\nCreating SOF file:") + + # Find UTIL_CALIB file + + file_found = False + + if "UTIL_CALIB" in self.file_dict: + for key in self.file_dict["UTIL_CALIB"]: + if not file_found: + if key.split("/")[-2] == f"util_calib_{calib_type}": + with open(sof_file, "w", encoding="utf-8") as sof_open: + file_name = key.split("/")[-2:] + print( + f" - calib/{file_name[-2]}/{file_name[-1]} UTIL_CALIB" + ) + sof_open.write(f"{key} UTIL_CALIB\n") + file_found = True - if "UTIL_CALIB" in self.file_dict: - for key in self.file_dict["UTIL_CALIB"]: if not file_found: - if key.split("/")[-2] == f"util_calib_{calib_type}": - with open(sof_file, "w", encoding="utf-8") as sof_open: - file_name = key.split("/")[-2:] - print( - f" - calib/{file_name[-2]}/{file_name[-1]} UTIL_CALIB" - ) - sof_open.write(f"{key} UTIL_CALIB\n") - file_found = True + raise RuntimeError( + f"The UTIL_CALIB file for calib_type={calib_type} " + f"is not found in the 'calib/util_calib_{calib_type}' " + f"folder. Please first run the util_calib method." + ) - if not file_found: + else: raise RuntimeError( - f"The UTIL_CALIB file for calib_type={calib_type} " - f"is not found in the 'calib/util_calib_{calib_type}' " - f"folder. Please first run the util_calib method." + "The UTIL_CALIB file is not found in the 'calib' " + "folder. Please first run the util_calib method." ) - else: - raise RuntimeError( - "The UTIL_CALIB file is not found in the 'calib' " - "folder. Please first run the util_calib method." - ) + # Find UTIL_TRACE_TW file - # Find UTIL_TRACE_TW file + # file_found = False + # + # if "UTIL_TRACE_TW" in self.file_dict: + # for key, value in self.file_dict["UTIL_TRACE_TW"].items(): + # if not file_found: + # with open(sof_file, "a", encoding="utf-8") as sof_open: + # file_name = key.split("/")[-2:] + # print(f" - calib/{file_name[-2]}/{file_name[-1]} UTIL_TRACE_TW") + # sof_open.write(f"{key} UTIL_TRACE_TW\n") + # file_found = True + # + # else: + # raise RuntimeError( + # "The UTIL_TRACE_TW file is not found in " + # "the 'calib' folder. Please first run " + # "the util_trace method." + # ) - # file_found = False - # - # if "UTIL_TRACE_TW" in self.file_dict: - # for key, value in self.file_dict["UTIL_TRACE_TW"].items(): - # if not file_found: - # with open(sof_file, "a", encoding="utf-8") as sof_open: - # file_name = key.split("/")[-2:] - # print(f" - calib/{file_name[-2]}/{file_name[-1]} UTIL_TRACE_TW") - # sof_open.write(f"{key} UTIL_TRACE_TW\n") - # file_found = True - # - # else: - # raise RuntimeError( - # "The UTIL_TRACE_TW file is not found in " - # "the 'calib' folder. Please first run " - # "the util_trace method." - # ) + # Find UTIL_WAVE_TW or UTIL_SLIT_CURV_TW file - # Find UTIL_WAVE_TW or UTIL_SLIT_CURV_TW file + file_found = False - file_found = False + if "UTIL_WAVE_TW" in self.file_dict: + for key in self.file_dict["UTIL_WAVE_TW"]: + if not file_found and key.split("/")[-2] == "util_wave_une": + with open(sof_file, "a", encoding="utf-8") as sof_open: + file_name = key.split("/")[-2:] + print( + f" - calib/{file_name[-2]}/{file_name[-1]} UTIL_WAVE_TW" + ) + sof_open.write(f"{key} UTIL_WAVE_TW\n") + file_found = True - if "UTIL_WAVE_TW" in self.file_dict: - for key in self.file_dict["UTIL_WAVE_TW"]: - if not file_found and key.split("/")[-2] == "util_wave_une": - with open(sof_file, "a", encoding="utf-8") as sof_open: - file_name = key.split("/")[-2:] - print( - f" - calib/{file_name[-2]}/{file_name[-1]} UTIL_WAVE_TW" - ) - sof_open.write(f"{key} UTIL_WAVE_TW\n") - file_found = True + if "UTIL_SLIT_CURV_TW" in self.file_dict: + for key in self.file_dict["UTIL_SLIT_CURV_TW"]: + if not file_found: + with open(sof_file, "a", encoding="utf-8") as sof_open: + file_name = key.split("/")[-2:] + print( + f" - calib/{file_name[-2]}/{file_name[-1]} UTIL_SLIT_CURV_TW" + ) + sof_open.write(f"{key} UTIL_SLIT_CURV_TW\n") + file_found = True - if "UTIL_SLIT_CURV_TW" in self.file_dict: - for key in self.file_dict["UTIL_SLIT_CURV_TW"]: - if not file_found: - with open(sof_file, "a", encoding="utf-8") as sof_open: - file_name = key.split("/")[-2:] - print( - f" - calib/{file_name[-2]}/{file_name[-1]} UTIL_SLIT_CURV_TW" - ) - sof_open.write(f"{key} UTIL_SLIT_CURV_TW\n") - file_found = True + if not file_found: + raise RuntimeError( + "The UTIL_SLIT_CURV_TW file is not found " + "in the 'calib' folder. Please first run " + "the util_slit_curv method." + ) - if not file_found: - raise RuntimeError( - "The UTIL_SLIT_CURV_TW file is not found " - "in the 'calib' folder. Please first run " - "the util_slit_curv method." - ) + else: + print(f"\nFound SOF file: {sof_file}") # Create EsoRex configuration file if not found @@ -2677,11 +2869,11 @@ def util_extract(self, calib_type: str, verbose: bool = True) -> None: subprocess.run(esorex, cwd=output_dir, stdout=stdout, check=True) if not verbose: - print(" [DONE]\n") + print(" [DONE]") # Update file dictionary with UTIL_EXTRACT_1D file - print("Output files:") + print("\nOutput files:") fits_file = ( f"{self.path}/calib/util_extract_{calib_type}/" @@ -2714,7 +2906,7 @@ def util_extract(self, calib_type: str, verbose: bool = True) -> None: json.dump(self.file_dict, json_file, indent=4) @typechecked - def util_normflat(self, verbose: bool = True) -> None: + def util_normflat(self, verbose: bool = True, create_sof: bool = True) -> None: """ Method for running ``cr2res_util_normflat``. @@ -2722,6 +2914,12 @@ def util_normflat(self, verbose: bool = True) -> None: ---------- verbose : bool Print output produced by ``esorex``. + create_sof : bool + Create a new SOF file. Setting the argument to ``True`` + will overwrite the SOF file if already present. Setting + the argument to ``False`` will allow for manually + adjusting an existing SOF file if the routine had + already been previously executed. Returns ------- @@ -2733,6 +2931,9 @@ def util_normflat(self, verbose: bool = True) -> None: "Create normalized flat field", recipe_name="cr2res_util_normflat" ) + print(f"Verbose: {verbose}") + print(f"Create SOF: {create_sof}") + # Create output folder output_dir = self.calib_folder / "util_normflat" @@ -2742,51 +2943,64 @@ def util_normflat(self, verbose: bool = True) -> None: # Create SOF file - print("Creating SOF file:") - sof_file = self.path / "calib/util_normflat/files.sof" - # Find UTIL_CALIB file + if not create_sof and not sof_file.exists(): + warnings.warn(f"The SOF file is not found at '{sof_file}' " + "while 'create_sof' is set to False. " + "Probably 'util_normflat' has not been " + "previously executed so forcing " + "'create_sof' to True.") - file_found = False + create_sof = True - if "UTIL_CALIB" in self.file_dict: - for key in self.file_dict["UTIL_CALIB"]: - if not file_found: - with open(sof_file, "w", encoding="utf-8") as sof_open: - file_name = key.split("/")[-2:] - print(f" - calib/{file_name[-2]}/{file_name[-1]} UTIL_CALIB") - sof_open.write(f"{key} UTIL_CALIB\n") - file_found = True + if create_sof: + print("\nCreating SOF file:") + + # Find UTIL_CALIB file + + file_found = False + + if "UTIL_CALIB" in self.file_dict: + for key in self.file_dict["UTIL_CALIB"]: + if not file_found: + with open(sof_file, "w", encoding="utf-8") as sof_open: + file_name = key.split("/")[-2:] + print(f" - calib/{file_name[-2]}/{file_name[-1]} UTIL_CALIB") + sof_open.write(f"{key} UTIL_CALIB\n") + file_found = True + + else: + raise RuntimeError( + "The UTIL_CALIB file is not found in " + "the 'calib' folder. Please first run " + "the util_calib method." + ) - else: - raise RuntimeError( - "The UTIL_CALIB file is not found in " - "the 'calib' folder. Please first run " - "the util_calib method." - ) + # Find UTIL_SLIT_MODEL file - # Find UTIL_SLIT_MODEL file + file_found = False - file_found = False + if "UTIL_SLIT_MODEL" in self.file_dict: + for key in self.file_dict["UTIL_SLIT_MODEL"]: + if not file_found: + with open(sof_file, "a", encoding="utf-8") as sof_open: + file_name = key.split("/")[-2:] + print( + f" - calib/{file_name[-2]}/{file_name[-1]} UTIL_SLIT_MODEL" + ) + sof_open.write(f"{key} UTIL_SLIT_MODEL\n") + file_found = True - if "UTIL_SLIT_MODEL" in self.file_dict: - for key in self.file_dict["UTIL_SLIT_MODEL"]: - if not file_found: - with open(sof_file, "a", encoding="utf-8") as sof_open: - file_name = key.split("/")[-2:] - print( - f" - calib/{file_name[-2]}/{file_name[-1]} UTIL_SLIT_MODEL" - ) - sof_open.write(f"{key} UTIL_SLIT_MODEL\n") - file_found = True + else: + raise RuntimeError( + "The UTIL_SLIT_MODEL file is not found in " + "the 'calib' folder. Please first run " + "the util_extract method." + ) else: - raise RuntimeError( - "The UTIL_SLIT_MODEL file is not found in " - "the 'calib' folder. Please first run " - "the util_extract method." - ) + print(f"\nFound SOF file: {sof_file}") # Create EsoRex configuration file if not found @@ -2815,11 +3029,11 @@ def util_normflat(self, verbose: bool = True) -> None: subprocess.run(esorex, cwd=output_dir, stdout=stdout, check=True) if not verbose: - print(" [DONE]\n") + print(" [DONE]") # Update file dictionary with UTIL_MASTER_FLAT file - print("Output files:") + print("\nOutput files:") fits_file = f"{self.path}/calib/util_normflat/cr2res_util_normflat_Open_master_flat.fits" self._update_files("UTIL_MASTER_FLAT", fits_file) @@ -2837,7 +3051,7 @@ def util_normflat(self, verbose: bool = True) -> None: json.dump(self.file_dict, json_file, indent=4) @typechecked - def util_bpm_merge(self, verbose: bool = True) -> None: + def util_bpm_merge(self, verbose: bool = True, create_sof: bool = True) -> None: """ Method for running ``cr2res_util_bpm_merge``. @@ -2845,6 +3059,12 @@ def util_bpm_merge(self, verbose: bool = True) -> None: ---------- verbose : bool Print output produced by ``esorex``. + create_sof : bool + Create a new SOF file. Setting the argument to ``True`` + will overwrite the SOF file if already present. Setting + the argument to ``False`` will allow for manually + adjusting an existing SOF file if the routine had + already been previously executed. Returns ------- @@ -2856,6 +3076,9 @@ def util_bpm_merge(self, verbose: bool = True) -> None: "Merge bad pixels maps", recipe_name="cr2res_util_bpm_merge" ) + print(f"Verbose: {verbose}") + print(f"Create SOF: {create_sof}") + indices = self.header_data["DPR.TYPE"] == "DARK" # Create output folder @@ -2872,9 +3095,9 @@ def util_bpm_merge(self, verbose: bool = True) -> None: unique_dit.add(item) if len(unique_dit) == 0: - print("Unique DIT values: none") + print("\nUnique DIT values: none") else: - print(f"Unique DIT values: {unique_dit}\n") + print(f"\nUnique DIT values: {unique_dit}\n") science_idx = np.where(indices)[0] @@ -2884,73 +3107,85 @@ def util_bpm_merge(self, verbose: bool = True) -> None: # Create SOF file - print("Creating SOF file:") - sof_file = self.path / "calib/util_bpm_merge/files.sof" - with open(sof_file, "w", encoding="utf-8") as sof_open: - - # Find UTIL_MASTER_FLAT file - - file_found = False + if not create_sof and not sof_file.exists(): + warnings.warn(f"The SOF file is not found at '{sof_file}' " + "while 'create_sof' is set to False. " + "Probably 'util_bpm_merge' has not been " + "previously executed so forcing " + "'create_sof' to True.") - if "UTIL_NORM_BPM" in self.file_dict: - for key in self.file_dict["UTIL_NORM_BPM"]: - if not file_found: - file_name = key.split("/")[-2:] - print( - f" - calib/{file_name[-2]}/{file_name[-1]} UTIL_NORM_BPM" - ) - sof_open.write(f"{key} UTIL_NORM_BPM\n") - file_found = True + create_sof = True - if not file_found: - raise RuntimeError( - "The UTIL_NORM_BPM file is not found in " - "the 'calib' folder. Please first run " - "the util_normflat method." - ) + if create_sof: + print("\nCreating SOF file:") - # Find CAL_DARK_BPM file + with open(sof_file, "w", encoding="utf-8") as sof_open: + + # Find UTIL_NORM_BPM file - for science_dit in unique_dit: - file_found = False - if "CAL_DARK_BPM" in self.file_dict: - bpm_file = self.select_bpm(science_wlen, science_dit) - - if bpm_file is not None: - file_name = bpm_file.split("/")[-2:] - print(f" - calib/{file_name[-2]}/{file_name[-1]} CAL_DARK_BPM") - sof_open.write(f"{bpm_file} CAL_DARK_BPM\n") - file_found = True - + if "UTIL_NORM_BPM" in self.file_dict: + for key in self.file_dict["UTIL_NORM_BPM"]: + if not file_found: + file_name = key.split("/")[-2:] + print( + f" - calib/{file_name[-2]}/{file_name[-1]} UTIL_NORM_BPM" + ) + sof_open.write(f"{key} UTIL_NORM_BPM\n") + file_found = True if not file_found: raise RuntimeError( - "The CAL_DARK_BPM file is not found in " + "The UTIL_NORM_BPM file is not found in " "the 'calib' folder. Please first run " - "the util_calib method." + "the util_normflat method." ) - # Find CAL_DETLIN_COEFFS file + # Find CAL_DARK_BPM file + + for science_dit in unique_dit: + + file_found = False - file_found = False + if "CAL_DARK_BPM" in self.file_dict: + bpm_file = self.select_bpm(science_wlen, science_dit) - if "CAL_DETLIN_BPM" in self.file_dict: - for key, value in self.file_dict["CAL_DETLIN_BPM"].items(): - if not file_found: - file_name = key.split("/")[-2:] - print( - f" - calib/{file_name[-2]}/{file_name[-1]} CAL_DETLIN_BPM" + if bpm_file is not None: + file_name = bpm_file.split("/")[-2:] + print(f" - calib/{file_name[-2]}/{file_name[-1]} CAL_DARK_BPM") + sof_open.write(f"{bpm_file} CAL_DARK_BPM\n") + file_found = True + + + if not file_found: + raise RuntimeError( + "The CAL_DARK_BPM file is not found in " + "the 'calib' folder. Please first run " + "the util_calib method." ) - sof_open.write(f"{key} CAL_DETLIN_BPM\n") - file_found = True - if not file_found: - warnings.warn(f"Could not find CAL_DETLIN_BPM.") - + # Find CAL_DETLIN_COEFFS file + + file_found = False + + if "CAL_DETLIN_BPM" in self.file_dict: + for key, value in self.file_dict["CAL_DETLIN_BPM"].items(): + if not file_found: + file_name = key.split("/")[-2:] + print( + f" - calib/{file_name[-2]}/{file_name[-1]} CAL_DETLIN_BPM" + ) + sof_open.write(f"{key} CAL_DETLIN_BPM\n") + file_found = True + + if not file_found: + warnings.warn(f"Could not find CAL_DETLIN_BPM.") + + else: + print(f"\nFound SOF file: {sof_file}") # Create EsoRex configuration file if not found @@ -2979,11 +3214,11 @@ def util_bpm_merge(self, verbose: bool = True) -> None: subprocess.run(esorex, cwd=output_dir, stdout=stdout, check=True) if not verbose: - print(" [DONE]\n") + print(" [DONE]") - # Update file dictionary with UTIL_MASTER_FLAT file + # Update file dictionary with UTIL_BPM_MERGE file - print("Output files:") + print("\nOutput files:") fits_file = f"{self.path}/calib/util_bpm_merge/cr2res_util_bpm_merge.fits" self._update_files("UTIL_BPM_MERGE", fits_file) @@ -3001,7 +3236,7 @@ def util_bpm_merge(self, verbose: bool = True) -> None: json.dump(self.file_dict, json_file, indent=4) @typechecked - def util_genlines(self, verbose: bool = True) -> None: + def util_genlines(self, verbose: bool = True, create_sof: bool = True) -> None: """ Method for running ``cr2res_util_genlines``. Generate spectrum calibration FITS tables. @@ -3010,6 +3245,12 @@ def util_genlines(self, verbose: bool = True) -> None: ---------- verbose : bool Print output produced by ``esorex``. + create_sof : bool + Create a new SOF file. Setting the argument to ``True`` + will overwrite the SOF file if already present. Setting + the argument to ``False`` will allow for manually + adjusting an existing SOF file if the routine had + already been previously executed. Returns ------- @@ -3021,6 +3262,9 @@ def util_genlines(self, verbose: bool = True) -> None: "Generate calibration lines", recipe_name="cr2res_util_genlines" ) + print(f"Verbose: {verbose}") + print(f"Create SOF: {create_sof}") + # Create output folder output_dir = self.calib_folder / "util_genlines" @@ -3117,16 +3361,29 @@ def util_genlines(self, verbose: bool = True) -> None: # Create SOF file - print("Creating SOF file:") - sof_file = self.path / "calib/util_genlines/files.sof" - with open(sof_file, "w", encoding="utf-8") as sof_open: - sof_open.write(f"{line_file} EMISSION_LINES_TXT\n") - self._update_files("EMISSION_LINES_TXT", str(line_file)) + if not create_sof and not sof_file.exists(): + warnings.warn(f"The SOF file is not found at '{sof_file}' " + "while 'create_sof' is set to False. " + "Probably 'util_genlines' has not been " + "previously executed so forcing " + "'create_sof' to True.") + + create_sof = True + + if create_sof: + print("\nCreating SOF file:") - sof_open.write(f"{range_file} LINES_SELECTION_TXT\n") - self._update_files("LINES_SELECTION_TXT", str(range_file)) + with open(sof_file, "w", encoding="utf-8") as sof_open: + sof_open.write(f"{line_file} EMISSION_LINES_TXT\n") + self._update_files("EMISSION_LINES_TXT", str(line_file)) + + sof_open.write(f"{range_file} LINES_SELECTION_TXT\n") + self._update_files("LINES_SELECTION_TXT", str(range_file)) + + else: + print(f"\nFound SOF file: {sof_file}") # Create EsoRex configuration file if not found @@ -3155,11 +3412,11 @@ def util_genlines(self, verbose: bool = True) -> None: subprocess.run(esorex, cwd=output_dir, stdout=stdout, check=True) if not verbose: - print(" [DONE]\n") + print(" [DONE]") # Update file dictionary with EMISSION_LINES files - print("Output files:") + print("\nOutput files:") fits_file = output_dir / line_file.with_suffix(".fits").name self._update_files("EMISSION_LINES", str(fits_file)) @@ -3190,6 +3447,7 @@ def util_wave( poly_deg: int = 0, wl_err: float = 0.1, verbose: bool = True, + create_sof: bool = True ) -> None: """ Method for running ``cr2res_util_wave``. @@ -3204,6 +3462,12 @@ def util_wave( Estimate wavelength error (in nm). verbose : bool Print output produced by ``esorex``. + create_sof : bool + Create a new SOF file. Setting the argument to ``True`` + will overwrite the SOF file if already present. Setting + the argument to ``False`` will allow for manually + adjusting an existing SOF file if the routine had + already been previously executed. Returns ------- @@ -3221,6 +3485,21 @@ def util_wave( f"{labels[calib_type]} wavelength solution", recipe_name="cr2res_util_wave" ) + # Flag for re-using higher degrees of first guess + # with cross-correlation done by EsoRex + + if poly_deg == 0: + keep_high_deg = "TRUE" + else: + keep_high_deg = "FALSE" + + print(f"Calibration type: {calib_type}") + print(f"Polynomial degree: {poly_deg}") + print(f"Keep higher degrees: {keep_high_deg}") + print(f"Wavelength error (nm): {wl_err}") + print(f"Verbose: {verbose}") + print(f"Create SOF: {create_sof}") + # Create output folder output_dir = self.calib_folder / f"util_wave_{calib_type}" @@ -3229,125 +3508,138 @@ def util_wave( # Create SOF file - print("Creating SOF file:") - sof_file = Path(self.path / f"calib/util_wave_{calib_type}/files.sof") - # Find EMISSION_LINES file - - file_found = False - - if calib_type == "une": - if "EMISSION_LINES" in self.file_dict: - for key in self.file_dict["EMISSION_LINES"]: - if not file_found: - with open(sof_file, "w", encoding="utf-8") as sof_open: - file_name = key.split("/")[-2:] - print( - f" - calib/{file_name[-2]}/{file_name[-1]} EMISSION_LINES" - ) - sof_open.write(f"{key} EMISSION_LINES\n") - file_found = True + if not create_sof and not sof_file.exists(): + warnings.warn(f"The SOF file is not found at '{sof_file}' " + "while 'create_sof' is set to False. " + "Probably 'util_wave' has not been " + "previously executed so forcing " + "'create_sof' to True.") - else: - raise RuntimeError( - "The EMISSION_LINES file is not found in " - "the 'calib/genlines' folder. Please first " - "run the util_genlines method." - ) + create_sof = True - else: - with open(sof_file, "w", encoding="utf-8") as sof_open: - pass + if create_sof: + print("\nCreating SOF file:") - # Find UTIL_CALIB file + # Find EMISSION_LINES file - file_found = False + file_found = False - if "UTIL_EXTRACT_1D" in self.file_dict: - for key in self.file_dict["UTIL_EXTRACT_1D"]: - if ( - not file_found - and key.split("/")[-2] == f"util_extract_{calib_type}" - ): - with open(sof_file, "a", encoding="utf-8") as sof_open: - file_name = key.split("/")[-2:] - print( - f" - calib/{file_name[-2]}/{file_name[-1]} UTIL_EXTRACT_1D" - ) - sof_open.write(f"{key} UTIL_EXTRACT_1D\n") - file_found = True + if calib_type == "une": + if "EMISSION_LINES" in self.file_dict: + for key in self.file_dict["EMISSION_LINES"]: + if not file_found: + with open(sof_file, "w", encoding="utf-8") as sof_open: + file_name = key.split("/")[-2:] + print( + f" - calib/{file_name[-2]}/{file_name[-1]} EMISSION_LINES" + ) + sof_open.write(f"{key} EMISSION_LINES\n") + file_found = True - if not file_found: - raise RuntimeError( - f"The UTIL_EXTRACT_1D file is not found in the " - f"'calib/util_extract_{calib_type}' folder. Please first " - f"run the util_extract method with calib_type='une'." - ) + else: + raise RuntimeError( + "The EMISSION_LINES file is not found in " + "the 'calib/genlines' folder. Please first " + "run the util_genlines method." + ) - else: - raise RuntimeError( - f"The UTIL_EXTRACT_1D file is not found in the " - f"'calib/util_extract_{calib_type}' folder. Please first " - f"run the util_extract method with calib_type='une'." - ) + else: + with open(sof_file, "w", encoding="utf-8") as sof_open: + pass - # Find UTIL_SLIT_CURV_TW file + # Find UTIL_CALIB file - if calib_type == "une": file_found = False - if "UTIL_WAVE_TW" in self.file_dict and poly_deg > 0: - for key in self.file_dict["UTIL_WAVE_TW"]: - if not file_found and key.split("/")[-2] == "util_wave_une": + if "UTIL_EXTRACT_1D" in self.file_dict: + for key in self.file_dict["UTIL_EXTRACT_1D"]: + if ( + not file_found + and key.split("/")[-2] == f"util_extract_{calib_type}" + ): with open(sof_file, "a", encoding="utf-8") as sof_open: file_name = key.split("/")[-2:] print( - f" - calib/{file_name[-2]}/{file_name[-1]} UTIL_SLIT_CURV_TW" + f" - calib/{file_name[-2]}/{file_name[-1]} UTIL_EXTRACT_1D" ) - sof_open.write(f"{key} UTIL_SLIT_CURV_TW\n") + sof_open.write(f"{key} UTIL_EXTRACT_1D\n") file_found = True - if "UTIL_SLIT_CURV_TW" in self.file_dict: - for key in self.file_dict["UTIL_SLIT_CURV_TW"]: - if not file_found: - with open(sof_file, "a", encoding="utf-8") as sof_open: - file_name = key.split("/")[-2:] - print( - f" - calib/{file_name[-2]}/{file_name[-1]} UTIL_SLIT_CURV_TW" - ) - sof_open.write(f"{key} UTIL_SLIT_CURV_TW\n") - file_found = True + if not file_found: + raise RuntimeError( + f"The UTIL_EXTRACT_1D file is not found in the " + f"'calib/util_extract_{calib_type}' folder. Please first " + f"run the util_extract method with calib_type='une'." + ) - if not file_found: + else: raise RuntimeError( - "The UTIL_SLIT_CURV_TW file is not found " - "in the 'calib/util_slit_curv' folder. " - "Please first run the util_slit_curv method." + f"The UTIL_EXTRACT_1D file is not found in the " + f"'calib/util_extract_{calib_type}' folder. Please first " + f"run the util_extract method with calib_type='une'." ) - # Find UTIL_WAVE_TW file + # Find UTIL_SLIT_CURV_TW file - if calib_type == "fpet": - file_found = False + if calib_type == "une": + file_found = False - if "UTIL_WAVE_TW" in self.file_dict: - for key in self.file_dict["UTIL_WAVE_TW"]: - if not file_found and key.split("/")[-2] == "util_wave_une": - with open(sof_file, "a", encoding="utf-8") as sof_open: - file_name = key.split("/")[-2:] - print( - f" - calib/{file_name[-2]}/{file_name[-1]} UTIL_WAVE_TW" - ) - sof_open.write(f"{key} UTIL_WAVE_TW\n") - file_found = True + if "UTIL_WAVE_TW" in self.file_dict and poly_deg > 0: + for key in self.file_dict["UTIL_WAVE_TW"]: + if not file_found and key.split("/")[-2] == "util_wave_une": + with open(sof_file, "a", encoding="utf-8") as sof_open: + file_name = key.split("/")[-2:] + print( + f" - calib/{file_name[-2]}/{file_name[-1]} UTIL_SLIT_CURV_TW" + ) + sof_open.write(f"{key} UTIL_SLIT_CURV_TW\n") + file_found = True - else: - raise RuntimeError( - "The UTIL_WAVE_TW file is not found in the " - "'calib/util_wave_une' folder. Please first " - "run the util_wave method with calib_type='une'." - ) + if "UTIL_SLIT_CURV_TW" in self.file_dict: + for key in self.file_dict["UTIL_SLIT_CURV_TW"]: + if not file_found: + with open(sof_file, "a", encoding="utf-8") as sof_open: + file_name = key.split("/")[-2:] + print( + f" - calib/{file_name[-2]}/{file_name[-1]} UTIL_SLIT_CURV_TW" + ) + sof_open.write(f"{key} UTIL_SLIT_CURV_TW\n") + file_found = True + + if not file_found: + raise RuntimeError( + "The UTIL_SLIT_CURV_TW file is not found " + "in the 'calib/util_slit_curv' folder. " + "Please first run the util_slit_curv method." + ) + + # Find UTIL_WAVE_TW file + + if calib_type == "fpet": + file_found = False + + if "UTIL_WAVE_TW" in self.file_dict: + for key in self.file_dict["UTIL_WAVE_TW"]: + if not file_found and key.split("/")[-2] == "util_wave_une": + with open(sof_file, "a", encoding="utf-8") as sof_open: + file_name = key.split("/")[-2:] + print( + f" - calib/{file_name[-2]}/{file_name[-1]} UTIL_WAVE_TW" + ) + sof_open.write(f"{key} UTIL_WAVE_TW\n") + file_found = True + + else: + raise RuntimeError( + "The UTIL_WAVE_TW file is not found in the " + "'calib/util_wave_une' folder. Please first " + "run the util_wave method with calib_type='une'." + ) + + else: + print(f"\nFound SOF file: {sof_file}") # Create EsoRex configuration file if not found @@ -3355,19 +3647,11 @@ def util_wave( # Run EsoRex + print() + if calib_type == "fpet": wl_err = -1.0 - if poly_deg == 0: - keep_high_deg = "TRUE" - else: - keep_high_deg = "FALSE" - - print("\nConfiguration:") - print(f" - Polynomial degree = {poly_deg}") - print(f" - Wavelength error (nm) = {wl_err}") - print(f" - Keep higher degrees = {keep_high_deg}\n") - config_file = self.config_folder / f"util_wave_{calib_type}.rc" esorex = [ @@ -3392,11 +3676,9 @@ def util_wave( if not verbose: print(" [DONE]") - print() - # Update file dictionary with UTIL_WAVE_TW file - print("Output files:") + print("\nOutput files:") fits_file = ( f"{self.path}/calib/util_wave_{calib_type}/" @@ -3455,7 +3737,7 @@ def _find_master_flat(self) -> Tuple[str, str]: assert True in which_key, "No master flat found" if all(which_key): - print("WARNING: Multiple master flats found. Using UTIL_MASTER_FLAT") + warnings.warn("Multiple master flats found. Using UTIL_MASTER_FLAT.") which_key[1] = False good_key = master_flat_keys[which_key.index(True)] @@ -3562,7 +3844,7 @@ def _find(self, key: str, key_type: Optional[str] = None) -> Dict[str, List[str] return self.file_dict[key] @typechecked - def obs_staring(self, verbose: bool = True, check_existing: bool = True) -> None: + def obs_staring(self, verbose: bool = True, check_existing: bool = True, create_sof: bool = True) -> None: """ Method for running ``cr2res_obs_staring``. @@ -3573,6 +3855,12 @@ def obs_staring(self, verbose: bool = True, check_existing: bool = True) -> None check_existing : bool Search for existing extracted spectra in the product folder. Avoids re-reducing existing files. + create_sof : bool + Create a new SOF file. Setting the argument to ``True`` + will overwrite the SOF file if already present. Setting + the argument to ``False`` will allow for manually + adjusting an existing SOF file if the routine had + already been previously executed. Returns -------- @@ -3582,6 +3870,9 @@ def obs_staring(self, verbose: bool = True, check_existing: bool = True) -> None self._print_section("Process staring frames", recipe_name="cr2res_obs_staring") + print(f"Verbose: {verbose}") + print(f"Create SOF: {create_sof}") + # indices = self.header_data["DPR.CATG"] == "SCIENCE" # Create output folder @@ -3599,7 +3890,7 @@ def obs_staring(self, verbose: bool = True, check_existing: bool = True) -> None science_wlen = self.header_data["INS.WLEN.ID"][science_idx[0]] science_dit = self.header_data["DET.SEQ1.DIT"][science_idx[0]] - print(f"Number of exposures: {science_idx.size}") + print(f"\nNumber of exposures: {science_idx.size}") # Find master FLAT master_flat_filename, master_flat_label = self._find_master_flat() @@ -3618,9 +3909,11 @@ def obs_staring(self, verbose: bool = True, check_existing: bool = True) -> None if check_existing: folder = self.product_folder / "obs_staring" name_pattern = "cr2res_obs_staring_extracted*.fits" + files_i = sorted( [int(x.stem.split("_")[-1]) for x in folder.glob(name_pattern)] ) + if len(files_i) > 1: start = int(files_i[-1] + 1) print(f"Found {len(files_i)} files") @@ -3628,34 +3921,52 @@ def obs_staring(self, verbose: bool = True, check_existing: bool = True) -> None # Iterate over exposures for i, i_row in enumerate(self.header_data.index[science_idx], start=1): - if i < start: # skip existing reduced files + # skip existing reduced files + if check_existing and i < start: continue - # Create SOF file **for each frame** sof_file = Path(output_dir / f"stare_{i+1:03d}.sof") - sof_open = open(sof_file, "w", encoding="utf-8") - file_0 = self.header_data["ORIGFILE"][i_row] - file_path_0 = f"{self.path}/raw/{file_0}" - header_0 = fits.getheader(file_path_0) - tech = header_0["ESO DPR TECH"].split(",")[-1] - staring_label = f"OBS_STARING_{tech}" + if not create_sof and not sof_file.exists(): + warnings.warn(f"The SOF file is not found at '{sof_file}' " + "while 'create_sof' is set to False. " + "Probably 'obs_staring' has not been " + "previously executed so forcing " + "'create_sof' to True.") + + create_sof = True + + if create_sof: + # Creating SOF file for each frame + sof_open = open(sof_file, "w", encoding="utf-8") + file_0 = self.header_data["ORIGFILE"][i_row] + file_path_0 = f"{self.path}/raw/{file_0}" + header_0 = fits.getheader(file_path_0) + + tech = header_0["ESO DPR TECH"].split(",")[-1] + staring_label = f"OBS_STARING_{tech}" + + sof_open.write(f"{file_path_0} {staring_label}\n") + self._update_files(staring_label, file_path_0) - sof_open.write(f"{file_path_0} {staring_label}\n") - self._update_files(staring_label, file_path_0) + # Write calibration files to SOF (for each file) + sof_open.write(f"{master_flat_filename} {master_flat_label}\n") + sof_open.write(f"{bpm_file} CAL_DARK_BPM\n") + sof_open.write(f"{master_dark_filename} CAL_DARK_MASTER\n") + sof_open.write(f"{wave_tw_file} UTIL_WAVE_TW\n") + sof_open.close() + + else: + print(f"\nFound SOF file: {sof_file}") - # Write calibration files to SOF (for each file) - sof_open.write(f"{master_flat_filename} {master_flat_label}\n") - sof_open.write(f"{bpm_file} CAL_DARK_BPM\n") - sof_open.write(f"{master_dark_filename} CAL_DARK_MASTER\n") - sof_open.write(f"{wave_tw_file} UTIL_WAVE_TW\n") - sof_open.close() + print() # Create EsoRex config file + self._create_config("cr2res_obs_staring", "obs_staring", verbose) - # RUN EsoRex - print() + # Run EsoRex + config_file = self.config_folder / "obs_staring.rc" esorex = [ @@ -3673,7 +3984,7 @@ def obs_staring(self, verbose: bool = True, check_existing: bool = True) -> None print("Running EsoRex...", end="", flush=True) subprocess.run(esorex, cwd=output_dir, stdout=stdout, check=True) - print(" [DONE]\n") + print(" [DONE]") # Rename EsoRex output to avoid overwriting key_labels = { @@ -3700,6 +4011,7 @@ def obs_nodding( correct_bad_pixels: bool = True, extraction_required: bool = True, check_existing: bool = False, + create_sof: bool = True ) -> None: """ Method for running ``cr2res_obs_nodding``. @@ -3728,6 +4040,12 @@ def obs_nodding( check_existing : bool Search for existing files in the product folder. Avoids re-reducing existing files. + create_sof : bool + Create a new SOF file. Setting the argument to ``True`` + will overwrite the SOF file if already present. Setting + the argument to ``False`` will allow for manually + adjusting an existing SOF file if the routine had + already been previously executed. Returns ------- @@ -3737,6 +4055,12 @@ def obs_nodding( self._print_section("Process nodding frames", recipe_name="cr2res_obs_nodding") + print(f"Correct bad pixels: {correct_bad_pixels}") + print(f"Extraction required: {extraction_required}") + print(f"Check existing: {check_existing}") + print(f"Verbose: {verbose}") + print(f"Create SOF: {create_sof}") + # Create output folder output_dir = self.product_folder / "obs_nodding" @@ -3779,7 +4103,7 @@ def obs_nodding( nod_a_count = sum(nod_a_exp) nod_b_count = sum(nod_b_exp) - print(f"Number of exposures at nod A: {nod_a_count}") + print(f"\nNumber of exposures at nod A: {nod_a_count}") print(f"Number of exposures at nod B: {nod_b_count}") if nod_a_count != nod_b_count: @@ -3856,195 +4180,215 @@ def obs_nodding( ) continue - print( - f"\nCreating SOF file for nod pair #{count_exp+1}/{indices.sum()//2}:" - ) sof_file = Path(output_dir / f"files_{count_exp:03d}.sof") - sof_open = open(sof_file, "w", encoding="utf-8") - - file_0 = self.header_data["ORIGFILE"][i_row] + if not create_sof and not sof_file.exists(): + warnings.warn(f"The SOF file is not found at '{sof_file}' " + "while 'create_sof' is set to False. " + "Probably 'obs_nodding' has not been " + "previously executed so forcing " + "'create_sof' to True.") - if self.header_data["SEQ.NODPOS"][i_row + 1] == "B": - # AB pair, so using the next exposure for B - file_1 = self.header_data["ORIGFILE"][i_row + 1] + create_sof = True - elif self.header_data["SEQ.NODPOS"][i_row - 1] == "B": - # BA pair, so using the previous exposure for B - file_1 = self.header_data["ORIGFILE"][i_row - 1] - else: - warnings.warn( - f"Irregular A-B nodding sequence." - f"Please use 'obs_nodding_irregular'" - f"to reduce the data. Skipping file {file_0}" + if create_sof: + print( + f"\nCreating SOF file for nod pair #{count_exp+1}/{indices.sum()//2}:" ) - continue + sof_open = open(sof_file, "w", encoding="utf-8") - file_path_0 = f"{self.path}/raw/{file_0}" - file_path_1 = f"{self.path}/raw/{file_1}" + file_0 = self.header_data["ORIGFILE"][i_row] - header_0 = fits.getheader(file_path_0) - # header_1 = fits.getheader(file_path_1) + if self.header_data["SEQ.NODPOS"][i_row + 1] == "B": + # AB pair, so using the next exposure for B + file_1 = self.header_data["ORIGFILE"][i_row + 1] - if "ESO DPR TECH" in header_0: - if header_0["ESO DPR TECH"] == "SPECTRUM,NODDING,OTHER": - sof_open.write(f"{file_path_0} OBS_NODDING_OTHER\n") - self._update_files("OBS_NODDING_OTHER", file_path_0) + elif self.header_data["SEQ.NODPOS"][i_row - 1] == "B": + # BA pair, so using the previous exposure for B + file_1 = self.header_data["ORIGFILE"][i_row - 1] + else: + warnings.warn( + f"Irregular A-B nodding sequence." + f"Please use 'obs_nodding_irregular'" + f"to reduce the data. Skipping file {file_0}" + ) - sof_open.write(f"{file_path_1} OBS_NODDING_OTHER\n") - self._update_files("OBS_NODDING_OTHER", file_path_1) + continue - elif header_0["ESO DPR TECH"] == "SPECTRUM,NODDING,JITTER": - sof_open.write(f"{file_path_0} OBS_NODDING_JITTER\n") - self._update_files("OBS_NODDING_JITTER", file_path_0) + file_path_0 = f"{self.path}/raw/{file_0}" + file_path_1 = f"{self.path}/raw/{file_1}" - sof_open.write(f"{file_path_1} OBS_NODDING_JITTER\n") - self._update_files("OBS_NODDING_JITTER", file_path_1) + header_0 = fits.getheader(file_path_0) + # header_1 = fits.getheader(file_path_1) - else: - raise RuntimeError( - f"Could not find 'ESO DPR TECH' in the header of {file_path_0}." - ) + if "ESO DPR TECH" in header_0: + if header_0["ESO DPR TECH"] == "SPECTRUM,NODDING,OTHER": + sof_open.write(f"{file_path_0} OBS_NODDING_OTHER\n") + self._update_files("OBS_NODDING_OTHER", file_path_0) - # Find UTIL_MASTER_FLAT or CAL_FLAT_MASTER file + sof_open.write(f"{file_path_1} OBS_NODDING_OTHER\n") + self._update_files("OBS_NODDING_OTHER", file_path_1) - file_found = False + elif header_0["ESO DPR TECH"] == "SPECTRUM,NODDING,JITTER": + sof_open.write(f"{file_path_0} OBS_NODDING_JITTER\n") + self._update_files("OBS_NODDING_JITTER", file_path_0) - if "UTIL_MASTER_FLAT" in self.file_dict: - for key in self.file_dict["UTIL_MASTER_FLAT"]: - if not file_found: - file_name = key.split("/")[-2:] - print( - f" - calib/{file_name[-2]}/{file_name[-1]} UTIL_MASTER_FLAT" - ) - sof_open.write(f"{key} UTIL_MASTER_FLAT\n") - file_found = True - if "CAL_FLAT_MASTER" in self.file_dict: - for key in self.file_dict["CAL_FLAT_MASTER"]: - if not file_found: - file_name = key.split("/")[-2:] - print( - f" - calib/{file_name[-2]}/{file_name[-1]} CAL_FLAT_MASTER" - ) - sof_open.write(f"{key} CAL_FLAT_MASTER\n") - file_found = True + sof_open.write(f"{file_path_1} OBS_NODDING_JITTER\n") + self._update_files("OBS_NODDING_JITTER", file_path_1) - if not file_found: - warnings.warn("Could not find a master flat.") + else: + raise RuntimeError( + f"Could not find 'ESO DPR TECH' in the header of {file_path_0}." + ) - # Find CAL_DARK_BPM file + # Find UTIL_MASTER_FLAT or CAL_FLAT_MASTER file - file_found = False + file_found = False + + if "UTIL_MASTER_FLAT" in self.file_dict: + for key in self.file_dict["UTIL_MASTER_FLAT"]: + if not file_found: + # The FLAT correction is not applied when using + # the UTIL_MASTER_FLAT keyword in the SOF file + file_name = key.split("/")[-2:] + print( + # f" - calib/{file_name[-2]}/{file_name[-1]} UTIL_MASTER_FLAT" + f" - calib/{file_name[-2]}/{file_name[-1]} CAL_FLAT_MASTER" + ) - if "CAL_DARK_BPM" in self.file_dict: - bpm_file = self.select_bpm(science_wlen, science_dit) + # sof_open.write(f"{key} UTIL_MASTER_FLAT\n") + sof_open.write(f"{key} CAL_FLAT_MASTER\n") + file_found = True - if bpm_file is not None: - file_name = bpm_file.split("/")[-2:] - print(f" - calib/{file_name[-2]}/{file_name[-1]} CAL_DARK_BPM") - sof_open.write(f"{bpm_file} CAL_DARK_BPM\n") - file_found = True + if "CAL_FLAT_MASTER" in self.file_dict: + for key in self.file_dict["CAL_FLAT_MASTER"]: + if not file_found: + file_name = key.split("/")[-2:] + print( + f" - calib/{file_name[-2]}/{file_name[-1]} CAL_FLAT_MASTER" + ) + sof_open.write(f"{key} CAL_FLAT_MASTER\n") + file_found = True + + if not file_found: + warnings.warn("Could not find a master flat.") + + # Find CAL_DARK_BPM file + + file_found = False + + if "CAL_DARK_BPM" in self.file_dict: + bpm_file = self.select_bpm(science_wlen, science_dit) if bpm_file is not None: file_name = bpm_file.split("/")[-2:] - print( - f" - calib/{file_name[-2]}/{file_name[-1]} CAL_DARK_BPM" - ) + print(f" - calib/{file_name[-2]}/{file_name[-1]} CAL_DARK_BPM") sof_open.write(f"{bpm_file} CAL_DARK_BPM\n") file_found = True - if not file_found: - warnings.warn("Could not find a bap pixel map.") - - # Find UTIL_WAVE_TW file - - file_found = False - - for calib_type in ["fpet", "une"]: - if "UTIL_WAVE_TW" in self.file_dict: - for key in self.file_dict["UTIL_WAVE_TW"]: - if ( - not file_found - and key.split("/")[-2] == f"util_wave_{calib_type}" - ): - file_name = key.split("/")[-2:] + if bpm_file is not None: + file_name = bpm_file.split("/")[-2:] print( - f" - calib/{file_name[-2]}/{file_name[-1]} UTIL_WAVE_TW" + f" - calib/{file_name[-2]}/{file_name[-1]} CAL_DARK_BPM" ) - sof_open.write(f"{key} UTIL_WAVE_TW\n") + sof_open.write(f"{bpm_file} CAL_DARK_BPM\n") file_found = True - if not file_found: - esorex_path = shutil.which("esorex") + if not file_found: + warnings.warn("Could not find a bap pixel map.") - if esorex_path is not None: - data_static = esorex_path[:-10] + "share/esopipes/datastatic" + # Find UTIL_WAVE_TW file - if glob.glob(f"{data_static}/cr2re-*"): - cr2re_folder = glob.glob(f"{data_static}/cr2re-*")[0] - cr2re_data = sorted(glob.glob(f"{cr2re_folder}/*")) - tw_file = f"{cr2re_folder}/{science_wlen}_tw.fits" + file_found = False - if os.path.exists(tw_file): - print(f" - {tw_file} UTIL_WAVE_TW") - sof_open.write(f"{tw_file} UTIL_WAVE_TW\n") - file_found = True + for calib_type in ["fpet", "une"]: + if "UTIL_WAVE_TW" in self.file_dict: + for key in self.file_dict["UTIL_WAVE_TW"]: + if ( + not file_found + and key.split("/")[-2] == f"util_wave_{calib_type}" + ): + file_name = key.split("/")[-2:] + print( + f" - calib/{file_name[-2]}/{file_name[-1]} UTIL_WAVE_TW" + ) + sof_open.write(f"{key} UTIL_WAVE_TW\n") + file_found = True - if not file_found: - url = f"https://home.strw.leidenuniv.nl/~stolker/pycrires/{science_wlen}_tw.fits" - tw_file = f"{output_dir}/{science_wlen}_tw.fits" + if not file_found: + esorex_path = shutil.which("esorex") - if not os.path.exists(tw_file): - pooch.retrieve( - url=url, - known_hash=None, - fname=f"{science_wlen}_tw.fits", - path=output_dir, - progressbar=True, - ) + if esorex_path is not None: + data_static = esorex_path[:-10] + "share/esopipes/datastatic" - print(f" - product/obs_nodding/{science_wlen}_tw.fits UTIL_WAVE_TW") - sof_open.write(f"{tw_file} UTIL_WAVE_TW\n") - file_found = True + if glob.glob(f"{data_static}/cr2re-*"): + cr2re_folder = glob.glob(f"{data_static}/cr2re-*")[0] + cr2re_data = sorted(glob.glob(f"{cr2re_folder}/*")) + tw_file = f"{cr2re_folder}/{science_wlen}_tw.fits" - # if "CAL_WAVE_TW" in self.file_dict: - # for key in self.file_dict["CAL_WAVE_TW"]: - # if not file_found: - # file_name = key.split("/")[-2:] - # print(f" - calib/{file_name[-2]}/{file_name[-1]} CAL_WAVE_TW") - # sof_open.write(f"{key} CAL_WAVE_TW\n") - # file_found = True - # - # if "CAL_FLAT_TW" in self.file_dict: - # for key in self.file_dict["CAL_FLAT_TW"]: - # if not file_found: - # file_name = key.split("/")[-2:] - # print(f" - calib/{file_name[-2]}/{file_name[-1]} CAL_FLAT_TW") - # sof_open.write(f"{key} CAL_FLAT_TW\n") - # file_found = True + if os.path.exists(tw_file): + print(f" - {tw_file} UTIL_WAVE_TW") + sof_open.write(f"{tw_file} UTIL_WAVE_TW\n") + file_found = True - if not file_found: - warnings.warn("Could not find file with TraceWave table.") + if not file_found: + url = f"https://home.strw.leidenuniv.nl/~stolker/pycrires/{science_wlen}_tw.fits" + tw_file = f"{output_dir}/{science_wlen}_tw.fits" + + if not os.path.exists(tw_file): + pooch.retrieve( + url=url, + known_hash=None, + fname=f"{science_wlen}_tw.fits", + path=output_dir, + progressbar=True, + ) - # Find CAL_DETLIN_COEFFS file + print(f" - product/obs_nodding/{science_wlen}_tw.fits UTIL_WAVE_TW") + sof_open.write(f"{tw_file} UTIL_WAVE_TW\n") + file_found = True - file_found = False + # if "CAL_WAVE_TW" in self.file_dict: + # for key in self.file_dict["CAL_WAVE_TW"]: + # if not file_found: + # file_name = key.split("/")[-2:] + # print(f" - calib/{file_name[-2]}/{file_name[-1]} CAL_WAVE_TW") + # sof_open.write(f"{key} CAL_WAVE_TW\n") + # file_found = True + # + # if "CAL_FLAT_TW" in self.file_dict: + # for key in self.file_dict["CAL_FLAT_TW"]: + # if not file_found: + # file_name = key.split("/")[-2:] + # print(f" - calib/{file_name[-2]}/{file_name[-1]} CAL_FLAT_TW") + # sof_open.write(f"{key} CAL_FLAT_TW\n") + # file_found = True - if "CAL_DETLIN_COEFFS" in self.file_dict: - for key in self.file_dict["CAL_DETLIN_COEFFS"]: - if not file_found: - file_name = key.split("/")[-2:] - print( - f" - calib/{file_name[-2]}/{file_name[-1]} CAL_DETLIN_COEFFS" - ) - sof_open.write(f"{key} CAL_DETLIN_COEFFS\n") - file_found = True + if not file_found: + warnings.warn("Could not find file with TraceWave table.") - if not file_found: - warnings.warn("Could not find CAL_DETLIN_COEFFS.") + # Find CAL_DETLIN_COEFFS file + + file_found = False + + if "CAL_DETLIN_COEFFS" in self.file_dict: + for key in self.file_dict["CAL_DETLIN_COEFFS"]: + if not file_found: + file_name = key.split("/")[-2:] + print( + f" - calib/{file_name[-2]}/{file_name[-1]} CAL_DETLIN_COEFFS" + ) + sof_open.write(f"{key} CAL_DETLIN_COEFFS\n") + file_found = True + + if not file_found: + warnings.warn("Could not find CAL_DETLIN_COEFFS.") - sof_open.close() + sof_open.close() + + else: + print(f"\nFound SOF file: {sof_file}") # Create EsoRex configuration file if not found @@ -4092,7 +4436,7 @@ def obs_nodding( subprocess.run(esorex, cwd=output_dir, stdout=stdout, check=True) if not verbose: - print(" [DONE]\n") + print(" [DONE]") if correct_bad_pixels: for nod_pos in ["A", "B"]: @@ -4253,6 +4597,7 @@ def obs_nodding_irregular( extraction_required: bool = True, check_existing: bool = False, unique_pairs: bool = False, + create_sof: bool = True ) -> None: """ Method for running ``cr2res_obs_nodding``. @@ -4287,6 +4632,12 @@ def obs_nodding_irregular( to each B in sequence. So the nth A goes with the nth B and the nth B goes with the nth A. This will only be carried out if the numbers of nodding exposures is equal. + create_sof : bool + Create a new SOF file. Setting the argument to ``True`` + will overwrite the SOF file if already present. Setting + the argument to ``False`` will allow for manually + adjusting an existing SOF file if the routine had + already been previously executed. Returns ------- @@ -4296,6 +4647,9 @@ def obs_nodding_irregular( self._print_section("Process nodding frames", recipe_name="cr2res_obs_nodding") + print(f"Verbose: {verbose}") + print(f"Create SOF: {create_sof}") + indices = self.header_data["DPR.CATG"] == "SCIENCE" # Create output folder @@ -4327,7 +4681,7 @@ def obs_nodding_irregular( nod_a_count = sum(nod_a_exp) nod_b_count = sum(nod_b_exp) - print(f"Number of exposures at nod A: {nod_a_count}") + print(f"\nNumber of exposures at nod A: {nod_a_count}") print(f"Number of exposures at nod B: {nod_b_count}") if nod_a_count != nod_b_count and unique_pairs is True: @@ -4365,142 +4719,161 @@ def obs_nodding_irregular( ) if check_existing and os.path.exists(output_file): print(f"Already reduced file {output_file}") + else: - print(f"\nCreating SOF file for {output_file}:") sof_file = Path(output_dir / f"files_{count_exp:03d}_{nod_ab}.sof") - sof_open = open(sof_file, "w", encoding="utf-8") - - file_0 = self.header_data["ORIGFILE"][i_row] + if not create_sof and not sof_file.exists(): + warnings.warn(f"The SOF file is not found at '{sof_file}' " + "while 'create_sof' is set to False. " + "Probably 'obs_nodding_irregular' has not " + "been previously executed so forcing " + "'create_sof' to True.") - if nod_ab == "A": - if not unique_pairs: - closest_i_diffnod = b_i_rows[ - np.argmin(np.abs(i_row - b_i_rows)) - ] + create_sof = True - else: - closest_i_diffnod = b_i_rows[B_counter] - B_counter += 1 + if create_sof: + print(f"\nCreating SOF file for {output_file}:") - elif nod_ab == "B": - if not unique_pairs: - closest_i_diffnod = a_i_rows[ - np.argmin(np.abs(i_row - a_i_rows)) - ] + sof_open = open(sof_file, "w", encoding="utf-8") - else: - closest_i_diffnod = a_i_rows[A_counter] - A_counter += 1 + file_0 = self.header_data["ORIGFILE"][i_row] - if unique_pairs and verbose: - # This is for printing the pairing at the end - sequence.append([i_row, closest_i_diffnod]) + if nod_ab == "A": + if not unique_pairs: + closest_i_diffnod = b_i_rows[ + np.argmin(np.abs(i_row - b_i_rows)) + ] - file_1 = self.header_data["ORIGFILE"][closest_i_diffnod] + else: + closest_i_diffnod = b_i_rows[B_counter] + B_counter += 1 - for file in [file_0, file_1]: - file_path = f"{self.path}/raw/{file}" - header = fits.getheader(file_path) - if "ESO DPR TECH" in header: - if header["ESO DPR TECH"] == "SPECTRUM,NODDING,OTHER": - sof_open.write(f"{file_path} OBS_NODDING_OTHER\n") - self._update_files("OBS_NODDING_OTHER", file_path) + elif nod_ab == "B": + if not unique_pairs: + closest_i_diffnod = a_i_rows[ + np.argmin(np.abs(i_row - a_i_rows)) + ] - elif header["ESO DPR TECH"] == "SPECTRUM,NODDING,JITTER": - sof_open.write(f"{file_path} OBS_NODDING_JITTER\n") - self._update_files("OBS_NODDING_JITTER", file_path) + else: + closest_i_diffnod = a_i_rows[A_counter] + A_counter += 1 - else: - raise RuntimeError( - f"Could not find ESO.DPR.TECH in " - f"the header of {file_path}." - ) + if unique_pairs and verbose: + # This is for printing the pairing at the end + sequence.append([i_row, closest_i_diffnod]) - # Find UTIL_MASTER_FLAT or CAL_FLAT_MASTER file + file_1 = self.header_data["ORIGFILE"][closest_i_diffnod] - file_found = False + for file in [file_0, file_1]: + file_path = f"{self.path}/raw/{file}" + header = fits.getheader(file_path) + if "ESO DPR TECH" in header: + if header["ESO DPR TECH"] == "SPECTRUM,NODDING,OTHER": + sof_open.write(f"{file_path} OBS_NODDING_OTHER\n") + self._update_files("OBS_NODDING_OTHER", file_path) - if "UTIL_MASTER_FLAT" in self.file_dict: - for key in self.file_dict["UTIL_MASTER_FLAT"]: - if not file_found: - file_name = key.split("/")[-2:] - print( - f" - calib/{file_name[-2]}/{file_name[-1]} UTIL_MASTER_FLAT" - ) - sof_open.write(f"{key} UTIL_MASTER_FLAT\n") - file_found = True + elif header["ESO DPR TECH"] == "SPECTRUM,NODDING,JITTER": + sof_open.write(f"{file_path} OBS_NODDING_JITTER\n") + self._update_files("OBS_NODDING_JITTER", file_path) - if "CAL_FLAT_MASTER" in self.file_dict: - for key in self.file_dict["CAL_FLAT_MASTER"]: - if not file_found: - file_name = key.split("/")[-2:] - print( - f" - calib/{file_name[-2]}/{file_name[-1]} CAL_FLAT_MASTER" + else: + raise RuntimeError( + f"Could not find ESO.DPR.TECH in " + f"the header of {file_path}." ) - sof_open.write(f"{key} CAL_FLAT_MASTER\n") - file_found = True - - if not file_found: - warnings.warn("Could not find a master flat.") - - # Find CAL_DARK_BPM file - - file_found = False - - if "CAL_DARK_BPM" in self.file_dict: - bpm_file = self.select_bpm(science_wlen, science_dit) - - if bpm_file is not None: - file_name = bpm_file.split("/")[-2:] - print( - f" - calib/{file_name[-2]}/{file_name[-1]} CAL_DARK_BPM" - ) - sof_open.write(f"{bpm_file} CAL_DARK_BPM\n") - file_found = True - if not file_found: - warnings.warn("Could not find a bap pixel map.") + # Find UTIL_MASTER_FLAT or CAL_FLAT_MASTER file - # Find UTIL_WAVE_TW file + file_found = False - file_found = False + if "UTIL_MASTER_FLAT" in self.file_dict: + for key in self.file_dict["UTIL_MASTER_FLAT"]: + if not file_found: + # The FLAT correction is not applied when using + # the UTIL_MASTER_FLAT keyword in the SOF file + file_name = key.split("/")[-2:] + print( + # f" - calib/{file_name[-2]}/{file_name[-1]} UTIL_MASTER_FLAT" + f" - calib/{file_name[-2]}/{file_name[-1]} CAL_MASTER_FLAT" + ) + # sof_open.write(f"{key} UTIL_MASTER_FLAT\n") + sof_open.write(f"{key} CAL_MASTER_FLAT\n") + file_found = True - for calib_type in ["fpet", "une"]: - if "UTIL_WAVE_TW" in self.file_dict: - for key in self.file_dict["UTIL_WAVE_TW"]: - if ( - not file_found - and key.split("/")[-2] == f"util_wave_{calib_type}" - ): + if "CAL_FLAT_MASTER" in self.file_dict: + for key in self.file_dict["CAL_FLAT_MASTER"]: + if not file_found: file_name = key.split("/")[-2:] print( - f" - calib/{file_name[-2]}/{file_name[-1]} UTIL_WAVE_TW" + f" - calib/{file_name[-2]}/{file_name[-1]} CAL_FLAT_MASTER" ) - sof_open.write(f"{key} UTIL_WAVE_TW\n") + sof_open.write(f"{key} CAL_FLAT_MASTER\n") file_found = True - if not file_found: - warnings.warn("Could not find file with TraceWave table.") + if not file_found: + warnings.warn("Could not find a master flat.") - # Find CAL_DETLIN_COEFFS file + # Find CAL_DARK_BPM file - file_found = False + file_found = False - if "CAL_DETLIN_COEFFS" in self.file_dict: - for key in self.file_dict["CAL_DETLIN_COEFFS"]: - if not file_found: - file_name = key.split("/")[-2:] + if "CAL_DARK_BPM" in self.file_dict: + bpm_file = self.select_bpm(science_wlen, science_dit) + + if bpm_file is not None: + file_name = bpm_file.split("/")[-2:] print( - f" - calib/{file_name[-2]}/{file_name[-1]} CAL_DETLIN_COEFFS" + f" - calib/{file_name[-2]}/{file_name[-1]} CAL_DARK_BPM" ) - sof_open.write(f"{key} CAL_DETLIN_COEFFS\n") + sof_open.write(f"{bpm_file} CAL_DARK_BPM\n") file_found = True - if not file_found: - warnings.warn("Could not find CAL_DETLIN_COEFFS.") + if not file_found: + warnings.warn("Could not find a bap pixel map.") + + # Find UTIL_WAVE_TW file + + file_found = False + + for calib_type in ["fpet", "une"]: + if "UTIL_WAVE_TW" in self.file_dict: + for key in self.file_dict["UTIL_WAVE_TW"]: + if ( + not file_found + and key.split("/")[-2] == f"util_wave_{calib_type}" + ): + file_name = key.split("/")[-2:] + print( + f" - calib/{file_name[-2]}/{file_name[-1]} UTIL_WAVE_TW" + ) + sof_open.write(f"{key} UTIL_WAVE_TW\n") + file_found = True - sof_open.close() + if not file_found: + warnings.warn("Could not find file with TraceWave table.") + + # Find CAL_DETLIN_COEFFS file + + file_found = False + + if "CAL_DETLIN_COEFFS" in self.file_dict: + for key in self.file_dict["CAL_DETLIN_COEFFS"]: + if not file_found: + file_name = key.split("/")[-2:] + print( + f" - calib/{file_name[-2]}/{file_name[-1]} CAL_DETLIN_COEFFS" + ) + sof_open.write(f"{key} CAL_DETLIN_COEFFS\n") + file_found = True + + if not file_found: + warnings.warn("Could not find CAL_DETLIN_COEFFS.") + + sof_open.close() + + else: + print(f"\nFound SOF file: {sof_file}") # Create EsoRex configuration file if not found @@ -4546,7 +4919,7 @@ def obs_nodding_irregular( subprocess.run(esorex, cwd=output_dir, stdout=stdout, check=True) if not verbose: - print(" [DONE]\n") + print(" [DONE]") if correct_bad_pixels: fits_file = output_dir / f"cr2res_obs_nodding_combined{nod_ab}.fits"