Source code for rubin_sim.maf.metrics.visit_groups_metric

__all__ = ("VisitGroupsMetric", "PairFractionMetric")


import numpy as np

from .base_metric import BaseMetric

# Example of more complex metric
# Takes multiple columns of data (although 'night' could be calculable from 'expmjd')
# Returns variable length array of data
# Uses multiple reduce functions


[docs] class PairFractionMetric(BaseMetric): """What fraction of observations are part of a pair. Note, an observation can be a member of more than one "pair". For example, t=[0, 5, 30], all observations would be considered part of a pair because they all have an observation within the given window to pair with (the observation at t=30 pairs twice). Parameters ---------- min_gap : float, optional Minimum time to consider something part of a pair (minutes). Default 15. max_gap : float, optional Maximum time to consider something part of a pair (minutes). Default 90. """ def __init__( self, mjd_col="observationStartMJD", metric_name="PairFraction", min_gap=15.0, max_gap=90.0, **kwargs ): self.mjd_col = mjd_col self.min_gap = min_gap / 60.0 / 24.0 self.max_gap = max_gap / 60.0 / 24.0 units = "" super(PairFractionMetric, self).__init__( col=[mjd_col], metric_name=metric_name, units=units, **kwargs )
[docs] def run(self, data_slice, slice_point=None): nobs = np.size(data_slice[self.mjd_col]) times = np.sort(data_slice[self.mjd_col]) # Check which ones have a forard match t_plus = times + self.max_gap t_minus = times + self.min_gap ind1 = np.searchsorted(times, t_plus) ind2 = np.searchsorted(times, t_minus) # If ind1 and ind2 are the same, there is no pairable image for that exposure diff1 = ind1 - ind2 # Check which have a back match t_plus = times - self.max_gap t_minus = times - self.min_gap ind1 = np.searchsorted(times, t_plus) ind2 = np.searchsorted(times, t_minus) diff2 = ind1 - ind2 # The exposure has a pair ahead or behind is_paired = np.where((diff1 != 0) | (diff2 != 0))[0] result = np.size(is_paired) / float(nobs) return result
[docs] class VisitGroupsMetric(BaseMetric): """Count the number of visits per night within delta_t_min and delta_t_max.""" def __init__( self, time_col="observationStartMJD", nights_col="night", metric_name="VisitGroups", delta_t_min=15.0 / 60.0 / 24.0, delta_t_max=90.0 / 60.0 / 24.0, min_n_visits=2, window=30, min_n_nights=3, **kwargs, ): """ Instantiate metric. 'time_col' = column with the time of the visit (default expmjd), 'nights_col' = column with the night of the visit (default night), 'delta_t_min' = minimum time of window: units are days (default 15 min), 'delta_t_max' = maximum time of window: units are days (default 90 min), 'min_n_visits' = the minimum number of visits within a night (with spacing between delta_t_min/max from any other visit) required, 'window' = the number of nights to consider within a window (for reduce methods), 'min_n_nights' = the minimum required number of nights within window to make a full 'group'. """ self.times = time_col self.nights = nights_col eps = 1e-10 self.delta_tmin = float(delta_t_min) - eps self.delta_tmax = float(delta_t_max) self.min_n_visits = int(min_n_visits) self.window = int(window) self.min_n_nights = int(min_n_nights) super(VisitGroupsMetric, self).__init__( col=[self.times, self.nights], metric_name=metric_name, **kwargs ) self.reduce_order = { "Median": 0, "NNightsWithNVisits": 1, "NVisitsInWindow": 2, "NNightsInWindow": 3, "NLunations": 4, "MaxSeqLunations": 5, } self.comment = "Evaluation of the number of visits within a night, with separations between " self.comment += "tMin %.1f and tMax %.1f minutes." % ( self.delta_tmin * 24.0 * 60.0, self.delta_tmax * 24.0 * 60.0, ) self.comment += "Groups of visits use a minimum number of visits per night of %d, " % ( self.min_n_visits ) self.comment += "and minimum number of nights of %d." % (self.min_n_nights) self.comment += "Two visits within this interval would count as 2. " self.comment += "Visits closer than tMin, paired with visits that do fall within tMin/tMax, " self.comment += "count as half visits. <br>" self.comment += "VisitsGroups_Median calculates the median number of visits between tMin/tMax for " self.comment += "all nights. <br>" self.comment += "VisitGroups_NNightsWithNNights calculates the number of nights that have at " self.comment += "least %d visits. <br>" % (self.min_n_visits) self.comment += "VisitGroups_NVisitsInWindow calculates the max number of visits within a window of " self.comment += "%d days. <br>" % (self.window) self.comment += "VisitGroups_NNightsInWindow calculates the max number of nights that have more " self.comment += "than %d visits within %d days. <br>" % ( self.min_n_visits, self.window, ) self.comment += "VisitGroups_NLunations calculates the number of lunations (30 days) that have " self.comment += "at least one group of more than %d nights with more than %d visits, within " % ( self.min_n_nights, self.min_n_visits, ) self.comment += "%d days. <br>" % (self.window) self.comment += "VisitGroups_MaxSeqLunations calculates the maximum sequential lunations that have " self.comment += 'at least one "group". <br>'
[docs] def run(self, data_slice, slice_point=None): """ Return a dictionary of: the number of visits within a night (within delta tmin/tmax of another visit), and the nights with visits > minNVisits. Count two visits which are within tmin of each other, but which have another visit within tmin/tmax interval, as one and a half (instead of two). So for example: 4 visits, where 1, 2, 3 were all within deltaTMax of each other, and 4 was later but within deltaTmax of visit 3 -- would give you 4 visits. If visit 1 and 2 were closer together than deltaTmin, the two would be counted as 1.5 visits together (if only 1 and 2 existed, then there would be 0 visits as none would be within the qualifying time interval). """ uniquenights = np.unique(data_slice[self.nights]) nights = [] visit_num = [] # Find the nights with visits within deltaTmin/max of one another and count the number of visits for n in uniquenights: condition = data_slice[self.nights] == n times = np.sort(data_slice[self.times][condition]) nvisits = 0 ntooclose = 0 # Calculate difference between each visit and time of previous visit (tnext- tnow) timediff = np.diff(times) timegood = np.where( (timediff <= self.delta_tmax) & (timediff >= self.delta_tmin), True, False, ) timetooclose = np.where(timediff < self.delta_tmin, True, False) if len(timegood) > 1: # Count visits for all but last index in timediff for tg1, ttc1, tg2, ttc2 in zip( timegood[:-1], timetooclose[:-1], timegood[1:], timetooclose[1:] ): if tg1: nvisits += 1 if not tg2: nvisits += 1 if ttc1: ntooclose += 1 if not tg2 and not ttc2: ntooclose += 1 # Take care of last timediff if timegood[-1]: nvisits += 2 # close out visit sequence if timetooclose[-1]: ntooclose += 1 if not timegood[-2] and not timetooclose[-2]: ntooclose += 1 else: if timegood.size > 0: nvisits += 2 # Count up all visits for night. if nvisits > 0: nvisits = nvisits + ntooclose / 2.0 visit_num.append(nvisits) nights.append(n) # Convert to numpy arrays. visit_num = np.array(visit_num) nights = np.array(nights) metricval = {"visits": visit_num, "nights": nights} if len(visit_num) == 0: return self.badval return metricval
[docs] def reduce_median(self, metricval): """Reduce to median number of visits per night.""" return np.median(metricval["visits"])
[docs] def reduce_n_nights_with_n_visits(self, metricval): """Reduce to total number of nights with more than 'minNVisits' visits.""" condition = metricval["visits"] >= self.min_n_visits return len(metricval["visits"][condition])
def _in_window(self, visits, nights, night, window, min_n_visits): condition = (nights >= night) & (nights < night + window) condition2 = visits[condition] >= min_n_visits return visits[condition][condition2], nights[condition][condition2]
[docs] def reduce_n_visits_in_window(self, metricval): """Reduce to max number of total visits on all nights with more than minNVisits, within any 'window' (default=30 nights).""" maxnvisits = 0 for n in metricval["nights"]: vw, nw = self._in_window( metricval["visits"], metricval["nights"], n, self.window, self.min_n_visits, ) maxnvisits = max((vw.sum(), maxnvisits)) return maxnvisits
[docs] def reduce_n_nights_in_window(self, metricval): """Reduce to max number of nights with more than minNVisits, within 'window' over all windows.""" maxnights = 0 for n in metricval["nights"]: vw, nw = self._in_window( metricval["visits"], metricval["nights"], n, self.window, self.min_n_visits, ) maxnights = max(len(nw), maxnights) return maxnights
def _in_lunation(self, visits, nights, lunation_start, lunation_length): condition = (nights >= lunation_start) & (nights < lunation_start + lunation_length) return visits[condition], nights[condition]
[docs] def reduce_n_lunations(self, metricval): """Reduce to number of lunations (unique 30 day windows) that contain at least one 'group': a set of more than minNVisits per night, with more than minNNights of visits within 'window' time period. """ lunation_length = 30 lunations = np.arange( metricval["nights"][0], metricval["nights"][-1] + lunation_length / 2.0, lunation_length, ) n_lunations = 0 for l in lunations: # Find visits within lunation. vl, nl = self._in_lunation(metricval["visits"], metricval["nights"], l, lunation_length) for n in nl: # Find visits which are in groups within the lunation. vw, nw = self._in_window(vl, nl, n, self.window, self.min_n_visits) if len(nw) >= self.min_n_nights: n_lunations += 1 break return n_lunations
[docs] def reduce_max_seq_lunations(self, metricval): """Count the max number of sequential lunations (unique 30 day windows) that contain at least one 'group': a set of more than minNVisits per night, with more than minNNights of visits within 'window' time period. """ lunation_length = 30 lunations = np.arange( metricval["nights"][0], metricval["nights"][-1] + lunation_length / 2.0, lunation_length, ) max_sequence = 0 cur_sequence = 0 for l in lunations: # Find visits within lunation. vl, nl = self._in_lunation(metricval["visits"], metricval["nights"], l, lunation_length) # If no visits this lunation: if len(vl) == 0: max_sequence = max(max_sequence, cur_sequence) cur_sequence = 0 # Else, look to see if groups can be made from the visits. for n in nl: # Find visits which are in groups within the lunation. vw, nw = self._in_window(vl, nl, n, self.window, self.min_n_visits) # If there was a group within this lunation: if len(nw) >= self.min_n_nights: cur_sequence += 1 break # Otherwise we're not in a sequence (anymore, or still). else: max_sequence = max(max_sequence, cur_sequence) cur_sequence = 0 # Pick up last sequence if were in a sequence at last lunation. max_sequence = max(max_sequence, cur_sequence) return max_sequence