Source code for rubin_sim.maf.maf_contrib.periodic_star_metric

__all__ = ("PeriodicStar", "PeriodicStarMetric")

import warnings

import numpy as np
from scipy.optimize import curve_fit

from rubin_sim.maf.metrics.base_metric import BaseMetric
from rubin_sim.maf.utils import m52snr


class PeriodicStar:
    def __init__(self, filternames):
        self.filternames = filternames

    def __call__(self, t, x0, x1, x2, x3, x4, x5, x6, x7, x8):
        """Approximate a periodic star as a simple sin wave.
        t: array with "time" in days, and "filter" dtype names.
        x0: Period (days)
        x1: Phase (days)
        x2: Amplitude (mag)
        x3: mean u mag
        x4: mean g mag
        x5: mean r mag
        x6: mean i mag
        x7: mean z mag
        x8: mean y mag
        """
        filter2index = {"u": 3, "g": 4, "r": 5, "i": 6, "z": 7, "y": 8}
        filter_names = np.unique(self.filternames)
        mags = x2 * np.sin((t + x1) / x0 * 2.0 * np.pi)
        x = [x0, x1, x2, x3, x4, x5, x6, x7, x8]
        for f in filter_names:
            good = np.where(self.filternames == f)
            mags[good] += x[filter2index[f]]
        return mags


[docs] class PeriodicStarMetric(BaseMetric): """At each slice_point, run a Monte Carlo simulation to see how well a periodic source can be fit. Assumes a simple sin-wave light-curve, and generates Gaussain noise based in the 5-sigma limiting depth of each observation. Parameters ---------- period : `float` The period to check, in days. amplitude : `float` The amplitude of the sinusoidal light curve, in mags. phase : `float` The phase of the lightcurve at the time of the first observation. n_monte : `int` The number of noise realizations to make in the Monte Carlo. period_tol : `float` The fractional tolerance on the period to require in order for a star to be considered well-fit amp_tol : `float` The fractional tolerance on the amplitude. means : `list` [`float`] The mean magnitudes in ugrizy of the star. mag_tol : `float` The mean magnitude tolerance, in magnitudes, for the star to be considered well-fit. n_bands : `int` Number of bands that must be within mag_tol. seed : `int` Random number seed for the noise realizations. """ def __init__( self, metric_name="PeriodicStarMetric", mjd_col="observationStartMJD", m5_col="fiveSigmaDepth", filter_col="filter", period=10.0, amplitude=0.5, phase=2.0, n_monte=1000, period_tol=0.05, amp_tol=0.10, means=[20.0, 20.0, 20.0, 20.0, 20.0, 20.0], mag_tol=0.10, n_bands=3, seed=42, **kwargs, ): """ period: days (default 10) amplitude: mags (default 1) n_monte: number of noise realizations to make in the Monte Carlo period_tol: fractional tolerance on the period to demand for a star to be considered well-fit amp_tol: fractional tolerance on the amplitude to demand means: mean magnitudes for ugrizy mag_tol: Mean magnitude tolerance (mags) n_bands: Number of bands that must be within mag_tol seed: random number seed """ self.mjd_col = mjd_col self.m5_col = m5_col self.filter_col = filter_col super(PeriodicStarMetric, self).__init__( col=[self.mjd_col, self.m5_col, self.filter_col], units="Fraction Detected", metric_name=metric_name, **kwargs, ) self.period = period self.amplitude = amplitude self.phase = phase self.n_monte = n_monte self.period_tol = period_tol self.amp_tol = amp_tol self.means = np.array(means) self.mag_tol = mag_tol self.n_bands = n_bands np.random.seed(seed) self.filter2index = {"u": 3, "g": 4, "r": 5, "i": 6, "z": 7, "y": 8}
[docs] def run(self, data_slice, slice_point=None): # Bail if we don't have enough points # (need to fit mean magnitudes in each of the available bands - # self.means and for a period, amplitude, and phase) if data_slice.size < self.means.size + 3: return self.badval # Generate input for true light curve t = np.empty(data_slice.size, dtype=list(zip(["time", "filter"], [float, "|U1"]))) t["time"] = data_slice[self.mjd_col] - data_slice[self.mjd_col].min() t["filter"] = data_slice[self.filter_col] # If we are adding a distance modulus to the magnitudes if "distMod" in list(slice_point.keys()): mags = self.means + slice_point["distMod"] else: mags = self.means true_params = np.append(np.array([self.period, self.phase, self.amplitude]), mags) true_obj = PeriodicStar(t["filter"]) true_lc = true_obj(t["time"], *true_params) # Array to hold the fit results fits = np.zeros((self.n_monte, true_params.size), dtype=float) for i in np.arange(self.n_monte): snr = m52snr(true_lc, data_slice[self.m5_col]) dmag = 2.5 * np.log10(1.0 + 1.0 / snr) noise = np.random.randn(true_lc.size) * dmag # Suppress warnings about failing on covariance fit_obj = PeriodicStar(t["filter"]) with warnings.catch_warnings(): warnings.simplefilter("ignore") # If it fails to converge, save values that should fail later try: parm_vals, pcov = curve_fit( fit_obj, t["time"], true_lc + noise, p0=true_params, sigma=dmag ) except RuntimeError: parm_vals = true_params * 0 + np.inf fits[i, :] = parm_vals # Throw out any magnitude fits if there are no observations # in that filter ufilters = np.unique(data_slice[self.filter_col]) if ufilters.size < 9: for key in list(self.filter2index.keys()): if key not in ufilters: fits[:, self.filter2index[key]] = -np.inf # Find the fraction of fits that meet the "well-fit" criteria period_frac_err = np.abs((fits[:, 0] - true_params[0]) / true_params[0]) amp_frac_err = np.abs((fits[:, 2] - true_params[2]) / true_params[2]) mag_err = np.abs(fits[:, 3:] - true_params[3:]) n_bands = np.zeros(mag_err.shape, dtype=int) n_bands[np.where(mag_err <= self.mag_tol)] = 1 n_bands = np.sum(n_bands, axis=1) n_recovered = np.size( np.where( (period_frac_err <= self.period_tol) & (amp_frac_err <= self.amp_tol) & (n_bands >= self.n_bands) )[0] ) frac_recovered = float(n_recovered) / self.n_monte return frac_recovered