Source code for rubin_sim.maf.metrics.calibration_metrics

__all__ = (
    "ParallaxMetric",
    "ProperMotionMetric",
    "RadiusObsMetric",
    "ParallaxCoverageMetric",
    "ParallaxDcrDegenMetric",
)

import numpy as np
import rubin_scheduler.utils as utils
from scipy.optimize import curve_fit

import rubin_sim.maf.utils as mafUtils

from .base_metric import BaseMetric


class ParallaxMetric(BaseMetric):
    """Calculate the uncertainty in a parallax measurement
    given a series of observations.

    Uses columns ra_pi_amp and dec_pi_amp,
    calculated by the ParallaxFactorStacker.

    Parameters
    ----------
    m5_col : `str`, optional
        The default column name for m5 information in the input data.
    filter_col : `str`, optional
        The column name for the filter information.
    seeing_col : `str`, optional
        The column name for the seeing information.
        Since the astrometry errors are based on the physical size of the
        PSF, this should be the FWHM of the physical psf,
        e.g. seeingFwhmGeom.
    rmag : `float`, optional
        The r magnitude of the fiducial star in r band.
        Other filters are scaled using the sed_template keyword.
    sed_template : `str`, optional
        The template to use. This can be 'flat' or
        'O','B','A','F','G','K','M'.
    atm_err : `float`, optional
        The expected centroiding error due to the atmosphere, in arcseconds.
        Default 0.01.
    normalize : `bool`, optional
        Compare the astrometric uncertainty to the uncertainty that would
        result if half the observations were taken at the start and half
        at the end. A perfect survey will have a value close to 1, while
        a poorly scheduled survey will be close to 0.
    badval : `float`, optional
        The value to return when the metric value cannot be calculated.
    """

    def __init__(
        self,
        m5_col="fiveSigmaDepth",
        filter_col="filter",
        seeing_col="seeingFwhmGeom",
        rmag=20.0,
        sed_template="flat",
        badval=-666,
        atm_err=0.01,
        normalize=False,
        **kwargs,
    ):
        cols = [m5_col, filter_col, seeing_col, "ra_pi_amp", "dec_pi_amp"]
        if normalize:
            units = "ratio"
        else:
            units = "mas"
        super().__init__(cols, units=units, badval=badval, **kwargs)
        # set return type
        self.m5_col = m5_col
        self.seeing_col = seeing_col
        self.filter_col = filter_col
        filters = ["u", "g", "r", "i", "z", "y"]
        self.mags = {}
        if sed_template == "flat":
            for f in filters:
                self.mags[f] = rmag
        else:
            self.mags = utils.stellarMags(sed_template, rmag=rmag)
        self.atm_err = atm_err
        self.normalize = normalize
        self.comment = (
            "Estimated uncertainty in parallax measurement "
            "(assuming no proper motion or that proper motion is well fit). "
            "Uses measurements in all bandpasses, and estimates "
            "astrometric error based on SNR in each visit. "
        )
        if sed_template == "flat":
            self.comment += "Assumes a flat SED. "
        if self.normalize:
            self.comment += (
                "This normalized version of the metric displays the "
                "estimated uncertainty in the parallax measurement, "
                "divided by the minimum parallax uncertainty possible "
                "(if all visits were six months apart). "
                "Values closer to 1 indicate more optimal "
                "scheduling for parallax measurement."
            )

    def _final_sigma(self, position_errors, ra_pi_amp, dec_pi_amp):
        """Assume parallax in RA and Dec are fit independently, then
        combined. All inputs assumed to be arcsec."""
        with np.errstate(divide="ignore", invalid="ignore"):
            sigma_a = position_errors / ra_pi_amp
            sigma_b = position_errors / dec_pi_amp
            sigma_ra = np.sqrt(1.0 / np.sum(1.0 / sigma_a**2))
            sigma_dec = np.sqrt(1.0 / np.sum(1.0 / sigma_b**2))
            # Combine RA and Dec uncertainties, convert to mas
            sigma = np.sqrt(1.0 / (1.0 / sigma_ra**2 + 1.0 / sigma_dec**2)) * 1e3
        return sigma
    def run(self, data_slice, slice_point=None):
        filters = np.unique(data_slice[self.filter_col])
        if hasattr(filters[0], "decode"):
            filters = [str(f.decode("utf-8")) for f in filters]
        snr = np.zeros(len(data_slice), dtype="float")
        # Compute SNR for all observations
        for filt in filters:
            good = np.where(data_slice[self.filter_col] == filt)
            snr[good] = mafUtils.m52snr(self.mags[str(filt)], data_slice[self.m5_col][good])
        position_errors = mafUtils.astrom_precision(data_slice[self.seeing_col], snr, self.atm_err)
        sigma = self._final_sigma(position_errors, data_slice["ra_pi_amp"], data_slice["dec_pi_amp"])
        if self.normalize:
            # Leave the dec parallax as zero since one can't have
            # ra and dec maximized at the same time.
            sigma = (
                self._final_sigma(
                    position_errors,
                    data_slice["ra_pi_amp"] * 0 + 1.0,
                    data_slice["dec_pi_amp"] * 0,
                )
                / sigma
            )
        return sigma
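

# Illustrative usage sketch (not part of the MAF API): running ParallaxMetric
# on a hand-built data_slice. In a real MAF run the ra_pi_amp / dec_pi_amp
# columns come from the ParallaxFactorStacker and the slicer supplies the
# data_slice; here every value is made up purely to show the calling
# convention, and the _example_* name is hypothetical.
def _example_parallax_metric_usage():
    names = ["fiveSigmaDepth", "filter", "seeingFwhmGeom", "ra_pi_amp", "dec_pi_amp"]
    types = [float, "U1", float, float, float]
    data = np.zeros(12, dtype=list(zip(names, types)))
    data["fiveSigmaDepth"] = 24.0  # single-visit depth, made up
    data["filter"] = "r"
    data["seeingFwhmGeom"] = 0.7  # arcsec
    # Fake parallax factors; real values depend on position and visit dates.
    data["ra_pi_amp"] = np.linspace(-1.0, 1.0, 12)
    data["dec_pi_amp"] = np.linspace(-0.4, 0.4, 12)
    metric = ParallaxMetric(rmag=22.0)
    return metric.run(data)  # estimated parallax uncertainty, in mas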
class ProperMotionMetric(BaseMetric):
    """Calculate the uncertainty in the returned proper motion.

    This metric assumes gaussian errors in the astrometry measurements.

    Parameters
    ----------
    metric_name : `str`, optional
        Default 'properMotion'.
    m5_col : `str`, optional
        The default column name for m5 information in the input data.
        Default fiveSigmaDepth.
    mjd_col : `str`, optional
        The column name for the visit start time (MJD).
        Default observationStartMJD.
    filter_col : `str`, optional
        The column name for the filter information. Default filter.
    seeing_col : `str`, optional
        The column name for the seeing information.
        Since the astrometry errors are based on the physical size of the
        PSF, this should be the FWHM of the physical psf.
        Default seeingFwhmGeom.
    rmag : `float`, optional
        The r magnitude of the fiducial star in r band.
        Other filters are scaled using the sed_template keyword. Default 20.0
    sed_template : `str`, optional
        The template to use. This can be 'flat' or
        'O','B','A','F','G','K','M'. Default flat.
    atm_err : `float`, optional
        The expected centroiding error due to the atmosphere, in arcseconds.
        Default 0.01.
    normalize : `bool`, optional
        Compare the astrometric uncertainty to the uncertainty that would
        result if half the observations were taken at the start and half
        at the end. A perfect survey will have a value close to 1, while
        a poorly scheduled survey will be close to 0. Default False.
    baseline : `float`, optional
        The length of the survey used for the normalization, in years.
        Default 10.
    badval : `float`, optional
        The value to return when the metric value cannot be calculated.
        Default -666.
    """

    def __init__(
        self,
        metric_name="properMotion",
        m5_col="fiveSigmaDepth",
        mjd_col="observationStartMJD",
        filter_col="filter",
        seeing_col="seeingFwhmGeom",
        rmag=20.0,
        sed_template="flat",
        badval=-666,
        atm_err=0.01,
        normalize=False,
        baseline=10.0,
        **kwargs,
    ):
        cols = [m5_col, mjd_col, filter_col, seeing_col]
        if normalize:
            units = "ratio"
        else:
            units = "mas/yr"
        super(ProperMotionMetric, self).__init__(
            col=cols, metric_name=metric_name, units=units, badval=badval, **kwargs
        )
        # set return type
        self.mjd_col = mjd_col
        self.seeing_col = seeing_col
        self.m5_col = m5_col
        filters = ["u", "g", "r", "i", "z", "y"]
        self.mags = {}
        if sed_template == "flat":
            for f in filters:
                self.mags[f] = rmag
        else:
            self.mags = utils.stellarMags(sed_template, rmag=rmag)
        self.atm_err = atm_err
        self.normalize = normalize
        self.baseline = baseline
        self.comment = (
            "Estimated uncertainty of the proper motion fit "
            "(assuming no parallax or that parallax is well fit). "
            "Uses visits in all bands, and generates approximate "
            "astrometric errors using the SNR in each visit. "
        )
        if sed_template == "flat":
            self.comment += "Assumes a flat SED. "
        if self.normalize:
            self.comment += (
                "This normalized version of the metric represents "
                "the estimated uncertainty in the proper motion "
                "divided by the minimum uncertainty possible "
                "(if all visits were obtained on the first and last days "
                "of the survey). "
                "Values closer to 1 indicate more optimal scheduling."
            )
    def run(self, data_slice, slice_point=None):
        filters = np.unique(data_slice["filter"])
        filters = [str(f) for f in filters]
        precis = np.zeros(data_slice.size, dtype="float")
        for f in filters:
            observations = np.where(data_slice["filter"] == f)
            if np.size(observations[0]) < 2:
                precis[observations] = self.badval
            else:
                snr = mafUtils.m52snr(self.mags[f], data_slice[self.m5_col][observations])
                precis[observations] = mafUtils.astrom_precision(
                    data_slice[self.seeing_col][observations], snr, self.atm_err
                )
        good = np.where(precis != self.badval)
        result = mafUtils.sigma_slope(data_slice[self.mjd_col][good], precis[good])
        result = result * 365.25 * 1e3  # Convert to mas/yr
        if (self.normalize) & (good[0].size > 0):
            new_dates = data_slice[self.mjd_col][good] * 0
            n_dates = new_dates.size
            new_dates[n_dates // 2 :] = self.baseline * 365.25
            result = (mafUtils.sigma_slope(new_dates, precis[good]) * 365.25 * 1e3) / result
        # Observations that are very close together can still fail
        if np.isnan(result):
            result = self.badval
        return result
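

# Illustrative usage sketch (not part of the MAF API): ProperMotionMetric on
# a synthetic ten-year cadence. All values are invented to show the calling
# convention; the _example_* name is hypothetical. With normalize=True the
# same visits would instead be compared against an ideal two-epoch schedule.
def _example_proper_motion_metric_usage():
    rng = np.random.default_rng(42)
    n_visits = 100
    names = ["fiveSigmaDepth", "observationStartMJD", "filter", "seeingFwhmGeom"]
    types = [float, float, "U1", float]
    data = np.zeros(n_visits, dtype=list(zip(names, types)))
    data["fiveSigmaDepth"] = 24.0
    data["filter"] = "r"
    data["seeingFwhmGeom"] = 0.7  # arcsec
    # Visits spread uniformly at random over a 10-year baseline.
    data["observationStartMJD"] = 60000 + rng.uniform(0, 3652.5, n_visits)
    metric = ProperMotionMetric(rmag=22.0)
    return metric.run(data)  # proper motion uncertainty, in mas/yr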
class ParallaxCoverageMetric(BaseMetric):
    """Check how well the parallax factor is distributed.

    Subtracts the weighted mean position of the parallax offsets, then
    computes the weighted mean radius of the points. If points are well
    distributed, the mean radius will be near 1. If phase coverage is bad,
    radius will be close to zero.

    For points on the Ecliptic, uniform sampling should result in a metric
    value of ~0.5. At the poles, uniform sampling would result in a metric
    value of ~1. Conceptually, it is helpful to remember that the parallax
    motion of a star at the pole is a (nearly circular) ellipse while the
    motion of a star on the ecliptic is a straight line. Thus, any pair of
    observations separated by 6 months will give the full parallax range
    for a star on the pole but only observations on very specific dates
    will give the full range for a star on the ecliptic.

    Optionally also demand that there are observations above the snr_limit
    kwarg spanning theta_range radians.

    Parameters
    ----------
    m5_col : `str`, optional
        Column name for individual visit m5. Default fiveSigmaDepth.
    mjd_col : `str`, optional
        Column name for exposure time dates. Default observationStartMJD.
    filter_col : `str`, optional
        Column name for filter. Default filter.
    seeing_col : `str`, optional
        Column name for seeing (assumed FWHM). Default seeingFwhmGeom.
    rmag : `float`, optional
        Magnitude of fiducial star in r filter.
        Other filters are scaled using the sed_template keyword. Default 20.0
    sed_template : `str`, optional
        Template to use (can be 'flat' or 'O','B','A','F','G','K','M').
        Default 'flat'.
    atm_err : `float`, optional
        Centroiding error due to atmosphere in arcsec.
        Default 0.01 (arcseconds).
    theta_range : `float`, optional
        Range of parallax offset angles to demand (in radians).
        Default=0 (means no range requirement).
    snr_limit : `float`, optional
        Only include points above the snr_limit when computing theta_range.
        Default 5.

    Returns
    -------
    metricValue : `float`
        Returns a weighted mean of the length of the parallax factor
        vectors. Values near 1 imply that the points are well distributed.
        Values near 0 imply that the parallax phase coverage is bad.
        Near the ecliptic, uniform sampling results in metric values
        of about 0.5.

    Notes
    -----
    Uses the ParallaxFactor stacker to calculate ra_pi_amp and dec_pi_amp.
    """

    def __init__(
        self,
        metric_name="ParallaxCoverageMetric",
        m5_col="fiveSigmaDepth",
        mjd_col="observationStartMJD",
        filter_col="filter",
        seeing_col="seeingFwhmGeom",
        rmag=20.0,
        sed_template="flat",
        atm_err=0.01,
        theta_range=0.0,
        snr_limit=5,
        **kwargs,
    ):
        cols = ["ra_pi_amp", "dec_pi_amp", m5_col, mjd_col, filter_col, seeing_col]
        units = "ratio"
        super(ParallaxCoverageMetric, self).__init__(cols, metric_name=metric_name, units=units, **kwargs)
        self.m5_col = m5_col
        self.seeing_col = seeing_col
        self.filter_col = filter_col
        self.mjd_col = mjd_col
        # Demand the range of theta values
        self.theta_range = theta_range
        self.snr_limit = snr_limit
        filters = ["u", "g", "r", "i", "z", "y"]
        self.mags = {}
        if sed_template == "flat":
            for f in filters:
                self.mags[f] = rmag
        else:
            self.mags = utils.stellarMags(sed_template, rmag=rmag)
        self.atm_err = atm_err
        caption = "Parallax factor coverage for an r=%.2f star (0 is bad, 0.5-1 is good). " % (rmag)
        caption += "One expects the parallax factor coverage to vary because stars on the ecliptic "
        caption += "can be observed when they have no parallax offset while stars at the pole are always "
        caption += "offset by the full parallax offset."
        self.comment = caption

    def _theta_check(self, ra_pi_amp, dec_pi_amp, snr):
        good = np.where(snr >= self.snr_limit)
        theta = np.arctan2(dec_pi_amp[good], ra_pi_amp[good])
        # Make values between 0 and 2pi
        theta = theta - np.min(theta)
        result = 0.0
        if np.max(theta) >= self.theta_range:
            # Check that things are in different quadrants
            theta = (theta + np.pi) % (2.0 * np.pi)
            theta = theta - np.min(theta)
            if np.max(theta) >= self.theta_range:
                result = 1
        return result

    def _compute_weights(self, data_slice, snr):
        # Compute centroid uncertainty in each visit
        position_errors = mafUtils.astrom_precision(data_slice[self.seeing_col], snr, self.atm_err)
        weights = 1.0 / position_errors**2
        return weights

    def _weighted_r(self, dec_pi_amp, ra_pi_amp, weights):
        ycoord = dec_pi_amp - np.average(dec_pi_amp, weights=weights)
        xcoord = ra_pi_amp - np.average(ra_pi_amp, weights=weights)
        radius = np.sqrt(xcoord**2 + ycoord**2)
        ave_rad = np.average(radius, weights=weights)
        return ave_rad
    def run(self, data_slice, slice_point=None):
        if np.size(data_slice) < 2:
            return self.badval

        filters = np.unique(data_slice[self.filter_col])
        filters = [str(f) for f in filters]
        snr = np.zeros(len(data_slice), dtype="float")
        # Compute SNR for all observations
        for filt in filters:
            in_filt = np.where(data_slice[self.filter_col] == filt)
            snr[in_filt] = mafUtils.m52snr(self.mags[str(filt)], data_slice[self.m5_col][in_filt])

        weights = self._compute_weights(data_slice, snr)
        ave_r = self._weighted_r(data_slice["ra_pi_amp"], data_slice["dec_pi_amp"], weights)
        if self.theta_range > 0:
            theta_check = self._theta_check(data_slice["ra_pi_amp"], data_slice["dec_pi_amp"], snr)
        else:
            theta_check = 1.0
        result = ave_r * theta_check
        return result
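

# Illustrative sketch (not part of the MAF API): the geometry behind
# ParallaxCoverageMetric, poking at the internal _weighted_r helper with
# uniform weights. Points spread around a unit circle (the pole-like case)
# give a weighted mean radius near 1; points sampled uniformly along a line
# through the origin (an idealized ecliptic case) give roughly 0.5, matching
# the values quoted in the class docstring. The _example_* name is
# hypothetical.
def _example_parallax_coverage_geometry():
    metric = ParallaxCoverageMetric()
    weights = np.ones(100)
    phase = np.linspace(0, 2 * np.pi, 100, endpoint=False)
    # Pole-like: parallax offsets trace out a circle.
    circle = metric._weighted_r(np.sin(phase), np.cos(phase), weights)  # ~1.0
    # Ecliptic-like: offsets confined to a line, sampled uniformly.
    line = metric._weighted_r(np.zeros(100), np.linspace(-1, 1, 100), weights)  # ~0.5
    return circle, line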
class ParallaxDcrDegenMetric(BaseMetric):
    """Use the full parallax and DCR displacement vectors to find if they
    are degenerate.

    Parameters
    ----------
    metric_name : `str`, optional
        Default 'ParallaxDcrDegenMetric'.
    seeing_col : `str`, optional
        Default 'seeingFwhmGeom'
    m5_col : `str`, optional
        Default 'fiveSigmaDepth'
    filter_col : `str`
        Default 'filter'
    atm_err : `float`
        Minimum error in photometry centroids introduced by the atmosphere
        (arcseconds). Default 0.01.
    rmag : `float`
        r-band magnitude of the fiducial star that is being used (mag).
    sed_template : `str`
        The SED template to use for fiducial star colors, passed to
        rubin_scheduler.utils.stellarMags. Default 'flat'
    tol : `float`
        Tolerance for how well curve_fit needs to work before believing
        the covariance result. Default 0.05.

    Returns
    -------
    metricValue : `float`
        Returns the correlation coefficient between the best-fit parallax
        amplitude and DCR amplitude. The RA and Dec offsets are fit
        simultaneously. Values close to zero are good, values close to
        +/- 1 are bad. Experience with fitting Monte Carlo simulations
        suggests the astrometric fits start becoming poor around a
        correlation of 0.7.
    """

    def __init__(
        self,
        metric_name="ParallaxDcrDegenMetric",
        seeing_col="seeingFwhmGeom",
        m5_col="fiveSigmaDepth",
        atm_err=0.01,
        rmag=20.0,
        sed_template="flat",
        filter_col="filter",
        tol=0.05,
        **kwargs,
    ):
        self.m5_col = m5_col
        self.seeing_col = seeing_col
        self.filter_col = filter_col
        self.tol = tol
        units = "Correlation"
        # Just put all the columns that all the stackers will need here?
        cols = [
            "ra_pi_amp",
            "dec_pi_amp",
            "ra_dcr_amp",
            "dec_dcr_amp",
            seeing_col,
            m5_col,
        ]
        super(ParallaxDcrDegenMetric, self).__init__(cols, metric_name=metric_name, units=units, **kwargs)
        self.mags = {}
        if sed_template == "flat":
            for f in ["u", "g", "r", "i", "z", "y"]:
                self.mags[f] = rmag
        else:
            self.mags = utils.stellarMags(sed_template, rmag=rmag)
        self.atm_err = atm_err

    def _positions(self, x, a, b):
        """Function to find parallax and DCR amplitudes.

        x should be a vector with
        [[parallax_x1, parallax_x2 ..., parallax_y1, parallax_y2 ...],
         [dcr_x1, dcr_x2 ..., dcr_y1, dcr_y2 ...]]
        """
        result = a * x[0, :] + b * x[1, :]
        return result
    def run(self, data_slice, slice_point=None):
        # The idea here is that we calculate position errors (in RA and Dec)
        # for all observations. Then we generate arrays of the parallax
        # offsets (delta RA parallax = ra_pi_amp, etc) and the DCR offsets
        # (delta RA DCR = ra_dcr_amp, etc), and just add them together into
        # one RA (and Dec) offset. Then, we try to fit for how we combined
        # these offsets, but while considering the astrometric noise. If we
        # can figure out that we just added them together
        # (i.e. the curve_fit result is [a=1, b=1] for the function
        # _positions above) then we should be able to disentangle the
        # parallax and DCR offsets when fitting 'for real'.
        # Compute SNR for all observations
        snr = np.zeros(len(data_slice), dtype="float")
        for filt in np.unique(data_slice[self.filter_col]):
            in_filt = np.where(data_slice[self.filter_col] == filt)
            snr[in_filt] = mafUtils.m52snr(self.mags[filt], data_slice[self.m5_col][in_filt])
        # Compute the centroiding uncertainties.
        # Note that these centroiding uncertainties depend on the physical
        # size of the PSF, thus we are using seeingFwhmGeom for these
        # metrics, not seeingFwhmEff.
        position_errors = mafUtils.astrom_precision(data_slice[self.seeing_col], snr, self.atm_err)
        # Construct the vectors of RA/Dec offsets. xdata is the "input data".
        # ydata is the "output".
        xdata = np.empty((2, data_slice.size * 2), dtype=float)
        xdata[0, :] = np.concatenate((data_slice["ra_pi_amp"], data_slice["dec_pi_amp"]))
        xdata[1, :] = np.concatenate((data_slice["ra_dcr_amp"], data_slice["dec_dcr_amp"]))
        ydata = np.sum(xdata, axis=0)
        # Use curve_fit to compute covariance between parallax and DCR
        # amplitudes. Set the initial guess slightly off from the correct
        # [1, 1] to make sure it iterates.
        popt, pcov = curve_fit(
            self._positions,
            xdata,
            ydata,
            p0=[1.1, 0.9],
            sigma=np.concatenate((position_errors, position_errors)),
            absolute_sigma=True,
        )
        # Catch if the fit failed to converge on the correct solution.
        if np.max(np.abs(popt - np.array([1.0, 1.0]))) > self.tol:
            return self.badval
        # Covariance between best-fit parallax amplitude and DCR amplitude.
        cov = pcov[1, 0]
        # Convert covariance between parallax and DCR amplitudes to a
        # normalized correlation.
        perr = np.sqrt(np.diag(pcov))
        correlation = cov / (perr[0] * perr[1])
        result = correlation
        # This can throw infs.
        if np.isinf(result):
            result = self.badval
        return result
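

# Illustrative sketch (not part of the MAF API): the degeneracy test above,
# distilled to plain scipy. We build a parallax-like and a DCR-like offset
# vector, sum them, and ask curve_fit to recover the two unit amplitudes;
# the normalized off-diagonal element of pcov is then the parallax-DCR
# correlation, exactly as computed in ParallaxDcrDegenMetric.run. All inputs
# are invented and the _example_* name is hypothetical.
def _example_dcr_degeneracy_fit():
    phase = np.linspace(0, 4 * np.pi, 50)  # two fake "years" of visits
    xdata = np.vstack([np.cos(phase), np.sin(phase)])  # parallax-like, DCR-like
    ydata = xdata.sum(axis=0)  # true amplitudes are exactly [1, 1]
    sigma = np.full(ydata.size, 0.01)  # 10 mas centroid errors, made up

    def positions(x, a, b):
        return a * x[0, :] + b * x[1, :]

    popt, pcov = curve_fit(positions, xdata, ydata, p0=[1.1, 0.9], sigma=sigma, absolute_sigma=True)
    perr = np.sqrt(np.diag(pcov))
    correlation = pcov[1, 0] / (perr[0] * perr[1])
    # popt is ~[1, 1]; because the two vectors here are nearly orthogonal,
    # |correlation| is small, i.e. the offsets are not degenerate.
    return popt, correlation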
def calc_dist_cosines(ra1, dec1, ra2, dec2):
    """Calculate distance on a sphere using the spherical law of cosines.

    Note: floats can be replaced by numpy arrays of RA/Dec.
    For very small distances, rounding errors may cause distance errors.

    Parameters
    ----------
    ra1, dec1 : `float`, `float`
        RA and Dec of one point. (radians)
    ra2, dec2 : `float`, `float`
        RA and Dec of another point. (radians)

    Returns
    -------
    distance : `float`
        Angular distance between the points in radians.
    """
    # This formula can have rounding errors for cases where distances
    # are small.
    # Oh, the joys of wikipedia -
    # http://en.wikipedia.org/wiki/Great-circle_distance
    # For the purposes of these calculations, this is probably
    # accurate enough.
    D = np.sin(dec2) * np.sin(dec1) + np.cos(dec1) * np.cos(dec2) * np.cos(ra2 - ra1)
    D = np.arccos(D)
    return D
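

# Illustrative check (not part of the MAF API): two points 90 degrees apart
# along the equator should come out as pi/2 radians. The _example_* name is
# hypothetical.
def _example_calc_dist_cosines():
    d = calc_dist_cosines(0.0, 0.0, np.pi / 2.0, 0.0)
    return d  # ~pi/2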
class RadiusObsMetric(BaseMetric):
    """Evaluate slice point radial position in the focal plane of each
    visit, reducing to the mean, rms and full range of these radial
    distances.
    """

    def __init__(
        self, metric_name="radiusObs", ra_col="fieldRA", dec_col="fieldDec", units="degrees", **kwargs
    ):
        self.ra_col = ra_col
        self.dec_col = dec_col
        super(RadiusObsMetric, self).__init__(
            col=[self.ra_col, self.dec_col], metric_name=metric_name, units=units, **kwargs
        )
    def run(self, data_slice, slice_point):
        ra = slice_point["ra"]
        dec = slice_point["dec"]
        distances = calc_dist_cosines(
            ra,
            dec,
            np.radians(data_slice[self.ra_col]),
            np.radians(data_slice[self.dec_col]),
        )
        distances = np.degrees(distances)
        return distances
    def reduce_mean(self, distances):
        return np.mean(distances)

    def reduce_rms(self, distances):
        return np.std(distances)

    def reduce_full_range(self, distances):
        return np.max(distances) - np.min(distances)
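

# Illustrative usage sketch (not part of the MAF API): RadiusObsMetric.run
# expects slice_point positions in radians and fieldRA/fieldDec in degrees,
# and returns the full array of angular distances in degrees; the reduce_*
# methods collapse it to summary statistics. All values are made up and the
# _example_* name is hypothetical.
def _example_radius_obs_usage():
    data = np.zeros(5, dtype=[("fieldRA", float), ("fieldDec", float)])
    data["fieldRA"] = np.array([10.1, 10.2, 9.8, 10.05, 9.9])  # degrees
    data["fieldDec"] = np.array([-5.05, -4.9, -5.1, -5.0, -4.95])  # degrees
    slice_point = {"ra": np.radians(10.0), "dec": np.radians(-5.0)}
    metric = RadiusObsMetric()
    distances = metric.run(data, slice_point)
    return metric.reduce_mean(distances), metric.reduce_full_range(distances)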