__all__ = ("generate_catalog",)
import sys
import warnings
import numpy as np
import numpy.lib.recfunctions as rfn
from scipy.spatial import cKDTree as kdtree # noqa: N813
from .offsets import OffsetSNR
from .star_tools import assign_patches, stars_project
def wrap_ra(ra):
"""Wraps RA into 0-360 degrees."""
ra = ra % 360.0
return ra
def cap_dec(dec):
"""Terminates declination at +/- 90 degrees."""
dec = np.where(dec > 90, 90, dec)
dec = np.where(dec < -90, -90, dec)
return dec
def treexyz(ra, dec):
"""Calculate x/y/z values for ra/dec points, ra/dec in radians."""
# Note ra/dec can be arrays.
x = np.cos(dec) * np.cos(ra)
y = np.cos(dec) * np.sin(ra)
z = np.sin(dec)
return x, y, z
def build_tree(ra, dec, leafsize=100):
"""Build KD tree on RA/dec and set radius (via setRad) for matching.
Parameters
----------
ra : `nd.ndarray`, (N,)
RA values of the tree (in radians)
dec : `nd.ndarray`, (N,)
Dec values of the tree (in radians).
leafsize : `float`, opt
The number of RA/Dec pointings in each leafnode.
Default 100.
"""
if np.any(np.abs(ra) > np.pi * 2.0) or np.any(np.abs(dec) > np.pi * 2.0):
raise ValueError("Expecting RA and Dec values to be in radians.")
x, y, z = treexyz(ra, dec)
data = list(zip(x, y, z))
if np.size(data) > 0:
star_tree = kdtree(data, leafsize=leafsize)
else:
raise ValueError("ra and dec should have length greater than 0.")
return star_tree
[docs]
def generate_catalog(
visits,
stars_array,
offsets=None,
lsst_filter="r",
n_patches=16,
radius_fov=1.8,
seed=42,
uncert_floor=0.005,
verbose=True,
):
"""
Generate a catalog of observed stellar magnitudes.
Parameters
----------
visits : `np.array`, (N,)
A numpy array with the properties of the visits.
Expected columns of fiveSigmaDepth, ra, dec, rotSkyPos (all degrees)
offsets : `list` of rubin_sim.selfcal.Offset classes
A list of instatiated classes that will apply offsets to the stars
lsst_filter : `str`
Which filter to use for the observed stars.
n_patches : `int`
Number of patches to divide the FoV into. Must be an integer squared
radius_fov : `float`
Radius of the telescope field of view in degrees
seed : `float`
Random number seed
uncert_floor : `float`
Value to add in quadrature to magnitude uncertainties (mags)
verbose : `bool`
Should we be verbose
"""
if offsets is None:
# Maybe change this to just run with a default SNR offset
warnings.warn("Warning, no offsets set, returning without running")
return
# For computing what the 'expected' uncertainty on the observation will be
mag_uncert = OffsetSNR(lsst_filter=lsst_filter)
# set the radius for the kdtree
x0, y0, z0 = (1, 0, 0)
x1, y1, z1 = treexyz(np.radians(radius_fov), 0)
tree_radius = np.sqrt((x1 - x0) ** 2 + (y1 - y0) ** 2 + (z1 - z0) ** 2)
newcols = [
"x",
"y",
"radius",
"patch_id",
"sub_patch",
"observed_mag",
"mag_uncert",
]
newtypes = [float, float, float, int, int, float, float]
stars_new = np.zeros(stars_array.size, dtype=list(zip(newcols, newtypes)))
# only need to output these columns (saving rmag+ for now for convienence)
output_cols = [
"id",
"patch_id",
"observed_mag",
"mag_uncert",
"%smag" % lsst_filter,
"ra",
"decl",
]
output_dtypes = [int, int, float, float, float, float, float]
stars = rfn.merge_arrays([stars_array, stars_new], flatten=True)
# Build a KDTree for the stars
star_tree = build_tree(np.radians(stars["ra"]), np.radians(stars["decl"]))
# XXX--maybe update the way seeding is going on
np.random.seed(seed)
list_of_observed_arrays = []
n_visits = np.size(visits)
for i, visit in enumerate(visits):
dmags = {}
# Calc x,y, radius for each star, crop off stars outside the FoV
# could replace with code to see where each star falls and get chipID.
vx, vy, vz = treexyz(np.radians(visit["ra"]), np.radians(visit["dec"]))
indices = star_tree.query_ball_point((vx, vy, vz), tree_radius)
stars_in = stars[indices]
stars_in = stars_project(stars_in, visit)
# Assign patchIDs
stars_in = assign_patches(stars_in, visit, n_patches=n_patches)
# Apply the offsets that have been configured
for offset in offsets:
dmags[offset.newkey] = offset(stars_in, visit, dmags=dmags)
# Total up all the dmag's to make the observed magnitude
keys = list(dmags.keys())
obs_mag = stars_in["%smag" % lsst_filter].copy()
for key in keys:
obs_mag += dmags[key]
# Calculate the uncertainty in the observed mag:
mag_err = (
mag_uncert.calc_mag_errors(obs_mag, err_only=True, m5=visit["fiveSigmaDepth"]) ** 2
+ uncert_floor**2
) ** 0.5
# put values into the right columns
stars_in["observed_mag"] = obs_mag
stars_in["mag_uncert"] = mag_err
# Should shrink this down so we only return the needed columns
# observed_mag, mag_uncert, patchid, star_id
sub_cols = np.empty(stars_in.size, dtype=list(zip(output_cols, output_dtypes)))
for key in output_cols:
sub_cols[key] = stars_in[key]
list_of_observed_arrays.append(sub_cols)
if verbose:
progress = i / n_visits * 100
text = "\rprogress = %.2f%%" % progress
sys.stdout.write(text)
sys.stdout.flush()
result = np.concatenate(list_of_observed_arrays)
return result