# Source code for rubin_sim.maf.stackers.general_stackers

# Public stacker classes exported by this module.
__all__ = (
    "NormAirmassStacker",
    "ParallaxFactorStacker",
    "HourAngleStacker",
    "ZenithDistStacker",
    "ParallacticAngleStacker",
    "DcrStacker",
    "FiveSigmaStacker",
    "SaturationStacker",
    "OverheadStacker",
)

import warnings

import astropy.units as u
import numpy as np
from astropy.coordinates import GCRS, SkyCoord
from astropy.time import Time
from rubin_scheduler.utils import Site, m5_flat_sed

from rubin_sim.maf.utils import load_inst_zeropoints

from .base_stacker import BaseStacker
from .date_stackers import DayObsMJDStacker


class SaturationStacker(BaseStacker):
    """Adds a calculated point-source saturation limit for each visit.

    Assumes a Gaussian PSF.

    Parameters
    ----------
    pixscale : `float`, optional
        Arcsec per pixel.
    saturation_e : `float`, optional
        The saturation level in electrons.
    zeropoints : dict-like, optional
        The zeropoints for the telescope. Keys should be str with filter
        names, values in mags. Default of None, will use Rubin calculated
        zeropoints.
    km : dict-like, optional
        Atmospheric extinction values. Keys should be str with filter names.
        If None, will use Rubin calculated atmospheric extinction values.
    """

    cols_added = ["saturation_mag"]

    def __init__(
        self,
        seeing_col="seeingFwhmEff",
        skybrightness_col="skyBrightness",
        exptime_col="visitExposureTime",
        nexp_col="numExposures",
        filter_col="filter",
        airmass_col="airmass",
        saturation_e=150e3,
        zeropoints=None,
        km=None,
        pixscale=0.2,
    ):
        self.units = ["mag"]
        self.cols_req = [
            seeing_col,
            skybrightness_col,
            exptime_col,
            nexp_col,
            filter_col,
            airmass_col,
        ]
        self.seeing_col = seeing_col
        self.skybrightness_col = skybrightness_col
        self.exptime_col = exptime_col
        self.nexp_col = nexp_col
        self.filter_col = filter_col
        self.airmass_col = airmass_col
        self.saturation_e = saturation_e
        self.pixscale = pixscale
        self.zeropoints = zeropoints
        self.km = km

    def _run(self, sim_data, cols_present=False):
        # Lazily load Rubin default zeropoints/extinction if not provided.
        if self.zeropoints is None:
            zp_inst, k_atm = load_inst_zeropoints()
            self.zeropoints = zp_inst
            if self.km is None:
                self.km = k_atm
        for filtername in np.unique(sim_data[self.filter_col]):
            in_filt = np.where(sim_data[self.filter_col] == filtername)[0]
            # Calculate the length of the on-sky time per EXPOSURE
            # (visits may consist of several snaps).
            exptime = sim_data[self.exptime_col][in_filt] / sim_data[self.nexp_col][in_filt]
            # Calculate sky counts per pixel per second
            # from skybrightness + zeropoint (e/1s)
            sky_counts = (
                10.0 ** (0.4 * (self.zeropoints[filtername] - sim_data[self.skybrightness_col][in_filt]))
                * self.pixscale**2
            )
            # Total sky counts in each exposure
            sky_counts = sky_counts * exptime
            # The counts available to the source (at peak) in each exposure is
            # the difference between saturation and sky
            remaining_counts_peak = self.saturation_e - sky_counts
            # Now to figure out how many counts there would be total, if there
            # are that many in the peak (Gaussian PSF; 2.354 = FWHM/sigma).
            sigma = sim_data[self.seeing_col][in_filt] / 2.354
            source_counts = remaining_counts_peak * 2.0 * np.pi * (sigma / self.pixscale) ** 2
            # source counts = counts per exposure (expTimeCol / nexp)
            # Translate to counts per second, to apply zeropoint
            count_rate = source_counts / exptime
            sim_data["saturation_mag"][in_filt] = -2.5 * np.log10(count_rate) + self.zeropoints[filtername]
            # Airmass correction
            sim_data["saturation_mag"][in_filt] -= self.km[filtername] * (
                sim_data[self.airmass_col][in_filt] - 1.0
            )
            # Explicitly make sure if sky has saturated we return NaN.
            # BUGFIX: remaining_counts_peak is indexed relative to the
            # in_filt subset, so the subset-relative indices must be mapped
            # back through in_filt before masking; previously they were
            # applied directly to the full array, flagging the wrong visits.
            sim_data["saturation_mag"][in_filt[remaining_counts_peak < 0]] = np.nan
        return sim_data
