Source code for qube.fisher

"""
Fisher matrix computation for cosmological parameter estimation.

This module implements the Fisher class for calculating Fisher information matrices
used in cosmological parameter forecasting. The Fisher matrix provides the expected
parameter constraints from observations of spin-0 and spin-2 fields on the sphere
(e.g. CMB temperature and polarization, cosmic shear) and is computed as:

F_ij = (1/2) * Tr[C^(-1) * ∂C/∂θ_i * C^(-1) * ∂C/∂θ_j]

where C is the total covariance matrix (signal + noise) and θ_i are the cosmological
parameters of interest. The implementation supports MPI parallelization for efficient
computation of large Fisher matrices.

The unified API from Core transparently handles both harmonic and pixel
computation bases when configured, falling back to traditional pixel-space
computation otherwise.

Classes
-------
Fisher
    Main class for Fisher matrix computation inheriting from cosmocore.Core.

References
----------
.. [1] Tegmark, M. et al. "How to measure CMB power spectra without losing information"
   Phys. Rev. D 55, 5895 (1997)
.. [2] Challinor, A. & Chon, G. "Error analysis of quadratic power spectrum estimates"
   MNRAS 301, 657-688 (1998)
"""

from __future__ import annotations

import os
import time
from contextlib import contextmanager, nullcontext

import numpy as np

from cosmocore._mpi import MPI

try:
    from threadpoolctl import threadpool_limits as _threadpool_limits
except ImportError:  # pragma: no cover
    # Graceful degradation: setup just runs at OMP_NUM_THREADS.
    _threadpool_limits = None


@contextmanager
def _wide_threadpool():
    """Widen BLAS threads to the visible CPU set for rank-0 setup work."""
    if _threadpool_limits is None:  # pragma: no cover
        yield
        return
    n_visible = len(os.sched_getaffinity(0))
    with _threadpool_limits(limits=n_visible):
        yield


from cosmocore import (
    Bins,
    Core,
    InputParams,
    MPISharedMemoryMixin,
    SpectrumKey,
    SpectrumKind,
    SymmetryMode,
    cholesky_factor,
    cholesky_solve,
    compute_signal_matrix,
    do_derivative_step,
    matrix_inverse_symm,
    matrix_mult,
    matrix_trace,
    write_covmat_reduced,
    write_out_matrix,
)

_WEIGHT_ZERO_THRESHOLD = 1e-30  # Skip derivative terms with negligible weight


def _basis_path_label(basis_manager) -> str:
    """Human-readable label for the active computation-basis path."""
    if basis_manager is None:
        return "traditional"
    if basis_manager.method == "pixel":
        return (
            "pixel-truncated"
            if getattr(basis_manager, "_is_compressed", False)
            else "pixel-direct"
        )
    return "harmonic"


