import glob
import os
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import rubin_sim.maf.db as db
import rubin_sim.maf.metric_bundles as metricBundles
[docs]
def microlensing_fom(
    save_folder,
    result_db_path,
    metric_data_path,
    figsize=None,
    figure_name="microlensing_fom",
):
    """
    Processes a folder, puts together results for
    discovery/detect metric, Npts metric, and Fisher metric,
    and plots them in the four tE bins of 1 - 10 days,
    10 - 30 days, 30 - 100 days, and 100 - 1000 days.

    Parameters
    ----------
    save_folder : `str`
        Folder in which to save the output figures.
    result_db_path : `str`
        Path to the directory storing the result databases
        generated by MAF.
    metric_data_path : `str`
        Path to the directory storing the npz files
        generated by MAF.
    figsize : (`int`, `int`), optional
        Figure size in inches, forwarded to plot_fom;
        None selects plot_fom's default of (25, 30).
    figure_name : `str`, optional
        Base file name (without extension) of the summary figure.
    """
    # get a dictionary of resultDb from given directory
    result_dbs = get_results_dbs(result_db_path)
    # retrieve metricBundles for each opsim run and store them in a dictionary
    bundle_dicts = {
        run_name: bundle_dict_from_disk(result_dbs[run_name], run_name, metric_data_path)
        for run_name in result_dbs
    }
    # hoist the key list once instead of rebuilding it on every access
    bundle_keys = list(bundle_dicts.keys())
    n_runs = len(bundle_keys)
    # generates results and metric info from default name of file
    results = np.zeros(n_runs)
    results_compare = []
    run_names = []
    metric_types = []
    min_t_es = np.zeros(n_runs)
    max_t_es = np.zeros(n_runs)
    for run, key in enumerate(bundle_keys):
        # NOTE(review): the npz files are loaded from result_db_path here,
        # while the docstring says they live in metric_data_path -- confirm.
        npz = np.load(
            result_db_path + "/" + key + ".npz",
            allow_pickle=True,
        )
        relevant_columns = ["metric_values", "mask"]
        df = pd.DataFrame.from_dict({item: npz[item] for item in relevant_columns})
        run_name, metric_type, min_t_e, max_t_e = parse_t_e_run_types(key)
        run_names.append(run_name)
        metric_types.append(metric_type)
        min_t_es[run] = min_t_e
        max_t_es[run] = max_t_e
        results[run] = get_results(df, metric_type)
        if metric_type == "Npts":
            # Mask the huge sentinel values before the comparison plots.
            # Use .loc to avoid pandas chained assignment (SettingWithCopy),
            # which may silently fail to write through.
            df.loc[df["metric_values"] >= 10e10, "metric_values"] = np.nan
        results_compare.append(df["metric_values"])
    run_names = np.array(run_names)
    metric_types = np.array(metric_types)
    results_compare = np.array(results_compare)
    plot_fom(
        results,
        run_names,
        metric_types,
        min_t_es,
        max_t_es,
        save_folder,
        figure_name,
        figsize=figsize,
    )
    plot_compare(results_compare, run_names, metric_types, min_t_es, max_t_es, save_folder)
    return
[docs]
def parse_t_e_run_types(name):
    """
    Parses names of MicrolensingMetric file names.

    Parameters
    ----------
    name : `str`
        A MicrolensingMetric file name, of the form
        "<run name>_MicrolensingMetric_<type>_tE_<min>_<max>..."

    Returns
    -------
    run_name, metric_type, min_t_e, max_t_e : `str`
        The opsim run name, the metric type, and the tE bin edges
        (still as strings).
    """
    # Everything before "MicrolensingMetric" (minus the joining "_")
    # is the run name; the remainder carries the type and tE range.
    prefix, _, suffix = name.partition("MicrolensingMetric")
    run_name = prefix[:-1]
    tokens = suffix.split("_")
    # suffix looks like "_<type>_tE_<min>_<max>...", so tokens[0] is empty
    return run_name, tokens[1], tokens[3], tokens[4]