class FiveSigmaStacker(BaseStacker):
    """Calculate the 5-sigma limiting depth for a point source in the
    given conditions.

    This is generally not needed, unless the m5 parameters have been
    updated or m5 was not previously calculated.

    Parameters
    ----------
    airmass_col : `str`, optional
        Name of the airmass column in the data.
    seeing_col : `str`, optional
        Name of the seeing column in the data.
        (FWHM of the single-Gaussian PSF)
    skybrightness_col : `str`, optional
        Name of the skybrightness column in the data.
    filter_col : `str`, optional
        Name of the filter bandpass column in the data.
    exptime_col : `str`, optional
        Name of the on-sky exposure time column in the data.
    """

    cols_added = ["m5_simsUtils"]

    def __init__(
        self,
        airmass_col="airmass",
        seeing_col="seeingFwhmEff",
        skybrightness_col="skyBrightness",
        filter_col="filter",
        exptime_col="visitExposureTime",
    ):
        self.units = ["mag"]
        self.airmass_col = airmass_col
        self.seeing_col = seeing_col
        self.skybrightness_col = skybrightness_col
        self.filter_col = filter_col
        self.exptime_col = exptime_col
        self.cols_req = [
            airmass_col,
            seeing_col,
            skybrightness_col,
            filter_col,
            exptime_col,
        ]

    def _run(self, sim_data, cols_present=False):
        if cols_present:
            # Column already present in data; assume it is fine.
            return sim_data
        # m5_flat_sed works per-bandpass, so loop over the filters present.
        for band in np.unique(sim_data[self.filter_col]):
            match = np.where(sim_data[self.filter_col] == band)
            sim_data["m5_simsUtils"][match] = m5_flat_sed(
                band,
                sim_data[match][self.skybrightness_col],
                sim_data[match][self.seeing_col],
                sim_data[match][self.exptime_col],
                sim_data[match][self.airmass_col],
            )
        return sim_data
class NormAirmassStacker(BaseStacker):
    """Adds a calculated normairmass for each pointing.

    The normalized airmass is the airmass divided by the minimum airmass
    achievable at each pointing (which is defined by the declination of
    the field).

    Parameters
    ----------
    airmass_col : `str`, optional
        The name of the airmass column in the data.
    dec_col : `str`, optional
        The name of the declination column in the data.
    degrees : `bool`, optional
        If True, angle columns are assumed to be in degrees and returned
        in degrees. If False, uses and calculates radians.
    telescope_lat : `float`, optional
        The latitude of the telescope, in degrees.
    """

    cols_added = ["normairmass"]

    def __init__(
        self,
        airmass_col="airmass",
        dec_col="fieldDec",
        degrees=True,
        telescope_lat=-30.2446388,
    ):
        self.units = ["X / Xmin"]
        self.airmass_col = airmass_col
        self.dec_col = dec_col
        self.telescope_lat = telescope_lat
        self.degrees = degrees
        self.cols_req = [airmass_col, dec_col]

    def _run(self, sim_data, cols_present=False):
        if cols_present:
            # Column already present in data; assume it is correct
            # and does not need recalculating.
            return sim_data
        dec_rad = sim_data[self.dec_col]
        if self.degrees:
            dec_rad = np.radians(dec_rad)
        # The smallest zenith distance a field can reach is set by how far
        # its declination lies from the telescope latitude; the minimum
        # airmass follows from sec(z) of that zenith distance.
        min_zenith = np.abs(dec_rad - np.radians(self.telescope_lat))
        best_airmass = 1.0 / np.cos(min_zenith)
        sim_data["normairmass"] = sim_data[self.airmass_col] / best_airmass
        return sim_data
