"""Container types that pair spectra with responses and binning policy.
A :class:`DataUnit` bundles one source spectrum with its background,
response (RMF/ARF or combined), statistic choice, channel noticing,
grouping, optional rebinning, and time tag. A :class:`Data` holds an
ordered collection of ``DataUnit`` instances and exposes aggregated
views of their counts, count rates, binned spectra, and responses, which
the inference layer consumes.
Many ``Data`` properties are thin aggregators that broadcast the
identically-named property over every contained ``DataUnit`` -- consult
the ``DataUnit`` property of the same name for the underlying formula.
"""
from collections import OrderedDict
import copy
import inspect
from io import BytesIO
import os
import warnings
import numpy as np
from scipy import special
from ..util.info import Info
from ..util.significance import pgsig, ppsig
from ..util.tools import SuperDict, cached_property, clear_cached_property, json_dump
from .response import Auxiliary, Redistribution, Response
from .spectrum import Background, Source
[docs]
class Data:
"""Ordered collection of :class:`DataUnit` objects.
Indexing with a key returns the stored :class:`DataUnit`; assignment
re-runs the aggregate update so cached views stay consistent.
**Group docstring for aggregator properties.** Every list-valued
property on this class is the per-unit concatenation of the
same-named property on :class:`DataUnit`; for the formulas, read the
unit-level property. The family covers these naming patterns (the
``_re_`` infix uses the re-binned channel grid; the ``_error`` suffix
returns 1-sigma uncertainties; the ``_f64`` suffix forces ``float64``):
- ``ebin``, ``ewidth``, ``nbin``, ``tarr``, ``ngrid``, ``egrid``,
``tgrid``, ``bin_start``, ``bin_stop``;
- ``src_efficiency``, ``bkg_efficiency``, ``alpha``;
- ``src_counts`` / ``src_counts_f64`` / ``src_re_counts`` /
``src_errors`` / ``src_errors_f64`` / ``src_re_errors``;
- ``bkg_counts`` / ``bkg_counts_f64`` / ``bkg_re_counts`` /
``bkg_errors`` / ``bkg_errors_f64`` / ``bkg_re_errors``;
- ``rsp_chbin``, ``rsp_re_chbin``, ``rsp_chbin_mean``,
``rsp_re_chbin_mean``, ``rsp_chbin_width``,
``rsp_re_chbin_width``, ``rsp_chbin_tarr``, ``rsp_re_chbin_tarr``;
- ``rsp_drm``, ``rsp_re_drm``, ``corr_rsp_drm``, ``corr_rsp_re_drm``,
``corr_src_efficiency``/``_f64``, ``corr_bkg_efficiency``/``_f64``;
- ``src_ctsrate``/``_error``, ``src_ctsspec``/``_error`` and their
``_re_`` variants; matching ``bkg_*`` and ``net_*`` families;
- ``npoints``.
``deconv_{pht,flx,erg}spec``/``_error`` and their ``_re_`` variants
are described on :attr:`deconv_phtspec`.
Attributes:
data: ``OrderedDict`` mapping names to :class:`DataUnit` instances.
names: Names of every unit in insertion order.
srcs/bkgs/rsps: Per-unit ``Source``/``Background``/``Response``.
stats/notcs/grpgs/rebns/times/weights: Per-unit metadata lists.
cdicts/pdicts: Per-unit config/parameter dictionaries.
"""
def __init__(self, data=None):
"""Build a container from a list of ``(name, unit)`` tuples or a dict.
Args:
data: ``None``, a list of ``(name, DataUnit)`` tuples, or a
dict mapping names to :class:`DataUnit` instances.
Raises:
ValueError: If ``data`` is not one of the supported types, or
if any unit fails its completeness check.
"""
self.data = data
@property
def data(self):
return self._data
@data.setter
def data(self, new_data):
"""Replace the contents from a list of tuples or a dict and re-extract.
Raises:
ValueError: If ``new_data`` is not ``None``, a list, or a dict.
"""
self._data = OrderedDict()
if new_data is None:
pass
elif isinstance(new_data, list):
for item in new_data:
if isinstance(item, tuple):
self._setitem(*item)
self._update()
elif isinstance(new_data, dict):
for item in new_data.items():
self._setitem(*item)
self._update()
else:
raise ValueError('unsupported data type')
def _setitem(self, key, value):
if not isinstance(value, DataUnit):
raise ValueError('value parameter should be DataUnit type')
if not value.completeness:
raise ValueError('failed for completeness check for dataunit')
value.name = key
self._data[key] = value
def _update(self):
if self.data is None:
raise ValueError('data is None')
self._UPDATE = object()
clear_cached_property(self)
self.names = [name for name in self.data]
self.srcs = [unit.src_ins for unit in self.data.values()]
self.bkgs = [unit.bkg_ins for unit in self.data.values()]
self.rsps = [unit.rsp_ins for unit in self.data.values()]
self.stats = [unit.stat for unit in self.data.values()]
self.notcs = [unit.notc for unit in self.data.values()]
self.grpgs = [unit.grpg for unit in self.data.values()]
self.rebns = [unit.rebn for unit in self.data.values()]
self.times = [unit.time for unit in self.data.values()]
self.weights = np.array([unit.weight for unit in self.data.values()])
self.cdicts = OrderedDict([(name, unit.config) for name, unit in self.data.items()])
self.pdicts = OrderedDict([(name, unit.params) for name, unit in self.data.items()])
@cached_property()
def cfg(self):
"""Flat :class:`SuperDict` of every config parameter across all units."""
cid = 0
cfg = SuperDict()
for config in self.cdicts.values():
for cg in config.values():
cid += 1
cfg[str(cid)] = cg
return cfg
@cached_property()
def par(self):
"""Flat :class:`SuperDict` of every free/fixed parameter across all units."""
pid = 0
par = SuperDict()
for params in self.pdicts.values():
for pr in params.values():
pid += 1
par[str(pid)] = pr
return par
@property
def pvalues(self):
"""Tuple of current parameter values, preserving ``par`` order."""
return tuple([pr.value for pr in self.par.values()])
@property
def all_config(self):
"""List of per-config rows with component, label, and value."""
cid = 0
all_config = list()
for expr, config in self.cdicts.items():
for cl, cg in config.items():
cid += 1
all_config.append(
{'cfg#': str(cid), 'Component': expr, 'Parameter': cl, 'Value': cg.val}
)
return all_config
@property
def all_params(self):
"""List of per-parameter rows with value, prior, posterior, and frozen flag."""
pid = 0
all_params = list()
for expr, params in self.pdicts.items():
for pl, pr in params.items():
pid += 1
all_params.append(
{
'par#': str(pid),
'Component': expr,
'Parameter': pl,
'Value': pr.val,
'Prior': f'{pr.prior_info}',
'Frozen': pr.frozen,
'Posterior': f'{pr.post_info}',
}
)
return all_params
@property
def cfg_info(self):
"""Tabular :class:`Info` view of every configuration parameter."""
all_config = self.all_config.copy()
return Info.from_list_dict(all_config)
@property
def par_info(self):
"""Tabular :class:`Info` view of parameters, posterior column dropped."""
all_params = self.all_params.copy()
all_params = Info.list_dict_to_dict(all_params)
del all_params['Posterior']
return Info.from_dict(all_params)
@property
def info(self):
"""Tabular :class:`Info` view of per-unit noticing, statistic, and time."""
info = OrderedDict()
info['Name'] = [key for key in self.data]
info['Noticing'] = [unit.notc for unit in self.data.values()]
info['Statistic'] = [unit.stat for unit in self.data.values()]
info['Grouping'] = [unit.grpg for unit in self.data.values()]
info['Time'] = [unit.time for unit in self.data.values()]
return Info.from_dict(info)
[docs]
def save(self, savepath):
"""Dump the per-unit info table to ``<savepath>/data.json``.
Args:
savepath: Directory path. Created if missing.
"""
if not os.path.exists(savepath):
os.makedirs(savepath)
json_dump(self.info.data_list_dict, savepath + '/data.json')
@property
def fit_with(self):
"""Return the ``Model`` this data is bound to, or raise if unset.
Raises:
AttributeError: If no model has been assigned.
"""
try:
return self._fit_with
except AttributeError:
raise AttributeError('no model fit with') from None
@fit_with.setter
def fit_with(self, new_model):
"""Bind a ``Model`` to this data and keep the back-reference in sync.
Raises:
ValueError: If ``new_model`` is not a ``Model``.
"""
from ..model.model import Model
self._fit_with = new_model
if not isinstance(self._fit_with, Model):
raise ValueError('fit_with argument should be Model type!')
try:
_ = self._fit_with.fit_to
except AttributeError:
self._fit_with.fit_to = self
else:
if self._fit_with.fit_to != self:
self._fit_with.fit_to = self
@cached_property()
def ebin(self):
return np.vstack([unit.ebin for unit in self.data.values()])
@cached_property()
def ewidth(self):
return self.ebin[:, 1] - self.ebin[:, 0]
@cached_property()
def nbin(self):
return [unit.nbin for unit in self.data.values()]
@cached_property()
def bin_start(self):
return np.cumsum([0, *self.nbin])[:-1]
@cached_property()
def bin_stop(self):
return np.cumsum([0, *self.nbin])[1:]
@cached_property()
def tarr(self):
return np.hstack([unit.tarr for unit in self.data.values()])
@cached_property()
def ngrid(self):
return 5
@cached_property()
def egrid(self):
scale = np.linspace(0.0, 1.0, self.ngrid, dtype=float)
egrid = self.ebin[:, [0]] + (self.ebin[:, [1]] - self.ebin[:, [0]]) * scale
np.maximum(egrid, 1e-10, out=egrid)
return egrid.ravel()
@cached_property()
def tgrid(self):
return np.repeat(self.tarr[:, None], self.ngrid, axis=1).ravel()
@cached_property()
def src_efficiency(self):
return [unit.src_efficiency for unit in self.data.values()]
@cached_property()
def bkg_efficiency(self):
return [unit.bkg_efficiency for unit in self.data.values()]
@cached_property()
def alpha(self):
return [unit.alpha for unit in self.data.values()]
@cached_property()
def src_counts(self):
return [unit.src_counts for unit in self.data.values()]
@cached_property()
def src_counts_f64(self):
return [np.float64(unit.src_counts) for unit in self.data.values()]
@cached_property()
def src_re_counts(self):
return [unit.src_re_counts for unit in self.data.values()]
@cached_property()
def src_errors(self):
return [unit.src_errors for unit in self.data.values()]
@cached_property()
def src_errors_f64(self):
return [np.float64(unit.src_errors) for unit in self.data.values()]
@cached_property()
def src_re_errors(self):
return [unit.src_re_errors for unit in self.data.values()]
@cached_property()
def bkg_counts(self):
return [unit.bkg_counts for unit in self.data.values()]
@cached_property()
def bkg_counts_f64(self):
return [np.float64(unit.bkg_counts) for unit in self.data.values()]
@cached_property()
def bkg_re_counts(self):
return [unit.bkg_re_counts for unit in self.data.values()]
@cached_property()
def bkg_errors(self):
return [unit.bkg_errors for unit in self.data.values()]
@cached_property()
def bkg_errors_f64(self):
return [np.float64(unit.bkg_errors) for unit in self.data.values()]
@cached_property()
def bkg_re_errors(self):
return [unit.bkg_re_errors for unit in self.data.values()]
@cached_property()
def rsp_chbin(self):
return [unit.rsp_chbin for unit in self.data.values()]
@cached_property()
def rsp_re_chbin(self):
return [unit.rsp_re_chbin for unit in self.data.values()]
@cached_property()
def rsp_chbin_mean(self):
return [unit.rsp_chbin_mean for unit in self.data.values()]
@cached_property()
def rsp_re_chbin_mean(self):
return [unit.rsp_re_chbin_mean for unit in self.data.values()]
@cached_property()
def rsp_chbin_width(self):
return [unit.rsp_chbin_width for unit in self.data.values()]
@cached_property()
def rsp_re_chbin_width(self):
return [unit.rsp_re_chbin_width for unit in self.data.values()]
@cached_property()
def rsp_chbin_tarr(self):
return [unit.rsp_chbin_tarr for unit in self.data.values()]
@cached_property()
def rsp_re_chbin_tarr(self):
return [unit.rsp_re_chbin_tarr for unit in self.data.values()]
@cached_property(lambda self: self.pvalues)
def rsp_drm(self):
return [unit.rsp_drm for unit in self.data.values()]
@cached_property(lambda self: self.pvalues)
def rsp_re_drm(self):
return [unit.rsp_re_drm for unit in self.data.values()]
@cached_property(lambda self: self.pvalues)
def corr_rsp_drm(self):
return [unit.corr_rsp_drm for unit in self.data.values()]
@cached_property(lambda self: self.pvalues)
def corr_rsp_re_drm(self):
return [unit.corr_rsp_re_drm for unit in self.data.values()]
@cached_property(lambda self: self.pvalues)
def corr_src_efficiency(self):
return [unit.corr_src_efficiency for unit in self.data.values()]
@cached_property(lambda self: self.pvalues)
def corr_src_efficiency_f64(self):
return [np.float64(unit.corr_src_efficiency) for unit in self.data.values()]
@cached_property(lambda self: self.pvalues)
def corr_bkg_efficiency(self):
return [unit.corr_bkg_efficiency for unit in self.data.values()]
@cached_property(lambda self: self.pvalues)
def corr_bkg_efficiency_f64(self):
return [np.float64(unit.corr_bkg_efficiency) for unit in self.data.values()]
@cached_property(lambda self: self.pvalues)
def src_ctsrate(self):
return [unit.src_ctsrate for unit in self.data.values()]
@cached_property(lambda self: self.pvalues)
def src_re_ctsrate(self):
return [unit.src_re_ctsrate for unit in self.data.values()]
@cached_property(lambda self: self.pvalues)
def src_ctsrate_error(self):
return [unit.src_ctsrate_error for unit in self.data.values()]
@cached_property(lambda self: self.pvalues)
def src_re_ctsrate_error(self):
return [unit.src_re_ctsrate_error for unit in self.data.values()]
@cached_property(lambda self: self.pvalues)
def src_ctsspec(self):
return [unit.src_ctsspec for unit in self.data.values()]
@cached_property(lambda self: self.pvalues)
def src_re_ctsspec(self):
return [unit.src_re_ctsspec for unit in self.data.values()]
@cached_property(lambda self: self.pvalues)
def src_ctsspec_error(self):
return [unit.src_ctsspec_error for unit in self.data.values()]
@cached_property(lambda self: self.pvalues)
def src_re_ctsspec_error(self):
return [unit.src_re_ctsspec_error for unit in self.data.values()]
@cached_property(lambda self: self.pvalues)
def bkg_ctsrate(self):
return [unit.bkg_ctsrate for unit in self.data.values()]
@cached_property(lambda self: self.pvalues)
def bkg_re_ctsrate(self):
return [unit.bkg_re_ctsrate for unit in self.data.values()]
@cached_property(lambda self: self.pvalues)
def bkg_ctsrate_error(self):
return [unit.bkg_ctsrate_error for unit in self.data.values()]
@cached_property(lambda self: self.pvalues)
def bkg_re_ctsrate_error(self):
return [unit.bkg_re_ctsrate_error for unit in self.data.values()]
@cached_property(lambda self: self.pvalues)
def bkg_ctsspec(self):
return [unit.bkg_ctsspec for unit in self.data.values()]
@cached_property(lambda self: self.pvalues)
def bkg_re_ctsspec(self):
return [unit.bkg_re_ctsspec for unit in self.data.values()]
@cached_property(lambda self: self.pvalues)
def bkg_ctsspec_error(self):
return [unit.bkg_ctsspec_error for unit in self.data.values()]
@cached_property(lambda self: self.pvalues)
def bkg_re_ctsspec_error(self):
return [unit.bkg_re_ctsspec_error for unit in self.data.values()]
@cached_property(lambda self: self.pvalues)
def net_ctsrate(self):
return [unit.net_ctsrate for unit in self.data.values()]
@cached_property(lambda self: self.pvalues)
def net_re_ctsrate(self):
return [unit.net_re_ctsrate for unit in self.data.values()]
@cached_property(lambda self: self.pvalues)
def net_ctsrate_error(self):
return [unit.net_ctsrate_error for unit in self.data.values()]
@cached_property(lambda self: self.pvalues)
def net_re_ctsrate_error(self):
return [unit.net_re_ctsrate_error for unit in self.data.values()]
@cached_property(lambda self: self.pvalues)
def net_ctsspec(self):
return [unit.net_ctsspec for unit in self.data.values()]
@cached_property(lambda self: self.pvalues)
def net_re_ctsspec(self):
return [unit.net_re_ctsspec for unit in self.data.values()]
@cached_property(lambda self: self.pvalues)
def net_ctsspec_error(self):
return [unit.net_ctsspec_error for unit in self.data.values()]
@cached_property(lambda self: self.pvalues)
def net_re_ctsspec_error(self):
return [unit.net_re_ctsspec_error for unit in self.data.values()]
@cached_property()
def npoints(self):
return np.array([unit.npoint for unit in self.data.values()])
[docs]
def net_counts_upperlimit(self, cl=0.9):
"""Per-unit net-counts upper limit at confidence level ``cl``.
Args:
cl: Target confidence level in ``(0, 1)``.
Returns:
List of per-unit upper limits.
"""
return [unit.net_counts_upperlimit(cl) for unit in self.data.values()]
[docs]
def net_ctsrate_upperlimit(self, cl=0.9):
"""Per-unit net-count-rate upper limit at confidence level ``cl``."""
return [unit.net_ctsrate_upperlimit(cl) for unit in self.data.values()]
@property
def deconv_phtspec(self):
"""Deconvolved photon spectrum: net counts × model ``cts_to_pht``.
Each deconvolved-spectrum property (``deconv_phtspec``,
``deconv_flxspec``, ``deconv_ergspec``, their ``_error`` and
``_re_`` variants) multiplies per-unit net counts by the conversion
factor published by the bound model.
"""
return [
factor * cts
for (factor, cts) in zip(self.fit_with.cts_to_pht, self.net_ctsspec, strict=True)
]
@property
def deconv_re_phtspec(self):
return [
factor * cts
for (factor, cts) in zip(self.fit_with.re_cts_to_pht, self.net_re_ctsspec, strict=True)
]
@property
def deconv_phtspec_error(self):
return [
factor * cts
for (factor, cts) in zip(self.fit_with.cts_to_pht, self.net_ctsspec_error, strict=True)
]
@property
def deconv_re_phtspec_error(self):
return [
factor * cts
for (factor, cts) in zip(
self.fit_with.re_cts_to_pht, self.net_re_ctsspec_error, strict=True
)
]
@property
def deconv_flxspec(self):
return [
factor * cts
for (factor, cts) in zip(self.fit_with.cts_to_flx, self.net_ctsspec, strict=True)
]
@property
def deconv_re_flxspec(self):
return [
factor * cts
for (factor, cts) in zip(self.fit_with.re_cts_to_flx, self.net_re_ctsspec, strict=True)
]
@property
def deconv_flxspec_error(self):
return [
factor * cts
for (factor, cts) in zip(self.fit_with.cts_to_flx, self.net_ctsspec_error, strict=True)
]
@property
def deconv_re_flxspec_error(self):
return [
factor * cts
for (factor, cts) in zip(
self.fit_with.re_cts_to_flx, self.net_re_ctsspec_error, strict=True
)
]
@property
def deconv_ergspec(self):
return [
factor * cts
for (factor, cts) in zip(self.fit_with.cts_to_erg, self.net_ctsspec, strict=True)
]
@property
def deconv_re_ergspec(self):
return [
factor * cts
for (factor, cts) in zip(self.fit_with.re_cts_to_erg, self.net_re_ctsspec, strict=True)
]
@property
def deconv_ergspec_error(self):
return [
factor * cts
for (factor, cts) in zip(self.fit_with.cts_to_erg, self.net_ctsspec_error, strict=True)
]
@property
def deconv_re_ergspec_error(self):
return [
factor * cts
for (factor, cts) in zip(
self.fit_with.re_cts_to_erg, self.net_re_ctsspec_error, strict=True
)
]
@property
def expr(self):
"""Best-effort identifier for this container inferred from caller scope."""
return self.get_obj_name()
[docs]
def get_obj_name(self):
"""Walk call frames and return the outermost local name bound to ``self``.
Returns ``None`` if no binding is found.
"""
frame = inspect.currentframe()
possible_var_names = []
while frame:
local_vars = frame.f_locals.items()
var_names = [var_name for var_name, var_val in local_vars if var_val is self]
if var_names:
possible_var_names.extend(var_names)
frame = frame.f_back
if possible_var_names:
return possible_var_names[-1]
return None
def __getitem__(self, key):
"""Return the stored :class:`DataUnit` registered under ``key``."""
return self._data[key]
def __setitem__(self, key, value):
"""Register ``value`` under ``key`` and re-run the aggregate update."""
self._setitem(key, value)
self._update()
def __delitem__(self, key):
"""Remove the entry under ``key`` and re-run the aggregate update."""
del self._data[key]
self._update()
def __contains__(self, key):
"""Return ``True`` when ``key`` is registered."""
return key in self._data
def __str__(self):
return (
f'*** Data ***\n'
f'{self.info.text_table}\n'
f'*** Data Parameters ***\n'
f'{self.par_info.text_table}'
)
def __repr__(self):
return self.__str__()
def _repr_html_(self):
return (
f'{self.info.html_style}'
f'<details open>'
f'<summary style="margin-bottom: 10px;"><b>Data</b></summary>'
f'{self.info.html_table}'
f'<details open style="margin-top: 10px;">'
f'<summary style="margin-bottom: 10px;"><b>Data Parameters</b></summary>'
f'{self.par_info.html_table}'
f'</details>'
f'</details>'
)
[docs]
class DataUnit:
"""One source/background/response triplet with binning and statistic.
Each of the constructor's ``src``/``bkg``/``rmf``/``arf``/``rsp`` slots
accepts either an already-constructed domain object (``Source``,
``Background``, ``Redistribution``, ``Auxiliary``, ``Response``) or a
file handle. File handles may be a path string, a ``BytesIO``, or a
``(file, index)`` tuple for multi-row PHA2/RSP2/ARF2 archives.
Changing any public attribute re-runs :meth:`_update`, which rebuilds
noticing, grouping, rebinning, and invalidates cached views.
**Group docstring for computed properties.** One-line computed
properties are either trivial passes to an underlying object or a
direct numerical shortcut; read the member for the formula. The
``_re_`` infix toggles between the grouping-based grid and the
rebinning-based grid. The family covers:
- ``ebin``, ``ewidth``, ``nbin``, ``tarr``, ``ngrid``, ``egrid``,
``tgrid`` -- energy/time grids used by the model convolution;
- ``src_efficiency``, ``bkg_efficiency``, ``alpha`` -- exposure and
backscale combinations;
- ``src_counts``/``_re_`` and ``src_errors``/``_re_``, matching
``bkg_counts``/``bkg_errors`` -- grouping-aggregated counts/errors;
- ``rsp_chbin``/``_re_``, their ``_mean``/``_width``/``_tarr`` --
channel-bin views of the response;
- ``rsp_drm``/``_re_`` plus ``corr_rsp_drm``/``_re_`` -- response
matrices (``corr_`` applies the ``rsp`` factor);
- ``corr_src_efficiency``/``corr_bkg_efficiency`` -- efficiencies
multiplied by the relevant ``Par`` factor;
- ``src_ctsrate``/``_error`` and ``src_ctsspec``/``_error`` plus
their ``_re_`` variants; matching ``bkg_*`` and ``net_*`` families.
Attributes:
src_ins, bkg_ins, rmf_ins, arf_ins, rsp_ins: Concrete loaded
spectrum/response objects.
qualifying/noticing/grouping/rebining: Per-channel flag arrays
driving ``grouping_slice``/``rebining_slice``.
config/params: Per-unit config and fit parameters.
"""
def __init__(
self,
src,
bkg=None,
rmf=None,
arf=None,
rsp=None,
stat='pgstat',
notc=None,
grpg=None,
rebn=None,
time=None,
weight=1,
):
"""Initialise a unit and run the first :meth:`_update` pass.
Args:
src: Source spectrum or handle.
bkg: Optional background; defaults to a zeroed copy of ``src``.
rmf: Optional redistribution matrix.
arf: Optional auxiliary response.
rsp: Optional full response; synthesized from ``rmf * arf`` if
missing and both are available.
stat: Statistic name -- ``'pgstat'``, ``'pstat'``, ``'cstat'``,
``'gstat'``, ``'chi2'``, or a variant thereof.
notc: Optional noticing windows in channel energy.
grpg: Optional grouping rule dict (``min_evt``/``min_nevt``/
``min_sigma``/``max_bin``).
rebn: Optional rebinning rule dict with the same keys.
time: Optional time tag used by time-dependent models.
weight: Likelihood weight for this unit.
"""
self._src = src
self._bkg = bkg
self._rmf = rmf
self._arf = arf
self._rsp = rsp
self._stat = stat
self._notc = notc
self._grpg = grpg
self._rebn = rebn
self._time = time
self._weight = weight
self._update()
@property
def src(self):
return self._src
@src.setter
def src(self, new_src):
self._src = new_src
self._update()
def _update_src(self):
if isinstance(self._src, Source):
self.src_ins = self._src
self.src_name = self.src_ins.name
else:
if isinstance(self._src, tuple):
self.src_file, self.src_ii = self._src
else:
self.src_file = self._src
self.src_ii = None
if isinstance(self.src_file, BytesIO):
self.src_name = self.src_file.name
elif isinstance(self.src_file, str):
self.src_name = self.src_file.split('/')[-1]
else:
raise ValueError('unsupported src file type')
self.src_ins = Source.from_plain(self.src_file, self.src_ii)
@property
def bkg(self):
return self._bkg
@bkg.setter
def bkg(self, new_bkg):
self._bkg = new_bkg
self._update()
def _update_bkg(self):
if self._bkg is None:
self.src_ins_copy = copy.deepcopy(self.src_ins)
self.bkg_ins = Background(
self.src_ins_copy.counts,
self.src_ins_copy.errors,
self.src_ins_copy.exposure,
self.src_ins_copy.quality,
self.src_ins_copy.grouping,
self.src_ins_copy.backscale,
)
self.bkg_ins.set_zero()
self.bkg_name = None
else:
if isinstance(self._bkg, Background):
self.bkg_ins = self._bkg
self.bkg_name = self.bkg_ins.name
else:
if isinstance(self._bkg, tuple):
self.bkg_file, self.bkg_ii = self._bkg
else:
self.bkg_file = self._bkg
self.bkg_ii = None
if isinstance(self.bkg_file, BytesIO):
self.bkg_name = self.bkg_file.name
elif isinstance(self.bkg_file, str):
self.bkg_name = self.bkg_file.split('/')[-1]
else:
raise ValueError('unsupported bkg file type')
self.bkg_ins = Background.from_plain(self.bkg_file, self.bkg_ii)
@property
def rmf(self):
return self._rmf
@rmf.setter
def rmf(self, new_rmf):
self._rmf = new_rmf
self._update()
def _update_rmf(self):
if self._rmf is None:
self.rmf_name = None
self.rmf_ins = None
else:
if isinstance(self._rmf, Redistribution):
self.rmf_ins = self._rmf
self.rmf_name = self.rmf_ins.name
else:
if isinstance(self._rmf, tuple):
self.rmf_file, self.rmf_ii = self._rmf
else:
self.rmf_file = self._rmf
self.rmf_ii = None
if isinstance(self.rmf_file, BytesIO):
self.rmf_name = self.rmf_file.name
elif isinstance(self.rmf_file, str):
self.rmf_name = self.rmf_file.split('/')[-1]
else:
raise ValueError('unsupported rmf file type')
self.rmf_ins = Redistribution.from_plain(self.rmf_file, self.rmf_ii)
@property
def arf(self):
return self._arf
@arf.setter
def arf(self, new_arf):
self._arf = new_arf
self._update()
def _update_arf(self):
if self._arf is None:
self.arf_name = None
self.arf_ins = None
else:
if isinstance(self._arf, Auxiliary):
self.arf_ins = self._arf
self.arf_name = self.arf_ins.name
else:
if isinstance(self._arf, tuple):
self.arf_file, self.arf_ii = self._arf
else:
self.arf_file = self._arf
self.arf_ii = None
if isinstance(self.arf_file, BytesIO):
self.arf_name = self.arf_file.name
elif isinstance(self.arf_file, str):
self.arf_name = self.arf_file.split('/')[-1]
else:
raise ValueError('unsupported arf file type')
self.arf_ins = Auxiliary.from_plain(self.arf_file, self.arf_ii)
@property
def rsp(self):
return self._rsp
@rsp.setter
def rsp(self, new_rsp):
self._rsp = new_rsp
self._update()
def _update_rsp(self):
if self._rsp is None:
self.rsp_name = None
if self.rmf is not None and self.arf is not None:
self.rsp_ins = self.rmf_ins * self.arf_ins
else:
self.rsp_ins = None
warnings.warn(f'response is missing for spectrum {self.src_name}', stacklevel=2)
else:
if isinstance(self._rsp, Response):
self.rsp_ins = self._rsp
self.rsp_name = self.rsp_ins.name
else:
if isinstance(self._rsp, tuple):
self.rsp_file, self.rsp_ii = self._rsp
else:
self.rsp_file = self._rsp
self.rsp_ii = None
if isinstance(self.rsp_file, BytesIO):
self.rsp_name = self.rsp_file.name
elif isinstance(self.rsp_file, str):
self.rsp_name = self.rsp_file.split('/')[-1]
else:
raise ValueError('unsupported rsp file type')
self.rsp_ins = Response.from_plain(self.rsp_file, self.rsp_ii)
@property
def completeness(self):
"""``True`` when both source and response are populated."""
return not (self.src_ins is None or self.rsp_ins is None)
@property
def stat(self):
return self._stat
@stat.setter
def stat(self, new_stat):
self._stat = new_stat
self._update()
def _update_stat(self):
if not isinstance(self._stat, str):
raise ValueError('<stat> parameter should be str')
def _update_qual(self):
# quality flag:
# qual == 0 if the data quality is good -> True
# qual != 0 if the data quality is bad -> False
self.qualifying = list(np.array(self.src_ins.quality) == 0)
@property
def notc(self):
return self._notc
@notc.setter
def notc(self, new_notc):
self._notc = new_notc
self._update()
def _update_notc(self):
if self._notc is not None:
if not isinstance(self._notc, (list, np.ndarray)):
raise ValueError('<notc> parameter should be list or array')
else:
self._notc = list(self._notc)
if type(self._notc[0]) is not list:
self._notc = [self._notc]
else:
self._notc = self._union(self._notc)
self.noticing = self._notice(self.rsp_ins.chbin, self._notc)
@property
def grpg(self):
return self._grpg
@grpg.setter
def grpg(self, new_grpg):
self._grpg = new_grpg
self._update()
def _update_grpg(self):
if self._grpg is not None and not isinstance(self._grpg, dict):
raise ValueError('<grpg> parameter should be dict')
if self._grpg is None:
self.grouping = self.src_ins.grouping
else:
gr_params = {'min_evt': None, 'min_nevt': None, 'min_sigma': None, 'max_bin': None}
gr_params.update(self._grpg)
ini_flag = (np.array(self.qualifying) & np.array(self.noticing)).astype(int).tolist()
self.grouping = self._group(
self.src_ins.counts,
self.bkg_ins.counts,
self.bkg_ins.errors,
self.src_ins.exposure,
self.bkg_ins.exposure,
self.src_ins.backscale,
self.bkg_ins.backscale,
min_evt=gr_params['min_evt'],
min_nevt=gr_params['min_nevt'],
min_sigma=gr_params['min_sigma'],
max_bin=gr_params['max_bin'],
stat=self.stat,
ini_flag=ini_flag,
)
self.grouping_slice = []
for i, (ql, nt, gr) in enumerate(
zip(self.qualifying, self.noticing, self.grouping, strict=False)
):
if not (ql and nt):
continue
else:
if gr == 0:
continue
elif gr == 1:
self.grouping_slice.append([i, i + 1])
elif gr == -1:
if len(self.grouping_slice) == 0:
self.grouping_slice.append([i, i + 1])
else:
self.grouping_slice[-1][1] = i + 1
self.grouping_slice = np.array(self.grouping_slice, dtype=int)
@property
def rebn(self):
return self._rebn
@rebn.setter
def rebn(self, new_rebn):
self._rebn = new_rebn
self._update()
def _update_rebn(self):
if self._rebn is not None and not isinstance(self._rebn, dict):
raise ValueError('<rebn> parameter should be dict')
if self._rebn is None:
self.rebining = self.grouping
else:
rb_params = {'min_evt': None, 'min_nevt': None, 'min_sigma': None, 'max_bin': None}
rb_params.update(self._rebn)
ini_flag = (np.array(self.qualifying) & np.array(self.noticing)).astype(int).tolist()
self.rebining = self._rebin(
self.src_ins.counts,
self.bkg_ins.counts,
self.bkg_ins.errors,
self.src_ins.exposure,
self.bkg_ins.exposure,
self.src_ins.backscale,
self.bkg_ins.backscale,
min_evt=rb_params['min_evt'],
min_nevt=rb_params['min_nevt'],
min_sigma=rb_params['min_sigma'],
max_bin=rb_params['max_bin'],
stat=self.stat,
ini_flag=ini_flag,
)
self.rebining_slice = []
for i, (ql, nt, rb) in enumerate(
zip(self.qualifying, self.noticing, self.rebining, strict=False)
):
if not (ql and nt):
continue
else:
if rb == 0:
continue
elif rb == 1:
self.rebining_slice.append([i, i + 1])
elif rb == -1:
if len(self.rebining_slice) == 0:
self.rebining_slice.append([i, i + 1])
else:
self.rebining_slice[-1][1] = i + 1
self.rebining_slice = np.array(self.rebining_slice, dtype=int)
@property
def time(self):
return self._time
@time.setter
def time(self, new_time):
self._time = new_time
self._update()
def _update_time(self):
if self._time is not None and not isinstance(self._time, (int, float)):
raise ValueError('<time> parameter should be int or float')
@property
def weight(self):
return self._weight
@weight.setter
def weight(self, new_weight):
self._weight = new_weight
self._update()
def _update_weight(self):
if not isinstance(self._weight, (int, float)):
raise ValueError('<weight> parameter should be int or float')
def _update_params(self):
self.params = OrderedDict()
self.params['sf'] = self.src_ins.factor
self.params['bf'] = self.bkg_ins.factor
self.params['rf'] = self.rsp_ins.factor
self.params['ra'] = self.rsp_ins.ra
self.params['dec'] = self.rsp_ins.dec
def _update_config(self):
self.config = OrderedDict()
def _update(self):
self._UPDATE = object()
clear_cached_property(self)
self._update_src()
self._update_bkg()
self._update_rmf()
self._update_arf()
self._update_rsp()
self._update_stat()
self._update_time()
self._update_weight()
if self.completeness:
self._update_qual()
self._update_notc()
self._update_grpg()
self._update_rebn()
self._update_config()
self._update_params()
else:
self.qualifying = None
self.noticing = None
self.grouping = None
self.rebining = None
self.grouping_slice = None
self.rebining_slice = None
self.config = None
self.params = None
warnings.warn('data unit is uncomplete with missing src or rsp!', stacklevel=2)
@cached_property()
def cfg(self):
""":class:`SuperDict` indexed view of this unit's config parameters."""
cfg = SuperDict()
for cid, cg in enumerate(self.config.values(), 1):
cfg[str(cid)] = cg
return cfg
@cached_property()
def par(self):
""":class:`SuperDict` indexed view of this unit's fit parameters."""
par = SuperDict()
for pid, pr in enumerate(self.params.values(), 1):
par[str(pid)] = pr
return par
@property
def pvalues(self):
"""Tuple of current parameter values, preserving ``par`` order."""
return tuple([pr.value for pr in self.par.values()])
@property
def info(self):
"""Tabular :class:`Info` view of the unit's file names and settings."""
info_dict = OrderedDict()
info_dict['src'] = self.src_name
info_dict['bkg'] = self.bkg_name
info_dict['rmf'] = self.rmf_name
info_dict['arf'] = self.arf_name
info_dict['rsp'] = self.rsp_name
info_dict['notc'] = self.notc
info_dict['stat'] = self.stat
info_dict['grpg'] = self.grpg
info_dict['time'] = self.time
info_dict['weight'] = self.weight
info_dict = OrderedDict(
[('Property', list(info_dict.keys())), (self.name, list(info_dict.values()))]
)
return Info.from_dict(info_dict)
[docs]
def save(self, savepath):
"""Dump the unit's info table to ``<savepath>/dataunit.json``.
Args:
savepath: Directory path. Created if missing.
"""
if not os.path.exists(savepath):
os.makedirs(savepath)
json_dump(self.info.data_list_dict, savepath + '/dataunit.json')
@cached_property()
def ebin(self):
return np.array(self.rsp_ins.phbin, dtype=float)
@cached_property()
def ewidth(self):
return self.ebin[:, 1] - self.ebin[:, 0]
@cached_property()
def nbin(self):
return self.rsp_ins.phbin.shape[0]
@cached_property()
def tarr(self):
return np.repeat(self.time, self.nbin)
@cached_property()
def ngrid(self):
return 5
@cached_property()
def egrid(self):
scale = np.linspace(0.0, 1.0, self.ngrid, dtype=float)
egrid = self.ebin[:, [0]] + (self.ebin[:, [1]] - self.ebin[:, [0]]) * scale
np.maximum(egrid, 1e-10, out=egrid)
return egrid.ravel()
@cached_property()
def tgrid(self):
return np.repeat(self.tarr[:, None], self.ngrid, axis=1).ravel()
@cached_property()
def src_efficiency(self):
return self.src_ins.exposure
@cached_property()
def bkg_efficiency(self):
return self.bkg_ins.exposure * self.bkg_ins.backscale / self.src_ins.backscale
@cached_property()
def alpha(self):
return self.src_efficiency / self.bkg_efficiency
@cached_property()
def src_counts(self):
src_cts_cumsum = np.r_[0, np.cumsum(self.src_ins.counts)]
gr_starts, gr_stops = self.grouping_slice.T
return src_cts_cumsum[gr_stops] - src_cts_cumsum[gr_starts]
@cached_property()
def src_re_counts(self):
src_cts_cumsum = np.r_[0, np.cumsum(self.src_ins.counts)]
rb_starts, rb_stops = self.rebining_slice.T
return src_cts_cumsum[rb_stops] - src_cts_cumsum[rb_starts]
@cached_property()
def src_errors(self):
src_err_cumsum = np.r_[0, np.cumsum(self.src_ins.errors**2)]
gr_starts, gr_stops = self.grouping_slice.T
return np.sqrt(src_err_cumsum[gr_stops] - src_err_cumsum[gr_starts])
@cached_property()
def src_re_errors(self):
src_err_cumsum = np.r_[0, np.cumsum(self.src_ins.errors**2)]
rb_starts, rb_stops = self.rebining_slice.T
return np.sqrt(src_err_cumsum[rb_stops] - src_err_cumsum[rb_starts])
@cached_property()
def bkg_counts(self):
bkg_cts_cumsum = np.r_[0, np.cumsum(self.bkg_ins.counts)]
gr_starts, gr_stops = self.grouping_slice.T
return bkg_cts_cumsum[gr_stops] - bkg_cts_cumsum[gr_starts]
@cached_property()
def bkg_re_counts(self):
bkg_cts_cumsum = np.r_[0, np.cumsum(self.bkg_ins.counts)]
rb_starts, rb_stops = self.rebining_slice.T
return bkg_cts_cumsum[rb_stops] - bkg_cts_cumsum[rb_starts]
@cached_property()
def bkg_errors(self):
bkg_err_cumsum = np.r_[0, np.cumsum(self.bkg_ins.errors**2)]
gr_starts, gr_stops = self.grouping_slice.T
return np.sqrt(bkg_err_cumsum[gr_stops] - bkg_err_cumsum[gr_starts])
@cached_property()
def bkg_re_errors(self):
bkg_err_cumsum = np.r_[0, np.cumsum(self.bkg_ins.errors**2)]
rb_starts, rb_stops = self.rebining_slice.T
return np.sqrt(bkg_err_cumsum[rb_stops] - bkg_err_cumsum[rb_starts])
@cached_property()
def rsp_chbin(self):
ch_left = self.rsp_ins.chbin[self.grouping_slice[:, 0], 0]
ch_right = self.rsp_ins.chbin[self.grouping_slice[:, 1] - 1, 1]
return np.column_stack([ch_left, ch_right])
@cached_property()
def rsp_re_chbin(self):
ch_left = self.rsp_ins.chbin[self.rebining_slice[:, 0], 0]
ch_right = self.rsp_ins.chbin[self.rebining_slice[:, 1] - 1, 1]
return np.column_stack([ch_left, ch_right])
@cached_property()
def rsp_chbin_mean(self):
return np.mean(self.rsp_chbin, axis=1)
@cached_property()
def rsp_re_chbin_mean(self):
return np.mean(self.rsp_re_chbin, axis=1)
@cached_property()
def rsp_chbin_width(self):
return np.diff(self.rsp_chbin, axis=1).reshape(1, -1)[0]
@cached_property()
def rsp_re_chbin_width(self):
return np.diff(self.rsp_re_chbin, axis=1).reshape(1, -1)[0]
@cached_property()
def rsp_chbin_tarr(self):
return np.repeat(self.time, self.rsp_chbin_mean.shape[0])
@cached_property()
def rsp_re_chbin_tarr(self):
return np.repeat(self.time, self.rsp_re_chbin_mean.shape[0])
@cached_property(lambda self: self.pvalues[-2:])
def rsp_drm(self):
rsp_drm_cumsum = np.hstack(
[
np.zeros((self.rsp_ins.drm.shape[0], 1), dtype=float),
np.cumsum(self.rsp_ins.drm, axis=1, dtype=float),
]
)
gr_starts, gr_stops = self.grouping_slice.T
return rsp_drm_cumsum[:, gr_stops] - rsp_drm_cumsum[:, gr_starts]
@cached_property(lambda self: self.pvalues[-2:])
def rsp_re_drm(self):
rsp_drm_cumsum = np.hstack(
[
np.zeros((self.rsp_ins.drm.shape[0], 1), dtype=float),
np.cumsum(self.rsp_ins.drm, axis=1, dtype=float),
]
)
rb_starts, rb_stops = self.rebining_slice.T
return rsp_drm_cumsum[:, rb_stops] - rsp_drm_cumsum[:, rb_starts]
@cached_property(lambda self: self.pvalues[-3:])
def corr_rsp_drm(self):
return self.rsp_drm * self.rsp_ins.factor.value
@cached_property(lambda self: self.pvalues[-3:])
def corr_rsp_re_drm(self):
return self.rsp_re_drm * self.rsp_ins.factor.value
@cached_property(lambda self: self.pvalues[0])
def corr_src_efficiency(self):
return self.src_efficiency * self.src_ins.factor.value
@cached_property(lambda self: self.pvalues[1])
def corr_bkg_efficiency(self):
return self.bkg_efficiency * self.bkg_ins.factor.value
@cached_property(lambda self: self.pvalues[0])
def src_ctsrate(self):
return self.src_counts / self.corr_src_efficiency
@cached_property(lambda self: self.pvalues[0])
def src_re_ctsrate(self):
return self.src_re_counts / self.corr_src_efficiency
@cached_property(lambda self: self.pvalues[0])
def src_ctsrate_error(self):
return self.src_errors / self.corr_src_efficiency
@cached_property(lambda self: self.pvalues[0])
def src_re_ctsrate_error(self):
return self.src_re_errors / self.corr_src_efficiency
@cached_property(lambda self: self.pvalues[0])
def src_ctsspec(self):
return self.src_ctsrate / self.rsp_chbin_width
@cached_property(lambda self: self.pvalues[0])
def src_re_ctsspec(self):
return self.src_re_ctsrate / self.rsp_re_chbin_width
@cached_property(lambda self: self.pvalues[0])
def src_ctsspec_error(self):
return self.src_ctsrate_error / self.rsp_chbin_width
@cached_property(lambda self: self.pvalues[0])
def src_re_ctsspec_error(self):
return self.src_re_ctsrate_error / self.rsp_re_chbin_width
@cached_property(lambda self: self.pvalues[1])
def bkg_ctsrate(self):
return self.bkg_counts / self.corr_bkg_efficiency
@cached_property(lambda self: self.pvalues[1])
def bkg_re_ctsrate(self):
return self.bkg_re_counts / self.corr_bkg_efficiency
@cached_property(lambda self: self.pvalues[1])
def bkg_ctsrate_error(self):
return self.bkg_errors / self.corr_bkg_efficiency
@cached_property(lambda self: self.pvalues[1])
def bkg_re_ctsrate_error(self):
return self.bkg_re_errors / self.corr_bkg_efficiency
@cached_property(lambda self: self.pvalues[1])
def bkg_ctsspec(self):
return self.bkg_ctsrate / self.rsp_chbin_width
@cached_property(lambda self: self.pvalues[1])
def bkg_re_ctsspec(self):
return self.bkg_re_ctsrate / self.rsp_re_chbin_width
@cached_property(lambda self: self.pvalues[1])
def bkg_ctsspec_error(self):
return self.bkg_ctsrate_error / self.rsp_chbin_width
@cached_property(lambda self: self.pvalues[1])
def bkg_re_ctsspec_error(self):
return self.bkg_re_ctsrate_error / self.rsp_re_chbin_width
@cached_property(lambda self: self.pvalues[:2])
def net_ctsrate(self):
return self.src_ctsrate - self.bkg_ctsrate
@cached_property(lambda self: self.pvalues[:2])
def net_re_ctsrate(self):
return self.src_re_ctsrate - self.bkg_re_ctsrate
@cached_property(lambda self: self.pvalues[:2])
def net_ctsrate_error(self):
return np.sqrt(self.src_ctsrate_error**2 + self.bkg_ctsrate_error**2)
@cached_property(lambda self: self.pvalues[:2])
def net_re_ctsrate_error(self):
return np.sqrt(self.src_re_ctsrate_error**2 + self.bkg_re_ctsrate_error**2)
@cached_property(lambda self: self.pvalues[:2])
def net_ctsspec(self):
return self.src_ctsspec - self.bkg_ctsspec
@cached_property(lambda self: self.pvalues[:2])
def net_re_ctsspec(self):
return self.src_re_ctsspec - self.bkg_re_ctsspec
@cached_property(lambda self: self.pvalues[:2])
def net_ctsspec_error(self):
return np.sqrt(self.src_ctsspec_error**2 + self.bkg_ctsspec_error**2)
@cached_property(lambda self: self.pvalues[:2])
def net_re_ctsspec_error(self):
return np.sqrt(self.src_re_ctsspec_error**2 + self.bkg_re_ctsspec_error**2)
@cached_property()
def npoint(self):
return self.src_counts.shape[0]
[docs]
def net_counts_upperlimit(self, cl=0.9):
"""Return the Bayesian net-counts upper limit at confidence ``cl``.
Uses the Poisson formulation with the incomplete gamma function
inverse; ``alpha`` accounts for the source/background exposure
ratio.
Args:
cl: Target confidence level in ``(0, 1)``.
Returns:
Upper limit on the net counts.
"""
N = np.sum(self.src_counts)
B = np.sum(self.bkg_counts) * self.alpha
return (
special.gammaincinv(
N + 1, cl * special.gammaincc(N + 1, B) + special.gammainc(N + 1, B)
)
- B
)
[docs]
def net_ctsrate_upperlimit(self, cl=0.9):
"""Return the net-count-rate upper limit at confidence ``cl``."""
return self.net_counts_upperlimit(cl) / self.corr_src_efficiency
@property
def name(self):
"""User-assigned name if set, otherwise a best-effort caller-scope name."""
try:
return self._name
except AttributeError:
return self.get_obj_name()
@name.setter
def name(self, new_name):
self._name = new_name
[docs]
def get_obj_name(self):
"""Walk call frames and return the outermost local name bound to ``self``.
Returns ``None`` if no binding is found.
"""
frame = inspect.currentframe()
possible_var_names = []
while frame:
local_vars = frame.f_locals.items()
var_names = [var_name for var_name, var_val in local_vars if var_val is self]
if var_names:
possible_var_names.extend(var_names)
frame = frame.f_back
if possible_var_names:
return possible_var_names[-1]
return None
def __str__(self):
return f'*** DataUnit ***\n{self.info.text_table}'
def __repr__(self):
return self.__str__()
def _repr_html_(self):
return (
f'{self.info.html_style}'
f'<details open>'
f'<summary style="margin-bottom: 10px;"><b>DataUnit</b></summary>'
f'{self.info.html_table}'
f'</details>'
)
@staticmethod
def _union(bins):
"""Merge overlapping intervals, returning them sorted and disjoint.
Args:
bins: Iterable of ``[low, high]`` intervals; may be ``None``.
Returns:
A list of merged ``[low, high]`` intervals.
"""
if bins is None or len(bins) == 0:
return []
sorted_bins = sorted(bins, key=lambda x: x[0])
res = []
for current in sorted_bins:
if not res or current[0] > res[-1][1]:
res.append(list(current[:2]))
else:
res[-1][1] = max(res[-1][1], current[1])
return res
@staticmethod
def _notice(chbin, notc=None):
"""Return per-channel ``True``/``False`` flags for inclusion in the fit.
Args:
chbin: ``(nchan, 2)`` channel energy bin edges.
notc: Noticing windows, either a single ``[low, high]`` pair or
a list of such pairs; ``None`` means notice everything.
Returns:
A list of booleans aligned with ``chbin`` rows.
"""
if notc is None:
notc = [[np.min(chbin), np.max(chbin)]]
elif type(notc[0]) is not list:
notc = [notc]
else:
notc = notc
flag = [False] * len(chbin)
for _, (low, upp) in enumerate(notc):
flag_i = list(map(lambda lo, up: low <= lo and upp >= up, chbin[:, 0], chbin[:, 1]))
flag = [pre or now for pre, now in zip(flag, flag_i, strict=False)]
return flag
@staticmethod
def _group(
s,
b,
berr,
ts,
tb,
ss,
sb,
min_sigma=None,
min_evt=None,
min_nevt=None,
max_bin=None,
stat=None,
ini_flag=None,
):
"""Greedy channel-grouping using counts, significance, and width caps.
Walks channels left to right, starting a new bin on the first
allowed channel and extending it until the current bin meets every
threshold: total events, net events, significance, and maximum
width.
Args:
s: Source counts per channel.
b: Background counts per channel.
berr: Background errors per channel.
ts: Source exposure.
tb: Background exposure.
ss: Source backscale.
sb: Background backscale.
min_sigma: Minimum per-bin significance.
min_evt: Minimum per-bin total events.
min_nevt: Minimum per-bin net events.
max_bin: Maximum channels per bin.
stat: Statistic name driving the significance formula.
ini_flag: Per-channel gate: ``1`` means channel is allowed to
contribute, ``0`` excludes it.
Returns:
A numpy array of per-channel grouping flags: ``+1`` starts a
bin, ``-1`` continues one, ``0`` excludes the channel.
Raises:
AttributeError: If ``stat`` is not one of the known statistics.
"""
# grouping flag:
# grpg = 0 if the channel is not allowed to group, including the not qualified noticed channels
# grpg = +1 if the channel is the start of a new bin
# grpg = -1 if the channel is part of a continuing bin
if ini_flag is None:
ini_flag = [1] * len(s)
if min_sigma is None:
min_sigma = -np.inf
if min_evt is None:
min_evt = 0
if min_nevt is None:
min_nevt = 0
if max_bin is None:
max_bin = np.inf
alpha = ts * ss / (tb * sb)
flag, gs = [], []
nowbin = False
cs, cb, cberr, cp = 0, 0, 0, 0
for i in range(len(s)):
if ini_flag[i] != 1:
flag.append(0)
if nowbin:
if len(gs) < 2:
pass
else:
flag[gs[-1]] = -1
nowbin = False
cs, cb, cberr, cp = 0, 0, 0, 0
else:
if not nowbin:
flag.append(1)
gs.append(i)
cp = 1
else:
flag.append(-1)
cp += 1
si = s[i]
bi = b[i]
bierr = berr[i]
cs += si
cb += bi
cberr = np.sqrt(cberr**2 + bierr**2)
if stat is None:
stat = 'pgstat'
if stat in ['pstat', 'cstat', 'ppstat', 'Xppstat', 'Xcstat', 'ULppstat']:
sigma = 0 if (cb < 0 or cs < 0) and cb != cs else ppsig(cs, cb, alpha)
elif stat in ['gstat', 'chi2', 'pgstat', 'Xpgstat', 'ULpgstat']:
sigma = 0 if cs <= 0 or cberr == 0 else pgsig(cs, cb * alpha, cberr * alpha)
else:
raise AttributeError(f'unsupported stat: {stat}')
evt = cs
nevt = cs - cb * alpha
if (
(sigma >= min_sigma) and (evt >= min_evt) and (nevt >= min_nevt)
) or cp == max_bin:
nowbin = False
cs, cb, cberr, cp = 0, 0, 0, 0
else:
nowbin = True
if nowbin and i == (len(s) - 1):
if len(gs) < 2:
pass
else:
flag[gs[-1]] = -1
return np.array(flag)
@staticmethod
def _rebin(
s,
b,
berr,
ts,
tb,
ss,
sb,
min_sigma=None,
min_evt=None,
min_nevt=None,
max_bin=None,
stat=None,
ini_flag=None,
):
"""Alias for :meth:`_group` used for an independent rebinning pass."""
return DataUnit._group(
s, b, berr, ts, tb, ss, sb, min_sigma, min_evt, min_nevt, max_bin, stat, ini_flag
)