Source code for rubin_sim.maf.metrics.color_slope_metrics

__all__ = ["CheckColorSlope", "ColorSlopeMetric", "ColorSlope2NightMetric"]

import numpy as np
from rubin_scheduler.utils import int_binned_stat

from .base_metric import BaseMetric


[docs] class CheckColorSlope(object): """Check if the data has a color and a slope Parameters ---------- color_length : `float` The maximum length of time different filters be observed to still count as a color (hours). Default 1 hour. slope_length : `float` The length of time to demand observations in the same filter be greater than (hours). Default 3 hours. """ def __init__( self, color_length=1.0, slope_length=3.0, filter_col="filter", mjd_col="observationStartMJD" ): self.color_length = color_length / 24.0 self.slope_length = slope_length / 24.0 self.filter_col = filter_col self.mjd_col = mjd_col def __call__(self, data_slice): has_color = False has_slope = False if np.size(data_slice) < 3: return 0 filters = data_slice[self.filter_col] u_filters = np.unique(filters) for filtername in u_filters: in_filt = np.where(data_slice[self.filter_col] == filtername)[0] time_gap = ( data_slice[self.mjd_col][in_filt].max() - data_slice[self.mjd_col][in_filt][np.newaxis].min() ) if time_gap >= self.slope_length: has_slope = True break for filtername1 in u_filters: for filtername2 in u_filters: if filtername1 != filtername2: in_filt1 = np.where(filters == filtername1)[0] in_filt2 = np.where(filters == filtername2)[0] time_gaps = ( data_slice[self.mjd_col][in_filt1] - data_slice[self.mjd_col][in_filt2][np.newaxis].T ) time_gaps = time_gaps[np.where(time_gaps > 0)] if time_gaps.size > 0: if np.min(time_gaps[np.where(time_gaps > 0)]) <= self.color_length: has_color = True break if has_color & has_slope: return 1 else: return 0
[docs] class ColorSlopeMetric(BaseMetric): """How many times do we get a color and slope in a night A proxy metric for seeing how many times there would be the possibility of identifying and classifying a transient. Parameters ---------- mag : `dict` Dictionary with filternames as keys and minimum depth m5 magnitudes as values. If None, defaults to mag 20 in ugrizy. color_length : `float` The maximum length of time different filters be observed to still count as a color (hours). Default 1 hour. slope_length : `float` The length of time to demand observations in the same filter be greater than (hours). Default 3 hours. """ def __init__( self, mag=None, night_col="night", filter_col="filter", m5_col="fiveSigmaDepth", color_length=1.0, slope_length=3.0, time_col="observationStartMJD", units="#", metric_name="ColorSlope", **kwargs, ): cols = [filter_col, night_col, m5_col, time_col] if mag is None: mag = {"u": 20, "g": 20, "r": 20, "i": 20, "z": 20, "y": 20} self.night_col = night_col self.filter_col = filter_col self.m5_col = m5_col self.mag = mag self.time_col = time_col super().__init__(col=cols, units=units, metric_name=metric_name, **kwargs) self.sequence_checker = CheckColorSlope(color_length=color_length, slope_length=slope_length)
[docs] def run(self, data_slice, slice_point=None): result = 0 deep_enough = np.zeros(data_slice.size, dtype=bool) for filtername in np.unique(data_slice[self.filter_col]): in_filt = np.where(data_slice[self.filter_col] == filtername)[0] indx = np.where(data_slice[self.m5_col][in_filt] > self.mag[filtername])[0] deep_enough[in_filt[indx]] = True data = data_slice[deep_enough] if data.size > 0: _night, result = int_binned_stat(data[self.night_col], data, statistic=self.sequence_checker) return np.sum(result)
[docs] class ColorSlope2NightMetric(ColorSlopeMetric): """Like ColorSlopeMetric, but span over 2 nights Parameters ---------- mag : `dict` Dictionary with filternames as keys and minimum depth m5 magnitudes as values. If None, defaults to mag 20 in ugrizy. color_length : `float` The maximum length of time different filters be observed to still count as a color (hours). Default 1 hour. slope_length : `float` The length of time to demand observations in the same filter be greater than (hours). Default 15 hours. """ def __init__( self, mag=None, night_col="night", filter_col="filter", m5_col="fiveSigmaDepth", color_length=1.0, slope_length=15.0, time_col="observationStartMJD", units="#", metric_name="ColorSlope2Night", **kwargs, ): super().__init__( mag=mag, night_col=night_col, filter_col=filter_col, m5_col=m5_col, color_length=color_length, slope_length=slope_length, time_col=time_col, units=units, metric_name=metric_name, **kwargs, )
[docs] def run(self, data_slice, slice_point=None): result = 0 deep_enough = np.zeros(data_slice.size, dtype=bool) for filtername in np.unique(data_slice[self.filter_col]): in_filt = np.where(data_slice[self.filter_col] == filtername)[0] indx = np.where(data_slice[self.m5_col][in_filt] > self.mag[filtername])[0] deep_enough[in_filt[indx]] = True data = data_slice[deep_enough] if data.size > 0: # Send in nights as pairs, (0,1) (2,3), (4,5), etc night_id = np.floor(data[self.night_col] / 2).astype(int) _night, result1 = int_binned_stat(night_id, data, statistic=self.sequence_checker) # Now to do pairs (1,2), (3,4) night_id = np.ceil(data[self.night_col] / 2).astype(int) _night, result2 = int_binned_stat(night_id, data, statistic=self.sequence_checker) result = np.sum(result1) + np.sum(result2) return result