Source code for sofia_redux.spectroscopy.tracespec

# Licensed under a 3-clause BSD style license - see LICENSE.rst

import warnings

from astropy import log
from astropy.stats import gaussian_fwhm_to_sigma
import numpy as np
from scipy.optimize import curve_fit

from sofia_redux.toolkit.fitting.polynomial import polyfitnd
from sofia_redux.toolkit.fitting.fitpeaks1d import \
    fitpeaks1d, get_x_parname, get_n_submodels

__all__ = ['tracespec']


[docs] def tracespec(rectimg, positions, orders=None, fwhm=1.0, step=3, sumap=3, winthresh=5, fitorder=2, fitthresh=3, polyfit_kwargs=None, info=None, fast=False, **kwargs): """ Trace spectral continua in a spatially/spectrally rectified image. Determines trace coefficients for the center of the aperture via fits to the continuum. Within an order, `sumap` columns are added together to increase the total signal. A Gaussian is then fitted around the guess position. If within `winthresh` pixels of a guess, the position is stored. The resulting positions are then fitted with a polynomial of degree `fitorder`. The `rectimg` value should come from sofia_redux.spectroscopy.rectify. Parameters ---------- rectimg : dict Rectified image data, as returned by `sofia_redux.spectroscopy.rectify` with integer keys. Each order value is a dictionary with keys: ``"image"`` Rectified image array. (ns, nw) array, required. ``"wave"`` Wave coordinates along image axis=1. (nw,) array, required. ``"spatial"`` Spatial coordinates along image axis=0. (ns,) array, required. positions : dict List of aperture positions by order, given as arcsec up the slit. Keys are integers, values are lists of floats. orders : array_like of int, optional Order numbers to process. If not provided, all orders in `positions` will be processed. step : int, optional Pixel step size in the dispersion direction used to determine the trace. sumap : int, optional Number of columns to add together. Must be odd and less than 2 * `step`. winthresh : float, optional The threshold over which an identified peak is ignored. If the difference between the guess position is larger than `winthresh`, the fitted position is ignored. fitorder : int, optional Polynomial fit degree used to determine the trace coefficients. fitthresh : float, optional Sigma threshold used to identify outliers. polyfit_kwargs : dict, optional Optional keyword arguments for the polynomial fit to the trace (spatial location of peak vs wavecal). These will be passed to `sofia_redux.toolkit.fitting.polyfit`. info : dict, optional If supplied will be updated as follows: order : int x : numpy.ndarray (n_apertures, n_steps) array of fitted trace pixel coordinates in x. y : numpy.ndarray (n_apertures, n_steps) array of fitted trace pixel coordinates in y. mask : numpy.ndarray (n_apertures, n_steps) array of bool where True indicates that the peak fit on that `step` of columns was included in the final calculation of trace coefficients. trace_model : numpy.ndarray (n_apertures) array containing instances of `astropy.modeling.polynomial.Polynomial1D` models if aperture traces. fit : numpy.ndarray (n_apertures, n_steps, n_parameters) array of peak fit coefficients calculated by `mc.fitpeaks1d` at each step. spatial : numpy.ndarray (n_apertures, n_steps) array containing the spatial coordinates at each step. wave : numpy.ndarray (n_apertures, n_steps) array containing the wave coordinates at each step. peak_model : `astropy.modeling.Fittable1DModel` model used to fit peaks via `fitpeaks1d` fast : bool, optional If set, use `scipy.optimize.curve_fit` to calculate spatial peak coefficients instead of the more flexible optimization in `sofia_redux.toolkit.fitting.fitpeaks1d`. If NaNs may be present, `fast` must be False. kwargs : dict, optional Keyword arguments for `sofia_redux.toolkit.fitting.fitpeaks1d`. By default this will fit a single 1-D Gaussian peak and a constant background offset to each (median combined +/- sumap/2 @ step interval) column. Returns ------- trace_coefficients : dict Array of polynomial fit coefficients by aperture and order. Coefficients are of the form: spatial_position = c[0] + c[1].w + c[2].w^2 + ... + c[n].w^n where n ranges from 0 to fit_order and w is the wave value and c are the trace coefficients for a given order and aperture. Keys and values are: order : int (n_apertures, n_coeff) array """ if orders is None: orders = np.unique(list(positions.keys())).astype(int) else: orders = np.unique(orders).astype(int) polyfit_kwargs = {} if polyfit_kwargs is None else polyfit_kwargs do_info = isinstance(info, dict) dw = int(sumap / 2) # Gaussian width from FHWM sigma = gaussian_fwhm_to_sigma * fwhm result = {} for order in orders: rectified = rectimg.get(order) if rectified is None: log.warning(f"Order {order} is missing from rectimg.") continue image = rectified.get('image') if image is None: log.warning(f"Order {order} is missing image key.") continue wave = rectified.get('wave') if wave is None: log.warning(f"Order {order} is missing wave key.") continue spatial = rectified.get('spatial') if spatial is None: log.warning(f"Order {order} is missing spatial key.") continue n_cols = wave.size n_rows = spatial.size if image.shape != (n_rows, n_cols): log.warning(f"Invalid image dimensions for order {order}.") log.warning(f"Should be ({n_rows},{n_cols}) but is {image.shape}.") continue ap_pos = positions.get(order) if ap_pos is None: log.warning(f"Order {order} is missing from positions.") continue if not hasattr(ap_pos, '__len__'): ap_pos = [ap_pos] n_apertures = len(ap_pos) xpar, fitter, columns, model_fit = [None] * 4 trace_coeffs = np.full((n_apertures, fitorder + 1), np.nan) y = np.arange(n_rows).astype(float) # start 'step' columns in, rather than at zero n_step = int((n_cols - step) / step) columns = np.arange(n_step) * step + step peaks_arc = np.full((n_apertures, n_step), np.nan) waves = np.full(n_step, np.nan) order_set = False if do_info: info[order] = {} info[order]['x'] = np.repeat([columns], n_apertures, axis=0) info[order]['y'] = np.full((n_apertures, n_step), np.nan) info[order]['mask'] = np.full((n_apertures, n_step), False) info[order]['trace_model'] = [None] * n_apertures for stepi, column in enumerate(columns): cr = np.clip([column - dw, column + dw], 0, n_cols - 1) waves[stepi] = wave[column] with warnings.catch_warnings(): warnings.simplefilter('ignore', RuntimeWarning) slit_image = np.nanmedian(image[:, cr[0]: cr[1] + 1], axis=1) for api, position in enumerate(ap_pos): kwargs['guess'] = position kwargs['npeaks'] = 1 kwargs['stddev'] = sigma nn = ~np.isnan(slit_image) try: model = fitpeaks1d(spatial[nn], slit_image[nn], **kwargs) except ValueError: log.warning('Invalid data in initial fit.') continue if not order_set: if do_info: info[order]['fit'] = np.full( (n_apertures, n_step, model.parameters.size), np.nan) info['peak_model'] = model info[order]['spatial'] = np.full( (n_apertures, n_step), np.nan) if fast: def model_fit(x, *params): model.parameters = params return model(x) suffix = '_0' if get_n_submodels(model) > 1 else '' xpar = get_x_parname(model) + suffix fitter = model.solver order_set = True else: if fast: coeffs, _ = curve_fit(model_fit, spatial[nn], slit_image[nn], p0=model.parameters) model.parameters = coeffs else: model = fitter(model, spatial[nn], slit_image[nn]) if do_info: info[order]['fit'][api, stepi] = model.parameters.copy() xval = getattr(model, xpar).value # check fit for failure if possible try: failure = (model.fit_info['ierr'] not in [1, 2, 3, 4]) except (AttributeError, KeyError): # pragma: no cover failure = False if (np.abs(xval - position) <= winthresh) and not failure: peaks_arc[api, stepi] = xval if do_info: info[order]['y'][api, stepi] = np.interp( xval, spatial, y) info[order]['spatial'][api, stepi] = xval if do_info: info[order]['wave'] = np.repeat([waves], n_apertures, axis=0) for api, arc in enumerate(peaks_arc): trace_model = polyfitnd(waves, arc, fitorder, robust=fitthresh, model=True, **polyfit_kwargs) if trace_model.success: trace_coeffs[api] = trace_model.coefficients if do_info: info[order]['trace_model'][api] = trace_model info[order]['mask'][api] = trace_model.mask.copy() result[order] = trace_coeffs return result