__all__ = ("TransientMetric",)
import numpy as np
from .base_metric import BaseMetric
# Metric estimating the fraction of simple, back-to-back transients a survey detects.
class TransientMetric(BaseMetric):
    """Calculate what fraction of the transients would be detected.

    Best paired with a spatial slicer. Simple light curves with no color
    evolution are assumed: a linear rise (in magnitudes) up to ``peak_time``
    followed by a linear decline, offset by a per-filter peak magnitude.
    Transients are modelled as going off back-to-back for the whole survey.

    Parameters
    ----------
    metric_name : str, optional
        Name passed on to the base metric. Default "TransientDetectMetric".
    mjd_col : str, optional
        Name of the column holding the observation MJD.
        Default "observationStartMJD".
    m5_col : str, optional
        Name of the column holding the five-sigma limiting depth.
        Default "fiveSigmaDepth".
    filter_col : str, optional
        Name of the column holding the filter name. Default "filter".
    trans_duration : float, optional
        How long the transient lasts (days). Default 10.
    peak_time : float, optional
        How long it takes to reach the peak magnitude (days). Default 5.
    rise_slope : float, optional
        Slope of the light curve before peak time (mags/day).
        This should be negative since mags are backwards (magnitudes decrease
        towards brighter fluxes). Default 0.
    decline_slope : float, optional
        Slope of the light curve after peak time (mags/day).
        This should be positive since mags are backwards. Default 0.
    survey_duration : float, optional
        Length of survey (years). Default 10.
    survey_start : float, optional
        MJD for the survey start date.
        Default None (uses the time of the first observation).
    detect_m5_plus : float, optional
        An observation will be used if the light curve magnitude is brighter
        than m5 + detect_m5_plus. Default 0.
    u_peak : float, optional
        Peak magnitude in u band. Default 20.
    g_peak : float, optional
        Peak magnitude in g band. Default 20.
    r_peak : float, optional
        Peak magnitude in r band. Default 20.
    i_peak : float, optional
        Peak magnitude in i band. Default 20.
    z_peak : float, optional
        Peak magnitude in z band. Default 20.
    y_peak : float, optional
        Peak magnitude in y band. Default 20.
    n_pre_peak : int, optional
        Number of observations (in any filter(s)) to demand before peak_time,
        before saying a transient has been detected. Default 0.
    n_per_lc : int, optional
        Number of sections of the light curve that must be sampled above the
        detect_m5_plus threshold (in a single filter) for the light curve to
        be counted. For example, setting n_per_lc = 2 means a light curve is
        only considered detected if there is at least 1 observation in the
        first half of the LC, and at least one in the second half of the LC.
        n_per_lc = 4 means each quarter of the light curve must be detected
        to count. Default 1.
    n_filters : int, optional
        Number of filters that need to be observed for an object to be
        counted as detected. Default 1.
    n_phase_check : int, optional
        Sets the number of phases that should be checked.
        One can imagine pathological cadences where many objects pass the
        detection criteria, but would not if the observations were offset by
        a phase-shift. Default 1.
    count_method : {'full', 'partialLC'}, optional
        Sets the method of counting the maximum number of transients.
        If 'partialLC', the number of transients that fit in the survey
        duration is rounded up, so a final partial light curve is counted.
        For any other value (including the default 'full') it is rounded
        down, so only complete light curves are counted.
    **kwargs
        Forwarded unchanged to the base metric constructor.
    """

    def __init__(
        self,
        metric_name="TransientDetectMetric",
        mjd_col="observationStartMJD",
        m5_col="fiveSigmaDepth",
        filter_col="filter",
        trans_duration=10.0,
        peak_time=5.0,
        rise_slope=0.0,
        decline_slope=0.0,
        survey_duration=10.0,
        survey_start=None,
        detect_m5_plus=0.0,
        u_peak=20,
        g_peak=20,
        r_peak=20,
        i_peak=20,
        z_peak=20,
        y_peak=20,
        n_pre_peak=0,
        n_per_lc=1,
        n_filters=1,
        n_phase_check=1,
        count_method="full",
        **kwargs,
    ):
        self.mjd_col = mjd_col
        self.m5_col = m5_col
        self.filter_col = filter_col
        super().__init__(
            col=[self.mjd_col, self.m5_col, self.filter_col],
            units="Fraction Detected",
            metric_name=metric_name,
            **kwargs,
        )
        # Peak magnitude per filter name; used as an additive offset in
        # light_curve().
        self.peaks = {
            "u": u_peak,
            "g": g_peak,
            "r": r_peak,
            "i": i_peak,
            "z": z_peak,
            "y": y_peak,
        }
        self.trans_duration = trans_duration
        self.peak_time = peak_time
        self.rise_slope = rise_slope
        self.decline_slope = decline_slope
        self.survey_duration = survey_duration
        self.survey_start = survey_start
        self.detect_m5_plus = detect_m5_plus
        self.n_pre_peak = n_pre_peak
        self.n_per_lc = n_per_lc
        self.n_filters = n_filters
        self.n_phase_check = n_phase_check
        self.count_method = count_method

    def light_curve(self, time, filters):
        """Calculate the magnitude of the object at each time, in each filter.

        Parameters
        ----------
        time : numpy.ndarray
            The times of the observations, measured from the start of the
            transient (same units as ``peak_time``).
        filters : numpy.ndarray
            The filters of the observations.

        Returns
        -------
        numpy.ndarray
            The magnitudes of the object at each time, in each filter.
            Observations in a filter that has no entry in ``self.peaks``
            receive no peak offset.
        """
        lc_mags = np.zeros(time.size, dtype=float)

        # Linear rise up to (and including) the peak: zero offset at peak_time.
        rise = np.where(time <= self.peak_time)
        lc_mags[rise] += self.rise_slope * time[rise] - self.rise_slope * self.peak_time

        # Linear decline after the peak.
        decline = np.where(time > self.peak_time)
        lc_mags[decline] += self.decline_slope * (time[decline] - self.peak_time)

        # Shift each observation by the peak magnitude of its filter.
        for filter_name, peak_mag in self.peaks.items():
            lc_mags[np.where(filters == filter_name)] += peak_mag
        return lc_mags

    @staticmethod
    def _group_bounds(lc_number):
        """Return (left, right) slice bounds of each run of equal values in a sorted array."""
        unique_lc = np.unique(lc_number)
        left = np.searchsorted(lc_number, unique_lc)
        right = np.searchsorted(lc_number, unique_lc, side="right")
        return left, right

    def run(self, data_slice, slice_point=None):
        """Calculate the detectability of a transient with the specified
        lightcurve.

        Parameters
        ----------
        data_slice : numpy.array
            Numpy structured array containing the data related to the visits
            provided by the slicer.
        slice_point : dict, optional
            Dictionary containing information about the slice_point currently
            active in the slicer. Not used by this metric.

        Returns
        -------
        float
            The number of detected transients divided by the maximum number
            of transients that could have gone off, accumulated over all
            checked phase shifts.
        """
        # Total number of transients that could go off back-to-back
        # (survey_duration is in years, trans_duration in days).
        if self.count_method == "partialLC":
            _n_trans_max = np.ceil(self.survey_duration / (self.trans_duration / 365.25))
        else:
            _n_trans_max = np.floor(self.survey_duration / (self.trans_duration / 365.25))
        tshifts = np.arange(self.n_phase_check) * self.trans_duration / float(self.n_phase_check)

        # Resolve the survey start once. Binding it only when
        # self.survey_start is None would leave the local undefined whenever
        # the caller supplies an explicit start date.
        if self.survey_start is None:
            survey_start = data_slice[self.mjd_col].min()
        else:
            survey_start = self.survey_start

        need_pre_peak = self.n_pre_peak > 0
        need_sampling = (self.n_per_lc > 1) or (self.n_filters > 1)

        n_detected = 0
        n_trans_max = 0
        for tshift in tshifts:
            # Number of back-to-back transients that are possible to detect
            # given the survey duration and the transient duration; a shifted
            # phase is credited with one fewer.
            n_trans_max += _n_trans_max
            if tshift != 0:
                n_trans_max -= 1

            # Phase of each observation within its transient.
            time = (data_slice[self.mjd_col] - survey_start + tshift) % self.trans_duration
            # Which lightcurve does each point belong to.
            # NOTE(review): tshift is not applied here, so for shifted phases
            # a group appears to span the end of one transient and the start
            # of the next - confirm whether that is intended.
            lc_number = np.floor((data_slice[self.mjd_col] - survey_start) / self.trans_duration)
            lc_mags = self.light_curve(time, data_slice[self.filter_col])

            # Flag points that are above the SNR limit; this is always the
            # first criterion that must be passed.
            detect_thresh = 1
            detected = np.zeros(data_slice.size, dtype=int)
            detected[lc_mags < data_slice[self.m5_col] + self.detect_m5_plus] += 1

            if need_pre_peak or need_sampling:
                # Sort by time so each light curve occupies a contiguous
                # slice, then use np.searchsorted as an efficient 'group by'.
                order = np.argsort(data_slice[self.mjd_col])
                data_slice = data_slice[order]
                detected = detected[order]
                lc_number = lc_number[order]
                time = time[order]
                left, right = self._group_bounds(lc_number)

            # If we demand points on the rise
            if need_pre_peak:
                detect_thresh += 1
                for le, ri in zip(left, right):
                    # Number of detections before the peak in this light curve
                    pre_peak = time[le:ri] < self.peak_time
                    if np.sum(detected[le:ri][pre_peak]) >= self.n_pre_peak:
                        detected[le:ri] += 1

            # Check if we need multiple points per light curve
            # or multiple filters
            if need_sampling:
                detect_thresh += self.n_filters
                for le, ri in zip(left, right):
                    # NOTE(review): after a pre-peak bonus every point of the
                    # group has detected > 0, so individually undetected
                    # points also enter this check - confirm intent.
                    points = np.where(detected[le:ri] > 0)
                    filters_seen = data_slice[self.filter_col][le:ri][points]
                    phase_sections = np.floor(time[le:ri][points] / self.trans_duration * self.n_per_lc)
                    for filt_name in np.unique(filters_seen):
                        in_filter = filters_seen == filt_name
                        # One bonus per filter that samples every section.
                        if np.size(np.unique(phase_sections[in_filter])) >= self.n_per_lc:
                            detected[le:ri] += 1

            # Count the distinct light curves with at least one point that
            # passed the required number of conditions.
            n_detected += np.size(np.unique(lc_number[detected >= detect_thresh]))

        # Rather than keeping a single "detected" variable, maybe make a mask
        # for each criteria, then reduce functions like: reduce_singleDetect,
        # reduce_NDetect, reduce_PerLC, reduce_perFilter.
        return float(n_detected) / n_trans_max