# Source code for rubin_sim.maf.slicers.time_interval_slicers

"""MAF slicers to slice into time intervals based on MJD

Primarily intended for hourglass plots.
"""

# pylint: disable=too-many-arguments
__all__ = (
    "TimeIntervalSlicer",
    "BlockIntervalSlicer",
    "VisitIntervalSlicer",
    "SlicerNotSetupError",
)

from collections import defaultdict

# imports
from functools import wraps

import numpy as np
import pandas as pd

from .base_slicer import BaseSlicer


class SlicerNotSetupError(Exception):
    """Raised when a slicer method is called before the slicer is set up."""
# interface functions

# classes
class TimeIntervalSlicer(BaseSlicer):
    """Base for all time interval slicers.

    Slices in constant time intervals.

    Parameters
    ----------
    interval_seconds : `int`
        Duration of slice time intervals, in seconds
    mjd_column_name : `str`
        Name of column on which to slice, must be in units of days
    badval : `float`
        Value to use for bad values in slice
    verbose : `bool`
        Print extra information?
    """

    def __init__(
        self,
        interval_seconds=90,
        mjd_column_name="observationStartMJD",
        # ``np.NaN`` was removed in NumPy 2.0; ``np.nan`` is the same value
        # and exists in every NumPy release.
        badval=np.nan,
        verbose=False,
    ):
        super().__init__(verbose=verbose, badval=badval)
        self.interval_seconds = interval_seconds
        self.mjd_column_name = mjd_column_name
        self.columns_needed = [mjd_column_name]
        # Maps slice id -> list of row indices into sim_data.
        self.sim_idxs = defaultdict(list)  # pylint: disable=invalid-name

    def setup_slicer(self, sim_data, maps=None):
        """Bin the visits into fixed-width time intervals.

        The intervals start at the floor of the earliest MJD in
        ``sim_data`` and run, ``interval_seconds`` apart, up to the ceiling
        of the latest MJD. After this call the instance's
        ``_slice_sim_data`` is replaced by a working implementation.

        Parameters
        ----------
        sim_data : indexable by column name
            Visit data; must provide the column ``mjd_column_name``.
        maps : optional
            Passed through unchanged to ``self._run_maps`` (inherited, not
            defined in this class).
        """
        visit_mjds = sim_data[self.mjd_column_name]
        start_mjd = np.floor(np.min(visit_mjds)).astype(int)
        end_mjd = np.ceil(np.max(visit_mjds)).astype(int)
        interval_days = self.interval_seconds / (24 * 60 * 60.0)
        mjd_bin_edges = np.arange(start_mjd, end_mjd + interval_days, interval_days)

        # np.digitize returns i such that edges[i-1] <= x < edges[i], i.e. a
        # 1-based bin number. Slice ids (and the slice_points arrays below)
        # are 0-based, so shift by one to keep each visit in the interval
        # that actually contains it.
        visit_sids = np.digitize(visit_mjds, mjd_bin_edges) - 1

        self.sim_idxs.update(
            pd.DataFrame(
                {
                    "visit_idx": np.arange(len(visit_mjds)),
                    "sid": visit_sids,
                }
            )
            .groupby("sid")
            .agg(list)
            .to_dict()["visit_idx"]
        )

        # Start time of every interval; slice id i begins at mjds[i].
        mjds = np.arange(start_mjd, end_mjd, interval_days)
        self.slice_points["sid"] = np.arange(len(mjds))
        self.slice_points["mjd"] = mjds
        self.slice_points["duration"] = np.full_like(mjds, self.interval_seconds)
        self.nslice = len(mjds)
        self.shape = self.nslice
        self._run_maps(maps)

        @wraps(self._slice_sim_data)
        def _slice_sim_data(islice):  # pylint: disable=invalid-name
            idxs = self.sim_idxs[islice]
            # Wrap anything that cannot be indexed at position 0 in a list.
            # NOTE(review): an interval with no visits (empty list) raises
            # IndexError here and is therefore returned as ``[[]]``, not
            # ``[]`` -- kept as-is; confirm this is what callers expect.
            try:
                _ = idxs[0]
            except (TypeError, IndexError):
                idxs = [idxs]
            slice_points = {
                "sid": islice,
                "mjd": self.slice_points["mjd"][islice],
                "duration": self.slice_points["duration"][islice],
            }
            return {"idxs": idxs, "slice_point": slice_points}

        # Shadow the class-level placeholder on this instance only.
        setattr(self, "_slice_sim_data", _slice_sim_data)

    def __eq__(self, other_slicer):
        """Evaluate if slicers are equivalent.

        Two slicers are equal when ``other_slicer`` is an instance of this
        slicer's class and their ``sid``, ``mjd`` and ``duration`` slice
        points are element-wise identical.
        """
        if not isinstance(other_slicer, self.__class__):
            return False
        for key in ["sid", "mjd", "duration"]:
            if not np.array_equal(other_slicer.slice_points[key], self.slice_points[key]):
                return False
        return True

    def _slice_sim_data(self, *args, **kwargs):
        """Placeholder used until ``setup_slicer`` installs the real slicer.

        Raises
        ------
        SlicerNotSetupError
            Always.
        """
        raise SlicerNotSetupError()
