Source code for rubin_sim.maf.run_comparison.archive

"""Tools for use of project-generated opsim simulations and analysis.

The helpers in this module load metric-subset definitions, metric summary
tables, and run/family metadata (from local files or URLs), download opsim
output databases, and display descriptions of run families.
"""

# Names exported by ``from ... import *``; every entry is a function
# defined in this module.
__all__ = (
    "get_metric_subsets",
    "create_metric_subset",
    "write_metric_subsets",
    "get_metric_summaries",
    "get_runs",
    "get_family_runs",
    "download_runs",
    "get_family_descriptions",
    "describe_families",
)


import copy
import os
import sys
import urllib
import warnings

import numpy as np
import pandas as pd

try:
    import IPython
except ModuleNotFoundError:
    pass

from rubin_sim.data import get_data_dir

from .summary_plots import plot_run_metric

# Default file name or URL of the JSON file describing run families; can be
# overridden with the RUBIN_SIM_FAMILY_SOURCE environment variable.
FAMILY_SOURCE = os.environ.get(
    "RUBIN_SIM_FAMILY_SOURCE",
    "https://raw.githubusercontent.com/lsst-pst/survey_strategy/main/fbs_2.0/runs_v2.2.json",
)

# Default file name or URL of the metric summary table; can be overridden
# with the RUBIN_SIM_SUMMARY_SOURCE environment variable.
SUMMARY_SOURCE = os.environ.get(
    "RUBIN_SIM_SUMMARY_SOURCE",
    "https://s3df.slac.stanford.edu/data/rubin/sim-data/sims_featureScheduler_runs3.4/maf/summary.h5",
)

# ``os.uname`` only exists on Unix-like platforms; guard the lookup so that
# importing this module does not raise AttributeError elsewhere (e.g. Windows).
if hasattr(os, "uname") and os.uname().nodename.endswith(".datalab.noao.edu"):
    DEFAULT_OPSIM_DB_DIR = "/sims_maf"
else:
    DEFAULT_OPSIM_DB_DIR = os.getcwd()

# Directory used for opsim databases when no destination is given; the
# OPSIM_DB_DIR environment variable takes precedence over the default above.
OPSIM_DB_DIR = os.environ.get("OPSIM_DB_DIR", DEFAULT_OPSIM_DB_DIR)

# Columns of the family metadata that are exploded/grouped per run
# (see get_family_runs and get_runs).
BY_RUN_COLS = ["run", "brief", "filepath", "url"]


def get_metric_subsets(metric_subset_source=None):
    """Load the table describing named subsets of related metrics.

    Parameters
    ----------
    metric_subset_source : `str`, `pandas.DataFrame`, or `None`
        File name or URL of the json file to read. When `None`, the file
        ``maf/metric_subsets.json`` under the rubin_sim data directory is
        used. A `pandas.DataFrame` is returned unchanged.

    Returns
    -------
    metric_subsets : `pandas.DataFrame`
        When read from json, the table is indexed by two levels:

        ``metric subset``
            The name of the subset of metrics.
        ``metric``
            The full name of the metric (also kept as a column).

        The remaining columns are whatever the json file provides;
        tables written from `create_metric_subset` output carry
        ``short_name``, ``style``, ``invert`` and ``mag``.
    """
    source = metric_subset_source
    if source is None:
        source = os.path.join(get_data_dir(), "maf", "metric_subsets.json")

    # An already-loaded table is passed straight through, untouched.
    if isinstance(source, pd.DataFrame):
        return source

    raw_table = pd.read_json(source)
    by_subset = raw_table.set_index("metric subset")
    # Keep "metric" as a column as well as an index level.
    return by_subset.set_index("metric", append=True, drop=False)
