Source code for rubin_sim.maf.batches.glance_batch

__all__ = ("glanceBatch",)

import warnings

from rubin_scheduler.scheduler.utils import EuclidOverlapFootprint
from rubin_scheduler.utils import ddf_locations

import rubin_sim.maf.metric_bundles as metric_bundles
import rubin_sim.maf.metrics as metrics
import rubin_sim.maf.plots as plots
import rubin_sim.maf.slicers as slicers
import rubin_sim.maf.stackers as stackers

from .col_map_dict import col_map_dict
from .common import standard_summary
from .hourglass_batch import hourglassPlots
from .slew_batch import slewBasics


def glanceBatch(
    colmap=None,
    run_name="run_name",
    nside=64,
    filternames=("u", "g", "r", "i", "z", "y"),
    nyears=10,
    pairnside=32,
    sql_constraint=None,
    slicer_camera="LSST",
):
    """Generate a handy set of metrics that give a quick overview
    of how well a survey performed.

    This is a meta-set of other batches, to some extent: the hourglass
    and basic slew batches are merged into the returned dictionary.

    Parameters
    ----------
    colmap : `dict`, optional
        A dictionary with a mapping of column names.
        If None, the result of `col_map_dict()` is used.
    run_name : `str`, optional
        The name of the simulated survey.
    nside : `int`, optional
        The nside for the healpix slicers.
    filternames : `list` of `str`, optional
        The list of individual filters to use when running metrics.
        There is always an all-visits version of the metrics run as well.
    nyears : `int`, optional
        How many years to attempt to make hourglass plots for.
    pairnside : `int`, optional
        nside to use for the pair fraction metric
        (it's slow, so nice to use lower resolution).
    sql_constraint : `str` or None, optional
        Additional SQL constraint to apply to the all-visit and per-filter
        metrics. Bundles that select on fixed ``scheduler_note`` / ``night``
        expressions use those expressions alone.
    slicer_camera : `str`
        Sets which spatial slicer to use.
        Options are 'LSST' and 'ComCam'.

    Returns
    -------
    metric_bundleDict : `dict` of `maf.MetricBundle`

    Raises
    ------
    ValueError
        If `colmap` is a string, or `slicer_camera` is not 'LSST' or 'ComCam'.
    """
    if isinstance(colmap, str):
        raise ValueError("colmap must be a dictionary, not a string")

    if colmap is None:
        colmap = col_map_dict()

    bundle_list = []

    def add_bundle(metric, slicer, sql, **kwargs):
        # Build a MetricBundle, queue it for the output dict, and hand it back
        # for the few callers that tweak it after construction.
        bundle = metric_bundles.MetricBundle(metric, slicer, sql, **kwargs)
        bundle_list.append(bundle)
        return bundle

    # Prefix used when the user constraint has to be AND-ed with another clause.
    if sql_constraint is None:
        sqlC = ""
    else:
        sqlC = "(%s) and" % sql_constraint

    if slicer_camera == "LSST":
        spatial_slicer = slicers.HealpixSlicer
    elif slicer_camera == "ComCam":
        spatial_slicer = slicers.HealpixComCamSlicer
    else:
        raise ValueError("Camera must be LSST or ComCam")

    sql_per_filt = ['%s %s="%s"' % (sqlC, colmap["filter"], filtername) for filtername in filternames]
    sql_per_and_all_filters = [sql_constraint] + sql_per_filt

    standardStats = standard_summary()
    subsetPlots = [plots.HealpixSkyMap(), plots.HealpixHistogram()]

    # Super basic things
    displayDict = {"group": "Basic Stats", "order": 1}
    sql = sql_constraint
    slicer = slicers.UniSlicer()

    # Length of Survey
    metric = metrics.FullRangeMetric(col=colmap["mjd"], metric_name="Length of Survey (days)")
    add_bundle(metric, slicer, sql, display_dict=displayDict)

    # Total number of filter changes
    metric = metrics.NChangesMetric(col=colmap["filter"], order_by=colmap["mjd"])
    add_bundle(metric, slicer, sql, display_dict=displayDict)

    # Total open shutter fraction
    metric = metrics.OpenShutterFractionMetric(
        slew_time_col=colmap["slewtime"],
        exp_time_col=colmap["exptime"],
        visit_time_col=colmap["visittime"],
    )
    add_bundle(metric, slicer, sql, display_dict=displayDict)

    # Total effective exposure time
    metric = metrics.MeanMetric(col="t_eff")
    teff_stacker = stackers.TeffStacker(normed=True)
    for sql in sql_per_and_all_filters:
        add_bundle(metric, slicer, sql, stacker_list=[teff_stacker], display_dict=displayDict)

    # Number of observations, all and each filter
    metric = metrics.CountMetric(col=colmap["mjd"], metric_name="Number of Exposures")
    for sql in sql_per_and_all_filters:
        add_bundle(metric, slicer, sql, display_dict=displayDict)

    # The alt/az plots of all the pointings
    slicer = spatial_slicer(
        nside=nside,
        lat_col=colmap["alt"],
        lon_col=colmap["az"],
        lat_lon_deg=colmap["raDecDeg"],
        use_cache=False,
    )
    metric = metrics.CountMetric(colmap["mjd"], metric_name="Nvisits as function of Alt/Az")
    plotFuncs = [plots.LambertSkyMap()]
    plotDict = {"norm": "log"}
    for sql in sql_per_and_all_filters:
        add_bundle(
            metric,
            slicer,
            sql,
            plot_funcs=plotFuncs,
            display_dict=displayDict,
            plot_dict=plotDict,
        )

    # alt/az of visits selected by scheduler note: long gaps, blob long,
    # twilight near-sun, and ToO. Each selection gets its own metric name so
    # the resulting plots are labelled by what they actually show.
    alt_az_note_selections = [
        ("scheduler_note = 'long'", "Nvisits long"),
        ("scheduler_note like 'blob_long%'", "Nvisits blob long"),
        (
            "scheduler_note like '%neo%' or scheduler_note like '%near_sun%'",
            "Nvisits twilight near sun",
        ),
        ("scheduler_note like 'ToO%'", "Nvisits ToO"),
    ]
    for sql, metric_name in alt_az_note_selections:
        metric = metrics.CountMetric(colmap["mjd"], metric_name=metric_name)
        add_bundle(
            metric,
            slicer,
            sql,
            plot_funcs=plotFuncs,
            display_dict=displayDict,
            plot_dict=plotDict,
        )

    # Things to check per night
    # Open Shutter per night
    displayDict = {"group": "Pointing Efficency", "order": 2}
    slicer = slicers.OneDSlicer(slice_col_name=colmap["night"], bin_size=1, bin_min=-0.5)
    metric = metrics.OpenShutterFractionMetric(
        slew_time_col=colmap["slewtime"],
        exp_time_col=colmap["exptime"],
        visit_time_col=colmap["visittime"],
    )
    sql = sql_constraint
    add_bundle(metric, slicer, sql, summary_metrics=standardStats, display_dict=displayDict)

    # Number of filter changes per night
    slicer = slicers.OneDSlicer(slice_col_name=colmap["night"], bin_size=1, bin_min=-0.5)
    metric = metrics.NChangesMetric(
        col=colmap["filter"], order_by=colmap["mjd"], metric_name="Filter Changes"
    )
    add_bundle(metric, slicer, sql, summary_metrics=standardStats, display_dict=displayDict)

    # A few basic maps
    # Number of observations, coadded depths
    extended_stats = standardStats.copy()
    extended_stats.append(metrics.AreaSummaryMetric(decreasing=True, metric_name="top18k"))
    extended_stats.append(metrics.PercentileMetric(col="metricdata", percentile=10))

    displayDict = {"group": "Basic Maps", "order": 3}
    slicer = spatial_slicer(
        nside=nside,
        lat_col=colmap["dec"],
        lon_col=colmap["ra"],
        lat_lon_deg=colmap["raDecDeg"],
    )
    metric = metrics.CountMetric(col=colmap["mjd"])
    plotDict = {"percentile_clip": 95.0}
    for sql in sql_per_and_all_filters:
        add_bundle(
            metric,
            slicer,
            sql,
            summary_metrics=extended_stats,
            display_dict=displayDict,
            plot_dict=plotDict,
        )

    metric = metrics.Coaddm5Metric(m5_col=colmap["fiveSigmaDepth"])
    for sql in sql_per_and_all_filters:
        add_bundle(
            metric,
            slicer,
            sql,
            summary_metrics=extended_stats,
            display_dict=displayDict,
        )

    # Visit counts in a few one-year windows, to check rolling
    displayDict = {"group": "Roll Check", "order": 1}
    rolling_metrics = [
        metrics.CountMetric(col=colmap["mjd"], metric_name="Year1.0Count"),
        metrics.CountMetric(col=colmap["mjd"], metric_name="Year2.5Count"),
        metrics.CountMetric(col=colmap["mjd"], metric_name="Year3.5Count"),
    ]
    rolling_sqls = [
        "night < 365.25",
        "night > %f and night < %f" % (365.25 * 2.5, 365.25 * 3.5),
        "night > %f and night < %f" % (365.25 * 3.5, 365.25 * 4.5),
    ]
    for metric, sql in zip(rolling_metrics, rolling_sqls):
        add_bundle(
            metric,
            slicer,
            sql,
            summary_metrics=extended_stats,
            plot_dict=plotDict,
            display_dict=displayDict,
        )

    # Make a cumulative plot of a WFD spot
    sql = "scheduler_note not like '%NEO%' and scheduler_note not like '%near_sun%'"
    uslicer = slicers.UserPointsSlicer(ra=0, dec=-20)
    metric = metrics.CumulativeMetric()
    metricb = add_bundle(
        metric,
        uslicer,
        sql,
        plot_funcs=[plots.XyPlotter()],
        run_name=run_name,
        display_dict=displayDict,
    )
    metricb.summary_metrics = []

    # Checking a few basic science things
    # Maybe check astrometry, observation pairs, SN
    # (`slicer` here is still the ra/dec spatial slicer built for the basic maps.)
    plotDict = {"percentile_clip": 95.0}
    displayDict = {"group": "Science", "subgroup": "Astrometry", "order": 4}

    stackerList = [
        stackers.ParallaxFactorStacker(
            ra_col=colmap["ra"],
            dec_col=colmap["dec"],
            degrees=colmap["raDecDeg"],
            date_col=colmap["mjd"],
        )
    ]

    astrom_stats = [
        metrics.AreaSummaryMetric(decreasing=False, metric_name="best18k"),
        metrics.PercentileMetric(col="metricdata", percentile=90),
    ]

    displayDict["caption"] = r"Parallax precision of an $r=20$ flat SED star"
    metric = metrics.ParallaxMetric(
        m5_col=colmap["fiveSigmaDepth"],
        filter_col=colmap["filter"],
        seeing_col=colmap["seeingGeom"],
    )
    sql = sql_constraint
    add_bundle(
        metric,
        slicer,
        sql,
        plot_funcs=subsetPlots,
        display_dict=displayDict,
        stacker_list=stackerList,
        plot_dict=plotDict,
        summary_metrics=astrom_stats,
    )

    displayDict["caption"] = r"Proper motion precision of an $r=20$ flat SED star"
    metric = metrics.ProperMotionMetric(
        m5_col=colmap["fiveSigmaDepth"],
        mjd_col=colmap["mjd"],
        filter_col=colmap["filter"],
        seeing_col=colmap["seeingGeom"],
    )
    add_bundle(
        metric,
        slicer,
        sql,
        plot_funcs=subsetPlots,
        display_dict=displayDict,
        plot_dict=plotDict,
        summary_metrics=astrom_stats,
    )

    # Solar system stuff
    displayDict["caption"] = "Fraction of observations that are in pairs"
    displayDict["subgroup"] = "Solar System"

    sql = '%s (filter="g" or filter="r" or filter="i")' % sqlC
    pairSlicer = slicers.HealpixSlicer(
        nside=pairnside,
        lat_col=colmap["dec"],
        lon_col=colmap["ra"],
        lat_lon_deg=colmap["raDecDeg"],
    )
    metric = metrics.PairFractionMetric(mjd_col=colmap["mjd"])
    add_bundle(metric, pairSlicer, sql, plot_funcs=subsetPlots, display_dict=displayDict)

    # stats from the scheduler_note_root column
    # Guard on the key that is actually read, so a colmap without it skips
    # these bundles instead of raising KeyError.
    if "scheduler_note_root" in colmap:
        displayDict = {"group": "Basic Stats", "subgroup": "Percent root stats"}
        metric = metrics.StringCountMetric(
            col=colmap["scheduler_note_root"], percent=True, metric_name="Percents", clip_end=False
        )
        sql = ""
        slicer = slicers.UniSlicer()
        add_bundle(metric, slicer, sql, display_dict=displayDict)

        displayDict["subgroup"] = "Count root Stats"
        metric = metrics.StringCountMetric(
            col=colmap["scheduler_note_root"], metric_name="Counts", clip_end=False
        )
        add_bundle(metric, slicer, sql, display_dict=displayDict)

    # For pairs and twilights
    if "scheduler_note" in colmap:
        displayDict = {"group": "Basic Stats", "subgroup": "Percent stats"}
        metric = metrics.StringCountMetric(
            col=colmap["scheduler_note"], percent=True, metric_name="Percents", clip_end=False
        )
        # No %-formatting is applied to this string, so the doubled percent
        # signs are kept as written.
        sql = (
            "scheduler_note like 'pair%%' or scheduler_note like 'twilight%%' or scheduler_note like 'blob%%'"
        )
        slicer = slicers.UniSlicer()
        add_bundle(metric, slicer, sql, display_dict=displayDict)

        displayDict["subgroup"] = "Count Stats"
        metric = metrics.StringCountMetric(col=colmap["scheduler_note"], metric_name="Counts", clip_end=False)
        add_bundle(metric, slicer, sql, display_dict=displayDict)

    # DDF progress
    ddf_surveys = ddf_locations()
    # Fresh dict: do not inherit caption/order from whichever section ran last.
    displayDict = {"group": "DDF", "subgroup": ""}
    for ddf in ddf_surveys:
        label = ddf.replace("DD:", "")
        sql = 'scheduler_note like "%s%%"' % ("DD:" + label)
        slicer = slicers.UniSlicer()
        metric = metrics.CumulativeMetric()
        metricb = add_bundle(
            metric,
            slicer,
            sql,
            plot_funcs=[plots.XyPlotter()],
            run_name=run_name,
            display_dict=displayDict,
        )
        metricb.summary_metrics = []

    # Add a sky saturation check
    displayDict = {"group": "Basic Stats", "subgroup": "Saturation"}
    sql = ""
    metric = metrics.SkySaturationMetric()
    summary = metrics.SumMetric()
    slicer = slicers.UniSlicer()
    add_bundle(metric, slicer, sql, summary_metrics=summary, display_dict=displayDict)

    benchmarkArea = 18000
    benchmarkNvisits = 825
    minNvisits = 750
    displayDict = {"group": "SRD", "subgroup": "FO metrics", "order": 0}

    # Configure the count metric which is what is used for f0 slicer.
    metric = metrics.CountExplimMetric(metric_name="fO")
    plotDict = {
        "xlabel": "Number of Visits",
        "asky": benchmarkArea,
        "n_visits": minNvisits,
        "x_min": 0,
        "x_max": 1500,
    }
    summaryMetrics = [
        metrics.FOArea(
            nside=nside,
            norm=False,
            metric_name="fOArea",
            asky=benchmarkArea,
            n_visit=benchmarkNvisits,
        ),
        metrics.FOArea(
            nside=nside,
            norm=True,
            metric_name="fOArea/benchmark",
            asky=benchmarkArea,
            n_visit=benchmarkNvisits,
        ),
        metrics.FONv(
            nside=nside,
            norm=False,
            metric_name="fONv",
            asky=benchmarkArea,
            n_visit=benchmarkNvisits,
        ),
        metrics.FONv(
            nside=nside,
            norm=True,
            metric_name="fONv/benchmark",
            asky=benchmarkArea,
            n_visit=benchmarkNvisits,
        ),
        metrics.FOArea(
            nside=nside,
            norm=False,
            metric_name=f"fOArea_{minNvisits}",
            asky=benchmarkArea,
            n_visit=minNvisits,
        ),
    ]
    caption = "The FO metric evaluates the overall efficiency of observing. "
    caption += (
        "foNv: out of %.2f sq degrees, the area receives at least X and a median of Y visits "
        "(out of %d, if compared to benchmark). " % (benchmarkArea, benchmarkNvisits)
    )
    caption += (
        "fOArea: this many sq deg (out of %.2f sq deg if compared "
        "to benchmark) receives at least %d visits. " % (benchmarkArea, benchmarkNvisits)
    )
    displayDict["caption"] = caption
    slicer = slicers.HealpixSlicer(nside=nside)
    add_bundle(
        metric,
        slicer,
        "",
        plot_dict=plotDict,
        display_dict=displayDict,
        summary_metrics=summaryMetrics,
        plot_funcs=[plots.FOPlot()],
    )

    # check that we have coverage for the first year
    displayDict = {"group": "Year 1", "subgroup": "Coverage"}
    nside_foot = 32
    slicer = slicers.HealpixSlicer(nside=nside_foot, badval=0)
    sky = EuclidOverlapFootprint(nside=nside_foot, smc_radius=4, lmc_radius=6)
    footprints_hp_array, _labels = sky.return_maps()
    for filtername in filternames:
        sql = "filter='%s' and night < 365" % filtername
        metric = metrics.CountMetric(col="night", metric_name="N year 1")
        summary_stat = metrics.FootprintFractionMetric(
            footprint=footprints_hp_array[filtername],
            n_min=3,
        )
        add_bundle(
            metric,
            slicer,
            sql,
            display_dict=displayDict,
            summary_metrics=summary_stat,
            plot_funcs=subsetPlots,
            plot_dict={"color_max": 10},
        )

    # Some ToO stats
    displayDict = {"group": "ToO", "order": 1}
    slicer = spatial_slicer(
        nside=nside,
        lat_col=colmap["dec"],
        lon_col=colmap["ra"],
        lat_lon_deg=colmap["raDecDeg"],
    )
    for filtername in filternames:
        sql = "filter='%s' and scheduler_note like 'ToO%%'" % filtername
        metric = metrics.CountMetric(col=colmap["mjd"], metric_name="N ToO")
        add_bundle(
            metric,
            slicer,
            sql,
            display_dict=displayDict,
            summary_metrics=extended_stats,
            plot_funcs=subsetPlots,
            plot_dict={},
        )

    # The SQL wildcard is kept in a separate literal from the "t%i" piece so
    # that only the hour is substituted by the %-format.
    too_sqls = ["scheduler_note like 'ToO, %" + "t%i'" % hour for hour in [0, 1, 2, 4, 24, 48]] + [
        "scheduler_note like 'ToO, %'"
    ]
    slicer = slicers.UniSlicer()
    # NOTE(review): both the visit count and the unique-night count are built
    # for every ToO selection — confirm against the upstream layout.
    for sql in too_sqls:
        metric = metrics.CountMetric(col="night")
        add_bundle(metric, slicer, sql, display_dict=displayDict)

        metric = metrics.CountUniqueMetric(col="night")
        add_bundle(metric, slicer, sql, display_dict=displayDict)

    for b in bundle_list:
        b.set_run_name(run_name)
    bd = metric_bundles.make_bundles_dict_from_list(bundle_list)

    # Add hourglass plots.
    hrDict = hourglassPlots(colmap=colmap, runName=run_name, nyears=nyears, extraSql=sql_constraint)
    bd.update(hrDict)

    # Add basic slew stats. These are best-effort: a colmap lacking the
    # columns slewBasics needs only produces a warning.
    try:
        slewDict = slewBasics(colmap=colmap, run_name=run_name)
        bd.update(slewDict)
    except KeyError as e:
        warnings.warn("Could not add slew stats: missing required key %s from colmap" % (e))

    return bd