class ZenithDistStacker(BaseStacker):
    """Adds a calculated zenithDistance value for each pointing.

    Parameters
    ----------
    alt_col : `str`, optional
        The name of the altitude column in the data.
    degrees : `bool`, optional
        If True, data in alt_col is in degrees, and values for
        zenithDistance will be in degrees. (Default).
        If False, data in alt_col is in radians and zenithDistance values
        will be in radians.
    """

    cols_added = ["zenithDistance"]

    def __init__(self, alt_col="altitude", degrees=True):
        self.alt_col = alt_col
        self.degrees = degrees
        # BUGFIX: the radians branch previously assigned ``self.unit``
        # (a typo), leaving ``self.units`` unset when degrees=False.
        if self.degrees:
            self.units = ["degrees"]
        else:
            self.units = ["radians"]
        self.cols_req = [self.alt_col]

    def _run(self, sim_data, cols_present=False):
        """Calculate new column for zenith distance."""
        if cols_present:
            # Column already present in data; assume it is correct and does
            # not need recalculating.
            return sim_data
        # Zenith distance is the complement of the altitude.
        if self.degrees:
            sim_data["zenithDistance"] = 90.0 - sim_data[self.alt_col]
        else:
            sim_data["zenithDistance"] = np.pi / 2.0 - sim_data[self.alt_col]
        return sim_data
class ParallaxFactorStacker(BaseStacker):
    """Add a parallax factor (in arcseconds) column for each visit.

    Parameters
    ----------
    ra_col : `str`, optional
        Name of the RA column in the data.
    dec_col : `str`, optional
        Name of the declination column in the data.
    date_col : `str`, optional
        Name of the exposure start time column in the data.
        Date should be in units of MJD.
    degrees : `bool`, optional
        If true, assumes angles are in degrees. If False, radians.
    """

    cols_added = ["ra_pi_amp", "dec_pi_amp"]

    def __init__(
        self,
        ra_col="fieldRA",
        dec_col="fieldDec",
        date_col="observationStartMJD",
        degrees=True,
    ):
        self.ra_col = ra_col
        self.dec_col = dec_col
        self.date_col = date_col
        self.units = ["arcsec", "arcsec"]
        self.cols_req = [ra_col, dec_col, date_col]
        self.degrees = degrees

    def _gnomonic_project_toxy(self, ra1, dec1, r_acen, deccen):
        # Gnomonic (tangent-plane) projection of (ra1, dec1) about the
        # tangent point (r_acen, deccen), returning (x, y) plane offsets.
        # ra, dec values in RADIANS
        # also used in Global Telescope Network website
        cosc = np.sin(deccen) * np.sin(dec1) + np.cos(deccen) * np.cos(dec1) * np.cos(ra1 - r_acen)
        x = np.cos(dec1) * np.sin(ra1 - r_acen) / cosc
        y = (np.cos(deccen) * np.sin(dec1) - np.sin(deccen) * np.cos(dec1) * np.cos(ra1 - r_acen)) / cosc
        return x, y

    def _run(self, sim_data, cols_present=False):
        if cols_present:
            # Column already present in data; assume it is correct
            # and does not need recalculating.
            return sim_data
        # Scratch structured arrays to hold the per-visit parallax factors.
        ra_pi_amp = np.zeros(np.size(sim_data), dtype=[("ra_pi_amp", "float")])
        dec_pi_amp = np.zeros(np.size(sim_data), dtype=[("dec_pi_amp", "float")])
        ra = sim_data[self.ra_col]
        dec = sim_data[self.dec_col]
        if self.degrees:
            ra = np.radians(ra)
            dec = np.radians(dec)
        times = Time(sim_data[self.date_col], format="mjd")
        # Geocentric apparent position of a source at effectively infinite
        # distance (no parallax) ...
        c = SkyCoord(ra * u.rad, dec * u.rad, obstime=times)
        geo_far = c.transform_to(GCRS)
        # ... and of a source at 1 pc; the positional difference between
        # the two is the parallax factor for a 1-arcsec parallax.
        c_near = SkyCoord(ra * u.rad, dec * u.rad, distance=1 * u.pc, obstime=times)
        geo_near = c_near.transform_to(GCRS)
        # Project both apparent positions onto the tangent plane at the
        # nominal (ra, dec) and difference them there.
        x_geo1, y_geo1 = self._gnomonic_project_toxy(geo_near.ra.rad, geo_near.dec.rad, ra, dec)
        x_geo, y_geo = self._gnomonic_project_toxy(geo_far.ra.rad, geo_far.dec.rad, ra, dec)
        # Return ra_pi_amp and dec_pi_amp in arcseconds.
        ra_pi_amp[:] = np.degrees(x_geo1 - x_geo) * 3600.0
        dec_pi_amp[:] = np.degrees(y_geo1 - y_geo) * 3600.0
        sim_data["ra_pi_amp"] = ra_pi_amp
        sim_data["dec_pi_amp"] = dec_pi_amp
        return sim_data
