Source code for rubin_sim.maf.maf_contrib.selfcal_uniformity_metric
__all__ = ("PhotometricSelfCalUniformityMetric",)
import os
import time
import healpy as hp
import numpy as np
from astropy.coordinates import SkyCoord
from astropy.table import Table
from rubin_scheduler.data import get_data_dir
from rubin_scheduler.utils import healbin
from scipy.stats import median_abs_deviation
from rubin_sim.maf.metrics import BaseMetric
from rubin_sim.selfcal import LsqrSolver, OffsetSNR, generate_catalog
from rubin_sim.selfcal.offsets import OffsetSys
def _match(arr1, arr2):
st1 = np.argsort(arr1)
sub1 = np.searchsorted(arr1, arr2, sorter=st1)
if arr2.max() > arr1.max():
bad = sub1 == arr1.size
sub1[bad] = arr1.size - 1
(sub2,) = np.where(arr1[st1[sub1]] == arr2)
sub1 = st1[sub1[sub2]]
return sub1, sub2
[docs]
class PhotometricSelfCalUniformityMetric(BaseMetric):
def __init__(
self,
nside_residual=128,
highglat_cut=30.0,
outlier_nsig=4.0,
metric_name="PhotometricSelfCalUniformityMetric",
filter_name="r",
):
cols = [
"observationid",
"fieldra",
"fielddec",
"fiveSigmaDepth",
"rotSkyPos",
"filter",
]
super().__init__(col=cols, metric_name=metric_name)
filename = os.path.join(get_data_dir(), "maf", "monster_stars_uniformity_i15-18_sampled.parquet")
self.stars = Table.read(filename)
# We have to rename dec to decl for the selfcal code.
if "dec" in self.stars.dtype.names:
self.stars["decl"] = self.stars["dec"]
self.stars = self.stars.as_array()
self.nside_residual = nside_residual
self.highglat_cut = highglat_cut
self.outlier_nsig = outlier_nsig
self.units = "mmag"
self.filter_name = filter_name
[docs]
def run(self, data_slice, slice_point=None):
offsets = [OffsetSys(error_sys=0.03), OffsetSNR(lsst_filter=self.filter_name)]
visits = np.zeros(
len(data_slice),
dtype=[
("observationId", "i8"),
("ra", "f8"),
("dec", "f8"),
("fiveSigmaDepth", "f8"),
("rotSkyPos", "f8"),
],
)
visits["observationId"] = data_slice["observationId"]
visits["ra"] = data_slice["fieldRA"]
visits["dec"] = data_slice["fieldDec"]
visits["fiveSigmaDepth"] = data_slice["fiveSigmaDepth"]
visits["rotSkyPos"] = data_slice["rotSkyPos"]
good_stars = np.isfinite(self.stars[f"{self.filter_name}mag"])
stars = self.stars[good_stars]
observed_stars = generate_catalog(
visits,
stars,
offsets=offsets,
lsst_filter=self.filter_name,
n_patches=16,
verbose=False,
)
solver = LsqrSolver(observed_stars)
print("Starting solver...")
t0 = time.time()
solver.run()
fit_patches, fit_stars = solver.return_solution()
t1 = time.time()
dt = (t1 - t0) / 60.0
print("runtime= %.1f min" % dt)
# Trim stars to the ones with solutions.
a, b = _match(stars["id"], fit_stars["id"])
stars_trimmed = stars[a]
fit_stars_trimmed = fit_stars[b]
# Residuals after fit, removing floating zeropoint
resid = stars_trimmed[f"{self.filter_name}mag"] - fit_stars_trimmed["fit_mag"]
resid = resid - np.median(resid)
resid_map = healbin(
stars_trimmed["ra"],
stars_trimmed["dec"],
resid,
self.nside_residual,
reduce_func=np.median,
)
(ipring,) = np.where(resid_map > hp.UNSEEN)
scatter_full = median_abs_deviation(resid_map[ipring], scale="normal")
# Fraction of (4) sigma outliers.
highscat = np.abs(resid_map[ipring]) > self.outlier_nsig * scatter_full
outlier_frac_full = highscat.sum() / len(ipring)
# Cut to high latitude
ra, dec = hp.pix2ang(self.nside_residual, ipring, nest=False, lonlat=True)
coords = SkyCoord(ra, dec, frame="icrs", unit="deg")
b = coords.galactic.b.value
high_glat = np.abs(b) > self.highglat_cut
scatter_highglat = median_abs_deviation(resid_map[ipring[high_glat]], scale="normal")
# Fraction of (4) sigma outliers.
highscat = np.abs(resid_map[ipring[high_glat]]) > self.outlier_nsig * scatter_highglat
outlier_frac_highglat = highscat.sum() / len(ipring)
# Convert to mmag
scatter_full = scatter_full * 1000.0
scatter_highglat = scatter_highglat * 1000.0
result = {
"scatter_full": scatter_full,
"scatter_highglat": scatter_highglat,
"outlier_frac_full": outlier_frac_full,
"outlier_frac_highglat": outlier_frac_highglat,
"uniformity_map": resid_map,
}
return result
def reduce_scatter_full(self, metric_value):
return metric_value["scatter_full"]
def reduce_scatter_highglat(self, metric_value):
return metric_value["scatter_highglat"]
def reduce_outlier_frac_full(self, metric_value):
return metric_value["outlier_frac_full"]
def reduce_outlier_frac_highglat(self, metric_value):
return metric_value["outlier_frac_highglat"]