Source code for rubin_sim.maf.batches.ddf_batch

__all__ = ("ddfBatch",)

import healpy as hp
import numpy as np
from rubin_scheduler.utils import (
    angular_separation,
    ddf_locations,
    ddf_locations_pre3_5,
    hpid2_ra_dec,
    sample_patch_on_sphere,
)

import rubin_sim.maf as maf

from .common import lightcurve_summary


[docs] def ddfBatch( run_name="run_name", nside=512, radius=2.5, nside_sne=128, extra_sql=None, extra_info_label=None, old_coords=False, ): """ A set of metrics to evaluate DDF fields. Parameters ---------- run_name : `str`, optional The name of the simulation (for plot titles and file outputs). nside : `int`, optional The HEALpix nside to run most of the metrics on. radius : `float` The radius to select around each ddf (degrees). The default value of 2.5 degrees has been chosen to balance selecting a large enough area to ensure gathering all of the double Euclid field or a run with large dithers, while not including too much background area (which can skew metrics of the median number of visits, etc.). nside_sne : `int`, optional The HEALpix nside to use with the SNe metric. The default is lower than the default nside for other metrics, as the SNe metric is more computationally expensive. extra_sql : `str`, optional Additional sql constraint (such as night<=365) to add to the necessary sql constraints for each metric. extra_info_label : `str`, optional Additional description information to add (alongside the extra_sql) old_coords : `bool` Use the default locations for the DDFs from pre-July 2024. Default False. Returns ------- metric_bundleDict : `dict` of `maf.MetricBundle` """ bundle_list = [] # Define the slicer to use for each DDF # Get standard DDF locations and reformat information as a dictionary ddfs = {} if old_coords: ddfs_rough = ddf_locations_pre3_5() else: ddfs_rough = ddf_locations() for ddf in ddfs_rough: ddfs[ddf] = {"ra": ddfs_rough[ddf][0], "dec": ddfs_rough[ddf][1]} # Combine the Euclid double-field into one - but with two ra/dec values ddfs["EDFS"] = { "ra": [ddfs["EDFS_a"]["ra"], ddfs["EDFS_b"]["ra"]], "dec": [ddfs["EDFS_a"]["dec"], ddfs["EDFS_b"]["dec"]], } del ddfs["EDFS_a"] del ddfs["EDFS_b"] # Let's include an arbitrary point that should be in the WFD for comparison ddfs["WFD"] = {"ra": 0, "dec": -20.0} ra, dec = hpid2_ra_dec(nside, np.arange(hp.nside2npix(nside))) ra_sne, dec_sne = hpid2_ra_dec(nside_sne, np.arange(hp.nside2npix(nside_sne))) ddf_slicers = {} ddf_slicers_sne = {} for ddf in ddfs: # Define the healpixels to use for this DDF if np.size(ddfs[ddf]["ra"]) > 1: goods = [] goods_sne = [] for ddf_ra, ddf_dec in zip(ddfs[ddf]["ra"], ddfs[ddf]["dec"]): dist = angular_separation(ra, dec, ddf_ra, ddf_dec) goods.append(np.where(dist <= radius)[0]) dist = angular_separation(ra_sne, dec_sne, ddf_ra, ddf_dec) goods_sne.append(np.where(dist <= radius)[0]) good = np.unique(np.concatenate(goods)) good_sne = np.unique(np.concatenate(goods_sne)) else: dist = angular_separation(ra, dec, np.mean(ddfs[ddf]["ra"]), np.mean(ddfs[ddf]["dec"])) good = np.where(dist <= radius)[0] dist = angular_separation(ra_sne, dec_sne, np.mean(ddfs[ddf]["ra"]), np.mean(ddfs[ddf]["dec"])) good_sne = np.where(dist <= radius)[0] ddf_slicers_sne[ddf] = maf.HealpixSubsetSlicer(nside_sne, good_sne, use_cache=False) ddf_slicers[ddf] = maf.HealpixSubsetSlicer(nside, good, use_cache=False) # Now define metrics # Set up basic all and per filter sql constraints. filterlist, colors, orders, sqls, info_labels = maf.filter_list( all=True, extra_sql=extra_sql, extra_info_label=extra_info_label ) summary_stats = [maf.MeanMetric(), maf.MedianMetric(), maf.SumMetric()] depth_stats = [maf.MedianMetric()] plotFuncs = [maf.HealpixSkyMap()] displayDict = {"group": "", "subgroup": "", "order": 0} order = 0 for ddf in ddfs: fieldname = ddf if not (fieldname.startswith("DD")): fieldname = f"DD:{fieldname}" plotDict = { "visufunc": hp.gnomview, "rot": (np.mean(ddfs[ddf]["ra"]), np.mean(ddfs[ddf]["dec"]), 0), "xsize": 500, } order += 1 # Number of SNe displayDict["group"] = "SNe" displayDict["subgroup"] = "N SNe" displayDict["caption"] = f"SNIa in the {fieldname} DDF." displayDict["order"] = order metric = maf.metrics.SNNSNMetric( verbose=False, n_bef=4, n_aft=10, zmin=0.1, zmax=1.1, z_step=0.03, daymax_step=3, coadd_night=True, gamma_name="gamma_DDF.hdf5", # have to add field name here, to avoid reduceDict key collissions metric_name=f"SNNSNMetric {fieldname}", ) bundle_list.append( maf.MetricBundle( metric, ddf_slicers_sne[ddf], constraint="", info_label=" ".join([fieldname, "all bands, only DDF observations"]), plot_dict=plotDict, plot_funcs=plotFuncs, summary_metrics=summary_stats, display_dict=displayDict, ) ) # Strong lensed SNe displayDict["group"] = "SNe" displayDict["subgroup"] = "SL SNe" displayDict["caption"] = f"Strongly Lensed SN metric in the {fieldname} DDF." displayDict["order"] = order metric = maf.SNSLMetric() bundle_list.append( maf.MetricBundle( metric, ddf_slicers[ddf], constraint=sqls["all"], info_label=" ".join([fieldname, info_labels["all"]]), plot_dict=plotDict, plot_funcs=plotFuncs, summary_metrics=summary_stats, display_dict=displayDict, ) ) # KNe delta = 5.0 # degrees n_kne = 5000 displayDict["group"] = "KNe" displayDict["subgroup"] = "" displayDict["caption"] = f"Number of KNe in the {fieldname} DDF from %i injected." % n_kne ra, dec = sample_patch_on_sphere( np.mean(ddfs[ddf]["ra"]), np.mean(ddfs[ddf]["dec"]), delta, n_kne, seed=1 ) metric = maf.KNePopMetric(metric_name="KNePopMetric_%s" % fieldname) slicer = maf.generate_kn_pop_slicer(n_events=n_kne, ra=ra, dec=dec) bundle_list.append( maf.MetricBundle( metric, slicer, "", info_label=" ".join([fieldname]), summary_metrics=lightcurve_summary(), display_dict=displayDict, ) ) # kuiper metric displayDict["group"] = "Kuiper" displayDict["subgroup"] = "" displayDict["caption"] = f"Kuiper metric in the {fieldname} DDF." sqls_gri = { "gri": "filter='g' or filter='r' or filter='i'", "riz": "filter='r' or filter='i' or filter='z'", } for sql in sqls_gri: metrics = [ maf.KuiperMetric( "rotSkyPos", metric_name=f"Kuiper statistic (0 is uniform, 1 is delta function),rotSkyPos,{fieldname}," + sql, ), maf.KuiperMetric( "rotTelPos", metric_name=f"Kuiper statistic (0 is uniform, 1 is delta function),rotTelPos,{fieldname}," + sql, ), ] for metric in metrics: bundle_list.append( maf.MetricBundle( metric, ddf_slicers[ddf], sqls_gri[sql], info_label=" ".join([fieldname]), plot_dict=plotDict, plot_funcs=plotFuncs, summary_metrics=summary_stats, display_dict=displayDict, ) ) # Weak lensing visits lim_ebv = 0.2 mag_cuts = 26.0 displayDict["group"] = "Weak Lensing" displayDict["subgroup"] = "" displayDict["caption"] = f"Weak lensing metric in the {fieldname} DDF." sqls_gri = { "gri": "filter='g' or filter='r' or filter='i'", "riz": "filter='r' or filter='i' or filter='z'", } for sql in sqls_gri: metric = maf.WeakLensingNvisits( lsst_filter="i", depth_cut=mag_cuts, ebvlim=lim_ebv, min_exp_time=20.0, metric_name="WeakLensingNvisits_" + sql, ) bundle_list.append( maf.MetricBundle( metric, ddf_slicers[ddf], sqls_gri[sql], info_label=" ".join([fieldname, sql]), plot_dict=plotDict, plot_funcs=plotFuncs, summary_metrics=summary_stats, display_dict=displayDict, ) ) # Number of QSOs in each band displayDict["group"] = "QSO" displayDict["subgroup"] = "Number" displayDict["caption"] = f"Number of QSO in the {fieldname} DDF." zmin = 0.3 extinction_cut = 1.0 for f in "ugrizy": displayDict["order"] = orders[f] summaryMetrics = [maf.SumMetric(metric_name="Total QSO")] metric = maf.QSONumberCountsMetric( f, units="mag", extinction_cut=extinction_cut, qlf_module="Shen20", qlf_model="A", sed_model="Richards06", zmin=zmin, zmax=None, ) bundle_list.append( maf.MetricBundle( metric, ddf_slicers[ddf], sqls[f], info_label=" ".join([fieldname, info_labels[f]]), plot_dict=plotDict, plot_funcs=plotFuncs, summary_metrics=summaryMetrics, display_dict=displayDict, ) ) # Run the TimeLag for each filter *and* all filters displayDict["group"] = "QSO" displayDict["subgroup"] = "TimeLags" nquist_threshold = 2.2 lag = 100 summaryMetrics = [maf.MeanMetric(), maf.MedianMetric(), maf.RmsMetric()] m = maf.AgnTimeLagMetric(threshold=nquist_threshold, lag=lag) for f in filterlist: displayDict["order"] = orders[f] displayDict["caption"] = ( f"Comparion of the time between visits compared to a defined sampling gap ({lag} days) in " f"{f} band." ) bundle_list.append( maf.MetricBundle( m, ddf_slicers[ddf], constraint=sqls[f], info_label=" ".join([fieldname, info_labels[f]]), run_name=run_name, plot_dict=plotDict, plot_funcs=plotFuncs, summary_metrics=summaryMetrics, display_dict=displayDict, ) ) # Run the TimeLag for each filter *and* all filters displayDict["group"] = "QSO" displayDict["subgroup"] = "TimeLags" nquist_threshold = 2.2 lag = 5 summaryMetrics = [maf.MeanMetric(), maf.MedianMetric(), maf.RmsMetric()] m = maf.AgnTimeLagMetric(threshold=nquist_threshold, lag=lag) for f in filterlist: displayDict["order"] = orders[f] displayDict["caption"] = ( f"Comparion of the time between visits compared to a defined sampling gap ({lag} days) in " f"{f} band." ) bundle_list.append( maf.MetricBundle( m, ddf_slicers[ddf], constraint=sqls[f], info_label=" ".join([fieldname, info_labels[f]]), run_name=run_name, plot_dict=plotDict, plot_funcs=plotFuncs, summary_metrics=summaryMetrics, display_dict=displayDict, ) ) # AGN structure function displayDict["group"] = "QSO" displayDict["subgroup"] = "Structure Function" agn_mags = {"u": 22.0, "g": 24, "r": 24, "i": 24, "z": 22, "y": 22} for f in "ugrizy": displayDict["order"] = orders[f] displayDict["caption"] = f"AGN Structure Function Error in {f} band in the {fieldname} DDF." summaryMetrics = [maf.MedianMetric(), maf.RmsMetric()] metric = maf.SFUncertMetric( mag=agn_mags[f], bins=np.logspace(0, np.log10(3650), 21), ) bundle_list.append( maf.MetricBundle( metric, ddf_slicers[ddf], sqls[f], info_label=" ".join([fieldname, info_labels[f]]), plot_dict=plotDict, plot_funcs=plotFuncs, summary_metrics=summaryMetrics, display_dict=displayDict, ) ) ####### # Coadded depth per filter, and count per filter displayDict["group"] = "Basics" for f in "ugrizy": displayDict["subgroup"] = "Coadd M5" displayDict["order"] = orders[f] displayDict["caption"] = f"Coadded m5 in {f} band in the {fieldname} DDF." metric = maf.Coaddm5Metric(metric_name=f"{fieldname} CoaddM5") bundle_list.append( maf.MetricBundle( metric, ddf_slicers[ddf], sqls[f], info_label=info_labels[f], plot_dict=plotDict, run_name=run_name, plot_funcs=plotFuncs, summary_metrics=depth_stats, display_dict=displayDict, ) ) displayDict["subgroup"] = "N Visits" displayDict["caption"] = f"Number of visits in the {f} band in the {fieldname} DDF." metric = maf.CountMetric(col="observationStartMJD", units="#", metric_name=f"{fieldname} NVisits") bundle_list.append( maf.MetricBundle( metric, ddf_slicers[ddf], sqls[f], info_label=info_labels[f], plot_dict=plotDict, run_name=run_name, plot_funcs=plotFuncs, summary_metrics=depth_stats, display_dict=displayDict, ) ) # Count over all filter displayDict["subgroup"] = "N Visits" displayDict["order"] = orders["all"] displayDict["caption"] = f"Number of visits in all bands in the {fieldname} DDF." metric = maf.CountMetric(col="observationStartMJD", units="#", metric_name=f"{fieldname} NVisits") bundle_list.append( maf.MetricBundle( metric, ddf_slicers[ddf], constraint=sqls["all"], info_label=info_labels["all"], plot_dict=plotDict, run_name=run_name, plot_funcs=plotFuncs, summary_metrics=depth_stats, display_dict=displayDict, ) ) # Count number of unique nights with visits displayDict["group"] = "Cadence" displayDict["subgroup"] = "N Nights" displayDict["order"] = orders["all"] displayDict["caption"] = f"Number of nights with visits in the {fieldname} DDF." metric = maf.CountUniqueMetric(col="night", units="#", metric_name=f"{fieldname} N Unique Nights") bundle_list.append( maf.MetricBundle( metric, ddf_slicers[ddf], constraint=sqls["all"], info_label=info_labels["all"], plot_dict=plotDict, run_name=run_name, plot_funcs=plotFuncs, summary_metrics=depth_stats, display_dict=displayDict, ) ) # Now to compute some things ~~at just the center of the DDF~~ NOPE # (will compute these "per DDF" not just at the center, since # the dithering pattern is not yet set and that will influence the # result -- once dithering is better determined, could add ptslicer). # For these metrics, add a requirement that the 'note' label # match the DDF, to avoid WFD visits skewing the results # (we want to exclude non-DD visits), if fieldname == "WFD": ptslicer = maf.UserPointsSlicer(np.mean(ddfs[ddf]["ra"]), np.mean(ddfs[ddf]["dec"])) else: ptslicer = maf.UniSlicer() # rely on query to remove non-DD visits # Add RA and Dec to slice_point data (for season calculations) # slice_points store ra/dec internally in radians. ptslicer.slice_points["ra"] = np.radians(np.mean(ddfs[ddf]["ra"])) ptslicer.slice_points["dec"] = np.radians(np.mean(ddfs[ddf]["dec"])) displayDict["group"] = "Cadence" displayDict["order"] = order fieldsqls = {} if ddf == "WFD": for f in filterlist: fieldsqls[f] = sqls[f] else: fieldsql = f"scheduler_note like '%{fieldname}%'" for f in filterlist: if len(sqls[f]) > 0: fieldsqls[f] = fieldsql + " and " + sqls[f] else: fieldsqls[f] = fieldsql displayDict["subgroup"] = "Sequence length" # Number of observations per night, any filter (sequence length) # Histogram the number of visits per night countbins = np.arange(0, 200, 5) metric = maf.NVisitsPerNightMetric( night_col="night", bins=countbins, metric_name=f"{fieldname} NVisitsPerNight", ) plotDict = {"bins": countbins, "xlabel": "Number of visits per night"} displayDict["caption"] = "Histogram of the number of visits in each night per DDF." plotFunc = maf.SummaryHistogram() bundle = maf.MetricBundle( metric, ptslicer, fieldsqls["all"], info_label=info_labels["all"], plot_dict=plotDict, display_dict=displayDict, plot_funcs=[plotFunc], ) bundle_list.append(bundle) # Coadded depth of observations per night, each filter # "magic numbers" to fill plot come from baseline v3.4 min_coadds = {"u": 22.3, "g": 22.3, "r": 22.9, "i": 23.1, "z": 21.7, "y": 21.5} max_coadds = {"u": 26, "g": 27.2, "r": 27, "i": 26.5, "z": 26.5, "y": 25.1} # Histogram the coadded depth per night, per filter for f in "ugrizy": magbins = np.arange(min_coadds[f], max_coadds[f], 0.05) metric = maf.CoaddM5PerNightMetric( night_col="night", m5_col="fiveSigmaDepth", bins=magbins, metric_name=f"{fieldname} CoaddM5PerNight", ) plotDict = {"bins": magbins, "xlabel": "Coadded Depth Per Night"} displayDict["caption"] = f"Histogram of the coadded depth in {f} in each night per DDF." plotFunc = maf.SummaryHistogram() bundle = maf.MetricBundle( metric, ptslicer, fieldsqls[f], info_label=info_labels[f], plot_dict=plotDict, display_dict=displayDict, plot_funcs=[plotFunc], ) bundle_list.append(bundle) # Plot of number of visits per night over time if fieldname.endswith("WFD"): pass else: displayDict["caption"] = f"Number of visits per night for {fieldname}." metric = maf.CountMetric("observationStartMJD", metric_name=f"{fieldname} Nvisits Per Night") slicer = maf.OneDSlicer(slice_col_name="night", bin_size=1, badval=0) plot_dict = {"filled_data": True} bundle = maf.MetricBundle( metric, slicer, fieldsqls["all"], info_label=info_labels["all"], plot_dict=plot_dict, display_dict=displayDict, summary_metrics=[ maf.MedianMetric(), maf.PercentileMetric(percentile=80, metric_name="80thPercentile"), maf.MinMetric(), maf.MaxMetric(), ], ) bundle_list.append(bundle) # Likewise, but coadded depth per filter if fieldname.endswith("WFD"): pass else: for f in "ugrizy": displayDict["caption"] = f"Coadded depth per night for {fieldname} in band {f}." metric = maf.Coaddm5Metric(metric_name=f"{fieldname} CoaddedM5 Per Night") slicer = maf.OneDSlicer(slice_col_name="night", bin_size=1, badval=min_coadds[f]) plot_dict = {"filled_data": True} bundle = maf.MetricBundle( metric, slicer, fieldsqls[f], info_label=info_labels[f], plot_dict=plot_dict, display_dict=displayDict, summary_metrics=[ maf.MedianMetric(), maf.PercentileMetric(percentile=80, metric_name="80thPercentile"), maf.MinMetric(), maf.MaxMetric(), ], ) bundle_list.append(bundle) displayDict["subgroup"] = "Sequence gaps" # Histogram of the number of nights between visits, all filters bins = np.arange(1, 40, 1) metric = maf.NightgapsMetric( bins=bins, night_col="night", metric_name=f"{fieldname} Delta Nights Histogram", ) displayDict["caption"] = f"Histogram of intervals between nights with visits, in the {fieldname} DDF." plotDict = {"bins": bins, "xlabel": "dT (nights)"} plotFunc = maf.SummaryHistogram() bundle = maf.MetricBundle( metric, ptslicer, constraint=fieldsqls["all"], info_label=info_labels["all"], plot_dict=plotDict, display_dict=displayDict, plot_funcs=[plotFunc], ) bundle_list.append(bundle) # Median inter-night gap in each and all filters for f in filterlist: metric = maf.InterNightGapsMetric( metric_name=f"{fieldname} Median Inter-Night Gap", reduce_func=np.median ) displayDict["order"] = orders[f] displayDict["caption"] = f"Median internight gap in {f} band in the {fieldname} DDF." bundle_list.append( maf.MetricBundle( metric, ptslicer, fieldsqls[f], info_label=info_labels[f], run_name=run_name, summary_metrics=[maf.MeanMetric()], plot_funcs=[], display_dict=displayDict, ) ) displayDict["subgroup"] = "Season length" # Histogram of the season lengths, all filters def rfunc(simdata): # Sometimes number of seasons is 10, sometimes 11 # (depending on where survey starts/end) # so normalize it so there's always 11 values # by adding 0 at the end. if len(simdata) < 11: simdata = np.concatenate([simdata, np.array([0], float)]) return simdata metric = maf.SeasonLengthMetric(reduce_func=rfunc, metric_dtype="object") plotDict = {"bins": np.arange(0, 12), "ylabel": "Season length (days)", "xlabel": "Season"} plotFunc = maf.SummaryHistogram() displayDict["caption"] = f"Plot of the season length per season in the {fieldname} DDF." displayDict["order"] = order bundle = maf.MetricBundle( metric, ptslicer, fieldsqls["all"], info_label=" ".join([fieldname, info_labels["all"]]), plot_dict=plotDict, display_dict=displayDict, plot_funcs=[plotFunc], ) bundle_list.append(bundle) # Median season Length metric = maf.SeasonLengthMetric( metric_name=f"{fieldname} Median Season Length", reduce_func=np.median ) displayDict["caption"] = f"Median season length in the {fieldname} DDF." bundle_list.append( maf.MetricBundle( metric, ptslicer, fieldsqls[f], info_label=info_labels["all"], run_name=run_name, summary_metrics=[maf.MeanMetric()], plot_funcs=[], display_dict=displayDict, ) ) # Cumulative distribution - only for DDF fields if fieldname.endswith("WFD"): pass else: displayDict["group"] = "Progress" displayDict["subgroup"] = "" displayDict["caption"] = ( f"Cumulative number of visits for the {fieldname.replace('DD:', '')} field." ) slicer = maf.UniSlicer() metric = maf.CumulativeMetric(metric_name=f"{fieldname} Cumulative NVisits") metricb = maf.MetricBundle( metric, slicer, fieldsqls["all"], info_label=info_labels["all"], plot_funcs=[maf.XyPlotter()], run_name=run_name, display_dict=displayDict, ) metricb.summary_metrics = [] bundle_list.append(metricb) for b in bundle_list: b.set_run_name(run_name) bundleDict = maf.make_bundles_dict_from_list(bundle_list) return bundleDict