# Licensed under a 3-clause BSD style license - see LICENSE.rst
from astropy import log, units
from astropy.stats import gaussian_fwhm_to_sigma
import numpy as np
from scipy.optimize import curve_fit
import warnings
from sofia_redux.scan.source_models.beams.gaussian_2d import Gaussian2D
from sofia_redux.scan.coordinate_systems.coordinate_2d import Coordinate2D
from sofia_redux.scan.coordinate_systems.index_2d import Index2D
from sofia_redux.scan.utilities.utils import (
round_values, to_header_quantity, UNKNOWN_FLOAT_VALUE)
__all__ = ['GaussianSource']
[docs]
class GaussianSource(Gaussian2D):
def __init__(self, peak=1.0, x_mean=0.0, y_mean=0.0,
x_fwhm=0.0, y_fwhm=0.0, theta=0.0 * units.Unit('deg'),
peak_unit=None, position_unit=None, gaussian_model=None):
"""
Initialize a GaussianSource model.
The GaussianSource is an extension of the Gaussian2D model that allows
the model to be fit to (or from) a source map.
Parameters
----------
peak : float or units.Quantity, optional
The peak amplitude of the Gaussian.
x_mean : float or units.Quantity, optional
The position of the peak along the x-axis.
y_mean : float or units.Quantity, optional
The position of the peak along the y-axis.
x_fwhm : float or units.Quantity, optional
The Full-Width-Half-Max beam width in the x-direction.
y_fwhm : float or units.Quantity, optional
The Full-Width-Half-Max beam width in the y-direction.
theta : float or units.Quantity, optional
The rotation of the beam pertaining to `x_fwhm` and `y_fwhm` in
relation to the actual (x, y) coordinate axis. If a float value is
supplied, it is assumed to be in degrees.
peak_unit : units.Unit or units.Quantity or str, optional
The physical units for the peak amplitude. The default is
dimensionless.
position_unit : units.Unit or units.Quantity or str, optional
The physical units of all position based parameters (`x_mean`,
`y_mean`, `x_fwhm`, `y_fwhm`)
gaussian_model : Gaussian2D, optional
If supplied, extracts all the above parameters from the supplied
model.
"""
self.positioning_method = 'position'
self.coordinates = None
self.source_mask = None
self.source_radius = None
self.source_sum = 0.0
self.grid = None
self.center_index = None
self.peak_weight = 1.0
self.fwhm_weight = 1.0
self.is_corrected = False
if isinstance(gaussian_model, Gaussian2D):
super().__init__(peak=gaussian_model.peak,
x_mean=gaussian_model.x_mean,
y_mean=gaussian_model.y_mean,
x_fwhm=gaussian_model.x_fwhm,
y_fwhm=gaussian_model.y_fwhm,
theta=gaussian_model.theta,
peak_unit=gaussian_model.unit)
else:
super().__init__(peak=peak, x_mean=x_mean, y_mean=y_mean,
x_fwhm=x_fwhm, y_fwhm=y_fwhm, theta=theta,
peak_unit=peak_unit, position_unit=position_unit)
[docs]
def copy(self):
"""
Return a copy of the Gaussian source.
Returns
-------
GaussianSource
"""
return super().copy()
@property
def referenced_attributes(self):
"""
Return the names of attributes to be referenced rather than copied.
Returns
-------
set of str
"""
attrs = super().referenced_attributes
attrs.add('grid')
return attrs
@property
def position(self):
"""
Return the (x, y) position coordinate.
Returns
-------
astropy.units.Quantity
"""
return Coordinate2D([self.x_mean, self.y_mean])
@position.setter
def position(self, value):
"""
Set the peak position.
Parameters
----------
value : Coordinate2D
Returns
-------
None
"""
self.set_peak_position(value)
@property
def peak_significance(self):
"""
Return the peak significance.
Returns
-------
float
"""
with warnings.catch_warnings():
warnings.simplefilter('ignore', RuntimeWarning)
return np.abs(self.peak) * np.sqrt(self.peak_weight)
@property
def peak_rms(self):
"""
Return the peak rms.
Returns
-------
float
"""
weight = self.peak_weight
if weight > 0:
return 1.0 / np.sqrt(weight)
else:
return weight * 0
@property
def fwhm_significance(self):
"""
Return the FWHM significance.
Returns
-------
float
"""
return (np.abs(self.fwhm) * np.sqrt(self.fwhm_weight)).value
@property
def fwhm_rms(self):
"""
Return the FWHM RMS.
Returns
-------
float or astropy.units.Quantity
"""
weight = self.fwhm_weight
if isinstance(self.fwhm, units.Quantity):
rms_unit = self.fwhm.unit
else: # pragma: no cover
rms_unit = None
rms = 0.0 if weight <= 0 else 1.0 / np.sqrt(self.fwhm_weight)
if not isinstance(rms, units.Quantity) and rms_unit is not None:
rms = rms * rms_unit # pragma: no cover
return rms
def __eq__(self, other):
"""
Test if this GaussianSource is functionally equivalent to another.
Note that only the model is tested for equality. No factors such as
grid or coordinates will be examined.
Parameters
----------
other : GaussianSource
Returns
-------
equal : bool
"""
if self is other:
return True
if not super().__eq__(other):
return False
if self.is_corrected != other.is_corrected:
return False
return True
[docs]
def set_positioning_method(self, method):
"""
Set the peak positioning method.
Parameters
----------
method : str
May be one of {'position', 'peak', 'centroid'}.
Returns
-------
None
"""
methods = ['position', 'centroid', 'peak']
s = str(method).lower().strip()
if s not in methods:
raise ValueError(f"Available positioning methods are {methods}. "
f"Received {method}.")
if s == 'peak':
s = 'position'
self.positioning_method = s
[docs]
def set_peak_position(self, peak_position):
"""
Set the peak position.
Parameters
----------
peak_position : Coordinate2D or Quantity or numpy.ndarray
The (x, y) peak position.
Returns
-------
None
"""
if isinstance(peak_position, Coordinate2D):
self.x_mean = peak_position.x
self.y_mean = peak_position.y
self.coordinates = peak_position.copy()
elif isinstance(peak_position, np.ndarray):
if peak_position.shape != ():
self.x_mean = peak_position[0]
self.y_mean = peak_position[1]
self.coordinates = Coordinate2D(peak_position)
else: # pragma: no cover
self.x_mean = peak_position
self.y_mean = peak_position
self.coordinates = Coordinate2D([peak_position, peak_position])
[docs]
def fit_map_least_squares(self, map2d, degree=3, reduce_degrees=False):
"""
Fit the Gaussian to a given map using LSQ method (adaptTo).
Parameters
----------
map2d : Map2D or Observation2D
degree : int, optional
The spline degree used to fit the map peak value.
reduce_degrees : bool, optional
If `True`, allow the spline fit to reduce the number of degrees
in cases where there are not enough points available to perform
the spline fit of `degree`. If `False`, a ValueError will be
raised if such a fit fails.
Returns
-------
data_sum : float
The sum of the source withing the source radius.
"""
if hasattr(map2d, 'get_significance'):
# Use significance if an Observation2D object
image = map2d.get_significance()
else:
image = map2d
# Get peak in (x, y) indexing
self.grid = map2d.grid
data = image.data.copy()
func, p0, bounds = self.get_lsq_fit_parameters(image)
y, x = np.indices(data.shape)
d, x, y = data.ravel(), x.ravel(), y.ravel()
p_opt, p_cov = curve_fit(func, (x, y), d, p0=p0, bounds=bounds)
amplitude, x0, y0, pixel_stddev = p_opt
self.set_center_index(Coordinate2D([x0, y0]))
# fwhm = sqrt(i * a / peak_value) / fwhm2size
# fwhm * fwhm2size = sqrt(i * a / peak_value)
# (fwhm * fwhm2size)^2 = i * a / peak_value
# (peak_value) / a * (fwhm * fwhm2size)^2 = i
fwhm_pix = pixel_stddev / gaussian_fwhm_to_sigma
fwhm = fwhm_pix * np.sqrt(self.grid.get_pixel_area())
self.fwhm = fwhm
if image is not map2d:
self.set_peak_from(map2d, degree=degree,
reduce_degrees=reduce_degrees)
fwhm_rms = np.sqrt(2) * fwhm / self.peak_significance
self.fwhm_weight = 1 / (fwhm_rms ** 2)
integrated_value = self.peak * (
(fwhm_pix * self.FWHM_TO_SIZE) ** 2)
return integrated_value
[docs]
def get_lsq_fit_parameters(self, image):
"""
Return the LSQ fit parameters for curve_fit.
Determines the entry parameters for a non-linear least squares fit to
the Gaussian source. These are the initial guess parameters for the
peak amplitude, (x, y) mean, and standard deviation. The bounds for
these parameters are also returned. For the GaussianSource, the
amplitude and centroid location are unbounded, and a minimum bound on
the standard deviation is set, based on the currently set FWHM.
Parameters
----------
image : Observation2D or SignificanceMap
Returns
-------
function, initial_values, bounds
"""
peak_coordinates = self.find_peak(image, sign=0)
x0, y0 = peak_coordinates.coordinates
amplitude = image.data[round_values(y0), round_values(x0)]
pixel_size = self.grid.get_pixel_size().coordinates
fwhm_pix = (self.fwhm / pixel_size.min()).decompose().value
sigma_pix = gaussian_fwhm_to_sigma * fwhm_pix
sigma_pix = np.max([1, sigma_pix])
p0 = (amplitude, x0, y0, sigma_pix)
bounds = ((-np.inf, -np.inf, -np.inf, sigma_pix),
(np.inf, np.inf, np.inf, np.inf))
return self.gaussian_2d_fit, p0, bounds
[docs]
@staticmethod
def gaussian_2d_fit(coordinates, amplitude, x0, y0, sigma):
"""
A simple 2-dimensional Gaussian function.
The return value is:
z = A.exp((-(x - x0)^2) / (2 sigma^2)) +
(-(y - y0)^2) / (2 sigma^2)))
Parameters
----------
coordinates : 2-tuple (numpy.ndarray)
The (x, y) coordinates to evaluate.
amplitude : float
The scaling factor.
x0 : float
The center of the Gaussian in x.
y0 : float
The center of the Gaussian in y.
sigma : float
The Gaussian standard deviation.
Returns
-------
z : numpy.ndarray (float)
The derived function value. Will be the same shape as x or y
in the `coordinates`.
"""
dx = coordinates[0] - x0
dy = coordinates[1] - y0
a = 2 * (sigma ** 2)
x1 = -(dx ** 2) / a
x2 = -(dy ** 2) / a
return amplitude * np.exp(x1 + x2)
[docs]
def fit_map(self, map2d, max_iterations=40, radius_increment=1.1,
tolerance=0.05, degree=3, reduce_degrees=False):
"""
Fit the Gaussian source to a given map.
An Observation2D object or Map2D overlap object may be supplied
and fit with a Gaussian source. If an Observation2D is supplied, the
Gaussian is fit to the significance image, and thus the FWHM rms may
also be determined. Otherwise, all other parameters aside from the
FWHM weight will be fit for.
Parameters
----------
map2d : Map2D or Observation2D
max_iterations : int, optional
The maximum number of iterations by which to increase the radius.
radius_increment : float, optional
The factor by which to increase the radius for each iteration. The
initial radius size is set to min(grid.pixel_size).
tolerance : float, optional
Stop the iterations if (1-tolerance) * last_sum <= sum <=
(1+tolerance).
degree : int, optional
The spline degree used to determine the peak value if necessary.
reduce_degrees : bool, optional
If `True`, allow the spline fit to reduce the number of degrees
in cases where there are not enough points available to perform
the spline fit of `degree`. If `False`, a ValueError will be
raised if such a fit fails.
Returns
-------
data_sum : float
The sum of the source withing the source radius.
"""
if hasattr(map2d, 'get_significance'):
# Use significance if an Observation2D object
image = map2d.get_significance()
else:
image = map2d
# Get peak in (x, y) indexing
self.grid = map2d.grid
self.set_center_index(self.find_peak(image, sign=0))
self.find_source_extent(image, max_iterations=max_iterations,
radius_increment=radius_increment,
tolerance=tolerance)
# Calculate the peak value and fwhm from the image (maybe significance)
self.set_peak_from(image, degree=degree, reduce_degrees=reduce_degrees)
fwhm2 = np.abs(self.source_sum
* self.grid.get_pixel_area()
/ self.peak)
self.fwhm = np.sqrt(fwhm2) / self.FWHM_TO_SIZE
fwhm_unit = self.fwhm.unit
self.fwhm_weight = 1.0 / (fwhm_unit ** 2)
# If the image is not the map, calculate the actual peak value.
if image is not map2d:
# Sets the peak, peak weight, and unit.
self.set_peak_from(map2d, degree=degree,
reduce_degrees=reduce_degrees)
fwhm_rms = np.sqrt(2.0) * self.fwhm / self.peak_significance
if fwhm_rms != 0 and not np.isnan(fwhm_rms):
self.fwhm_weight = 1.0 / (fwhm_rms ** 2)
else:
self.fwhm_weight = np.inf / (fwhm_unit ** 2)
fwhm_pix = self.fwhm / np.sqrt(self.grid.get_pixel_area())
integrated_value = self.peak * (
(fwhm_pix * self.FWHM_TO_SIZE) ** 2)
return integrated_value
[docs]
def set_center_index(self, center_index):
"""
Set the center pixel index on the image for the source.
Parameters
----------
center_index : Coordinate2D
The (x, y) center index.
Returns
-------
None
"""
self.position = self.get_grid_coordinates(center_index)
self.center_index = center_index
[docs]
def get_grid_coordinates(self, index):
"""
Return the grid coordinates for the given map indices.
Parameters
----------
index : Coordinate2D
The dimensionless map pixel coordinates.
Returns
-------
coordinates : Coordinate2D
The physical map coordinates for the given `index`.
"""
offset = self.grid.index_to_offset(index)
grid_coordinates = self.grid.reference.copy()
self.grid.projection.deproject(offset, coordinates=grid_coordinates)
return grid_coordinates
[docs]
def find_source_extent(self, image, max_iterations=40,
radius_increment=0.1, tolerance=0.05):
"""
Find the extent of the source and shape.
Calculates the integral of the source, a circular source radius, and
a source mask. The algorithm for determining the source iterates using
an expanding circular aperture centered on the currently defined
`center_index` attribute. The iteration will be halted, and parameters
updated once either the `max_iteration` number of iterations has been
reached, or::
data_sum <= last_data_sum * (1 + tolerance)
The data_sum is the sum of data values inside the circular aperture,
and the last_data_sum is that same sum from the previous iteration.
Each iteration the radius of the aperture is increased by::
min(pixel_size, radius_increment * current_radius)
The starting radius is equal to the pixel size in the data.
Parameters
----------
image : FlaggedArray
max_iterations : int, optional
The maximum number of iterations, each of which increases the
search radius by `radius_increment`.
radius_increment : float, optional
The factor by which to increase the search radius between
iterations.
tolerance : float, optional
Halt iterations if the change in data sum is less than
1 + `tolerance` between iterations.
Returns
-------
None
"""
offset = Coordinate2D(np.indices(image.shape)[::-1]) # numpy to FITS
offset.subtract(self.center_index)
pixel_size = self.grid.get_pixel_size()
offset.scale(pixel_size)
grow = np.min(pixel_size.coordinates)
radius = offset.length
search_radius = 1.0 * grow
sort_index = np.argsort(radius.ravel())
radius_sort = radius.ravel()[sort_index]
data_sort = image.data.ravel()[sort_index]
search_index = np.nonzero(radius_sort <= search_radius)[0]
if len(search_index) == 0:
search_index = 0
else:
search_index = search_index[-1]
tolerance_check = 1 + tolerance
last_data_sum = np.sum(data_sort[:search_index + 1])
for iteration in range(max_iterations):
break_limit = tolerance_check * last_data_sum
search_radius += min([grow, radius_increment * search_radius])
search_index = np.nonzero(radius_sort <= search_radius)[0]
if len(search_index) == 0:
search_index = 0
else:
search_index = search_index[-1]
data_sum = np.sum(data_sort[:search_index + 1])
if data_sum == 0: # may be the case for simulated data/pixel maps
last_data_sum = 0.0
continue
if data_sum >= 0:
if data_sum <= break_limit:
break
else:
if data_sum >= break_limit:
break
last_data_sum = data_sum
self.source_radius = search_radius
self.source_mask = radius <= search_radius
self.source_sum = last_data_sum
[docs]
def get_center_offset(self, offset=None):
"""
Find the offset of the source position from the grid reference.
Parameters
----------
offset : Coordinate2D, optional
An optional offset into which to place the results.
Returns
-------
offset : Coordinate2D
"""
center_offset = self.grid.index_to_offset(self.center_index)
if offset is None:
return center_offset
offset.copy_coordinates(center_offset)
return offset
[docs]
def find_peak(self, image, grid=None, sign=0):
"""
Fit the peak coordinates from a given map. (moveTo)
Parameters
----------
image : FlaggedArray (float)
The image to fit.
grid : SphericalGrid
The grid to fit onto.
sign : int or float, optional
If positive, fit to positive sources in the map. If negative,
fit to negative sources in the map. If zero, fit to source
magnitudes in the map. Note, this only takes affect if `position`
is set as the positioning_method.
Returns
-------
peak : Coordinate2D
The (x, y) peak position.
"""
# Note that the peak position is in terms of pixels (y, x)
if self.positioning_method == 'position':
peak = self.find_local_peak(image, sign=sign)
elif self.positioning_method == 'centroid':
peak = self.find_local_centroid(image)
else:
raise ValueError(
f"Unknown positioning method: {self.positioning_method}.")
# peak is in pixel positions (y, x) format
if grid is None:
return peak
else:
return grid.projection.deproject(peak)
[docs]
@staticmethod
def find_local_peak(image, sign=0):
"""
Find the local peak in a map using the 'position' method.
Parameters
----------
image : FlaggedArray
The image to fit.
sign : int or float, optional
If positive, fit to positive sources in the map. If negative,
fit to negative sources in the map. If zero, fit to source
magnitudes in the map. Note, this only takes affect if `position`
is set as the positioning_method.
Returns
-------
peak : Coordinate2D
The (x, y) pixel peak position.
"""
peak_value, peak_index = image.index_of_max(sign=sign)
if isinstance(peak_index, Coordinate2D):
peak_index = Index2D(peak_index.coordinates)
else: # pragma: no cover
peak_index = Index2D(peak_index)
return Coordinate2D(image.get_refined_peak_index(peak_index))
[docs]
@staticmethod
def find_local_centroid(image):
"""
Find the local peak in a map using the 'centroid' method.
Parameters
----------
image : FlaggedArray
Returns
-------
peak : Coordinate2D
The (x, y) peak position.
"""
indices = np.indices(image.shape)[::-1] # to (x, y) FITS format
data = image.data
w = np.abs(data) * image.valid
w[np.isnan(w)] = 0.0
w_sum = w.sum()
wd_sum = np.empty(image.ndim, dtype=float)
for dimension in range(image.ndim):
wd_sum[dimension] = np.sum(indices[dimension] * w)
return Coordinate2D(wd_sum / w_sum)
[docs]
def set_peak_from(self, image, degree=3, reduce_degrees=False):
"""
Set the peak value from a given image.
The peak value is determined from spline interpolation on the image
at `self.center_index`. This will also set the weight of the peak
as exact (infinity) or will interpolate from a weight map if available.
Parameters
----------
image : FlaggedArray or Map2D
degree : int, optional
The spline degree to fit.
reduce_degrees : bool, optional
If `True`, allow the spline fit to reduce the number of degrees
in cases where there are not enough points available to perform
the spline fit of `degree`. If `False`, a ValueError will be
raised if such a fit fails.
Returns
-------
None
"""
peak_value = image.value_at(self.center_index, degree=degree,
reduce_degrees=reduce_degrees)
unit = getattr(image, 'unit', 1.0 * units.dimensionless_unscaled)
self.peak = peak_value
if hasattr(image, 'weight'):
self.peak_weight = image.get_weights().value_at(
self.center_index, degree=degree,
reduce_degrees=reduce_degrees)
else:
self.set_exact() # peak weight set to infinity
self.set_unit(unit)
[docs]
def set_unit(self, unit):
"""
Set the unit of the Gaussian peak.
Parameters
----------
unit : units.Quantity or units.Unit or float or str or None
Returns
-------
None
"""
if unit is None:
unit_value = 1.0
unit_type = units.dimensionless_unscaled
elif isinstance(unit, units.Quantity):
unit_value = unit.value
unit_type = unit.unit
elif isinstance(unit, (units.Unit, str)):
unit = units.Unit(unit)
unit_value = 1.0
unit_type = unit
else: # assume a number
unit_value = float(unit)
unit_type = units.dimensionless_unscaled
self.unit = unit_type
self.scale_peak(unit_value)
[docs]
def scale_peak(self, factor):
"""
Scale the peak by a given amount.
Parameters
----------
factor : float
Returns
-------
None
"""
if factor == 1:
return
self.peak = self.peak * factor
self.peak_weight /= factor ** 2
[docs]
def scale_fwhm(self, factor):
"""
Scale the fwhm by a given amount
Parameters
----------
factor : float
Returns
-------
None
"""
if factor == 1:
return
self.fwhm *= factor
self.fwhm_weight /= factor ** 2
[docs]
def set_exact(self):
"""
Set the peak to be 'exact'.
Returns
-------
None
"""
self.peak_weight = np.inf
[docs]
def get_correction_factor(self, map2d):
"""
Get the correction factor for a given map.
Notes
-----
The original CRUSH would always return a value of 1, since the logic
based on the map2d.filter_fwhm returns 1 if it is not NaN, and sets the
filtering factor to 0 if it is NaN. This has been remedied here, but
does not appear to actually be used during the reduction.
Parameters
----------
map2d : Map2D
Returns
-------
correction_factor : float
"""
if not map2d.is_filtered(): # map2d.filter_fwhm = NaN
return 1.0
if map2d.is_filter_blanked():
filter_fraction = min(
map2d.filter_blanking / self.peak_significance, 1.0)
else:
filter_fraction = 1.0
filtering = 1.0 - (1.0 / map2d.get_filter_correction_factor())
correction = 1.0 / (1.0 - filtering * filter_fraction)
return correction
[docs]
def correct(self, map2d):
"""
Apply peak value correction from a map.
Parameters
----------
map2d : Map2d
Returns
-------
None
"""
if self.is_corrected:
log.warning("Source is already corrected.")
return
self.scale_peak(self.get_correction_factor(map2d))
self.is_corrected = True
[docs]
def uncorrect(self, map2d):
"""
Uncorrect the peak value for a given map.
Parameters
----------
map2d : Map2d
Returns
-------
None
"""
if not self.is_corrected:
log.warning("Source is already uncorrected")
return
self.scale_peak(1.0 / self.get_correction_factor(map2d))
self.is_corrected = False
[docs]
def get_gaussian_2d(self):
"""
Return a representation of the beam as a Gaussian2D object.
Returns
-------
Gaussian2D
"""
return Gaussian2D(peak=self.peak,
x_fwhm=self.x_fwhm,
y_fwhm=self.y_fwhm,
x_mean=self.x_mean,
y_mean=self.y_mean,
theta=self.theta)
[docs]
def deconvolve_with(self, psf):
"""
Deconvolve with a given psf.
Note that this process will result in a circular beam. I.e. one in
which the FWHM in the x and y-directions are equivalent.
Parameters
----------
psf : Gaussian2D
Returns
-------
None
"""
factor = self.fwhm
beam = self.get_gaussian_2d()
beam.deconvolve_with(psf)
self.fwhm = beam.get_circular_equivalent_fwhm()
new_fwhm = self.fwhm
if isinstance(self.fwhm, units.Quantity):
weight_unit = (1 / self.fwhm.unit) ** 2
else: # pragma: no cover
weight_unit = None
if new_fwhm == 0:
self.fwhm_weight = 0.0
else:
factor /= new_fwhm
self.fwhm_weight /= factor ** 2
if weight_unit is not None:
if not isinstance(self.fwhm_weight, units.Quantity) or (
self.fwhm_weight.unit == units.dimensionless_unscaled):
self.fwhm_weight = self.fwhm_weight * weight_unit
[docs]
def convolve_with(self, psf):
"""
Convolve with a given PSF.
Convolve the current beam with a given PSF. The shape of the beam
will be accounted for correctly in both (x, y) dimensions unlike
the `deconvolve_with` method.
Parameters
----------
psf : Gaussian2D
Returns
-------
None
"""
factor = self.fwhm
beam = self.get_gaussian_2d()
beam.convolve_with(psf)
self.set_xy_fwhm(beam.x_fwhm, beam.y_fwhm)
new_fwhm = self.fwhm
if new_fwhm == 0:
self.fwhm_weight = 0.0
else:
factor /= new_fwhm
self.fwhm_weight /= factor ** 2
if isinstance(self.fwhm, units.Quantity):
weight_unit = (1 / self.fwhm.unit) ** 2
if not isinstance(self.fwhm_weight, units.Quantity) or (
self.fwhm_weight.unit == units.dimensionless_unscaled):
self.fwhm_weight = self.fwhm_weight * weight_unit
[docs]
def get_integral(self, psf_area):
"""
Return the integral over a given area.
Parameters
----------
psf_area : astropy.units.Quantity or float
The area over which to evaluate the integral.
Returns
-------
integral, weight : (float, float) or (Quantity, Quantity)
"""
with warnings.catch_warnings():
warnings.simplefilter('ignore', RuntimeWarning)
factor = self.area / psf_area
if isinstance(factor, units.Quantity):
factor = factor.decompose().value
integral = self.peak * factor
weight = self.peak_weight / (factor ** 2)
return integral, weight
[docs]
def pointing_info(self, map2d):
"""
Return a list of strings with pointing information.
Parameters
----------
map2d : Map2d
Returns
-------
info : list (str)
"""
peak = to_header_quantity(self.peak, unit=self.unit)
peak_value = peak.value if isinstance(peak, units.Quantity) else peak
if peak_value == UNKNOWN_FLOAT_VALUE:
peak_value = self.peak
unit_str = f' {peak.unit}' if isinstance(peak, units.Quantity) else ''
info = [f'Peak: {peak_value:.5f}{unit_str} '
f'(S/N ~ {self.peak_significance:.5f})']
size_unit = map2d.display_grid_unit
integral, integral_weight = self.get_integral(
map2d.underlying_beam.area)
if integral_weight > 0:
integral_rms = 1.0 / np.sqrt(integral_weight)
else:
integral_rms = integral_weight * 0
if integral_rms != 0:
info.append(
f'Integral: {integral:.4f} +- {integral_rms:.4f}{unit_str}')
else:
info.append(f'Integral: {integral:.4f}{unit_str}')
fwhm = to_header_quantity(self.fwhm, unit=size_unit)
if size_unit is None and isinstance(fwhm, units.Quantity):
size_unit = fwhm.unit
fwhm_rms = to_header_quantity(self.fwhm_rms, unit=size_unit)
if size_unit is None and isinstance(
fwhm_rms, units.Quantity): # pragma: no cover
# Cannot reach during normal operation
size_unit = fwhm_rms.unit
fwhm = to_header_quantity(fwhm, unit=size_unit)
if isinstance(fwhm, units.Quantity):
unit_str = f' ({fwhm.unit})'
fwhm = fwhm.value
fwhm_rms = fwhm_rms.value
else:
unit_str = ''
if fwhm_rms == 0:
info.append(f'FWHM: {fwhm:.4f}{unit_str}')
else:
info.append(f'FWHM: {fwhm:.4f} +- {fwhm_rms:.4f}{unit_str}')
return info
[docs]
def get_asymmetry_2d(self, image, angle, radial_range):
"""
Return the asymmetry of the GaussianSource.
Parameters
----------
image : Image2D
The image from which to determine the asymmetry.
angle : units.Quantity
The rotation of the image with respect to the GaussianSource.
radial_range : Range
The range over which the asymmetry should be calculated.
Returns
-------
asymmetry : Asymmetry2D
"""
return image.get_asymmetry_2d(self.grid, self.center_index,
angle, radial_range)
[docs]
def get_representation(self, grid):
"""
Return a representation of the Gaussian source on a new grid.
Parameters
----------
grid : Grid2D
The new grid to apply to a copy of this GaussianSource.
Returns
-------
GaussianSource
A copy of the Gaussian source on the new grid.
"""
center_offset = self.grid.index_to_offset(self.center_index)
new = self.copy()
new.grid = grid.copy()
new.set_center_index(new.grid.offset_to_index(center_offset))
return new
[docs]
def get_data(self, map2d, size_unit=None):
"""
Return a dictionary of properties for to the source model on a map.
The key values returned are:
- peak: The fitted peak value
- dpeak: The fitted peak value RMS
- peakS2N: The peak signal-to-noise ratio
- int: The integral of the peak on the map
- dint: The integral rms of the peak on the map
- intS2N: The significance of the peak on the map
- FWHM: The full-width-half maximum of the peak
- dFWHM: The full-width-half-maximum RMS of the peak
Parameters
----------
map2d : Map2D
The map for which to calculate an integral.
size_unit : units.Unit or str, optional
If set, converts FWHM and dFWHM to `size_unit`.
Returns
-------
data : dict
"""
data = {}
if map2d.underlying_beam is not None:
i, iw = self.get_integral(map2d.underlying_beam.area)
else:
i, iw = 0.0, np.inf
fwhm = self.fwhm
fwhm_rms = self.fwhm_rms
data['peak'] = self.peak * self.unit
data['dpeak'] = self.peak_rms * self.unit
data['peakS2N'] = self.peak_significance
data['int'] = i * self.unit
data['dint'] = 1.0 / np.sqrt(iw) * self.unit
if i == 0 and iw == np.inf:
data['intS2N'] = 0.0
else:
data['intS2N'] = np.abs(i) * np.sqrt(iw)
fwhm = to_header_quantity(fwhm, unit=size_unit, keep=True)
fwhm_rms = to_header_quantity(fwhm_rms, unit=size_unit, keep=True)
data['FWHM'] = fwhm
data['dFWHM'] = fwhm_rms
return data