Source code for rubin_sim.maf.db.tracking_db

__all__ = ("TrackingDb", "add_run_to_database")

import os

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.engine import url
from sqlalchemy.exc import DatabaseError
from sqlalchemy.orm import declarative_base, sessionmaker

from . import ResultsDb, VersionRow

Base = declarative_base()


class RunRow(Base):
    """
    Define contents and format of run list table.

    Table to list all available MAF results,
    along with their opsim run and some comment info.
    """

    __tablename__ = "runs"
    # Define columns in metric list table.
    maf_run_id = Column(Integer, primary_key=True)
    run_group = Column(String)
    run_name = Column(String)
    run_comment = Column(String)
    run_version = Column(String)
    run_date = Column(String)
    db_file = Column(String)
    maf_comment = Column(String)
    maf_version = Column(String)
    maf_date = Column(String)
    maf_dir = Column(String)

    def __repr__(self):
        rstr = (
            "<Run(maf_run_id='%d', run_group='%s', run_name='%s', run_comment='%s', "
            "run_version='%s', run_date='%s', maf_comment='%s', "
            "maf_version='%s', maf_date='%s', maf_dir='%s', db_file='%s'>"
            % (
                self.maf_run_id,
                self.run_group,
                self.run_name,
                self.maf_comment,
                self.maf_version,
                self.run_date,
                self.maf_comment,
                self.maf_version,
                self.maf_date,
                self.maf_dir,
                self.db_file,
            )
        )
        return rstr


