Source code for cosmocore.in_out

"""
Input/output operations for cosmological data analysis.

This module provides functions for reading and writing various data formats
used in cosmological analysis, including covariance matrices, power spectra,
masks, geometry files, and map data from FITS files.

Functions
---------
read_covmat
    Read noise covariance matrix from binary file.
write_covmat_reduced
    Write reduced covariance matrix to binary file.
read_mask
    Read HEALPix mask from FITS file.
output_geometry
    Write geometry information to text file.
readcl
    Read power spectra from text file with header.
writecl
    Write power spectra to text file.
write_out_matrix
    Write matrix in formatted text output.
read_maps
    Read map data from multi-simulation FITS files.
get_field_index
    Extract field indices from FITS header information.

Notes
-----
This module handles various file formats commonly used in CMB analysis:
- Binary covariance matrices for noise modeling
- HEALPix FITS files for masks and maps
- Text files for power spectra with flexible header formats
- Multi-simulation FITS files with structured field organization
"""

import os

import healpy as hp
import numpy as np
from astropy.io import fits

from .settings import InputParams


def _ensure_output_directory(filepath):
    """
    Ensure the parent directory of the given filepath exists.

    Parameters
    ----------
    filepath : str
        Path to a file for which the parent directory should be created.
    """
    output_dir = os.path.dirname(filepath.strip())
    if output_dir and not os.path.exists(output_dir):
        os.makedirs(output_dir)


