Source code for rubin_sim.maf.slicers.one_d_slicer

__all__ = ("OneDSlicer",)

import warnings
from functools import wraps

import numpy as np

from rubin_sim.maf.plots.oned_plotters import OneDBinnedData
from rubin_sim.maf.stackers import ColInfo
from rubin_sim.maf.utils import optimal_bins

from .base_slicer import BaseSlicer


class OneDSlicer(BaseSlicer):
    """OneD Slicer allows the slicing of data into bins in a single dimension.

    Parameters
    ----------
    slice_col_name : `str`
        The name of the data column to base slicing on
        (i.e. 'airmass', etc.)
    slice_col_units : `str`, optional
        Set a name for the units of the sliceCol.
        Used for plotting labels. If None, the units are looked up
        with `ColInfo.get_units`.
    bins : np.ndarray, optional
        The data will be sliced into 'bins': this can be defined
        as an array here. Default None.
    bin_min : `float`, optional
    bin_max : `float`, optional
    bin_size : `float`, optional
        If bins is not defined, then bin_min/bin_max/bin_size can be
        chosen to anchor the slice points. Default None.
        Priority goes: bins >> bin_min/bin_max/bin_size >> data values
        (if none of the above are chosen).
    verbose : `bool`, optional
        Passed through to the BaseSlicer constructor.
    badval : `float`, optional
        Passed through to the BaseSlicer constructor.

    Raises
    ------
    ValueError
        If `slice_col_name` is None.

    Notes
    -----
    `setup_slicer` assigns data to slices such that
    ``left_edge <= data < right_edge`` for every bin, i.e. each bin is
    half-open ([a, b)).

    NOTE(review): earlier documentation of this class stated that the
    last bin is closed ([a, b]). The slicing code applies the same
    left-side search to every right edge, including the final one, so
    data values exactly equal to the final bin edge are not placed in
    any slice. Confirm which behavior is intended.
    """

    def __init__(
        self,
        slice_col_name=None,
        slice_col_units=None,
        bins=None,
        bin_min=None,
        bin_max=None,
        bin_size=None,
        verbose=True,
        badval=0,
    ):
        super().__init__(verbose=verbose, badval=badval)
        if slice_col_name is None:
            raise ValueError("slice_col_name cannot be left None - choose a data column to group data by")
        self.slice_col_name = slice_col_name
        self.columns_needed = [slice_col_name]
        # We could try to set up the self.bins here --
        # but it's also possible that
        # these bin_min/max/size values have not been set and
        # should just be set from the data.
        self.bins = bins
        self.bin_min = bin_min
        self.bin_max = bin_max
        self.bin_size = bin_size
        # Forget bin_min/max/size if bins was set: explicit bins win.
        if self.bins is not None:
            if bin_min is not None or bin_max is not None or bin_size is not None:
                warnings.warn(
                    "Both bins and one of the bin_min/bin_max/bin_size was specified. "
                    f"Using bins ({self.bins}) values only."
                )
            self.bin_min = None
            self.bin_max = None
            self.bin_size = None
        # Set the column units
        if slice_col_units is not None:
            self.slice_col_units = slice_col_units
        # Try to determine the column units
        else:
            co = ColInfo()
            self.slice_col_units = co.get_units(self.slice_col_name)
        # Set slicer re-initialize values and default plotFunction.
        # Note that the units recorded here are the value passed by the
        # caller (possibly None), not the looked-up units.
        self.slicer_init = {
            "slice_col_name": self.slice_col_name,
            "slice_col_units": slice_col_units,
            "badval": badval,
            "bin_min": self.bin_min,
            "bin_max": self.bin_max,
            "bin_size": self.bin_size,
            "bins": self.bins,
        }
        self.plot_funcs = [
            OneDBinnedData,
        ]

    def setup_slicer(self, sim_data, maps=None):
        """Set up bins in slicer.

        This happens AFTER sim_data is defined,
        thus typically in the MetricBundleGroup.
        This maps data into the bins;
        it's not a good idea to reuse a OneDSlicer as a result.

        Parameters
        ----------
        sim_data : array-like indexed by column name
            The simulated data; ``sim_data[self.slice_col_name]`` provides
            the values which are sorted and assigned to bins.
        maps : optional
            Passed on to ``self._run_maps`` to add map metadata to the
            slice points.
        """
        if "bins" in self.slice_points:
            warning_msg = "Warning: this OneDSlicer was already set up once. "
            warning_msg += (
                "Re-setting up a OneDSlicer is unpredictable; at the very least, it "
                "will change the mapping of the simulated data into the data slices. "
                "A safer choice is to use a separate OneDSlicer for each MetricBundle."
            )
            warnings.warn(warning_msg)
        slice_col = sim_data[self.slice_col_name]
        # Set bins from data or specified values,
        # if they were not previously defined.
        if self.bins is None:
            # Set bin min/max values (could have been set in __init__)
            if self.bin_min is None:
                self.bin_min = np.nanmin(slice_col)
            if self.bin_max is None:
                self.bin_max = np.nanmax(slice_col)
            # Give warning if bin_min = bin_max,
            # and do something at least slightly reasonable.
            if self.bin_min == self.bin_max:
                warnings.warn(
                    "bin_min = bin_max (maybe your data is single-valued?). "
                    "Increasing bin_max by 1 (or 2*bin_size, if bin_size was set)."
                )
                if self.bin_size is not None:
                    self.bin_max = self.bin_max + 2 * self.bin_size
                else:
                    self.bin_max = self.bin_max + 1
            if self.bin_size is None:
                bins = optimal_bins(slice_col, self.bin_min, self.bin_max)
                nbins = np.round(bins)
                self.bin_size = (self.bin_max - self.bin_min) / float(nbins)
            # Set bins. The stop value is padded by half a bin so that
            # the final edge near bin_max is included by arange.
            self.bins = np.arange(self.bin_min, self.bin_max + self.bin_size / 2.0, self.bin_size, "float")
        # nslice is used to stop iteration and should
        # reflect the usable length of the bins
        self.nslice = len(self.bins) - 1
        # and "shape" refers to the length of the datavalues,
        # and should be one less than # of bins because last
        # binvalue is RH edge only
        self.shape = self.nslice
        # Set slice_point metadata.
        self.slice_points["sid"] = np.arange(self.nslice)
        self.slice_points["bins"] = self.bins
        # Add metadata from map if needed.
        self._run_maps(maps)
        indxs = np.argsort(slice_col)
        data_sorted = slice_col[indxs]
        # Setting up slices such that left_edge <= data < right_edge
        # in each slice (both edges located with a left-side search).
        left = np.searchsorted(data_sorted, self.bins[0:-1], "left")
        right = np.searchsorted(data_sorted, self.bins[1:], "left")
        self.sim_idxs = [indxs[le:ri] for le, ri in zip(left, right)]

        # Set up _slice_sim_data method for this class.
        @wraps(self._slice_sim_data)
        def _slice_sim_data(islice):
            """Slice sim_data on oneD sliceCol,
            to return relevant indexes for slice_point.
            """
            idxs = self.sim_idxs[islice]
            bin_left = self.bins[islice]
            bin_right = self.bins[islice + 1]
            return {
                "idxs": idxs,
                "slice_point": {
                    "sid": islice,
                    "bin_left": bin_left,
                    "bin_right": bin_right,
                },
            }

        setattr(self, "_slice_sim_data", _slice_sim_data)

    def __eq__(self, other_slicer):
        """Evaluate if slicers are equivalent.

        Two slicers are equal when `other_slicer` is a OneDSlicer with the
        same `slice_col_name` and, in order of preference:
        the 'bins' stored in both `slice_points` are equal; or both have
        `bins` set and these are equal; or both have all of
        bin_size/bin_min/bin_max set and these are equal.
        Anything else compares unequal.
        """
        if not isinstance(other_slicer, OneDSlicer):
            return False
        if not (self.slice_col_name == other_slicer.slice_col_name):
            return False
        # If slicer restored from disk or setup,
        # then 'bins' in slice_points dict.
        # This is preferred method to see if slicers are equal.
        if "bins" in self.slice_points and "bins" in other_slicer.slice_points:
            return np.array_equal(other_slicer.slice_points["bins"], self.slice_points["bins"])
        # However, before we 'setup' the slicer with data,
        # the slicers could be equivalent.
        if self.bins is not None and other_slicer.bins is not None:
            return np.array_equal(self.bins, other_slicer.bins)
        limits_defined = (
            self.bin_size is not None
            and self.bin_min is not None
            and self.bin_max is not None
            and other_slicer.bin_size is not None
            and other_slicer.bin_min is not None
            and other_slicer.bin_max is not None
        )
        if not limits_defined:
            return False
        return bool(
            self.bin_size == other_slicer.bin_size
            and self.bin_min == other_slicer.bin_min
            and self.bin_max == other_slicer.bin_max
        )