[docs]
def get_results(df, run_type, fisher_sigmat_e_t_e_cutoff=0.1):
    """
    Summarizes one MicrolensingMetric result dataframe into a single number.

    Parameters
    ----------
    df : `pandas.Dataframe`
        Pandas dataframe of the results npz file
    run_type : `str`
        Microlensing metric type: either 'detect', 'Npts', or 'Fisher'
        as parsed by the file name
    fisher_sigmat_e_t_e_cutoff : `float`
        Maximum normalized uncertainty in tE (sigmatE/tE) as determined by
        3sigma values of pubished planet microlensing candidates

    Returns
    -------
    result : `float`
        Fraction of detected events ('detect'), average number of points
        per lightcurve ('Npts'), or fraction of events characterized
        below the cutoff ('Fisher').

    Raises
    ------
    ValueError
        If run_type is not one of 'detect', 'Npts', or 'Fisher'.
        (Previously an unknown run_type raised an opaque
        UnboundLocalError on return.)
    """
    total = len(df)
    values = df["metric_values"]
    if run_type == "detect":
        # Fraction of discovered/detected events
        result = np.sum(values == 1) / total
    elif run_type == "Npts":
        # Average number of points per lightcurve, ignoring NaNs, negative
        # values, and huge sentinel values (> 10e10).  A single combined mask
        # replaces the original chained boolean indexing, which modern pandas
        # rejects as an unalignable boolean Series.  NaN comparisons are
        # False, so NaNs are excluded by the mask automatically.
        valid = (values >= 0) & (values <= 10e10)
        result = values[valid].sum() / total
    elif run_type == "Fisher":
        # Fraction of events with sigmatE/tE below the cutoff of 0.1
        result = np.sum(values < fisher_sigmat_e_t_e_cutoff) / total
    else:
        raise ValueError("run_type must be 'detect', 'Npts', or 'Fisher', got {!r}".format(run_type))
    return result
[docs]
def plot_fom(results, run_names, run_types, min_t_e, max_t_e, save_folder, figure_name, figsize):
    """
    Plots the results from the discovery/detect metric, Npts metric,
    and Fisher metric in three sub plots, one line per tE bin,
    and saves the figure as a png in save_folder.

    Parameters
    ----------
    results : `np.ndarray`, (N,)
        Results from the MicrolensingMetric from get_results() from the
        respective microlensing metric type
    run_names : `np.ndarray`, (N,)
        Array of names of the OpSim run that was used in the metric
    run_types : `np.ndarray`, (N,)
        Array of strings describing microlensing metric type:
        either 'detect', 'Npts', or 'Fisher' as parsed by the file name
    min_t_e : `np.ndarray`, (N,)
        Array of values describing the minium einstein crossing time (tE)
        as parsed by the file name
    max_t_e : `np.ndarray`, (N,)
        Array of values describing the maximum einstein crossing time (tE)
        as parsed by the file name
    save_folder : `str`
        String of folder name to save figure
    figure_name : `str`
        String of figure name
    figsize : (`int`, `int`)
        Tuple of figure size in inches.
        Default is None, which sets figsize = (25, 30)
    """
    if figsize is None:
        figsize = (25, 30)
    # one shared-y subplot per metric type: detect, Npts, Fisher
    fig, (ax1, ax2, ax3) = plt.subplots(1, 3, sharey=True, figsize=figsize)
    plt.tight_layout()
    plt.subplots_adjust(wspace=0, hspace=0)
    subfig_list = [ax1, ax2, ax3]
    # reset global rc state, then enlarge fonts for this figure
    plt.rcdefaults()
    font = {"weight": "heavy", "size": 30}
    plt.rc("font", **font)
    # Build one index set per tE bin by pairing the sorted unique minima
    # with the sorted unique maxima.  NOTE(review): this pairing assumes
    # the tE bins do not overlap (e.g. 1-10, 10-30, 30-100, 100-1000);
    # overlapping bins would be paired incorrectly -- confirm.
    t_e_range_list = []
    time_run_names = []
    for i, j in zip(np.unique(min_t_e), np.unique(max_t_e)):
        idx_in_range = np.where((min_t_e >= i) & (max_t_e <= j))
        t_e_range_list.append(idx_in_range)
        time_run_names.append("tE {}-{} days".format(int(i), int(j)))
    # index sets per metric type, in the same order as subfig_list
    detect_runs_idx = np.where(run_types == "detect")
    npts_runs_idx = np.where(run_types == "Npts")
    fisher_runs_idx = np.where(run_types == "Fisher")
    run_type_list = [detect_runs_idx, npts_runs_idx, fisher_runs_idx]
    for t_e_range in range(len(t_e_range_list)):
        for run_type in range(len(run_type_list)):
            # sorted alphabetically according to name of run
            idx_list = list(
                zip(
                    np.intersect1d(t_e_range_list[t_e_range], run_type_list[run_type]),
                    run_names[np.intersect1d(t_e_range_list[t_e_range], run_type_list[run_type])],
                )
            )
            idx_list.sort(key=lambda x: x[1])
            sorted_idxs = np.array([x[0] for x in idx_list])
            # one line per tE bin on the subplot for this metric type;
            # runs are on the (shared) y axis, metric values on x
            subfig_list[run_type].plot(
                results[sorted_idxs],
                run_names[sorted_idxs],
                label=time_run_names[t_e_range],
                marker=".",
                markersize=15,
                linewidth=2.5,
            )
    ax3.legend(bbox_to_anchor=(1, 1), fontsize=20)
    plt.tight_layout()
    plt.subplots_adjust(bottom=0.05)
    ax1.set_xlabel("Discovery Efficiency")
    ax2.set_xlabel("Avg Number of Points")
    ax2.set_xscale("log")
    ax3.set_xlabel("Characaterization Efficiency \n ($\\sigma_{t_E}/t_E$ < 0.1)")
    plt.savefig(save_folder + "/" + figure_name + ".png", bbox_inches="tight")
    return
