Skip to content

Commit

Permalink
Option to bypass data missing check, ReducedSpectra class
Browse files Browse the repository at this point in the history
  • Loading branch information
AlanLoh committed Apr 4, 2024
1 parent ef991cd commit ad6c1c8
Show file tree
Hide file tree
Showing 4 changed files with 296 additions and 49 deletions.
2 changes: 1 addition & 1 deletion nenupy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
__copyright__ = "Copyright 2023, nenupy"
__credits__ = ["Alan Loh"]
__license__ = "MIT"
__version__ = "2.6.10"
__version__ = "2.6.11"
__maintainer__ = "Alan Loh"
__email__ = "[email protected]"

Expand Down
3 changes: 2 additions & 1 deletion nenupy/io/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#! /usr/bin/python3
# -*- coding: utf-8 -*-

from .tf import Spectra
from .tf import Spectra, TFTask
from .tf_utils import ReducedSpectra
82 changes: 72 additions & 10 deletions nenupy/io/tf.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,21 @@ def _func_call(self) -> Callable:

@classmethod
def correct_bandpass(cls):
""" :class:`~nenupy.io.tf.TFTask` calling :func:`~nenupy.io.tf_utils.correct_bandpass` to correct the polyphase-filter bandpass reponse.
""" :class:`~nenupy.io.tf.TFTask` to correct for the sub-band bandpass response.
A Poly-Phase Filter is involved in the NenuFAR data acquisition pipeline to split the data stream into sub-bands.
The combination of the filter shape and a Fourier transform results in a non-flat response across each sub-band.
This :class:`~nenupy.io.tf.TFTask` calls the :func:`~nenupy.io.tf_utils.correct_bandpass` function.
Example
-------
.. code-block:: python
:emphasize-lines: 3
>>> from nenupy.io.tf import Spectra, TFTask, TFPipeline
>>> sp = Spectra("/my/file.spectra")
>>> sp.pipeline = TFPipeline(sp, TFTask.correct_bandpass())
>>> data = sp.get(...)
.. figure:: ./_images/io_images/tf_bandpass_correction.png
:width: 650
Expand All @@ -129,16 +143,28 @@ def wrapper_task(data, channels):

@classmethod
def flatten_subband(cls):
"""_summary_
""" :class:`~nenupy.io.tf.TFTask` to flatten each sub-band bandpass.
Based on the temporal median over each suband, a linear correction is applied to flatten the signal.
This :class:`~nenupy.io.tf.TFTask` calls the :func:`~nenupy.io.tf_utils.flatten_subband` function.
Example
-------
.. code-block:: python
:emphasize-lines: 3
>>> from nenupy.io.tf import Spectra, TFTask, TFPipeline
>>> sp = Spectra("/my/file.spectra")
>>> sp.pipeline = TFPipeline(sp, TFTask.flatten_subband())
>>> data = sp.get(...)
.. figure:: ./_images/io_images/tf_sb_flatten.png
:width: 650
:align: center
Warning
-------
This is a warning
This correction assumes that the signal's spectrum could be considered flat at the sub-band resolution.
The method is not recommended for data other than Stokes I.
"""
def wrapper_task(data, channels):
Expand Down Expand Up @@ -169,6 +195,11 @@ def wrapper_task(data, channels, remove_channels):

@classmethod
def correct_polarization(cls):
"""_summary_
warning : needs to be done at the beginning.
"""
def wrapper_task(
time_unix,
frequency_hz,
Expand Down Expand Up @@ -273,7 +304,32 @@ def wrapper_task(

@classmethod
def time_rebin(cls):
"""_summary_
""" :class:`~nenupy.io.tf.TFTask` to re-bin the data in time.
The targetted time resolution is defined by the ``'rebin_dt'`` argument, set in :attr:`~nenupy.io.tf.TFPipeline.parameters`.
This :class:`~nenupy.io.tf.TFTask` calls the :func:`~nenupy.io.tf_utils.rebin_along_dimension` function.
Example
-------
.. code-block:: python
>>> from nenupy.io.tf import Spectra, TFTask, TFPipeline
>>> import astropy.units as u
>>> sp = Spectra("/my/file.spectra")
>>> sp.pipeline = TFPipeline(sp, TFTask.time_rebin())
Then, either perform a one_time application of the `rebin_dt` parameter (that is forgotten after the :meth:`~nenupy.io.tf.Spectra.get` call):
.. code-block:: python
>>> data = sp.get(..., rebin_dt=0.2*u.s,...)
Or, set it for further usage:
.. code-block:: python
>>> sp.pipeline.parameters["rebin_dt"] = 0.2*u.s
>>> data = sp.get(...)
.. figure:: ./_images/io_images/tf_time_rebin.png
:width: 650
Expand Down Expand Up @@ -652,7 +708,7 @@ class Spectra:
"""

def __init__(self, filename: str):
def __init__(self, filename: str, check_missing_data: bool = True):
self.filename = filename

# Decode the main header and lazy load the data
Expand All @@ -664,7 +720,7 @@ def __init__(self, filename: str):
data = self._lazy_load_data()

# Compute the boolean mask of bad blocks
bad_block_mask = self._get_bad_data_mask(data)
bad_block_mask = self._get_bad_data_mask(data, bypass_verification=~check_missing_data)

# Compute the main data block descriptors (time / frequency / beam)
subband_width_hz = SUBBAND_WIDTH.to_value(u.Hz)
Expand Down Expand Up @@ -915,12 +971,15 @@ def get(self, file_name: str = None, **pipeline_kwargs) -> SData:
else:
# Save the result of the pipeline in a file
# No security on the resulting data volume
log.info(f"Estimated data volume to store: {(data.nbytes * u.byte).to(u.Gibyte):.3f}...")
utils.store_dask_tf_data(
file_name=file_name,
data=data,
time=time,
frequency=frequency,
polarization=["XX", "XY", "YX", "YY"] if not self.pipeline.contains("Compute Stokes parameters") else self.pipeline.parameters["stokes"]
polarization=["XX", "XY", "YX", "YY"] if not self.pipeline.contains("Compute Stokes parameters") else self.pipeline.parameters["stokes"],
mode="w" if self.pipeline.parameters["overwrite"] else "auto",
beam=self.pipeline.parameters["beam"]
)
self.pipeline.parameters = parameters_copy # Reset the parameters
return
Expand Down Expand Up @@ -1159,9 +1218,13 @@ def _lazy_load_data(self) -> np.ndarray:
return tmp.view(np.dtype(global_struct))

@staticmethod
def _get_bad_data_mask(data: np.ndarray) -> np.ndarray:
def _get_bad_data_mask(data: np.ndarray, bypass_verification: bool = False) -> np.ndarray:
""" """

if bypass_verification:
log.info("Skipping missing data verification...")
return np.zeros(data["TIMESTAMP"].size, dtype=bool)

log.info("Checking for missing data (can take up to 1 min)...")

# Either the TIMESTAMP is set to 0, the first idx, or the SB number is negative
Expand Down Expand Up @@ -1278,4 +1341,3 @@ def _to_sdata(self, data: np.ndarray, time: Time, frequency: u.Quantity) -> SDat
freq=frequency,
polar=self.pipeline.parameters["stokes"],
)

Loading

0 comments on commit ad6c1c8

Please sign in to comment.