Source code for rubin_sim.maf.batches.moving_objects_batch

__all__ = (
    "ss_population_defaults",
    "quick_discovery_batch",
    "discovery_batch",
    "run_completeness_summary",
    "plot_completeness",
    "characterization_inner_batch",
    "characterization_outer_batch",
    "run_fraction_summary",
    "plot_fractions",
    "plot_single",
    "plot_activity",
    "read_and_combine",
    "combine_subsets",
)

import os
from copy import deepcopy

import matplotlib.pyplot as plt
import numpy as np
import numpy.ma as ma

import rubin_sim.maf.metric_bundles as mb
import rubin_sim.maf.metrics as metrics
import rubin_sim.maf.plots as plots
import rubin_sim.maf.stackers as stackers
from rubin_sim.maf.metric_bundles import MoMetricBundle

from .col_map_dict import col_map_dict
from .common import (
    fraction_population_at_threshold,
    summary_completeness_at_time,
    summary_completeness_over_h,
)


[docs] def ss_population_defaults(objtype): """Provide useful default ranges for H, based on objtype of population type. """ defaults = {} defaults["Vatira"] = { "h_range": [16, 28, 0.2], "h_mark": 22, "magtype": "asteroid", "char": "inner", } defaults["PHA"] = { "h_range": [16, 28, 0.2], "h_mark": 22, "magtype": "asteroid", "char": "inner", } defaults["NEO"] = { "h_range": [16, 28, 0.2], "h_mark": 22, "magtype": "asteroid", "char": "inner", } defaults["MBA"] = { "h_range": [16, 26, 0.2], "h_mark": 20, "magtype": "asteroid", "char": "inner", } defaults["MBC"] = { "h_range": [16, 26, 0.2], "h_mark": 20, "magtype": "asteroid", "char": "inner", } defaults["Trojan"] = { "h_range": [14, 22, 0.2], "h_mark": 18, "magtype": "asteroid", "char": "inner", } defaults["TNO"] = { "h_range": [4, 12, 0.2], "h_mark": 8, "magtype": "asteroid", "char": "outer", } defaults["SDO"] = { "h_range": [4, 12, 0.2], "h_mark": 8, "magtype": "asteroid", "char": "outer", } defaults["LPC"] = { "h_range": [6, 22, 0.5], "h_mark": 13, "magtype": "comet_oort", "char": "outer", } defaults["SPC"] = { "h_range": [4, 20, 0.5], "h_mark": 8, "magtype": "comet_short", "char": "outer", } defaults["generic"] = { "h_range": [4, 28, 0.5], "h_mark": 10, "magtype": "asteroid", "char": "inner", } # Some objtypes may be provided by other names if objtype.upper().startswith("GRANVIK"): objtype = "NEO" if objtype.upper().startswith("L7"): objtype = "TNO" if objtype.upper().startswith("OCC"): objtype = "LPC" if objtype not in defaults: print( f"## Could not find {objtype} in default keys ({defaults.keys()}). \n" f"## Using generic default values instead." ) objtype = "generic" return defaults[objtype]
[docs] def quick_discovery_batch( slicer, colmap=None, run_name="run_name", detection_losses="detection", objtype="", albedo=None, h_mark=None, np_reduce=np.mean, constraint_info_label="", constraint=None, magtype="asteroid", ): """A subset of discovery metrics, using only the default discovery criteria of 3 pairs in 15 or 30 nights. """ if colmap is None: colmap = col_map_dict() bundleList = [] basicPlotDict = { "albedo": albedo, "h_mark": h_mark, "np_reduce": np_reduce, "nxbins": 200, "nybins": 200, } plot_funcs = [plots.MetricVsH()] display_dict = {"group": f"{objtype}", "subgroup": "Discovery"} if constraint_info_label == "" and constraint is not None: constraint_info_label = constraint.replace("filter", "").replace("==", "").replace(" ", " ") info_label = objtype + " " + constraint_info_label info_label = info_label.rstrip(" ") if detection_losses not in ("detection", "trailing"): raise ValueError("Please choose detection or trailing as options for detection_losses.") if detection_losses == "trailing": magStacker = stackers.MoMagStacker(loss_col="dmag_trail", magtype=magtype) detection_losses = " trailing loss" else: magStacker = stackers.MoMagStacker(loss_col="dmag_detect", magtype=magtype) detection_losses = " detection loss" # Set up a dictionary to pass to each metric for the column names. colkwargs = { "mjd_col": colmap["mjd"], "seeing_col": colmap["seeingGeom"], "exp_time_col": colmap["exptime"], "m5_col": colmap["fiveSigmaDepth"], "night_col": colmap["night"], "filter_col": colmap["filter"], } def _setup_child_metrics(parentMetric): childMetrics = {} childMetrics["Time"] = metrics.DiscoveryTimeMetric(parentMetric, **colkwargs) childMetrics["N_Chances"] = metrics.DiscoveryNChancesMetric(parentMetric, **colkwargs) # Could expand to add N_chances per year, but not really necessary. return childMetrics def _configure_child_bundles(parentBundle): dispDict = { "group": f"{objtype}", "subgroup": "Completeness Over Time", "caption": "Time of discovery of objects", "order": 0, } parentBundle.child_bundles["Time"].set_display_dict(dispDict) dispDict = { "group": f"{objtype}", "subgroup": "N Chances", "caption": "Number of chances for discovery of objects", "order": 0, } parentBundle.child_bundles["N_Chances"].set_display_dict(dispDict) return t_min = 5.0 / 60.0 / 24.0 t_max = 90.0 / 60.0 / 24.0 # 3 pairs in 15 md = info_label + " 3 pairs in 15 nights" + detection_losses # Set up plot dict. plotDict = {"title": "%s: %s" % (run_name, md)} plotDict.update(basicPlotDict) metric = metrics.DiscoveryMetric( n_obs_per_night=2, t_min=t_min, t_max=t_max, n_nights_per_window=3, t_window=15, **colkwargs, ) childMetrics = _setup_child_metrics(metric) bundle = MoMetricBundle( metric, slicer, constraint, stacker_list=[magStacker], run_name=run_name, info_label=md, child_metrics=childMetrics, plot_dict=plotDict, plot_funcs=plot_funcs, display_dict=display_dict, ) _configure_child_bundles(bundle) bundleList.append(bundle) # 3 pairs in 30 md = info_label + " 3 pairs in 30 nights" + detection_losses plotDict = {"title": "%s: %s" % (run_name, md)} plotDict.update(basicPlotDict) metric = metrics.DiscoveryMetric( n_obs_per_night=2, t_min=t_min, t_max=t_max, n_nights_per_window=3, t_window=30, **colkwargs, ) childMetrics = _setup_child_metrics(metric) bundle = MoMetricBundle( metric, slicer, constraint, stacker_list=[magStacker], run_name=run_name, info_label=md, child_metrics=childMetrics, plot_dict=plotDict, plot_funcs=plot_funcs, display_dict=display_dict, ) _configure_child_bundles(bundle) bundleList.append(bundle) # Set the run_name for all bundles and return the bundleDict. for b in bundleList: b.set_run_name(run_name) return mb.make_bundles_dict_from_list(bundleList)
[docs] def discovery_batch( slicer, colmap=None, run_name="run_name", detection_losses="detection", objtype="", albedo=None, h_mark=None, np_reduce=np.mean, constraint_info_label="", constraint=None, magtype="asteroid", ): """A comprehensive set of discovery metrics, using a wide range of discovery criteria. """ if colmap is None: colmap = col_map_dict() bundleList = [] basicPlotDict = { "albedo": albedo, "h_mark": h_mark, "np_reduce": np_reduce, "nxbins": 200, "nybins": 200, } plot_funcs = [plots.MetricVsH()] display_dict = {"group": f"{objtype}", "subgroup": "Discovery"} if constraint_info_label == "" and constraint is not None: constraint_info_label = constraint.replace("filter", "").replace("==", "").replace(" ", " ") info_label = objtype + " " + constraint_info_label info_label = info_label.rstrip(" ") if detection_losses not in ("detection", "trailing"): raise ValueError("Please choose detection or trailing as options for detection_losses.") if detection_losses == "trailing": # These are the SNR-losses only. magStacker = stackers.MoMagStacker(loss_col="dmag_trail", magtype=magtype) detection_losses = " trailing loss" else: # SNR losses, plus additional loss due to detecting with stellar PSF. magStacker = stackers.MoMagStacker(loss_col="dmag_detect", magtype=magtype) detection_losses = " detection loss" # Set up a dictionary to pass to each metric for the column names. colkwargs = { "mjd_col": colmap["mjd"], "seeing_col": colmap["seeingGeom"], "exp_time_col": colmap["exptime"], "m5_col": colmap["fiveSigmaDepth"], "night_col": colmap["night"], "filter_col": colmap["filter"], } def _setup_child_metrics(parentMetric): childMetrics = {} childMetrics["Time"] = metrics.DiscoveryTimeMetric(parentMetric, **colkwargs) childMetrics["N_Chances"] = metrics.DiscoveryNChancesMetric(parentMetric, **colkwargs) # Could expand to add N_chances per year, but not really necessary. return childMetrics def _configure_child_bundles(parentBundle): dispDict = { "group": f"{objtype}", "subgroup": "Completeness Over Time", "caption": "Time of discovery of objects", "order": 0, } parentBundle.child_bundles["Time"].set_display_dict(dispDict) dispDict = { "group": f"{objtype}", "subgroup": "N Chances", "caption": "Number of chances for discovery of objects", "order": 0, } parentBundle.child_bundles["N_Chances"].set_display_dict(dispDict) t_min = 5.0 / 60.0 / 24.0 t_max = 90.0 / 60.0 / 24.0 # 3 pairs in 15 and 3 pairs in 30 done in 'quickDiscoveryBatch' (with vis). # 4 pairs in 20 md = info_label + " 4 pairs in 20 nights" + detection_losses plotDict = {"title": "%s: %s" % (run_name, md)} plotDict.update(basicPlotDict) metric = metrics.DiscoveryMetric( n_obs_per_night=2, t_min=t_min, t_max=t_max, n_nights_per_window=4, t_window=20, **colkwargs, ) childMetrics = _setup_child_metrics(metric) bundle = MoMetricBundle( metric, slicer, constraint, stacker_list=[magStacker], run_name=run_name, info_label=md, child_metrics=childMetrics, plot_dict=plotDict, plot_funcs=plot_funcs, display_dict=display_dict, ) _configure_child_bundles(bundle) bundleList.append(bundle) # 3 triplets in 30 md = info_label + " 3 triplets in 30 nights" + detection_losses plotDict = {"title": "%s: %s" % (run_name, md)} plotDict.update(basicPlotDict) metric = metrics.DiscoveryMetric( n_obs_per_night=3, t_min=t_min, t_max=120.0 / 60.0 / 24.0, n_nights_per_window=3, t_window=30, **colkwargs, ) childMetrics = _setup_child_metrics(metric) bundle = MoMetricBundle( metric, slicer, constraint, stacker_list=[magStacker], run_name=run_name, info_label=md, child_metrics=childMetrics, plot_dict=plotDict, plot_funcs=plot_funcs, display_dict=display_dict, ) _configure_child_bundles(bundle) bundleList.append(bundle) # 1 quad md = info_label + " 1 quad in 1 night" + detection_losses plotDict = {"title": "%s: %s" % (run_name, md)} plotDict.update(basicPlotDict) metric = metrics.DiscoveryMetric( n_obs_per_night=4, t_min=t_min, t_max=150.0 / 60.0 / 24.0, n_nights_per_window=1, t_window=2, **colkwargs, ) childMetrics = _setup_child_metrics(metric) bundle = MoMetricBundle( metric, slicer, constraint, stacker_list=[magStacker], run_name=run_name, info_label=md, child_metrics=childMetrics, plot_dict=plotDict, plot_funcs=plot_funcs, display_dict=display_dict, ) _configure_child_bundles(bundle) bundleList.append(bundle) # Play with SNR. # First standard SNR / probabilistic visibility (SNR~5) # 3 pairs in 15 md = info_label + " 3 pairs in 15 nights SNR=5" + detection_losses # Set up plot dict. plotDict = {"title": "%s: %s" % (run_name, md)} plotDict.update(basicPlotDict) metric = metrics.DiscoveryMetric( n_obs_per_night=2, t_min=t_min, t_max=t_max, n_nights_per_window=3, t_window=15, snr_limit=5, **colkwargs, ) childMetrics = _setup_child_metrics(metric) bundle = MoMetricBundle( metric, slicer, constraint, stacker_list=[magStacker], run_name=run_name, info_label=md, child_metrics=childMetrics, plot_dict=plotDict, plot_funcs=plot_funcs, display_dict=display_dict, ) _configure_child_bundles(bundle) bundleList.append(bundle) # 3 pairs in 15, SNR=4. md = info_label + " 3 pairs in 15 nights SNR=4" + detection_losses plotDict = {"title": "%s: %s" % (run_name, md)} plotDict.update(basicPlotDict) metric = metrics.DiscoveryMetric( n_obs_per_night=2, t_min=t_min, t_max=t_max, n_nights_per_window=3, t_window=15, snr_limit=4, **colkwargs, ) childMetrics = _setup_child_metrics(metric) bundle = MoMetricBundle( metric, slicer, constraint, stacker_list=[magStacker], run_name=run_name, info_label=md, child_metrics=childMetrics, plot_dict=plotDict, plot_funcs=plot_funcs, display_dict=display_dict, ) _configure_child_bundles(bundle) bundleList.append(bundle) # 3 pairs in 15, SNR=3 md = info_label + " 3 pairs in 15 nights SNR=3" + detection_losses plotDict = {"title": "%s: %s" % (run_name, md)} plotDict.update(basicPlotDict) metric = metrics.DiscoveryMetric( n_obs_per_night=2, t_min=t_min, t_max=t_max, n_nights_per_window=3, t_window=15, snr_limit=3, **colkwargs, ) childMetrics = _setup_child_metrics(metric) bundle = MoMetricBundle( metric, slicer, constraint, stacker_list=[magStacker], run_name=run_name, info_label=md, child_metrics=childMetrics, plot_dict=plotDict, plot_funcs=plot_funcs, display_dict=display_dict, ) _configure_child_bundles(bundle) bundleList.append(bundle) # SNR = 0 # 3 pairs in 15, SNR=0 md = info_label + " 3 pairs in 15 nights SNR=0" + detection_losses plotDict = {"title": "%s: %s" % (run_name, md)} plotDict.update(basicPlotDict) metric = metrics.DiscoveryMetric( n_obs_per_night=2, t_min=t_min, t_max=t_max, n_nights_per_window=3, t_window=15, snr_limit=0, **colkwargs, ) childMetrics = _setup_child_metrics(metric) bundle = MoMetricBundle( metric, slicer, constraint, stacker_list=[magStacker], run_name=run_name, info_label=md, child_metrics=childMetrics, plot_dict=plotDict, plot_funcs=plot_funcs, display_dict=display_dict, ) _configure_child_bundles(bundle) bundleList.append(bundle) # Play with weird strategies. # Single detection. md = info_label + " Single detection" + detection_losses plotDict = {"title": "%s: %s" % (run_name, md)} plotDict.update(basicPlotDict) metric = metrics.DiscoveryMetric( n_obs_per_night=1, t_min=t_min, t_max=t_max, n_nights_per_window=1, t_window=5, **colkwargs, ) childMetrics = _setup_child_metrics(metric) bundle = MoMetricBundle( metric, slicer, constraint, stacker_list=[magStacker], run_name=run_name, info_label=md, child_metrics=childMetrics, plot_dict=plotDict, plot_funcs=plot_funcs, display_dict=display_dict, ) _configure_child_bundles(bundle) bundleList.append(bundle) # Single pair of detections. md = info_label + " Single pair" + detection_losses plotDict = {"title": "%s: %s" % (run_name, md)} plotDict.update(basicPlotDict) metric = metrics.DiscoveryMetric( n_obs_per_night=2, t_min=t_min, t_max=t_max, n_nights_per_window=1, t_window=5, **colkwargs, ) childMetrics = _setup_child_metrics(metric) bundle = MoMetricBundle( metric, slicer, constraint, stacker_list=[magStacker], run_name=run_name, info_label=md, child_metrics=childMetrics, plot_dict=plotDict, plot_funcs=plot_funcs, display_dict=display_dict, ) _configure_child_bundles(bundle) bundleList.append(bundle) # High velocity discovery. md = info_label + " High velocity pair" + detection_losses plotDict = {"title": "%s: %s" % (run_name, md)} plotDict.update(basicPlotDict) metric = metrics.HighVelocityNightsMetric(psf_factor=2.0, n_obs_per_night=2, **colkwargs) bundle = MoMetricBundle( metric, slicer, constraint, stacker_list=[magStacker], run_name=run_name, info_label=md, plot_dict=plotDict, plot_funcs=plot_funcs, display_dict=display_dict, ) bundleList.append(bundle) # "magic" detection - 6 in 60 days. md = info_label + " 6 detections in 60 nights" + detection_losses plotDict = {"title": "%s: %s" % (run_name, md)} plotDict.update(basicPlotDict) metric = metrics.MagicDiscoveryMetric(n_obs=6, t_window=60, **colkwargs) bundle = MoMetricBundle( metric, slicer, constraint, stacker_list=[magStacker], run_name=run_name, info_label=md, plot_dict=plotDict, plot_funcs=plot_funcs, display_dict=display_dict, ) bundleList.append(bundle) # Set the run_name for all bundles and return the bundleDict. for b in bundleList: b.set_run_name(run_name) return mb.make_bundles_dict_from_list(bundleList)
[docs] def run_completeness_summary(bdict, h_mark, times, out_dir, results_db): """Calculate completeness and create completeness bundles from all N_Chances and Time (child) metrics of the (discovery) bundles in bdict, and write completeness at h_mark to results_db, save bundle to disk. This should be done after combining any sub-sets of the metric results. Parameters ---------- bdict : `dict` of `maf.MetricBundle` Dict containing ~rubin_sim.maf.MoMetricBundles, including bundles we're expecting to contain completeness. h_mark : `float` h_mark value to add to completeness plotting dict. If not defined (None), then the h_mark from the plotdict from the metric bundle will be used if available. If None and h_mark not in plot_dict, then median of h_range values will be used. times : `np.ndarray` The times at which to calculate completeness (over time). out_dir : `str` Output directory to save completeness bundles to disk. results_db : `maf.ResultsDb` Results database to save information about completeness bundle. Returns ------- metricDict : `dict` of `maf.MetricBundles` A dictionary of the new completeness bundles. Keys match original keys, with additions of "[Differential,Cumulative]Completeness@Time" and "[Differential,Cumulative]Completeness" to distinguish new entries. """ # Add completeness bundles and write completeness at h_mark to results_db. completeness = {} def _compbundles(b, bundle, h_mark, results_db): # Find h_mark if not set (this may be different for different bundles). if h_mark is None and "h_mark" in bundle.plot_dict: h_mark = bundle.plot_dict["h_mark"] if h_mark is None: h_mark = np.median(bundle.slicer.slice_points["H"]) # Set up the summary metrics. summaryTimeMetrics = summary_completeness_at_time(times, h_val=h_mark, h_index=0.33) summaryTimeMetrics2 = summary_completeness_at_time(times, h_val=h_mark - 2, h_index=0.33) summaryHMetrics = summary_completeness_over_h(requiredChances=1, Hindex=0.33) comp = {} # Bundle = single metric bundle. # Add differential and cumulative completeness. if "Time" in bundle.metric.name: for metric in summaryTimeMetrics: newkey = b + " " + metric.name comp[newkey] = mb.make_completeness_bundle(bundle, metric, h_mark=None, results_db=results_db) comp[newkey].plot_dict["times"] = times comp[newkey].plot_dict["h_val"] = metric.hval for metric in summaryTimeMetrics2: newkey = b + " " + metric.name comp[newkey] = mb.make_completeness_bundle(bundle, metric, h_mark=None, results_db=results_db) comp[newkey].plot_dict["times"] = times comp[newkey].plot_dict["h_val"] = metric.hval elif "NChances" in bundle.metric.name or "N_Chances" in bundle.metric.name: for metric in summaryHMetrics: newkey = b + " " + metric.name comp[newkey] = mb.make_completeness_bundle( bundle, metric, h_mark=h_mark, results_db=results_db ) elif "MagicDiscovery" in bundle.metric.name: for metric in summaryHMetrics: newkey = b + " " + metric.name comp[newkey] = mb.make_completeness_bundle( bundle, metric, h_mark=h_mark, results_db=results_db ) elif "HighVelocity" in bundle.metric.name: for metric in summaryHMetrics: newkey = b + " " + metric.name comp[newkey] = mb.make_completeness_bundle( bundle, metric, h_mark=h_mark, results_db=results_db ) return comp # Generate the completeness bundles for the various discovery metrics. for b, bundle in bdict.items(): if "Discovery" in bundle.metric.name: completeness.update(_compbundles(b, bundle, h_mark, results_db)) if "MagicDiscovery" in bundle.metric.name: completeness.update(_compbundles(b, bundle, h_mark, results_db)) if "HighVelocity" in bundle.metric.name: completeness.update(_compbundles(b, bundle, h_mark, results_db)) # Write the completeness bundles to disk, so we can re-read them later. # (also set the display dict properties, for the results_db output). for b, bundle in completeness.items(): bundle.display_dict["subgroup"] = "Completeness" bundle.write(out_dir=out_dir, results_db=results_db) # Calculate total number of objects - currently for NEOs and PHAs only for b, bundle in completeness.items(): if "DifferentialCompleteness" in b and "@Time" not in b: if "NEO" in bundle.info_label: nobj_metrics = [ metrics.TotalNumberSSO(h_mark=22, dndh_func=metrics.neo_dndh_granvik), metrics.TotalNumberSSO(h_mark=25, dndh_func=metrics.neo_dndh_granvik), ] bundle.set_summary_metrics(nobj_metrics) bundle.compute_summary_stats(results_db) if "PHA" in bundle.info_label: nobj_metrics = [metrics.TotalNumberSSO(h_mark=22, dndh_func=metrics.pha_dndh_granvik)] bundle.set_summary_metrics(nobj_metrics) bundle.compute_summary_stats(results_db) return completeness
[docs] def plot_completeness( bdictCompleteness, figroot=None, run_name=None, results_db=None, out_dir=".", fig_format="pdf", ): """Plot a minor subset of the completeness results.""" # Separate some subsets to plot together - # first just the simple 15 and 30 night detection loss metrics. keys = [ "3_pairs_in_30_nights_detection_loss", "3_pairs_in_15_nights_detection_loss", ] plotTimes = {} plotComp = {} plotDiff = {} for k in bdictCompleteness: for key in keys: if key in k: if "Time" in k: if "Cumulative" in k: plotTimes[k] = bdictCompleteness[k] elif "Chances" in k: if "Differential" in k: plotDiff[k] = bdictCompleteness[k] elif "Cumulative" in k: plotComp[k] = bdictCompleteness[k] # Add plot dictionaries to code 30 nights red, 15 nights blue, # differentials dotted. def _codePlot(key): plotDict = {} if "Differential" in k: plotDict["linestyle"] = ":" else: plotDict["linestyle"] = "-" if "30_nights" in k: plotDict["color"] = "r" if "15_nights" in k: plotDict["color"] = "b" return plotDict # Apply color-coding. for k, b in plotTimes.items(): b.set_plot_dict(_codePlot(k)) for k, b in plotDiff.items(): b.set_plot_dict(_codePlot(k)) for k, b in plotComp.items(): b.set_plot_dict(_codePlot(k)) first = bdictCompleteness[list(bdictCompleteness.keys())[0]] if run_name is None: run_name = first.run_name if figroot is None: figroot = run_name display_dict = deepcopy(first.display_dict) # Plot completeness as a function of time. # Make custom plot, then save it with PlotHandler. fig = plt.figure(figsize=(8, 6)) for k in plotTimes: plt.plot( plotTimes[k].plot_dict["times"], plotTimes[k].metric_values[0, :], label=plotTimes[k].plot_dict["label"] + " @H=%.2f" % plotTimes[k].plot_dict["h_val"], ) plt.legend() plt.xlabel("Time (MJD)") plt.ylabel("Completeness") plt.grid(True, alpha=0.3) # Make a PlotHandler to deal with savings/results_db, etc. ph = plots.PlotHandler(fig_format=fig_format, results_db=results_db, out_dir=out_dir) display_dict["subgroup"] = "Completeness over time" display_dict["caption"] = "Completeness over time, for H values indicated in legend." ph.save_fig( fig, f"{figroot}_CompletenessOverTime", "Combo", "CompletenessOverTime", "MoObjSlicer", figroot, None, None, display_dict=display_dict, ) plt.savefig( os.path.join(out_dir, f"{figroot}_CompletenessOverTime.{fig_format}"), format=fig_format, ) # Plot cumulative completeness. ph = plots.PlotHandler(fig_format=fig_format, results_db=results_db, out_dir=out_dir) ph.set_metric_bundles(plotComp) plotDict = {"ylabel": "Completeness", "figsize": (8, 6), "albedo": 0.14} ph.plot( plot_func=plots.MetricVsH(), plot_dicts=plotDict, outfile_root=figroot + "_CumulativeCompleteness", ) # Plot differential completeness. ph = plots.PlotHandler(fig_format=fig_format, results_db=results_db, out_dir=out_dir) ph.set_metric_bundles(plotDiff) plotDict = {"ylabel": "Completeness", "figsize": (8, 6)} ph.plot( plot_func=plots.MetricVsH(), plot_dicts=plotDict, outfile_root=figroot + "_DifferentialCompleteness", ) # And add the rest of the completeness calculations. allComp = [] for k in bdictCompleteness: if "DiscoveryNChances" in k: if "Cumulative" in k: allComp.append(bdictCompleteness[k]) if "Magic" in k: if "Cumulative" in k: allComp.append(bdictCompleteness[k]) ph = plots.PlotHandler(fig_format=fig_format, results_db=results_db, out_dir=out_dir) ph.set_metric_bundles(allComp) plotDict = { "ylabel": "Completeness", "figsize": (8, 6), "legendloc": (1.01, 0.1), "color": None, } display_dict["subgroup"] = "Completeness all criteria" display_dict["caption"] = "Plotting all of the cumulative completeness curves together." ph.plot( plot_func=plots.MetricVsH(), plot_dicts=plotDict, display_dict=display_dict, outfile_root=figroot + "_Many_CumulativeCompleteness", )
[docs] def characterization_inner_batch( slicer, colmap=None, run_name="run_name", objtype="", magtype="asteroid", albedo=None, h_mark=None, constraint_info_label="", constraint=None, npReduce=np.mean, windows=None, bins=None, ): """Characterization metrics for inner solar system objects.""" if colmap is None: colmap = col_map_dict() bundleList = [] # Set up a dictionary to pass to each metric for the column names. colkwargs = { "mjd_col": colmap["mjd"], "seeing_col": colmap["seeingGeom"], "exp_time_col": colmap["exptime"], "m5_col": colmap["fiveSigmaDepth"], "night_col": colmap["night"], "filter_col": colmap["filter"], } basicPlotDict = { "albedo": albedo, "h_mark": h_mark, "np_reduce": npReduce, "nxbins": 200, "nybins": 200, } plot_funcs = [plots.MetricVsH()] if constraint_info_label == "" and constraint is not None: constraint_info_label = constraint.replace("filter", "").replace("==", "").replace(" ", " ") info_label = objtype + " " + constraint_info_label info_label = info_label.rstrip(" ") display_dict = {"group": f"{objtype}"} # Stackers magStacker = stackers.MoMagStacker(loss_col="dmag_detect", magtype=magtype) eclStacker = stackers.EclStacker() stackerList = [magStacker, eclStacker] # Windows are the different 'length of activity' if windows is None: windows = np.arange(10, 200, 30.0) # Bins are the different 'anomaly variations' of activity if bins is None: bins = np.arange(5, 185, 20.0) # Number of observations. md = info_label display_dict["subgroup"] = "N Obs" plotDict = { "ylabel": "Number of observations (#)", "title": "%s: Number of observations %s" % (run_name, md), } plotDict.update(basicPlotDict) metric = metrics.NObsMetric(**colkwargs) bundle = MoMetricBundle( metric, slicer, constraint, stacker_list=stackerList, run_name=run_name, info_label=md, plot_dict=plotDict, plot_funcs=plot_funcs, display_dict=display_dict, ) bundleList.append(bundle) # Observational arc. md = info_label display_dict["subgroup"] = "Obs Arc" plotDict = { "ylabel": "Observational Arc (days)", "title": "%s: Observational Arc Length %s" % (run_name, md), } plotDict.update(basicPlotDict) metric = metrics.ObsArcMetric(**colkwargs) bundle = MoMetricBundle( metric, slicer, constraint, stacker_list=stackerList, run_name=run_name, info_label=md, plot_dict=plotDict, plot_funcs=plot_funcs, display_dict=display_dict, ) bundleList.append(bundle) # Activity detection. display_dict["subgroup"] = "Activity" for w in windows: md = info_label + " activity lasting %.0f days" % w plotDict = { "title": "%s: Chances of detecting %s" % (run_name, md), "ylabel": "Probability of detection per %.0f day window" % w, } metric_name = "Chances of detecting activity lasting %.0f days" % w metric = metrics.ActivityOverTimeMetric(w, metric_name=metric_name, **colkwargs) bundle = MoMetricBundle( metric, slicer, constraint, stacker_list=stackerList, run_name=run_name, info_label=info_label, plot_dict=plotDict, plot_funcs=plot_funcs, display_dict=display_dict, ) bundleList.append(bundle) for b in bins: md = info_label + " activity covering %.0f deg" % (b) plotDict = { "title": "%s: Chances of detecting %s" % (run_name, md), "ylabel": "Probability of detection per %.0f deg window" % b, } metric_name = "Chances of detecting activity covering %.0f deg" % (b) metric = metrics.ActivityOverPeriodMetric(b, metric_name=metric_name, **colkwargs) bundle = MoMetricBundle( metric, slicer, constraint, stacker_list=stackerList, run_name=run_name, info_label=info_label, plot_dict=plotDict, plot_funcs=plot_funcs, display_dict=display_dict, ) bundleList.append(bundle) # Lightcurve inversion. md = info_label display_dict["subgroup"] = "Color/Inversion" plotDict = { "y_min": 0, "y_max": 1, "ylabel": "Fraction of objects", "title": "%s: Fraction with potential lightcurve inversion %s" % (run_name, md), } plotDict.update(basicPlotDict) metric = metrics.LightcurveInversionAsteroidMetric(**colkwargs) bundle = MoMetricBundle( metric, slicer, constraint, stacker_list=stackerList, run_name=run_name, info_label=md, plot_dict=plotDict, plot_funcs=plot_funcs, display_dict=display_dict, ) bundleList.append(bundle) # Color determination. md = info_label plotDict = { "y_min": 0, "y_max": 1, "ylabel": "Fraction of objects", "title": "%s: Fraction of population with colors in X filters %s" % (run_name, md), } plotDict.update(basicPlotDict) metric = metrics.ColorAsteroidMetric(**colkwargs) bundle = MoMetricBundle( metric, slicer, constraint, stacker_list=stackerList, run_name=run_name, info_label=md, plot_dict=plotDict, plot_funcs=plot_funcs, display_dict=display_dict, ) bundleList.append(bundle) # Set the run_name for all bundles and return the bundleDict. for b in bundleList: b.set_run_name(run_name) return mb.make_bundles_dict_from_list(bundleList)
[docs] def characterization_outer_batch( slicer, colmap=None, run_name="run_name", objtype="", magtype="asteroid", albedo=None, h_mark=None, constraint_info_label="", constraint=None, npReduce=np.mean, windows=None, bins=None, ): """Characterization metrics for outer solar system objects.""" if colmap is None: colmap = col_map_dict() bundleList = [] # Set up a dictionary to pass to each metric for the column names. colkwargs = { "mjd_col": colmap["mjd"], "seeing_col": colmap["seeingGeom"], "exp_time_col": colmap["exptime"], "m5_col": colmap["fiveSigmaDepth"], "night_col": colmap["night"], "filter_col": colmap["filter"], } basicPlotDict = { "albedo": albedo, "h_mark": h_mark, "np_reduce": npReduce, "nxbins": 200, "nybins": 200, } plot_funcs = [plots.MetricVsH()] if constraint_info_label == "" and constraint is not None: constraint_info_label = constraint.replace("filter", "").replace("==", "").replace(" ", " ") info_label = objtype + " " + constraint_info_label info_label = info_label.rstrip(" ") display_dict = {"group": f"{objtype}"} # Stackers magStacker = stackers.MoMagStacker(loss_col="dmag_detect", magtype=magtype) eclStacker = stackers.EclStacker() stackerList = [magStacker, eclStacker] # Windows are the different 'length of activity' if windows is None: windows = np.arange(10, 200, 30.0) # Bins are the different 'anomaly variations' of activity if bins is None: bins = np.arange(5, 185, 20.0) # Number of observations. md = info_label display_dict["subgroup"] = "N Obs" plotDict = { "ylabel": "Number of observations (#)", "title": "%s: Number of observations %s" % (run_name, md), } plotDict.update(basicPlotDict) metric = metrics.NObsMetric(**colkwargs) bundle = MoMetricBundle( metric, slicer, constraint, stacker_list=stackerList, run_name=run_name, info_label=md, plot_dict=plotDict, plot_funcs=plot_funcs, display_dict=display_dict, ) bundleList.append(bundle) # Observational arc. md = info_label display_dict["subgroup"] = "Obs Arc" plotDict = { "ylabel": "Observational Arc (days)", "title": "%s: Observational Arc Length %s" % (run_name, md), } plotDict.update(basicPlotDict) metric = metrics.ObsArcMetric(**colkwargs) bundle = MoMetricBundle( metric, slicer, constraint, stacker_list=stackerList, run_name=run_name, info_label=md, plot_dict=plotDict, plot_funcs=plot_funcs, display_dict=display_dict, ) bundleList.append(bundle) # Activity detection. display_dict["subgroup"] = "Activity" for w in windows: md = info_label + " activity lasting %.0f days" % w plotDict = { "title": "%s: Chances of detecting %s" % (run_name, md), "ylabel": "Probability of detection per %.0f day window" % w, } metric_name = "Chances of detecting activity lasting %.0f days" % w metric = metrics.ActivityOverTimeMetric(w, metric_name=metric_name, **colkwargs) bundle = MoMetricBundle( metric, slicer, constraint, stacker_list=stackerList, run_name=run_name, info_label=info_label, plot_dict=plotDict, plot_funcs=plot_funcs, display_dict=display_dict, ) bundleList.append(bundle) for b in bins: md = info_label + " activity covering %.0f deg" % (b) plotDict = { "title": "%s: Chances of detecting %s" % (run_name, md), "ylabel": "Probability of detection per %.2f deg window" % b, } metric_name = "Chances of detecting activity covering %.0f deg" % (b) metric = metrics.ActivityOverPeriodMetric(b, metric_name=metric_name, **colkwargs) bundle = MoMetricBundle( metric, slicer, constraint, stacker_list=stackerList, run_name=run_name, info_label=info_label, plot_dict=plotDict, plot_funcs=plot_funcs, display_dict=display_dict, ) bundleList.append(bundle) # Color determination. md = info_label display_dict["subgroup"] = "Color/Inversion" plotDict = { "y_min": 0, "y_max": 1, "ylabel": "Fraction of objects", "title": "%s: Fraction of population with colors in X filters %s" % (run_name, md), } plotDict.update(basicPlotDict) metric = metrics.LightcurveColorOuterMetric(**colkwargs) bundle = MoMetricBundle( metric, slicer, constraint, stacker_list=stackerList, run_name=run_name, info_label=md, plot_dict=plotDict, plot_funcs=plot_funcs, display_dict=display_dict, ) bundleList.append(bundle) # Set the run_name for all bundles and return the bundleDict. for b in bundleList: b.set_run_name(run_name) return mb.make_bundles_dict_from_list(bundleList)
[docs] def run_fraction_summary(bdict, h_mark, out_dir, results_db): """ Calculate fractional completeness of the population for color and lightcurve metrics. This should be done after combining any sub-sets of the metric results. Parameters ---------- bdict : `dict` of `maf.MoMetricBundle` Dict containing bundles contianing lightcurve/color evaluations. h_mark : `float` h_mark value to add to completeness plotting dict. If defined, this value is used. If None, but h_mark in plot_dict for metric, then this value (-2) is used. If h_mark not in plotdict, then use the median h_range value-2. times : `np.ndarray` The times at which to calculate completeness (over time). out_dir : `str` Output directory to save completeness bundles to disk. results_db : `maf.ResultsDb` Results database to save information about completeness bundle. Returns ------- metricDict : `dict` of `maf.MetricBundle` Dictionary of the metric bundles for the fractional evaluation of the population. """ fractions = {} # Look for metrics from asteroid or outer solar system # color/lightcurve metrics. inversionSummary = fraction_population_at_threshold([1], ["Lightcurve Inversion"]) asteroidColorSummary = fraction_population_at_threshold( [4, 3, 2, 1], ["6 of ugrizy", "5 of grizy", "4 of grizy", "2 of g, r or i, z or y"], ) asteroidSummaryMetrics = { "LightcurveInversionAsteroid": inversionSummary, "LightcurveInversion_Asteroid": inversionSummary, "ColorAsteroid": asteroidColorSummary, "Color_Asteroid": asteroidColorSummary, } outerColorSummary = fraction_population_at_threshold( [6, 5, 4, 3, 2, 1], ["6 filters", "5 filters", "4 filters", "3 filters", "2 filters", "1 filters"], ) outerSummaryMetrics = { "LightcurveColorOuter": outerColorSummary, "lightcurveColor_Outer": outerColorSummary, } for b, bundle in bdict.items(): # Find h_mark if not set (this may be different for different bundles). if h_mark is None and "h_mark" in bundle.plot_dict: h_mark = bundle.plot_dict["h_mark"] - 2 if h_mark is None: h_mark = np.median(bundle.slicer.slice_points["H"]) - 2 # Make sure we didn't push h_mark outside the range of H values if h_mark < bundle.slicer.slice_points["H"].min(): h_mark = bundle.slicer.slice_points["H"].min() for k in asteroidSummaryMetrics: if k in b: for summary_metric in asteroidSummaryMetrics[k]: newkey = b + " " + summary_metric.name fractions[newkey] = mb.make_completeness_bundle( bundle, summary_metric, h_mark=h_mark, results_db=results_db ) for k in outerSummaryMetrics: if k in b: for summary_metric in outerSummaryMetrics[k]: newkey = b + " " + summary_metric.name fractions[newkey] = mb.make_completeness_bundle( bundle, summary_metric, h_mark=h_mark, results_db=results_db ) # Write fractional populations bundles to disk, so we can re-read later. for b, bundle in fractions.items(): bundle.write(out_dir=out_dir, results_db=results_db) return fractions
def plot_fractions( bdictFractions, figroot=None, run_name=None, results_db=None, out_dir=".", fig_format="pdf", ): # Set colors for the fractions. for b in bdictFractions.values(): k = b.metric.name if "6" in k: b.plot_dict["color"] = "b" if "5" in k: b.plot_dict["color"] = "cyan" if "4" in k: b.plot_dict["color"] = "orange" if "2" in k: b.plot_dict["color"] = "r" if "1" in k: b.plot_dict["color"] = "magenta" if "Lightcurve Inversion" in k: b.plot_dict["color"] = "k" b.plot_dict["linestyle"] = ":" b.plot_dict["linewidth"] = 3 first = bdictFractions[list(bdictFractions.keys())[0]] if run_name is None: run_name = first.run_name if figroot is None: figroot = run_name display_dict = deepcopy(first.display_dict) display_dict["subgroup"] = "Characterization Fraction" ph = plots.PlotHandler(fig_format=fig_format, results_db=results_db, out_dir=out_dir) ph.set_metric_bundles(bdictFractions) ph.joint_metric_names = "Fraction of population for colors or lightcurve inversion" plotDict = {"ylabel": "Fraction of population", "figsize": (8, 6)} ph.plot( plot_func=plots.MetricVsH(), plot_dicts=plotDict, display_dict=display_dict, outfile_root=figroot + "_characterization", )
[docs] def plot_single(bundle, results_db=None, out_dir=".", fig_format="pdf"): """Plot 5%/25%/50%/75%/95% iles for a metric value.""" pDict = { "95%ile": { "color": "k", "linestyle": "--", "label": "95th %ile", "np_reduce": lambda x, axis: np.percentile(x, 95, axis=axis), }, "75%ile": { "color": "magenta", "linestyle": ":", "label": "75th %ile", "np_reduce": lambda x, axis: np.percentile(x, 75, axis=axis), }, "Median": { "color": "b", "linestyle": "-", "label": "Median", "np_reduce": lambda x, axis: np.median(x, axis=axis), }, "Mean": { "color": "g", "linestyle": "--", "label": "Mean", "np_reduce": np.mean, }, "25%ile": { "color": "magenta", "linestyle": ":", "label": "25th %ile", "np_reduce": lambda x, axis: np.percentile(x, 25, axis=axis), }, "5%ile": { "color": "k", "linestyle": "--", "label": "5th %ile", "np_reduce": lambda x, axis: np.percentile(x, 5, axis=axis), }, } ph = plots.PlotHandler(fig_format=fig_format, results_db=results_db, out_dir=out_dir) plot_bundles = [] plot_dicts = [] for percentile in pDict: plot_bundles.append(bundle) plot_dicts.append(pDict[percentile]) plot_dicts[0].update({"figsize": (8, 6), "legendloc": "upper right", "y_min": 0}) # Remove the h_mark line because these plots get complicated already. for r in plot_dicts: r["h_mark"] = None ph.set_metric_bundles(plot_bundles) ph.plot( plot_func=plots.MetricVsH(), plot_dicts=plot_dicts, display_dict=bundle.display_dict, )
def plot_not_found(nChances, h_mark): pass def plot_activity(bdict, figroot=None, results_db=None, out_dir=".", fig_format="pdf"): activity_deg = {} activity_days = {} for k in bdict: if "Chances_of_detecting_activity" in k: if "deg" in k: activity_deg[k] = bdict[k] if "days" in k: activity_days[k] = bdict[k] first = bdict[list(bdict.keys())[0]] if figroot is None: figroot = first.run_name display_dict = deepcopy(first.display_dict) if len(activity_days) > 0: # Plot (mean) likelihood of detection of activity over X days ph = plots.PlotHandler(fig_format=fig_format, results_db=results_db, out_dir=out_dir) ph.set_metric_bundles(activity_days) ph.joint_metric_names = "Chances of detecting activity lasting X days" plot_dict = {"ylabel": "Mean likelihood of detection", "figsize": (8, 6)} ph.plot( plot_func=plots.MetricVsH(), plot_dicts=plot_dict, display_dict=display_dict, outfile_root=figroot + "_activityDays", ) if len(activity_deg) > 0: # Plot mean likelihood of detection of activity over X amount of orbit ph = plots.PlotHandler(fig_format=fig_format, results_db=results_db, out_dir=out_dir) ph.set_metric_bundles(activity_deg) ph.joint_metric_names = "Chances of detecting activity covering X deg" plot_dict = {"ylabel": "Mean likelihood of detection", "figsize": (8, 6)} ph.plot( plot_func=plots.MetricVsH(), plot_dicts=plot_dict, display_dict=display_dict, outfile_root=figroot + "_activityDeg", )
[docs] def read_and_combine(orbitRoot, baseDir, splits, metricfile): """Read and combine the metric results from split locations, returning a single bundle. This will read the files from baseDir/orbitRoot_[split]/metricfile where split = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9], etc. (the subsets the original orbit file was split into). Parameters ---------- orbitRoot : `str` The root of the orbit file - l7_5k, mbas_5k, etc. baseDir: `str` The root directory containing the subset directories. (e.g. '.' often) splits:` np.ndarray` or `list` of `ints` The integers describing the split directories (e.g. [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]) metricfile: `str` The metric filename. Returns ------- metric_bundle : `~rubin_sim.maf.MoMetricBundle` A single metric bundle containing the combined data from the subsets. Note that this won't work for particularly complex metric values, such as the parent Discovery metrics. However, you can read and combine their child metrics, as for these we can propagate the data masks. """ subsets = {} for i in splits: subsets[i] = mb.create_empty_mo_metric_bundle() ddir = os.path.join(baseDir, f"{orbitRoot}_{i}") subsets[i].read(os.path.join(ddir, metricfile)) bundle = combine_subsets(subsets) return bundle
def combine_subsets(mbSubsets): # Combine the data from the subset metric bundles. # The first bundle will be used a template for the slicer. if isinstance(mbSubsets, dict): first = mbSubsets[list(mbSubsets.keys())[0]] else: first = mbSubsets[0] subsetdict = {} for i, b in enumerate(mbSubsets): subsetdict[i] = b mbSubsets = subsetdict joint = mb.create_empty_mo_metric_bundle() # Check if they're the same slicer. slicer = deepcopy(first.slicer) for i in mbSubsets: if np.any(slicer.slice_points["H"] != mbSubsets[i].slicer.slice_points["H"]): if np.any(slicer.slice_points["orbits"] != mbSubsets[i].slicer.slice_points["orbits"]): raise ValueError("Bundle %s has a different slicer than the first bundle" % (i)) # Join metric values. joint.slicer = slicer joint.metric = first.metric # Don't just use the slicer shape to define the metric_values, # because of CompletenessBundles. metric_values = np.zeros(first.metric_values.shape, float) metric_values_mask = np.zeros(first.metric_values.shape, bool) for i in mbSubsets: metric_values += mbSubsets[i].metric_values.filled(0) metric_values_mask = np.where(metric_values_mask & mbSubsets[i].metric_values.mask, True, False) joint.metricValues = ma.MaskedArray(data=metric_values, mask=metric_values_mask, fill_value=0) joint.info_label = first.info_label joint.run_name = first.run_name joint.file_root = first.file_root.replace(".npz", "") joint.plotDict = first.plotDict joint.display_dict = first.display_dict return joint