def create_metric_subset(
    metric_subset_name,
    metrics,
    short_name=None,
    style="-",
    invert=False,
    mag=False,
):
    """Create a DataFrame that defines a metric subset.

    Parameters
    ----------
    metric_subset_name : `str`
        The name of the new metric subset.
    metrics : `list` [`str`] or `str`
        The metric names in the subset. A single name may be given as a
        plain string.
    short_name : `list` [`str`], optional
        Shorter metric names; defaults to the full metric names.
    style : `list` [`str`] or `str`, optional
        The matplotlib line style for lines representing each metric,
        by default "-".
    invert : `list` [`bool`] or `bool`, optional
        Are smaller values of the metric better, such as for errors?
        By default False.
    mag : `list` [`bool`] or `bool`, optional
        Is the metric an astronomical magnitude? By default False.

    Returns
    -------
    metric_subset : `pandas.DataFrame`
        A table of metrics with normalization and plotting flags, indexed
        by (``metric subset``, ``metric``), with ``metric`` also kept as
        a column.
    """
    # A bare string combined with the scalar defaults would make
    # pd.DataFrame raise ("all scalar values"); treat it as one metric.
    if isinstance(metrics, str):
        metrics = [metrics]

    if short_name is None:
        short_name = metrics

    metric_subset = (
        pd.DataFrame(
            {
                "metric subset": metric_subset_name,
                "metric": metrics,
                "short_name": short_name,
                "style": style,
                "invert": invert,
                "mag": mag,
            }
        )
        .set_index("metric subset")
        .set_index("metric", append=True, drop=False)
    )

    return metric_subset
def write_metric_subsets(metric_subset_file, metric_subsets):
    """Save a metric subset table to a json file.

    Parameters
    ----------
    metric_subset_file : `str`
        Output file name.
    metric_subsets : `pandas.DataFrame`
        Metric subset table, indexed as returned by `get_metric_subsets`.
    """
    # Move the "metric subset" index level back into a column so that it is
    # included in the records written below (the index itself is not).
    flattened = metric_subsets.reset_index(level="metric subset")
    flattened.to_json(path_or_buf=metric_subset_file, orient="records", indent=2)
def get_metric_summaries(
    run_families=tuple(),
    metric_subsets=tuple(),
    runs=tuple(),
    metrics=tuple(),
    summary_source=None,
    runs_source=None,
    metric_subset_source=None,
    run_order="family",
    metric_order="summary",
):
    """Get summary metric values for a set of runs and metrics.

    Parameters
    ----------
    run_families : iterable [`str`] or `str`
        Families of runs to include in the summary.
    metric_subsets : iterable [`str`] or `str`
        Subsets of metrics to include in the summary.
    runs : iterable [`str`]
        Runs to include in the summary (in addition to any that are part
        of families included in ``run_families``).
    metrics : iterable [`str`]
        Metrics to include in the summary (in addition to any that are
        part of subsets included in ``metric_subsets``).
    summary_source : `str` or `pandas.DataFrame`
        File name or URL for the file from which to load the data.
        If the supplied value is a `pandas.DataFrame`, the table
        returned will be a subset of this supplied table.
        If `None`, the module-level ``SUMMARY_SOURCE`` is used.
    runs_source : `pandas.DataFrame` or `str`
        Run family metadata, passed through to `get_family_runs`.
        Only consulted when ``run_families`` is not empty.
    metric_subset_source : `pandas.DataFrame` or `str`
        Metric subset specification, passed through to
        `get_metric_subsets`. Only consulted when ``metric_subsets``
        is not empty.
    run_order : `str`
        Sort runs according to family definition ("family")
        or summary file ("summary") order.
    metric_order : `str`
        Sort metrics according to subset definition ("subset")
        or summary file ("summary") order.

    Returns
    -------
    summaries : `pandas.DataFrame`
        Metric summary values selected from the summary table with
        ``.loc[runs, metrics]``: the index (named ``run``) holds the run
        names and the columns (named ``metric``) hold the metric names.

    Notes
    -----
    Requested metrics that are not columns of the summary table are
    skipped with a warning. Requested runs are not filtered in the same
    way.

    The summary values for all runs and metrics can be downloaded from
    the default source once by calling

    .. code-block:: python

        summary = get_metric_summaries()

    Passing ``summary_source=summary`` to later calls then selects
    subsets without downloading the summary data again.
    """
    summary_source = SUMMARY_SOURCE if summary_source is None else summary_source

    # Work on private copies so the caller's sequences are never modified.
    runs = list(runs)
    metrics = list(metrics)

    if isinstance(run_families, str):
        run_families = [run_families]

    if isinstance(metric_subsets, str):
        metric_subsets = [metric_subsets]

    if isinstance(summary_source, pd.DataFrame):
        all_summaries = summary_source
    else:
        try:
            all_summaries = pd.read_csv(summary_source, index_col=0, low_memory=False)
        except UnicodeDecodeError:
            # then this was probably the h5 file instead
            all_summaries = pd.read_hdf(summary_source)
        all_summaries.index.name = "OpsimRun"

    if len(run_families) > 0:
        families = get_family_runs(runs_source)
        for run_family in run_families:
            # Wrapping in pd.Series handles a family with a single run,
            # for which .loc yields a scalar rather than a Series.
            runs.extend(pd.Series(families.loc[run_family, "run"]).tolist())

    if len(metric_subsets) > 0:
        metric_subset_df = get_metric_subsets(metric_subset_source)
        for metric_subset in metric_subsets:
            metrics.extend(list(metric_subset_df.loc[metric_subset, "metric"]))

    if len(runs) == 0:
        runs = slice(None)
    elif run_order == "summary":
        # Set lookup avoids rescanning the requested list for every row.
        requested_runs = set(runs)
        runs = [r for r in all_summaries.index if r in requested_runs]

    if len(metrics) == 0:
        metrics = slice(None)
    else:
        summary_columns = all_summaries.columns
        available_metrics = []
        for metric in metrics:
            if metric in summary_columns:
                available_metrics.append(metric)
            else:
                warnings.warn(f'Metric "{metric}" not in summary, skipping')
        metrics = available_metrics

        if metric_order == "summary":
            requested_metrics = set(metrics)
            metrics = [m for m in summary_columns if m in requested_metrics]

    summaries = all_summaries.loc[runs, metrics]
    summaries.columns.name = "metric"
    summaries.index.name = "run"
    return summaries
