Source code for pysme.iliffe_vector

# -*- coding: utf-8 -*-
import io
import logging
from collections.abc import Iterable
from numbers import Integral

import numpy as np
import numpy.lib.mixins
from flex.extensions.bindata import MultipleDataExtension
from flex.flex import FlexExtension

logger = logging.getLogger(__name__)

# TODO make this a proper subclass of np.ndarray
# see also https://numpy.org/devdocs/user/basics.subclassing.html
HANDLED_FUNCTIONS = {}


[docs]def implements(np_function): "Register an __array_function__ implementation for DiagonalArray objects." def decorator(func): HANDLED_FUNCTIONS[np_function] = func return func return decorator
[docs]class Iliffe_vector(numpy.lib.mixins.NDArrayOperatorsMixin, MultipleDataExtension): """ Illiffe vectors are multidimensional (here 2D) but not necessarily rectangular Instead the index is a pointer to segments of a 1D array with varying sizes """ def __init__(self, values, offsets=None, dtype=None): FlexExtension.__init__(self, cls=MultipleDataExtension) if offsets is not None: if offsets[0] != 0: offsets = [0, *offsets] self.data = np.asarray(values, dtype=dtype) self.offsets = np.asarray(offsets, dtype=int) return sizes = [len(v) for v in values] offsets = np.array([0, *np.cumsum(sizes)]) data = np.concatenate(values) if dtype is not None: data = data.astype(dtype) self.data = data self.offsets = offsets def __getitem__(self, key): if isinstance(key, Integral): return self.__getsegment__(key) if isinstance(key, (slice, Iterable)) and not isinstance(key, tuple): if isinstance(key, slice): key = range(self.nseg)[key] values = [self.__getsegment__(k) for k in key] return self.__class__(values) if isinstance(key, tuple): if isinstance(key[0], Integral): return self[key[0]][key[1]] if isinstance(key[0], (slice, Iterable)): if isinstance(key[0], slice): key0 = range(self.nseg)[key[0]] else: key0 = key[0] values = [self.__getsegment__(k) for k in key0] values = [v[key[1]] for v in values] if isinstance(key[1], Integral): return np.array(values) if isinstance(key[1], (slice, Iterable)): return self.__class__(values) if isinstance(key, Iliffe_vector): data = [w[m] for w, m in zip(self.segments, key.segments)] return self.__class__(data) raise KeyError def __setitem__(self, key, value): isscalar = np.isscalar(value) if not isscalar: value = np.asarray(value) if isinstance(key, Integral): return self.__setsegment__(key, value) if isinstance(key, (slice, Iterable)) and not isinstance(key, tuple): if isinstance(key, slice): key = range(self.nseg)[key] if isscalar or (value.ndim == 1 and value.dtype != "O"): for k in key: self.__setsegment__(k, value) else: for i, k in enumerate(key): self.__setsegment__(k, value[i]) return if isinstance(key, tuple): if isinstance(key[0], Integral): data = self.__getsegment__(key[0]) data[key[1]] = value return if isinstance(key[0], (slice, Iterable)): if isinstance(key[0], slice): key0 = range(self.nseg)[key[0]] else: key0 = key[0] if isscalar or (value.ndim == 1 and value.dtype != "O"): for k in key0: data = self.__getsegment__(k) data[key[1]] = value else: for i, k in enumerate(key0): data = self.__getsegment__(k) data[key[1]] = value return if isinstance(key, Iliffe_vector): self.data[key.data] = value return raise KeyError def __getsegment__(self, seg): while seg < 0: seg = self.nseg - seg if seg > self.nseg - 1: raise IndexError low, upp = self.offsets[seg : seg + 2] return self.data[low:upp] def __setsegment__(self, seg, value): while seg < 0: seg = self.nseg - seg if seg > self.nseg - 1: raise IndexError low, upp = self.offsets[seg : seg + 2] if np.isscalar(value) or value.size == upp - low: # Keep using the existing memory self.data[low:upp] = value else: # Need to set new memory data = [s if i != seg else value for i, s in enumerate(self.segments)] self.offsets = np.array([0, *np.cumsum([len(s) for s in data])]) self.data = np.concatenate(data) def __array__(self, dtype=None): arr = self.data if dtype is not None: arr = arr.astype(dtype) return arr def __array_ufunc__(self, ufunc, method, *inputs, **kwargs): if method == "__call__": inputs = list(inputs) for i, input in enumerate(inputs): if isinstance(input, self.__class__): inputs[i] = input.__array__() if "out" in kwargs: out = [] for i, o in enumerate(kwargs["out"]): if isinstance(o, self.__class__): out.append(o.__array__()) else: out.append(o) kwargs["out"] = tuple(out) arr = ufunc(*inputs, **kwargs) return self.__class__.from_offsets(arr, self.offsets) else: return NotImplemented def __array_function__(self, func, types, args, kwargs): if func not in HANDLED_FUNCTIONS: return func(self.data, *args, **kwargs) # Note: this allows subclasses that don't override # __array_function__ to handle DiagonalArray objects. if not all(issubclass(t, self.__class__) for t in types): return NotImplemented return HANDLED_FUNCTIONS[func](*args, **kwargs) def __array_function_axis__(self, func, axis, args, kwargs): if axis is None: return func(self.data, *args, **kwargs) elif axis == 0: data = [func(s, *args, **kwargs) for s in self.segments] data = np.array(data) return data else: return NotImplemented def __repr__(self): return f"{self.__class__.__name__}({self.segments})" def __len__(self): return self.nseg @property def dtype(self): return self.data.dtype @property def ndim(self): return 2 @property def shape(self): return (self.nseg, self.sizes) @property def size(self): return self.data.size @property def sizes(self): return np.diff(self.offsets) @property def nseg(self): return len(self.offsets) - 1 @property def segments(self): return [self.data[l:u] for l, u in zip(self.offsets[:-1], self.offsets[1:])]
[docs] @implements(np.ravel) def ravel(self, *args, **kwargs): return np.ravel(self.data, *args, **kwargs)
[docs] @implements(np.ndarray.flatten) def flatten(self, *args, **kwargs): return self.data.flatten(*args, **kwargs)
[docs] @implements(np.copy) def copy(self, *args, **kwargs): data = np.copy(self.data, *args, **kwargs) offsets = np.copy(self.offsets, *args, **kwargs) return self.__class__(data, offsets=offsets)
[docs] @implements(np.where) def where(self, *args, **kwargs): data = np.where(self.data, *args, **kwargs) res = self.__class__(data, offsets=self.offsets) return res
[docs] @implements(np.all) def all(self, *args, axis=None, **kwargs): return self.__array_function_axis__(np.all, axis, args, kwargs)
[docs] @implements(np.any) def any(self, *args, axis=None, **kwargs): return self.__array_function_axis__(np.any, axis, args, kwargs)
[docs] @implements(np.min) def min(self, *args, axis=None, **kwargs): return self.__array_function_axis__(np.min, axis, args, kwargs)
[docs] @implements(np.max) def max(self, *args, axis=None, **kwargs): return self.__array_function_axis__(np.max, axis, args, kwargs)
[docs] @implements(np.mean) def mean(self, *args, axis=None, **kwargs): return self.__array_function_axis__(np.mean, axis, args, kwargs)
[docs] @classmethod def from_offsets(cls, array, offsets): self = cls(array, offsets=offsets) return self
[docs] @classmethod def from_indices(cls, array, indices): if indices is None: return cls([array]) else: offsets = [0, *np.cumsum(indices)] arr = [array[l:u] for l, u in zip(offsets[:-1], offsets[1:])] return cls(arr)
[docs] def astype(self, dtype): data = self.data.astype(dtype) offsets = self.offsets return Iliffe_vector(data, offsets, dtype=dtype)
# For IO with Flex def _prepare(self, name: str): cls = self.__class__ header_fname = f"{name}/header.json" header_info, header_bio = cls._prepare_json(header_fname, self.header) result = [(header_info, header_bio)] for key, value in enumerate(self.segments): data_fname = f"{name}/{key}.npy" data_info, data_bio = cls._prepare_npy(data_fname, value) result += [(data_info, data_bio)] return result @classmethod def _parse(cls, header: dict, members: dict): data = {key[:-4]: cls._parse_npy(bio) for key, bio in members.items()} data = [data[str(i)] for i in range(len(data))] ext = cls(values=data) return ext
[docs] def to_dict(self): data = {"data": self.data, "offsets": self.offsets} return data
[docs] @classmethod def from_dict(cls, data): obj = cls(values=data["data"], offsets=data["offsets"]) return obj
# def to_dict(self): # cls = self.__class__ # obj = {"header": self.header} # for i, v in enumerate(self.segments): # obj[str(i)] = cls._np_to_dict(v) # return obj # @classmethod # def from_dict(cls, header: dict, data: dict): # data = {name: cls._np_from_dict(d) for name, d in data.items()} # data = [data[str(i)] for i in range(len(data))] # obj = cls(values=data) # return obj def _save(self): data = {str(i): v for i, v in enumerate(self.segments)} ext = MultipleDataExtension(data=data) return ext @classmethod def _load(cls, ext: MultipleDataExtension): data = ext.data values = [data[str(i)] for i in range(len(data))] iv = cls(values=values) return iv def _save_v1(self, file, folder=""): """ Creates a npz structure, representing the vector Returns ------- data : bytes data to use """ b = io.BytesIO() np.savez(b, *self.segments) file.writestr(f"{folder}.npz", b.getvalue()) @classmethod def _load_v1(cls, file): # file: npzfile names = file.files values = [file[n] for n in names] return cls(values=values)