Source code for rubin_sim.maf.utils.maf_utils

__all__ = (
    "optimal_bins",
    "percentile_clipping",
    "radec2pix",
    "coadd_m5",
    "collapse_night",
    "load_inst_zeropoints",
)

import os
import warnings

import healpy as hp
import numpy as np
from rubin_scheduler.data import get_data_dir
from rubin_scheduler.utils import SysEngVals, int_binned_stat
from scipy.stats import binned_statistic

from rubin_sim.phot_utils import Bandpass, PhotometricParameters


[docs] def load_inst_zeropoints(): """Load up and return instrumental zeropoints and atmospheric extinctions. """ zp_inst = {} datadir = get_data_dir() for filtername in "ugrizy": # set gain and exptime to 1 so the instrumental zeropoint will be in # photoelectrons and per second phot_params = PhotometricParameters(nexp=1, gain=1, exptime=1, bandpass=filtername) bp = Bandpass() bp.read_throughput(os.path.join(datadir, "throughputs/baseline/", "total_%s.dat" % filtername)) zp_inst[filtername] = bp.calc_zp_t(phot_params) syseng = SysEngVals() k_atm = syseng.k_atm return zp_inst, k_atm
[docs] def coadd_m5(mags): """Coadded depth, assuming Gaussian noise.""" return 1.25 * np.log10(np.sum(10.0 ** (0.8 * np.array(mags))))
[docs] def collapse_night( data_slice, night_col="night", filter_col="filter", m5_col="fiveSigmaDepth", mjd_col="observationStartMJD", ): """Collapse a data_slice into per-filter, per-night values for the 'night', 'filter', 'median observationStartMJD', and 'fiveSigmaDepth'. """ filters = np.unique(data_slice[filter_col]) # Find the per-night, per-filter values night_slice = {} for filtername in filters: infilt = np.where(data_slice[filter_col] == filtername)[0] unight = np.unique(data_slice[night_col][infilt]) right = unight + 0.5 bins = [unight[0] - 0.5] + right.tolist() coadds, be, bn = binned_statistic( data_slice[night_col][infilt], data_slice[m5_col][infilt], bins=bins, statistic=coadd_m5, ) unights, median_mjd_per_night = int_binned_stat( data_slice[night_col][infilt], data_slice[mjd_col][infilt], statistic=np.median, ) night_slice[filtername] = np.array( list(zip(unight, median_mjd_per_night, coadds, filtername * len(unight))), dtype=[ (night_col, int), (mjd_col, float), (m5_col, float), (filter_col, "U1"), ], ) night_slice = np.concatenate([night_slice[f] for f in night_slice]) night_slice.sort(order=["observationStartMJD"]) return night_slice
[docs] def optimal_bins(datain, binmin=None, binmax=None, nbin_max=200, nbin_min=1, verbose=False): """ Set an 'optimal' number of bins using the Freedman-Diaconis rule. Parameters ---------- datain : `numpy.ndarray` or `numpy.ma.MaskedArray` The data for which we want to set the bin_size. binmin : `float` The minimum bin value to consider (if None, uses minimum data value). binmax : `float` The maximum bin value to consider (if None, uses maximum data value). nbin_max : `int` The maximum number of bins to create. Sometimes the 'optimal bin_size' implies an unreasonably large number of bins, if the data distribution is unusual. nbin_min : `int` The minimum number of bins to create. Default is 1. verbose : `bool` Turn off warning messages. This utility very often raises warnings and these should likely be logging messages at a lower logging level, but for now - just use the verbose flag to turn these off or on. Returns ------- nbins : `int` The number of bins. """ # if it's a masked array, only use unmasked values if hasattr(datain, "compressed"): data = datain.compressed() else: data = datain # Check that any good data values remain. if data.size == 0: nbins = nbin_max if verbose: warnings.warn( f"No unmasked data available for calculating optimal bin size: returning {nbins} bins" ) # Else proceed. else: if binmin is None: binmin = np.nanmin(data) if binmax is None: binmax = np.nanmax(data) cond = np.where((data >= binmin) & (data <= binmax))[0] # Check if any data points remain within binmin/binmax. if np.size(data[cond]) == 0: nbins = nbin_max warnings.warn( "No data available for calculating optimal bin size within range of %f, %f" % (binmin, binmax) + ": returning %i bins" % (nbins) ) else: iqr = np.percentile(data[cond], 75) - np.percentile(data[cond], 25) binwidth = 2 * iqr * (np.size(data[cond]) ** (-1.0 / 3.0)) nbins = (binmax - binmin) / binwidth if nbins > nbin_max: warnings.warn( "Optimal bin calculation tried to make %.0f bins, returning %i" % (nbins, nbin_max) ) nbins = nbin_max if nbins < nbin_min: warnings.warn( "Optimal bin calculation tried to make %.0f bins, returning %i" % (nbins, nbin_min) ) nbins = nbin_min if np.isnan(nbins): warnings.warn("Optimal bin calculation calculated NaN: returning %i" % (nbin_max)) nbins = nbin_max return int(nbins)
[docs] def percentile_clipping(data, percentile=95.0): """Calculate the minimum and maximum values of a distribution of points, after discarding data more than 'percentile' from the median. This is useful for determining useful data ranges for plots. Note that 'percentile' percent of the data is retained. Parameters ---------- data : `numpy.ndarray` The data to clip. percentile : `float` Retain values within percentile of the median. Returns ------- minimum, maximum : `float`, `float` The minimum and maximum values of the clipped data. """ lower_percentile = (100 - percentile) / 2.0 upper_percentile = 100 - lower_percentile min_value = np.percentile(data, lower_percentile) max_value = np.percentile(data, upper_percentile) return min_value, max_value
[docs] def radec2pix(nside, ra, dec): """Calculate the nearest healpixel ID of an RA/Dec array, assuming nside. Parameters ---------- nside : `int` The nside value of the healpix grid. ra : `numpy.ndarray`, (N,) The RA values to be converted to healpix ids, in radians. dec : `numpy.ndarray`, (N,) The Dec values to be converted to healpix ids, in radians. Returns ------- hpid : `numpy.ndarray`, (N,) The healpix ids. """ lat = np.pi / 2.0 - dec hpid = hp.ang2pix(nside, lat, ra) return hpid