"""Additive photon-spectrum components shipped with bayspec.
Each class implements a concrete analytic family (power-law, cutoff
power-law, Band function, blackbody, synchrotron, etc.) and returns the
photon flux density :math:`N(E, T)` from :meth:`func`. Most components
expose a ``vfv_peak`` configuration flag that switches between
peak-energy and break-energy parameterizations, and every component
accepts an optional ``redshift`` configuration that blueshifts ``E`` by
``1 + redshift`` before evaluation.
"""
from collections import OrderedDict
import os
from os.path import abspath, dirname
import subprocess as sp
from astropy.cosmology import Planck18
import astropy.units as u
import numba as nb
import numpy as np
import toml
from ...util.param import Cfg, Par
from ...util.prior import unif
from ...util.tools import cached_property
from ..model import Additive
docs_path = dirname(dirname(dirname(abspath(__file__)))) + '/docs'
[docs]
class pl(Additive):
"""Single power-law photon spectrum."""
def __init__(self):
"""Initialise power-law with spectral index and log-normalisation."""
self.expr = 'pl'
self.comment = 'power-law model'
self.config = OrderedDict()
self.config['redshift'] = Cfg(0.0)
self.config['pivot_energy'] = Cfg(1.0)
self.params = OrderedDict()
self.params[r'$\alpha$'] = Par(0, unif(-10, 10))
self.params[r'log$A$'] = Par(0, unif(-10, 10))
[docs]
def func(self, E, T=None, O=None): # noqa: E741
r"""Return the power-law photon flux density :math:`A (E/E_0)^\alpha`."""
redshift = self.config['redshift'].value
epiv = self.config['pivot_energy'].value
alpha = self.params[r'$\alpha$'].value
logA = self.params[r'log$A$'].value
Amp = 10**logA
E = np.asarray(E)
scalar = E.ndim == 0
if scalar:
E = E[np.newaxis]
zi = 1 + redshift
E = E * zi
phtspec = Amp * (E / epiv) ** alpha
return phtspec[0] if scalar else phtspec
[docs]
class cpl(Additive):
"""Power law with an exponential high-energy cutoff (``vfv_peak`` aware)."""
def __init__(self):
"""Initialise cutoff power-law; parameters switch on ``vfv_peak`` config."""
self.expr = 'cpl'
self.comment = 'power-law model with high-energy cutoff'
self.config = OrderedDict()
self.config['redshift'] = Cfg(0.0)
self.config['pivot_energy'] = Cfg(1.0)
self.config['vfv_peak'] = Cfg(True)
@cached_property(lambda self: self.config['vfv_peak'].value)
def params(self):
"""Return the parameter dict chosen by the ``vfv_peak`` config flag."""
params = OrderedDict()
if self.config['vfv_peak'].value:
params[r'$\alpha$'] = Par(-1, unif(-2, 2))
params[r'log$E_p$'] = Par(2, unif(0, 4))
params[r'log$A$'] = Par(0, unif(-10, 10))
elif not self.config['vfv_peak'].value:
params[r'$\alpha$'] = Par(-1, unif(-8, 4))
params[r'log$E_c$'] = Par(2, unif(0, 4))
params[r'log$A$'] = Par(0, unif(-10, 10))
else:
raise ValueError('Invalid value for vfv_peak config.')
return params
[docs]
def func(self, E, T=None, O=None): # noqa: E741
r"""Return the cutoff power-law :math:`A (E/E_0)^\alpha e^{-E/E_c}`."""
redshift = self.config['redshift'].value
epiv = self.config['pivot_energy'].value
peak = self.config['vfv_peak'].value
alpha = self.params[r'$\alpha$'].value
E = np.asarray(E)
scalar = E.ndim == 0
if scalar:
E = E[np.newaxis]
if peak:
logEp = self.params[r'log$E_p$'].value
Ep = 10**logEp
if not alpha > -2:
return np.nan if scalar else np.ones_like(E) * np.nan
Ec = Ep / (2 + alpha)
else:
logEc = self.params[r'log$E_c$'].value
Ec = 10**logEc
logA = self.params[r'log$A$'].value
Amp = 10**logA
zi = 1 + redshift
E = E * zi
phtspec = Amp * (E / epiv) ** alpha * np.exp(-E / Ec)
return phtspec[0] if scalar else phtspec
[docs]
class sbpl(Additive):
"""Smoothly broken power law (Kaneko et al. 2006, ``10.1086/505911``)."""
# 10.1086/505911
def __init__(self):
"""Initialise sbpl; params switch on ``vfv_peak`` config."""
self.expr = 'sbpl'
self.comment = 'smoothly broken power-law model'
self.config = OrderedDict()
self.config['redshift'] = Cfg(0.0)
self.config['pivot_energy'] = Cfg(1.0)
self.config['vfv_peak'] = Cfg(True)
self.config['smoothness'] = Cfg(0.3)
@cached_property(lambda self: self.config['vfv_peak'].value)
def params(self):
"""Return the parameter dict chosen by the ``vfv_peak`` config flag."""
params = OrderedDict()
if self.config['vfv_peak'].value:
params[r'$\alpha$'] = Par(-1, unif(-2, 2))
params[r'$\beta$'] = Par(-3, unif(-6, -2))
params[r'log$E_p$'] = Par(2, unif(0, 4))
params[r'log$A$'] = Par(0, unif(-10, 10))
elif not self.config['vfv_peak'].value:
params[r'$\alpha_1$'] = Par(-1, unif(-6, 4))
params[r'$\alpha_2$'] = Par(-3, unif(-6, 4))
params[r'log$E_b$'] = Par(2, unif(0, 4))
params[r'log$A$'] = Par(0, unif(-10, 10))
else:
raise ValueError('Invalid value for vfv_peak config.')
return params
@staticmethod
def _log_cosh(q):
return np.logaddexp(q, -q) - np.log(2.0)
[docs]
def func(self, E, T=None, O=None): # noqa: E741
"""Return the sbpl photon spectrum joining two power laws at ``Eb``."""
redshift = self.config['redshift'].value
epiv = self.config['pivot_energy'].value
peak = self.config['vfv_peak'].value
delta = self.config['smoothness'].value
E = np.asarray(E)
scalar = E.ndim == 0
if scalar:
E = E[np.newaxis]
if peak:
alpha1 = self.params[r'$\alpha$'].value
alpha2 = self.params[r'$\beta$'].value
logEp = self.params[r'log$E_p$'].value
Ep = 10**logEp
if not (alpha1 > -2 and alpha2 < -2):
return np.nan if scalar else np.ones_like(E) * np.nan
Eb = Ep / (10 ** (delta * np.arctanh((alpha1 + alpha2 + 4) / (alpha1 - alpha2))))
else:
alpha1 = self.params[r'$\alpha_1$'].value
alpha2 = self.params[r'$\alpha_2$'].value
logEb = self.params[r'log$E_b$'].value
Eb = 10**logEb
b = (alpha1 + alpha2) / 2
m = (alpha2 - alpha1) / 2
logA = self.params[r'log$A$'].value
Amp = 10**logA
zi = 1 + redshift
E = E * zi
q = np.log10(E / Eb) / delta
qpiv = np.log10(epiv / Eb) / delta
a = m * delta * self._log_cosh(q)
apiv = m * delta * self._log_cosh(qpiv)
phtspec = Amp * (E / epiv) ** b * 10 ** (a - apiv)
return phtspec[0] if scalar else phtspec
[docs]
def slope_func(self, E, T=None, O=None): # noqa: E741
"""Return the local spectral slope of the sbpl model at ``E``."""
redshift = self.config['redshift'].value
peak = self.config['vfv_peak'].value
delta = self.config['smoothness'].value
E = np.asarray(E)
scalar = E.ndim == 0
if scalar:
E = E[np.newaxis]
if peak:
alpha1 = self.params[r'$\alpha$'].value
alpha2 = self.params[r'$\beta$'].value
logEp = self.params[r'log$E_p$'].value
Ep = 10**logEp
if not (alpha1 > -2 and alpha2 < -2):
return np.nan if scalar else np.ones_like(E) * np.nan
Eb = Ep / (10 ** (delta * np.arctanh((alpha1 + alpha2 + 4) / (alpha1 - alpha2))))
else:
alpha1 = self.params[r'$\alpha_1$'].value
alpha2 = self.params[r'$\alpha_2$'].value
logEb = self.params[r'log$E_b$'].value
Eb = 10**logEb
b = (alpha1 + alpha2) / 2
m = (alpha2 - alpha1) / 2
zi = 1 + redshift
E = E * zi
q = np.log10(E / Eb) / delta
slope = b + m * np.tanh(q)
return slope[0] if scalar else slope
[docs]
class csbpl(Additive):
"""Smoothly broken power law with an added exponential cutoff."""
def __init__(self):
"""Initialise sbpl with exponential cutoff; uses ``vfv_peak`` config."""
self.expr = 'csbpl'
self.comment = 'smoothly broken power-law model with high-energy cutoff'
self.config = OrderedDict()
self.config['redshift'] = Cfg(0.0)
self.config['pivot_energy'] = Cfg(1.0)
self.config['vfv_peak'] = Cfg(True)
self.config['smoothness'] = Cfg(0.3)
@cached_property(lambda self: self.config['vfv_peak'].value)
def params(self):
"""Return the parameter dict chosen by the ``vfv_peak`` config flag."""
params = OrderedDict()
if self.config['vfv_peak'].value:
params[r'$\alpha_1$'] = Par(1, unif(-2, 2))
params[r'$\alpha_2$'] = Par(-1, unif(-2, 2))
params[r'log$E_b$'] = Par(1, unif(-1, 3))
params[r'log$E_p$'] = Par(2, unif(0, 4))
params[r'log$A$'] = Par(0, unif(-10, 10))
elif not self.config['vfv_peak'].value:
params[r'$\alpha_1$'] = Par(1, unif(-6, 4))
params[r'$\alpha_2$'] = Par(-1, unif(-6, 4))
params[r'log$E_b$'] = Par(1, unif(-1, 3))
params[r'log$E_c$'] = Par(2, unif(0, 4))
params[r'log$A$'] = Par(0, unif(-10, 10))
else:
raise ValueError('Invalid value for vfv_peak config.')
return params
@staticmethod
def _log_cosh(q):
return np.logaddexp(q, -q) - np.log(2.0)
[docs]
def func(self, E, T=None, O=None): # noqa: E741
"""Return the sbpl photon spectrum with exponential cutoff."""
redshift = self.config['redshift'].value
epiv = self.config['pivot_energy'].value
peak = self.config['vfv_peak'].value
delta = self.config['smoothness'].value
alpha1 = self.params[r'$\alpha_1$'].value
alpha2 = self.params[r'$\alpha_2$'].value
logEb = self.params[r'log$E_b$'].value
Eb = 10**logEb
E = np.asarray(E)
scalar = E.ndim == 0
if scalar:
E = E[np.newaxis]
if peak:
logEp = self.params[r'log$E_p$'].value
Ep = 10**logEp
if not alpha2 > -2:
return np.nan if scalar else np.ones_like(E) * np.nan
Ec = Ep / (2 + alpha2)
else:
logEc = self.params[r'log$E_c$'].value
Ec = 10**logEc
logA = self.params[r'log$A$'].value
Amp = 10**logA
zi = 1 + redshift
E = E * zi
b = (alpha1 + alpha2) / 2
m = (alpha2 - alpha1) / 2
q = np.log10(E / Eb) / delta
qpiv = np.log10(epiv / Eb) / delta
a = m * delta * self._log_cosh(q)
apiv = m * delta * self._log_cosh(qpiv)
phtspec = Amp * (E / epiv) ** b * 10 ** (a - apiv) * np.exp(-E / Ec)
return phtspec[0] if scalar else phtspec
[docs]
class dsbpl(Additive):
"""Double smoothly broken power law (three segments, two smooth breaks)."""
def __init__(self):
"""Initialise double sbpl with three indices and two breaks."""
self.expr = 'dsbpl'
self.comment = 'double smoothly broken power-law model'
self.config = OrderedDict()
self.config['redshift'] = Cfg(0.0)
self.config['pivot_energy'] = Cfg(1.0)
self.config['smoothness1'] = Cfg(0.3)
self.config['smoothness2'] = Cfg(0.3)
self.params = OrderedDict()
self.params[r'$\alpha_1$'] = Par(1, unif(-6, 4))
self.params[r'$\alpha_2$'] = Par(-1, unif(-6, 4))
self.params[r'$\alpha_3$'] = Par(-3, unif(-6, 4))
self.params[r'log$E_{b1}$'] = Par(1, unif(-1, 3))
self.params[r'log$E_{b2}$'] = Par(2, unif(0, 4))
self.params[r'log$A$'] = Par(0, unif(-10, 10))
@staticmethod
def _log_cosh(q):
return np.logaddexp(q, -q) - np.log(2.0)
[docs]
def func(self, E, T=None, O=None): # noqa: E741
"""Return the dsbpl photon spectrum, joining three power laws."""
redshift = self.config['redshift'].value
epiv = self.config['pivot_energy'].value
delta1 = self.config['smoothness1'].value
delta2 = self.config['smoothness2'].value
alpha1 = self.params[r'$\alpha_1$'].value
alpha2 = self.params[r'$\alpha_2$'].value
alpha3 = self.params[r'$\alpha_3$'].value
logEb1 = self.params[r'log$E_{b1}$'].value
logEb2 = self.params[r'log$E_{b2}$'].value
logA = self.params[r'log$A$'].value
Eb1 = 10.0**logEb1
Eb2 = 10.0**logEb2
Amp = 10.0**logA
E = np.asarray(E)
scalar = E.ndim == 0
if scalar:
E = E[np.newaxis]
zi = 1.0 + redshift
E = E * zi
b = 0.5 * (alpha1 + alpha3)
m1 = 0.5 * (alpha2 - alpha1)
m2 = 0.5 * (alpha3 - alpha2)
q1 = np.log10(E / Eb1) / delta1
q2 = np.log10(E / Eb2) / delta2
qpiv1 = np.log10(epiv / Eb1) / delta1
qpiv2 = np.log10(epiv / Eb2) / delta2
a1 = m1 * delta1 * self._log_cosh(q1)
a2 = m2 * delta2 * self._log_cosh(q2)
apiv1 = m1 * delta1 * self._log_cosh(qpiv1)
apiv2 = m2 * delta2 * self._log_cosh(qpiv2)
phtspec = Amp * (E / epiv) ** b * 10.0 ** ((a1 + a2) - (apiv1 + apiv2))
return phtspec[0] if scalar else phtspec
[docs]
def slope_func(self, E, T=None, O=None): # noqa: E741
"""Return the local spectral slope of the dsbpl model at ``E``."""
redshift = self.config['redshift'].value
delta1 = self.config['smoothness1'].value
delta2 = self.config['smoothness2'].value
alpha1 = self.params[r'$\alpha_1$'].value
alpha2 = self.params[r'$\alpha_2$'].value
alpha3 = self.params[r'$\alpha_3$'].value
logEb1 = self.params[r'log$E_{b1}$'].value
logEb2 = self.params[r'log$E_{b2}$'].value
Eb1 = 10**logEb1
Eb2 = 10**logEb2
E = np.asarray(E)
scalar = E.ndim == 0
if scalar:
E = E[np.newaxis]
zi = 1 + redshift
E = E * zi
b = 0.5 * (alpha1 + alpha3)
m1 = 0.5 * (alpha2 - alpha1)
m2 = 0.5 * (alpha3 - alpha2)
q1 = np.log10(E / Eb1) / delta1
q2 = np.log10(E / Eb2) / delta2
slope = b + m1 * np.tanh(q1) + m2 * np.tanh(q2)
return slope[0] if scalar else slope
[docs]
class tsbpl(Additive):
"""Triple smoothly broken power law (four segments, three smooth breaks)."""
def __init__(self):
"""Initialise triple sbpl with four indices and three breaks."""
self.expr = 'tsbpl'
self.comment = 'triple smoothly broken power-law model'
self.config = OrderedDict()
self.config['redshift'] = Cfg(0.0)
self.config['pivot_energy'] = Cfg(1.0)
self.config['smoothness1'] = Cfg(0.3)
self.config['smoothness2'] = Cfg(0.3)
self.config['smoothness3'] = Cfg(0.3)
self.params = OrderedDict()
self.params[r'$\alpha_1$'] = Par(2, unif(-6, 4))
self.params[r'$\alpha_2$'] = Par(1, unif(-6, 4))
self.params[r'$\alpha_3$'] = Par(-1, unif(-6, 4))
self.params[r'$\alpha_4$'] = Par(-3, unif(-6, 4))
self.params[r'log$E_{b1}$'] = Par(1, unif(-1, 3))
self.params[r'log$E_{b2}$'] = Par(2, unif(0, 4))
self.params[r'log$E_{b3}$'] = Par(3, unif(1, 5))
self.params[r'log$A$'] = Par(0, unif(-10, 10))
@staticmethod
def _log_cosh(q):
return np.logaddexp(q, -q) - np.log(2.0)
[docs]
def func(self, E, T=None, O=None): # noqa: E741
"""Return the tsbpl photon spectrum, joining four power laws."""
redshift = self.config['redshift'].value
epiv = self.config['pivot_energy'].value
delta1 = self.config['smoothness1'].value
delta2 = self.config['smoothness2'].value
delta3 = self.config['smoothness3'].value
alpha1 = self.params[r'$\alpha_1$'].value
alpha2 = self.params[r'$\alpha_2$'].value
alpha3 = self.params[r'$\alpha_3$'].value
alpha4 = self.params[r'$\alpha_4$'].value
logEb1 = self.params[r'log$E_{b1}$'].value
logEb2 = self.params[r'log$E_{b2}$'].value
logEb3 = self.params[r'log$E_{b3}$'].value
logA = self.params[r'log$A$'].value
Eb1 = 10.0**logEb1
Eb2 = 10.0**logEb2
Eb3 = 10.0**logEb3
Amp = 10.0**logA
E = np.asarray(E)
scalar = E.ndim == 0
if scalar:
E = E[np.newaxis]
zi = 1.0 + redshift
E = E * zi
b = 0.5 * (alpha1 + alpha4)
m1 = 0.5 * (alpha2 - alpha1)
m2 = 0.5 * (alpha3 - alpha2)
m3 = 0.5 * (alpha4 - alpha3)
q1 = np.log10(E / Eb1) / delta1
q2 = np.log10(E / Eb2) / delta2
q3 = np.log10(E / Eb3) / delta3
qpiv1 = np.log10(epiv / Eb1) / delta1
qpiv2 = np.log10(epiv / Eb2) / delta2
qpiv3 = np.log10(epiv / Eb3) / delta3
a1 = m1 * delta1 * self._log_cosh(q1)
a2 = m2 * delta2 * self._log_cosh(q2)
a3 = m3 * delta3 * self._log_cosh(q3)
apiv1 = m1 * delta1 * self._log_cosh(qpiv1)
apiv2 = m2 * delta2 * self._log_cosh(qpiv2)
apiv3 = m3 * delta3 * self._log_cosh(qpiv3)
shape_term = (a1 + a2 + a3) - (apiv1 + apiv2 + apiv3)
phtspec = Amp * (E / epiv) ** b * 10.0**shape_term
return phtspec[0] if scalar else phtspec
[docs]
def slope_func(self, E, T=None, O=None): # noqa: E741
"""Return the local spectral index of the tsbpl model at ``E``."""
redshift = self.config['redshift'].value
delta1 = self.config['smoothness1'].value
delta2 = self.config['smoothness2'].value
delta3 = self.config['smoothness3'].value
alpha1 = self.params[r'$\alpha_1$'].value
alpha2 = self.params[r'$\alpha_2$'].value
alpha3 = self.params[r'$\alpha_3$'].value
alpha4 = self.params[r'$\alpha_4$'].value
logEb1 = self.params[r'log$E_{b1}$'].value
logEb2 = self.params[r'log$E_{b2}$'].value
logEb3 = self.params[r'log$E_{b3}$'].value
Eb1 = 10.0**logEb1
Eb2 = 10.0**logEb2
Eb3 = 10.0**logEb3
E = np.asarray(E)
scalar = E.ndim == 0
if scalar:
E = E[np.newaxis]
zi = 1 + redshift
E = E * zi
b = 0.5 * (alpha1 + alpha4)
m1 = 0.5 * (alpha2 - alpha1)
m2 = 0.5 * (alpha3 - alpha2)
m3 = 0.5 * (alpha4 - alpha3)
q1 = np.log10(E / Eb1) / delta1
q2 = np.log10(E / Eb2) / delta2
q3 = np.log10(E / Eb3) / delta3
slope = b + m1 * np.tanh(q1) + m2 * np.tanh(q2) + m3 * np.tanh(q3)
return slope[0] if scalar else slope
[docs]
class sb2pl(Additive):
"""Smoothly broken power law variant (Ravasio et al. 2018, ``10.1051/0004-6361/201732245``)."""
# 10.1051/0004-6361/201732245
def __init__(self):
"""Initialise convex two-segment sbpl; uses ``vfv_peak`` config."""
self.expr = 'sb2pl'
self.comment = '2-segment smoothly broken power-law model (always convex)'
self.config = OrderedDict()
self.config['redshift'] = Cfg(0.0)
self.config['pivot_energy'] = Cfg(1.0)
self.config['vfv_peak'] = Cfg(True)
self.config['smoothness'] = Cfg(2.0)
@cached_property(lambda self: self.config['vfv_peak'].value)
def params(self):
"""Return the parameter dict chosen by the ``vfv_peak`` config flag."""
params = OrderedDict()
if self.config['vfv_peak'].value:
params[r'$\alpha$'] = Par(-1, unif(-2, 2))
params[r'$\beta$'] = Par(-3, unif(-6, -2))
params[r'log$E_p$'] = Par(2, unif(0, 4))
params[r'log$A$'] = Par(0, unif(-10, 10))
elif not self.config['vfv_peak'].value:
params[r'$\alpha_1$'] = Par(-1, unif(-6, 4))
params[r'$\alpha_2$'] = Par(-3, unif(-6, 4))
params[r'log$E_b$'] = Par(2, unif(0, 4))
params[r'log$A$'] = Par(0, unif(-10, 10))
else:
raise ValueError('Invalid value for vfv_peak config.')
return params
[docs]
def func(self, E, T=None, O=None): # noqa: E741
"""Return the sb2pl photon spectrum (convex join of two power laws)."""
redshift = self.config['redshift'].value
epiv = self.config['pivot_energy'].value
peak = self.config['vfv_peak'].value
omega = self.config['smoothness'].value
E = np.asarray(E)
scalar = E.ndim == 0
if scalar:
E = E[np.newaxis]
if peak:
alpha1 = self.params[r'$\alpha$'].value
alpha2 = self.params[r'$\beta$'].value
logEp = self.params[r'log$E_p$'].value
Ep = 10**logEp
if not (alpha1 > -2 and alpha2 < -2):
return np.nan if scalar else np.ones_like(E) * np.nan
Eb = Ep * (-(alpha1 + 2) / (alpha2 + 2)) ** (1 / ((alpha2 - alpha1) * omega))
else:
alpha1 = self.params[r'$\alpha_1$'].value
alpha2 = self.params[r'$\alpha_2$'].value
logEb = self.params[r'log$E_b$'].value
Eb = 10**logEb
if not alpha1 > alpha2:
return np.nan if scalar else np.ones_like(E) * np.nan
logA = self.params[r'log$A$'].value
Amp = 10**logA
zi = 1 + redshift
E = E * zi
f = ((E / Eb) ** (-alpha1 * omega) + (E / Eb) ** (-alpha2 * omega)) ** (-1 / omega)
fpiv = ((epiv / Eb) ** (-alpha1 * omega) + (epiv / Eb) ** (-alpha2 * omega)) ** (-1 / omega)
phtspec = Amp * (f / fpiv)
return phtspec[0] if scalar else phtspec
[docs]
class csb2pl(Additive):
"""Convex 2-segment smoothly broken power law with an exponential cutoff."""
def __init__(self):
"""Initialise convex two-segment sbpl with cutoff; uses ``vfv_peak``."""
self.expr = 'csb2pl'
self.comment = (
'2-segment smoothly broken power-law model (always convex) with high-energy cutoff'
)
self.config = OrderedDict()
self.config['redshift'] = Cfg(0.0)
self.config['pivot_energy'] = Cfg(1.0)
self.config['vfv_peak'] = Cfg(True)
self.config['smoothness'] = Cfg(2.0)
@cached_property(lambda self: self.config['vfv_peak'].value)
def params(self):
"""Return the parameter dict chosen by the ``vfv_peak`` config flag."""
params = OrderedDict()
if self.config['vfv_peak'].value:
params[r'$\alpha_1$'] = Par(1, unif(-2, 2))
params[r'$\alpha_2$'] = Par(-1, unif(-2, 2))
params[r'log$E_b$'] = Par(1, unif(-1, 3))
params[r'log$E_p$'] = Par(2, unif(0, 4))
params[r'log$A$'] = Par(0, unif(-10, 10))
elif not self.config['vfv_peak'].value:
params[r'$\alpha_1$'] = Par(1, unif(-6, 4))
params[r'$\alpha_2$'] = Par(-1, unif(-6, 4))
params[r'log$E_b$'] = Par(1, unif(-1, 3))
params[r'log$E_c$'] = Par(2, unif(0, 4))
params[r'log$A$'] = Par(0, unif(-10, 10))
else:
raise ValueError('Invalid value for vfv_peak config.')
return params
[docs]
def func(self, E, T=None, O=None): # noqa: E741
"""Return the csb2pl photon spectrum (convex sb2pl plus cutoff)."""
redshift = self.config['redshift'].value
epiv = self.config['pivot_energy'].value
peak = self.config['vfv_peak'].value
omega = self.config['smoothness'].value
alpha1 = self.params[r'$\alpha_1$'].value
alpha2 = self.params[r'$\alpha_2$'].value
logEb = self.params[r'log$E_b$'].value
Eb = 10**logEb
E = np.asarray(E)
scalar = E.ndim == 0
if scalar:
E = E[np.newaxis]
if not alpha1 > alpha2:
return np.nan if scalar else np.ones_like(E) * np.nan
if peak:
logEp = self.params[r'log$E_p$'].value
Ep = 10**logEp
if not alpha2 > -2:
return np.nan if scalar else np.ones_like(E) * np.nan
Ec = Ep / (2 + alpha2)
else:
logEc = self.params[r'log$E_c$'].value
Ec = 10**logEc
logA = self.params[r'log$A$'].value
Amp = 10**logA
zi = 1 + redshift
E = E * zi
f = ((E / Eb) ** (-alpha1 * omega) + (E / Eb) ** (-alpha2 * omega)) ** (
-1 / omega
) * np.exp(-E / Ec)
fpiv = ((epiv / Eb) ** (-alpha1 * omega) + (epiv / Eb) ** (-alpha2 * omega)) ** (-1 / omega)
phtspec = Amp * (f / fpiv)
return phtspec[0] if scalar else phtspec
[docs]
class sb3pl(Additive):
"""Convex 3-segment smoothly broken power law."""
def __init__(self):
"""Initialise convex three-segment smoothly broken power-law."""
self.expr = 'sb3pl'
self.comment = '3-segment smoothly broken power-law model (always convex)'
self.config = OrderedDict()
self.config['redshift'] = Cfg(0.0)
self.config['pivot_energy'] = Cfg(1.0)
self.config['smoothness1'] = Cfg(2.0)
self.config['smoothness2'] = Cfg(2.0)
self.params = OrderedDict()
self.params[r'$\alpha_1$'] = Par(1, unif(-6, 4))
self.params[r'$\alpha_2$'] = Par(-1, unif(-6, 4))
self.params[r'$\alpha_3$'] = Par(-3, unif(-6, 4))
self.params[r'log$E_{b1}$'] = Par(1, unif(-1, 3))
self.params[r'log$E_{b2}$'] = Par(2, unif(0, 4))
self.params[r'log$A$'] = Par(0, unif(-10, 10))
[docs]
def func(self, E, T=None, O=None): # noqa: E741
"""Return the sb3pl spectrum (convex join of three power laws)."""
redshift = self.config['redshift'].value
epiv = self.config['pivot_energy'].value
omega1 = self.config['smoothness1'].value
omega2 = self.config['smoothness2'].value
alpha1 = self.params[r'$\alpha_1$'].value
alpha2 = self.params[r'$\alpha_2$'].value
alpha3 = self.params[r'$\alpha_3$'].value
logEb1 = self.params[r'log$E_{b1}$'].value
logEb2 = self.params[r'log$E_{b2}$'].value
logA = self.params[r'log$A$'].value
Eb1 = 10**logEb1
Eb2 = 10**logEb2
Amp = 10**logA
E = np.asarray(E)
scalar = E.ndim == 0
if scalar:
E = E[np.newaxis]
if not alpha1 > alpha2 > alpha3:
return np.nan if scalar else np.ones_like(E) * np.nan
zi = 1 + redshift
E = E * zi
f = self._sb3pl(E, [alpha1, alpha2, alpha3, Eb1, Eb2, omega1, omega2])
fpiv = self._sb3pl(epiv, [alpha1, alpha2, alpha3, Eb1, Eb2, omega1, omega2])
phtspec = Amp * (f / fpiv)
return phtspec[0] if scalar else phtspec
def _pl(self, E, P):
alpha = P[0]
Eb = P[1]
return (E / Eb) ** alpha
def _sb2pl(self, E, P):
alpha1 = P[0]
alpha2 = P[1]
Eb = P[2]
omega = P[3]
F1 = self._pl(E, [alpha1, Eb])
F2 = self._pl(E, [alpha2, Eb])
F12 = (F1 ** (-omega) + F2 ** (-omega)) ** (-1 / omega)
return F12
def _sb3pl(self, E, P):
alpha1 = P[0]
alpha2 = P[1]
alpha3 = P[2]
Eb1 = P[3]
Eb2 = P[4]
omega1 = P[5]
omega2 = P[6]
F12 = self._sb2pl(E, [alpha1, alpha2, Eb1, omega1])
F3 = self._pl(E, [alpha3, Eb2]) * self._sb2pl(Eb2, [alpha1, alpha2, Eb1, omega1])
F123 = (F12 ** (-omega2) + F3 ** (-omega2)) ** (-1 / omega2)
return F123
[docs]
class sb4pl(Additive):
"""Convex 4-segment smoothly broken power law."""
def __init__(self):
"""Initialise convex four-segment smoothly broken power-law."""
self.expr = 'sb4pl'
self.comment = '4-segment smoothly broken power-law model (always convex)'
self.config = OrderedDict()
self.config['redshift'] = Cfg(0.0)
self.config['pivot_energy'] = Cfg(1.0)
self.config['smoothness1'] = Cfg(2.0)
self.config['smoothness2'] = Cfg(2.0)
self.config['smoothness3'] = Cfg(2.0)
self.params = OrderedDict()
self.params[r'$\alpha_1$'] = Par(2, unif(-6, 4))
self.params[r'$\alpha_2$'] = Par(1, unif(-6, 4))
self.params[r'$\alpha_3$'] = Par(-1, unif(-6, 4))
self.params[r'$\alpha_4$'] = Par(-3, unif(-6, 4))
self.params[r'log$E_{b1}$'] = Par(1, unif(-1, 3))
self.params[r'log$E_{b2}$'] = Par(2, unif(0, 4))
self.params[r'log$E_{b3}$'] = Par(3, unif(1, 5))
self.params[r'log$A$'] = Par(0, unif(-10, 10))
[docs]
def func(self, E, T=None, O=None): # noqa: E741
"""Return the sb4pl photon spectrum (convex join of four power laws)."""
redshift = self.config['redshift'].value
epiv = self.config['pivot_energy'].value
omega1 = self.config['smoothness1'].value
omega2 = self.config['smoothness2'].value
omega3 = self.config['smoothness3'].value
alpha1 = self.params[r'$\alpha_1$'].value
alpha2 = self.params[r'$\alpha_2$'].value
alpha3 = self.params[r'$\alpha_3$'].value
alpha4 = self.params[r'$\alpha_4$'].value
logEb1 = self.params[r'log$E_{b1}$'].value
logEb2 = self.params[r'log$E_{b2}$'].value
logEb3 = self.params[r'log$E_{b3}$'].value
logA = self.params[r'log$A$'].value
Eb1 = 10**logEb1
Eb2 = 10**logEb2
Eb3 = 10**logEb3
Amp = 10**logA
E = np.asarray(E)
scalar = E.ndim == 0
if scalar:
E = E[np.newaxis]
zi = 1 + redshift
E = E * zi
if not alpha1 > alpha2 > alpha3 > alpha4:
return np.nan if scalar else np.ones_like(E) * np.nan
f = self._sb4pl(E, [alpha1, alpha2, alpha3, alpha4, Eb1, Eb2, Eb3, omega1, omega2, omega3])
fpiv = self._sb4pl(
epiv, [alpha1, alpha2, alpha3, alpha4, Eb1, Eb2, Eb3, omega1, omega2, omega3]
)
phtspec = Amp * (f / fpiv)
return phtspec[0] if scalar else phtspec
def _pl(self, E, P):
alpha = P[0]
Eb = P[1]
return (E / Eb) ** alpha
def _sb2pl(self, E, P):
alpha1 = P[0]
alpha2 = P[1]
Eb = P[2]
omega = P[3]
F1 = self._pl(E, [alpha1, Eb])
F2 = self._pl(E, [alpha2, Eb])
F12 = (F1 ** (-omega) + F2 ** (-omega)) ** (-1 / omega)
return F12
def _sb3pl(self, E, P):
alpha1 = P[0]
alpha2 = P[1]
alpha3 = P[2]
Eb1 = P[3]
Eb2 = P[4]
omega1 = P[5]
omega2 = P[6]
F12 = self._sb2pl(E, [alpha1, alpha2, Eb1, omega1])
F3 = self._pl(E, [alpha3, Eb2]) * self._sb2pl(Eb2, [alpha1, alpha2, Eb1, omega1])
F123 = (F12 ** (-omega2) + F3 ** (-omega2)) ** (-1 / omega2)
return F123
def _sb4pl(self, E, P):
alpha1 = P[0]
alpha2 = P[1]
alpha3 = P[2]
alpha4 = P[3]
Eb1 = P[4]
Eb2 = P[5]
Eb3 = P[6]
omega1 = P[7]
omega2 = P[8]
omega3 = P[9]
F123 = self._sb3pl(E, [alpha1, alpha2, alpha3, Eb1, Eb2, omega1, omega2])
F4 = self._pl(E, [alpha4, Eb3]) * self._sb3pl(
Eb3, [alpha1, alpha2, alpha3, Eb1, Eb2, omega1, omega2]
)
F1234 = (F123 ** (-omega3) + F4 ** (-omega3)) ** (-1 / omega3)
return F1234
[docs]
class band(Additive):
"""Band function (Band et al. 1993, ``10.1086/172995``)."""
# 10.1086/172995
def __init__(self):
"""Initialise Band function with low/high indices, log-Ep, log-A."""
self.expr = 'band'
self.comment = 'band function'
self.config = OrderedDict()
self.config['redshift'] = Cfg(0.0)
self.config['pivot_energy'] = Cfg(100.0)
self.params = OrderedDict()
self.params[r'$\alpha$'] = Par(-1, unif(-2, 2))
self.params[r'$\beta$'] = Par(-3, unif(-6, -2))
self.params[r'log$E_p$'] = Par(2, unif(0, 4))
self.params[r'log$A$'] = Par(0, unif(-10, 10))
[docs]
def func(self, E, T=None, O=None): # noqa: E741
"""Return the Band-function photon spectrum; piecewise below/above ``Ebreak``."""
redshift = self.config['redshift'].value
epiv = self.config['pivot_energy'].value
alpha = self.params[r'$\alpha$'].value
beta = self.params[r'$\beta$'].value
logEp = self.params[r'log$E_p$'].value
logA = self.params[r'log$A$'].value
Ep = 10**logEp
Amp = 10**logA
E = np.asarray(E)
scalar = E.ndim == 0
if scalar:
E = E[np.newaxis]
zi = 1 + redshift
E = E * zi
Ec = Ep / (2 + alpha)
Eb = (alpha - beta) * Ec
phtspec = np.zeros_like(E, dtype=float)
i1 = Eb >= E
i2 = Eb < E
phtspec[i1] = Amp * (E[i1] / epiv) ** alpha * np.exp(-E[i1] / Ec)
phtspec[i2] = (
Amp * (Eb / epiv) ** (alpha - beta) * np.exp(beta - alpha) * (E[i2] / epiv) ** beta
)
return phtspec[0] if scalar else phtspec
[docs]
class cband(Additive):
"""Band function augmented with an exponential high-energy cutoff."""
# 10.1088/0004-637X/751/2/90
def __init__(self):
"""Initialise Band function with high-energy exponential cutoff."""
self.expr = 'cband'
self.comment = 'band function with high-energy cutoff'
self.config = OrderedDict()
self.config['redshift'] = Cfg(0.0)
self.config['pivot_energy'] = Cfg(1.0)
self.params = OrderedDict()
self.params[r'$\alpha_1$'] = Par(1, unif(-2, 2))
self.params[r'$\alpha_2$'] = Par(-1, unif(-2, 2))
self.params[r'log$E_b$'] = Par(1, unif(0, 3))
self.params[r'log$E_p$'] = Par(3, unif(1, 4))
self.params[r'log$A$'] = Par(0, unif(-10, 10))
[docs]
def func(self, E, T=None, O=None): # noqa: E741
"""Return the cband spectrum: Band function with exponential cutoff."""
redshift = self.config['redshift'].value
epiv = self.config['pivot_energy'].value
alpha1 = self.params[r'$\alpha_1$'].value
alpha2 = self.params[r'$\alpha_2$'].value
logEb = self.params[r'log$E_b$'].value
logEp = self.params[r'log$E_p$'].value
logA = self.params[r'log$A$'].value
Eb = 10**logEb
Ep = 10**logEp
Amp = 10**logA
E = np.asarray(E)
scalar = E.ndim == 0
if scalar:
E = E[np.newaxis]
zi = 1 + redshift
E = E * zi
Ec2 = Ep / (2 + alpha2)
Ec1 = 1 / (1 / Ec2 + (alpha1 - alpha2) / Eb)
phtspec = np.zeros_like(E, dtype=float)
i1 = Eb >= E
i2 = Eb < E
phtspec[i1] = Amp * (E[i1] / epiv) ** alpha1 * np.exp(-E[i1] / Ec1)
phtspec[i2] = (
Amp
* (Eb / epiv) ** (alpha1 - alpha2)
* np.exp(alpha2 - alpha1)
* (E[i2] / epiv) ** alpha2
* np.exp(-E[i2] / Ec2)
)
return phtspec[0] if scalar else phtspec
[docs]
class dband(Additive):
"""Double Band function (two adjoining Band segments)."""
# 10.1088/0004-637X/751/2/90
def __init__(self):
"""Initialise double Band: two low indices, high index, two breaks."""
self.expr = 'dband'
self.comment = 'double band functions'
self.config = OrderedDict()
self.config['redshift'] = Cfg(0.0)
self.config['pivot_energy'] = Cfg(1.0)
self.params = OrderedDict()
self.params[r'$\alpha_1$'] = Par(1, unif(-2, 2))
self.params[r'$\alpha_2$'] = Par(-1, unif(-2, 2))
self.params[r'$\beta$'] = Par(-3, unif(-6, -2))
self.params[r'log$E_b$'] = Par(1, unif(0, 3))
self.params[r'log$E_p$'] = Par(3, unif(1, 4))
self.params[r'log$A$'] = Par(0, unif(-10, 10))
[docs]
def func(self, E, T=None, O=None): # noqa: E741
"""Return the dband spectrum with three piecewise Band-like segments."""
redshift = self.config['redshift'].value
epiv = self.config['pivot_energy'].value
alpha1 = self.params[r'$\alpha_1$'].value
alpha2 = self.params[r'$\alpha_2$'].value
beta = self.params[r'$\beta$'].value
logEb = self.params[r'log$E_b$'].value
logEp = self.params[r'log$E_p$'].value
logA = self.params[r'log$A$'].value
Eb = 10**logEb
Ep = 10**logEp
Amp = 10**logA
E = np.asarray(E)
scalar = E.ndim == 0
if scalar:
E = E[np.newaxis]
zi = 1 + redshift
E = E * zi
Eb1 = Eb
Ec2 = Ep / (2 + alpha2)
Ec1 = 1 / (1 / Ec2 + (alpha1 - alpha2) / Eb1)
Eb2 = Ec2 * (alpha2 - beta)
phtspec = np.zeros_like(E, dtype=float)
i1 = Eb1 >= E
i2 = (Eb1 < E) & (Eb2 >= E)
i3 = Eb2 < E
phtspec[i1] = Amp * (E[i1] / epiv) ** alpha1 * np.exp(-E[i1] / Ec1)
phtspec[i2] = (
Amp
* (Eb1 / epiv) ** (alpha1 - alpha2)
* np.exp(alpha2 - alpha1)
* (E[i2] / epiv) ** alpha2
* np.exp(-E[i2] / Ec2)
)
phtspec[i3] = (
Amp
* (Eb1 / epiv) ** (alpha1 - alpha2)
* np.exp(alpha2 - alpha1)
* (Eb2 / epiv) ** (alpha2 - beta)
* np.exp(beta - alpha2)
* (E[i3] / epiv) ** beta
)
return phtspec[0] if scalar else phtspec
[docs]
class bb(Additive):
"""Single-temperature blackbody photon spectrum."""
def __init__(self):
"""Initialise single-temperature blackbody with log-kT and log-A."""
self.expr = 'bb'
self.comment = 'black-body model'
self.config = OrderedDict()
self.config['redshift'] = Cfg(0.0)
self.params = OrderedDict()
self.params[r'log$kT$'] = Par(1, unif(-2, 4))
self.params[r'log$A$'] = Par(0, unif(-10, 10))
[docs]
def func(self, E, T=None, O=None): # noqa: E741
"""Return the single-temperature blackbody photon flux density."""
redshift = self.config['redshift'].value
logKT = self.params[r'log$kT$'].value
logA = self.params[r'log$A$'].value
kT = 10**logKT
Amp = 10**logA
E = np.asarray(E)
scalar = E.ndim == 0
if scalar:
E = E[np.newaxis]
zi = 1 + redshift
E = E * zi
# Use the identity 1/(exp(x) - 1) = exp(-x)/(1 - exp(-x)) so the
# exponential only ever sees a non-positive argument: exp(-x) safely
# underflows to 0 for large x instead of overflowing, while -expm1(-x)
# keeps the small-x limit numerically stable.
x = E / kT
phtspec = Amp * 8.0525 * E**2 / kT**4 * np.exp(-x) / (-np.expm1(-x))
return phtspec[0] if scalar else phtspec
[docs]
class mbb(Additive):
"""Multi-color (multi-temperature) blackbody (Hou et al. 2018, ``10.3847/1538-4357/aadc07``)."""
# 10.3847/1538-4357/aadc07
_MBB_GAUSS_NODES, _MBB_GAUSS_WEIGHTS = np.polynomial.legendre.leggauss(64)
_MBB_GAUSS_NODES = _MBB_GAUSS_NODES.astype(np.float64)
_MBB_GAUSS_WEIGHTS = _MBB_GAUSS_WEIGHTS.astype(np.float64)
def __init__(self):
"""Initialise multi-colour blackbody: kT-range, slope ``m``, log-A."""
self.expr = 'mbb'
self.comment = 'multi-color black-body model'
self.config = OrderedDict()
self.config['redshift'] = Cfg(0.0)
self.params = OrderedDict()
self.params[r'log$kT_{min}$'] = Par(0, unif(-1, 3))
self.params[r'log$kT_{max}$'] = Par(2, unif(0, 4))
self.params[r'$m$'] = Par(0, unif(-2, 2))
self.params[r'log$A$'] = Par(0, unif(-10, 10))
[docs]
def func(self, E, T=None, O=None): # noqa: E741
"""Return the multi-colour blackbody via Gauss-Legendre quadrature."""
redshift = self.config['redshift'].value
logkTmin = self.params[r'log$kT_{min}$'].value
logkTmax = self.params[r'log$kT_{max}$'].value
m = self.params[r'$m$'].value
logA = self.params[r'log$A$'].value
kTmin = 10**logkTmin
kTmax = 10**logkTmax
Amp = 10**logA
E = np.asarray(E, dtype=np.float64)
scalar = E.ndim == 0
if scalar:
E = E[np.newaxis]
phtspec = self._func_nb(
E, redshift, kTmin, kTmax, m, Amp, self._MBB_GAUSS_NODES, self._MBB_GAUSS_WEIGHTS
)
return phtspec[0] if scalar else phtspec
@staticmethod
@nb.njit(cache=True, fastmath=True)
def _func_nb(E, redshift, kTmin, kTmax, m, amp, nodes, weights):
n = E.shape[0]
phtspec = np.empty(n, dtype=np.float64)
if not kTmax > kTmin:
for i in range(n):
phtspec[i] = np.nan
return phtspec
zi = 1.0 + redshift
ratio_T = kTmax / kTmin
term1 = 8.0525 * (m + 1.0) * amp
term2 = ratio_T ** (m + 1.0) - 1.0
term3 = kTmin ** (-2.0)
prefactor = (term1 / term2) * term3
for i in range(n):
Ei = E[i] * zi
lower_limit = Ei / kTmax
upper_limit = Ei / kTmin
if upper_limit <= lower_limit:
integral_val = 0.0
else:
half_width = 0.5 * (upper_limit - lower_limit)
center = 0.5 * (upper_limit + lower_limit)
acc = 0.0
for j in range(nodes.shape[0]):
x = half_width * nodes[j] + center
y = 0.0 if x > 700.0 else x ** (2.0 - m) / np.expm1(x)
acc += weights[j] * y
integral_val = half_width * acc
scaling_factor = (Ei / kTmin) ** (m - 1.0)
phtspec[i] = prefactor * scaling_factor * integral_val
return phtspec
[docs]
class hlecpl(Additive):
"""High-latitude-emission curvature model for a cutoff power law (time-dependent)."""
# 10.1088/0004-637X/690/1/L10
def __init__(self):
"""Initialise HLE cutoff power-law: alpha, log-Ep/c, log-Ac, t0, tc."""
self.expr = 'hlecpl'
self.comment = 'curvature effect model for cpl function'
self.config = OrderedDict()
self.config['redshift'] = Cfg(0.0)
self.config['pivot_energy'] = Cfg(1.0)
self.params = OrderedDict()
self.params[r'$\alpha$'] = Par(-1, unif(-2, 2))
self.params[r'log$E_{p,c}$'] = Par(2, unif(0, 4))
self.params[r'log$A_c$'] = Par(0, unif(-6, 6))
self.params[r'$t_0$'] = Par(0, unif(-20, 20))
self.params[r'$t_c$'] = Par(10, unif(0, 50))
[docs]
def func(self, E, T, O=None): # noqa: E741
"""Return the time-dependent HLE cutoff-power-law at ``(E, T)``."""
redshift = self.config['redshift'].value
epiv = self.config['pivot_energy'].value
alpha = self.params[r'$\alpha$'].value
logEpc = self.params[r'log$E_{p,c}$'].value
logAc = self.params[r'log$A_c$'].value
t0 = self.params[r'$t_0$'].value
tc = self.params[r'$t_c$'].value
Epc = 10**logEpc
Ac = 10**logAc
E = np.asarray(E)
E_scalar = E.ndim == 0
if E_scalar:
E = E[np.newaxis]
T = np.asarray(T)
T_scalar = T.ndim == 0
if T_scalar:
T = T[np.newaxis]
if E_scalar == T_scalar:
if E.shape != T.shape:
raise ValueError('E and T must have the same shape')
else:
scalar = E_scalar
else:
raise ValueError('E and T must both be scalars or both be arrays')
if tc <= t0 or tc > np.min(T):
return np.nan if scalar else np.ones_like(E) * np.nan
zi = 1 + redshift
E = E * zi
Ept = Epc * ((T - t0) / (tc - t0)) ** (-1)
At = Ac * ((T - t0) / (tc - t0)) ** (alpha - 1)
phtspec = At * (E / epiv) ** alpha * np.exp(-(2 + alpha) * E / Ept)
return phtspec[0] if scalar else phtspec
[docs]
class hleband(Additive):
"""High-latitude-emission curvature model for a Band function (time-dependent)."""
def __init__(self):
"""Initialise HLE Band with alpha, beta, log-Ep/c, log-Ac, t0, tc."""
self.expr = 'hleband'
self.comment = 'curvature effect model for band function'
self.config = OrderedDict()
self.config['redshift'] = Cfg(0.0)
self.config['pivot_energy'] = Cfg(100.0)
self.params = OrderedDict()
self.params[r'$\alpha$'] = Par(-1, unif(-2, 2))
self.params[r'$\beta$'] = Par(-4, unif(-6, -2))
self.params[r'log$E_{p,c}$'] = Par(2, unif(0, 4))
self.params[r'log$A_c$'] = Par(0, unif(-6, 6))
self.params[r'$t_0$'] = Par(0, unif(-20, 20))
self.params[r'$t_c$'] = Par(10, unif(0, 50))
[docs]
def func(self, E, T, O=None): # noqa: E741
"""Return the time-dependent HLE Band-function at ``(E, T)``."""
redshift = self.config['redshift'].value
epiv = self.config['pivot_energy'].value
alpha = self.params[r'$\alpha$'].value
beta = self.params[r'$\beta$'].value
logEpc = self.params[r'log$E_{p,c}$'].value
logAc = self.params[r'log$A_c$'].value
t0 = self.params[r'$t_0$'].value
tc = self.params[r'$t_c$'].value
Epc = 10**logEpc
Ac = 10**logAc
E = np.asarray(E)
E_scalar = E.ndim == 0
if E_scalar:
E = E[np.newaxis]
T = np.asarray(T)
T_scalar = T.ndim == 0
if T_scalar:
T = T[np.newaxis]
if E_scalar == T_scalar:
if E.shape != T.shape:
raise ValueError('E and T must have the same shape')
else:
scalar = E_scalar
else:
raise ValueError('E and T must both be scalars or both be arrays')
if tc <= t0 or tc > np.min(T):
return np.nan if scalar else np.ones_like(E) * np.nan
zi = 1 + redshift
E = E * zi
Ept = Epc * ((T - t0) / (tc - t0)) ** (-1)
At = Ac * ((T - t0) / (tc - t0)) ** (alpha - 1)
Ebt = (alpha - beta) / (alpha + 2) * Ept
phtspec = np.zeros_like(E, dtype=float)
i1 = Ebt > E
i2 = Ebt <= E
phtspec[i1] = At[i1] * (E[i1] / epiv) ** alpha * np.exp(-(2 + alpha) * E[i1] / Ept[i1])
phtspec[i2] = (
At[i2]
* (Ebt[i2] / epiv) ** (alpha - beta)
* np.exp(beta - alpha)
* (E[i2] / epiv) ** beta
)
return phtspec[0] if scalar else phtspec
[docs]
class zxhsync(Additive):
"""Time-dependent synchrotron model (ZXH provider, reads an external ``.o`` kernel)."""
def __init__(self):
"""Initialise ZXH synchrotron: magnetic, electron, jet, injection."""
self.expr = 'zxhsync'
self.comment = "zxh's synchrotron model"
self.mo_prefix = docs_path + '/zxhsync'
self.mo_dir = self.mo_prefix + '/spec_lc_ele_z_dL_gm_gmax_injpl_v2.o'
self.config = OrderedDict()
self.config['redshift'] = Cfg(1.0)
self.config['max_time'] = Cfg(26.0)
self.config['zero_offset'] = Cfg(0.0)
self.config['spec_prec'] = Cfg(2)
self.config['temp_prec'] = Cfg(25)
self.params = OrderedDict()
self.params[r'log$B_0$'] = Par(1, unif(0, 2))
self.params[r'$\alpha_B$'] = Par(2, unif(0, 3))
self.params[r'log$\gamma_{min}$'] = Par(5, unif(3, 7))
self.params[r'log$\gamma_{max}$'] = Par(7, unif(5, 9))
self.params[r'log$\Gamma$'] = Par(2, unif(1, 3))
self.params[r'$p$'] = Par(2, unif(1.5, 3.5))
self.params[r'$t_{inj}$'] = Par(10, unif(-0.5, 26.00))
self.params[r'$q$'] = Par(5, unif(0, 10))
self.params[r'log$R_0$'] = Par(15, unif(12, 17))
self.params[r'log$Q_0$'] = Par(40, unif(30, 50))
[docs]
def func(self, E, T, O=None): # noqa: E741
"""Return the ZXH synchrotron spectrum via the external kernel."""
logB0 = self.params[r'log$B_0$'].value
alphaB = self.params[r'$\alpha_B$'].value
loggamma_min = self.params[r'log$\gamma_{min}$'].value
loggamma_max = self.params[r'log$\gamma_{max}$'].value
logGamma = self.params[r'log$\Gamma$'].value
p = self.params[r'$p$'].value
tinj = self.params[r'$t_{inj}$'].value
q = self.params[r'$q$'].value
logR0 = self.params[r'log$R_0$'].value
logQ0 = self.params[r'log$Q_0$'].value # in unit of s^-1
B0 = 10**logB0
gamma_min = 10**loggamma_min
gamma_max = 10**loggamma_max
Gamma = 10**logGamma
R0 = 10**logR0
Q0 = 10**logQ0
B0_str = str(B0)
alphaB_str = str(alphaB)
gamma_min_str = str(gamma_min)
gamma_max_str = str(gamma_max)
Gamma_str = str(Gamma)
p_str = str(p)
zero_dt_str = '{:.4f}'.format(self.config['zero_offset'].value)
tinj_str = str(tinj)
q_str = str(q)
max_time_str = '{:.4f}'.format(self.config['max_time'].value)
R0_str = str(R0)
Q0_str = str(Q0)
z_str = '{:.4f}'.format(self.config['redshift'].value)
dL_str = f'{self.luminosity_distance:.4e}'
spec_prec_str = '{:.2f}'.format(self.config['spec_prec'].value)
temp_prec_str = '{:.2f}'.format(self.config['temp_prec'].value)
E = np.asarray(E)
E_scalar = E.ndim == 0
if E_scalar:
E = E[np.newaxis]
T = np.asarray(T)
T_scalar = T.ndim == 0
if T_scalar:
T = T[np.newaxis]
if E_scalar == T_scalar:
if E.shape != T.shape:
raise ValueError('E and T must have the same shape')
else:
scalar = E_scalar
else:
raise ValueError('E and T must both be scalars or both be arrays')
phtspec = np.zeros_like(E, dtype=float)
for Ti in set(T):
idx = np.where(Ti == T)[0]
t_obs_str = str(Ti)
E_str = ' '.join([str(Ei) for Ei in E[idx]])
n_str = str(len(E[idx]))
it_str = '0'
ielec_str = '0'
cmd = (
self.mo_dir
+ ' '
+ self.mo_prefix
+ ' '
+ B0_str
+ ' '
+ alphaB_str
+ ' '
+ gamma_min_str
+ ' '
+ gamma_max_str
+ ' '
+ Gamma_str
+ ' '
+ p_str
+ ' '
+ t_obs_str
+ ' '
+ zero_dt_str
+ ' '
+ tinj_str
+ ' '
+ q_str
+ ' '
+ max_time_str
+ ' '
+ R0_str
+ ' '
+ Q0_str
+ ' '
+ z_str
+ ' '
+ dL_str
+ ' '
+ n_str
+ ' '
+ it_str
+ ' '
+ ielec_str
+ ' '
+ spec_prec_str
+ ' '
+ temp_prec_str
+ ' '
+ E_str
)
process = sp.Popen(
cmd, shell=True, stdout=sp.PIPE, stderr=sp.PIPE, universal_newlines=True
)
(out, err) = process.communicate()
Fd_str = out.split()
if out == '' or len(Fd_str) != len(E[idx]):
print('+++++ Error Message +++++')
print('out: ', out)
print('err: ', err)
print('cmd: ', cmd)
print('+++++ +++++++++++++ +++++')
raise RuntimeError('ZXHSYNC model execution failed')
Fd = np.array([float(Fdi) for Fdi in Fd_str]) # in unit of mJy
Fv = (
Fd / (E[idx] * 1.6022e-9) / (6.62607e-34 * 6.2415e15) / 1.0e26
) # in unit of photons/s/cm^2/keV
phtspec[idx] = Fv
return phtspec[0] if scalar else phtspec
@property
def luminosity_distance(self):
"""Luminosity distance in cm at the configured redshift (Planck18 cosmology)."""
return Planck18.luminosity_distance(self.config['redshift'].value).to(u.cm).value
[docs]
def elecspec(self, ielec_list=None):
"""
Generate the electron spectrum for each step
ielec_list: list of steps
return: list of electron spectra
"""
logB0 = self.params[r'log$B_0$'].value
alphaB = self.params[r'$\alpha_B$'].value
loggamma_min = self.params[r'log$\gamma_{min}$'].value
loggamma_max = self.params[r'log$\gamma_{max}$'].value
logGamma = self.params[r'log$\Gamma$'].value
p = self.params[r'$p$'].value
tinj = self.params[r'$t_{inj}$'].value
q = self.params[r'$q$'].value
logR0 = self.params[r'log$R_0$'].value
logQ0 = self.params[r'log$Q_0$'].value # in unit of s^-1
B0 = 10**logB0
gamma_min = 10**loggamma_min
gamma_max = 10**loggamma_max
Gamma = 10**logGamma
R0 = 10**logR0
Q0 = 10**logQ0
B0_str = str(B0)
alphaB_str = str(alphaB)
gamma_min_str = str(gamma_min)
gamma_max_str = str(gamma_max)
Gamma_str = str(Gamma)
p_str = str(p)
zero_dt_str = '{:.4f}'.format(self.config['zero_offset'].value)
tinj_str = str(tinj)
q_str = str(q)
max_time_str = '{:.4f}'.format(self.config['max_time'].value)
R0_str = str(R0)
Q0_str = str(Q0)
z_str = '{:.4f}'.format(self.config['redshift'].value)
dL_str = f'{self.luminosity_distance:.4e}'
spec_prec_str = '{:.2f}'.format(self.config['spec_prec'].value)
temp_prec_str = '{:.2f}'.format(self.config['temp_prec'].value)
t_obs_str = '1.0'
n_str = '0'
it_str = '0'
if ielec_list is None:
ielec_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
beta = np.sqrt(Gamma**2 - 1) / Gamma
tobspre = self.config['zero_offset'].value
tobs0 = R0 * (1 - beta) * (1 + 0.36) / beta / 3e10
tobsmax = self.config['max_time'].value
Rmax = 3e10 * beta * (tobsmax + tobs0 + tobspre) / (1 + 0.36) / (1 - beta)
elec_spec = []
for ielec in ielec_list:
Rn = 10 ** (np.log10(R0) + (ielec - 1) * np.log10(Rmax / R0) / 399)
tn = (Rn - R0) / beta / 3e10
ielec_str = str(ielec)
cmd = (
self.mo_dir
+ ' '
+ self.mo_prefix
+ ' '
+ B0_str
+ ' '
+ alphaB_str
+ ' '
+ gamma_min_str
+ ' '
+ gamma_max_str
+ ' '
+ Gamma_str
+ ' '
+ p_str
+ ' '
+ t_obs_str
+ ' '
+ zero_dt_str
+ ' '
+ tinj_str
+ ' '
+ q_str
+ ' '
+ max_time_str
+ ' '
+ R0_str
+ ' '
+ Q0_str
+ ' '
+ z_str
+ ' '
+ dL_str
+ ' '
+ n_str
+ ' '
+ it_str
+ ' '
+ ielec_str
+ ' '
+ spec_prec_str
+ ' '
+ temp_prec_str
)
process = sp.Popen(
cmd, shell=True, stdout=sp.PIPE, stderr=sp.PIPE, universal_newlines=True
)
(out, err) = process.communicate()
ge_str = out.split()[::2]
Ne_str = out.split()[1::2]
if err != '':
print('+++++ Error Message +++++')
print('out: ', out)
print('err: ', err)
print('cmd: ', cmd)
print('+++++ +++++++++++++ +++++')
raise RuntimeError('ZXHSYNC model execution failed')
ge = np.array([float(gei) for gei in ge_str])
Ne = np.array([float(Nei) for Nei in Ne_str])
elec_spec.append({'nstep': ielec, 'Rn': Rn, 'tn': tn, 'ge': ge, 'Ne': Ne})
return elec_spec
[docs]
def lightcurve(self, response_list, time_list, tarr):
"""
Generate the model-predicted counts spectrum for each time
response_list: list of response files
time_list: list of times
tarr: time array of the light curve
return: list of model-predicted counts spectrum for each time
"""
idx = np.argmin(np.abs(tarr[:, None] - time_list), axis=1)
lc = []
for i, j in enumerate(idx):
ctsrate, _ = self.convolve_response(response_list[j], tarr[i])
lc.append(ctsrate)
return lc
[docs]
class katu(Additive):
"""External KATU provider invoked through a TOML-configured subprocess."""
def __init__(self):
"""Initialise KATU wrapper: load TOML, expose jet/injection params."""
self.expr = 'katu'
self.comment = 'katu model'
self.pwd = os.getcwd()
self.mo_prefix = docs_path + '/katu'
self.mo_dir = self.mo_prefix + '/GRB_MZ'
self.cfg_dir = self.mo_prefix + '/prompt.toml'
with open(self.cfg_dir) as f_obj:
self.mo_cfg = toml.load(f_obj)
self.config = OrderedDict()
self.config['redshift'] = Cfg(1.0)
self.config['t_obs_start'] = Cfg(0)
self.config['t_obs_end'] = Cfg(20)
self.params = OrderedDict()
self.params[r'log$B_0$'] = Par(1, unif(0, 2))
self.params[r'$\alpha_B$'] = Par(1, unif(0, 3))
self.params[r'log$\gamma_{min}$'] = Par(5, unif(3, 7))
self.params[r'log$\Gamma$'] = Par(2, unif(1, 3))
self.params[r'$p$'] = Par(3, unif(1.5, 3.5))
self.params[r'$t_{inj}$'] = Par(10, unif(0, 20))
self.params[r'log$R_0$'] = Par(15, unif(12, 17))
self.params[r'log$Q_0$'] = Par(40, unif(30, 50))
[docs]
def func(self, E, T, O=None): # noqa: E741
"""Return the KATU photon spectrum via the external binary."""
redshift = self.config['redshift'].value
t_obs_start = self.config['t_obs_start'].value
t_obs_end = self.config['t_obs_end'].value
self.set_cfg('Flux.z', redshift)
self.set_cfg('General.t_obs_start', t_obs_start)
self.set_cfg('General.t_obs_end', t_obs_end)
logB0 = self.params[r'log$B_0$'].value
alphaB = self.params[r'$\alpha_B$'].value
loggamma_min = self.params[r'log$\gamma_{min}$'].value
logGamma = self.params[r'log$\Gamma$'].value
p = self.params[r'$p$'].value
tinj = self.params[r'$t_{inj}$'].value
logR0 = self.params[r'log$R_0$'].value
logQ0 = self.params[r'log$Q_0$'].value
self.set_cfg('Prompt.B_0', logB0)
self.set_cfg('Prompt.alpha', alphaB)
self.set_cfg('Prompt.prompt_gmin', loggamma_min)
self.set_cfg('Prompt.Gamma_init', logGamma)
self.set_cfg('Prompt.prompt_p', p)
self.set_cfg('Prompt.t_inj', tinj)
self.set_cfg('Prompt.R_0', logR0)
self.set_cfg('Prompt.Q_0', logQ0)
with open(self.cfg_dir, 'w') as f_obj:
toml.dump(self.mo_cfg, f_obj)
E = np.asarray(E)
E_scalar = E.ndim == 0
if E_scalar:
E = E[np.newaxis]
T = np.asarray(T)
T_scalar = T.ndim == 0
if T_scalar:
T = T[np.newaxis]
if E_scalar == T_scalar:
if E.shape != T.shape:
raise ValueError('E and T must have the same shape')
else:
scalar = E_scalar
else:
raise ValueError('E and T must both be scalars or both be arrays')
E_list = [str(Ei) for Ei in E]
T_list = [str(Ti) for Ti in T]
E_split_list = [E_list[i : i + 10000] for i in range(0, len(E_list), 10000)]
T_split_list = [T_list[i : i + 10000] for i in range(0, len(T_list), 10000)]
os.chdir(self.mo_prefix)
sed_split_list = []
for E_split, T_split in zip(E_split_list, T_split_list, strict=False):
cmd = [self.mo_dir, 'prompt.toml', '--energy', *E_split, '--time', *T_split]
process = sp.Popen(
cmd, shell=False, stdout=sp.PIPE, stderr=sp.PIPE, universal_newlines=True
)
(out, err) = process.communicate()
if out == '':
print('+++++ Error Message +++++')
print('out: ', out)
print('err: ', err)
print('cmd: ', cmd)
print('+++++ +++++++++++++ +++++')
raise RuntimeError('Katu model execution failed')
sed_split = np.array([float(val) for val in out.split()[3::4]])
sed_split_list.append(sed_split)
os.chdir(self.pwd)
sed = np.concatenate(sed_split_list) # in unit of erg/cm^2/s
phtspec = sed / (E * E * 1.60218e-9) # in unit of photons/s/cm^2/keV
return phtspec[0] if scalar else phtspec
[docs]
def set_cfg(self, key, value):
"""Set a nested KATU TOML key ``a.b.c`` in ``mo_cfg`` to ``value``."""
key_list = key.split('.')
n_key = len(key_list)
cfg_ = self.mo_cfg
for i, key_i in enumerate(key_list):
assert key_i in cfg_, f'no key ({key_i}) in cfg'
if i < n_key - 1:
cfg_ = cfg_[key_i]
else:
cfg_[key_i] = value