# %% IMPORTS
# Built-in imports
import abc
from itertools import chain
import logging as log
import tempfile
import os
from os import path
import shutil
from collections import defaultdict
# Package imports
from astropy.table import QTable
import astropy.units as apu
from e13tools import q2tex
from mpi4py import MPI
import numpy as np
from scipy.stats import norm as scipy_norm
from scipy.stats import pearsonr as scipy_pearsonr
import scipy.optimize as scipy_optimize
import IPython.display as ipd
import matplotlib.pyplot as plt
import hickle as hkl
import pandas as pd
# IMAGINE imports
from imagine import rc
from imagine.likelihoods import Likelihood
from imagine.fields import FieldFactory
from imagine.priors import Prior
from imagine.simulators import Simulator
from imagine.tools import BaseClass, ensemble_seed_generator, misc, visualization
from imagine.tools import io, Timer
# GLOBALS
comm = MPI.COMM_WORLD
mpisize = comm.Get_size()
mpirank = comm.Get_rank()
# All declaration
__all__ = ['Pipeline']
# %% CLASS DEFINITIONS
[docs]class Pipeline(BaseClass, metaclass=abc.ABCMeta):
"""
Base class used for for initialing Bayesian analysis pipeline
Attributes
----------
likelihood_rescaler : double
Rescale log-likelihood value
random_type : str
If set to 'fixed', the exact same set of ensemble seeds will be used
for the evaluation of all fields, generated using the `master_seed`.
If set to 'controllable', each individual field will get their own set
of ensemble fields, but multiple runs will lead to the same results,
as they are based on the same `master_seed`.
If set to 'free', every time the pipeline is run, the `master_seed` is
reset to a different value, and the ensemble seeds for each individual
field are drawn based on this.
master_seed : int
Master seed used by the random number generators
Parameters
----------
simulator : imagine.simulators.simulator.Simulator
Simulator object
factory_list : list
List or tuple of field factory objects
likelihood : imagine.likelihoods.likelihood.Likelihood
Likelihood object
ensemble_size : int
Number of observable realizations to be generated by the simulator
run_directory : str
Directory where the pipeline state and reports are saved.
chains_directory : str
Path of the directory where the chains should be saved. By default,
this is saved to a 'chains' subdirectory of `run_directory`.
prior_correlations : dict
Dictionary used to set up prior distribution correlations. If two
parameters are A and B are correlated a priori, an entry should be
added to the prior_correlations dictionary in the form
`(name_A, name_B): True`, to extract the correlation from the samples
(in the case of CustomPriors) or `(name_A, name_B): value` otherwise.
show_summary_reports : bool
If True (default), shows/saves a corner plot and shows the evidence
after the pipeline run has finished
show_progress_reports : bool
If True, shows/saves a simple progress of the sampler during the run.
n_evals_report : int
The number of likelihood evaluations before showing a progress report
"""
def __init__(self, *, simulator, factory_list, likelihood, ensemble_size=1,
run_directory=None, chains_directory=None,
prior_correlations=None, show_summary_reports=True,
show_progress_reports=False, n_evals_report=500):
# Call super constructor
super().__init__()
self.factory_list = factory_list
# NB setting the factory list automatically sets: the active parameters,
# parameter ranges and priors, based on the list
self.simulator = simulator
self.likelihood = likelihood
self.prior_correlations = prior_correlations
self.run_directory = run_directory
self.chains_directory = chains_directory
self.sampling_controllers = {}
self._distribute_ensemble = rc['pipeline_distribute_ensemble']
# Random seed settings
self.random_type = 'controllable'
# This may change on every execution if random_type=='free'
self.master_seed = rc['pipeline_default_seed']
# The ensemble_seeds are fixed in the case of the 'fixed' random_type;
# or are regenerated on each Field evaluation, in the 'free' and
# 'controllable' cases
self.ensemble_seeds = None
self.ensemble_size = ensemble_size
# rescaling total likelihood in _core_likelihood
self.likelihood_rescaler = 1
# Checking likelihood threshold
self.check_threshold = False
# Place holders
self.sampler = None
self.results = None
self._evidence = None
self._evidence_err = None
self._posterior_summary = None
self._samples_array = None
self._samples = None
self.correlated_priors = None
self._median_simulation = None
self._median_model = None
self._MAP_simulation = None
self._MAP_model = None
self._MAP = None
# Report settings
self.show_summary_reports = show_summary_reports
self.show_progress_reports = show_progress_reports
self.n_evals_report = n_evals_report
self._likelihood_evaluations_counter = 0
self.intermediate_results = defaultdict(lambda: None)
[docs] def __call__(self, *args, save_pipeline_state=True, **kwargs):
# Keeps the setup safe
if save_pipeline_state:
self.save()
# Resets internal state and adjusts random seed
self.tidy_up()
result = self.call(*args, **kwargs)
if self.show_summary_reports and (mpirank == 0):
self.posterior_report()
self.evidence_report()
# Stores the final results
if save_pipeline_state:
self.save()
return result
@property
def run_directory(self):
"""
Directory where the chains are stored
(NB details of what is stored are sampler-dependent)
"""
return self._run_directory
@run_directory.setter
def run_directory(self, run_directory):
assert run_directory is not None, 'A valid run_directory must be specified'
if mpirank == 0:
# Creates new directory (if needed)
os.makedirs(run_directory, exist_ok=True)
assert path.isdir(run_directory), 'Unable to created run directory'
comm.Barrier()
self._run_directory = run_directory
@property
def chains_directory(self):
"""
Directory where the chains are stored
(NB details of what is stored are sampler-dependent)
"""
return self._chains_directory
@chains_directory.setter
def chains_directory(self, chains_directory):
if chains_directory is None:
chains_directory = os.path.join(self._run_directory, 'chains')
os.makedirs(chains_directory, exist_ok=True)
assert path.isdir(chains_directory)
self._chains_directory = chains_directory
[docs] def clean_chains_directory(self):
"""Removes the contents of the chains directory"""
log.debug('@ pipeline::clean_chains_directory')
if mpirank==0:
for f in os.listdir(self._chains_directory):
fullpath = path.join(self._chains_directory, f)
try:
shutil.rmtree(fullpath)
except NotADirectoryError:
os.remove(fullpath)
comm.Barrier()
@property
def active_parameters(self):
"""
List of all the active parameters
"""
# The user should not be able to set this attribute manually
return self._active_parameters
@property
def wrapped_parameters(self):
"""
List of parameters which are periodic or "wrapped around"
"""
return [self._priors[k].wrapped for k in self._active_parameters]
@property
def priors(self):
"""
Dictionary containing priors for all active parameters
"""
# The user should not be able to set this attribute manually
return self._priors
@property
def posterior_summary(self):
r"""
A dictionary containing a summary of posterior statistics for each of
the active parameters. These are: 'median', 'errlo'
(15.87th percentile), 'errup' (84.13th percentile), 'mean' and 'stdev'.
"""
if self._posterior_summary is None:
samp = self.samples
self._posterior_summary = {}
for name, column in zip(samp.columns, samp.itercols()):
lo, median, up = np.percentile(column, [15.865, 50, 84.135])
errlo = abs(median-lo)
errup = abs(up-median)
self._posterior_summary[name] = {
'median': median,
'errlo': errlo,
'errup': errup,
'mean': np.mean(column),
'stdev': np.std(column)}
return self._posterior_summary
@property
def prior_correlations(self):
return self._prior_correlations
@prior_correlations.setter
def prior_correlations(self, prior_correlations):
if prior_correlations is None:
# If None is provided, resets the correlations
self._prior_correlations = None
return
else:
# Otherwise, updates the prior correlations dictionary and
# computes the L matrix
# First, checks coefficient consistency
correlated_priors = []
new_prior_correlations = {}
for n in list(prior_correlations.keys()):
t = []
for m in range(2):
for f in self._factory_list:
if list(n)[m] in f.active_parameters:
newname = f.name + '_' + list(n)[m]
correlated_priors.append(newname)
t.append(newname)
new_prior_correlations.update({tuple(t): prior_correlations[n]})
name_pairs = list(new_prior_correlations.keys())
if len(name_pairs) != len(list(set(name_pairs))):
raise ValueError('Inconsistent prior correlations, '
'possibly multiple values for the same coefficient')
corr_matrix = np.eye(len(self._active_parameters))
for name_pair in name_pairs:
i, j = (self._prior_cube_mapping[param] for param in name_pair)
c = new_prior_correlations[name_pair]
if isinstance(c, bool):
if not c:
raise ValueError()
# Creates zero mean Gaussian distributions from the
# original samples
gaussian_pair = []
for name in name_pair:
prior = self._priors[name]
assert prior is not None
# Makes sure the samples are within the selected range
ok = (prior.samples >= prior.range[0])
ok *= (prior.samples <= prior.range[1])
x = scipy_norm.ppf(loc=0, scale=1,
q=prior.cdf(prior.samples.value[ok]))
gaussian_pair.append(x)
# Computes the Pearson r correlation coefficient from the
# pair of distributions
c = scipy_pearsonr(*gaussian_pair)[0]
else:
# If this was supplied, check whether the value is consistent
assert (-1 <= c <= 1)
corr_matrix[i, j] = corr_matrix[j, i] = c
# Computes the Cholesky L (lower-triangular) matrix which is used
# to correlate the priors in the prior_transform method
self._correlator_L = np.linalg.cholesky(corr_matrix)
# Stores a list of correlated priors, for convenience
self.correlated_priors = set(correlated_priors)
# Finally, saves updated dictionary
self._prior_correlations = new_prior_correlations
[docs] def corner_plot(self, **kwargs):
"""
Calls :py:func:`imagine.tools.visualization.corner_plot` to make
a corner plot of samples produced by this Pipeline
"""
return visualization.corner_plot(pipeline=self, **kwargs)
[docs] def progress_report(self):
"""
Reports the progress of the inference
"""
# Try to call get_intermediate_results
try:
self.get_intermediate_results()
# If this method is not implemented, show error instead and continue
except NotImplementedError:
print("Progress reports can only be made if the "
"'get_intermediate_results()'-method is overridden! "
"Skipping report.")
# If this method is implemented, create progress report
else:
if mpirank!=0:
return
dead_samples = self.intermediate_results['rejected_points']
live_samples = self.intermediate_results['live_points']
likelihood = self.intermediate_results['logLikelihood']
lnX = self.intermediate_results['lnX']
if dead_samples is not None:
fig = visualization.trace_plot(
parameter_names=self._active_parameters,
samples=dead_samples,
live_samples=live_samples,
likelihood=likelihood, lnX=lnX)
fig_filepath = os.path.join(self._run_directory,
'progress_report.pdf')
msg = 'Saving progress report to {}'.format(fig_filepath)
log.info(msg)
fig.savefig(fig_filepath)
if misc.is_notebook():
ipd.clear_output()
ipd.display(ipd.Markdown(
"\n**Progress report:**\nnumber of likelihood "
"evaluations {}".format(
self._likelihood_evaluations_counter)))
plt.show()
else:
print(msg)
[docs] def posterior_report(self, sdigits=2, **kwargs):
"""
Displays the best fit values and 1-sigma errors for each active
parameter. Also produces a corner plot of the samples, which is
saved to the run directory.
If running on a jupyter-notebook, a nice LaTeX display is used, and
the plot is shown.
Parameters
----------
sdigits : int
The number of significant digits to be used
"""
out = ''
for param, pdict in self.posterior_summary.items():
if misc.is_notebook():
# Extracts LaTeX representation from astropy unit object
out += r"\\ \text{{ {0}: }}\; ".format(param)
out += q2tex(*map(pdict.get, ['median', 'errup', 'errlo']),
sdigits=sdigits)
out += r"\\"
else:
out += r"{0}: ".format(param)
md, errlo, errup = map(pdict.get, ['median', 'errlo', 'errup'])
if isinstance(md, apu.Quantity):
unit = str(md.unit)
md, errlo, errup = map(lambda x: x.value, [md, errlo, errup])
else:
unit = ""
v, l, u = misc.adjust_error_intervals(
md, errlo, errup, sdigits=sdigits)
out += r'{0} (-{1})/(+{2}) {3}\n'.format(v, l, u, unit)
fig = self.corner_plot(**kwargs)
fig.savefig(os.path.join(self._run_directory, 'corner_plot.pdf'))
if misc.is_notebook():
ipd.display(ipd.Markdown("\n**Posterior report:**"))
plt.show()
ipd.display(ipd.Math(out))
else:
# Restores linebreaks and prints
print('Posterior report')
print(out.replace(r'\n','\n'))
[docs] def evidence_report(self, sdigits=4):
if not np.isfinite(self.log_evidence):
return
if misc.is_notebook():
ipd.display(ipd.Markdown("**Evidence report:**"))
out = r"\log\mathcal{{ Z }} = "
out += q2tex(self.log_evidence, self.log_evidence_err)
ipd.display(ipd.Math(out))
else:
print('Evidence report')
print('logZ =', self.log_evidence, '±', self.log_evidence_err)
@property
def log_evidence(self):
r"""
Natural logarithm of the *marginal likelihood* or *Bayesian model evidence*,
:math:`\ln\mathcal{Z}`, where
.. math::
\mathcal{Z} = P(d|m) = \int_{\Omega_\theta} P(d | \theta, m) P(\theta | m) \mathrm{d}\theta .
Note
----
Available only after the pipeline is run.
"""
if self._evidence is None:
raise ValueError('Evidence not set! Have you run the pipeline?')
else:
return self._evidence
@property
def log_evidence_err(self):
"""
Error estimate in the natural logarithm of the *Bayesian model evidence*.
Available once the pipeline is run.
Note
----
Available only after the pipeline is run.
"""
assert self._evidence_err is not None, 'Evidence error not set! Did you run the pipeline?'
return self._evidence_err
@property
def median_model(self):
"""
Posterior median model
List of Field objects corresponding to the median values of the
distributions of parameter values found *after a Pipeline run*.
"""
if self._median_model is None:
self._median_model = []
for factory in self.factory_list:
params_dict = {}
for param, results_dict in self.posterior_summary.items():
if factory.field_name in param:
param = param.replace(factory.field_name + '_','')
params_dict[param] = results_dict['median']
field = factory(ensemble_seeds=self.ensemble_seeds[factory],
variables=params_dict)
self._median_model.append(field)
return self._median_model
@property
def MAP_model(self):
"""
Maximum a posteriori (MAP) model
List of Field objects corresponding to the mode of the posterior
distribution. This does not require a previous run of the Pipeline,
as the MAP is computed by maximizing the unnormalized posterior
distribution.
This convenience property uses the results from the latest call of the
:py:meth:`Pipeline.get_MAP` method. If :py:meth:`Pipeline.get_MAP` has
never been called, the MAP is found calling it with with default
arguments.
See :py:meth:`Pipeline.get_MAP` for details.
"""
if self._MAP_model is None:
if self._MAP is None:
self.get_MAP()
assert not isinstance(self._MAP, scipy_optimize.OptimizeResult), 'Try running get_MAP directly, with different parameters'
self._MAP_model = []
for factory in self.factory_list:
params_dict = {}
for pname, MAP_val in zip(self.active_parameters, self._MAP):
if factory.name in pname:
params_dict[pname.replace(factory.name+'_','')] = MAP_val
field = factory(ensemble_seeds=self.ensemble_seeds[factory],
variables=params_dict)
self._MAP_model.append(field)
return self._MAP_model
@property
def median_simulation(self):
"""
Simulation corresponding to the
:py:data:`median_model <Pipeline.median_model>`.
"""
if self._median_simulation is None:
self._median_simulation = self.simulator(self.median_model)
return self._median_simulation
@property
def MAP_simulation(self):
"""
Simulation corresponding to the
:py:data:`MAP_model <Pipeline.MAP_model>`.
"""
if self._MAP_simulation is None:
self._MAP_simulation = self.simulator(self.MAP_model)
return self._MAP_simulation
@property
def samples(self):
"""
An :py:class:`astropy.table.QTable` object containing parameter values
of the samples produced in the run.
"""
if self._samples is None:
assert self._samples_array is not None, 'Samples not available. Did you run the pipeline?'
self._samples = QTable(data=self._samples_array,
names=self._active_parameters)
# Restores the units
for param, prior in self._priors.items():
self._samples[param] = self._samples[param] << prior.unit
return self._samples
@property
def factory_list(self):
"""
List of the
:py:class:`Field Factories <imagine.fields.field_factory.GeneralFieldFactory>`
currently being used.
Updating the factory list automatically extracts active_parameters,
parameter ranges and priors from each field factory.
"""
return self._factory_list
@factory_list.setter
def factory_list(self, factory_list):
# Notice that the parameter/variable ordering is fixed wrt
# factory ordering. This is useful for recovering variable logic value
# for each factory and necessary to construct the common prior function.
assert isinstance(factory_list, (list, tuple)), 'Factory list must be a tuple or list'
self._active_parameters = tuple()
self._priors = dict()
self._prior_cube_mapping = dict()
i = 0
for factory in factory_list:
assert isinstance(factory, FieldFactory)
for ap_name in factory.active_parameters:
if ap_name in self._prior_cube_mapping:
raise KeyError('Ambiguous prior naming')
self._prior_cube_mapping.update({factory.name+'_'+ap_name: i})
assert isinstance(ap_name, str)
# Sets the parameters and ranges
self._active_parameters += (str(factory.name+'_'+ap_name),)
# Sets the Prior
prior = factory.priors[ap_name]
assert isinstance(prior, Prior)
self._priors[factory.name+'_'+ap_name] = prior
i += 1
self._factory_list = factory_list
@property
def sampler_supports_mpi(self):
return getattr(self, 'SUPPORTS_MPI', False)
@property
def simulator(self):
"""
The :py:class:`Simulator <imagine.simulators.simulator.Simulator>`
object used by the pipeline
"""
return self._simulator
@simulator.setter
def simulator(self, simulator):
assert isinstance(simulator, Simulator)
self._simulator = simulator
@property
def likelihood(self):
"""
The :py:class:`Likelihood <imagine.likelihoods.likelihood.Likelihood>`
object used by the pipeline
"""
return self._likelihood
@likelihood.setter
def likelihood(self, likelihood):
assert isinstance(likelihood, Likelihood)
self._likelihood = likelihood
[docs] def prior_pdf(self, cube):
"""
Probability distribution associated with the all parameters being used by
the multiple Field Factories
Parameters
----------
cube : np.ndarray
Each row of the array corresponds to a different parameter value
in the sampling (dimensionless, but in the standard units of the
prior).
Returns
-------
rtn : float
Prior probability of the parameter choice specified by `cube`
"""
rtn = 1.
for i, parameter in enumerate(self._active_parameters):
prior = self._priors[parameter]
rtn *= prior.pdf(cube[i]*prior.unit)
return rtn
@property
def distribute_ensemble(self):
"""
If True, whenever the sampler requires a likelihood evaluation,
the ensemble of stochastic fields realizations is distributed among
all the nodes.
Otherwise, each likelihood evaluations will go through the whole
ensemble size on a single node. See :doc:`parallel` for details.
"""
return self._distribute_ensemble
@distribute_ensemble.setter
def distribute_ensemble(self, distr_ensemble):
# Saves the choice
self._distribute_ensemble = distr_ensemble
# Calls the setter method to ajust hidden parts
self.ensemble_size = self.ensemble_size
@property
def ensemble_size(self):
return self._ensemble_size
@ensemble_size.setter
def ensemble_size(self, ensemble_size):
ensemble_size = int(ensemble_size)
assert (ensemble_size > 0)
self._ensemble_size = ensemble_size
log.debug('set ensemble size to %i' % int(ensemble_size))
if self._distribute_ensemble:
# Sets pointer to the correct likelihood function
self._likelihood_function = self._mpi_likelihood
# Sets effective ensemble size
if self.ensemble_size % mpisize != 0:
raise ValueError("In 'distribute_ensemble' mode, ensemble_size "
"must be a multiple of the number of MPI nodes")
self.ensemble_size_actual = self.ensemble_size // mpisize
else:
# Sets pointer to the correct likelihood function
self._likelihood_function = self._core_likelihood
# Sets effective ensemble size
self.ensemble_size_actual = self.ensemble_size
self._randomness()
@property
def sampling_controllers(self):
"""
Settings used by the sampler (e.g. `'dlogz'`).
See the documentation of each specific pipeline subclass for details.
After the pipeline runs, this property is updated to reflect the
actual final choice of sampling controllers (including
default values).
"""
return self._sampling_controllers
@sampling_controllers.setter
def sampling_controllers(self, pp_dict):
try:
self._sampling_controllers.update(pp_dict)
except AttributeError:
self._sampling_controllers = pp_dict
[docs] def tidy_up(self):
"""
Resets internal state before a new run
"""
log.debug('@ pipeline::tidy_up')
self.results = None
self._evidence = None
self._evidence_err = None
self._posterior_summary = None
self._samples_array = None
self._samples = None
self._median_model = None
self._median_simulation = None
self._randomness()
self._likelihood_evaluations_counter = 0
self.intermediate_results = defaultdict(lambda: None)
def _randomness(self):
"""
Manipulate random seed(s)
isolating this process for convenience of testing
"""
log.debug('@ pipeline::_randomness')
assert self.random_type in ('free', 'controllable', 'fixed')
if self.random_type == 'free':
# Refreshes the master seed
self.master_seed = np.random.randint(0, 2**32)
# Updates numpy random accordingly
np.random.seed(self.master_seed)
if self.random_type == 'fixed':
common_ensemble_seeds = ensemble_seed_generator(self.ensemble_size_actual)
self.ensemble_seeds = {factory: common_ensemble_seeds
for factory in self._factory_list}
elif self.random_type == 'controllable':
self.ensemble_seeds = {factory: ensemble_seed_generator(self.ensemble_size_actual)
for factory in self._factory_list}
else:
self.ensemble_seeds = {factory: None for factory in self._factory_list}
# This function returns all parameter names of all factories in order
[docs] def get_par_names(self):
# Create list of names
names = list(chain(*map(
lambda x: ["%s_%s" % (x.name, par) for par in x.active_parameters],
self._factory_list)))
# Return them
return(names)
def _get_observables(self, cube):
# return active variables from pymultinest cube to factories
# and then generate new field objects
head_idx = 0
tail_idx = 0
field_list = tuple()
# the ordering in factory list and variable list is vital
for factory in self._factory_list:
variable_dict = dict()
tail_idx = head_idx + len(factory.active_parameters)
factory_cube = cube[head_idx:tail_idx]
for i, av in enumerate(factory.active_parameters):
variable_dict[av] = factory_cube[i]*self._priors[factory.field_name + '_' + av].unit
ensemble_seeds = self.ensemble_seeds[factory]
field_list += (factory(variables=variable_dict,
ensemble_size=self.ensemble_size_actual,
ensemble_seeds=ensemble_seeds),)
log.debug('create '+factory.name+' field')
head_idx = tail_idx
assert(head_idx == len(self._active_parameters))
observables = self._simulator(field_list)
return(observables)
[docs] def test(self, n_points=3, include_centre=True, verbose=True):
"""
Tests the present IMAGINE pipeline evaluating the likelihood on
a small number of points and reporting the run-time.
Paramters
---------
n_points : int
Number of points to evaluate the likelihood on. The first point
corresponds to the centre of the active parameter ranges (unless
`include_centre` is set to `False`) and the other are randomly
sampled from the prior distributions.
include_centre : bool
If True, the initial point will be obtained from the centre of the
active parameter ranges.
Returns
-------
mean_time : astropy.units.Quantity
The average execution time of a single likelihood evaluation
"""
n_params = len(self.active_parameters)
timer = Timer()
times = []
for i_point in range(n_points):
timer.tick(i_point)
if (i_point == 0) and include_centre:
if verbose:
print('Sampling centres of the parameter ranges.')
values = self.parameter_central_value()
else:
if verbose:
print('Randomly sampling from prior.')
# Draws a random point from the prior distributions
values = self.prior_transform(np.random.random_sample(n_params))
if verbose:
print('\tEvaluating point:', values)
L = self._likelihood_function(values)
time = timer.tock(i_point)
if verbose:
print('\tLog-likelihood', L)
print('\tTotal execution time: ', time,'s\n')
times.append(time)
mean_time = np.mean(times) * apu.s
if verbose:
print('Average execution time:', mean_time)
return mean_time
def _core_likelihood(self, cube):
"""
core log-likelihood calculator
Parameters
----------
cube
list of variable values
Returns
-------
log-likelihood value
"""
log.debug('@ pipeline::_core_likelihood')
log.debug('sampler at %s' % str(cube))
# Obtain observables for provided cube
observables = self._get_observables(cube)
# add up individual log-likelihood terms
current_likelihood = self.likelihood(observables)
current_likelihood *= self.likelihood_rescaler
# check likelihood value until negative (or no larger than given threshold)
if self.check_threshold and current_likelihood > self.likelihood_threshold:
raise ValueError('log-likelihood beyond threshold')
# Logs the value
log.info('Likelihood evaluation at point:'
' {0} value: {1}'.format(cube, current_likelihood))
# Reports, if needed
self._likelihood_evaluations_counter += 1
if (self.show_progress_reports and
self._likelihood_evaluations_counter % self.n_evals_report == 0):
if mpirank==0:
self.progress_report()
return current_likelihood
def _mpi_likelihood(self, cube):
"""
mpi log-likelihood calculator
PyMultinest supports execution with MPI
where sampler on each node follows DIFFERENT journeys in parameter space
but keep in communication
so we need to firstly register parameter position on each node
and calculate log-likelihood value of each node with joint force of all nodes
in this way, ensemble size is multiplied by the number of working nodes
Parameters
----------
cube
list of variable values
Returns
-------
log-likelihood value
"""
if self.sampler_supports_mpi:
log.debug('@ pipeline::_mpi_likelihood')
# Gathers cubes from all nodes
cube_local_size = cube.size
cube_pool = np.empty(cube_local_size*mpisize, dtype=np.float64)
comm.Allgather([cube, MPI.DOUBLE], [cube_pool, MPI.DOUBLE])
# Calculates log-likelihood for each node
loglike_pool = np.empty(mpisize, dtype=np.float64)
for i in range(mpisize): # loop through nodes
cube_local = cube_pool[i*cube_local_size : (i+1)*cube_local_size]
loglike_pool[i] = self._core_likelihood(cube_local)
# Scatters log-likelihood to each node
loglike_local = np.empty(1, dtype=np.float64)
comm.Scatter([loglike_pool, MPI.DOUBLE], [loglike_local, MPI.DOUBLE], root=0)
return loglike_local[0] # Some samplers require a scalar value
else:
log.debug('@ dynesty_pipeline::_mpi_likelihood')
# gather cubes from all nodes
cube_local_size = cube.size
cube_pool = np.empty(cube_local_size*mpisize, dtype=np.float64)
comm.Allgather([cube, MPI.DOUBLE], [cube_pool, MPI.DOUBLE])
# check if all nodes are at the same parameter-space position
assert ((cube_pool == np.tile(cube_pool[:cube_local_size], mpisize)).all())
return self._core_likelihood(cube)
[docs] @abc.abstractmethod
def call(self, **kwargs):
raise NotImplementedError
[docs] def save(self, **kwargs):
"""
Saves the state of the Pipeline
The `run_directory` set in initialization is used. Any distributed
data is gathered and the pipeline is serialized and saved to disk.
Note
----
This method uses :py:meth:`imagine.tools.io.save_pipeline`
"""
return io.save_pipeline(self, **kwargs)
[docs] @classmethod
def load(cls, directory_path='.'):
"""
Loads the state of a Pipeline object
Parameters
----------
directory_path : str
Path to the directory where the Pipeline state should be saved
Note
----
This method uses :py:meth:`imagine.tools.io.load_pipeline`
"""
return io.load_pipeline(directory_path)
[docs] def prepare_likelihood_convergence_report(self, min_Nens=10, max_Nens=50,
n_seeds=1, n_points=5,
include_centre=True):
"""
Constructs a report dataset based on a given Pipeline setup, which can
be used for studying the *likelihood convergence* in a particular problem
The pipeline's ensemble size is temporarily set to `Nens*n_seeds`, and
(for each point) the present pipeline setup is used to compute a
Simulations dictionary object.
Subsets of this simulations object are then produced and the likelihood
computed.
The end result is a :py:obj:`pandas.DataFrame` containing the following
columns:
* `likelihood` - The likelihood value.
* `likelihood_std` - The likelihood dispersion, estimated by
bootstrapping the available ensemble and computing the standard
deviation.
* `ensemble_size` - Size of the ensemble of simulations used.
* `ipoint` - Index of the point used.
* `iseed` - Index of the random (master) seed used.
* `param_values` - Values of the parameters at a given point.
Parameters
----------
min_Nens : int
Minimum ensemble size to be considered
max_Nens : int
Maximum ensemble size to be examined
n_seeds : int
Number of different (master) random seeds to be used
n_points : int
Number of points to be evaluated. Points are randomly drawn
from the *prior* distribution (but see `include_centre`).
include_centre : bool
If `True`, the first point is taken as the value corresponding to
the centre of each parameter range.
Returns
-------
results : pandas.DataFrame
A `pandas.DataFrame` object containing the report data.
"""
# Saves original choice for ensemble size
original_size = self.ensemble_size
original_likelihood_dispersion_switch = self.likelihood.compute_dispersion
self.likelihood.compute_dispersion = True
self.ensemble_size = max_Nens * n_seeds
n_params = len(self.active_parameters)
results = defaultdict(list)
# Samples the prior distributions
for i_point in range(n_points):
if (i_point == 0) and include_centre:
values = self.parameter_central_value()
i_point = 'centre'
else:
# Draws a random point from the prior distributions
values = self.prior_transform(np.random.random_sample(n_params))
# Produces max_Nens*n_seeds outputs from this
maps = self._get_observables(values)
# Now, we construct smaller Simulations dicts based on a
# subset of `maps`
for i_seed in range(n_seeds):
for Nens in range(min_Nens,max_Nens+1):
# Constructs a slice corresponding to a subset
indices = slice(i_seed*Nens, (i_seed+1)*Nens)
# Constructs the subset Simulations and computes likelihood
maps_subset = maps.sub_sim(indices)
L_value, L_std = self.likelihood(maps_subset)
# Stores everything
results['likelihood'].append(L_value)
results['likelihood_std'].append(L_std)
results['ensemble_size'].append(Nens)
results['ipoint'].append(i_point)
results['iseed'].append(i_seed)
results['param_values'].append(values)
# Restores original pipeline settings
self.ensemble_size = original_size
self.likelihood.compute_dispersion = original_likelihood_dispersion_switch
return pd.DataFrame(data=results)
[docs] def likelihood_convergence_report(self, cmap='cmr.chroma', **kwargs):
"""
Prepares a standard set of plots of a likelihood convergence report
(produced by the :py:meth:`Pipeline.prepare_likelihood_convergence_report`
method).
Parameters
----------
cmap : str
Colormap to be used for the lineplots
**kwargs
Keyword arguments that will be supplied to
`prepare_likelihood_convergence_report` (see its docstring for
details).
"""
rep = self.prepare_likelihood_convergence_report(**kwargs)
visualization.show_likelihood_convergence_report(rep, cmap)
[docs] def parameter_central_value(self):
"""
Gets central point in the parameter space of a given pipeline
The ranges are extracted from each prior. The result is a pure list of
numbers corresponding to the values in the native units of each prior.
For non-finite ranges (e.g. [-inf,inf]), a zero central value is assumed.
Returns
-------
central_values : list
A list of parameter values at the centre of each
parameter range
"""
central_values = []
for param in self.active_parameters:
centre = np.mean(self.priors[param].range)
# Removes units
if hasattr(centre,'value'):
centre = centre.value
# Deals with priors with undefined ranges
if not np.isfinite(centre):
centre = 0.
central_values.append(centre)
return central_values
[docs] def log_probability_unnormalized(self, theta):
"""
The log of the unnormalized posterior probability -- i.e. the sum of
the log-likelihood and the log of the prior probability.
Parameters
----------
theta : np.ndarray
Array of parameter values (in their default units)
Returns
-------
log_prob : float
log(likelihood(theta))+log(prior(theta))
"""
# Checks whether the range is sane
for pname, v in zip(self.active_parameters, theta):
pmin, pmax = self.priors[pname].range.value
if not (pmin < v < pmax):
return -np.inf
# Computes the (log) prior
lp = np.log(self.prior_pdf(theta))
if not np.isfinite(lp):
return -np.inf
# Multiplies by the likelihood
log_prob = lp + self._likelihood_function(theta)
return log_prob
[docs] def get_MAP(self, initial_guess='auto', include_units=True,
return_optimizer_result=False, **kwargs):
"""
Computes the parameter values at the Maximum A Posteriori
This method uses `scipy.optimize.minimize` for the calculation. By
default, it will use the
`'Powell' <https://docs.scipy.org/doc/scipy/reference/optimize.minimize-powell.html#optimize-minimize-powell>`_
(which does not require the calculation of gradients or the assumption
of continuity).
Also by default the `bounds` keywords use the ranges specified in the
priors.
The MAP estimate is stored internally to be used by the properties:
:py:data:`MAP_model` and :py:data:`MAP_simulation`.
Parameters
----------
include_units : bool
If `True` (default), returns the result as list of
`Quantities <astropy.units.Quantity>`. Otherwise, returns a single
numpy array with the parameter values in their default units.
return_optimizer_result : bool
If `False` (default) only the MAP values are returned. Otherwise,
a tuple containing the values and the output of
`scipy.optimize.minimize` is returned instead.
initial_guess : str or numpy.ndarray
The initial guess used by the optimizer. If set to 'centre', the
centre of each parameter range is used (obtained using
:py:meth:`parameter_central_value`). If set to 'samples', the
median values of the samples produced by a previous posterior
sampling are used (the Pipeline has to have been run before).
If set to 'auto' (default), use 'samples' if available, and
'centre' otherwise. Alternatively, an array of active parameter
values with the starting position may be provided.
**kwargs
Any other keyword arguments are passed directly to
`scipy.optimize.minimize`.
Returns
-------
MAP : list or array
Parameter values at the position of the maximum of the posterior
distribution
result : scipy.optimize.OptimizeResult
Only if `return_optimizer_result` is set to `True`, this is
returned together with the MAP.
"""
# By default, uses Powell which does not require calculation
# of gradients and uses the 'bounds' information
if 'method' not in kwargs:
kwargs['method'] = 'Powell'
# Finds ranges
if 'bounds' not in kwargs:
kwargs['bounds'] = [self.priors[k].range.value
for k in self.active_parameters]
# Sets function to minimize
fun = lambda theta: -np.nan_to_num(self.log_probability_unnormalized(theta))
# Avoid a (sampling) progress report to appear during the minimization
progress_reports_state = self.show_progress_reports
self.show_progress_reports = False
# If the pipeline had previously run, the posterior median as the
# initial guess, otherwise, uses the centres of the ranges
if ((initial_guess == 'samples') or
(initial_guess == 'auto' and (self._samples_array is not None))):
initial_guess = [self.posterior_summary[k]['median'].value
for k in self.active_parameters]
elif (initial_guess == 'centre') or (initial_guess == 'auto'):
initial_guess = self.parameter_central_value()
else:
initial_guess = np.array(initial_guess)
# Finds the MAP
result = scipy_optimize.minimize(fun, initial_guess, **kwargs)
if not result.success:
print('Unable to find MAP! Check your settings.')
return result
MAP = result.x
# Restores the original setup
self.show_progress_reports = progress_reports_state
if include_units:
MAP = list(MAP)
# Adds units to parameters
for i, pname in enumerate(self.active_parameters):
MAP[i] *= self.priors[pname].unit
# Stores for later use
self._MAP = MAP
self._MAP_model = None
self._MAP_simulation = None
if not return_optimizer_result:
return MAP
else:
return MAP, result