def get_family_runs(run_source=None):
    """Load a data frame that supplies run names for each run family.

    Parameters
    ----------
    run_source : `None`, `str`, or `pandas.DataFrame`
        File name or URL for the json file from which to load the
        metadata. If it is `None`, the module-level ``FAMILY_SOURCE``
        is used. A `pandas.DataFrame` is returned unchanged.

    Returns
    -------
    families : `pandas.DataFrame`
        When read from json, the index (named ``family``) is the run
        family, and the per-run columns ``run``, ``brief``, ``filepath``
        and ``url`` are exploded so that each row describes one run of
        one family. Any other columns come from the json file as-is.

    Notes
    -----
    Because runs can be members of multiple families, more than one row
    may provide metadata on the same run.

    The same content (in a different form) can be obtained using
    ``get_runs``. ``get_runs`` is more convenient when indexing by
    run; ``get_family_runs`` when indexing by family.
    """
    if run_source is None:
        run_source = FAMILY_SOURCE

    # A table that is already loaded is handed back as-is.
    if isinstance(run_source, pd.DataFrame):
        return run_source

    family_table = pd.read_json(run_source, orient="index")
    family_table.index.name = "family"
    # One row per (family, run) instead of one row per family.
    return family_table.explode(BY_RUN_COLS)
def get_runs(run_source=None):
    """Load metadata on opsim runs into a `pandas.DataFrame`.

    Parameters
    ----------
    run_source : `None`, `str`, or `pandas.DataFrame`
        Passed to `get_family_runs`: a file name or URL for the json
        file from which to load the metadata, an already loaded table,
        or `None` for the default source.

    Returns
    -------
    runs : `pandas.DataFrame`
        Indexed by ``run``, with columns in this order:

        ``family``
            The families in which the run appears (`list`).
        ``version``
            The version entries collected from those families (`list`).
        ``brief``
            Short description of the run.
        ``filepath``
            The file path of the run's database.
        ``url``
            The URL from which the opsim output database for this run
            can be downloaded.

    Notes
    -----
    Rows are grouped on ``run``, ``brief``, ``filepath`` and ``url``, so
    a run listed with differing values of those columns yields more than
    one row.

    The same content (in a different form) can be obtained using
    ``get_family_runs``. ``get_runs`` is more convenient when indexing
    by run; ``get_family_runs`` when indexing by family.
    """
    flat_runs = get_family_runs(run_source).reset_index()

    # Everything that is not a per-run grouping key is collected into lists.
    collectors = {col: list for col in flat_runs.columns if col not in BY_RUN_COLS}

    grouped = flat_runs.groupby(BY_RUN_COLS).agg(collectors)
    indexed_by_run = grouped.reset_index().set_index("run")
    return indexed_by_run.loc[:, ["family", "version", "brief", "filepath", "url"]]
