Source code for bayspec.model.local.multiplicative

"""Dimensionless multiplicative components -- cutoffs and ISM absorbers.

The absorption models (``wabs``, ``phabs``, ``tbabs``, ``tinvabs``) load
their tabulated cross sections from the ``docs/xsect`` archive; the
cross-section lookup is memoized per instance for speed.
"""

from collections import OrderedDict
from os.path import abspath, dirname

import numpy as np

from ...util.param import Cfg, Par
from ...util.prior import unif
from ...util.tools import memoized
from ..model import Multiplicative

docs_path = dirname(dirname(dirname(abspath(__file__)))) + '/docs'


[docs] class hecut(Multiplicative): """High-energy exponential cutoff above a configurable threshold energy.""" def __init__(self): r"""Initialise the cutoff with a log-cutoff parameter :math:`\log E_c`.""" self.expr = 'hecut' self.comment = 'high-energy cutoff model' self.config = OrderedDict() self.config['redshift'] = Cfg(0.0) self.config['threshold_energy'] = Cfg(0.0) self.params = OrderedDict() self.params[r'log$E_c$'] = Par(2, unif(-1, 5))
[docs] def func(self, E, T=None, O=None): # noqa: E741 """Return 1 below ``threshold_energy`` and ``exp((ethr - E) / Ec)`` above.""" redshift = self.config['redshift'].value ethr = self.config['threshold_energy'].value logEc = self.params[r'log$E_c$'].value Ec = 10**logEc E = np.asarray(E) scalar = E.ndim == 0 if scalar: E = E[np.newaxis] zi = 1 + redshift E = E * zi nouspec = np.zeros_like(E, dtype=float) i1 = ethr >= E i2 = ethr < E nouspec[i1] = 1.0 nouspec[i2] = np.exp((ethr - E[i2]) / Ec) return nouspec[0] if scalar else nouspec
[docs] class wabs(Multiplicative): """Wisconsin ISM photoelectric absorption (``angr`` abundances).""" def __init__(self): """Initialise with the single parameter :math:`N_H` in ``1e22 cm^-2`` units.""" self.expr = 'wabs' self.comment = 'Wisconsin ISM absorption model' self.config = OrderedDict() self.config['redshift'] = Cfg(0.0) self.params = OrderedDict() self.params[r'$N_H$'] = Par(1, unif(1e-4, 50)) self.xsect_prefix = docs_path + '/xsect' self.xsect_dir = self.xsect_prefix + '/xsect_wabs_angr.npz' self.xsect_data = np.load(self.xsect_dir) self.xsect_energy = self.xsect_data['energy'] self.xsect_sigma = self.xsect_data['sigma']
[docs] def func(self, E, T=None, O=None): # noqa: E741 r"""Return :math:`\exp(-N_H \, \sigma(E))` using the tabulated cross section.""" redshift = self.config['redshift'].value nh = self.params[r'$N_H$'].value sigma = self._get_cached_sigma(E, redshift) fracspec = np.exp(-nh * sigma) return fracspec
@memoized() def _get_cached_sigma(self, E, redshift): """Interpolate the rest-frame cross section at redshift-corrected ``E``.""" E = np.asarray(E, dtype=np.float64) scalar = E.ndim == 0 if scalar: E = E[np.newaxis] zi = 1 + redshift E = E * zi sigma = np.interp(E, self.xsect_energy, self.xsect_sigma, right=0.0) return sigma[0] if scalar else sigma
[docs] class phabs(Multiplicative): """Photoelectric absorption using the ``aspl`` abundance table.""" def __init__(self): """Initialise with the single parameter :math:`N_H` in ``1e22 cm^-2`` units.""" self.expr = 'phabs' self.comment = 'photoelectric absorption model' self.config = OrderedDict() self.config['redshift'] = Cfg(0.0) self.params = OrderedDict() self.params[r'$N_H$'] = Par(1, unif(1e-4, 50)) self.xsect_prefix = docs_path + '/xsect' self.xsect_dir = self.xsect_prefix + '/xsect_phabs_aspl.npz' self.xsect_data = np.load(self.xsect_dir) self.xsect_energy = self.xsect_data['energy'] self.xsect_sigma = self.xsect_data['sigma']
[docs] def func(self, E, T=None, O=None): # noqa: E741 r"""Return :math:`\exp(-N_H \, \sigma(E))` using the tabulated cross section.""" redshift = self.config['redshift'].value nh = self.params[r'$N_H$'].value sigma = self._get_cached_sigma(E, redshift) fracspec = np.exp(-nh * sigma) return fracspec
@memoized() def _get_cached_sigma(self, E, redshift): """Interpolate the rest-frame cross section at redshift-corrected ``E``.""" E = np.asarray(E, dtype=np.float64) scalar = E.ndim == 0 if scalar: E = E[np.newaxis] zi = 1 + redshift E = E * zi sigma = np.interp(E, self.xsect_energy, self.xsect_sigma, right=0.0) return sigma[0] if scalar else sigma
[docs] class tbabs(Multiplicative): """Tuebingen-Boulder ISM absorption using the ``wilm`` abundance table.""" def __init__(self): """Initialise with the single parameter :math:`N_H` in ``1e22 cm^-2`` units.""" self.expr = 'tbabs' self.comment = 'Tuebingen-Boulder ISM absorption model' self.config = OrderedDict() self.config['redshift'] = Cfg(0.0) self.params = OrderedDict() self.params[r'$N_H$'] = Par(1, unif(1e-4, 50)) self.xsect_prefix = docs_path + '/xsect' self.xsect_dir = self.xsect_prefix + '/xsect_tbabs_wilm.npz' self.xsect_data = np.load(self.xsect_dir) self.xsect_energy = self.xsect_data['energy'] self.xsect_sigma = self.xsect_data['sigma']
[docs] def func(self, E, T=None, O=None): # noqa: E741 r"""Return :math:`\exp(-N_H \, \sigma(E))` using the tabulated cross section.""" redshift = self.config['redshift'].value nh = self.params[r'$N_H$'].value sigma = self._get_cached_sigma(E, redshift) fracspec = np.exp(-nh * sigma) return fracspec
@memoized() def _get_cached_sigma(self, E, redshift): """Interpolate the rest-frame cross section at redshift-corrected ``E``.""" E = np.asarray(E, dtype=np.float64) scalar = E.ndim == 0 if scalar: E = E[np.newaxis] zi = 1 + redshift E = E * zi sigma = np.interp(E, self.xsect_energy, self.xsect_sigma, right=0.0) return sigma[0] if scalar else sigma
[docs] class tinvabs(Multiplicative): r"""Exponentially-decaying absorption column :math:`N_H(T) = N_{H,0} e^{-T/\tau}`. Delegates the energy-dependent absorption to an inner :class:`tbabs` while updating its :math:`N_H` per unique ``T`` value. """ def __init__(self): r"""Initialise with initial column :math:`N_{H,0}` and decay time :math:`\tau`.""" self.expr = 'tinvabs' self.comment = 'time-involved absorption model' self.tbabs = tbabs() self.config = OrderedDict() self.config['redshift'] = Cfg(0.0) self.params = OrderedDict() self.params[r'$N_{H,0}$'] = Par(1, unif(1e-4, 50)) self.params[r'$\tau$'] = Par(50, unif(0, 300))
[docs] def func(self, E, T, O=None): # noqa: E741 """Absorb ``E`` by a column that decays with ``T``. Args: E: Energies in keV. T: Times; ``E`` and ``T`` must match in scalar-ness and shape. O: Unused. Returns: Per-sample attenuation factor. Raises: ValueError: If ``E`` and ``T`` shapes disagree. """ redshift = self.config['redshift'].value NH0 = self.params[r'$N_{H,0}$'].value tau = self.params[r'$\tau$'].value self.tbabs.parameters['redshift'].value = redshift E = np.asarray(E) E_scalar = E.ndim == 0 if E_scalar: E = E[np.newaxis] T = np.asarray(T) T_scalar = T.ndim == 0 if T_scalar: T = T[np.newaxis] if E_scalar == T_scalar: if E.shape != T.shape: raise ValueError('E and T must have the same shape') else: scalar = E_scalar else: raise ValueError('E and T must both be scalars or both be arrays') fracspec = np.zeros_like(E, dtype=float) for Ti in set(T): idx = np.where(Ti == T)[0] NH = NH0 * np.exp(-Ti / tau) if NH < 1e-4: return np.nan if scalar else np.ones_like(E) * np.nan self.tbabs.params[r'$N_H$'].value = NH res = self.tbabs(np.array(E, dtype=float)) fracspec[idx] = res return fracspec[0] if scalar else fracspec