# Licensed under a 3-clause BSD style license - see LICENSE.rst
import os
import re
from astropy import log
from astropy.io.fits.header import Header
from astropy.time import Time
from astropy.utils.data import download_file
import numpy as np
import pandas as pd
from sofia_redux.instruments import exes
from sofia_redux.instruments.exes.utils import \
set_elapsed_time, parse_central_wavenumber
from sofia_redux.toolkit.utilities.fits import hdinsert
from sofia_redux.toolkit.utilities.func import goodfile, robust_bool
__all__ = ['readhdr']
# backup download URL for non-source installs
DATA_URL = 'https://sofia-exes-reference.s3-us-gov-west-1.amazonaws.com/'
def readhdr(header, check_header=True,
config_file=None, comments_file=None):
r"""
Read and update an EXES FITS header.
Keywords that must be present in the output header are defined in
exes/data/header/headerdef.dat. Default values and acceptable
ranges are also defined there. For each keyword defined there,
the header is checked for a value. If it is found, the value is
added to the output header.
    If check_header=True, each value found is also checked against
    the allowed values. If it is out of range, the returned success
    flag will be set to False. If a value is not found, the default
    value is added. If a required value is missing, the success flag
    will be set to False, but the default will still be added.
Some additional special values are added to the output header:
* Comments associated with all keywords are read from
exes/data/header/headercomment.dat.
* The full path to the package containing this routine is
stored under the key 'PKGPATH'.
* The full path to the package/data directory is stored
under the key 'DATAPATH'.
* Distortion correction parameters are read from
exes/data/tort/\*.dat and added to the output header.
Some additional parameters are calculated from these and
other header parameters and added to the output header.
* The slit angle is converted from degrees to cm using
the values in exes/data/slitval.dat.
* Filenames for BPM, LINFILE, and DARKFILE are looked up
by date from exes/data/caldefault.dat.
* The current date is added under the key 'DATE'.
Parameters
----------
header : fits.Header
The FITS header to check and update.
check_header : bool, optional
If True, check against header requirements and return success or
failure status.
    config_file : str, optional
        Path to the headerdef.dat header defaults configuration file.
    comments_file : str, optional
        Path to the headercomment.dat header keyword comments file.
Returns
-------
    updated_header : fits.Header
        The updated FITS header.
    success : bool, optional
        True if header checks succeeded; False if they did not.
        Only returned if check_header is True.
Raises
------
    ValueError
        If `header` is not a fits.Header instance.
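    Examples
    --------
    A minimal sketch: build a bare header and let this function fill
    in defaults from the configuration tables (keyword values here are
    placeholders, not a real observation):

    >>> from astropy.io.fits.header import Header
    >>> hdr = Header()
    >>> hdr['DATE-OBS'] = '2015-01-01T00:00:00'
    >>> hdr['INSTCFG'] = 'HIGH_MED'
    >>> new_hdr, ok = readhdr(hdr, check_header=True)  # doctest: +SKIP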
"""
if not isinstance(header, Header):
raise ValueError(f"Header is not {Header}")
header_config = _get_header_configuration(
config_file=config_file, comments_file=comments_file)
new_header = header.copy()
success = True
for keyword in header_config.index:
value = header.get(keyword)
default_value = _get_default(keyword, header_config)
comment = header_config.loc[keyword].comment
# try to convert to expected type
if value is not None:
try:
value = header_config.loc[keyword].type(value)
except (ValueError, TypeError):
# leave it if it fails
pass
if check_header:
success &= _checkreq(keyword, value, header_config)
if value is not None:
hdinsert(new_header, keyword, value, comment=comment)
elif default_value is not None:
hdinsert(new_header, keyword, default_value, comment=comment)
_standardize_values(new_header)
_set_decimal_date(new_header)
_process_instrument_configuration(new_header)
_process_slit_configuration(new_header)
_process_tort_configuration(new_header)
_add_configuration_files(new_header)
if check_header:
return new_header, success
else:
return new_header
def _get_configuration_file(filename, subdir=None, check=True):
"""Get a configuration file from exes path."""
datadir = os.path.join(os.path.dirname(exes.__file__), 'data')
if subdir is not None:
datadir = os.path.join(datadir, subdir)
if check and not os.path.isdir(datadir):
raise ValueError(f"{datadir} does not exist")
datafile = os.path.join(datadir, filename)
if check and not goodfile(datafile):
raise ValueError(f"Could not read: {datafile}")
return datafile
def _get_header_configuration(config_file=None, comments_file=None):
"""Get header configuration from a stored definition."""
if config_file is None:
config_file = _get_configuration_file('headerdef.dat', subdir='header')
else:
if not goodfile(config_file, verbose=True):
raise ValueError(
f"Header default file does not exist: {config_file}")
if comments_file is None:
comments_file = _get_configuration_file(
'headercomment.dat', subdir='header')
else:
if not goodfile(comments_file, verbose=True):
            raise ValueError(f'Header comments file does not exist: '
                             f'{comments_file}')
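    # headerdef.dat is whitespace-delimited with '.' marking an empty
    # field; a line resembles (hypothetical example, not a real entry):
    #   INSTCFG  yes  UNKNOWN  str  .  .  HIGH_MED|HIGH_LOW|MEDIUM|LOW|CAMERA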
columns = ['required', 'default', 'type', 'min', 'max', 'enum']
types = {'int': int, 'integer': int, 'float': float, 'bool': robust_bool,
'str': str, 'string': str, 'complex': complex, 'double': float}
converters = {'required': robust_bool,
'default': lambda x: None if x == '.' else x,
'type': lambda x: types.get(x),
'min': lambda x: float(x) if x != '.' else None,
'max': lambda x: float(x) if x != '.' else None,
'enum': lambda x: x.split('|') if x != '.' else []}
df = pd.read_csv(config_file, delim_whitespace=True, comment='#',
index_col=0, names=columns, converters=converters)
df.index = df.index.str.upper().str.strip()
converters = {'comment': str.strip, 'keyword': str.strip}
comments = pd.read_csv(comments_file, comment='#',
names=['keyword', 'comment'],
converters=converters,
sep='^([^,]+),',
engine='python')
comments.set_index('keyword', inplace=True)
comments.index = comments.index.str.upper()
return df.join(comments)
def _get_default(keyword, table):
"""
Get a keyword default value from the keyword defaults table.
Parameters
----------
keyword : str
table : pandas.DataFrame
Returns
-------
value : str, float, int, or bool
The default value.
"""
if keyword not in table.index:
log.error(f"Keyword {keyword} not found in keywords default table")
return
dtype = table.loc[keyword]['type']
if dtype is bool:
dtype = robust_bool
value = table.loc[keyword]['default']
# handle missing default, for keywords that should be
# tracked but not added if missing
if value is None:
return None
try:
value = dtype(value)
except (ValueError, TypeError, AttributeError):
value = None
# handle poorly specified default
if value is None:
if dtype is robust_bool: # pragma: no cover
# shouldn't be reachable, for robust bool type
value = False
elif dtype in [float, int]:
value = dtype(-9999)
elif dtype is str: # pragma: no cover
# shouldn't be reachable, for str type
value = 'UNKNOWN'
return value
def _checkreq(key, value, table):
"""
Check whether the value of a key fits requirements.
Parameters
----------
key : str
Key name
value : str, float, int, or bool
Value to test
table : pandas.DataFrame
Keyword defaults table
Returns
-------
bool
        True if value fits the requirements, False otherwise. If the
        keyword is not found in the keyword defaults table, or is not
        required, True is returned.
"""
    # special check for the UTC zero date, 1970-01-01 (MJD 40587), a
    # common glitch in raw SOFIA headers, also seen in FIFI-LS data
if key == 'DATE-OBS':
try:
mjd = Time(value).mjd
except (ValueError, AttributeError, TypeError):
mjd = 40587
if int(mjd) == 40587:
log.error(f"Required keyword DATE-OBS has wrong value ({value})")
return False
if key not in table.index: # If there are no rules, allow it to go through
log.warning(f"{key} keyword not found in keyword defaults table")
return True
row = table.loc[key]
if not row['required']:
return True
if value is None:
log.error(f'Required keyword {key} has missing value')
return False
rtype = row['type']
rtype = bool if rtype is robust_bool else rtype
if not isinstance(value, rtype):
# allow ints for float values
if not (rtype is float and isinstance(value, int)):
log.error(
f"Required keyword {key} has wrong type (value: {value}). "
f"Should be {row['type']}")
return False
# Check enum
if len(row['enum']) > 0:
dtype = row['type']
dtype = robust_bool if dtype is bool else dtype
check_enum = [dtype(x) for x in row['enum']]
if value not in check_enum:
log.error(f"Required keyword {key} has wrong value {value}. "
f"Should be within [{','.join(row['enum'])}]")
return False
# Check min
if not np.isnan(row['min']) and value < row['min']:
log.error(f"Required keyword {key} has wrong value {value}. "
f"Should be >= {row['min']}")
return False
# Check max
if not np.isnan(row['max']) and value > row['max']:
log.error(f"Required keyword {key} has wrong value {value}. "
f"Should be <= {row['min']}")
return False
return True
def _set_decimal_date(header):
"""
Set the FDATE key in the header to a decimal value representing date.
Parameters
----------
header : fits.Header
Header to update.
"""
# Decimal date
date = header.get('DATE-OBS', 'UNKNOWN')
fdate = header['FDATE']
try:
time = Time(date, scale='utc', format='isot').isot
except ValueError:
log.warning(f'DATE-OBS {date} not understood, using date={fdate}')
return
pattern = re.compile(r'(\d{4})-(\d{2})-(\d{2})T(\d{2})')
regex = pattern.match(time)
if regex is None: # pragma: no cover
# shouldn't be reachable, if Time was created correctly
log.warning(f'DATE-OBS {date} not understood, using date={fdate}')
return
y, m, d, h = map(int, regex.groups())
if y > 100:
y -= 2000
fdate = round(y + (m / 1e2) + (d / 1e4) + (h / 1e6), 6)
header['FDATE'] = fdate
def _standardize_values(header):
"""
Reformat certain values.
Header is updated in-place.
Parameters
----------
header : fits.Header
Header to update.
"""
package_path = os.path.dirname(exes.__file__)
header['PKGPATH'] = package_path
header['DATAPATH'] = os.path.join(package_path, 'data')
header['ASSC_AOR'] = str(header.get('AOR_ID', 'UNKNOWN'))
header['CARDMODE'] = str(header.get('CARDMODE', 'UNKNOWN')).upper().strip()
header['INSTMODE'] = str(header.get('INSTMODE', 'UNKNOWN')).upper().strip()
float_keys = ['EXPTIME', 'WAVENO0', 'XDDGR', 'GRATANGL',
'XDDELTA', 'SDEG', 'HRFL0', 'XDFL0',
'EFL0', 'PIXELWD', 'SLITVAL', 'SLITWID',
'HRG', 'WNO0', 'HRDGR', 'HRR', 'FDATE']
for key in float_keys:
if key in header:
try:
header[key] = float(header[key])
except Exception as err:
header[key] = -9999.0
log.warning(f"Unable to convert {key} key to float: {err}")
else:
header[key] = -9999.0
instcfg = header['INSTCFG'].upper().strip()
if instcfg in ['HI-MED', 'HIGH-MED', 'HIMED']:
header['INSTCFG'] = 'HIGH_MED'
elif instcfg in ['HI-LO', 'HIGH-LOW', 'HILO']:
header['INSTCFG'] = 'HIGH_LOW'
elif instcfg in ['MED']:
header['INSTCFG'] = 'MEDIUM'
elif instcfg in ['LO']:
header['INSTCFG'] = 'LOW'
elif instcfg in ['CAM', 'PUP']:
header['INSTCFG'] = 'CAMERA'
else:
header['INSTCFG'] = instcfg
nexp = int(header.get('NEXP', -9999))
header['NEXP'] = 1 if nexp == -9999 else nexp
# For SOFIA
header['EFL0'] = 3000.0
# Check for pinhole mode
header['PINHOLE'] = False # not used for EXES yet
# Add the current date/time
header['DATE'] = Time.now().isot[:-4]
# Set the elapsed time within the file
set_elapsed_time(header)
# If raw filename has not yet been added, add it now
if 'RAWFNAME' not in header:
header['RAWFNAME'] = (str(header.get('FILENAME', 'UNKNOWN')),
'Raw file name')
def _process_instrument_configuration(header):
"""
Update header based on instrument configuration.
Header is updated in-place.
Parameters
----------
header : fits.Header
"""
instcfg = header['INSTCFG']
if instcfg in ['MEDIUM', 'HIGH_MED', 'LOW', 'HIGH_LOW']:
header['GRATANGL'] = header['ECHELLE']
if 'LOW' in instcfg:
header['XDDGR'] = header['XDLRDGR'] # LOW
else:
header['XDDGR'] = header['XDMRDGR'] # MEDIUM
if instcfg != 'CAMERA':
w0 = parse_central_wavenumber(header)
xddgr = float(header['XDDGR'])
gangle = float(header['GRATANGL'])
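        # nearest integer order from the Littrow grating equation,
        # m = 2 * d * sin(theta) * nu, with cross-disperser groove
        # spacing d (XDDGR) and central wavenumber nu, assumed to be
        # in reciprocal units so that m is dimensionless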
iorder = int(np.round(2 * xddgr * np.sin(np.radians(gangle)) * w0))
if instcfg in ['MEDIUM', 'HIGH_MED']:
sinang = iorder / (2 * xddgr * w0)
theta = np.arcsin(sinang)
else:
theta = np.radians(gangle)
header['XDR'] = np.tan(theta + float(header['XDDELTA']))
if instcfg == 'HIGH_LOW':
            # XDR seems overestimated by a factor of 2 in high-low
            # mode, so halve it
            header['XDR'] /= 2
# assign a rough resolution if it is missing entirely
# or has a bad value
res = header.get('RESOLUN', -9999)
if res < 500:
log.warning(f'RESOLUN has incorrect value: {res}')
if 'HIGH' in instcfg:
header['RESOLUN'] = 75000
elif instcfg == 'LOW':
header['RESOLUN'] = 2000
else:
header['RESOLUN'] = 10000
log.warning(f"Assigning default value: {header['RESOLUN']}")
def _process_slit_configuration(header):
"""
Update the SLITVAL keyword based on configuration and header.
Header is updated in-place.
Parameters
----------
header : fits.Header
"""
slit_file = os.path.join(
os.path.dirname(exes.__file__), 'data', 'slit', 'slitval.dat')
if not goodfile(slit_file, verbose=True):
raise ValueError(f"Could not read slit configuration "
f"file: {slit_file}")
columns = ['date', 'limit', 'div1', 'div2', 'c1', 'c2']
columns.extend([f'v{i}' for i in range(24)])
df = pd.read_csv(slit_file, delim_whitespace=True, comment='#',
names=columns, dtype=float)
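    # the table lists cutoff dates: use the first row whose date is
    # later than the observation date (FDATE)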
row = df[df['date'] > header['fdate']].iloc[0]
slit = float(header['SDEG'])
if slit < row['limit']:
c = row['c1']
m = row['div1']
else:
c = row['c2']
m = row['div2']
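    # convert the slit angle to an index into the 24 tabulated
    # slit values (v0-v23)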
islit = int(slit / m) + int(c) - 1
slitval = 0
if 0 <= islit < 24:
slitval = row[f'v{islit}']
if slitval == 0:
log.warning("Slit angle out of range")
slitval = 0.01
header['SLITVAL'] = slitval
def _process_tort_configuration(header):
"""
Set tort parameters from central wave number and date.
Header is updated in-place.
Parameters
----------
header : fits.Header
"""
tort_directory = os.path.join(
os.path.dirname(exes.__file__), 'data', 'tort')
tort_name = header['INSTCFG'].replace('_', '').lower()
tort_file = os.path.join(tort_directory, 'tortparm_' + tort_name + '.dat')
if not goodfile(tort_file, verbose=True):
log.warning(f"Cannot read tort file: {tort_file}")
log.warning('Using low mode parameters.')
tort_file = os.path.join(tort_directory, 'tortparm_low.dat')
columns = ['date', 'hrfl0', 'xdfl0', 'slitrot',
'krot', 'brl', 'x0brl', 'y0brl', 'hrr', 'detrot']
df = pd.read_csv(tort_file, delim_whitespace=True, comment='#',
names=columns, dtype=float)
row = df[df['date'] > header['fdate']].iloc[0]
log.info(f"Using tort parameters from: {tort_file} {row['date']}")
header['BRL'] = row['brl']
header['X0BRL'] = row['x0brl']
header['Y0BRL'] = row['y0brl']
# Only use default values if not already set for the following keys
override_keys = ['KROT', 'SLITROT', 'HRFL0', 'XDFL0', 'HRR', 'DETROT']
for key in override_keys:
if np.isclose(header[key], -9999):
header[key] = row[key.lower()]
hrfl0 = float(header['HRFL0'])
xdfl0 = float(header['XDFL0'])
efl0 = float(header['EFL0'])
slitval = float(header['SLITVAL'])
hrg = float(header['HRG'])
# This value is a holdover from the TEXES pipeline,
# which used a focal reducer. EXES doesn't have one,
# but the calculation is preserved here for historical
# reasons.
fred = 1.0
header['HRFL'] = hrfl0 / fred
header['XDFL'] = xdfl0 / fred
header['EFL'] = efl0 / fred
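    # HRDGR: high-resolution grating constant, presumably the echelon
    # groove spacing; 0.3 inch converted to cm (x 2.54) with an
    # empirical 0.996 factor, scaled by cos(HRG)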
header['HRDGR'] = 0.3 * 2.54 * 0.996 * np.cos(hrg)
# preferentially use slit width from estimated header value
# instead of recalculating:
slitwid = header.get('SLIT_AS', -9999)
if slitwid == -9999:
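        # convert the physical slit width (cm) at the effective focal
        # length to arcsec; 4.848e-06 is radians per arcsec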
slitwid = slitval / (efl0 * 4.848e-06)
header['SLITWID'] = slitwid
# set platescale value by mode
instcfg = header['INSTCFG']
pltscale = 0.201
if 'HIGH' in instcfg:
# input angle
alpha = np.radians(header['ECHELLE'])
# difference between input and output beams: 5.43 degrees
delta = np.radians(5.43)
# output angle
beta = alpha + delta
# corrected plate scale
header['PLTSCALE'] = pltscale * np.cos(beta) / np.cos(alpha)
else:
header['PLTSCALE'] = pltscale
# set solid angle per pixel from plate scale and slit width
header['OMEGAP'] = header['PLTSCALE'] * slitwid * (4.848e-06) ** 2
def _add_configuration_files(header):
"""
Add file paths to configuration files.
Header is updated in-place.
Parameters
----------
header : fits.Header
"""
datapath = os.path.join(os.path.dirname(exes.__file__), 'data')
default_file = os.path.join(datapath, 'caldefault.dat')
if not goodfile(default_file, verbose=True):
raise ValueError(f"Could not read default file: {default_file}")
columns = ['date', 'bpmfile', 'darkfile', 'linfile']
df = pd.read_csv(default_file, delim_whitespace=True,
comment='#', names=columns)
    df['date'] = df['date'].astype(float)
row = df[df['date'] > header['fdate']].iloc[0]
bpmfile = os.path.join(datapath, 'bpm', row['bpmfile'])
if goodfile(bpmfile, verbose=True):
# standard cal path for source distribution
header['BPM'] = bpmfile
else:
# retrieve remotely if needed
header['BPM'] = _download_cache_file(row['bpmfile'])
linfile = os.path.join(datapath, 'lincoeff', row['linfile'])
if goodfile(linfile, verbose=True):
header['LINFILE'] = linfile
else:
header['LINFILE'] = _download_cache_file(row['linfile'])
darkfile = os.path.join(datapath, 'dark', row['darkfile'])
if goodfile(darkfile, verbose=True):
header['DRKFILE'] = darkfile
else:
header['DRKFILE'] = _download_cache_file(row['darkfile'])
def _download_cache_file(filename):
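    """
    Retrieve a calibration file from the backup data URL, with caching.

    Parameters
    ----------
    filename : str
        Name of the calibration file to retrieve; only the base name
        is used to build the download URL.

    Returns
    -------
    str
        Local path to the cached file, or the bare base name if the
        download failed.
    """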
basename = os.path.basename(filename)
url = f'{DATA_URL}{basename}'
try:
cache_file = download_file(url, cache=True, pkgname='sofia_redux')
except (OSError, KeyError):
# return basename only if file can't be downloaded;
# pipeline will issue clearer errors later
cache_file = basename
log.warning(f'File {cache_file} could not be downloaded from {url}')
return cache_file