Source code for rubin_sim.maf.metrics.optimal_m5_metric
__all__ = ("OptimalM5Metric",)
import warnings
import numpy as np
from .base_metric import BaseMetric
from .simple_metrics import Coaddm5Metric
[docs]
class OptimalM5Metric(BaseMetric):
"""Compare the co-added depth of the survey to one where
all the observations were taken on the meridian.
Parameters
----------
m5_col : str ('fiveSigmaDepth')
Column name that contains the five-sigma limiting depth of
each observation
opt_m5_col : str ('m5Optimal')
The column name of the five-sigma-limiting depth if the
observation had been taken on the meridian.
normalize : bool (False)
If False, metric returns how many more observations would need
to be taken to reach the optimal depth. If True, the number
is normalized by the total number of observations already taken
at that position.
mag_diff : bool (False)
If True, metric returns the magnitude difference between the
achieved coadded depth and the optimal coadded depth.
Returns
--------
numpy.array
If mag_diff is True, returns the magnitude difference between the
optimal and actual coadded depth. If normalize is False
(default), the result is the number of additional observations
(taken at the median depth) the survey needs to catch up to
optimal. If normalize is True, the result is divided by the
number of observations already taken. So if a 10-year survey
returns 20%, it would need to run for 12 years to reach the same
depth as a 10-year meridian survey.
"""
def __init__(
self,
m5_col="fiveSigmaDepth",
opt_m5_col="m5Optimal",
filter_col="filter",
mag_diff=False,
normalize=False,
**kwargs,
):
if normalize:
self.units = "% behind"
else:
self.units = "N visits behind"
if mag_diff:
self.units = "mags"
super(OptimalM5Metric, self).__init__(
col=[m5_col, opt_m5_col, filter_col], units=self.units, **kwargs
)
self.m5_col = m5_col
self.opt_m5_col = opt_m5_col
self.normalize = normalize
self.filter_col = filter_col
self.mag_diff = mag_diff
self.coadd_regular = Coaddm5Metric(m5_col=m5_col)
self.coadd_optimal = Coaddm5Metric(m5_col=opt_m5_col)
[docs]
def run(self, data_slice, slice_point=None):
filters = np.unique(data_slice[self.filter_col])
if np.size(filters) > 1:
warnings.warn(
"OptimalM5Metric does not make sense mixing filters. Currently using filters " + str(filters)
)
regular_depth = self.coadd_regular.run(data_slice)
optimal_depth = self.coadd_optimal.run(data_slice)
if self.mag_diff:
return optimal_depth - regular_depth
median_single = np.median(data_slice[self.m5_col])
# Number of additional median observations to get as deep as optimal
result = (10.0 ** (0.8 * optimal_depth) - 10.0 ** (0.8 * regular_depth)) / (
10.0 ** (0.8 * median_single)
)
if self.normalize:
result = result / np.size(data_slice) * 100.0
return result