Source code for rubin_sim.maf.batches.visitdepth_batch

"""Sets of metrics to look at general sky coverage -
nvisits/coadded depth/Teff.
"""

__all__ = ("nvisitsM5Maps", "tEffMetrics", "nvisitsPerNight", "nvisitsPerSubset")

import copy

import numpy as np

import rubin_sim.maf.metric_bundles as mb
import rubin_sim.maf.metrics as metrics
import rubin_sim.maf.slicers as slicers
import rubin_sim.maf.stackers as stackers
import rubin_sim.maf.utils as mafUtils

from .col_map_dict import col_map_dict
from .common import filter_list, standard_summary


[docs] def nvisitsM5Maps( colmap=None, runName="opsim", extraSql=None, extraInfoLabel=None, slicer=None, runLength=10.0, ): """Generate maps of the number of visits and coadded depth (with and without dust extinction) in all bands and per filter. Parameters ---------- colmap : `dict`, optional A dictionary with a mapping of column names. runName : `str`, optional The name of the simulated survey. extraSql : `str`, optional Additional constraint to add to any sql constraints. extraInfoLabel : `str`, optional Additional info_label to add before any below (i.e. "WFD"). slicer : `rubin_sim.maf.slicer` or None, optional Optionally, use something other than an nside=64 healpix slicer runLength : `float`, optional Length of the simulated survey, for scaling values for the plot limits. Returns ------- metric_bundleDict : `dict` of `maf.MetricBundle` """ if colmap is None: colmap = col_map_dict() bundleList = [] subgroup = extraInfoLabel if subgroup is None: subgroup = "All visits" raCol = colmap["ra"] decCol = colmap["dec"] degrees = colmap["raDecDeg"] # Set up basic all and per filter sql constraints. filterlist, colors, orders, sqls, info_label = filter_list( all=True, extra_sql=extraSql, extra_info_label=extraInfoLabel ) # Set up some values to make nicer looking plots. benchmarkVals = mafUtils.scale_benchmarks(runLength, benchmark="design") # Check that nvisits is not set to zero (for very short run length). for f in benchmarkVals["nvisits"]: if benchmarkVals["nvisits"][f] == 0: print("Updating benchmark nvisits value in %s to be nonzero" % (f)) benchmarkVals["nvisits"][f] = 1 benchmarkVals["coaddedDepth"] = mafUtils.calc_coadded_depth( benchmarkVals["nvisits"], benchmarkVals["singleVisitDepth"] ) # Scale the n_visit ranges for the runLength. nvisitsRange = { "u": [20, 80], "g": [50, 150], "r": [100, 250], "i": [100, 250], "z": [100, 300], "y": [100, 300], "all": [700, 1200], } scale = runLength / 10.0 for f in nvisitsRange: for i in [0, 1]: nvisitsRange[f][i] = int(np.floor(nvisitsRange[f][i] * scale)) # Generate Nvisit maps in all and per filters displayDict = {"group": "Nvisits Maps", "subgroup": subgroup} metric = metrics.CountMetric(colmap["mjd"], metric_name="NVisits", units="") if slicer is None: slicer = slicers.HealpixSlicer(nside=64, lat_col=decCol, lon_col=raCol, lat_lon_deg=degrees) slicerDust = slicers.HealpixSlicer( nside=64, lat_col=decCol, lon_col=raCol, lat_lon_deg=degrees, use_cache=False, ) else: # If there is already a slicer set up, ensure we have one for dust # which is NOT using cache. slicerDust = copy.deepcopy(slicer) slicerDust.use_cache = False for f in filterlist: sql = sqls[f] displayDict["caption"] = f"Number of visits per healpix in {info_label[f]}." displayDict["order"] = orders[f] bin_size = 2 if f == "all": bin_size = 5 plotDict = { "x_min": nvisitsRange[f][0], "x_max": nvisitsRange[f][1], "color_min": nvisitsRange[f][0], "color_max": nvisitsRange[f][1], "bin_size": bin_size, "color": colors[f], } bundle = mb.MetricBundle( metric, slicer, sql, info_label=info_label[f], display_dict=displayDict, plot_dict=plotDict, summary_metrics=standard_summary(), ) bundleList.append(bundle) # Generate Coadded depth maps per filter displayDict = {"group": "Coadded M5 Maps", "subgroup": subgroup} metric = metrics.Coaddm5Metric(m5_col=colmap["fiveSigmaDepth"], metric_name="CoaddM5") for f in filterlist: # Skip "all" for coadded depth. if f == "all": continue sql = sqls[f] displayDict["caption"] = f"Coadded depth per healpix in {info_label[f]}." displayDict["caption"] += " More positive numbers indicate fainter limiting magnitudes." displayDict["order"] = orders[f] plotDict = { "percentile_clip": 98, "color": colors[f], } bundle = mb.MetricBundle( metric, slicer, sql, info_label=info_label[f], display_dict=displayDict, plot_dict=plotDict, summary_metrics=standard_summary(), ) bundleList.append(bundle) # Add Coadded depth maps per filter WITH extragalactic extinction added displayDict = {"group": "Extragalactic Coadded M5 Maps", "subgroup": subgroup} metric = metrics.ExgalM5(m5_col=colmap["fiveSigmaDepth"], metric_name="Exgal_CoaddM5") for f in filterlist: # Skip "all" for coadded depth. if f == "all": continue sql = sqls[f] displayDict["caption"] = ( "Coadded depth per healpix for extragalactic purposes " "(i.e. combined with dust extinction maps), " f"in {info_label[f]}." ) displayDict["caption"] += " More positive numbers indicate fainter limiting magnitudes." displayDict["order"] = orders[f] plotDict = { "percentile_clip": 90, "color": colors[f], } bundle = mb.MetricBundle( metric, slicerDust, sql, info_label=info_label[f], display_dict=displayDict, plot_dict=plotDict, summary_metrics=standard_summary(), ) bundleList.append(bundle) # Set the run_name for all bundles and return the bundleDict. for b in bundleList: b.set_run_name(runName) return mb.make_bundles_dict_from_list(bundleList)
[docs] def tEffMetrics( colmap=None, runName="opsim", extraSql=None, extraInfoLabel=None, slicer=None, ): """Generate a series of Teff metrics. Teff total, per night, and sky maps (all and per filter). Parameters ---------- colmap : `dict`, optional A dictionary with a mapping of column names. runName : `str`, optional The name of the simulated survey. extraSql : `str`, optional Additional constraint to add to any sql constraints. extraInfoLabel : `str`, optional Additional info_label to add before any below (i.e. "WFD"). slicer : `rubin_sim.maf.BaseSlicer` or None, optional Optionally, use something other than an nside=64 healpix slicer Returns ------- metric_bundleDict : `dict` of `maf.MetricBundle` """ if colmap is None: colmap = col_map_dict() bundleList = [] subgroup = extraInfoLabel if subgroup is None: subgroup = "All visits" raCol = colmap["ra"] decCol = colmap["dec"] degrees = colmap["raDecDeg"] if slicer is not None: skyslicer = slicer else: skyslicer = slicers.HealpixSlicer(nside=64, lat_col=decCol, lon_col=raCol, lat_lon_deg=degrees) # Set up basic all and per filter sql constraints. filterlist, colors, orders, sqls, info_label = filter_list( all=True, extra_sql=extraSql, extra_info_label=extraInfoLabel ) if info_label["all"] is None: info_label["all"] = "All visits" # Total Teff and normalized Teff. displayDict = {"group": "T_eff Summary", "subgroup": subgroup} displayDict["caption"] = "Total effective time of the survey (see Teff metric)." displayDict["order"] = 0 metric = metrics.SumMetric(col="t_eff", metric_name="Total Teff") slicer = slicers.UniSlicer() bundle = mb.MetricBundle( metric, slicer, constraint=sqls["all"], display_dict=displayDict, info_label=info_label["all"], ) bundleList.append(bundle) displayDict["caption"] = "Normalized total effective time of the survey (see Teff metric)." displayDict["order"] = 1 metric = metrics.MeanMetric(col="t_eff", metric_name="Normalized Teff") normalized_teff_stacker = stackers.TeffStacker(normed=True) slicer = slicers.UniSlicer() bundle = mb.MetricBundle( metric, slicer, constraint=sqls["all"], stacker_list=[normalized_teff_stacker], display_dict=displayDict, info_label=info_label["all"], ) bundleList.append(bundle) # Generate Teff maps in all and per filters displayDict = {"group": "T_eff Maps", "subgroup": subgroup} metric = metrics.MeanMetric(col="t_eff", metric_name="Normalized Teff") normalized_teff_stacker = stackers.TeffStacker(normed=True) for f in filterlist: displayDict["caption"] = "Normalized effective time of the survey, for %s" % info_label[f] displayDict["order"] = orders[f] plotDict = {"color": colors[f]} bundle = mb.MetricBundle( metric, skyslicer, sqls[f], stacker_list=[normalized_teff_stacker], info_label=info_label[f], display_dict=displayDict, plot_dict=plotDict, summary_metrics=standard_summary(), ) bundleList.append(bundle) # Set the run_name for all bundles and return the bundleDict. for b in bundleList: b.set_run_name(runName) return mb.make_bundles_dict_from_list(bundleList)
[docs] def nvisitsPerNight( colmap=None, runName="opsim", binNights=1, extraSql=None, extraInfoLabel=None, subgroup=None, ): """Count the number of visits per night through the survey. Parameters ---------- colmap : `dict` or None, optional A dictionary with a mapping of column names. runName : `str`, optional The name of the simulated survey. Default is "opsim". binNights : `int`, optional Number of nights to count in each bin. extraSql : `str` or None, optional Additional constraint to add to any sql constraints. extraInfoLabel : `str` or None, optional Additional info_label to add before any below (i.e. "WFD"). subgroup : `str` or None, optional Use this for the 'subgroup' in the display_dict, instead of info_label. Returns ------- metric_bundleDict : `dict` of `maf.MetricBundle` """ if colmap is None: colmap = col_map_dict() subgroup = subgroup if subgroup is None: subgroup = extraInfoLabel if subgroup is None: subgroup = "All visits" infoCaption = extraInfoLabel if extraInfoLabel is None: if extraSql is not None: infoCaption = extraSql else: infoCaption = "all visits" bundleList = [] displayDict = {"group": "Nvisits Per Night", "subgroup": subgroup} displayDict["caption"] = "Number of visits per night for %s." % (infoCaption) displayDict["order"] = 0 metric = metrics.CountMetric(colmap["mjd"], metric_name="Nvisits") slicer = slicers.OneDSlicer(slice_col_name=colmap["night"], bin_size=binNights) bundle = mb.MetricBundle( metric, slicer, extraSql, info_label=infoCaption, display_dict=displayDict, summary_metrics=standard_summary(), ) bundleList.append(bundle) # Set the run_name for all bundles and return the bundleDict. for b in bundleList: b.set_run_name(runName) return mb.make_bundles_dict_from_list(bundleList)
[docs] def nvisitsPerSubset( colmap=None, runName="opsim", binNights=1, constraint=None, footprintConstraint=None, extraInfoLabel=None, ): """Look at the distribution of a given sql constraint or footprint constraint's visits, total number and distribution over time (# per night), if possible. Parameters ---------- opsdb : `str` or database connection Name of the opsim sqlite database. colmap : `dict` or None, optional A dictionary with a mapping of column names. runName : `str`, optional The name of the simulated survey. binNights : `int`, optional Number of nights to count in each bin. constraint : `str` or None, optional SQL constraint to add to all metrics. This would be the way to select only a given "Note". footprintConstraint : `np.ndarray` or None, optional Footprint to look for visits within (and then identify via WFDlabelStacker). The footprint = a full length heapix array, filled with 0/1 values. extraInfoLabel : `str` or None, optional Additional info_label to add before any below (i.e. "WFD"). Returns ------- metric_bundleDict : `dict` of `rubin_sim.maf.MetricBundle` """ if colmap is None: colmap = col_map_dict() bdict = {} bundleList = [] if footprintConstraint is None: if extraInfoLabel is None and constraint is not None: extraInfoLabel += " %s" % constraint # Nvisits per night, this constraint. bdict.update( nvisitsPerNight( colmap=colmap, runName=runName, binNights=binNights, extraSql=constraint, extraInfoLabel=extraInfoLabel, ) ) # Nvisits total, this constraint. metric = metrics.CountMetric(colmap["mjd"], metric_name="Nvisits") slicer = slicers.UniSlicer() displayDict = { "group": "Nvisit Summary", "subgroup": extraInfoLabel, } displayDict["caption"] = f"Total number of visits for {extraInfoLabel}." bundle = mb.MetricBundle( metric, slicer, constraint, info_label=extraInfoLabel, display_dict=displayDict, ) bundleList.append(bundle) # Or count the total number of visits that contribute # towards a given footprint if footprintConstraint is not None: # Set up a stacker to use this footprint to label visits if extraInfoLabel is None: extraInfoLabel = "Footprint" footprintStacker = stackers.WFDlabelStacker( footprint=footprintConstraint, fp_threshold=0.4, area_id_name=extraInfoLabel, exclude_dd=True, ) metric = metrics.CountSubsetMetric( col="area_id", subset=extraInfoLabel, units="#", metric_name="Nvisits" ) slicer = slicers.UniSlicer() displayDict = { "group": "Nvisit Summary", "subgroup": extraInfoLabel, "caption": f"Visits within footprint {extraInfoLabel}.", } bundle = mb.MetricBundle( metric, slicer, constraint, stacker_list=[footprintStacker], info_label=extraInfoLabel, display_dict=displayDict, ) bundleList.append(bundle) for b in bundleList: b.set_run_name(runName) bdict.update(mb.make_bundles_dict_from_list(bundleList)) return bdict