Source code for rubin_sim.maf.metrics.simple_metrics

__all__ = (
    "PassMetric",
    "Coaddm5Metric",
    "MaxMetric",
    "AbsMaxMetric",
    "MeanMetric",
    "AbsMeanMetric",
    "MedianMetric",
    "AbsMedianMetric",
    "MinMetric",
    "FullRangeMetric",
    "RmsMetric",
    "RelRmsMetric",
    "SumMetric",
    "CountUniqueMetric",
    "CountMetric",
    "CountRatioMetric",
    "CountSubsetMetric",
    "CountBeyondThreshold",
    "RobustRmsMetric",
    "MaxPercentMetric",
    "AbsMaxPercentMetric",
    "BinaryMetric",
    "FracAboveMetric",
    "FracBelowMetric",
    "PercentileMetric",
    "NoutliersNsigmaMetric",
    "UniqueRatioMetric",
    "MeanAngleMetric",
    "RmsAngleMetric",
    "FullRangeAngleMetric",
    "CountExplimMetric",
    "AngularSpreadMetric",
    "RealMeanMetric",
)

import numpy as np

from .base_metric import BaseMetric

# A collection of commonly used simple metrics,
# operating on a single column and returning a float.

twopi = 2.0 * np.pi


[docs] class PassMetric(BaseMetric): """Pass the entire dataslice array back to the MetricBundle. This is most likely useful while prototyping metrics and wanting to just 'get the data at a point in the sky', while using a HealpixSlicer or a UserPointSlicer. """ def __init__(self, cols=None, **kwargs): if cols is None: cols = [] super(PassMetric, self).__init__(col=cols, metric_dtype="object", **kwargs)
[docs] def run(self, data_slice, slice_point=None): return data_slice
[docs] class Coaddm5Metric(BaseMetric): """Calculate the coadded m5 value. Parameters ---------- m5_col : `str`, optional Name of the m5 column. Default fiveSigmaDepth. metric_name : `str`, optional Name to associate with the metric output. Default "CoaddM5". filter_name : `str`, optional Optionally specify a filter to sub-select visits. Default None, does no sub-selection or checking. filter_col : `str`, optional Name of the filter column. units : `str`, optional Units for the metric. Default "mag". """ def __init__( self, m5_col="fiveSigmaDepth", metric_name="CoaddM5", filter_name=None, filter_col="Filter", units="mag", **kwargs, ): self.filter_name = filter_name self.filter_col = filter_col self.m5_col = m5_col super().__init__(col=m5_col, metric_name=metric_name, units=units, **kwargs) @staticmethod def coadd(single_visit_m5s): return 1.25 * np.log10(np.sum(10.0 ** (0.8 * single_visit_m5s)))
[docs] def run(self, data_slice, slice_point=None): if len(data_slice) == 0: return self.badval if self.filter_name is not None: matched = np.where(data_slice[self.filter_col] == self.filter_name) coadd = self.coadd(data_slice[matched][self.m5_col]) else: coadd = self.coadd(data_slice[self.m5_col]) return coadd
[docs] class MaxMetric(BaseMetric): """Calculate the maximum of a simData column slice."""
[docs] def run(self, data_slice, slice_point=None): return np.max(data_slice[self.colname])
[docs] class AbsMaxMetric(BaseMetric): """Calculate the max of the absolute value of a simData column slice."""
[docs] def run(self, data_slice, slice_point=None): return np.max(np.abs(data_slice[self.colname]))
[docs] class MeanMetric(BaseMetric): """Calculate the mean of a simData column slice."""
[docs] def run(self, data_slice, slice_point=None): return np.mean(data_slice[self.colname])
[docs] class AbsMeanMetric(BaseMetric): """Calculate the mean of the absolute value of a simData column slice."""
[docs] def run(self, data_slice, slice_point=None): return np.mean(np.abs(data_slice[self.colname]))
[docs] class MedianMetric(BaseMetric): """Calculate the median of a simData column slice."""
[docs] def run(self, data_slice, slice_point=None): return np.median(data_slice[self.colname])
[docs] class AbsMedianMetric(BaseMetric): """Calculate the median of the absolute value of a simData column slice."""
[docs] def run(self, data_slice, slice_point=None): return np.median(np.abs(data_slice[self.colname]))
[docs] class MinMetric(BaseMetric): """Calculate the minimum of a simData column slice."""
[docs] def run(self, data_slice, slice_point=None): return np.min(data_slice[self.colname])
[docs] class FullRangeMetric(BaseMetric): """Calculate the range of a simData column slice."""
[docs] def run(self, data_slice, slice_point=None): return np.max(data_slice[self.colname]) - np.min(data_slice[self.colname])
[docs] class RmsMetric(BaseMetric): """Calculate the standard deviation of a simData column slice."""
[docs] def run(self, data_slice, slice_point=None): return np.std(data_slice[self.colname])
[docs] class RelRmsMetric(BaseMetric): """Calculate the relative scatter metric (RMS divided by median)."""
[docs] def run(self, data_slice, slice_point=None): return np.std(data_slice[self.colname]) / np.median(data_slice[self.colname])
[docs] class SumMetric(BaseMetric): """Calculate the sum of a simData column slice."""
[docs] def run(self, data_slice, slice_point=None): return np.sum(data_slice[self.colname])
[docs] class CountUniqueMetric(BaseMetric): """Return the number of unique values."""
[docs] def run(self, data_slice, slice_point=None): return np.size(np.unique(data_slice[self.colname]))
[docs] class UniqueRatioMetric(BaseMetric): """Return the number of unique values divided by the total number of values."""
[docs] def run(self, data_slice, slice_point=None): ntot = float(np.size(data_slice[self.colname])) result = np.size(np.unique(data_slice[self.colname])) / ntot return result
[docs] class CountMetric(BaseMetric): """Count the length of a simData column slice.""" def __init__(self, col=None, units="#", **kwargs): super().__init__(col=col, units=units, **kwargs) self.metric_dtype = "int"
[docs] def run(self, data_slice, slice_point=None): return len(data_slice[self.colname])
[docs] class CountExplimMetric(BaseMetric): """Count the number of x second visits. Useful for rejecting very short exposures and counting 60s exposures as 2 visits. Parameters ---------- min_exp : `float`, optional Minimum exposure time to consider as a "visit". Exposures shorter than this will not be counted (count as 0). expected_exp : `float`, optional Typical exposure time to expect. Exposures longer than this will be counted as round(visit exposure time / expected_exp). (i.e. 40s = 1, 50s = 2). exp_col : `str`, optional Column name to use for visit exposure time. Returns ------- value : `int` The number of visits longer than min_exp and weighted by expected_exp. """ def __init__(self, min_exp=20.0, expected_exp=30.0, exp_col="visitExposureTime", **kwargs): self.min_exp = min_exp self.expected_exp = expected_exp self.exp_col = exp_col if "col" in kwargs: del kwargs["col"] super().__init__(col=[exp_col], **kwargs) self.metric_dtype = "int"
[docs] def run(self, data_slice, slice_point=None): nv = data_slice[self.exp_col] / self.expected_exp nv[np.where(data_slice[self.exp_col] < self.min_exp)[0]] = 0 nv = np.round(nv) return int(np.sum(nv))
[docs] class CountRatioMetric(BaseMetric): """Count the length of a column slice, then divide by `norm_val`.""" def __init__(self, col=None, norm_val=1.0, metric_name=None, units="", **kwargs): self.norm_val = float(norm_val) if metric_name is None: metric_name = "CountRatio %s div %.1f" % (col, norm_val) super(CountRatioMetric, self).__init__(col=col, metric_name=metric_name, **kwargs)
[docs] def run(self, data_slice, slice_point=None): return len(data_slice[self.colname]) / self.norm_val
[docs] class CountSubsetMetric(BaseMetric): """Count the length of a column slice which matches `subset`.""" def __init__(self, col=None, subset=None, units="#", **kwargs): super(CountSubsetMetric, self).__init__(col=col, units=units, **kwargs) self.metric_dtype = "int" self.badval = 0 self.subset = subset
[docs] def run(self, data_slice, slice_point=None): count = len(np.where(data_slice[self.colname] == self.subset)[0]) return count
[docs] class CountBeyondThreshold(BaseMetric): """Count the number of entries in a data column above or below the `threshold`.""" def __init__(self, col=None, lower_threshold=None, upper_threshold=None, **kwargs): super().__init__(col=col, **kwargs) self.lower_threshold = lower_threshold self.upper_threshold = upper_threshold
[docs] def run(self, data_slice, slice_point=None): # Look for data values which match the criteria for the thresholds if self.upper_threshold is None and self.lower_threshold is None: count = len(data_slice) elif self.upper_threshold is None: count = len(np.where(data_slice[self.colname] > self.lower_threshold)[0]) elif self.lower_threshold is None: count = len(np.where(data_slice[self.colname] < self.upper_threshold)[0]) else: count = len( np.where( (data_slice[self.colname] > self.lower_threshold) and (data_slice[self.colname] < self.upper_threshold) )[0] ) return count
[docs] class RobustRmsMetric(BaseMetric): """Use the inter-quartile range of the data to estimate the RMS. Robust, as this calculation does not include outliers in the distribution. """
[docs] def run(self, data_slice, slice_point=None): iqr = np.percentile(data_slice[self.colname], 75) - np.percentile(data_slice[self.colname], 25) rms = iqr / 1.349 # approximation return rms
[docs] class MaxPercentMetric(BaseMetric): """Return the percent of data which matches the maximum value of the data. """
[docs] def run(self, data_slice, slice_point=None): n_max = np.size(np.where(data_slice[self.colname] == np.max(data_slice[self.colname]))[0]) percent = n_max / float(data_slice[self.colname].size) * 100.0 return percent
[docs] class AbsMaxPercentMetric(BaseMetric): """Return the percent of data which matches the absolute value of the max value of the data. """
[docs] def run(self, data_slice, slice_point=None): max_val = np.abs(np.max(data_slice[self.colname])) n_max = np.size(np.where(np.abs(data_slice[self.colname]) == max_val)[0]) percent = n_max / float(data_slice[self.colname].size) * 100.0 return percent
[docs] class BinaryMetric(BaseMetric): """Return 1 if there is data, `badval` otherwise."""
[docs] def run(self, data_slice, slice_point=None): if data_slice.size > 0: return 1 else: return self.badval
[docs] class FracAboveMetric(BaseMetric): """Find the fraction of data values above a given `cutoff`.""" def __init__(self, col=None, cutoff=0.5, scale=1, metric_name=None, **kwargs): # Col could just get passed in bundle with kwargs, # by explicitly pulling it out first, we support use cases where # class instantiated without explicit 'col='). if metric_name is None: metric_name = "FracAbove %.2f in %s" % (cutoff, col) super(FracAboveMetric, self).__init__(col, metric_name=metric_name, **kwargs) self.cutoff = cutoff self.scale = scale
[docs] def run(self, data_slice, slice_point=None): good = np.where(data_slice[self.colname] >= self.cutoff)[0] frac_above = np.size(good) / float(np.size(data_slice[self.colname])) frac_above = frac_above * self.scale return frac_above
[docs] class FracBelowMetric(BaseMetric): """Find the fraction of data values below a given `cutoff`.""" def __init__(self, col=None, cutoff=0.5, scale=1, metric_name=None, **kwargs): if metric_name is None: metric_name = "FracBelow %.2f %s" % (cutoff, col) super(FracBelowMetric, self).__init__(col, metric_name=metric_name, **kwargs) self.cutoff = cutoff self.scale = scale
[docs] def run(self, data_slice, slice_point=None): good = np.where(data_slice[self.colname] <= self.cutoff)[0] frac_below = np.size(good) / float(np.size(data_slice[self.colname])) frac_below = frac_below * self.scale return frac_below
[docs] class PercentileMetric(BaseMetric): """Find the value of a column at a given `percentile`.""" def __init__(self, col=None, percentile=90, metric_name=None, **kwargs): if metric_name is None: metric_name = "%.0fth%sile %s" % (percentile, "%", col) super(PercentileMetric, self).__init__(col=col, metric_name=metric_name, **kwargs) self.percentile = percentile
[docs] def run(self, data_slice, slice_point=None): pval = np.percentile(data_slice[self.colname], self.percentile) return pval
[docs] class NoutliersNsigmaMetric(BaseMetric): """Calculate the # of visits less than n_sigma below the mean (n_sigma<0) or more than n_sigma above the mean. """ def __init__(self, col=None, n_sigma=3.0, metric_name=None, **kwargs): self.n_sigma = n_sigma self.col = col if metric_name is None: metric_name = "Noutliers %.1f %s" % (self.n_sigma, self.col) super(NoutliersNsigmaMetric, self).__init__(col=col, metric_name=metric_name, **kwargs) self.metric_dtype = "int"
[docs] def run(self, data_slice, slice_point=None): med = np.mean(data_slice[self.colname]) std = np.std(data_slice[self.colname]) boundary = med + self.n_sigma * std # If nsigma is positive, look for outliers above median. if self.n_sigma >= 0: outsiders = np.where(data_slice[self.colname] > boundary) # Else look for outliers below median. else: outsiders = np.where(data_slice[self.colname] < boundary) return len(data_slice[self.colname][outsiders])
def _rotate_angles(angles): """Private utility for the '*Angle' Metrics below. This takes a series of angles between 0-2pi and rotates them so that the first angle is at 0, ensuring the biggest 'gap' is at the end of the series. This simplifies calculations like the 'mean' and 'rms' or 'fullrange', removing the discontinuity at 0/2pi. """ angleidx = np.argsort(angles) diffangles = np.diff(angles[angleidx]) start_to_end = np.array([twopi - angles[angleidx][-1] + angles[angleidx][0]], float) if start_to_end < -2.0 * np.pi: raise ValueError("Angular metrics expect radians, this seems to be in degrees") diffangles = np.concatenate([diffangles, start_to_end]) maxdiff = np.where(diffangles == diffangles.max())[0] if len(maxdiff) > 1: maxdiff = maxdiff[-1:] if maxdiff == (len(angles) - 1): rotation = angles[angleidx][0] else: rotation = angles[angleidx][maxdiff + 1][0] return (rotation, (angles - rotation) % twopi)
[docs] class MeanAngleMetric(BaseMetric): """Calculate the mean of an angular (degree) column slice. 'MeanAngle' differs from 'Mean' in that it accounts for wraparound at 2pi. """
[docs] def run(self, data_slice, slice_point=None): """Calculate mean angle via unit vectors. If unit vector 'strength' is less than 0.1, then just set mean to 180 degrees (as this indicates nearly uniformly distributed angles). """ x = np.cos(np.radians(data_slice[self.colname])) y = np.sin(np.radians(data_slice[self.colname])) meanx = np.mean(x) meany = np.mean(y) angle = np.arctan2(meany, meanx) radius = np.sqrt(meanx**2 + meany**2) mean = angle % twopi if radius < 0.1: mean = np.pi return np.degrees(mean)
[docs] class RmsAngleMetric(BaseMetric): """Calculate the standard deviation of an angular (degrees) column slice. 'RmsAngle' differs from 'Rms' in that it accounts for wraparound at 2pi. """
[docs] def run(self, data_slice, slice_point=None): rotation, angles = _rotate_angles(np.radians(data_slice[self.colname])) return np.std(np.degrees(angles))
[docs] class FullRangeAngleMetric(BaseMetric): """Calculate the full range of an angular (degrees) column slice. 'FullRangeAngle' differs from 'FullRange' in that it accounts for wraparound at 2pi. """
[docs] def run(self, data_slice, slice_point=None): rotation, angles = _rotate_angles(np.radians(data_slice[self.colname])) return np.degrees(angles.max() - angles.min())
[docs] class AngularSpreadMetric(BaseMetric): """Compute the angular spread statistic which measures uniformity of a distribution angles accounting for 2pi periodicity. The strategy is to first map angles into unit vectors on the unit circle, and then compute the 2D centroid of those vectors. A uniform distribution of angles will lead to a distribution of unit vectors with mean that approaches the origin. In contrast, a delta function distribution of angles leads to a delta function distribution of unit vectors with a mean that lands on the unit circle. The angular spread statistic is then defined as 1 - R, where R is the radial offset of the mean of the unit vectors derived from the input angles. R approaches 1 for a uniform distribution of angles and 0 for a delta function distribution of angles. The optional parameter `period` may be used to specificy periodicity other than 2 pi. """ def __init__(self, col=None, period=2.0 * np.pi, **kwargs): # https://en.wikipedia.org/wiki/Directional_statistics # #Measures_of_location_and_spread # jmeyers314@gmail.com self.period = period super(AngularSpreadMetric, self).__init__(col=col, **kwargs)
[docs] def run(self, data_slice, slice_point=None): # Unit vectors; unwrapped at specified period x = np.cos(data_slice[self.colname] * 2.0 * np.pi / self.period) y = np.sin(data_slice[self.colname] * 2.0 * np.pi / self.period) meanx = np.mean(x) meany = np.mean(y) # radial offset (i.e., length) of the mean unit vector R = np.hypot(meanx, meany) return 1.0 - R
[docs] class RealMeanMetric(BaseMetric): """Calculate the mean of a column with no nans or infs."""
[docs] def run(self, data_slice, slice_point=None): return np.mean(data_slice[self.colname][np.isfinite(data_slice[self.colname])])