Source code for rubin_sim.maf.slicers.base_slicer

# Base class for all 'Slicer' objects.
#
__all__ = ("SlicerRegistry", "BaseSlicer")

import inspect
import json
import warnings
from io import StringIO

import numpy as np
import numpy.ma as ma

from rubin_sim.maf.utils import get_date_version


[docs] class SlicerRegistry(type): """ Meta class for slicers, to build a registry of slicer classes. """ def __init__(cls, name, bases, dict): super(SlicerRegistry, cls).__init__(name, bases, dict) if not hasattr(cls, "registry"): cls.registry = {} modname = inspect.getmodule(cls).__name__ + "." if modname.startswith("rubin_sim.maf.slicers"): modname = "" slicername = modname + name if slicername in cls.registry: raise Exception("Redefining metric %s! (there are >1 slicers with the same name)" % (slicername)) if slicername not in ["BaseSlicer", "BaseSpatialSlicer"]: cls.registry[slicername] = cls def get_class(cls, slicername): return cls.registry[slicername] def help(cls, doc=False): for slicername in sorted(cls.registry): if not doc: print(slicername) if doc: print("---- ", slicername, " ----") print(inspect.getdoc(cls.registry[slicername]))
[docs] class BaseSlicer(metaclass=SlicerRegistry): """ Base class for all slicers: sets required methods and implements common functionality. After first construction, the slicer should be ready for `setup_slicer` which defines slice_points, allowing the slicer to "slice" data and generate plots. After init after a restore: everything necessary for using slicer for plotting or saving/restoring metric data should be present, although the slicer does not need to be able to slice data again and generally will not be able to do so. Parameters ---------- verbose: `bool`, optional True/False flag to send extra output to screen. badval: `int` or `float`, optional The value the Slicer uses to fill masked metric data values """ def __init__(self, verbose=False, badval=-666): self.verbose = verbose self.badval = badval # Set the cache_size. # Currently only healpixSlicers (and their derivatives) use the cache. # The size of the cache is set directly by those slicers. self.cache_size = 0 # Set length of Slicer. This determines the endpoint for iteration. self.nslice = None # Set the length of the data (metric) values. # This is often but not necessarily the same as nslice. self.shape = None self.slice_points = {} self.slicer_name = self.__class__.__name__ self.columns_needed = [] # Create a dict that saves how to re-init the slicer. # This may not be the whole set of args/kwargs, may only be # those which carry useful metadata or are necesary for init. self.slicer_init = {"badval": badval} self.plot_funcs = [] def _run_maps(self, maps): """Add map metadata to slice_points.""" if maps is not None: for m in maps: self.slice_points = m.run(self.slice_points)
[docs] def setup_slicer(self, sim_data, maps=None): """Set up Slicer for data slicing. Set up internal parameters necessary for slicer to slice data and generates indexes on sim_data. Also sets _slice_sim_data for a particular slicer. Parameters ----------- sim_data : `np.recarray` The simulated data to be sliced. maps : `list` of `rubin_sim.maf.maps` objects, optional. Maps to apply at each slice_point, to add to the slice_point metadata. """ raise NotImplementedError()
[docs] def get_slice_points(self): """Return the slice_point metadata, for all slice points.""" return self.slice_points
[docs] def __len__(self): """Return nslice, the number of slice_points in the slicer.""" return self.nslice
[docs] def __iter__(self): """Iterate over the slices.""" self.islice = 0 return self
[docs] def __next__(self): """Returns results of self._slice_sim_data when iterating over slicer. Results of self._slice_sim_data should be dictionary of {'idxs': the data indexes relevant for this slice of the slicer, 'slice_point': the metadata for the slice_point, which always includes 'sid' key for ID of slice_point.} """ if self.islice >= self.nslice: raise StopIteration islice = self.islice self.islice += 1 return self._slice_sim_data(islice)
def __getitem__(self, islice): return self._slice_sim_data(islice)
[docs] def __eq__(self, other_slicer): """Evaluate if two slicers are equivalent.""" raise NotImplementedError()
[docs] def __ne__(self, other_slicer): """Evaluate if two slicers are not equivalent.""" if self == other_slicer: return False else: return True
def _slice_sim_data(self, slice_point): """Slice the simulation data appropriately for the slicer. Given the identifying slice_point metadata The slice of data returned will be the indices of the numpy rec array (the sim_data) which are appropriate for the metric to be working on, for that slice_point. """ raise NotImplementedError('This method is set up by "setup_slicer" - run that first.')
[docs] def write_data( self, outfilename, metric_values, metric_name="", sim_data_name="", constraint=None, info_label="", plot_dict=None, display_dict=None, summary_values=None, ): """ Save metric values along with the information required to re-build the slicer. Parameters ----------- outfilename : `str` The output file name. metric_values : `np.ma.MaskedArray` or `np.ndarray` The metric values to save to disk. metric_name : `str` Name of the metric as configured when run sim_data_name : `str` Name of the simulation metric run on constraint : `str` Constraint used to subselect data info_label : `str` Descriptive additional information plot_dict : `dict` Dictionary of plotting parameters display_dict : `dict` Dictionary of display parameters, including caption summary_values : `dict` Dictionary of summary statistics """ header = {} header["metric_name"] = metric_name header["constraint"] = constraint header["info_label"] = info_label header["sim_data_name"] = sim_data_name date, version_info = get_date_version() header["date_ran"] = date if display_dict is None: display_dict = {"group": "Ungrouped"} header["display_dict"] = display_dict header["plot_dict"] = plot_dict header["summary_values"] = summary_values for key in version_info: header[key] = version_info[key] if hasattr(metric_values, "mask"): # If it is a masked array data = metric_values.data mask = metric_values.mask fill = metric_values.fill_value else: data = metric_values mask = None fill = None # npz file acts like dictionary: each keyword/value pair # below acts as a dictionary in loaded NPZ file. np.savez( outfilename, # header saved as dictionary header=header, # metric data values metric_values=data, # metric mask values mask=mask, # metric badval/fill val fill=fill, # dictionary of instantiation parameters slicer_init=self.slicer_init, # class name slicer_name=self.slicer_name, # slice_point metadata saved (is a dictionary) slice_points=self.slice_points, slicer_n_slice=self.nslice, slicer_shape=self.shape, )
[docs] def output_json( self, metric_values, metric_name="", sim_data_name="", info_label="", plot_dict=None, ): """ Send metric data to JSON streaming API, along with a little bit of metadata. This method will only work for metrics where the metricDtype is float or int, as JSON will not interpret more complex data properly. These values can't be plotted anyway though. Parameters ----------- metric_values : `np.ma.MaskedArray` or `np.ndarray` The metric values. metric_name : `str`, optional The name of the metric. sim_data_name : `str`, optional The name of the simulated data source. info_label : `str`, optional Some additional information about this metric and how it was calculated. plot_dict : `dict`, optional. The plot_dict for this metric bundle. Returns -------- io : `StringIO` StringIO object containing a header dictionary with metric_name/metadata/sim_data_name/slicer_name, and plot labels from plot_dict, and metric values/data for plot. if oneDSlicer, the data is [ [bin_left_edge, value], [bin_left_edge, value]..]. if a spatial slicer, the data is [ [lon, lat, value], [lon, lat, value] ..]. """ # Bail if this is not a good data type for JSON. if not (metric_values.dtype == "float") or (metric_values.dtype == "int"): warnings.warn("Cannot generate JSON.") io = StringIO() json.dump(["Cannot generate JSON for this file."], io) return None # Else put everything together for JSON output. if plot_dict is None: plot_dict = {} plot_dict["units"] = "" # Preserve some of the metadata for the plot. header = {} header["metric_name"] = metric_name header["info_label"] = info_label header["sim_data_name"] = sim_data_name header["slicer_name"] = self.slicer_name header["slicer_len"] = int(self.nslice) # Set some default plot labels if appropriate. if "title" in plot_dict: header["title"] = plot_dict["title"] else: header["title"] = "%s %s: %s" % (sim_data_name, info_label, metric_name) if "xlabel" in plot_dict: header["xlabel"] = plot_dict["xlabel"] else: if hasattr(self, "slice_col_name"): header["xlabel"] = "%s (%s)" % ( self.slice_col_name, self.slice_col_units, ) else: header["xlabel"] = "%s" % metric_name if "units" in plot_dict: header["xlabel"] += " (%s)" % (plot_dict["units"]) if "ylabel" in plot_dict: header["ylabel"] = plot_dict["ylabel"] else: if hasattr(self, "slice_col_name"): header["ylabel"] = "%s" % metric_name if "units" in plot_dict: header["ylabel"] += " (%s)" % (plot_dict["units"]) else: # If it's not a oneDslicer and no ylabel given, don't need one. pass # Bundle up slicer and metric info. metric = [] # If metric values is a masked array. if hasattr(metric_values, "mask"): if "ra" in self.slice_points: # Spatial slicer. # Translate ra/dec to lon/lat in degrees and # output with metric value. for ra, dec, value, mask in zip( self.slice_points["ra"], self.slice_points["dec"], metric_values.data, metric_values.mask, ): if not mask: lon = ra * 180.0 / np.pi lat = dec * 180.0 / np.pi metric.append([lon, lat, value]) elif "bins" in self.slice_points: # OneD slicer. Translate bins into bin/left and # output with metric value. for i in range(len(metric_values)): bin_left = self.slice_points["bins"][i] value = metric_values.data[i] mask = metric_values.mask[i] if not mask: metric.append([bin_left, value]) else: metric.append([bin_left, 0]) metric.append([self.slice_points["bins"][i + 1], 0]) elif self.slicer_name == "UniSlicer": metric.append([metric_values[0]]) # Else: else: if "ra" in self.slice_points: for ra, dec, value in zip(self.slice_points["ra"], self.slice_points["dec"], metric_values): lon = ra * 180.0 / np.pi lat = dec * 180.0 / np.pi metric.append([lon, lat, value]) elif "bins" in self.slice_points: for i in range(len(metric_values)): bin_left = self.slice_points["bins"][i] value = metric_values[i] metric.append([bin_left, value]) metric.append(self.slice_points["bins"][i + 1][0]) elif self.slicer_name == "UniSlicer": metric.append([metric_values[0]]) # Write out JSON output. io = StringIO() json.dump([header, metric], io) return io
[docs] def read_data(self, infilename): """ Read metric data from disk, along with the info to rebuild the slicer (minus new slicing capability). Parameters ----------- infilename: `str` The filename containing the metric data. Returns ------- metric_values, slicer, header : `np.ma.MaskedArray`, `rubin_sim.maf.slicer`, `dict` MetricValues stored in data file, the slicer basis for those metric values, and a dictionary containing header information (run_name, metadata, etc.). """ import rubin_sim.maf.slicers as slicers # Allowing pickles here is required, because otherwise we cannot # restore data saved as objects. restored = np.load(infilename, allow_pickle=True) if "slicer_name" not in restored: metric_values, slicer, header = self.read_backwards_compatible(restored, infilename) return metric_values, slicer, header # This is the standard behavior and will be the # sole behavior at a future release point. # Get metadata and other sim_data info. header = restored["header"][()] if "dateRan" in header: header["date_ran"] = header["dateRan"] # Get slicer information. slicer_init = restored["slicer_init"][()] slicer_name = str(restored["slicer_name"]) slice_points = restored["slice_points"][()] slicer_nslice = restored["slicer_n_slice"] slicer_shape = restored["slicer_shape"] try: slicer = getattr(slicers, slicer_name)(**slicer_init) except TypeError: if self.verbose: warnings.warn( f"Cannot use saved slicer init values; falling back to defaults for {infilename}" ) slicer = getattr(slicers, slicer_name)() # Restore slice_point information. slicer.nslice = slicer_nslice slicer.slice_points = slice_points slicer.shape = slicer_shape # Get metric data set if restored["mask"][()] is None: metric_values = ma.MaskedArray(data=restored["metric_values"]) else: metric_values = ma.MaskedArray( data=restored["metric_values"], mask=restored["mask"], fill_value=restored["fill"], ) return metric_values, slicer, header
[docs] def read_backwards_compatible(self, restored, infilename): """Read pre v1.0 metric files.""" # Backwards compatibility for pre-v1.0 metric outputs. # To be deprecated at a future release. warnings.warn( "Reading pre-v1.0 metric data. To be deprecated in a future release.", FutureWarning, ) import rubin_sim.maf.slicers as slicers header = restored["header"][()] header["metric_name"] = header["metricName"] header["sim_data_name"] = header["simDataName"] if "metadata" in header: header["info_label"] = header["metadata"] if "plotDict" in header: header["plot_dict"] = header["plotDict"] if "displayDict" in header: header["display_dict"] = header["displayDict"] if "dateRan" in header: header["date_ran"] = header["dateRan"] slicer_init = restored["slicer_init"][()] slicer_name = str(restored["slicerName"]) slice_points = restored["slicePoints"][()] slicer_nslice = restored["slicerNSlice"] slicer_shape = restored["slicerShape"] # Slicer init update new = ["lat_col", "lon_col", "use_camera"] old = ["latCol", "lonCol", "useCamera"] for n, o in zip(new, old): if o in slicer_init: slicer_init[n] = slicer_init[o] del slicer_init[o] if "Hrange" in slicer_init: slicer_init["h_range"] = slicer_init["Hrange"] del slicer_init["Hrange"] new = ["bin_min", "bin_max", "bin_size", "slice_col_name", "slice_col_units"] old = ["binMin", "binMax", "binsize", "sliceColName", "sliceColUnits"] for n, o in zip(new, old): if o in slicer_init: slicer_init[n] = slicer_init[o] del slicer_init[o] # An earlier backwards compatibility issue - # map 'spatialkey1/spatialkey2' to 'lon_col/lat_col'. if "spatialkey1" in slicer_init: slicer_init["lon_col"] = slicer_init["spatialkey1"] del slicer_init["spatialkey1"] if "spatialkey2" in slicer_init: slicer_init["lat_col"] = slicer_init["spatialkey2"] del slicer_init["spatialkey2"] try: slicer = getattr(slicers, slicer_name)(**slicer_init) except TypeError: if self.verbose: warnings.warn( f"Cannot use saved slicer init values; falling back to defaults for {infilename}" ) slicer = getattr(slicers, slicer_name)() # Restore slice_point information. slicer.nslice = slicer_nslice slicer.slice_points = slice_points slicer.shape = slicer_shape # Get metric data set if restored["mask"][()] is None: metric_values = ma.MaskedArray(data=restored["metricValues"]) else: metric_values = ma.MaskedArray( data=restored["metricValues"], mask=restored["mask"], fill_value=restored["fill"], ) return metric_values, slicer, header