Skip to content

Commit

Permalink
Merge pull request #151 from ramya-anche/onskyflatcalibration
Browse files Browse the repository at this point in the history
Creation of on sky flatfield
  • Loading branch information
ramya-anche authored Aug 29, 2024
2 parents 1f5139d + 290f84a commit ad35107
Show file tree
Hide file tree
Showing 11 changed files with 609 additions and 83 deletions.
4 changes: 4 additions & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
tests/test_data/FluxMap1024.fits filter=lfs diff=lfs merge=lfs -text
tests/test_data/medcombined-neptune_band_1.fits filter=lfs diff=lfs merge=lfs -text
tests/test_data/medcombined-neptune_band_4.fits filter=lfs diff=lfs merge=lfs -text
tests/test_data/medcombined-uranus_band_4.fits filter=lfs diff=lfs merge=lfs -text
tests/test_data/medcombined-uranus_band_1.fits filter=lfs diff=lfs merge=lfs -text
4 changes: 4 additions & 0 deletions .github/workflows/python-app.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ jobs:

steps:
- uses: actions/checkout@v3
- name: Set up Git LFS
run: |
git lfs install
git lfs pull
- name: Set up Python 3.12
uses: actions/setup-python@v3
with:
Expand Down
2 changes: 1 addition & 1 deletion corgidrp/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ class Image():
def __init__(self, data_or_filepath, pri_hdr=None, ext_hdr=None, err = None, dq = None, err_hdr = None, dq_hdr = None, input_hdulist = None):
if isinstance(data_or_filepath, str):
# a filepath is passed in
with fits.open(data_or_filepath) as hdulist:
with fits.open(data_or_filepath, ignore_missing_simple=True) as hdulist:

#Pop out the primary header
self.pri_hdr = hdulist.pop(0).header
Expand Down
337 changes: 258 additions & 79 deletions corgidrp/detector.py

Large diffs are not rendered by default.

154 changes: 151 additions & 3 deletions corgidrp/mocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
import astropy.io.fits as fits
from astropy.time import Time
import numpy as np

import corgidrp.data as data
from corgidrp.data import Image
import corgidrp.detector as detector
import photutils.centroids as centr
from corgidrp.detector import imaging_area_geom, unpack_geom

import astropy.io.ascii as ascii
Expand Down Expand Up @@ -128,8 +128,9 @@ def create_synthesized_master_dark_calib(detector_areas):
Args:
detector_areas: dict
a dictionary of detector geometry properties. Keys should be as found
in detector_areas in detector.py.
a dictionary of detector geometry properties. Keys should be as found
in detector_areas in detector.py.
Returns:
dataset: corgidrp.data.Dataset instances
Expand Down Expand Up @@ -261,6 +262,153 @@ def create_simflat_dataset(filedir=None, numfiles=10):
dataset = data.Dataset(frames)
return dataset


def create_raster(mask,data,dither_sizex=None,dither_sizey=None,row_cent = None,col_cent = None,n_dith=None,mask_size=420,snr=250,planet=None, band=None, radius=None, snr_constant=None):
"""Performs raster scan of Neptune or Uranus images
Args:
mask (int): (Required) Mask used for the image. (Size of the HST images, 420 X 420 pixels with random values mean=1, std=0.03)
data (float):(Required) Data in array npixels*npixels format to be raster scanned
dither_sizex (int):(Required) Size of the dither in X axis in pixels (number of pixels across the planet (neptune=50 and uranus=65))
dither_sizey (int):(Required) Size of the dither in X axis in pixels (number of pixels across the planet (neptune=50 and uranus=65))
row_cent (int): (Required) X coordinate of the centroid
col_cent (int): (Required) Y coordinate of the centroid
n_dith (int): number of dithers required (n_dith=3 for Neptune and n_dith=2 for uranus)
mask_size (int): Size of the mask in pixels (Size of the HST images, 420 X 420 pixels with random values mean=1, std=0.03)
snr (int): Required SNR in the planet images (=250 in the HST images)
planet (str): neptune or uranus
band (str): 1 or 4
radius (int): radius of the planet in pixels (radius=54 for neptune, radius=90)
snr_constant (int): constant for snr reference (4.95 for band1 and 9.66 for band4)
Returns:
dither_stack_norm (np.array): stacked dithers of the planet images
cent (np.array): centroid of images
"""

cents = []

data_display = data.copy()
col_max = int(col_cent) + int(mask_size/2)
col_min = int(col_cent) - int(mask_size/2)
row_max = int(row_cent) + int(mask_size/2)
row_min = int(row_cent) - int(mask_size/2)
dithers = []