def download_runs(runs, dest_dir=None, runs_source=None, clobber=False):
    """Download opsim visit databases for specified runs to a local directory.

    Parameters
    ----------
    runs : `pandas.DataFrame` or iterable [`str`] or `str`
        If a `pandas.DataFrame` is provided, its index supplies the run
        names, and each row's ``filepath`` and ``url`` columns give the
        destination (relative to ``dest_dir``) and the download source.
        If a run name or a collection of run names is provided, the
        matching rows are looked up in the run metadata loaded with
        `get_runs` from ``runs_source``.
    dest_dir : `str`
        The local directory into which to write downloaded visit
        databases. Defaults to the module-level ``OPSIM_DB_DIR``.
        The directory must already exist; subdirectories implied by
        ``filepath`` are created as needed.
    runs_source : `str`
        File name or URL for the json file from which to load the
        metadata (see `get_runs`). This parameter is ignored if the
        ``runs`` parameter is a `pandas.DataFrame`.
    clobber : `bool`
        If ``False``, runs that would clobber an existing file will be
        skipped with a warning. If ``True``, existing files will be
        overwritten.

    Returns
    -------
    dest_fnames : `pandas.Series`
        The local file name for each run (whether newly downloaded or
        already present), indexed by run name.

    Raises
    ------
    FileNotFoundError
        If ``dest_dir`` does not exist.
    """
    # ``import urllib`` alone does not guarantee that the ``request``
    # submodule has been loaded, so import it explicitly here.
    import urllib.request

    if isinstance(runs, str):
        runs = [runs]

    if not isinstance(runs, pd.DataFrame):
        all_runs = get_runs(runs_source)
        runs = all_runs.loc[runs, :]

    if dest_dir is None:
        dest_dir = OPSIM_DB_DIR

    if not os.path.exists(dest_dir):
        raise FileNotFoundError(dest_dir)

    dest_fnames = pd.Series(name="fname", index=pd.Index([], name="OpsimRun"), dtype=object)

    for run_name, run in runs.iterrows():
        dest_fname = os.path.join(dest_dir, run.filepath)
        dest_fnames[run_name] = dest_fname

        # Create the directory if it does not exist
        os.makedirs(os.path.dirname(dest_fname), exist_ok=True)

        if clobber or not os.path.exists(dest_fname):
            urllib.request.urlretrieve(run.url, dest_fname)
        else:
            warnings.warn(f"{dest_fname} already exists; not downloading")

    return dest_fnames
def get_family_descriptions(family_source=None):
    """Get descriptions of families of runs.

    Parameters
    ----------
    family_source : `str`, `pandas.DataFrame`, or `None`
        File name or URL for the json file from which to load the
        family descriptions. If it is `None`, the module-level
        ``FAMILY_SOURCE`` is used. A `pandas.DataFrame` is used
        directly instead of being loaded.

    Returns
    -------
    families : `pandas.DataFrame`
        Family descriptions, one row per family, with the per-family
        columns first and the per-run columns (``run``, ``brief``,
        ``filepath``, ``url``) last.
    """
    if family_source is None:
        family_source = FAMILY_SOURCE

    if isinstance(family_source, pd.DataFrame):
        families = family_source
    else:
        families = pd.read_json(family_source, orient="index")
        families.index.name = "family"

    # Reorder only: family-level columns lead, per-run columns trail.
    leading_cols = [col for col in families.columns if col not in BY_RUN_COLS]
    return families.loc[:, leading_cols + BY_RUN_COLS]
