Source code for rubin_sim.maf.metrics.cadence_metrics
__all__ = (
"TemplateExistsMetric",
"UniformityMetric",
"GeneralUniformityMetric",
"RapidRevisitUniformityMetric",
"RapidRevisitMetric",
"NRevisitsMetric",
"IntraNightGapsMetric",
"InterNightGapsMetric",
"VisitGapMetric",
)
import numpy as np
from .base_metric import BaseMetric
class FSMetric(BaseMetric):
"""Calculate the fS value (Nvisit-weighted delta(M5-M5srd))."""
def __init__(self, filter_col="filter", metric_name="fS", **kwargs):
self.filter_col = filter_col
cols = [self.filter_col]
super().__init__(cols=cols, metric_name=metric_name, units="fS", **kwargs)
def run(self, data_slice, slice_point=None):
# We could import this from the m5_flat_sed values,
# but it makes sense to calculate the m5
# directly from the throughputs. This is easy enough to do and
# will allow variation of
# the throughput curves and readnoise and visit length, etc.
pass
[docs]
class TemplateExistsMetric(BaseMetric):
"""Calculate the fraction of images with a previous template
image of desired quality."""
def __init__(
self,
seeing_col="seeingFwhmGeom",
observation_start_mjd_col="observationStartMJD",
metric_name="TemplateExistsMetric",
**kwargs,
):
cols = [seeing_col, observation_start_mjd_col]
super(TemplateExistsMetric, self).__init__(
col=cols, metric_name=metric_name, units="fraction", **kwargs
)
self.seeing_col = seeing_col
self.observation_start_mjd_col = observation_start_mjd_col
[docs]
def run(self, data_slice, slice_point=None):
# Check that data is sorted in observationStartMJD order
data_slice.sort(order=self.observation_start_mjd_col)
# Find the minimum seeing up to a given time
seeing_mins = np.minimum.accumulate(data_slice[self.seeing_col])
# Find the difference between the seeing and the minimum seeing
# at the previous visit
seeing_diff = data_slice[self.seeing_col] - np.roll(seeing_mins, 1)
# First image never has a template; check how many others do
good = np.where(seeing_diff[1:] >= 0.0)[0]
frac = (good.size) / float(data_slice[self.seeing_col].size)
return frac
[docs]
class UniformityMetric(BaseMetric):
"""Calculate how uniformly the observations are spaced in time.
This is based on how a KS-test works:
look at the cumulative distribution of observation dates,
and compare to a perfectly uniform cumulative distribution.
Perfectly uniform observations = 0, perfectly non-uniform = 1.
Parameters
----------
mjd_col : `str`, optional
The column containing time for each observation.
Default "observationStartMJD".
survey_length : `float`, optional
The overall duration of the survey. Default 10.
"""
def __init__(self, mjd_col="observationStartMJD", units="", survey_length=10.0, **kwargs):
"""survey_length = time span of survey (years)"""
self.mjd_col = mjd_col
super(UniformityMetric, self).__init__(col=self.mjd_col, units=units, **kwargs)
self.survey_length = survey_length
[docs]
def run(self, data_slice, slice_point=None):
# If only one observation, there is no uniformity
if data_slice[self.mjd_col].size == 1:
return 1
# Scale dates to lie between 0 and 1,
# where 0 is the first observation date and 1 is surveyLength
dates = (data_slice[self.mjd_col] - data_slice[self.mjd_col].min()) / (self.survey_length * 365.25)
dates.sort() # Just to be sure
n_cum = np.arange(1, dates.size + 1) / float(dates.size)
d_max = np.max(np.abs(n_cum - dates - dates[1]))
return d_max
[docs]
class GeneralUniformityMetric(BaseMetric):
"""Calculate how uniformly any values are distributed.
This is based on how a KS-test works:
look at the cumulative distribution of data,
and compare to a perfectly uniform cumulative distribution.
Perfectly uniform observations = 0, perfectly non-uniform = 1.
To be "perfectly uniform" here, the endpoints need to be included.
Parameters
----------
col : `str`, optional
The column of data to use for the metric.
The default is "observationStartMJD" as this is most
typically used with time.
min_value : `float`, optional
The minimum value expected for the data.
Default None will calculate use the minimum value in this dataslice
(which may not cover the full range).
max_value : `float`, optional
The maximum value expected for the data.
Default None will calculate use the maximum value in this dataslice
(which may not cover the full range).
"""
def __init__(self, col="observationStartMJD", units="", min_value=None, max_value=None, **kwargs):
self.col = col
super().__init__(col=self.col, units=units, **kwargs)
self.min_value = min_value
self.max_value = max_value
[docs]
def run(self, data_slice, slice_point=None):
# If only one observation, there is no uniformity
if data_slice[self.col].size == 1:
return 1
# Scale values to lie between 0 and 1,
# where 0 is the min_value and 1 is max_value
if self.min_value is None:
min_value = data_slice[self.col].min()
else:
min_value = self.min_value
if self.max_value is None:
max_value = data_slice[self.col].max()
else:
max_value = self.max_value
scaled_values = (data_slice[self.col] - min_value) / max_value
scaled_values.sort() # Just to be sure
n_cum = np.arange(0, scaled_values.size) / float(scaled_values.size - 1)
d_max = np.max(np.abs(n_cum - scaled_values))
return d_max
[docs]
class RapidRevisitUniformityMetric(BaseMetric):
"""Calculate uniformity of time between consecutive visits on
short timescales (for RAV1).
Uses the same 'uniformity' calculation as the UniformityMetric,
based on the KS-test.
A value of 0 is perfectly uniform; a value of 1 is purely non-uniform.
Parameters
----------
mjd_col : `str`, optional
The column containing the 'time' value. Default observationStartMJD.
min_nvisits : `int`, optional
The minimum number of visits required within the
time interval (d_tmin to d_tmax).
Default 100.
d_tmin : `float`, optional
The minimum dTime to consider (in days). Default 40 seconds.
d_tmax : `float`, optional
The maximum dTime to consider (in days). Default 30 minutes.
"""
def __init__(
self,
mjd_col="observationStartMJD",
min_nvisits=100,
d_tmin=40.0 / 60.0 / 60.0 / 24.0,
d_tmax=30.0 / 60.0 / 24.0,
metric_name="RapidRevisitUniformity",
**kwargs,
):
self.mjd_col = mjd_col
self.min_nvisits = min_nvisits
self.d_tmin = d_tmin
self.d_tmax = d_tmax
super().__init__(col=self.mjd_col, metric_name=metric_name, **kwargs)
# Update min_nvisits, as 0 visits will crash algorithm
# and 1 is nonuniform by definition.
if self.min_nvisits <= 1:
self.min_nvisits = 2
[docs]
def run(self, data_slice, slice_point=None):
# Calculate consecutive visit time intervals
dtimes = np.diff(np.sort(data_slice[self.mjd_col]))
# Identify dtimes within interval from dTmin/dTmax.
good = np.where((dtimes >= self.d_tmin) & (dtimes <= self.d_tmax))[0]
# If there are not enough visits in this time range, return bad value.
if good.size < self.min_nvisits:
return self.badval
# Throw out dtimes outside desired range, and sort, then scale to 0-1.
dtimes = np.sort(dtimes[good])
dtimes = (dtimes - dtimes.min()) / float(self.d_tmax - self.d_tmin)
# Set up a uniform distribution between 0-1 (to match dtimes).
uniform_dtimes = np.arange(1, dtimes.size + 1, 1) / float(dtimes.size)
# Look at the differences between our times and the uniform times.
dmax = np.max(np.abs(uniform_dtimes - dtimes - dtimes[1]))
return dmax
[docs]
class RapidRevisitMetric(BaseMetric):
def __init__(
self,
mjd_col="observationStartMJD",
metric_name="RapidRevisit",
d_tmin=40.0 / 60.0 / 60.0 / 24.0,
d_tpairs=20.0 / 60.0 / 24.0,
d_tmax=30.0 / 60.0 / 24.0,
min_n1=28,
min_n2=82,
**kwargs,
):
self.mjd_col = mjd_col
self.d_tmin = d_tmin
self.d_tpairs = d_tpairs
self.d_tmax = d_tmax
self.min_n1 = min_n1
self.min_n2 = min_n2
super().__init__(col=self.mjd_col, metric_name=metric_name, **kwargs)
[docs]
def run(self, data_slice, slice_point=None):
dtimes = np.diff(np.sort(data_slice[self.mjd_col]))
n1 = len(np.where((dtimes >= self.d_tmin) & (dtimes <= self.d_tpairs))[0])
n2 = len(np.where((dtimes >= self.d_tmin) & (dtimes <= self.d_tmax))[0])
if (n1 >= self.min_n1) and (n2 >= self.min_n2):
val = 1
else:
val = 0
return val
[docs]
class NRevisitsMetric(BaseMetric):
"""Calculate the number of consecutive visits with
time differences less than d_t.
Parameters
----------
d_t : `float`, optional
The time interval to consider (in minutes). Default 30.
normed : `bool`, optional
Flag to indicate whether to return the total number of
consecutive visits with time differences less than d_t (False),
or the fraction of overall visits (True).
Note that we would expect (if all visits occur in pairs within d_t)
this fraction would be 0.5!
"""
def __init__(self, mjd_col="observationStartMJD", d_t=30.0, normed=False, metric_name=None, **kwargs):
units = ""
if metric_name is None:
if normed:
metric_name = "Fraction of revisits faster than %.1f minutes" % (d_t)
else:
metric_name = "Number of revisits faster than %.1f minutes" % (d_t)
units = "#"
self.mjd_col = mjd_col
self.d_t = d_t / 60.0 / 24.0 # convert to days
self.normed = normed
super(NRevisitsMetric, self).__init__(
col=self.mjd_col, units=units, metric_name=metric_name, **kwargs
)
[docs]
def run(self, data_slice, slice_point=None):
dtimes = np.diff(np.sort(data_slice[self.mjd_col]))
n_fast_revisits = np.size(np.where(dtimes <= self.d_t)[0])
if self.normed:
n_fast_revisits = n_fast_revisits / float(np.size(data_slice[self.mjd_col]))
return n_fast_revisits
[docs]
class IntraNightGapsMetric(BaseMetric):
"""
Calculate the (reduce_func) of the gap between consecutive
observations within a night, in hours.
Parameters
----------
reduce_func : function, optional
Function that can operate on array-like structures.
Typically numpy function.
Default np.median.
"""
def __init__(
self,
mjd_col="observationStartMJD",
night_col="night",
reduce_func=np.median,
metric_name="Median Intra-Night Gap",
**kwargs,
):
units = "hours"
self.mjd_col = mjd_col
self.night_col = night_col
self.reduce_func = reduce_func
super(IntraNightGapsMetric, self).__init__(
col=[self.mjd_col, self.night_col], units=units, metric_name=metric_name, **kwargs
)
[docs]
def run(self, data_slice, slice_point=None):
data_slice.sort(order=self.mjd_col)
dt = np.diff(data_slice[self.mjd_col])
dn = np.diff(data_slice[self.night_col])
good = np.where(dn == 0)
if np.size(good[0]) == 0:
result = self.badval
else:
result = self.reduce_func(dt[good]) * 24
return result
[docs]
class InterNightGapsMetric(BaseMetric):
"""Calculate the (reduce_func) of the gap between consecutive
observations in different nights, in days.
Parameters
----------
reduce_func : function, optional
Function that can operate on array-like structures.
Typically numpy function.
Default np.median.
"""
def __init__(
self,
mjd_col="observationStartMJD",
night_col="night",
reduce_func=np.median,
metric_name="Median Inter-Night Gap",
**kwargs,
):
units = "days"
self.mjd_col = mjd_col
self.night_col = night_col
self.reduce_func = reduce_func
super().__init__(col=[self.mjd_col, self.night_col], units=units, metric_name=metric_name, **kwargs)
[docs]
def run(self, data_slice, slice_point=None):
data_slice.sort(order=self.mjd_col)
unights = np.unique(data_slice[self.night_col])
if np.size(unights) < 2:
result = self.badval
else:
# Find the first and last observation of each night
first_of_night = np.searchsorted(data_slice[self.night_col], unights)
last_of_night = np.searchsorted(data_slice[self.night_col], unights, side="right") - 1
diff = data_slice[self.mjd_col][first_of_night[1:]] - data_slice[self.mjd_col][last_of_night[:-1]]
result = self.reduce_func(diff)
return result
[docs]
class VisitGapMetric(BaseMetric):
"""Calculate the (reduce_func) of the gap between any
consecutive observations, in hours, regardless of night boundaries.
Different from inter-night and intra-night gaps,
because this is really just counting all of the times between consecutive
observations (not time between nights or time within a night).
Parameters
----------
reduce_func : function, optional
Function that can operate on array-like structures.
Typically numpy function.
Default np.median.
"""
def __init__(
self,
mjd_col="observationStartMJD",
night_col="night",
reduce_func=np.median,
metric_name="VisitGap",
**kwargs,
):
units = "hours"
self.mjd_col = mjd_col
self.night_col = night_col
self.reduce_func = reduce_func
super().__init__(col=[self.mjd_col, self.night_col], units=units, metric_name=metric_name, **kwargs)
[docs]
def run(self, data_slice, slice_point=None):
data_slice.sort(order=self.mjd_col)
diff = np.diff(data_slice[self.mjd_col])
result = self.reduce_func(diff) * 24.0
return result