__all__ = ("ChebyFits",)
import os
import warnings
import numpy as np
from .chebyshev_utils import chebfit, make_cheb_matrix, make_cheb_matrix_only_x
from .ooephemerides import PyOrbEphemerides, get_oorb_data_dir
from .orbits import Orbits
def three_sixty_to_neg(ra):
"""Wrap discontiguous RA values into more-contiguous results."""
if (ra.min() < 100) and (ra.max() > 270):
ra = np.where(ra > 270, ra - 360, ra)
return ra
[docs]
class ChebyFits:
"""Generates chebyshev coefficients for a provided set of orbits.
Calculates true ephemerides using PyEphemerides, then fits these
positions with a constrained Chebyshev Polynomial, using the routines
in chebyshevUtils.py.
Parameters
----------
orbits_obj : `rubin_sim.moving_objects.Orbits`
The orbits for which to fit chebyshev polynomial coefficients.
t_start : `float`
The starting point in time to fit coefficients. MJD.
t_span : `float`
The time span (starting at t_start) over which to fit coefficients
(Days).
time_scale : `str`, optional
One of {'TAI', 'UTC', 'TT'}
The timescale of the MJD time, t_start, and the time_scale
that should be used with the chebyshev coefficients.
obsCode : `int`, optional
The observatory code of the location for which to generate
ephemerides. Default I11 (Cerro Pachon).
sky_tolerance : `float`, optional
The desired tolerance in mas between ephemerides calculated by
OpenOrb and fitted values.
Default 2.5 mas.
nCoeff_position : `int`, optional
The number of Chebyshev coefficients to fit for the RA/Dec positions.
Default 14.
nCoeff_vmag : `int`, optional
The number of Chebyshev coefficients to fit for the V magnitude values.
Default 9.
nCoeff_delta : `int`, optional
The number of Chebyshev coefficients to fit for the distance
between Earth/Object. Default 5.
nCoeff_elongation : `int`, optional
The number of Chebyshev coefficients to fit for the solar
elongation. Default 5.
ngran : `int`, optional
The number of ephemeris points within each Chebyshev
polynomial segment. Default 64.
eph_file : `str`, optional
The path to the JPL ephemeris file to use.
Default is '$OORB_DATA/de405.dat'.
n_decimal : `int`, optional
The number of decimal places to allow in the segment length
(and thus the times of the endpoints) can be limited to
n_decimal places. Default 10.
For LSST SIMS moving object database, this should be 13 decimal
places for NEOs and 0 for all others.
Notes
-----
Many chebyshev polynomials are used to fit one moving object over
a given timeperiod; typically, the length of each segment is typically
about 2 days for MBAs. The start and end of each segment must match
exactly, and the entire segments must fit into the total timespan an
integer number of times. This is accomplished by setting n_decimal to
the number of decimal places desired in the 'time' value.
For faster moving objects, this number needs be greater to allow for
smaller subdivisions.
It's tempting to allow flexibility to the point of not
enforcing this non-overlap; however, then the resulting ephemeris
may have multiple values depending on which polynomial segment was
used to calculate the ephemeris.
The length of each chebyshev polynomial is related to the number of
ephemeris positions used to fit that polynomial by ngran:
length = timestep * ngran
The length of each polynomial is adjusted so that the residuals in
RA/Dec position are less than sky_tolerance - default = 2.5mas.
The polynomial length (and the resulting residuals) is affected
by ngran (i.e. timestep).
Default values are based on Yusra AlSayaad's work.
"""
def __init__(
self,
orbits_obj,
t_start,
t_span,
time_scale="TAI",
obscode="I11",
sky_tolerance=2.5,
n_coeff_position=14,
n_coeff_vmag=9,
n_coeff_delta=5,
n_coeff_elongation=6,
ngran=64,
eph_file=None,
n_decimal=10,
):
# Set up PyOrbEphemerides.
if eph_file is None:
self.eph_file = os.path.join(get_oorb_data_dir(), "de405.dat")
else:
self.eph_file = eph_file
self.pyephems = PyOrbEphemerides(self.eph_file)
# And then set orbits.
self._set_orbits(orbits_obj)
# Save input parameters.
# We have to play some games with the start and end times,
# using Decimal, in order to get the subdivision and times to
# match exactly, up to n_decimal places.
self.n_decimal = int(n_decimal)
self.t_start = round(t_start, self.n_decimal)
self.t_span = round(t_span, self.n_decimal)
self.t_end = round(self.t_start + self.t_span, self.n_decimal)
if time_scale.upper() == "TAI":
self.time_scale = "TAI"
elif time_scale.upper() == "UTC":
self.time_scale = "UTC"
elif time_scale.upper() == "TT":
self.time_scale = "TT"
else:
raise ValueError("Do not understand time_scale; use TAI, UTC or TT.")
self.obscode = obscode
self.sky_tolerance = sky_tolerance
self.n_coeff = {}
self.n_coeff["position"] = int(n_coeff_position)
self.n_coeff["geo_dist"] = int(n_coeff_delta)
self.n_coeff["vmag"] = int(n_coeff_vmag)
self.n_coeff["elongation"] = int(n_coeff_elongation)
self.ngran = int(ngran)
# Precompute multipliers (we only do this once).
self._precompute_multipliers()
# Initialize attributes to save the coefficients and residuals.
self.coeffs = {
"obj_id": [],
"t_start": [],
"t_end": [],
"ra": [],
"dec": [],
"geo_dist": [],
"vmag": [],
"elongation": [],
}
self.resids = {
"obj_id": [],
"t_start": [],
"t_end": [],
"pos": [],
"geo_dist": [],
"vmag": [],
"elongation": [],
}
self.failed = []
def _set_orbits(self, orbits_obj):
"""Set the orbits, to be used to generate ephemerides.
Parameters
----------
orbits_obj : `rubin_sim.moving_objects.Orbits`
The orbits to use to generate ephemerides.
"""
if not isinstance(orbits_obj, Orbits):
raise ValueError("Need to provide an Orbits object.")
self.orbits_obj = orbits_obj
self.pyephems.set_orbits(self.orbits_obj)
def _precompute_multipliers(self):
"""Calculate multipliers for Chebyshev fitting.
Calculate these once, rather than for each segment.
"""
# The nPoints are predetermined here, based on Yusra's earlier work.
# The weight is based on Newhall, X. X. 1989, Celestial Mechanics,
# 45, p. 305-310
self.multipliers = {}
self.multipliers["position"] = make_cheb_matrix(self.ngran + 1, self.n_coeff["position"], weight=0.16)
self.multipliers["vmag"] = make_cheb_matrix_only_x(self.ngran + 1, self.n_coeff["vmag"])
self.multipliers["geo_dist"] = make_cheb_matrix_only_x(self.ngran + 1, self.n_coeff["geo_dist"])
self.multipliers["elongation"] = make_cheb_matrix_only_x(self.ngran + 1, self.n_coeff["elongation"])
def _length_to_timestep(self, length):
"""Convert chebyshev polynomial segment lengths to the
corresponding timestep over the segment.
Parameters
----------
length : `float`
The chebyshev polynomial segment length (nominally, days).
Returns
-------
timestep : `float`
The corresponding timestep, = length/ngran (nominally, days).
"""
return length / self.ngran
[docs]
def make_all_times(self):
"""Using t_start and t_end, generate a numpy array containing
times spaced at timestep = self.length/self.ngran.
The expected use for this time array would be to generate
ephemerides at each timestep.
Returns
-------
times : `np.ndarray`
Numpy array of times.
"""
try:
self.length
except AttributeError:
raise AttributeError("Need to set self.timestep first, using calcSegmentLength.")
timestep = self._length_to_timestep(self.length)
times = np.arange(self.t_start, self.t_end + timestep / 2, timestep)
return times
[docs]
def generate_ephemerides(self, times, by_object=True):
"""Generate ephemerides using OpenOrb for all orbits.
Parameters
----------
times : `np.ndarray`
The times to use for ephemeris generation.
"""
return self.pyephems.generate_ephemerides(
times,
obscode=self.obscode,
eph_mode="N",
eph_type="basic",
time_scale=self.time_scale,
by_object=by_object,
)
def _round_length(self, length):
"""Modify length, to fit in an 'integer multiple' within the
t_start/t_end, and to have the desired number of decimal values.
Parameters
----------
length : `float`
The input length value to be rounded.
Returns
-------
length : `float`
The rounded length value.
"""
length = round(length, self.n_decimal)
length_in = length
# Make length an integer value within the time interval,
# to last decimal place accuracy.
counter = 0
prev_int_factor = 0
num_tolerance = 10.0 ** (-1 * (self.n_decimal - 1))
while ((self.t_span % length) > num_tolerance) and (length > 0) and (counter < 20):
int_factor = int(self.t_span / length) + 1 # round up / ceiling
if int_factor == prev_int_factor:
int_factor = prev_int_factor + 1
prev_int_factor = int_factor
length = round(self.t_span / int_factor, self.n_decimal)
counter += 1
if (self.t_span % length) > num_tolerance or (length <= 0):
# Add this entire segment into the failed list.
for obj_id in self.orbits_obj.orbits["obj_id"].as_matrix():
self.failed.append((obj_id, self.t_start, self.t_end))
raise ValueError(
"Could not find a suitable length for the timespan (start %f, span %f), "
"starting with length %s, ending with length value %f"
% (self.t_start, self.t_span, str(length_in), length)
)
return length
def _test_residuals(self, length, cutoff=99):
"""Calculate the position residual, for a test case.
Convenience function to make calcSegmentLength easier to read.
"""
# The pos_resid used will be the 'cutoff' percentile of all
# max residuals per object.
max_pos_resids = np.zeros(len(self.orbits_obj), float)
timestep = self._length_to_timestep(length)
# Test for one segment near the start (would do at midpoint,
# but for long timespans this is not efficient ..
# a point near the start should be fine).
times = np.arange(self.t_start, self.t_start + length + timestep / 2, timestep)
# We must regenerate ephemerides here, because the timestep is
# different each time.
ephs = self.generate_ephemerides(times, by_object=True)
# Look for the coefficients and residuals.
for i, e in enumerate(ephs):
coeff_ra, coeff_dec, max_pos_resids[i] = self._get_coeffs_position(e)
# Find a representative value and return.
pos_resid = np.percentile(max_pos_resids, cutoff)
ratio = pos_resid / self.sky_tolerance
return pos_resid, ratio
[docs]
def calc_segment_length(self, length=None):
"""Set the typical initial ephemeris timestep and segment length
for all objects between t_start/t_end.
Sets self.length.
The segment length will fit into the time period between
t_start/t_end an approximately integer multiple of times,
and will only have a given number of decimal places.
Parameters
----------
length : `float`, optional
If specified, this value for the length is used,
instead of calculating it here.
"""
# If length is specified, use it and do nothing else.
if length is not None:
length = self._round_length(length)
pos_resid, ratio = self._test_residuals(length)
if pos_resid > self.sky_tolerance:
warnings.warn(
"Will set length and timestep, but this value of length "
"produces residuals (%f) > skyTolerance (%f)." % (pos_resid, self.sky_tolerance)
)
self.length = length
return
# Otherwise, calculate an appropriate length and timestep.
# Give a guess at a very approximate segment length,
# given the skyTolerance,
# purposefully trying to overestimate this value.
# The actual behavior of the residuals is not linear with
# segment length.
# There is a linear increase at low residuals
# < ~2 mas / segment length < 2 days
# Then at around 2 days the residuals blow up,
# increasing rapidly to about 5000 mas
# (depending on orbit .. TNOs, for example, increase but
# only to about 300 mas, when the residuals resume ~linear growth
# out to 70 day segments if ngran=128)
# Make an arbitrary cap on segment length at 60 days,
# (25000 mas) ~.5 arcminute accuracy.
max_length = 60
max_iterations = 50
if self.sky_tolerance < 5:
# This is the cap of the low-linearity regime,
# looping below will refine this value.
length = 2.0
elif self.sky_tolerance >= 5000:
# Make a very rough guess.
length = np.round((5000.0 / 20.0) * (self.sky_tolerance - 5000.0)) + 5.0
length = np.min([max_length, int(length * 10) / 10.0])
else:
# Try to pick a length in the middle of the fast increase.
length = 4.0
# Tidy up some characteristics of "length":
# make it fit an integer number of times into overall timespan.
# and use a given number of decimal places
# (easier for database storage).
length = self._round_length(length)
# Check the resulting residuals.
pos_resid, ratio = self._test_residuals(length)
counter = 0
# Now should be relatively close.
# Start to zero in using slope around the value.ngran
while pos_resid > self.sky_tolerance and counter <= max_iterations and length > 0:
length = length / 2
length = self._round_length(length)
pos_resid, ratio = self._test_residuals(length)
counter += 1
if counter > max_iterations or length <= 0:
# Add this entire segment into the failed list.
for obj_id in self.orbits_obj.orbits["obj_id"].as_matrix():
self.failed.append((obj_id, self.t_start, self.t_end))
error_message = "Could not find good segment length to meet skyTolerance %f" % (
self.sky_tolerance
)
error_message += " milliarcseconds within %d iterations. " % (max_iterations)
error_message += "Final residual was %f milli-arcseconds." % (pos_resid)
raise ValueError(error_message)
else:
self.length = length
def _get_coeffs_position(self, ephs):
"""Calculate coefficients for the ra/dec values of a
single objects ephemerides.
Parameters
----------
times : `np.ndarray`
The times of the ephemerides.
ephs : `np.ndarray`
The structured array returned by PyOrbEphemerides
holding ephemeris values, for one object.
Returns
-------
coeff_ra, coeff_dec, max_pos_resid : `np.ndarray`, `np.ndarray`,
`np.ndarray`
The ra coefficients, dec coefficients, and the positional error
residuals between fit and ephemeris values, in mas.
"""
dradt_coord = ephs["dradt"] / np.cos(np.radians(ephs["dec"]))
coeff_ra, resid_ra, rms_ra_resid, max_ra_resid = chebfit(
ephs["time"],
three_sixty_to_neg(ephs["ra"]),
dxdt=dradt_coord,
x_multiplier=self.multipliers["position"][0],
dx_multiplier=self.multipliers["position"][1],
n_poly=self.n_coeff["position"],
)
coeff_dec, resid_dec, rms_dec_resid, max_dec_resid = chebfit(
ephs["time"],
ephs["dec"],
dxdt=ephs["ddecdt"],
x_multiplier=self.multipliers["position"][0],
dx_multiplier=self.multipliers["position"][1],
n_poly=self.n_coeff["position"],
)
max_pos_resid = np.max(np.sqrt(resid_dec**2 + (resid_ra * np.cos(np.radians(ephs["dec"]))) ** 2))
# Convert position residuals to mas.
max_pos_resid *= 3600.0 * 1000.0
return coeff_ra, coeff_dec, max_pos_resid
def _get_coeffs_other(self, ephs):
"""Calculate coefficients for the ra/dec values of a
single objects ephemerides.
Parameters
----------
ephs : `np.ndarray`
The structured array returned by PyOrbEphemerides
holding ephemeris values, for one object.
Returns
-------
coeffs, max_resids : `dict` of `float`
Dictionary containing the coefficients for each of 'geo_dist',
'vmag', 'elongation', and another dictionary containing the
max residual values for each of 'geo_dist', 'vmag', 'elongation'.
"""
coeffs = {}
max_resids = {}
for key, ephValue in zip(("geo_dist", "vmag", "elongation"), ("geo_dist", "magV", "solarelon")):
coeffs[key], resid, rms, max_resids[key] = chebfit(
ephs["time"],
ephs[ephValue],
dxdt=None,
x_multiplier=self.multipliers[key],
dx_multiplier=None,
n_poly=self.n_coeff[key],
)
return coeffs, max_resids
[docs]
def calc_segments(self):
"""Run the calculation of all segments over the entire time span."""
# First calculate ephemerides for all objects, over entire time span.
# For some objects, we will end up recalculating the ephemeride values,
# but most should be fine.
times = self.make_all_times()
ephs = self.generate_ephemerides(times)
eps = self._length_to_timestep(self.length) / 4.0
# Loop through each object to generate coefficients.
for orbit_obj, e in zip(self.orbits_obj, ephs):
t_segment_start = self.t_start
# Cycle through all segments until we reach the end of the
# period we're fitting.
while t_segment_start < (self.t_end - eps):
# Identify the subset of times and ephemerides
# which are relevant for this segment
# (at the default segment size).
t_segment_end = round(t_segment_start + self.length, self.n_decimal)
subset = np.where((times >= t_segment_start) & (times < t_segment_end + eps))
self.calc_one_segment(orbit_obj, e[subset])
t_segment_start = t_segment_end
[docs]
def calc_one_segment(self, orbit_obj, ephs):
"""Calculate the coefficients for a single Chebyshev segment,
for a single object.
Calculates the coefficients and residuals, and saves this
information to self.coeffs, self.resids, and
(if there are problems), self.failed.
Parameters
----------
orbit_obj : `rubin_sim.moving_objects.Orbits`
The single Orbits object we're fitting at the moment.
ephs : `np.ndarray`
The ephemerides we're fitting at the moment
(for the single object / single segment).
"""
obj_id = orbit_obj.orbits.obj_id.iloc[0]
t_segment_start = ephs["time"][0]
t_segment_end = ephs["time"][-1]
coeff_ra, coeff_dec, max_pos_resid = self._get_coeffs_position(ephs)
if max_pos_resid > self.sky_tolerance:
self._subdivide_segment(orbit_obj, ephs)
else:
coeffs, max_resids = self._get_coeffs_other(ephs)
fit_failed = False
for k in max_resids:
if np.isnan(max_resids[k]):
fit_failed = True
if fit_failed:
warnings.warn(
"Fit failed for orbit_obj %s for times between %f and %f"
% (obj_id, t_segment_start, t_segment_end)
)
self.failed.append((orbit_obj.orbits["obj_id"], t_segment_start, t_segment_end))
else:
# Consolidate items into the tracked coefficient values.
self.coeffs["obj_id"].append(obj_id)
self.coeffs["t_start"].append(t_segment_start)
self.coeffs["t_end"].append(t_segment_end)
self.coeffs["ra"].append(coeff_ra)
self.coeffs["dec"].append(coeff_dec)
self.coeffs["geo_dist"].append(coeffs["geo_dist"])
self.coeffs["vmag"].append(coeffs["vmag"])
self.coeffs["elongation"].append(coeffs["elongation"])
# Consolidate items into the tracked residual values.
self.resids["obj_id"].append(obj_id)
self.resids["t_start"].append(t_segment_start)
self.resids["t_end"].append(t_segment_end)
self.resids["pos"].append(max_pos_resid)
self.resids["geo_dist"].append(max_resids["geo_dist"])
self.resids["vmag"].append(max_resids["geo_dist"])
self.resids["elongation"].append(max_resids["elongation"])
def _subdivide_segment(self, orbit_obj, ephs):
"""Subdivide a segment, then calculate the segment coefficients.
Parameters
----------
orbit_obj : `rubin_sim.moving_objects.Orbits`
The single Orbits object we're fitting at the moment.
ephs : `np.ndarray`
The ephemerides we're fitting at the moment
(for the single object / single segment).
"""
new_cheby = ChebyFits(
orbit_obj,
ephs["time"][0],
(ephs["time"][-1] - ephs["time"][0]),
time_scale=self.time_scale,
obscode=self.obscode,
sky_tolerance=self.sky_tolerance,
n_coeff_position=self.n_coeff["position"],
n_coeff_vmag=self.n_coeff["vmag"],
n_coeff_delta=self.n_coeff["geo_dist"],
n_coeff_elongation=self.n_coeff["elongation"],
ngran=self.ngran,
eph_file=self.eph_file,
n_decimal=self.n_decimal,
)
try:
new_cheby.calc_segment_length()
except ValueError as ve:
# Could not find a good segment length.
warningmessage = "Objid %s, segment %f to %f " % (
orbit_obj.orbits.obj_id.iloc[0],
ephs["time"][0],
ephs["time"][-1],
)
warningmessage += " - error: %s" % (ve)
warnings.warn(warningmessage)
self.failed += new_cheby.failed
return
new_cheby.calc_segments()
# Add subdivided segment values into tracked values here.
for k in self.coeffs:
self.coeffs[k] += new_cheby.coeffs[k]
for k in self.resids:
self.resids[k] += new_cheby.resids[k]
self.failed += new_cheby.failed
[docs]
def write(self, coeff_file, resid_file, failed_file, append=False):
"""Write coefficients, residuals and failed fits to disk.
Parameters
----------
coeff_file : `str`
The filename for the coefficient values.
resid_file : `str`
The filename for the residual values.
failed_file : `str`
The filename to write the failed fit information
(if failed objects exist).
append : `bool`, optional
Flag to append (or overwrite) the output files.
"""
warnings.warn(
"Writing cheby fit values may have cross-platform issues. Consider passing values directly"
)
if append:
open_mode = "aw"
else:
open_mode = "w"
# Write a header to the coefficients file, if writing to a new file:
if (not append) or (not os.path.isfile(coeff_file)):
header = "obj_id t_start t_end "
header += " ".join(["ra_%d" % x for x in range(self.n_coeff["position"])]) + " "
header += " ".join(["dec_%d" % x for x in range(self.n_coeff["position"])]) + " "
header += " ".join(["geo_dist_%d" % x for x in range(self.n_coeff["geo_dist"])]) + " "
header += " ".join(["vmag_%d" % x for x in range(self.n_coeff["vmag"])]) + " "
header += " ".join(["elongation_%d" % x for x in range(self.n_coeff["elongation"])])
else:
header = None
if (not append) or (not os.path.isfile(resid_file)):
resid_header = "obj_id segNum t_start t_end length pos geo_dist vmag elong"
else:
resid_header = None
timeformat = "%." + "%s" % self.n_decimal + "f"
with open(coeff_file, open_mode) as f:
if header is not None:
print(header, file=f)
for i, (obj_id, t_start, t_end, cRa, cDec, cDelta, cVmag, cE) in enumerate(
zip(
self.coeffs["obj_id"],
self.coeffs["t_start"],
self.coeffs["t_end"],
self.coeffs["ra"],
self.coeffs["dec"],
self.coeffs["geo_dist"],
self.coeffs["vmag"],
self.coeffs["elongation"],
)
):
print(
"%s %s %s %s %s %s %s %s"
% (
obj_id,
timeformat % t_start,
timeformat % t_end,
" ".join("%.14e" % j for j in cRa),
" ".join("%.14e" % j for j in cDec),
" ".join("%.7e" % j for j in cDelta),
" ".join("%.7e" % j for j in cVmag),
" ".join("%.7e" % j for j in cE),
),
file=f,
)
with open(resid_file, open_mode) as f:
if resid_header is not None:
print(resid_header, file=f)
for i, (obj_id, t_start, t_end, rPos, rDelta, rVmag, rE) in enumerate(
zip(
self.resids["obj_id"],
self.resids["t_start"],
self.resids["t_end"],
self.resids["pos"],
self.resids["geo_dist"],
self.resids["vmag"],
self.resids["elongation"],
)
):
print(
"%s %i %.14f %.14f %.14f %.14e %.14e %.14e %.14e"
% (
obj_id,
i + 1,
t_start,
t_end,
(t_end - t_start),
rPos,
rDelta,
rVmag,
rE,
),
file=f,
)
if len(self.failed) > 0:
with open(failed_file, open_mode) as f:
for i, failed in enumerate(self.failed):
print(" ".join([str(x) for x in failed]), file=f)