__all__ = ("MoObjSlicer",)
import numpy as np
import pandas as pd
from rubin_sim.maf.plots.mo_plotters import MetricVsH, MetricVsOrbit
from rubin_sim.moving_objects.orbits import Orbits
from .base_slicer import BaseSlicer
[docs]
class MoObjSlicer(BaseSlicer):
"""Slice moving object _observations_, per object and optionally
clone/per H value.
Iteration over the MoObjSlicer will go as:
* iterate over each orbit;
* if Hrange is not None, for each orbit, iterate over Hrange.
Parameters
----------
h_range : numpy.ndarray or None
The H values to clone the orbital parameters over.
If Hrange is None, will not clone orbits.
"""
def __init__(self, h_range=None, verbose=True, badval=0):
super(MoObjSlicer, self).__init__(verbose=verbose, badval=badval)
self.Hrange = h_range
self.slicer_init = {"h_range": h_range, "badval": badval}
# Set default plot_funcs.
self.plot_funcs = [
MetricVsH(),
MetricVsOrbit(xaxis="q", yaxis="e"),
MetricVsOrbit(xaxis="q", yaxis="inc"),
]
[docs]
def setup_slicer(self, orbit_file, delim=None, skiprows=None, obs_file=None):
"""Set up the slicer and read orbit_file and obs_file from disk.
Sets self.orbits (with orbit parameters), self.all_obs, and self.obs
self.orbit_file and self.obs_file
Parameters
----------
orbit_file : str
The file containing the orbit information.
This is necessary, in order to be able to generate plots.
obs_file : str, optional
The file containing the observations of each object, optional.
If not provided (default, None), then the slicer will not be
able to slice, but can still plot.
"""
self.read_orbits(orbit_file, delim=delim, skiprows=skiprows)
if obs_file is not None:
self.read_obs(obs_file)
else:
self.obs_file = None
self.all_obs = None
self.obs = None
# Add these filenames to the slicer init values,
# to preserve in output files.
self.slicer_init["orbit_file"] = self.orbit_file
self.slicer_init["obs_file"] = self.obs_file
def read_orbits(self, orbit_file, delim=None, skiprows=None):
# Use sims_movingObjects to read orbit files.
orb = Orbits()
orb.read_orbits(orbit_file, delim=delim, skiprows=skiprows)
self.orbit_file = orbit_file
self.orbits = orb.orbits
# Then go on as previously. Need to refactor this
# into 'setup_slicer' style.
self.nSso = len(self.orbits)
self.slice_points = {}
self.slice_points["orbits"] = self.orbits
# And set the slicer shape/size.
if self.Hrange is not None:
self.shape = [self.nSso, len(self.Hrange)]
self.slice_points["H"] = self.Hrange
else:
self.shape = [self.nSso, 1]
self.slice_points["H"] = self.orbits["H"]
# Set the rest of the slice_point information once
self.nslice = self.shape[0] * self.shape[1]
[docs]
def read_obs(self, obs_file):
"""Read observations of the solar system objects
(such as created by sims_movingObjects).
Parameters
----------
obs_file: str
The file containing the observation information.
"""
# For now, just read all the observations
# (should be able to chunk this though).
restore_file = np.load(obs_file)
self.all_obs = restore_file["object_observations"].copy()
restore_file.close()
self.obs_file = obs_file
self.all_obs = pd.DataFrame(self.all_obs)
if "velocity" not in self.all_obs.columns:
self.all_obs["velocity"] = np.sqrt(self.all_obs["dradt"] ** 2 + self.all_obs["ddecdt"] ** 2)
[docs]
def subset_obs(self, pandas_constraint=None):
"""Choose a subset of all the observations,
such as those in a particular time period.
"""
if pandas_constraint is None:
self.obs = self.all_obs
else:
self.obs = self.all_obs.query(pandas_constraint)
def _slice_obs(self, idx):
"""Return the observations of a given ssoId.
For now this works for any ssoId; in the future,
this might only work as ssoId is
progressively iterated through the series of ssoIds
(so we can chunk the reading).
Parameters
----------
idx : integer
The integer index of the particular SSO in the orbits dataframe.
"""
# Find the matching orbit.
orb = self.orbits.iloc[idx]
# Find the matching observations.
if self.obs["obj_id"].dtype == "object":
obs = self.obs.query('obj_id == "%s"' % (orb["obj_id"]))
else:
obs = self.obs.query("obj_id == %d" % (orb["obj_id"]))
# Return the values for H to consider for metric.
if self.Hrange is not None:
Hvals = self.Hrange
else:
Hvals = np.array([orb["H"]], float)
# Note that ssoObs / obs is a recarray not Dataframe!
# But that the orbit IS a Dataframe.
return {"obs": obs.to_records(), "orbit": orb, "Hvals": Hvals}
[docs]
def __iter__(self):
"""Iterate through each of the ssoIds."""
self.idx = 0
return self
[docs]
def __next__(self):
"""Returns result of self._getObs when iterating over moSlicer."""
if self.idx >= self.nSso:
raise StopIteration
idx = self.idx
self.idx += 1
return self._slice_obs(idx)
def __getitem__(self, idx):
# This may not be guaranteed to work if/when we
# implement chunking of the obs_file.
return self._slice_obs(idx)
[docs]
def __eq__(self, other_slicer):
"""Evaluate if two slicers are equal."""
result = False
if isinstance(other_slicer, MoObjSlicer):
if other_slicer.orbit_file == self.orbit_file:
if other_slicer.obs_file == self.obs_file:
if np.array_equal(other_slicer.slice_points["H"], self.slice_points["H"]):
result = True
return result