# Names exported by `from <this module> import *`: the stacker metaclass
# and the stacker base class defined below.
__all__ = ("StackerRegistry", "BaseStacker")
import inspect
import warnings
import numpy as np
[docs]
class StackerRegistry(type):
    """Metaclass for Stackers, to build a registry of stacker classes.

    Every class created with this metaclass is recorded in two dictionaries
    that are created on the first class of a hierarchy and then shared by
    its subclasses through normal attribute inheritance:

    ``registry``
        Maps the (possibly module-prefixed) stacker name to the class.
    ``source_dict``
        Maps each column name listed in the class's ``cols_added`` to the
        class that was registered most recently for that column.

    Classes defined in modules whose name starts with
    ``rubin_sim.maf.stackers`` are registered under their bare class name.
    Classes from any other module are prefixed with the parent package of
    that module (or with the module name itself for a top-level module),
    followed by a dot.
    """

    # NOTE: the parameter is called `dict` (shadowing the builtin) to keep
    # the original signature unchanged.
    def __init__(cls, name, bases, dict):
        """Register the newly created class ``cls``.

        Parameters
        ----------
        name : `str`
            The name of the class being created.
        bases : `tuple`
            The base classes of the class being created.
        dict : `dict`
            The namespace of the class being created.

        Raises
        ------
        Exception
            If a stacker with the same registry name already exists.
        """
        super().__init__(name, bases, dict)
        if not hasattr(cls, "registry"):
            cls.registry = {}
        if not hasattr(cls, "source_dict"):
            cls.source_dict = {}

        # inspect.getmodule returns None when the defining module is not in
        # sys.modules (e.g. classes built via exec or type()); fall back to
        # the module name recorded on the class instead of failing.
        module = inspect.getmodule(cls)
        modname = cls.__module__ if module is None else module.__name__

        if modname.startswith("rubin_sim.maf.stackers"):
            prefix = ""
        else:
            # Use the parent package when there is one, otherwise the
            # (top-level) module name itself.
            package, sep, _ = modname.rpartition(".")
            prefix = (package if sep else modname) + "."
        stackername = prefix + name

        if stackername in cls.registry:
            raise Exception(
                f"Redefining stacker {stackername}! (there are >1 stackers with the same name)"
            )
        if stackername != "BaseStacker":
            cls.registry[stackername] = cls
        # Record which class provides each generated column.
        for col in cls.cols_added:
            cls.source_dict[col] = cls

    def get_class(cls, stackername):
        """Return the stacker class registered under ``stackername``.

        Raises
        ------
        KeyError
            If no stacker is registered under that name.
        """
        return cls.registry[stackername]

    def help(cls, doc=False):
        """Print the registered stackers, in sorted name order.

        Parameters
        ----------
        doc : `bool`, optional
            If False, print only the stacker names.
            If True, also print each stacker's docstring together with the
            columns it adds and requires; this instantiates every
            registered stacker with no arguments.
        """
        for stackername in sorted(cls.registry):
            if not doc:
                print(stackername)
                continue
            stacker_class = cls.registry[stackername]
            print("---- ", stackername, " ----")
            print(stacker_class.__doc__)
            stacker = stacker_class()
            print(" Columns added to SimData: ", ",".join(stacker.cols_added))
            print(" Default columns required: ", ",".join(stacker.cols_req))
