Source code for rubin_sim.moving_objects.cheby_values

__all__ = ("ChebyValues",)

import os

import numpy as np
import pandas as pd

from .chebyshev_utils import chebeval


[docs] class ChebyValues: """Calculates positions, velocities, deltas, vmags and elongations, given a series of coefficients generated by ChebyFits. """ def __init__(self): self.coeffs = {} self.coeff_keys = [ "obj_id", "t_start", "t_end", "ra", "dec", "geo_dist", "vmag", "elongation", ] self.ephemeris_keys = [ "ra", "dradt", "dec", "ddecdt", "geo_dist", "vmag", "elongation", ]
[docs] def set_coefficients(self, cheby_fits): """Set coefficients using a ChebyFits object. (which contains a dictionary of obj_id, t_start, t_end, ra, dec, delta, vmag, and elongation lists). Parameters ---------- cheby_fits : `rubin_sim.movingObjects.chebyFits` ChebyFits object, with attribute 'coeffs' - a dictionary of lists of coefficients. """ self.coeffs = cheby_fits.coeffs # Convert list of coefficients into numpy arrays. for k in self.coeffs: self.coeffs[k] = np.array(self.coeffs[k]) # Check that expected values were received. missing_keys = set(self.coeff_keys) - set(self.coeffs) if len(missing_keys) > 0: raise ValueError("Expected to find key(s) %s in coefficients." % " ".join(list[missing_keys])) self.coeffs["meanRA"] = self.coeffs["ra"].swapaxes(0, 1)[0] self.coeffs["meanDec"] = self.coeffs["dec"].swapaxes(0, 1)[0]
[docs] def read_coefficients(self, cheby_fits_file): """Read coefficients from output file written by ChebyFits. Parameters ---------- cheby_fits_file : `str` The filename of the coefficients file. """ if not os.path.isfile(cheby_fits_file): raise IOError("Could not find cheby_fits_file at %s" % (cheby_fits_file)) # Read the coefficients file. coeffs = pd.read_table(cheby_fits_file, sep=r"\s+") # The header line provides information on the number of # coefficients for each parameter. datacols = coeffs.columns.values cols = {} coeff_cols = ["ra", "dec", "geo_dist", "vmag", "elongation"] for k in coeff_cols: cols[k] = [x for x in datacols if x.startswith(k)] # Translate dataframe to dictionary of numpy arrays # while consolidating RA/Dec/Delta/Vmag/Elongation coeffs. self.coeffs["obj_id"] = coeffs.obj_id.values self.coeffs["t_start"] = coeffs.t_start.values self.coeffs["t_end"] = coeffs.t_end.values for k in coeff_cols: self.coeffs[k] = np.empty([len(cols[k]), len(coeffs)], float) for i in range(len(cols[k])): self.coeffs[k][i] = coeffs["%s_%d" % (k, i)].values # Add the mean RA and Dec columns # (before swapping the coefficients axes). self.coeffs["meanRA"] = self.coeffs["ra"][0] self.coeffs["meanDec"] = self.coeffs["dec"][0] # Swap the coefficient axes so that they are [segment, coeff]. for k in coeff_cols: self.coeffs[k] = self.coeffs[k].swapaxes(0, 1)
def _eval_segment(self, segment_idx, times, subset_segments=None, mask=True): """Evaluate the ra/dec/delta/vmag/elongation values for a given segment at a series of times. Parameters ---------- segment_idx : `int` The index in (each of) self.coeffs for the segment. e.g. the first segment, for each object. times : `np.ndarray` The times at which to evaluate the segment. subset_segments : `np.ndarray`, optional Optionally specify a subset of the total segment indexes. This lets you pick out particular obj_ids. mask : `bool`, optional If True, returns NaNs for values outside the range of times in the segment. If False, extrapolates segment for times outside the segment time range. Returns ------- ephemeris : `dict` Dictionary of RA, Dec, delta, vmag, and elongation values for the segment indicated, at the time indicated. """ if subset_segments is None: subset_segments = np.ones(len(self.coeffs["obj_id"]), dtype=bool) t_start = self.coeffs["t_start"][subset_segments][segment_idx] t_end = self.coeffs["t_end"][subset_segments][segment_idx] t_scaled = times - t_start t_interval = np.array([t_start, t_end]) - t_start # Evaluate RA/Dec/Delta/Vmag/elongation. ephemeris = {} ephemeris["ra"], ephemeris["dradt"] = chebeval( t_scaled, self.coeffs["ra"][subset_segments][segment_idx], interval=t_interval, do_velocity=True, mask=mask, ) ephemeris["dec"], ephemeris["ddecdt"] = chebeval( t_scaled, self.coeffs["dec"][subset_segments][segment_idx], interval=t_interval, do_velocity=True, mask=mask, ) ephemeris["dradt"] = ephemeris["dradt"] * np.cos(np.radians(ephemeris["dec"])) for k in ("geo_dist", "vmag", "elongation"): ephemeris[k], _ = chebeval( t_scaled, self.coeffs[k][subset_segments][segment_idx], interval=t_interval, do_velocity=False, mask=mask, ) return ephemeris
[docs] def get_ephemerides(self, times, obj_ids=None, extrapolate=False): """Find the ephemeris information for 'obj_ids' at 'time'. Implicit in how this is currently written is that the segments are all expected to cover the same start/end time range across all objects. They do not have to have the same segment length for all objects. Parameters ---------- times : `float` or `np.ndarray` The time to calculate ephemeris positions. obj_ids : `np.ndarray`, opt The object ids for which to generate ephemerides. If None, then just uses all objects. extrapolate : `bool`, opt If True, extrapolate beyond ends of segments if time outside of segment range. If False, return ValueError if time is beyond range of segments. Returns ------- ephemerides : `np.ndarray` The ephemeris positions for all objects. Note that these may not be sorted in the same order as obj_ids. """ if isinstance(times, float) or isinstance(times, int): times = np.array([times], float) ntimes = len(times) ephemerides = {} # Find subset of segments which match obj_id, if specified. if obj_ids is None: obj_match = np.ones(len(self.coeffs["obj_id"]), dtype=bool) ephemerides["obj_id"] = np.unique(self.coeffs["obj_id"]) else: if isinstance(obj_ids, str) or isinstance(obj_ids, int): obj_ids = np.array([obj_ids]) obj_match = np.isin(self.coeffs["obj_id"], obj_ids) ephemerides["obj_id"] = obj_ids # Now find ephemeris values. ephemerides["time"] = np.zeros((len(ephemerides["obj_id"]), ntimes), float) + times for k in self.ephemeris_keys: ephemerides[k] = np.zeros((len(ephemerides["obj_id"]), ntimes), float) for it, t in enumerate(times): # Find subset of segments which contain the appropriate time. # Look for simplest subset first. segments = np.where( (self.coeffs["t_start"][obj_match] <= t) & (self.coeffs["t_end"][obj_match] > t) )[0] if len(segments) == 0: seg_start = self.coeffs["t_start"][obj_match].min() seg_end = self.coeffs["t_end"][obj_match].max() if seg_start > t or seg_end < t: if not extrapolate: for k in self.ephemeris_keys: ephemerides[k][:, it] = np.nan else: # Find the segments to use to extrapolate the times. if seg_start > t: segments = np.where(self.coeffs["t_start"][obj_match] == seg_start)[0] if seg_end < t: segments = np.where(self.coeffs["t_end"][obj_match] == seg_end)[0] elif seg_end == t: # Not extrapolating, but outside the # simple match case above. segments = np.where(self.coeffs["t_end"][obj_match] == seg_end)[0] for i, segmentIdx in enumerate(segments): ephemeris = self._eval_segment(segmentIdx, t, obj_match, mask=False) for k in self.ephemeris_keys: ephemerides[k][i][it] = ephemeris[k] ephemerides["obj_id"][i] = self.coeffs["obj_id"][obj_match][segmentIdx] if obj_ids is not None: if set(ephemerides["obj_id"]) != set(obj_ids): raise ValueError( "Did not find expected match between obj_ids provided and ephemeride obj_ids." ) return ephemerides