[docs]
def plot_compare(results, run_names, run_types, min_t_e, max_t_e, save_folder, npts_required=10):
    """
    Plots confusion matrix type plots comparing fraction detected,
    characterized (via Fisher),
    and fraction of events with at least npts_required points within 2 tE.
    One figure is saved per (run, tE bin, metric pair) combination.

    Parameters
    ----------
    results : `np.ndarray`
        Per-event metric values for each run/metric combination.
        NOTE(review): the caller (microlensing_fom) passes the stacked
        per-event `metric_values` arrays (results_compare), not the
        get_results() scalars -- verify against callers.
    run_names : `np.ndarray`, (N,)
        Array of names of the OpSim run that was used in the metric
    run_types : `np.ndarray`, (N,)
        Array of strings describing microlensing metric type:
        either 'detect', 'Npts', or 'Fisher' as parsed by the file name
    min_t_e : `np.ndarray`, (N,)
        Array of values describing the minium einstein crossing time (tE)
        as parsed by the file name
    max_t_e : `np.ndarray`, (N,)
        Array of values describing the maximum einstein crossing time (tE)
        as parsed by the file name
    save_folder : `str`
        String of folder name to save figures
    npts_required : `int`
        Number of poitns within 2tE required for the number of points fraction.
    """
    # reset global rc state, then enlarge fonts for these figures
    plt.rcdefaults()
    font = {"weight": "heavy", "size": 20}
    plt.rc("font", **font)
    # Build one index set per tE bin by pairing the sorted unique minima
    # with the sorted unique maxima (assumes non-overlapping bins, as in
    # plot_fom).
    t_e_range_list = []
    time_run_names = []
    for i, j in zip(np.unique(min_t_e), np.unique(max_t_e)):
        idx_in_range = np.where((min_t_e >= i) & (max_t_e <= j))
        t_e_range_list.append(idx_in_range)
        time_run_names.append("tE {}-{} days".format(int(i), int(j)))
    # index sets per metric type
    detect_runs_idx = np.where(run_types == "detect")
    npts_runs_idx = np.where(run_types == "Npts")
    fisher_runs_idx = np.where(run_types == "Fisher")
    run_name_list = np.unique(run_names)
    for t_e_range in range(len(t_e_range_list)):
        for run_name in run_name_list:
            # select the rows for this (run, tE bin) pair, then split
            # them out by metric type
            run_name_idxs = np.where(run_names == run_name)
            t_e_run_name_interesct = np.intersect1d(t_e_range_list[t_e_range], run_name_idxs)
            detect_results = results[np.intersect1d(t_e_run_name_interesct, detect_runs_idx)]
            npts_results = results[np.intersect1d(t_e_run_name_interesct, npts_runs_idx)]
            fisher_results = results[np.intersect1d(t_e_run_name_interesct, fisher_runs_idx)]
            # pairwise 2x2 contingency matrices between the three criteria
            detected_fisher_comparison_matrix = detected_fisher_comparison(fisher_results, detect_results)
            fisher_npts_comparison_matrix = fisher_npts_comparison(fisher_results, npts_results)
            detected_npts_comparison_matrix = detected_npts_comparison(detect_results, npts_results)
            confusion_matrix_plot(
                detected_fisher_comparison_matrix,
                "Discovered",
                "Characterized",
                run_name,
                time_run_names[t_e_range],
                save_folder,
            )
            confusion_matrix_plot(
                fisher_npts_comparison_matrix,
                "More than {} Points".format(npts_required),
                "Characterized",
                run_name,
                time_run_names[t_e_range],
                save_folder,
            )
            confusion_matrix_plot(
                detected_npts_comparison_matrix,
                "More than {} Points".format(npts_required),
                "Detected",
                run_name,
                time_run_names[t_e_range],
                save_folder,
            )
    return
