Source code for imagine.observables.observable_dict

"""
For convenience we define dictionary of Observable objects as
ObservableDict from which one can define define the classes Measurements,
Covariances, Simulations and Masks, which can be used to store:

    * measured data sets
    * measured/simulated covariances
    * simulated ensemble sets
    * mask "maps" (but actually mask lists)


Conventions for observables entries
    * **Faraday depth:** `('fd','nan',str(size/Nside),'nan')`
    * **Dispersion measure**: `('dm','nan',str(pix),'nan')`
    * **Synchrotron emission**: `('sync',str(freq),str(pix),X)`
        where X stands for:
            * 'I' - total intensity (in unit K-cmb)
            * 'Q' - Stokes Q (in unit K-cmb, IAU convention)
            * 'U' - Stokes U (in unit K-cmb, IAU convention)
            * 'PI' - polarisation intensity (in unit K-cmb)
            * 'PA' - polarisation angle (in unit rad, IAU convention)

    Remarks:
        * `str(freq)`, polarisation-related-flag are redundant for Faraday depth
            and dispersion measure so we put 'nan' instead
        * `str(pix/nside)` stores either Healpix Nside, or just number of
            pixels/points we do this for flexibility, in case users have non-HEALPix
            based in/output

Remarks on the observable tags
    str(freq), polarisation-related-flag are redundant for Faraday depth and dispersion measure so we put 'nan' instead

    str(pix/nside) stores either Healpix Nisde, or just number of pixels/points
    we do this for flexibility, in case users have non-HEALPix-map-like in/output

Masking convention
    masked erea associated with pixel value 0,
    unmasked area with pixel value 1

Masking method
    mask only applies to observables/covariances
    observable after masking will be re-recorded as plain data type


.. note:: Distribution with MPI

    * all data are either distributed or copied, where "copied" means each node
      stores the identical copy which is convenient for hosting measured data
      and mask map

    * Covariances has Field object with global shape "around"
      (data_size//mpisize, data_size) "around" means to distribute matrix
      correctly as described in "imagine/tools/mpi_helper.py"
"""

# %% IMPORTS
# Built-in imports
import abc
import logging as log

# Package imports
import numpy as np

# IMAGINE imports
from imagine.observables.dataset import Dataset, HEALPixDataset
from imagine.observables.observable import Observable
from imagine.tools import BaseClass, mask_cov, mask_obs, req_attr

# All declaration
__all__ = ['ObservableDict', 'Masks', 'Measurements', 'Simulations',
           'Covariances']


