Source code for rubin_sim.sim_archive.make_snapshot

__all__ = [
    "get_scheduler",
    "save_scheduler",
    "add_make_scheduler_snapshot_args",
    "make_scheduler_snapshot_cli",
    "get_scheduler_from_config",
    "get_scheduler_instance_from_repo",
]

import argparse
import bz2
import gzip
import importlib.util
import logging
import lzma
import pickle
import sys
import types
import typing
from pathlib import Path
from tempfile import TemporaryDirectory

import numpy as np
from git import Repo
from rubin_scheduler.scheduler.example import example_scheduler
from rubin_scheduler.scheduler.schedulers.core_scheduler import CoreScheduler
from rubin_scheduler.scheduler.utils import SchemaConverter


[docs] def get_scheduler_from_config(config_script_path: str | Path) -> typing.Tuple[int, CoreScheduler]: """Generate a CoreScheduler according to a configuration in a file. Parameters ---------- config_script_path : `str` The configuration script path Returns ------- nside : `int` The nside. scheduler : `CoreScheduler` An instance of the Rubin Observatory FBS. Raises ------ ValueError If the config file is invalid, or has invalid content. """ # Follow example from official python docs: # https://docs.python.org/3/library/importlib.html#importing-a-source-file-directly # The T&S supplied files have code in a __name__ == 'config' conditional # that we need to be executed, so we *must* name the module "config" config_module_name: str = "config" config_module_spec = importlib.util.spec_from_file_location(config_module_name, config_script_path) if config_module_spec is None or config_module_spec.loader is None: # Make type checking happy raise ValueError(f"Cannot load config file {config_script_path}") config_module: types.ModuleType = importlib.util.module_from_spec(config_module_spec) sys.modules[config_module_name] = config_module config_module_spec.loader.exec_module(config_module) try: scheduler: CoreScheduler = config_module.scheduler nside: int = scheduler.nside except NameError: nside, scheduler = config_module.get_scheduler() assert isinstance(nside, int) assert isinstance(scheduler, CoreScheduler) return nside, scheduler
[docs] def get_scheduler_instance_from_repo( config_repo: str, config_script: str, config_branch: str = "main", ) -> CoreScheduler: """Generate a CoreScheduler according to a configuration in git. Parameters ---------- config_repo : `str` The git repository with the configuration. config_script : `str` The configuration script path (relative to the repository root). config_branch : `str`, optional The branch of the repository to use, by default "main" Returns ------- scheduler : `CoreScheduler` An instance of the Rubin Observatory FBS. Raises ------ ValueError If the config file is invalid, or has invalid content. """ with TemporaryDirectory() as local_config_repo_parent: repo: Repo = Repo.clone_from(config_repo, local_config_repo_parent, branch=config_branch) full_config_script_path: Path = Path(repo.working_dir).joinpath(config_script) scheduler = get_scheduler_from_config(full_config_script_path)[1] return scheduler
[docs] def get_scheduler( config_repo: str | None = None, config_script: str | None = None, config_branch: str = "main", visits_db: str | None = None, ) -> CoreScheduler: """Generate a CoreScheduler according to a configuration. Parameters ---------- config_repo : `str` The git repository with the configuration. config_script : `str` The configuration script path (relative to the repository root). config_branch : `str`, optional The branch of the repository to use, by default "main" visits_db : `str` or `None` Database from which to load pre-existing visits Returns ------- scheduler : `CoreScheduler` An instance of the Rubin Observatory FBS. Raises ------ ValueError If the config file is invalid, or has invalid content. """ if config_repo is not None and len(config_repo) > 0: if config_script is None: raise ValueError("If the config repo is set, the script must be as well.") logging.info( f"Instantiating scheduler form {config_script} on the {config_branch} branch of {config_repo}" ) scheduler = get_scheduler_instance_from_repo( config_repo=config_repo, config_script=config_script, config_branch=config_branch ) elif config_script is not None: logging.info(f"Reading scheduler from {config_script}") scheduler = get_scheduler_from_config(config_script)[1] else: logging.info("Creating example scheduler") example_scheduler_result = example_scheduler() if isinstance(example_scheduler_result, CoreScheduler): scheduler = example_scheduler_result else: # It might return a observatory, scheduler, observations tuple # instead. scheduler = example_scheduler_result[1] if isinstance(visits_db, str) and len(visits_db) > 0: logging.info(f"Adding visits from {visits_db}.") obs: np.recarray = SchemaConverter().opsim2obs(visits_db) if len(obs) > 0: scheduler.add_observations_array(obs) logging.info("Finished adding visits") return scheduler
[docs] def save_scheduler(scheduler: CoreScheduler, file_name: str) -> None: """Save an instances of the scheduler in a pickle file, compressed according to its extension. Parameters ---------- scheduler : `CoreScheduler` The scheduler to save. file_name : `str` The file in which to save the schedulers. """ opener: typing.Callable = open if file_name.endswith(".bz2"): opener = bz2.open elif file_name.endswith(".xz"): opener = lzma.open elif file_name.endswith(".gz"): opener = gzip.open with opener(file_name, "wb") as pio: pickle.dump(scheduler, pio)
[docs] def add_make_scheduler_snapshot_args(parser: argparse.ArgumentParser) -> None: """Add arguments needed for saving a scheduler to an argument parser.""" parser.add_argument("--scheduler_fname", type=str, help="The file in which to save the scheduler.") parser.add_argument( "--repo", type=str, default=None, help="The repository from which to load the configuration." ) parser.add_argument( "--script", type=str, default=None, help="The path to the config script (relative to the repo root)." ) parser.add_argument( "--branch", type=str, default="main", help="The branch of the repo from which to get the script" ) parser.add_argument( "--visits", type=str, default="", help="Opsim database from which to load previous visits." )
def make_scheduler_snapshot_cli(cli_args: list = []) -> None: parser = argparse.ArgumentParser(description="Create a scheduler pickle") add_make_scheduler_snapshot_args(parser) args: argparse.Namespace = parser.parse_args() if len(cli_args) == 0 else parser.parse_args(cli_args) logging.basicConfig( level=logging.DEBUG, format="%(asctime)s:%(name)s:%(levelname)s:%(message)s", datefmt="%Y-%m-%dT%H:%M:%S", ) scheduler: CoreScheduler = get_scheduler(args.repo, args.script, args.branch, args.visits) save_scheduler(scheduler, args.scheduler_fname) if __name__ == "__main__": make_scheduler_snapshot_cli()