Source code for rubin_sim.maf.maf_contrib.kne_metrics

__all__ = ("get_kne_filename", "KnLc", "KNePopMetric", "generate_kn_pop_slicer")

import glob
import os

import numpy as np
from rubin_scheduler.data import get_data_dir
from rubin_scheduler.utils import SURVEY_START_MJD, uniform_sphere

from rubin_sim.maf.metrics import BaseMetric
from rubin_sim.maf.slicers import UserPointsSlicer
from rubin_sim.maf.utils import m52snr
from rubin_sim.phot_utils import DustValues


[docs] def get_kne_filename(inj_params_list=None): """Given kilonova parameters, get the filename from the grid of models developed by M. Bulla Parameters ---------- inj_params_list : `list` [`dict`] parameters for the kilonova model such as mass of the dynamical ejecta (mej_dyn), mass of the disk wind ejecta (mej_wind), semi opening angle of the cylindrically-symmetric ejecta fan ('phi'), and viewing angle ('theta'). For example inj_params_list = [{'mej_dyn': 0.005, 'mej_wind': 0.050, 'phi': 30, 'theta': 25.8}] """ # Get files, model grid developed by M. Bulla datadir = get_data_dir() file_list = glob.glob(os.path.join(datadir, "maf", "bns", "*.dat")) # If no specific parameters passed - return everything. if inj_params_list is None or len(inj_params_list) == 0: return file_list # Otherwise find the parameters for each file and # then find the relevant matches. params = {} matched_files = [] for filename in file_list: key = filename.replace(".dat", "").split("/")[-1] params[key] = {} params[key]["filename"] = filename key_split = key.split("_") # Binary neutron star merger models if key_split[0] == "nsns": mejdyn = float(key_split[2].replace("mejdyn", "")) mejwind = float(key_split[3].replace("mejwind", "")) phi0 = float(key_split[4].replace("phi", "")) theta = float(key_split[5]) params[key]["mej_dyn"] = mejdyn params[key]["mej_wind"] = mejwind params[key]["phi"] = phi0 params[key]["theta"] = theta # Neutron star--black hole merger models elif key_split[0] == "nsbh": mej_dyn = float(key_split[2].replace("mejdyn", "")) mej_wind = float(key_split[3].replace("mejwind", "")) phi = float(key_split[4].replace("phi", "")) theta = float(key_split[5]) params[key]["mej_dyn"] = mej_dyn params[key]["mej_wind"] = mej_wind params[key]["phi"] = phi params[key]["theta"] = theta for key in params.keys(): for inj_params in inj_params_list: match = all([np.isclose(params[key][var], inj_params[var]) for var in inj_params.keys()]) if match: matched_files.append(params[key]["filename"]) print(f"Found match for {inj_params}") print( f"Found matches for {len(matched_files)}/{len(inj_params_list)} \ sets of parameters" ) return matched_files
[docs] class KnLc: """Read in some KNe lightcurves Parameters ---------- file_list : `list` [`str`] or None List of file paths to load. If None, loads up all the files from data/bns/ """ def __init__(self, file_list=None): if file_list is None: datadir = get_data_dir() # Get files, model grid developed by M. Bulla file_list = glob.glob(os.path.join(datadir, "maf", "bns", "*.dat")) filts = ["u", "g", "r", "i", "z", "y"] magidxs = [1, 2, 3, 4, 5, 6] # Let's organize the data in to a list of dicts for easy lookup self.data = [] for filename in file_list: mag_ds = np.loadtxt(filename) t = mag_ds[:, 0] new_dict = {} for ii, (filt, magidx) in enumerate(zip(filts, magidxs)): new_dict[filt] = {"ph": t, "mag": mag_ds[:, magidx]} self.data.append(new_dict)
[docs] def interp(self, t, filtername, lc_indx=0): """Do the interpolation of the lightcurve for a given time and filter. Parameters ---------- t : `np.ndarray`, (N,) The times to interpolate the light curve to. filtername : `str` The filter. one of ugrizy lc_index : `int`, optional Which file to use. Returns ------- result : `np.ndarray`, (N,) Array of lightcurve brightnesses at the times of t.= """ result = np.interp( t, self.data[lc_indx][filtername]["ph"], self.data[lc_indx][filtername]["mag"], left=99, right=99, ) return result
[docs] class KNePopMetric(BaseMetric): def __init__( self, metric_name="KNePopMetric", mjd_col="observationStartMJD", m5_col="fiveSigmaDepth", filter_col="filter", night_col="night", pts_needed=2, file_list=None, mjd0=SURVEY_START_MJD, output_lc=False, badval=-666, **kwargs, ): maps = ["DustMap"] self.mjd_col = mjd_col self.m5_col = m5_col self.filter_col = filter_col self.night_col = night_col self.pts_needed = pts_needed # `bool` variable, if True the light curve will be exported self.output_lc = output_lc self.lightcurves = KnLc(file_list=file_list) self.mjd0 = mjd0 dust_properties = DustValues() self.ax1 = dust_properties.ax1 cols = [self.mjd_col, self.m5_col, self.filter_col, self.night_col] super(KNePopMetric, self).__init__( col=cols, units="Detected, 0 or 1", metric_name=metric_name, maps=maps, badval=badval, **kwargs, ) def _multi_detect(self, around_peak): """Simple detection criteria: detect at least a certain number of times """ result = 1 # Detected data points if np.size(around_peak) < self.pts_needed: return 0 return result def _ztfrest_simple( self, around_peak, mags, mags_unc, t, filters, min_dt=0.125, min_fade=0.3, max_rise=-1.0, select_red=False, select_blue=False, ): """Selection criteria based on rise or decay rate; simplified version of the methods employed by the ZTFReST project (Andreoni & Coughlin et al., 2021) Parameters ---------- around_peak : `np.ndarray`, (N,) indexes corresponding to 5sigma detections mags : `np.ndarray`, (N,) magnitudes obtained interpolating models on the data_slice t : `np.ndarray`, (N,) relative times filters : `np.ndarray`, (N,) filters in which detections happened min_dt : `float` minimum time gap between first and last detection in a given band min_fade : `float` fade rate threshold (positive, mag/day) max_rise : `float` rise rate threshold (negative, mag/day) select_red : `bool` if True, only red 'izy' filters will be considered select_blue : `bool` if True, only blue 'ugr' filters will be considered Examples ---------- A transient: rising by 0.74 mag/day will pass a threshold max_rise=-0.5 rising by 0.74 mag/day will not pass a threshold max_rise=-1.0 fading by 0.6 mag/day will pass a threshold min_fade=0.3 fading by 0.2 mag/day will not pass a threshold min_fade=0.3 """ result = 1 # Quick check on the number of detected points if np.size(around_peak) < self.pts_needed: return 0 # Quick check on the time gap between first and last detection elif np.max(t[around_peak]) - np.min(t[around_peak]) < min_dt: return 0 else: evol_rate = [] fil = [] # Check time gaps and rise or fade rate for each band for f in set(filters): if select_red is True and f not in "izy": continue elif select_blue is True and f not in "ugr": continue times_f = t[around_peak][np.where(filters == f)[0]] mags_f = mags[around_peak][np.where(filters == f)[0]] mags_unc_f = mags_unc[around_peak][np.where(filters == f)[0]] # Check if the evolution is significant enough idx_max = np.argmax(mags_f) idx_min = np.argmin(mags_f) if mags_f[idx_min] + mags_unc_f[idx_min] < mags_f[idx_max] - mags_unc_f[idx_max]: signif = True else: signif = False # Time difference between max and min dt_f = np.abs(times_f[idx_max] - times_f[idx_min]) # Get the evolution rate, if the time gap condition is met if dt_f > min_dt and signif is True: # Calculate evolution rate evol_rate_f = (np.max(mags_f) - np.min(mags_f)) / (times_f[idx_max] - times_f[idx_min]) evol_rate.append(evol_rate_f) else: evol_rate.append(0) fil.append(f) if len(evol_rate) == 0: return 0 # Check if the conditions on the evolution rate are met if np.max(evol_rate) < min_fade and np.min(evol_rate) > max_rise: return 0 return result def _multi_color_detect(self, filters): """Color-based simple detection criteria: detect at least twice, with at least two filters """ result = 1 # detected in at least two filters if np.size(np.unique(filters)) < 2: return 0 return result def _red_color_detect(self, filters, min_det=4): """Detected at least min_det times in either izy colors Parameters ---------- filters : `np.ndarray`, (N,) filters in which detections happened min_det : `float` or `int` minimum number of detections required in izy bands """ result = 1 # Number of detected points in izy bands n_red_det = ( np.size(np.where(filters == "i")[0]) + np.size(np.where(filters == "z")[0]) + np.size(np.where(filters == "y")[0]) ) # Condition if n_red_det < min_det: return 0 return result def _blue_color_detect(self, filters, min_det=4): """Detected at least min_det times in either ugr colors Parameters ---------- filters : `np.ndarray`, (N,) filters in which detections happened min_det : `float` or `int` minimum number of detections required in ugr bands """ result = 1 # Number of detected points in ugr bands n_blue_det = ( np.size(np.where(filters == "u")[0]) + np.size(np.where(filters == "g")[0]) + np.size(np.where(filters == "r")[0]) ) # Condition if n_blue_det < min_det: return 0 return result
[docs] def run(self, data_slice, slice_point=None): result = {} t = data_slice[self.mjd_col] - self.mjd0 - slice_point["peak_time"] mags = np.zeros(t.size, dtype=float) for filtername in np.unique(data_slice[self.filter_col]): infilt = np.where(data_slice[self.filter_col] == filtername) mags[infilt] = self.lightcurves.interp(t[infilt], filtername, lc_indx=slice_point["file_indx"]) # Apply dust extinction on the light curve a_x = self.ax1[filtername] * slice_point["ebv"] mags[infilt] += a_x distmod = 5 * np.log10(slice_point["distance"] * 1e6) - 5.0 mags[infilt] += distmod # Find the detected points around_peak = np.where((t > 0) & (t < 30) & (mags < data_slice[self.m5_col]))[0] # Filters in which the detections happened filters = data_slice[self.filter_col][around_peak] # Magnitude uncertainties with Gaussian approximation snr = m52snr(mags, data_slice[self.m5_col]) mags_unc = 2.5 * np.log10(1.0 + 1.0 / snr) result["multi_detect"] = self._multi_detect(around_peak) result["ztfrest_simple"] = self._ztfrest_simple( around_peak, mags, mags_unc, t, filters, select_red=False ) result["ztfrest_simple_red"] = self._ztfrest_simple( around_peak, mags, mags_unc, t, filters, select_red=True ) result["ztfrest_simple_blue"] = self._ztfrest_simple( around_peak, mags, mags_unc, t, filters, select_blue=True ) result["multi_color_detect"] = self._multi_color_detect(filters) result["red_color_detect"] = self._red_color_detect(filters) result["blue_color_detect"] = self._blue_color_detect(filters) # Export the light curve if self.output_lc is True: mags[np.where(mags > 50)[0]] = 99.0 result["lc"] = [ data_slice[self.mjd_col], mags, mags_unc, data_slice[self.m5_col], data_slice[self.filter_col], ] result["lc_colnames"] = ("t", "mag", "mag_unc", "maglim", "filter") return result
def reduce_multi_detect(self, metric): return metric["multi_detect"] def reduce_ztfrest_simple(self, metric): return metric["ztfrest_simple"] def reduce_ztfrest_simple_red(self, metric): return metric["ztfrest_simple_red"] def reduce_ztfrest_simple_blue(self, metric): return metric["ztfrest_simple_blue"] def reduce_multi_color_detect(self, metric): return metric["multi_color_detect"] def reduce_red_color_detect(self, metric): return metric["red_color_detect"] def reduce_blue_color_detect(self, metric): return metric["blue_color_detect"]
[docs] def generate_kn_pop_slicer( t_start=1, t_end=3652, n_events=10000, seed=42, n_files=308, d_min=10, d_max=300, ra=None, dec=None, ): """Generate a population of KNe events, and put the info about them into a UserPointSlicer object Parameters ---------- t_start : `float`, optional The night to start kilonova events on (days) t_end : `float`, optional The final night of kilonova events n_events : `int`, optional The number of kilonova events to generate seed : `float`, optional The seed passed to np.random n_files : `int`, optional The number of different kilonova lightcurves to use This should match the length of the filenames list passed to the KNePopMetric directly. d_min : `float` or `int`, optional Minimum luminosity distance (Mpc) d_max : `float` or `int`, optional Maximum luminosity distance (Mpc) ra, dec : `np.ndarray`, (N,) or None The ra and dec to use for event positions. Generates uniformly on the spehere if None. (degrees) """ def rndm(a, b, g, size=1): """Power-law gen for pdf(x) propto x^{g-1} for a<=x<=b""" r = np.random.random(size=size) ag, bg = a**g, b**g return (ag + (bg - ag) * r) ** (1.0 / g) if ra is None: ra, dec = uniform_sphere(n_events, seed=seed) peak_times = np.random.uniform(low=t_start, high=t_end, size=n_events) file_indx = np.floor(np.random.uniform(low=0, high=n_files, size=n_events)).astype(int) # Define the distance distance = rndm(d_min, d_max, 4, size=n_events) # Set up the slicer to evaluate the catalog we just made slicer = UserPointsSlicer(ra, dec, lat_lon_deg=True, badval=0) # Add any additional information about each object to the slicer slicer.slice_points["peak_time"] = peak_times slicer.slice_points["file_indx"] = file_indx slicer.slice_points["distance"] = distance return slicer