[docs]
class BaseStacker(metaclass=StackerRegistry):
    """Base MAF Stacker.

    Stackers add columns generated at run-time to the simdata array.

    Subclasses list the columns they generate in the class attribute
    ``cols_added`` and implement `_run` to fill those columns in.
    """

    # List of the names of the columns generated by the Stacker.
    cols_added = []

    # Stackers define __eq__ and are not meant to be hashed; setting
    # __hash__ to None is the standard way to mark a class unhashable.
    __hash__ = None

    def __init__(self):
        """Instantiate the stacker.

        This method should be overridden by the user.
        This serves to define the API required by MAF.
        """
        # The new columns generated by the stacker are listed in the
        # class attribute `cols_added` (above).
        # List of the names of the columns required from the database
        # (to generate the Stacker columns).
        self.cols_req = []
        # Optional: specify the new column types.
        self.cols_added_dtypes = None
        # Optional: provide a list of units for the columns
        # defined in cols_added.
        self.units = [None]

    def __eq__(self, other_stacker):
        """Evaluate if two stackers are equivalent.

        This method is required to determine if metric_bundles may be
        evaluated at the same time, on the same data.

        Two stackers are equivalent when their classes have the same name
        and every public attribute (other than ``registry``, ``run``,
        ``next`` and bound methods) compares equal.
        """
        # If the class names are different, they are not 'the same'.
        if self.__class__.__name__ != other_stacker.__class__.__name__:
            return False
        # Otherwise, this is the same stacker class,
        # but may be instantiated differently.
        # We have to delve a little further,
        # and compare the kwargs & attributes for each stacker.
        for key in dir(self):
            if key.startswith("_") or key in ("registry", "run", "next"):
                continue
            if not hasattr(other_stacker, key):
                return False
            mine = getattr(self, key)
            theirs = getattr(other_stacker, key)
            # Bound methods compare their instances by identity, so they
            # would make any two distinct stackers unequal; they carry no
            # configuration state, so skip them.
            if inspect.ismethod(mine):
                continue
            # If either attribute is from numpy, assume it's an array
            # and compare element-wise (plain != would be ambiguous).
            if np.__name__ in (type(mine).__module__, type(theirs).__module__):
                if not np.array_equal(mine, theirs):
                    return False
            elif mine != theirs:
                return False
        return True

    def __ne__(self, other_stacker):
        """Evaluate if two stackers are not equal."""
        return not self == other_stacker

    def _add_stacker_cols(self, sim_data):
        """Add the new Stacker columns to the sim_data array.

        If columns already present in sim_data,
        just allows 'run' method to overwrite.

        Parameters
        ----------
        sim_data : `np.ndarray`
            Structured array holding the existing columns.

        Returns
        -------
        new_data : `np.ndarray`
            A new array with the existing columns copied over and any
            missing stacker columns added (uninitialised, so the 'run'
            method can set their values).
        cols_present : `bool`
            True if every column in ``cols_added`` was already present
            and its first value was not None.
        """
        if getattr(self, "cols_added_dtypes", None) is None:
            self.cols_added_dtypes = [float for _ in self.cols_added]
        existing = sim_data.dtype.names
        # Build the new dtype from the named fields rather than from
        # dtype.descr, which numpy documents as unsuitable for
        # reconstructing a dtype (e.g. padded layouts add unnamed fields).
        new_fields = [(name, sim_data.dtype[name]) for name in existing]
        n_populated = 0
        for col, col_dtype in zip(self.cols_added, self.cols_added_dtypes):
            if col not in existing:
                new_fields.append((col, col_dtype))
            elif sim_data[col][0] is not None:
                n_populated += 1
                warnings.warn(
                    "Warning - column %s already present in sim_data, may be overwritten "
                    "(depending on stacker)." % (col)
                )
        new_data = np.empty(sim_data.shape, dtype=new_fields)
        # Copy the old data into the new array.
        for col in existing:
            new_data[col] = sim_data[col]
        # Were all columns present and populated with something not None?
        # If so, then consider 'all there'.
        cols_present = n_populated == len(self.cols_added)
        return new_data, cols_present

    def _run_accepts_cols_present(self):
        """Return True if `_run` can be called as ``_run(sim_data, cols_present)``."""
        try:
            signature = inspect.signature(self._run)
        except (TypeError, ValueError):
            # Signature cannot be introspected; assume the current API.
            return True
        try:
            signature.bind(None, None)
        except TypeError:
            return False
        return True

    def run(self, sim_data, override=False):
        """Run the stacker, adding new columns.

        Parameters
        ----------
        sim_data : `np.ndarray`, (N, M)
            The data to be used to evaluate metrics.
        override : `bool`, optional
            If True, recalculate new (stacker) columns even if present.
            If False, calculates stacker columns only if they are not present.

        Returns
        -------
        sim_data : `np.ndarray`, (N, MM)
            The input data, plus additional stacker columns.
            Empty input is returned unchanged.
        """
        if len(sim_data) == 0:
            return sim_data
        # Add new columns
        sim_data, cols_present = self._add_stacker_cols(sim_data)
        # If override is set, it means go ahead and recalculate stacker values.
        if override:
            cols_present = False
        # Run the method to calculate/add new data.
        # The signature is checked up front (instead of catching TypeError
        # around the call) so that a TypeError raised inside a current-API
        # _run propagates rather than triggering a second run.
        if self._run_accepts_cols_present():
            return self._run(sim_data, cols_present)
        warnings.warn(
            "Please update the stacker %s so that the _run method matches the current API. "
            "This will give you the option to skip re-running stackers if the columns are "
            "already present." % (self.__class__.__name__)
        )
        return self._run(sim_data)

    def _run(self, sim_data, cols_present=False):
        """Do the work to add the new columns. This method should be
        overridden in subclasses.

        Parameters
        ----------
        sim_data : `np.ndarray`, (N, M)
            The observation data, provided by the MAF framework.
        cols_present : `bool`, optional
            Flag to indicate what to do with the columns already present in
            the data. This will also be provided by the MAF framework --
            but your _run method can use the value. If it is 'True' and you
            do trust the existing value, the _run method can simply
            return sim_data without additional calculations.

        Returns
        -------
        sim_data : `np.ndarray`, (N, MM)
            The simdata, with the columns added or updated
            (or simply already present).

        Raises
        ------
        NotImplementedError
            Always, in this base class.
        """
        # By moving the calculation of these columns to a separate method,
        # we add the possibility of using stackers with pandas dataframes.
        # The _add_stacker_cols method won't work with dataframes, but the
        # _run methods are quite likely to (depending on their details),
        # as they are just populating columns.
        raise NotImplementedError(
            "Not Implemented: the child stackers should implement their own _run methods"
        )