Source code for rubin_sim.maf.metrics.agnstructure
__all__ = ("SFUncertMetric",)
import warnings
import numpy as np
from astropy.stats import mad_std
from rubin_sim.maf.utils import m52snr
from rubin_sim.phot_utils import DustValues
from .base_metric import BaseMetric
[docs]
class SFUncertMetric(BaseMetric):
"""Structure Function (SF) Uncertainty Metric.
Developed on top of LogTGaps
Adapted from Weixiang Yu & Gordon Richards at:
https://github.com/RichardsGroup/
LSST_SF_Metric/blob/main/notebooks/00_SFErrorMetric.ipynb
Parameters
----------
mag : `float`
The magnitude of the fiducial object. Default 22.
times_col : `str`
Time column name. Defaults to "observationStartMJD".
all_gaps : `bool`
Whether to use all gaps (between any two pairs of observations).
If False, only use consecutive paris. Defaults to True.
units : `str`
Unit of this metric. Defaults to "mag".
bins : `object`
An array of bin edges.
Defaults to "np.logspace(0, np.log10(3650), 16)" for a
total of 15 (final) bins.
weight : `object`
The weight assigned to each delta_t bin for deriving the final metric.
Defaults to flat weighting with sum of 1.
Should have length 1 less than bins.
snr_cut : `float`
Ignore observations below an SNR limit, default 5.
dust : `bool`
Apply dust extinction to the fiducial object magnitude. Default True.
"""
def __init__(
self,
mag=22,
times_col="observationStartMJD",
m5_col="fiveSigmaDepth",
all_gaps=True,
units="mag",
bins=np.logspace(0, np.log10(3650), 16),
weight=None,
metric_name="Structure Function Uncert",
snr_cut=5,
filter_col="filter",
dust=True,
**kwargs,
):
# Assign metric parameters to instance object
self.times_col = times_col
self.m5_col = m5_col
self.filter_col = filter_col
self.all_gaps = all_gaps
self.bins = bins
if weight is None:
# If weight is none, set weight so that sum over bins = 1
self.weight = np.ones(len(self.bins) - 1)
self.weight /= self.weight.sum()
self.metric_name = metric_name
self.mag = mag
self.snr_cut = snr_cut
self.dust = dust
maps = ["DustMap"]
super(SFUncertMetric, self).__init__(
col=[self.times_col, m5_col, filter_col],
metric_name=self.metric_name,
units=units,
maps=maps,
**kwargs,
)
dust_properties = DustValues()
self.ax1 = dust_properties.ax1
[docs]
def run(self, data_slice, slice_point=None):
"""Code executed at each healpix pixel to compute the metric"""
df = np.unique(data_slice[self.filter_col])
if np.size(df) > 1:
msg = """Running structure function on multiple filters simultaneously.
Should probably change your SQL query to limit to a single filter."""
warnings.warn(msg)
if self.dust:
a_x = self.ax1[data_slice[self.filter_col][0]] * slice_point["ebv"]
extincted_mag = self.mag + a_x
else:
extincted_mag = self.mag
snr = m52snr(extincted_mag, data_slice[self.m5_col])
bright_enough = np.where(snr > self.snr_cut)[0]
# If the total number of visits < 2, mask as bad pixel
if data_slice[bright_enough].size < 2:
return self.badval
# sort data by time column
order = np.argsort(data_slice[self.times_col][bright_enough])
times = data_slice[self.times_col][bright_enough][order]
# Using the simple Gaussian approximation for magnitude uncertainty.
mag_err = 2.5 * np.log10(1.0 + 1.0 / snr[bright_enough][order])
# check if use all gaps (between any pairs of observations)
if self.all_gaps:
# use the vectorized method
dt_matrix = times.reshape((1, times.size)) - times.reshape((times.size, 1))
dts = dt_matrix[dt_matrix > 0].flatten().astype(np.float16)
else:
dts = np.diff(times)
# bin delta_t using provided bins;
# if zero pair found at any delta_t bin,
# replace 0 with 0.01 to avoid the exploding 1/sqrt(n) term
# in this metric
result, bins = np.histogram(dts, self.bins)
new_result = np.where(result > 0, result, 0.01)
# compute photometric_error^2 population variance and population mean
# note that variance is replaced by median_absolute_deviate^2
# mean is replaced by median in this implementation to make it robust
# to outliers in simulations (e.g., dcr simulations)
err_var = mag_err**2
err_var_mu = np.median(err_var)
err_var_std = mad_std(err_var)
# compute SF error
sf_var_dt = 2 * (err_var_mu + err_var_std / np.sqrt(new_result))
sf_var_metric = np.sum(sf_var_dt * self.weight)
return sf_var_metric