[docs] class TrackingDb: """Sqlite database to track MAF output runs and their locations, for show_maf """ def __init__(self, database=None, trackingDbverbose=False): """ Set up the Tracking Database. """ self.verbose = trackingDbverbose self.driver = "sqlite" # connecting to non-existent database creates it automatically if database is None: # Default is a file in the current directory. self.database = "trackingDb_sqlite.db" self.tracking_db_dir = "." else: self.database = database self.tracking_db_dir = os.path.dirname(database) # only sqlite dbAddress = url.URL.create(drivername=self.driver, database=self.database) engine = create_engine(dbAddress, echo=self.verbose) if self.verbose: print("Created or connected to MAF tracking %s database at %s" % (self.driver, self.database)) self.Session = sessionmaker(bind=engine) self.session = self.Session() # Create the tables, if they don't already exist. try: Base.metadata.create_all(engine) except DatabaseError: raise DatabaseError( "Cannot create a %s database at %s. Check directory exists." % (self.driver, self.database) ) def close(self): self.session.close()
[docs] def add_run( self, run_group=None, run_name=None, run_comment=None, run_version=None, run_date=None, maf_comment=None, maf_version=None, maf_date=None, maf_dir=None, db_file=None, maf_run_id=None, ): """Add a run to the tracking database. Parameters ---------- run_group : `str`, optional Set a name to group this run with (eg. "Tier 1, 2016"). run_name : `str`, optional Set a name for the opsim run. run_comment : `str`, optional Set a comment describing the opsim run. run_version : `str`, optional Set the version of opsim. run_date : `str`, optional Set the date the opsim run was created. maf_comment : `str`, optional Set a comment to describe the MAF analysis. maf_version : `str`, optional Set the version of MAF used for analysis. maf_date : `str`, optional Set the date the MAF analysis was run. maf_dir : `str`, optional The relative path to the MAF directory. Will be converted to a relative path if absolute. db_file : `str`, optional The relative path to the Opsim SQLite database file. maf_run_id : `int`, optional The maf_run_id to assign to this record in the database (note this is a primary key!). If this run (ie the maf_dir) exists in the database already, this will be ignored. Returns ------- maf_run_id : `int` The maf_run_id stored in the database. """ if run_group is None: run_group = "NULL" if run_name is None: run_name = "NULL" if run_comment is None: run_comment = "NULL" if run_version is None: run_version = "NULL" if run_date is None: run_date = "NULL" if maf_comment is None: maf_comment = "NULL" if maf_version is None: maf_version = "NULL" if maf_date is None: maf_date = "NULL" if maf_dir is None: maf_dir = "NULL" if maf_dir is not None: maf_dir = os.path.relpath(maf_dir, start=self.tracking_db_dir) if db_file is None: db_file = "NULL" # Test if maf_dir already exists in database. prevrun = self.session.query(RunRow).filter_by(maf_dir=maf_dir).all() if len(prevrun) > 0: runIds = [] for run in prevrun: runIds.append(run.maf_run_id) print( "Updating information in tracking database - %s already present with runId %s." % (maf_dir, runIds) ) for run in prevrun: self.session.delete(run) self.session.commit() runinfo = RunRow( maf_run_id=runIds[0], run_group=run_group, run_name=run_name, run_comment=run_comment, run_version=run_version, run_date=run_date, maf_comment=maf_comment, maf_version=maf_version, maf_date=maf_date, maf_dir=maf_dir, db_file=db_file, ) else: if maf_run_id is not None: # Check if maf_run_id exists already. existing = self.session.query(RunRow).filter_by(maf_run_id=maf_run_id).all() if len(existing) > 0: raise ValueError( "maf_run_id %d already exists in database, for %s. " "Record must be deleted first." % (maf_run_id, existing[0].maf_dir) ) runinfo = RunRow( maf_run_id=maf_run_id, run_group=run_group, run_name=run_name, run_comment=run_comment, run_version=run_version, run_date=run_date, maf_comment=maf_comment, maf_version=maf_version, maf_date=maf_date, maf_dir=maf_dir, db_file=db_file, ) else: runinfo = RunRow( run_group=run_group, run_name=run_name, run_comment=run_comment, run_version=run_version, run_date=run_date, maf_comment=maf_comment, maf_version=maf_version, maf_date=maf_date, maf_dir=maf_dir, db_file=db_file, ) self.session.add(runinfo) self.session.commit() return runinfo.maf_run_id
[docs] def delRun(self, runId): """ Remove a run from the tracking database. """ runinfo = self.session.query(RunRow).filter_by(maf_run_id=runId).all() if len(runinfo) == 0: raise Exception("Could not find run with maf_run_id %d" % (runId)) if len(runinfo) > 1: raise Exception("Found more than one run with maf_run_id %d" % (runId)) print("Removing run info for runId %d " % (runId)) print(" ", runinfo) self.session.delete(runinfo[0]) self.session.commit()
[docs] def add_run_to_database( maf_dir, tracking_db_file, run_group=None, run_name=None, run_comment=None, maf_comment=None, db_file=None, maf_version=None, maf_date=None, sched_version=None, sched_date=None, skip_extras=False, ): """Adds information about a MAF analysis run to a MAF tracking database. Parameters ---------- maf_dir : `str` Path to the directory where the MAF results are located. tracking_db_file : `str` Full filename (+path) to the tracking database to use. run_group: `str`, optional Name to use to group this run with other opsim runs. run_name : `str`, optional Name of the opsim run. run_comment : `str`, optional Comment about the opsim run. run_version : `str`, optional Value to use for the opsim version information. maf_comment : `str`, optional Comment about the MAF analysis. db_file : `str`, optional Relative path + name of the opsim database file. """ trackingDb = TrackingDb(database=tracking_db_file) maf_dir = os.path.relpath(maf_dir, start=os.path.dirname(trackingDb.tracking_db_dir)) if not os.path.isdir(maf_dir): raise ValueError("There is no directory containing MAF outputs at %s." % (maf_dir)) # Connect to resultsDb for additional information if available if os.path.isfile(os.path.join(maf_dir, "resultsDb_sqlite.db")) and not skip_extras: resdb = ResultsDb(maf_dir) if run_name is None: run_name = resdb.get_run_name() if len(run_name) > 1: run_name = 0 else: run_name = run_name[0] if maf_version is None or maf_date is None: resdb.open() query = resdb.session.query(VersionRow).all() for v in query: if maf_version is None: maf_version = v.version if maf_date is None: maf_date = v.run_date resdb.close() print("Adding to tracking database at %s:" % (tracking_db_file)) print(" Maf_dir = %s" % (maf_dir)) print(" Maf_comment = %s" % (maf_comment)) print(" run_group = %s" % (run_group)) print(" run_name = %s" % (run_name)) print(" run_comment = %s" % (run_comment)) print(" run_version = %s" % (sched_version)) print(" run_date = %s" % (sched_date)) print(" maf_version = %s" % (maf_version)) print(" maf_date = %s" % (maf_date)) print(" db_file = %s" % (db_file)) runId = trackingDb.add_run( run_group=run_group, run_name=run_name, run_comment=run_comment, run_version=sched_version, run_date=sched_date, maf_comment=maf_comment, maf_version=maf_version, maf_date=maf_date, maf_dir=maf_dir, db_file=db_file, ) print("Used MAF RunID %d" % (runId)) trackingDb.close()