Source code for bayspec.data.spectrum

"""OGIP-style spectrum containers for source and background observations."""

from collections import OrderedDict
from copy import deepcopy
import inspect
from io import BytesIO

import astropy.io.fits as fits
import numpy as np

from ..util.info import Info
from ..util.param import Par


[docs] class Spectrum: """Channel-binned spectrum with exposure, quality, and scaling metadata. The object holds raw per-channel counts and errors, the observing exposure, optional quality and grouping flags, the backscale, and a multiplicative scaling parameter used during fitting. Attributes: counts: Per-channel counts. errors: Per-channel uncertainties. exposure: Observing exposure in seconds. quality: Per-channel quality flags; zero marks good channels. grouping: Per-channel OGIP grouping flags. backscale: Ratio of source-to-background extraction geometry. factor: ``Par`` scaling applied during model convolution. """ def __init__( self, counts, errors, exposure, quality=None, grouping=None, backscale=1.0, factor=None, ): """Build a spectrum from raw channel arrays and metadata. Args: counts: 1D array of per-channel counts. errors: 1D array of per-channel errors; same shape as ``counts``. exposure: Observing exposure in seconds. quality: Optional quality flags; defaults to all-good zeros. grouping: Optional OGIP grouping flags; defaults to all-ones. backscale: Source-to-background extraction ratio. factor: Multiplicative ``Par`` applied during fitting. Raises: ValueError: If ``counts``/``errors`` shapes disagree, either is not 1D, ``exposure`` is not numeric, or ``quality``/ ``grouping`` shapes disagree with ``counts``. """ if not (np.ndim(counts) == np.ndim(errors) == 1): raise ValueError('counts and errors must be 1D arrays') if not (np.shape(counts) == np.shape(errors)): raise ValueError('counts and errors must have the same shape') if not isinstance(exposure, (int, float, np.integer, np.floating)): raise ValueError('exposure must be int or float') if quality is None: quality = np.zeros(len(counts)).astype(int) else: if not (np.shape(quality) == np.shape(counts)): raise ValueError('quality must have the same shape with counts') if grouping is None: grouping = np.ones(len(counts)).astype(int) else: if not (np.shape(grouping) == np.shape(counts)): raise ValueError('grouping must have the same shape with counts') self._counts = counts self._errors = errors self._exposure = exposure self._quality = quality self._grouping = grouping self._backscale = backscale self._factor = factor if factor is not None else Par(1, frozen=True) @property def counts(self): return self._counts @property def errors(self): return self._errors @property def exposure(self): return self._exposure @property def quality(self): return self._quality @property def grouping(self): return self._grouping @property def backscale(self): return self._backscale @property def factor(self): return self._factor @factor.setter def factor(self, new_factor): """Set the multiplicative ``Par``; ``None`` resets to a frozen unit factor. Raises: ValueError: If the resolved value is not a ``Par``. """ if new_factor is None: self._factor = Par(1, frozen=True) else: self._factor = new_factor if not isinstance(self._factor, Par): raise ValueError('<factor> parameter should be Param type')
[docs] def set_zero(self): """Zero out both ``counts`` and ``errors`` in place.""" self._counts = np.zeros_like(self._counts).astype(float) self._errors = np.zeros_like(self._errors).astype(float)
@property def info(self): """Return a tabular :class:`Info` summary of name, channels, and counts.""" num_channel = len(self.counts) num_counts = sum(self.counts) info_dict = OrderedDict( [ ('Name', [self.name]), ('Channels', [num_channel]), ('Counts', [num_counts]), ('Exposure', [self.exposure]), ('Backscale', [self.backscale]), ] ) return Info.from_dict(info_dict) @property def name(self): """Best-effort identifier for this spectrum inferred from caller scope.""" return self.get_obj_name()
[docs] def get_obj_name(self): """Walk call frames and return the outermost local name bound to ``self``. Used to label spectra by the variable name the user chose. Returns ``None`` if no binding is found. """ frame = inspect.currentframe() possible_var_names = [] while frame: local_vars = frame.f_locals.items() var_names = [var_name for var_name, var_val in local_vars if var_val is self] if var_names: possible_var_names.extend(var_names) frame = frame.f_back if possible_var_names: return possible_var_names[-1] return None
def __str__(self): return f'*** Spectrum ***\n{self.info.text_table}' def __repr__(self): return self.__str__() def _repr_html_(self): return ( f'{self.info.html_style}' f'<details open>' f'<summary style="margin-bottom: 10px;"><b>Spectrum</b></summary>' f'{self.info.html_table}' f'</details>' )
[docs] class Source(Spectrum): """Source-region spectrum, constructed from OGIP FITS files."""
[docs] @classmethod def from_src(cls, src_file): """Load a source spectrum from a single-row OGIP PHA file. Falls back to ``RATE`` if ``COUNTS`` is absent, and synthesizes Poisson errors if ``STAT_ERR`` is missing. Args: src_file: Path to the PHA file, or a ``BytesIO`` holding its bytes. Returns: A new ``Source`` instance. Raises: ValueError: If ``src_file`` is neither a string nor ``BytesIO``. """ if isinstance(src_file, BytesIO): src_file = deepcopy(src_file) elif isinstance(src_file, str): pass else: raise ValueError('unsupported src_file type') src_hdu = fits.open(src_file, ignore_missing_simple=True) specExt = src_hdu['SPECTRUM'] specData = specExt.data SrcExpo = float(specExt.header['EXPOSURE']) try: SrcCounts = specData['COUNTS'].astype(float) try: Src_StatErr = specData['STAT_ERR'].astype(float) except KeyError: Src_StatErr = np.sqrt(SrcCounts) try: Src_SysErr = specData['SYS_ERR'].astype(float) except KeyError: Src_SysErr = 0 SrcErr = np.sqrt(Src_StatErr**2 + (SrcCounts * Src_SysErr) ** 2) except KeyError: SrcCounts = specData['RATE'].astype(float) * SrcExpo try: Src_StatErr = specData['STAT_ERR'].astype(float) * SrcExpo except KeyError: Src_StatErr = np.sqrt(SrcCounts) try: Src_SysErr = specData['SYS_ERR'].astype(float) except KeyError: Src_SysErr = 0 SrcErr = np.sqrt(Src_StatErr**2 + (SrcCounts * Src_SysErr) ** 2) try: SrcQual = specData['QUALITY'].astype(int) except KeyError: SrcQual = np.zeros(len(SrcCounts)).astype(int) try: SrcGrpg = specData['GROUPING'].astype(int) except KeyError: SrcGrpg = np.ones(len(SrcCounts)).astype(int) try: SrcBackSc = specData['BACKSCAL'].astype(float) except KeyError: try: SrcBackSc = float(specExt.header['BACKSCAL']) except KeyError: SrcBackSc = 1.0 src_hdu.close() return cls(SrcCounts, SrcErr, SrcExpo, SrcQual, SrcGrpg, SrcBackSc)
[docs] @classmethod def from_src2(cls, src_file, ii=None): """Load row ``ii`` of a multi-row PHA2 source spectrum. The row index may be passed explicitly via ``ii`` or appended to the path as ``"file.pha2:ii"``. Args: src_file: Path or ``BytesIO`` to the PHA2 file. ii: Zero-based row index within the ``SPECTRUM`` extension. Returns: A new ``Source`` instance for the selected row. Raises: ValueError: If ``src_file`` is neither a string nor ``BytesIO``. AssertionError: If ``ii`` is not an ``int`` when required. """ if isinstance(src_file, BytesIO): src_file = deepcopy(src_file) assert isinstance(ii, int), 'ii should be int type' elif isinstance(src_file, str): if ':' in src_file: src_file, ii = src_file.split(':') src_file = src_file.strip() ii = int(ii.strip()) else: assert isinstance(ii, int), 'ii should be int type' else: raise ValueError('unsupported src_file type') src_hdu = fits.open(src_file, ignore_missing_simple=True) specExt = src_hdu['SPECTRUM'] specData = specExt.data SrcExpo = specData['EXPOSURE'][ii].astype(float) try: SrcCounts = specData['COUNTS'][ii].astype(float) try: Src_StatErr = specData['STAT_ERR'][ii].astype(float) except KeyError: Src_StatErr = np.sqrt(SrcCounts) try: Src_SysErr = specData['SYS_ERR'][ii].astype(float) except KeyError: Src_SysErr = 0 SrcErr = np.sqrt(Src_StatErr**2 + (SrcCounts * Src_SysErr) ** 2) except KeyError: SrcCounts = specData['RATE'][ii].astype(float) * SrcExpo try: Src_StatErr = specData['STAT_ERR'][ii].astype(float) * SrcExpo except KeyError: Src_StatErr = np.sqrt(SrcCounts) try: Src_SysErr = specData['SYS_ERR'][ii].astype(float) except KeyError: Src_SysErr = 0 SrcErr = np.sqrt(Src_StatErr**2 + (SrcCounts * Src_SysErr) ** 2) try: SrcQual = specData['QUALITY'][ii].astype(int) except KeyError: SrcQual = np.zeros(len(SrcCounts)).astype(int) try: SrcGrpg = specData['GROUPING'][ii].astype(int) except KeyError: SrcGrpg = np.ones(len(SrcCounts)).astype(int) try: SrcBackSc = specData['BACKSCAL'][ii].astype(float) except KeyError: try: SrcBackSc = specExt.header['BACKSCAL'] except KeyError: SrcBackSc = 1.0 src_hdu.close() return cls(SrcCounts, SrcErr, SrcExpo, SrcQual, SrcGrpg, SrcBackSc)
[docs] @classmethod def from_plain(cls, src_file, ii=None): """Dispatch to ``from_src`` or ``from_src2`` based on ``src_file`` form. Args: src_file: PHA/PHA2 path (optionally ``"path:ii"``) or ``BytesIO``. ii: Row index; when given, forces the PHA2 loader. Returns: A new ``Source`` instance. """ if ii is not None: return cls.from_src2(src_file, ii) else: if isinstance(src_file, str) and ':' in src_file: return cls.from_src2(src_file) else: return cls.from_src(src_file)
[docs] class Background(Spectrum): """Background-region spectrum, constructed from OGIP FITS files."""
[docs] @classmethod def from_bkg(cls, bkg_file): """Load a background spectrum from a single-row OGIP PHA file. Mirrors :meth:`Source.from_src`; quality and grouping are not read. Args: bkg_file: Path to the PHA file, or a ``BytesIO``. Returns: A new ``Background`` instance. Raises: ValueError: If ``bkg_file`` is neither a string nor ``BytesIO``. """ if isinstance(bkg_file, BytesIO): bkg_file = deepcopy(bkg_file) elif isinstance(bkg_file, str): pass else: raise ValueError('unsupported bkg_file type') bkg_hdu = fits.open(bkg_file, ignore_missing_simple=True) specExt = bkg_hdu['SPECTRUM'] specData = specExt.data BkgExpo = float(specExt.header['EXPOSURE']) try: BkgCounts = specData['COUNTS'].astype(float) try: Bkg_StatErr = specData['STAT_ERR'].astype(float) except KeyError: Bkg_StatErr = np.sqrt(BkgCounts) try: Bkg_SysErr = specData['SYS_ERR'].astype(float) except KeyError: Bkg_SysErr = 0 BkgErr = np.sqrt(Bkg_StatErr**2 + (BkgCounts * Bkg_SysErr) ** 2) except KeyError: BkgCounts = specData['RATE'].astype(float) * BkgExpo try: Bkg_StatErr = specData['STAT_ERR'].astype(float) * BkgExpo except KeyError: Bkg_StatErr = np.sqrt(BkgCounts) try: Bkg_SysErr = specData['SYS_ERR'].astype(float) except KeyError: Bkg_SysErr = 0 BkgErr = np.sqrt(Bkg_StatErr**2 + (BkgCounts * Bkg_SysErr) ** 2) try: BkgBackSc = specData['BACKSCAL'].astype(float) except KeyError: try: BkgBackSc = float(specExt.header['BACKSCAL']) except KeyError: BkgBackSc = 1.0 bkg_hdu.close() return cls(BkgCounts, BkgErr, BkgExpo, None, None, BkgBackSc)
[docs] @classmethod def from_bkg2(cls, bkg_file, ii=None): """Load row ``ii`` of a multi-row PHA2 background spectrum. Mirrors :meth:`Source.from_src2`. Args: bkg_file: Path or ``BytesIO`` to the PHA2 file; may use the ``"path:ii"`` convention. ii: Zero-based row index. Returns: A new ``Background`` instance. """ if isinstance(bkg_file, BytesIO): bkg_file = deepcopy(bkg_file) assert isinstance(ii, int), 'ii should be int type' elif isinstance(bkg_file, str): if ':' in bkg_file: bkg_file, ii = bkg_file.split(':') bkg_file = bkg_file.strip() ii = int(ii.strip()) else: assert isinstance(ii, int), 'ii should be int type' else: raise ValueError('unsupported bkg_file type') bkg_hdu = fits.open(bkg_file, ignore_missing_simple=True) specExt = bkg_hdu['SPECTRUM'] specData = specExt.data BkgExpo = specData['EXPOSURE'][ii].astype(float) try: BkgCounts = specData['COUNTS'][ii].astype(float) try: Bkg_StatErr = specData['STAT_ERR'][ii].astype(float) except KeyError: Bkg_StatErr = np.sqrt(BkgCounts) try: Bkg_SysErr = specData['SYS_ERR'][ii].astype(float) except KeyError: Bkg_SysErr = 0 BkgErr = np.sqrt(Bkg_StatErr**2 + (BkgCounts * Bkg_SysErr) ** 2) except KeyError: BkgCounts = specData['RATE'][ii].astype(float) * BkgExpo try: Bkg_StatErr = specData['STAT_ERR'][ii].astype(float) * BkgExpo except KeyError: Bkg_StatErr = np.sqrt(BkgCounts) try: Bkg_SysErr = specData['SYS_ERR'][ii].astype(float) except KeyError: Bkg_SysErr = 0 BkgErr = np.sqrt(Bkg_StatErr**2 + (BkgCounts * Bkg_SysErr) ** 2) try: BkgBackSc = specData['BACKSCAL'][ii].astype(float) except KeyError: try: BkgBackSc = specExt.header['BACKSCAL'] except KeyError: BkgBackSc = 1.0 bkg_hdu.close() return cls(BkgCounts, BkgErr, BkgExpo, None, None, BkgBackSc)
[docs] @classmethod def from_plain(cls, bkg_file, ii=None): """Dispatch to ``from_bkg`` or ``from_bkg2`` based on ``bkg_file`` form. Args: bkg_file: PHA/PHA2 path (optionally ``"path:ii"``) or ``BytesIO``. ii: Row index; when given, forces the PHA2 loader. Returns: A new ``Background`` instance. """ if ii is not None: return cls.from_bkg2(bkg_file, ii) else: if isinstance(bkg_file, str) and ':' in bkg_file: return cls.from_bkg2(bkg_file) else: return cls.from_bkg(bkg_file)