# %% CLASS DEFINITIONS
[docs]class ObservableDict(BaseClass, metaclass=abc.ABCMeta): """ Base class from which `imagine.observables.observable_dict.Measurements`, `imagine.observables.observable_dict.Covariances`, `imagine.observables.observable_dict.Simulations` and `imagine.observables.observable_dict.Masks` are derived. See `imagine.observables.observable_dict` module documentation for further details. """ def __init__(self): # Call super constructor super().__init__() self._archive = {} @property def archive(self): return self._archive
[docs] def keys(self): return self._archive.keys()
def __getitem__(self, key): return self._archive[key]
[docs] @abc.abstractmethod def append(self, name, new_data, plain=False): """ Adds/updates name and data Parameters ---------- name : str tuple Should follow the convention: ``(data-name,str(data-freq),str(data-Nside/size),str(ext))``. If data is independent from frequency, set 'nan'. `ext` can be 'I','Q','U','PI','PA', 'nan' or other customized tags. new_data distributed/copied :py:class:`numpy.ndarray` or :py:class:`Observable <imagine.observables.observable.Observable>` plain : bool If True, means unstructured data. If False (default case), means HEALPix-like sky map. """ raise NotImplementedError
[docs] @abc.abstractmethod def apply_mask(self, mask_dict): """ Parameters ---------- mask_dict : imagine.observables.observable_dict.Masks Masks object """ raise NotImplementedError
[docs]class Masks(ObservableDict): """ Stores HEALPix mask maps See `imagine.observables.observable_dict` module documentation for further details. """
[docs] def append(self, name, new_data, plain=False): """ Adds/updates name and data Parameters ---------- name : str tuple Should follow the convention: ``(data-name,str(data-freq),str(data-Nside/"tab"),str(ext))``. If data is independent from frequency, set 'nan'. `ext` can be 'I','Q','U','PI','PA', 'nan' or other customized tags. new_data distributed/copied :py:class:`numpy.ndarray` or :py:class:`Observable <imagine.observables.observable.Observable>` plain : bool If True, means unstructured data. If False (default case), means HEALPix-like sky map. """ log.debug('@ observable_dict::Masks::append') assert (len(name) == 4) if isinstance(new_data, Observable): assert (new_data.dtype == 'measured') if not plain: assert (new_data.size == 12*np.uint(name[2])**2) self._archive.update({name: new_data}) elif isinstance(new_data, np.ndarray): assert (new_data.shape[0] == 1) if not plain: assert (new_data.shape[1] == 12*np.uint(name[2])*np.uint(name[2])) self._archive.update({name: Observable(new_data, 'measured')}) else: raise TypeError('unsupported data type')
[docs] def apply_mask(self, *args, **kwargs): pass
[docs]class Measurements(ObservableDict): """ Stores observational data sets See `imagine.observables.observable_dict` module documentation for further details. """
[docs] def append(self, name=None, new_data=None, plain=False, dataset=None): """ Adds/updates name and data Parameters ---------- dataset : imagine.observables.dataset.Dataset The IMAGINE dataset already adjusts the format of the data and sets the adequate key. If `dataset` is present, all other arguments will be ignored. name : str tuple Should follow the convention: ``(data-name,str(data-freq),str(data-Nside)/"tab",str(ext))``. If data is independent from frequency, set 'nan'. `ext` can be 'I','Q','U','PI','PA', 'nan' or other customized tags. new_data : numpy.ndarray or imagine.observables.observable.Observable distributed/copied :py:class:`numpy.ndarray` or :py:class:`Observable <imagine.observables.observable.Observable>` plain : bool If True, means unstructured/tabular data. If False (default case), means HEALPix-like sky map. """ log.debug('@ observable_dict::Measurements::append') if dataset is not None: assert isinstance(dataset, Dataset) name = dataset.key new_data = dataset.data coords = dataset.coords if isinstance(dataset, HEALPixDataset): plain=False else: plain=True else: coords=None assert (len(name) == 4), 'Wrong format for Observable key!' if isinstance(new_data, Observable): assert (new_data.dtype == 'measured') if not plain: assert (new_data.size == 12*np.uint(name[2])**2) self._archive.update({name: new_data}) # rw elif isinstance(new_data, np.ndarray): if not plain: assert (new_data.shape[1] == 12*np.uint(name[2])**2) self._archive.update({name: Observable(data=new_data, dtype='measured', coords=coords)}) # rw else: raise TypeError('Unsupported data type')
[docs] def apply_mask(self, mask_dict=None): log.debug('@ observable_dict::Measurements::apply_mask') if mask_dict is None: pass else: assert isinstance(mask_dict, Masks) for name, msk in mask_dict._archive.items(): if name in self._archive.keys(): masked = mask_obs(self._archive[name].data, msk.data) new_name = (name[0], name[1], str(masked.shape[1]), name[3]) self._archive.pop(name, None) # pop out obsolete data self.append(new_name, masked, plain=True) # append new as plain data
[docs]class Simulations(ObservableDict): """ Stores simulated ensemble sets See `imagine.observables.observable_dict` module documentation for further details. """
[docs] def append(self, name, new_data, plain=False): """ Adds/updates name and data Parameters ---------- name : str tuple Should follow the convention: ``(data-name,str(data-freq),str(data-Nside)/"tab",str(ext))``. If data is independent from frequency, set 'nan'. `ext` can be 'I','Q','U','PI','PA', 'nan' or other customized tags. new_data distributed/copied :py:class:`numpy.ndarray` or :py:class:`Observable <imagine.observables.observable.Observable>` plain : bool If True, means unstructured data. If False (default case), means HEALPix-like sky map. """ log.debug('@ observable_dict::Simulations::append') assert (len(name) == 4) if name in self._archive.keys(): # app self._archive[name].rw_flag = False self._archive[name].append(new_data) else: # new_data if isinstance(new_data, Observable): if not plain: assert (new_data.size == 12*np.uint64(name[2])**2) self._archive.update({name: new_data}) elif isinstance(new_data, np.ndarray): # distributed data self._archive.update({name: Observable(new_data, 'simulated')}) else: raise TypeError('unsupported data type')
[docs] def apply_mask(self, mask_dict): log.debug('@ observable_dict::Simulations::apply_mask') if mask_dict is None: pass else: assert isinstance(mask_dict, Masks) for name, msk in mask_dict._archive.items(): if name in self._archive.keys(): masked = mask_obs(self._archive[name].data, msk.data) new_name = (name[0], name[1], str(masked.shape[1]), name[3]) self._archive.pop(name, None) # pop out obsolete self.append(new_name, masked, plain=True) # append new as plain data
[docs]class Covariances(ObservableDict): """ Stores observational covariances See `imagine.observables.observable_dict` module documentation for further details. """
[docs] def append(self, name=None, new_data=None, plain=False, dataset=None): """ Adds/updates name and data Parameters ---------- name : str tuple Should follow the convention: ``(data-name,str(data-freq),str(data-Nside)/"tab",str(ext))``. If data is independent from frequency, set 'nan'. `ext` can be 'I','Q','U','PI','PA', 'nan' or other customized tags. data distributed/copied ndarray/Observable plain : bool If True, means unstructured data. If False (default case), means HEALPix-like sky map. """ log.debug('@ observable_dict::Covariances::append') if dataset is not None: assert isinstance(dataset, Dataset) name = dataset.key new_data = dataset.cov if isinstance(dataset, HEALPixDataset): plain=False else: plain=True assert (len(name) == 4) if isinstance(new_data, Observable): # always rewrite if not plain: assert (new_data.size == 12*np.uint64(name[2])**2) self._archive.update({name: new_data}) # rw elif isinstance(new_data, np.ndarray): if not plain: assert (new_data.shape[1] == 12*np.uint64(name[2])**2) self._archive.update({name: Observable(new_data, 'covariance')}) else: raise TypeError('unsupported data type')
[docs] def apply_mask(self, mask_dict): log.debug('@ observable_dict::Covariances::apply_mask') if mask_dict is None: pass else: assert isinstance(mask_dict, Masks) for name, msk in mask_dict._archive.items(): if name in self._archive.keys(): masked = mask_cov(self._archive[name].data, msk.data) new_name = (name[0], name[1], str(masked.shape[1]), name[3]) self._archive.pop(name, None) # pop out obsolete self.append(new_name, masked, plain=True) # append new as plain data