Source code for sofia_redux.instruments.hawc.steps.stepnoisefft

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Noise FFT pipeline step."""

from astropy import log
from astropy.io import fits
import numpy as np

from sofia_redux.instruments.hawc.datafits import DataFits
from sofia_redux.instruments.hawc.stepparent import StepParent

__all__ = ['StepNoiseFFT']


[docs] class StepNoiseFFT(StepParent): """ Take the FFT of diagnostic noise data. The input to this step is lab data taken with CALMODE = 'NOISE'. The output is a FITS file containing the power spectrum image with pixels arrayed along the y-axis and frequencies arrayed along the x-axis. Frequency values are stored in WCS keywords in the header. The full FFT with linear frequencies is stored in the primary image. Subsequent extensions contain binned frequencies, in linear and log scales. """
[docs] def setup(self): """ Set parameters and metadata for the pipeline step. Output files have PRODTYPE = 'noisefft', and are named with the step abbreviation 'FFT'. Parameters defined for this step are: truncate : bool If set, will truncate to an integer power of 2 number of samples before taking the FFT. """ # Name of the pipeline reduction step self.name = 'noisefft' self.description = 'Noise FFT' # Shortcut for pipeline reduction step and identifier for # saved file names. self.procname = 'fft' # Clear parameter list self.paramlist = [] # add default parameters self.paramlist.append(['truncate', False, 'Truncate time samples'])
[docs] def run(self): """ Run the data reduction algorithm. Because this step is single-in, single-out (SISO), self.datain must be a DataFits object. The output is also a DataFits object, stored in self.dataout. The process is: 1. Convert the flux data to Amps/rtHz. 2. Take the FFT of the flux data for each pixel. 3. Bin the frequencies and average flux values within each bin. 4. Store the data. """ # get arguments truncate = self.getarg('truncate') # make new output data self.dataout = DataFits(config=self.config) self.dataout.filename = self.datain.filename self.dataout.header = self.datain.header.copy() # get raw flux data and reshape it r_data = self.datain.table['R array'] t_data = self.datain.table['T array'] f_data = np.concatenate([r_data, t_data], axis=2) nsamp, nrow, ncol = f_data.shape npix = nrow * ncol # magic numbers from hawcp_powspec IDL script, # originally developed by J. Vaillancourt, 2011 # use value from IV curves dac2squid = 12600. # counts / phi0 coil2squid = 3e-6 # Amp / phi0 inv_gain = coil2squid / dac2squid # Amp / count # remove bottom 4 bits inv_gain /= 4096. # invert for proper gain gain = 1.0 / inv_gain # counts / Amp # unfiltered gain /= 3.361 # convert counts to Amps acdata = f_data / gain # sample rate from header samprate = self.dataout.getheadval('SMPLFREQ') # time in seconds from first sample time = np.arange(nsamp, dtype=float) / samprate log.info(f'Total record time in file: {time[-1]} seconds') # reshape data to 2D acdata = acdata.reshape(nsamp, npix) # truncate to integer power of 2 number of samples # This was originally intended to make FFT more efficient # in the IDL implementation, but numpy FFT is plenty fast, # so it may no longer be needed. if truncate: tr = int(np.log(nsamp) / np.log(2)) nsamp = 2 ** tr acdata = acdata[:nsamp, :] time = time[:nsamp] log.info(f'Truncating total time record to {time[-1]}') # frequency vector fmax = samprate / 2.0 df = samprate / nsamp fmin = 10.0 * df frange = [fmin, fmax] freq = np.arange(nsamp, dtype=float) * df # hanning window with discrete correction h = 0.5 * (1 - np.cos(2 * np.pi * (np.arange(nsamp, dtype=float) + 1.) / nsamp)) wss = (1. / nsamp) * np.sum(h ** 2) # fft of samples for all channels, matching IDL norm convention ft = np.fft.fft(h[:, None] * acdata, axis=0, norm='forward') # linear frequency fft psd = np.abs(ft) / np.sqrt(wss * df) * np.sqrt(2) nsamp2 = nsamp // 2 psd = psd[0:nsamp2, :] freq = freq[0:nsamp2] # bin linear frequencies nbin = 512 hist, edges = np.histogram(freq, bins=nbin, range=frange) freq2 = np.empty(nbin) signal = np.empty((nbin, npix)) for i in range(nbin): freq2[i] = (edges[i] + edges[i + 1]) / 2 f_ind = (freq >= edges[i]) & (freq < edges[i + 1]) signal[i, :] = np.sum(psd[f_ind, :], axis=0) / np.sum(f_ind) # bin log frequencies logf_unbin = np.log10(freq) hist, edges = np.histogram(logf_unbin, bins=nbin, range=np.log10(frange)) logfreq = np.empty(nbin) logsignal = np.empty((nbin, npix)) for i in range(nbin): logfreq[i] = (edges[i] + edges[i + 1]) / 2 f_ind = (logf_unbin >= edges[i]) & (logf_unbin < edges[i + 1]) logsignal[i, :] = np.sum(psd[f_ind, :], axis=0) / np.sum(f_ind) # take the log of the signal too loglogsignal = np.log10(logsignal) loglogsignal[~np.isfinite(logsignal)] = np.nan # save images in dataout # linear frequency x pixels psd_header = fits.Header({ 'EXTNAME': 'FULL_FFT', 'CTYPE1': 'FREQ', 'CTYPE2': 'Pixels', 'CUNIT1': 'Hz', 'CUNIT2': 'pixel', 'CRPIX1': 1, 'CRPIX2': 1, 'CRVAL1': freq[0], 'CRVAL2': 1, 'CDELT1': df, 'CDELT2': 1, 'BUNIT': 'Amps/rtHz'}) self.dataout.header.update(psd_header) self.dataout.imageset(psd.T, imagename='FULL_FFT') # binned linear frequency x pixels signal_header = fits.Header({ 'EXTNAME': 'LINEARFREQ', 'CTYPE1': 'FREQ', 'CTYPE2': 'Pixels', 'CUNIT1': 'Hz', 'CUNIT2': 'pixel', 'CRPIX1': 1, 'CRPIX2': 1, 'CRVAL1': freq2[0], 'CRVAL2': 1, 'CDELT1': freq2[1] - freq2[0], 'CDELT2': 1, 'BUNIT': 'Amps/rtHz'}) self.dataout.imageset(signal.T, imagename='LINEARFREQ', imageheader=signal_header) # binned log frequency x pixels logsignal_header = fits.Header({ 'EXTNAME': 'LOGFREQ', 'CTYPE1': 'FREQ-LOG', 'CTYPE2': 'Pixels', 'CUNIT1': 'Hz', 'CUNIT2': 'pixel', 'CRPIX1': 1, 'CRPIX2': 1, 'CRVAL1': logfreq[0], 'CRVAL2': 1, 'CDELT1': logfreq[1] - logfreq[0], 'CDELT2': 1, 'BUNIT': 'Amps/rtHz'}) self.dataout.imageset(logsignal.T, imagename='LOGFREQ', imageheader=logsignal_header) # binned log signal, log freq loglogsignal_header = fits.Header({ 'EXTNAME': 'LOGFREQ_LOGSIGNAL', 'CTYPE1': 'FREQ-LOG', 'CTYPE2': 'Pixels', 'CUNIT1': 'Hz', 'CUNIT2': 'pixel', 'CRPIX1': 1, 'CRPIX2': 1, 'CRVAL1': logfreq[0], 'CRVAL2': 1, 'CDELT1': logfreq[1] - logfreq[0], 'CDELT2': 1, 'BUNIT': 'log(Amps/rtHz)'}) self.dataout.imageset(loglogsignal.T, imagename='LOGFREQ_LOGSIGNAL', imageheader=loglogsignal_header) # update SOFIA mandated keywords (since this is first pipe step) obsid = 'P_' + self.dataout.getheadval('OBS_ID') self.dataout.setheadval('OBS_ID', obsid) self.dataout.setheadval('PROCSTAT', 'LEVEL_2') self.dataout.setheadval('PIPELINE', 'HAWC_DRP', 'Data processing pipeline') # set ASSC_AOR and ASSC_MSN value in output header try: self.dataout.setheadval('ASSC_AOR', self.dataout.getheadval('AOR_ID'), 'Associated AORs') except KeyError: # pragma: no cover pass try: self.dataout.setheadval('ASSC_MSN', self.dataout.getheadval('MISSN-ID'), 'Associated Mission IDs') except KeyError: # pragma: no cover pass