Source code for pysme.echelle

# -*- coding: utf-8 -*-
"""
Contains functions to read and modify echelle structures, just as in reduce

Mostly for compatibility reasons
"""

import logging

import astropy.io.fits as fits
import numpy as np
import scipy.constants

logger = logging.getLogger(__name__)


[docs]def calc_2dpolynomial(solution2d): """Expand a 2d polynomial, where the data is given in a REDUCE make_wave format Note that the coefficients are for order/100 and column/1000 respectively, where the order is counted from order base up Parameters ---------- solution2d : array data in a REDUCE make_wave format: 0: version 1: number of columns 2: number of orders 3: order base, i.e. 0th order number 4-6: empty 7: number of cross coefficients 8: number of column only coefficients 9: number of order only coefficients 10: coefficient - constant 11-x: column coefficients x-y : order coefficients z- : cross coefficients (xy, xy2, x2y, x2y2, xy3, x3y), with x = orders, y = columns Returns ------- poly : array[nord, ncol] expanded polynomial """ # make wave style 2d fit ncol = int(solution2d[1]) nord = int(solution2d[2]) order_base = int(solution2d[3]) deg_cross, deg_column, deg_order = ( int(solution2d[7]), int(solution2d[8]), int(solution2d[9]), ) coeff_in = solution2d[10:] coeff = np.zeros((deg_order + 1, deg_column + 1)) coeff[0, 0] = coeff_in[0] coeff[0, 1:] = coeff_in[1 : 1 + deg_column] coeff[1:, 0] = coeff_in[1 + deg_column : 1 + deg_column + deg_order] coeff[1, 1] = coeff_in[deg_column + deg_order + 1] coeff[1, 2] = coeff_in[deg_column + deg_order + 2] coeff[2, 1] = coeff_in[deg_column + deg_order + 3] coeff[2, 2] = coeff_in[deg_column + deg_order + 4] if deg_cross == 6: coeff[1, 3] = coeff_in[deg_column + deg_order + 5] coeff[3, 1] = coeff_in[deg_column + deg_order + 6] x = np.arange(order_base, order_base + nord, dtype=float) y = np.arange(ncol, dtype=float) poly = np.polynomial.polynomial.polygrid2d(x / 100, y / 1000, coeff) / x[:, None] return poly
[docs]def calc_1dpolynomials(ncol, poly): """Expand a set of 1d polynomials (one per order) seperately Parameters ---------- ncol : int number of columns poly : array[nord, degree] polynomial coefficients Returns ------- poly : array[nord, ncol] expanded polynomials """ nord = poly.shape[0] x = np.arange(ncol) result = np.zeros((nord, ncol)) for i, coef in enumerate(poly): result[i] = np.polyval(coef, x) return result
[docs]def expand_polynomial(ncol, poly): """Checks if and how to expand data poly, then expands the data if necessary Parameters ---------- ncol : int number of columns in the image poly : array[nord, ...] polynomial coefficients to expand, or already expanded data Returns ------- poly : array[nord, ncol] expanded data """ if poly.ndim == 1: poly = calc_2dpolynomial(poly) elif poly.shape[1] < 20: poly = calc_1dpolynomials(ncol, poly) return poly
[docs]def read( fname, extension=1, raw=False, continuum_normalization=True, barycentric_correction=True, radial_velociy_correction=True, ): """ Read data from an echelle file Expand wavelength and continuum polynomials Apply barycentric/radial velocity correction Apply continuum normalization Will load any fields in the binary table, however special attention is given only to specific names: "SPEC" : Spectrum "SIG" : Sigma, i.e. (absolute) uncertainty "CONT" : Continuum "WAVE" : Wavelength solution "COLUMNS" : Column range Parameters ---------- fname : str filename to load extension : int, optional fits extension of the data within the file (default: 1) raw : bool, optional if true apply no corrections to the data (default: False) continuum_normalization : bool, optional apply continuum normalization (default: True) barycentric_correction : bool, optional apply barycentric correction (default: True) radial_velociy_correction : bool, optional apply radial velocity correction (default: True) Returns ------- ech : obj Echelle structure, with data contained in attributes """ hdu = fits.open(fname) header = hdu[0].header data = hdu[extension].data ech = lambda: None # placeholder ech.filename = fname ech.head = header for column in data.dtype.names: setattr(ech, column.lower(), data[column][0]) if column == "SPEC": nord, ncol = data[column][0].shape if not raw: if hasattr(ech, "spec"): ech.orders = np.arange(nord) # Wavelength if hasattr(ech, "wave"): ech.wave = expand_polynomial(ncol, ech.wave) # Correct for radial velocity and barycentric correction velocity_correction = 0 if barycentric_correction: velocity_correction -= header.get("barycorr", 0) header["barycorr"] = 0 if radial_velociy_correction: velocity_correction += header.get("radvel", 0) header["radvel"] = 0 speed_of_light = scipy.constants.speed_of_light * 1e-3 ech.wave *= 1 + velocity_correction / speed_of_light if hasattr(ech, "cont"): ech.cont = expand_polynomial(ncol, ech.cont) # Create Mask, based on column range if hasattr(ech, "columns"): ech.mask = np.full((nord, ncol), False) for iord in range(nord): ech.mask[iord, : ech.columns[iord, 0]] = True ech.mask[iord, ech.columns[iord, 1] :] = True if hasattr(ech, "spec"): ech.spec = np.ma.masked_array(ech.spec, mask=ech.mask) if hasattr(ech, "sig"): ech.sig = np.ma.masked_array(ech.sig, mask=ech.mask) if hasattr(ech, "cont"): ech.cont = np.ma.masked_array(ech.cont, mask=ech.mask) if hasattr(ech, "wave"): ech.wave = np.ma.masked_array(ech.wave, mask=ech.mask) # Apply continuum normalization if hasattr(ech, "cont") and continuum_normalization: if hasattr(ech, "spec"): ech.spec /= ech.cont if hasattr(ech, "sig"): ech.sig /= ech.cont return ech
[docs]def save(fname, header, **kwargs): """Save data in an Echelle fits, i.e. a fits file with a Binary Table in Extension 1 The data is passed in kwargs, with the name of the binary table column as the key Floating point data is saved as float32 (E), Integer data as int16 (I) Parameters ---------- fname : str filename header : fits.header FITS header **kwargs : array[] data to be saved in the file """ primary = fits.PrimaryHDU(header=header) columns = [] for key, value in kwargs.items(): arr = value.flatten()[None, :] if issubclass(arr.dtype.type, np.floating): arr = arr.astype(np.float32) dtype = "E" elif issubclass(arr.dtype.type, np.integer): arr = arr.astype(np.int16) dtype = "I" if np.ma.is_masked(arr): arr = np.ma.getdata(arr) form = "%i%s" % (value.size, dtype) dim = str(value.shape[::-1]) columns += [fits.Column(name=key.upper(), array=arr, format=form, dim=dim)] table = fits.BinTableHDU.from_columns(columns) hdulist = fits.HDUList(hdus=[primary, table]) hdulist.writeto(fname, overwrite=True)