Source code for rubin_sim.moving_objects.base_obs

__all__ = ("BaseObs",)

import os
import warnings

import numpy as np
from rubin_scheduler.data import get_data_dir
from rubin_scheduler.utils import LsstCameraFootprint, angular_separation

from rubin_sim.phot_utils import Bandpass, Sed

from .ooephemerides import PyOrbEphemerides


[docs] class BaseObs: """ Base class to generate observations of a set of moving objects. Parameters ---------- footPrint: `str`, optional Specify the footprint for the FOV. Options include "camera", "circle", "rectangle". 'Camera' means use the actual LSST camera footprint (following a rough cut with a circular FOV). Default is camera FOV. r_fov : `float`, optional If footprint is "circular", this is the radius of the fov (in degrees). Default 1.75 degrees (only used for circular fov). x_tol : `float`, optional If footprint is "rectangular", this is half of the width of the (on-sky) fov in the RA direction (in degrees). Default 5 degrees. y_tol : `float`, optional If footprint is "rectangular", this is half of the width of the fov in Declination (in degrees). Default is 3 degrees eph_mode: `str`, optional Mode for ephemeris generation - nbody or 2body. Default is nbody. eph_type: `str`, optional Type of ephemerides to generate - full or basic. Full includes all values calculated by openorb; Basic includes a more basic set. Default is Basic. eph_file: `str` or None, optional The name of the planetary ephemerides file to use for ephemeris generation. Default (None) will use the default for PyOrbEphemerides. obs_code: `str`, optional Observatory code for ephemeris generation. Default is "I11" - Cerro Pachon. obs_time_col: `str`, optional Name of the time column in the obsData. Default 'observationStartMJD'. obs_time_scale: `str`, optional Type of timescale for MJD (TAI or UTC currently). Default TAI. seeing_col: `str`, optional Name of the seeing column in the obsData. Default 'seeingFwhmGeom'. This should be the geometric/physical seeing as it is used for the trailing loss calculation. visit_exp_time_col: `str`, optional Name of the visit exposure time column in the obsData. Default 'visitExposureTime'. obs_ra: `str`, optional Name of the RA column in the obsData. Default 'fieldRA'. obs_dec: `str`, optional Name of the Dec column in the obsData. Default 'fieldDec'. obs_rot_sky_pos: `str`, optional Name of the Rotator column in the obsData. Default 'rotSkyPos'. obs_degrees: `bool`, optional Whether the observational data is in degrees or radians. Default True (degrees). outfile_name : `str`, optional The output file name. Default is 'lsst_obs.dat'. obs_info : `str`, optional A string that captures provenance information about the observations. For example: 'baseline_v2.0_10yrs, years 0-5' or 'baseline2018a minus NES' Default ''. """ def __init__( self, footprint="camera", r_fov=1.75, x_tol=5, y_tol=3, eph_mode="nbody", eph_type="basic", obs_code="I11", eph_file=None, obs_time_col="observationStartMJD", obs_time_scale="TAI", seeing_col="seeingFwhmGeom", visit_exp_time_col="visitExposureTime", obs_ra="fieldRA", obs_dec="fieldDec", obs_rot_sky_pos="rotSkyPos", obs_degrees=True, outfile_name="lsst_obs.dat", obs_info="", camera_footprint_file=None, ): # Strings relating to the names of columns in the observation metadata. self.obs_code = obs_code self.obs_time_col = obs_time_col self.obs_time_scale = obs_time_scale self.seeing_col = seeing_col self.visit_exp_time_col = visit_exp_time_col self.obs_ra = obs_ra self.obs_dec = obs_dec self.obs_rot_sky_pos = obs_rot_sky_pos self.obs_degrees = obs_degrees # Save a space for the standard object colors. self.colors = {} self.outfile_name = outfile_name # Values for identifying observations. self.footprint = footprint.lower() if self.footprint == "camera": self._setup_camera(camera_footprint_file=camera_footprint_file) self.r_fov = r_fov self.x_tol = x_tol self.y_tol = y_tol # Values for ephemeris generation. if eph_mode.lower() not in ("2body", "nbody"): raise ValueError("Ephemeris generation must be 2body or nbody.") self.eph_mode = eph_mode self.eph_type = eph_type self.eph_file = eph_file self._setup_info(obs_info=obs_info) def _setup_info(self, obs_info=""): """Generate a dict to record relevant settings""" info = { "obs_info": obs_info, "footprint": self.footprint, "eph_mode": self.eph_mode, "eph_type": self.eph_type, } if self.footprint == "circle": info["r_fov"] = self.r_fov if self.footprint == "rectangle": info["xtol"] = self.x_tol info["ytol"] = self.y_tol # convert to numpy array in anticipation of saving names = list(info.keys()) types = [np.array(info[key]).dtype for key in names] self.info = np.zeros(1, dtype=list(zip(names, types))) for key in names: self.info[key] = info[key] def _setup_camera(self, camera_footprint_file=None): self.camera = LsstCameraFootprint(units="degrees", footprint_file=camera_footprint_file)
[docs] def setup_ephemerides(self): """Initialize the ephemeris generator. Save the setup PyOrbEphemeris class. This uses the default engine, pyoorb - however this could be overwritten to use another generator. """ self.ephems = PyOrbEphemerides(ephfile=self.eph_file)
[docs] def generate_ephemerides(self, sso, times, eph_mode=None, eph_type=None): """Generate ephemerides for 'sso' at times 'times' (assuming MJDs, with timescale self.obs_time_scale). The default engine here is pyoorb, however other ephemeris generation could be used with a matching API to PyOrbEphemerides. The initialized pyoorb class (PyOrbEphemerides) is saved, to skip setup on subsequent calls. Parameters ---------- sso : `rubin_sim.movingObjects.Orbits` Typically this will be a single object. times: `np.ndarray` The times at which to generate ephemerides. MJD. eph_mode: `str` or None, optional Potentially override default eph_mode (self.eph_mode). Must be '2body' or 'nbody'. Returns ------- ephs : `pd.Dataframe` Results from propigating the orbit(s) to the specified times. Columns like: obj_id, sedname, time, ra, dec, dradt, ddecdt, phase, solarelon. """ if not hasattr(self, "ephems"): self.setup_ephemerides() if eph_mode is None: eph_mode = self.eph_mode if eph_type is None: eph_type = self.eph_type self.ephems.set_orbits(sso) ephs = self.ephems.generate_ephemerides( times, time_scale=self.obs_time_scale, obscode=self.obs_code, eph_mode=eph_mode, eph_type=eph_type, by_object=True, ) return ephs
[docs] def calc_trailing_losses(self, velocity, seeing, texp=30.0): """Calculate the detection and SNR trailing losses. 'Trailing' losses = loss in sensitivity due to the photons from the source being spread over more pixels; thus more sky background is included when calculating the flux from the object and thus the SNR is lower than for an equivalent brightness stationary/PSF-like source. dmagTrail represents this loss. 'Detection' trailing losses = loss in sensitivity due to the photons from the source being spread over more pixels, in a non-stellar-PSF way, while source detection is (typically) done using a stellar PSF filter and 5-sigma cutoff values based on assuming peaks from stellar PSF's above the background; thus the SNR is lower than for an equivalent brightness stationary/PSF-like source (and by a greater factor than just the simple SNR trailing loss above). dmag_detect represents this loss. Parameters ---------- velocity : `np.ndarray` or `float` The velocity of the moving objects, in deg/day. seeing : `np.ndarray` or `float` The seeing of the images, in arcseconds. texp : `np.ndarray` or `float`, optional The exposure time of the images, in seconds. Default 30. Returns ------- dmag Trail, dmag_detect : (`np.ndarray` `np.ndarray`) or (`float`, `float`) dmag_trail and dmag_detect for each set of velocity/seeing/texp values. """ a_trail = 0.761 b_trail = 1.162 a_det = 0.420 b_det = 0.003 x = velocity * texp / seeing / 24.0 dmag_trail = 1.25 * np.log10(1 + a_trail * x**2 / (1 + b_trail * x)) dmag_detect = 1.25 * np.log10(1 + a_det * x**2 / (1 + b_det * x)) return (dmag_trail, dmag_detect)
[docs] def read_filters( self, filter_dir=None, bandpass_root="total_", bandpass_suffix=".dat", filterlist=("u", "g", "r", "i", "z", "y"), v_dir=None, v_filter="harris_V.dat", ): """Read (LSST) and Harris (V) filter throughput curves. Only the defaults are LSST specific; this can easily be adapted for any survey. Parameters ---------- filter_dir : `str`, optional Directory containing the filter throughput curves ('total*.dat') Default set by 'LSST_THROUGHPUTS_BASELINE' env variable. bandpass_root : `str`, optional Rootname of the throughput curves in filterlist. E.g. throughput curve names are bandpass_root + filterlist[i] + bandpass_suffix Default `total_` (appropriate for LSST throughput repo). bandpass_suffix : `str`, optional Suffix for the throughput curves in filterlist. Default '.dat' (appropriate for LSST throughput repo). filterlist : `list`, optional List containing the filter names to use to calculate colors. Default ('u', 'g', 'r', 'i', 'z', 'y') v_dir : `str`, optional Directory containing the V band throughput curve. Default None = $RUBIN_SIM_DATA_DIR/movingObjects v_filter : `str`, optional Name of the V band filter curve. Default harris_V.dat. """ if filter_dir is None: filter_dir = os.path.join(get_data_dir(), "throughputs/baseline") if v_dir is None: v_dir = os.path.join(get_data_dir(), "movingObjects") self.filterlist = filterlist # Read filter throughput curves from disk. self.lsst = {} for f in self.filterlist: self.lsst[f] = Bandpass() self.lsst[f].read_throughput(os.path.join(filter_dir, bandpass_root + f + bandpass_suffix)) self.vband = Bandpass() self.vband.read_throughput(os.path.join(v_dir, v_filter))
[docs] def calc_colors(self, sedname="C.dat", sed_dir=None): """Calculate the colors for a given SED. If the sedname is not already in the dictionary self.colors, this reads the SED from disk and calculates all V-[filter] colors for all filters in self.filterlist. The result is stored in self.colors[sedname][filter], so will not be recalculated if the SED + color is reused for another object. Parameters ---------- sedname : `str`, optional Name of the SED. Default 'C.dat'. sed_dir : `str`, optional Directory containing the SEDs of the moving objects. Default None = $RUBIN_SIM_DATA_DIR/movingObjects, Returns ------- colors : `dict` {'filter': color}} Dictionary of the colors in self.filterlist. """ if sedname not in self.colors: if sed_dir is None: sed_dir = os.path.join(get_data_dir(), "movingObjects") mo_sed = Sed() mo_sed.read_sed_flambda(os.path.join(sed_dir, sedname)) vmag = mo_sed.calc_mag(self.vband) self.colors[sedname] = {} for f in self.filterlist: self.colors[sedname][f] = mo_sed.calc_mag(self.lsst[f]) - vmag return self.colors[sedname]
[docs] def sso_in_circle_fov(self, ephems, obs_data): """Determine which observations are within a circular fov for a series of observations. Note that ephems and obs_data must be the same length. Parameters ---------- ephems : `np.recarray` Ephemerides for the objects. obs_data : `np.recarray` The observation pointings. Returns ------- indices : `np.ndarray` Returns the indexes of the numpy array of the object observations which are inside the fov. """ return self._sso_in_circle_fov(ephems, obs_data, self.r_fov)
def _sso_in_circle_fov(self, ephems, obs_data, r_fov): if not self.obs_degrees: sep = angular_separation( ephems["ra"], ephems["dec"], np.degrees(obs_data[self.obs_ra]), np.degrees(obs_data[self.obs_dec]), ) else: sep = angular_separation( ephems["ra"], ephems["dec"], obs_data[self.obs_ra], obs_data[self.obs_dec], ) idx_obs = np.where(sep <= r_fov)[0] return idx_obs
[docs] def sso_in_rectangle_fov(self, ephems, obs_data): """Determine which observations are within a rectangular FoV for a series of observations. Note that ephems and obs_data must be the same length. Parameters ---------- ephems : `np.recarray` Ephemerides for the objects. obs_data : `np.recarray` The observation pointings. Returns ------- indices : `np.ndarray` Returns the indexes of the numpy array of the object observations which are inside the fov. """ return self._sso_in_rectangle_fov(ephems, obs_data, self.x_tol, self.y_tol)
def _sso_in_rectangle_fov(self, ephems, obs_data, x_tol, y_tol): delta_dec = np.abs(ephems["dec"] - obs_data[self.obs_dec]) delta_ra = np.abs((ephems["ra"] - obs_data[self.obs_ra]) * np.cos(np.radians(obs_data[self.obs_dec]))) idx_obs = np.where((delta_dec <= y_tol) & (delta_ra <= x_tol))[0] return idx_obs
[docs] def sso_in_camera_fov(self, ephems, obs_data): """Determine which observations are within the actual camera footprint for a series of observations. Note that ephems and obs_data must be the same length. Parameters ---------- ephems : `np.ndarray` Ephemerides for the objects. obs_data : `np.ndarray` Observation pointings. Returns ------- indices : `np.ndarray` Returns the indexes of the numpy array of the object observations which are inside the fov. """ if not hasattr(self, "camera"): self._setup_camera() if not self.obs_degrees: idx = self.camera( ephems["ra"], ephems["dec"], np.degrees(obs_data[self.obs_ra]), np.degrees(obs_data[self.obs_dec]), np.degrees(obs_data[self.obs_rot_sky_pos]), ) else: idx = self.camera( ephems["ra"], ephems["dec"], obs_data[self.obs_ra], obs_data[self.obs_dec], obs_data[self.obs_rot_sky_pos], ) return idx
[docs] def sso_in_fov(self, ephems, obs_data): """Convenience layer - determine which footprint method to apply (from self.footprint) and use it. Parameters ---------- ephems : `np.ndarray` Ephemerides for the objects. obs_data : `np.ndarray` Observation pointings. Returns ------- indices : `np.ndarray` Returns the indexes of the numpy array of the object observations which are inside the fov. """ if self.footprint == "camera": return self.sso_in_camera_fov(ephems, obs_data) elif self.footprint == "rectangle": return self.sso_in_rectangle_fov(ephems, obs_data) elif self.footprint == "circle": return self.sso_in_circle_fov(ephems, obs_data) else: warnings.warn("Using circular fov; could not match specified footprint.") self.footprint = "circle" return self.sso_in_circle_fov(ephems, obs_data)