Source code for bayspec.model.xspec

"""Bayspec wrappers for XSPEC additive and multiplicative models.

Walks the available ``xspec_models_cxc`` model list and synthesizes a
bayspec :class:`Model` subclass for each one under an ``XS_<name>``
alias. Additive XSPEC models get a ``logNorm`` parameter on top of the
XSPEC-native parameter set. Helper functions (``chatter``, ``abund``,
``xsect``, ``cosmo``) expose the XSPEC global state through wrappers.
"""

import numpy as np
import xspec_models_cxc as xsp
from collections import OrderedDict

from ..model import Model
from ...util.prior import unif
from ...util.param import Par, Cfg


__all__ = []

add_model_list = xsp.list_models(modeltype=xsp.ModelType.Add)
add_model_classes = {name: getattr(xsp, name) for name in add_model_list}
add_model_types = {name: 'add' for name in add_model_list}

mul_model_list = xsp.list_models(modeltype=xsp.ModelType.Mul)
mul_model_classes = {name: getattr(xsp, name) for name in mul_model_list}
mul_model_types = {name: 'mul' for name in mul_model_list}

conv_model_list = xsp.list_models(modeltype=xsp.ModelType.Con)
conv_model_classes = {name: getattr(xsp, name) for name in conv_model_list}
conv_model_types = {name: 'conv' for name in conv_model_list}

model_classes = dict(**add_model_classes, **mul_model_classes)
model_types = dict(**add_model_types, **mul_model_types)


[docs] def list_xspec_models(): """Return the ``XS_<name>`` aliases of every wrapped XSPEC model.""" return [f'XS_{name}' for name in model_classes]
__all__.append('list_xspec_models')
[docs] def chatter(val=None): """Read or write XSPEC's verbosity level; returns the current value when called with no argument.""" if val is None: return xsp.chatter() else: xsp.chatter(val)
__all__.append('chatter')
[docs] def abund(val=None): """Read or set the XSPEC elemental abundance table. Args: val: One of ``'angr'``, ``'aspl'``, ``'feld'``, ``'aneb'``, ``'grsa'``, ``'wilm'``, ``'lodd'``, ``'lpgp'``. ``None`` returns the current table. Raises: ValueError: If ``val`` is not in the allowed list. """ allowed_abund = ['angr', 'aspl', 'feld', 'aneb', 'grsa', 'wilm', 'lodd', 'lpgp'] if val is None: return xsp.abundance() else: if val not in allowed_abund: msg = f'{val} is not allowed abundance' raise ValueError(msg) xsp.abundance(val)
__all__.append('abund')
[docs] def xsect(val=None): """Read or set the XSPEC photoelectric cross-section table. Args: val: One of ``'bcmc'``, ``'obcm'``, ``'vern'``; ``None`` returns the current table. Raises: ValueError: If ``val`` is not in the allowed list. """ allowed_xsect = ['bcmc', 'obcm', 'vern'] if val is None: return xsp.cross_section() else: if val not in allowed_xsect: msg = f'{val} is not allowed cross section' raise ValueError(msg) xsp.cross_section(val)
__all__.append('xsect')
[docs] def cosmo(val=None): """Read or set the XSPEC cosmology. Args: val: Dict with keys ``h0``, ``lambda0``, ``q0``; ``None`` returns the current cosmology. Raises: ValueError: If ``val`` does not carry exactly those keys. """ allowed_key = ['h0', 'lambda0', 'q0'] if val is None: return xsp.cosmology() else: if set(val.keys) != set(allowed_key): msg = f'{val} should be dict with keys h0, lambda0 and q0' raise ValueError(msg) xsp.cosmology(**val)
__all__.append('cosmo') for name, cls in model_classes.items(): def make_init(name, cls): """Closure that produces the ``__init__`` for the wrapped ``XS_<name>`` class.""" def __init__(self): self.xsexpr = name self.expr = f'XS_{name}' self.type = model_types[name] self.comment = f'xspec model {name}' self.xsmodel = cls self.params = OrderedDict() self.config = OrderedDict() self.xsmodel_plabels = [] for pr in xsp.info(name).parameters: pl = pr.name value = pr.default min_value = pr.softmin max_value = pr.softmax self.xsmodel_plabels.append(pl) if pl == 'Redshift': self.config['redshift'] = Cfg(value) else: self.params[pl] = Par(value, unif(min_value, max_value), frozen=pr.frozen) if self.type == 'add': self.params['logNorm'] = Par(0, unif(-10, 10)) if 'Redshift' not in self.xsmodel_plabels: self.config['redshift'] = Cfg(0) return __init__ def func(self, E, T=None, O=None): # noqa: E741 """Gather parameters, invoke the XSPEC backend, and rescale the result.""" pars = [] for pr in xsp.info(self.xsexpr).parameters: pl = pr.name if pl == 'Redshift': pars.append(self.config['redshift'].value) else: pars.append(self.params[pl].value) norm = 10 ** self.params['logNorm'].value if self.type == 'add' else 1.0 E = np.asarray(E) scalar = E.ndim == 0 if scalar: E = E[np.newaxis] if 'Redshift' not in self.xsmodel_plabels: redshift = self.config['redshift'].value zi = 1 + redshift E = E * zi scale = 1e5 ediff = E.reshape(-1, 1) * [1 - 1 / scale, 1, 1 + 1 / scale] def integ(egrid): return np.mean(self.xsmodel(pars=pars, energies=egrid)) if self.type == 'add': xsres = norm * np.array(list(map(integ, ediff))) / (E / scale) elif self.type == 'mul': xsres = norm * np.array(list(map(integ, ediff))) return xsres[0] if scalar else xsres new_class = type( f'XS_{name}', (Model,), { '__init__': make_init(name, cls), 'func': func, '__doc__': f'XSPEC ``{name}`` model bridged into bayspec.', }, ) globals()[f'XS_{name}'] = new_class __all__.append(f'XS_{name}') xspec_models = { name: cls for name, cls in globals().items() if isinstance(cls, type) and issubclass(cls, Model) and name[:2] == 'XS' } __all__.append('xspec_models')