Source code for rubin_sim.maf.maf_contrib.selfcal_uniformity_metric
__all__ = ("PhotometricSelfCalUniformityMetric",)
import os
import time
import healpy as hp
import numpy as np
from astropy.coordinates import SkyCoord
from astropy.table import Table
from rubin_scheduler.data import get_data_dir
from rubin_scheduler.utils import healbin
from scipy.stats import median_abs_deviation
from rubin_sim.maf.metrics import BaseMetric
from rubin_sim.selfcal import LsqrSolver, OffsetSNR, generate_catalog
from rubin_sim.selfcal.offsets import OffsetSys
def _match(arr1, arr2):
    st1 = np.argsort(arr1)
    sub1 = np.searchsorted(arr1, arr2, sorter=st1)
    if arr2.max() > arr1.max():
        bad = sub1 == arr1.size
        sub1[bad] = arr1.size - 1
    (sub2,) = np.where(arr1[st1[sub1]] == arr2)
    sub1 = st1[sub1[sub2]]
    return sub1, sub2
[docs]
class PhotometricSelfCalUniformityMetric(BaseMetric):
    def __init__(
        self,
        nside_residual=128,
        highglat_cut=30.0,
        outlier_nsig=4.0,
        metric_name="PhotometricSelfCalUniformityMetric",
        filter_name="r",
    ):
        cols = [
            "observationid",
            "fieldra",
            "fielddec",
            "fiveSigmaDepth",
            "rotSkyPos",
            "filter",
        ]
        super().__init__(col=cols, metric_name=metric_name)
        filename = os.path.join(get_data_dir(), "maf", "monster_stars_uniformity_i15-18_sampled.parquet")
        self.stars = Table.read(filename)
        # We have to rename dec to decl for the selfcal code.
        if "dec" in self.stars.dtype.names:
            self.stars["decl"] = self.stars["dec"]
        self.stars = self.stars.as_array()
        self.nside_residual = nside_residual
        self.highglat_cut = highglat_cut
        self.outlier_nsig = outlier_nsig
        self.units = "mmag"
        self.filter_name = filter_name
[docs]
    def run(self, data_slice, slice_point=None):
        offsets = [OffsetSys(error_sys=0.03), OffsetSNR(lsst_filter=self.filter_name)]
        visits = np.zeros(
            len(data_slice),
            dtype=[
                ("observationId", "i8"),
                ("ra", "f8"),
                ("dec", "f8"),
                ("fiveSigmaDepth", "f8"),
                ("rotSkyPos", "f8"),
            ],
        )
        visits["observationId"] = data_slice["observationId"]
        visits["ra"] = data_slice["fieldRA"]
        visits["dec"] = data_slice["fieldDec"]
        visits["fiveSigmaDepth"] = data_slice["fiveSigmaDepth"]
        visits["rotSkyPos"] = data_slice["rotSkyPos"]
        good_stars = np.isfinite(self.stars[f"{self.filter_name}mag"])
        stars = self.stars[good_stars]
        observed_stars = generate_catalog(
            visits,
            stars,
            offsets=offsets,
            lsst_filter=self.filter_name,
            n_patches=16,
            verbose=False,
        )
        solver = LsqrSolver(observed_stars)
        print("Starting solver...")
        t0 = time.time()
        solver.run()
        fit_patches, fit_stars = solver.return_solution()
        t1 = time.time()
        dt = (t1 - t0) / 60.0
        print("runtime= %.1f min" % dt)
        # Trim stars to the ones with solutions.
        a, b = _match(stars["id"], fit_stars["id"])
        stars_trimmed = stars[a]
        fit_stars_trimmed = fit_stars[b]
        # Residuals after fit, removing floating zeropoint
        resid = stars_trimmed[f"{self.filter_name}mag"] - fit_stars_trimmed["fit_mag"]
        resid = resid - np.median(resid)
        resid_map = healbin(
            stars_trimmed["ra"],
            stars_trimmed["dec"],
            resid,
            self.nside_residual,
            reduce_func=np.median,
        )
        (ipring,) = np.where(resid_map > hp.UNSEEN)
        scatter_full = median_abs_deviation(resid_map[ipring], scale="normal")
        # Fraction of (4) sigma outliers.
        highscat = np.abs(resid_map[ipring]) > self.outlier_nsig * scatter_full
        outlier_frac_full = highscat.sum() / len(ipring)
        # Cut to high latitude
        ra, dec = hp.pix2ang(self.nside_residual, ipring, nest=False, lonlat=True)
        coords = SkyCoord(ra, dec, frame="icrs", unit="deg")
        b = coords.galactic.b.value
        high_glat = np.abs(b) > self.highglat_cut
        scatter_highglat = median_abs_deviation(resid_map[ipring[high_glat]], scale="normal")
        # Fraction of (4) sigma outliers.
        highscat = np.abs(resid_map[ipring[high_glat]]) > self.outlier_nsig * scatter_highglat
        outlier_frac_highglat = highscat.sum() / len(ipring)
        # Convert to mmag
        scatter_full = scatter_full * 1000.0
        scatter_highglat = scatter_highglat * 1000.0
        result = {
            "scatter_full": scatter_full,
            "scatter_highglat": scatter_highglat,
            "outlier_frac_full": outlier_frac_full,
            "outlier_frac_highglat": outlier_frac_highglat,
            "uniformity_map": resid_map,
        }
        return result
    def reduce_scatter_full(self, metric_value):
        return metric_value["scatter_full"]
    def reduce_scatter_highglat(self, metric_value):
        return metric_value["scatter_highglat"]
    def reduce_outlier_frac_full(self, metric_value):
        return metric_value["outlier_frac_full"]
    def reduce_outlier_frac_highglat(self, metric_value):
        return metric_value["outlier_frac_highglat"]