__all__ = ("just_return", "SkyModel")
import warnings
import numpy as np
from astropy import units as u
from astropy.coordinates import AltAz, EarthLocation, SkyCoord, get_body, get_sun
from astropy.time import Time
from rubin_scheduler.utils import Site, _approx_alt_az2_ra_dec, _approx_ra_dec2_alt_az, haversine
from rubin_sim.phot_utils import Sed
from .interp_components import (
Airglow,
LowerAtm,
MergedSpec,
MoonInterp,
ScatteredStar,
TwilightInterp,
UpperAtm,
ZodiacalInterp,
)
from .utils import wrap_ra
[docs]
def just_return(inval):
"""
Really, just return the input.
Parameters
----------
input : anything
Returns
-------
input : anything
Just return whatever you sent in.
"""
return inval
def inrange(inval, minimum=-1.0, maximum=1.0):
"""
Make sure values are within min/max
"""
inval = np.array(inval)
below = np.where(inval < minimum)
inval[below] = minimum
above = np.where(inval > maximum)
inval[above] = maximum
return inval
def calc_az_rel_moon(azs, moon_az):
az_rel_moon = wrap_ra(azs - moon_az)
if isinstance(azs, np.ndarray):
over = np.where(az_rel_moon > np.pi)
az_rel_moon[over] = 2.0 * np.pi - az_rel_moon[over]
else:
if az_rel_moon > np.pi:
az_rel_moon = 2.0 * np.pi - az_rel_moon
return az_rel_moon
class SkyModel:
def __init__(
self,
observatory=None,
twilight=True,
zodiacal=True,
moon=True,
airglow=True,
lower_atm=False,
upper_atm=False,
scattered_star=False,
merged_spec=True,
mags=False,
precise_alt_az=False,
airmass_limit=3.0,
):
"""A model of the sky, including all of the required
template spectra or magnitudes needed to interpolate the
sky spectrum or magnitudes during twilight or night time
at any point on the sky.
Parameters
----------
observatory : `rubin_scheduler.site_models.Site`, optional
Default of None loads the LSST site.
twilight : `bool`, optional
Include twilight component (True)
zodiacal : `bool`, optional
Include zodiacal light component (True)
moon : `bool`, optional
Include scattered moonlight component (True)
airglow : `bool`, optional
Include airglow component
lower_atm : `bool`, optional
Include lower atmosphere component.
This component is part of `merged_spec`.
upper_atm : `bool`, optional
Include upper atmosphere component.
This component is part of `merged_spec`.
scattered_star : `bool`, optional
Include scattered starlight component.
This component is part of `merged_spec`.
merged_spec : `bool`, optional
Compute the lower_atm, upper_atm, and scattered_star
simultaneously since they are all functions of only airmass.
mags : `bool`, optional
By default, the sky model computes a 17,001 element spectrum.
If `mags` is True,
the model will return the LSST ugrizy magnitudes (in that order).
precise_alt_az : `bool`, optional
If False, use the fast alt, az to ra, dec coordinate
transformations that do not take aberation, diffraction, etc
into account. Results in errors up to ~1.5 degrees,
but an order of magnitude faster than the precise coordinate
transformations available in rubin_scheduler.utils.
airmass_limit : `float`, optional
Most of the models are only accurate to airmass 3.0.
If set higher, airmass values higher than 3.0 are set to 3.0.
"""
self.moon = moon
self.lower_atm = lower_atm
self.twilight = twilight
self.zodiacal = zodiacal
self.upper_atm = upper_atm
self.airglow = airglow
self.scattered_star = scattered_star
self.merged_spec = merged_spec
self.mags = mags
self.precise_alt_az = precise_alt_az
# set this as a way to track if coords have been set
self.azs = None
# Airmass limit.
self.airmass_limit = airmass_limit
if self.mags:
self.npix = 6
else:
self.npix = 11001
self.components = {
"moon": self.moon,
"lower_atm": self.lower_atm,
"twilight": self.twilight,
"upper_atm": self.upper_atm,
"airglow": self.airglow,
"zodiacal": self.zodiacal,
"scattered_star": self.scattered_star,
"merged_spec": self.merged_spec,
}
# Check that the merged component isn't being run with other components
merged_comps = [self.lower_atm, self.upper_atm, self.scattered_star]
for comp in merged_comps:
if comp & self.merged_spec:
warnings.warn("Adding component multiple times to the final output spectra.")
interpolators = {
"scattered_star": ScatteredStar,
"airglow": Airglow,
"lower_atm": LowerAtm,
"upper_atm": UpperAtm,
"merged_spec": MergedSpec,
"moon": MoonInterp,
"zodiacal": ZodiacalInterp,
"twilight": TwilightInterp,
}
# Load up the interpolation objects for each component
self.interp_objs = {}
for key in self.components:
if self.components[key]:
self.interp_objs[key] = interpolators[key](mags=self.mags)
if observatory is None:
self.telescope = Site("LSST")
else:
self.telescope = observatory
self.location = EarthLocation(
lat=self.telescope.latitude_rad * u.rad,
lon=self.telescope.longitude_rad * u.rad,
height=self.telescope.height * u.m,
)
# Note that observing conditions have not been set
self.params_set = False
def _init_points(self):
"""
Set up an array for all the interpolation points
"""
names = [
"airmass",
"nightTimes",
"alt",
"az",
"azRelMoon",
"moonSunSep",
"moonAltitude",
"altEclip",
"azEclipRelSun",
"sunAlt",
"azRelSun",
"solar_flux",
]
types = [float] * len(names)
self.points = np.zeros(self.npts, list(zip(names, types)))
def set_ra_dec_mjd(
self,
lon,
lat,
mjd,
degrees=False,
az_alt=False,
solar_flux=130.0,
filter_names=["u", "g", "r", "i", "z", "y"],
):
"""
Set the sky parameters by computing the sky conditions on a
given MJD and sky location.
Parameters
----------
lon : `float` or `np.ndarray`, (N,)
Longitude-like (RA or Azimuth).
Can be single number, list, or numpy array
lat: `float` or `np.ndarray`, (N,)
Latitude-like (Dec or Altitude)
mjd: `float`
Modified Julian Date for the calculation. Must be single number.
degrees: `bool`, optional
If True, lon/lat are in degrees. If False, lon/lat in radians.
az_alt: `bool`, optional
Assume lon, lat are RA, Dec unless az_alt=True
solar_flux: `float`
Solar flux in SFU Between 50 and 310. Default=130. 1 SFU=10^4 Jy.
filter_names: `list` [`str`]
List of filter for which to return magnitudes
(if initialized with mags=True).
"""
self.filter_names = filter_names
if self.mags:
self.npix = len(self.filter_names)
# Wrap in array just in case single points were passed
if np.size(lon) == 1:
lon = np.array([lon]).ravel()
lat = np.array([lat]).ravel()
else:
lon = np.array(lon)
lat = np.array(lat)
if degrees:
self.ra = np.radians(lon)
self.dec = np.radians(lat)
else:
self.ra = lon
self.dec = lat
if np.size(mjd) > 1:
raise ValueError("mjd must be single value.")
self.mjd = mjd
if az_alt:
self.azs = self.ra.copy()
self.alts = self.dec.copy()
if self.precise_alt_az:
raise ValueError("Need to swap in astropy")
# self.ra, self.dec = _ra_dec_from_alt_az(
# self.alts,
# self.azs,
# ObservationMetaData(mjd=self.mjd, site=self.telescope),
# )
else:
self.ra, self.dec = _approx_alt_az2_ra_dec(
self.alts,
self.azs,
self.telescope.latitude_rad,
self.telescope.longitude_rad,
mjd,
)
else:
if self.precise_alt_az:
raise ValueError("Need to swap in astropy")
# self.alts, self.azs, pa = _alt_az_pa_from_ra_dec(
# self.ra,
# self.dec,
# ObservationMetaData(mjd=self.mjd, site=self.telescope),
# )
else:
self.alts, self.azs = _approx_ra_dec2_alt_az(
self.ra,
self.dec,
self.telescope.latitude_rad,
self.telescope.longitude_rad,
mjd,
)
self.npts = self.ra.size
self._init_points()
self.solar_flux = solar_flux
self.points["solar_flux"] = self.solar_flux
self._setup_point_grid()
self.params_set = True
# Interpolate the templates to the set Parameters
self.good_pix = np.where((self.airmass <= self.airmass_limit) & (self.airmass >= 1.0))[0]
if self.good_pix.size <= 0:
raise ValueError(
"No valid points. Airmass limit=%.1f, min airmass of requested points=%.1f"
% (self.airmass_limit, np.min(self.airmass))
)
else:
self._interp_sky()
def set_ra_dec_alt_az_mjd(
self,
ra,
dec,
alt,
az,
mjd,
degrees=False,
solar_flux=130.0,
filter_names=["u", "g", "r", "i", "z", "y"],
):
"""
Set the sky parameters by computing the sky conditions on a
given MJD and sky location.
Use if you already have alt az coordinates so you can skip the
coordinate conversion.
"""
self.filter_names = filter_names
if self.mags:
self.npix = len(self.filter_names)
# Wrap in array just in case single points were passed
if not type(ra).__module__ == np.__name__:
if np.size(ra) == 1:
ra = np.array([ra]).ravel()
dec = np.array([dec]).ravel()
alt = np.array(alt).ravel()
az = np.array(az).ravel()
else:
ra = np.array(ra)
dec = np.array(dec)
alt = np.array(alt)
az = np.array(az)
if degrees:
self.ra = np.radians(ra)
self.dec = np.radians(dec)
self.alts = np.radians(alt)
self.azs = np.radians(az)
else:
self.ra = ra
self.dec = dec
self.azs = az
self.alts = alt
if np.size(mjd) > 1:
raise ValueError("mjd must be single value.")
self.mjd = mjd
self.npts = self.ra.size
self._init_points()
self.solar_flux = solar_flux
self.points["solar_flux"] = self.solar_flux
self._setup_point_grid()
self.params_set = True
# Interpolate the templates to the set Parameters
self.good_pix = np.where((self.airmass <= self.airmass_limit) & (self.airmass >= 1.0))[0]
if self.good_pix.size <= 0:
raise ValueError(
"No valid points. Airmass limit=%.1f, min airmass of requested points=%.1f"
% (self.airmass_limit, np.min(self.airmass))
)
else:
self._interp_sky()
def get_computed_vals(self):
"""
Return the intermediate values that are caluculated by
set_ra_dec_mjd and used for interpolation.
All of these values are also accessible as class attributes, this is
a convenience method to grab them all at once and document the formats.
Returns
-------
out : `dict`
Dictionary of all the intermediate calculated values that may
be of use outside (the key:values in the output dict)
ra : `np.ndarray`, (N,)
RA of the interpolation points (radians)
dec : `np.ndarray`, (N,)
Dec of the interpolation points (radians)
alts : `np.ndarray`, (N,)
Altitude (radians)
azs : `np.ndarray`, (N,)
Azimuth of interpolation points (radians)
airmass : `np.ndarray`, (N,)
Airmass values for each point,
computed via 1./np.cos(np.pi/2.-self.alts).
solar_flux : `float`
The solar flux used (SFU).
sunAz : `float`
Azimuth of the sun (radians)
sunAlt : `float`
Altitude of the sun (radians)
sunRA : `float`
RA of the sun (radians)
sunDec : `float`
Dec of the sun (radians)
azRelSun : `np.ndarray`, (N,)
Azimuth of each point relative to the sun
(0=same direction as sun) (radians)
moonAz : `float`
Azimuth of the moon (radians)
moonAlt : `float`
Altitude of the moon (radians)
moonRA : `float`
RA of the moon (radians)
moonDec : `float`
Dec of the moon (radians). Note, if you want distances
moon_phase : `float`
Phase of the moon (0-100)
moonSunSep : `float`
Seperation of moon and sun (radians)
azRelMoon : `np.ndarray`, (N,)
Azimuth of each point relative to teh moon
eclipLon : `np.ndarray`, (N,)
Ecliptic longitude (radians) of each point
eclipLat : `np.ndarray`, (N,)
Ecliptic latitude (radians) of each point
sunEclipLon: `np.ndarray`, (N,)
Ecliptic longitude (radians) of each point with the sun at
longitude zero
Note that since the alt and az can be calculated using the fast
approximation, if one wants to compute the distance between the points
and the sun or moon, it is probably better to use the ra,dec positions
rather than the alt,az positions.
"""
result = {}
attributes = [
"ra",
"dec",
"alts",
"azs",
"airmass",
"solar_flux",
"moon_phase",
"moon_az",
"moon_alt",
"sun_alt",
"sun_az",
"az_rel_sun",
"moon_sun_sep",
"az_rel_moon",
"eclip_lon",
"eclip_lat",
"moon_ra",
"moon_dec",
"sun_ra",
"sun_dec",
"sun_eclip_lon",
]
for attribute in attributes:
if hasattr(self, attribute):
result[attribute] = getattr(self, attribute)
else:
result[attribute] = None
return result
def _setup_point_grid(self):
"""
Setup the points for the interpolation functions.
"""
time = Time(self.mjd, format="mjd")
aa = AltAz(location=self.location, obstime=time)
sun_coords = get_sun(time)
self.sun_ra = sun_coords.ra.rad
self.sun_dec = sun_coords.dec.rad
sun_coords = sun_coords.transform_to(aa)
self.sun_alt = sun_coords.alt.rad
self.sun_az = sun_coords.az.rad
# Compute airmass the same way as ESO model
self.airmass = 1.0 / np.cos(np.pi / 2.0 - self.alts)
self.points["airmass"] = self.airmass
self.points["nightTimes"] = 0
self.points["alt"] = self.alts
self.points["az"] = self.azs
if self.twilight:
self.points["sunAlt"] = self.sun_alt
self.az_rel_sun = wrap_ra(self.azs - self.sun_az)
self.points["azRelSun"] = self.az_rel_sun
if self.moon:
moon_coords = get_body("moon", time)
self.moon_ra = moon_coords.ra.rad
self.moon_dec = moon_coords.dec.rad
moon_coords = moon_coords.transform_to(aa)
self.moon_alt = moon_coords.alt.rad
self.moon_az = moon_coords.az.rad
moon_coords = get_body("moon", time)
sun_coords = get_sun(time)
sep = sun_coords.separation(moon_coords)
# looks like phase is 0-100
self.moon_phase = sep.deg * 100 / 180.0
# Calc azimuth relative to moon
self.az_rel_moon = calc_az_rel_moon(self.azs, self.moon_az)
self.moon_targ_sep = haversine(self.azs, self.alts, self.moon_az, self.moon_alt)
# Oof, looks like some things were stored as degrees.
self.points["moonAltitude"] += np.degrees(self.moon_alt)
self.points["azRelMoon"] += self.az_rel_moon
self.moon_sun_sep = sep.rad
self.points["moonSunSep"] += np.degrees(self.moon_sun_sep)
if self.zodiacal:
self.eclip_lon = np.zeros(self.npts)
self.eclip_lat = np.zeros(self.npts)
coord = SkyCoord(ra=self.ra * u.rad, dec=self.dec * u.rad)
coord_ecl = coord.geocentricmeanecliptic
self.eclip_lon = coord_ecl.lon.rad
self.eclip_lat = coord_ecl.lat.rad
# Subtract off the sun ecliptic longitude
sun_coords = get_sun(time)
sun_eclip = sun_coords.geocentricmeanecliptic
self.sun_eclip_lon = sun_eclip.lon.rad
self.points["altEclip"] += self.eclip_lat
self.points["azEclipRelSun"] += wrap_ra(self.eclip_lon - self.sun_eclip_lon)
self.mask = np.where((self.airmass > self.airmass_limit) | (self.airmass < 1.0))[0]
self.good_pix = np.where((self.airmass <= self.airmass_limit) & (self.airmass >= 1.0))[0]
def set_params(
self,
airmass=1.0,
azs=90.0,
alts=None,
moon_phase=31.67,
moon_alt=45.0,
moon_az=0.0,
sun_alt=-12.0,
sun_az=0.0,
sun_eclip_lon=0.0,
eclip_lon=135.0,
eclip_lat=90.0,
degrees=True,
solar_flux=130.0,
filter_names=["u", "g", "r", "i", "z", "y"],
):
"""
Set parameters manually.
Note, you can put in unphysical combinations of Parameters if you
want to (e.g., put a full moon at zenith at sunset).
If the alts kwarg is set it will override the airmass kwarg.
MoonPhase is percent of moon illuminated (0-100)
"""
# Convert all values to radians for internal use.
self.filter_names = filter_names
if self.mags:
self.npix = len(self.filter_names)
if degrees:
convert_func = np.radians
else:
convert_func = just_return
self.solar_flux = solar_flux
self.sun_alt = convert_func(sun_alt)
self.moon_phase = moon_phase
self.moon_alt = convert_func(moon_alt)
self.moon_az = convert_func(moon_az)
self.eclip_lon = convert_func(eclip_lon)
self.eclip_lat = convert_func(eclip_lat)
self.sun_eclip_lon = convert_func(sun_eclip_lon)
self.azs = convert_func(azs)
if alts is not None:
self.airmass = 1.0 / np.cos(np.pi / 2.0 - convert_func(alts))
self.alts = convert_func(alts)
else:
self.airmass = airmass
self.alts = np.pi / 2.0 - np.arccos(1.0 / airmass)
self.moon_targ_sep = haversine(self.azs, self.alts, moon_az, self.moon_alt)
self.npts = np.size(self.airmass)
self._init_points()
self.points["airmass"] = self.airmass
self.points["nightTimes"] = 0
self.points["alt"] = self.alts
self.points["az"] = self.azs
self.az_rel_moon = calc_az_rel_moon(self.azs, self.moon_az)
self.points["moonAltitude"] += np.degrees(self.moon_alt)
self.points["azRelMoon"] = self.az_rel_moon
self.moon_sun_sep = self.moon_phase / 100.0 * np.pi
self.points["moonSunSep"] += self.moon_phase / 100.0 * 180.0
self.eclip_lon = convert_func(eclip_lon)
self.eclip_lat = convert_func(eclip_lat)
self.sun_eclip_lon = convert_func(sun_eclip_lon)
self.points["altEclip"] += self.eclip_lat
self.points["azEclipRelSun"] += wrap_ra(self.eclip_lon - self.sun_eclip_lon)
self.sun_az = convert_func(sun_az)
self.points["sunAlt"] = self.sun_alt
self.points["azRelSun"] = wrap_ra(self.azs - self.sun_az)
self.points["solar_flux"] = solar_flux
self.params_set = True
self.mask = np.where((self.airmass > self.airmass_limit) | (self.airmass < 1.0))[0]
self.good_pix = np.where((self.airmass <= self.airmass_limit) & (self.airmass >= 1.0))[0]
# Interpolate the templates to the set Parameters
if self.good_pix.size > 0:
self._interp_sky()
else:
warnings.warn("No points in interpolation range")
def _interp_sky(self):
"""
Interpolate the template spectra to the set RA, Dec and MJD.
the results are stored as attributes of the class:
.wave = the wavelength in nm
.spec = array of spectra with units of ergs/s/cm^2/nm
"""
if not self.params_set:
raise ValueError(
"No parameters have been set. Must run set_ra_dec_mjd or setParams before running interpSky."
)
# set up array to hold the resulting spectra for each ra, dec point.
self.spec = np.zeros((self.npts, self.npix), dtype=float)
# Rebuild the components dict so things can be turned on/off
self.components = {
"moon": self.moon,
"lower_atm": self.lower_atm,
"twilight": self.twilight,
"upper_atm": self.upper_atm,
"airglow": self.airglow,
"zodiacal": self.zodiacal,
"scattered_star": self.scattered_star,
"merged_spec": self.merged_spec,
}
# Loop over each component and add it to the result.
mask = np.ones(self.npts)
for key in self.components:
if self.components[key]:
result = self.interp_objs[key](self.points[self.good_pix], filter_names=self.filter_names)
# Make sure the component has something
if np.size(result["spec"]) == 0:
self.spec[self.mask, :] = np.nan
return
if np.max(result["spec"]) > 0:
mask[np.where(np.sum(result["spec"], axis=1) == 0)] = 0
self.spec[self.good_pix] += result["spec"]
if not hasattr(self, "wave"):
self.wave = result["wave"]
else:
if not np.allclose(result["wave"], self.wave, rtol=1e-4, atol=1e-4):
warnings.warn("Wavelength arrays of components do not match.")
if self.airmass_limit <= 2.5:
self.spec[np.where(mask == 0), :] = 0
self.spec[self.mask, :] = np.nan
def return_wave_spec(self):
"""
Return the wavelength and spectra.
Wavelenth in nm
spectra is flambda in ergs/cm^2/s/nm
"""
if self.azs is None:
raise ValueError(
"No coordinates set. Use set_ra_dec_mjd, setRaDecAltAzMjd, or "
"setParams methods before calling returnWaveSpec."
)
if self.mags:
raise ValueError("SkyModel set to interpolate magnitudes. Initialize object with mags=False")
# Mask out high airmass points
# self.spec[self.mask] *= 0
return self.wave.copy(), self.spec.copy()
def return_mags(self, bandpasses=None):
"""Return the skybrightness in magnitudes.
Convert the computed spectra to a magnitude using the
supplied bandpass, or, if self.mags=True, return the mags in the
LSST filters.
Parameters
----------
bandpasses : `dict` [`str`, `rubin_sim.phot_utils.Bandpass`], optional
Dictionary with bandpass name as keys and `Bandpass` objects
as values.
If mags=True when initialized, return mags returns a structured array
with dtype names u,g,r,i,z,y; the default LSST bandpasses are used.
Returns
-------
mags : `np.ndarray`, (N,)
Sky brightness in AB mags/sq arcsec
"""
if self.azs is None:
raise ValueError(
"No coordinates set. Use set_ra_dec_mjd, setRaDecAltAzMjd, or "
"setParams methods before calling return_mags."
)
if self.mags:
if bandpasses:
warnings.warn("Ignoring set bandpasses and returning LSST ugrizy.")
mags = -2.5 * np.log10(self.spec) + np.log10(3631.0)
# Mask out high airmass
mags[self.mask] *= np.nan
mags = mags.swapaxes(0, 1)
mags_back = {}
for i, f in enumerate(self.filter_names):
mags_back[f] = mags[i]
else:
mags_back = {}
for key in bandpasses:
mags = np.zeros(self.npts, dtype=float) - 666
temp_sed = Sed()
is_through = np.where(bandpasses[key].sb > 0)
min_wave = bandpasses[key].wavelen[is_through].min()
max_wave = bandpasses[key].wavelen[is_through].max()
in_band = np.where((self.wave >= min_wave) & (self.wave <= max_wave))
for i, ra in enumerate(self.ra):
# Check that there is flux in the band,
# otherwise calc_mag fails
if np.max(self.spec[i, in_band]) > 0:
temp_sed.set_sed(self.wave, flambda=self.spec[i, :])
mags[i] = temp_sed.calc_mag(bandpasses[key])
# Mask out high airmass
mags[self.mask] *= np.nan
mags_back[key] = mags
return mags_back