__all__ = (
"get_sim_data",
"get_visit_data",
"scale_benchmarks",
"calc_coadded_depth",
)
import os
import sqlite3
import urllib
from contextlib import closing
import numpy as np
import pandas as pd
from sqlalchemy import create_engine, inspect
from sqlalchemy.engine import make_url
def _local_get_sim_data(
db_con,
sqlconstraint=None,
dbcols=None,
stackers=None,
table_name=None,
full_sql_query=None,
*,
return_class=np.recarray,
):
if sqlconstraint is None:
sqlconstraint = ""
need_sql: bool = full_sql_query is not None or len(sqlconstraint) > 0
# Check that file exists
if isinstance(db_con, str):
if os.path.isfile(db_con) is False:
raise FileNotFoundError("No file %s" % db_con)
# Check if this is an HDF5 file
is_hdf5 = isinstance(db_con, str) and db_con.lower().endswith((".h5", ".hdf5"))
# Check if table is "observations" or "SummaryAllProps"
if (table_name is None) & (full_sql_query is None) & (isinstance(db_con, str)):
if is_hdf5:
# For HDF5, detect table names by reading the store keys
with pd.HDFStore(db_con, mode="r") as store:
table_keys = [key.lstrip("/") for key in store.keys()]
if "observations" in table_keys:
table_name = "observations"
elif "SummaryAllProps" in table_keys:
table_name = "SummaryAllProps"
elif "summary" in table_keys:
table_name = "summary"
else:
raise ValueError("Could not guess table_name, set with table_name or full_sql_query kwargs")
else:
url = make_url("sqlite:///" + db_con)
eng = create_engine(url)
inspector = inspect(eng)
tables = [inspector.get_table_names(schema=schema) for schema in inspector.get_schema_names()]
if "observations" in tables[0]:
table_name = "observations"
elif "SummaryAllProps" in tables[0]:
table_name = "SummaryAllProps"
elif "summary" in tables[0]:
table_name = "summary"
else:
raise ValueError("Could not guess table_name, set with table_name or full_sql_query kwargs")
elif (table_name is None) & (full_sql_query is None):
# If someone passes in a connection object with an old table_name
# things will fail
# that's probably fine, keep people from getting fancy with old sims
table_name = "observations"
if full_sql_query is None:
col_str = "*" if dbcols is None else ", ".join(dbcols)
query = "SELECT %s FROM %s" % (col_str, table_name)
if len(sqlconstraint) > 0:
query += " WHERE %s" % (sqlconstraint)
query += ";"
else:
query = full_sql_query
sim_data: np.recarray | pd.DataFrame | None = None
if is_hdf5 and not need_sql:
# Pure HDF5 path - no SQL filtering needed
sim_data = pd.read_hdf(db_con, key=table_name)
elif isinstance(db_con, sqlite3.Connection):
sim_data = pd.read_sql(query, db_con)
elif isinstance(db_con, str) and os.path.isfile(db_con):
# Handle HDF5 files with SQL constraints by loading into
# in-memory SQLite
if is_hdf5:
with closing(sqlite3.connect(":memory:")) as con:
with pd.HDFStore(db_con, mode="r") as store:
for key in store.keys():
tbl_name = key.lstrip("/")
df = pd.read_hdf(db_con, key=key)
df.to_sql(tbl_name, con, index=False)
sim_data = pd.read_sql(query, con)
else:
with closing(sqlite3.connect(db_con)) as con:
sim_data = pd.read_sql(query, con)
else:
raise RuntimeError(f"Cannot find {db_con}.")
if len(sim_data) == 0:
raise UserWarning("No data found matching sqlconstraint %s" % (sqlconstraint))
# Now add the stacker columns.
# This fails for pandas.DataFrames, so convert to recarray
# if necessary.
if stackers is not None:
if not isinstance(sim_data, np.recarray):
assert isinstance(sim_data, pd.DataFrame)
sim_data = sim_data.to_records(index=False)
for s in stackers:
sim_data = s.run(sim_data).view(np.recarray)
if return_class is np.recarray:
if isinstance(sim_data, pd.DataFrame):
sim_data = sim_data.to_records(index=False)
if return_class is pd.DataFrame:
if isinstance(sim_data, np.recarray):
sim_data = pd.DataFrame(sim_data)
assert isinstance(sim_data, return_class)
return sim_data
[docs]
def get_sim_data(
db_con,
sqlconstraint=None,
dbcols=None,
stackers=None,
table_name=None,
full_sql_query=None,
*,
return_class=np.recarray,
):
"""Query an opsim database for the needed data columns
and run any required stackers.
Parameters
----------
db_con : `str` or SQLAlchemy connectable, or sqlite3 connection
Filename or lsst.resources path to an hdf5 or sqlite3 file,
or a connection object that can be used by pandas.read_sql
sqlconstraint : `str` or None
SQL constraint to apply to query for observations.
Ignored if full_sql_query is set.
dbcols : `list` [`str`]
Columns required from the database. Ignored if full_sql_query is set.
stackers : `list` [`rubin_sim.maf.stackers`], optional
Stackers to be used to generate additional columns. Default None.
table_name : `str` (None)
Name of the table to query.
Default None will try "observations".
Ignored if full_sql_query is set.
full_sql_query : `str`
The full SQL query to use. Overrides sqlconstraint, dbcols, tablename.
Returns
-------
sim_data : `np.ndarray`
A numpy structured array with columns resulting from dbcols + stackers,
for observations matching the SQLconstraint.
"""
if (not isinstance(db_con, str)) or urllib.parse.urlparse(db_con).scheme == "":
# Already have a local copy
sim_data = _local_get_sim_data(
db_con, sqlconstraint, dbcols, stackers, table_name, full_sql_query, return_class=return_class
)
else:
try:
from lsst.resources import ResourcePath
with ResourcePath(db_con).as_local() as local_db_path:
sim_data = _local_get_sim_data(
local_db_path,
sqlconstraint,
dbcols,
stackers,
table_name,
full_sql_query,
return_class=return_class,
)
except ModuleNotFoundError:
raise RuntimeError(
f"Cannot read visits from {db_con}."
"Maybe it does not exist, or maybe you need to install lsst.resources."
)
return sim_data
# This almost-alias to get_sim_data is named to be less misleading,
# in that the same get_*_data function can be used for real visits
# from consdb as well. Return a DF instead of a recarray by default
# because much of the code designed for reading consdb output
# wants that.
[docs]
def get_visit_data(
db_con,
sqlconstraint=None,
dbcols=None,
stackers=None,
table_name=None,
full_sql_query=None,
*,
return_class=pd.DataFrame,
):
"""Query an opsim database, returning a `pandas.DataFrame` by default.
Thin wrapper around `get_sim_data` with ``return_class`` defaulting to
`pandas.DataFrame` instead of `numpy.recarray`. All parameters are
forwarded unchanged; see `get_sim_data` for full documentation.
Parameters
----------
db_con : `str` or SQLAlchemy connectable, or sqlite3 connection
Filename or lsst.resources path to an hdf5 or sqlite3 file,
or a connection object that can be used by pandas.read_sql
sqlconstraint : `str` or None
SQL constraint to apply to query for observations.
Ignored if full_sql_query is set.
dbcols : `list` [`str`]
Columns required from the database. Ignored if full_sql_query is set.
stackers : `list` [`rubin_sim.maf.stackers`], optional
Stackers to be used to generate additional columns. Default None.
table_name : `str` (None)
Name of the table to query.
Default None will try "observations".
Ignored if full_sql_query is set.
full_sql_query : `str`
The full SQL query to use. Overrides sqlconstraint, dbcols, tablename.
return_class : `type`, optional
Class of the returned data. Default `pandas.DataFrame`.
Returns
-------
sim_data : `pandas.DataFrame`
A `pandas.DataFrame` with columns resulting from dbcols + stackers,
for observations matching the sqlconstraint.
See Also
--------
get_sim_data : Equivalent function with ``return_class`` defaulting to
`numpy.recarray`.
"""
return get_sim_data(
db_con,
sqlconstraint=sqlconstraint,
dbcols=dbcols,
stackers=stackers,
table_name=table_name,
full_sql_query=full_sql_query,
return_class=return_class,
)
[docs]
def scale_benchmarks(run_length, benchmark="design"):
"""Set design and stretch values of the number of visits or
area of the footprint or seeing/Fwhmeff/skybrightness and single visit
depth (based on SRD values).
Scales number of visits for the length of the run, relative to 10 years.
Parameters
----------
run_length : `float`
The length (in years) of the run.
benchmark : `str`
design or stretch - which version of the SRD values to return.
Returns
-------
benchmarks: `dict` of floats
A dictionary containing the number of visits, area of footprint,
seeing and FWHMeff values, skybrightness and single visit depth
for either the design or stretch SRD values.
"""
# Set baseline (default) numbers for the baseline survey length (10 years).
baseline = 10.0
design = {}
stretch = {}
design["nvisitsTotal"] = 825
stretch["nvisitsTotal"] = 1000
design["Area"] = 18000
stretch["Area"] = 20000
design["nvisits"] = {"u": 56, "g": 80, "r": 184, "i": 184, "z": 160, "y": 160}
stretch["nvisits"] = {"u": 70, "g": 100, "r": 230, "i": 230, "z": 200, "y": 200}
# mag/sq arcsec
design["skybrightness"] = {
"u": 21.8,
"g": 22.0,
"r": 21.3,
"i": 20.0,
"z": 19.1,
"y": 17.5,
}
stretch["skybrightness"] = {
"u": 21.8,
"g": 22.0,
"r": 21.3,
"i": 20.0,
"z": 19.1,
"y": 17.5,
}
# arcsec - old seeing values
design["seeing"] = {"u": 0.77, "g": 0.73, "r": 0.7, "i": 0.67, "z": 0.65, "y": 0.63}
stretch["seeing"] = {
"u": 0.77,
"g": 0.73,
"r": 0.7,
"i": 0.67,
"z": 0.65,
"y": 0.63,
}
# arcsec - new FWHMeff values (scaled from old seeing)
design["FWHMeff"] = {
"u": 0.92,
"g": 0.87,
"r": 0.83,
"i": 0.80,
"z": 0.78,
"y": 0.76,
}
stretch["FWHMeff"] = {
"u": 0.92,
"g": 0.87,
"r": 0.83,
"i": 0.80,
"z": 0.78,
"y": 0.76,
}
design["singleVisitDepth"] = {
"u": 23.9,
"g": 25.0,
"r": 24.7,
"i": 24.0,
"z": 23.3,
"y": 22.1,
}
stretch["singleVisitDepth"] = {
"u": 24.0,
"g": 25.1,
"r": 24.8,
"i": 24.1,
"z": 23.4,
"y": 22.2,
}
# Scale the number of visits.
if run_length != baseline:
scalefactor = float(run_length) / float(baseline)
# Calculate scaled value for design and stretch values of nvisits,
# per filter.
for f in design["nvisits"]:
design["nvisits"][f] = int(np.floor(design["nvisits"][f] * scalefactor))
stretch["nvisits"][f] = int(np.floor(stretch["nvisits"][f] * scalefactor))
if benchmark == "design":
return design
elif benchmark == "stretch":
return stretch
else:
raise ValueError("Benchmark value %s not understood: use 'design' or 'stretch'" % (benchmark))
[docs]
def calc_coadded_depth(nvisits, single_visit_depth):
"""Calculate the coadded depth expected for a given number of visits
and single visit depth.
Parameters
----------
nvisits : `dict` of `int` or `float`
Dictionary (per filter) of number of visits
single_visit_depth : `dict` of `float`
Dictionary (per filter) of the single visit depth
Returns
-------
coadded_depth : `dict` of `float`
Dictionary of coadded depths per filter.
"""
coadded_depth = {}
for f in nvisits:
if f not in single_visit_depth:
raise ValueError("Filter keys in nvisits and single_visit_depth must match")
coadded_depth[f] = float(1.25 * np.log10(nvisits[f] * 10 ** (0.8 * single_visit_depth[f])))
if not np.isfinite(coadded_depth[f]):
coadded_depth[f] = single_visit_depth[f]
return coadded_depth