Source code for rubin_sim.selfcal.generate_catalog

__all__ = ("generate_catalog",)

import sys
import warnings

import numpy as np
import numpy.lib.recfunctions as rfn
from scipy.spatial import cKDTree as kdtree  # noqa: N813

from .offsets import OffsetSNR
from .star_tools import assign_patches, stars_project


def wrap_ra(ra):
    """Wraps RA into 0-360 degrees."""
    ra = ra % 360.0
    return ra


def cap_dec(dec):
    """Terminates declination at +/- 90 degrees."""
    dec = np.where(dec > 90, 90, dec)
    dec = np.where(dec < -90, -90, dec)
    return dec


def treexyz(ra, dec):
    """Calculate x/y/z values for ra/dec points, ra/dec in radians."""
    # Note ra/dec can be arrays.
    x = np.cos(dec) * np.cos(ra)
    y = np.cos(dec) * np.sin(ra)
    z = np.sin(dec)
    return x, y, z


def build_tree(ra, dec, leafsize=100):
    """Build KD tree on RA/dec and set radius (via setRad) for matching.

    Parameters
    ----------
    ra : `nd.ndarray`, (N,)
        RA values of the tree (in radians)
    dec : `nd.ndarray`, (N,)
        Dec values of the tree (in radians).
    leafsize : `float`, opt
        The number of RA/Dec pointings in each leafnode.
        Default 100.
    """
    if np.any(np.abs(ra) > np.pi * 2.0) or np.any(np.abs(dec) > np.pi * 2.0):
        raise ValueError("Expecting RA and Dec values to be in radians.")
    x, y, z = treexyz(ra, dec)
    data = list(zip(x, y, z))
    if np.size(data) > 0:
        star_tree = kdtree(data, leafsize=leafsize)
    else:
        raise ValueError("ra and dec should have length greater than 0.")
    return star_tree


[docs] def generate_catalog( visits, stars_array, offsets=None, lsst_filter="r", n_patches=16, radius_fov=1.8, seed=42, uncert_floor=0.005, verbose=True, ): """ Generate a catalog of observed stellar magnitudes. Parameters ---------- visits : `np.array`, (N,) A numpy array with the properties of the visits. Expected columns of fiveSigmaDepth, ra, dec, rotSkyPos (all degrees) offsets : `list` of rubin_sim.selfcal.Offset classes A list of instatiated classes that will apply offsets to the stars lsst_filter : `str` Which filter to use for the observed stars. n_patches : `int` Number of patches to divide the FoV into. Must be an integer squared radius_fov : `float` Radius of the telescope field of view in degrees seed : `float` Random number seed uncert_floor : `float` Value to add in quadrature to magnitude uncertainties (mags) verbose : `bool` Should we be verbose """ if offsets is None: # Maybe change this to just run with a default SNR offset warnings.warn("Warning, no offsets set, returning without running") return # For computing what the 'expected' uncertainty on the observation will be mag_uncert = OffsetSNR(lsst_filter=lsst_filter) # set the radius for the kdtree x0, y0, z0 = (1, 0, 0) x1, y1, z1 = treexyz(np.radians(radius_fov), 0) tree_radius = np.sqrt((x1 - x0) ** 2 + (y1 - y0) ** 2 + (z1 - z0) ** 2) newcols = [ "x", "y", "radius", "patch_id", "sub_patch", "observed_mag", "mag_uncert", ] newtypes = [float, float, float, int, int, float, float] stars_new = np.zeros(stars_array.size, dtype=list(zip(newcols, newtypes))) # only need to output these columns (saving rmag+ for now for convienence) output_cols = [ "id", "patch_id", "observed_mag", "mag_uncert", "%smag" % lsst_filter, "ra", "decl", ] output_dtypes = [int, int, float, float, float, float, float] stars = rfn.merge_arrays([stars_array, stars_new], flatten=True) # Build a KDTree for the stars star_tree = build_tree(np.radians(stars["ra"]), np.radians(stars["decl"])) # XXX--maybe update the way seeding is going on np.random.seed(seed) list_of_observed_arrays = [] n_visits = np.size(visits) for i, visit in enumerate(visits): dmags = {} # Calc x,y, radius for each star, crop off stars outside the FoV # could replace with code to see where each star falls and get chipID. vx, vy, vz = treexyz(np.radians(visit["ra"]), np.radians(visit["dec"])) indices = star_tree.query_ball_point((vx, vy, vz), tree_radius) stars_in = stars[indices] stars_in = stars_project(stars_in, visit) # Assign patchIDs stars_in = assign_patches(stars_in, visit, n_patches=n_patches) # Apply the offsets that have been configured for offset in offsets: dmags[offset.newkey] = offset(stars_in, visit, dmags=dmags) # Total up all the dmag's to make the observed magnitude keys = list(dmags.keys()) obs_mag = stars_in["%smag" % lsst_filter].copy() for key in keys: obs_mag += dmags[key] # Calculate the uncertainty in the observed mag: mag_err = ( mag_uncert.calc_mag_errors(obs_mag, err_only=True, m5=visit["fiveSigmaDepth"]) ** 2 + uncert_floor**2 ) ** 0.5 # put values into the right columns stars_in["observed_mag"] = obs_mag stars_in["mag_uncert"] = mag_err # Should shrink this down so we only return the needed columns # observed_mag, mag_uncert, patchid, star_id sub_cols = np.empty(stars_in.size, dtype=list(zip(output_cols, output_dtypes))) for key in output_cols: sub_cols[key] = stars_in[key] list_of_observed_arrays.append(sub_cols) if verbose: progress = i / n_visits * 100 text = "\rprogress = %.2f%%" % progress sys.stdout.write(text) sys.stdout.flush() result = np.concatenate(list_of_observed_arrays) return result