class BlockIntervalSlicer(TimeIntervalSlicer):
    """Slices into intervals with common "note" values and no long gaps.

    Parameters
    ----------
    mjd_column_name : `str`
        Name of column on which to slice, must be in units of days
    duration_column_name : `str`
        Name of column with the duration of each visit (in seconds)
    note_column_name : `str`
        Name of column with the visit note.
    badval : `float`
        Value to use for bad values in slice
    verbose : `bool`
        Print extra information?
    """

    # Gap between visits in the same block, in hours
    gap_tolerance = 0.1

    def __init__(
        self,
        mjd_column_name="observationStartMJD",
        duration_column_name="visitTime",
        note_column_name="scheduler_note",
        # ``np.NaN`` was removed in NumPy 2.0; ``np.nan`` is the same value
        # and exists in every NumPy release.
        badval=np.nan,
        verbose=False,
    ):
        super().__init__(verbose=verbose, badval=badval)
        self.mjd_column_name = mjd_column_name
        self.duration_column_name = duration_column_name
        self.note_column_name = note_column_name
        self.columns_needed = [
            mjd_column_name,
            duration_column_name,
            note_column_name,
        ]
        # Maps slice (block) id -> list of row indices into sim_data.
        self.sim_idxs = defaultdict(list)  # pylint: disable=invalid-name

    def setup_slicer(self, sim_data, maps=None):
        """Group time-ordered visits into blocks.

        Consecutive visits (ordered by start MJD) stay in the same block
        while they share the same note value and the next visit starts
        less than ``gap_tolerance`` hours after the current one ends.
        After this call the instance's ``_slice_sim_data`` is replaced by a
        working implementation.

        Parameters
        ----------
        sim_data : indexable by column name
            Visit data; must provide the MJD, duration and note columns
            named in the constructor.
        maps : optional
            Passed through unchanged to ``self._run_maps`` (inherited, not
            defined in this class).
        """
        # Build the working table from just the three columns used, under
        # fixed internal names. This keeps the code below independent of the
        # caller's column names (the note column used to be renamed to one
        # name and read back under another) and of any other columns that
        # happen to be present in sim_data.
        n_visits = len(sim_data[self.mjd_column_name])
        visits = pd.DataFrame(
            {
                "mjd": np.asarray(sim_data[self.mjd_column_name]),
                "duration": np.asarray(sim_data[self.duration_column_name]),
                "note": np.asarray(sim_data[self.note_column_name]),
            },
            index=pd.Index(np.arange(n_visits), name="visit_idx"),
        )

        visits.sort_values("mjd", inplace=True)
        # duration is in seconds; convert to days before adding to the MJD.
        visits["end_mjd"] = visits["mjd"] + visits["duration"] / (60 * 60 * 24.0)

        # Both comparisons look at the *next* visit in time order.
        same_note = visits["note"] == visits["note"].shift(-1)
        # gap_tolerance is in hours; / 24 converts it to days.
        adjacent_times = visits["end_mjd"] + self.gap_tolerance / 24.0 > visits["mjd"].shift(-1)

        # A True in block_ends marks the last visit of a block. The running
        # count of block ends, shifted down by one row, is therefore the
        # number of blocks completed before each visit, i.e. its block id.
        # The shift leaves NaN in the first row, which belongs to block 0.
        block_ends = np.logical_not(np.logical_and(same_note, adjacent_times))
        # Assign the filled result back rather than filling in place through
        # ``visits["sid"]``, which is not guaranteed to modify the frame.
        visits["sid"] = block_ends.cumsum().shift().fillna(0).astype(int)

        blocks = visits.groupby("sid").agg(
            mjd=pd.NamedAgg(column="mjd", aggfunc="min"),
            end_mjd=pd.NamedAgg(column="end_mjd", aggfunc="max"),
        )
        # Block duration back in seconds.
        blocks["duration"] = (blocks.end_mjd - blocks.mjd) * 24 * 60 * 60

        self.nslice = len(blocks)
        self.shape = self.nslice

        # visit_idx still carries each visit's original row position.
        self.sim_idxs.update(
            visits.reset_index()[["sid", "visit_idx"]].groupby("sid").agg(list).to_dict()["visit_idx"]
        )

        self.slice_points["sid"] = blocks.reset_index().sid.values
        self.slice_points["mjd"] = blocks.mjd.values
        self.slice_points["duration"] = blocks.duration.values
        self._run_maps(maps)

        @wraps(self._slice_sim_data)
        def _slice_sim_data(islice):  # pylint: disable=invalid-name
            idxs = self.sim_idxs[islice]
            # Wrap a non-subscriptable (scalar) entry in a list.
            try:
                _ = idxs[0]
            except TypeError:
                idxs = [idxs]
            slice_points = {
                "sid": islice,
                "mjd": self.slice_points["mjd"][islice],
                "duration": self.slice_points["duration"][islice],
            }
            return {"idxs": idxs, "slice_point": slice_points}

        # Shadow the inherited placeholder on this instance only.
        setattr(self, "_slice_sim_data", _slice_sim_data)
