Source code for picslike.parameter_grid

"""
Parameter grid management for pixel-based likelihood analysis.

This module provides the ParameterGrid class for managing parameter spaces
and associated theoretical spectra in pixel-based likelihood computations.
It handles parameter range definitions, grid point generation, and efficient
distribution of computation across MPI processes.

Classes
-------
ParameterGrid
    Manager for parameter grids and theoretical spectra.

Notes
-----
The ParameterGrid class supports both regular grids defined by parameter ranges
and irregular grids defined by explicit parameter combinations. It provides
efficient methods for distributing computation across MPI processes and
retrieving theoretical spectra for specific parameter combinations.
"""

from __future__ import annotations

import itertools
from copy import deepcopy
from typing import Any

import numpy as np

from cosmocore import InputParams, readcl


[docs] class ParameterGrid: """ Manager for parameter grids and theoretical spectra. This class handles the organization of parameter spaces for likelihood analysis, including grid generation, spectrum management, and efficient distribution of parameter points across parallel processes. Parameters ---------- parameter_ranges : dict[str, np.ndarray] Dictionary mapping parameter names to their value ranges. Each value should be a 1D array of parameter values. theoretical_spectra : dict[tuple, np.ndarray] Dictionary mapping parameter value tuples to theoretical power spectra. Keys are tuples of parameter values in the same order as parameter_ranges. Attributes ---------- parameter_names : list[str] Names of parameters in the grid. parameter_ranges : dict[str, np.ndarray] Parameter ranges defining the grid. theoretical_spectra : dict[tuple, np.ndarray] Theoretical spectra for each parameter combination. grid_points : list[tuple] All parameter combinations in the grid. Examples -------- Regular parameter grid: >>> param_ranges = { ... 'omega_b': np.linspace(0.02, 0.025, 5), ... 'omega_c': np.linspace(0.10, 0.14, 5), ... } >>> spectra = {} # Filled with theoretical spectra >>> grid = ParameterGrid(param_ranges, spectra) >>> total_points = grid.get_total_points() # Returns 25 MPI process distribution: >>> points_for_process = grid.get_points_for_process(rank=0, size=4) >>> # Returns subset of points for MPI rank 0 out of 4 processes Notes ----- The class assumes that theoretical spectra are provided for all parameter combinations in the grid. Missing spectra will raise errors during likelihood computation. """
[docs] def __init__( self, core_params: InputParams, parameter_ranges: dict[str, np.ndarray], theoretical_spectra: dict[tuple, np.ndarray] | None = None, root_dir: str | None = None, root_filename: str | None = None, lmax_signal: int | None = None, ) -> None: """ Initialize parameter grid. Parameters ---------- core_params : InputParams Analysis parameters containing nside, lmax, etc. parameter_ranges : dict[str, np.ndarray] Dictionary mapping parameter names to value arrays. theoretical_spectra : dict[tuple, np.ndarray] Dictionary mapping parameter tuples to power spectra. root_dir : str, optional Directory containing theoretical spectra files. root_filename : str, optional Root filename for theoretical spectra. lmax_signal : int, optional Maximum multipole for signal computation. If None, uses 4*nside. This ensures spectra are read with the correct lmax even when analysis lmax differs from signal lmax. Raises ------ ValueError If parameter ranges are empty or inconsistent with spectra. """ self.core_params = core_params # Store lmax_signal, defaulting to 4*nside if not provided self.lmax_signal = ( lmax_signal if lmax_signal is not None else 4 * core_params.nside ) self.parameter_names = list(parameter_ranges.keys()) self.parameter_ranges = deepcopy(parameter_ranges) # Generate all parameter combinations self.grid_points = self._generate_grid_points() if theoretical_spectra is not None: self.theoretical_spectra = deepcopy(theoretical_spectra) else: assert root_dir is not None and root_filename is not None, ( "Either theoretical_spectra or both root_dir and root_filename " "must be provided to read spectra from files." ) self.root_dir = root_dir self.root_filename = root_filename self.read_theoretical_spectra() # Validate that all grid points have corresponding spectra self._validate_spectra()
def _generate_grid_points(self) -> list[tuple]: """ Generate all parameter combinations in the grid. Returns ------- grid_points : list[tuple] List of all parameter combinations as tuples. Notes ----- Uses itertools.product to generate the Cartesian product of all parameter ranges, creating a complete grid of parameter combinations. """ if not self.parameter_ranges: return [] param_values = [self.parameter_ranges[name] for name in self.parameter_names] # Generate Cartesian product of all parameter ranges return list(itertools.product(*param_values))
[docs] def read_theoretical_spectra(self) -> None: """ Read theoretical spectra from files, blending with the fiducial in the inference window per ADR 0009. Notes ----- For each grid point, loads the parameter-dependent spectrum and (if a fiducial file is configured) replaces multipoles outside the inference window ``[lmin, lmax]`` with the fiducial values. The inference window is read from ``core_params.lmin`` and ``core_params.lmax``; if either is missing, the fiducial blend is skipped (full parameter spectrum used). """ self.theoretical_spectra = {} params_lmin = getattr(self.core_params, "lmin", None) params_lmax = getattr(self.core_params, "lmax", None) fiducialfile = getattr(self.core_params, "fiducialfile", None) fiducial_spectrum = None if ( fiducialfile is not None and params_lmin is not None and params_lmax is not None ): fiducial_spectrum = readcl( inputclfile=fiducialfile.strip(), Params=self.core_params, lmax=self.lmax_signal, ) root = self.root_dir + "/" + self.root_filename for point in self.grid_points: filename = ( root + "_" + "_".join( f"{self.parameter_names[i]}{val:.6g}" for i, val in enumerate(point) ) + ".txt" ) param_spectrum = readcl( inputclfile=filename, Params=self.core_params, lmax=self.lmax_signal ) if fiducial_spectrum is not None: blended_spectrum = self._blend_spectra( param_spectrum, fiducial_spectrum, params_lmin, params_lmax ) self.theoretical_spectra[point] = blended_spectrum else: self.theoretical_spectra[point] = param_spectrum
def _blend_spectra( self, param_spectrum: dict, fiducial_spectrum: dict, lmin: int, lmax: int, ) -> dict: """ Blend parameter-dependent and fiducial spectra over the inference window ``[lmin, lmax]`` (ADR 0009). Arrays are ℓ-indexed: ``arr[ell]`` is the spectrum value at multipole ℓ. For ℓ outside ``[lmin, lmax]`` the fiducial is used; inside that range the parameter spectrum is used. """ blended = {} for key in param_spectrum: if key not in fiducial_spectrum: blended[key] = param_spectrum[key].copy() continue param_arr = param_spectrum[key] fid_arr = fiducial_spectrum[key] min_len = min(len(param_arr), len(fid_arr)) result = fid_arr[:min_len].copy() idx_low = max(0, lmin) idx_high = min(min_len - 1, lmax) if idx_low <= idx_high: result[idx_low : idx_high + 1] = param_arr[idx_low : idx_high + 1] blended[key] = result return blended def _validate_spectra(self) -> None: """ Validate that theoretical spectra exist for all grid points. Raises ------ ValueError If any grid point lacks a corresponding theoretical spectrum. Notes ----- Checks that every parameter combination in the grid has an associated theoretical power spectrum. This validation prevents runtime errors during likelihood computation. """ missing_spectra = [] for point in self.grid_points: if point not in self.theoretical_spectra: missing_spectra.append(point) if missing_spectra: msg = f"Missing theoretical spectra for {len(missing_spectra)} grid points" raise ValueError(msg)
[docs] def get_total_points(self) -> int: """ Get total number of points in the parameter grid. Returns ------- total_points : int Total number of parameter combinations in the grid. """ return len(self.grid_points)
[docs] def get_points_for_process(self, rank: int, size: int) -> list[tuple]: """ Get parameter points assigned to a specific MPI process. Parameters ---------- rank : int MPI process rank (0-indexed). size : int Total number of MPI processes. Returns ------- process_points : list[tuple] Parameter points assigned to this process. Notes ----- Distributes parameter grid points as evenly as possible across MPI processes. Uses simple round-robin assignment to ensure load balance. """ if size <= 0: msg = f"Invalid MPI size: {size}" raise ValueError(msg) if rank < 0 or rank >= size: msg = f"Invalid MPI rank {rank} for size {size}" raise ValueError(msg) # Distribute points using round-robin assignment return [self.grid_points[i] for i in range(rank, len(self.grid_points), size)]
[docs] def get_spectrum(self, param_point: tuple) -> np.ndarray: """ Get theoretical spectrum for a parameter point. Parameters ---------- param_point : tuple Parameter values as a tuple in the order of parameter_names. Returns ------- spectrum : numpy.ndarray Theoretical power spectrum for the given parameters. Raises ------ KeyError If no spectrum exists for the given parameter point. Notes ----- Retrieves the pre-computed theoretical power spectrum corresponding to the specified parameter combination. The spectrum is used for signal covariance matrix computation in likelihood analysis. """ try: return self.theoretical_spectra[param_point] except KeyError as exc: msg = f"No theoretical spectrum found for parameters: {param_point}" raise KeyError(msg) from exc
[docs] def get_parameter_dict(self, param_point: tuple) -> dict[str, Any]: """ Convert parameter tuple to named dictionary. Parameters ---------- param_point : tuple Parameter values as a tuple. Returns ------- param_dict : dict[str, Any] Dictionary mapping parameter names to values. Examples -------- >>> grid = ParameterGrid(param_ranges, spectra) >>> point = (0.022, 0.12) # (omega_b, omega_c) >>> param_dict = grid.get_parameter_dict(point) >>> # Returns {'omega_b': 0.022, 'omega_c': 0.12} """ if len(param_point) != len(self.parameter_names): msg = ( f"Parameter point has {len(param_point)} values, " f"expected {len(self.parameter_names)}" ) raise ValueError(msg) return dict(zip(self.parameter_names, param_point))
[docs] def get_grid_shape(self) -> tuple[int, ...]: """ Get shape of the parameter grid. Returns ------- grid_shape : tuple[int, ...] Shape of the grid with dimensions corresponding to each parameter. Notes ----- Returns the shape of the regular grid formed by the parameter ranges. Useful for reshaping results into multidimensional arrays for visualization and analysis. """ return tuple(len(self.parameter_ranges[name]) for name in self.parameter_names)
[docs] def get_parameter_index(self, param_point: tuple) -> int: """ Get flat index of a parameter point in the grid. Parameters ---------- param_point : tuple Parameter values as a tuple. Returns ------- index : int Flat index of the parameter point in the grid. Raises ------ ValueError If the parameter point is not found in the grid. Notes ----- Converts multidimensional parameter coordinates to a flat index, useful for indexing into result arrays and mapping between parameter space and result storage. """ try: return self.grid_points.index(param_point) except ValueError as exc: msg = f"Parameter point {param_point} not found in grid" raise ValueError(msg) from exc
[docs] def add_spectrum(self, param_point: tuple, spectrum: np.ndarray) -> None: """ Add theoretical spectrum for a parameter point. Parameters ---------- param_point : tuple Parameter values as a tuple. spectrum : numpy.ndarray Theoretical power spectrum to add. Notes ----- Allows dynamic addition of theoretical spectra to the grid. Useful for cases where spectra are computed on-the-fly or loaded incrementally. """ self.theoretical_spectra[param_point] = deepcopy(spectrum)
[docs] def __len__(self) -> int: """Return total number of points in the grid.""" return len(self.grid_points)
[docs] def __iter__(self): """Iterate over parameter points in the grid.""" return iter(self.grid_points)
[docs] def __contains__(self, param_point: tuple) -> bool: """Check if parameter point exists in the grid.""" return param_point in self.grid_points
[docs] def __eq__(self, other): """Check equality with another ParameterGrid.""" if not isinstance(other, ParameterGrid): return False return ( self.parameter_names == other.parameter_names and self.parameter_ranges == other.parameter_ranges and self.theoretical_spectra == other.theoretical_spectra and self.grid_points == other.grid_points )