Source code for picslike.picslike

"""
Pixel-based likelihood for cosmological parameter inference.

This module implements the PICSLike class for computing likelihoods directly in
pixel space, providing an alternative to harmonic-space methods. The approach
is particularly useful for handling incomplete sky coverage, non-Gaussian features,
and scenarios where pixel-space analysis offers computational or methodological
advantages.

The pixel-based likelihood function is computed as:

ln L(θ) = -1/2 * (d - s(θ))^T * C^(-1) * (d - s(θ))

where d is the observed data vector, s(θ) is the theoretical signal prediction
for parameters θ, and C is the total covariance matrix including signal and noise
contributions.

Classes
-------
PICSLike
    Main class for pixel-based likelihood computation inheriting from cosmocore.Core.

Notes
-----
The pixel-based approach offers several advantages:
1. Natural handling of masked regions without harmonic complications
2. Direct treatment of non-Gaussian signals and systematics
3. Efficient cross-correlation analysis between different maps
4. Computational efficiency for certain analysis configurations

The implementation supports MPI parallelization for efficient computation across
parameter grids and handles memory optimization for large-scale analyses.

References
----------
.. [1] Wandelt, B.D., Larson, D.L. & Lakshminarayanan, A. "Global, exact cosmic
   microwave background data analysis using Gibbs sampling"
   Phys. Rev. D 70, 083511 (2004)
.. [2] Jewell, J., Levin, S. & Anderson, C.H. "Application of Monte Carlo algorithms
   to the Bayesian analysis of the cosmic microwave background"
   Astrophys. J. 609, 1-14 (2004)
.. [3] Chu, M. et al. "Cosmological parameter constraints as derived from the Wilkinson
   Microwave Anisotropy Probe data via Gibbs sampling and the Blackwell-Rao estimator"
   Phys. Rev. D 71, 103002 (2005)
.. [4] Eriksen, H.K. et al. "Power Spectrum Estimation from High-Resolution Maps by
   Gibbs Sampling" Astrophys. J. Suppl. 155, 227-241 (2004)
.. [5] Hinshaw, G. et al. "First-Year Wilkinson Microwave Anisotropy Probe (WMAP)
   Observations: Angular Power Spectrum" Astrophys. J. Suppl. 148, 135-159 (2003)
.. [6] Planck Collaboration "Planck 2018 results. V. CMB power spectra and likelihoods"
   Astron. Astrophys. 641, A5 (2020)
.. [7] Tegmark, M. "How to measure CMB power spectra without losing information"
   Phys. Rev. D 55, 5895 (1997) - For connection between pixel-based and QML methods
"""

from __future__ import annotations

import time
from typing import Any

import numpy as np

from cosmocore import (
    Core,
    FieldCollection,
    MPISharedMemoryMixin,
    SpectrumKey,
    cholesky_solve,
    compute_signal_matrix,
    matrix_inverse_symm,
    matrix_slogdet_symm,
    read_maps,
)
from cosmocore._mpi import MPI

from .likelihood_result import LikelihoodResult
from .parameter_grid import ParameterGrid


