Source code for sofia_redux.instruments.hawc.steps.stepmerge

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Mapping pipeline step."""

from astropy import log
from astropy.io import fits
import numpy as np
import psutil

from sofia_redux.instruments.hawc.datafits import DataFits
from sofia_redux.instruments.hawc.stepmiparent import StepMIParent
from sofia_redux.instruments.hawc.steps.basemap import BaseMap
from sofia_redux.toolkit.resampling.resample import Resample

__all__ = ['StepMerge']


class StepMerge(StepMIParent, BaseMap):
    """
    Create a map from multiple input images.

    This step resamples all input data into a common output grid.
    Resampling is performed via a distance-weighted, low-order
    polynomial surface fit to the input data within a window around
    each output grid point. It is assumed that all input files
    already contain the correct WCS.

    Input files for this step must contain STOKES and ERROR frames
    for I, Q, and U each, as well as COVAR Q I, COVAR U I, and
    COVAR Q U images. The output is a single file with the same
    extensions. In addition, an image mask is created, which gives
    some information on how much data went into each output pixel
    in the map. The values are sums over the Gaussian weight of
    each input data point.

    The EXPTIME keyword in the merged header is the sum of all
    exposure times from the individual files. The table values for
    each input image are stored in the rows of the MERGED DATA
    table in the output file.
    """
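
    # Illustrative usage sketch (not part of the original module). HAWC
    # pipeline steps are normally instantiated and then called with their
    # input data; assuming `infiles` is a list of DataFits objects that
    # already contain the required STOKES/ERROR (and COVAR) extensions,
    # a merge might look like:
    #
    #     from sofia_redux.instruments.hawc.steps.stepmerge import StepMerge
    #     step = StepMerge()
    #     merged = step(infiles, fit_order=2, errflag=True)
    #
    # The keyword overrides shown here are assumptions for illustration
    # only; any parameter defined in setup() below can be supplied.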
    def setup(self):
        """
        Set parameters and metadata for the pipeline step.

        Output files have PRODTYPE = 'merge', and are named with
        the step abbreviation 'MRG'.

        Parameters defined for this step are:

        beamsize : list of float
            Beam FWHM size (arcsec) to write into BMAJ/BMIN header
            keywords. One value for each HAWC filter band.
        cdelt : list of float
            Pixel size in arcseconds of output map. One value for
            each HAWC filter band.
        proj : str
            Projection of output map.
        sizelimit : int
            Upper limit on output map size (either axis, in pixels).
        widowstokesi : bool
            Use widow pixels (flagged 1 or 2) to compute Stokes I map.
        conserveflux : bool
            If set, a flux conservation factor (due to the change in
            pixel size) will be applied to all output images.
        fwhm : list of float
            FWHM of gaussian smoothing kernel in arcseconds (per band).
        radius : list of float
            Integration radius for local fits, in arcseconds (per band).
        fit_order : int
            Polynomial fit order for local regression.
        errflag : bool
            If set, use uncertainties when computing averages.
        edge_threshold : float
            Threshold to set edge pixels to NaN. Range is 0-1; 0 means
            keep all edge pixels. Higher values keep fewer pixels.
        adaptive_algorithm : {'scaled', 'shaped', 'none'}
            If 'shaped' or 'scaled', adaptive smoothing will be used,
            varying the kernel size according to the data. If 'shaped',
            the kernel shape and rotation angle may also vary.
        fit_threshold : float
            Deviation from weighted mean to allow for higher order fit.
            Set to 0 to turn off. Positive values replace bad values
            with the mean value in the window; negative values replace
            bad values with NaN.
        bin_cdelt : bool
            If set, and data was previously binned via StepBinPixels,
            then the input cdelt will be multiplied by the binning
            factor. If not set, the provided cdelt will be used
            directly. This allows useful default behavior for binned
            data, but still allows for tunable output pixel sizes.
        """
        # Name of the pipeline reduction step
        self.name = 'merge'
        self.description = 'Merge Maps'

        # Shortcut for pipeline reduction step and identifier for
        # saved file names.
        self.procname = 'mrg'

        # Clear Parameter list
        self.paramlist = []

        # Append parameters
        self.paramlist.append(['beamsize', [4.84, 7.80, 7.80, 13.6, 18.2],
                               "Beam FWHM size (arcsec) to write into "
                               "BMAJ/BMIN header keywords"])
        self.paramlist.append(['cdelt', [1.21, 1.95, 1.95, 3.40, 4.55],
                               'Pixel size in arcseconds of output map'])
        self.paramlist.append(['proj', 'TAN',
                               'Projection of output map'])
        self.paramlist.append(['sizelimit', 3000,
                               "Upper limit on output map size "
                               "(either axis, in pixels)."])
        self.paramlist.append(['widowstokesi', True,
                               "Use widow pixels (flagged 1 or 2) to "
                               "compute Stokes I map"])
        self.paramlist.append(['conserveflux', True,
                               "Apply flux conservation factor (due to "
                               "change in pixel size) to all output images"])
        self.paramlist.append(['fwhm', [2.57, 4.02, 4.02, 6.93, 9.43],
                               'FWHM of gaussian smoothing kernel, '
                               'in arcseconds (per band)'])
        self.paramlist.append(['radius', [4.85, 7.8, 7.8, 13.6, 18.2],
                               'Integration radius for smoothing, in '
                               'arcseconds (per band)'])
        self.paramlist.append(['fit_order', 0,
                               "Polynomial fit order for local regression."])
        self.paramlist.append(['errflag', True,
                               "Use uncertainties to weight fits."])
        self.paramlist.append(['edge_threshold', 0.5,
                               "Set edge pixels to NaN. Range 0-1; 0 means "
                               "keep all edge pixels. Higher values "
                               "keep fewer pixels."])
        self.paramlist.append(['adaptive_algorithm', 'None',
                               "If 'shaped' or 'scaled', adaptive "
                               "smoothing will be used, varying the kernel "
                               "size according to the data. If 'shaped', "
                               "the kernel shape and rotation angle "
                               "may also vary."])
        self.paramlist.append(['fit_threshold', -10.0,
                               "Deviation from weighted mean to allow, "
                               "for higher order fit. Set to 0 to turn "
                               "off, < 0 replaces with NaN."])
        self.paramlist.append(['bin_cdelt', True,
                               "Multiply provided cdelt by binning factor "
                               "for the input data."])
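
    # Configuration sketch (an assumption, not taken from this source
    # file): in an INI-style pipeline configuration, these parameters
    # would typically be overridden under a section named for the step,
    # for example:
    #
    #     [merge]
    #         fit_order = 2
    #         conserveflux = True
    #         cdelt = 1.0, 1.6, 1.6, 2.8, 3.7
    #
    # The section name and values above are illustrative only; when no
    # override is given, the defaults appended in setup() apply.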
    def read_fwhm_radius_cdelt_beam(self):
        """
        Read a fwhm, radius, and cdelt value from the parameters.

        The parameters are expected to be defined as a list, with one
        entry for each HAWC band. The correct value for the input data
        is selected from the list.

        Returns
        -------
        fwhm : float
            FWHM value for the input data.
        radius : float
            Radius value for the input data.
        cdelt : float
            Pixel scale value for the input data.
        beamsize : float
            Beam size value for the input data.
        """
        fwhm = self.getarg('fwhm')
        radius = self.getarg('radius')
        cdelt = self.getarg('cdelt')
        beamsize = self.getarg('beamsize')

        waveband = self.datain[0].getheadval('spectel1')
        bands = ['A', 'B', 'C', 'D', 'E']
        try:
            idx = bands.index(waveband[-1])
        except (ValueError, IndexError):
            # waveband not in list
            msg = 'Cannot parse waveband: %s' % waveband
            log.error(msg)
            raise ValueError(msg)
        try:
            fwhm = fwhm[idx]
            radius = radius[idx]
            cdelt = cdelt[idx]
            beamsize = beamsize[idx]
        except IndexError:
            msg = 'Missing radius/fwhm values for all wavebands'
            log.error(msg)
            raise IndexError(msg)

        return fwhm, radius, cdelt, beamsize
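
    # Worked example (illustrative comment, not part of the original
    # source): with the defaults from setup(), an input file whose
    # SPECTEL1 keyword ends in 'C' (e.g. an assumed value of 'HAW_C')
    # selects index 2 of ['A', 'B', 'C', 'D', 'E'], so this method would
    # return fwhm=4.02, radius=7.8, cdelt=1.95, and beamsize=7.80, all
    # in arcseconds.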
    def resample_images(self, radius, fit_order, smoothing, edge,
                        adaptive_threshold, adaptive_algorithm,
                        fit_threshold, errflag, max_cores):
        """
        Resample input images into a common grid.

        Resampling is performed via a distance-weighted, low-order
        polynomial surface fit to the input data within a window
        around each output grid point.

        Output images are stored in self.pmap.

        Parameters
        ----------
        radius : float
            Fit window to consider for local fits.
        fit_order : int
            Polynomial surface order to fit.
        smoothing : float
            Smoothing radius for distance weighting, expressed as a
            fraction of the fit window.
        edge : float
            Threshold for setting edge pixels to NaN. Higher values
            block more pixels; a zero value allows all edge pixels
            through.
        adaptive_threshold : float
            Threshold for adaptive smoothing. Range is 0-2; 1.0 is
            optimal. Set lower for a smaller scale adaptive kernel;
            higher for a larger scale.
        adaptive_algorithm : {'scaled', 'shaped', None}
            Algorithm for the adaptive smoothing kernel. If scaled,
            only the size is allowed to vary. If shaped, the kernel
            shape and rotation may also vary.
        fit_threshold : float or None
            Threshold for fit rejection, specified in sigma. Set to
            None to turn off.
        errflag : bool
            If True, errors on the flux values will be used to weight
            the fits to the data. If False, only distance weights
            will be used.
        max_cores : int or None
            If a number larger than 1, the data processing will
            proceed in parallel on max_cores CPUs. Multiprocessing
            requires that joblib is installed.
        """
        if self.nhwp == 1:
            stokes_vals = ['I']
        else:
            stokes_vals = ['I', 'Q', 'U']

        # loop over stokes for fluxes and errors
        flxvals = []
        errvals = []
        for stokes in stokes_vals:
            flxvals.append(np.hstack([d[stokes].ravel()
                                      for d in self.pdata]))
            errvals.append(np.hstack([d['d{}'.format(stokes)].ravel()
                                      for d in self.pdata]))

        resampler = Resample(
            self.pmap['coordinates'], flxvals, error=errvals,
            window=radius, order=fit_order,
            robust=None, negthresh=None)

        flux, std, weights = resampler(
            *self.pmap['grid'],
            smoothing=smoothing, fit_threshold=fit_threshold,
            edge_threshold=edge,
            adaptive_threshold=adaptive_threshold,
            adaptive_algorithm=adaptive_algorithm,
            edge_algorithm='distribution',
            get_error=True, get_distance_weights=True,
            error_weighting=errflag,
            jobs=max_cores)

        for i, stokes in enumerate(stokes_vals):
            self.pmap[stokes] = flux[i]
            self.pmap['d{}'.format(stokes)] = std[i]
            if stokes == 'I':
                self.pmap['mask'] = weights[i]

        # stop here for imaging data
        if self.nhwp == 1:
            return

        # otherwise do covariances too
        covvals = []
        errvals = []
        covnames = []
        for stokes in [('Q', 'I'), ('U', 'I'), ('Q', 'U')]:
            cov = '%s%scov' % stokes
            err1 = 'd%s' % stokes[0]
            err2 = 'd%s' % stokes[1]
            covnames.append(cov)
            covvals.append(np.hstack([d[cov].ravel()
                                      for d in self.pdata]))
            errvals.append(np.hstack([np.sqrt(d[err1].ravel()
                                              * d[err2].ravel())
                                      for d in self.pdata]))

        # handle covariances as flux values, weighted by
        # the appropriate stokes combinations, with a flag
        # to tell the resampler how to propagate them
        resampler = Resample(
            self.pmap['coordinates'], covvals, error=errvals,
            window=radius, order=0,
            robust=None, negthresh=None)

        flux = resampler(
            *self.pmap['grid'],
            smoothing=smoothing, fit_threshold=None,
            edge_threshold=edge,
            edge_algorithm='distribution',
            is_covar=True,
            error_weighting=True,
            jobs=max_cores)

        for i, cov in enumerate(covnames):
            self.pmap[cov] = flux[i]
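
    # Note (illustrative comment, not part of the original source):
    # run() below converts the smoothing FWHM to a Gaussian sigma via
    # sigma = fwhm / (2 * sqrt(2 * ln(2))) ~= fwhm / 2.3548, so the
    # Band A default of fwhm = 2.57 arcsec corresponds to
    # sigma ~= 1.09 arcsec; that sigma is what is passed to this
    # method as `smoothing`.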
    def run(self):
        """
        Run the data reduction algorithm.

        This step is run as a multi-in single-out (MISO) step:
        self.datain should be a list of DataFits, and output will be
        a single DataFits, stored in self.dataout.

        The process is:

        1. Read in all good pixels from the input data.
        2. Resample all input data into a common map.
        3. Store output images in a DataFits object.
        4. Merge headers and table data from all input files.
        """
        # check validity, set self.nhwp
        self.checkvalid()

        # self.datain must be a list/tuple
        self.nfiles = len(self.datain)
        log.debug('Merging %d files' % self.nfiles)

        # set up for parallel processing via joblib
        max_cores = psutil.cpu_count() - 1
        if max_cores < 2:  # pragma: no cover
            max_cores = 1

        # read parameter values
        fwhm, radius, cdelt, beamsize = self.read_fwhm_radius_cdelt_beam()
        errflag = self.getarg('errflag')
        widowstokesi = self.getarg('widowstokesi')
        conserveflux = self.getarg('conserveflux')
        edge = self.getarg('edge_threshold')
        adaptive_algorithm = self.getarg('adaptive_algorithm')
        fit_order = self.getarg('fit_order')
        fit_threshold = self.getarg('fit_threshold')
        bin_cdelt = self.getarg('bin_cdelt')

        # get pixel scale from header
        pixscal = self.datain[0].getheadval('PIXSCAL')

        # get binning factor from first header, if applicable
        if bin_cdelt:
            bin_factor = self.datain[0].header.get('PIXELBIN', 1)
            if bin_factor > 1:
                log.info(f'Multiplying cdelt and radius by '
                         f'binning factor {bin_factor}')
                cdelt *= bin_factor
                radius *= bin_factor
                log.info(f'Output pixel size is {cdelt} arcsec')
                log.info(f'Fit radius is {radius} arcsec')
                if fit_order > 1:
                    log.info('Reducing fit order to 1.')
                    fit_order = 1

        # if adaptive is on, smoothing should be set to true
        # Gaussian sigma
        adaptive_algorithm = str(adaptive_algorithm).strip().lower()
        if adaptive_algorithm in ['shaped', 'scaled']:
            adaptive = 1.0
        else:
            adaptive = 0.0
            adaptive_algorithm = None
        if adaptive > 0 and not np.allclose(fwhm, beamsize):
            log.warning('Setting smoothing FWHM to beam size '
                        'for adaptive case.')
            fwhm = beamsize
        sigma = fwhm / (2 * np.sqrt(2 * np.log(2)))

        if conserveflux:
            # get pixel scale from first image and cdelt from params
            flux_factor = cdelt**2 / pixscal**2
            log.debug('')
            log.debug('Flux factor: %.2f' % flux_factor)
            log.debug('')
        else:
            flux_factor = 1.

        # If widowstokesi = True, then widow pixels (flagged as
        # 1 or 2) will also be accounted for in the smoothed
        # Stokes I image.
        # Stokes Q and U will continue
        # using only strictly good pixels (flagged as 0)
        if widowstokesi:
            maxgoodpix = 2
        else:
            maxgoodpix = 0

        # read data, allocate space for maps
        self.read_resample_data(maxgoodpix)
        self.make_resample_map(cdelt)

        if errflag:
            log.debug('Uncertainties used for weighting')
        else:
            log.warning('Uncertainties NOT used for weighting')

        # Resample all the input files together to generate
        # the output images
        log.debug("Starting resample: all files")

        # Output data is stored in self.pmap
        self.resample_images(radius, fit_order, sigma, edge,
                             adaptive, adaptive_algorithm,
                             fit_threshold, errflag, max_cores)

        # Initialize pipedata, add input OBS_IDs
        self.dataout = DataFits(config=self.datain[0].config)
        self.allhead.add_history('Merged Files:')
        for i in range(self.nfiles):
            self.allhead.add_history(
                'OBS_ID: %s' % self.datain[i].getheadval('OBS_ID'))

        # Put output image data from self.pmap into dataout
        # for I (and if present Q and U)
        if self.nhwp == 1:
            stokes = ['I']
        else:
            stokes = ['I', 'Q', 'U']
        for var in stokes:
            name = 'STOKES ' + var
            tmp = self.pmap[var]

            # correct for flux conservation
            tmp *= flux_factor
            tmp.shape = (self.allhead['naxis2'], self.allhead['naxis1'])

            # Set I, Q and U maps - copy same astrometry
            # info from header to these HDUs
            self.dataout.imageset(tmp, name, self.allhead)

            # Set dI, dQ, dU - also copy astrometry into these headers
            name = 'ERROR ' + var
            tmp = self.pmap['d' + var]
            tmp *= flux_factor
            tmp.shape = (self.allhead['naxis2'], self.allhead['naxis1'])
            self.dataout.imageset(tmp, name, self.allhead)

        # also propagate the covariance extensions
        if self.nhwp != 1:
            stokes = [('Q', 'I'), ('U', 'I'), ('Q', 'U')]
            for var in stokes:
                name = 'COVAR %s %s' % var
                tmp = flux_factor**2 * self.pmap['%s%scov' % var]
                tmp.shape = (self.allhead['naxis2'],
                             self.allhead['naxis1'])
                self.dataout.imageset(tmp, name, self.allhead)

        # Set image mask
        tmp = self.pmap['mask']
        tmp.shape = (self.allhead['naxis2'], self.allhead['naxis1'])
        self.dataout.imageset(tmp, 'IMAGE MASK', self.allhead)
        self.dataout.filename = self.datain[-1].filename
        self.dataout.setheadval('BUNIT', 'pixel', comment='Data units',
                                dataname='IMAGE MASK')

        # add headers from first input file (to get keywords)
        for name in self.datain[0].imgnames:
            if name in self.dataout.imgnames:
                self.dataout.copyhead(self.datain[0], overwrite=False,
                                      name=name)

        # add relevant header values from other input files
        for other in self.datain[1:]:
            self.dataout.mergehead(other)

        # remove CROTA2, if present, because the output image
        # is already rotated
        self.dataout.delheadval("CROTA2")

        # remove source position keywords if present, since their
        # values are no longer correct for the resampled map
        pos_keys = ['STCENTX', 'STCENTY', 'STCENTXE', 'STCENTYE',
                    'SRCPOSX', 'SRCPOSY']
        for key in pos_keys:
            self.dataout.delheadval(key)

        # Create table with combined data from the input files
        # One table row for each input file
        iterfiles = range(self.nfiles)
        tablist = []
        for i in iterfiles:
            if 'TABLE DATA' in self.datain[i].tabnames:
                tablist.append(self.datain[i].tableget('TABLE DATA'))
        if len(tablist) > 0:
            # for each name, loop through all tables,
            # make sure name, format, dim
            # and unit is the same among them.
            # A column with a mismatch is thrown out
            names = []
            formats = []
            dims = []
            units = []
            for n in tablist[0].names:
                format0 = tablist[0].columns[n].format
                dim = tablist[0].columns[n].dim
                unit = tablist[0].columns[n].unit
                column_matches = True
                for inf in iterfiles:
                    if n not in tablist[inf].names:
                        log.warning("name not found in file %d, "
                                    "removing %s" % (inf, n))
                        column_matches = False
                        break
                    if format0 != tablist[inf].columns[n].format:
                        log.warning("%s has different format in "
                                    "file %d, removing" % (n, inf))
                        column_matches = False
                        break
                    if dim != tablist[inf].columns[n].dim:
                        log.warning("%s has different dimension in "
                                    "file %d, removing" % (n, inf))
                        column_matches = False
                        break
                    if unit != tablist[inf].columns[n].unit:
                        log.warning("%s has different units in "
                                    "file %d, removing" % (n, inf))
                        column_matches = False
                        break
                if column_matches:
                    names.append(n)
                    formats.append(format0)
                    dims.append(dim)
                    units.append(unit)

            # Fill up fits columns to make table
            cols = []
            for n, f, d, u in zip(names, formats, dims, units):
                data = [a[n] for a in tablist]
                cols.append(fits.Column(name=n, format=f, dim=d,
                                        unit=u, array=data))

            # add some super-important columns to table, if not present
            if 'Right Ascension' not in names:
                ra = [self.datain[i].getheadval('crval1')
                      for i in iterfiles]
                cols.append(fits.Column(name='Right Ascension',
                                        format='D', unit='deg',
                                        array=np.array(ra)))
            if 'Declination' not in names:
                dec = [self.datain[i].getheadval('crval2')
                       for i in iterfiles]
                cols.append(fits.Column(name='Declination', format='D',
                                        unit='deg', array=np.array(dec)))
            if 'Filename' not in names:
                fname = [self.datain[i].filename for i in iterfiles]
                maxlen = max(map(len, fname))
                cols.append(fits.Column(name="Filename",
                                        array=np.array(fname),
                                        format='%dA' % maxlen,
                                        unit='str'))
            tbhdu = fits.BinTableHDU.from_columns(fits.ColDefs(cols))
            self.dataout.tableset(tbhdu.data, "MERGED DATA", tbhdu.header)

        # Update the EXPTIME header keyword with the summed value of
        # all individual exposure times of the input files
        self.sumexptime()

        # Merge all file headers with the first header
        # Write beam size (from beamsize config param) to header keywords
        self.dataout.setheadval('BMAJ', beamsize / 3600., 'Beam major axis')
        self.dataout.setheadval('BMIN', beamsize / 3600., 'Beam minor axis')
        self.dataout.setheadval('BPA', 0., 'Beam position angle')

        # Write list of input OBS_IDs in LSTOBSID keyword
        lstobsid = [self.datain[j].getheadval('obs_id')
                    for j in range(0, self.nfiles, 1)]
        self.dataout.setheadval("LSTOBSID", ','.join(lstobsid),
                                'List of OBS_IDs for merged files')

        # Delete lat/lonpole from header if present
        if 'LATPOLE' in self.dataout.header:
            del self.dataout.header['LATPOLE']
        if 'LONPOLE' in self.dataout.header:
            del self.dataout.header['LONPOLE']
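

# ----------------------------------------------------------------------
# Illustrative sketch (not part of the pipeline module): the MERGED DATA
# table above is assembled from astropy.io.fits columns, one row per
# input file. A minimal stand-alone version of that pattern, using
# made-up values and the same column names added above, might look like
# the following.
if __name__ == '__main__':  # pragma: no cover
    # hypothetical per-file header values (CRVAL1/CRVAL2) and filenames
    ra = np.array([83.818, 83.820])
    dec = np.array([-5.389, -5.391])
    fname = ['file1.fits', 'file2.fits']
    maxlen = max(map(len, fname))

    cols = [fits.Column(name='Right Ascension', format='D',
                        unit='deg', array=ra),
            fits.Column(name='Declination', format='D',
                        unit='deg', array=dec),
            fits.Column(name='Filename', format='%dA' % maxlen,
                        unit='str', array=np.array(fname))]
    tbhdu = fits.BinTableHDU.from_columns(fits.ColDefs(cols))
    log.info('Merged table rows: %d' % len(tbhdu.data))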