Source code for sctrial.preprocessing

from __future__ import annotations

import logging

import numpy as np
import scipy.sparse as sp
from anndata import AnnData

__all__ = ["add_log1p_cpm_layer"]

logger = logging.getLogger(__name__)


[docs] def add_log1p_cpm_layer( adata: AnnData, *, counts_layer: str | None = "counts", out_layer: str = "log1p_cpm", layer_out: str | None = None, # alias for out_layer scale: float = 1e6, overwrite: bool = False, inplace: bool = True, ) -> AnnData: """Add log1p(CPM) normalization as a layer. Parameters ---------- adata AnnData object with raw counts. counts_layer Layer name containing raw counts. If None, uses adata.X. out_layer Output layer name. layer_out Backwards-compatible alias for out_layer. scale CPM scale factor (default 1e6). Must be finite and positive. overwrite Overwrite if out_layer already exists. inplace Modify the input AnnData if True, else return a copy. Returns ------- AnnData The AnnData object with the new log1p(CPM) layer added. Raises ------ ValueError If ``scale`` is not a finite positive number, or if the counts matrix contains negative values. KeyError If ``counts_layer`` is not found in ``adata.layers``. """ if layer_out is not None: out_layer = layer_out # --- Validate scale --- if not np.isfinite(scale) or scale <= 0: raise ValueError(f"scale must be a finite positive number, got {scale!r}.") ad = adata if inplace else adata.copy() if (out_layer in ad.layers) and (not overwrite): logger.info( "Layer '%s' already exists; returning unchanged (pass overwrite=True to recompute).", out_layer, ) return ad # Fetch counts matrix if counts_layer is None: X = ad.X else: if counts_layer not in ad.layers: raise KeyError( f"counts_layer='{counts_layer}' not found in adata.layers. " f"Available layers: {list(ad.layers.keys())}. " f"Either add adata.layers['{counts_layer}']=<counts> or " f"pass counts_layer=None to use adata.X." ) X = ad.layers[counts_layer] # --- Validate counts are non-negative and finite --- if sp.issparse(X): vals = X.data else: vals = np.asarray(X).ravel() if vals.size > 0: if not np.all(np.isfinite(vals)): raise ValueError( "Counts matrix contains non-finite values (NaN or inf). " "log1p(CPM) requires finite non-negative counts." ) if np.min(vals) < 0: raise ValueError( f"Counts matrix contains negative values (min={np.min(vals):.4g}). " "log1p(CPM) requires non-negative counts. " "Pass raw counts, not log-transformed or z-scored data." ) # Compute log1p(CPM) if sp.issparse(X): if not isinstance(X, sp.csr_matrix): X = X.tocsr() libsize = np.asarray(X.sum(axis=1)).reshape(-1) n_zero = int(np.sum(libsize == 0)) if n_zero > 0: logger.warning( "%d cell(s) have zero total counts and will have all-zero " "CPM values. Consider filtering these cells before " "normalization.", n_zero, ) # Safe division: 0/0 → 0 via replacing zero libsizes with inf safe_libsize = libsize.astype(float, copy=True) safe_libsize[safe_libsize == 0] = np.inf X_cpm = X.multiply(scale / safe_libsize.reshape(-1, 1)) X_log = X_cpm.tocsr() X_log.data = np.log1p(X_log.data) else: X = np.asarray(X) libsize = X.sum(axis=1, keepdims=True) n_zero = int(np.sum(libsize == 0)) if n_zero > 0: logger.warning( "%d cell(s) have zero total counts and will have all-zero " "CPM values. Consider filtering these cells before " "normalization.", n_zero, ) # Safe division: 0/0 → 0 via replacing zero libsizes with inf safe_libsize = libsize.astype(float, copy=True) safe_libsize[safe_libsize == 0] = np.inf X_cpm = X / safe_libsize * scale X_log = np.log1p(X_cpm) ad.layers[out_layer] = X_log # Store provenance for reproducibility ad.uns.setdefault("sctrial", {})["log1p_cpm_scale"] = float(scale) return ad