Source code for sofia_redux.pipeline.gui.qad.qad_imview

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""DS9 Image Viewer for QAD."""

import contextlib
import io
import os
import re
import tempfile
import warnings

from astropy.io import fits
from astropy import log, modeling, stats, table, wcs
import numpy as np
from scipy.stats import gmean

from sofia_redux.pipeline.gui.matplotlib_viewer import MatplotlibPlot
from sofia_redux.toolkit.utilities.fits import set_log_level

try:
    from sofia_redux.calibration.pipecal_photometry import pipecal_photometry
    from sofia_redux.calibration.pipecal_error import PipeCalError
except ImportError:
    HAS_PIPECAL = False
    pipecal_photometry, PipeCalError = None, None
else:
    HAS_PIPECAL = True

try:
    from sofia_redux.visualization.redux_viewer import EyeViewer
except ImportError:
    HAS_EYE = False
    EyeViewer = None
else:
    HAS_EYE = True

try:
    from PyQt5 import QtCore
except ImportError:
    HAS_PYQT5 = False

    # duck type parents to allow class definition
    class QtCore:
        class QObject:
            pass
else:
    HAS_PYQT5 = True

try:
    import regions as ar
except ImportError:
    HAS_REGIONS = False
    ar = None
else:
    HAS_REGIONS = True


__all__ = ['QADImView']


class ViewerSignals(QtCore.QObject):
    make_radial_plot = QtCore.pyqtSignal()
    make_histogram_plot = QtCore.pyqtSignal()
    make_p2p_plot = QtCore.pyqtSignal()


[docs] class QADImView(object): """ View FITS images in DS9. Attributes ---------- ds9 : pyds9.DS9 DS9 instance to display images to. specviewer : EyeViewer Eye instance to display spectra to. plotviewer : MatplotlibPlot Plot viewer widget. files : list of str Currently displayed FITS files. regions : list of str Currently displayed region files. headers : dict FITS file headers for currently displayed files. Keys are filenames; values are lists of headers (astropy.io.fits.Header) radial_data : list of dict Radial flux data, for the radial_plot method to display. histogram_data : list of dict Histogram data, for the histogram_plot method to display. p2p_data : list of dict Pixel-to-pixel data, for the p2p_plot method to display. ptable : astropy.table.Table Photometry data table. disp_parameters : dict Display settings. phot_parameters : dict Photometry settings. plot_parameters : dict Plot settings. cs : {'image', 'wcs', 'none'} Coordinate system for DS9 display and frame locks. """ def __init__(self): """Initialize the viewer.""" self.ds9 = None self.specviewer = None self.plotviewer = None self.break_loop = False self.files = [] self.regions = [] self.headers = {} self.radial_data = [] self.histogram_data = [] self.p2p_data = [] self.ptable = None self.disp_parameters = self.default_parameters('display') self.phot_parameters = self.default_parameters('photometry') self.plot_parameters = self.default_parameters('plot') self.cs = None # useful for testing: flag to always break imexam loop self.break_loop = False # flag for viewer availability self.HAS_DS9 = True self.HAS_EYE = HAS_EYE if not self.HAS_EYE: # pragma: no cover log.warning('Eye viewer is not available. Install ' 'sofia_redux.visualization for spectral display.') # flag for photometry availability self.HAS_PIPECAL = HAS_PIPECAL if not self.HAS_PIPECAL: log.warning('Photometry tools are not available. Install ' 'sofia_redux.calibration to enable photometry.') # always set XPA_METHOD to local os.environ["XPA_METHOD"] = "local" # signals for plot events self.HAS_PYQT5 = HAS_PYQT5 if self.HAS_PYQT5: self.signals = ViewerSignals() self.signals.make_radial_plot.connect(self.radial_plot) self.signals.make_histogram_plot.connect(self.histogram_plot) self.signals.make_p2p_plot.connect(self.pix2pix_plot) else: log.warning('Plotting tools are not available. Install PyQt5 ' 'to enable plotting displays.') self.signals = None def _loaded_data(self): """Check for loaded FITS data in current frame.""" try: dsize = [int(d) for d in self.run('fits size', via='get').split()] except (ValueError, TypeError, AttributeError) as err: log.debug(f' FITS size error: {err}') return False else: if 0 in dsize: return False else: return True def _run_internal(self, cmd, buf=None, via='set'): """ Format commands to send to PyDS9. Parameters ---------- cmd : str Command to send. buf : list, str, or file-like; optional Additional buffers to send. May be a string or a file handler, or a list of those. via : {'get', 'set'}, optional Command method. Returns ------- str DS9 command exit status. """ log.debug(' Command: {}'.format(str(cmd))) retval = None if str(via).lower().strip() == 'set': if type(buf) is list: retval = self.ds9.set(cmd, *buf) else: retval = self.ds9.set(cmd, buf) elif str(via).lower().strip() == 'get': # workaround for DS9 seg fault with empty frame # and wcs requests if 'wcs' in cmd and not self._loaded_data(): log.debug(' No loaded data; skipping frame') retval = '' else: try: retval = self.ds9.get(cmd) except TypeError as err: log.error('Error in pyds9') log.debug(err) else: log.error('Unknown ds9 interaction command: ' + str(via)) return retval def _region_mask(self, cs, all_regions, xctr, yctr, hwcs): """Compute a region mask at a cursor position.""" if not HAS_REGIONS: return None ctr_coord = ar.PixCoord(xctr, yctr) mask = None for reg_str in all_regions: # read ds9 string into a region class try: with set_log_level('CRITICAL'): frame_regions = ar.Regions.parse(reg_str, format='ds9') except Exception as err: log.debug(f'Region parser error: {err}') continue for fr in frame_regions: if cs == 'wcs': # convert to a pixel region first try: with set_log_level('CRITICAL'): fr = fr.to_pixel(hwcs) except Exception as err: # pragma: no cover # error could be anything, since regions package # is in early development state log.debug(f'Region WCS conversion error: {err}') continue # check if cursor is contained in a region # in any frame with set_log_level('CRITICAL'): contained = fr.contains(ctr_coord) if hasattr(contained, '__len__'): # PolygonPixelRegion returns an array, currently # (regions v0.4) contained = contained[0] if contained: # get mask from first matching region try: with set_log_level('CRITICAL'): mask = fr.to_mask() except Exception as err: # pragma: no cover # error could be anything, since regions package # is in early development state log.debug(f'Region mask error: {err}') continue else: log.info(f'Contained in {type(fr).__name__}') break if mask is not None: break # reset active frame return mask
[docs] def default_parameters(self, ptype): """ Retrieve default display, photometry, and plot parameters. Parameters ---------- ptype : {'display', 'photometry', 'plot'} Parameter type to retrieve. Returns ------- dict Parameter settings dictionary. """ ptype = str(ptype).lower() if ptype == 'photometry': defaults = {'model': 'moffat', 'window': 50.0, 'window_units': 'pixels', 'fwhm': 3.0, 'fwhm_units': 'arcsec', 'psf_radius': 12.0, 'aperture_units': 'pixels', 'bg_inner': 15.0, 'bg_width': 10.0, 'show_plots': False} elif ptype == 'display': defaults = {'extension': 'first', 'lock_image': 'wcs', 'lock_slice': 'image', 'scale': 'zscale', 'cmap': 'none', 'zoom_fit': True, 'tile': True, 's2n_range': None, 'eye_viewer': True, 'overplots': True, 'ds9_viewer': True, 'ds9_viewer_pipeline': False, 'ds9_viewer_qad': True} elif ptype == 'plot': defaults = {'window': 50.0, 'window_units': 'pixels', 'color': 'tab10', 'share_axes': 'none', 'separate_plots': True, 'bin': 'fd', 'hist_limits': None, 'p2p_reference': 1, 'summary_stat': 'clipped median'} else: defaults = {} return defaults
[docs] def histogram(self, ctr1, ctr2): """ Create a histogram of pixel values at the selected location. If the cursor is over an enclosed region (circle, box, etc.), then the histogram is computed for the enclosed data. Otherwise, the histogram is computed for an analysis window centered on the cursor position. The window width can be set in the plot parameter dialog; if set to a blank value, the entire image is used to compute the histogram. The binning strategy can also be set in the parameters. Accepted values are any string accepted by the matplotlib hist function, or else an integer number of bins. The range of data for the histogram can also be set in parameters, as min,max values. Parameters ---------- ctr1 : float RA (if DS9 coordinate system is 'wcs') or x-position (if DS9 coordinate system is 'image') at selected image location. ctr2 : float Dec (if DS9 coordinate system is 'wcs') or y-position (if DS9 coordinate system is 'image') at selected image location. """ # check for the current status of the viewer # (tiling, aligned by wcs) if self.run('tile', via='get') == 'yes': allframes = True frames = self.run('frame active', via='get').split() else: allframes = False frames = [self.run('frame', via='get')] if self.run('wcs align', via='get') == 'yes': cs = 'wcs' else: cs = 'image' # get any currently available regions all_regions = self.run(f'regions -system {cs}', allframes=allframes, via='get') if not allframes: all_regions = [all_regions] param = self.plot_parameters for frame in frames: log.info('') if allframes: self.run('frame ' + frame) # check for loaded data if not self._loaded_data(): continue try: results = self.retrieve_data(ctr1, ctr2, photometry=False) except (ValueError, TypeError) as err: log.debug(f'Error in retrieving Frame {frame} data: {err}') continue fulldata = results['fulldata'] data = results['data'] wdw = results['window'] hwcs = results['wcs'] xctr = results['xctr'] yctr = results['yctr'] filename = results['filename'] # get file and ext name if possible log.info(f'Frame {frame}: {filename}') log.info(f'Histogram at x={ctr1}, y={ctr2} ' f'(in {cs} coordinates)') # get data from region mask or window mask = self._region_mask(cs, all_regions, xctr, yctr, hwcs) if mask is None: if param['window'] is None: log.info('Using the full image') reg_name = 'full image' short_reg_name = 'full' hist_data = fulldata else: log.info(f'Using the analysis window ' f'(width: {wdw} pixels)') reg_name = f'{wdw} pixel window' short_reg_name = f'x={xctr:.0f} y={yctr:.0f} {wdw}pix' hist_data = data else: reg_name = 'DS9 region' short_reg_name = f'x={xctr:.0f} y={yctr:.0f} region' hist_data = mask.multiply(fulldata) if hist_data is None: # pragma: no cover # condition occasionally but unreliably encountered # in testing log.warning('Region is too small; skipping histogram') continue hist_data[hist_data == 0] = np.nan hist_data = hist_data.ravel() hist_minmax = (np.nanmin(hist_data), np.nanmax(hist_data), np.nansum(hist_data)) hist_stats = (np.nanmean(hist_data), np.nanmedian(hist_data), np.nanstd(hist_data)) nnan = np.isfinite(hist_data) clip_stats = stats.sigma_clipped_stats(hist_data[nnan]) text_stats = [f'Total pixels: {np.sum(nnan)}', f'Min, max, sum: ' f'{hist_minmax[0]:.5g}, {hist_minmax[1]:.5g}, ' f'{hist_minmax[2]:.5g}', f'Mean, median, stddev: ' f'{hist_stats[0]:.5g}, {hist_stats[1]:.5g}, ' f'{hist_stats[2]:.5g}', f'Clipped mean, median, stddev: ' f'{clip_stats[0]:.5g}, {clip_stats[1]:.5g}, ' f'{clip_stats[2]:.5g}'] for t in text_stats: log.info(t) title = f'Frame {frame}, x={xctr:.0f} y={yctr:.0f} in {reg_name}' l1 = f'F{frame} {short_reg_name}' hist_kwargs = {'bins': param['bin'], 'label': l1, 'alpha': 0.8} if param['hist_limits'] is not None: hist_kwargs['range'] = (param['hist_limits'][0], param['hist_limits'][1]) new_hist = {'plot_type': 'histogram', 'args': [hist_data], 'kwargs': hist_kwargs} if param['separate_plots'] or len(self.histogram_data) < 1: # summary stat (mean, median, clipped mean, or clipped median) summary_stat = str(param.get('summary_stat', 'mean')).lower() if 'clip' in summary_stat: se = clip_stats[2] if 'median' in summary_stat: ss = clip_stats[1] ss_label = 'Clipped median' else: ss = clip_stats[0] ss_label = 'Clipped mean' else: se = hist_stats[2] if 'median' in summary_stat: ss = hist_stats[1] ss_label = 'Median' else: ss = hist_stats[0] ss_label = 'Mean' l2 = f'{ss_label} {ss:.3g} +/- {se:.3g}' overplots = [new_hist] vlines = [ss, ss - se, ss + se] vlabels = [l2, None, None] vstyles = ['-', ':', ':'] for vdata, vlabel, vstyle in zip(vlines, vlabels, vstyles): overplots.append({'plot_type': 'vline', 'args': [vdata], 'kwargs': {'label': vlabel, 'color': 'gray', 'linewidth': 1, 'linestyle': vstyle}}) overplots.append({'plot_type': 'legend', 'args': []}) plot_data = {'args': [], 'kwargs': {'title': title, 'xlabel': 'Flux', 'ylabel': 'Count', 'colormap': param['color']}, 'plot_kwargs': {}, 'overplot': overplots} self.histogram_data.append(plot_data) else: # append new histogram to existing ones plot_data = self.histogram_data[-1] overplots = [] for plot in plot_data['overplot']: if plot['plot_type'] == 'histogram': overplots.append(plot) overplots.append(new_hist) overplots.append({'plot_type': 'legend', 'args': []}) plot_data['overplot'] = overplots plot_data['kwargs']['title'] = 'All histogram regions' if self.signals is not None: self.signals.make_histogram_plot.emit()
[docs] def histogram_plot(self): """Plot radial fluxes in a separate window.""" if not self.HAS_PYQT5: return data = self.histogram_data if data is None or len(data) == 0: return # start up plot viewer if needed if self.plotviewer is None or not self.plotviewer.isVisible(): self.plotviewer = MatplotlibPlot() self.plotviewer.setWindowTitle('Histogram') self.plotviewer.plot_layout = 'rows' self.plotviewer.share_axes = self.plot_parameters['share_axes'] self.plotviewer.plot(data) self.plotviewer.set_scroll('bottom') self.plotviewer.show() self.plotviewer.raise_()
[docs] def imexam(self): """ Start event loop for photometry in DS9. The following keypress values (over the DS9 window) are accepted: * a: Perform photometry at the cursor position. * p: Plot a pixel-to-pixel comparison of all loaded frames at the cursor location. * s: Compute statistics and plot a histogram of the data at the cursor location. * c: Clear active plots and photometry data. * h: Show a help message. * q: Clear previous results and close the imexam event loop. """ if not self.HAS_DS9: # pragma: no cover return usage_message = [ '', 'Available options for ImExam:', ' a: Perform photometry at the cursor location.', ' p: Plot a pixel-to-pixel comparison of all ' 'frames at the cursor location.', ' s: Compute statistics and plot a histogram of the data at the ' 'cursor location.', ' c: Clear active plots and photometry data.', ' h: Show this message.', ' q: Quit ImExam.', ''] for msg in usage_message: log.info(msg) keypress = 'none' while keypress != 'q': # reset ptable if necessary if self.ptable is None: self.reset_ptable() # get new imexam cursor try: if self.run('wcs align', via='get') == 'yes': cs = 'wcs' else: cs = 'image' uval = self.run(f'imexam any coordinate {cs}', via='get') except (ValueError, AttributeError): log.debug('Error in ds9 wcs or imexam command') log.error('Error in ImExam loop') break if str(uval).strip() == '0' or str(uval).strip() == '': # error condition -- break loop log.debug(f'ImExam returned: {uval}') log.error('Error in ImExam loop') break if str(uval).lower().strip().startswith('q'): keypress = 'q' xcoord = None ycoord = None else: uvallist = str(uval).split() if len(uvallist) != 3: log.debug(f'Imexam returned: {uval}') log.error('Error in ImExam loop') break keypress, xcoord, ycoord = uvallist keypress = keypress.lower().strip() xcoord = float(xcoord) ycoord = float(ycoord) if keypress == 'a': # perform profile fits/aperture photometry self.photometry(xcoord, ycoord) # show radial plot if desired if self.phot_parameters['show_plots']: if self.signals is not None: self.signals.make_radial_plot.emit() elif keypress == 's': self.histogram(xcoord, ycoord) elif keypress == 'p': self.pix2pix(xcoord, ycoord) elif keypress == 'h': # show help message for msg in usage_message: log.info(msg) elif keypress == 'c' or keypress == 'q': # reset imexam data self.radial_data = [] self.histogram_data = [] self.p2p_data = [] self.ptable = None if self.plotviewer is not None: self.plotviewer.clear() self.plotviewer.hide() # check for tiling -- if tiled, delete regions # from all active frames if self.run('tile', via='get') == 'yes': tile = True else: tile = False self.run('regions group imexam delete', allframes=tile) if self.break_loop: # break loop if required break
[docs] def run(self, cmd, buf=None, via='set', allframes=False): """ Send an XPA command to DS9. Parameters ---------- cmd : str Command to send. buf : list, str, or file-like; optional Additional buffers to send. May be a string or a file handler, or a list of those. via : {'get', 'set'}, optional Command method. allframes : bool If True, the command should be run once for each active frame. Returns ------- str DS9 command exit status. """ if self.ds9 is None: self.startup() if not self.HAS_DS9: # pragma: no cover return if allframes: fcmd = 'frame active' else: fcmd = 'frame' try: framenums = self._run_internal(fcmd, via='get').split() except (TypeError, ValueError, AttributeError) as err: msg = str(err) if (msg.lower().startswith('ds9 is no longer running') or msg.lower().startswith('no response')): self.startup() framenums = self._run_internal(fcmd, via='get').split() elif msg.lower().startswith("'nonetype' object has no attribute"): framenums = [] else: raise if allframes: retvalarr = [] for frm in framenums: fcmd = 'frame ' + frm self._run_internal(fcmd) retval = self._run_internal(cmd, buf, via) retvalarr.append(retval) return retvalarr else: retval = self._run_internal(cmd, buf, via) return retval
[docs] def get_extension_param(self): """Retrieve the extension setting from the display parameters.""" if 'frame' in self.disp_parameters['extension']: exten = 'all' elif 'cube' in self.disp_parameters['extension']: exten = 'all' elif 'first' in self.disp_parameters['extension']: exten = 0 else: try: exten = int(self.disp_parameters['extension']) except ValueError: exten = str(self.disp_parameters['extension']).strip() return exten
[docs] def load(self, fitsfiles, regfiles=None): """ Load FITS files into DS9. Spectral files will be loaded into the Eye viewer if possible. Spectra are determined by the `spec_test` method. If only headers are provided, they are just stored in the `headers` attribute. Otherwise, all images and region files are loaded into DS9. Parameters ---------- fitsfiles : list of str, `astropy.io.fits.HDUList`, \ or `astropy.io.fits.Header` FITS file paths to load. May also be HDUList instances or just Header instances, if the data is already in memory. regfiles : list of str, optional DS9 region file paths to load. """ if type(fitsfiles) is not list: fitsfiles = [fitsfiles] if regfiles is None: regfiles = [] elif type(regfiles) is not list: regfiles = [regfiles] # read fits files, categorizing into image and spectral files good_files = [] img_files = [] imgdata = [] spec_files = [] specdata = [] regions = [] headers = {} is_data = {} data_type = {} for j, ffile in enumerate(fitsfiles): if isinstance(ffile, list) and \ isinstance(ffile[0], fits.Header): # header only: set aside for header viewing hdr = ffile try: ffile = hdr[0]["FILENAME"] except KeyError: ffile = "Array {}".format(j) headers[ffile] = hdr continue elif isinstance(ffile, fits.HDUList): hdul = ffile hdr = hdul[0].header try: ffile = hdr["FILENAME"] except KeyError: ffile = "Array {}".format(j) is_data[ffile] = True elif os.path.exists(str(ffile)): try: hdul = fits.open(ffile) except IOError: log.warning('Cannot load {}; ignoring.'.format(ffile)) continue is_data[ffile] = False else: log.warning('Cannot load {}; ignoring.'.format(ffile)) continue data_type[ffile] = self.spec_test(hdul) if 'spec' in data_type[ffile]: spec_files.append(ffile) specdata.append(hdul) if data_type[ffile] != 'spectrum_only': # Data that is not spectrum-only should also go to DS9 img_files.append(ffile) imgdata.append(hdul) # load spectral files into Eye if possible if len(spec_files) > 0: if self.disp_parameters['eye_viewer'] and self.HAS_EYE: if self.specviewer is None: self.specviewer = EyeViewer() self.specviewer.start() # send as data if any are not available as files, # otherwise send as files if any([is_data[ffile] for ffile in spec_files]): self.specviewer.update(specdata) else: self.specviewer.update(spec_files) self.specviewer.display() # keep the headers even if the eye is closed for j, ffile in enumerate(spec_files): if data_type[ffile] == 'spectrum_only': # other data will be tracked in img_files section good_files.append(ffile) headers[ffile] = specdata[j][0].header # keep headers for image files even if ds9 is unavailable if len(img_files) > 0: for j, ffile in enumerate(img_files): if is_data[ffile]: good_files.append(imgdata[j]) else: good_files.append(ffile) headers[ffile] = [] for ext in imgdata[j]: headers[ffile].append(ext.header) # also track good region files, regardless for rfile in regfiles: if os.path.exists(rfile): regions.append(rfile) # load image files into ds9 i = 0 if len(img_files) > 0 and self.disp_parameters['ds9_viewer'] \ and self.HAS_DS9: log.info('') self.set_defaults() exten = '' if 'frame' in self.disp_parameters['extension']: cmd = 'multiframe' elif 'cube' in self.disp_parameters['extension']: cmd = 'mecube' elif 'first' in self.disp_parameters['extension']: cmd = 'fits' else: try: exten = int(self.disp_parameters['extension']) except ValueError: exten = str(self.disp_parameters['extension']).strip() cmd = 'fits' for j, ffile in enumerate(img_files): ds9_cmd = cmd # automagic an S/N extension if desired if str(exten).upper() == 'S/N': try: s2n = self._make_s2n(ffile, imgdata[j]) except ValueError as err: log.error(err) continue is_data[ffile] = True imgdata[j] = s2n try: frame_to_load = int(self.run('frame', via='get')) + 1 except ValueError: frame_to_load = 1 if exten != '': extenstr = f'[{exten}]' else: extenstr = '' try: if not is_data[ffile]: if cmd == 'multiframe': ds9_cmd = "{} {}{}".format(cmd, ffile, extenstr) else: ds9_cmd = "{} new {}{}".format(cmd, ffile, extenstr) log.debug("Running DS9 command: {}".format(ds9_cmd)) status = self.run(ds9_cmd) else: self.run('frame new') # display FILENAME keyword in addition to filename self.run('view keyvalue "{}"'.format("'FILENAME'")) self.run('view keyword yes') if exten != '' and str(exten).upper() != 'S/N': try: phu = imgdata[j][0].copy() phu.data = None data = fits.HDUList([phu, imgdata[j][exten]]) except KeyError as err: log.error(err) raise ValueError(err) else: data = imgdata[j] # try to stream the data to DS9. If that fails, # try to write it to disk, then load it try: log.debug("Loading from memory") status = self._load_from_memory(cmd, ffile, data) except ValueError: log.debug("Loading from tempfile") status = self._load_from_tempfile(cmd, ffile, data) log.info(f'Loaded frame {frame_to_load}: ' f'{os.path.basename(ffile)}{extenstr}') except ValueError as err: msg = str(err) if exten != '': if str(exten).lower() in msg.lower(): log.warning(f'Error loading extension ' f'{exten} for file ' f'{os.path.basename(ffile)}') continue else: status = 0 log.error(f'Error in XPA command: {ds9_cmd}') log.error(msg) else: status = 0 log.error(f'Error in XPA command: {ds9_cmd}') log.error(msg) if status == 1 and self.disp_parameters['overplots']: # overlay photometric and/or spectroscopic apertures self.overlay() if len(specdata) > 0: nspec = len(specdata) if i >= nspec: hdr = specdata[0][0].header else: hdr = specdata[i][0].header try: with set_log_level('ERROR'): hwcs = wcs.WCS(headers[ffile][0]) except (ValueError, IndexError, MemoryError, AttributeError, TypeError): hwcs = None try: self.overlay_aperture(hdr, hwcs=hwcs) except ValueError: # pragma: no cover # may be encountered with extensions with # unexpected WCSs pass i += 1 if str(self.cs).strip().lower() != 'none': self.run('match frame ' + self.cs) if self.disp_parameters['zoom_fit']: frames = self.run('frame active', via='get').split() if len(frames) > 0: self.run(f'frame {frames[0]}') self.run('zoom to fit') if self.disp_parameters['cmap'].lower() != 'none': self.run('cmap ' + self.disp_parameters['cmap'].lower()) if self.disp_parameters['scale'].lower() != 'none': self.run('scale ' + self.disp_parameters['scale'].lower()) if len(regfiles) > 0: frames = self.run('frame active', via='get').split() nframes = len(frames) if nframes == len(regfiles): for reg_i, rfile in enumerate(regfiles): if os.path.exists(rfile): self.run('frame {}'.format(frames[reg_i])) self.run('region load {}'.format(rfile)) else: for rfile in regfiles: if os.path.exists(rfile): self.run('region load all ' + rfile) # reset the photometry table self.ptable = None self.radial_data = [] self.histogram_data = [] self.p2p_data = [] self.run('raise') # keep track of the files loaded into ds9, for reloading # if ds9 is closed self.files = good_files self.regions = regions self.headers = headers
def _load_from_tempfile(self, cmd, ffile, data): """Write a tempfile and load it into DS9.""" with tempfile.NamedTemporaryFile(delete=False) as \ new_file: new_name = new_file.name try: data.writeto(new_name, overwrite=True) ds9_cmd = "{} {}".format(cmd, new_name) log.debug("Running DS9 command: {}".format(ds9_cmd)) status = self.run(ds9_cmd) os.remove(new_name) except (TypeError, OSError, ValueError): log.warning("Cannot load image {} " "from tempfile".format(ffile)) status = 0 return status def _load_from_memory(self, cmd, ffile, data): """Use BytesIO to stream HDU data to DS9.""" status = 0 with contextlib.closing(io.BytesIO()) as new_file: new_file.name = ffile try: data.writeto(new_file, overwrite=True) new_fits = new_file.getvalue() log.debug("Running DS9 command: {}".format(cmd)) status = self.run(cmd, buf=[new_fits, len(new_fits)]) except (TypeError, ValueError): msg = "Cannot load image {} " \ "from memory".format(ffile) log.warning(msg) raise ValueError(msg) return status def _make_s2n(self, ffile, hdul): """Retrieve or make an S/N image.""" bname = os.path.basename(ffile) phu = hdul[0].copy() phu.data = None if 'S/N' in hdul: hdu = hdul['S/N'] else: if 'FLUX' in hdul and 'ERROR' in hdul: log.debug(f'Making S/N image from FLUX and ERROR ' f'extensions for {bname}.') hdu = fits.ImageHDU(hdul['FLUX'].data, hdul['FLUX'].header) s = hdul['FLUX'].data n = hdul['ERROR'].data elif 'FLUX' in hdul and 'STDDEV' in hdul: log.debug(f'Making S/N image from FLUX and STDDEV ' f'extensions for {bname}.') hdu = fits.ImageHDU(hdul['FLUX'].data, hdul['FLUX'].header) s = hdul['FLUX'].data n = hdul['STDDEV'].data elif 'STOKES I' in hdul and 'ERROR I' in hdul: log.debug(f'Making S/N image from STOKES I and ' f'ERROR I extensions for {bname}.') hdu = fits.ImageHDU(hdul['STOKES I'].data, hdul['STOKES I'].header) s = hdul['STOKES I'].data n = hdul['ERROR I'].data else: raise ValueError(f'Cannot determine S/N from extensions ' f'in file {bname}') hdu.data = s / n hdu.header['EXTNAME'] = 'S/N' hdu.header['BUNIT'] = '' # blank out data outside of range try: low, high = self.disp_parameters['s2n_range'] hdu.data[hdu.data < float(low)] = np.nan hdu.data[hdu.data > float(high)] = np.nan except (ValueError, AttributeError, IndexError, TypeError): pass s2n = fits.HDUList([phu, hdu]) return s2n
[docs] def lock(self, cs=None, ltype=None, off=False): """ Lock DS9 frames to the desired coordinate system. Parameters ---------- cs : {'image', 'wcs', 'none'}, optional Coordinate system to lock to. Display parameters will be used if not provided. ltype : {'frame', 'crosshair', 'crop', 'slice', \ 'bin', 'scale', 'colorbar', 'smooth'}, optional Lock type. If not provided, all will be locked. off : bool If True, locks will be turned off. """ icstypes = ['frame', 'crosshair', 'crop'] scstypes = ['slice'] booltypes = ['bin', 'scale', 'colorbar', 'smooth'] if off: cs = 'none' bval = 'no' else: bval = 'yes' if cs is None: ics = self.disp_parameters['lock_image'] scs = self.disp_parameters['lock_slice'] else: ics = cs scs = cs if ltype is not None: ltypes = [ltype.lower().strip()] else: # lock all ltypes = icstypes + scstypes + booltypes for ltype in ltypes: if ltype in icstypes: self.run('lock ' + ltype + ' ' + ics) elif ltype in scstypes: self.run('lock ' + ltype + ' ' + scs) elif ltype in booltypes: self.run('lock ' + ltype + ' ' + bval) self.cs = ics
[docs] def overlay(self): """ Overlay photometry aperture regions. Aperture parameters are read from the SRCPOSX/Y, STCENTX/Y, PHOTAPER, PHOTSKAP, STAPFLX, and STAPSKY header keywords in the FITS data in the currently displayed frame. """ # retrieve header for photometry keywords # from current frame only hdr_str = self.run('fits header', via='get') # read it in to a fits header phdr = fits.Header() hdr = phdr.fromstring(hdr_str, sep='\n') try: srcposx = hdr['SRCPOSX'] + 1 srcposy = hdr['SRCPOSY'] + 1 s1 = 'point({:f} {:f}) # ' \ 'point=x ' \ 'color=blue tag={{srcpos}} '\ 'text=SRCPOS'.format(srcposx, srcposy) self.run('regions', s1) except (KeyError, ValueError): pass try: stcentx = hdr['STCENTX'] + 1 stcenty = hdr['STCENTY'] + 1 photaper = hdr['PHOTAPER'] photskap = [float(x) for x in hdr['PHOTSKAP'].split(',')] s1 = 'point({:f} {:f}) # ' \ 'point=x ' \ 'color=cyan tag={{srcpos}}'.format(stcentx, stcenty) self.run('regions', s1) s2 = 'circle({:f} {:f} {:f}) # ' \ 'color=cyan tag={{srcpos}}'.format( stcentx, stcenty, photaper) self.run('regions', s2) s3 = 'annulus({:f} {:f} {:f} {:f}) # ' \ 'color=cyan tag={{srcpos}} text=STCENT'.format( stcentx, stcenty, photskap[0], photskap[1]) self.run('regions', s3) except (KeyError, ValueError): pass try: stcentx = hdr['STCENTX'] + 1 stcenty = hdr['STCENTY'] + 1 flux = hdr['STAPFLX'] sky = hdr['STAPSKY'] s1 = 'text({:f} {:f}) # color=cyan ' \ 'text="Flux={:.2f}, Sky={:.2f}"'.format( stcentx, stcenty - 40, flux, sky) self.run('regions', s1) except (KeyError, ValueError): pass # try overlaying apertures as well try: self.overlay_aperture(hdr) except ValueError: # pragma: no cover # may be encountered with extensions with # unexpected WCSs pass
[docs] def overlay_aperture(self, hdr, hwcs=None): """ Overlay spectroscopic aperture regions. Aperture parameters are determined from the FITS header keywords APPOSO01, APRADO01, PSFRAD01, and BGR. A primary spectral WCS is required. Parameters ---------- hdr : `astropy.io.fits.Header` FITS header to read apertures from. """ # test for appropriately sized data if hwcs is None: try: with set_log_level('ERROR'): hwcs = wcs.WCS(hdr) except (ValueError, IndexError, MemoryError, AttributeError, TypeError): return naxis = hwcs.pixel_shape if not naxis or len(naxis) != 2: return minval = hwcs.wcs_pix2world([[1, 1]], 1)[0][0] maxval = hwcs.wcs_pix2world([[naxis[0], 1]], 1)[0][0] regions = [] width = 1 try: appos = hdr['APPOSO01'].split(',') if 'APRADO01' in hdr: aprad = hdr['APRADO01'].split(',') else: aprad = None if 'PSFRAD01' in hdr: psfrad = hdr['PSFRAD01'].split(',') else: psfrad = None line_template = 'wcs; linear; line({:f} {:f} {:f} {:f}) # ' \ 'color={:s} tag={{aperture}} '\ 'width={:d}' for j in range(len(appos)): pos = float(appos[j]) s1 = line_template.format(minval, pos, maxval, pos, 'cyan', width) regions.extend([s1]) if aprad is not None: try: apr = float(aprad[j]) s2 = line_template.format(minval, pos + apr, maxval, pos + apr, 'green', width) s3 = line_template.format(minval, pos - apr, maxval, pos - apr, 'green', width) regions.extend([s2, s3]) except IndexError: pass if psfrad is not None: try: psf = float(psfrad[j]) s4 = line_template.format(minval, pos + psf, maxval, pos + psf, 'blue', width) s5 = line_template.format(minval, pos - psf, maxval, pos - psf, 'blue', width) regions.extend([s4, s5]) except IndexError: pass bgr = hdr.get('BGR') if not bgr: bgr = hdr.get('BGR_O01') if bgr: bgreg = re.split('[,;]', bgr) for bg in bgreg: r1, r2 = bg.split('-') s6 = line_template.format(minval, float(r1), maxval, float(r1), 'red', width) s7 = line_template.format(minval, float(r2), maxval, float(r2), 'red', width) regions.extend([s6, s7]) except (KeyError, ValueError): pass for reg in regions: try: self.run('regions', reg) except ValueError: # A bad WCS in the ds9 image will cause the # region set to fail -- in this case, don't continue # to try region break
[docs] def photometry(self, ctr1, ctr2): """ Perform aperture photometry at a selected position. For each active frame, the following steps are performed: * A window around the selected position is extracted from the FITS image. * NaNs in the extracted data are removed, the absolute value of the extracted data is taken, and the median value of the extracted data is subtracted. * The brightest peak in the window is located. * A model is fit to the extracted data, centered at the peak location. The model is either a 2-D Moffat or Gaussian, depending on the current photometry parameters. * A circular aperture and sky annulus are placed at the fit position, and fluxes are extracted. * Model fit parameters (centroid, FWHM, etc.) and fluxes are printed to the log at INFO level. Parameters ---------- ctr1 : float RA (if DS9 coordinate system is 'wcs') or x-position (if DS9 coordinate system is 'image') at selected image location. ctr2 : float Dec (if DS9 coordinate system is 'wcs') or y-position (if DS9 coordinate system is 'image') at selected image location. """ if not self.HAS_PIPECAL: log.warning('Photometry is not available. The ' 'sofia_redux.calibration package is required.') return # reset photometry table if necessary if self.ptable is None: self.reset_ptable() # get photometry parameters param = self.phot_parameters # check for the current status of the viewer # (tiling, aligned by wcs) if self.run('tile', via='get') == 'yes': allframes = True frames = self.run('frame active', via='get').split() else: allframes = False frames = [self.run('frame', via='get')] if self.run('wcs align', via='get') == 'yes': cs = 'wcs' else: cs = 'image' # log input values log.info(f'Photometry at x={ctr1}, y={ctr2} (in {cs} coordinates)') log.info('Parameters:') log.info(f" Model: {param['model']}") log.info(f" Window: {param['window']} {param['window_units']}") log.info(f" Starting FWHM: {param['fwhm']} {param['fwhm_units']}") log.info(f" Aperture: {param['psf_radius']} " f"{param['aperture_units']}") log.info(f" Background: radius {param['bg_inner']} " f"{param['aperture_units']}, width {param['bg_width']} " f"{param['aperture_units']}") log.info('') for frame in frames: if allframes: log.debug('Selecting frame ' + frame) self.run('frame ' + frame) try: results = self.retrieve_data(ctr1, ctr2) except (ValueError, TypeError) as err: log.debug(f'Error in retrieving Frame {frame} data: {err}') continue ps = results['pix_scale'] data = results['data'] fulldata = results['fulldata'] hwcs = results['wcs'] wdw = results['window'] xstart = results['xstart'] ystart = results['ystart'] xctr = results['xctr'] yctr = results['yctr'] filename = results['filename'] log.info(f'Frame {frame}: {filename}') # check for reasonable data if np.sum(np.isfinite(data)) < 3: continue default_fwhm = param['fwhm'] if param['fwhm_units'] == 'arcsec': default_fwhm /= ps try: psfr = float(param['psf_radius']) if param['aperture_units'] == 'arcsec': psfr /= ps except ValueError: # auto radius psfr = 2.15 * default_fwhm if (param['bg_inner'] is None or param['bg_width'] is None): do_bg = False skyrad = (0., 0.) else: do_bg = True try: bgrin = float(param['bg_inner']) if param['aperture_units'] == 'arcsec': bgrin /= ps except ValueError: bgrin = psfr + 0.2 * default_fwhm try: bgwid = float(param['bg_width']) if param['aperture_units'] == 'arcsec': bgwid /= ps bgrout = bgrin + bgwid except ValueError: bgrout = bgrin + 2.0 * default_fwhm if bgrout > bgrin: skyrad = (bgrin, bgrout) else: skyrad = (0., 0.) try: phot_par = pipecal_photometry( fulldata, np.full_like(fulldata, np.nan), srcpos=(xctr, yctr), fitsize=wdw, fwhm=default_fwhm, profile=param['model'], aprad=psfr, skyrad=skyrad, stamp_center=False, allow_badfit=True) except PipeCalError as err: log.warning(' Bad fit.') log.warning(f' {err}') continue peak, xcent, ycent, ra, dec, xfwhm, yfwhm, ellip, \ pa, pw_law, final_sum, bg_avg, bg_std = [np.nan] * 13 bg_fit = 0.0 for pp in phot_par: if pp['key'] == 'STPEAK': peak = pp['value'][0] elif pp['key'] == 'STCENTX': xcent = pp['value'][0] elif pp['key'] == 'STCENTY': ycent = pp['value'][0] elif pp['key'] == 'STFWHMX': xfwhm = pp['value'][0] elif pp['key'] == 'STFWHMY': yfwhm = pp['value'][0] elif pp['key'] == 'STANGLE': pa = pp['value'][0] elif pp['key'] == 'STPWLAW': pw_law = pp['value'][0] elif pp['key'] == 'STAPFLX': final_sum = pp['value'][0] elif pp['key'] == 'STAPSKY' and do_bg: bg_avg = pp['value'][0] elif pp['key'] == 'STAPSSTD' and do_bg: bg_std = pp['value'] elif pp['key'] == 'STBKG': bg_fit = pp['value'][0] # check whether source is already in table limit = 2. * default_fwhm present = (int(frame) == self.ptable['Frame']) \ & (np.abs(self.ptable['X'] - (xcent + 1)) < limit) \ & (np.abs(self.ptable['Y'] - (ycent + 1)) < limit) if np.any(present): log.info(' Source already measured.') continue # check whether source is unreasonably large or small badfit = False with warnings.catch_warnings(): warnings.simplefilter('ignore') mfwhm = gmean([xfwhm, yfwhm]) if np.isnan(mfwhm) or mfwhm > 20 or mfwhm < 1.0: log.warning(' Bad fit.') log.warning(' Calculated FWHM: {:.2f} pixels'.format(mfwhm)) badfit = True mfwhm = np.nan ellip = np.nan pa = np.nan else: # calculate ellipticity and fix PA if xfwhm >= yfwhm: ellip = 1 - yfwhm / xfwhm else: ellip = 1 - xfwhm / yfwhm if pa <= 0: pa += 90 else: pa -= 90 # track flux by radial distance if param['show_plots']: y, x = np.mgrid[:wdw, :wdw] r = np.sqrt((x - xcent + xstart) ** 2 + (y - ycent + ystart) ** 2) if badfit: moddata = None else: # get the equivalent 1D model from the profile fit # for plotting if param['model'] == 'gaussian': eqw = mfwhm * stats.gaussian_fwhm_to_sigma rmodel = modeling.models.Gaussian1D(peak, 0.0, eqw) else: n_1 = 1 / pw_law eqw = mfwhm / (2 * np.sqrt(2 ** n_1 - 1)) rmodel = modeling.models.Moffat1D( peak, 0.0, eqw, pw_law) rmodel += modeling.models.Const1D(bg_fit) moddata = rmodel(r) # data for matplotlib viewer: primary is model; # scatter data and h/v lines are overplots rflat = r.ravel() dflat = data.ravel() sortidx = np.argsort(rflat) xdata = rflat[sortidx] overplots = [{'plot_type': 'scatter', 'args': [rflat, dflat], 'kwargs': {'marker': '*', 'c': dflat, 'label': 'Flux data'}}, {'plot_type': 'hline', 'args': [0.0], 'kwargs': {'linestyle': ':', 'linewidth': 1, 'color': 'lightgray'}}] if moddata is not None: ydata = moddata.ravel()[sortidx] overplots.append({'plot_type': 'vline', 'args': [mfwhm / 2.0], 'kwargs': { 'linestyle': ':', 'linewidth': 1, 'color': '#ff7f0e', 'label': 'Fit HWHM'}}) overplots.append({'plot_type': 'vline', 'args': [mfwhm], 'kwargs': { 'linestyle': ':', 'linewidth': 1, 'color': '#d62728', 'label': 'Fit FWHM'}}) else: ydata = np.full_like(xdata, np.nan) title = f'Frame {frame}, x={xcent:.0f} y={ycent:.0f}' overplots.append({'plot_type': 'legend', 'args': []}) plot_data = {'args': [xdata, ydata], 'kwargs': { 'title': title, 'xlabel': 'Distance (pixels)', 'ylabel': 'Flux'}, 'plot_kwargs': { 'linestyle': '-', 'color': 'gray', 'label': f"{param['model'].title()} profile"}, 'overplot': overplots} self.radial_data.append(plot_data) # add DS9 start index back into centroid and convert to RA/Dec xcent += 1 ycent += 1 if hwcs is not None: try: radec = hwcs.wcs_pix2world([[xcent, ycent, 1]], 1) except ValueError: try: radec = hwcs.wcs_pix2world([[xcent, ycent]], 1) except ValueError: radec = np.array([[None, None]]) else: radec = np.array([[None, None]]) # set region b0 = 'point({:f} {:f}) # ' \ 'point=x ' \ 'color=green tag={{imexam}}'.format(xcent, ycent) self.run('regions', b0) b1 = 'circle({:f} {:f} {:f}) # ' \ 'color=green tag={{imexam}}'.format(xcent, ycent, psfr) self.run('regions', b1) if do_bg: b2 = 'annulus({:f} {:f} {:f} {:f}) # ' \ 'color=red ' \ 'tag={{imexam}}'.format(xcent, ycent, skyrad[0], skyrad[1]) self.run('regions', b2) self.ptable.add_row([frame, peak, xcent, ycent, radec[0, 0], radec[0, 1], mfwhm, mfwhm * ps, ellip, pa, final_sum, bg_avg, bg_std]) self.ptable.sort(['Frame', 'Peak']) print_str = '\n'.join( self.ptable.pformat(max_lines=-1, max_width=-1)) log.info(f'\nResults:\n{print_str}\n')
[docs] def pix2pix(self, ctr1, ctr2): """ Create a comparison of pixel values at the selected location. Two or more frames must be loaded to generate a pixel comparison plot. The reference frame is Frame 1 by default, but can be changed in the plot parameters. If the cursor is over an enclosed region (circle, box, etc.), then the comparison is computed for the enclosed data. Otherwise, the comparison is computed for an analysis window centered on the cursor position. The window width can be set in the plot parameter dialog; if set to a blank value, the entire image is used to compute the comparison. Parameters ---------- ctr1 : float RA (if DS9 coordinate system is 'wcs') or x-position (if DS9 coordinate system is 'image') at selected image location. ctr2 : float Dec (if DS9 coordinate system is 'wcs') or y-position (if DS9 coordinate system is 'image') at selected image location. """ # check for the current status of the viewer # (tiling, aligned by wcs) if self.run('tile', via='get') == 'yes': allframes = True frames = self.run('frame active', via='get').split() else: allframes = False frames = [self.run('frame', via='get')] if self.run('wcs align', via='get') == 'yes': cs = 'wcs' else: cs = 'image' if len(frames) < 2: log.info('') log.info('Pixel comparison requires 2 or more ' 'frames to be shown.') return # get any currently available regions all_regions = self.run(f'regions -system {cs}', allframes=allframes, via='get') if not allframes: # pragma: no cover # this shouldn't be reachable, since frames >= 2 all_regions = [all_regions] param = self.plot_parameters p2p_data_sets = {} label = {} reg_name = {} for frame in frames: log.info('') if allframes: self.run('frame ' + frame) # check for loaded data if not self._loaded_data(): continue try: results = self.retrieve_data(ctr1, ctr2, photometry=False) except (ValueError, TypeError) as err: log.debug(f'Error in retrieving Frame {frame} data: {err}') continue fulldata = results['fulldata'] data = results['data'] wdw = results['window'] hwcs = results['wcs'] xctr = results['xctr'] yctr = results['yctr'] filename = results['filename'] # get file and ext name if possible log.info(f'Frame {frame}: {filename}') log.info(f'Pixel comparison at x={ctr1}, y={ctr2} ' f'(in {cs} coordinates)') # get data from region mask or window mask = self._region_mask(cs, all_regions, xctr, yctr, hwcs) if mask is None: if param['window'] is None: log.info('Using the full image') reg_name[frame] = 'full image' short_reg_name = 'full' p2p_data = fulldata else: log.info(f'Using the analysis window ' f'(width: {wdw} pixels)') reg_name[frame] = f'{wdw} pixel window' short_reg_name = f'x={xctr:.0f} y={yctr:.0f} {wdw}pix' p2p_data = data else: reg_name[frame] = 'DS9 region' short_reg_name = f'x={xctr:.0f} y={yctr:.0f} region' p2p_data = mask.multiply(fulldata) p2p_data[p2p_data == 0] = np.nan p2p_data = p2p_data.ravel() p2p_data_sets[frame] = p2p_data label[frame] = f'F{frame} {short_reg_name}' log.info('') ref_frame = str(param['p2p_reference']) log.info(f'Reference frame: {ref_frame}') if ref_frame in p2p_data_sets: ref_data = p2p_data_sets[ref_frame] mp = (np.nanmean(ref_data), np.nanmean(ref_data)) overplots = [] for f in p2p_data_sets: if f != ref_frame: if p2p_data_sets[f].size == ref_data.size: new_p2p = {'plot_type': 'plot', 'args': [ref_data, p2p_data_sets[f]], 'kwargs': {'label': label[f], 'alpha': 0.8, 'linestyle': '', 'marker': '.'}} overplots.append(new_p2p) else: # pragma: no cover # todo: this clause and the next are unreachable # with current mock test fixture, since it # returns the same data array for all frames log.warning(f'Data pixelation mismatch; ' f'skipping frame {f}') if len(overplots) < 1: # pragma: no cover log.info('No data to plot.') return title = f'Pixel comparison to Frame {ref_frame}, ' \ f'{reg_name[ref_frame]}' legend = {'plot_type': 'legend', 'args': []} line = {'plot_type': 'line', 'args': [mp], 'kwargs': {'linestyle': ':', 'color': 'gray', 'slope': 1.0}} if param['separate_plots'] or len(self.p2p_data) < 1: overplots.extend([line, legend]) plot_data = {'args': [], 'kwargs': {'title': title, 'xlabel': 'Reference flux', 'ylabel': 'Comparison flux', 'colormap': param['color']}, 'plot_kwargs': {}, 'overplot': overplots} self.p2p_data.append(plot_data) else: # append new dataset to existing ones plot_data = self.p2p_data[-1] old_overplots = [] for plot in plot_data['overplot']: if plot['plot_type'] == 'plot': old_overplots.append(plot) old_overplots.extend(overplots) old_overplots.extend([line, legend]) plot_data['overplot'] = old_overplots plot_data['kwargs']['title'] = 'All pixel comparisons' if self.signals is not None: self.signals.make_p2p_plot.emit() else: log.warning(f'Reference frame {ref_frame} is not loaded')
[docs] def pix2pix_plot(self): """Plot pixel-to-pixel comparison in a separate window.""" if not self.HAS_PYQT5: return data = self.p2p_data if data is None or len(data) == 0: return # start up plot viewer if needed if self.plotviewer is None or not self.plotviewer.isVisible(): self.plotviewer = MatplotlibPlot() self.plotviewer.setWindowTitle('Pixel-to-Pixel Comparison') self.plotviewer.plot_layout = 'rows' self.plotviewer.share_axes = self.plot_parameters['share_axes'] self.plotviewer.plot(data) self.plotviewer.set_scroll('bottom') self.plotviewer.show() self.plotviewer.raise_()
[docs] def radial_plot(self): """Plot radial fluxes in a separate window.""" if not self.HAS_PYQT5: return data = self.radial_data if data is None or len(data) == 0: return # start up plot viewer if needed if self.plotviewer is None or not self.plotviewer.isVisible(): self.plotviewer = MatplotlibPlot() self.plotviewer.setWindowTitle('Radial Profiles') self.plotviewer.plot_layout = 'rows' self.plotviewer.share_axes = self.plot_parameters['share_axes'] self.plotviewer.plot(data) self.plotviewer.set_scroll('bottom') self.plotviewer.show() self.plotviewer.raise_()
[docs] def reload(self): """Reload previously loaded FITS files.""" if len(self.files) > 0: self.load(self.files, regfiles=self.regions)
[docs] def reset(self): """Reset to starting state.""" self.files = [] self.regions = [] self.headers = {} self.radial_data = [] self.histogram_data = [] self.p2p_data = [] self.ptable = None
[docs] def reset_ptable(self): self.ptable = table.Table(names=['Frame', 'Peak', 'X', 'Y', 'RA', 'Dec', 'FWHM (px)', 'FWHM (")', 'Ellip.', 'PA', 'Flux', 'BG/pix', 'BG Std.'], dtype=[int, float, float, float, float, float, float, float, float, float, float, float, float]) flt_col = ['Peak', 'X', 'Y', 'FWHM (px)', 'FWHM (")', 'Ellip.', 'PA', 'Flux', 'BG/pix', 'BG Std.'] for col in flt_col: self.ptable[col].format = '.4g' self.ptable['RA'].format = '.7g' self.ptable['Dec'].format = '.7g'
[docs] def retrieve_data(self, ctr1, ctr2, cube=False, photometry=True): """ Extract a sub region from the active frame. Parameters ---------- ctr1 : float RA (if DS9 coordinate system is 'wcs') or x-position (if DS9 coordinate system is 'image') at selected image location. ctr2 : float Dec (if DS9 coordinate system is 'wcs') or y-position (if DS9 coordinate system is 'image') at selected image location. cube : bool, optional If True, extract a region from the full data cube. Otherwise, extract from the current slice only. photometry : bool, optional If True, the photometry parameters are used to determine the data window. Otherwise, the plot parameters are used. Returns ------- dict Extracted data for the current frame. Keys are: 'cs', 'pix_scale', 'wcs', 'window', 'xstart', 'ystart', 'xctr', 'yctr', 'data', 'fulldata', 'header' """ if photometry: param = self.phot_parameters else: param = self.plot_parameters if self.run('wcs align', via='get') == 'yes': cs = 'wcs' else: cs = 'image' # retrieve header for wcs from current frame hdr_str = self.run('fits header', via='get') phdr = fits.Header() hdr = phdr.fromstring(hdr_str, sep='\n') try: with set_log_level('ERROR'): hwcs = wcs.WCS(hdr) psd2 = wcs.utils.proj_plane_pixel_scales(hwcs.celestial) with warnings.catch_warnings(): warnings.simplefilter('ignore') ps = np.mean(psd2) * 3600 except (ValueError, IndexError, MemoryError, AttributeError, TypeError): log.warning('Error reading WCS. Setting plate scale to 1.0.') hwcs = None ps = 1.0 if np.isnan(ps): log.warning('Error reading WCS. Setting plate scale to 1.0.') hwcs = None ps = 1.0 # get file/ext name if possible filename = hdr.get('FILENAME', '') extname = hdr.get('EXTNAME', None) if extname is not None: filename = f'{filename} [{extname}]' # convert RA/Dec to pix if necessary if cs == 'wcs' and hwcs is not None: if hwcs.naxis == 2: ctr = [[ctr1, ctr2]] else: ctr = [[ctr1, ctr2, 1]] with set_log_level('ERROR'): pixctr = hwcs.wcs_world2pix(ctr, 1) # occasionally, values get returned as pixels instead # of RA/Dec - eg. when there is an empty frame around if np.any(np.isnan(pixctr)): log.debug('WCS position retrieval failed; ' 'assuming pixel postions') xctr = ctr1 yctr = ctr2 else: xctr = pixctr[0, 0] yctr = pixctr[0, 1] else: xctr = ctr1 yctr = ctr2 # subtract 1 for numpy indexing xctr -= 1 yctr -= 1 # retrieve data from current slice from viewer try: pdata = self.ds9.get_arr2np() except ValueError: log.warning('Displayed data cannot be retrieved as an array.') log.warning('Try turning off cube display for ' 'multi-extension files.') raise dim = pdata.shape if len(dim) > 2: zdim, ydim, xdim = dim else: ydim, xdim = dim zdim = 0 dslice = int(self.run('cube', via='get')) # set fitting window and retrieve subimage if param['window'] is None: pix_wdw = np.inf elif param['window_units'] == 'arcsec': pix_wdw = param['window'] / ps else: pix_wdw = param['window'] wdw = int(np.min([pix_wdw, xdim, ydim])) xstart = int(xctr - wdw / 2.0) ystart = int(yctr - wdw / 2.0) if xstart < 0: xstart = 0 elif xstart + wdw > xdim: xstart = xdim - wdw if ystart < 0: ystart = 0 elif ystart + wdw > ydim: ystart = ydim - wdw if zdim >= dslice: if not cube: log.debug(f'Retrieving slice {dslice}') fulldata = pdata[dslice - 1, :, :] data = pdata[dslice - 1, ystart:ystart + wdw, xstart:xstart + wdw] else: fulldata = pdata data = pdata[:, ystart:ystart + wdw, xstart:xstart + wdw] elif zdim > 0: fulldata = pdata[0, :, :] data = pdata[0, ystart:ystart + wdw, xstart:xstart + wdw] else: fulldata = pdata data = pdata[ystart:ystart + wdw, xstart:xstart + wdw] results = {'cs': cs, 'pix_scale': ps, 'wcs': hwcs, 'window': wdw, 'xstart': xstart, 'ystart': ystart, 'xctr': xctr, 'yctr': yctr, 'data': data, 'fulldata': fulldata, 'header': hdr, 'filename': filename} return results
[docs] def set_defaults(self): """Reset DS9 configuration from display parameters.""" if not self.HAS_DS9: # pragma: no cover return self.run('frame delete all') self.run('wcs degrees') if self.disp_parameters['tile']: self.run('tile yes') else: self.run('tile no') self.cs = str(self.disp_parameters['lock_image']).lower() self.lock()
[docs] def spec_test(self, hdul): """ Test for a spectrum the Eye can display. The data is considered spectral if a 'spectral_flux' extension is present. It is 'spectrum_only' if NAXIS1 is greater than 0 and NAXIS2 is less than 6. Otherwise, it is 'image'. Parameters ---------- header : `astropy.io.fits.Header` FITS header to test. Returns ------- str 'spectrum', 'spectrum_only', or 'image'. """ # check for mixed spectral product in # single spectral extension or multi-order spectra # (may be SPECTRAL_FLUX or SPECTRAL_FLUX_ORDER_*) for hdu in hdul: extname = str(hdu.header.get('EXTNAME', 'UNKNOWN')).lower() if 'spectral_flux' in extname: return 'spectrum' header = hdul[0].header if 'NAXIS1' in header: xdim = header['NAXIS1'] else: xdim = 0 if 'NAXIS2' in header: ydim = header['NAXIS2'] else: ydim = 0 if xdim > 0 and ydim < 6: return 'spectrum_only' else: return 'image'
[docs] def startup(self): """Start up DS9.""" log.debug('Starting DS9.') # lazy import pyds9 because it has non-trivial startup behavior try: import pyds9 except (ImportError, ValueError): # PyDS9 sometimes fails to import for internal reasons. log.error('Cannot import PyDS9. DS9 display ' 'will not be available.') self.HAS_DS9 = False return else: self.HAS_DS9 = True try: self.ds9 = pyds9.DS9() except (TypeError, ValueError): raise ValueError('DS9 is not accessible.') from None # reset files and regions instead self.files = [] self.regions = []
[docs] def quit(self): """Quit DS9.""" self.break_loop = True try: self._run_internal('quit') except Exception: pass