[docs]
class Observation2D(Map2D):
def __init__(self, data=None, blanking_value=np.nan, dtype=float,
shape=None, unit=None, weight_dtype=float,
weight_blanking_value=None):
"""
Initialize an Observation2D object.
The 2-D observation is an extension of the :class:`Map2D` class that
includes weights and exposure times in addition to the observation data
values.
Parameters
----------
data : numpy.ndarray, optional
Data to initialize the flagged array with. If supplied, sets the
shape of the array. Note that the data type will be set to that
defined by the `dtype` parameter.
blanking_value : int or float, optional
The blanking value defines invalid values in the data array. This
is the equivalent of defining a NaN value.
dtype : type, optional
The data type of the data array.
shape : tuple (int), optional
The shape of the data array. This will only be relevant if
`data` is not defined.
unit : str or units.Unit or units.Quantity, optional
The data unit.
weight_dtype : type, optional
Similar the `dtype`, except defines the data type for the
observation weights and exposure times.
weight_blanking_value : int or float, optional
The blanking value for the weight and exposure maps. If `None`,
will be set to np.nan if `weight_dtype` is a float, and 0 if
`weight_dtype` is an integer.
"""
if weight_blanking_value is None:
if weight_dtype == float:
weight_blanking_value = np.nan
else:
weight_blanking_value = 0.0
self.weight = Image2D(dtype=weight_dtype,
blanking_value=weight_blanking_value,)
self.exposure = Image2D(dtype=weight_dtype,
blanking_value=weight_blanking_value)
self.noise_rescale = 1.0
self.is_zero_weight_valid = False
self.weight_dtype = weight_dtype
super().__init__(data=data, blanking_value=blanking_value, dtype=dtype,
shape=shape, unit=unit)
shape = self.shape
if shape != ():
self.weight.shape = shape
self.exposure.shape = shape
[docs]
def copy(self, with_contents=True):
"""
Return a copy of the map.
Returns
-------
Observation2D
"""
return super().copy(with_contents=with_contents)
def __eq__(self, other):
"""
Check if this Observation2D instance is equivalent to another.
Parameters
----------
other : Observation2D
Returns
-------
equal : bool
"""
if not super().__eq__(other):
return False
if self.weight != other.weight:
return False
if self.exposure != other.exposure:
return False
if self.noise_rescale != other.noise_rescale:
return False
if self.is_zero_weight_valid is not other.is_zero_weight_valid:
return False
return True
[docs]
def copy_processing_from(self, other):
"""
Copy the processing from another 2-D observation.
Parameters
----------
other : Observation2D
Returns
-------
None
"""
super().copy_processing_from(other)
self.noise_rescale = other.noise_rescale
[docs]
def reset_processing(self):
"""
Reset the processing status.
Returns
-------
None
"""
super().reset_processing()
self.noise_rescale = 1.0
@property
def valid(self):
"""
Return a boolean mask array of valid data elements.
Valid elements are neither NaN, set to the blanking value, or
flagged as the validating_flags.
Returns
-------
numpy.ndarray (bool)
A boolean mask where `True` indicates a valid element.
"""
valid = self.is_valid()
if self.size == 0:
return valid
if self.weight is None or self.weight.data is None:
return valid
valid &= np.isfinite(self.weight.data)
if self.is_zero_weight_valid:
valid &= self.weight.data >= 0
else:
valid &= self.weight.data > 0
return valid
[docs]
def clear(self, indices=None):
"""
Clear flags and set data to zero. Clear history.
Parameters
----------
indices : tuple (numpy.ndarray (int)) or numpy.ndarray (bool), optional
The indices to discard. Either supplied as a boolean mask of
shape (self.data.shape).
Returns
-------
None
"""
super().clear(indices=indices)
if self.exposure is not None:
self.exposure.clear(indices=indices)
if self.weight is not None:
self.weight.clear(indices=indices)
[docs]
def discard(self, indices=None):
"""
Set the flags for discarded indices to DISCARD and data to zero.
Parameters
----------
indices : tuple (numpy.ndarray (int)) or numpy.ndarray (bool), optional
The indices to discard. Either supplied as a boolean mask of
shape (self.data.shape).
Returns
-------
None
"""
super().discard(indices=indices)
if self.exposure is not None:
self.exposure.clear(indices)
if self.weight is not None:
self.weight.clear(indices)
[docs]
def destroy(self):
"""
Destroy the image data.
Returns
-------
None
"""
super().destroy()
if self.weight is not None:
self.weight.destroy()
if self.exposure is not None:
self.exposure.destroy()
[docs]
def set_data_shape(self, shape):
"""
Set the shape of the data, weight, and exposure images.
Parameters
----------
shape : tuple (int)
Returns
-------
None
"""
super().set_data_shape(shape)
self.weight.set_data_shape(shape)
self.claim_image(self.weight)
self.exposure.set_data_shape(shape)
self.claim_image(self.exposure)
[docs]
def to_weight_image(self, data):
"""
Convert data to a weight image.
Parameters
----------
data : FlaggedArray or FitsData or numpy.ndarray or None
Returns
-------
Image2D
"""
if data is None:
data = Image2D(x_size=self.shape[1],
y_size=self.shape[0],
blanking_value=self.weight.blanking_value,
dtype=self.weight_dtype)
elif isinstance(data, np.ndarray):
data = Image2D(data=data,
blanking_value=self.weight.blanking_value,
dtype=self.weight_dtype)
return data
[docs]
def get_weights(self):
"""
Return the weights overlay.
Returns
-------
WeightMap
"""
return WeightMap(self)
[docs]
def get_weight_image(self):
"""
Return the weights image.
Returns
-------
Image2D
"""
return self.weight
[docs]
def weight_values(self):
"""
Return the array of weights.
Returns
-------
numpy.ndarray (float)
"""
return self.get_weights().data
[docs]
def set_weight_image(self, weight_image):
"""
Set the weight image.
Parameters
----------
weight_image : Image2D or numpy.ndarray or None
Returns
-------
None
"""
weight_image = self.to_weight_image(weight_image)
self.weight = weight_image
self.claim_image(weight_image)
[docs]
def get_exposures(self):
"""
Return an exposure overlay.
Returns
-------
ExposureMap
"""
return ExposureMap(self)
[docs]
def exposure_values(self):
"""
Return the array of weights.
Returns
-------
numpy.ndarray (float)
"""
return self.get_exposures().data
[docs]
def get_exposure_image(self):
"""
Return the exposure image.
Returns
-------
Image2D
"""
return self.exposure
[docs]
def set_exposure_image(self, exposure_image):
"""
Set the weight image.
Parameters
----------
exposure_image : Image2D or numpy.ndarray or None
Returns
-------
None
"""
exposure_image = self.to_weight_image(exposure_image)
self.exposure = exposure_image
self.claim_image(exposure_image)
[docs]
def get_noise(self):
"""
Return a noise overlay.
Returns
-------
NoiseMap
"""
return NoiseMap(self)
[docs]
def noise_values(self):
"""
Return the array of weights.
Returns
-------
numpy.ndarray (float)
"""
return self.get_noise().data
[docs]
def set_noise(self, noise_image):
"""
Set the noise image.
Parameters
----------
noise_image : Image2D or Overlay or numpy.ndarray or None
Returns
-------
None
"""
self.get_noise().data = noise_image
[docs]
def get_significance(self):
"""
Return a significance overlay.
Returns
-------
SignificanceMap
"""
return SignificanceMap(self)
[docs]
def significance_values(self):
"""
Return the array of significance (S2N).
Returns
-------
numpy.ndarray (float)
"""
return self.get_significance().data
[docs]
def set_significance(self, significance_image):
"""
Set the significance image.
Parameters
----------
significance_image : Image2D or Overlay or numpy.ndarray or None
Returns
-------
None
"""
self.get_significance().data = significance_image
[docs]
def scale(self, factor, indices=None):
"""
Scale the data values and weights by a given factor.
Parameters
----------
factor : float
indices : numpy.ndarray (bool), optional
Returns
-------
None
"""
super().scale(factor, indices=indices)
self.get_weight_image().scale(1.0 / (factor ** 2), indices=indices)
[docs]
def crop(self, ranges):
"""
Crop the image data.
Parameters
----------
ranges : numpy.ndarray (int,) or units.Quantity (numpy.ndarray)
The ranges to set crop the data to. Should be of shape
(n_dimensions, 2) where ranges[0, 0] would give the minimum crop
limit for the first dimension and ranges[0, 1] would give the
maximum crop limit for the first dimension. In this case, the
'first' dimension is in FITS format. i.e., (x, y) for a 2-D image.
If a Quantity is supplied this should contain the min and max
grid values to clip to in each dimension.
Returns
-------
None
"""
if isinstance(ranges, units.Quantity):
ranges = self.convert_range_value_to_index(ranges)
else:
ranges = np.asarray(self.nearest_to_offset(ranges))
self.get_weight_image().crop(ranges)
self.get_exposure_image().crop(ranges)
super().crop(ranges)
[docs]
def accumulate(self, image, weight=1.0, gain=1.0, valid=None):
"""
Add an observation image.
This is meant to be performed when the observation data array contains
the product of the actual data with the weights. This is later reset
to the standard data, weight format by performing
:func:`Observation2D.end_accumulation`.
Parameters
----------
image : Observation2D
The observation to add.
weight : float, optional
A global weighting factor for the entire image. Typically the
scan weight from which the image was derived.
gain : float, optional
A gain factor that will be applied to the image values. It will
be applied to the weighting factors as g^2 during accumulation.
valid : numpy.ndarray (bool), optional
An array where `False` excludes a datum from accumulation.
Returns
-------
None
"""
image_weight = image.get_weights().data * weight
times = image.get_exposures().data
if valid is None:
valid = image.valid
self.accumulate_at(image, gain, image_weight, times, indices=valid)
[docs]
def accumulate_at(self, image, gains, weights, times, indices=None):
"""
Accumulate at given indices.
The data are accumulated as: image * gains * weights
The weights are accumulated as: weights * gains^2
The exposures are accumulated as: times
Parameters
----------
image : FlaggedArray or numpy.ndarray or float
gains : FlaggedArray or numpy.ndarray or float
weights : FlaggedArray or numpy.ndarray or float
times : FlaggedArray or numpy.ndarray or float
indices : numpy.ndarray (bool or int), optional
A boolean mask adds to those indices on self.data marked
as `True`. If so, image/weights/times etc should be the same
shape as self.data of scalar values.
Returns
-------
None
"""
if isinstance(image, FlaggedArray):
image = image.data
if isinstance(weights, FlaggedArray):
weights = weights.data
if isinstance(times, FlaggedArray):
times = times.data
if isinstance(gains, FlaggedArray):
gains = gains.data
wg = weights * gains
add_data = wg * image
add_weight = wg * gains
add_time = times
self.add(add_data, indices=indices)
self.get_weight_image().add(add_weight, indices=indices)
self.get_exposure_image().add(add_time, indices=indices)
[docs]
def merge_accumulate(self, image):
"""
Merge and accumulate an image onto this one.
Parameters
----------
image : Observation2D
Returns
-------
None
"""
super().add(image)
self.merge_properties_from(image)
self.get_weight_image().add(image.get_weights())
self.get_exposure_image().add(image.get_exposures())
[docs]
def end_accumulation(self):
"""
End the accumulation process by dividing the data values by the weight.
Zero-valued weights are ignored.
Returns
-------
None
"""
inverse_weight = self.weight.data.copy()
nzi = np.nonzero(inverse_weight)
inverse_weight[nzi] = 1.0 / inverse_weight[nzi]
super().scale(inverse_weight)
[docs]
def get_chi2(self, robust=True):
"""
Return the Chi-squared statistic.
Parameters
----------
robust : bool, optional
If `True`, use the 'robust' (median) method.
Returns
-------
float
"""
significance = self.significance_values()[self.valid]
if significance.size == 0:
return np.nan
variance = significance ** 2
if robust:
return numba_functions.smart_median(
variance, max_dependence=1)[0] / 0.454937
else:
return np.nanmean(variance)
[docs]
def mean(self, weights=None):
"""
Return the weighted mean.
Parameters
----------
weights : numpy.ndarray (float), optional
An array of weights.
Returns
-------
mean, weight : float, float
"""
if weights is None and self.weight is not None:
weights = self.weight.data
return super().mean(weights=weights)
[docs]
def reweight(self, robust=True):
"""
Re-weight the observation
Parameters
----------
robust : bool, optional
If `True`, use the 'robust' (median) method to determine the
chi2 statistic.
Returns
-------
None
"""
chi2 = self.get_chi2(robust=robust)
if chi2 == 0:
return
weight_correction = 1.0 / self.get_chi2(robust=robust)
self.get_weight_image().scale(weight_correction)
self.noise_rescale *= 1.0 / np.sqrt(weight_correction)
[docs]
def unscale_weights(self):
"""
Undo the weight rescaling.
Returns
-------
None
"""
self.get_weight_image().scale(self.noise_rescale ** 2)
self.noise_rescale = 1.0
[docs]
def mem_correct_observation(self, model, lg_multiplier):
"""
Apply a maximum entropy correction given a model.
Parameters
----------
model : numpy.ndarray or FlaggedArray or None
The model from which to base MEM correction. Should be of shape
(self.shape).
lg_multiplier : float
The Lagrange multiplier (lambda) for the MEM correction.
Returns
-------
None
"""
noise = self.get_noise().data
if isinstance(model, FlaggedArray):
model_data = model.data
else:
model_data = model
self.mem_correct(model_data, noise, lg_multiplier)
[docs]
def smooth(self, beam_map, reference_index=None, weights=None):
"""
Smooth the data with a given beam map kernel.
Parameters
----------
beam_map : numpy.ndarray (float)
reference_index : numpy.ndarray (float), optional
The reference index (center) of the beam_map kernel. By default
this will be set to (beam_map.shape - 1)[::-1] / 2. Note that the
reference index should by supplied in (x, y) order for FITS.
weights : numpy.ndarray (float), optional
If not supplied, defaults to the observation weights.
Returns
-------
None
"""
if weights is None:
weights = self.weight.data
if reference_index is not None:
# Reverse this since reference index is passed in as (x, y) order
# but numpy expects (y, x) order.
reference_index = reference_index[::-1]
smoothed, smoothed_weight = self.get_smoothed(
beam_map, reference_index=reference_index, weights=weights)
smoothed_exposure, _ = self.get_exposure_image().get_smoothed(
beam_map, reference_index=reference_index, weights=weights)
self.set_image(smoothed)
self.set_weight_image(smoothed_weight)
self.set_exposure_image(smoothed_exposure)
self.add_smoothing(
self.default_beam().get_equivalent(beam_map, self.grid.resolution))
[docs]
def fast_smooth(self, beam_map, steps, reference_index=None, weights=None):
"""
Smooth the data with a given beam map kernel using fast method.
Notes
-----
This isn't fast compared to standard smooth as it requires an
additional spline interpolation step.
Parameters
----------
beam_map : numpy.ndarray (float)
steps : numpy.ndarray (int)
The kernel steps in each dimension.
reference_index : numpy.ndarray (float), optional
The reference index (center) of the beam_map kernel. By default
this will be set to (beam_map.shape - 1)[::-1] / 2. Note that the
reference index should by supplied in (x, y) order for FITS.
weights : numpy.ndarray (float), optional
If not supplied, defaults to the observation weights.
Returns
-------
None
"""
if weights is None:
weights = self.weight.data
smoothed, smoothed_weight = self.get_fast_smoothed(
beam_map, steps, reference_index=reference_index, weights=weights,
get_weights=True)
smoothed_exposure = self.get_exposure_image().get_fast_smoothed(
beam_map, steps, reference_index=reference_index, weights=weights,
get_weights=False)
self.set_image(smoothed)
self.set_weight_image(smoothed_weight)
self.set_exposure_image(smoothed_exposure)
self.add_smoothing(
self.default_beam().get_equivalent(beam_map, self.grid.resolution))
[docs]
def filter_correct(self, underlying_fwhm, reference=None, valid=None):
"""
Apply filter correction.
Parameters
----------
underlying_fwhm : astropy.units.Quantity
reference : FlaggedArray or numpy.ndarray, optional
valid : numpy.ndarray (bool), optional
Returns
-------
None
"""
if reference is None:
reference = self.significance_values()
super().filter_correct(underlying_fwhm, reference=reference,
valid=valid)
[docs]
def undo_filter_correct(self, reference=None, valid=None):
"""
Undo the last filter correction.
Parameters
----------
reference : FlaggedArray or numpy.ndarray (float), optional
The data set to determine valid data within the blanking range.
Defaults to self.data.
valid : numpy.ndarray (bool), optional
`True` indicates a data element that may have the filter correction
factor un-applied.
Returns
-------
None
"""
if reference is None:
reference = self.significance_values()
super().undo_filter_correct(reference=reference, valid=valid)
[docs]
def fft_filter_above(self, fwhm, valid=None, weight=None):
"""
Apply FFT filtering above a given FWHM.
Parameters
----------
fwhm : astropy.units.Quantity
valid : numpy.ndarray (bool), optional
weight : FlaggedArray or numpy.ndarray (float)
Returns
-------
None
"""
if weight is None:
weight = self.weight
super().fft_filter_above(fwhm, valid=valid, weight=weight)
[docs]
def resample_from_map(self, obs2d, weights=None):
"""
Resample from one map to another.
Parameters
----------
obs2d : Observation2D
The map to resample from.
weights : numpy.ndarray (float), optional
Optional weights to use during the resampling. If not supplied,
defaults to the weights of the supplied observation.
Returns
-------
None
"""
if not isinstance(obs2d, Observation2D):
raise ValueError(f"{self.__class__} cannot be resampled from "
f"{obs2d}.")
if weights is None:
weights = obs2d.weight
beam = self.get_anti_aliasing_beam_image_for(obs2d)
map_indices = self.get_index_transform_to(obs2d)
self.resample_from(obs2d, map_indices, kernel=beam, weights=weights)
self.get_exposure_image().resample_from(
obs2d.get_exposure_image(), map_indices, kernel=beam,
weights=weights)
self.get_weight_image().resample_from(
obs2d.get_weight_image(), map_indices, kernel=beam, weights=None)
self.copy_processing_from(obs2d)
[docs]
def get_table_entry(self, name):
"""
Return a parameter value for a given name.
Parameters
----------
name : str
The name of the entry to retrieve.
Returns
-------
value
"""
if name == 'depth':
return self.get_weights().mean()[0] / self.unit.value
return super().get_table_entry(name)
[docs]
def get_hdus(self):
"""
Return the FITS HDUs for the observation.
Returns
-------
hdus: list (astropy.io.fits.hdu.base.ExtensionHDU)
"""
hdus = super().get_hdus()
hdu = self.get_exposures().create_hdu()
ext_comment = 'Identifier of data contained in this HDU'
hdu.header['EXTNAME'] = 'Exposure', ext_comment
self.edit_header(hdu.header)
hdus.append(hdu)
hdu = self.get_noise().create_hdu()
hdu.header['EXTNAME'] = 'Noise', ext_comment
self.edit_header(hdu.header)
hdus.append(hdu)
hdu = self.get_significance().create_hdu()
hdu.header['EXTNAME'] = 'S/N', ext_comment
self.edit_header(hdu.header)
hdus.append(hdu)
return hdus
[docs]
def get_info(self):
"""
Get a list of info strings for the observation.
Returns
-------
list of str
"""
info = super().get_info()
if self.noise_rescale != 1.0:
info.append(f'Noise re-scaling: {self.noise_rescale:.2f}x '
f'(from image variance).')
return info
[docs]
def index_of_max(self, sign=1, data=None):
"""
Return the maximum value and index of maximum value.
Parameters
----------
sign : int or float, optional
If positive, find the maximum value in the array. If negative,
find the minimum value in the array. If zero, find the maximum
magnitude in the array.
data : numpy.ndarray (float), optional
The data array to examine. Default is the significance values.
Returns
-------
maximum_value, maximum_index : float, int
"""
if data is None:
data = self.significance_values()
return super().index_of_max(sign=sign, data=data)