Source code for sofia_redux.instruments.fifi_ls.combine_nods

# Licensed under a 3-clause BSD style license - see LICENSE.rst

import os

from astropy import log
from astropy.io import fits
from astropy.time import Time
import numba as nb
import numpy as np
from pandas import DataFrame

from sofia_redux.instruments.fifi_ls.make_header import make_header
from sofia_redux.toolkit.interpolate \
    import interp_1d_point_with_error as interp
from sofia_redux.toolkit.utilities \
    import (hdinsert, gethdul, write_hdul)


__all__ = ['classify_files', 'combine_extensions', 'combine_nods']


def _mjd(dateobs):
    """Get the MJD from a DATE-OBS."""
    try:
        mean_time = Time(dateobs).mjd
    except (ValueError, AttributeError):
        mean_time = 0
    return mean_time


def _read_exthdrs(hdul, key, default=0):
    """Read FIFI-LS extension headers."""
    result = []
    if len(hdul) <= 1:
        return result
    ngrating = hdul[0].header.get('NGRATING', 1)
    for idx in range(ngrating):
        name = f'FLUX_G{idx}'
        header = hdul[name].header
        result.append(header.get(key, default))
    return np.array(result)


def _from_hdul(hdul, key):
    """Read a header keyword from the PHU."""
    return hdul[0].header[key.upper().strip()]


