Source code for rubin_sim.satellite_constellations.model_observatory

__all__ = ("ModelObservatory",)

import numpy as np
from rubin_scheduler.scheduler.model_observatory import ModelObservatory as oMO
from rubin_scheduler.site_models import Almanac
from rubin_scheduler.utils import _healbin, survey_start_mjd

# Take the model observatory from the scheduler and
# subclass to expand to include satellite constellations


[docs] class ModelObservatory(oMO): """A class to generate a realistic telemetry stream for the scheduler Parameters ---------- nside : `int` The healpix nside resolution mjd_start : `float` The MJD to start the observatory up at. Uses util to lookup default if None. alt_min : `float` The minimum altitude to compute models at (degrees). lax_dome : `bool` Passed to observatory model. If true, allows dome creep. cloud_limit : `float` The limit to stop taking observations if the cloud model returns something equal or higher sim_to_o : `sim_targetoO` If one would like to inject simulated ToOs into the telemetry stream. seeing_db : `str` If one would like to use an alternate seeing database, filename of sqlite file park_after : `float` Park the telescope after a gap longer than park_after (minutes) init_load_length : `int` The length of pre-scheduled sky brighntess to load initially (days). alt_limit : `float` Altitude limit for considering satellite streaks (degrees). satellite_dt : `float` The time step to use for computing satellite positions (seconds). sat_nside : `int` The HEALpix nside to use for satellite streak maps. constellation : `rubin_sim.satellite_constellations.Constellation` The satellite constellation to use. """ def __init__( self, nside=None, mjd_start=None, seed=42, alt_min=5.0, lax_dome=True, cloud_limit=0.3, sim_to_o=None, seeing_db=None, park_after=10.0, init_load_length=10, sat_nside=64, satellite_dt=10.0, constellation=None, alt_limit=20.0, ): # Add in the new satellite information self.alt_limit = np.radians(alt_limit) self.satelite_dt = satellite_dt / 3600.0 / 24.0 # Seconds to days self.sat_nside = sat_nside self.constellation = constellation # Need to do a little fiddle with the MJD since # self.mjd needs self.night set now. self.mjd_start = survey_start_mjd() if mjd_start is None else mjd_start self.almanac = Almanac(mjd_start=self.mjd_start) self.night = -1 # Run the rest of the regular __init__ steps super().__init__( nside=None, mjd_start=self.mjd_start, seed=seed, alt_min=alt_min, lax_dome=lax_dome, cloud_limit=cloud_limit, sim_to_o=sim_to_o, seeing_db=seeing_db, park_after=park_after, init_load_length=init_load_length, )
[docs] def return_conditions(self): """ Returns ------- conditions: `rubin_sim.scheduler.features.conditions` Current conditions as simulated by the ModelObservatory. """ # Spot to put in satellite streak prediction maps self.conditions.satellite_mjds = self.sat_mjds self.conditions.satellite_maps = self.satellite_maps # Run the regular return conditions super().return_conditions() # I guess running super() means return statement gets skipped? return self.conditions
@property def mjd(self): return self._mjd @mjd.setter def mjd(self, value): self._mjd = value self.almanac_indx = self.almanac.mjd_indx(value) # Update night if needed if self.almanac.sunsets["night"][self.almanac_indx] != self.night: self.night = self.almanac.sunsets["night"][self.almanac_indx] # Update the satellite prediction map for the night self._update_satellite_maps() def _update_satellite_maps(self): """Make the satellite prediction maps for the night. Will set self.sat_mjds and self.satellite_maps that can then be attached to a conditions object in self.return_conditions """ sunset = self.almanac.sunsets["sun_n12_setting"][self.almanac_indx] sunrise = self.almanac.sunsets["sun_n12_rising"][self.almanac_indx] self.sat_mjds = np.arange(sunset, sunrise, self.satelite_dt) # Compute RA and decs for when sun is down ras, decs, alts, illums = self.constellation.paths_array(self.sat_mjds) below_limit = np.where(alts < self.alt_limit) weights = np.zeros(ras.shape, dtype=int) weights[illums] = 1 weights[below_limit] = 0 satellite_maps = [] for i, mjd in enumerate(self.sat_mjds): spot_map = _healbin( ras[:, i][illums[:, i]], decs[:, i][illums[:, i]], weights[:, i][illums[:, i]], self.sat_nside, reduce_func=np.sum, dtype=int, fill_val=0, ) satellite_maps.append(spot_map) self.satellite_maps = np.vstack(satellite_maps)