Source code for rubin_sim.maf.batches.radar_limited

__all__ = ("radar_limited",)

import healpy as hp
import numpy as np

import rubin_sim.maf as maf
import rubin_sim.maf.maps as maps
import rubin_sim.maf.metric_bundles as mb
import rubin_sim.maf.metrics as metrics
import rubin_sim.maf.plots as plots
import rubin_sim.maf.slicers as slicers

from .common import filter_list, lightcurve_summary, standard_summary


def radar_limited(
    runName="run name",
    nside=64,
    benchmarkArea=18000,
    benchmarkNvisits=825,
    minNvisits=750,
    long_microlensing=True,
    srd_only=False,
    mjd0=None,
):
    """A batch of metrics for looking at survey performance relative
    to the SRD and the main science drivers of LSST.

    Parameters
    ----------
    runName : `str`, optional
        The simulation run name that should appear as plot titles.
    nside : `int`, optional
        The nside (resolution) for the healpix slicers.
    benchmarkArea : `float`, optional
        The area to use for SRD metrics (sq degrees).
    benchmarkNvisits : `int`, optional
        The number of visits to use for SRD metrics.
    minNvisits : `int`, optional
        The minimum number of visits to use for SRD metrics.
    long_microlensing : `bool`, optional
        Add the longer-running microlensing metrics to the batch
        (a subset of crossing times only).
    srd_only : `bool`, optional
        Only return the SRD metrics.
    mjd0 : `float`, optional
        The modified Julian date start date of the survey.

    Returns
    -------
    metric_bundleDict : `dict` of `maf.MetricBundle`
    """
    bundleList = []
    # Get some standard per-filter coloring and sql constraints
    filterlist, colors, filterorders, filtersqls, filterinfo_label = filter_list(all=False)
    (
        allfilterlist,
        allcolors,
        allfilterorders,
        allfiltersqls,
        allfilterinfo_label,
    ) = filter_list(all=True)

    standardStats = standard_summary(with_count=False)

    # This is the default slicer for most purposes in this batch.
    # Note that the cache is on -
    # if the metric requires a dust map, this is not the right slicer to use.
    healpixslicer = slicers.HealpixSlicer(nside=nside, use_cache=True)
    subsetPlots = [plots.HealpixSkyMap(), plots.HealpixHistogram()]

    #########################
    #########################
    # SRD, DM, etc
    #########################
    #########################
    # fO metric
    displayDict = {"group": "SRD", "subgroup": "FO metrics", "order": 0}

    # Configure the count metric which is what is used for f0 slicer.
    metric = metrics.CountExplimMetric(col="observationStartMJD", metric_name="fO")
    plotDict = {
        "xlabel": "Number of Visits",
        "asky": benchmarkArea,
        "n_visits": benchmarkNvisits,
        "x_min": 0,
        "x_max": 1500,
    }
    summaryMetrics = [
        metrics.FOArea(
            nside=nside,
            norm=False,
            metric_name="fOArea",
            asky=benchmarkArea,
            n_visit=benchmarkNvisits,
        ),
        metrics.FOArea(
            nside=nside,
            norm=True,
            metric_name="fOArea/benchmark",
            asky=benchmarkArea,
            n_visit=benchmarkNvisits,
        ),
        metrics.FONv(
            nside=nside,
            norm=False,
            metric_name="fONv",
            asky=benchmarkArea,
            n_visit=benchmarkNvisits,
        ),
        metrics.FONv(
            nside=nside,
            norm=True,
            metric_name="fONv/benchmark",
            asky=benchmarkArea,
            n_visit=benchmarkNvisits,
        ),
        metrics.FOArea(
            nside=nside,
            norm=False,
            metric_name=f"fOArea_{minNvisits}",
            asky=benchmarkArea,
            n_visit=minNvisits,
        ),
    ]
    caption = "The FO metric evaluates the overall efficiency of observing. "
    caption += (
        "fONv: out of %.2f sq degrees, the area receives at least X and a median of Y visits "
        "(out of %d, if compared to benchmark). " % (benchmarkArea, benchmarkNvisits)
    )
    caption += (
        "fOArea: this many sq deg (out of %.2f sq deg if compared "
        "to benchmark) receives at least %d visits. " % (benchmarkArea, benchmarkNvisits)
    )
    displayDict["caption"] = caption
    bundle = mb.MetricBundle(
        metric,
        healpixslicer,
        "",
        plot_dict=plotDict,
        display_dict=displayDict,
        summary_metrics=summaryMetrics,
        plot_funcs=[plots.FOPlot()],
    )
    bundleList.append(bundle)

    # Single visit depth distribution
    displayDict["subgroup"] = "Visit Depths"
    # Histogram values over all and per filter.
    value = "fiveSigmaDepth"
    for f in filterlist:
        displayDict["caption"] = "Histogram of %s" % (value)
        displayDict["caption"] += " for %s." % (filterinfo_label[f])
        displayDict["order"] = filterorders[f]
        m = metrics.CountMetric(value, metric_name="%s Histogram" % (value))
        slicer = slicers.OneDSlicer(slice_col_name=value)
        bundle = mb.MetricBundle(
            m,
            slicer,
            filtersqls[f],
            display_dict=displayDict,
        )
        bundleList.append(bundle)

    displayDict["caption"] = ""
    for f in filterlist:
        slicer = maf.UniSlicer()
        metric = maf.MedianMetric(col="fiveSigmaDepth")
        bundle = mb.MetricBundle(
            metric,
            slicer,
            filtersqls[f],
            display_dict=displayDict,
        )
        bundleList.append(bundle)

    ##############
    # Astrometry
    ##############

    rmags_para = [22.4, 24.0]
    rmags_pm = [20.5, 24.0]

    # Set up parallax/dcr stackers.
    parallaxStacker = maf.ParallaxFactorStacker()
    dcrStacker = maf.DcrStacker()

    # Set up parallax metrics.
    displayDict["subgroup"] = "Parallax"
    displayDict["order"] += 1
    # Expected error on parallax at 10 AU.
    plotmaxVals = (2.0, 15.0)
    good_parallax_limit = 11.5
    summary = [
        metrics.AreaSummaryMetric(
            area=18000,
            reduce_func=np.median,
            decreasing=False,
            metric_name="Median Parallax Uncert (18k)",
        ),
        metrics.AreaThresholdMetric(
            upper_threshold=good_parallax_limit,
            metric_name="Area better than %.1f mas uncertainty" % good_parallax_limit,
        ),
    ]
    summary.append(metrics.PercentileMetric(percentile=95, metric_name="95th Percentile Parallax Uncert"))
    summary.extend(standard_summary())
    for rmag, plotmax in zip(rmags_para, plotmaxVals):
        plotDict = {"x_min": 0, "x_max": plotmax, "color_min": 0, "color_max": plotmax}
        metric = metrics.ParallaxMetric(
            metric_name="Parallax Uncert @ %.1f" % (rmag),
            rmag=rmag,
            normalize=False,
        )
        bundle = mb.MetricBundle(
            metric,
            healpixslicer,
            "",
            stacker_list=[parallaxStacker],
            display_dict=displayDict,
            plot_dict=plotDict,
            summary_metrics=summary,
            plot_funcs=subsetPlots,
        )
        bundleList.append(bundle)
        displayDict["order"] += 1

    # Parallax normalized to 'best possible'.
    # This separates the effect of cadence from depth.
    for rmag in rmags_para:
        metric = metrics.ParallaxMetric(
            metric_name="Normalized Parallax Uncert @ %.1f" % (rmag),
            rmag=rmag,
            normalize=True,
        )
        bundle = mb.MetricBundle(
            metric,
            healpixslicer,
            "",
            stacker_list=[parallaxStacker],
            display_dict=displayDict,
            summary_metrics=standard_summary(),
            plot_funcs=subsetPlots,
        )
        bundleList.append(bundle)
        displayDict["order"] += 1

    # Parallax factor coverage.
    for rmag in rmags_para:
        metric = metrics.ParallaxCoverageMetric(metric_name="Parallax Coverage @ %.1f" % (rmag), rmag=rmag)
        bundle = mb.MetricBundle(
            metric,
            healpixslicer,
            "",
            stacker_list=[parallaxStacker],
            display_dict=displayDict,
            summary_metrics=standard_summary(),
            plot_funcs=subsetPlots,
        )
        bundleList.append(bundle)
        displayDict["order"] += 1

    # Parallax problems can be caused by HA and DCR degeneracies.
    # Check their correlation.
    for rmag in rmags_para:
        metric = metrics.ParallaxDcrDegenMetric(
            metric_name="Parallax-DCR degeneracy @ %.1f" % (rmag), rmag=rmag
        )
        caption = "Correlation between parallax offset magnitude and hour angle for a r=%.1f star." % (rmag)
        caption += " (0 is good, near -1 or 1 is bad)."
        bundle = mb.MetricBundle(
            metric,
            healpixslicer,
            "",
            stacker_list=[dcrStacker, parallaxStacker],
            display_dict=displayDict,
            summary_metrics=standard_summary(),
            plot_funcs=subsetPlots,
        )
        bundleList.append(bundle)
        displayDict["order"] += 1

    # Proper Motion metrics.
    displayDict["subgroup"] = "Proper Motion"
    displayDict["order"] = 0
    # Proper motion errors.
    plotmaxVals = (1.0, 5.0)
    summary = [
        metrics.AreaSummaryMetric(
            area=18000,
            reduce_func=np.median,
            decreasing=False,
            metric_name="Median Proper Motion Uncert (18k)",
        )
    ]
    summary.append(metrics.PercentileMetric(percentile=95, metric_name="95th Percentile Proper Motion Uncert"))
    summary.extend(standard_summary())
    for rmag, plotmax in zip(rmags_pm, plotmaxVals):
        plotDict = {"x_min": 0, "x_max": plotmax, "color_min": 0, "color_max": plotmax}
        metric = metrics.ProperMotionMetric(
            metric_name="Proper Motion Uncert @ %.1f" % rmag,
            rmag=rmag,
            normalize=False,
        )
        bundle = mb.MetricBundle(
            metric,
            healpixslicer,
            "",
            display_dict=displayDict,
            plot_dict=plotDict,
            summary_metrics=summary,
            plot_funcs=subsetPlots,
        )
        bundleList.append(bundle)
        displayDict["order"] += 1

    # Normalized proper motion.
    for rmag in rmags_pm:
        metric = metrics.ProperMotionMetric(
            metric_name="Normalized Proper Motion Uncert @ %.1f" % rmag,
            rmag=rmag,
            normalize=True,
        )
        bundle = mb.MetricBundle(
            metric,
            healpixslicer,
            "",
            display_dict=displayDict,
            summary_metrics=standard_summary(),
            plot_funcs=subsetPlots,
        )
        bundleList.append(bundle)
        displayDict["order"] += 1

    # DCR precision metric
    displayDict["subgroup"] = "DCR"
    displayDict["order"] = 0
    plotDict = {"caption": "Precision of DCR slope.", "percentile_clip": 95}
    metric = metrics.DcrPrecisionMetric()
    bundle = mb.MetricBundle(
        metric,
        healpixslicer,
        "",
        plot_dict=plotDict,
        plot_funcs=subsetPlots,
        display_dict=displayDict,
        summary_metrics=standard_summary(with_count=False),
    )
    bundleList.append(bundle)

    # Rapid Revisit
    displayDict["subgroup"] = "Rapid Revisits"
    displayDict["order"] = 0
    # Calculate the actual number of revisits within 30 minutes.
    dTmax = 30  # time in minutes
    m2 = metrics.NRevisitsMetric(d_t=dTmax, normed=False, metric_name="NumberOfQuickRevisits")
    plotDict = {"color_min": 400, "color_max": 2000, "x_min": 400, "x_max": 2000}
    caption = "Number of consecutive visits with return times faster than %.1f minutes, " % (dTmax)
    caption += "in any filter. "
    displayDict["caption"] = caption
    bundle = mb.MetricBundle(
        m2,
        healpixslicer,
        "",
        plot_dict=plotDict,
        plot_funcs=subsetPlots,
        display_dict=displayDict,
        summary_metrics=standard_summary(with_count=False),
    )
    bundleList.append(bundle)
    displayDict["order"] += 1

    # Better version of the rapid revisit requirements:
    # require a minimum number of visits between
    # dtMin and dtMax, but also a minimum number of visits
    # between dtMin and dtPair (the typical pair time).
    # 1 means the healpix met the requirements (0 means did not).
    dTmin = 40.0 / 60.0  # (minutes) 40s minimum for rapid revisit range
    dTpairs = 20.0  # minutes (time when pairs should start kicking in)
    dTmax = 30.0  # 30 minute maximum for rapid revisit range
    nOne = 82  # Number of revisits between 40s-30m required
    nTwo = 28  # Number of revisits between 40s - tPairs required.
    pix_area = float(hp.nside2pixarea(nside, degrees=True))
    scale = pix_area * hp.nside2npix(nside)
    m1 = metrics.RapidRevisitMetric(
        metric_name="RapidRevisits",
        d_tmin=dTmin / 60.0 / 24.0,  # convert minutes to days
        d_tpairs=dTpairs / 60.0 / 24.0,
        d_tmax=dTmax / 60.0 / 24.0,
        min_n1=nOne,
        min_n2=nTwo,
    )
    plotDict = {
        "x_min": 0,
        "x_max": 1,
        "color_min": 0,
        "color_max": 1,
        "log_scale": False,
    }
    cutoff1 = 0.9
    summaryStats = [metrics.FracAboveMetric(cutoff=cutoff1, scale=scale, metric_name="Area (sq deg)")]
    caption = "Rapid Revisit: area that receives at least %d visits between %.3f and %.1f minutes, " % (
        nOne,
        dTmin,
        dTmax,
    )
" % ( nTwo, dTmin, dTpairs, ) caption += ( 'Summary statistic "Area" indicates the area on the sky which meets this requirement.' " (SRD design specification is 2000 sq deg)." ) displayDict["caption"] = caption displayDict["order"] = 0 bundle = mb.MetricBundle( m1, healpixslicer, "", plot_dict=plotDict, plot_funcs=subsetPlots, display_dict=displayDict, summary_metrics=summaryStats, ) bundleList.append(bundle) # For SRD batch only, return here. if srd_only: for b in bundleList: b.set_run_name(runName) bundleDict = mb.make_bundles_dict_from_list(bundleList) return bundleDict # Year Coverage displayDict["subgroup"] = "Year Coverage" metric = metrics.YearCoverageMetric() for f in filterlist: plotDict = {"color_min": 7, "color_max": 10, "color": colors[f]} displayDict["caption"] = f"Number of years of coverage in {f} band." displayDict["order"] = filterorders[f] summary = [ metrics.AreaSummaryMetric( area=18000, reduce_func=np.mean, decreasing=True, metric_name="N Years (18k) %s" % f, ) ] bundle = mb.MetricBundle( metric, healpixslicer, filtersqls[f], plot_dict=plotDict, info_label=filterinfo_label[f], display_dict=displayDict, summary_metrics=summary, ) bundleList.append(bundle) ######################### ######################### # Galaxies ######################### ######################### # Run this per filter, to look at variations in # counts of galaxies in blue bands? displayDict = { "group": "Galaxies", "subgroup": "Galaxy Counts", "order": 0, "caption": None, } # make sure slicer has cache off slicer = slicers.HealpixSlicer(nside=nside, use_cache=False) displayDict["subgroup"] = "Surface Brightness" summary = [metrics.MedianMetric()] for filtername in "ugrizy": displayDict["caption"] = "Surface brightness limit in %s, no extinction applied." % filtername displayDict["order"] = filterorders[f] sql = 'filter="%s"' % filtername metric = metrics.SurfaceBrightLimitMetric() bundle = mb.MetricBundle( metric, healpixslicer, sql, display_dict=displayDict, summary_metrics=summary, plot_funcs=subsetPlots, ) bundleList.append(bundle) ######################### ######################### # Cosmology ######################### ######################### bandpass = "i" nfilters_needed = 6 lim_ebv = 0.2 offset = 0.1 mag_cuts = { 10: 26.0 - offset, } yrs = list(mag_cuts.keys()) maxYr = max(yrs) displayDict = {"group": "Cosmology"} subgroupCount = 1 displayDict["subgroup"] = f"{subgroupCount}: Static Science" ## Static Science # Calculate the static science metrics - effective survey area, # mean/median coadded depth, stdev of # coadded depth and the 3x2ptFoM emulator. 
    dustmap = maps.DustMap(nside=nside, interp=False)
    pix_area = hp.nside2pixarea(nside, degrees=True)
    summaryMetrics = [
        metrics.MeanMetric(),
        metrics.MedianMetric(),
        metrics.RmsMetric(),
        metrics.CountRatioMetric(norm_val=1 / pix_area, metric_name="Effective Area (deg)"),
    ]
    displayDict["order"] = 0
    slicer = slicers.HealpixSlicer(nside=nside, use_cache=False)
    for yr_cut in yrs:
        ptsrc_lim_mag_i_band = mag_cuts[yr_cut]
        sqlconstraint = "night <= %s" % (yr_cut * 365.25 + 0.5)
        sqlconstraint += ' and scheduler_note not like "DD%"'
        info_label = f"{bandpass} band non-DD year {yr_cut}"
        ThreebyTwoSummary_simple = metrics.StaticProbesFoMEmulatorMetricSimple(
            nside=nside, year=yr_cut, metric_name="3x2ptFoM_simple"
        )
        ThreebyTwoSummary = maf.StaticProbesFoMEmulatorMetric(nside=nside, metric_name="3x2ptFoM")

        m = metrics.ExgalM5WithCuts(
            lsst_filter=bandpass,
            n_filters=nfilters_needed,
            extinction_cut=lim_ebv,
            depth_cut=ptsrc_lim_mag_i_band,
        )
        caption = (
            f"Cosmology/Static science metrics are based on evaluating the region "
            f"of the sky that meets the requirements in year {yr_cut} (coverage "
            f"in all {nfilters_needed} bands, a lower E(B-V) value than {lim_ebv}, "
            f"and at least a coadded depth of {ptsrc_lim_mag_i_band} in {bandpass}). "
            f"From there the effective survey area, coadded depth, standard deviation of "
            f"the depth, and a 3x2pt static science figure of merit emulator are "
            f"calculated using the dust-extincted coadded depth map (over that reduced "
            f"footprint)."
        )
        displayDict["caption"] = caption
        bundle = mb.MetricBundle(
            m,
            slicer,
            sqlconstraint,
            maps_list=[dustmap],
            info_label=info_label,
            summary_metrics=summaryMetrics + [ThreebyTwoSummary, ThreebyTwoSummary_simple],
            display_dict=displayDict,
        )
        displayDict["order"] += 1
        bundleList.append(bundle)

    ## LSS Science
    # The only metric we have from LSS is the NGals metric -
    # which is similar to the GalaxyCountsExtended
    # metric, but evaluated only on the depth/dust cuts footprint.
    subgroupCount += 1
    displayDict["subgroup"] = f"{subgroupCount}: LSS"
    displayDict["order"] = 0
    plotDict = {"n_ticks": 5}

    ## WL metrics
    # Calculates the number of visits per pointing, after removing
    # parts of the footprint due to dust/depth
    # Count visits in gri bands.
    subgroupCount += 1
    displayDict["subgroup"] = f"{subgroupCount}: WL"
    displayDict["order"] = 0
    sqlconstraint = 'scheduler_note not like "DD%" and (filter="g" or filter="r" or filter="i")'
    info_label = "gri band non-DD"
    minExpTime = 15
    m = metrics.WeakLensingNvisits(
        lsst_filter=bandpass,
        depth_cut=mag_cuts[maxYr],
        ebvlim=lim_ebv,
        min_exp_time=minExpTime,
        metric_name="WeakLensingNvisits",
    )
    slicer = slicers.HealpixSlicer(nside=nside, use_cache=False)
    displayDict["caption"] = (
        f"The number of visits per pointing, over a similarly reduced footprint as "
        f"described above for the 3x2pt FOM, but allowing areas of sky with "
        f"fewer than {nfilters_needed} filters. "
        f"A cutoff of {minExpTime} seconds removes very short visits."
    )
    bundle = mb.MetricBundle(
        m,
        slicer,
        sqlconstraint,
        maps_list=[dustmap],
        info_label=info_label,
        summary_metrics=standardStats,
        display_dict=displayDict,
    )
    bundleList.append(bundle)

    # Do the weak lensing per year
    for year in [10]:
        sqlconstraint = (
            'scheduler_note not like "DD%"'
            + ' and (filter="g" or filter="r" or filter="i") and night < %i' % (year * 365.25)
        )
        m = metrics.WeakLensingNvisits(
            lsst_filter=bandpass,
            depth_cut=mag_cuts[year],
            ebvlim=lim_ebv,
            min_exp_time=minExpTime,
            metric_name="WeakLensingNvisits_gri_year%i" % year,
        )
        bundle = mb.MetricBundle(
            m,
            slicer,
            sqlconstraint,
            maps_list=[dustmap],
            info_label=info_label,
            summary_metrics=standardStats,
            display_dict=displayDict,
        )
        bundleList.append(bundle)

        m = metrics.RIZDetectionCoaddExposureTime(
            det_bands=["g", "r", "i"], metric_name="gri_exposure_time_year%i" % year
        )
        bundle = mb.MetricBundle(
            m,
            slicer,
            sqlconstraint,
            maps_list=[dustmap],
            info_label=info_label,
            summary_metrics=standardStats,
            display_dict=displayDict,
        )
        bundleList.append(bundle)

        sqlconstraint = (
            'scheduler_note not like "DD%"'
            + ' and (filter="r" or filter="i" or filter="z") and night < %i' % (year * 365.25)
        )
        m = metrics.WeakLensingNvisits(
            lsst_filter=bandpass,
            depth_cut=mag_cuts[year],
            ebvlim=lim_ebv,
            min_exp_time=minExpTime,
            metric_name="WeakLensingNvisits_riz_year%i" % year,
        )
        bundle = mb.MetricBundle(
            m,
            slicer,
            sqlconstraint,
            maps_list=[dustmap],
            info_label=info_label,
            summary_metrics=standardStats,
            display_dict=displayDict,
        )
        bundleList.append(bundle)

        m = metrics.RIZDetectionCoaddExposureTime(
            det_bands=["r", "i", "z"], metric_name="riz_exposure_time_year%i" % year
        )
        bundle = mb.MetricBundle(
            m,
            slicer,
            sqlconstraint,
            maps_list=[dustmap],
            info_label=info_label,
            summary_metrics=standardStats,
            display_dict=displayDict,
        )
        bundleList.append(bundle)

    subgroupCount += 1
    displayDict["subgroup"] = f"{subgroupCount}: Camera Rotator"
    metric1 = metrics.KuiperMetric("rotSkyPos")
    metric2 = metrics.KuiperMetric("rotTelPos")
    caption_root = "Kuiper statistic (0 is uniform, 1 is delta function) of the "
    for f in filterlist:
        for m in [metric1, metric2]:
            plotDict = {"color": colors[f]}
            displayDict["order"] = filterorders[f]
            displayDict["caption"] = caption_root + f"{m.colname} for visits in {f} band."
            bundleList.append(
                mb.MetricBundle(
                    m,
                    healpixslicer,
                    filtersqls[f],
                    plot_dict=plotDict,
                    display_dict=displayDict,
                    summary_metrics=standardStats,
                    plot_funcs=subsetPlots,
                )
            )

    ##############
    # SNe Ia
    ##############
    displayDict = {
        "group": "Cosmology",
        "subgroup": "5: SNe Ia",
        "order": 0,
        "caption": "Expected discoveries of SNeIa, using the SNNSNMetric.",
    }
    sne_nside = 16
    sn_summary = [
        metrics.MedianMetric(),
        metrics.MeanMetric(),
        metrics.SumMetric(metric_name="Total detected"),
        metrics.CountMetric(metric_name="Total on sky", mask_val=0),
    ]
    snslicer = slicers.HealpixSlicer(nside=sne_nside, use_cache=False)
    metric = metrics.SNNSNMetric(
        n_bef=3,
        n_aft=8,
        coadd_night=True,
        add_dust=False,
        hard_dust_cut=0.25,
        zmin=0.2,
        zmax=0.5,
        z_step=0.03,
        daymax_step=3.0,
        zlim_coeff=0.95,
        gamma_name="gamma_WFD.hdf5",
        verbose=False,
    )
    plotDict = {"percentile_clip": 95, "n_ticks": 5}
    # Run without DDF observations
    bundle = mb.MetricBundle(
        metric,
        snslicer,
        "scheduler_note not like '%DD%'",
        plot_dict=plotDict,
        display_dict=displayDict,
        info_label="DDF excluded",
        summary_metrics=sn_summary,
        plot_funcs=subsetPlots,
    )
    bundleList.append(bundle)

    #########################
    #########################
    # Variables and Transients
    #########################
    #########################

    # Periodic Stars
    displayDict = {"group": "Variables/Transients", "order": 0}

    # Tidal Disruption Events
    displayDict["subgroup"] = "TDE"
    displayDict["caption"] = "TDE lightcurves that could be identified"
    displayDict["order"] = 0

    metric = maf.TdePopMetric(mjd0=mjd0)
    tdeslicer = maf.generate_tde_pop_slicer()
    bundle = mb.MetricBundle(
        metric,
        tdeslicer,
        "",
        run_name=runName,
        summary_metrics=lightcurve_summary(),
        display_dict=displayDict,
    )
    bundleList.append(bundle)

    displayDict["caption"] = "TDE lightcurves quality"
    metric = maf.TdePopMetricQuality(metric_name="TDE_Quality")
    bundle = mb.MetricBundle(
        metric,
        tdeslicer,
        "",
        run_name=runName,
        summary_metrics=lightcurve_summary(),
        display_dict=displayDict,
    )
    bundleList.append(bundle)

    # Microlensing events
    displayDict["subgroup"] = "Microlensing"

    plotDict = {"nside": 128}
    n_events = 10000
    # Let's evaluate a variety of crossing times
    crossing_times = [
        [1, 5],
        [5, 10],
        [10, 20],
        [20, 30],
        [30, 60],
        [60, 90],
        [100, 200],
        [200, 500],
        [500, 1000],
    ]
    metric = maf.MicrolensingMetric()
    summaryMetrics = maf.batches.lightcurve_summary()
    order = 0
    for crossing in crossing_times:
        displayDict["caption"] = "Microlensing events with crossing times between %i to %i days." % (
            crossing[0],
            crossing[1],
        )
        displayDict["order"] = order
        order += 1
        slicer = maf.generate_microlensing_slicer(
            min_crossing_time=crossing[0],
            max_crossing_time=crossing[1],
            n_events=n_events,
        )
        bundleList.append(
            maf.MetricBundle(
                metric,
                slicer,
                None,
                run_name=runName,
                summary_metrics=summaryMetrics,
                info_label=f"tE {crossing[0]}_{crossing[1]} days",
                display_dict=displayDict,
                plot_dict=plotDict,
                plot_funcs=[plots.HealpixSkyMap()],
            )
        )

    if long_microlensing:
        n_events = 10000
        # Let's evaluate a subset of the crossing times for these
        crossing_times = [
            [10, 20],
            [20, 30],
            [30, 60],
            [200, 500],
        ]
        metric_Npts = maf.MicrolensingMetric(metric_calc="Npts")
        summaryMetrics = maf.batches.microlensing_summary(metric_type="Npts")
        order = 0
        for crossing in crossing_times:
            slicer = maf.generate_microlensing_slicer(
                min_crossing_time=crossing[0],
                max_crossing_time=crossing[1],
                n_events=n_events,
            )
            displayDict["caption"] = "Microlensing events with crossing times between %i to %i days." % (
                crossing[0],
                crossing[1],
            )
            displayDict["order"] = order
            order += 1
            bundleList.append(
                maf.MetricBundle(
                    metric_Npts,
                    slicer,
                    None,
                    run_name=runName,
                    summary_metrics=summaryMetrics,
                    info_label=f"tE {crossing[0]}_{crossing[1]} days",
                    display_dict=displayDict,
                    plot_dict=plotDict,
                    plot_funcs=[],
                )
            )

        metric_Fisher = maf.MicrolensingMetric(metric_calc="Fisher")
        summaryMetrics = maf.batches.microlensing_summary(metric_type="Fisher")
        order = 0
        for crossing in crossing_times:
            displayDict["caption"] = "Microlensing events with crossing times between %i to %i days." % (
                crossing[0],
                crossing[1],
            )
            displayDict["order"] = order
            order += 1
            slicer = maf.generate_microlensing_slicer(
                min_crossing_time=crossing[0],
                max_crossing_time=crossing[1],
                n_events=n_events,
            )
            bundleList.append(
                maf.MetricBundle(
                    metric_Fisher,
                    slicer,
                    None,
                    run_name=runName,
                    summary_metrics=summaryMetrics,
                    info_label=f"tE {crossing[0]}_{crossing[1]} days",
                    display_dict=displayDict,
                    plot_dict=plotDict,
                    plot_funcs=[],
                )
            )

    # Kilonovae metric
    displayDict["group"] = "Variables/Transients"
    displayDict["subgroup"] = "KNe"

    n_events = 10000
    caption = f"KNe metric, injecting {n_events} lightcurves over the entire sky, GW170817-like only."
    caption += " Ignoring DDF observations."
    displayDict["caption"] = caption
    displayDict["order"] = 0
    # Kilonova parameters
    inj_params_list = [
        {"mej_dyn": 0.005, "mej_wind": 0.050, "phi": 30, "theta": 25.8},
    ]
    filename = maf.get_kne_filename(inj_params_list)
    kneslicer = maf.generate_kn_pop_slicer(n_events=n_events, n_files=len(filename), d_min=10, d_max=600)
    metric = maf.KNePopMetric(
        output_lc=False,
        file_list=filename,
        metric_name="KNePopMetric_single",
        mjd0=mjd0,
    )
    bundle = mb.MetricBundle(
        metric,
        kneslicer,
        "scheduler_note not like 'DD%'",
        run_name=runName,
        info_label="single model",
        summary_metrics=lightcurve_summary(),
        display_dict=displayDict,
    )
    bundleList.append(bundle)

    n_events = 10000
    caption = f"KNe metric, injecting {n_events} lightcurves over the entire sky, entire model population."
    caption += " Ignoring DDF observations."
    displayDict["caption"] = caption
    displayDict["order"] = 1
    # Kilonova parameters
    filename = maf.get_kne_filename(None)
    kneslicer_allkne = maf.generate_kn_pop_slicer(
        n_events=n_events, n_files=len(filename), d_min=10, d_max=600
    )
    metric_allkne = maf.KNePopMetric(output_lc=False, file_list=filename, metric_name="KNePopMetric_all")
    bundle = mb.MetricBundle(
        metric_allkne,
        kneslicer_allkne,
        "scheduler_note not like 'DD%'",
        run_name=runName,
        info_label="all models",
        summary_metrics=lightcurve_summary(),
        display_dict=displayDict,
    )
    bundleList.append(bundle)

    # General time intervals
    displayDict = {
        "group": "TimeGaps",
        "subgroup": "Time",
        "caption": None,
        "order": 0,
    }

    for f in filterlist:
        gaps = [3.0, 7.0, 24.0]
        for gap in gaps:
            summary_stats = []
            summary_stats.append(
                metrics.AreaSummaryMetric(
                    area=18000,
                    reduce_func=np.median,
                    decreasing=True,
                    metric_name="Median N gaps in %s at %ihr in top 18k" % (f, gap),
                )
            )
            summary_stats.append(
                metrics.AreaSummaryMetric(
                    area=18000,
                    reduce_func=np.mean,
                    decreasing=True,
                    metric_name="Mean N gaps in %s at %ihr in top 18k" % (f, gap),
                )
            )
            summary_stats.append(metrics.MeanMetric())
            summary_stats.append(metrics.MedianMetric())
            m2 = metrics.GapsMetric(
                time_scale=gap,
                metric_name="Gaps_%ihr" % gap,
            )
            plotFuncs = [plots.HealpixSkyMap(), plots.HealpixHistogram()]
            plotDict = {"color_min": 0, "color": colors[f], "percentile_clip": 95}
            displayDict["caption"] = (
                "Number of times the timescale of ~%i hours is sampled in %s band(s)." % (gap, f)
            )
            displayDict["order"] = filterorders[f]
            bundleList.append(
                mb.MetricBundle(
                    m2,
                    healpixslicer,
                    constraint=filtersqls[f],
                    info_label=filterinfo_label[f],
                    run_name=runName,
                    summary_metrics=summary_stats,
                    plot_dict=plotDict,
                    plot_funcs=plotFuncs,
                    display_dict=displayDict,
                )
            )

    # XRB metric
    displayDict["subgroup"] = "XRB"
    displayDict["order"] = 0
    displayDict["caption"] = "Number or characterization of XRBs."
    n_events = 10000
    xrbslicer = maf.generate_xrb_pop_slicer(n_events=n_events)
    metric = maf.XRBPopMetric(output_lc=False, mjd0=mjd0)
    xrb_summaryMetrics = [
        maf.SumMetric(metric_name="Total detected"),
        maf.CountMetric(metric_name="Total lightcurves in footprint"),
        maf.CountMetric(metric_name="Total lightcurves on sky", mask_val=0),
        maf.MeanMetric(metric_name="Fraction detected in footprint"),
        maf.MeanMetric(mask_val=0, metric_name="Fraction detected of total"),
        maf.MedianMetric(metric_name="Median"),
        maf.MeanMetric(metric_name="Mean"),
    ]
    bundleList.append(
        maf.MetricBundle(
            metric,
            xrbslicer,
            "",
            run_name=runName,
            summary_metrics=xrb_summaryMetrics,
            display_dict=displayDict,
        )
    )

    #########################
    #########################
    # Milky Way
    #########################
    #########################

    displayDict = {"group": "Milky Way", "subgroup": ""}

    # Brown Dwarf Volume
    displayDict["subgroup"] = "Brown Dwarf"
    displayDict["order"] = 0
    l7_bd_mags = {"i": 20.09, "z": 18.18, "y": 17.13}
    displayDict["caption"] = (
        f"The expected maximum distance at which an L7 brown dwarf with magnitude {l7_bd_mags} "
        f"would have a parallax SNR of 10.0. The summary statistic represents the volume enclosed by "
        f"the result of this metric (BDParallaxMetric)."
    )
    sum_stats = [metrics.VolumeSumMetric(nside=nside)]
    metric = metrics.BDParallaxMetric(mags=l7_bd_mags, metric_name="Brown Dwarf, L7")
    sql = ""
    plotDict = {}
    bundleList.append(
        mb.MetricBundle(
            metric,
            healpixslicer,
            sql,
            plot_dict=plotDict,
            summary_metrics=sum_stats,
            display_dict=displayDict,
            run_name=runName,
        )
    )

    l4_bd_mags = {"i": 18.35, "z": 16.68, "y": 15.66}
    displayDict["caption"] = (
        f"The expected maximum distance at which an L4 brown dwarf with magnitude {l4_bd_mags} "
        f"would have a parallax SNR of 10.0. The summary statistic represents the total volume enclosed "
        f"by the result of this metric (BDParallaxMetric)."
    )
    metric = metrics.BDParallaxMetric(mags=l4_bd_mags, metric_name="Brown Dwarf, L4")
    bundleList.append(
        mb.MetricBundle(
            metric,
            healpixslicer,
            sql,
            plot_dict=plotDict,
            summary_metrics=sum_stats,
            display_dict=displayDict,
            run_name=runName,
        )
    )

    # Set the run_name for all bundles and return the bundleDict.
    for b in bundleList:
        b.set_run_name(runName)
    bundleDict = mb.make_bundles_dict_from_list(bundleList)

    return bundleDict
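
Example usage (a minimal sketch, not part of the module above): the bundle dictionary returned by radar_limited is typically run against an opsim output database with a MetricBundleGroup. The database filename "baseline.db" and output directory "maf_out" below are placeholder assumptions.

    import rubin_sim.maf as maf
    from rubin_sim.maf.batches import radar_limited

    # Build the limited radar batch of metric bundles for a given run.
    bdict = radar_limited(runName="baseline", nside=64)

    # Evaluate all bundles against the opsim database, then make plots
    # and summary statistics in the output directory.
    group = maf.MetricBundleGroup(bdict, "baseline.db", out_dir="maf_out")
    group.run_all()
    group.plot_all()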