# -*- coding: utf-8 -*-
"""
Determine continuum based on continuum mask
and fit best radial velocity to observation
"""
import logging
import warnings
import emcee
import numpy as np
from scipy.constants import speed_of_light
from scipy.interpolate import splev, splrep
from scipy.ndimage import binary_dilation
from scipy.optimize import least_squares
from scipy.signal import correlate
from tqdm import tqdm
from pysme.util import disable_progress_bars
from .iliffe_vector import Iliffe_vector
from .sme import MASK_VALUES
from .sme_synth import SME_DLL
from .util import show_progress_bars
logger = logging.getLogger(__name__)
c_light = speed_of_light * 1e-3 # speed of light in km/s
[docs]
class ContinuumNormalizationAbstract:
def __init__(self):
self.cscale_type = "none"
pass
def __call__(self, sme, x_syn, y_syn, segments, rvel=0, least_squares_kwargs=None):
if least_squares_kwargs is None:
least_squares_kwargs = {}
raise NotImplementedError
[docs]
def apply(self, wave, smod, cwave, cscale, segments):
return apply_continuum(wave, smod, cwave, cscale, self.cscale_type, segments)
[docs]
class ContinuumNormalizationMask(ContinuumNormalizationAbstract):
def __init__(self):
super().__init__()
self.cscale_type = "mask"
def __call__(
self,
sme,
x_syn,
y_syn,
segments,
rvel=0,
least_squares_kwargs=None,
):
"""
Fit a polynomial to the spectrum points marked as continuum
The degree of the polynomial fit is determined by sme.cscale_flag
Parameters
----------
sme : SME_Struct
input sme structure with sme.sob, sme.wave, and sme.mask
segment : int
index of the wavelength segment to use, or -1 when dealing with the whole spectrum
Returns
-------
cscale : array of size (ndeg + 1,)
polynomial coefficients of the continuum fit, in numpy order, i.e. largest exponent first
"""
if segments < 0:
return sme.cscale
if least_squares_kwargs is None:
least_squares_kwargs = {}
if "spec" not in sme or "wave" not in sme:
# If there is no observation, we have no continuum scale
warnings.warn("Missing data for continuum fit")
cscale = [1]
elif sme.cscale_flag in ["none", -3]:
cscale = [1]
elif sme.cscale_flag in ["fix", -1, -2]:
# Continuum flag is set to no continuum
cscale = sme.cscale[segments]
else:
# fit a line to the continuum points
ndeg = sme.cscale_degree
# Extract points in this segment
x, y = sme.wave, sme.spec
if "mask" in sme:
m = sme.mask
else:
m = sme.spec.copy()
m[:] = sme.mask_value["line"]
if "uncs" in sme:
u = sme.uncs
else:
u = sme.spec.copy()
u[:] = 1
x, y, m, u = x[segments], y[segments], m[segments], u[segments]
# Set continuum mask
if np.all((m & MASK_VALUES.CONT) == 0):
# If no continuum mask has been set
# Use the effective wavelength ranges of the lines to determine continuum points
logger.info(
"No Continuum mask was set in segment %s, "
"Using effective wavelength range of lines to find continuum instead",
segments,
)
cont = self.get_continuum_mask(x, y, sme.linelist, mask=m)
# Save mask for next iteration
m[cont == MASK_VALUES.CONT] = MASK_VALUES.CONT
logger.debug("Continuum mask points: %i", np.count_nonzero(cont == 2))
cont = (m & MASK_VALUES.CONT) != 0
x = x - x[0]
x, y, u = x[cont], y[cont], u[cont]
# Fit polynomial
try:
func = lambda coef: (np.polyval(coef, x) - y) / u
c0 = np.polyfit(x, y, deg=ndeg)
res = least_squares(func, x0=c0, **least_squares_kwargs)
cscale = res.x
except TypeError:
warnings.warn("Could not fit continuum, set continuum mask?")
cscale = [1]
return cscale
[docs]
def get_continuum_mask(self, wave, synth, linelist, threshold=0.1, mask=None):
"""
Use the effective wavelength range of the lines,
to find wavelength points that should be unaffected by lines
However one usually has to ignore the weak lines, as most points are affected by one line or another
Therefore keep increasing the threshold until enough lines have been found (>10%)
Parameters
----------
wave : array of size (n,)
wavelength points
linelist : LineList
LineList object that was input into the Radiative Transfer
threshold : float, optional
starting threshold, lines with depth below this value are ignored
the actual threshold is increased until enough points are found (default: 0.1)
Returns
-------
mask : array(bool) of size (n,)
True for points between lines and False for points within lines
"""
if "depth" not in linelist.columns:
raise ValueError(
"No depth specified in the linelist, can't auto compute the mask"
)
if threshold <= 0:
threshold = 0.01
if mask is None:
mask = np.full(len(wave), MASK_VALUES.LINE)
# TODO make this better
dll = SME_DLL()
dll.linelist = linelist
width = dll.GetLineRange()
# TODO: optimize this
temp = False
while np.count_nonzero(temp) < len(wave) * 0.1:
temp = np.full(len(wave), True)
for i, line in enumerate(width):
if linelist["depth"][i] > threshold:
w = (wave >= line[0]) & (wave <= line[1])
temp[w] = False
# TODO: Good value to increase threshold by?
temp[mask == MASK_VALUES.BAD] = False
threshold *= 1.1
mask[temp] = MASK_VALUES.CONT
logger.debug("Ignoring lines with depth < %f", threshold)
return mask
[docs]
class ContinuumNormalizationMCMC(ContinuumNormalizationAbstract):
def __init__(self):
super().__init__()
self.cscale_tyoe = "mcmc"
def __call__(
self,
sme,
x_syn,
y_syn,
segments,
rvel=0,
least_squares_kwargs=None,
):
"""
Fits both radial velocity and continuum level simultaneously
by comparing the synthetic spectrum to the observation
The best fit is determined using robust least squares between
a shifted and scaled synthetic spectrum and the observation
Parameters
----------
sme : SME_Struct
contains the observation
segment : int
wavelength segment to fit
x_syn : array of size (ngrid,)
wavelength of the synthetic spectrum
y_syn : array of size (ngrid,)
intensity of the synthetic spectrum
Returns
-------
vrad : float
radial velocity in km/s
vrad_unc : float
radial velocity uncertainty in km/s
cscale : array of size (ndeg+1,)
polynomial coefficients of the continuum
cscale_unc : array if size (ndeg + 1,)
uncertainties of the continuum coefficients
"""
if np.isscalar(segments):
segments = [segments]
nseg = len(segments)
if least_squares_kwargs is None:
least_squares_kwargs = {}
if sme.cscale_flag in ["none", "fix"] and sme.vrad_flag in [
"none",
"fix",
]:
vrad, vunc, cscale, cunc = null_result(nseg, sme.cscale_degree)
if sme.vrad_flag == "fix":
vrad = sme.vrad[segments]
if sme.cscale_flag == "fix":
cscale = sme.cscale[segments]
return vrad, vunc, cscale, cunc
if "spec" not in sme or "wave" not in sme:
# No observation no radial velocity
logger.warning("Missing data for radial velocity/continuum determination")
return null_result(nseg, sme.cscale_degree)
if "mask" not in sme:
sme.mask = np.full(sme.spec.size, MASK_VALUES.LINE)
if "uncs" not in sme:
sme.uncs = np.full(sme.spec.size, 1.0)
if np.all(sme.mask_bad[segments].ravel()):
warnings.warn(
"Only bad pixels in this segments, can't determine radial velocity/continuum",
UserWarning,
)
return null_result(nseg, sme.cscale_degree)
if x_syn.ndim == 1:
x_syn = x_syn[None, :]
if y_syn.ndim == 1:
y_syn = y_syn[None, :]
if x_syn.shape[0] != nseg or y_syn.shape[0] != nseg:
raise ValueError(
"Size of synthetic spectrum, does not match the number of requested segments"
)
mask = sme.mask_good[segments]
x_obs = Iliffe_vector(
[w[m] for w, m in zip(sme.wave[segments].segments, mask.segments)]
)
y_obs = sme.spec[segments][mask]
x_num = x_obs - sme.wave[segments][:, 0]
if x_obs.size <= sme.cscale_degree:
warnings.warn(
"Not enough good pixels to determine radial velocity/continuum"
)
return null_result(nseg)
if sme.cscale_flag in [-3, "none"]:
cflag = False
cscale = np.ones((nseg, 1))
ndeg = 0
elif sme.cscale_flag in [-1, -2, "fix"]:
cflag = False
cscale = sme.cscale[segments]
ndeg = cscale.shape[1] - 1
elif sme.cscale_flag in [0, "constant"]:
ndeg = 0
cflag = True
elif sme.cscale_flag in [1, "linear"]:
ndeg = 1
cflag = True
elif sme.cscale_flag in [2, "quadratic"]:
ndeg = 2
cflag = True
else:
raise ValueError("cscale_flag not recognized")
if cflag:
if sme.cscale is not None:
cscale = sme.cscale[segments]
else:
cscale = np.zeros((nseg, ndeg + 1))
for i, seg in enumerate(segments):
cscale[i, -1] = np.nanpercentile(y_obs[seg], 95)
# Even when the vrad_flag is set to whole
# you still want to fit the rv of the individual segments
# just for the continuum fit
if sme.vrad_flag == "none":
vrad = np.zeros(len(segments))
vflag = False
elif sme.vrad_flag == "whole":
vrad = sme.vrad[:1]
vflag = True
elif sme.vrad_flag == "each":
vrad = sme.vrad[segments]
vflag = True
elif sme.vrad_flag == "fix":
vrad = sme.vrad[segments]
vflag = False
else:
raise ValueError(f"Radial velocity Flag not understood {sme.vrad_flag}")
# Limit shift to half an order
x1, x2 = (
x_obs[:, 0],
x_obs[:, [s // 4 for s in x_obs.shape[1]]].ravel(),
)
rv_limit = np.abs(c_light * (1 - x2 / x1))
if sme.vrad_flag == "whole":
rv_limit = np.min(rv_limit)
# Use Cross corellatiom as a first guess for the radial velocity
# This uses the median as the continuum
if vrad is None:
y_tmp = np.interp(x_obs, x_syn, y_syn, left=1, right=1)
corr = correlate(y_obs - np.median(y_obs), y_tmp - 1, mode="same")
offset = np.argmax(corr)
x1, x2 = x_obs[offset], x_obs[len(x_obs) // 2]
vrad = c_light * (1 - x2 / x1)
if np.abs(vrad) >= rv_limit:
logger.warning(
"Radial Velocity could not be estimated from cross correlation, using initial guess of 0 km/h. Please check results!"
)
vrad = 0
def log_prior(rv, cscale, nwalkers):
prior = np.zeros(nwalkers)
# Add the prior here
# TODO reject too large/small rv values in a prior
where = np.full(nwalkers, False)
if vflag:
where |= np.any(np.abs(rv) > rv_limit, axis=1)
if cflag:
where |= np.any(cscale[:, :, -1] < 0, axis=1)
if ndeg == 1:
where |= np.any(
(cscale[:, :, -1] + cscale[:, :, -2] * x_num[:, -1]) < 0,
axis=1,
)
elif ndeg == 2:
for i in range(nseg):
where |= np.any(
cscale[:, i, None, -1]
+ cscale[:, i, None, -2] * x_num[i]
+ cscale[:, i, None, -3] * x_num[i] ** 2
< 0,
axis=1,
)
prior[where] = -np.inf
return prior
def log_prob(par, sep, nseg, ndeg):
"""
par : array of shape (nwalkers, ndim)
ndim = 1 for radial velocity + continuum polynomial coeficients
"""
nwalkers = par.shape[0]
rv = par[:, :sep] if vflag else vrad[None, :]
if rv.shape[0] == 1 and nwalkers > 1:
rv = np.tile(rv, [nwalkers, 1])
if rv.shape[1] == 1 and nseg > 1:
rv = np.tile(rv, [1, nseg])
if cflag:
cs = par[:, sep:]
cs.shape = nwalkers, nseg, ndeg + 1
else:
cs = cscale[None, ...]
prior = log_prior(rv, cs, nwalkers)
# Apply RV shift
rv_factor = np.sqrt((1 - rv / c_light) / (1 + rv / c_light))
total = np.zeros(nwalkers)
for i in range(nseg):
x = x_obs[i][None, :] * rv_factor[:, i, None]
model = np.interp(x, x_syn[i], y_syn[i], left=0, right=0)
# Apply continuum
y = np.zeros_like(x_num[i])[None, :]
for j in range(ndeg + 1):
y = y * x_num[i] + cs[:, i, j, None]
model *= y
# Ignore the non-overlapping parts of the spectrum
mask = model == 0
npoints = mask.shape[1] - np.count_nonzero(mask, axis=1)
resid = (model - y_obs[i]) ** 2
resid[mask] = 0
prob = -0.5 * np.sum(resid, axis=-1)
# Need to rescale here, to account for the ignored points before
prob *= mask.shape[1] / npoints
prob[np.isnan(prob)] = -np.inf
total += prob
return prior + total
sep = len(vrad) if vflag else 0
ndim, p0, scale = 0, [], []
if vflag:
ndim += len(vrad)
p0 += list(vrad)
scale += [1] * len(vrad)
if cflag:
ndim += cscale.size
p0 += list(cscale.ravel())
scale += [0.001] * cscale.size
p0 = np.array(p0)[None, :]
scale = np.array(scale)[None, :]
max_n = 10000
ncheck = 100
nburn = 300
nwalkers = max(2 * ndim + 1, 10)
p0 = p0 + np.random.randn(nwalkers, ndim) * scale
# If the original guess is good then DEMove is much faster, and sometimes just as good
# However StretchMove is much more robust to the initial starting value
moves = [
(emcee.moves.DEMove(), 0.8),
(emcee.moves.DESnookerMove(), 0.2),
]
sampler = emcee.EnsembleSampler(
nwalkers,
ndim,
log_prob,
vectorize=True,
moves=moves,
args=(sep, nseg, ndeg),
)
# We'll track how the average autocorrelation time estimate changes
index = 0
autocorr = np.empty((max_n // ncheck + 1, ndim))
# This will be useful to testing convergence
# old_tau = 0
# Now we'll sample for up to max_n steps
with tqdm(
leave=False, desc="RV", total=max_n, disable=~show_progress_bars
) as t:
for _ in sampler.sample(p0, iterations=max_n):
t.update()
# Only check convergence every 100 steps
if sampler.iteration < 2 * nburn or sampler.iteration % ncheck != 0:
continue
# Compute the autocorrelation time so far
# Using tol=0 means that we'll always get an estimate even
# if it isn't trustworthy
tau = sampler.get_autocorr_time(
tol=0, discard=sampler.iteration - ncheck
)
autocorr[index] = tau
index += 1
# Check convergence
converged = np.all(tau * 100 < sampler.iteration - nburn)
# converged &= np.all(np.abs(old_tau - tau) < 0.01 * tau)
# old_tau = tau
if converged:
break
if sampler.iteration == max_n:
logger.warning(
"The radial velocity did not converge within the limit. Check the results!"
)
samples = sampler.get_chain(flat=True, discard=nburn)
_, vrad_unc, _, cscale_unc = null_result(nseg, ndeg)
if vflag:
vmin, vrad, vmax = np.percentile(samples[:, :sep], (32, 50, 68), axis=0)
vrad_unc[:, 0] = vrad - vmin
vrad_unc[:, 1] = vmax - vrad
if cflag:
vmin, cscale, vmax = np.percentile(samples[:, sep:], (32, 50, 68), axis=0)
vmin.shape = cscale.shape = vmax.shape = nseg, ndeg + 1
cscale_unc[..., 0] = cscale - vmin
cscale_unc[..., 1] = vmax - cscale
if sme.vrad_flag == "whole":
vrad = np.tile(vrad, [nseg])
return vrad, vrad_unc, cscale, cscale_unc
[docs]
class ContinuumNormalizationMatch(ContinuumNormalizationAbstract):
def __init__(self):
super().__init__()
self.cscale_type = "match"
self.mask = False
# We over exaggerate the weights on the top of the spectrum
# this works well to determine the continuum
# assuming that there is something there
self.top_factor = 1000
self.bottom_factor = 1
def __call__(
self,
sme,
x_syn,
y_syn,
segments,
rvel=0,
least_squares_kwargs=None,
):
"""
Fit a continuum when no continuum points exist
Parameters
----------
sme : SME_Struct
sme structure with observation data
segment : int
index of the wavelength segment to fit
x_syn : array of size (n,)
wavelengths of the synthetic spectrum
y_syn : array of size (n,)
intensity of the synthetic spectrum
rvel : float, optional
radial velocity in km/s to apply to the wavelength (default: 0)
Returns
-------
continuum : array of size (ndeg,)
continuum fit polynomial coefficients
"""
if sme.cscale_flag == "none":
return [1]
elif sme.cscale_flag == "fix":
return sme.cscale[segments]
if least_squares_kwargs is None:
least_squares_kwargs = {}
# else
x = sme.wave[segments]
y = sme.spec[segments]
u = sme.uncs[segments]
if self.mask:
m = sme.mask_cont[segments]
else:
m = sme.mask_good[segments]
# Interpolate the synthetic spectrum to the observed wavelength grid
yp = apply_radial_velocity([x], [x_syn], [y_syn], [rvel], [0], copy=True)[0]
# Apply the mask to all vectors
xs = x - x[0]
xs, y, u, yp = xs[m], y[m], u[m], yp[m]
if sme.telluric is not None:
# by definition sme.telluric is on the same wavelength grid as sme.spec
# also it should be in the correct restframe
tell = sme.telluric[segments][m]
yp *= tell
# TODO: what should this be?
if self.top_factor != 0:
from scipy.ndimage import median_filter
diff = median_filter(y, 5) - y
std = 1.5 * np.median(np.abs(diff[diff != 0] - np.median(diff[diff != 0])))
snr = 1 / std
factor = 1000 + (snr - 50) * 90
factor = np.clip(factor, 100, 100_000)
mod = np.nanmedian(u) / np.nanmedian(y) * (np.nanpercentile(y, 95) - y) ** 2
u = u + factor * mod
deg = sme.cscale_degree
p0 = sme.cscale[segments]
def func(p):
model = yp * np.polyval(p, xs)
resid = (model - y) / u
# resid[resid > 0] *= self.top_factor
resid[resid < 0] *= self.bottom_factor
return resid
try:
res = least_squares(
func,
x0=p0,
**least_squares_kwargs,
)
popt = res.x
except (RuntimeError, ValueError) as ex:
# logger.warning("Failed to determine the continuum: " + ex.msg)
logger.warning("Failed to determine the continuum: %s", str(ex))
popt = p0
return popt[None, :]
[docs]
class ContinuumNormalizationMatchLines(ContinuumNormalizationMatch):
def __init__(self):
super().__init__()
self.cscale_type = "matchlines"
self.top_factor = 0
self.bottom_factor = 10
[docs]
class ContinuumNormalizationMatchMask(ContinuumNormalizationMatch):
def __init__(self):
super().__init__()
self.cscale_type = "match+mask"
self.mask = True
self.top_factor = 0
[docs]
class ContinuumNormalizationMatchLinesMask(ContinuumNormalizationMatchLines):
def __init__(self):
super().__init__()
self.cscale_type = "matchlines+mask"
self.mask = True
[docs]
class ContinuumNormalizationSpline(ContinuumNormalizationAbstract):
def __init__(self):
super().__init__()
self.cscale_type = "spline"
self.mask = False
def __call__(
self,
sme,
x_syn,
y_syn,
segments,
rvel,
least_squares_kwargs=None,
):
if sme.cscale_flag in ["none"]:
return np.ones(len(sme.spec[segments]))
elif sme.cscale_flag in ["fix"]:
if sme.cscale is not None:
return sme.cscale[segments]
else:
return np.ones(len(sme.spec[segments]))
if least_squares_kwargs is None:
least_squares_kwargs = {}
least_squares_kwargs.setdefault("f_scale", 0.01)
w = sme.wave[segments]
s = sme.spec[segments]
u = sme.uncs[segments]
# Apply RV correction to the synthetic spectrum
y = apply_radial_velocity([w], [x_syn], [y_syn], [rvel], [0], copy=True)[0]
# and don't forget the telluric spectrum if available
if sme.telluric is not None:
tell = sme.telluric[segments]
y *= tell
# Apply the bpm to all arrays
if self.mask:
m = sme.mask_cont[segments]
else:
m = sme.mask_good[segments]
wm, sm, ym, um = w[m], s[m], y[m], u[m]
# Fit the spline like in the polynomial
# so that synth * spline = obs
func = lambda p: (ym * splev(wm, (t, p, 3)) - sm) / um
# We use splrep to find the intial guess for the number of knots and their
# positions
# TODO: how do we know how many points to use?
if sme.cscale_flag == "constant":
tlen = int(np.round(w.max() - w.min()))
elif sme.cscale_flag == "linear":
tlen = 3
elif sme.cscale_flag == "quadratic":
tlen = 4
elif sme.cscale_flag == "cubic":
tlen = 5
elif sme.cscale_flag == "quintic":
tlen = 6
elif sme.cscale_flag == "quantic":
tlen = 7
else:
tlen = int(sme.cscale_flag)
# We need to avoid the 0 points
m2 = ym != 0
# Get a first guess by dividing by ym
t = np.linspace(wm[m2].min(), wm[m2].max(), tlen)[1:-1]
t, c, k = splrep(wm[m2], sm[m2] / ym[m2], w=1 / um[m2], k=3, t=t)
# Then get a real fit using the function
res = least_squares(
func,
x0=c,
**least_squares_kwargs,
)
# And finally evaluate the continuum
c = res.x
coef = splev(w, (t, c, k))
return coef
[docs]
class ContinuumNormalizationSplineMask(ContinuumNormalizationSpline):
def __init__(self):
super().__init__()
self.cscale_type = "spline+mask"
self.mask = True
[docs]
def apply_radial_velocity(wave, wmod, smod, vrad, segments, copy=False):
if copy:
wmod = [np.copy(w) for w in wmod]
smod = [np.copy(s) for s in smod]
if vrad is None:
return smod
for il in segments:
if vrad[il] is not None:
rv_factor = np.sqrt((1 + vrad[il] / c_light) / (1 - vrad[il] / c_light))
wmod[il] *= rv_factor
smod[il] = np.interp(wave[il], wmod[il], smod[il])
return smod
[docs]
def apply_continuum(wave, smod, cwave, cscale, cscale_type, segments, copy=False):
'''
Apply continuum to the spectra according to cscale.
'''
if copy:
smod = np.copy(smod)
if cscale is None:
return smod
for il in segments:
if cscale[il] is not None and not np.all(cscale[il] == 0):
if cscale_type in ["spline", "spline+mask"]:
if len(cscale[il]) != len(smod[il]):
cs = np.interp(wave[il], cwave[il], cscale[il])
else:
cs = cscale[il]
smod[il] *= cs
else:
x = wave[il] - wave[il][0]
smod[il] *= np.polyval(cscale[il], x)
return smod
[docs]
def apply_radial_velocity_and_continuum(
wave, wmod, smod, vrad, cscale, cscale_type, segments, copy=False
):
"""
Apply the radial velocity and continuum corrections
to a syntheic spectrum to match the observation
Parameters
----------
wave : array
final wavelength array of the observation
wmod : array
wavelength array of the synthethic spectrum
smod : array
flux array of the synthetic spectrum
vrad : array, None
radial velocities in km/s for each segment
cscale : array, None
continnum scales for each segment, exact meaning depends on cscale_type
cscale_type : str
defines the continuum correction behaviour
segments : array
the segments to apply the correction to
Returns
-------
smod : array
the corrected synthetic spectrum
"""
smod = apply_radial_velocity(wave, wmod, smod, vrad, segments, copy=copy)
# The radial velocity shift also interpolates onto the wavelength grid
smod = apply_continuum(wave, smod, wave, cscale, cscale_type, segments, copy=copy)
return smod
[docs]
def null_result(nseg, ndeg=0, ctype=None):
vrad, vrad_unc = np.zeros(nseg), np.zeros((nseg, 2))
if ctype in ["spline", "spline+mask"]:
cscale = [np.ones(ndeg[i]) for i in range(nseg)]
cscale = Iliffe_vector(values=cscale)
cscale_unc = [np.zeros(ndeg[i]) for i in range(nseg)]
cscale_unc = Iliffe_vector(values=cscale_unc)
else:
cscale, cscale_unc = np.zeros((nseg, ndeg + 1)), np.zeros((nseg, ndeg + 1, 2))
cscale[:, -1] = 1
return vrad, vrad_unc, cscale, cscale_unc
[docs]
def cross_correlate_segment(x_obs, y_obs, x_syn, y_syn, mask, mask_wider, rv_bounds):
# Interpolate synthetic observation onto the observation wavelength grid
y_tmp = np.interp(x_obs, x_syn, y_syn)
# Normalize both spectra
y_obs_tmp = np.copy(y_obs)
y_obs_tmp /= np.nanpercentile(y_obs_tmp, 95)
y_obs_tmp -= 1
y_obs_tmp[~mask] = 0
y_tmp_tmp = np.copy(y_tmp)
y_tmp_tmp /= np.nanpercentile(y_tmp_tmp, 95)
y_tmp_tmp -= 1
mask_wider = np.interp(x_obs, x_syn, mask_wider) > 0.5
y_tmp_tmp[~mask_wider] = 0
# Perform cross correaltion between normalized spectra
corr = correlate(y_obs_tmp, y_tmp_tmp, mode="same")
# Determine the radial velocity offset
# and only retain the area within the bounds
x_mid = x_obs[len(x_obs) // 2]
x_shift = c_light * (1 - x_mid / x_obs)
idx = (x_shift >= rv_bounds[0]) & (x_shift <= rv_bounds[1])
x_shift = x_shift[idx]
corr = corr[idx]
return x_shift, corr
[docs]
def determine_radial_velocity(
sme,
x_syn,
y_syn,
segment,
cscale=None,
whole=False,
least_squares_kwargs=None,
):
"""
Calculate radial velocity by using cross correlation and
least-squares between observation and synthetic spectrum
Parameters
----------
sme : SME_Struct
sme structure with observed spectrum and flags
segment : int
which wavelength segment to handle, -1 if its using the whole spectrum
cscale : array of size (ndeg,)
continuum coefficients, as determined by e.g. determine_continuum
x_syn : array of size (n,)
wavelength of the synthetic spectrum
y_syn : array of size (n,)
intensity of the synthetic spectrum
Raises
------
ValueError
if sme.vrad_flag is not recognized
Returns
-------
rvel : float
best fit radial velocity for this segment/whole spectrum
or None if no observation is present
"""
if least_squares_kwargs is None:
least_squares_kwargs = {}
least_squares_kwargs.setdefault("bounds", (-100, 100))
least_squares_kwargs.setdefault("jac", "3-point")
least_squares_kwargs.setdefault("loss", "soft_l1")
least_squares_kwargs.setdefault("method", "dogbox")
least_squares_kwargs.setdefault("x_scale", "jac")
least_squares_kwargs.setdefault("ftol", 1e-8)
least_squares_kwargs.setdefault("xtol", 1e-8)
least_squares_kwargs.setdefault("gtol", 1e-8)
if "spec" not in sme or "wave" not in sme:
# No observation no radial velocity
warnings.warn("Missing data for radial velocity determination")
rvel = 0
elif sme.vrad_flag == "none":
# vrad_flag says don't determine radial velocity
rvel = sme.vrad[segment]
elif sme.vrad_flag == "whole" and not whole:
# We are inside a segment, but only want to determine rv at the end
rvel = 0
elif sme.vrad_flag == "fix":
rvel = sme.vrad[segment]
else:
# Fit radial velocity
# Extract data
x, y = sme.wave, sme.spec
if "mask" in sme:
m = sme.mask
else:
m = Iliffe_vector(
[np.full(s, MASK_VALUES.LINE, dtype="i4") for s in sme.spec.shape[1]]
)
if "uncs" in sme:
u = sme.uncs
else:
u = sme.spec.copy()
u[:] = 1
# Only this one segment
x_obs = x[segment]
y_obs = y[segment].copy()
u_obs = u[segment]
mask = m[segment]
if sme.telluric is not None:
tell = sme.telluric[segment]
else:
tell = 1
if sme.vrad_flag == "each":
# apply continuum
y_syn = apply_continuum(
{segment: x_syn},
{segment: y_syn},
sme.wave,
{segment: cscale},
sme.cscale_type,
[segment],
)[segment]
elif sme.vrad_flag == "whole":
# All segments
y_syn = apply_continuum(
x_syn,
y_syn,
sme.wave,
cscale,
sme.cscale_type,
range(len(y_obs)),
)
else:
raise ValueError(
f"Radial velocity flag {sme.vrad_flag} not recognised, expected one of 'each', 'whole', 'none'"
)
# Filter the mask for the correct sections
# Need to use temporary m instead of mask, so that we can check
m = (mask & MASK_VALUES.VRAD) != 0
if not np.any(m):
# No VRAD specified, use LINE instead
m = (mask & MASK_VALUES.LINE) != 0
mask = m
mask &= u_obs != 0
# Widen the mask by roughly the amount expected from the rv_bounds
bounds = least_squares_kwargs["bounds"]
rv = max(bounds)
rv_factor = np.sqrt((1 + rv / c_light) / (1 - rv / c_light))
# mask_wider = mask.copy()
if sme.vrad_flag == "each":
iterations = int(
np.ceil(np.nanmedian((x_obs * rv_factor - x_obs) / np.gradient(x_obs)))
)
iterations = max(1, iterations)
mask_wider = binary_dilation(mask, iterations=iterations)
mask_wider = np.interp(x_syn, x_obs, mask_wider) > 0.5
elif sme.vrad_flag == "whole":
mask_wider = [None] * len(x_obs)
for i in range(len(x_obs)):
iterations = int(
np.ceil(
np.nanmedian(
(x_obs[i] * rv_factor - x_obs[i]) / np.gradient(x_obs[i])
)
)
)
iterations = max(1, iterations)
mask_wider[i] = binary_dilation(mask[i], iterations=iterations)
mask_wider[i] = np.interp(x_syn[i], x_obs[i], mask_wider[i]) > 0.5
mask_wider = Iliffe_vector(mask_wider)
else:
raise ValueError
# Get a first rough estimate from cross correlation
if sme.vrad_flag == "each":
x_shift, corr = cross_correlate_segment(
x_obs, y_obs, x_syn, y_syn, mask, mask_wider, bounds
)
else:
# If using several segments we run the cross correlation for each
# segment seperately and then sum the results
nseg = len(segment)
shift = [None for _ in range(nseg)]
corr = [None for _ in range(nseg)]
for i in range(len(x_obs)):
shift[i], corr[i] = cross_correlate_segment(
x_obs[i],
y_obs[i],
x_syn[i],
y_syn[i],
mask[i],
mask_wider[i],
bounds,
)
n_min = min(len(s) for s in shift)
x_shift = np.linspace(bounds[0], bounds[1], n_min)
corrs_interp = [np.interp(x_shift, s, c) for s, c in zip(shift, corr)]
corrs_interp = np.array(corrs_interp)
corr = np.sum(corrs_interp, axis=0)
# Concatenate the segments for the last step
mask = np.concatenate(mask)
mask_wider = np.concatenate(mask_wider)
x_obs = np.concatenate(x_obs)
y_obs = np.concatenate(y_obs)
u_obs = np.concatenate(u_obs)
x_syn = np.concatenate(x_syn)
y_syn = np.concatenate(y_syn)
if sme.telluric is not None:
tell = np.concatenate(tell)
# Retrieve the initial cross correlation guess
offset = np.argmax(corr)
rvel = x_shift[offset]
# Normalize the spectra to 1
y_obs = y_obs / np.nanpercentile(y_obs, 95)
y_obs -= 1
y_syn = y_syn / np.nanpercentile(y_syn, 95)
y_syn -= 1
# Apply mask
y_obs[~mask] = 0
y_syn[~mask_wider] = 0
u_obs = u_obs.copy()
u_obs[~mask] = 1
# Then minimize the least squares for a better fit
# as cross correlation can only find
def func(rv):
rv_factor = np.sqrt((1 - rv / c_light) / (1 + rv / c_light))
shifted = interpolator(x_obs * rv_factor)
resid = (y_obs - shifted * tell) / u_obs
resid[~mask] = 0
resid = np.nan_to_num(resid, copy=False, nan=0, posinf=0, neginf=0)
return resid
interpolator = lambda x: np.interp(x, x_syn, y_syn)
try:
res = least_squares(func, x0=rvel, **least_squares_kwargs)
rvel = res.x[0]
except ValueError:
logger.warning(f"Could not determine radial velocity for segment {segment}")
return rvel
[docs]
def match_rv_continuum(sme, segments, x_syn, y_syn):
"""
Match both the continuum and the radial velocity of observed/synthetic spectrum
Note that the parameterization of the continuum is different to old SME !!!
Parameters
----------
sme : SME_Struct
input sme structure with all the parameters
segment : int
index of the wavelength segment to match, or -1 when dealing with the whole spectrum
x_syn : array of size (n,)
wavelength of the synthetic spectrum
y_syn : array of size (n,)
intensitz of the synthetic spectrum
Returns
-------
rvel : float
new radial velocity
cscale : array of size (ndeg + 1,)
new continuum coefficients
"""
cont_func = {
"mask": ContinuumNormalizationMask,
"match": ContinuumNormalizationMatch,
"match+mask": ContinuumNormalizationMatchMask,
"matchlines": ContinuumNormalizationMatchLines,
"matchlines+mask": ContinuumNormalizationMatchLinesMask,
"spline": ContinuumNormalizationSpline,
"spline+mask": ContinuumNormalizationSplineMask,
"mcmc": ContinuumNormalizationMCMC,
}
vrad, vrad_unc, cscale, cscale_unc = null_result(
sme.nseg, sme.cscale_degree, sme.cscale_type
)
if sme.cscale_flag == "none" and sme.vrad_flag == "none":
return cscale, cscale_unc, vrad, vrad_unc
if np.isscalar(segments):
segments = [segments]
if not callable(sme.vrad_flag):
radial_velocity = determine_radial_velocity
else:
radial_velocity = sme.vrad_flag
if not callable(sme.cscale_type):
continuum_normalization = cont_func[sme.cscale_type]()
else:
continuum_normalization = sme.cscale_type
least_squares_kwargs = dict(
bounds=sme.vrad_bounds,
loss=sme.vrad_loss,
method=sme.vrad_method,
jac=sme.vrad_jac,
x_scale=sme.vrad_xscale,
ftol=sme.vrad_ftol,
xtol=sme.vrad_xtol,
gtol=sme.vrad_gtol,
)
if sme.vrad_flag == "none":
pass
elif sme.vrad_flag == "fix":
vrad[segments] = sme.vrad[segments]
elif sme.vrad_flag == "each":
for s in segments:
# We only use the continuum mask for the continuum fit,
# we need the lines for the radial velocity
vrad[s] = radial_velocity(
sme,
x_syn[s],
y_syn[s],
s,
cscale[s],
whole=False,
least_squares_kwargs=least_squares_kwargs,
)
elif sme.vrad_flag == "whole":
s = segments
vrad[s] = radial_velocity(
sme,
[x_syn[s] for s in s],
[y_syn[s] for s in s],
s,
cscale[s],
whole=True,
least_squares_kwargs=least_squares_kwargs,
)
else:
raise ValueError
if sme.cscale_flag == "none":
pass
elif sme.cscale_flag == "fix":
if sme.cscale is not None:
cscale[segments] = sme.cscale[segments]
else:
pass
else:
if sme.cscale_type == "mcmc":
for s in segments:
rv_i, rv_unc_i, cscale_i, cscale_unc_i = continuum_normalization(
sme,
x_syn[s],
y_syn[s],
s,
rvel=vrad[s],
)
# MCMC returns arrays even for one segment; normalize them to row/scalar.
cscale_row = np.asarray(cscale_i, dtype=float).reshape(1, -1)[0]
vrad[s] = np.asarray(rv_i, dtype=float).reshape(-1)[0]
vrad_unc[s] = np.asarray(rv_unc_i, dtype=float).reshape(1, 2)[0]
cscale[s] = cscale_row
cscale_unc[s] = np.asarray(cscale_unc_i, dtype=float).reshape(
1, cscale_row.size, 2
)[0]
else:
for s in segments:
cscale[s] = continuum_normalization(
sme,
x_syn[s],
y_syn[s],
s,
rvel=vrad[s],
least_squares_kwargs=dict(
bounds=sme.cscale_bounds,
loss=sme.cscale_loss,
method=sme.cscale_method,
jac=sme.cscale_jac,
x_scale=sme.cscale_xscale,
ftol=sme.cscale_ftol,
xtol=sme.cscale_xtol,
gtol=sme.cscale_gtol,
),
)
# Keep values from unused segments
select = np.arange(sme.nseg)
mask = np.full(select.shape, True)
for seg in segments:
mask &= select != seg
vrad[mask] = sme.vrad[mask]
if sme.cscale_type in ["spline", "spline+mask"]:
for i in range(len(mask)):
if (
mask[i]
and sme.cscale is not None
and len(cscale[i]) == len(sme.cscale[i])
):
cscale[i] = sme.cscale[i]
else:
cscale[mask] = sme.cscale[mask]
return cscale, cscale_unc, vrad, vrad_unc