[docs]
def confusion_matrix_plot(comparison_matrix, xlabel, ylabel, run_name, t_e_range, save_folder):
    """
    Plots a confusion matrix type plot comparing two metric types,
    saves it as a png in save_folder, and shows it.

    Parameters
    ----------
    comparison_matrix : `np.ndarray`, (2, 2)
        Array comparing two metric types (A and B) with the following shape:
        [[(Yes A and Yes B), (Yes A and No B)], [(No A and Yes B),
        (No A and No B)]]
        where Yes A and Yes B are the number of events that pass both
        the A and B criteria.
    xlabel : `str`
        String of xlabel (also used in file name of figure)
    ylabel : `str`
        String of ylabel (also used in file name of figure)
    run_name : `str`
        Name of the OpSim run that was used in the metric
        (used in labels and file name)
    t_e_range : `str`
        String of the range of the tE (used in labels and file name)
    save_folder : `str`
        String of folder name to save figures
    """
    fig, ax = plt.subplots(figsize=(5, 5))
    # light-blue heat map as a background for the cell counts
    ax.matshow(comparison_matrix, cmap=plt.cm.Blues, alpha=0.3)
    # annotate every cell with its count
    for i in range(len(comparison_matrix[0])):
        for j in range(len(comparison_matrix[1])):
            ax.text(
                x=j,
                y=i,
                s="{}".format(comparison_matrix[i, j]),
                va="center",
                ha="center",
                size="medium",
            )
    ax.set(ylabel=ylabel, xlabel=xlabel, title=run_name + "\n" + t_e_range + "\n")
    # The leading np.nan is a placeholder for the hidden first tick.
    # NOTE(review): calling set_*ticklabels without first fixing the tick
    # locations can warn or mislabel in newer matplotlib -- confirm the
    # matplotlib version this targets.
    ax.set_xticklabels([np.nan, "Yes", "No"])
    ax.set_yticklabels([np.nan, "Yes", "No"])
    plt.tight_layout()
    plt.savefig(save_folder + "/{}_{}_{}_{}.png".format(run_name, t_e_range, ylabel, xlabel))
    plt.show()
    plt.close()
    return
[docs]
def detected_fisher_comparison(fisher_results, detect_results, fisher_sigmat_e_t_e_cutoff=0.1):
    """
    Returns an array of the following form where
    A = fisher criteria and B = detection criteria:
    [[(Yes A and Yes B), (Yes A and No B)], [(No A and Yes B),
    (No A and No B)]]
    where Yes A and Yes B are the number of events that pass both the
    A and B criteria.

    Parameters
    ----------
    fisher_results : `np.ndarray`, (N,)
        Array of results from running the Fisher metric of the
        microlensing metric
    detect_results : `np.ndarray`, (N,)
        Array of results from running the detect metric of the
        microlensing metric
    fisher_sigmat_e_t_e_cutoff : `float`
        Maximum normalized uncertainty in tE (sigmatE/tE) as determined by
        3sigma values of pubished planet microlensing candidates

    Returns
    -------
    `np.ndarray`, (2, 2)
        Counts of events in each of the four pass/fail combinations.
    """
    # "Characterized" is strictly below the cutoff; the complement uses >=
    # so events exactly at the cutoff are counted (previously both sides
    # used strict comparisons and such events were silently dropped).
    char = fisher_results < fisher_sigmat_e_t_e_cutoff
    char_detect = np.where(char & (detect_results == 1))[0]
    char_ndetect = np.where(char & (detect_results == 0))[0]
    nchar_detect = np.where(~char & (detect_results == 1))[0]
    nchar_ndetect = np.where(~char & (detect_results == 0))[0]
    return np.array([[len(char_detect), len(char_ndetect)], [len(nchar_detect), len(nchar_ndetect)]])