[docs] def read_covmat(covmatfile, npix, nmaps, active, C): """ Read noise covariance matrix from binary file and extract active pixel submatrix. Parameters ---------- covmatfile : str Path to binary file containing full covariance matrix. npix : int Number of pixels per map. nmaps : int Number of maps/fields. active : numpy.ndarray 1D array of active pixel indices in flattened format. C : numpy.ndarray Output array to store the extracted covariance submatrix. Returns ------- numpy.ndarray Filled covariance matrix C for active pixels only. Notes ----- Reads a full-sky covariance matrix stored in binary format and extracts only the relevant submatrix corresponding to active (unmasked) pixels. The input matrix is assumed to be stored as float64 in row-major order. """ full_size = int(npix * nmaps) NCVMfull = np.fromfile(covmatfile.strip(), dtype=np.float64) NCVMfull = NCVMfull.reshape((full_size, full_size)) idx = np.asarray(active, dtype=np.intp) n_active = idx.size # np.ix_ would allocate a full (n_active, n_active) temporary. target_block_bytes = 64 * 1024 * 1024 block_size = max(1, int(np.sqrt(target_block_bytes / NCVMfull.dtype.itemsize))) for row_start in range(0, n_active, block_size): row_stop = min(row_start + block_size, n_active) row_idx = idx[row_start:row_stop] for col_start in range(0, n_active, block_size): col_stop = min(col_start + block_size, n_active) col_idx = idx[col_start:col_stop] C[row_start:row_stop, col_start:col_stop] = NCVMfull[ row_idx[:, np.newaxis], col_idx ] return C
[docs] def read_covmat_reduced(covmatfile, C): """ Read a pre-reduced covariance matrix from binary file. Parameters ---------- covmatfile : str Path to binary file containing the reduced covariance matrix (active pixels only). C : numpy.ndarray Output array of shape (n_active, n_active) to store the matrix. Returns ------- numpy.ndarray Filled covariance matrix C. """ ntot = C.shape[0] filepath = covmatfile.strip() data = np.fromfile(filepath, dtype=np.float64) expected_size = ntot * ntot if data.size != expected_size: expected_bytes = expected_size * data.dtype.itemsize actual_bytes = data.size * data.dtype.itemsize raise ValueError( f"Invalid reduced covariance matrix size in '{filepath}': " f"expected {expected_size} float64 values " f"({expected_bytes} bytes) for shape ({ntot}, {ntot}), " f"but found {data.size} values ({actual_bytes} bytes)." ) data = data.reshape((ntot, ntot)) C[:] = data return C
[docs] def write_covmat_reduced(outcovmatfile, C): """ Write reduced covariance matrix to binary file. Parameters ---------- outcovmatfile : str Output filename for the reduced covariance matrix. C : numpy.ndarray Covariance matrix to write, typically for active pixels only. Notes ----- Writes the matrix in binary format using numpy's tofile method. The output can be read back using numpy.fromfile with appropriate reshaping. """ _ensure_output_directory(outcovmatfile) with open(outcovmatfile.strip(), "wb") as f: C.tofile(f)
[docs] def read_mask(maskfile, mask): """ Read HEALPix mask from FITS file. Parameters ---------- maskfile : str Path to FITS file containing HEALPix mask(s). mask : numpy.ndarray Output array of shape (npix, nmaps) to store mask values. Returns ------- numpy.ndarray Mask array with shape (npix, nmaps) where each column represents a different field mask. Notes ----- Uses HEALPix's read_map function to load mask data and transposes the result to have pixels as rows and maps as columns for consistent indexing with the analysis framework. """ nmaps, _ = mask.shape m = hp.read_map(maskfile.strip(), field=range(nmaps), dtype=np.float64) return np.array(m).T # transpose to (npix, nmaps)
[docs] def output_geometry(filegeometry, npixs, point_vectors, active): """ Write geometry information to text file. Parameters ---------- filegeometry : str Output filename for geometry data. npixs : list of int Number of active pixels for each field. point_vectors : tuple of numpy.ndarray Pointing vectors for each field, each array shape (n_active, 3). active : numpy.ndarray or tuple Active pixel indices for each field. Notes ----- Writes geometry information in a structured text format containing: - Field header with field index, number of active pixels - For each active pixel: original pixel index and 3D pointing vector This format can be used for geometry debugging and external processing. """ nmaps = len(npixs) _ensure_output_directory(filegeometry) with open(filegeometry.strip(), "w") as f: for field_idx in range(nmaps): ntemp = npixs[field_idx] f.write(f"{field_idx + 1:6d}{ntemp:6d}{0:6d}{0:6d}\n") for i in range(ntemp): idx = active[field_idx, i] vec = point_vectors[field_idx][i, :] f.write(f"{idx:6d}{vec[0]:24.16e}{vec[1]:24.16e}{vec[2]:24.16e}\n")
def _parse_convention(value): """Normalize a convention string to 'Cl' or 'Dl' (case-insensitive).""" canonical = {"cl": "Cl", "dl": "Dl"} key = value.strip().lower() if key not in canonical: raise ValueError(f"Unknown spectra convention '{value}'. Must be 'Cl' or 'Dl'.") return canonical[key]
[docs] def convert_spectra_normalization(cls_dict, from_norm, to_norm): """Convert between Cl and Dl = l(l+1)/(2pi) Cl conventions. Operates on ℓ-indexed arrays: ``cls_dict[label][ell]`` is the spectrum value at multipole ``ell``. Indices below the physical floor of the spectrum are expected to be zero. Modifies ``cls_dict`` in place and returns it. Parameters ---------- cls_dict : dict Dictionary mapping spectrum labels to ℓ-indexed power-spectrum arrays. from_norm : str Input convention ("Cl" or "Dl", case-insensitive). to_norm : str Output convention ("Cl" or "Dl", case-insensitive). Returns ------- dict The same ``cls_dict``, modified in place. """ from_norm = _parse_convention(from_norm) to_norm = _parse_convention(to_norm) if from_norm == to_norm: return cls_dict n_ell = next(iter(cls_dict.values())).shape[0] ell = np.arange(n_ell, dtype=np.float64) if from_norm == "Dl" and to_norm == "Cl": # Dl[ell] = ell*(ell+1)/(2pi) * Cl[ell]; inversion undefined at ell=0. # Convention: Dl[0] = 0 ⇒ Cl[0] = 0, so factor[0] = 0 is safe. factor = np.zeros_like(ell) nz = ell >= 1 factor[nz] = 2 * np.pi / (ell[nz] * (ell[nz] + 1)) else: # Cl -> Dl factor = ell * (ell + 1) / (2 * np.pi) for label in cls_dict: cls_dict[label] = cls_dict[label] * factor return cls_dict
[docs] def readcl(inputclfile, Params: InputParams, logger=None, lmax: int | None = None): """ Read power spectra from text file with header. Parameters ---------- inputclfile : str Path to text file containing power spectra data. Params : InputParams Analysis parameters containing lmax and feedback level. logger : CosmoLogger, optional CosmoLogger instance for output. If None, uses print for backward compatibility. lmax : int, optional Maximum multipole to read. If None, uses Params.lmax. This allows loading Cls up to a different lmax than the analysis lmax. Returns ------- dict Dictionary mapping spectrum labels to ℓ-indexed power-spectrum arrays. Each array has length ``effective_lmax + 1``: ``cls_dict[label][ell]`` is C_ℓ. Indices below the file's lowest available ℓ are zero-padded. Raises ------ ValueError If the first line doesn't start with '#' as expected header format. Notes ----- Expected file format: - First line: header starting with '#' containing column labels - Subsequent lines: numerical data with columns for different spectra - 'ell' column is used to position rows at the correct ℓ index when present; otherwise the file is assumed to start at ℓ=2. - Rows for ℓ > ``effective_lmax`` are dropped silently. """ effective_lmax = lmax if lmax is not None else Params.lmax with open(inputclfile.strip()) as f: header = f.readline() if not header.lstrip().startswith("#"): raise ValueError("First line must be a header starting with '#'") labels = header.strip().lstrip("#").split() arr = np.loadtxt(f, dtype=np.float64) if logger is not None: logger.log_with_feedback(f"Read Cls: {arr.shape} {labels}", level=4) ell_col = None for i, label in enumerate(labels): if label.lower() == "ell": ell_col = i break n_out = effective_lmax + 1 cls_dict = {} if ell_col is not None: ell_values = arr[:, ell_col].astype(int) keep = (ell_values >= 0) & (ell_values <= effective_lmax) target_ells = ell_values[keep] else: # No ell column: assume data starts at ell=2. n_fit = max(0, min(arr.shape[0], effective_lmax - 1)) target_ells = None # indicates the "start at ell=2" branch for i, label in enumerate(labels): if label.lower() == "ell": continue arr_out = np.zeros(n_out, dtype=np.float64) if target_ells is not None: arr_out[target_ells] = arr[keep, i] elif n_fit > 0: arr_out[2 : 2 + n_fit] = arr[:n_fit, i] cls_dict[label] = arr_out input_conv = _parse_convention(getattr(Params, "input_convention", "Cl")) if input_conv != "Cl": convert_spectra_normalization(cls_dict, from_norm=input_conv, to_norm="Cl") return cls_dict
[docs] def writecl(filename: str, power_spectra: np.ndarray): """ Write power spectra array to text file. Parameters ---------- filename : str Output filename for power spectra data. power_spectra : numpy.ndarray Power spectra array to write to file. Notes ----- Uses high precision format (%.16e) to preserve full double precision. Could be enhanced to support more sophisticated formats with headers and labels. """ # Use high precision format to preserve full double precision np.savetxt(filename, power_spectra, fmt="%.16e")
[docs] def write_out_matrix(outfilematrix, matrix): """ Write matrix in formatted text output. Parameters ---------- outfilematrix : str Output filename for the formatted matrix. matrix : numpy.ndarray 2D array to write in formatted text format. Notes ----- Writes the matrix with scientific notation formatting (24.16E format) with each row on a separate line. Suitable for matrices that need to be human-readable or imported into other analysis tools. The 16 decimal places preserve full double precision. """ n = matrix.shape[0] _ensure_output_directory(outfilematrix) with open(outfilematrix, "w") as f: for i in range(n): line = "".join(f"{matrix[i, j]:24.16E}" for j in range(n)) f.write(line + "\n")
[docs] def read_maps(maps, filename, pixact, field_labels, calibration: float = 1.0): """ Read map data from files. Supports three formats: 1. Numpy format (.npy): 3D array of shape (n_fields, n_pix, n_sims). The first ``len(field_labels)`` entries along axis 0 are assumed to be ordered exactly as ``field_labels``. 2. Multi-simulation FITS: HDUs named "SIM_XXX" with FIELDS header. 3. Simple HEALPix FITS: Standard HEALPix file (for single simulation). Parameters ---------- maps : numpy.ndarray Output array of shape (n_total_active, n_sims) to store map data. filename : str Path to data file (.npy or .fits). pixact : list of numpy.ndarray List of active pixel indices for each field. field_labels : list of str Labels identifying which fields to read from each simulation. These should be individual field names (e.g., ["T", "Q", "U"]). For numpy format, the first ``len(field_labels)`` entries in the file are assumed to match this ordering exactly. calibration : float, optional Calibration factor to multiply all map values. Default is 1.0. Raises ------ AssertionError If maps array shape doesn't match expected total active pixels. ValueError If field labels don't match expected format or pixel indices are invalid. Notes ----- For numpy format: - File must contain a 3D array of shape (n_fields, n_pix, n_sims) - Fields are ordered to match the config's ``physical_labels`` - No header metadata; field ordering is determined by convention For multi-simulation FITS: - Each HDU named "SIM_XXX" contains one simulation - HDU headers contain "FIELDS" keyword describing field organization For simple HEALPix format (nsims=1): - Reads directly using healpy - Field mapping: T=0, Q=1, U=2, E=1, B=2 Applies calibration factor to all loaded data. """ assert maps.shape[0] == sum(len(p) for p in pixact), "maps array has incorrect shape" nsims = maps.shape[1] if len(field_labels) != len(pixact): raise ValueError( f"Mismatch between pixact length ({len(pixact)}) and " f"field labels ({len(field_labels)}): {field_labels}" ) if filename.endswith(".npy"): _read_maps_numpy(maps, filename, pixact, nsims) else: _read_maps_fits(maps, filename, pixact, field_labels, nsims) maps *= calibration
def _read_maps_numpy(maps, filename, pixact, nsims): """Read maps from numpy .npy file (n_fields, n_pix, n_sims).""" all_data = np.load(filename) n_fields = len(pixact) if all_data.ndim != 3: raise ValueError( f"Numpy map file must be 3D (n_fields, n_pix, n_sims), " f"got shape {all_data.shape}" ) if all_data.shape[0] < n_fields: raise ValueError( f"Numpy map file has {all_data.shape[0]} fields, need at least {n_fields}" ) if all_data.shape[2] < nsims: raise ValueError( f"Numpy map file has {all_data.shape[2]} sims, need at least {nsims}" ) pixact_int = [p.astype(int) for p in pixact] counter = 0 for field_idx in range(n_fields): pixels = pixact_int[field_idx] n_active = pixels.size maps[counter : counter + n_active, :] = all_data[field_idx][pixels, :nsims] counter += n_active def _read_maps_fits(maps, filename, pixact, field_labels, nsims): """Read maps from FITS file (multi-sim or simple HEALPix).""" pixact_int = [p.astype(int) for p in pixact] with fits.open(filename) as hdul: has_sim_format = "SIM_000" in hdul if has_sim_format: # Cache field indices from first sim (same for all sims) field_indices = [] for field_idx in range(len(pixact)): label = field_labels[field_idx] field_indices.append(get_field_index(hdul["SIM_000"], label)) # Precompute output offsets offsets = [] counter = 0 for field_idx in range(len(pixact)): offsets.append(counter) counter += pixact_int[field_idx].size # Read all sims for isim in range(nsims): sim_data = hdul[f"SIM_{isim:03d}"].data for field_idx in range(len(pixact)): pixels = pixact_int[field_idx] n_active = pixels.size maps[offsets[field_idx] : offsets[field_idx] + n_active, isim] = ( sim_data[field_indices[field_idx]][pixels] ) else: if nsims != 1: raise ValueError( f"Simple HEALPix format only supports nsims=1, got nsims={nsims}" ) field_index_map = {"T": 0, "Q": 1, "U": 2, "E": 1, "B": 2} all_maps = hp.read_map(filename, field=None) if all_maps.ndim == 1: all_maps = all_maps.reshape(1, -1) counter = 0 for field_idx in range(len(pixact)): label = field_labels[field_idx] if label not in field_index_map: raise ValueError( f"Unknown field label '{label}' for simple HEALPix format" ) map_idx = field_index_map[label] if map_idx >= all_maps.shape[0]: raise ValueError( f"Field '{label}' (index {map_idx}) not available in file " f"with {all_maps.shape[0]} fields" ) pixels = pixact_int[field_idx] n_active = pixels.size maps[counter : counter + n_active, 0] = all_maps[map_idx][pixels] counter += n_active
[docs] def get_field_index(hdu, field_name): """ Extract field index from FITS HDU header information. Parameters ---------- hdu : astropy.io.fits.HDU FITS HDU containing field information in header. field_name : str Field name to look up (e.g., "T", "Q", "U", "T1", "E2"). Returns ------- int Field index corresponding to the requested field name. Raises ------ ValueError If requested field is not found in the HDU header. Notes ----- Supports two header formats for the "FIELDS" keyword: - Comma-separated: "T,Q,U" or "T1,T2,T3" - Concatenated: "TQU" (for single-character field names) Field expansion is now handled at the InputParams level, so this function expects individual field names only. """ fields_str = hdu.header.get("FIELDS", "") # Detect format: comma-separated vs concatenated if "," in fields_str: # Comma-separated format: "T,Q,U" or "T1,T2,T3" available_fields = [f.strip() for f in fields_str.split(",")] else: # Concatenated format: "TQU" -> ["T", "Q", "U"] available_fields = list(fields_str.strip()) # Look up the field name if field_name not in available_fields: raise ValueError(f"Field '{field_name}' not found in {available_fields}") return available_fields.index(field_name)