Source code for rubin_sim.maf.maf_contrib.filter_pair_t_gaps_metric

__all__ = ("FilterPairTGapsMetric",)

import numpy as np

from ..metrics.base_metric import BaseMetric


[docs] class FilterPairTGapsMetric(BaseMetric): """Figure of merit to measure the coverage the time gaps in same and different filter pairs; FoM is defined as sum of Nv / standard deviation after a clip; Parameters ---------- fltpairs : `list` [`str`], optional List of filter pair sets to search for. mag_lim : `list` [`float`] FiveSigmaDepth threshold each filter, default {'u':18, 'g':18, 'r':18, 'i':18, 'z':18, 'y':18} bins_same : `np.ndarray`, (N,) Bins to get histogram for same-filter pair. bins_diff : `np.ndarray`, (N,) Bins to get histogram for diff-filter pair. nv_clip : `int`, optional Number of visits of pairs to clip, std is calculated below nv_clip. allgaps : `bool``, optional All possible pairs if True, else consider only nearest Returns ------- result : `float` sum of fom for all filterpairs, """ def __init__( self, mjd_col="observationStartMJD", filter_col="filter", m5_col="fiveSigmaDepth", fltpairs=[ "uu", "ug", "ur", "ui", "uz", "uy", "gg", "gr", "gi", "gz", "gy", "rr", "ri", "rz", "ry", "ii", "iz", "iy", "zz", "zy", "yy", ], mag_lim={"u": 18, "g": 18, "r": 18, "i": 18, "z": 18, "y": 18}, bins_same=np.logspace(np.log10(5 / 60 / 60 / 24), np.log10(3650), 50), bins_diff=np.logspace(np.log10(5 / 60 / 60 / 24), np.log10(2), 50), nv_clip={ "uu": 30, "ug": 30, "ur": 30, "ui": 30, "uz": 30, "uy": 30, "gg": 69, "gr": 30, "gi": 30, "gz": 30, "gy": 30, "rr": 344, "ri": 30, "rz": 30, "ry": 30, "ii": 355, "iz": 30, "iy": 30, "zz": 282, "zy": 30, "yy": 288, }, allgaps=True, **kwargs, ): self.mjd_col = mjd_col self.filter_col = filter_col self.m5_col = m5_col self.fltpairs = fltpairs self.mag_lim = mag_lim self.bins_same = bins_same self.bins_diff = bins_diff self.allgaps = allgaps # number of visits to clip, default from 1/10th of baseline_v2.0 WFD self.nv_clip = nv_clip super().__init__(col=[self.mjd_col, self.filter_col, self.m5_col], **kwargs) def _get_d_t(self, data_slice, f0, f1): # select idx0 = (data_slice[self.filter_col] == f0) & (data_slice[self.m5_col] > self.mag_lim[f0]) idx1 = (data_slice[self.filter_col] == f1) & (data_slice[self.m5_col] > self.mag_lim[f1]) time_col0 = data_slice[self.mjd_col][idx0] time_col1 = data_slice[self.mjd_col][idx1] # time_col0 = time_col0.reshape((len(time_col0), 1)) # time_col1 = time_col1.reshape((len(time_col1), 1)) # calculate time gaps matrix # diffmat = np.subtract(time_col0, time_col1.T) if self.allgaps: # collect all time gaps if f0 == f1: time_col0 = time_col0.reshape((len(time_col0), 1)) time_col1 = time_col1.reshape((len(time_col1), 1)) diffmat = np.subtract(time_col0, time_col1.T) diffmat = np.abs(diffmat) # get only triangle part dt_tri = np.tril(diffmat, -1) d_t = dt_tri[dt_tri != 0] # flatten lower triangle else: # d_t = diffmat.flatten() dtmax = np.max(self.bins_diff) # time gaps window for measure color d_t = [] for time_col in time_col0: time_col_in_window = time_col1[ (time_col1 >= (time_col - dtmax)) & (time_col1 <= (time_col + dtmax)) ] d_t.append(np.abs(time_col_in_window - time_col)) d_t = np.concatenate(d_t) if len(d_t) > 0 else np.array(d_t) else: # collect only nearest if f0 == f1: # get diagonal ones nearest nonzero, offset=1 # d_t = np.diagonal(diffmat, offset=1) d_t = np.diff(time_col0) else: time_col0 = data_slice[self.mjd_col][idx0] time_col1 = data_slice[self.mjd_col][idx1] time_col0 = time_col0.reshape((len(time_col0), 1)) time_col1 = time_col1.reshape((len(time_col1), 1)) # calculate time gaps matrix diffmat = np.subtract(time_col0, time_col1.T) # get tgaps both left and right # keep only negative ones masked_ar = np.ma.masked_where( diffmat >= 0, diffmat, ) left_ar = np.max(masked_ar, axis=1) d_t_left = -left_ar.data[~left_ar.mask] # keep only positive ones masked_ar = np.ma.masked_where(diffmat <= 0, diffmat) right_ar = np.min(masked_ar, axis=1) d_t_right = right_ar.data[~right_ar.mask] d_t = np.concatenate([d_t_left.flatten(), d_t_right.flatten()]) return d_t
[docs] def run(self, data_slice, slice_point=None): # sort the data_slice in order of time. data_slice.sort(order=self.mjd_col) fom_dic = {} d_t_dic = {} for fltpair in self.fltpairs: d_t = self._get_d_t(data_slice, fltpair[0], fltpair[1]) # calculate FoM # cut out at average # std below # FOM = Nv / std if fltpair[0] == fltpair[1]: bins = self.bins_same else: bins = self.bins_diff d_t_dic[fltpair] = d_t hist, _ = np.histogram(d_t, bins=bins) if np.any(hist): hist_clip = np.clip(hist, a_min=0, a_max=self.nv_clip[fltpair]) fom = hist.sum() / hist_clip.std() else: fom = np.nan fom_dic[fltpair] = fom # print( fltpair, fom) return np.nansum(list(fom_dic.values()))