# Licensed under a 3-clause BSD style license - see LICENSE.rst
from astropy import log
from astropy.io import fits
import numpy as np
from sofia_redux.scan.custom.hawc_plus.channels.gain_provider.pol_imbalance \
import HawcPlusPolImbalance
from sofia_redux.scan.custom.hawc_plus.channels.mode.los_response import (
LosResponse)
from sofia_redux.scan.custom.hawc_plus.channels.mode.roll_response import (
RollResponse)
from sofia_redux.scan.custom.sofia.channels.camera import SofiaCamera
from sofia_redux.scan.channels.modality.coupled_modality import (
CoupledModality)
from sofia_redux.scan.channels.modality.correlated_modality import (
CorrelatedModality)
from sofia_redux.scan.channels.modality.modality import Modality
__all__ = ['HawcPlusChannels']
[docs]
class HawcPlusChannels(SofiaCamera):
def __init__(self, parent=None, info=None, size=0, name='hawc_plus'):
"""
Initialize HAWC+ channels.
Parameters
----------
parent : object, optional
The owner of the channels such as a Reduction, Scan or Integration.
info : HawcPlusInfo, optional
The info object relating to these channels.
size : int, optional
The intended size of the channels (number of total data channels).
name : str, optional
The name for the channels.
"""
super().__init__(name=name, parent=parent, info=info, size=size)
self.subarray_groups = None
self.subarray_gain_renorm = None
[docs]
def copy(self):
"""
Return a copy of the HAWC+ channels.
Returns
-------
HawcPlusChannels
"""
return super().copy()
@property
def detector(self):
"""
Return the detector info.
Returns
-------
HawcPlusDetectorArrayInfo
"""
return self.info.detector_array
@property
def band_id(self):
"""
Return the HAWC_PLUS Band
Returns
-------
band : str
"""
return self.info.instrument.band_id
@property
def pixel_sizes(self):
"""
Return the (x,y) pixel size.
Returns
-------
numpy.ndarray (astropy.units.Quantity)
An array of size 2 containing the x, y pixel size in arc seconds.
"""
return self.info.detector_array.pixel_sizes
@property
def dark_squid_lookup(self):
"""
Return the dark squid lookup array.
The lookup array is of the form lookup[sub, col] = fixed_index.
Invalid values are marked with values of -1 (good pixels).
Returns
-------
lookup : numpy.ndarray (int)
"""
return self.info.detector_array.dark_squid_lookup
[docs]
def init_groups(self):
"""
Initializes channel groups for HAWC+.
Each group contains a subset of the channel data, referenced by index.
The HAWC_PLUS groups contain additional groups based on sub-array.
Returns
-------
None
"""
super().init_groups()
self.subarray_groups = [None] * self.info.detector_array.subarrays
sub_index = 0
for pol_array in range(self.info.detector_array.pol_arrays):
for pol_sub_array in range(self.info.detector_array.pol_subarrays):
indices = np.nonzero(self.data.sub == sub_index)[0]
pol_id = self.info.detector_array.POL_ID[pol_array]
group = self.create_channel_group(
indices=indices,
name=f'{pol_id}{pol_sub_array}')
self.add_group(group)
self.subarray_groups[sub_index] = group
sub_index += 1
[docs]
def init_divisions(self):
"""
Initializes channel divisions.
Divisions contain sets of channel groups.
The HAWC_PLUS channel adds divisions consisting of groups where
each contains a unique value of a certain data field. For example,
the "rows" division contains a group for row 1, a group for row 2, etc.
Returns
-------
None
"""
super().init_divisions()
dead_blind = self.flagspace.flags.DEAD | self.flagspace.flags.BLIND
for division_name, field in [
('polarrays', 'pol'),
('subarrays', 'sub'),
('bias', 'bias_line'),
('series', 'series_array')
]:
self.add_division(self.get_division(
name=division_name, field=field, discard_flag=dead_blind))
if self.configuration.get_bool('darkcorrect'):
bad_mux_flag = self.flagspace.flags.DEAD
else:
bad_mux_flag = dead_blind
mux_division = self.get_division(name='mux', field='mux',
discard_flag=bad_mux_flag)
# I don't know why, but order channels in pin (row) order.
for group in mux_division.groups:
group.indices = group.indices[np.argsort(group.row)]
self.add_division(mux_division)
self.add_division(self.get_division(
name='rows', field='row', discard_flag=bad_mux_flag))
[docs]
def init_modalities(self):
"""
Initializes channel modalities.
A modality is based of a channel division and contains a mode for each
channel group in the channel division.
The HAWC_PLUS modalities simply contain additional correlated modes
based on the additional channel fields. A new coupled modality
is also created according to polarization arrays.
Returns
-------
None
"""
super().init_modalities()
obs_modality = self.modalities.get('obs-channels')
if obs_modality is not None:
self.add_modality(
CoupledModality(modality=obs_modality,
name='polarrays',
identity='p',
gain_provider=HawcPlusPolImbalance()))
flags = self.flagspace.flags
builds = [('subarrays', 'S', 'subarrays', 'sub_gain', flags.SUB),
('bias', 'b', 'bias', 'bias_gain', flags.BIAS),
('series', 's', 'series', 'series_gain', flags.SERIES_ARRAY),
('mux', 'm', 'mux', 'mux_gain', flags.MUX),
('rows', 'r', 'rows', 'pin_gain', flags.ROW)]
for name, identity, division_name, gain_field, gain_flag in builds:
division = self.divisions.get(division_name)
if division is None: # pragma: no cover
log.warning(f"Channel division {division_name} not found.")
continue
modality = CorrelatedModality(name=name,
identity=identity,
channel_division=division,
gain_provider=gain_field)
modality.set_gain_flag(gain_flag)
self.add_modality(modality)
detector_division = self.divisions.get('detectors')
los_response = Modality(name='los', identity='L',
channel_division=detector_division,
gain_provider='los_gain',
mode_class=LosResponse)
los_response.set_gain_flag(flags.LOS_RESPONSE)
self.add_modality(los_response)
roll_response = Modality(name='roll', identity='R',
channel_division=detector_division,
gain_provider='roll_gain',
mode_class=RollResponse)
roll_response.set_gain_flag(flags.ROLL_RESPONSE)
self.add_modality(roll_response)
[docs]
def load_channel_data(self):
"""
Load the channel data.
The pixel data and wiring data files should be defined in the
configuration.
Returns
-------
None
"""
self.detector.load_detector_configuration()
self.detector.initialize_channel_data(self.data)
self.set_nominal_pixel_positions(self.info.detector_array.pixel_sizes)
super().load_channel_data()
if self.configuration.has_option('jumpdata'):
self.read_jump_levels(self.configuration.get_filepath('jumpdata'))
[docs]
def read_jump_levels(self, filename):
"""
Read the jump levels from a data file.
Parameters
----------
filename : str
Returns
-------
None
"""
if filename is None:
return
log.info(f"Loading jump levels from {filename}")
hdul = fits.open(filename)
self.data.read_jump_hdu(hdul[0])
self.info.register_config_file(filename)
hdul.close()
[docs]
def normalize_array_gains(self):
"""
Normalize the relative channel gains in observing channels.
Returns
-------
average_gain : float
The average gain prior to normalization.
"""
log.debug("Normalizing subarray gains.")
sub_arrays = CorrelatedModality(
name='subs', identity='S',
channel_division=self.divisions.get('subarrays'),
gain_provider='gain')
self.subarray_gain_renorm = np.full(
self.info.detector_array.subarrays, np.nan)
for mode in sub_arrays.modes:
sub_value = mode.channel_group.sub[0]
self.subarray_gain_renorm[sub_value] = mode.normalize_gains()
sub_id = self.info.detector_array.get_subarray_id(sub_value)
sub_gain = self.subarray_gain_renorm[sub_value]
log.debug(f"--> {sub_id} gain = {sub_gain:.3f}")
return 1.0
[docs]
def set_nominal_pixel_positions(self, pixel_sizes):
"""
Set the nominal pixel positions for the given pixel sizes.
Parameters
----------
pixel_sizes : Coordinate2D
Returns
-------
None
"""
self.detector.pixel_sizes = pixel_sizes
self.detector.set_boresight()
self.data.calculate_sibs_position()
center = self.detector.get_sibs_position(
sub=0,
row=39 - self.detector.boresight_index.y,
col=self.detector.boresight_index.x)
self.set_reference_position(center) # subtracts the center position.
[docs]
def max_pixels(self):
"""
Return the maximum pixels in the detector array.
Returns
-------
count : int
"""
return self.n_store_channels
[docs]
def read_data(self, hdul):
"""
Read a FITS HDU list to populate channel data.
Parameters
----------
hdul : fits.HDUList
Returns
-------
None
"""
for hdu in hdul:
if (hdu.header.get('EXTNAME', '').strip().upper()
== 'CONFIGURATION'):
self.info.detector_array.parse_configuration_hdu(hdu)
[docs]
def validate_scan(self, scan):
"""
Validate the channels with a scan.
Parameters
----------
scan : Scan
Returns
-------
None
"""
pol_mask = 0
for i in range(self.detector.subarrays):
pol_mask |= (i & 2) + 1
self.detector.dark_squid_correction = self.configuration.get_bool(
'darkcorrect')
if pol_mask != 3: # pragma: no cover
# Hard to test since it depends on the detector class
self.configuration.blacklist('correlated.polarrays')
self.detector.initialize_channel_data(self.data)
if not self.configuration.has_option('filter'):
wavelength = self.info.instrument.wavelength.to('um').value
if (wavelength % 1) == 0:
wavelength = int(wavelength)
self.configuration.set_option('filter', f'{wavelength}um')
log.info(f"HAWC+ Filter set to "
f"{self.configuration.get_string('filter')}")
super().validate_scan(scan)
self.create_dark_squid_lookup()
[docs]
def slim(self, reindex=True):
"""
Remove all DEAD or DISCARD flagged channels.
Will also update channel groups, divisions, and modalities.
Parameters
----------
reindex : bool, optional
If `True`, reindex channels if slimmed.
Returns
-------
slimmed : bool
`True` if channels were discarded, `False` otherwise.
"""
slimmed = super().slim(reindex=reindex)
if slimmed:
self.create_dark_squid_lookup()
return slimmed
[docs]
def create_dark_squid_lookup(self):
"""
Create the dark squid lookup array.
The dark squid lookup array is stored in the info.detector_array
object.
Returns
-------
None
"""
self.info.detector_array.create_dark_squid_lookup(self)
[docs]
def get_si_pixel_size(self):
"""
Return the science instrument pixel size
Returns
-------
x, y : Coordinate2D
The (x, y) pixel sizes
"""
return self.detector.pixel_sizes
[docs]
def write_flat_field(self, filename, include_nonlinear=False):
"""
Write a flat field file used for chop-nod pipelines.
Parameters
----------
filename : str
The filename to write to.
include_nonlinear : bool, optional
If `True`, include the nonlinear responses.
Returns
-------
None
"""
shape = self.detector.rows, self.detector.pol_cols
# Set defaults
gain_r = np.ones(shape, dtype=float)
gain_t = np.ones(shape, dtype=float)
flag_r = np.full(shape, 1, dtype=int)
flag_t = np.full(shape, 2, dtype=int)
if include_nonlinear:
nonlinear_r = np.zeros(shape, dtype=float)
nonlinear_t = np.zeros(shape, dtype=float)
else:
nonlinear_r = nonlinear_t = None
data = self.data
detector = self.detector
flagged = self.data.is_flagged()
gains = self.subarray_gain_renorm[data.sub] * data.gain * data.coupling
inverse_gains = np.zeros_like(gains)
nzi = gains != 0
inverse_gains[nzi] = 1 / gains[nzi]
all_cols = ((data.sub & 1) * detector.subarray_cols) + data.col
for array in [detector.R_ARRAY, detector.T_ARRAY]:
select = np.nonzero(data.pol == array)[0]
if select.size == 0:
continue
if array == detector.R_ARRAY:
gains, flags, nonlinear = gain_r, flag_r, nonlinear_r
else:
gains, flags, nonlinear = gain_t, flag_t, nonlinear_t
rows = data.subrow[select]
cols = all_cols[select]
gains[rows, cols] = inverse_gains[select]
if include_nonlinear:
nonlinear[rows, cols] = data.nonlinearity[select]
flags[rows, cols] *= flagged[select]
hdul = fits.HDUList()
hdul.append(fits.ImageHDU(gain_r, name='R array gain'))
hdul.append(fits.ImageHDU(gain_t, name='T array gain'))
hdul.append(fits.ImageHDU(flag_r, name='R bad pixel mask'))
hdul.append(fits.ImageHDU(flag_t, name='T bad pixel mask'))
if include_nonlinear:
hdul.append(fits.ImageHDU(nonlinear_r,
name='R array nonlinearity'))
hdul.append(fits.ImageHDU(nonlinear_t,
name='T array nonlinearity'))
hdul.writeto(filename, overwrite=True)
hdul.close()
log.info(f"Written flat field to {filename}.")
[docs]
def add_hdu(self, hdul, hdu, extname):
"""
Add a FITS HDU to the HDUList.
Parameters
----------
hdul : fits.HDUList
The HDUList to append to.
hdu : fits.ImageHDU or fits.PrimaryHDU or fits.BinTableHDU
The fits HDU to append.
extname : str
The name of the HDU extension.
Returns
-------
None
"""
hdu.header['EXTNAME'] = extname, 'image content ID'
self.info.edit_header(hdu.header)
hdul.append(hdu)
[docs]
def get_table_entry(self, name):
"""
Return a channel parameter for the given name.
Parameters
----------
name : str
Returns
-------
value
"""
if name == 'band':
return self.band_id
else:
return super().get_table_entry(name)