[docs] class Fisher(Core, MPISharedMemoryMixin): """ Fisher matrix computation for cosmological parameter estimation. This class implements the Fisher information matrix calculation for forecasting cosmological parameter constraints from observations of spin-0 and spin-2 fields on the sphere. It inherits from cosmocore.Core and extends it with Fisher-specific functionality. The Fisher matrix F_ij quantifies the information content of the data about parameters θ_i and θ_j, computed as: F_ij = (1/2) * Tr[C^(-1) * ∂C/∂θ_i * C^(-1) * ∂C/∂θ_j] The computation uses Core's unified API which transparently handles harmonic and pixel computation bases when configured. Parameters ---------- params_file : str, optional Path to YAML parameter file containing analysis configuration. compression : dict, optional Computation basis configuration (method, epsilon, basis, mode_fraction). **kwargs : dict Additional keyword arguments passed to the Core parent class. Attributes ---------- comm : MPI.Comm MPI communicator for parallel computation. rank : int MPI process rank (0 for master process). size : int Total number of MPI processes. fisher : numpy.ndarray Computed Fisher information matrix. n_ell : int Number of multipole moments in the inference window (``params.lmax - params.lmin + 1``). n_params : int Total number of Fisher matrix parameters (nspectra * n_ell). Examples -------- Basic Fisher matrix computation: >>> from cosmoforge.qube import Fisher >>> fisher = Fisher("config/fisher_analysis.yaml") >>> fisher.run() >>> F_matrix = fisher.get_fisher_matrix() With harmonic basis: >>> fisher = Fisher("config.yaml", compression={"method": "harmonic"}) >>> fisher.run() # Uses harmonic basis transparently """
[docs] def __init__( self, params_file: str | None = None, compression: dict | None = None, cache_derivatives: bool = False, symmetry_mode: SymmetryMode | str | None = None, **kwargs, ): """ Initialize Fisher matrix computation class. Parameters ---------- params_file : str, optional Path to YAML configuration file. compression : dict, optional Computation basis configuration dictionary. Options: - method : str ("harmonic" or "pixel") - epsilon : float (eigenvalue threshold for pixel basis) - basis : str (for pixel: "harmonic", "noise_weighted", etc.) - mode_fraction : float (alternative to epsilon) cache_derivatives : bool, optional Whether to cache binned derivative matrices for Spectra to reuse. Default is False — Spectra recomputes via the pixel-direct fast path (single Legendre/Wigner pass per bin), which is amortized across MPI ranks. Setting True keeps the dense per-bin cache, which can cost n_params * n_pix^2 of memory (e.g. ~14 GB at QU_nside64 fsky=0.1) but skips the recompute. The harmonic-fast path always caches its small COO triplets regardless of this flag. **kwargs : dict Additional keyword arguments passed to Core. """ super().__init__(params=params_file, **kwargs) # MPI setup self.comm = MPI.COMM_WORLD self.rank = self.comm.Get_rank() self.size = self.comm.Get_size() # Computation basis config self._basis_config = compression # Derivative caching self._cache_derivatives = cache_derivatives # SymmetryMode (ADR-0011): controls cross-component spin-2 EB # handling. SYMMETRIC default reproduces pre-Slice-5 behaviour # bit-for-bit; DIRECTIONAL opt-in adds separate CG cross spectra. # Coerce strings ("SYMMETRIC" from YAML) so downstream identity # checks (`is SymmetryMode.SYMMETRIC`) don't silently misroute. if symmetry_mode is None: self.symmetry_mode = SymmetryMode.SYMMETRIC elif isinstance(symmetry_mode, SymmetryMode): self.symmetry_mode = symmetry_mode elif isinstance(symmetry_mode, str): try: self.symmetry_mode = SymmetryMode[symmetry_mode.upper()] except KeyError as exc: raise ValueError( f"unknown symmetry_mode {symmetry_mode!r}; expected one of " f"{[m.name for m in SymmetryMode]}" ) from exc else: raise TypeError( f"symmetry_mode must be SymmetryMode | str | None; " f"got {type(symmetry_mode).__name__}" ) # Initialize attributes self.signal_matrix = None self._lmax_signal = None # Ordered list of SpectrumKey instances enumerating the spectra # along the Fisher matrix's parameter axis. Populated by # ``_build_multi_spectrum_inputs`` once ``compute()`` runs; used # by user-facing label-keyed accessors to navigate the flat # output arrays. self.spectra_list: list | None = None
@property def lmax_signal(self) -> int: """Signal-cov ceiling (ADR 0009). Resolution order: explicit setter, then ``params.lmax_signal``, then ``4 * nside``. """ if self._lmax_signal is not None: return self._lmax_signal params_value = getattr(self.params, "lmax_signal", None) if params_value is not None: return params_value return 4 * self.params.nside @lmax_signal.setter def lmax_signal(self, value: int) -> None: self._lmax_signal = value # ========================================================================= # Profiling hook (no-op unless ``_profiler`` is set externally) # ========================================================================= def _stage(self, name: str): """Return the profiler's stage context, or a nullcontext when none. Hosts (e.g. benchmark scripts) attach a ``StageProfiler`` instance by setting ``fisher._profiler``; otherwise the wrapping ``with`` block is a zero-cost no-op. """ profiler = getattr(self, "_profiler", None) return profiler.stage(name) if profiler is not None else nullcontext() # ========================================================================= # Setup Methods # =========================================================================
[docs] def setup_signal_matrix(self) -> np.ndarray: """ Compute signal covariance S = Sum_ell C_ell P_ell from fiducial spectra. Returns ------- numpy.ndarray Signal covariance matrix of shape (n_pix, n_pix). """ if self.noise_cov1 is None: raise ValueError("Covariance matrices must be set up first") self.signal_matrix = np.zeros_like(self.noise_cov1, dtype=np.float64) self.signal_matrix = np.asfortranarray(self.signal_matrix, dtype=np.float64) start_time = time.time() if self.rank == 0 else None compute_signal_matrix( S=self.signal_matrix, lmax=self.lmax_signal, fields=self.collection, ) if self.rank == 0 and start_time is not None: elapsed = time.time() - start_time self.log(f"Signal matrix computed in {elapsed:.2f} seconds", level=3) return self.signal_matrix
[docs] def prepare_covariance_matrices(self): """ Build total covariance C = N + S, then compute C^{-1}. After this method, noise_cov1 (and noise_cov2 if cross-correlation) hold the inverted total covariance, not the original noise. """ if self.signal_matrix is None: self.setup_signal_matrix() # Save original noise covariance BEFORE adding signal write_covmat_reduced(self.params.outnoisecovmat1, self.noise_cov1) if self.params.do_cross: write_covmat_reduced(self.params.outnoisecovmat2, self.noise_cov2) # Add signal to noise covariance: C = N + S self.noise_cov1 = self.noise_cov1 + self.signal_matrix self.noise_cov1 = np.asfortranarray(self.noise_cov1) # Compute inverse: C^{-1} self.noise_cov1 = matrix_inverse_symm(self.noise_cov1) write_covmat_reduced(self.params.outinvcovmatfile1, self.noise_cov1) if self.params.do_cross: self.noise_cov2 = self.noise_cov2 + self.signal_matrix self.noise_cov2 = np.asfortranarray(self.noise_cov2) self.noise_cov2 = matrix_inverse_symm(self.noise_cov2) write_covmat_reduced(self.params.outinvcovmatfile2, self.noise_cov2)
def _build_derivative_matrix(self, ell: int, spectrum_idx: int = 0) -> np.ndarray: """Build pixel-space derivative matrix dC/dC_ell.""" dC = np.zeros_like(self.noise_cov1, dtype=np.float64) dC = np.asfortranarray(dC) do_derivative_step( dC, spectrum_idx, self.npixs, self.params.spins, ell, self.collection ) return dC def _build_multi_spectrum_inputs(self): """Build C_ell_dict and spectra_list keyed by SpectrumKey. Also stores the result on ``self.spectra_list`` so user-facing label-keyed accessors can navigate flat output arrays. """ C_ell_dict, spectra_list = self.collection.spectra_manager.build_inputs( symmetry_mode=self.symmetry_mode ) self.spectra_list = spectra_list return C_ell_dict, spectra_list # ========================================================================= # Main Entry Points # =========================================================================
[docs] def compute(self): """ Compute Fisher matrix F_{ij} = (1/2) Tr[C⁻¹ dC^i C⁻¹ dC^j]. Supports single- and multi-spectrum, with or without computation basis. For harmonic basis, exploits sparse derivative structure for O(nnz_i × nnz_j) trace computation instead of O(n_modes²). """ use_basis = hasattr(self, "basis_manager") and self.basis_manager is not None use_harmonic_fast = use_basis and self.basis_manager.method == "harmonic" nbins = self.bins.nbins nspectra = self.params.nspectra n_params = nspectra * nbins if self.rank == 0: path_label = ( _basis_path_label(self.basis_manager) if use_basis else "traditional" ) self.log( f"Starting Fisher computation (path: {path_label}, " f"{nspectra} spectra x {nbins} bins)", level=2, ) start_time = time.time() if self.rank == 0 else None total_elements = n_params * (n_params + 1) // 2 elements_per_proc = int(np.ceil(total_elements / self.size)) fisher_local = np.zeros((n_params, n_params)) with self._stage("fisher.compute.derivative_cache"): # --- Setup: get C_inv or V_Cinv_VT, spectra_list --- spectra_list = None # list[SpectrumKey] C_ell_dict = None if use_basis: C_ell_dict, spectra_list = self._build_multi_spectrum_inputs() elif self.params.do_cross: C_inv1 = self.noise_cov1 C_inv2 = self.noise_cov2 else: C_inv = self.noise_cov1 # Populate spectra_list for the non-basis path as well: user-facing # label accessors (get_bandpower_slices, get_fisher_block, # get_error_bars(as_dict=True)) need it to label the flat Fisher # axis. The pixel-space derivative path itself doesn't consume it. if not use_basis: _, spectra_list = self._build_multi_spectrum_inputs() if spectra_list is None: # Pixel-space generic path: build per-slot sentinel keys so # Core._resolve_spectrum_idx's reverse lookup routes each # spectrum_idx to its own slab. Identical keys would collide # in the dict cache and silently collapse every slot to 0. # spins=None bypasses SS-kind spin validation so spin-2 # single-field collections (e.g. QU) don't raise. spectra_list = [ SpectrumKey(i, i, SpectrumKind.SS) for i in range(nspectra) ] # --- Precompute derivatives --- # Harmonic fast path: sparse COO triplets (rows, cols, vals) # Generic path: dense C⁻¹ dC^b products sparse_coo_data = {} cinv_times_dcb = {} binned_derivatives = {} V_Cinv_VT = None deriv_start = time.time() if use_harmonic_fast: bm = self.basis_manager field_groups = bm._detect_field_blocks(C_ell_dict) V_Cinv_VT = bm.get_projected_inverse( C_ell_dict, field_groups=field_groups ) for param_idx in range(n_params): spectrum_idx = param_idx // nbins bin_idx = param_idx % nbins cache_key = (spectrum_idx, bin_idx) spec_key = spectra_list[spectrum_idx] beam_offset = spectrum_idx * self.n_ell is_diagonal = ( spec_key.comp_i == spec_key.comp_j and spec_key.kind in (SpectrumKind.SS, SpectrumKind.GG, SpectrumKind.CC) ) if is_diagonal: # dC^b diagonal: E_b = sum_ell beam * E_diag E_b_diag = np.zeros(bm.dim, dtype=np.float64) for ell in range( self.bins.lmins[bin_idx], self.bins.lmaxs[bin_idx] + 1 ): weight = self.beam_smoothing[ beam_offset + ell - self.params.lmin ] E_b_diag += weight * bm.get_derivative_diagonal(ell, spec_key) nz = np.nonzero(E_b_diag)[0] sparse_coo_data[cache_key] = (nz, nz, E_b_diag[nz]) else: # Cross-spectrum (EB/TE/TB): sparse off-diagonal COO all_rows = [] all_cols = [] all_vals = [] for ell in range( self.bins.lmins[bin_idx], self.bins.lmaxs[bin_idx] + 1 ): weight = self.beam_smoothing[ beam_offset + ell - self.params.lmin ] if abs(weight) < _WEIGHT_ZERO_THRESHOLD: continue dC_ell = bm.get_derivative_matrix( ell, spec_key, symmetry_mode=self.symmetry_mode ) r, c = np.nonzero(dC_ell) all_rows.append(r) all_cols.append(c) all_vals.append(dC_ell[r, c] * weight) if all_rows: rows = np.concatenate(all_rows) cols = np.concatenate(all_cols) vals = np.concatenate(all_vals) rc_pairs = rows * bm.dim + cols unique_pairs, inverse = np.unique( rc_pairs, return_inverse=True ) combined_vals = np.zeros(len(unique_pairs)) np.add.at(combined_vals, inverse, vals) sparse_coo_data[cache_key] = ( unique_pairs // bm.dim, unique_pairs % bm.dim, combined_vals, ) else: sparse_coo_data[cache_key] = ( np.array([], dtype=int), np.array([], dtype=int), np.array([], dtype=float), ) else: # Generic path: pixel basis or traditional pixel-space if use_basis: C_inv = self.basis_manager.get_projected_inverse(C_ell_dict) for param_idx in range(n_params): spectrum_idx = param_idx // nbins bin_idx = param_idx % nbins cache_key = (spectrum_idx, bin_idx) beam_offset = spectrum_idx * self.n_ell beam = self.beam_smoothing[beam_offset : beam_offset + self.n_ell] spec_key = spectra_list[spectrum_idx] dC_b = self.get_binned_derivative_matrix( bin_idx, spec_key, beam_smoothing=beam, symmetry_mode=self.symmetry_mode, ) # Retain dC when the trace consumes it OR when the user # opted in to caching for Spectra to reuse. trace_needs_dC = not use_basis and self.params.do_cross if trace_needs_dC or self._cache_derivatives: binned_derivatives[cache_key] = dC_b if use_basis or not self.params.do_cross: cinv_times_dcb[cache_key] = matrix_mult(C_inv, dC_b) else: cinv_times_dcb[cache_key] = matrix_mult( C_inv2, matrix_mult(dC_b, C_inv1) ) # Harmonic-fast COO triplets are small (~2l+1 nonzeros per bin) # and Spectra uses them on the hot path, so they're always cached. # Dense per-bin matrices can be huge (n_params * n_pix^2) and are # only cached when the user explicitly opts in. if use_harmonic_fast: self._cached_binned_derivatives = None self._cached_sparse_coo_data = sparse_coo_data elif self._cache_derivatives: self._cached_binned_derivatives = binned_derivatives self._cached_sparse_coo_data = None else: self._cached_binned_derivatives = None self._cached_sparse_coo_data = None if self.rank == 0: trace_path = ( "sparse-COO harmonic" if use_harmonic_fast else "dense matmul" ) self.log( f"All {n_params} derivatives precomputed in " f"{time.time() - deriv_start:.1f}s (trace path: {trace_path})", level=3, ) with self._stage("fisher.compute.trace_loop"): # --- Compute Fisher traces with MPI distribution --- # F_{ij} = (1/2) Tr[C⁻¹ dC^i C⁻¹ dC^j] trace_start = time.time() counter = 0 for param_i in range(n_params): key_i = (param_i // nbins, param_i % nbins) for param_j in range(param_i, n_params): counter += 1 if not ( counter > self.rank * elements_per_proc and counter <= (self.rank + 1) * elements_per_proc ): continue key_j = (param_j // nbins, param_j % nbins) if ( use_harmonic_fast and key_i in sparse_coo_data and key_j in sparse_coo_data ): r_i, c_i, v_i = sparse_coo_data[key_i] r_j, c_j, v_j = sparse_coo_data[key_j] M1 = V_Cinv_VT[np.ix_(c_j, r_i)] M2 = V_Cinv_VT[np.ix_(c_i, r_j)] fisher_val = 0.5 * np.einsum("ji,ij,i,j->", M1, M2, v_i, v_j) elif use_basis or not self.params.do_cross: fisher_val = 0.5 * matrix_trace( cinv_times_dcb[key_i], cinv_times_dcb[key_j] ) else: fisher_val = 0.5 * matrix_trace( binned_derivatives[key_j], cinv_times_dcb[key_i] ) fisher_local[param_i, param_j] = fisher_val if param_i != param_j: fisher_local[param_j, param_i] = fisher_val if self.rank == 0: self.log( f"Fisher traces done in {time.time() - trace_start:.1f}s " f"({total_elements} elements)", level=3, ) # --- MPI reduce and output --- self.comm.Barrier() reduced_fisher = np.zeros_like(fisher_local) self.comm.Reduce(fisher_local, reduced_fisher, op=MPI.SUM, root=0) if self.rank == 0: self.fisher = reduced_fisher self.log("-" * 80, level=1) self.log("Fisher matrix computation completed", level=1) if start_time is not None: elapsed = time.time() - start_time self.log(f"Total computation time: {elapsed:.2f} seconds", level=3) if hasattr(self.params, "outfilefisher"): write_out_matrix(self.params.outfilefisher, self.fisher) self.comm.Barrier()
[docs] def run(self) -> None: """ Execute the complete Fisher matrix analysis pipeline. A computation basis can be enabled via the compression parameter. """ # Setup phase (rank 0 only). Wrap the BLAS-heavy LAPACK work in a # widened threadpool so rank 0 can use the full visible CPU set # while ranks 1..N-1 idle on the subsequent broadcast. if self.rank == 0: if self.params is None: raise ValueError("Parameters must be set before running analysis") self.log("Starting Fisher matrix analysis pipeline", level=1) with self._stage("fisher.setup_static"): self.setup_fields() self.log("Fields setup completed", level=3) self.setup_geometry() self.log("Geometry setup completed", level=3) with self._stage("fisher.covariance_setup"): with _wide_threadpool(): self.setup_covariance_matrices() self.log("Covariance matrices setup completed", level=3) with self._stage("fisher.setup_cls_beams"): self.log( f"Using lmax_signal = {self.lmax_signal} for Cls and beams", level=3, ) self.setup_cls(lmax=self.lmax_signal) self.log("Power spectra setup completed", level=3) self.setup_beams(lmax=self.lmax_signal) self.log("Beam functions setup completed", level=3) # Setup computation basis if configured. Only forward keys that # are explicitly set in the config dict so the function defaults # (e.g. epsilon=1e-6) take effect when the user omits a key. if self._basis_config is not None: _basis_keys = ( "method", "lmax_signal", "epsilon", "mode_fraction", "compression_target", "C_ell", ) kwargs = { k: self._basis_config[k] for k in _basis_keys if k in self._basis_config } with self._stage("fisher.basis_setup"): with _wide_threadpool(): self.setup_computation_basis(**kwargs) bm = self.basis_manager path_label = _basis_path_label(bm) if bm.method == "harmonic": size_desc = f"{bm.dim} modes ({bm.compression_ratio:.1%} kept)" else: size_desc = f"{bm.dim} pixels" self.log( f"Computation basis: {path_label}{size_desc}", level=2, ) # Fisher's harmonic QML path reads only V_N_inv and V_Ninv_VT # after setup. Drop V to free n_modes × n_pix. bm.release_pixel_projector() # The basis manager handles covariance inversion internally # (SMW for harmonic, direct/truncated solve for pixel), so the # traditional explicit C = N+S build and inversion is skipped. self.log( f"Skipping explicit pixel-space C inversion " f"(handled by {path_label} basis)", level=3, ) else: # Traditional: prepare covariance matrices (signal + inverse) with self._stage("fisher.prepare_covariance"): with _wide_threadpool(): self.prepare_covariance_matrices() self.log("Signal matrix and covariance preparation completed", level=3) # Synchronize and broadcast shared variables to worker processes. # Skip broadcast for single-process runs to avoid unnecessary # serialization of large objects (basis_manager can exceed 2 GB). self.comm.Barrier() with self._stage("fisher.mpi_distribution"): if self.size > 1: # Python objects (small, serialization is fine) self.params: InputParams = self.comm.bcast( self.params if self.rank == 0 else None, root=0 ) self.collection = self.comm.bcast( self.collection if self.rank == 0 else None, root=0 ) self.npixs = self.comm.bcast( self.npixs if self.rank == 0 else None, root=0 ) self.pixact = self.comm.bcast( self.pixact if self.rank == 0 else None, root=0 ) # Numpy arrays via shared memory (zero-copy, no size limits). # Worker ranks may not have these attributes yet (setup is rank-0 only), # so use getattr to pass None for non-root ranks. point_vectors = getattr(self, "point_vectors", None) n_point_vectors = self.comm.bcast( len(point_vectors) if self.rank == 0 and point_vectors is not None else 0, root=0, ) self.point_vectors = tuple( self._shared_array( point_vectors[i] if self.rank == 0 and point_vectors is not None else None ) for i in range(n_point_vectors) ) if self._basis_config is not None: self.basis_manager = self.comm.bcast( self.basis_manager if self.rank == 0 else None, root=0 ) else: # Traditional path needs pixel-space matrices self.noise_cov1 = self._shared_array( getattr(self, "noise_cov1", None) ) self.signal_matrix = self._shared_array( getattr(self, "signal_matrix", None) ) if self.params.do_cross: self.noise_cov2 = self._shared_array( getattr(self, "noise_cov2", None) ) self.comm.Barrier() with self._stage("fisher.binning_beam"): # Binning: set_binning() > config bin_lmins/lmaxs > config delta_ell if not hasattr(self, "bins") or self.bins is None: bin_lmins = getattr(self.params, "bin_lmins", None) bin_lmaxs = getattr(self.params, "bin_lmaxs", None) if bin_lmins is not None and bin_lmaxs is not None: self.set_binning(Bins(bin_lmins, bin_lmaxs)) else: delta_ell = getattr(self.params, "delta_ell", 1) self.set_binning( Bins.fromdeltal(self.params.lmin, self.params.lmax, delta_ell) ) # Warn if bins don't cover the full ell range if self.bins.lmax < self.params.lmax: self.log( f"Binning covers ell up to {self.bins.lmax}, " f"but lmax={self.params.lmax}. " f"Multipoles {self.bins.lmax + 1}..{self.params.lmax} " f"are excluded.", level=1, ) # Beam smoothing factors b²_ell for each Fisher row. Flat vector # with each per-spectrum block holding the inference-range beams # (ell = lmin..lmax): [spec0_ell_lmin, ..., spec0_ellmax, # spec1_ell_lmin, ..., spec1_ellmax, ...]. For delta_ell=1 (unbinned) # this matches Fisher's n_params layout 1:1. lmin = self.params.lmin lmax = self.params.lmax self.n_ell = lmax - lmin + 1 smoothing_dict = self.collection.spectra_manager.compute_smoothing_factors( self.collection.beam_manager ) self.beam_smoothing = np.zeros( self.params.nspectra * self.n_ell, dtype=np.float64 ) idx = 0 for label in self.collection.spectra_manager.labels: # smoothing_dict[label] is ℓ-indexed length lmax_signal+1; # extract inference range ell=lmin..lmax (ADR 0009). self.beam_smoothing[idx : idx + self.n_ell] = smoothing_dict[label][ lmin : lmax + 1 ] idx += self.n_ell # Setup Fisher matrices dimensions self.n_params = self.params.nspectra * self.bins.nbins self.fisher = np.zeros((self.n_params, self.n_params)) # Compute Fisher matrix self.compute() self.comm.Barrier()
# ========================================================================= # Result Retrieval # =========================================================================
[docs] def get_fisher_matrix(self) -> np.ndarray | None: """Retrieve the beam-smoothed Fisher information matrix. Layout ------ Shape ``(nspectra * nbins, nspectra * nbins)``. Rows and columns run identically: each spectrum occupies a contiguous block of ``nbins`` columns; within a block, columns are ordered by bin index (``bin 0`` first, then ``bin 1``, ...). Spectrum order follows :attr:`spectra_list`, which is filled by the harmonic layer as: 1. All auto-spectra in component order (component ``i`` first, then the kinds emitted by that component — e.g. for a spin-2 component: ``GG`` (EE), ``CC`` (BB), ``GC`` (EB)). 2. All cross-spectra in lexicographic ``(i, j)`` order with ``i < j``, each pair contributing the kinds emitted by the spin pair (e.g. spin-0 × spin-2 emits ``SG`` (TE), ``SC`` (TB)). Example: for TQU (``spins=[0, 2]``, ``labels=["T", "E", "B"]``) under ``SymmetryMode.SYMMETRIC`` and ``nbins=10``, the matrix is ``(60, 60)`` and the column blocks are:: cols 0..9 : TT cols 10..19 : EE cols 20..29 : BB cols 30..39 : EB cols 40..49 : TE cols 50..59 : TB Use :meth:`get_bandpower_slices` to look up the slice for a given physical-label spectrum without having to compute these offsets by hand:: slices = fisher.get_bandpower_slices() tt_block = fisher.get_fisher_matrix()[slices["TT"], slices["TT"]] te_x_ee = fisher.get_fisher_matrix()[slices["TE"], slices["EE"]] """ if self.rank == 0: return self.fisher return None
[docs] def get_error_bars( self, *, as_dict: bool = False ) -> np.ndarray | dict[str, np.ndarray] | None: """Compute parameter forecast errors from the Fisher matrix. Parameters ---------- as_dict : bool, optional If ``False`` (default, back-compat), return a flat numpy array of shape ``(nspectra * nbins,)`` ordered as described in :meth:`get_fisher_matrix`. If ``True``, return a dict keyed by physical labels (``"TT"``, ``"EE"``, ``"TE"``, ...) with per-spectrum error arrays of shape ``(nbins,)``. Examples -------- Flat shape (default) — same as before:: err = fisher.get_error_bars() # shape (60,) for TQU ee_errors = err[10:20] # bins 0..9 of EE Dict shape — label-keyed, no manual offsets:: err = fisher.get_error_bars(as_dict=True) ee_errors = err["EE"] # shape (nbins,) te_errors = err["TE"] """ if self.rank != 0 or self.fisher is None: return None cov_matrix = matrix_inverse_symm(self.fisher) errors = np.sqrt(np.diag(cov_matrix)) if not as_dict: return errors from cosmocore.conventions.cmb import to_label_dict return to_label_dict( errors, labels=list(self.params.labels), spins=tuple(self.params.spins), spectra_list=self.spectra_list, n_bins=self.bins.nbins, )
[docs] def get_bandpower_slices(self) -> dict[str, slice] | None: """Return a mapping ``{label: slice}`` for the flat Fisher axis. Each value is a ``slice`` selecting that spectrum's bin block in ``get_fisher_matrix()`` (along either axis) or in ``get_error_bars()`` (the single axis). Labels are concatenated from ``params.labels`` per slot (e.g. ``"TT"``, ``"EE"``, ``"TE"``); for multi-frequency setups the user's per-slot labels make the keys unambiguous (``"T100T143"`` etc.). Returns ``None`` until ``compute()`` has run (``spectra_list`` is populated as a side effect of the Fisher pipeline). Examples -------- For TQU under SYMMETRIC mode with ``nbins=10`` the returned mapping is:: {"TT": slice(0, 10), "EE": slice(10, 20), "BB": slice(20, 30), "EB": slice(30, 40), "TE": slice(40, 50), "TB": slice(50, 60)} Typical usage:: F = fisher.get_fisher_matrix() slices = fisher.get_bandpower_slices() tt_diag = np.diag(F[slices["TT"], slices["TT"]]) te_x_ee = F[slices["TE"], slices["EE"]] # cross-spectrum block See Also -------- get_fisher_block : Convenience wrapper that returns a single ``(label_i, label_j)`` block directly. """ if self.spectra_list is None: return None from cosmocore.conventions.cmb import spectrum_key_to_label nbins = self.bins.nbins labels = list(self.params.labels) spins = tuple(self.params.spins) slices: dict[str, slice] = {} for spec_idx, key in enumerate(self.spectra_list): label = spectrum_key_to_label(key, labels=labels, spins=spins) slices[label] = slice(spec_idx * nbins, (spec_idx + 1) * nbins) return slices
[docs] def get_fisher_block( self, label_i: str, label_j: str | None = None ) -> np.ndarray | None: """Extract one ``(label_i, label_j)`` block of the Fisher matrix. Convenience wrapper over :meth:`get_fisher_matrix` and :meth:`get_bandpower_slices`. Parameters ---------- label_i : str Physical label for the row block (e.g. ``"TT"``, ``"EE"``). label_j : str or None, optional Physical label for the column block. If ``None`` (default), returns the auto block ``(label_i, label_i)``. Returns ------- numpy.ndarray or None Block of shape ``(nbins, nbins)``. Returns ``None`` on worker ranks or before ``compute()`` has run. Raises ------ KeyError If either label is not in the current :meth:`get_bandpower_slices` mapping. The error message lists the legal labels. Examples -------- >>> tt_block = fisher.get_fisher_block("TT") >>> te_x_ee = fisher.get_fisher_block("TE", "EE") >>> bb_eb = fisher.get_fisher_block("BB", "EB") # likely zero for std cosmo """ F = self.get_fisher_matrix() slices = self.get_bandpower_slices() if F is None or slices is None: return None if label_j is None: label_j = label_i for label in (label_i, label_j): if label not in slices: raise KeyError( f"Unknown spectrum label {label!r}. " f"Legal labels: {list(slices.keys())}" ) return F[slices[label_i], slices[label_j]]
[docs] def get_window_matrix(self) -> np.ndarray | None: """ Retrieve the window matrix for QML power spectrum estimation. The window matrix W relates the expected QML estimates to the beam-smoothed theory spectrum: ``<q> = W @ C_theory``. It encodes the mode coupling induced by partial sky coverage, beam, and pixel window effects. Layout ------ Shape ``(n_params, n_params)`` with the same flat layout as :meth:`get_fisher_matrix` — each spectrum occupies a contiguous block of ``nbins`` rows/columns, in :attr:`spectra_list` order. When using this matrix in an inference loop, the theory vector ``C_theory`` must follow the same flat ordering; see :meth:`get_bandpower_slices` or pass a label-keyed dict to the ``convolve_theory_func`` returned by :meth:`Spectra.get_power_spectra` (``mode="convolved"``). Returns ------- numpy.ndarray or None Window matrix of shape ``(n_params, n_params)`` where ``n_params = n_spectra * n_bins``. Returns ``None`` for worker processes or if computation hasn't completed. Notes ----- The window matrix is the beam-smoothed Fisher matrix:: W_{bb'} = (1/2) Tr[C⁻¹ dC^b C⁻¹ dC^{b'}] where ``dC^b = Sum_ell w_{b,ell} b²_ell dC^ell`` includes the binning weights and beam smoothing factors. Used by the "convolved" normalization mode, where instead of deconvolving the window, the theory is convolved for comparison. """ return self.get_fisher_matrix()
[docs] def get_bandpower_window_function( self, per_ell_fisher: np.ndarray | None = None, ) -> np.ndarray | None: """ Compute the bandpower window function for binned QML inference. The window matrix W maps a per-ℓ theory spectrum to the expected binned bandpower estimates from the deconvolved QML output: <C_hat_b> = W @ C_ell_theory Use this in a Gaussian likelihood for parameter inference: mu_b(θ) = W @ C_ell(θ) -2 ln L = (d - mu)ᵀ F_b (d - mu) where d = ``Spectra.get_power_spectra(mode="deconvolved")`` and F_b = ``Fisher.get_fisher_matrix()``. This differs from :meth:`get_window_matrix`, which returns the bin-to-bin Fisher used in "convolved" mode and assumes a constant spectrum within each bin. The bandpower window function is the correct quantity for inference when bins are wide enough that the theory varies appreciably within them. Parameters ---------- per_ell_fisher : np.ndarray, optional Pre-computed per-ℓ Fisher matrix of shape ``(n_ell, n_ell)`` (single spectrum). If None, the per-ℓ Fisher is computed on demand by re-running the Fisher pipeline with delta_ell=1 using the current configuration. The result is cached for subsequent calls. Returns ------- W : np.ndarray of shape (n_bins, n_ell), or None Bandpower window matrix. Rows correspond to bin indices, columns to multipoles ℓ=2..lmax. Returns None on worker processes (rank != 0). Notes ----- The window function encodes: - Mode coupling from sky cuts. - Beam smoothing (b²_ℓ). - Pixel window functions. - The Fisher-weighting that QML naturally applies within each bin. The theory C_ℓ should be the **unbeamed** physical spectrum (standard CAMB/CLASS output), since beam² is already absorbed into the derivatives used to build the window. For best statistical accuracy near lmax, use the buffer approach: estimate to ``lmax_buffer = lmax_science + margin`` (a few bin widths or ~1/fsky multipoles), then drop the buffer rows from ``W``, ``d`` and ``F_b`` before forming the likelihood. This absorbs mode coupling from ℓ > lmax_science into discarded buffer bins. **Scope**: currently supports single-spectrum analysis only — calling it on a multi-spectrum Fisher (``nspectra > 1``) raises ``NotImplementedError``. For multi-spectrum likelihoods today, use :meth:`get_window_matrix` together with the ``convolve_theory_func`` returned by :meth:`Spectra.get_power_spectra` (``mode="convolved"``); that path accepts label-keyed dict input and applies the multi- spectrum window matrix. Multi-spectrum extension of this per-ℓ window function is planned. Examples -------- Standard inference workflow: >>> fisher = Fisher("config.yaml", compression={"method": "harmonic"}) >>> fisher.set_binning(bins) >>> fisher.run() >>> W = fisher.get_bandpower_window_function() >>> F_b = fisher.get_fisher_matrix() >>> # In the MCMC loop: >>> mu = W @ cl_theory_per_ell # expected bandpower >>> r = data_bandpower - mu # residual >>> neg2lnL = r @ F_b @ r # quadratic form See Also -------- get_window_matrix : Bin-to-bin window for convolved mode. get_fisher_matrix : Inverse covariance for the bandpower likelihood. """ # _compute_per_ell_fisher() is collective; all ranks must # enter together. Only rank 0 returns W; workers return None. if not hasattr(self, "bins") or self.bins is None: if self.rank == 0: raise ValueError( "Bandpower window requires binning. Call set_binning() before run()." ) return None if self.params.nspectra > 1: if self.rank == 0: raise NotImplementedError( "Bandpower window function currently supports " "single-spectrum analysis only. " "Multi-spectrum extension is planned." ) return None # Broadcast rank 0's decisions so all ranks branch identically. cached = getattr(self, "_bandpower_window", None) provided = bool(self.comm.bcast(per_ell_fisher is not None, root=0)) has_cached = bool(self.comm.bcast(cached is not None, root=0)) and not provided if has_cached: return cached if self.rank == 0 else None if not provided: per_ell_fisher = self._compute_per_ell_fisher() # Only rank 0 has self.fisher and the reduced per_ell_fisher # populated; workers return None. if self.rank != 0 or self.fisher is None: return None # Build Q (sum-over-ℓ-in-bin operator) lmin = self.params.lmin n_ell = self.params.lmax - lmin + 1 # ell indices lmin..lmax Q = np.zeros((self.bins.nbins, n_ell), dtype=np.float64) for b, (lo, hi) in enumerate(zip(self.bins.lmins, self.bins.lmaxs)): Q[b, lo - lmin : hi - lmin + 1] = 1.0 # W = F_b^{-1} @ Q @ F_perell W = cholesky_solve(cholesky_factor(self.fisher), Q @ per_ell_fisher) self._bandpower_window = W return W
def _compute_per_ell_fisher(self) -> np.ndarray: """ Compute the per-ℓ Fisher matrix using the current configuration. Triggers a Fisher computation with delta_ell=1, reusing the existing setup (basis, beams, covariance) by re-invoking the compute() phase with binning temporarily set to per-ℓ. Returns ------- F_perell : np.ndarray, shape (n_ell, n_ell) Per-ℓ Fisher for ell=2..lmax. """ self.log("Computing per-ℓ Fisher for bandpower window function...", level=2) saved_bins = self.bins saved_fisher = self.fisher saved_n_params = self.n_params saved_cached = getattr(self, "_cached_binned_derivatives", None) saved_coo = getattr(self, "_cached_sparse_coo_data", None) try: self.bins = Bins.fromdeltal(self.params.lmin, self.params.lmax, 1) self.n_params = self.params.nspectra * self.bins.nbins self.fisher = np.zeros((self.n_params, self.n_params)) # Drop binned-derivative cache so per-ℓ run rebuilds its own self._cached_binned_derivatives = None self._cached_sparse_coo_data = None self.compute() F_perell = self.fisher.copy() finally: self.bins = saved_bins self.fisher = saved_fisher self.n_params = saved_n_params self._cached_binned_derivatives = saved_cached self._cached_sparse_coo_data = saved_coo return F_perell