def describe_families(
    families,
    summary=None,
    table_metric_subset=None,
    plot_metric_subset=None,
    baseline_run=None,
    round_table=2,
):
    """Display (in a jupyter or IPython notebook) family descriptions.

    For each family, a markdown description and a table of its runs are
    shown with IPython's display machinery when IPython has been
    imported, and printed otherwise.

    Parameters
    ----------
    families : `pandas.DataFrame`
        Data family descriptions as returned by get_family_descriptions.
    summary : `pandas.DataFrame`
        Summary metrics for each run, as returned by get_metric_summaries.
        Required when ``plot_metric_subset`` is given.
    table_metric_subset : `pandas.DataFrame`
        Metadata on metrics to be included in the table, with columns and
        index as returned by get_metric_subsets. If `None` while
        ``summary`` is given, all columns of ``summary`` are included.
    plot_metric_subset : `pandas.DataFrame`
        Metadata on metrics to be included in the plot, with columns and
        index as returned by get_metric_subsets.
        None if no plot should be made.
    baseline_run : `str`
        The name of the run to use to normalize metrics in the plot.
        None if normalization should be skipped.
    round_table : `int`, opt
        Decimal places to which to round the table_metrics.
        Default 2. `None` disables rounding.

    Returns
    -------
    fig : `matplotlib.figure.Figure`
        The plot figure, or `None` if no plot was made.
    ax : `matplotlib.axes.Axes`
        The plot axes, or `None` if no plot was made.
    """
    run_columns = ["run", "brief", "filepath"]
    # Selecting with a list of columns always yields a DataFrame, even when
    # a family contains a single run.
    family_runs = families.explode(run_columns).loc[:, run_columns]

    for family_name, family in families.iterrows():
        # Build the text from separate string pieces rather than a
        # triple-quoted block: the two blanks before each newline are a
        # markdown hard line break, and keeping them inside short literals
        # avoids trailing whitespace on source lines.
        description = (
            "---\n"
            f"{family.description}  \n"
            f"**version**: {family.version}  \n"
            "**runs**:  \n"
        )

        these_runs = family_runs.loc[[family_name], :]
        if summary is not None:
            if table_metric_subset is not None:
                table_metric_summary = summary.loc[these_runs["run"], table_metric_subset["metric"]]
                # Non-mutating rename: the selection above may be tied to
                # ``summary``, which must not be modified.
                table_metric_summary = table_metric_summary.rename(table_metric_subset["short_name"], axis=1)
                if round_table is not None:
                    table_metric_summary = table_metric_summary.round(round_table)
            else:
                table_metric_summary = summary.loc[these_runs["run"]]

            these_runs = these_runs.join(table_metric_summary, on="run", how="left")

        # Drop the file path once the table has more than five columns.
        num_columns = len(these_runs.columns)
        if num_columns > 5 and "filepath" in these_runs.columns:
            these_runs = these_runs.drop(columns=["filepath"])

        with pd.option_context("display.max_colwidth", 0):
            if "IPython" in sys.modules:
                IPython.display.display_markdown(description, raw=True)
                IPython.display.display(
                    IPython.display.HTML(these_runs.set_index("run").to_html().replace("\\n", "<br>"))
                )
            else:
                print(description)
                print(these_runs.set_index("run"))

    if plot_metric_subset is not None:
        these_runs = family_runs["run"].values
        if baseline_run is not None and baseline_run not in these_runs:
            these_runs = np.concatenate([[baseline_run], these_runs])
        # Only plot metrics that are actually present in the summary.
        these_metrics = [m for m in plot_metric_subset["metric"] if m in summary.columns]
        fig, ax = plot_run_metric(  # pylint: disable=invalid-name
            summary.loc[these_runs, these_metrics],
            metric_subset=plot_metric_subset,
            metric_label_map=plot_metric_subset["short_name"],
            baseline_run=baseline_run,
            vertical_quantity="value",
            horizontal_quantity="run",
        )
    else:
        fig, ax = None, None  # pylint: disable=invalid-name

    return fig, ax