__all__ = (
"TotalPowerMetric",
"StaticProbesFoMEmulatorMetricSimple",
"TomographicClusteringSigma8biasMetric",
"MultibandMeanzBiasMetric",
"StripinessMetric",
"UniformAreaFoMFractionMetric",
"UDropoutsNumbersShallowestDepth",
"UDropoutsArea",
)
import warnings
from copy import deepcopy
import healpy as hp
import numpy as np
from astropy import units as u
from astropy.coordinates import SkyCoord
from scipy import interpolate
from scipy.stats import median_abs_deviation
from sklearn.cluster import KMeans
from ..maf_contrib.static_probes_fom_summary_metric import StaticProbesFoMEmulatorMetric
from .area_summary_metrics import AreaThresholdMetric
from .base_metric import BaseMetric
from .simple_metrics import PercentileMetric
# Cosmology-related summary metrics.
# These generally calculate a FoM for various DESC metrics.
class TotalPowerMetric(BaseMetric):
"""Calculate the total power in the angular power spectrum,
between lmin/lmax.
Parameters
----------
lmin : `float`, optional
Minimum ell value to include when calculating total power.
lmax : `float`, optional
Maximum ell value to include when calculating total power.
remove_monopole : `bool`, optional
Flag to remove monopole when calculating total power.
remove_dipole : `bool`, optional
Flag to remove dipole when calculating total power.
col : `str`, optional
The column name to operate on.
For summary metrics, this is almost always `metricdata`.
mask_val : `float` or np.nan, optional
The mask value to apply to the metric values when passed.
If this attribute exists, the metric_values will be passed
using metric_values.filled(mask_val).
If mask_val is `None` for a metric, metric_values will be passed
using metric_values.compressed().
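Examples
--------
A minimal standalone sketch (illustrative only; assumes the usual
summary-metric input, a structured array with a "metricdata" column)::

>>> import numpy as np
>>> import healpy as hp
>>> rng = np.random.default_rng(42)
>>> data = rng.normal(size=hp.nside2npix(64))
>>> arr = np.core.records.fromrecords(data, dtype=[("metricdata", float)])
>>> metric = TotalPowerMetric(lmin=100, lmax=300)
>>> total_power = metric.run(arr)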
"""
def __init__(
self,
lmin=100.0,
lmax=300.0,
remove_monopole=True,
remove_dipole=True,
col="metricdata",
mask_val=np.nan,
**kwargs,
):
self.lmin = lmin
self.lmax = lmax
self.remove_monopole = remove_monopole
self.remove_dipole = remove_dipole
super().__init__(col=col, mask_val=mask_val, **kwargs)
def run(self, data_slice, slice_point=None):
# Calculate the power spectrum.
data = data_slice[self.colname]
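# NOTE: newer healpy releases deprecate the `verbose` keyword used
# below; if these calls raise a TypeError, drop verbose=False.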
if self.remove_monopole:
data = hp.remove_monopole(data, verbose=False, bad=self.mask_val)
if self.remove_dipole:
data = hp.remove_dipole(data, verbose=False, bad=self.mask_val)
cl = hp.anafast(data)
ell = np.arange(np.size(cl))
condition = np.where((ell <= self.lmax) & (ell >= self.lmin))[0]
totalpower = np.sum(cl[condition] * (2 * ell[condition] + 1))
return totalpower
class StaticProbesFoMEmulatorMetricSimple(BaseMetric):
"""Calculate the FoM for the combined static probes
(3x2pt, i.e. Weak Lensing, LSS, Clustering).
Parameters
----------
year : `int`, optional
The year of the survey to calculate FoM.
This calibrates expected depth and area.
Returns
-------
result : `float`
The simple 3x2pt FoM emulator value, for the
years where the correlation between area/depth and value is defined.
Notes
-----
This FoM is purely statistical and does not factor in systematics.
The implementation here is simpler than in
`rubin_sim.maf.maf_contrib.StaticProbesFoMEmulatorMetric`, and that
more sophisticated version should replace this metric.
This version of the emulator was used to generate the results in
https://ui.adsabs.harvard.edu/abs/2018arXiv181200515L/abstract
Note that this is truly a summary metric and should be run on the
output of Exgalm5_with_cuts.
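Examples
--------
A hedged sketch of standalone use (the real input is the healpix
output of Exgalm5_with_cuts; the map below is purely illustrative)::

>>> import numpy as np
>>> import healpy as hp
>>> depth = np.full(hp.nside2npix(64), -666.0)  # start fully masked
>>> npix_use = int(0.4 * depth.size)  # ~16 500 deg^2 of usable sky
>>> depth[:npix_use] = 26.5  # uniform Y10-like depth
>>> arr = np.core.records.fromrecords(depth, dtype=[("metricdata", float)])
>>> fom = StaticProbesFoMEmulatorMetricSimple(year=10).run(arr)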
"""
def __init__(self, year=10, **kwargs):
self.year = year
super().__init__(col="metricdata", mask_val=-666, **kwargs)
def run(self, data_slice, slice_point=None):
# derive nside from length of data slice
nside = hp.npix2nside(len(data_slice))
pix_area = hp.nside2pixarea(nside, degrees=True)
# Chop off any outliers (and also the masked value)
good_pix = np.where(data_slice[self.col] > 0)[0]
# Calculate area and median depth from the good pixels
area = pix_area * np.size(good_pix)
median_depth = np.median(data_slice[self.col][good_pix])
# FoM is calculated at the following values
if self.year == 1:
areas = [7500, 13000, 16000]
depths = [24.9, 25.2, 25.5]
fom_arr = [
[1.212257e02, 1.462689e02, 1.744913e02],
[1.930906e02, 2.365094e02, 2.849131e02],
[2.316956e02, 2.851547e02, 3.445717e02],
]
elif self.year == 3:
areas = [10000, 15000, 20000]
depths = [25.5, 25.8, 26.1]
fom_arr = [
[1.710645e02, 2.246047e02, 2.431472e02],
[2.445209e02, 3.250737e02, 3.516395e02],
[3.173144e02, 4.249317e02, 4.595133e02],
]
elif self.year == 6:
areas = [10000, 15000, 20000]
depths = [25.9, 26.1, 26.3]
fom_arr = [
[2.346060e02, 2.414678e02, 2.852043e02],
[3.402318e02, 3.493120e02, 4.148814e02],
[4.452766e02, 4.565497e02, 5.436992e02],
]
elif self.year == 10:
areas = [10000, 15000, 20000]
depths = [26.3, 26.5, 26.7]
fom_arr = [
[2.887266e02, 2.953230e02, 3.361616e02],
[4.200093e02, 4.292111e02, 4.905306e02],
[5.504419e02, 5.624697e02, 6.441837e02],
]
else:
warnings.warn("FoMEmulator is not defined for this year")
return self.badval
# Interpolate FoM to the actual values for this sim
areas = [[i] * 3 for i in areas]
depths = [depths] * 3
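# NOTE: scipy.interpolate.interp2d is deprecated and was removed in
# SciPy 1.14; newer environments would need this ported to
# scipy.interpolate.RegularGridInterpolator (or similar).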
f = interpolate.interp2d(areas, depths, fom_arr, bounds_error=False)
fom = f(area, median_depth)[0]
return fom
class TomographicClusteringSigma8biasMetric(BaseMetric):
"""Compute bias on sigma8 due to spurious contamination of density maps.
Run as summary metric on NestedLinearMultibandModelMetric.
Points of contact / contributors: Boris Leistedt
Parameters
----------
density_tomography_model : `dict`
dictionary containing models calculated for fiducial N(z)s and Cells:
lmax : numpy.array of int, of shape (Nbins, )
lmax corresponding to kmax of 0.05
poly1d_coefs_loglog : numpy.array of float, of shape (Nbins, )
polynomial fits to log(C_ell) vs log(ell) computed for CCL
sigma8square_model : float
value of sigma8^2 used as fiducial model for CCL
power_multiplier : `float`, optional
fraction of power (variance) which is uncorrected
and thus biases sigma8
lmin : `int`, optional
lmin for the analysis
convert_to_sigma8 : `bool`, optional
Convert the bias to sigma8 instead of sigma8^2
(via change of variables for the uncertainty)
Returns
-------
result : `float`
Value of sigma8 bias calculated from this model:
(sigma8^2_obs - sigma8^2_model) / error on sigma8^2_obs
if `convert_to_sigma8` is True,
then it is about sigma8 instead of sigma8^2.
Notes
-----
This is a summary metric to be run on the results
of the NestedLinearMultibandModelMetric.
NestedLinearMultibandModelMetric converts 6-band depth maps into
a set of maps (e.g. tomographic redshift bins) which describe
spurious density fluctuations in each bin.
This summary metric multiplies the maps by the parameter power_multiplier,
which can be used to describe the fraction of power uncorrected by
systematics mitigation schemes, computes the total power
(via angular power spectra with lmin-lmax limits)
and then infers sigma8^2 via a model of the angular power spectra.
By taking sigma8_obs minus sigma8_model divided by the uncertainty,
one derives a bias.
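Examples
--------
A minimal sketch of the expected model dictionary (the values below
are purely illustrative; a real model comes from CCL fits to fiducial
N(z)s and Cells)::

>>> import numpy as np
>>> density_tomography_model = {
...     "lmax": np.array([60, 90, 120]),
...     "poly1d_coefs_loglog": np.zeros((3, 4)),
...     "sigma8square_model": 0.8**2,
... }
>>> metric = TomographicClusteringSigma8biasMetric(density_tomography_model)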
"""
def __init__(
self,
density_tomography_model,
power_multiplier=0.1,
lmin=10,
convert_to_sigma8=True,
**kwargs,
):
super().__init__(col="metricdata", **kwargs)
# Set mask_val, so that we receive metric_values.filled(mask_val)
self.mask_val = hp.UNSEEN
self.badval = hp.UNSEEN
self.convert_to_sigma8 = convert_to_sigma8
self.power_multiplier = power_multiplier
self.lmin = lmin
self.density_tomography_model = density_tomography_model
# to compute angular power spectra and total power,
# initialize an array of metrics, with the right lmin and lmax.
self.totalPowerMetrics = [
TotalPowerMetric(lmin=lmin, lmax=lmax, mask_val=self.mask_val)
for lmax in density_tomography_model["lmax"]
]
self.areaThresholdMetric = AreaThresholdMetric(
lower_threshold=hp.UNSEEN,
upper_threshold=np.inf,
mask_val=self.mask_val,
)
def run(self, data_slice, slice_point=None):
# need to define an array of bad values for the masked pixels
badval_arr = np.repeat(self.badval, len(self.density_tomography_model["lmax"]))
# converts the input recarray to an array
data_slice_list = [
badval_arr if isinstance(x, float) else x for x in data_slice["metricdata"].tolist()
]
# should be (nbins, npix)
data_slice_arr = np.asarray(data_slice_list, dtype=float).T
data_slice_arr[~np.isfinite(data_slice_arr)] = (
hp.UNSEEN
) # need to work with TotalPowerMetric and healpix
# measure valid sky fractions and total power
# (via angular power spectra) in each bin.
# The original metric returns an array at each slice_point (of the
# original slicer) -- so there is a bit of "rearrangement" that
# has to happen to be able to pass a np.array with right dtype
# (i.e. dtype = [("metricdata", float)]) to each call to
# the AreaThresholdMetric and TotalPowerMetric `run` methods.
totalsky = 42000  # rough full-sky area in deg^2 (4 pi sr is ~41 253 deg^2)
fskys = np.array(
[
self.areaThresholdMetric.run(np.core.records.fromrecords(x, dtype=[("metricdata", float)]))
/ totalsky
for x in data_slice_arr
]
) # sky fraction
spuriousdensitypowers = (
np.array(
[
self.totalPowerMetrics[i].run(
np.core.records.fromrecords(x, dtype=[("metricdata", float)])
)
for i, x in enumerate(data_slice_arr)
]
)
/ fskys
)
print("spuriousdensitypowers:", spuriousdensitypowers)
print("fskys:", fskys)
def solve_for_multiplicative_factor(spurious_powers, model_cells, fskys, lmin, power_multiplier):
"""
Infer multiplicative factor sigma8^2 (and uncertainty)
from the model Cells and observed total powers
since the posterior distribution is Gaussian.
"""
# solve for multiplicative sigma8^2 term between
# measured angular power spectra
# (spurious measured Cells times power_multiplier)
# and model ones (polynomial model from CCL).
n_bins = model_cells["lmax"].size
assert len(spurious_powers) == n_bins
assert len(fskys) == n_bins
assert model_cells["poly1d_coefs_loglog"].shape[0] == n_bins
totalvar_mod = np.zeros((n_bins, 1))
totalvar_obs = np.zeros((n_bins, 1))
totalvar_var = np.zeros((n_bins, 1))
# loop over tomographic bins
# hardcoded; assumed CCL cosmology
sigma8square_model = model_cells["sigma8square_model"]
for i in range(n_bins):
# get model Cells from polynomial model (in log log space)
ells = np.arange(lmin, model_cells["lmax"][i])
polynomial_model = np.poly1d(model_cells["poly1d_coefs_loglog"][i, :])
cells_model = np.exp(polynomial_model(np.log(ells)))
# model variance is sum of cells x (2l+1)
totalvar_mod[i, 0] = np.sum(cells_model * (2 * ells + 1))
# observed power is the noiseless model power plus (a fraction of) the spurious power
totalvar_obs[i, 0] = totalvar_mod[i, 0] + spurious_powers[i] * power_multiplier
# simple model variance of cell based on Gaussian covariance
cells_var = 2 * cells_model**2 / (2 * ells + 1) / fskys[i]
totalvar_var[i, 0] = np.sum(cells_var * (2 * ells + 1) ** 2)
# model assumed sigma8 = 0.8
# (add CCL cosmology here? or how I obtained them + documentation)
# results_fractional_spurious_power =
# totalvar_obs / totalvar_mod - 1.0
# model Cell variance divided by sigma8^2,
# which is the common normalization
transfers = totalvar_mod / sigma8square_model
# model ratio: formula for posterior distribution on unknown
# multiplicative factor in multivariate Gaussian likelihood
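# For observations o_i = A * t_i + noise_i with Var(noise_i) = v_i and
# a flat prior on the amplitude A, the posterior on A is Gaussian with
# mean = sum(t_i o_i / v_i) / sum(t_i^2 / v_i) and
# variance = 1 / sum(t_i^2 / v_i); FOT and FTT below are those sums.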
FOT = np.sum(transfers[:, 0] * totalvar_obs[:, 0] / totalvar_var[:, 0])
FTT = np.sum(transfers[:, 0] * transfers[:, 0] / totalvar_var[:, 0])
# mean and stddev of multiplicative factor
sigma8square_fit = FOT / FTT
sigma8square_error = FTT**-0.5
return sigma8square_fit, sigma8square_error, sigma8square_model
# solve for the gaussian posterior distribution on sigma8^2
sigma8square_fit, sigma8square_error, sigma8square_model = solve_for_multiplicative_factor(
spuriousdensitypowers, self.density_tomography_model, fskys, self.lmin, self.power_multiplier
)
results_sigma8_square_bias = (sigma8square_fit - sigma8square_model) / sigma8square_error
if not self.convert_to_sigma8:
return results_sigma8_square_bias
else:
# turn result into bias on sigma8,
# via change of variable and simple propagation of uncertainty.
sigma8_fit = sigma8square_fit**0.5
sigma8_model = sigma8square_model**0.5
sigma8_error = 0.5 * sigma8square_error * sigma8_fit / sigma8square_fit
results_sigma8_bias = (sigma8_fit - sigma8_model) / sigma8_error
print(sigma8square_model, sigma8square_fit, sigma8square_error, results_sigma8_square_bias)
print(sigma8_model, sigma8_fit, sigma8_error, results_sigma8_bias)
return results_sigma8_bias
class MultibandMeanzBiasMetric(BaseMetric):
"""
Run as summary metric on MultibandExgalM5.
Parameters
----------
year : `int`, optional
The year of the survey to calculate the bias.
This is used to derive the dm/dz derivative used to
translate m5 rms into dz rms.
meanz_tomography_model : `dict`
dictionary containing models calculated for fiducial N(z):
meanz: numpy.float
the meanz within a tomographic bin at a given band
dz_dm5: numpy.float
the absolute value of the derivative of the mean z in a bin
with respect to the m5 depth. This is later used to translate
fluctuations in depth into fluctuations in tomographic redshift.
Returns
-------
result : `float` array
The ratio of this bias to the desired DESC y1 upper
bound on the bias, and the ratio
between the clbias and the y10 DESC SRD requirement.
Desired values are less than 1 by Y10.
Notes
-----
This is a summary metric to be run on the results
of the MultibandExgalM5.
MultibandExgalM5 provides the m5 depth in all LSST bands
given a specific slice.
This summary metric takes those depths, reads the derivatives from
the tomographic model, computes the bias in the shear signal Cl,
and then computes the ratio of that bias to the Y1 goal and to the
Y10 science requirement.
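Examples
--------
A sketch of the structure expected for the model dictionary (numbers
are purely illustrative; each bin needs one entry per band, only "u"
is shown; note the code looks up the key "year%i" % (year + 1))::

>>> meanz_tomography_model = {
...     "year11": {
...         "meanz": [{"u": 0.3}, {"u": 0.5}, {"u": 0.7}, {"u": 0.9}],
...         "dz_dm5": [{"u": 0.01}, {"u": 0.01}, {"u": 0.02}, {"u": 0.02}],
...     },
... }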
"""
def __init__(
self,
meanz_tomography_model,
filter_list=["u", "g", "r", "i", "z", "y"],
year=10,
zbins=4,
**kwargs,
):
super().__init__(col="metricdata", **kwargs)
# Set mask_val, so that we receive metric_values.filled(mask_val)
self.mask_val = np.nan
self.badval = np.nan
self.rms_func = np.nanstd
self.year = year
self.filter_list = filter_list
self.zbins = zbins
self.tomo_model = meanz_tomography_model
def run(self, data_slice, slice_point=None):
def compute_dzfromdm(zbins, band_name, year):
"""This computes the dm/dz relationship calibrated from simulations
by Jeff Newmann and Qianjun Hang, which forms the
meanz_tomographic_model.
Parameters
----------
zbins : `int`
The number of tomographic bins considered. For now
this is zbins < 5
band_name : `str`
The assumed filter band
year : `int`
The survey year; the model is read from self.tomo_model
under the key "year%i" % (year + 1)
Returns
-------
dzdminterp : `float` array
The interpolated value of the derivative dz/dm5
meanzinterp : `float` array
The meanz in each tomographic bin.
"""
meanzinterp = np.zeros(zbins)
dzdminterp = np.zeros(zbins)
for z in range(zbins):
meanzinterp[z] = self.tomo_model["year%i" % (year + 1)]["meanz"][z][band_name]
dzdminterp[z] = self.tomo_model["year%i" % (year + 1)]["dz_dm5"][z][band_name]
return dzdminterp, meanzinterp
# Not entirely sure if we should include these
# here or in a separate module/file
def use_zbins(meanz_vals, figure_9_mean_z=np.array([0.2, 0.4, 0.7, 1.0]), figure_9_width=0.2):
"""This computes which redshift bands are within the range
specified in https://arxiv.org/pdf/2305.15406.pdf and
can safely be used
to compute what Cl bias result from z fluctuations
caused by rms variations in the m5.
Parameters
----------
meanz_vals : `float` array
Array of meanz values to be used.
Returns
-------
use_bins : `boolean` array
An array of boolean values of length meanz_vals
"""
max_z_use = np.max(figure_9_mean_z) + 2 * figure_9_width
use_bins = meanz_vals < max_z_use
return use_bins
def compute_Clbias(meanz_vals, scatter_mean_z_values):
"""This computes the Cl bias
that results z fluctuations caused by rms variations in the m5.
Parameters
----------
meanz_vals : `float` array
Array of meanz values to be used.
scatter_mean_z_values : `float` array
Array of rms values of the z fluctuations
Returns
-------
clbiasvals : `float` array
An array of values of the clbias
mean_z_values_use : `float` array
An array of the meanz values that are within the
interpolation range of 2305.15406
Notes
------
This interpolates from the Figure 9 in https://arxiv.org/pdf/2305.15406.pdf
"""
figure_9_mean_z = np.array([0.2, 0.4, 0.7, 1.0])
figure_9_Clbias = np.array([1e-3, 2e-3, 5e-3, 1.1e-2])
figure_9_width = 0.2
figure_9_mean_z_scatter = 0.02
mzvals = np.array([float(mz) for mz in meanz_vals])
sctz = np.array([float(sz) for sz in scatter_mean_z_values])
fit_res = np.polyfit(figure_9_mean_z, figure_9_Clbias, 2)
poly_fit = np.poly1d(fit_res)
use_bins = use_zbins(meanz_vals, figure_9_mean_z, figure_9_width)
mean_z_values_use = mzvals[use_bins]
sctz_use = sctz[use_bins]
Clbias = poly_fit(mean_z_values_use)
rescale_fac = sctz_use / figure_9_mean_z_scatter
Clbias *= rescale_fac
fit_res_bias = np.polyfit(mean_z_values_use, Clbias, 1)
poly_fit_bias = np.poly1d(fit_res_bias)
clbiasvals = poly_fit_bias(mean_z_values_use)
return clbiasvals, mean_z_values_use
result = np.empty(self.zbins, dtype=[("name", np.str_, 20), ("y1ratio", float), ("y10ratio", float)])
result["name"][0] = "MultibandMeanzBiasMetric"
# Technically don't need this for now (isn't used in previous one)
# need to define an array of bad values for the masked pixels
badval_arr = np.repeat(self.badval, len(self.filter_list))
# converts the input recarray to an array
data_slice_list = [
badval_arr if isinstance(x, float) else x for x in data_slice["metricdata"].tolist()
]
# should be (nbins, npix)
data_slice_arr = np.asarray(data_slice_list, dtype=float).T
data_slice_arr[~np.isfinite(data_slice_arr)] = np.nan
# measure rms in each bin.
# XXX--this is probably meant to be run on
# only valid values, so changing to nanstd
rmsarray = self.rms_func(data_slice_arr, axis=1)
# totdz = 0
avmeanz = 0
totclbias = 0
for i, filt in enumerate(self.filter_list):
dzdminterp, meanzinterp = compute_dzfromdm(self.zbins, filt, self.year)
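# propagate the depth rms into a mean-z rms per bin:
# sigma_z = |dz/dm5| * sigma_m5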
stdz = [float(np.abs(dz)) * float(rmsarray[i]) for dz in dzdminterp]
clbias, meanz_use = compute_Clbias(meanzinterp, stdz)
# totdz += [float(st**2) for st in stdz]
totclbias += clbias
avmeanz += meanzinterp
# These should be included in self rather than hard coded
y10_req = 0.003
y1_goal = 0.013
# clbiastot = np.max(clbias) # if adding doesn't
# work over z range -- CHECK
y10ratio = totclbias / y10_req
y1ratio = totclbias / y1_goal
result["y1ratio"] = y1ratio
result["y10ratio"] = y10ratio
return result
# Let's make code that pulls out the northern/southern galactic regions,
# and gets statistics of the footprint by region.
def _is_ngp(ra, dec):
c = SkyCoord(ra=ra * u.degree, dec=dec * u.degree, frame="icrs")
lat = c.galactic.b.deg
return lat >= 0
def get_stats_by_region(use_map, nside, maskval=0, region="all"):
if region not in ["all", "north", "south"]:
raise ValueError("Invalid region %s" % region)
to_use = use_map > maskval
if region != "all":
# Find the north/south part of the map as requested
npix = hp.nside2npix(nside)
ra, dec = hp.pix2ang(hp.npix2nside(npix), range(npix), lonlat=True)
ngp_mask = _is_ngp(ra, dec)
if region == "north":
reg_mask = ngp_mask
else:
reg_mask = ~ngp_mask
to_use = to_use & reg_mask
# Calculate the desired stats
reg_mad = median_abs_deviation(use_map[to_use])
reg_median = np.median(use_map[to_use])
reg_std = np.std(use_map[to_use])
# Return the values
return (reg_mad, reg_median, reg_std)
def stripiness_test_statistic(data_slice, nside):
"""
A utility to find whether a particular exposure time map has stripey
features, by comparing the fractional scatter of its galactic caps.
"""
# Analyze the exposure time map to get MAD, median, std in north/south.
mad = {}
med = {}
frac_scatter = {}
regions = ["north", "south"]
for region in regions:
mad[region], med[region], _ = get_stats_by_region(data_slice, nside, region=region)
frac_scatter[region] = mad[region] / med[region]
test_statistic = frac_scatter["north"] / frac_scatter["south"] - 1
return test_statistic
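# A hedged usage sketch: for a map with no north/south asymmetry the
# statistic is close to zero, e.g.
#   nside = 32
#   toy = np.random.default_rng(0).uniform(500.0, 600.0, hp.nside2npix(nside))
#   stripiness_test_statistic(toy, nside)  # ~0, i.e. no stripes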
class StripinessMetric(BaseMetric):
"""
Run as summary metric on NestedRIZExptimeExgalM5Metric.
This metric uses maps of the combined RIZ exposure time
to identify stripes or residual rolling features,
for the UniformAreaFoMFractionMetric.
The number returned quantifies the stripiness of the field.
In UniformAreaFoMFractionMetric it is a test statistic:
if it is outside of +-0.7/np.sqrt(year) then
the RIZ exposure time map is considered to have stripes.
Points of contact / contributors: Rachel Mandelbaum, Boris Leistedt
Parameters
----------
year: `int`
year of observation, in order to adjust the cut on
the depth fluctuations for the stripe detection
nside: `int`
the nside at which the base metric
NestedRIZExptimeExgalM5Metric is calculated
verbose: bool, optional
if true, will display the segmentation maps and the areas and
FOMs of the two regions found
Returns
-------
result: `float`
A number quantifying the stripiness of the RIZ exposure time.
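Examples
--------
A hedged sketch of how the result is interpreted downstream (this
threshold is the one applied by UniformAreaFoMFractionMetric)::

>>> year = 4
>>> threshold = 0.7 / np.sqrt(year)
>>> # |result| > threshold => the map is considered stripey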
"""
def __init__(
self,
year,
nside,
verbose=True,
**kwargs,
):
self.year = year
self.mask_val = hp.UNSEEN
self.verbose = verbose
self.nside = nside
names = ["exgal_m5", "riz_exptime"]
types = [float] * 2
self.mask_val_arr = np.zeros(1, dtype=list(zip(names, types)))
self.mask_val_arr["exgal_m5"] = self.mask_val
self.mask_val_arr["riz_exptime"] = self.mask_val
super().__init__(col="metricdata", **kwargs)
def run(self, data_slice, slice_point=None):
data_slice_list = [
self.mask_val_arr if isinstance(x, float) else x for x in data_slice["metricdata"].tolist()
]
data_slice_arr = np.asarray(data_slice_list, dtype=self.mask_val_arr.dtype)
# a bit of gymnastics to make sure all bad values
# (nan, -666) are recast as hp.UNSEEN
ind = data_slice_arr["riz_exptime"] == -666
ind |= ~np.isfinite(data_slice_arr["riz_exptime"])
data_slice_arr["riz_exptime"][ind.ravel()] = hp.UNSEEN
# sanity check
nside = hp.npix2nside(data_slice["metricdata"].size)
assert nside == self.nside
test_statistic = stripiness_test_statistic(data_slice_arr["riz_exptime"].ravel(), nside)
return test_statistic
class UniformAreaFoMFractionMetric(BaseMetric):
"""
Run as summary metric on NestedRIZExptimeExgalM5Metric.
This metric uses maps of the combined RIZ exposure time and i-band depth
maps (with a consistent set of area cuts)
to identify potential reductions in cosmological constraining power due to
substantial large-scale power
in non-uniform coadds at a particular data release.
The RIZ exposure time map is used to identify whether there
are residual rolling features.
Under the hood, this runs the StripinessMetric.
If there are no such features, the metric returns 1.
If there are, the region is segmented into similar-depth regions
and the one with the largest cosmological constraining power is presumed to
be used for science.
In that case, the metric returns the 3x2pt FoM
(StaticProbesFoMEmulatorMetric,
quantifying weak lensing and large-scale structure constraining power)
for the largest of those regions,
divided by that for the full region if it had been usable.
Points of contact / contributors: Rachel Mandelbaum, Boris Leistedt
Parameters
----------
year: `int`
year of observation, in order to adjust the
cut on the depth fluctuations
for the stripe detection
nside: `int`
the nside at which the base metric
NestedRIZExptimeExgalM5Metric
is calculated
verbose: bool, optional
if true, will display the segmentation maps and the areas and FOMs of
the two regions found
Returns
-------
result: `float`
The ratio of the FOMs of the two areas found
"""
def __init__(
self,
year,
nside,
verbose=True,
**kwargs,
):
self.year = year
self.mask_val = hp.UNSEEN
self.verbose = verbose
self.nside = nside
names = ["exgal_m5", "riz_exptime"]
types = [float] * 2
self.mask_val_arr = np.zeros(1, dtype=list(zip(names, types)))
self.mask_val_arr["exgal_m5"] = self.mask_val
self.mask_val_arr["riz_exptime"] = self.mask_val
self.threebyTwoSummary = StaticProbesFoMEmulatorMetric(
nside=nside, metric_name="3x2ptFoM", col="exgal_m5"
)
super().__init__(col="metricdata", **kwargs)
def run(self, data_slice, slice_point=None):
data_slice_list = [
self.mask_val_arr if isinstance(x, float) else x for x in data_slice["metricdata"].tolist()
]
data_slice_arr = np.asarray(data_slice_list, dtype=self.mask_val_arr.dtype)
# a bit of gymnastics to make sure all bad values
# (nan, -666) are recast as hp.UNSEEN
ind = data_slice_arr["riz_exptime"] == -666
ind |= ~np.isfinite(data_slice_arr["riz_exptime"])
ind = data_slice_arr["exgal_m5"] == -666
ind |= ~np.isfinite(data_slice_arr["exgal_m5"])
data_slice_arr["exgal_m5"][ind.ravel()] = hp.UNSEEN
data_slice_arr["riz_exptime"][ind.ravel()] = hp.UNSEEN
# sanity check
nside = hp.npix2nside(data_slice["metricdata"].size)
assert nside == self.nside
# Check for stripiness
threshold = 0.7 / np.sqrt(self.year)
test_statistic = stripiness_test_statistic(data_slice_arr["riz_exptime"].ravel(), nside)
if np.abs(test_statistic) < np.abs(threshold):
stripes = False
else:
stripes = True
def apply_clustering(clustering_data):
# A thin wrapper around sklearn routines
# (can swap out which one we are using systematically).
# We fix parameters like `n_clusters` since realistically
# we know for rolling that we should expect 2 clusters.
# from sklearn.cluster import SpectralClustering
# clustering = SpectralClustering(n_clusters=2,
# assign_labels='discretize', random_state=0).fit(clustering_data)
clustering = KMeans(n_clusters=2, random_state=0, n_init="auto").fit(clustering_data)
labels = clustering.labels_ + 1
return labels
def expand_labels(depth_map, labels, maskval=0):
# A utility to apply the labels from a masked
# version of a depth map back to the entire depth map.
expanded_labels = np.zeros(hp.nside2npix(nside))
cutval = maskval + 0.1
expanded_labels[depth_map > cutval] = labels
return expanded_labels
def get_area_stats(depth_map, labels, maskval=0, n_clusters=2):
# A routine to get some statistics of the
# clustering: area fractions, median map values
expanded_labels = expand_labels(depth_map, labels, maskval=maskval)
cutval = maskval + 0.1
area_frac = []
med_val = []
for i in range(n_clusters):
new_map = depth_map.copy()
new_map[expanded_labels != i + 1] = maskval
this_area_frac = len(new_map[new_map > cutval]) / len(depth_map[depth_map > cutval])
this_med_val = np.median(new_map[new_map > cutval])
area_frac.append(this_area_frac)
med_val.append(this_med_val)
return area_frac, med_val
def show_clusters(depth_map, labels, maskval=0, n_clusters=2, min=500, max=3000):
# A routine to show the clusters found by the unsupervised
# clustering algorithm (start with original map then 2 clusters).
expanded_labels = expand_labels(depth_map, labels, maskval=maskval)
hp.visufunc.mollview(depth_map, min=min, max=max)
for i in range(n_clusters):
new_map = depth_map.copy()
new_map[expanded_labels != i + 1] = maskval
hp.visufunc.mollview(new_map, min=min, max=max)
return get_area_stats(depth_map, labels, maskval=maskval, n_clusters=n_clusters)
def make_clustering_dataset(depth_map, maskval=0, priority_fac=0.9, nside=64):
"""
A utility routine to get a dataset for unsupervised clustering.
Note:
- We want the unmasked regions of the depth map only.
- We assume masked regions are set to `maskval`,
and cut 0.1 magnitudes above that.
- We really want it to look at depth fluctuations.
So, we have to rescale the
RA/dec dimensions to avoid them being prioritized
because their values are larger and
have more variation than depth. Currently we rescale RA/dec
such that their standard deviations are 1-priority_fac times
the standard deviation of the depth map. That's why
priority_fac is a tunable parameter; it should be between 0 and 1
"""
if priority_fac < 0 or priority_fac >= 1:
raise ValueError("priority_fac must lie between 0 and 1")
theta, phi = hp.pixelfunc.pix2ang(nside, ipix=np.arange(hp.nside2npix(nside)))
# theta is 0 at the north pole, pi/2 at equator,
# pi at south pole; phi maps to RA
ra = np.rad2deg(phi)
dec = np.rad2deg(0.5 * np.pi - theta)
# Make an (n_unmasked, 3) array holding the unmasked pixels,
# including a rescaling factor to prioritize the depth
cutval = maskval + 0.1
n_unmasked = len(depth_map[depth_map > cutval])
my_data = np.zeros((n_unmasked, 3))
my_data[:, 0] = (
ra[depth_map > cutval]
* (1 - priority_fac)
* np.std(depth_map[depth_map > cutval])
/ np.std(ra[depth_map > cutval])
)
my_data[:, 1] = (
dec[depth_map > cutval]
* (1 - priority_fac)
* np.std(depth_map[depth_map > cutval])
/ np.std(dec[depth_map > cutval])
)
my_data[:, 2] = depth_map[depth_map > cutval]
return my_data
if not stripes:
return 1
else:
# Do the clustering if we got to this point
if self.verbose:
print("Verbose mode - Carrying out the clustering exercise for this map")
clustering_data = make_clustering_dataset(data_slice_arr["riz_exptime"].ravel(), nside=nside)
labels = apply_clustering(clustering_data)
area_frac, med_val = get_area_stats(data_slice_arr["riz_exptime"].ravel(), labels)
if self.verbose:
print("Verbose mode - showing original map and clusters identified for this map")
show_clusters(data_slice_arr["riz_exptime"].ravel(), labels)
print("Area fractions", area_frac)
print("Median exposure time values", med_val)
print("Median exposure time ratio", np.max(med_val) / np.min(med_val))
print("Verbose mode - proceeding with area cuts")
# Get the FoM without/with cuts. We want to check the FoM for each
# area, if we're doing cuts, and
# return the higher one. This will typically be for the
# larger area, but not necessarily, if the smaller area
# is deeper.
expanded_labels = expand_labels(data_slice_arr["riz_exptime"].ravel(), labels)
my_hpid_1 = expanded_labels == 1 # np.where(expanded_labels == 1)[0]
my_hpid_2 = expanded_labels == 2 # np.where(expanded_labels == 2)[0]
# should this be labels or expanded_labels?!
# copies need to be recarrays
data_slice_subset_2 = deepcopy(data_slice_arr)
data_slice_subset_1 = deepcopy(data_slice_arr)
data_slice_subset_1[my_hpid_2] = self.mask_val_arr
data_slice_subset_2[my_hpid_1] = self.mask_val_arr
fom1 = self.threebyTwoSummary.run(data_slice_subset_1)
fom2 = self.threebyTwoSummary.run(data_slice_subset_2)
fom = np.max((fom1, fom2))
fom_total = self.threebyTwoSummary.run(data_slice_arr)
if self.verbose:
print("FOMs:", fom1, fom2, fom, fom_total)
return fom / fom_total
class UDropoutsNumbersShallowestDepth(BaseMetric):
"""
Run as summary metric on MultibandExgalM5.
Parameters
----------
filter_list : `list` [`str`], optional
The list of filters expected in the input metric data.
Returns
-------
result : `float`
The expected number density per deg^2 of z~3 u-dropout galaxies,
computed from the 5th-percentile (shallowest) u-band depth.
Notes
-----
This is a summary metric to be run on the results
of the MultibandExgalM5.
MultibandExgalM5 provides the m5 depth in all LSST
bands given a specific slice.
"""
def __init__(self, filter_list=["u", "g", "r", "i", "z", "y"], **kwargs):
super().__init__(col="metricdata", percentile=5, **kwargs)
# Set mask_val, so that we receive metric_values.filled(mask_val)
self.mask_val = hp.UNSEEN
self.badval = hp.UNSEEN
self.percentileMetric = PercentileMetric(percentile=5, name="percentile")
self.filter_list = filter_list
def run(self, data_slice, slice_point=None):
# Technically don't need this for now (isn't used in previous one)
# need to define an array of bad values for the masked pixels
badval_arr = np.repeat(self.badval, len(self.filter_list))
# converts the input recarray to an array
data_slice_list = [
badval_arr if isinstance(x, float) else x for x in data_slice["metricdata"].tolist()
]
# should be (nbins, npix)
data_slice_arr = np.asarray(data_slice_list, dtype=float).T
data_slice_arr[~np.isfinite(data_slice_arr)] = (
hp.UNSEEN
) # need to work with TotalPowerMetric and healpix
# measure the percentile depth in each band.
# The original metric returns an array at each slice_point (of the
# original slicer) -- so there is a bit of "rearrangement" that
# has to happen to be able to pass a np.array with right dtype
# (i.e. dtype = [("metricdata", float)]) to each call to
# the PercentileMetric `run` method.
percentiles = np.array(
[
self.percentileMetric.run(np.core.records.fromrecords(x, dtype=[("metricdata", float)]))
for x in data_slice_arr
]
) # 6 numbers
import astropy.units as u
from astropy.cosmology import Planck18
from astropy.modeling.models import Schechter1D
def schecter_lf(
m_grid: np.ndarray,
log_phi_star: float = -2.84,
M_star: float = -20.91,
alpha: float = -1.68,
redshift: float = 3,
) -> np.ndarray:
"""Schecter Luminosity Function on grid of apparent magnitudes.
Defaults are for z~3 u-dropout Luminosity Function from Table 6
of Harikane et al. 2022.
Parameters
----------
m_grid: np.ndarray
Array of apparent AB magnitudes on which to calculate the
luminosity function.
log_phi_star: float, default=-2.84
Log (base 10) of phi_star, the normalization of the luminosity
function in units of mag^-1 Mpc^-3
M_star: float, default=-20.91
The characteristic absolute magnitude where the power-law form
of the luminosity function cuts off.
alpha: float, default=-1.68
The power law index, also known as the faint-end slope.
redshift: float, default=3
Redshift used for converting apparent magnitudes into absolute
magnitudes.
Returns
-------
np.ndarray
Number density in units mag^-1 Mpc^-3
"""
# Convert observed magnitudes to absolute
DL = Planck18.luminosity_distance(redshift).to(u.pc).value # Lum. Dist. in pc
M_grid = m_grid - 5 * np.log10(DL / 10) + 2.5 * np.log10(1 + redshift)
# Calculate luminosity function in absolute magnitudes
schechter = Schechter1D(10**log_phi_star, M_star, alpha)
return schechter(M_grid)
def number_density(
m_grid: np.ndarray,
LF: np.ndarray,
redshift: float = 3,
dz: float = 1,
) -> float:
"""Calculate number density per deg^2.
Parameters
----------
m_grid: np.ndarray
Array of apparent AB magnitudes corresponding to the
luminosity function values.
LF: np.ndarray
The luminosity function evaluated on m_grid, in units
mag^-1 Mpc^-3.
redshift: float, default=3
The central redshift used for evaluating comoving quantities.
dz: float, default=1
The full width of the redshift bin
Returns
-------
float
The total number density of galaxies in units deg^-2.
"""
# Calculate comoving depth of redshift bin (Mpc)
chi_far = Planck18.comoving_distance(redshift + dz / 2)
chi_near = Planck18.comoving_distance(redshift - dz / 2)
dchi = chi_far - chi_near
# Calculate number density (mag^-1 Mpc^-2)
n_dm = (LF / u.Mpc**3) * dchi
# Convert to mag^-1 deg^-2
deg_per_Mpc = Planck18.arcsec_per_kpc_comoving(redshift).to(u.deg / u.Mpc)
n_dm /= deg_per_Mpc**2
# Integrate the luminosity function
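# (np.trapz is deprecated in NumPy 2.0 in favor of np.trapezoid)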
n = np.trapz(n_dm, m_grid)
return n.value
u5 = percentiles[self.filter_list == "u"] # uband m5
u3 = u5 + 2.5 * np.log10(5 / 3) # convert to uband 3-sigma
g_cut = u3 - 1.5 # cut on g band
# Calculate LF up to g_cut
m_grid = np.linspace(20, g_cut)
LF = schecter_lf(m_grid)
n_deg2 = number_density(m_grid, LF) # number of objects per sq deg
return n_deg2
class UDropoutsArea(BaseMetric):
"""
Run as summary metric on MultibandExgalM5.
Parameters
----------
filter_list : `list` [`str`], optional
The list of filters expected in the input metric data.
Returns
-------
result : `float` array
The 5th-percentile m5 depth in each band of filter_list.
Notes
-----
This is a summary metric to be run on the results
of the MultibandExgalM5.
MultibandExgalM5 provides the m5 depth in all LSST
bands given a specific slice.
"""
def __init__(self, filter_list=["u", "g", "r", "i", "z", "y"], **kwargs):
super().__init__(col="metricdata", percentile=5, **kwargs)
# Set mask_val, so that we receive metric_values.filled(mask_val)
self.mask_val = hp.UNSEEN
self.badval = hp.UNSEEN
self.percentileMetric = PercentileMetric(percentile=5, name="percentile")
self.filter_list = filter_list
def run(self, data_slice, slice_point=None):
# Technically don't need this for now (isn't used in previous one)
# need to define an array of bad values for the masked pixels
badval_arr = np.repeat(self.badval, len(self.filter_list))
# converts the input recarray to an array
data_slice_list = [
badval_arr if isinstance(x, float) else x for x in data_slice["metricdata"].tolist()
]
# should be (nbins, npix)
data_slice_arr = np.asarray(data_slice_list, dtype=float).T
data_slice_arr[~np.isfinite(data_slice_arr)] = (
hp.UNSEEN
) # need to work with TotalPowerMetric and healpix
# measure the percentile depth in each band.
# The original metric returns an array at each slice_point (of the
# original slicer) -- so there is a bit of "rearrangement" that
# has to happen to be able to pass a np.array with right dtype
# (i.e. dtype = [("metricdata", float)]) to each call to
# the PercentileMetric `run` method.
percentiles = np.array(
[
self.percentileMetric.run(np.core.records.fromrecords(x, dtype=[("metricdata", float)]))
for x in data_slice_arr
]
) # 6 numbers
return percentiles