[docs]
def fisher_npts_comparison(fisher_results, npts_results, npts_required=10, fisher_sigmat_e_t_e_cutoff=0.1):
    """
    Returns an array of the following form where
    A = fisher criteria and B = npts criteria:
    [[(Yes A and Yes B), (Yes A and No B)], [(No A and Yes B),
    (No A and No B)]]
    where Yes A and Yes B are the number of events that pass both the
    A and B criteria.

    Parameters
    ----------
    fisher_results : `np.ndarray`, (N,)
        Array of results from running the Fisher metric of the
        microlensing metric
    npts_results : `np.ndarray`, (N,)
        Array of results from running the Npts metric of the
        microlensing metric
    npts_required : `int`
        Number of poitns within 2tE required for the number of points fraction.
    fisher_sigmat_e_t_e_cutoff : `float`
        Maximum normalized uncertainty in tE (sigmatE/tE) as determined by
        3sigma values of pubished planet microlensing candidates

    Returns
    -------
    `np.ndarray`, (2, 2)
        Counts of events in each of the four pass/fail combinations.
    """
    # Pass criteria are strict (< cutoff, > npts_required); the complements
    # use >= / <= so boundary events are counted rather than silently
    # dropped (previously both sides used strict comparisons).
    char = fisher_results < fisher_sigmat_e_t_e_cutoff
    enough_pts = npts_results > npts_required
    char_npts = np.where(char & enough_pts)[0]
    char_nnpts = np.where(char & ~enough_pts)[0]
    nchar_npts = np.where(~char & enough_pts)[0]
    nchar_nnpts = np.where(~char & ~enough_pts)[0]
    return np.array([[len(char_npts), len(char_nnpts)], [len(nchar_npts), len(nchar_nnpts)]])
[docs]
def detected_npts_comparison(detect_results, npts_results, npts_required=10):
    """
    Returns an array of the following form where
    A = detect criteria and B = npts criteria:
    [[(Yes A and Yes B), (Yes A and No B)], [(No A and Yes B),
    (No A and No B)]]
    where Yes A and Yes B are the number of events that pass both the
    A and B criteria.

    Parameters
    ----------
    detect_results : `np.ndarray`, (N,)
        Array of results from running the detect metric of the
        microlensing metric
    npts_results : `np.ndarray`, (N,)
        Array of results from running the Npts metric of the
        microlensing metric
    npts_required : `int`
        Number of poitns within 2tE required for the number of points fraction.

    Returns
    -------
    `np.ndarray`, (2, 2)
        Counts of events in each of the four pass/fail combinations.
    """
    # The "no" side uses <= so events with exactly npts_required points are
    # counted rather than silently dropped (previously both sides used
    # strict comparisons).
    enough_pts = npts_results > npts_required
    detect_npts = np.where((detect_results == 1) & enough_pts)[0]
    detect_nnpts = np.where((detect_results == 1) & ~enough_pts)[0]
    ndetect_npts = np.where((detect_results == 0) & enough_pts)[0]
    ndetect_nnpts = np.where((detect_results == 0) & ~enough_pts)[0]
    return np.array([[len(detect_npts), len(detect_nnpts)], [len(ndetect_npts), len(ndetect_nnpts)]])
[docs]
def get_results_dbs(result_db_path):
    """
    Create a dictionary of result_db from result_db files
    via PCW Hackathan 2020 Resources

    Parameters
    ----------
    result_db_path : `str`
        Path to the directory storing the result databases
        generated by MAF.

    Returns
    -------
    result_dbs : `dict`
        A dictionary containing the ResultDb objects
        reconstructed from result databases in the provided directory.
    """
    result_dbs = {}
    for db_file in glob.glob(os.path.join(result_db_path, "*_result.db")):
        # the run name is everything before the trailing "_result" suffix
        run_name = os.path.basename(db_file).rsplit("_", 1)[0]
        results_db = db.ResultsDb(database=db_file)
        # skip databases that contain no metrics at all
        if len(results_db.getAllMetricIds()) > 0:
            result_dbs[run_name] = results_db
    return result_dbs
[docs]
def bundle_dict_from_disk(result_db, run_name, metric_data_path):
    """
    Load metric data from disk and import them into metricBundles.
    via PCW Hackathan 2020 Resources

    Parameters
    ----------
    result_db : `dict`
        A ResultsDb object
    run_name : `str`
        The name of the opsim database for the metrics in result_db
    metric_data_path : `str`
        The path to the directory where the metric datafiles are stored.

    Returns
    -------
    bundle_dict : `dict`
        A dictionary of metricBundles reconstructed from the data
        stored on disk, keyed on (metric id, metric name).
    """
    bundle_dict = {}
    for info in result_db.getMetricDisplayInfo():
        # rebuild each bundle from its saved datafile and re-attach the run
        bundle = metricBundles.create_empty_metric_bundle()
        bundle.read(os.path.join(metric_data_path, info["metricDataFile"]))
        bundle.set_run_name(run_name)
        bundle_dict[(info["metric_id"], info["metric_name"])] = bundle
    return bundle_dict