__all__ = (
"generate_microlensing_slicer",
"MicrolensingMetric",
"microlensing_amplification",
"microlensing_amplification_fsfb",
"info_peak_before_t0",
"fisher_matrix",
"coefficients_pspl",
"coefficients_fsfb",
)
import os
from copy import deepcopy
import numpy as np
from astropy import units as u
from astropy.coordinates import SkyCoord
from rubin_scheduler.data import get_data_dir
from rubin_scheduler.utils import hpid2_ra_dec
import rubin_sim.maf.metrics as metrics
import rubin_sim.maf.slicers as slicers
from rubin_sim.maf.utils import m52snr
# Via Natasha Abrams nsabrams@college.harvard.edu
def microlensing_amplification(t, impact_parameter=1, crossing_time=1825.0, peak_time=100, blending_factor=1):
"""The microlensing amplification
Parameters
----------
t : `float`
The time of observation (days)
impact_parameter : `float`
        The impact parameter (0 means big amplification)
crossing_time : `float`
Einstein crossing time (days)
peak_time : `float`
The peak time (days)
blending_factor: `float`
The blending factor where 1 is unblended
"""
lightcurve_u = np.sqrt(impact_parameter**2 + ((t - peak_time) ** 2 / crossing_time**2))
    amplification = (lightcurve_u**2 + 2) / (
        lightcurve_u * np.sqrt(lightcurve_u**2 + 4)
    ) * blending_factor + (1 - blending_factor)
    return amplification
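# A quick illustrative check (hypothetical numbers): at the peak with
# impact_parameter = 0.1 the unblended PSPL amplification is
# A = (u**2 + 2) / (u * sqrt(u**2 + 4)) ~= 10.04, and blending_factor = 0.5
# would dilute it to 0.5 * A + 0.5 ~= 5.52:
#
#     >>> t = np.arange(0.0, 200.0, 5.0)
#     >>> amp = microlensing_amplification(t, impact_parameter=0.1,
#     ...                                  crossing_time=100.0, peak_time=100.0)
#     >>> round(float(amp.max()), 2)
#     10.04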
def microlensing_amplification_fsfb(t, impact_parameter=1, crossing_time=1825.0, peak_time=100, fs=20, fb=20):
"""The microlensing amplification in terms of source flux and blend flux
Parameters
----------
t : `float`
The time of observation (days)
impact_parameter : `float`
        The impact parameter (0 means big amplification)
crossing_time : `float`
Einstein crossing time (days)
peak_time : `float`
The peak time (days)
"""
lightcurve_u = np.sqrt(impact_parameter**2 + ((t - peak_time) ** 2 / crossing_time**2))
    amplified_flux = (lightcurve_u**2 + 2) / (lightcurve_u * np.sqrt(lightcurve_u**2 + 4)) * fs + fb
    return amplified_flux
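# Consistency sketch (illustrative values): with the total baseline flux
# normalized to 1, fs = b and fb = 1 - b reproduce the blended amplification
# from microlensing_amplification with blending_factor = b:
#
#     >>> a = microlensing_amplification(100.0, 0.1, 100.0, 100.0,
#     ...                                blending_factor=0.5)
#     >>> b = microlensing_amplification_fsfb(100.0, 0.1, 100.0, 100.0,
#     ...                                     fs=0.5, fb=0.5)
#     >>> bool(np.isclose(a, b))
#     True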
def info_peak_before_t0(impact_parameter=1, crossing_time=100.0):
"""Time of Maximum Information before peak
The point of maximum information before peak indicates when in a
lightcurve with constant snr the contribution to the
characterization of the event timescale tE is at maximum. After that
point in timem the timescale characterization is decisively improved.
via Markus Hundertmark markus.hundertmark@uni-heidelberg.de
Parameters
----------
impact_parameter : `float`
        The impact parameter (0 means big amplification)
crossing_time : `float`
Einstein crossing time (days)
"""
optimal_time = (
crossing_time
* np.sqrt(
-(impact_parameter**2) + np.sqrt(9 * impact_parameter**4 + 36 * impact_parameter**2 + 4) - 2
)
/ 2
)
return np.array(optimal_time)
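# For example (illustrative values): with u0 = 0.1 and tE = 100 days the
# maximum-information point falls roughly 14 days before the peak:
#
#     >>> round(float(info_peak_before_t0(0.1, 100.0)), 1)
#     14.0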
def coefficients_pspl(t, t0, te, u0, fs, fb):
"""Coefficients for calculating the first derivatives wrt t0,te,u0
in the Fisher matrix in an effective way, optimized with sympy and cse
via Markus Hundertmark markus.hundertmark@uni-heidelberg.de
Parameters
----------
t : `float`
The time of observation (days usually as JD or HJD)
u0 : `float`
The impact parameter (0 means high magnification)
te : `float`
Einstein crossing time (days)
t0 : `float`
Time of peak (days usually as JD or HJD)
fs : `float`
Source flux (counts/s but here fs = 10.**(-0.4*mag_source)
in the respective passband)
fb : `float`
        Blend flux (counts/s but here fb = 10.**(-0.4*mag_blend)
in the respective passband)
"""
x0 = te ** (-2)
x1 = (t - t0) ** 2
x2 = u0**2 + x0 * x1
x3 = 1 / np.sqrt(x2)
x4 = 2 * x3
x5 = x2 + 4
x6 = fs / np.sqrt(x5)
x7 = x1 / te**3
x8 = x6 * x7
x9 = x2 + 2
x10 = x9 / x2 ** (3 / 2)
x11 = fs * x3 * x9 / x5 ** (3 / 2)
x12 = u0 * x6
x13 = x0 * (-2 * t + 2 * t0)
x14 = x13 * x6
c0 = x10 * x8 + x11 * x7 - x4 * x8
c1 = -u0 * x11 - x10 * x12 + x12 * x4
c2 = -1 / 2 * x10 * x14 - 1 / 2 * x11 * x13 + x14 * x3
return [c2, c1, c0]
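# The derivative order can be checked with a finite-difference sketch against
# the blended flux from microlensing_amplification_fsfb (values here are
# illustrative only):
#
#     >>> eps = 1e-6
#     >>> f = lambda te: microlensing_amplification_fsfb(
#     ...     50.0, impact_parameter=0.1, crossing_time=te, peak_time=100.0,
#     ...     fs=1.0, fb=0.0)
#     >>> c2, c1, c0 = coefficients_pspl(50.0, t0=100.0, te=30.0, u0=0.1,
#     ...                                fs=1.0, fb=0.0)
#     >>> df_dte = (f(30.0 + eps) - f(30.0 - eps)) / (2 * eps)
#     >>> bool(np.isclose(df_dte, c0))
#     True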
def coefficients_fsfb(t, t0, te, u0, fs, fb):
"""Coefficients for calculating the first derivatives wrt fs,fb
in the Fisher (information) matrix in an effective way, optimized
with sympy and cse this function needs to be evaluated for each
passband respectively.
via Markus Hundertmark markus.hundertmark@uni-heidelberg.de
Parameters
----------
t : `float`
The time of observation (days usually as JD or HJD)
u0 : `float`
The impact parameter (0 means high magnification)
te : `float`
Einstein crossing time (days)
t0 : `float`
Time of peak (days usually as JD or HJD)
fs : `float`
Source flux (counts/s but here fs = 10.**(-0.4*mag_source)
in the respective passband)
fb : `float`
        Blend flux (counts/s but here fb = 10.**(-0.4*mag_blend)
in the respective passband)
"""
x0 = u0**2 + (t - t0) ** 2 / te**2
c0 = (x0 + 2) / (np.sqrt(x0) * np.sqrt(x0 + 4))
c1 = 1
return [c1, c0]
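# Sanity sketch: far from the peak the magnification tends to 1, so both
# coefficients [dF/dfb, dF/dfs] approach 1 (illustrative numbers):
#
#     >>> c1, c0 = coefficients_fsfb(1e6, t0=100.0, te=30.0, u0=0.1,
#     ...                            fs=1.0, fb=1.0)
#     >>> c1, round(float(c0), 3)
#     (1, 1.0)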
def fisher_matrix(t, t0, te, u0, fs, fb, snr, filters="ugriz", filter_name="i"):
"""The Fisher (information) matrix relying on first derivatives
wrt t0,te,u0,fs,fb, relying on with cse optimized coefficents.
This implementation for the non-linear magnification is subject to the
respective underlying assumptions, i.e. relatively small uncertainties.
NB the Fisher matrix is using first derivatives, alternatively one
could consider second derivatives the resulting covariance matrix as
inverse of the Fisher matrix is a Cramer Rao lower bound of the
respective uncertainty.
The function returns the full information matrix for all t.
via Markus Hundertmark markus.hundertmark@uni-heidelberg.de
Parameters
----------
t : `float`
The time of observation (days usually as JD or HJD)
u0 : `float`
The impact parameter (0 means high magnification)
te : `float`
Einstein crossing time (days)
t0 : `float`
Time of peak (days usually as JD or HJD)
fs : `float`
Source flux (counts/s but here fs = 10.**(-0.4*mag_source)
in the respective passband)
fb : `float`
        Blend flux (counts/s but here fb = 10.**(-0.4*mag_blend)
in the respective passband)
    snr : `float`
        Signal to noise ratio in order to infer the flux uncertainty
    filters : `np.ndarray` of `str`
        The filters in use; a plain string such as "ugriz" also works
    filter_name : `str`
        The filter of the observations being added to the matrix
    """
    # indices are 0:t0, 1:u0, 2:te
    # 3:fs_i, 4:fb_i, 5:fs_i+1, 6:fb_i+1, etc. for ugrizy
    # allow either an array of filter names or a plain string like "ugriz"
    filters = np.asarray(list(filters))
npars = 2 * len(filters) + 3
fis_mat = np.zeros((npars, npars))
filter_index = np.where(filters == filter_name)[0]
    # accumulate the information matrix over all observation times
for i in range(len(t)):
t_value = t[i]
# err = flux_err[i]
matrix = np.zeros((npars, npars))
derivatives = coefficients_pspl(t_value, t0, te, u0, fs, fb)
        fsfb_derivatives = np.zeros(2 * len(filters))
        fbdiff, fsdiff = coefficients_fsfb(t_value, t0, te, u0, fs, fb)
        # place the fs/fb derivatives in the slots of the current filter
        fsfb_derivatives[filter_index * 2] = fsdiff
        fsfb_derivatives[filter_index * 2 + 1] = fbdiff
        derivatives = np.concatenate([derivatives, fsfb_derivatives])
for idx in range(npars):
for jdx in range(idx, npars):
matrix[idx, jdx] = derivatives[idx] * derivatives[jdx]
# restore symmetrical entries
matrix = matrix + matrix.T - np.diag(np.diag(matrix))
        # inline PSPL magnification (duplicates microlensing_amplification)
usqr = (t_value - t0) ** 2 / te**2 + u0**2
mu = (usqr + 2.0) / (usqr * (usqr + 4)) ** 0.5
# may need to check shape of snr and t
flux_err = (fs * mu + fb) / snr[i]
matrix = matrix / (flux_err**2)
fis_mat = fis_mat + matrix
return fis_mat
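# A minimal usage sketch (hypothetical single-band cadence; the epochs, SNR
# and event parameters below are placeholders):
#
#     >>> t_obs = np.linspace(0.0, 200.0, 50)     # observation epochs (days)
#     >>> snr_obs = np.full(t_obs.size, 50.0)     # assume a constant SNR
#     >>> fm = fisher_matrix(t_obs, t0=100.0, te=30.0, u0=0.1,
#     ...                    fs=1.0, fb=1.0, snr=snr_obs,
#     ...                    filters=np.array(["i"]), filter_name="i")
#     >>> cov = np.linalg.inv(fm)                 # Cramer-Rao lower bound
#     >>> sigma_te = cov[2, 2] ** 0.5             # tE uncertainty (days)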
class MicrolensingMetric(metrics.BaseMetric):
"""
Quantifies detectability of Microlensing events.
    Can also return the number of data points within two crossing times
    of the peak of the event.
Parameters
----------
metric_calc : `str`
Type of metric.
If metric_calc == 'detect', returns the number of microlensing events
detected within certain parameters.
        If metric_calc == 'Npts', returns the number of points
        within two crossing times of the peak of the event.
        If metric_calc == 'Fisher', returns sigma_tE / tE, the
        Cramer-Rao lower bound on the fractional uncertainty of the
        crossing time, derived from the Fisher matrix.
pts_needed : `int`
Number of an object's lightcurve points required to be above the
5-sigma limiting depth before it is considered detected.
time_before_peak: `int` or `str`
Number of days before lightcurve peak to qualify event as triggered.
If time_before_peak == 'optimal', the number of days before the
lightcurve peak is the time of maximal information.
    detect : `bool`
        By default we only trigger on points before the peak of the
        lightcurve. When detect = True, observations on either side
        of the lightcurve peak are considered.
Notes
-----
Expects slice_point to have keys of:
peak_time : float (days)
crossing_time : float (days)
impact_parameter : float (positive)
blending_factors (optional): float (between 0 and 1)
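    Examples
    --------
    A sketch of typical use together with the slicer generator in this
    module (running it requires the MAF stellar density maps and an
    opsim database):

    >>> slicer = generate_microlensing_slicer(n_events=1000)  # doctest: +SKIP
    >>> metric = MicrolensingMetric(metric_calc="detect")  # doctest: +SKIP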
"""
def __init__(
self,
metric_name="MicrolensingMetric",
metric_calc="detect",
mjd_col="observationStartMJD",
m5_col="fiveSigmaDepth",
filter_col="filter",
night_col="night",
pts_needed=2,
mag_dict=None,
detect_sigma=3.0,
time_before_peak=0,
detect=False,
**kwargs,
):
self.metric_calc = metric_calc
if metric_calc == "detect":
self.units = "Detected, 0 or 1"
elif metric_calc == "Npts":
self.units = "Npts within 2tE"
elif metric_calc == "Fisher":
self.units = "Characterized, 0 or 1"
else:
raise Exception('metric_calc must be "detect", "Npts" or "Fisher".')
if mag_dict is None:
# Mean mag of stars in each filter from TRIstarDensity map
mag_dict = {
"u": 25.2,
"g": 25.0,
"r": 24.5,
"i": 23.4,
"z": 22.8,
"y": 22.5,
}
self.mjd_col = mjd_col
self.m5_col = m5_col
self.filter_col = filter_col
self.night_col = night_col
self.pts_needed = pts_needed
self.detect_sigma = detect_sigma
self.time_before_peak = time_before_peak
self.detect = detect
self.mags = mag_dict
cols = [self.mjd_col, self.m5_col, self.filter_col, self.night_col]
super(MicrolensingMetric, self).__init__(
col=cols, units=self.units, metric_name=metric_name + "_" + self.metric_calc, **kwargs
)
def run(self, data_slice, slice_point=None):
if self.detect is True and self.time_before_peak > 0:
raise Exception("When detect = True, time_before_peak must be zero")
# Generate the lightcurve for this object
        # zero the light curve time axis at the first observation
        t = data_slice[self.mjd_col] - np.min(data_slice[self.mjd_col])
        # Check whether per-filter apparent magnitudes (and thus individual
        # blending information) were added to the slice_point
        if "apparent_m_no_blend_u" in slice_point:
amplitudes = np.zeros(len(t))
individual_mags = True
else:
if "blending_factor" in slice_point:
amplitudes = microlensing_amplification(
t,
impact_parameter=slice_point["impact_parameter"],
crossing_time=slice_point["crossing_time"],
peak_time=slice_point["peak_time"],
blending_factor=slice_point["blending_factor"],
)
individual_mags = False
else:
amplitudes = microlensing_amplification(
t,
impact_parameter=slice_point["impact_parameter"],
crossing_time=slice_point["crossing_time"],
peak_time=slice_point["peak_time"],
)
individual_mags = False
filters = np.unique(data_slice[self.filter_col])
amplified_mags = amplitudes * 0
npars = 2 * len(filters) + 3
fis_mat = np.zeros((npars, npars))
fis_mat_wzeros = deepcopy(fis_mat)
for filtername in filters:
infilt = np.where(data_slice[self.filter_col] == filtername)[0]
if individual_mags is True:
fs = slice_point["apparent_m_no_blend_{}".format(filtername)]
fb = slice_point["apparent_m_{}".format(filtername)] - fs
amplitudes = microlensing_amplification_fsfb(
t,
impact_parameter=slice_point["impact_parameter"],
crossing_time=slice_point["crossing_time"],
peak_time=slice_point["peak_time"],
fs=fs,
fb=fb,
)
self.mags[filtername] = slice_point["apparent_m_{}".format(filtername)]
amplified_mags[infilt] = self.mags[filtername] - 2.5 * np.log10(amplitudes[infilt])
else:
amplified_mags[infilt] = self.mags[filtername] - 2.5 * np.log10(amplitudes[infilt])
# The SNR of each point in the light curve
snr = m52snr(amplified_mags, data_slice[self.m5_col])
# The magnitude uncertainties that go with amplified mags
mag_uncert = 2.5 * np.log10(1 + 1.0 / snr)
n_pre = []
n_post = []
for filtername in filters:
if self.metric_calc == "detect":
if self.time_before_peak == "optimal":
time_before_peak_optimal = info_peak_before_t0(
slice_point["impact_parameter"], slice_point["crossing_time"]
)
# observations pre-peak and in the given filter
infilt = np.where(
(data_slice[self.filter_col] == filtername)
& (t < (slice_point["peak_time"] - time_before_peak_optimal))
)[0]
else:
# observations pre-peak and in the given filter
infilt = np.where(
(data_slice[self.filter_col] == filtername)
& (t < (slice_point["peak_time"] - self.time_before_peak))
)[0]
# observations post-peak and in the given filter
outfilt = np.where(
(data_slice[self.filter_col] == filtername) & (t > slice_point["peak_time"])
)[0]
# Broadcast to calc the mag_i - mag_j
diffs = amplified_mags[infilt] - amplified_mags[infilt][:, np.newaxis]
diffs_uncert = np.sqrt(mag_uncert[infilt] ** 2 + mag_uncert[infilt][:, np.newaxis] ** 2)
diffs_post = amplified_mags[outfilt] - amplified_mags[outfilt][:, np.newaxis]
diffs_post_uncert = np.sqrt(
mag_uncert[outfilt] ** 2 + mag_uncert[outfilt][:, np.newaxis] ** 2
)
# Calculating this as a catalog-level detection. In theory,
# we could have a high SNR template image, so there would be
                # little to no additional uncertainty from the subtraction.
sigma_above = np.abs(diffs) / diffs_uncert
sigma_above_post = np.abs(diffs_post) / diffs_post_uncert
# divide by 2 because array has i,j and j,i
n_above = np.size(np.where(sigma_above > self.detect_sigma)[0]) / 2
n_pre.append(n_above)
n_above_post = np.size(np.where(sigma_above_post > self.detect_sigma)[0]) / 2
n_post.append(n_above_post)
elif self.metric_calc == "Npts":
# observations pre-peak and in the given filter within 2tE
infilt = np.where(
(data_slice[self.filter_col] == filtername)
& (t < (slice_point["peak_time"]))
& (t > (slice_point["peak_time"] - slice_point["crossing_time"]))
& (snr > self.detect_sigma)
)[0]
# observations post-peak and in the given filter within 2tE
outfilt = np.where(
(data_slice[self.filter_col] == filtername)
& (t > (slice_point["peak_time"]))
& (t < (slice_point["peak_time"] + slice_point["crossing_time"]))
& (snr > self.detect_sigma)
)[0]
n_pre.append(len(infilt))
n_post.append(len(outfilt))
elif self.metric_calc == "Fisher":
if individual_mags is True:
fs = slice_point["apparent_m_no_blend_{}".format(filtername)]
fb = slice_point["apparent_m_{}".format(filtername)] - fs
else:
fs = self.mags[filtername]
# assuming that in populated areas of the galaxy
# the blend fraction is 0.5
fb = fs
fis_mat = fis_mat_wzeros + fisher_matrix(
t[infilt],
t0=slice_point["peak_time"],
te=slice_point["crossing_time"],
u0=slice_point["impact_parameter"],
fs=fs,
fb=fb,
snr=snr[infilt],
filters=filters,
filter_name=filtername,
)
                # keep a copy that still includes all-zero rows/columns
                # for accumulating the next filter
                fis_mat_wzeros = deepcopy(fis_mat)
        # Remove rows/columns for filters with no data, which would
        # otherwise keep the matrix from being inverted
        mask_idx = np.where(np.diag(fis_mat) == 0)
fis_mat = np.delete(fis_mat, mask_idx[0], 0)
fis_mat = np.delete(fis_mat, mask_idx[0], 1)
try:
cov = np.linalg.inv(fis_mat)
            # coefficients_pspl orders the fit parameters as [t0, u0, te],
            # so the tE variance is the third diagonal entry
            sigmat_e_t_e = cov[2, 2] ** 0.5 / slice_point["crossing_time"]
# sigmau0_u0 = cov[1, 1] ** 0.5
# / slice_point["impact_parameter"]
# corr_btwn_t_eu0 = cov[0, 1] /
# (slice_point["crossing_time"]
# * slice_point["impact_parameter"])
except np.linalg.LinAlgError:
sigmat_e_t_e = np.inf
# sigmau0_u0 = np.inf
# corr_btwn_t_eu0 = np.inf
npts = np.sum(n_pre)
npts_post = np.sum(n_post)
if self.metric_calc == "detect":
if self.detect is True:
if npts >= self.pts_needed and npts_post >= self.pts_needed:
return 1
else:
return 0
else:
if npts >= self.pts_needed:
return 1
else:
return 0
elif self.metric_calc == "Npts":
return npts + npts_post
elif self.metric_calc == "Fisher":
# cutoff of sigmat_e_t_e < 0.1 for
# values determined by the 3sigma of
# pubished planet candidates
return sigmat_e_t_e
def generate_microlensing_slicer(
min_crossing_time=1,
max_crossing_time=10,
t_start=1,
t_end=3652,
n_events=10000,
seed=42,
nside=128,
filtername="r",
):
"""Generate a UserPointSlicer with a population of microlensing events.
To be used with MicrolensingMetric
Parameters
----------
min_crossing_time : `float`
The minimum crossing time for the events generated (days)
max_crossing_time : `float`
The max crossing time for the events generated (days)
t_start : `float`
The night to start generating peaks (days)
t_end : `float`
The night to end generating peaks (days)
n_events : `int`
Number of microlensing events to generate
    seed : `int`
        Random number seed
nside : `int`
HEALpix nside, used to pick which stellar density map to load
filtername : `str`
The filter to use for the stellar density map
Returns
-------
slicer : `maf.UserPointsSlicer`
A slicer populated by microlensing events
(each slice_point is a different event)
"""
# Seed the random number generator and generate random parameters
rng = np.random.default_rng(seed)
crossing_times = rng.uniform(low=min_crossing_time, high=max_crossing_time, size=n_events)
peak_times = rng.uniform(low=t_start, high=t_end, size=n_events)
    impact_parameters = rng.uniform(low=0, high=1, size=n_events)
map_dir = os.path.join(get_data_dir(), "maps", "TriMaps")
data = np.load(os.path.join(map_dir, "TRIstarDensity_%s_nside_%i.npz" % (filtername, nside)))
star_density = data["starDensity"].copy()
# magnitude bins
bins = data["bins"].copy()
data.close()
star_mag = 22
bin_indx = np.where(bins[1:] >= star_mag)[0].min()
density_used = star_density[:, bin_indx].ravel()
order = np.argsort(density_used)
# I think the model might have a few outliers at the extreme,
# let's truncate it a bit
density_used[order[-10:]] = density_used[order[-11]]
# now, let's draw N from that distribution squared
dist = density_used[order] ** 2
cumm_dist = np.cumsum(dist)
cumm_dist = cumm_dist / np.max(cumm_dist)
uniform_draw = rng.uniform(size=n_events)
indexes = np.floor(np.interp(uniform_draw, cumm_dist, np.arange(cumm_dist.size)))
hp_ids = order[indexes.astype(int)]
gal_l, gal_b = hpid2_ra_dec(nside, hp_ids, nest=True)
c = SkyCoord(l=gal_l * u.deg, b=gal_b * u.deg, frame="galactic").transform_to("icrs")
ra, dec = c.ra.deg, c.dec.deg
# Set up the slicer to evaluate the catalog we just made
slicer = slicers.UserPointsSlicer(ra, dec, lat_lon_deg=True, badval=0)
# Add any additional information about each object to the slicer
slicer.slice_points["peak_time"] = peak_times
slicer.slice_points["crossing_time"] = crossing_times
slicer.slice_points["impact_parameter"] = impact_paramters
return slicer
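# A minimal end-to-end sketch ("baseline.db" is a placeholder opsim database,
# and the bundle API may differ slightly between rubin_sim versions):
#
#     import rubin_sim.maf as maf
#     slicer = generate_microlensing_slicer(n_events=1000)
#     metric = MicrolensingMetric(metric_calc="detect")
#     bundle = maf.MetricBundle(metric, slicer, "")
#     group = maf.MetricBundleGroup({"microlensing": bundle}, "baseline.db",
#                                   out_dir="temp")
#     group.run_all()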