class DcrStacker(BaseStacker):
    """Add columns representing the expected RA/Dec offsets expected for an
    object due to differential chromatic refraction, per visit.

    For DCR calculation, we also need zenithDistance, HA, and PA -- but
    these will be explicitly handled within this stacker so that setup is
    consistent and they run in order. If those values have already been
    calculated elsewhere, they will not be overwritten.

    Parameters
    ----------
    filter_col : `str`, optional
        The name of the column with filter names. Default 'filter'.
    alt_col : `str`, optional
        Name of the column with altitude info. Default 'altitude'.
    degrees : `bool`, optional
        If True, angle columns are in degrees. Default True.
    ra_col : `str`, optional
        Name of the column with RA. Default 'fieldRA'.
    dec_col : `str`, optional
        Name of the column with Dec. Default 'fieldDec'.
    lst_col : `str`, optional
        Name of the column with local sidereal time.
        Default 'observationStartLST'.
    site : `str` or `rubin_scheduler.utils.Site`, optional
        Name of the observatory or a rubin_scheduler.utils.Site object.
        Default 'LSST'.
    mjd_col : `str`, optional
        Name of column with modified julian date.
        Default 'observationStartMJD'.
    dcr_magnitudes : dict
        Magnitude of the DCR offset for each filter at an altitude/zenith
        distance of 45 degrees. Defaults u=0.07, g=0.07, r=0.050, i=0.045,
        z=0.042, y=0.04 (all values should be in arcseconds).

    Returns
    -------
    data : `numpy.array`
        Returns array with additional columns 'ra_dcr_amp' and
        'dec_dcr_amp' with the DCR offsets for each observation. Also runs
        ZenithDistStacker and ParallacticAngleStacker.
    """

    cols_added = ["ra_dcr_amp", "dec_dcr_amp"]  # zenithDist, HA, PA

    def __init__(
        self,
        filter_col="filter",
        alt_col="altitude",
        degrees=True,
        ra_col="fieldRA",
        dec_col="fieldDec",
        lst_col="observationStartLST",
        site="LSST",
        mjd_col="observationStartMJD",
        dcr_magnitudes=None,
    ):
        self.units = ["arcsec", "arcsec"]
        if dcr_magnitudes is None:
            # DCR amplitudes are in arcseconds.
            self.dcr_magnitudes = {
                "u": 0.07,
                "g": 0.07,
                "r": 0.050,
                "i": 0.045,
                "z": 0.042,
                "y": 0.04,
            }
        else:
            self.dcr_magnitudes = dcr_magnitudes
        self.zd_col = "zenithDistance"
        self.pa_col = "PA"
        self.filter_col = filter_col
        self.ra_col = ra_col
        self.dec_col = dec_col
        self.degrees = degrees
        self.cols_req = [filter_col, ra_col, dec_col, alt_col, lst_col]
        # 'zenithDist', 'PA', 'HA' are additional columns required, coming
        # from other stackers which must also be configured -- so we handle
        # this explicitly here.
        self.zstacker = ZenithDistStacker(alt_col=alt_col, degrees=self.degrees)
        self.pastacker = ParallacticAngleStacker(
            ra_col=ra_col,
            dec_col=dec_col,
            mjd_col=mjd_col,
            degrees=self.degrees,
            lst_col=lst_col,
            site=site,
        )
        # Note that RA/Dec could be coming from a dither stacker!
        # But we will assume that coord stackers will be handled separately.

    def _run(self, sim_data, cols_present=False):
        if cols_present:
            # Column already present in data; assume it is correct and does
            # not need recalculating.
            return sim_data
        # Need to make sure the Zenith stacker gets run first. Use the
        # public run method (not _run) so that already-present columns are
        # not recalculated (due to the 'cols_added' handling).
        sim_data = self.zstacker.run(sim_data)
        sim_data = self.pastacker.run(sim_data)
        # DCR amplitude scales as tan(zenith distance), with direction set
        # by the parallactic angle.
        if self.degrees:
            zenith_tan = np.tan(np.radians(sim_data[self.zd_col]))
            parallactic_angle = np.radians(sim_data[self.pa_col])
        else:
            zenith_tan = np.tan(sim_data[self.zd_col])
            parallactic_angle = sim_data[self.pa_col]
        dcr_in_ra = zenith_tan * np.sin(parallactic_angle)
        dcr_in_dec = zenith_tan * np.cos(parallactic_angle)
        # Scale by the per-filter DCR magnitude.
        for filtername in np.unique(sim_data[self.filter_col]):
            fmatch = np.where(sim_data[self.filter_col] == filtername)
            dcr_in_ra[fmatch] = self.dcr_magnitudes[filtername] * dcr_in_ra[fmatch]
            dcr_in_dec[fmatch] = self.dcr_magnitudes[filtername] * dcr_in_dec[fmatch]
        sim_data["ra_dcr_amp"] = dcr_in_ra
        sim_data["dec_dcr_amp"] = dcr_in_dec
        return sim_data