if dither_sizey == None:
dither_sizey = dither_sizex
if planet == 'neptune':
dith_end = n_dith
elif planet == 'uranus':
dith_end = n_dith+1

for i in np.arange(-n_dith,dith_end):
for j in np.arange(-n_dith,dith_end):
mask_data = data.copy()
image_data = mask_data[row_min + (dither_sizey * j):row_max + (dither_sizey * j), col_min + (dither_sizex * i):col_max + (dither_sizex * i)]
cents.append(((mask_size/2) + (row_cent - int(row_cent)) - (dither_sizey//2) - (dither_sizey * j), (mask_size/2) + (col_cent - int(col_cent)) - (dither_sizex//2) - (dither_sizex * i)))
try:
new_image_data = image_data * mask

if planet == 'neptune' and band=='1' or planet == 'uranus' and band=='1':
snr_ref = snr/np.sqrt(snr_constant)
elif planet == 'neptune' and band=='4' or planet == 'uranus' and band=='4' :
snr_ref = snr/np.sqrt(snr_constant)

u_centroid = centr.centroid_1dg(new_image_data)
uxc = int(u_centroid[0])
uyc = int(u_centroid[1])

modified_data = new_image_data

nx = np.arange(0,modified_data.shape[1])
ny = np.arange(0,modified_data.shape[0])
nxx,nyy = np.meshgrid(nx,ny)
nrr = np.sqrt((nxx-uxc)**2 + (nyy-uyc)**2)

planmed = np.median(modified_data[nrr<radius])
modified_data[nrr<=radius] = np.random.normal(modified_data[nrr<=radius], (planmed/snr_ref) * np.abs(modified_data[nrr<=radius]/planmed))

new_image_data_snr = modified_data
except ValueError:
print(image_data.shape)
print(mask.shape)
dithers.append(new_image_data_snr)

dither_stack_norm = []
for dither in dithers:
dither_stack_norm.append(dither)
dither_stack = None

median_dithers = None
final = None
full_mask = mask

return dither_stack_norm,cents

def create_onsky_rasterscans(dataset,filedir=None,planet=None,band=None, im_size=420, d=None, n_dith=None, numfiles=36, radius=None, snr=250, snr_constant=None):
"""
Create simulated data to check the flat division
Args:
dataset (corgidrp.data.Dataset): dataset of HST images of neptune and uranus
filedir (str): Full path to directory to save the raster scanned images.
planet (str): neptune or uranus
band (str): 1 or 4
im_size (int): x-dimension of the planet image (in pixels= 420 for the HST images)
d (int): number of pixels across the planet (neptune=50 and uranus=65)
n_dith (int): Number of dithers required (n_dith=3 for neptune and n_dith=2 for Uranus)
numfiles (int): total number of raster images (default 36)
radius (int): radius of the planet in pixels (radius=54 for neptune, radius=90 in HST images)
snr (int): SNR required for the planet image (default is 250 for the HST images)
snr_constant (int): constant for snr reference (4.95 for band1 and 9.66 for band4)
Returns:
corgidrp.data.Dataset:
The simulated dataset of raster scanned images of planets uranus or neptune
"""
n = im_size
qe_prnu_fsm_raster = np.random.normal(1,.03,(n,n))
pred_cents=[]
planet_rot_images=[]

for i in range(len(dataset)):
target=dataset[i].pri_hdr['TARGET']
filter=dataset[i].pri_hdr['FILTER']
if planet==target and band==filter:
planet_image=dataset[i].data
centroid=centr.centroid_com(planet_image)
xc=centroid[0]
yc=centroid[1]
if planet == 'neptune':
planetrad=radius; snrcon=snr_constant
planet_repoint_current = create_raster(qe_prnu_fsm_raster,planet_image,row_cent=yc+(d//2),col_cent=xc+(d//2), dither_sizex=d, dither_sizey=d,n_dith=n_dith,mask_size=n,snr=snr,planet=target,band=filter,radius=planetrad, snr_constant=snrcon)
elif planet == 'uranus':
planetrad=radius; snrcon=snr_constant
planet_repoint_current = create_raster(qe_prnu_fsm_raster,planet_image,row_cent=yc,col_cent=xc, dither_sizex=d, dither_sizey=d,n_dith=n_dith,mask_size=n,snr=snr,planet=target,band=filter,radius=planetrad, snr_constant=snrcon)

for j in np.arange(len(planet_repoint_current[0])):
for j in np.arange(len(planet_repoint_current[0])):
planet_rot_images.append(planet_repoint_current[0][j])
pred_cents.append(planet_repoint_current[1][j])
filepattern= planet+'_'+band+"_"+"raster_scan_{0:01d}.fits"
frames=[]
for i in range(numfiles):
prihdr, exthdr = create_default_headers()
sim_data=planet_rot_images[i]
frame = data.Image(sim_data, pri_hdr=prihdr, ext_hdr=exthdr)
pl=planet
band=band
frame.pri_hdr.append(('TARGET', pl), end=True)
frame.pri_hdr.append(('FILTER', band), end=True)
if filedir is not None:
frame.save(filedir=filedir, filename=filepattern.format(i))
frames.append(frame)
raster_dataset = data.Dataset(frames)
return raster_dataset

def create_flatfield_dummy(filedir=None, numfiles=2):

"""
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@ numpy
pandas
pytest
scipy
photutils
statsmodels
pyklip
178 changes: 178 additions & 0 deletions tests/test_create_flatfield.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
import os
import glob
import pytest
import numpy as np
import corgidrp
import corgidrp.data as data
import corgidrp.mocks as mocks
import corgidrp.detector as detector
import corgidrp.l2a_to_l2b as l2a_to_l2b
import photutils.centroids as centr


old_err_tracking = corgidrp.track_individual_errors

def test_create_flatfield_neptune():
"""
Generate mock input data and pass into flat division function
"""
corgidrp.track_individual_errors = True # this test uses individual error components

###### create simulated data
# check that simulated data folder exists, and create if not
datadir = os.path.join(os.path.dirname(__file__), "simdata")
if not os.path.exists(datadir):
os.mkdir(datadir)
mocks.create_simflat_dataset(filedir=datadir)

# simulated images to be checked in flat division
simdata_filenames=glob.glob(os.path.join(datadir, "sim_flat*.fits"))
simflat_dataset=data.Dataset(simdata_filenames)

###### create simulated raster scanned data
# check that simulated raster scanned data folder exists, and create if not
file_dir = os.path.join(os.path.dirname(__file__), "simdata_rasterscan")
data_dir = os.path.join(os.path.dirname(__file__),"test_data/")
if not os.path.exists(file_dir):
os.mkdir(file_dir)
filenames = glob.glob(os.path.join(data_dir, "med*.fits"))
data_set = data.Dataset(filenames)
# creating flatfield for neptune for band 1
planet='neptune'; band='1'
mocks.create_onsky_rasterscans(data_set,filedir=file_dir,planet='neptune',band='1',im_size=420,d=50, n_dith=3,numfiles=36,radius=54,snr=250,snr_constant=4.55)

####### create flat field
flat_dataset=[]
flat_filenames = glob.glob(os.path.join(file_dir, "*.fits"))
flat_dataset_all = data.Dataset(flat_filenames)
for i in range(len(flat_dataset_all)):
target=flat_dataset_all[i].pri_hdr['TARGET']
filter=flat_dataset_all[i].pri_hdr['FILTER']
if planet==target and band==filter:
flat_dataset.append(flat_dataset_all[i])
onskyflat_field = detector.create_onsky_flatfield(flat_dataset, planet='neptune',band='1',up_radius=55, im_size=420, N=3, rad_mask=1.26, planet_rad=50, n_pix=165, n_pad=302)

assert np.nanmean(onskyflat_field.data) == pytest.approx(1, abs=1e-2)


calibdir = os.path.join(os.path.dirname(__file__), "testcalib")

flat_filename = "sim_flatfield_"+str(planet)+"_"+str(band)+".fits"
if not os.path.exists(calibdir):
os.mkdir(calibdir)
onskyflat_field.save(filedir=calibdir, filename=flat_filename)

###### perform flat division
# load in the flatfield
flat_filepath = os.path.join(calibdir, flat_filename)
onsky_flatfield = data.FlatField(flat_filepath)


flatdivided_dataset = l2a_to_l2b.flat_division(simflat_dataset,onsky_flatfield)
print(flatdivided_dataset[0].ext_hdr["HISTORY"])


# perform checks after the flat divison for one of the dataset
assert(flat_filename in str(flatdivided_dataset[0].ext_hdr["HISTORY"]))



# check the propagated errors for one of the dataset
assert flatdivided_dataset[0].err_hdr["Layer_2"] == "FlatField_error"
print("mean of all simulated data",np.mean(simflat_dataset.all_data))
print("mean of all simulated data error",np.nanmean(simflat_dataset.all_err) )
print("mean of all flat divided data:", np.nanmean(flatdivided_dataset.all_data))
print("mean of flatfield:", np.nanmean(onsky_flatfield.data))
print("mean of flatfield err:", np.nanmean(onsky_flatfield.err))

err_flatdiv=np.nanmean(flatdivided_dataset.all_err)
err_estimated=np.sqrt(((np.nanmean(onsky_flatfield.data))**2)*(np.nanmean(simflat_dataset.all_err))**2+((np.nanmean(simflat_dataset.all_data))**2)*(np.nanmean(onsky_flatfield.err))**2)
print("mean of all flat divided data errors:",err_flatdiv)
print("Error estimated:",err_estimated)
assert(err_flatdiv == pytest.approx(err_estimated, abs = 1e-1))

print(flatdivided_dataset[0].ext_hdr)

return

#creating flatfield using uranus for band4

def test_create_flatfield_uranus():
###### create simulated data
# check that simulated data folder exists, and create if not
datadir = os.path.join(os.path.dirname(__file__), "simdata")
if not os.path.exists(datadir):
os.mkdir(datadir)
mocks.create_simflat_dataset(filedir=datadir)

# simulated images to be checked in flat division
simdata_filenames=glob.glob(os.path.join(datadir, "sim_flat*.fits"))
simflat_dataset=data.Dataset(simdata_filenames)

###### create simulated raster scanned data
# check that simulated raster scanned data folder exists, and create if not
file_dir = os.path.join(os.path.dirname(__file__), "simdata_rasterscan")
data_dir = os.path.join(os.path.dirname(__file__),"test_data/")
if not os.path.exists(file_dir):
os.mkdir(file_dir)
filenames = glob.glob(os.path.join(data_dir, "med*.fits"))
data_set = data.Dataset(filenames)
planet='uranus'; band='4'
mocks.create_onsky_rasterscans(data_set,filedir=file_dir,planet='uranus',band='4',im_size=420,d=65, n_dith=2,numfiles=36,radius=90,snr=250,snr_constant=9.66)

####### create flat field
flat_dataset=[]
flat_filenames = glob.glob(os.path.join(file_dir, "*.fits"))
flat_dataset_all = data.Dataset(flat_filenames)
for i in range(len(flat_dataset_all)):
target=flat_dataset_all[i].pri_hdr['TARGET']
filter=flat_dataset_all[i].pri_hdr['FILTER']
if planet==target and band==filter:
flat_dataset.append(flat_dataset_all[i])
onskyflat_field = detector.create_onsky_flatfield(flat_dataset, planet='uranus',band='4',up_radius=55, im_size=420, N=3, rad_mask=1.75, planet_rad=65, n_pix=165, n_pad=302)

assert np.nanmean(onskyflat_field.data) == pytest.approx(1, abs=1e-2)


calibdir = os.path.join(os.path.dirname(__file__), "testcalib")

flat_filename = "sim_flatfield_"+str(planet)+"_"+str(band)+".fits"
if not os.path.exists(calibdir):
os.mkdir(calibdir)
onskyflat_field.save(filedir=calibdir, filename=flat_filename)

###### perform flat division
# load in the flatfield
flat_filepath = os.path.join(calibdir, flat_filename)
onsky_flatfield = data.FlatField(flat_filepath)


flatdivided_dataset = l2a_to_l2b.flat_division(simflat_dataset,onsky_flatfield)
print(flatdivided_dataset[0].ext_hdr["HISTORY"])


# perform checks after the flat divison for one of the dataset
assert(flat_filename in str(flatdivided_dataset[0].ext_hdr["HISTORY"]))

# check the propagated errors for one of the dataset
assert flatdivided_dataset[0].err_hdr["Layer_2"] == "FlatField_error"
print("mean of all simulated data",np.mean(simflat_dataset.all_data))
print("mean of all simulated data error",np.nanmean(simflat_dataset.all_err) )
print("mean of all flat divided data:", np.nanmean(flatdivided_dataset.all_data))
print("mean of flatfield:", np.nanmean(onsky_flatfield.data))
print("mean of flatfield err:", np.nanmean(onsky_flatfield.err))

err_flatdiv=np.nanmean(flatdivided_dataset.all_err)
err_estimated=np.sqrt(((np.nanmean(onsky_flatfield.data))**2)*(np.nanmean(simflat_dataset.all_err))**2+((np.nanmean(simflat_dataset.all_data))**2)*(np.nanmean(onsky_flatfield.err))**2)
print("mean of all flat divided data errors:",err_flatdiv)
print("Error estimated:",err_estimated)
assert(err_flatdiv == pytest.approx(err_estimated, abs = 1e-1))

print(flatdivided_dataset[0].ext_hdr)
corgidrp.track_individual_errors = old_err_tracking

return

if __name__ == "__main__":
test_create_flatfield_neptune()
test_create_flatfield_uranus()
3 changes: 3 additions & 0 deletions tests/test_data/medcombined-neptune_band_1.fits
Git LFS file not shown
3 changes: 3 additions & 0 deletions tests/test_data/medcombined-neptune_band_4.fits
Git LFS file not shown
Loading

0 comments on commit ad35107

Please sign in to comment.