Source code for pysme.persistence

# -*- coding: utf-8 -*-
import io
import json
import logging
import os
import subprocess
import sys
import tempfile
from zipfile import ZIP_LZMA, ZIP_STORED, ZipFile

import numpy as np
from flex.flex import FlexExtension, FlexFile

from . import __version__

logger = logging.getLogger(__name__)


def to_flex(sme):
    """Convert the SME structure into a FlexFile"""
    header = {}
    extensions = {}
    for name in sme._names:
        value = sme[name]
        if isinstance(value, IPersist):
            extensions[name] = value._save()
        elif isinstance(value, FlexExtension):
            extensions[name] = value
        elif value is not None:
            header[name] = value
    ff = FlexFile(header, extensions)
    return ff

def from_flex(ff, sme):
    """Fill the given SME structure with the contents of a FlexFile"""
    header = ff.header
    extensions = ff.extensions
    for name in sme._names:
        if name in updates.keys():
            name = updates[name]
        if name in header.keys():
            sme[name] = header[name]
        elif name in extensions.keys():
            if sme[name] is not None and isinstance(sme[name], IPersist):
                sme[name] = sme[name]._load(extensions[name])
            else:
                sme[name] = extensions[name]
    return sme

def save(filename, sme, format="flex", _async=False):
    """
    Save the SME structure to disk

    By default this uses the flex format;
    see the flex-format documentation for details

    Parameters
    ----------
    filename : str
        Filename of the final file
    sme : SME_Structure
        sme structure to save
    format : str, optional
        output format, one of "flex", "fits", or "json" (default: "flex")
    _async : bool, optional
        if True, write the flex file asynchronously (default: False)
    """
    ff = to_flex(sme)

    if format == "flex":
        file_ending = ".sme"
    else:
        file_ending = "." + format
    if not filename.endswith(file_ending):
        filename = filename + file_ending

    if format == "flex":
        if _async:
            ff.write_async(filename)
        else:
            ff.write(filename)
    elif format == "fits":
        ff.to_fits(filename, overwrite=True)
    elif format == "json":
        ff.to_json(filename)
    else:
        raise ValueError(
            "Format {!r} not understood, expected one of ['flex', 'fits', 'json'].".format(
                format
            )
        )

def load(fname, sme):
    """
    Load the SME Structure from disk

    Parameters
    ----------
    fname : str
        file to load
    sme : SME_Structure
        empty sme structure with default values set

    Returns
    -------
    sme : SME_Structure
        loaded sme structure
    """
    try:
        ff = FlexFile.read(fname)
        sme = from_flex(ff, sme)
        ff.close()
        return sme
    except Exception as ex:
        logger.error(ex)
        # Fall back to the deprecated version 1 (zip based) format
        try:
            sme = load_v1(fname, sme)
        except Exception:
            raise ex
        return sme

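# A minimal usage sketch (illustrative, not part of the original module):
# round-trip an SME structure through save() and load(). The import path
# pysme.sme.SME_Structure and the field values are assumptions based on
# the rest of the package.
def _example_save_load_roundtrip():
    from pysme.sme import SME_Structure

    sme = SME_Structure()
    sme.teff = 5770  # any field listed in sme._names can be set
    # "flex" is the default format and appends the ".sme" file ending
    save("example", sme, format="flex")
    # load() fills a fresh, default-initialized structure from disk
    return load("example.sme", SME_Structure())
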
# Update this if the names in sme change
updates = {"idlver": "system_info"}

class IPersist:
    def _save(self):
        raise NotImplementedError

    @classmethod
    def _load(cls, ext):
        raise NotImplementedError

    def _save_v1(self, file, folder=""):
        saves_v1(file, self, folder)

    @classmethod
    def _load_v1(cls, file, names, folder=""):
        logger.setLevel(logging.INFO)
        data = cls()
        # TODO Suppress warnings
        data = loads_v1(file, data, names, folder)
        logger.setLevel(logging.NOTSET)
        return data

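# A sketch of the contract IPersist subclasses are expected to fulfill
# (hypothetical class): _save/_load handle the flex extensions, while the
# deprecated v1 hooks only require a no-argument constructor, a _names
# list, attribute access, and item assignment.
class _ExampleCollection(IPersist):
    def __init__(self):
        self.alpha = 1.0
        self.beta = "two"

    @property
    def _names(self):
        # saves_v1 iterates over this list and reads fields via getattr
        return ["alpha", "beta"]

    def __setitem__(self, key, value):
        # loads_v1 writes fields back via data[key] = value
        setattr(self, key, value)
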
# Version 1 IO (Deprecated)
def toBaseType(value):
    """Convert numpy types to the corresponding Python base types for JSON"""
    if value is None:
        return value
    if isinstance(value, np.ndarray):
        return value.tolist()
    if isinstance(value, np.integer):
        return int(value)
    if isinstance(value, np.floating):
        return float(value)
    if isinstance(value, np.bool_):
        return bool(value)
    if isinstance(value, np.str_):
        return str(value)
    return value

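# toBaseType is meant as the `default` hook of json.dumps, so that numpy
# values serialize transparently. A quick doctest-style illustration:
#
#     >>> json.dumps({"maxiter": np.int64(100), "flags": np.array([1, 0, 1])},
#     ...            default=toBaseType)
#     '{"maxiter": 100, "flags": [1, 0, 1]}'
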
def save_v1(filename, data, folder="", compressed=False):
    """
    Create a folder structure inside a zipfile

    Adds .json, .npy, and .npz files with the correct names,
    and subfolders for more complicated objects with the same layout.
    Each class should have a save and a load method, which can be
    used for this purpose.

    Parameters
    ----------
    filename : str
        Filename of the final zipfile
    data : SME_struct
        data to save
    folder : str, optional
        subfolder to save data to
    compressed : bool, optional
        whether to compress the output
    """
    # We use LZMA for compression, since that yields the
    # smallest filesize of the existing compression algorithms
    if not compressed:
        compression = ZIP_STORED
    else:
        compression = ZIP_LZMA

    with ZipFile(filename, "w", compression) as file:
        saves_v1(file, data, folder=folder)

# TODO: this is specific for Collection type objects
# Move this to Collection, and not here
def saves_v1(file, data, folder=""):
    if folder != "" and folder[-1] != "/":
        folder = folder + "/"

    # Sort the fields into json compatible parameters, large arrays,
    # and nested objects that are saved recursively
    parameters = {}
    arrays = {}
    others = {}
    for key in data._names:
        value = getattr(data, key)
        if np.isscalar(value) or isinstance(value, dict):
            parameters[key] = value
        elif isinstance(value, (list, np.ndarray)):
            if np.size(value) > 20:
                arrays[key] = value
            else:
                parameters[key] = value
        else:
            others[key] = value

    info = json.dumps(parameters, default=toBaseType)
    file.writestr(f"{folder}info.json", info)

    for key, value in arrays.items():
        b = io.BytesIO()
        np.save(b, value)
        file.writestr(f"{folder}{key}.npy", b.getvalue())

    for key, value in others.items():
        if value is not None:
            value._save_v1(file, f"{folder}{key}")

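# The resulting zip layout, which loads_v1 below parses back, looks like
# this (field names are illustrative, not a fixed schema):
#
#     info.json            scalar, dict, and small list fields
#     wave.npy             one .npy file per large array field
#     linelist/info.json   nested IPersist objects recurse into subfolders
#     linelist/species.npy
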
def load_v1(filename, data):
    with ZipFile(filename, "r") as file:
        names = file.namelist()
        return loads_v1(file, data, names)

def loads_v1(file, data, names=None, folder=""):
    if folder != "" and folder[-1] != "/":
        folder = folder + "/"
    if names is None:
        names = file.namelist()

    subdirs = {}
    local = []
    for name in names:
        name_within = name[len(folder) :]
        if "/" not in name_within:
            local.append(name)
        else:
            direc, _ = name_within.split("/", 1)
            if direc not in subdirs.keys():
                subdirs[direc] = []
            subdirs[direc].append(name)

    for name in local:
        if name.endswith(".json"):
            info = file.read(name)
            info = json.loads(info)
            for key, value in info.items():
                key = updates.get(key, key)
                data[key] = value
        elif name.endswith(".npy"):
            b = io.BytesIO(file.read(name))
            key = name[len(folder) : -4]
            key = updates.get(key, key)
            data[key] = np.load(b)
        elif name.endswith(".npz"):
            b = io.BytesIO(file.read(name))
            key = name[len(folder) : -4]
            key = updates.get(key, key)
            value = np.load(b)
            data[key] = [value[f"arr_{i}"] for i in range(len(value))]

    for key, value in subdirs.items():
        data_key = updates.get(key, key)
        data[data_key] = data[data_key]._load_v1(file, value, folder=folder + key)

    return data

# IDL IO
def get_typecode(dtype):
    """Get the IDL typecode for a given dtype"""
    if dtype.name[:5] == "bytes":
        return "1"
    if dtype.name == "int16":
        return "2"
    if dtype.name == "int32":
        return "3"
    if dtype.name == "float32":
        return "4"
    if dtype.name == "float64":
        return "5"
    if dtype.name[:3] == "str":
        return dtype.name[3:]
    raise ValueError("Don't recognise the datatype")

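# These typecodes follow IDL's SIZE()/read_binary type convention
# (1: byte, 2: int, 3: long, 4: float, 5: double). Doctest-style:
#
#     >>> get_typecode(np.dtype("float64"))
#     '5'
#     >>> get_typecode(np.dtype("S10"))  # bytes are read as IDL bytes
#     '1'
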
temps_to_clean = []
def save_as_binary(arr):
    global temps_to_clean
    with tempfile.NamedTemporaryFile("w+", suffix=".dat", delete=False) as temp:
        if arr.dtype.name[:3] == "str" or arr.dtype.name == "object":
            arr = arr.astype(bytes)
            shape = (arr.dtype.itemsize, len(arr))
        elif np.issubdtype(arr.dtype, np.floating):
            # SME expects double precision, so we assure that here
            arr = arr.astype("float64")
            shape = arr.shape[::-1]
        else:
            shape = arr.shape[::-1]

        # Most arrays should be in the native endianness anyway
        # But if not we swap it to the native representation
        endian = arr.dtype.str[0]
        if endian == "<":
            endian = "little"
        elif endian == ">":
            endian = "big"
        elif endian == "|":
            endian = sys.byteorder
        if endian != sys.byteorder:
            arr = arr.newbyteorder().byteswap()
            endian = "native"

        arr.tofile(temp)
        value = [temp.name, str(list(shape)), get_typecode(arr.dtype), endian]
        # Remember the file path (not the file object) so that
        # clean_temps can os.remove it later
        temps_to_clean += [temp.name]
    return value

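# save_as_binary hands IDL a 4-element string descriptor instead of the
# data itself; for a float array on a little-endian machine it looks
# roughly like this (the temp filename is illustrative):
#
#     >>> save_as_binary(np.linspace(4000, 7000, 100))
#     ['/tmp/tmpab12cd.dat', '[100]', '5', 'little']
#
# The IDL script generated in save_as_idl detects such 4-element string
# arrays and replaces them with the result of read_binary().
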
def clean_temps():
    global temps_to_clean
    for temp in temps_to_clean:
        try:
            os.remove(temp)
        except OSError:
            pass
    temps_to_clean = []

def write_as_idl(sme):
    """
    Write the SME structure into an IDL format.

    Data arrays are stored in separate temp files,
    and only the filenames are passed to IDL.
    """
    vrad_flag = {"none": -2, "whole": -1, "each": 0, "fix": -2}[sme.vrad_flag]
    # cscale_flag = {"none": -3, "fix": -3, "constant": 0, "linear": 1, "quadratic": 1, }[
    #     sme.cscale_flag
    # ]
    # if not sme.normalize_by_continuum:
    #     cscale_flag = -2

    abund = sme.abund.get_pattern(type="sme", raw=True)
    abund[np.isnan(abund)] = -99

    fitvars = ["TEFF", "GRAV", "FEH", "VMIC", "VMAC", "VSINI", "GAM6", "VRAD"]
    fitvars = [s.upper() for s in sme.fitparameters if s.upper() in fitvars]
    if "logg" in sme.fitparameters:
        fitvars += ["GRAV"]
    if "monh" in sme.fitparameters:
        fitvars += ["FEH"]

    if sme.mask is None and sme.wave is not None:
        sme.mask = 1

    idl_fields = {
        "version": 5.1,
        "id": sme.id,
        "teff": sme.teff,
        "grav": sme.logg,
        "feh": sme.monh,
        "vmic": float(sme.vmic),
        "vmac": float(sme.vmac),
        "vsini": float(sme.vsini),
        "vrad": sme.vrad.tolist() if vrad_flag == 0 else sme.vrad[0],
        "vrad_flag": vrad_flag,
        "cscale": 1.0,
        "cscale_flag": 0,
        "gam6": sme.gam6,
        "h2broad": int(sme.h2broad),
        "accwi": sme.accwi,
        "accrt": sme.accrt,
        "clim": 0.01,
        "maxiter": 100,
        "chirat": 0.002,
        "nmu": sme.nmu,
        "nseg": sme.nseg,
        "abund": save_as_binary(abund),
        "species": save_as_binary(sme.species),
        "atomic": save_as_binary(sme.atomic),
        "lande": save_as_binary(sme.linelist.lande),
        "lineref": save_as_binary(sme.linelist.reference),
        "short_line_format": {"short": 1, "long": 2}[sme.linelist.lineformat],
        "wran": sme.wran.tolist(),
        "mu": sme.mu.tolist() if sme.nmu > 1 else sme.mu[0],
        "obs_name": "",
        "obs_type": 0,
        "glob_free": fitvars if len(fitvars) != 0 else "",
        "atmo": {
            "method": str(sme.atmo.method),
            "source": str(sme.atmo.source),
            "depth": str(sme.atmo.depth),
            "interp": str(sme.atmo.interp),
            "geom": str(sme.atmo.geom),
        },
    }

    if len(sme.nlte.elements) != 0:
        idl_fields["nlte"] = {}
        flags = np.zeros(99, dtype="int16")
        grids = ["" for _ in range(99)]
        for elem in sme.nlte.elements:
            flags[sme.abund.elem_dict[elem]] = 1
            grids[sme.abund.elem_dict[elem]] = sme.nlte.grids[elem]
        idl_fields["nlte"]["nlte_elem_flags"] = save_as_binary(flags)
        idl_fields["nlte"]["nlte_subgrid_size"] = save_as_binary(
            sme.nlte.subgrid_size.astype("int16")
        )
        idl_fields["nlte"]["nlte_grids"] = grids
        idl_fields["nlte"]["nlte_pro"] = "sme_nlte"

    if sme.iptype is not None:
        idl_fields["iptype"] = sme.iptype
        idl_fields["ipres"] = sme.ipres[0]
        # "ip_x": sme.ip_x,
        # "ip_y": sme.ip_y,
    else:
        idl_fields["iptype"] = "gauss"
        idl_fields["ipres"] = 0

    if sme.wave is not None:
        wind = np.cumsum(sme.wave.shape[1]) - 1
        idl_fields["wave"] = save_as_binary(sme.wave.ravel())
        idl_fields["wind"] = wind.tolist()
    if sme.spec is not None:
        idl_fields["sob"] = save_as_binary(sme.spec.ravel())
    if sme.uncs is not None:
        idl_fields["uob"] = save_as_binary(sme.uncs.ravel())
    if sme.mask is not None:
        idl_fields["mob"] = save_as_binary(sme.mask.ravel().astype("int16"))
    if sme.synth is not None:
        idl_fields["smod"] = save_as_binary(sme.synth.ravel())

    if "depth" in sme.linelist.columns:
        idl_fields["depth"] = save_as_binary(sme.linelist.depth)
    else:
        idl_fields["depth"] = save_as_binary(np.ones(len(sme.linelist)))

    if sme.linelist.lineformat == "long":
        idl_fields.update(
            {
                "line_extra": save_as_binary(sme.linelist.extra),
                "line_lulande": save_as_binary(sme.linelist.lulande),
                "line_term_low": save_as_binary(sme.linelist.term_lower),
                "line_term_upp": save_as_binary(sme.linelist.term_upper),
            }
        )

    sep = ""
    text = ""
    for key, value in idl_fields.items():
        if isinstance(value, dict):
            text += f"{sep}{key!s}:{{{key!s},$\n"
            sep = ""
            for key2, value2 in value.items():
                text += f"{sep}{key2!s}:{value2!r}$\n"
                sep = ","
            sep = ","
            text += "}$\n"
        else:
            text += f"{sep}{key!s}:{value!r}$\n"
            sep = ","
    return text

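# The returned text is the body of an IDL structure literal, one
# "tag:value$" continuation line per field, e.g. (abridged, with
# illustrative values):
#
#     version:5.1$
#     ,teff:5770.0$
#     ,abund:['/tmp/tmp1234.dat', '[99]', '5', 'little']$
#
# save_as_idl wraps this body in "sme = {sme, ... }" to build the named
# structure inside the generated script.
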
def save_as_idl(sme, fname):
    """
    Save the SME structure to disk as an IDL save file

    This writes an IDL script to a temporary file, which is then run
    with IDL as a separate process; it therefore requires a working
    IDL installation.

    There are two steps to this. First, all the fields from the SME
    structure are transformed into simple IDL-readable structures,
    with all large arrays stored in separate binary files for
    performance. The script then reads those files back into IDL.
    """
    with tempfile.NamedTemporaryFile("w+", suffix=".pro") as temp:
        tempname = temp.name
        temp.write("print, 'Hello'\n")
        temp.write("sme = {sme,")
        # TODO: Save data as idl compatible data
        temp.write(write_as_idl(sme))
        temp.write("} \n")
        # This is the code that will be run in idl
        temp.write("print, 'there'\n")
        temp.write(
            """tags = tag_names(sme)
print, tags
new_sme = {}
for i = 0, n_elements(tags)-1 do begin
    arr = sme.(i)
    s = size(arr)
    if (s[0] eq 1) and (s[1] eq 4) then begin
        void = execute('shape = ' + arr[1])
        type = fix(arr[2])
        endian = string(arr[3])
        arr = read_binary(arr[0], data_dims=shape, data_type=type, endian=endian)
        if type eq 1 then begin ;string
            arr = string(arr)
        endif
    endif
    if (s[s[0]+1] eq 8) then begin ;struct
        tags2 = tag_names(sme.(i))
        new2 = {}
        tmp = sme.(i)
        for j = 0, n_elements(tags2)-1 do begin
            arr2 = tmp.(j)
            s = size(arr2)
            if (s[0] eq 1) and (s[1] eq 4) then begin
                void = execute('shape = ' + arr2[1])
                type = fix(arr2[2])
                endian = string(arr2[3])
                arr2 = read_binary(arr2[0], data_dims=shape, data_type=type, endian=endian)
                if type eq 1 then begin ;string
                    arr2 = string(arr2)
                endif
            endif
            new2 = create_struct(temporary(new2), tags2[j], arr2)
        endfor
        arr = new2
    endif
    new_sme = create_struct(temporary(new_sme), tags[i], arr)
endfor
sme = new_sme
"""
        )
        temp.write(f'save, sme, filename="{fname}"\n')
        temp.write("end\n")
        temp.flush()

        # with open(os.devnull, 'w') as devnull:
        print("IDL Script: ", tempname)
        subprocess.run(["idl", "-e", ".r %s" % tempname])
        # input("Wait for me...")
        clean_temps()

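# End-to-end usage is then simply (sketch; requires an "idl" executable
# on the PATH, the output filename is illustrative):
#
#     save_as_idl(sme, "smefile.inp")
#
# Note that NamedTemporaryFile removes the .pro script when the
# with-block exits, so the blocking subprocess.run call has to happen
# inside it, as above.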