__all__ = ("SingleLinearMultibandModelMetric", "NestedLinearMultibandModelMetric")
import healpy as hp
import numpy as np
from .base_metric import BaseMetric
from .exgal_m5 import ExgalM5
from .weak_lensing_systematics_metric import ExgalM5WithCuts, RIZDetectionCoaddExposureTime
[docs]
class SingleLinearMultibandModelMetric(BaseMetric):
"""Calculate a single linear combination of depths.
This is useful for calculating density or redshift fluctuations
from depth fluctuations on the sky,
for a single redshift bins (thus it is a single metric).
For multiple bins, see NestedLinearMultibandModelMetric.
Parameters
----------
arr_of_model_dicts : `dict`
Linear coefficients to be applied to the M5 depth values.
Keys should be filter names + 'cst' if there is a constant offset.
post_processing_fn : lambda
Post processing function to apply to the linear combination of inputs.
Default `lambda x: x` simply returns the output.
extinction_cut : `float`, optional
E(B-V) cut on extinction (0.2 by default).
n_filters : `int`, optional
Cut on the number of filters required (6 by default).
min_depth_cut : `dict`, optional
Cut the areas of low depths, based on cuts in this dict.
Keys should be filter names. Default is no cuts.
max_depth_cut : `dict`, optional
Cut the areas of high depths, based on cuts in this dict.
Keys should be filter names. Default is no cuts.
mean_depth : `dict`, optional
Mean depths, which will be subtracted from the inputs
before applying model.
Keys should be filter names. Default is zero in each band.
In a lot of cases, instead of feeding mean_depths,
one can remove the monopole in the constructed healpix maps.
m5_col : `str`, optional
Column name for the m5 depth.
filter_col : `str`, optional
Column name for the filter.
units : `str`, optional
Label for "units" in the output, for use in plots.
badval : `float`, optional
Value to return for metric failure.
Specified here so that exgalm5 uses the same value.
Returns
-------
result : `float`
Linear combination of the M5 depths (minus mean depth, if provided).
result = np.sum([
model_dict[band] * M5_depth[band]
for band in ["u", "g", "r", "i", "z", "y"]
])
if post_processing_fn is provided:
result => post_processing_fn(result)
"""
def __init__(
self,
model_dict,
post_processing_fn=lambda x: x,
extinction_cut=0.2,
n_filters=6,
min_depth_cut=None,
max_depth_cut=None,
mean_depth=None,
m5_col="fiveSigmaDepth",
filter_col="filter",
units="mag",
badval=np.nan,
**kwargs,
):
if min_depth_cut is None:
min_depth_cut = {}
self.min_depth_cut = min_depth_cut
if max_depth_cut is None:
max_depth_cut = {}
self.max_depth_cut = max_depth_cut
if mean_depth is None:
# Set mean_depth to 0 if not specified
mean_depth = dict([(f, 0) for f in "ugrizy"])
self.mean_depth = mean_depth
self.n_filters = n_filters
self.extinction_cut = extinction_cut
if "cst" in model_dict:
self.cst = model_dict["cst"]
else:
self.cst = 0.0
lsst_filters = ["u", "g", "r", "i", "z", "y"]
self.bands = [x for x in model_dict.keys() if x in lsst_filters]
self.model_arr = np.array([model_dict[x] for x in self.bands])[:, None]
self.m5_col = m5_col
self.filter_col = filter_col
self.post_processing_fn = post_processing_fn
self.exgal_m5 = ExgalM5(m5_col=m5_col, filter_col=filter_col, badval=badval)
super().__init__(
col=[m5_col, filter_col], units=units, maps=self.exgal_m5.maps, badval=badval, **kwargs
)
def run(self, data_slice, slice_point=None):
""" """
if slice_point["ebv"] > self.extinction_cut:
return self.badval
n_filters = len(set(data_slice[self.filter_col]))
if n_filters < self.n_filters:
return self.badval
# add extinction_cut and depth cuts? and n_filters cuts
depths = np.vstack(
[
self.exgal_m5.run(data_slice[data_slice[self.filter_col] == lsst_filter], slice_point)
for lsst_filter in self.bands
]
).T
for i, lsst_filter in enumerate(self.bands):
if lsst_filter in self.mean_depth:
depths[i] -= self.mean_depth[lsst_filter]
# Don't raise Exceptions in the middle of metrics -
# assert depths.shape[0] >= 1
if depths.shape[0] < 1:
return self.badval
if np.any(depths == self.badval):
return self.badval
for i, lsst_filter in enumerate(self.bands):
if (
lsst_filter in self.min_depth_cut
and depths[0, i] < self.min_depth_cut[lsst_filter] - self.mean_depth[lsst_filter]
):
return self.badval
if (
lsst_filter in self.max_depth_cut
and depths[0, i] > self.max_depth_cut[lsst_filter] - self.mean_depth[lsst_filter]
):
return self.badval
val = self.post_processing_fn(self.cst + np.dot(depths, self.model_arr))
return val
[docs]
class NestedLinearMultibandModelMetric(BaseMetric):
"""Calculate multiple linear combinations of depths.
This is useful for calculating density or redshift fluctuations
from depth fluctuations on the sky,
for multiple redshift bins (thus it is a nested metric).
For a single bin, see LinearMultibandModelMetric.
Points of contact / contributors: Boris Leistedt
Parameters
----------
arr_of_model_dicts : `list` [ `dicts` ]
Array of linear coefficients to be applied to the M5 depth values.
Keys should be filter names + 'cst' if there is a constant offset.
post_processing_fn : lambda
Post processing function to apply to the linear combination of inputs.
Default `lambda x: x` simply returns the output.
extinction_cut : `float`, optional
E(B-V) cut on extinction (0.2 by default).
n_filters : `int`, optional
Cut on the number of filters required (6 by default).
min_depth_cut : `dict`, optional
Cut the areas of low depths, based on cuts in this dict.
Keys should be filter names. Default is no cuts.
max_depth_cut : `dict`, optional
Cut the areas of high depths, based on cuts in this dict.
Keys should be filter names. Default is no cuts.
mean_depth : `dict`, optional
Mean depths, which will be subtracted from the inputs
before applying model.
Keys should be filter names. Default is zero in each band.
In a lot of cases, instead of feeding mean_depths,
one can remove the monopole in the constructed healpix maps.
m5_col : `str`, optional
Column name for the m5 depth.
filter_col : `str`, optional
Column name for the filter.
units : `str`, optional
Label for "units" in the output, for use in plots.
badval : `float`, optional
Value to return for metric failure.
Specified here so that exgalm5 uses the same value.
Returns
-------
result : `list` [ `float` ]
List is the same size as arr_of_model_dicts.
For each element, return a linear combination of the M5 depths
(minus mean depth, if provided).
for i, model_dict in enumerate(arr_of_model_dicts):
result[i] = np.sum([
model_dict[band] * M5_depth[band]
for band in ["u", "g", "r", "i", "z", "y"]
])
if post_processing_fn is provided:
result[i] => post_processing_fn(result[i])
"""
def __init__(
self,
arr_of_model_dicts,
post_processing_fn=lambda x: x,
extinction_cut=0.2,
n_filters=6,
min_depth_cut=None,
max_depth_cut=None,
mean_depth=None,
m5_col="fiveSigmaDepth",
filter_col="filter",
units="mag",
badval=np.nan,
**kwargs,
):
self.arr_of_model_dicts = arr_of_model_dicts
self.n_filters = n_filters
self.extinction_cut = extinction_cut
if min_depth_cut is None:
min_depth_cut = {}
self.min_depth_cut = min_depth_cut
if max_depth_cut is None:
max_depth_cut = {}
self.max_depth_cut = max_depth_cut
if mean_depth is None:
mean_depth = {}
self.mean_depth = mean_depth
self.m5_col = m5_col
self.badval = badval
self.mask_val = hp.UNSEEN
self.filter_col = filter_col
self.post_processing_fn = post_processing_fn
self.n_bins = len(self.arr_of_model_dicts)
self.bad_val_arr = np.repeat(self.badval, self.n_bins)
self.exgal_m5 = ExgalM5(
m5_col=m5_col,
filter_col=filter_col,
badval=self.badval,
)
super().__init__(
col=[m5_col, filter_col],
badval=self.badval,
units=units,
maps=self.exgal_m5.maps,
metric_dtype="object",
**kwargs,
)
[docs]
def run(self, data_slice, slice_point=None):
# apply extinction and n_filters cut
if slice_point["ebv"] > self.extinction_cut:
return self.bad_val_arr
n_filters = len(set(data_slice[self.filter_col]))
if n_filters < self.n_filters:
return self.bad_val_arr
lsst_filters = ["u", "g", "r", "i", "z", "y"]
# initialize dictionary of outputs
# types = [float]*n_bins
result_arr = np.zeros((self.n_bins,), dtype=float)
for binnumber, model_dict in enumerate(self.arr_of_model_dicts):
# put together a vector of depths
depths = np.vstack(
[
self.exgal_m5.run(data_slice[data_slice[self.filter_col] == lsst_filter], slice_point)
for lsst_filter in lsst_filters
]
).T
# if there are depth cuts, apply them
for i, lsst_filter in enumerate(lsst_filters):
if lsst_filter in self.min_depth_cut and depths[0, i] < self.min_depth_cut[lsst_filter]:
depths[0, i] = self.badval
if lsst_filter in self.max_depth_cut and depths[0, i] > self.max_depth_cut[lsst_filter]:
depths[0, i] = self.badval
# if provided, subtract the mean
for i, lsst_filter in enumerate(lsst_filters):
if lsst_filter in self.mean_depth:
depths[i] -= self.mean_depth[lsst_filter]
# now assemble the results
model_arr = np.array([model_dict[x] if x in model_dict else 0.0 for x in lsst_filters])
cst = model_dict["cst"] if "cst" in model_dict else 0.0
result_arr[binnumber] = self.post_processing_fn(cst + np.dot(depths, model_arr))
# returns array where each element of the original
# input arr_of_model_dicts contains a scalar value resulting
# from a linear combination of the six depths
return result_arr
class NestedRIZExptimeExgalM5Metric(BaseMetric):
"""TODO"""
def __init__(
self,
m5_col="fiveSigmaDepth",
filter_col="filter",
exptime_col="visitExposureTime",
extinction_cut=0.2,
n_filters=6,
depth_cut=24,
metric_name="new_nested_FoM",
lsst_filter="i",
badval=np.nan,
**kwargs,
):
maps = ["DustMap"]
self.m5_col = m5_col
self.filter_col = filter_col
self.exptime_col = exptime_col
self.lsst_filter = lsst_filter
self.n_filters = n_filters
cols = [self.m5_col, self.filter_col, self.exptime_col]
super().__init__(cols, metric_name=metric_name, maps=maps, badval=badval, **kwargs)
self.riz_exptime_metric = RIZDetectionCoaddExposureTime(ebvlim=extinction_cut)
self.exgalm5_metric = ExgalM5WithCuts(
m5_col=m5_col,
filter_col=filter_col,
extinction_cut=extinction_cut,
depth_cut=depth_cut,
lsst_filter=lsst_filter,
badval=badval,
n_filters=n_filters,
)
self.metric_dtype = "object"
def run(self, data_slice, slice_point):
# set up array to hold the results
names = ["exgal_m5", "riz_exptime"]
types = [float] * 2
result = np.zeros(1, dtype=list(zip(names, types)))
result["exgal_m5"] = self.exgalm5_metric.run(data_slice, slice_point)
result["riz_exptime"] = self.riz_exptime_metric.run(
data_slice[data_slice[self.filter_col] == "i"], slice_point
)
return result
class MultibandExgalM5(BaseMetric):
"""Calculate multiple linear combinations of depths."""
def __init__(
self,
extinction_cut=0.2,
n_filters=6,
m5_col="fiveSigmaDepth",
filter_col="filter",
units="mag",
badval=np.nan,
**kwargs,
):
self.n_filters = n_filters
self.extinction_cut = extinction_cut
self.m5_col = m5_col
self.badval = badval
self.filter_col = filter_col
self.lsst_filters = ["u", "g", "r", "i", "z", "y"]
self.bad_val_arr = np.repeat(self.badval, len(self.lsst_filters))
self.exgal_m5 = ExgalM5(
m5_col=m5_col,
filter_col=filter_col,
badval=self.badval,
)
super().__init__(
col=[m5_col, filter_col],
badval=self.badval,
units=units,
maps=self.exgal_m5.maps,
metric_dtype="object",
**kwargs,
)
def run(self, data_slice, slice_point=None):
# apply extinction and n_filters cut
if slice_point["ebv"] > self.extinction_cut:
return self.bad_val_arr
n_filters = len(set(data_slice[self.filter_col]))
if n_filters < self.n_filters:
return self.bad_val_arr
depths = np.vstack(
[
self.exgal_m5.run(data_slice[data_slice[self.filter_col] == lsst_filter], slice_point)
for lsst_filter in self.lsst_filters
]
).T
return depths.ravel()