class VisitIntervalSlicer(TimeIntervalSlicer):
    """Slices into intervals each of which contain one visit

    Parameters
    ----------
    mjd_column_name : `str`
        Name of column on which to slice, must be in units of days
    duration_column_name : `str`
        Name of column with the duration of each visit (in seconds)
    extra_column_names : iterable of `str`
        Names of additional columns whose per-visit values are copied into
        the slice points.
    badval : `float`
        Value to use for bad values in slice
    verbose : `bool`
        Print extra information?
    """

    def __init__(
        self,
        mjd_column_name="observationStartMJD",
        duration_column_name="visitTime",
        extra_column_names=tuple(),
        # ``np.NaN`` was removed in NumPy 2.0; ``np.nan`` is the same value
        # and exists in every NumPy release.
        badval=np.nan,
        verbose=False,
    ):
        super().__init__(verbose=verbose, badval=badval)
        self.mjd_column_name = mjd_column_name
        self.duration_column_name = duration_column_name
        self.extra_column_names = extra_column_names
        # setup_slicer reads the extra columns from sim_data too, so they
        # are listed alongside the MJD and duration columns.
        self.columns_needed = [
            mjd_column_name,
            duration_column_name,
            *extra_column_names,
        ]
        # Set by setup_slicer: row indices of sim_data in time order.
        self.sim_idxs = None  # pylint: disable=invalid-name

    def setup_slicer(self, sim_data, maps=None):
        """Create one slice per visit, ordered by start MJD.

        Slice ``i`` holds the ``i``-th earliest visit. After this call the
        instance's ``_slice_sim_data`` is replaced by a working
        implementation.

        Parameters
        ----------
        sim_data : indexable by column name
            Visit data; must provide the MJD and duration columns and every
            column listed in ``extra_column_names``.
        maps : optional
            Passed through unchanged to ``self._run_maps`` (inherited, not
            defined in this class).
        """
        mjds = np.asarray(sim_data[self.mjd_column_name])
        self.nslice = len(mjds)
        self.shape = self.nslice
        self.sim_idxs = np.argsort(mjds)

        # Slice i refers to row sim_idxs[i], so every slice-point array is
        # put in that same time order; otherwise slice i would be labelled
        # with the values of row i whenever sim_data is not already sorted.
        self.slice_points["sid"] = np.arange(self.nslice)
        self.slice_points["mjd"] = mjds[self.sim_idxs]
        self.slice_points["duration"] = np.asarray(sim_data[self.duration_column_name])[self.sim_idxs]
        for column_name in self.extra_column_names:
            self.slice_points[column_name] = np.asarray(sim_data[column_name])[self.sim_idxs]
        self._run_maps(maps)

        @wraps(self._slice_sim_data)
        def _slice_sim_data(islice):  # pylint: disable=invalid-name
            visit_idx = self.sim_idxs[islice]
            # Same layout as the sibling slicers: "sid" is the slice id and
            # every other entry is this slice's single value.
            slice_points = {
                "sid": islice,
                "mjd": self.slice_points["mjd"][islice],
                "duration": self.slice_points["duration"][islice],
            }
            for column_name in self.extra_column_names:
                slice_points[column_name] = self.slice_points[column_name][islice]
            # "idxs" is a one-element list so that indexing sim_data with it
            # yields a length-1 selection rather than a bare record.
            return {"idxs": [visit_idx], "slice_point": slice_points}

        # Shadow the inherited placeholder on this instance only.
        setattr(self, "_slice_sim_data", _slice_sim_data)
# internal functions & classes