Source code for rubin_sim.maf.slicers.healpix_subset_slicer

__all__ = (
    "make_circle_subset_slicer",
    "make_wfd_subset_slicer",
    "HealpixSubsetSlicer",
)

from functools import wraps

import healpy as hp
import numpy as np
import rubin_scheduler.utils as simsUtils
from rubin_scheduler.scheduler.utils import get_current_footprint

from .healpix_slicer import HealpixSlicer


[docs] def make_circle_subset_slicer(ra_cen, dec_cen, radius=3.0, nside=512, use_cache=False): """Create a circular healpix subset slicer, centered on ra_cen/dec_cen. Parameters ---------- ra_cen : `float` RA of the center of the slicer (degrees). dec_cen : `float` Dec of the center of the slicer (degrees). radius : `float`, optional Radius of the circular slicer shape (degrees). nside : `int`, optional Nside resolution of the healpix subset slicer. use_cache : `bool`, optional Set up the slicer to `use_cache` or not. Returns ------- subsetslicer, plot_dict : `maf.HealpixSubsetSlicer`, `dict` A healpix subset slicer, defined according to a circle centered on `ra_cen`, `dec_cen`, with radius `radius`. Also returns a minimal plot dict, with the visufunc and rotation information to plot a small circular area with the HealpixSkyMap plotter. """ ra, dec = simsUtils.hpid2_ra_dec(nside, np.arange(hp.nside2npix(nside))) dist = simsUtils.angular_separation(ra_cen, dec_cen, ra, dec) close = np.where(dist <= radius)[0] subsetslicer = HealpixSubsetSlicer(nside, close, use_cache=use_cache) plot_dict = { "visufunc": hp.gnomview, "rot": (ra_cen, dec_cen, 0), "xsize": 500, } return subsetslicer, plot_dict
[docs] def make_wfd_subset_slicer(nside=64, use_cache=True, wfd_labels=None): """Create a wfd-only healpix subset slicer. Parameters ---------- nside : `int`, optional Nside resolution of the healpix subset slicer. use_cache : `bool`, optional Set up the slicer to `use_cache` or not. wfd_labels : `list` [`str`] or None List of the labels from the footprint to use for the "WFD" (or other) footprint. Default of None will use the current WFD level areas, including the WFD-level galactic plane area. Returns ------- subsetslicer : `maf.HealpixSubsetSlicer` A healpix subset slicer, defined according to the most current version of the scheduler footprint using `get_current_footprint`. """ if wfd_labels is None: wfd_labels = ["lowdust", "euclid_overlap", "virgo", "bulgy", "LMC_SMC"] footprints, labels = get_current_footprint(nside=nside) wfdpix = np.where(np.in1d(labels, wfd_labels))[0] slicer = HealpixSubsetSlicer(nside=nside, hpid=wfdpix, use_cache=use_cache) return slicer
[docs] class HealpixSubsetSlicer(HealpixSlicer): """A spatial slicer that evaluates pointings on a subset of a healpix grid. The advantage of using this healpixSubsetSlicer (rather than just putting the RA/Dec values into the UserPointsSlicer, which is another valid approach) is that you preserve the full healpix array. This means you could do things like calculate the power spectrum and plot without remapping into healpixels first. The downside is that you must first (externally) define the healpixels that you wish to use. When plotting with RA/Dec, the default HealpixSkyMap can be used, corresponding to {'rot': (0, 0, 0), 'flip': 'astro'}. Parameters ---------- nside : `int` The nside parameter of the healpix grid. Must be a power of 2. hpid : `np.ndarray`, (N,) The subset of healpix id's to use to calculate the metric. Because the hpid should be defined based on a particular nside, these first two arguments are not optional for this slicer. lon_col : `str`, optional Name of the longitude (RA equivalent) column to use from the input data. Default fieldRA lat_col : `str`, optional Name of the latitude (Dec equivalent) column to use from the input data. Default fieldDec lat_lon_deg : `bool`, optional Flag indicating whether the lat and lon values in the input data are in degrees (True) or radians (False). Default True. verbose : `bool`, optional Flag to indicate whether to write additional information to stdout during runtime. Default True. badval : `float`, optional Bad value flag, relevant for plotting. Default the np.nan value (in order to properly flag bad data points for plotting with the healpix plotting routines). In general, this should not be changed. use_cache : `bool`, optional Flag allowing the user to indicate whether to cache (and reuse) metric results calculated with the same set of simulated data pointings. This can be safely set to True for slicers not using maps and will result in increased speed. When calculating metric results using maps, the map data at each individual ra/dec point may influence the metric results and so use_cache should be set to False. Default True. leafsize : `int`, optional Leafsize value for kdtree. Default 100. radius : `float`, optional Radius for matching in the kdtree. Equivalent to the radius of the FOV. Degrees. Default 1.75. use_camera : `bool`, optional Flag to indicate whether to use the LSST camera footprint or not. Default False. camera_footprint_file : `str`, optional Name of the camera footprint map to use. Can be None, which will use the default. rot_sky_pos_col_name : `str`, optional Name of the rotSkyPos column in the input data. Only used if use_camera is True. Describes the orientation of the camera orientation compared to the sky. Default rotSkyPos. """ def __init__( self, nside, hpid, lon_col="fieldRA", lat_col="fieldDec", lat_lon_deg=True, verbose=True, badval=np.nan, use_cache=True, leafsize=100, radius=2.45, use_camera=True, camera_footprint_file=None, rot_sky_pos_col_name="rotSkyPos", ): """Instantiate and set up healpix slicer object.""" super().__init__( nside=nside, verbose=verbose, lon_col=lon_col, lat_col=lat_col, badval=badval, radius=radius, leafsize=leafsize, use_cache=use_cache, use_camera=use_camera, rot_sky_pos_col_name=rot_sky_pos_col_name, camera_footprint_file=camera_footprint_file, lat_lon_deg=lat_lon_deg, ) self.hpid = hpid # add hpid to the slicer_init values self.slicer_init["hpid"] = hpid self.len_hpid = len(self.hpid) # Set up a mask for the metric values to use self.mask = np.ones(hp.nside2npix(self.nside), bool) self.mask[self.hpid] = False
[docs] def __eq__(self, other_slicer): """Evaluate if two slicers are equivalent.""" # If the two slicers are both healpix slicers, check nsides value. result = False if isinstance(other_slicer, HealpixSubsetSlicer): if other_slicer.nside == self.nside: if np.array_equal(other_slicer.hpid, self.hpid): if other_slicer.lon_col == self.lon_col and other_slicer.lat_col == self.lat_col: if other_slicer.radius == self.radius: if other_slicer.use_camera == self.use_camera: if other_slicer.rotSkyPosColName == self.rotSkyPosColName: if np.all(other_slicer.shape == self.shape): result = True return result
[docs] def __iter__(self): """Iterate over the slices.""" self.hpid_counter = 0 return self
[docs] def __next__(self): """Returns results of self._slice_sim_data when iterating over slicer. Results of self._slice_sim_data should be dictionary of {'idxs': the data indexes relevant for this slice of the slicer, 'slice_point': the metadata for the slice_point, which always includes 'sid' key for ID of slice_point.} """ if self.hpid_counter >= self.len_hpid: raise StopIteration # Set up 'current' islice = self.hpid[self.hpid_counter] # Set up 'next' self.hpid_counter += 1 # Return 'current' return self._slice_sim_data(islice)
[docs] def setup_slicer(self, sim_data, maps=None): """Use sim_data[self.lon_col] and sim_data[self.lat_col] (in radians) to set up KDTree. Parameters ----------- sim_data : `numpy.ndarray`, (N,) The simulated data, including the location of each pointing. maps : `list` of `rubin_sim.maf.maps` objects, optional List of maps (such as dust extinction) that will run to build up additional metadata at each slice_point. This additional metadata is available to metrics via the slice_point dictionary. Default None. """ super().setup_slicer(sim_data=sim_data, maps=maps) @wraps(self._slice_sim_data) def _slice_sim_data(islice): """Return indexes for relevant opsim data at slice_point (slice_point=lon_col/lat_col value .. usually ra/dec).""" # Subclass this method, just to make sure we return # no data for points not in self.hpid slice_point = {"sid": islice, "nside": self.nside} if islice not in self.hpid: indices = [] else: sx, sy, sz = simsUtils._xyz_from_ra_dec( self.slice_points["ra"][islice], self.slice_points["dec"][islice] ) # Query against tree. indices = self.opsimtree.query_ball_point((sx, sy, sz), self.rad) if (self.use_camera) & (len(indices) > 0): # Find the indices *of those indices* which fall in # the camera footprint camera_idx = self.camera( self.slice_points["ra"][islice], self.slice_points["dec"][islice], self.data_ra[indices], self.data_dec[indices], self.data_rot[indices], ) indices = np.array(indices)[camera_idx] # Loop through all the slice_point keys. # If the first dimension of slice_point[key] has # the same shape as the slicer, assume it is information # per slice_point. # Otherwise, pass the whole slice_point[key] information. # Useful for stellar LF maps # where we want to pass only the relevant LF and the bins # that go with it. for key in self.slice_points: if len(np.shape(self.slice_points[key])) == 0: keyShape = 0 else: keyShape = np.shape(self.slice_points[key])[0] if keyShape == self.nslice: slice_point[key] = self.slice_points[key][islice] else: slice_point[key] = self.slice_points[key] return {"idxs": indices, "slice_point": slice_point} setattr(self, "_slice_sim_data", _slice_sim_data)