class HourAngleStacker(BaseStacker):
    """Add the Hour Angle (in decimal hours) for each observation.

    Parameters
    ----------
    lst_col : `str`, optional
        Name of the LST column in the data.
    ra_col : `str`, optional
        Name of the RA column in the data.
    degrees : `bool`, optional
        If True, assumes angles (RA and LST) are in degrees.
        If False, assumes radians.
    """

    cols_added = ["HA"]

    def __init__(self, lst_col="observationStartLST", ra_col="fieldRA", degrees=True):
        self.units = ["Hours"]
        self.lst_col = lst_col
        self.ra_col = ra_col
        self.degrees = degrees
        self.cols_req = [lst_col, ra_col]

    def _run(self, sim_data, cols_present=False):
        """HA = LST - RA"""
        if cols_present:
            # Column already present in data; assume it is correct and does
            # not need recalculating.
            return sim_data
        # Nothing to compute for an empty data array.
        if len(sim_data) == 0:
            return sim_data
        ra = sim_data[self.ra_col]
        lst = sim_data[self.lst_col]
        if self.degrees:
            ra = np.radians(ra)
            lst = np.radians(lst)
        # Sanity checks: both angles should look like radians in [0, 2 pi].
        if (np.min(lst) < 0) | (np.max(lst) > 2.0 * np.pi):
            warnings.warn("LST values are not between 0 and 2 pi")
        if (np.min(ra) < 0) | (np.max(ra) > 2.0 * np.pi):
            warnings.warn("RA values are not between 0 and 2 pi")
        hour_angle = lst - ra
        # Wrap the results so HA lies between -pi and pi.
        hour_angle = np.where(hour_angle < -np.pi, hour_angle + 2.0 * np.pi, hour_angle)
        hour_angle = np.where(hour_angle > np.pi, hour_angle - 2.0 * np.pi, hour_angle)
        # Convert radians to decimal hours.
        sim_data["HA"] = hour_angle * 12 / np.pi
        return sim_data
class ParallacticAngleStacker(BaseStacker):
    """Add the calculated parallactic angle to each visit.

    Parameters
    ----------
    ra_col : `str`, optional
        Name of the RA column in the data.
    dec_col : `str`, optional
        Name of the declination column in the data.
    degrees : `bool`, optional
        If True, assumes ra and dec in degrees and returns Parallactic
        Angle in degrees. If False, assumes and returns radians.
    mjd_col : `str`, optional
        Name of the observation MJD column in the data.
    lst_col : `str`, optional
        Name of the LST column in the data.
    site : `str` or `rubin_scheduler.utils.Site`, optional
        Name of the observatory or a rubin_scheduler.utils.Site object.
        Default 'LSST'.
    """

    cols_added = ["PA"]

    def __init__(
        self,
        ra_col="fieldRA",
        dec_col="fieldDec",
        degrees=True,
        mjd_col="observationStartMJD",
        lst_col="observationStartLST",
        site="LSST",
    ):
        self.lst_col = lst_col
        self.ra_col = ra_col
        self.dec_col = dec_col
        self.degrees = degrees
        self.mjd_col = mjd_col
        self.site = Site(name=site)
        # BUGFIX: units were previously always reported as "radians", but
        # _run converts PA to degrees when degrees=True (the default).
        # Report units that match the values actually returned, mirroring
        # ZenithDistStacker.
        if self.degrees:
            self.units = ["degrees"]
        else:
            self.units = ["radians"]
        self.cols_req = [self.ra_col, self.dec_col, self.mjd_col, self.lst_col]
        self.ha_stacker = HourAngleStacker(lst_col=lst_col, ra_col=ra_col, degrees=self.degrees)

    def _run(self, sim_data, cols_present=False):
        # Equation from:
        # http://www.gb.nrao.edu/~rcreager/GBTMetrology/140ft/l0058/gbtmemo52/memo52.html
        # or
        # http://www.gb.nrao.edu/GBT/DA/gbtidl/release2pt9/contrib/contrib/parangle.pro
        if cols_present:
            # Column already present in data; assume it is correct and does
            # not need recalculating.
            return sim_data
        # Using the run method (not _run) means that if HA is present, it
        # will not be recalculated.
        sim_data = self.ha_stacker.run(sim_data)
        if self.degrees:
            dec = np.radians(sim_data[self.dec_col])
        else:
            dec = sim_data[self.dec_col]
        # HA is in decimal hours; * pi/12 converts to radians.
        sim_data["PA"] = np.arctan2(
            np.sin(sim_data["HA"] * np.pi / 12.0),
            (
                np.cos(dec) * np.tan(self.site.latitude_rad)
                - np.sin(dec) * np.cos(sim_data["HA"] * np.pi / 12.0)
            ),
        )
        if self.degrees:
            sim_data["PA"] = np.degrees(sim_data["PA"])
        return sim_data
