# -*- coding: utf-8 -*-
import logging
import os.path
import platform
import sys
from datetime import datetime as dt
from enum import Enum, IntEnum, IntFlag
import numpy as np
from scipy.constants import speed_of_light
from scipy.io import readsav
from . import __file_ending__, __version__, echelle, persistence
from .abund import Abund
from .abund import elements as abund_elem
from .atmosphere.atmosphere import Atmosphere
from .data_structure import *
from .iliffe_vector import Iliffe_vector
from .linelist.linelist import LineList
from .nlte import NLTE
logger = logging.getLogger(__name__)
[docs]
class MASK_VALUES(IntFlag):
"""
Mask value specifier used in mob
Values can be combined to mark a point as e.g. line and vrad
If cont or vrad are not used it will fallback to the line mask
"""
BAD = 0
LINE = 1
CONT = 2
VRAD = 4
[docs]
class CONT_SCALE(Enum):
NONE = "none"
FIX = "fix"
LINEAR = "linear"
QUADRATIC = "quadratic"
CUBIC = "cubic"
[docs]
class CONT_OPTIONS(Enum):
MASK = "mask"
MATCH = "match"
MATCH_MASK = "match+mask"
MATCHLINES = "matchlines"
MATCHLINES_MASK = "matchlines+mask"
SPLINE = "spline"
SPLINE_MASK = "spline_mask"
MCMC = "mcmc"
[docs]
class VRAD_OPTIONS(Enum):
NONE = "none"
FIX = "fix"
EACH = "each"
WHOLE = "whole"
[docs]
@CollectionFactory
class Parameters(Collection):
# fmt: off
_fields = Collection._fields + [
("teff", 5770, asfloat, this, "float: effective temperature in Kelvin"),
("logg", 4.0, asfloat, this, "float: surface gravity in log10(cgs)"),
("abund", Abund.solar(), this, this, "Abund: elemental abundances"),
("vmic", 0, absolute, this, "float: micro turbulence in km/s"),
("vmac", 0, absolute, this, "float: macro turbulence in km/s"),
("vsini", 0, absolute, this, "float: projected rotational velocity in km/s"),
]
# fmt: on
def __init__(self, **kwargs):
monh = kwargs.pop("monh", kwargs.pop("feh", 0))
abund = kwargs.pop("abund", "solar")
if "grav" in kwargs.keys() and "logg" not in kwargs.keys():
kwargs["logg"] = kwargs.pop("grav")
super().__init__(**kwargs)
self.abund = Abund(monh=monh, pattern=abund, type="sme")
@property
def _abund(self):
return self.__abund
@_abund.setter
def _abund(self, value):
if isinstance(value, Abund):
self.__abund = value
else:
logger.warning(
"Abundance set using just a pattern, assuming that "
"it has format %s. "
"If that is incorrect, try changing the format first.",
self.__abund.type,
)
self.__abund = Abund(monh=self.monh, pattern=value, type=self.__abund.type)
@property
def monh(self):
"""float: metallicity in log scale relative to the base abundances"""
return self.abund.monh
@monh.setter
def monh(self, value):
self.abund.monh = value
[docs]
def citation(self, format="string"):
return self.abund.citation()
[docs]
@CollectionFactory
class Version(Collection):
# fmt: off
_fields = Collection._fields + [
("arch", "", asstr, this, "str: system architecture"),
("os", "", asstr, this, "str: operating system"),
("os_family", "", asstr, this, "str: operating system family"),
("os_name", "", asstr, this, "str: os name"),
("release", "", asstr, this, "str: python version"),
("build_date", "", asstr, this, "str: build date of the Python version used"),
("memory_bits", 64, astype(int), this, "int: Platform architecture bit size (usually 32 or 64)"),
("host", "", asstr, this, "str: name of the machine that created the SME Structure")
]
# fmt: on
def __init__(self, **kwargs):
super().__init__(**kwargs)
[docs]
def update(self):
"""Update version info with current machine data"""
self.arch = platform.machine()
self.os = sys.platform
self.os_family = platform.system()
self.os_name = platform.version()
self.release = platform.python_version()
self.build_date = platform.python_build()[1]
self.memory_bits = int(platform.architecture()[0][:2])
self.host = platform.node()
[docs]
@CollectionFactory
class Fitresults(Collection):
# fmt: off
_fields = Collection._fields + [
("iterations", None, astype(int, allow_None=True), this, "int: maximum number of iterations in the solver"),
("chisq", None, this, this, "float: reduced chi-square of the solution"),
("parameters", None, this, this, "list: parameter names"),
("values", None, array(None, float), this, "array: best fit values for the fit parameters"),
("uncertainties", None, array(None, float), this, "array of size(nfree,): uncertainties of the free parameters bases on SME statistics"),
("covariance", None, array(None, float), this, "array of size (nfree, nfree): covariance matrix"),
("gradient", None, array(None, float), this, "array of size (nfree,): final gradients of the free parameters on the cost function"),
("derivative", None, array(None, float), this, "array of size (npoints, nfree): final Jacobian of each point and each parameter"),
("residuals", None, array(None, float), this, "array of size (npoints,): final residuals of the fit"),
("fit_uncertainties", None, this, this, "array: uncertainties based solely on the least_squares fit")
]
# fmt: on
[docs]
def clear(self):
"""Reset all values to None"""
for name in self._names:
default = [f[1] for f in self._fields if f[0] == name][0]
setattr(self, name, default)
[docs]
@CollectionFactory
class ProfileNLTE(Collection):
# fmt: off
_fields = Collection._fields + [
("enabled", False, asbool, this,
"bool: Whether to apply profile-based NLTE corrections during synthesis"),
("element", None, astype(str, allow_None=True), this,
"str or None: Element whose profile-based NLTE provider should be used"),
("provider", None, astype(str, allow_None=True), this,
"str or None: Explicit profile-NLTE provider name; None uses the default provider for the element"),
("summary", {}, this, this,
"dict: Runtime summary describing which profile-NLTE provider was requested and applied"),
]
# fmt: on
[docs]
@CollectionFactory
class SME_Structure(Parameters):
# fmt: off
_fields = Parameters._fields + [
("id", dt.now(), asstr, this, "str: DateTime when this structure was created"),
("meta", {}, this, this, "dict: Arbitrary extra information"),
("version", __version__, this, this, "str: PySME version used to create this structure"),
("vrad_flag", "none", lowercase(oneof(-2, -1, 0, "none", "each", "whole", "fix")), this,
"""str: flag that determines how the radial velocity is determined
allowed values are:
* "none": No radial velocity correction
* "each": Determine radial velocity for each segment individually
* "whole": Determine one radial velocity for the whole spectrum
"""),
("vrad", 0, array(None, float), this, "array of size (nseg,): radial velocity of each segment in km/s"),
("vrad_bounds", (-500, 500), array(2, float), this, "float: radial velocity limits in km/s"),
("vrad_loss", "soft_l1", asstr, this, "str: loss function for the radial velocity fit"),
("vrad_method", "dogbox", asstr, this, "str: least squares method used in the radial velocity fit"),
("vrad_jac", "3-point", asstr, this, "str: jacobian approximation used in the radial velocity fit"),
("vrad_xscale", "jac", this, this, "array or 'jac': scale of the vrad parameter"),
("vrad_ftol", 1e-8, asfloat, this, "float: tolerance for the radial velocity least squares fit"),
("vrad_xtol", 1e-8, asfloat, this, "float: tolerance for the radial velocity least squares fit"),
("vrad_gtol", 1e-8, asfloat, this, "float: tolerance for the radial velocity least squares fit"),
("cscale_flag", "none", lowercase(oneof("none", "fix", "constant", "linear", "quadratic", "cubic", "quintic", "quantic", astype=int)), this,
"""str: Flag that describes how to correct for the continuum
allowed values are:
* "none": No continuum correction
* "fix": Use whatever continuum scale has been set, but don't change it
* "constant": Zeroth order polynomial, i.e. scale everything by a factor
* "linear": First order polynomial, i.e. approximate continuum by a straight line
* "quadratic": Second order polynomial, i.e. approximate continuum by a quadratic polynomial
"""),
("cscale_type", "match+mask", lowercase(oneof("mcmc", "mask", "match", "match+mask", "matchlines", "matchlines+mask", "spline", "spline+mask")), this,
"""str: Flag that determines the algorithm to determine the continuum
This is used in combination with cscale_flag, which determines the degree of the fit, if any.
allowed values are:
* "mask": Fit a polynomial to the pixels marked as continuum in the mask
* "match": Fit a multiplicative correction so the synthetic spectrum matches the observation
* "match+mask": Same as "match", but only using continuum-mask pixels
* "matchlines": Match mostly on line pixels rather than continuum pixels
* "matchlines+mask": Same as "matchlines", with continuum-mask restrictions
* "spline": Match using a spline instead of a low-order polynomial
* "spline+mask": Same as "spline", but only using continuum-mask pixels
* "mcmc": Joint MCMC continuum/radial-velocity fit
"""),
("cscale", None, this, this,
"""array of size (nseg, ndegree): Continumm polynomial coefficients for each wavelength segment
The x coordinates of each polynomial are chosen so that x = 0, at the first wavelength point,
i.e. x is shifted by wave[segment][0]
"""),
("cscale_bounds", (-np.inf, np.inf), this, this, "array(2, cscale_degree): bounds for the continuum parameters"),
("cscale_loss", "soft_l1", asstr, this, "str: loss function for the continuum fit"),
("cscale_method", "dogbox", asstr, this, "str: least squares method used in the continuum fit"),
("cscale_jac", "3-point", asstr, this, "str: jacobian approximation used in the continuum fit"),
("cscale_xscale", "jac", this, this, "array of 'jac', Scale of each continuum parameter"),
("cscale_ftol", 1e-8, asfloat, this, "float: tolerance for the continuum least squares fit"),
("cscale_xtol", 1e-8, asfloat, this, "float: tolerance for the continuum least squares fit"),
("cscale_gtol", 1e-8, asfloat, this, "float: tolerance for the continuum least squares fit"),
("normalize_by_continuum", True, asbool, this,
"bool: Whether to normalize the synthetic spectrum by the synthetic continuum spectrum or not"),
("specific_intensities_only", False, asbool, this,
"bool: Whether to keep the specific intensities or integrate them together"),
("gam6", 1, asfloat, this, "float: van der Waals scaling factor"),
("h2broad", True, asbool, this, "bool: Whether to use H2 broadening or not"),
("accwi", 3e-3, asfloat, this,
"float: minimum accuracy for linear spectrum interpolation vs. wavelength."),
("accrt", 1e-4, asfloat, this,
"float: minimum accuracy for synthethized spectrum at wavelength grid points in sme.wint."),
("leastsquares_method", "dogbox", asstr, this, "str: leastsquares method to use, see scipy least_squares for details, default: 'dogbox'."),
("leastsquares_loss", "linear", asstr, this, "str: leastsquares loss to use, see scipy least_squares for details, default: 'linear'"),
("leastsquares_xscale", 1.0, this, this, "str, arraylike: leastsquare x-scale to use, see scipy least_squares for details, default: 1"),
("leastsquares_jac", "2-point", asstr, this, "str: leastsquares jacobian calculation, see scipy least_squares for details, default: '2-point'"),
("leastsquares_ftol", 1e-3, asfloat, this, "float: minimum accuracy of the best fit cost"),
("leastsquares_xtol", 1e-6, asfloat, this, "float: minimum accuracy of the parameters in the fitting procedure"),
("leastsquares_gtol", 1e-4, asfloat, this, "float: minimum accuracy of the gradient of the least squares fit"),
("iptype", None, lowercase(oneof(None, "gauss", "sinc", "table")), this, "str: instrumental broadening type"),
("ipres", 0, array(None, float), this, "float, array: Instrumental resolution for instrumental broadening"),
("ip_x", None, this, this, "array: Instrumental broadening table in x direction"),
("ip_y", None, this, this, "array: Instrumental broadening table in y direction"),
("mu", np.sqrt(0.5 * (2 * np.arange(7) + 1) / 7), array(None, float), this,
"""array of size (nmu,): Mu values to calculate radiative transfer at
mu values describe the distance from the center of the stellar disk to the edge
with mu = cos(theta), where theta is the angle of the observation,
i.e. mu = 1 at the center of the disk and 0 at the edge
"""),
("wran", None, this, this,
"array of size (nseg, 2): beginning and end wavelength points of each segment"),
("wint", None, vector, this,
"Iliffe_vector of shape (nseg, ...): optional wavelength grid passed to SMElib Transf"),
("wave", None, this, this,
"Iliffe_vector of shape (nseg, ...): wavelength"),
("spec", None, vector, this,
"Iliffe_vector of shape (nseg, ...): observed spectrum"),
("uncs", None, vector, this,
"Iliffe_vector of shape (nseg, ...): uncertainties of the observed spectrum"),
("telluric", None, vector, this,
"Illife_vector of shape (nseg, ...): telluric spectrum that is multiplied with synth during the fit"),
("mask", None, vector, this,
"Iliffe_vector of shape (nseg, ...): mask defining good and bad points for the fit"),
("synth", None, vector, this,
"Iliffe_vector of shape (nseg, ...): synthetic spectrum"),
("cont", None, vector, this,
"Iliffe_vector of shape (nseg, ...): continuum intensities"),
("sint", None, vector, this,
"Iliffe_vector of shape (nseg, nmu, ...): specific intensities from Transf"),
("cint", None, vector, this,
"Iliffe_vector of shape (nseg, nmu, ...): continuum specific intensities from Transf"),
("linelist", LineList(), astype(LineList), this, "LineList: spectral line information"),
("fitparameters", [], astype(list, allow_None=True), this, "list: parameters to fit"),
("fitresults", Fitresults(), astype(Fitresults), this, "Fitresults: fit results data"),
("atmo", Atmosphere(), astype(Atmosphere), this, "Atmosphere: model atmosphere data"),
("nlte", NLTE(), astype(NLTE), this, "NLTE: nlte calculation data"),
("profile_nlte", ProfileNLTE(), astype(ProfileNLTE), this,
"ProfileNLTE: profile-based NLTE correction configuration and runtime summary"),
("system_info", Version(), astype(Version), this,
"Version: information about the host system running the calculation for debugging")
]
# fmt: on
def __init__(self, **kwargs):
wind = kwargs.get("wind", None)
self.__dict__["_wave_invalidation_enabled"] = False
atmo = kwargs.pop("atmo", {})
nlte = kwargs.pop("nlte", {})
profile_nlte = kwargs.pop("profile_nlte", {})
idlver = kwargs.pop("idlver", {})
self.wave = None
self.wran = None
super().__init__(**kwargs)
if wind is not None and self.wave is not None:
wind = wind + 1
self.wave = Iliffe_vector(self.wave.ravel(), offsets=wind)
self.spec = kwargs.get("sob", None)
self.uncs = kwargs.get("uob", None)
self.mask = kwargs.get("mob", None)
self.synth = kwargs.get("smod", None)
self.cdr_N_line_chunk = 2000
self.cdr_parallel = True
self.cdr_n_jobs = 10
self.cdr_pysme_out = False
self.strong_depth_thres = 0.001
self.strong_bin_width = 0.2
# Unified line-selection controls (preferred over legacy cdr_* knobs).
self.line_select_method = "internal" # internal | cdr | almax
self.line_select_policy = "auto" # auto | strict
self.line_select_parallel = False
self.line_select_n_jobs = None
self.line_select_chunk_size = 2000
self.line_select_recompute = "if_stale" # if_stale | always | never
self.line_select_stale_thres = {
"teff": 250.0,
"logg": 0.5,
"monh": 0.5,
"vmic": 1.0,
"accrt": 0.0,
}
self.line_select_reuse = "none" # deprecated; non-default values only have limited effect
self.line_select_cdr_bin_width = 0.2
self.line_select_cdr_strength_thres = 0.001
self.line_precompute_database = None
# Legacy alias kept for backward compatibility.
self.line_select_cdr_database = None
self.line_select_almax_threshold = None
self.line_select_almax_use_bins = False
self.line_select_almax_bin_width = 0.2
self.tdnlte_H = False
# self.tdnlte_H_new = False
if isinstance(profile_nlte, dict):
self.profile_nlte = ProfileNLTE(**profile_nlte)
elif isinstance(profile_nlte, ProfileNLTE):
self.profile_nlte = profile_nlte
else:
raise TypeError("profile_nlte must be a dict or ProfileNLTE instance")
self.profile_nlte.summary = {}
self.show_progress_bars = False
self.meta["object"] = kwargs.get("obs_name", "")
try:
self.linelist = LineList(**kwargs)
except (KeyError, AttributeError):
# TODO ignore the warning during loading of data
logger.warning("No or incomplete linelist data present")
# Parse free parameters into one list
pname = kwargs.get("pname", [])
glob_free = kwargs.get("glob_free", [])
if isinstance(glob_free, str):
glob_free = [glob_free]
ab_free = kwargs.get("ab_free", [])
if len(ab_free) != 0:
ab_free = [f"abund {el}" for i, el in zip(ab_free, abund_elem) if i == 1]
fitparameters = np.concatenate((pname, glob_free, ab_free)).astype("U")
#:array of size (nfree): Names of the free parameters
self.fitparameters = np.unique(fitparameters)
self.fitresults = Fitresults(
iterations=kwargs.get("maxiter", None),
chisq=kwargs.get("chisq", 0),
uncertainties=kwargs.get("punc", None),
covariance=kwargs.get("covar", None),
)
self.normalize_by_continuum = kwargs.get("cscale_flag", "") != "fix"
self.system_info = Version(**idlver)
atmo_abund = atmo.pop("abund", kwargs.get("abund", "empty"))
atmo_monh = atmo.pop(
"monh", atmo.pop("feh", kwargs.get("monh", kwargs.get("feh", 0)))
)
self.atmo = Atmosphere(**atmo, abund=atmo_abund, monh=atmo_monh)
self.nlte = NLTE(**nlte)
self.citation_info = r"""
@ARTICLE{2017A&A...597A..16P,
author = {{Piskunov}, Nikolai and {Valenti}, Jeff A.},
title = "{Spectroscopy Made Easy: Evolution}",
journal = {\aap},
keywords = {stars: abundances, radiative transfer, stars: fundamental parameters, stars: atmospheres, techniques: spectroscopic, Astrophysics - Instrumentation and Methods for Astrophysics, Astrophysics - Solar and Stellar Astrophysics},
year = "2017",
month = "Jan",
volume = {597},
eid = {A16},
pages = {A16},
doi = {10.1051/0004-6361/201629124},
archivePrefix = {arXiv},
eprint = {1606.06073},
primaryClass = {astro-ph.IM},
adsurl = {https://ui.adsabs.harvard.edu/abs/2017A&A...597A..16P},
adsnote = {Provided by the SAO/NASA Astrophysics Data System}
}
@ARTICLE{1996A&AS..118..595V,
author = {{Valenti}, J.~A. and {Piskunov}, N.},
title = "{Spectroscopy made easy: A new tool for fitting observations with synthetic spectra.}",
journal = {\aaps},
keywords = {RADIATIVE TRANSFER, METHODS: NUMERICAL, TECHNIQUES: SPECTROSCOPIC, STARS: FUNDAMENTAL PARAMETERS, SUN: FUNDAMENTAL PARAMETERS, ATOMIC DATA},
year = "1996",
month = "Sep",
volume = {118},
pages = {595-603},
adsurl = {https://ui.adsabs.harvard.edu/abs/1996A&AS..118..595V},
adsnote = {Provided by the SAO/NASA Astrophysics Data System}
}
"""
# Apply final conversions from IDL to Python version
if "wave" in self:
self.__convert_cscale__()
self.__dict__["_wave_invalidation_enabled"] = True
def __getitem__(self, key):
assert isinstance(key, str), "Key must be of type string"
key = key.casefold()
if key.startswith("abund "):
element = key[5:].strip()
element = element.capitalize()
return self.abund[element]
if key.startswith("linelist "):
_, idx, field = key[8:].split(" ", 2)
idx = int(idx)
return self.linelist[field][idx]
return super().__getitem__(key)
def __setitem__(self, key, value):
assert isinstance(key, str), "Key must be of type string"
key = key.casefold()
if key.startswith("abund "):
element = key[5:].strip()
element = element.capitalize()
self.abund.set_pattern_abundance(element, value)
elif key.startswith("linelist "):
_, idx, field = key[8:].split(" ", 2)
idx = int(idx)
self.linelist[field][idx] = value
else:
super().__setitem__(key, value)
# Additional constraints on fields
@staticmethod
def _coerce_wave_value(value):
if value is None:
return None
if np.isscalar(value):
raise ValueError("Can not set wavelength from single value")
if isinstance(value, np.ndarray):
value = np.require(value, requirements="W")
if value.ndim == 1:
return Iliffe_vector([value])
return Iliffe_vector(value)
if isinstance(value, (list, tuple)):
return Iliffe_vector(list(value))
if isinstance(value, Iliffe_vector):
return value
if isinstance(value, FlexExtension):
return Iliffe_vector._load(value)
if isinstance(value, np.lib.npyio.NpzFile):
return Iliffe_vector._load_v1(value)
raise TypeError("Input value is of the wrong type")
@staticmethod
def _wave_values_equal(lhs, rhs):
if lhs is rhs:
return True
if lhs is None or rhs is None:
return lhs is rhs
if lhs.nseg != rhs.nseg:
return False
if lhs.shape != rhs.shape:
return False
for i in range(lhs.nseg):
a = np.asarray(lhs[i])
b = np.asarray(rhs[i])
try:
if not np.array_equal(a, b, equal_nan=True):
return False
except TypeError:
if not np.array_equal(a, b):
return False
return True
@staticmethod
def _wran_from_wave(wave, previous_wran=None):
if wave is None:
return None
wran = np.zeros((wave.nseg, 2), dtype=float)
prev = None
if previous_wran is not None:
prev = np.asarray(previous_wran, dtype=float)
for i in range(wave.nseg):
seg = wave[i]
if seg is None or len(seg) == 0:
if prev is not None and prev.ndim == 2 and i < prev.shape[0]:
wran[i] = prev[i]
else:
wran[i] = [np.nan, np.nan]
elif len(seg) == 1:
wran[i] = [seg[0], seg[0]]
else:
wran[i] = [seg[0], seg[-1]]
return wran
def _invalidate_wave_dependent_state(self):
for name in [
"_spec",
"_uncs",
"_mask",
"_synth",
"_cont",
"_telluric",
"_sint",
"_cint",
]:
if name in self.__dict__:
self.__dict__[name] = None
self.__vrad = None
self.__cscale = None
# Uncertainty arrays are created dynamically after synthesis.
if "vrad_unc" in self.__dict__:
self.vrad_unc = None
if "cscale_unc" in self.__dict__:
self.cscale_unc = None
@property
def _wave(self):
return self.__dict__.get("_wave", None)
@_wave.setter
def _wave(self, value):
previous_wran = self.__dict__.get("_SME_Structure__wran", None)
old_wave = self.__dict__.get("_wave", None)
new_wave = self._coerce_wave_value(value)
changed = not self._wave_values_equal(old_wave, new_wave)
self.__dict__["_wave"] = new_wave
self.__wran = self._wran_from_wave(new_wave, previous_wran=previous_wran)
# Only invalidate on reassignment, not during first initialization.
if (
changed
and old_wave is not None
and self.__dict__.get("_wave_invalidation_enabled", True)
):
self._invalidate_wave_dependent_state()
logger.warning(
"Wavelength grid changed; cleared wave-dependent fields "
"(spec/uncs/mask/synth/cont/telluric/sint/cint and rv/continuum fit state)."
)
@property
def _wran(self):
if self.wave is not None:
nseg = self.wave.shape[0]
values = np.zeros((nseg, 2))
for i in range(nseg):
if self.wave[i] is not None and len(self.wave[i]) >= 2:
values[i] = [self.wave[i][0], self.wave[i][-1]]
else:
values[i] = self.__wran[i]
self.__wran = values
return self.__wran
return self.__wran
@_wran.setter
def _wran(self, value):
try:
if self.wave is not None:
logger.warning(
"The wavelength range is overriden by the existing wavelength grid"
)
except:
pass
self.__wran = np.atleast_2d(value) if value is not None else None
@property
def _vrad(self):
"""array of size (nseg,): Radial velocity in km/s for each wavelength region"""
nseg = self.nseg if self.nseg is not None else 1
if self.__vrad is None:
self.__vrad = np.zeros(nseg)
return self.__vrad
if self.vrad_flag == "none":
return np.zeros(nseg)
else:
nseg = self.__vrad.shape[0]
if nseg == self.nseg:
return self.__vrad
rv = np.zeros(self.nseg)
rv[:nseg] = self.__vrad[:nseg]
rv[nseg:] = self.__vrad[-1]
self.__vrad = rv
return self.__vrad
return self.__vrad
@_vrad.setter
def _vrad(self, value):
self.__vrad = np.atleast_1d(value) if value is not None else None
@property
def _vrad_flag(self):
try:
_ = self.__vrad_flag
except AttributeError:
self.__vrad_flag = "none"
return self.__vrad_flag
@_vrad_flag.setter
def _vrad_flag(self, value):
if isinstance(value, (int, np.integer)):
value = {-2: "none", -1: "whole", 0: "each"}[value]
self.__vrad_flag = value
@property
def _cscale(self):
"""array of size (nseg, ndegree): Continumm polynomial coefficients for each wavelength segment
The x coordinates of each polynomial are chosen so that x = 0, at the first wavelength point,
i.e. x is shifted by wave[segment][0]
"""
if self.cscale_type in ["spline", "spline+mask"]:
return self.__cscale
nseg = self.nseg if self.nseg is not None else 1
if self.__cscale is None:
cs = np.zeros((nseg, self.cscale_degree + 1))
cs[:, -1] = 1
self.__cscale = cs
return cs
if self.cscale_flag == "none":
return np.ones((nseg, 1))
ndeg = self.cscale_degree + 1
ns, nd = self.__cscale.shape
if nd == ndeg and ns == nseg:
return self.__cscale
cs = np.zeros((nseg, ndeg))
cs[:, -1] = 1
if nd > ndeg:
cs[:ns, :] = self.__cscale[:ns, -ndeg:]
elif nd < ndeg:
cs[:ns, -nd:] = self.__cscale[:ns, :]
else:
cs[:ns, :] = self.__cscale[:ns, :]
cs[ns:, -1] = 1
# We need to update our internal representation as well
# since we might operate on that array
self.__cscale = cs
return self.__cscale
@_cscale.setter
def _cscale(self, value):
if self.cscale_type in ["spline", "spline+mask"]:
if not isinstance(value, Iliffe_vector):
self.__cscale = vector(self, value)
else:
self.__cscale = value
else:
self.__cscale = np.atleast_2d(value) if value is not None else None
@property
def _cscale_flag(self):
try:
_ = self.__cscale_flag
except AttributeError:
self.__cscale_flag = "none"
return self.__cscale_flag
@_cscale_flag.setter
def _cscale_flag(self, value):
if isinstance(value, (int, np.integer)):
try:
value = {
-3: "none",
-2: "fix",
-1: "fix",
0: "constant",
1: "linear",
2: "quadratic",
3: "cubic",
4: "quintic",
5: "quantic",
}[value]
except KeyError:
value = value
self.__cscale_flag = value
@property
def _mu(self):
return self.__mu
@_mu.setter
def _mu(self, value):
if np.any(value < 0):
raise ValueError("All values must be positive")
if np.any(value > 1):
raise ValueError("All values must be smaller or equal to 1")
# The mu values are expected in decreasing order
# For spherical models
value = np.sort(value)[::-1]
self.__mu = value
@property
def _ipres(self):
return self.__ipres
@_ipres.setter
def _ipres(self, value):
size = np.size(value)
if self.nseg != 0 and size != 1 and size != self.nseg:
raise ValueError(
f"The instrument resolution must have 1 or {self.nseg} elements"
)
self.__ipres = value
# Additional properties
@property
def nseg(self):
"""int: Number of wavelength segments"""
if self.wran is None:
return 0
else:
return len(self.wran)
@property
def nmu(self):
return self.mu.size
@nmu.setter
def nmu(self, value):
self.mu = np.sqrt(0.5 * (2 * np.arange(value) + 1) / value)
@property
def mask_good(self):
if self.mask is None:
return None
return self.mask != MASK_VALUES.BAD
@property
def mask_bad(self):
if self.mask is None:
return None
return self.mask == MASK_VALUES.BAD
@property
def mask_line(self):
if self.mask is None:
return None
return (self.mask & MASK_VALUES.LINE) != 0
@property
def mask_cont(self):
if self.mask is None:
return None
return (self.mask & MASK_VALUES.CONT) != 0
@property
def mask_vrad(self):
if self.mask is None:
return None
return (self.mask & MASK_VALUES.VRAD) != 0
@property
def cscale_degree(self):
"""int: Polynomial degree of the continuum as determined by cscale_flag"""
if self.cscale_type in ["spline", "spline+mask"]:
return self.wave.shape[1]
else:
if self.cscale_flag == "constant":
return 0
if self.cscale_flag == "linear":
return 1
if self.cscale_flag == "quadratic":
return 2
if self.cscale_flag == "cubic":
return 3
if self.cscale_flag == "quintic":
return 4
if self.cscale_flag == "quantic":
return 5
if self.cscale_flag == "fix":
# Use the underying element to avoid a loop
if self.__cscale is not None:
return self.__cscale.shape[1] - 1
else:
return 0
if self.cscale_flag == "none":
return 0
return self.cscale_flag
raise ValueError("This should never happen")
@property
def atomic(self):
"""array of size (nlines, 8): Atomic linelist data, usually passed to the C library
Use sme.linelist instead for other purposes"""
if self.linelist is None:
return None
return self.linelist.atomic
@property
def species(self):
"""array of size (nlines,): Names of the species of each spectral line"""
if self.linelist is None:
return None
return self.linelist.species
# Aliases for outdated names
@property
def accxt(self):
return self.leastsquares_xtol
@accxt.setter
def accxt(self, value):
self.leastsquares_xtol = value
@property
def accft(self):
return self.leastsquares_ftol
@accft.setter
def accft(self, value):
self.leastsquares_ftol = value
@property
def accgt(self):
return self.leastsquares_gtol
@accgt.setter
def accgt(self, value):
self.leastsquares_gtol = value
@property
def vrad_limit(self):
return self.vrad_bounds[0]
@vrad_limit.setter
def vrad_limit(self, value):
self.vrad_bounds = (-value, value)
def __convert_cscale__(self):
"""
Convert IDL SME continuum scale to regular polynomial coefficients
Uses Taylor series approximation, as IDL version used the inverse of the continuum
"""
wave = self.wave
self.cscale = np.require(self.cscale, requirements="W")
if self.cscale_flag == "linear":
for i in range(len(self.cscale)):
c, d = self.cscale[i]
a, b = max(wave[i]), min(wave[i])
c0 = (a - b) * (c - d) / (a * c - b * d) ** 2
c1 = (a - b) / (a * c - b * d)
# Shift zero point to first wavelength of the segment
c1 += c0 * self.spec[i][0]
self.cscale[i] = [c0, c1]
elif self.cscale_flag == "fix":
self.cscale = self.cscale / np.sqrt(2)
elif self.cscale_flag == "constant":
self.cscale = np.sqrt(1 / self.cscale)
[docs]
def import_mask(self, other, keep_bpm=False):
"""
Import the mask of another sme structure and apply it to this one
Conversion is based on the wavelength
Parameters
----------
other : SME_Structure
the sme structure to import the mask from
Returns
-------
self : SME_Structure
this sme structure
"""
if self.mask is None:
self.mask = MASK_VALUES.LINE
c_light = speed_of_light * 1e-3 # speed of light in km/s
wave = other.wave.copy()
for i in range(len(wave)):
rvel = other.vrad[i]
rv_factor = np.sqrt((1 - rvel / c_light) / (1 + rvel / c_light))
wave[i] *= rv_factor
wave = wave.ravel()
line_mask = other.mask_line.ravel()
cont_mask = other.mask_cont.ravel()
for seg in range(self.nseg):
# We simply interpolate between the masks, if most if the new pixel was
# continuum / line mask then it will become that, otherwise bad
rvel = self.vrad[seg]
rv_factor = np.sqrt((1 - rvel / c_light) / (1 + rvel / c_light))
w = self.wave[seg] * rv_factor
cm = np.interp(w, wave, cont_mask) > 0.5
lm = np.interp(w, wave, line_mask) > 0.5
if keep_bpm:
bpm = self.mask_bad[seg]
cm[bpm] = False
lm[bpm] = False
self.mask[seg][cm] |= MASK_VALUES.CONT
self.mask[seg][lm] |= MASK_VALUES.LINE
self.mask[seg][~(cm | lm)] = MASK_VALUES.BAD
return self
[docs]
def citation(self, output="string"):
"""Create a citation string for use in papers, or
other places. The citations are from all components that
contribute to the SME structure. SME and PySME, the linelist,
the abundance, the atmosphere, and the NLTE grids.
The default output is plaintext, but
it is also possible to get bibtex format.
Parameters
----------
output : str, optional
the output format, options are: ["string", "bibtex", "html", "markdown"], by default "string"
Returns
-------
citation : str
citation string in the desired output format
"""
citation = [self.citation_info]
citation += [self.atmo.citation_info]
citation += [self.abund.citation_info]
citation += [self.linelist.citation_info]
citation += [self.nlte.citation_info]
citation = "\n".join(citation)
return self.create_citation(citation, output=output)
[docs]
def save(self, filename, format="flex", _async=False):
"""Save the whole SME structure to disk.
The file format is zip file, with one info.json
file for simple values, and numpy save files for
large arrays. Substructures (linelist, abundance, etc.)
have their own folder within the zip file.
Parameters
----------
filename : str
filename to save the structure at
compressed : bool, optional
whether to compress the output, by default False
"""
persistence.save(filename, self, format=format, _async=_async)
[docs]
@staticmethod
def load(filename):
"""
Load SME data from disk
Currently supported file formats:
* ".npy": Numpy save file of an SME_Struct
* ".sav", ".inp", ".out": IDL save file with an sme structure
* ".ech": Echelle file from (Py)REDUCE
Parameters
----------
filename : str, optional
name of the file to load (default: 'sme.npy')
Returns
-------
sme : SME_Struct
Loaded SME structure
Raises
------
ValueError
If the file format extension is not recognized
"""
logger.info("Loading SME file %s", filename)
ext = os.path.splitext(filename)[1]
if ext == ".sme":
s = SME_Structure()
return persistence.load(filename, s)
elif ext == ".npy":
# Numpy Save file
s = np.load(filename, allow_pickle=True)
return np.atleast_1d(s)[0]
elif ext == ".npz":
s = np.load(filename, allow_pickle=True)
return s["sme"][()]
elif ext in [".sav", ".out", ".inp"]:
# IDL save file (from SME)
s = readsav(filename)["sme"]
def unfold(obj):
if isinstance(obj, bytes):
return obj.decode()
elif isinstance(obj, np.recarray):
return {
name.casefold(): unfold(obj[name][0])
for name in obj.dtype.names
}
return obj
s = unfold(s)
return SME_Structure(**s)
elif ext == ".ech":
# Echelle file (from REDUCE)
ech = echelle.read(filename)
s = SME_Structure()
s.wave = [np.ma.compressed(w) for w in ech.wave]
s.spec = [np.ma.compressed(s) for s in ech.spec]
s.uncs = [np.ma.compressed(s) for s in ech.sig]
for i, w in enumerate(s.wave):
sort = np.argsort(w)
s.wave[i] = w[sort]
s.spec[i] = s.spec[i][sort]
s.uncs[i] = s.uncs[i][sort]
s.mask = [np.full(i.size, 1) for i in s.spec]
s.mask[s.spec == 0] = MASK_VALUES.BAD
s.wran = [[w[0], w[-1]] for w in s.wave]
s.abund = Abund.solar()
try:
s.object = ech.head["OBJECT"]
except KeyError:
pass
return s
else:
options = [".npy", ".sav", ".out", ".inp", ".ech"]
raise ValueError(
f"File format not recognised, expected one of {options} but got {ext}"
)