Source code for imagine.pipelines.pipeline

# %% IMPORTS
# Built-in imports
import abc
from itertools import chain
import logging as log
import tempfile
import os
from os import path
import shutil
from collections import defaultdict

# Package imports
from astropy.table import QTable
import astropy.units as apu
from e13tools import q2tex
from mpi4py import MPI
import numpy as np
from scipy.stats import norm as scipy_norm
from scipy.stats import pearsonr as scipy_pearsonr
import scipy.optimize as scipy_optimize
import IPython.display as ipd
import matplotlib.pyplot as plt
import hickle as hkl
import pandas as pd

# IMAGINE imports
from imagine import rc
from imagine.likelihoods import Likelihood
from imagine.fields import FieldFactory
from imagine.priors import Prior
from imagine.simulators import Simulator
from imagine.tools import BaseClass, ensemble_seed_generator, misc, visualization
from imagine.tools import io, Timer

# GLOBALS
comm = MPI.COMM_WORLD
mpisize = comm.Get_size()
mpirank = comm.Get_rank()

# All declaration
__all__ = ['Pipeline']


# %% CLASS DEFINITIONS
[docs]class Pipeline(BaseClass, metaclass=abc.ABCMeta): """ Base class used for for initialing Bayesian analysis pipeline Attributes ---------- likelihood_rescaler : double Rescale log-likelihood value random_type : str If set to 'fixed', the exact same set of ensemble seeds will be used for the evaluation of all fields, generated using the `master_seed`. If set to 'controllable', each individual field will get their own set of ensemble fields, but multiple runs will lead to the same results, as they are based on the same `master_seed`. If set to 'free', every time the pipeline is run, the `master_seed` is reset to a different value, and the ensemble seeds for each individual field are drawn based on this. master_seed : int Master seed used by the random number generators Parameters ---------- simulator : imagine.simulators.simulator.Simulator Simulator object factory_list : list List or tuple of field factory objects likelihood : imagine.likelihoods.likelihood.Likelihood Likelihood object ensemble_size : int Number of observable realizations to be generated by the simulator run_directory : str Directory where the pipeline state and reports are saved. chains_directory : str Path of the directory where the chains should be saved. By default, this is saved to a 'chains' subdirectory of `run_directory`. prior_correlations : dict Dictionary used to set up prior distribution correlations. If two parameters are A and B are correlated a priori, an entry should be added to the prior_correlations dictionary in the form `(name_A, name_B): True`, to extract the correlation from the samples (in the case of CustomPriors) or `(name_A, name_B): value` otherwise. show_summary_reports : bool If True (default), shows/saves a corner plot and shows the evidence after the pipeline run has finished show_progress_reports : bool If True, shows/saves a simple progress of the sampler during the run. n_evals_report : int The number of likelihood evaluations before showing a progress report """ def __init__(self, *, simulator, factory_list, likelihood, ensemble_size=1, run_directory=None, chains_directory=None, prior_correlations=None, show_summary_reports=True, show_progress_reports=False, n_evals_report=500): # Call super constructor super().__init__() self.factory_list = factory_list # NB setting the factory list automatically sets: the active parameters, # parameter ranges and priors, based on the list self.simulator = simulator self.likelihood = likelihood self.prior_correlations = prior_correlations self.run_directory = run_directory self.chains_directory = chains_directory self.sampling_controllers = {} self._distribute_ensemble = rc['pipeline_distribute_ensemble'] # Random seed settings self.random_type = 'controllable' # This may change on every execution if random_type=='free' self.master_seed = rc['pipeline_default_seed'] # The ensemble_seeds are fixed in the case of the 'fixed' random_type; # or are regenerated on each Field evaluation, in the 'free' and # 'controllable' cases self.ensemble_seeds = None self.ensemble_size = ensemble_size # rescaling total likelihood in _core_likelihood self.likelihood_rescaler = 1 # Checking likelihood threshold self.check_threshold = False # Place holders self.sampler = None self.results = None self._evidence = None self._evidence_err = None self._posterior_summary = None self._samples_array = None self._samples = None self.correlated_priors = None self._median_simulation = None self._median_model = None self._MAP_simulation = None self._MAP_model = None self._MAP = None # Report settings self.show_summary_reports = show_summary_reports self.show_progress_reports = show_progress_reports self.n_evals_report = n_evals_report self._likelihood_evaluations_counter = 0 self.intermediate_results = defaultdict(lambda: None)
[docs] def __call__(self, *args, save_pipeline_state=True, **kwargs): # Keeps the setup safe if save_pipeline_state: self.save() # Resets internal state and adjusts random seed self.tidy_up() result = self.call(*args, **kwargs) if self.show_summary_reports and (mpirank == 0): self.posterior_report() self.evidence_report() # Stores the final results if save_pipeline_state: self.save() return result
@property def run_directory(self): """ Directory where the chains are stored (NB details of what is stored are sampler-dependent) """ return self._run_directory @run_directory.setter def run_directory(self, run_directory): assert run_directory is not None, 'A valid run_directory must be specified' if mpirank == 0: # Creates new directory (if needed) os.makedirs(run_directory, exist_ok=True) assert path.isdir(run_directory), 'Unable to created run directory' comm.Barrier() self._run_directory = run_directory @property def chains_directory(self): """ Directory where the chains are stored (NB details of what is stored are sampler-dependent) """ return self._chains_directory @chains_directory.setter def chains_directory(self, chains_directory): if chains_directory is None: chains_directory = os.path.join(self._run_directory, 'chains') os.makedirs(chains_directory, exist_ok=True) assert path.isdir(chains_directory) self._chains_directory = chains_directory
[docs] def clean_chains_directory(self): """Removes the contents of the chains directory""" log.debug('@ pipeline::clean_chains_directory') if mpirank==0: for f in os.listdir(self._chains_directory): fullpath = path.join(self._chains_directory, f) try: shutil.rmtree(fullpath) except NotADirectoryError: os.remove(fullpath) comm.Barrier()
@property def active_parameters(self): """ List of all the active parameters """ # The user should not be able to set this attribute manually return self._active_parameters @property def wrapped_parameters(self): """ List of parameters which are periodic or "wrapped around" """ return [self._priors[k].wrapped for k in self._active_parameters] @property def priors(self): """ Dictionary containing priors for all active parameters """ # The user should not be able to set this attribute manually return self._priors @property def posterior_summary(self): r""" A dictionary containing a summary of posterior statistics for each of the active parameters. These are: 'median', 'errlo' (15.87th percentile), 'errup' (84.13th percentile), 'mean' and 'stdev'. """ if self._posterior_summary is None: samp = self.samples self._posterior_summary = {} for name, column in zip(samp.columns, samp.itercols()): lo, median, up = np.percentile(column, [15.865, 50, 84.135]) errlo = abs(median-lo) errup = abs(up-median) self._posterior_summary[name] = { 'median': median, 'errlo': errlo, 'errup': errup, 'mean': np.mean(column), 'stdev': np.std(column)} return self._posterior_summary @property def prior_correlations(self): return self._prior_correlations @prior_correlations.setter def prior_correlations(self, prior_correlations): if prior_correlations is None: # If None is provided, resets the correlations self._prior_correlations = None return else: # Otherwise, updates the prior correlations dictionary and # computes the L matrix # First, checks coefficient consistency correlated_priors = [] new_prior_correlations = {} for n in list(prior_correlations.keys()): t = [] for m in range(2): for f in self._factory_list: if list(n)[m] in f.active_parameters: newname = f.name + '_' + list(n)[m] correlated_priors.append(newname) t.append(newname) new_prior_correlations.update({tuple(t): prior_correlations[n]}) name_pairs = list(new_prior_correlations.keys()) if len(name_pairs) != len(list(set(name_pairs))): raise ValueError('Inconsistent prior correlations, ' 'possibly multiple values for the same coefficient') corr_matrix = np.eye(len(self._active_parameters)) for name_pair in name_pairs: i, j = (self._prior_cube_mapping[param] for param in name_pair) c = new_prior_correlations[name_pair] if isinstance(c, bool): if not c: raise ValueError() # Creates zero mean Gaussian distributions from the # original samples gaussian_pair = [] for name in name_pair: prior = self._priors[name] assert prior is not None # Makes sure the samples are within the selected range ok = (prior.samples >= prior.range[0]) ok *= (prior.samples <= prior.range[1]) x = scipy_norm.ppf(loc=0, scale=1, q=prior.cdf(prior.samples.value[ok])) gaussian_pair.append(x) # Computes the Pearson r correlation coefficient from the # pair of distributions c = scipy_pearsonr(*gaussian_pair)[0] else: # If this was supplied, check whether the value is consistent assert (-1 <= c <= 1) corr_matrix[i, j] = corr_matrix[j, i] = c # Computes the Cholesky L (lower-triangular) matrix which is used # to correlate the priors in the prior_transform method self._correlator_L = np.linalg.cholesky(corr_matrix) # Stores a list of correlated priors, for convenience self.correlated_priors = set(correlated_priors) # Finally, saves updated dictionary self._prior_correlations = new_prior_correlations
[docs] def corner_plot(self, **kwargs): """ Calls :py:func:`imagine.tools.visualization.corner_plot` to make a corner plot of samples produced by this Pipeline """ return visualization.corner_plot(pipeline=self, **kwargs)
[docs] def progress_report(self): """ Reports the progress of the inference """ # Try to call get_intermediate_results try: self.get_intermediate_results() # If this method is not implemented, show error instead and continue except NotImplementedError: print("Progress reports can only be made if the " "'get_intermediate_results()'-method is overridden! " "Skipping report.") # If this method is implemented, create progress report else: if mpirank!=0: return dead_samples = self.intermediate_results['rejected_points'] live_samples = self.intermediate_results['live_points'] likelihood = self.intermediate_results['logLikelihood'] lnX = self.intermediate_results['lnX'] if dead_samples is not None: fig = visualization.trace_plot( parameter_names=self._active_parameters, samples=dead_samples, live_samples=live_samples, likelihood=likelihood, lnX=lnX) fig_filepath = os.path.join(self._run_directory, 'progress_report.pdf') msg = 'Saving progress report to {}'.format(fig_filepath) log.info(msg) fig.savefig(fig_filepath) if misc.is_notebook(): ipd.clear_output() ipd.display(ipd.Markdown( "\n**Progress report:**\nnumber of likelihood " "evaluations {}".format( self._likelihood_evaluations_counter))) plt.show() else: print(msg)
[docs] def posterior_report(self, sdigits=2, **kwargs): """ Displays the best fit values and 1-sigma errors for each active parameter. Also produces a corner plot of the samples, which is saved to the run directory. If running on a jupyter-notebook, a nice LaTeX display is used, and the plot is shown. Parameters ---------- sdigits : int The number of significant digits to be used """ out = '' for param, pdict in self.posterior_summary.items(): if misc.is_notebook(): # Extracts LaTeX representation from astropy unit object out += r"\\ \text{{ {0}: }}\; ".format(param) out += q2tex(*map(pdict.get, ['median', 'errup', 'errlo']), sdigits=sdigits) out += r"\\" else: out += r"{0}: ".format(param) md, errlo, errup = map(pdict.get, ['median', 'errlo', 'errup']) if isinstance(md, apu.Quantity): unit = str(md.unit) md, errlo, errup = map(lambda x: x.value, [md, errlo, errup]) else: unit = "" v, l, u = misc.adjust_error_intervals( md, errlo, errup, sdigits=sdigits) out += r'{0} (-{1})/(+{2}) {3}\n'.format(v, l, u, unit) fig = self.corner_plot(**kwargs) fig.savefig(os.path.join(self._run_directory, 'corner_plot.pdf')) if misc.is_notebook(): ipd.display(ipd.Markdown("\n**Posterior report:**")) plt.show() ipd.display(ipd.Math(out)) else: # Restores linebreaks and prints print('Posterior report') print(out.replace(r'\n','\n'))
[docs] def evidence_report(self, sdigits=4): if not np.isfinite(self.log_evidence): return if misc.is_notebook(): ipd.display(ipd.Markdown("**Evidence report:**")) out = r"\log\mathcal{{ Z }} = " out += q2tex(self.log_evidence, self.log_evidence_err) ipd.display(ipd.Math(out)) else: print('Evidence report') print('logZ =', self.log_evidence, '±', self.log_evidence_err)
@property def log_evidence(self): r""" Natural logarithm of the *marginal likelihood* or *Bayesian model evidence*, :math:`\ln\mathcal{Z}`, where .. math:: \mathcal{Z} = P(d|m) = \int_{\Omega_\theta} P(d | \theta, m) P(\theta | m) \mathrm{d}\theta . Note ---- Available only after the pipeline is run. """ if self._evidence is None: raise ValueError('Evidence not set! Have you run the pipeline?') else: return self._evidence @property def log_evidence_err(self): """ Error estimate in the natural logarithm of the *Bayesian model evidence*. Available once the pipeline is run. Note ---- Available only after the pipeline is run. """ assert self._evidence_err is not None, 'Evidence error not set! Did you run the pipeline?' return self._evidence_err @property def median_model(self): """ Posterior median model List of Field objects corresponding to the median values of the distributions of parameter values found *after a Pipeline run*. """ if self._median_model is None: self._median_model = [] for factory in self.factory_list: params_dict = {} for param, results_dict in self.posterior_summary.items(): if factory.field_name in param: param = param.replace(factory.field_name + '_','') params_dict[param] = results_dict['median'] field = factory(ensemble_seeds=self.ensemble_seeds[factory], variables=params_dict) self._median_model.append(field) return self._median_model @property def MAP_model(self): """ Maximum a posteriori (MAP) model List of Field objects corresponding to the mode of the posterior distribution. This does not require a previous run of the Pipeline, as the MAP is computed by maximizing the unnormalized posterior distribution. This convenience property uses the results from the latest call of the :py:meth:`Pipeline.get_MAP` method. If :py:meth:`Pipeline.get_MAP` has never been called, the MAP is found calling it with with default arguments. See :py:meth:`Pipeline.get_MAP` for details. """ if self._MAP_model is None: if self._MAP is None: self.get_MAP() assert not isinstance(self._MAP, scipy_optimize.OptimizeResult), 'Try running get_MAP directly, with different parameters' self._MAP_model = [] for factory in self.factory_list: params_dict = {} for pname, MAP_val in zip(self.active_parameters, self._MAP): if factory.name in pname: params_dict[pname.replace(factory.name+'_','')] = MAP_val field = factory(ensemble_seeds=self.ensemble_seeds[factory], variables=params_dict) self._MAP_model.append(field) return self._MAP_model @property def median_simulation(self): """ Simulation corresponding to the :py:data:`median_model <Pipeline.median_model>`. """ if self._median_simulation is None: self._median_simulation = self.simulator(self.median_model) return self._median_simulation @property def MAP_simulation(self): """ Simulation corresponding to the :py:data:`MAP_model <Pipeline.MAP_model>`. """ if self._MAP_simulation is None: self._MAP_simulation = self.simulator(self.MAP_model) return self._MAP_simulation @property def samples(self): """ An :py:class:`astropy.table.QTable` object containing parameter values of the samples produced in the run. """ if self._samples is None: assert self._samples_array is not None, 'Samples not available. Did you run the pipeline?' self._samples = QTable(data=self._samples_array, names=self._active_parameters) # Restores the units for param, prior in self._priors.items(): self._samples[param] = self._samples[param] << prior.unit return self._samples @property def factory_list(self): """ List of the :py:class:`Field Factories <imagine.fields.field_factory.GeneralFieldFactory>` currently being used. Updating the factory list automatically extracts active_parameters, parameter ranges and priors from each field factory. """ return self._factory_list @factory_list.setter def factory_list(self, factory_list): # Notice that the parameter/variable ordering is fixed wrt # factory ordering. This is useful for recovering variable logic value # for each factory and necessary to construct the common prior function. assert isinstance(factory_list, (list, tuple)), 'Factory list must be a tuple or list' self._active_parameters = tuple() self._priors = dict() self._prior_cube_mapping = dict() i = 0 for factory in factory_list: assert isinstance(factory, FieldFactory) for ap_name in factory.active_parameters: if ap_name in self._prior_cube_mapping: raise KeyError('Ambiguous prior naming') self._prior_cube_mapping.update({factory.name+'_'+ap_name: i}) assert isinstance(ap_name, str) # Sets the parameters and ranges self._active_parameters += (str(factory.name+'_'+ap_name),) # Sets the Prior prior = factory.priors[ap_name] assert isinstance(prior, Prior) self._priors[factory.name+'_'+ap_name] = prior i += 1 self._factory_list = factory_list @property def sampler_supports_mpi(self): return getattr(self, 'SUPPORTS_MPI', False) @property def simulator(self): """ The :py:class:`Simulator <imagine.simulators.simulator.Simulator>` object used by the pipeline """ return self._simulator @simulator.setter def simulator(self, simulator): assert isinstance(simulator, Simulator) self._simulator = simulator @property def likelihood(self): """ The :py:class:`Likelihood <imagine.likelihoods.likelihood.Likelihood>` object used by the pipeline """ return self._likelihood @likelihood.setter def likelihood(self, likelihood): assert isinstance(likelihood, Likelihood) self._likelihood = likelihood
[docs] def prior_pdf(self, cube): """ Probability distribution associated with the all parameters being used by the multiple Field Factories Parameters ---------- cube : np.ndarray Each row of the array corresponds to a different parameter value in the sampling (dimensionless, but in the standard units of the prior). Returns ------- rtn : float Prior probability of the parameter choice specified by `cube` """ rtn = 1. for i, parameter in enumerate(self._active_parameters): prior = self._priors[parameter] rtn *= prior.pdf(cube[i]*prior.unit) return rtn
[docs] def prior_transform(self, cube): """ Prior transform cube Takes a cube containing a uniform sampling of values and maps then onto a distribution compatible with the priors specified in the Field Factories. If prior correlations were specified, these are applied to the cube (assumes the correlations can be well described by the correlations between gaussians). Parameters ---------- cube : array Each row of the array corresponds to a different parameter in the sampling. Returns ------- cube The modified cube """ cube_copy = cube.copy() if self.prior_correlations is not None: # Correlates the cube, using the previously computed Cholesky L matrix cube_copy = scipy_norm.cdf( self._correlator_L @ scipy_norm.ppf(cube_copy) ) for i, parameter in enumerate(self._active_parameters): val = self._priors[parameter](cube_copy[i]) if isinstance(val, apu.Quantity): val = val.value cube_copy[i] = val return cube_copy
@property def distribute_ensemble(self): """ If True, whenever the sampler requires a likelihood evaluation, the ensemble of stochastic fields realizations is distributed among all the nodes. Otherwise, each likelihood evaluations will go through the whole ensemble size on a single node. See :doc:`parallel` for details. """ return self._distribute_ensemble @distribute_ensemble.setter def distribute_ensemble(self, distr_ensemble): # Saves the choice self._distribute_ensemble = distr_ensemble # Calls the setter method to ajust hidden parts self.ensemble_size = self.ensemble_size @property def ensemble_size(self): return self._ensemble_size @ensemble_size.setter def ensemble_size(self, ensemble_size): ensemble_size = int(ensemble_size) assert (ensemble_size > 0) self._ensemble_size = ensemble_size log.debug('set ensemble size to %i' % int(ensemble_size)) if self._distribute_ensemble: # Sets pointer to the correct likelihood function self._likelihood_function = self._mpi_likelihood # Sets effective ensemble size if self.ensemble_size % mpisize != 0: raise ValueError("In 'distribute_ensemble' mode, ensemble_size " "must be a multiple of the number of MPI nodes") self.ensemble_size_actual = self.ensemble_size // mpisize else: # Sets pointer to the correct likelihood function self._likelihood_function = self._core_likelihood # Sets effective ensemble size self.ensemble_size_actual = self.ensemble_size self._randomness() @property def sampling_controllers(self): """ Settings used by the sampler (e.g. `'dlogz'`). See the documentation of each specific pipeline subclass for details. After the pipeline runs, this property is updated to reflect the actual final choice of sampling controllers (including default values). """ return self._sampling_controllers @sampling_controllers.setter def sampling_controllers(self, pp_dict): try: self._sampling_controllers.update(pp_dict) except AttributeError: self._sampling_controllers = pp_dict
[docs] def tidy_up(self): """ Resets internal state before a new run """ log.debug('@ pipeline::tidy_up') self.results = None self._evidence = None self._evidence_err = None self._posterior_summary = None self._samples_array = None self._samples = None self._median_model = None self._median_simulation = None self._randomness() self._likelihood_evaluations_counter = 0 self.intermediate_results = defaultdict(lambda: None)
def _randomness(self): """ Manipulate random seed(s) isolating this process for convenience of testing """ log.debug('@ pipeline::_randomness') assert self.random_type in ('free', 'controllable', 'fixed') if self.random_type == 'free': # Refreshes the master seed self.master_seed = np.random.randint(0, 2**32) # Updates numpy random accordingly np.random.seed(self.master_seed) if self.random_type == 'fixed': common_ensemble_seeds = ensemble_seed_generator(self.ensemble_size_actual) self.ensemble_seeds = {factory: common_ensemble_seeds for factory in self._factory_list} elif self.random_type == 'controllable': self.ensemble_seeds = {factory: ensemble_seed_generator(self.ensemble_size_actual) for factory in self._factory_list} else: self.ensemble_seeds = {factory: None for factory in self._factory_list} # This function returns all parameter names of all factories in order
[docs] def get_par_names(self): # Create list of names names = list(chain(*map( lambda x: ["%s_%s" % (x.name, par) for par in x.active_parameters], self._factory_list))) # Return them return(names)
def _get_observables(self, cube): # return active variables from pymultinest cube to factories # and then generate new field objects head_idx = 0 tail_idx = 0 field_list = tuple() # the ordering in factory list and variable list is vital for factory in self._factory_list: variable_dict = dict() tail_idx = head_idx + len(factory.active_parameters) factory_cube = cube[head_idx:tail_idx] for i, av in enumerate(factory.active_parameters): variable_dict[av] = factory_cube[i]*self._priors[factory.field_name + '_' + av].unit ensemble_seeds = self.ensemble_seeds[factory] field_list += (factory(variables=variable_dict, ensemble_size=self.ensemble_size_actual, ensemble_seeds=ensemble_seeds),) log.debug('create '+factory.name+' field') head_idx = tail_idx assert(head_idx == len(self._active_parameters)) observables = self._simulator(field_list) return(observables)
[docs] def test(self, n_points=3, include_centre=True, verbose=True): """ Tests the present IMAGINE pipeline evaluating the likelihood on a small number of points and reporting the run-time. Paramters --------- n_points : int Number of points to evaluate the likelihood on. The first point corresponds to the centre of the active parameter ranges (unless `include_centre` is set to `False`) and the other are randomly sampled from the prior distributions. include_centre : bool If True, the initial point will be obtained from the centre of the active parameter ranges. Returns ------- mean_time : astropy.units.Quantity The average execution time of a single likelihood evaluation """ n_params = len(self.active_parameters) timer = Timer() times = [] for i_point in range(n_points): timer.tick(i_point) if (i_point == 0) and include_centre: if verbose: print('Sampling centres of the parameter ranges.') values = self.parameter_central_value() else: if verbose: print('Randomly sampling from prior.') # Draws a random point from the prior distributions values = self.prior_transform(np.random.random_sample(n_params)) if verbose: print('\tEvaluating point:', values) L = self._likelihood_function(values) time = timer.tock(i_point) if verbose: print('\tLog-likelihood', L) print('\tTotal execution time: ', time,'s\n') times.append(time) mean_time = np.mean(times) * apu.s if verbose: print('Average execution time:', mean_time) return mean_time
def _core_likelihood(self, cube): """ core log-likelihood calculator Parameters ---------- cube list of variable values Returns ------- log-likelihood value """ log.debug('@ pipeline::_core_likelihood') log.debug('sampler at %s' % str(cube)) # Obtain observables for provided cube observables = self._get_observables(cube) # add up individual log-likelihood terms current_likelihood = self.likelihood(observables) current_likelihood *= self.likelihood_rescaler # check likelihood value until negative (or no larger than given threshold) if self.check_threshold and current_likelihood > self.likelihood_threshold: raise ValueError('log-likelihood beyond threshold') # Logs the value log.info('Likelihood evaluation at point:' ' {0} value: {1}'.format(cube, current_likelihood)) # Reports, if needed self._likelihood_evaluations_counter += 1 if (self.show_progress_reports and self._likelihood_evaluations_counter % self.n_evals_report == 0): if mpirank==0: self.progress_report() return current_likelihood def _mpi_likelihood(self, cube): """ mpi log-likelihood calculator PyMultinest supports execution with MPI where sampler on each node follows DIFFERENT journeys in parameter space but keep in communication so we need to firstly register parameter position on each node and calculate log-likelihood value of each node with joint force of all nodes in this way, ensemble size is multiplied by the number of working nodes Parameters ---------- cube list of variable values Returns ------- log-likelihood value """ if self.sampler_supports_mpi: log.debug('@ pipeline::_mpi_likelihood') # Gathers cubes from all nodes cube_local_size = cube.size cube_pool = np.empty(cube_local_size*mpisize, dtype=np.float64) comm.Allgather([cube, MPI.DOUBLE], [cube_pool, MPI.DOUBLE]) # Calculates log-likelihood for each node loglike_pool = np.empty(mpisize, dtype=np.float64) for i in range(mpisize): # loop through nodes cube_local = cube_pool[i*cube_local_size : (i+1)*cube_local_size] loglike_pool[i] = self._core_likelihood(cube_local) # Scatters log-likelihood to each node loglike_local = np.empty(1, dtype=np.float64) comm.Scatter([loglike_pool, MPI.DOUBLE], [loglike_local, MPI.DOUBLE], root=0) return loglike_local[0] # Some samplers require a scalar value else: log.debug('@ dynesty_pipeline::_mpi_likelihood') # gather cubes from all nodes cube_local_size = cube.size cube_pool = np.empty(cube_local_size*mpisize, dtype=np.float64) comm.Allgather([cube, MPI.DOUBLE], [cube_pool, MPI.DOUBLE]) # check if all nodes are at the same parameter-space position assert ((cube_pool == np.tile(cube_pool[:cube_local_size], mpisize)).all()) return self._core_likelihood(cube)
[docs] def get_intermediate_results(self): raise NotImplementedError
[docs] @abc.abstractmethod def call(self, **kwargs): raise NotImplementedError
[docs] def save(self, **kwargs): """ Saves the state of the Pipeline The `run_directory` set in initialization is used. Any distributed data is gathered and the pipeline is serialized and saved to disk. Note ---- This method uses :py:meth:`imagine.tools.io.save_pipeline` """ return io.save_pipeline(self, **kwargs)
[docs] @classmethod def load(cls, directory_path='.'): """ Loads the state of a Pipeline object Parameters ---------- directory_path : str Path to the directory where the Pipeline state should be saved Note ---- This method uses :py:meth:`imagine.tools.io.load_pipeline` """ return io.load_pipeline(directory_path)
[docs] def prepare_likelihood_convergence_report(self, min_Nens=10, max_Nens=50, n_seeds=1, n_points=5, include_centre=True): """ Constructs a report dataset based on a given Pipeline setup, which can be used for studying the *likelihood convergence* in a particular problem The pipeline's ensemble size is temporarily set to `Nens*n_seeds`, and (for each point) the present pipeline setup is used to compute a Simulations dictionary object. Subsets of this simulations object are then produced and the likelihood computed. The end result is a :py:obj:`pandas.DataFrame` containing the following columns: * `likelihood` - The likelihood value. * `likelihood_std` - The likelihood dispersion, estimated by bootstrapping the available ensemble and computing the standard deviation. * `ensemble_size` - Size of the ensemble of simulations used. * `ipoint` - Index of the point used. * `iseed` - Index of the random (master) seed used. * `param_values` - Values of the parameters at a given point. Parameters ---------- min_Nens : int Minimum ensemble size to be considered max_Nens : int Maximum ensemble size to be examined n_seeds : int Number of different (master) random seeds to be used n_points : int Number of points to be evaluated. Points are randomly drawn from the *prior* distribution (but see `include_centre`). include_centre : bool If `True`, the first point is taken as the value corresponding to the centre of each parameter range. Returns ------- results : pandas.DataFrame A `pandas.DataFrame` object containing the report data. """ # Saves original choice for ensemble size original_size = self.ensemble_size original_likelihood_dispersion_switch = self.likelihood.compute_dispersion self.likelihood.compute_dispersion = True self.ensemble_size = max_Nens * n_seeds n_params = len(self.active_parameters) results = defaultdict(list) # Samples the prior distributions for i_point in range(n_points): if (i_point == 0) and include_centre: values = self.parameter_central_value() i_point = 'centre' else: # Draws a random point from the prior distributions values = self.prior_transform(np.random.random_sample(n_params)) # Produces max_Nens*n_seeds outputs from this maps = self._get_observables(values) # Now, we construct smaller Simulations dicts based on a # subset of `maps` for i_seed in range(n_seeds): for Nens in range(min_Nens,max_Nens+1): # Constructs a slice corresponding to a subset indices = slice(i_seed*Nens, (i_seed+1)*Nens) # Constructs the subset Simulations and computes likelihood maps_subset = maps.sub_sim(indices) L_value, L_std = self.likelihood(maps_subset) # Stores everything results['likelihood'].append(L_value) results['likelihood_std'].append(L_std) results['ensemble_size'].append(Nens) results['ipoint'].append(i_point) results['iseed'].append(i_seed) results['param_values'].append(values) # Restores original pipeline settings self.ensemble_size = original_size self.likelihood.compute_dispersion = original_likelihood_dispersion_switch return pd.DataFrame(data=results)
[docs] def likelihood_convergence_report(self, cmap='cmr.chroma', **kwargs): """ Prepares a standard set of plots of a likelihood convergence report (produced by the :py:meth:`Pipeline.prepare_likelihood_convergence_report` method). Parameters ---------- cmap : str Colormap to be used for the lineplots **kwargs Keyword arguments that will be supplied to `prepare_likelihood_convergence_report` (see its docstring for details). """ rep = self.prepare_likelihood_convergence_report(**kwargs) visualization.show_likelihood_convergence_report(rep, cmap)
[docs] def parameter_central_value(self): """ Gets central point in the parameter space of a given pipeline The ranges are extracted from each prior. The result is a pure list of numbers corresponding to the values in the native units of each prior. For non-finite ranges (e.g. [-inf,inf]), a zero central value is assumed. Returns ------- central_values : list A list of parameter values at the centre of each parameter range """ central_values = [] for param in self.active_parameters: centre = np.mean(self.priors[param].range) # Removes units if hasattr(centre,'value'): centre = centre.value # Deals with priors with undefined ranges if not np.isfinite(centre): centre = 0. central_values.append(centre) return central_values
[docs] def log_probability_unnormalized(self, theta): """ The log of the unnormalized posterior probability -- i.e. the sum of the log-likelihood and the log of the prior probability. Parameters ---------- theta : np.ndarray Array of parameter values (in their default units) Returns ------- log_prob : float log(likelihood(theta))+log(prior(theta)) """ # Checks whether the range is sane for pname, v in zip(self.active_parameters, theta): pmin, pmax = self.priors[pname].range.value if not (pmin < v < pmax): return -np.inf # Computes the (log) prior lp = np.log(self.prior_pdf(theta)) if not np.isfinite(lp): return -np.inf # Multiplies by the likelihood log_prob = lp + self._likelihood_function(theta) return log_prob
[docs] def get_MAP(self, initial_guess='auto', include_units=True, return_optimizer_result=False, **kwargs): """ Computes the parameter values at the Maximum A Posteriori This method uses `scipy.optimize.minimize` for the calculation. By default, it will use the `'Powell' <https://docs.scipy.org/doc/scipy/reference/optimize.minimize-powell.html#optimize-minimize-powell>`_ (which does not require the calculation of gradients or the assumption of continuity). Also by default the `bounds` keywords use the ranges specified in the priors. The MAP estimate is stored internally to be used by the properties: :py:data:`MAP_model` and :py:data:`MAP_simulation`. Parameters ---------- include_units : bool If `True` (default), returns the result as list of `Quantities <astropy.units.Quantity>`. Otherwise, returns a single numpy array with the parameter values in their default units. return_optimizer_result : bool If `False` (default) only the MAP values are returned. Otherwise, a tuple containing the values and the output of `scipy.optimize.minimize` is returned instead. initial_guess : str or numpy.ndarray The initial guess used by the optimizer. If set to 'centre', the centre of each parameter range is used (obtained using :py:meth:`parameter_central_value`). If set to 'samples', the median values of the samples produced by a previous posterior sampling are used (the Pipeline has to have been run before). If set to 'auto' (default), use 'samples' if available, and 'centre' otherwise. Alternatively, an array of active parameter values with the starting position may be provided. **kwargs Any other keyword arguments are passed directly to `scipy.optimize.minimize`. Returns ------- MAP : list or array Parameter values at the position of the maximum of the posterior distribution result : scipy.optimize.OptimizeResult Only if `return_optimizer_result` is set to `True`, this is returned together with the MAP. """ # By default, uses Powell which does not require calculation # of gradients and uses the 'bounds' information if 'method' not in kwargs: kwargs['method'] = 'Powell' # Finds ranges if 'bounds' not in kwargs: kwargs['bounds'] = [self.priors[k].range.value for k in self.active_parameters] # Sets function to minimize fun = lambda theta: -np.nan_to_num(self.log_probability_unnormalized(theta)) # Avoid a (sampling) progress report to appear during the minimization progress_reports_state = self.show_progress_reports self.show_progress_reports = False # If the pipeline had previously run, the posterior median as the # initial guess, otherwise, uses the centres of the ranges if ((initial_guess == 'samples') or (initial_guess == 'auto' and (self._samples_array is not None))): initial_guess = [self.posterior_summary[k]['median'].value for k in self.active_parameters] elif (initial_guess == 'centre') or (initial_guess == 'auto'): initial_guess = self.parameter_central_value() else: initial_guess = np.array(initial_guess) # Finds the MAP result = scipy_optimize.minimize(fun, initial_guess, **kwargs) if not result.success: print('Unable to find MAP! Check your settings.') return result MAP = result.x # Restores the original setup self.show_progress_reports = progress_reports_state if include_units: MAP = list(MAP) # Adds units to parameters for i, pname in enumerate(self.active_parameters): MAP[i] *= self.priors[pname].unit # Stores for later use self._MAP = MAP self._MAP_model = None self._MAP_simulation = None if not return_optimizer_result: return MAP else: return MAP, result