class OverheadStacker(BaseStacker):
    """Add time between visits in seconds.

    Parameters
    ----------
    max_gap : `float`, optional
        The maximum gap between observations, in minutes.
        Assume anything longer means the dome has closed.
        Defaults to infinity.
    mjd_col : `str`, optional
        The name of the column with the observation start MJD.
        Defaults to "observationStartMJD".
    visit_time_col : `str`, optional
        The name of the column with the total visit time (on-sky plus
        shutter and other overheads). Defaults to "visitTime".
    exposure_time_col : `str`, optional
        The name of the column with the visit on-sky exposure time.
        Defaults to "visitExposureTime".
    """

    cols_added = ["overhead"]

    def __init__(
        self,
        max_gap=np.inf,
        mjd_col="observationStartMJD",
        visit_time_col="visitTime",
        exposure_time_col="visitExposureTime",
    ):
        # max_gap is in minutes, matching the API of the existing
        # BruteOSFMetric.
        self.max_gap = max_gap
        self.mjd_col = mjd_col
        self.visit_time_col = visit_time_col
        self.exposure_time_col = exposure_time_col
        self.units = ["seconds"]
        self.day_obs_mjd_stacker = DayObsMJDStacker(self.mjd_col)
        # Require our own columns plus whatever DayObsMJDStacker needs,
        # with duplicates removed.
        needed = [self.mjd_col, self.visit_time_col, self.exposure_time_col]
        needed.extend(self.day_obs_mjd_stacker.cols_req)
        self.cols_req = list(set(needed))

    def _run(self, sim_data, cols_present=False):
        if cols_present:
            # Column already present in data; assume it is correct and does
            # not need recalculating.
            return sim_data
        # Overhead is all non-exposure time attributable to a visit: from
        # the shutter close of the previous visit to the shutter close of
        # the current one, minus the current on-sky exposure time. That
        # captures the slew to this target, shutter overheads, and any
        # readout not absorbed into the slew.
        seconds_per_day = 24 * 60 * 60
        shutter_close_mjd = sim_data[self.mjd_col] + sim_data[self.visit_time_col] / seconds_per_day
        gap_seconds = np.diff(shutter_close_mjd, prepend=np.nan) * seconds_per_day
        overhead = gap_seconds - sim_data[self.exposure_time_col]
        # Rough heuristic: gaps longer than max_gap are likely weather or
        # instrument downtime rather than overhead, so blank them out. A
        # more reliable approach would use external weather or problem
        # reports to identify (and separately measure) such gaps.
        overhead[overhead > self.max_gap * 60] = np.nan
        # A gap that spans a change of night is not overhead either. For
        # most finite max_gap values this never triggers, but when max_gap
        # is set to infinity to catch all within-night delays, we still do
        # not want to count the gaps between nights.
        day_obs_mjd = self.day_obs_mjd_stacker.run(sim_data)["day_obs_mjd"]
        changed_night = np.diff(day_obs_mjd, prepend=np.nan) != 0
        overhead[changed_night] = np.nan
        sim_data[self.cols_added[0]] = overhead
        return sim_data