Source code for rubin_sim.maf.stackers.coord_stackers

__all__ = ("ra_dec2_alt_az", "GalacticStacker", "EclipticStacker")

import numpy as np
from astropy import units as u
from astropy.coordinates import SkyCoord, get_sun
from astropy.time import Time
from rubin_scheduler.utils import calc_lmst

from .base_stacker import BaseStacker


def wrap_ra(ra):
    """
    Wrap only RA values into 0-2pi (using mod).

    Parameters
    ----------
    ra : numpy.ndarray
        RA in radians

    Returns
    -------
    numpy.ndarray
        Wrapped RA values, in radians.
    """
    ra = ra % (2.0 * np.pi)
    return ra


[docs] def ra_dec2_alt_az(ra, dec, lat, lon, mjd, altonly=False): """Convert RA/Dec (and telescope site lat/lon) to alt/az. This uses simple equations and ignores aberation, precession, nutation. Parameters ---------- ra : `np.ndarray`, (N,) RA, in radians. dec : `np.ndarray`, (N,) Dec, in radians. Must be same length as `ra`. lat : `float` Latitude of the observatory in radians. lon : `float` Longitude of the observatory in radians. mjd : `float` Modified Julian Date. altonly : `bool`, optional Calculate altitude only. Returns ------- alt : `np.ndarray`, (N,) Altitude, same length as `ra` and `dec`. Radians. az : `np.ndarray`, (N,) Azimuth, same length as `ra` and `dec`. Radians. """ lmst = calc_lmst(mjd, lon) lmst = lmst / 12.0 * np.pi # convert to rad ha = lmst - ra sindec = np.sin(dec) sinlat = np.sin(lat) coslat = np.cos(lat) sinalt = sindec * sinlat + np.cos(dec) * coslat * np.cos(ha) # make sure sinalt is in the expected range. sinalt = np.where(sinalt < -1, -1, sinalt) sinalt = np.where(sinalt > 1, 1, sinalt) alt = np.arcsin(sinalt) if altonly: az = None else: cosaz = (sindec - np.sin(alt) * sinlat) / (np.cos(alt) * coslat) cosaz = np.where(cosaz < -1, -1, cosaz) cosaz = np.where(cosaz > 1, 1, cosaz) az = np.arccos(cosaz) signflip = np.where(np.sin(ha) > 0) az[signflip] = 2.0 * np.pi - az[signflip] return alt, az
[docs] class GalacticStacker(BaseStacker): """Add the galactic coordinates of each RA/Dec pointing: gall, galb Parameters ---------- ra_col : str, optional Name of the RA column. Default fieldRA. dec_col : str, optional Name of the Dec column. Default fieldDec. """ cols_added = ["gall", "galb"] def __init__(self, ra_col="fieldRA", dec_col="fieldDec", degrees=True): self.cols_req = [ra_col, dec_col] self.ra_col = ra_col self.dec_col = dec_col self.degrees = degrees if self.degrees: self.units = ["degrees", "degrees"] else: self.units = ["radians", "radians"] def _run(self, sim_data, cols_present=False): # raCol and DecCol in radians, gall/b in radians. if cols_present: # Column already present in data; # assume it is correct and does not need recalculating. return sim_data if self.degrees: c = SkyCoord(ra=sim_data[self.ra_col] * u.deg, dec=sim_data[self.dec_col] * u.deg).transform_to( "galactic" ) else: c = SkyCoord(ra=sim_data[self.ra_col] * u.rad, dec=sim_data[self.dec_col] * u.rad).transform_to( "galactic" ) sim_data["gall"] = c.l.rad sim_data["galb"] = c.b.rad return sim_data
[docs] class EclipticStacker(BaseStacker): """Add the ecliptic coordinates of each RA/Dec pointing: eclipLat, eclipLon Optionally subtract off the sun's ecliptic longitude and wrap. Parameters ---------- mjd_col : str, optional Name of the MJD column. Default expMJD. ra_col : str, optional Name of the RA column. Default fieldRA. dec_col : str, optional Name of the Dec column. Default fieldDec. subtract_sun_lon : bool, optional Flag to subtract the sun's ecliptic longitude. Default False. """ cols_added = ["eclipLat", "eclipLon"] def __init__( self, mjd_col="observationStartMJD", ra_col="fieldRA", dec_col="fieldDec", degrees=True, subtract_sun_lon=False, ): self.cols_req = [mjd_col, ra_col, dec_col] self.subtract_sun_lon = subtract_sun_lon self.degrees = degrees if self.degrees: self.units = ["degrees", "degrees"] else: self.units = ["radians", "radians"] self.mjd_col = mjd_col self.ra_col = ra_col self.dec_col = dec_col def _run(self, sim_data, cols_present=False): if cols_present: # Column already present in data; # assume it is correct and does not need recalculating. return sim_data for i in np.arange(sim_data.size): if self.degrees: coord = SkyCoord( ra=sim_data[self.ra_col] * u.degree, dec=sim_data[self.dec_col] * u.degree, ) else: coord = SkyCoord(ra=sim_data[self.ra_col] * u.rad, dec=sim_data[self.dec_col] * u.rad) coord_ecl = coord.geocentricmeanecliptic sim_data["eclipLat"] = coord_ecl.lat.rad if self.subtract_sun_lon: times = Time(sim_data[self.mjd_col]) sun = get_sun(times) sun_ecl = sun.geocentricmeanecliptic lon = wrap_ra(coord_ecl.lon.rad - sun_ecl.lon.rad) sim_data["eclipLon"] = lon else: sim_data["eclipLon"] = coord_ecl.lon.rad if self.degrees: sim_data["eclipLon"] = np.degrees(sim_data["eclipLon"]) sim_data["eclipLat"] = np.degrees(sim_data["eclipLat"]) return sim_data