[docs] def classify_files(filenames, offbeam=False): """ Extract various properties of all files for subsequent combination. Parameters ---------- filenames : array_like of str File paths to FITS files offbeam : bool, optional If True, swap 'A' nods with 'B' nods and the following associated keywords: DLAM_MAP <-> DLAM_OFF, DBET_MAP <-> DBET_OFF. Returns ------- pandas.DataFrame """ hduls = [] fname_list = [] for fname in filenames: hdul = gethdul(fname) if hdul is None: log.error("Invalid HDUList: %s" % fname) continue hduls.append(hdul) if not isinstance(fname, str): fname_list.append(_from_hdul(hdul, 'FILENAME')) else: fname_list.append(fname) filenames = fname_list n = len(filenames) if n == 0: log.error("No good files found.") return None keywords = ['nodstyle', 'detchan', 'channel', 'nodbeam', 'dlam_map', 'dbet_map', 'dlam_off', 'dbet_off', 'date-obs'] init = dict((key, [_from_hdul(hdul, key) for hdul in hduls]) for key in keywords) init['mjd'] = [_mjd(dateobs) for dateobs in init['date-obs']] init['indpos'] = [_read_exthdrs(hdul, 'indpos', default=0) for hdul in hduls] init['bglevl'] = [_read_exthdrs(hdul, 'bglevl_a', default=0) for hdul in hduls] init['asymmetric'] = [x in ['ASYMMETRIC', 'C2NC2'] for x in init['nodstyle']] init['tsort'] = [0.0] * n init['sky'] = [False] * n # calculate later init['hdul'] = hduls init['chdul'] = [None] * n init['combined'] = [np.full(len(x), False) for x in init['indpos']] init['outfile'] = [''] * n df = DataFrame(init, index=filenames) # If any files are asymmetric, treat them all as asymmetric if df['asymmetric'].any() and not df['asymmetric'].all(): log.warning("Mismatched NODSTYLE. Will attempt to combine anyway.") # Drop any bad dates bad_dates = df[df['mjd'] == 0] if len(bad_dates) > 0: for name, row in bad_dates.iterrows(): log.error('DATE-OBS in header is %s for %s' % (row['date-obs'], name)) df = df.drop(bad_dates.index) # If there's a good detchan value, use it in place of channel, # then set channel to either 1 (BLUE) or 0 (RED) valid_detchan = (df['detchan'] != 0) & (df['detchan'] != '0') df['channel'] = np.where(valid_detchan, df['detchan'], df['channel']) df['channel'] = df['channel'].apply(lambda x: 1 if x == 'BLUE' else 0) # update headers if offbeam is True if offbeam: # Switch A and B beams df['nodbeam'] = np.where(df['nodbeam'] != 'A', 'A', 'B') df = df.rename(index=str, columns={ 'dlam_map': 'dlam_off', 'dbet_map': 'dbet_off', 'dlam_off': 'dlam_map', 'dbet_off': 'dbet_map'}) for key in ['dlam_map', 'dlam_off', 'dbet_map', 'dbet_off', 'nodbeam']: df.apply(lambda x: hdinsert( x.hdul[0].header, key.upper(), x[key]), axis=1) # set on-source exptime to 0 for asym B beams and track as 'sky' files df['sky'] = df['asymmetric'] & (df['nodbeam'] != 'A') for hdul in df[df['sky']]['hdul'].values: if isinstance(hdul, fits.HDUList): hdul[0].header['EXPTIME'] = 0.0 hdul[0].header['NEXP'] = 0 return df
@nb.njit(cache=True, nogil=False, parallel=False, fastmath=False) def interp_b_nods(atime, btime, bdata, berr): # pragma: no cover """ Interpolate two B nods to the A time. Parameters ---------- atime : array-like of float The UNIX time for each A nod sample. btime : array-like float Before and after time for the B nods. Expected to have two elements; all `atime` values should fall between the first and second values. bdata : array-like of float 2 x nw x ns B nod data to interpolate. berr : array-like of float 2 x nw x ns B nod errors to interpolate. Returns ------- bflux, bvar : array-like of float nw x ns interpolated B nod flux and variance. """ nt = atime.size nn, nw, ns = bdata.shape bflux = np.empty((nt, nw, ns), dtype=nb.float64) bvar = np.empty((nt, nw, ns), dtype=nb.float64) for t in range(nt): for i in range(nw): for j in range(ns): # flux and error at this pixel bf = bdata[:, i, j] be = berr[:, i, j] # Interpolate B flux and error onto A time if np.any(np.isnan(bf)) or np.any(np.isnan(be)): bflux[t, i, j] = np.nan bvar[t, i, j] = np.nan else: f, e = interp(btime, bf, be, atime[t]) bflux[t, i, j] = f bvar[t, i, j] = e * e return bflux, bvar
[docs] def combine_extensions(df, b_nod_method='nearest'): """ Find a B nod for each A nod. For asymmetric data, DLAM and DBET do not need to match, B data can be used more than once, and the B needs to be subtracted, rather than added (symmetric B nods are multiplied by -1 in chop_subtract) For the 'interpolate' option for B nod combination for most data, the time of interpolation is taken to be the middle of the observation, as determined by the FIFISTRT and EXPTIME keywords in the primary header. For OTF data, the time is interpolated between RAMPSTRT and RAMPEND times in the extension header, for each ramp. Parameters ---------- df : pandas.DataFrame b_nod_method : {'nearest', 'average', 'interpolate'}, optional Determines the method of combining the two nearest before and after B nods. Returns ------- list of fits.HDUList """ # check B method parameter if b_nod_method not in ['nearest', 'average', 'interpolate']: raise ValueError("Bad b_nod_method: should be 'nearest', " "'average', or 'interpolate'.") get_two = b_nod_method != 'nearest' df.sort_values('mjd', inplace=True) blist = df[df['nodbeam'] == 'B'] alist = df[df['nodbeam'] == 'A'] # skip if no pairs available if len(blist) == 0: log.warning('No B nods found') return df elif len(alist) == 0: log.error('No A nods found') return df for afile, arow in alist.iterrows(): asymmetric = arow['asymmetric'] bselect = blist[(blist['channel'] == arow['channel']) & (blist['asymmetric'] == asymmetric)] if not asymmetric: bselect = bselect[(bselect['dlam_map'] == arow['dlam_map']) & (bselect['dbet_map'] == arow['dbet_map'])] # find closest matching B image in time if get_two and asymmetric: bselect['tsort'] = bselect['mjd'] - arow['mjd'] after = (bselect[bselect['tsort'] > 0]).sort_values('tsort') bselect = (bselect[bselect['tsort'] <= 0]).sort_values( 'tsort', ascending=False) else: bselect['tsort'] = abs(bselect['mjd'] - arow['mjd']) bselect = bselect.sort_values('tsort') after = None primehead, combined_hdul = None, None for aidx, apos in enumerate(arow['indpos']): bidx, bidx2 = [], [] bfile, bfile2 = None, None brow, brow2 = None, None for bfile, brow in bselect.iterrows(): bidx = brow['indpos'] == apos if not asymmetric: bidx &= ~brow['combined'] if np.any(bidx): break if after is not None: for bfile2, brow2 in after.iterrows(): # always asymmetric bidx2 = brow2['indpos'] == apos if np.any(bidx2): break if not np.any(bidx) and np.any(bidx2): bidx = bidx2 brow = brow2 bfile = bfile2 bidx2 = [] describe_a = f"A {os.path.basename(arow.name)} at ext{aidx + 1} " \ f"channel {arow['channel']} indpos {apos} " \ f"dlam {arow['dlam_map']} dbet {arow['dbet_map']}" if np.any(bidx): arow['combined'][aidx] = True a_fname = f'FLUX_G{aidx}' a_sname = f'STDDEV_G{aidx}' a_hdr = arow['hdul'][0].header bgidx = np.nonzero(bidx)[0][0] brow['combined'][bgidx] = True b_background = brow['bglevl'][bgidx] b_fname = f'FLUX_G{bgidx}' b_sname = f'STDDEV_G{bgidx}' b_flux = brow['hdul'][b_fname].data b_var = brow['hdul'][b_sname].data ** 2 b_hdr = brow['hdul'][0].header # check for offbeam with OTF mode: B nods # can't have an extra dimension if b_flux.ndim > 2: msg = 'Offbeam option is not available for OTF mode' log.error(msg) raise ValueError(msg) combine_headers = [a_hdr, b_hdr] # check for a second B nod: if not found, will do # 'nearest' for this A file if np.any(bidx2): # add in header for combination b2_hdr = brow2['hdul'][0].header combine_headers.append(b2_hdr) # get A and B times try: # unix time at middle of exposure atime = a_hdr['START'] \ + a_hdr['FIFISTRT'] * a_hdr['ALPHA'] \ + a_hdr['EXPTIME'] / 2.0 btime1 = b_hdr['START'] \ + b_hdr['FIFISTRT'] * b_hdr['ALPHA'] \ + b_hdr['EXPTIME'] / 2.0 btime2 = b2_hdr['START'] \ + b2_hdr['FIFISTRT'] * b2_hdr['ALPHA'] \ + b2_hdr['EXPTIME'] / 2.0 except KeyError: raise ValueError('Missing START, FIFISTRT, ALPHA, ' 'or EXPTIME keys in headers.') # get index for second B row bgidx2 = np.nonzero(bidx2)[0][0] brow2['combined'][bgidx2] = True if b_nod_method == 'interpolate': # debug message msg = f'Interpolating B {bfile} at {btime1} ' \ f'and {bfile2} at {btime2} ' \ f'to A time {atime} and subbing from ' # interpolate background to header atime b_background = \ np.interp(atime, [btime1, btime2], [b_background, brow2['bglevl'][bgidx2]]) # UNIX time is a range of values for OTF data: # retrieve from RAMPSTRT and RAMPEND keys a_hdu_hdr = arow['hdul'][a_fname].header a_shape = arow['hdul'][a_fname].data.shape if len(a_shape) == 3 \ and 'RAMPSTRT' in a_hdu_hdr \ and 'RAMPEND' in a_hdu_hdr: rampstart = a_hdu_hdr['RAMPSTRT'] rampend = a_hdu_hdr['RAMPEND'] nramp = a_shape[0] ramp_incr = (rampend - rampstart) / (nramp - 1) atime = np.full(nramp, rampstart) atime += np.arange(nramp, dtype=float) * ramp_incr else: atime = np.array([atime]) btime = np.array([btime1, btime2]) # interpolate B flux to A time b_fname = f'FLUX_G{bgidx2}' b_sname = f'STDDEV_G{bgidx2}' bdata = np.array([b_flux, brow2['hdul'][b_fname].data]) berr = np.array([np.sqrt(b_var), brow2['hdul'][b_sname].data]) b_flux, b_var = \ interp_b_nods(atime, btime, bdata, berr) # reshape if there was only one atime if atime.size == 1: b_flux = b_flux[0] b_var = b_var[0] else: # debug message msg = f'Averaging B {bfile} and {bfile2} ' \ f'and subbing from ' # average flux b_flux += brow2['hdul'][b_fname].data b_flux /= 2. # propagate variance b_var += brow2['hdul'][b_sname].data ** 2 b_var /= 4. # average background b_background += brow2['bglevl'][bgidx2] b_background /= 2. else: if asymmetric: msg = f'Subbing B {os.path.basename(brow.name)} from ' else: msg = f'Adding B {os.path.basename(brow.name)} to ' log.debug(msg + describe_a) # Note: in the OTF case, A data is a 3D cube with # ramps x spexels x spaxels, and B data is a # 2D array of spexels x spaxels. The B data is # subtracted at each ramp. # For other modes, A and B are both spexels x spaxels. flux = arow['hdul'][a_fname].data stddev = arow['hdul'][a_sname].data ** 2 + b_var if asymmetric: flux -= b_flux else: flux += b_flux # divide by two for doubled source flux /= 2 stddev /= 4 stddev = np.sqrt(stddev) if combined_hdul is None: primehead = make_header(combine_headers) primehead['HISTORY'] = 'Nods combined' hdinsert(primehead, 'PRODTYPE', 'nod_combined') outfile, _ = os.path.splitext(os.path.basename(afile)) outfile = '_'.join(outfile.split('_')[:-2]) outfile += '_NCM_%s.fits' % primehead.get('FILENUM') df.loc[afile, 'outfile'] = outfile hdinsert(primehead, 'FILENAME', outfile) combined_hdul = fits.HDUList( fits.PrimaryHDU(header=primehead)) exthead = arow['hdul'][a_fname].header hdinsert(exthead, 'BGLEVL_B', b_background, comment='BG level nod B (ADU/s)') combined_hdul.append(fits.ImageHDU(flux, header=exthead, name=a_fname)) combined_hdul.append(fits.ImageHDU(stddev, header=exthead, name=a_sname)) # add in scanpos table from A nod if present a_pname = f'SCANPOS_G{aidx}' if a_pname in arow['hdul']: combined_hdul.append(arow['hdul'][a_pname].copy()) else: msg = "No matching B found for " log.debug(msg + describe_a) if combined_hdul is not None: df.at[afile, 'chdul'] = combined_hdul return df
[docs] def combine_nods(filenames, offbeam=False, b_nod_method='nearest', outdir=None, write=False): """ Combine nods of ramp-fitted, chop-subtracted data. Writes a single FITS file to disk for each A nod found. Each HDU list contains n_san binary table extensions, each containing DATA and STDDEV data cubes, each 5x5x18. The output filename is created from the input filename, with the suffix 'CSB', 'RP0' or 'RP1' replaced with 'NCM', and with input file numbers numbers concatenated. Unless specified, the output directory is the same as the input files. Input files should have been generated by `subtract_chops`, or `fit_ramps` (for total power mode, which has no chops). The procedure is: 1. Read header information from each extension in each of the input files, making lists of A data and B data, with relevant metadata (dither) position, date/time observed (DATE-OBS), inductosyn position, channel, nod style). 2. Loop though all A data to find matching B data a. asymmetric nod style: find closest B nod in time with the same channel and inductosyn position. Dither position does not have to match, B data can be used more than once, and data must be subtracted rather than added. b. symmetric nod style: find closest B nod in time with the same channel, inductosyn position, and dither position. Each B nod can only be used once, since it contains a source observation, and data must be added rather than subtracted. 3. After addition or subtraction, create a FITS file and write results to disk. Parameters ---------- filenames : array_like of str File paths to the data to be combined offbeam : bool, optional If True, swap 'A' nods with 'B' nods and the following associated keywords: DLAM_MAP <-> DLAM_OFF, DBET_MAP <-> DBET_OFF. This option cannot be used with OTF-mode A nods. b_nod_method : {'nearest', 'average', 'interpolate'}, optional For asymmetric, data this option controls how the nearest B nods are combined. The 'nearest' option takes only the nearest B nod in time. The 'average' option averages the nearest before and after B nods. The 'interpolate' option linearly interpolates the nearest before and after B nods to the time of the A data. outdir : str, optional Directory path to write output. If None, output files will be written to the same directory as the input files. write : bool, optional If True, write to disk and return the path to the output file. If False, return the HDUL. Returns ------- pandas.DataFrame The output pandas dataframe contains a huge variety of information indexed by original filename. The combined A-B FITS data are located in the 'chdul' column. Note that only A nod files contain combined data in this 'chdul' column. For example, in order to extract combined FITS data, one could issue the command:: df = combine_nods(filenames) combined_hduls = df[df['nodbeam'] == 'A']['chdul'] In order to extract rows from the dataframe that were not combined issue the command:: not_combined = df[(df['nodbeam'] == 'A') & (df['chdul'] == None)] files are considered 'combined' if at least one A extension was combined for an A-nod file. A true signifier of whether an extension was combined (both A and B nod files) can be found in the 'combined' column as a list of bools, one for each extension. """ if isinstance(filenames, str): filenames = [filenames] if not hasattr(filenames, '__len__'): log.error("Invalid input files type (%s)" % repr(filenames)) return if isinstance(outdir, str): if not os.path.isdir(outdir): log.error("Output directory %s does not exist" % outdir) return df = classify_files(filenames, offbeam=offbeam) if df is None: log.error("Problem in file classification") return df = combine_extensions(df, b_nod_method=b_nod_method) for filename, row in df[df['nodbeam'] == 'A'].iterrows(): if outdir is not None: outdir = str(outdir) else: outdir = os.path.dirname(filename) if write and row['chdul'] is not None: write_hdul(row['chdul'], outdir=outdir, overwrite=True) if row['outfile'] is not None: df.at[filename, 'outfile'] = os.path.join( outdir, os.path.basename(row['outfile'])) return df