[docs] class PICSLike(Core, MPISharedMemoryMixin): """ Pixel-based likelihood computation for cosmological parameter inference. This class implements pixel-space likelihood analysis for CMB observations, providing an alternative to harmonic-space methods. It inherits from cosmocore.Core and extends it with pixel-based likelihood functionality including parameter grid management, signal covariance computation, and likelihood evaluation across parameter space. The likelihood function computed is: ln L(θ) = -1/2 * (d - s(θ))^T * C^(-1) * (d - s(θ)) where the total covariance matrix C includes both signal and noise contributions and is recomputed for each parameter point θ in the grid. Parameters ---------- params_file : str, optional Path to YAML parameter file containing analysis configuration. **kwargs : dict Additional keyword arguments passed to the Core parent class. Attributes ---------- comm : MPI.Comm MPI communicator for parallel computation. rank : int MPI process rank (0 for master process). size : int Total number of MPI processes. maps : FieldCollection Collection of observed data maps. parameter_grid : ParameterGrid Grid of parameter values and corresponding theoretical spectra. likelihood_result : LikelihoodResult Container for computed likelihood values and statistics. data_vector : numpy.ndarray Flattened data vector from observed maps. noise_covariance : numpy.ndarray Noise covariance matrix. Examples -------- Basic pixel-based likelihood analysis: >>> from cosmoforge.picslike import PICSLike >>> picslike = PICSLike("config/pixel_analysis.yaml") >>> picslike.setup_parameter_grid(param_ranges, theoretical_spectra) >>> picslike.compute_likelihood_grid() >>> chi2_values = picslike.get_chi_squared() >>> best_fit = picslike.get_best_fit() MPI parallel execution: >>> # Run with: mpirun -n 4 python pixel_analysis.py >>> picslike = PICSLike("config/pixel_config.yaml") >>> picslike.compute_likelihood_grid() # Distributed across processes Notes ----- The pixel-based likelihood computation is parallelized using MPI, with each process handling a subset of parameter grid points. The method scales well with the number of parameter points but can be memory-intensive for large maps due to covariance matrix storage and inversion. For analyses with many parameters or high-resolution maps, consider using appropriate computational resources and memory optimization strategies. See Also -------- cosmocore.Core : Base class providing fundamental analysis infrastructure ParameterGrid : Helper class for parameter space management LikelihoodResult : Container for likelihood computation results """
[docs] def __init__( self, params_file: str | None = None, compression: dict | None = None, **kwargs: Any, ) -> None: """ Initialize pixel-based likelihood computation class. Parameters ---------- params_file : str, optional Path to YAML configuration file. compression : dict, optional Computation basis configuration (method, epsilon, basis, mode_fraction). **kwargs : Any Additional arguments passed to Core. """ # Initialize parent Core class super().__init__(params=params_file, **kwargs) self.comm = MPI.COMM_WORLD self.rank = self.comm.Get_rank() self.size = self.comm.Get_size() self._basis_config = compression # Initialize attributes self.maps1 = None self.maps2 = None self.parameter_grid: ParameterGrid | None = None self.likelihood_result: LikelihoodResult | None = None self.simulation_index: int = 0 # Which simulation to use for likelihood self._lmax_signal = None self.fiducial_spectrum: dict | None = None self._legendre_cache: np.ndarray | None = ( None # Pre-computed Legendre polynomials ) if self.rank == 0: self.log("PICSLike initialized!")
@property def lmax_signal(self) -> int: """Signal-cov ceiling (ADR 0009). Resolution order: explicit setter, then ``params.lmax_signal``, then ``4 * nside``. """ if self._lmax_signal is not None: return self._lmax_signal params_value = getattr(self.params, "lmax_signal", None) if params_value is not None: return params_value return 4 * self.params.nside @lmax_signal.setter def lmax_signal(self, value: int) -> None: """Set custom lmax_signal value.""" self._lmax_signal = value
[docs] def compute_signal_matrix(self, param_point: tuple) -> np.ndarray: """ Compute signal covariance matrix for a given parameter point. Returns ------- np.ndarray Signal covariance matrix S, shape (n_active_pixels, n_active_pixels). Raises ------ ValueError If noise covariance matrices not set up. """ if self.noise_cov1 is None: raise ValueError("Covariance matrices must be set up first") self.signal_matrix = np.zeros_like(self.noise_cov1, dtype=np.float64) self.signal_matrix = np.asfortranarray(self.signal_matrix, dtype=np.float64) start_time = time.time() if self.rank == 0 else None spectra_dict = self.parameter_grid.get_spectrum(param_point) self.collection.set_cls(spectra_dict, lmax=self.lmax_signal) self.collection.set_beams(lmax=self.lmax_signal) compute_signal_matrix( S=self.signal_matrix, lmax=self.lmax_signal, fields=self.collection, ) if self.rank == 0 and start_time is not None: elapsed = time.time() - start_time self.log(f"Signal matrix computed in {elapsed:.2f} seconds", level=3) self.log(f"Signal matrix shape: {self.signal_matrix.shape}", level=4) self.log(f"Signal matrix first row: {self.signal_matrix[0, :10]}", level=4) return self.signal_matrix
[docs] def prepare_covariance_matrix(self): """ Prepare total covariance C = S + N and compute its inverse. Modifies self.signal_matrix in-place to form total covariance, then inverts it. """ # Add noise to signal IN-PLACE to form total covariance. # This avoids allocating a separate array (saves n_pix^2 memory). # Safe because self.signal_matrix is recreated fresh for each parameter point. self.signal_matrix += self.noise_cov1 self.signal_matrix = np.asfortranarray(self.signal_matrix) # Alias for clarity in downstream code (e.g., slogdet). # This is just a reference, no memory copy. self.total_cov = self.signal_matrix self.log(f"Combined covariance matrix shape: {self.total_cov.shape}", level=4) self.inv_cov = matrix_inverse_symm(self.total_cov) self.log("Computed inverse of primary covariance matrix", level=4)
[docs] def setup_maps(self): """ Read observational map data from FITS files. Loads maps with proper pixel selection, field extraction, and calibration. For cross-correlation (do_cross=True), loads both primary and secondary maps. Output shape: (n_active_pixels, n_simulations). Raises ------ ValueError If pixel information not available (setup_geometry not called). FileNotFoundError If input map files not found. """ if self.rank == 0: self.log("Reading maps", level=2) # Ensure we have pixel information if not hasattr(self, "npixs") or self.npixs is None: raise ValueError( "Pixel information not available. Run setup_geometry first." ) # Invalidate any cached parameter-independent SMW pieces; maps # are about to be (re)loaded. self._smw_data_cache = None # Read maps using the core functionality ntot = sum(self.collection.n_active) self.maps1 = np.empty((ntot, self.params.nsims), dtype=np.float64) # Read maps1 read_maps( maps=self.maps1, filename=self.params.inputmapfile1, pixact=self.pixact, field_labels=self.params.physical_labels, calibration=self.params.calibration, ) # Read maps2 if doing cross-correlation if self.params.do_cross: self.maps2 = np.empty((ntot, self.params.nsims), dtype=np.float64) read_maps( maps=self.maps2, filename=self.params.inputmapfile2, pixact=self.pixact, field_labels=self.params.physical_labels, calibration=self.params.calibration, )
[docs] def set_simulation_index(self, sim_idx: int) -> None: """ Set which simulation to use for likelihood computation. Parameters ---------- sim_idx : int Index of the simulation to use (0-based indexing). Must be within the range [0, nsims-1]. Raises ------ ValueError If sim_idx is out of range for the available simulations. Notes ----- In a real analysis, this would typically be 0 since you have only one observational dataset. For Monte Carlo studies, you might want to compute likelihoods for different simulations separately. """ if self.maps1 is not None: max_sim = self.maps1.shape[1] - 1 if sim_idx < 0 or sim_idx > max_sim: raise ValueError( f"Simulation index {sim_idx} out of range [0, {max_sim}]" ) self.simulation_index = sim_idx if self.rank == 0: self.log(f"Set simulation index to: {sim_idx}")
[docs] def setup_parameter_grid(self) -> None: """Set up parameter grid from config and load associated theoretical spectra.""" if self.rank == 0: self.log("Setting up parameter grid...", level=2) self.parameter_names = self.params.parameters.keys() self.parameter_ranges = { name: np.linspace(*self.params.parameters[name]) for name in self.parameter_names } self.parameter_grid = ParameterGrid( core_params=self.params, parameter_ranges=self.parameter_ranges, root_dir=self.params.root_dir, root_filename=self.params.root_filename, lmax_signal=self.lmax_signal, ) if self.rank == 0: total_points = self.parameter_grid.get_total_points() self.log(f"Parameter grid contains {total_points} points", level=2)
def _broadcast_variables(self): """Broadcast essential data from master to all MPI worker processes.""" # Python objects (small, serialization is fine) self.params = self.comm.bcast(self.params if self.rank == 0 else None, root=0) self.collection: FieldCollection = self.comm.bcast( self.collection if self.rank == 0 else None, root=0 ) self.npixs = self.comm.bcast(self.npixs if self.rank == 0 else None, root=0) self.pixact = self.comm.bcast(self.pixact if self.rank == 0 else None, root=0) # Numpy arrays via shared memory (zero-copy, no size limits). # Worker ranks may not have these attributes yet, so use getattr. point_vectors = getattr(self, "point_vectors", None) n_point_vectors = self.comm.bcast( len(point_vectors) if self.rank == 0 and point_vectors is not None else 0, root=0, ) self.point_vectors = tuple( self._shared_array( point_vectors[i] if self.rank == 0 and point_vectors is not None else None ) for i in range(n_point_vectors) ) self.noise_cov1 = self._shared_array(getattr(self, "noise_cov1", None)) self.maps1 = self._shared_array(getattr(self, "maps1", None)) if self.params.do_cross: self.maps2 = self._shared_array(getattr(self, "maps2", None))
[docs] def compute(self) -> None: """ Compute likelihood across the entire parameter grid for all simulations. Evaluates the likelihood function at each point in the parameter grid for each simulation, distributing the computation across MPI processes for efficiency. The signal covariance matrix is recomputed for each parameter point using the corresponding theoretical power spectrum. Notes ----- This is the main computational routine that: 1. Distributes parameter grid points across MPI processes 2. For each simulation: - Computes signal covariance matrix for each parameter point - Inverts total covariance matrix (signal + noise) - Evaluates likelihood function 3. Gathers results from all processes 4. Stores collection of LikelihoodResult objects (one per simulation) The computation scales as O(N_sim * N_param * N_pix^3) where N_sim is the number of simulations, N_param is the number of parameter points and N_pix is the number of pixels. """ # Store results for each simulation simulation_results = [] # Get parameter points for this MPI process param_points = self.parameter_grid.get_points_for_process(self.rank, self.size) n_sims = self.params.nsims # Initialize results storage for this simulation local_chi2_values = np.zeros((len(param_points), n_sims)) local_log_likelihood = np.zeros((len(param_points), n_sims)) # Compute likelihood for each parameter point for this simulation for i, param_point in enumerate(param_points): chi2, log_like = self._compute_likelihood_point(param_point) local_chi2_values[i] = chi2 local_log_likelihood[i] = log_like # Gather results from all processes for this simulation all_chi2 = self.comm.gather(local_chi2_values, root=0) all_log_like = self.comm.gather(local_log_likelihood, root=0) if self.rank == 0: # Combine results from all processes for this simulation combined_chi2 = np.concatenate(all_chi2) combined_log_like = np.concatenate(all_log_like) # Create LikelihoodResult for this simulation for i in range(n_sims): sim_result = LikelihoodResult( parameter_grid=self.parameter_grid, chi_squared_values=combined_chi2[:, i], log_likelihood_values=combined_log_like[:, i], ) simulation_results.append(sim_result) # Store the collection of results self.simulation_results = simulation_results self.likelihood_result = self._compute_mean_likelihood_result( simulation_results ) self.log(f"Likelihood computation completed for {n_sims} simulations") self.log("Mean likelihood result computed for analysis", level=2)
def _compute_mean_likelihood_result( self, simulation_results: list[LikelihoodResult] ) -> LikelihoodResult: """Compute mean likelihood result by averaging over simulations.""" if not simulation_results: raise ValueError("No simulation results provided") n_points = len(simulation_results[0].chi_squared_values) # Initialize arrays for mean computation mean_chi2 = np.zeros(n_points) mean_log_like = np.zeros(n_points) # Compute means across simulations for i in range(n_points): chi2_values = [result.chi_squared_values[i] for result in simulation_results] log_like_values = [ result.log_likelihood_values[i] for result in simulation_results ] mean_chi2[i] = np.mean(chi2_values) mean_log_like[i] = np.mean(log_like_values) mean_result = LikelihoodResult( parameter_grid=simulation_results[0].parameter_grid, chi_squared_values=mean_chi2, log_likelihood_values=mean_log_like, ) return mean_result def _compute_likelihood_point(self, param_point: tuple) -> tuple[float, float]: """ Compute chi-squared and log-likelihood for a single parameter point. ln L = -0.5 * (d^T C^{-1} d + ln|C|) When a computation basis is enabled, uses SMW formula in harmonic space — O(n_modes³) instead of O(n_pix³). Quadratic forms are batched across simulations for efficiency. """ use_basis = hasattr(self, "basis_manager") and self.basis_manager is not None if use_basis: bm = self.basis_manager # Only the signal spectrum changes between parameter points. # Beams are loaded once during setup (setup_beams), but smoothing # must be re-applied after each set_cls call. spectra_dict = self.parameter_grid.get_spectrum(param_point) self.collection.set_cls(spectra_dict, lmax=self.lmax_signal) self.collection.beam_manager.apply_smoothing( self.collection.spectra_manager, lmax=self.lmax_signal ) has_spin2 = any(f.spin == 2 for f in self.collection.fields) is_multi_field = len(self.collection.fields) > 1 or has_spin2 if is_multi_field: C_ell_dict = self._build_c_ell_dict() C_ell_input = C_ell_dict else: C_ell_input = self.collection.spectra_manager.get_cls(0, 0, 0) self.log(f"C_ell set for parameters: {param_point}", level=3) C_ell_arg = C_ell_dict if is_multi_field else {(0, 0, 0): C_ell_input} if bm.method == "harmonic": # SMW fast path reaches into HarmonicBasis-private state # (``_V_N_inv``, ``_N_chol``); ``term1`` / ``projected_i`` # depend on N and the data only, so cache once per parameter # scan. if getattr(self, "_smw_data_cache", None) is None: projected1 = bm._V_N_inv @ self.maps1 if self.params.do_cross: projected2 = bm._V_N_inv @ self.maps2 ninv_maps2 = cholesky_solve(bm._N_chol, self.maps2) term1 = np.einsum("in,in->n", self.maps1, ninv_maps2) else: projected2 = projected1 ninv_maps1 = cholesky_solve(bm._N_chol, self.maps1) term1 = np.einsum("in,in->n", self.maps1, ninv_maps1) self._smw_data_cache = (projected1, projected2, term1) projected1, projected2, term1 = self._smw_data_cache # SMW identity: d1^T C^{-1} d2 = d1^T N^{-1} d2 − y1^T K^{-1} y2 # with y_i = V N^{-1} d_i. Auto case (d1 = d2) is the # symmetric specialisation. ``logdet`` from prepare_for_basis # is the exact full pixel-space log determinant on harmonic. K_chol, logdet = bm.prepare_for_basis(C_ell_arg) kernel_inv_y2 = cholesky_solve((K_chol, True), projected2) term2 = np.einsum("in,in->n", projected1, kernel_inv_y2) chi_squared = term1 - term2 else: # Polymorphic basis-space path. ``chi_squared`` and ``logdet`` # are both basis-space, so the log-likelihood is internally # consistent. On pixel-direct (U = I) it equals the full # Gaussian; on truncated compressed pixel it is the # basis-space approximation admitted by the kept subspace. C_basis_inv = bm.get_inverse(C_ell_arg) d1_basis = bm.to_basis(self.maps1) if self.params.do_cross: d2_basis = bm.to_basis(self.maps2) chi_squared = np.einsum( "in,ij,jn->n", d1_basis, C_basis_inv, d2_basis ) else: chi_squared = np.einsum( "in,ij,jn->n", d1_basis, C_basis_inv, d1_basis ) logdet = bm.get_logdet(C_ell_arg) self.log(f"Log-determinant: {logdet:.2f}", level=3) else: self.compute_signal_matrix(param_point) self.log(f"Signal matrix computed for parameters: {param_point}", level=3) self.prepare_covariance_matrix() self.log("Total covariance matrix prepared and inverted", level=3) if self.params.do_cross: chi_squared = np.einsum( "in,ij,jn->n", self.maps1, self.inv_cov, self.maps2 ) else: chi_squared = np.einsum( "in,ij,jn->n", self.maps1, self.inv_cov, self.maps1 ) logdet = matrix_slogdet_symm(self.total_cov)[1] self.log(f"Log-determinant: {logdet:.2f}", level=3) # ln L = -0.5 * (χ² + ln|C|) log_likelihood = -0.5 * (chi_squared + logdet) return chi_squared, log_likelihood def _build_c_ell_dict(self) -> dict[SpectrumKey, np.ndarray]: """Build C_ell_dict from spectra_manager for compressed operations.""" C_ell_dict, _ = self.collection.spectra_manager.build_inputs() return C_ell_dict
[docs] def get_chi_squared(self) -> np.ndarray: """ Get chi-squared values from likelihood computation. Returns ------- chi_squared : numpy.ndarray Array of chi-squared values corresponding to parameter grid points. Raises ------ RuntimeError If likelihood computation has not been performed. """ if self.likelihood_result is None: msg = "Likelihood not computed. Call compute_likelihood_grid() first." raise RuntimeError(msg) return self.likelihood_result.chi_squared_values
[docs] def get_log_likelihood(self) -> np.ndarray: """ Get log-likelihood values from likelihood computation. Returns ------- log_likelihood : numpy.ndarray Array of log-likelihood values corresponding to parameter grid points. Raises ------ RuntimeError If likelihood computation has not been performed. """ if self.likelihood_result is None: msg = "Likelihood not computed. Call compute_likelihood_grid() first." raise RuntimeError(msg) return self.likelihood_result.log_likelihood_values
[docs] def get_best_fit(self) -> dict[str, float]: """ Get best-fit parameter values from likelihood computation. Returns ------- best_fit_params : dict[str, float] Dictionary mapping parameter names to their best-fit values. Raises ------ RuntimeError If likelihood computation has not been performed. """ if self.likelihood_result is None: msg = "Likelihood not computed. Call compute_likelihood_grid() first." raise RuntimeError(msg) return self.likelihood_result.get_best_fit()
[docs] def get_simulation_results(self) -> list[LikelihoodResult]: """ Get likelihood results for each individual simulation. Returns ------- list[LikelihoodResult] One LikelihoodResult per simulation. Raises ------ RuntimeError If compute() has not been called. """ if not hasattr(self, "simulation_results") or self.simulation_results is None: msg = "Simulation results not available. Call compute() first." raise RuntimeError(msg) return self.simulation_results
[docs] def get_mean_likelihood_result(self) -> LikelihoodResult: """ Get the mean likelihood result averaged over all simulations. Returns ------- LikelihoodResult Mean likelihood result for plotting and analysis. Raises ------ RuntimeError If compute() has not been called. """ if self.likelihood_result is None: msg = "Likelihood not computed. Call compute() first." raise RuntimeError(msg) return self.likelihood_result
[docs] def save_results(self, output_path: str) -> None: """ Save likelihood results to file. Parameters ---------- output_path : str Base path for saving. Individual sims saved as output_path_sim_XX. """ if self.likelihood_result is None: msg = "No results to save. Run compute() first." raise RuntimeError(msg) if self.rank == 0: # Save mean likelihood result self.likelihood_result.save(output_path) self.log(f"Mean likelihood results saved to {output_path}") # Save individual simulation results if available if ( hasattr(self, "simulation_results") and self.simulation_results is not None ): from pathlib import Path base_path = Path(output_path) base_dir = base_path.parent base_name = base_path.stem base_ext = base_path.suffix for i, sim_result in enumerate(self.simulation_results): sim_path = base_dir / f"{base_name}_sim_{i:02d}{base_ext}" sim_result.save(str(sim_path)) n_files = len(self.simulation_results) self.log(f"Individual simulation results saved ({n_files} files)") self.log( "Use get_simulation_results() to access individual results", level=2, )
[docs] def run(self): """ Execute the complete pixel-based likelihood analysis pipeline. Pipeline: setup fields/geometry/covariance, configure parameter grid, compute likelihood across parameter space, gather results. """ self.setup_parameter_grid() if self.rank == 0: self.log("Starting PICSLike analysis pipeline", level=1) self.setup_fields() self.setup_geometry() self.setup_covariance_matrices() self.setup_cls(lmax=self.lmax_signal) self.setup_beams(lmax=self.lmax_signal) if self._basis_config is not None: _basis_keys = ( "method", "epsilon", "mode_fraction", "compression_target", "C_ell", ) kwargs = { k: self._basis_config[k] for k in _basis_keys if k in self._basis_config } self.setup_computation_basis(**kwargs) self.log("Computation basis setup completed", level=3) if self.maps1 is None: self.setup_maps() if self.params.do_cross: assert self.maps2 is not None, ( "Maps2 should be loaded for cross-correlation." ) # Synchronize and broadcast shared variables to worker processes. # Skip broadcast for single-process runs to avoid unnecessary # serialization of large objects (covariance matrices can exceed 2 GB). self.comm.Barrier() if self.size > 1: self._broadcast_variables() self.comm.Barrier() self.compute() self.comm.Barrier()