"""
Quadratic Maximum Likelihood (QML) power spectrum estimation for cosmological analysis.
This module implements the Spectra class for optimal power spectrum estimation from
observations of spin-0 and spin-2 fields on the sphere using the Quadratic Maximum
Likelihood estimator. The QML method provides unbiased, minimum-variance power spectrum
estimates that properly account for sky cuts, instrumental noise, and pixel correlations.
Applications include CMB temperature and polarization, cosmic shear, and any other signal
described by angular power spectra.
The QML estimator for power spectrum amplitude q_l is given by:
q̂_l = (1/2) * x^T * C^(-1) * ∂C/∂q_l * C^(-1) * x
where x is the data vector, C is the total covariance matrix (signal + noise),
and the covariance matrix of the estimates is (F^(-1))_ll where F is the Fisher
information matrix.
Classes
-------
Spectra
Main class for QML power spectrum estimation inheriting from cosmocore.Core.
Notes
-----
The QML implementation includes several key features:
1. Exact likelihood computation with proper noise modeling
2. MPI parallelization for computational efficiency
3. Support for auto-correlation and cross-correlation spectra
4. Noise bias computation and optional subtraction
5. Integration with Fisher matrix computation for error propagation
The method is computationally intensive, scaling as O(N_pix^3) for matrix operations,
but provides optimal statistical properties compared to pseudo-C_l estimators.
References
----------
.. [1] Tegmark, M. "How to measure CMB power spectra without losing information"
Phys. Rev. D 55, 5895 (1997)
.. [2] Bond, J.R., Jaffe, A.H. & Knox, L. "Estimating the power spectrum of the
cosmic microwave background" Phys. Rev. D 57, 2117 (1998)
.. [3] Oh, S.P., Spergel, D.N. & Hinshaw, G. "An efficient technique to determine
the power spectrum from cosmic microwave background sky maps"
Astrophys. J. 510, 551 (1999)
.. [4] Wandelt, B.D., Larson, D.L. & Lakshminarayanan, A. "Global, exact cosmic
microwave background data analysis using Gibbs sampling"
Phys. Rev. D 70, 083511 (2004)
"""
from __future__ import annotations
import os
import time
import typing
from contextlib import nullcontext
import numpy as np
from cosmocore import (
Bins,
Core,
MPISharedMemoryMixin,
SpectrumKey,
SpectrumKind,
SymmetryMode,
do_derivative_step,
matrix_inverse_symm,
matrix_mult,
matrix_trace,
read_maps,
vec_to_cl,
write_out_matrix,
writecl,
)
from cosmocore._mpi import MPI
from cosmocore.basics import eigh
from cosmocore.settings import InputParams
from qube import Fisher
from qube.fisher import _basis_path_label
[docs]
class Spectra(Core, MPISharedMemoryMixin):
"""
Quadratic Maximum Likelihood (QML) power spectrum estimator.
This class implements the QML method for optimal power spectrum estimation from
observations of spin-0 and spin-2 fields on the sphere (e.g. CMB temperature
and polarization, cosmic shear). The QML estimator provides unbiased,
minimum-variance estimates of angular power spectra C_l while properly accounting
for instrumental noise, sky cuts, and pixel correlations.
The QML estimator computes power spectrum amplitudes as:
q̂_l = (1/2) * x^T * E_l * x
where E_l = C^(-1) * ∂C/∂q_l * C^(-1) is the quadratic estimator matrix,
x is the data vector, and C is the total covariance matrix.
Parameters
----------
params_file : str, optional
Path to YAML parameter file containing analysis configuration.
fisher : Fisher, optional
Pre-computed Fisher matrix instance. If provided, reuses computed
components (covariance matrices, geometry, etc.) for efficiency.
**kwargs : dict
Additional keyword arguments passed to the Core parent class.
Attributes
----------
comm : MPI.Comm
MPI communicator for parallel computation.
rank : int
MPI process rank (0 for master process).
size : int
Total number of MPI processes.
fisher_instance : Fisher
Fisher matrix computation instance, either provided or computed.
maps1, maps2 : numpy.ndarray
Input map data for primary and secondary fields (cross-correlation).
qml_results : numpy.ndarray
QML power spectrum estimates for all simulations and multipoles.
qml_noise_bias : numpy.ndarray
Noise bias estimates for auto-correlation spectra.
invfisher : numpy.ndarray
Inverse of the beam-smoothed Fisher matrix.
inv_cov1, inv_cov2 : numpy.ndarray
Inverted covariance matrices for primary and secondary datasets.
Examples
--------
Basic QML power spectrum estimation:
>>> from cosmoforge.qube import Spectra
>>> spectra = Spectra("config/qml_analysis.yaml")
>>> spectra.run()
>>> power_spectra = spectra.get_power_spectra()
>>> noise_bias = spectra.get_noise_bias()
Using pre-computed Fisher matrix:
>>> from cosmoforge.qube import Fisher, Spectra
>>> fisher = Fisher("config/fisher_config.yaml")
>>> fisher.run()
>>> spectra = Spectra("config/qml_config.yaml", fisher=fisher)
>>> spectra.run() # Reuses Fisher components for efficiency
MPI parallel execution:
>>> # Run with: mpirun -n 8 python qml_analysis.py
>>> spectra = Spectra("config/high_res_config.yaml")
>>> spectra.run() # Distributes computation across processes
Notes
-----
The QML method offers several advantages over pseudo-C_l estimators:
1. Optimal statistical properties (minimum variance, unbiased)
2. Exact treatment of sky cuts and inhomogeneous noise
3. Proper error propagation through Fisher matrix
4. Natural handling of mode coupling
However, it is computationally expensive, scaling as O(N_pix^3) due to
matrix inversions and O(N_ell * N_pix^2) for quadratic form evaluations.
For auto-correlation analyses, noise bias is computed and can be subtracted:
noise_bias_l = (1/2) * Tr[N * E_l]
where N is the noise covariance matrix.
Cross-correlation analyses are naturally noise-bias free when using
independent realizations of noise between maps.
The implementation supports both temperature-only and polarization analyses
with proper treatment of spin-2 fields and E/B mode decomposition.
See Also
--------
Fisher : Fisher information matrix computation
cosmocore.Core : Base class providing fundamental analysis infrastructure
"""
[docs]
def __init__(
self,
params_file: str | None = None,
fisher: Fisher | None = None,
compression: dict | None = None,
**kwargs,
):
"""
Initialize QML power spectrum estimation class.
Parameters
----------
params_file : str, optional
Path to YAML configuration file.
fisher : Fisher, optional
Pre-computed Fisher instance. If provided, reuses computed components
(covariance matrices, geometry, field collections) for efficiency.
compression : dict, optional
Computation basis configuration (method, epsilon, basis, mode_fraction).
**kwargs : dict
Additional arguments passed to Core.
Raises
------
TypeError
If fisher is not a Fisher instance.
ValueError
If fisher doesn't contain a valid Fisher matrix.
"""
self.params: InputParams = None
super().__init__(params=params_file, **kwargs)
# MPI setup
self.comm = MPI.COMM_WORLD
self.rank = self.comm.Get_rank()
self.size = self.comm.Get_size()
# Store computation basis config for Fisher creation
self._basis_config = compression
# Initialize Fisher matrix or compute it
if fisher is not None:
if not isinstance(fisher, Fisher):
raise TypeError("fisher must be an instance of Fisher class.")
if not hasattr(fisher, "fisher") or fisher.fisher is None:
raise ValueError("Fisher instance must have a valid fisher matrix.")
self.fisher_instance = fisher
# Reuse already computed components from Fisher
self._reuse_fisher_components()
else:
self.fisher_instance = self._get_fisher()
# Also reuse components from the internally created Fisher
self._reuse_fisher_components()
# SymmetryMode is owned by Fisher (ADR-0011); Spectra inherits it
# so the two cannot drift apart and produce dimension-mismatched
# spectra lists.
self.symmetry_mode = self.fisher_instance.symmetry_mode
# Initialize QML-specific variables
self.maps1 = None
self.maps2 = None
self.qml_results = None
self.qml_noise_bias = None
self.invfisher = None
# Populated lazily in compute_qml_spectra so _get_binned_derivative
# (Core's keyed API consumer) can resolve spectrum_idx → SpectrumKey
# without threading lists through method signatures.
self.spectra_list: list | None = None
# Normalization mode support
self.inv_fisher_sqrt: np.ndarray | None = None # F^(-1/2) for decorrelated mode
self.fisher_normalized: np.ndarray | None = (
None # Normalized Fisher (for convolved covariance)
)
# lmax for signal matrix computation (matches Fortran convention of 4*nside)
self._lmax_signal = None
def _stage(self, name: str):
"""Return the profiler's stage context, or a nullcontext when none.
Hosts (e.g. benchmark scripts) attach a ``StageProfiler`` instance
by setting ``spectra._profiler``; otherwise the wrapping ``with``
block is a zero-cost no-op.
"""
profiler = getattr(self, "_profiler", None)
return profiler.stage(name) if profiler is not None else nullcontext()
@property
def lmax_signal(self) -> int:
"""
Maximum multipole for signal/derivative matrix computation.
This defaults to 4*nside to match the Fortran reference implementation.
The derivative matrices are computed up to this lmax, while the output
power spectra use params.lmax.
Returns
-------
int
Maximum multipole for signal covariance and derivative computation.
"""
if self._lmax_signal is not None:
return self._lmax_signal
return 4 * self.params.nside
@lmax_signal.setter
def lmax_signal(self, value: int) -> None:
"""Set custom lmax_signal value."""
self._lmax_signal = value
def _reuse_fisher_components(self):
"""Copy computational components from a pre-computed Fisher instance."""
attrs = [
"collection",
"npixs",
"pixact",
"point_vectors",
"noise_cov1",
"noise_cov2",
"signal_matrix",
"bins",
"beam_smoothing",
"basis_manager",
]
for attr in attrs:
val = getattr(self.fisher_instance, attr, None)
if val is not None:
setattr(self, attr, val)
if not hasattr(self, "basis_manager"):
self.basis_manager = None
# Harmonic basis uses SMW formula — pixel-space covariance matrices
# (inv_cov, noise_cov) are not needed for the compressed QML path.
if self.basis_manager is None:
self._load_covariance_matrices()
def _load_covariance_matrices(self):
"""Load noise and inverted covariance matrices from disk files."""
ntot = self.collection.total_active_pixels
# Load inverted covariance matrices
self.inv_cov1 = np.fromfile(self.params.outinvcovmatfile1).reshape(ntot, ntot)
if self.params.do_cross:
self.inv_cov2 = np.fromfile(self.params.outinvcovmatfile2).reshape(ntot, ntot)
# Load noise covariance matrices (created by Fisher.run())
if not os.path.exists(self.params.outnoisecovmat1):
raise FileNotFoundError(
f"Noise covariance file not found: {self.params.outnoisecovmat1}. "
f"Run Fisher analysis first to generate this file."
)
self.noise_cov1 = np.fromfile(self.params.outnoisecovmat1).reshape(ntot, ntot)
if self.params.do_cross:
if not os.path.exists(self.params.outnoisecovmat2):
raise FileNotFoundError(
f"Noise covariance file not found: {self.params.outnoisecovmat2}. "
f"Run Fisher analysis first to generate this file."
)
self.noise_cov2 = np.fromfile(self.params.outnoisecovmat2).reshape(ntot, ntot)
def _get_fisher(self) -> Fisher:
"""
Compute Fisher information matrix for QML normalization.
Returns
-------
Fisher
Completed Fisher instance with all components computed.
"""
if self.rank == 0:
self.log("Starting Fisher matrix computation...", level=1)
start_time = time.time()
fisher = Fisher(self.params, compression=self._basis_config)
fisher.run()
if self.rank == 0:
elapsed = time.time() - start_time
self.log(
f"Fisher matrix computation completed in {elapsed:.2f} seconds", level=1
)
return fisher
[docs]
def setup_maps(self):
"""
Read observational map data from FITS files.
Loads maps with proper pixel selection, field extraction, and calibration.
For cross-correlation (do_cross=True), loads both primary and secondary maps.
Output shape: (n_active_pixels, n_simulations).
Raises
------
ValueError
If pixel information not available (setup_geometry not called).
FileNotFoundError
If input map files not found.
"""
if self.rank == 0:
self.log("Reading maps", level=2)
# Ensure we have pixel information
if not hasattr(self, "npixs") or self.npixs is None:
raise ValueError(
"Pixel information not available. Run setup_geometry first."
)
# Read maps using the core functionality
ntot = sum(self.collection.n_active)
self.maps1 = np.empty((ntot, self.params.nsims), dtype=np.float64)
# Read maps1
read_maps(
maps=self.maps1,
filename=self.params.inputmapfile1,
pixact=self.pixact,
field_labels=self.params.physical_labels,
calibration=self.params.calibration,
)
# Read maps2 if doing cross-correlation
if self.params.do_cross:
self.maps2 = np.empty((ntot, self.params.nsims), dtype=np.float64)
read_maps(
maps=self.maps2,
filename=self.params.inputmapfile2,
pixact=self.pixact,
field_labels=self.params.physical_labels,
calibration=self.params.calibration,
)
[docs]
def setup_fisher_inversion(self):
"""
Prepare inverted Fisher matrix for QML estimation.
The Fisher matrix is already beam-smoothed (beam window functions
absorbed into derivatives). This method computes F⁻¹, F⁻¹/²,
and writes results to output files.
Raises
------
ValueError
If Fisher matrix is not available or singular.
LinAlgError
If Fisher matrix inversion fails.
"""
if self.rank == 0:
self.log("Reading and inverting Fisher matrix", level=2)
# Get Fisher matrix from the Fisher instance
fisher_matrix = self.fisher_instance.get_fisher_matrix()
if fisher_matrix is None:
raise ValueError("Fisher matrix not available")
self.invfisher = fisher_matrix.copy()
# Store beam-smoothed Fisher for convolved mode covariance
self.fisher_normalized = self.invfisher.copy()
# F^(-1/2) for the decorrelated mode is computed lazily on first
# request — it costs an O(n_params^3) eigendecomposition that only
# decorrelated mode consumes.
# Invert Fisher matrix
self.log("Inverting normalized Fisher matrix", level=2)
start_time = time.time()
self.invfisher = matrix_inverse_symm(self.invfisher)
self.log(
f"Fisher matrix inversion time: {time.time() - start_time:.2f} seconds",
level=3,
)
# Write out covariance matrix and errors
self.log("Writing out covariance and errors", level=2)
write_out_matrix(self.params.outcovmatfile, self.invfisher)
# Compute and write parameter errors
vec_error_bars = np.sqrt(np.diag(self.invfisher))
# Convert vector to Cl format and write errors
nbins = self.bins.nbins
nspectra = len(vec_error_bars) // nbins
error_bars = np.zeros((nbins, nspectra), dtype=np.float64)
vec_to_cl(vec_error_bars, error_bars)
writecl(self.params.outerrfile, error_bars)
def _compute_inv_fisher_sqrt(self, fisher: np.ndarray) -> None:
"""
Compute F^(-1/2) using eigendecomposition for decorrelated mode.
This method computes the matrix square root of the inverse Fisher matrix
using eigendecomposition: F = V Λ V^T → F^(-1/2) = V Λ^(-1/2) V^T.
This is used for the "decorrelated" normalization mode which produces
uncorrelated bandpower estimates.
Parameters
----------
fisher : numpy.ndarray
Normalized Fisher information matrix of shape (n_params, n_params).
Must be symmetric positive semi-definite.
Notes
-----
The computation uses eigendecomposition for numerical stability:
1. Decompose: F = V Λ V^T where V are eigenvectors, Λ are eigenvalues
2. Compute inverse square root of eigenvalues: Λ^(-1/2)
3. Reconstruct: F^(-1/2) = V @ diag(Λ^(-1/2)) @ V^T
**Ill-conditioning handling:**
- Eigenvalues below 10^(-12) × max eigenvalue are set to zero
- A warning is logged if condition number exceeds 10^10
- This truncation prevents numerical instability from near-zero modes
The resulting F^(-1/2) matrix is stored in self.inv_fisher_sqrt and
used by the decorrelated mode to produce estimates with identity
covariance matrix.
Examples
--------
This method is called automatically during setup_fisher_inversion():
>>> spectra = Spectra("config.yaml")
>>> spectra.run()
>>> # inv_fisher_sqrt is now available for decorrelated mode
>>> cl_decorr = spectra.get_power_spectra(mode="decorrelated")
See Also
--------
get_power_spectra : Uses inv_fisher_sqrt for 'decorrelated' mode
setup_fisher_inversion : Calls this method during setup
"""
eigenvalues, eigenvectors = eigh(fisher)
# Check conditioning
min_eigenvalue = (
eigenvalues[eigenvalues > 0].min() if np.any(eigenvalues > 0) else 1e-300
)
max_eigenvalue = eigenvalues.max()
cond = max_eigenvalue / min_eigenvalue if min_eigenvalue > 0 else np.inf
if cond > 1e10:
self.log(
f"Warning: Fisher matrix poorly conditioned (cond={cond:.2e}). "
"Decorrelated mode may have inflated errors.",
level=1,
)
# Compute Λ^(-1/2), handling small eigenvalues
inv_sqrt_eigenvalues = np.zeros_like(eigenvalues)
threshold = max_eigenvalue * 1e-12
valid = eigenvalues > threshold
inv_sqrt_eigenvalues[valid] = 1.0 / np.sqrt(eigenvalues[valid])
# F^(-1/2) = V @ diag(Λ^(-1/2)) @ V^T
self.inv_fisher_sqrt = (
eigenvectors @ np.diag(inv_sqrt_eigenvalues) @ eigenvectors.T
)
self.log(
f"Computed F^(-1/2) for decorrelated mode "
f"({np.sum(valid)}/{len(eigenvalues)} modes valid)",
level=3,
)
[docs]
def setup_qml_computation(self):
"""
Initialize arrays and variables for QML power spectrum computation.
This method allocates memory for QML estimation results and prepares
the computational infrastructure for the main QML calculation phase.
Arrays are sized based on the number of multipoles, spectra, and
Monte Carlo simulations.
Notes
-----
Initializes the following arrays:
- qml_results: Storage for QML power spectrum estimates with shape
(n_simulations, n_total_parameters) where n_total_parameters =
n_spectra * (lmax - 1)
- qml_noise_bias: Storage for noise bias estimates (auto-correlation only)
with shape (n_total_parameters,)
The qml_results array stores the raw quadratic estimates:
q̂_l^(sim) = (1/2) * x^(sim)T * E_l * x^(sim)
for each simulation and multipole. These raw estimates will be
combined using the inverse Fisher matrix to produce the final
optimally-weighted power spectrum estimates.
For auto-correlation analyses, noise bias terms are computed as:
bias_l = (1/2) * Tr[N * E_l]
where N is the noise covariance matrix and E_l is the quadratic
estimator matrix for multipole l.
Memory allocation scales as O(n_sims * n_ell * n_spec) for results
and O(n_ell * n_spec) for bias terms, which is typically much
smaller than the O(n_pix^2) covariance matrix storage.
Examples
--------
This method is called automatically during computation:
>>> spectra = Spectra("config.yaml")
>>> spectra.setup_qml_computation()
>>> print(f"QML results shape: {spectra.qml_results.shape}")
>>> print(f"Total parameters: {spectra.qml_results.shape[1]}")
"""
n_params = self.params.nspectra * self.bins.nbins
# Initialize y vectors for QML estimation
self.qml_results = np.zeros((self.params.nsims, n_params), dtype=np.float64)
if not self.params.do_cross:
self.qml_noise_bias = np.zeros(n_params, dtype=np.float64)
[docs]
def compute_qml_spectra(self):
"""
Execute parallel QML power spectrum computation.
Distributes multipole computations across MPI processes (round-robin).
For each multipole: computes signal derivative, E-operator, and
quadratic estimates for all simulations. Selects compressed or
traditional method based on basis_manager availability.
"""
# Resolve spectra_list once so the per-bin SpectrumKey lookup inside
# _get_binned_derivative works for both compressed and traditional
# paths. Mirrors Fisher.compute() which populates
# ``self.fisher_instance.spectra_list`` for both paths too. Fall
# back to building locally if Fisher hasn't populated it yet
# (test fixtures that bypass Fisher.compute()).
if self.spectra_list is None:
fisher_list = getattr(self.fisher_instance, "spectra_list", None)
if fisher_list is not None:
self.spectra_list = list(fisher_list)
else:
_, self.spectra_list = self._build_multi_spectrum_inputs()
# Check if we should use compressed computation
use_basis = hasattr(self, "basis_manager") and self.basis_manager is not None
if use_basis:
self._compute_qml_spectra_compressed()
else:
self._compute_qml_spectra_traditional()
def _build_multi_spectrum_inputs(self):
"""Build C_ell_dict and spectra_list keyed by SpectrumKey."""
# Fall back to SYMMETRIC for callers that constructed Spectra via
# __new__ (test fixtures bypass __init__).
symmetry_mode = getattr(self, "symmetry_mode", SymmetryMode.SYMMETRIC)
return self.collection.spectra_manager.build_inputs(symmetry_mode=symmetry_mode)
def _build_derivative_matrix(self, ell: int, spectrum_idx: int = 0) -> np.ndarray:
"""Build pixel-space derivative matrix dC/dC_ell (no-basis fallback)."""
ntot = sum(self.collection.n_active)
dC = np.zeros((ntot, ntot), dtype=np.float64, order="F")
do_derivative_step(
dC, spectrum_idx, self.npixs, self.params.spins, ell, self.collection
)
return dC
def _get_binned_derivative(self, bin_idx: int, spectrum_idx: int = 0) -> np.ndarray:
"""
Compute binned derivative dC^b = Sum_{ell in bin} b²_ell dC^ell.
Uses Fisher's cache when available, otherwise delegates to
:meth:`Core.get_binned_derivative_matrix` with the SpectrumKey
resolved from ``self.spectra_list[spectrum_idx]``.
"""
if hasattr(self, "fisher_instance") and self.fisher_instance is not None:
cache = getattr(self.fisher_instance, "_cached_binned_derivatives", None)
if cache is not None:
cache_key = (spectrum_idx, bin_idx)
if cache_key in cache:
return cache[cache_key]
# beam_smoothing per-spectrum blocks hold the inference-range beams
# (ell=lmin..lmax, offset-from-lmin; ADR 0009).
n_ell = self.params.lmax - self.params.lmin + 1
beam_offset = spectrum_idx * n_ell
beam = self.beam_smoothing[beam_offset : beam_offset + n_ell]
key = self.spectra_list[spectrum_idx]
return self.get_binned_derivative_matrix(
bin_idx,
key,
beam_smoothing=beam,
symmetry_mode=self.symmetry_mode,
)
def _compute_noise_cov_compressed(
self,
bm,
stable_inner_inv,
*,
full_matrix=False,
) -> np.ndarray:
"""Compute noise covariance in compressed space.
``Cov(w|noise) = V C^{-1} N C^{-1} V^T = A T A^T`` with
``A = (I + Lambda M)^{-T} = stable_inner_inv.T`` and
``T = V N_eff^{-1} N N_eff^{-1} V^T`` (sourced from the basis
via :meth:`ComputationBasis.get_noise_for_bias`).
"""
A = stable_inner_inv.T
AT = A @ bm.get_noise_for_bias()
if full_matrix:
return AT @ A.T
return np.einsum("ij,ij->i", AT, A)
def _compute_qml_spectra_compressed(self):
"""
Compute QML spectra using a computation basis (harmonic or pixel).
Performs QML estimation entirely in basis space, consistent with the
basis-aware Fisher matrix computation. The estimator is
q_b = (1/2) * w^T @ E_b @ w
where w = V C^{-1} d is the basis-projected weighted data (built via
SMW for the harmonic basis) and E_b is the binned derivative matrix.
This is mathematically equivalent to the traditional pixel-space form
q_b = (1/2) * d^T C^{-1} dC_b C^{-1} d, since dC_b = V^T E_b V.
Inner-loop paths
----------------
- **Sparse-COO** (preferred when ``basis_manager.method == "harmonic"``
and Fisher cached the COO triplets): for each bin the per-ℓ
derivative is stored as ``(rows, cols, vals)`` with O(2ℓ+1) nonzeros.
q and the noise-bias trace reduce to einsum/contractions over the
nonzero pattern (~200x-800x faster than dense E_b @ w at production
scale; same trick the Fisher trace path uses).
- **Dense fallback**: pixel basis, or harmonic basis when the COO
cache is missing. Builds the full E_b matrix on demand and uses
the original dense E_b @ w / matrix_trace path.
Supports single-field, multi-field, and cross-correlation
(``do_cross=True``) configurations on both paths.
"""
if self.rank == 0:
path_label = _basis_path_label(self.basis_manager)
self.log(f"Starting QML computation (path: {path_label})", level=2)
start_time = time.time()
bm = self.basis_manager
n_sims = self.params.nsims
dim = bm.dim
# Multi-field path is needed when >1 components or spin-2 (spin-2 has
# multiple spectra EE/BB/EB even for a single field)
has_spin2 = any(f.spin == 2 for f in self.collection.fields)
is_multi_field = bm.n_components > 1 or has_spin2
# Build C_ell or C_ell_dict depending on multi-field
if is_multi_field:
C_ell_dict, spectra_list = self._build_multi_spectrum_inputs()
C_ell = None # Not used for multi-field
else:
C_ell = self.collection.spectra_manager.get_cls(0, 0, 0)
C_ell_dict = None
spins = tuple(f.spin for f in self.collection.fields)
spectra_list = [SpectrumKey(0, 0, SpectrumKind.SS, spins=spins)]
self.spectra_list = spectra_list
# Compute weighted compressed data for all simulations
# w = V @ C^{-1} @ d (using SMW formula internally)
# For multi-field harmonic, precompute kernel_inv once — reused for
# data weighting, cross-correlation weighting, and noise bias.
# Stable SMW algebra: w = V C^{-1} d = (I + Lambda M)^{-1} (V N^{-1} d).
# Replaces the legacy w = y - M K^{-1} y, which loses precision in
# the cosmic-variance-limited regime via catastrophic cancellation.
# The same matrix (I + Lambda M)^{-1} reappears as the noise-bias
# matrix A = I - M K^{-1}, so we cache it once and reuse below.
stable_inner_inv = None
maps1_weighted = np.zeros((dim, n_sims), dtype=np.float64)
C_arg = C_ell_dict if is_multi_field else C_ell
if bm.method == "harmonic":
stable_inner_inv = bm.prepare_stable_inner_inv(C_arg)
maps1_weighted = bm.get_weighted_data(
self.maps1, C_arg, stable_inner_inv=stable_inner_inv
)
else:
C_c_inv = bm.get_inverse(C_arg)
if is_multi_field:
# pixel multi-field: explicit compress-then-multiply
d_c = bm.to_basis(self.maps1)
maps1_weighted = C_c_inv @ d_c
else:
maps1_weighted = bm.get_weighted_data(self.maps1, C_ell, C_c_inv=C_c_inv)
if self.params.do_cross:
maps2_weighted = np.zeros((dim, n_sims), dtype=np.float64)
if bm.method == "harmonic":
maps2_weighted = bm.get_weighted_data(
self.maps2, C_arg, stable_inner_inv=stable_inner_inv
)
elif is_multi_field:
d_c2 = bm.to_basis(self.maps2)
maps2_weighted = C_c_inv @ d_c2
else:
maps2_weighted = bm.get_weighted_data(self.maps2, C_ell, C_c_inv=C_c_inv)
# For noise bias computation (only for non-cross)
noise_cov_w_diag = None
noise_cov_w = None
if not self.params.do_cross:
if bm.method == "harmonic":
# Non-diagonal derivatives (cross-field spin-0, EB, TE, TB)
# need the full noise covariance for correct Tr[E_b @ Cov].
# Only single-spectrum (pure TT/EE/BB) can use diagonal alone.
need_full = self.params.nspectra > 1
if need_full:
noise_cov_w = self._compute_noise_cov_compressed(
bm, stable_inner_inv, full_matrix=True
)
noise_cov_w_diag = np.diag(noise_cov_w)
else:
noise_cov_w_diag = self._compute_noise_cov_compressed(
bm, stable_inner_inv
)
else:
# For pixel: use compressed quantities. Noise bias requires
# the raw N (not N_eff with absorbed high-ℓ signal), so use
# the dedicated get_noise_for_bias() method.
if is_multi_field:
C_bar_inv = bm.get_inverse(C_ell_dict)
else:
C_bar_inv = bm.get_inverse(C_ell)
N_bar = bm.get_noise_for_bias()
noise_cov_w = C_bar_inv @ N_bar @ C_bar_inv
# Harmonic-basis binned derivatives are extremely sparse (~2ℓ+1 nonzeros
# in an n_modes×n_modes layout), so Fisher caches them in COO form. When
# available, replace dense E_b @ w with einsum over the nonzero pattern.
# The dense path remains for pixel-basis QML, which has no comparable
# sparsity, and for the cache-less case used by some tests.
coo_cache = (
getattr(self.fisher_instance, "_cached_sparse_coo_data", None)
if getattr(self, "fisher_instance", None) is not None
else None
)
use_sparse = coo_cache is not None
self._qml_path_used = "sparse" if use_sparse else "dense"
# Main computation loop - distribute bins across processes
nbins = self.bins.nbins
n_params = self.params.nspectra * nbins
for il in range(n_params):
if self.rank == il % self.size:
spectrum_idx = il // nbins
bin_idx = il % nbins
if use_sparse:
rows, cols, vals = coo_cache[(spectrum_idx, bin_idx)]
if vals.size == 0:
if not self.params.do_cross:
self.qml_noise_bias[il] = 0.0
self.qml_results[:, il] = 0.0
continue
if self.params.do_cross:
# q = 0.5 * sum_k v_k * w2[r_k, s] * w1[c_k, s]
self.qml_results[:, il] = 0.5 * np.einsum(
"k,ks,ks->s",
vals,
maps2_weighted[rows],
maps1_weighted[cols],
)
else:
q_per_sim = 0.5 * np.einsum(
"k,ks,ks->s",
vals,
maps1_weighted[rows],
maps1_weighted[cols],
)
# Noise bias: 0.5 * Tr[E_b @ Cov] = 0.5 * sum_k v * Cov[c,r]
if noise_cov_w is not None:
tr_ne = 0.5 * float(np.sum(vals * noise_cov_w[cols, rows]))
else:
# Diagonal-only noise cov is sufficient when E_b is
# diagonal (rows == cols for harmonic auto-spectra).
tr_ne = 0.5 * float(np.sum(vals * noise_cov_w_diag[cols]))
self.qml_noise_bias[il] = tr_ne
if hasattr(self.params, "remove_nb") and self.params.remove_nb:
q_per_sim -= tr_ne
self.qml_results[:, il] = q_per_sim
continue
# Get binned derivative: 1D diagonal vector (harmonic auto-
# spectra) or 2D dense matrix (cross-spectra / pixel-space).
E_b = self._get_binned_derivative(bin_idx, spectrum_idx)
# Exploit diagonal structure when available:
# diag * maps is O(n × n_sims) vs dense @ maps O(n² × n_sims)
is_diag = E_b.ndim == 1
if is_diag:
E_b_times_w1 = E_b[:, np.newaxis] * maps1_weighted
else:
E_b_times_w1 = E_b @ maps1_weighted # (n_modes, n_sims)
if self.params.do_cross:
# q = 0.5 * w2^T @ E_b @ w1 for all sims
self.qml_results[:, il] = 0.5 * np.sum(
maps2_weighted * E_b_times_w1, axis=0
)
else:
# Noise bias: 0.5 * Tr[E_b @ Cov(w|noise)]. The dense-E_b /
# diag-only-noise branch handles the single-spectrum cache-miss
# recompute path where _get_binned_derivative returns dense.
if is_diag and noise_cov_w_diag is not None:
tr_ne = 0.5 * np.sum(E_b * noise_cov_w_diag)
elif is_diag:
tr_ne = 0.5 * np.sum(E_b * np.diag(noise_cov_w))
elif noise_cov_w is not None:
tr_ne = 0.5 * matrix_trace(E_b, noise_cov_w)
else:
tr_ne = 0.5 * np.sum(np.diagonal(E_b) * noise_cov_w_diag)
self.qml_noise_bias[il] = tr_ne
# q = 0.5 * w^T @ E_b @ w for all sims
qml_values = 0.5 * np.sum(maps1_weighted * E_b_times_w1, axis=0)
if hasattr(self.params, "remove_nb") and self.params.remove_nb:
qml_values -= tr_ne
self.qml_results[:, il] = qml_values
# Synchronize all processes
self.comm.Barrier()
if self.rank == 0:
path_label = _basis_path_label(self.basis_manager)
self.log(f"QML computation done (path: {path_label})", level=2)
self.log(
f"QML computation time: {time.time() - start_time:.2f} seconds", level=3
)
# Reduce results from all processes
self._reduce_qml_results(n_params)
def _compute_qml_spectra_traditional(self):
"""
Compute QML spectra using traditional pixel-space computation.
Optimized: Precomputes y = C^{-1} @ d to avoid building full E matrix.
The QML estimator is:
q_b = (1/2) * d^T @ C^{-1} @ dC_b @ C^{-1} @ d
= (1/2) * y^T @ dC_b @ y where y = C^{-1} @ d
"""
if self.rank == 0:
self.log("Starting QML computation (path: traditional)", level=2)
start_time = time.time()
nbins = self.bins.nbins
nspectra = self.params.nspectra
n_params = nspectra * nbins
# Precompute weighted data: y = C⁻¹ d for all simulations
weighted_maps1 = matrix_mult(self.inv_cov1, self.maps1)
if self.params.do_cross:
weighted_maps2 = matrix_mult(self.inv_cov2, self.maps2)
# For noise bias: C⁻¹ N C⁻¹ (precomputed once)
if not self.params.do_cross:
cinv_noise_cinv = matrix_mult(
self.inv_cov1, matrix_mult(self.noise_cov1, self.inv_cov1)
)
for param_idx in range(n_params):
if self.rank == param_idx % self.size:
spectrum_idx = param_idx // nbins
bin_idx = param_idx % nbins
binned_deriv = self._get_binned_derivative(bin_idx, spectrum_idx)
deriv_times_y1 = matrix_mult(binned_deriv, weighted_maps1)
if self.params.do_cross:
self.qml_results[:, param_idx] = 0.5 * np.sum(
weighted_maps2 * deriv_times_y1, axis=0
)
else:
noise_bias = 0.5 * matrix_trace(cinv_noise_cinv, binned_deriv)
self.qml_noise_bias[param_idx] = noise_bias
qml_values = 0.5 * np.sum(weighted_maps1 * deriv_times_y1, axis=0)
if hasattr(self.params, "remove_nb") and self.params.remove_nb:
qml_values -= noise_bias
self.qml_results[:, param_idx] = qml_values
self.comm.Barrier()
if self.rank == 0:
self.log("QML computation done (path: traditional)", level=2)
self.log(
f"QML computation time: {time.time() - start_time:.2f} seconds",
level=3,
)
self._reduce_qml_results(n_params)
def _reduce_qml_results(self, n_params: int):
"""Gather and combine QML results from all MPI processes."""
# Reduce y vectors
reduced_qml_results = np.zeros((self.params.nsims, n_params), dtype=np.float64)
self.comm.Reduce(self.qml_results, reduced_qml_results, op=MPI.SUM, root=0)
if not self.params.do_cross:
# Reduce noise bias
reduced_qml_noise_bias = np.zeros(n_params, dtype=np.float64)
self.comm.Reduce(
self.qml_noise_bias, reduced_qml_noise_bias, op=MPI.SUM, root=0
)
if self.rank == 0:
self.qml_results = reduced_qml_results
if not self.params.do_cross:
self.qml_noise_bias = reduced_qml_noise_bias
[docs]
def compute(self):
"""
Execute QML power spectrum computation.
Calls setup_qml_computation() to initialize arrays, then
compute_qml_spectra() for the main parallel computation.
"""
with self._stage("spectra.compute.setup"):
self.setup_qml_computation()
# Compute QML power spectra
with self._stage("spectra.compute.qml_spectra"):
self.compute_qml_spectra()
[docs]
def run(self):
"""
Execute the complete QML power spectrum analysis pipeline.
Pipeline phases:
1. Master setup: fields, geometry, covariance, spectra, beams
(reuses Fisher components if provided)
2. QML setup: maps, Fisher inversion
3. MPI broadcast of shared data to workers
4. Parallel QML computation
5. MPI synchronization
Raises
------
ValueError
If required setup components are missing.
FileNotFoundError
If input files not accessible.
"""
# Only rank 0 does the initial setup
if self.rank == 0:
with self._stage("spectra.setup_static"):
# If we have a pre-computed Fisher instance, reuse its setup
if hasattr(self, "collection") and self.collection is not None:
self.log("Reusing setup from Fisher instance", level=2)
else:
# Setup from Core class
self.setup_fields()
self.setup_geometry()
self.setup_covariance_matrices()
# Setup Cls and beams with lmax_signal (defaults to 4*nside)
# This matches the Fortran convention for derivative computation
self.log(
f"Using lmax_signal = {self.lmax_signal} for Cls and beams",
level=3,
)
self.setup_cls(lmax=self.lmax_signal)
self.setup_beams(lmax=self.lmax_signal)
# Load covariance matrices when not reusing a Fisher instance
self._load_covariance_matrices()
with self._stage("spectra.binning"):
# Setup binning: Fisher > set_binning() > config > default
if not hasattr(self, "bins") or self.bins is None:
delta_ell = getattr(self.params, "delta_ell", 1)
self.set_binning(
Bins.fromdeltal(self.params.lmin, self.params.lmax, delta_ell)
)
with self._stage("spectra.setup_maps"):
self.setup_maps()
with self._stage("spectra.setup_fisher_inversion"):
self.setup_fisher_inversion()
# Synchronize and broadcast shared variables to worker processes.
# Skip broadcast for single-process runs to avoid unnecessary
# serialization of large objects (covariance matrices can exceed 2 GB).
self.comm.Barrier()
with self._stage("spectra.mpi_distribution"):
if self.size > 1:
self._broadcast_variables()
self.comm.Barrier()
# Main QML computation (parallel computation)
self.compute()
# Finalize
self.comm.Barrier()
def _broadcast_variables(self):
"""Broadcast essential data from master to all MPI worker processes."""
# Python objects (small, serialization is fine)
self.params = self.comm.bcast(self.params if self.rank == 0 else None, root=0)
self.collection = self.comm.bcast(
self.collection if self.rank == 0 else None, root=0
)
self.bins = self.comm.bcast(self.bins if self.rank == 0 else None, root=0)
self.npixs = self.comm.bcast(self.npixs if self.rank == 0 else None, root=0)
self.pixact = self.comm.bcast(self.pixact if self.rank == 0 else None, root=0)
# Numpy arrays via shared memory (zero-copy, no size limits).
# Worker ranks may not have these attributes yet, so use getattr.
point_vectors = getattr(self, "point_vectors", None)
n_point_vectors = self.comm.bcast(
len(point_vectors) if self.rank == 0 and point_vectors is not None else 0,
root=0,
)
self.point_vectors = tuple(
self._shared_array(
point_vectors[i] if self.rank == 0 and point_vectors is not None else None
)
for i in range(n_point_vectors)
)
use_basis = hasattr(self, "basis_manager") and self.basis_manager is not None
# Pixel-space covariance matrices only needed for traditional path
if not use_basis:
self.noise_cov1 = self._shared_array(getattr(self, "noise_cov1", None))
self.inv_cov1 = self._shared_array(getattr(self, "inv_cov1", None))
if self.params.do_cross:
self.noise_cov2 = self._shared_array(getattr(self, "noise_cov2", None))
self.inv_cov2 = self._shared_array(getattr(self, "inv_cov2", None))
# Maps (can be large: n_pix × n_sims)
self.maps1 = self._shared_array(getattr(self, "maps1", None))
if self.params.do_cross:
self.maps2 = self._shared_array(getattr(self, "maps2", None))
# Fisher-related (small but still read-only)
self.invfisher = self._shared_array(getattr(self, "invfisher", None))
self.beam_smoothing = self._shared_array(getattr(self, "beam_smoothing", None))
self.inv_fisher_sqrt = self._shared_array(getattr(self, "inv_fisher_sqrt", None))
self.fisher_normalized = self._shared_array(
getattr(self, "fisher_normalized", None)
)
def _normalize_spectra(self, spectra: np.ndarray) -> np.ndarray:
"""
Apply inverse Fisher to raw QML estimates: Ĉ = F⁻¹ q.
Parameters
----------
spectra : np.ndarray
Raw QML estimates, shape (n_params,) or (n_sims, n_params).
Returns
-------
np.ndarray
Deconvolved power spectrum estimates, same shape as input.
"""
if self.invfisher is None:
raise ValueError("Fisher inversion must be set up first.")
return spectra @ self.invfisher
[docs]
def get_power_spectra(
self, mode: str = "deconvolved", *, as_dict: bool = False
) -> np.ndarray | tuple[np.ndarray, np.ndarray, typing.Callable] | dict | None:
"""
Retrieve power spectrum estimates in specified normalization mode.
This method returns power spectrum estimates with three different
normalization prescriptions, allowing users to choose the output format
best suited to their analysis needs.
Parameters
----------
mode : str, optional
Normalization mode for output (default: "deconvolved"):
- "deconvolved": F⁻¹y - estimates of true C_ℓ with correlated errors.
This is the standard QML output that attempts to recover the true
underlying spectrum by inverting the window function.
- "decorrelated": F⁻¹/²y - uncorrelated bandpower estimates with
identity covariance matrix. Useful when independent error bars
are needed.
- "convolved": Raw y estimates with window matrix W for theory
comparison. Instead of deconvolving the window, compare with
window-convolved theory: <y> = W @ C_true.
as_dict : bool, optional
If ``False`` (default, back-compat), return the flat numpy
array described below. If ``True``, return a dict keyed by
physical labels (``"TT"``, ``"EE"``, ``"TE"``, ...) whose
values have shape ``(n_simulations, n_bins)``. In
``"convolved"`` mode, the dict applies to the raw estimates
``y`` only; the window matrix ``W`` and ``convolve_theory``
callable are unchanged (use :meth:`Fisher.get_bandpower_slices`
on ``self.fisher_instance`` to navigate ``W``).
Returns
-------
numpy.ndarray or tuple or dict or None
For "deconvolved" or "decorrelated" modes with
``as_dict=False``:
Array of shape ``(n_simulations, n_parameters)`` where
``n_parameters = n_spectra * n_bins``.
For "convolved" mode with ``as_dict=False``:
Tuple of ``(y, W, convolve_theory_func)`` where:
- ``y``: Raw estimates of shape ``(n_simulations, n_parameters)``
- ``W``: Window matrix of shape ``(n_parameters, n_parameters)``
- ``convolve_theory_func``: Callable that applies ``W @ theory``
With ``as_dict=True``: a dict for the per-spectrum arrays
(and, in convolved mode, a 3-tuple ``(y_dict, W, convolve)``).
Returns ``None`` for worker processes (rank != 0) or if
computation has not completed.
**Layout (flat-array shape)**:
The columns of the returned array are organised as ``n_spectra``
contiguous blocks of ``n_bins`` columns each. Spectrum order
follows :attr:`Fisher.spectra_list` on
``self.fisher_instance`` — auto-spectra first (per component,
in the order each component emits them), then cross-spectra in
``(i, j)`` order with ``i < j``. For TQU (``spins=[0, 2]``,
``labels=["T", "E", "B"]``) under ``SymmetryMode.SYMMETRIC``
with ``nbins=10``, ``n_parameters = 60`` and the columns are::
cols 0..9 : TT
cols 10..19 : EE
cols 20..29 : BB
cols 30..39 : EB
cols 40..49 : TE
cols 50..59 : TB
Use :meth:`Fisher.get_bandpower_slices` (on
``self.fisher_instance``) or pass ``as_dict=True`` to look up
per-spectrum slices without computing these offsets by hand.
Raises
------
ValueError
If mode is not one of "deconvolved", "decorrelated", "convolved".
Notes
-----
**Mode Comparison:**
| Mode | Formula | Covariance | Use Case |
|------|---------|------------|----------|
| deconvolved | F⁻¹y | F⁻¹ (correlated) | Standard analysis |
| decorrelated | F⁻¹/²y | I (identity) | Independent errors |
| convolved | y | F | Theory comparison |
**Deconvolved Mode (default):**
The standard QML output that inverts the window function to recover
estimates of the true C_ℓ. Errors are correlated between multipoles.
**Decorrelated Mode:**
Produces bandpower estimates with uncorrelated errors (unit variance).
Useful for plotting error bars or when independent measurements are
required. Note that information content is preserved but redistributed.
**Convolved Mode:**
Returns raw QML estimates without deconvolution, along with the window
matrix for comparing with convolved theoretical spectra. This avoids
numerical issues from inverting poorly-conditioned window matrices.
**Output Convention:**
When ``output_convention`` is set to ``"Dl"`` in the configuration,
all returned spectra are converted from C_ℓ to D_ℓ = ℓ(ℓ+1)/(2π) C_ℓ.
In convolved mode, the window matrix is also transformed so that
the returned ``convolve_theory_func`` expects D_ℓ input.
Examples
--------
Default (flat array) — back-compat::
>>> spectra = Spectra("config.yaml")
>>> spectra.run()
>>> cl_deconv = spectra.get_power_spectra() # shape (n_sims, 60)
>>> ee_slice = cl_deconv[:, 10:20] # bins 0..9 of EE
Label-keyed dict (recommended for human-readable code)::
>>> cl = spectra.get_power_spectra(as_dict=True)
>>> cl["EE"].shape # (n_sims, n_bins)
>>> cl["TE"] # cross-spectrum
Decorrelated bandpowers::
>>> cl_decorr = spectra.get_power_spectra(mode="decorrelated")
>>> # Errors are all 1.0 by construction
Convolved mode for theory comparison::
>>> y, W, convolve = spectra.get_power_spectra(mode="convolved")
>>> cl_theory_convolved = convolve(cl_theory)
>>> # Compare: y should match cl_theory_convolved
>>> # With as_dict=True, the y component becomes a dict; W and
>>> # the convolve callable are unchanged.
>>> y_dict, W, convolve = spectra.get_power_spectra(
... mode="convolved", as_dict=True
... )
See Also
--------
get_covariance : Get covariance matrix for specified mode
get_error_bars : Get 1-sigma error bars for specified mode
convolve_theory : Apply window matrix to theoretical spectrum
Fisher.get_bandpower_slices : Slice mapping for navigating the flat layout.
"""
valid_modes = ("deconvolved", "decorrelated", "convolved")
if mode not in valid_modes:
raise ValueError(f"mode must be one of {valid_modes}, got '{mode}'")
if self.rank != 0 or self.qml_results is None:
return None
if mode == "deconvolved":
result = self._get_deconvolved()
elif mode == "decorrelated":
result = self._get_decorrelated()
else: # convolved
result = self._get_convolved()
if self._output_is_dl() and result is not None:
result = self._apply_output_convention(result, mode)
if as_dict and result is not None:
from cosmocore.conventions.cmb import to_label_dict
labels = list(self.params.labels)
spins = tuple(self.params.spins)
spectra_list = self.fisher_instance.spectra_list
n_bins = self.bins.nbins
if mode == "convolved":
y, W, convolve = result
y_dict = to_label_dict(
y,
labels=labels,
spins=spins,
spectra_list=spectra_list,
n_bins=n_bins,
)
return y_dict, W, convolve
return to_label_dict(
result,
labels=labels,
spins=spins,
spectra_list=spectra_list,
n_bins=n_bins,
)
return result
def _output_is_dl(self) -> bool:
"""Check if output convention is Dl (case-insensitive)."""
value = getattr(self.params, "output_convention", "Cl")
key = value.strip().lower()
if key not in ("cl", "dl"):
raise ValueError(
f"Unknown spectra convention '{value}'. Must be 'Cl' or 'Dl'."
)
return key == "dl"
def _dl_factor(self) -> np.ndarray:
"""Return the Cl->Dl factor tiled over all spectra."""
ell = self.bins.lbin.astype(np.float64)
return np.tile(ell * (ell + 1) / (2 * np.pi), self.params.nspectra)
def _apply_output_convention(self, result, mode):
"""Apply Cl->Dl conversion to output power spectra."""
d = self._dl_factor()
if mode in ("deconvolved", "decorrelated"):
return result * d[np.newaxis, :]
else: # convolved
y, W, convolve_cl = result
# W_Dl = D @ W_Cl @ D^{-1} so that <y_Dl> = W_Dl @ C_Dl
W_dl = W * np.outer(d, 1.0 / d)
convolve_theory_dl = self._make_convolve_theory(W_dl)
return (y * d[np.newaxis, :], W_dl, convolve_theory_dl)
def _make_convolve_theory(self, W: np.ndarray):
"""Build a ``convolve_theory`` callable that accepts either a flat
``cl_theory`` of length ``n_params`` or a label-keyed dict mapping
physical labels (e.g. ``"TT"``, ``"EE"``) to per-bin arrays.
Both forms return a flat numpy array of length ``n_params`` so the
downstream comparison code does not have to branch on shape.
Convention
----------
The input values must be in the configuration's active output
convention. If ``params.output_convention == "Dl"`` the window
matrix wraps ``W_dl = D · W_cl · D^{-1}`` and the user must
pass D_ℓ-binned theory; otherwise pass C_ℓ-binned theory.
Mixing conventions yields silently-wrong predictions.
"""
from cosmocore.conventions.cmb import spectrum_key_to_label
labels_per_slot = list(self.params.labels)
spins = tuple(self.params.spins)
spectra_list = self.fisher_instance.spectra_list
# Pre-compute the spectrum order in physical labels so dict inputs
# can be assembled into the flat vector without re-deriving labels
# on every call.
if spectra_list is not None:
spectrum_labels = [
spectrum_key_to_label(k, labels=labels_per_slot, spins=spins)
for k in spectra_list
]
else:
spectrum_labels = None
def convolve_theory(cl_theory) -> np.ndarray:
if isinstance(cl_theory, dict):
if spectrum_labels is None:
raise RuntimeError(
"convolve_theory dict input requires spectra_list "
"(call fisher.run() first)."
)
missing = [lab for lab in spectrum_labels if lab not in cl_theory]
if missing:
raise KeyError(
f"convolve_theory dict missing entries for: {missing}. "
f"Expected keys: {spectrum_labels}"
)
cl_flat = np.concatenate(
[np.asarray(cl_theory[lab]) for lab in spectrum_labels]
)
else:
cl_flat = np.asarray(cl_theory)
return W @ cl_flat
return convolve_theory
def _get_deconvolved(self) -> np.ndarray:
"""
Compute deconvolved power spectrum estimates (F⁻¹y).
This is the standard QML output that multiplies raw estimates by the
inverse Fisher matrix to recover estimates of the true C_ℓ.
Returns
-------
numpy.ndarray
Deconvolved power spectrum estimates with shape (nsims, n_params).
"""
return self._normalize_spectra(self.qml_results)
def _get_decorrelated(self) -> np.ndarray:
"""
Compute decorrelated bandpower estimates (F⁻¹/²y).
Produces uncorrelated estimates with identity covariance matrix.
F^(-1/2) is computed lazily on first call.
Returns
-------
numpy.ndarray
Decorrelated power spectrum estimates with shape (nsims, n_params).
"""
if self.inv_fisher_sqrt is None:
self._compute_inv_fisher_sqrt(self.fisher_normalized)
decorrelated = self.qml_results @ self.inv_fisher_sqrt
return decorrelated
def _get_convolved(self) -> tuple[np.ndarray, np.ndarray, typing.Callable]:
"""
Get raw QML estimates with window matrix for theory comparison.
Returns raw y estimates along with the window matrix W, allowing
comparison with window-convolved theoretical spectra.
Returns
-------
tuple
``(y, W, convolve_theory_func)`` where:
- ``y``: Raw normalized estimates, shape ``(nsims, n_params)``
- ``W``: Window matrix, shape ``(n_params, n_params)``
- ``convolve_theory_func``: Callable accepting either a flat
``cl_theory`` array of length ``n_params`` (ordered per
:meth:`Fisher.get_bandpower_slices`) or a label-keyed dict
like ``{"TT": [...], "EE": [...], ...}``. Returns a flat
array of length ``n_params``.
Notes
-----
With binning enabled, ``n_params = nspectra * nbins``. The theory
input to ``convolve_theory`` must be binned (one value per bin),
e.g. via ``bins.bin_spectra(cl_theory)`` (``cl_theory``
ℓ-indexed). For the flat form, the entries must be ordered the
same way as the Fisher matrix columns; the dict form bypasses
the ordering question entirely.
"""
y = self.qml_results
W = self.fisher_instance.get_window_matrix()
if W is None:
raise ValueError("Window matrix not available from Fisher instance.")
return (y, W, self._make_convolve_theory(W))
[docs]
def get_effective_ells(self) -> np.ndarray | None:
"""
Return effective multipole for each bin.
For unbinned (delta_ell=1), returns integer ells from 2 to lmax.
For binned, returns bin midpoints.
Returns
-------
np.ndarray or None
Effective multipoles, shape (nbins,). None for workers.
"""
if self.rank != 0:
return None
return self.bins.lbin
[docs]
def get_noise_bias(self) -> np.ndarray | None:
"""
Retrieve noise bias estimates for auto-correlation spectra.
Returns
-------
np.ndarray or None
Shape (n_params,). Returns None for workers, cross-correlation
analyses (do_cross=True), or if computation incomplete.
Notes
-----
Noise bias: (1/2) * Tr[N * E_l]. Cross-correlations are bias-free.
"""
if self.rank == 0 and self.qml_noise_bias is not None:
noise_bias = self._normalize_spectra(self.qml_noise_bias)
if self._output_is_dl():
noise_bias = noise_bias * self._dl_factor()
return noise_bias
return None
[docs]
def convolve_theory_for_inference(self, cl_theory: np.ndarray) -> np.ndarray | None:
"""
Apply the QML bandpower window to a per-ℓ theory spectrum.
Returns the expected binned bandpower for the given theory, in
the same convention as ``get_power_spectra(mode="deconvolved")``.
Use this in a Gaussian likelihood for parameter inference:
mu_b(θ) = convolve_theory_for_inference(C_ℓ(θ))
-2 ln L = (d - mu)ᵀ F_b (d - mu)
Parameters
----------
cl_theory : np.ndarray
Per-ℓ theory C_ℓ. Two formats accepted:
- shape ``(n_ell,)`` for ℓ=lmin..lmax (length lmax-lmin+1)
- shape ``(lmax+1,)`` starting at ℓ=0 (entries below ``lmin``
are ignored)
Must be **unbeamed** physical C_ℓ — beam² is already absorbed
into the window.
Returns
-------
cl_binned : np.ndarray of shape (n_bins,), or None
Expected binned bandpower for the given theory. If the
output convention is "Dl", the result is converted to D_ℓ
using the bin effective ells. Returns None on worker
processes (rank != 0).
Notes
-----
This delegates to :meth:`Fisher.get_bandpower_window_function`
on the underlying Fisher instance. The first call triggers a
per-ℓ Fisher computation (cached for subsequent calls).
**Scope**: single-spectrum only. For multi-spectrum likelihoods,
use :meth:`get_power_spectra` (``mode="convolved"``) — the
returned ``convolve_theory_func`` accepts a label-keyed dict
and handles the full ``n_spectra * n_bins`` window matrix.
See :meth:`Fisher.get_bandpower_window_function` for details on
the buffer approach and the multi-spectrum limitation.
Examples
--------
>>> spectra = Spectra("config.yaml", fisher=fisher,
... compression={"method": "harmonic"})
>>> spectra.set_binning(bins)
>>> spectra.run()
>>> cl_th = compute_camb_cl(theta) # ell=0..lmax
>>> mu_b = spectra.convolve_theory_for_inference(cl_th)
>>> data = spectra.get_power_spectra(mode="deconvolved").mean(0)
>>> F_b = spectra.fisher_instance.get_fisher_matrix()
>>> neg2lnL = (data - mu_b) @ F_b @ (data - mu_b)
See Also
--------
Fisher.get_bandpower_window_function : Underlying window matrix.
get_power_spectra : Get the data bandpower (deconvolved mode).
get_covariance : Get the covariance matrix (= F_b⁻¹).
"""
if self.rank != 0:
return None
if not hasattr(self, "fisher_instance") or self.fisher_instance is None:
raise ValueError(
"convolve_theory_for_inference requires an attached Fisher "
"instance. Pass fisher=fisher to Spectra() at construction."
)
if getattr(self.fisher_instance, "fisher", None) is None:
raise ValueError(
"Fisher matrix has not been computed. Call fisher.run() "
"(and spectra.run()) before convolve_theory_for_inference()."
)
lmin = self.params.lmin
n_ell = self.params.lmax - lmin + 1
cl = np.asarray(cl_theory, dtype=np.float64)
if cl.ndim != 1:
raise ValueError(f"cl_theory must be 1D, got shape {cl.shape}")
if cl.size == n_ell:
cl_vec = cl
elif cl.size >= self.params.lmax + 1:
cl_vec = cl[lmin : self.params.lmax + 1]
else:
raise ValueError(
f"cl_theory length {cl.size} does not match expected "
f"{n_ell} (ℓ=lmin..lmax) or {self.params.lmax + 1} (ℓ=0..lmax)"
)
W = self.fisher_instance.get_bandpower_window_function()
if W is None:
return None
cl_binned = W @ cl_vec
if self._output_is_dl():
cl_binned = cl_binned * self._dl_factor()
return cl_binned
[docs]
def get_covariance(self, mode: str = "deconvolved") -> np.ndarray | None:
"""
Get covariance matrix for power spectrum estimates in specified mode.
Returns the covariance matrix appropriate for the normalization mode,
which quantifies the statistical uncertainties and correlations between
different multipole estimates.
Parameters
----------
mode : str, optional
Normalization mode (default: "deconvolved"):
- "deconvolved": Returns F⁻¹, the inverse Fisher matrix with
correlated errors between multipoles.
- "decorrelated": Returns identity matrix I, as decorrelated
estimates have unit variance by construction.
- "convolved": Returns F, the normalized Fisher matrix, which
is the covariance of raw y estimates.
Returns
-------
numpy.ndarray or None
Covariance matrix of shape (n_params, n_params). Returns None for worker
processes (rank != 0) or if matrices are not available.
Raises
------
ValueError
If mode is not one of "deconvolved", "decorrelated", "convolved".
Notes
-----
**Mode-specific covariances:**
| Mode | Covariance | Interpretation |
|------|------------|----------------|
| deconvolved | F⁻¹ | Correlated errors on C_ℓ estimates |
| decorrelated | I | Unit variance (uncorrelated) |
| convolved | F | Covariance of raw y estimates |
The covariance matrix is essential for:
- Computing chi-square statistics
- Parameter estimation from power spectra
- Constructing likelihood functions
- Understanding multipole correlations
Examples
--------
Get covariance for chi-square computation:
>>> cov = spectra.get_covariance(mode="deconvolved")
>>> cl_data = spectra.get_power_spectra(mode="deconvolved")
>>> residuals = np.mean(cl_data, axis=0) - cl_theory
>>> chi2 = residuals @ np.linalg.inv(cov) @ residuals
Decorrelated mode has identity covariance:
>>> cov_decorr = spectra.get_covariance(mode="decorrelated")
>>> # cov_decorr is identity matrix
See Also
--------
get_power_spectra : Get power spectrum estimates
get_error_bars : Get 1-sigma error bars (sqrt of diagonal)
"""
valid_modes = ("deconvolved", "decorrelated", "convolved")
if mode not in valid_modes:
raise ValueError(f"mode must be one of {valid_modes}, got '{mode}'")
if self.rank != 0:
return None
if mode == "deconvolved":
if self.invfisher is None:
return None
cov = self.invfisher.copy()
elif mode == "decorrelated":
if self.invfisher is None:
return None
n_params = self.invfisher.shape[0]
cov = np.eye(n_params)
else: # convolved
if self.fisher_normalized is None:
return None
cov = self.fisher_normalized.copy()
if self._output_is_dl():
d = self._dl_factor()
cov = cov * np.outer(d, d)
return cov
[docs]
def get_error_bars(self, mode: str = "deconvolved") -> np.ndarray | None:
"""
Get 1-sigma error bars for power spectrum estimates.
Returns the marginal standard deviations for each multipole estimate,
computed from the diagonal of the covariance matrix.
Parameters
----------
mode : str, optional
Normalization mode (default: "deconvolved"):
- "deconvolved": sqrt(diag(F⁻¹)) - standard QML errors
- "decorrelated": all ones (unit variance by construction)
- "convolved": sqrt(diag(F)) - errors on raw y estimates
Returns
-------
numpy.ndarray or None
1D array of error bars with shape (n_params,). Returns None for
worker processes (rank != 0) or if covariance not available.
Raises
------
ValueError
If mode is not one of "deconvolved", "decorrelated", "convolved".
Notes
-----
The error bars are computed as:
σ_i = sqrt(Cov_ii)
where Cov is the mode-appropriate covariance matrix. These represent
marginal uncertainties on individual estimates, marginalized over all
other multipoles.
**Important:** For deconvolved and convolved modes, errors are
correlated between multipoles. The full covariance matrix (from
get_covariance) should be used for proper statistical analysis.
For decorrelated mode, error bars are all 1.0 by construction,
making them suitable for simple error bar plots.
Examples
--------
Get error bars for plotting:
>>> errors = spectra.get_error_bars(mode="deconvolved")
>>> cl = np.mean(spectra.get_power_spectra(), axis=0)
>>> plt.errorbar(ell, cl, yerr=errors)
Decorrelated mode has unit errors:
>>> errors_decorr = spectra.get_error_bars(mode="decorrelated")
>>> # All elements are 1.0
See Also
--------
get_covariance : Get full covariance matrix
get_power_spectra : Get power spectrum estimates
"""
cov = self.get_covariance(mode)
if cov is None:
return None
return np.sqrt(np.diag(cov))
[docs]
def convolve_theory(self, cl_theory: np.ndarray) -> np.ndarray | None:
"""
Apply window matrix to theoretical power spectrum.
Convolves a theoretical power spectrum with the QML window matrix,
producing the expected value of the raw QML estimates y for that
theory: <y> = W @ C_theory.
Parameters
----------
cl_theory : numpy.ndarray
Theoretical power spectrum values. Should be a 1D array with
shape (n_params,) matching the QML output dimensions. When
``output_convention="Dl"``, this should be D_ℓ values.
Returns
-------
numpy.ndarray or None
Window-convolved theoretical spectrum with shape (n_params,),
in the same convention as the input. Returns None if window
matrix is not available.
Notes
-----
This method is useful for comparing QML estimates with theoretical
predictions when using the "convolved" normalization mode. Instead
of deconvolving the window function from the data (which can be
numerically unstable), the window is applied to the theory.
The relationship is:
<y> = W @ C_true
where y are the raw QML estimates and W is the window matrix.
When ``output_convention="Dl"``, the window matrix is internally
transformed so the input and output are both in D_ℓ convention.
Examples
--------
Compare convolved estimates with theory:
>>> y, W, _ = spectra.get_power_spectra(mode="convolved")
>>> cl_theory_convolved = spectra.convolve_theory(cl_theory)
>>> residuals = np.mean(y, axis=0) - cl_theory_convolved
See Also
--------
get_power_spectra : Get power spectra (convolved mode returns window)
"""
if self.rank != 0:
return None
W = self.fisher_instance.get_window_matrix()
if W is None:
return None
if self._output_is_dl():
d = self._dl_factor()
return d * (W @ (cl_theory / d))
return W @ cl_theory
[docs]
def write_power_spectra(
self,
mode: str = "deconvolved",
filename: str | None = None,
include_errors: bool = True,
) -> None:
"""
Write power spectra to file in specified normalization mode.
Outputs power spectrum estimates and optionally error bars to text
files. For convolved mode, also writes the window matrix.
Parameters
----------
mode : str, optional
Normalization mode (default: "deconvolved"):
"deconvolved", "decorrelated", or "convolved".
filename : str or None, optional
Output filename. If None, auto-generates based on mode:
"{outclfile_base}_{mode}.{ext}"
include_errors : bool, optional
If True, writes error bars to a separate file (default: True).
For convolved mode, this parameter is ignored.
Notes
-----
**File outputs by mode:**
- deconvolved/decorrelated: Writes mean spectrum across simulations
and optionally error bars from diagonal of covariance.
- convolved: Writes mean raw y estimates and the window matrix W
to separate files (filename and filename_window).
Files are only written on the master process (rank 0).
Examples
--------
Write all modes:
>>> spectra.write_power_spectra(mode="deconvolved")
>>> spectra.write_power_spectra(mode="decorrelated")
>>> spectra.write_power_spectra(mode="convolved")
Custom filename:
>>> spectra.write_power_spectra(
... mode="deconvolved",
... filename="my_results.dat"
... )
See Also
--------
get_power_spectra : Get power spectrum estimates
get_error_bars : Get 1-sigma error bars
"""
if self.rank != 0:
return
valid_modes = ("deconvolved", "decorrelated", "convolved")
if mode not in valid_modes:
raise ValueError(f"mode must be one of {valid_modes}, got '{mode}'")
# Generate default filename
if filename is None:
if hasattr(self.params, "outclfile") and self.params.outclfile:
parts = self.params.outclfile.rsplit(".", 1)
base = parts[0]
ext = parts[1] if len(parts) > 1 else "dat"
filename = f"{base}_{mode}.{ext}"
else:
filename = f"spectra_{mode}.dat"
if mode == "convolved":
self._write_convolved(filename)
else:
spectra = self.get_power_spectra(mode=mode)
if spectra is None:
self.log(f"Cannot write {mode} spectra: not available", level=1)
return
mean_spectra = np.mean(spectra, axis=0)
# Convert to Cl format and write
n_ell = self.params.lmax - self.params.lmin + 1
nspectra = len(mean_spectra) // n_ell
cl_array = np.zeros((n_ell, nspectra), dtype=np.float64)
vec_to_cl(mean_spectra, cl_array)
writecl(filename, cl_array)
self.log(f"Wrote {mode} power spectra to {filename}", level=2)
if include_errors:
errors = self.get_error_bars(mode=mode)
if errors is not None:
error_filename = filename.rsplit(".", 1)
error_filename = (
f"{error_filename[0]}_errors.{error_filename[1]}"
if len(error_filename) > 1
else f"{filename}_errors"
)
error_array = np.zeros((n_ell, nspectra), dtype=np.float64)
vec_to_cl(errors, error_array)
writecl(error_filename, error_array)
self.log(f"Wrote {mode} error bars to {error_filename}", level=2)
def _write_convolved(self, filename: str) -> None:
"""
Write convolved mode outputs: y vector and window matrix.
Parameters
----------
filename : str
Base filename for outputs.
"""
result = self.get_power_spectra(mode="convolved")
if result is None:
self.log("Cannot write convolved spectra: not available", level=1)
return
y, W, _ = result
mean_y = np.mean(y, axis=0)
# Write y vector
np.savetxt(
filename,
mean_y,
header="Raw QML estimates (y vector, mean across simulations)",
)
self.log(f"Wrote convolved y vector to {filename}", level=2)
# Write window matrix
parts = filename.rsplit(".", 1)
window_filename = (
f"{parts[0]}_window.{parts[1]}" if len(parts) > 1 else f"{filename}_window"
)
np.savetxt(
window_filename,
W,
header="Window matrix W: <y> = W @ C_true",
)
self.log(f"Wrote window matrix to {window_filename}", level=2)