# Source code for rubin_sim.maf.run_comparison.microlensing_compare

import glob
import os

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

import rubin_sim.maf.db as db
import rubin_sim.maf.metric_bundles as metricBundles


def microlensing_fom(
    save_folder,
    result_db_path,
    metric_data_path,
    figsize=None,
    figure_name="microlensing_fom",
):
    """Build and save the microlensing figure-of-merit and comparison plots.

    Processes a folder, puts together results for the discovery/detect
    metric, Npts metric, and Fisher metric, and plots them per tE bin
    (the bins are parsed from the file names, e.g. 1 - 10 days,
    10 - 30 days, 30 - 100 days, and 100 - 1000 days).

    Parameters
    ----------
    save_folder : `str`
        Folder in which the figures are saved.
    result_db_path : `str`
        Path to the directory storing the result databases generated by MAF.
        The ``<name>.npz`` files are also read from this directory.
    metric_data_path : `str`
        Path to the directory storing the npz files generated by MAF
        (used when reconstructing the metric bundles).
    figsize : (`int`, `int`), optional
        Figure size in inches, passed to `plot_fom`.
    figure_name : `str`, optional
        File name (without extension) of the figure written by `plot_fom`.
    """
    # Dictionary of ResultsDb objects found in the given directory.
    result_dbs = get_results_dbs(result_db_path)

    # Retrieve the metricBundles for each entry. Only the keys are used
    # below, but the bundles are still loaded as in the original code.
    bundle_dicts = {
        name: bundle_dict_from_disk(result_db, name, metric_data_path)
        for name, result_db in result_dbs.items()
    }
    file_names = list(bundle_dicts)
    n_files = len(file_names)

    # Results and metric info are derived from the default file names.
    results = np.zeros(n_files)
    min_t_es = np.zeros(n_files)
    max_t_es = np.zeros(n_files)
    results_compare = []
    run_names = []
    metric_types = []
    relevant_columns = ["metric_values", "mask"]

    for idx, file_name in enumerate(file_names):
        # NOTE(review): the npz file is read from ``result_db_path`` although
        # the docstring says npz files live in ``metric_data_path`` -- confirm.
        npz_path = os.path.join(result_db_path, file_name + ".npz")
        # Use a context manager so the underlying file handle is closed.
        with np.load(npz_path, allow_pickle=True) as npz:
            df = pd.DataFrame.from_dict({item: npz[item] for item in relevant_columns})

        run_name, metric_type, min_t_e, max_t_e = parse_t_e_run_types(file_name)
        run_names.append(run_name)
        metric_types.append(metric_type)
        min_t_es[idx] = float(min_t_e)
        max_t_es[idx] = float(max_t_e)
        results[idx] = get_results(df, metric_type)

        if metric_type == "Npts":
            # Values >= 10e10 are replaced by NaN before the per-event
            # comparison (presumably a bad-value sentinel -- TODO confirm).
            # ``mask`` assigns a new column instead of relying on chained
            # assignment, which is a no-op under pandas Copy-on-Write.
            df["metric_values"] = df["metric_values"].mask(df["metric_values"] >= 10e10)
        results_compare.append(df["metric_values"])

    run_names = np.array(run_names)
    metric_types = np.array(metric_types)
    results_compare = np.array(results_compare)

    plot_fom(
        results,
        run_names,
        metric_types,
        min_t_es,
        max_t_es,
        save_folder,
        figure_name,
        figsize=figsize,
    )
    plot_compare(results_compare, run_names, metric_types, min_t_es, max_t_es, save_folder)
def parse_t_e_run_types(name):
    """Parse the name of a MicrolensingMetric file.

    The name is split on the literal ``"MicrolensingMetric"``; the part
    before it (minus its final character, the separator) is the run name and
    the part after it is split on ``"_"`` to obtain the remaining fields.

    Parameters
    ----------
    name : `str`
        A MicrolensingMetric file name, e.g.
        ``"<run>_MicrolensingMetric_detect_tE_1_10_days"``.

    Returns
    -------
    run_name : `str`
        Everything before ``"MicrolensingMetric"``, without the trailing
        separator character.
    metric_type : `str`
        Second ``"_"``-separated field after ``"MicrolensingMetric"``.
    min_t_e : `str`
        Fourth ``"_"``-separated field (returned as a string, not a number).
    max_t_e : `str`
        Fifth ``"_"``-separated field (returned as a string, not a number).

    Raises
    ------
    IndexError
        If ``name`` does not contain ``"MicrolensingMetric"`` or has too few
        ``"_"``-separated fields after it.
    """
    split_name = name.split("MicrolensingMetric")
    run_name = split_name[0][:-1]
    # The suffix starts with "_", so fields[0] is the empty string.
    fields = split_name[1].split("_")
    metric_type = fields[1]
    min_t_e = fields[3]
    max_t_e = fields[4]
    return run_name, metric_type, min_t_e, max_t_e
def get_results(df, run_type, fisher_sigmat_e_t_e_cutoff=0.1):
    """Reduce the per-event metric values of one file to a single number.

    Parameters
    ----------
    df : `pandas.DataFrame`
        Pandas dataframe of the results npz file; only the
        ``"metric_values"`` column is read.
    run_type : `str`
        Microlensing metric type: either 'detect', 'Npts', or 'Fisher'
        as parsed by the file name.
    fisher_sigmat_e_t_e_cutoff : `float`
        Maximum normalized uncertainty in tE (sigmatE/tE) as determined by
        3sigma values of published planet microlensing candidates.

    Returns
    -------
    result : `float`
        * 'detect': fraction of rows whose metric value equals 1.
        * 'Npts': sum of the metric values in ``[0, 10e10]`` (NaN excluded)
          divided by the total number of rows.
        * 'Fisher': fraction of rows whose metric value is below
          ``fisher_sigmat_e_t_e_cutoff``.

        NaN is returned when ``df`` has no rows.

    Raises
    ------
    ValueError
        If ``run_type`` is not one of 'detect', 'Npts' or 'Fisher'.
    """
    if run_type not in ("detect", "Npts", "Fisher"):
        raise ValueError(
            "Unknown run_type {!r}; expected 'detect', 'Npts' or 'Fisher'".format(run_type)
        )

    total = len(df)
    if total == 0:
        # A fraction/average over zero events is undefined.
        return np.nan

    values = np.asarray(df["metric_values"], dtype=float)

    # Comparisons involving NaN evaluate to False, so NaN rows never count
    # towards a numerator; they do count in ``total``.
    with np.errstate(invalid="ignore"):
        if run_type == "detect":
            # Fraction of discovered/detected events
            return np.count_nonzero(values == 1) / total
        if run_type == "Npts":
            # Average number of points per lightcurve
            valid = (values >= 0) & (values <= 10e10)
            return values[valid].sum() / total
        # Fraction of events with sigmatE/tE below the cutoff
        return np.count_nonzero(values < fisher_sigmat_e_t_e_cutoff) / total
def plot_fom(results, run_names, run_types, min_t_e, max_t_e, save_folder, figure_name, figsize):
    """Plot the results from the discovery/detect metric, Npts metric, and
    Fisher metric in three sub plots and save the figure as a PNG.

    Parameters
    ----------
    results : `np.ndarray`, (N,)
        Results from the MicrolensingMetric from get_results() from the
        respective microlensing metric type
    run_names : `np.ndarray`, (N,)
        Array of names of the OpSim run that was used in the metric
    run_types : `np.ndarray`, (N,)
        Array of strings describing microlensing metric type:
        either 'detect', 'Npts', or 'Fisher' as parsed by the file name
    min_t_e : `np.ndarray`, (N,)
        Array of values describing the minimum einstein crossing time (tE)
        as parsed by the file name
    max_t_e : `np.ndarray`, (N,)
        Array of values describing the maximum einstein crossing time (tE)
        as parsed by the file name
    save_folder : `str`
        String of folder name to save figure
    figure_name : `str`
        String of figure name (".png" is appended)
    figsize : (`int`, `int`)
        Tuple of figure size in inches.
        Default is None, which sets figsize = (25, 30)
    """
    if figsize is None:
        figsize = (25, 30)

    results = np.asarray(results)
    run_names = np.asarray(run_names)
    run_types = np.asarray(run_types)
    min_t_e = np.asarray(min_t_e)
    max_t_e = np.asarray(max_t_e)

    fig, axes = plt.subplots(1, 3, sharey=True, figsize=figsize)
    ax1, ax2, ax3 = axes
    plt.tight_layout()
    plt.subplots_adjust(wspace=0, hspace=0)
    plt.rcdefaults()
    plt.rc("font", weight="heavy", size=30)

    # One (indices, label) pair per tE bin. The sorted unique minima are
    # paired with the sorted unique maxima.
    t_e_bins = []
    for low, high in zip(np.unique(min_t_e), np.unique(max_t_e)):
        in_bin = np.where((min_t_e >= low) & (max_t_e <= high))[0]
        t_e_bins.append((in_bin, "tE {}-{} days".format(int(low), int(high))))

    # One index array per subplot, in the order detect / Npts / Fisher.
    type_indices = [np.where(run_types == kind)[0] for kind in ("detect", "Npts", "Fisher")]

    for in_bin, bin_label in t_e_bins:
        for axis, of_type in zip(axes, type_indices):
            selected = np.intersect1d(in_bin, of_type)
            # Sort alphabetically by run name. ``selected`` is an integer
            # array even when empty, so an empty (bin, type) combination
            # plots an empty line instead of raising IndexError.
            ordered = selected[np.argsort(run_names[selected], kind="stable")]
            axis.plot(
                results[ordered],
                run_names[ordered],
                label=bin_label,
                marker=".",
                markersize=15,
                linewidth=2.5,
            )

    ax3.legend(bbox_to_anchor=(1, 1), fontsize=20)
    plt.tight_layout()
    plt.subplots_adjust(bottom=0.05)
    ax1.set_xlabel("Discovery Efficiency")
    ax2.set_xlabel("Avg Number of Points")
    ax2.set_xscale("log")
    ax3.set_xlabel("Characterization Efficiency \n ($\\sigma_{t_E}/t_E$ < 0.1)")
    # The figure is left open after saving, as in the original code.
    plt.savefig(os.path.join(save_folder, figure_name + ".png"), bbox_inches="tight")
def plot_compare(results, run_names, run_types, min_t_e, max_t_e, save_folder, npts_required=10):
    """Plot confusion matrix type plots comparing fraction detected,
    characterized (via Fisher), and fraction of events with more than
    npts_required points within 2 tE.

    For every tE bin and every unique run name, three figures are produced
    through `confusion_matrix_plot`.

    Parameters
    ----------
    results : `np.ndarray`
        Metric values indexed along the first axis by file (N entries).
        `microlensing_fom` passes the stacked per-event ``metric_values``
        columns here, not the scalars returned by get_results().
    run_names : `np.ndarray`, (N,)
        Array of names of the OpSim run that was used in the metric
    run_types : `np.ndarray`, (N,)
        Array of strings describing microlensing metric type:
        either 'detect', 'Npts', or 'Fisher' as parsed by the file name
    min_t_e : `np.ndarray`, (N,)
        Array of values describing the minimum einstein crossing time (tE)
        as parsed by the file name
    max_t_e : `np.ndarray`, (N,)
        Array of values describing the maximum einstein crossing time (tE)
        as parsed by the file name
    save_folder : `str`
        String of folder name to save figures
    npts_required : `int`
        Number of points within 2tE required for the number of points
        fraction. Used both for the comparison and for the plot labels.
    """
    results = np.asarray(results)
    run_names = np.asarray(run_names)
    run_types = np.asarray(run_types)
    min_t_e = np.asarray(min_t_e)
    max_t_e = np.asarray(max_t_e)

    plt.rcdefaults()
    plt.rc("font", weight="heavy", size=20)

    # One (indices, label) pair per tE bin. The sorted unique minima are
    # paired with the sorted unique maxima.
    t_e_bins = []
    for low, high in zip(np.unique(min_t_e), np.unique(max_t_e)):
        in_bin = np.where((min_t_e >= low) & (max_t_e <= high))[0]
        t_e_bins.append((in_bin, "tE {}-{} days".format(int(low), int(high))))

    detect_runs_idx = np.where(run_types == "detect")[0]
    npts_runs_idx = np.where(run_types == "Npts")[0]
    fisher_runs_idx = np.where(run_types == "Fisher")[0]

    npts_label = "More than {} Points".format(npts_required)

    for in_bin, bin_label in t_e_bins:
        for run_name in np.unique(run_names):
            in_bin_and_run = np.intersect1d(in_bin, np.where(run_names == run_name)[0])
            detect_results = results[np.intersect1d(in_bin_and_run, detect_runs_idx)]
            npts_results = results[np.intersect1d(in_bin_and_run, npts_runs_idx)]
            fisher_results = results[np.intersect1d(in_bin_and_run, fisher_runs_idx)]

            # npts_required is forwarded so the counts agree with the
            # "More than N Points" axis labels.
            detected_fisher_matrix = detected_fisher_comparison(fisher_results, detect_results)
            fisher_npts_matrix = fisher_npts_comparison(
                fisher_results, npts_results, npts_required=npts_required
            )
            detected_npts_matrix = detected_npts_comparison(
                detect_results, npts_results, npts_required=npts_required
            )

            confusion_matrix_plot(
                detected_fisher_matrix,
                "Discovered",
                "Characterized",
                run_name,
                bin_label,
                save_folder,
            )
            confusion_matrix_plot(
                fisher_npts_matrix,
                npts_label,
                "Characterized",
                run_name,
                bin_label,
                save_folder,
            )
            confusion_matrix_plot(
                detected_npts_matrix,
                npts_label,
                "Detected",
                run_name,
                bin_label,
                save_folder,
            )
def confusion_matrix_plot(comparison_matrix, xlabel, ylabel, run_name, t_e_range, save_folder):
    """Plot a confusion matrix type plot comparing two metric types.

    The figure is saved as
    ``<save_folder>/<run_name>_<t_e_range>_<ylabel>_<xlabel>.png``, shown
    with ``plt.show()`` and then closed.

    Parameters
    ----------
    comparison_matrix : `np.ndarray`, (2, 2)
        Array comparing two metric types (A and B) with the following shape:
        [[(Yes A and Yes B), (Yes A and No B)],
        [(No A and Yes B), (No A and No B)]]
        where Yes A and Yes B are the number of events that pass
        both the A and B criteria.
    xlabel : `str`
        String of xlabel (also used in file name of figure)
    ylabel : `str`
        String of ylabel (also used in file name of figure)
    run_name : `str`
        Name of the OpSim run that was used in the metric
        (used in labels and file name)
    t_e_range : `str`
        String of the range of the tE (used in labels and file name)
    save_folder : `str`
        String of folder name to save figures
    """
    comparison_matrix = np.asarray(comparison_matrix)
    n_rows, n_cols = comparison_matrix.shape

    fig, ax = plt.subplots(figsize=(5, 5))
    ax.matshow(comparison_matrix, cmap=plt.cm.Blues, alpha=0.3)

    # Write each count in the centre of its cell (x = column, y = row).
    for i in range(n_rows):
        for j in range(n_cols):
            ax.text(
                x=j,
                y=i,
                s="{}".format(comparison_matrix[i, j]),
                va="center",
                ha="center",
                size="medium",
            )

    ax.set(ylabel=ylabel, xlabel=xlabel, title=run_name + "\n" + t_e_range + "\n")
    # Pin the tick positions to the two cell centres before labelling them,
    # instead of relying on the locations chosen by the default locator.
    ax.set_xticks([0, 1])
    ax.set_xticklabels(["Yes", "No"])
    ax.set_yticks([0, 1])
    ax.set_yticklabels(["Yes", "No"])

    plt.tight_layout()
    file_name = "{}_{}_{}_{}.png".format(run_name, t_e_range, ylabel, xlabel)
    fig.savefig(os.path.join(save_folder, file_name))
    plt.show()
    # Close this specific figure so repeated calls do not accumulate figures.
    plt.close(fig)
def detected_fisher_comparison(fisher_results, detect_results, fisher_sigmat_e_t_e_cutoff=0.1):
    """Count events by Fisher (A) and detection (B) criteria.

    Returns an array of the following form where
    A = fisher criteria and B = detection criteria:
    [[(Yes A and Yes B), (Yes A and No B)],
    [(No A and Yes B), (No A and No B)]]
    where Yes A and Yes B are the number of events that pass
    both the A and B criteria.

    Parameters
    ----------
    fisher_results : `np.ndarray`, (N,)
        Array of results from running the Fisher metric of the
        microlensing metric
    detect_results : `np.ndarray`, (N,)
        Array of results from running the detect metric of the
        microlensing metric
    fisher_sigmat_e_t_e_cutoff : `float`
        Maximum normalized uncertainty in tE (sigmatE/tE) as determined by
        3sigma values of published planet microlensing candidates

    Returns
    -------
    comparison_matrix : `np.ndarray`, (2, 2)
        Event counts. "Yes A" means ``fisher_results < cutoff``, "No A" means
        ``fisher_results >= cutoff``; "Yes B" means ``detect_results == 1``
        and "No B" means ``detect_results == 0``. NaN Fisher values satisfy
        neither A condition and are not counted.
    """
    fisher_results = np.asarray(fisher_results)
    detect_results = np.asarray(detect_results)

    characterized = fisher_results < fisher_sigmat_e_t_e_cutoff
    # ">=" so that values exactly at the cutoff are counted as not
    # characterized instead of being dropped from the matrix.
    not_characterized = fisher_results >= fisher_sigmat_e_t_e_cutoff
    detected = detect_results == 1
    not_detected = detect_results == 0

    return np.array(
        [
            [np.count_nonzero(characterized & detected), np.count_nonzero(characterized & not_detected)],
            [
                np.count_nonzero(not_characterized & detected),
                np.count_nonzero(not_characterized & not_detected),
            ],
        ]
    )
def fisher_npts_comparison(fisher_results, npts_results, npts_required=10, fisher_sigmat_e_t_e_cutoff=0.1):
    """Count events by Fisher (A) and number-of-points (B) criteria.

    Returns an array of the following form where
    A = fisher criteria and B = npts criteria:
    [[(Yes A and Yes B), (Yes A and No B)],
    [(No A and Yes B), (No A and No B)]]
    where Yes A and Yes B are the number of events that pass
    both the A and B criteria.

    Parameters
    ----------
    fisher_results : `np.ndarray`, (N,)
        Array of results from running the Fisher metric of the
        microlensing metric
    npts_results : `np.ndarray`, (N,)
        Array of results from running the Npts metric of the
        microlensing metric
    npts_required : `int`
        Number of points within 2tE required for the number of points
        fraction.
    fisher_sigmat_e_t_e_cutoff : `float`
        Maximum normalized uncertainty in tE (sigmatE/tE) as determined by
        3sigma values of published planet microlensing candidates

    Returns
    -------
    comparison_matrix : `np.ndarray`, (2, 2)
        Event counts. "Yes A" means ``fisher_results < cutoff``, "No A" means
        ``fisher_results >= cutoff``; "Yes B" means
        ``npts_results > npts_required`` and "No B" means
        ``npts_results <= npts_required``. NaN values satisfy neither side of
        their criterion and are not counted.
    """
    fisher_results = np.asarray(fisher_results)
    npts_results = np.asarray(npts_results)

    characterized = fisher_results < fisher_sigmat_e_t_e_cutoff
    # ">=" / "<=" so that values exactly at a threshold land in the "No"
    # category instead of being dropped from the matrix.
    not_characterized = fisher_results >= fisher_sigmat_e_t_e_cutoff
    enough_points = npts_results > npts_required
    too_few_points = npts_results <= npts_required

    return np.array(
        [
            [
                np.count_nonzero(characterized & enough_points),
                np.count_nonzero(characterized & too_few_points),
            ],
            [
                np.count_nonzero(not_characterized & enough_points),
                np.count_nonzero(not_characterized & too_few_points),
            ],
        ]
    )
def detected_npts_comparison(detect_results, npts_results, npts_required=10):
    """Count events by detection (A) and number-of-points (B) criteria.

    Returns an array of the following form where
    A = detect criteria and B = npts criteria:
    [[(Yes A and Yes B), (Yes A and No B)],
    [(No A and Yes B), (No A and No B)]]
    where Yes A and Yes B are the number of events that pass
    both the A and B criteria.

    Parameters
    ----------
    detect_results : `np.ndarray`, (N,)
        Array of results from running the detect metric of the
        microlensing metric
    npts_results : `np.ndarray`, (N,)
        Array of results from running the Npts metric of the
        microlensing metric
    npts_required : `int`
        Number of points within 2tE required for the number of points
        fraction.

    Returns
    -------
    comparison_matrix : `np.ndarray`, (2, 2)
        Event counts. "Yes A" means ``detect_results == 1``, "No A" means
        ``detect_results == 0``; "Yes B" means
        ``npts_results > npts_required`` and "No B" means
        ``npts_results <= npts_required``. NaN point counts satisfy neither
        B condition and are not counted.
    """
    detect_results = np.asarray(detect_results)
    npts_results = np.asarray(npts_results)

    detected = detect_results == 1
    not_detected = detect_results == 0
    enough_points = npts_results > npts_required
    # "<=" so that events with exactly npts_required points are counted as
    # "No" instead of being dropped from the matrix.
    too_few_points = npts_results <= npts_required

    return np.array(
        [
            [np.count_nonzero(detected & enough_points), np.count_nonzero(detected & too_few_points)],
            [
                np.count_nonzero(not_detected & enough_points),
                np.count_nonzero(not_detected & too_few_points),
            ],
        ]
    )
def get_results_dbs(result_db_path):
    """Create a dictionary of result_db from result_db files
    via PCW Hackathon 2020 Resources

    Every file matching ``*_result.db`` directly inside ``result_db_path``
    is opened; files that report no metric ids are left out.

    Parameters
    ----------
    result_db_path : `str`
        Path to the directory storing the result databases
        generated by MAF.

    Returns
    -------
    result_dbs : `dict`
        A dictionary containing the ResultDb objects
        reconstructed from result databases in the provided directory,
        keyed by the file base name up to its last underscore.
    """
    result_dbs = {}
    pattern = os.path.join(result_db_path, "*_result.db")
    # glob returns files in arbitrary order; sort for a reproducible dict.
    for db_file in sorted(glob.glob(pattern)):
        run_name = os.path.basename(db_file).rsplit("_", 1)[0]
        result_db = db.ResultsDb(database=db_file)
        # Don't add empty results.db file,
        # NOTE(review): the rest of this module uses snake_case rubin_sim
        # names; confirm ``getAllMetricIds`` still exists on ResultsDb.
        if len(result_db.getAllMetricIds()) > 0:
            result_dbs[run_name] = result_db
    return result_dbs
def bundle_dict_from_disk(result_db, run_name, metric_data_path):
    """Load metric data from disk and import them into metricBundles.
    via PCW Hackathon 2020 Resources

    Parameters
    ----------
    result_db : `rubin_sim.maf.db.ResultsDb`
        A ResultsDb object
    run_name : `str`
        The name of the opsim database for the metrics in result_db
    metric_data_path : `str`
        The path to the directory where the metric datafiles are stored.

    Returns
    -------
    bundle_dict : `dict`
        A dictionary of metricBundles reconstructed from the data
        stored on disk, keyed by ``(metric_id, metric_name)``.
    """
    bundle_dict = {}
    # NOTE(review): the rest of this module uses snake_case rubin_sim names;
    # confirm ``getMetricDisplayInfo`` and the "metricDataFile" key still
    # match the ResultsDb API in use.
    display_info = result_db.getMetricDisplayInfo()
    for item in display_info:
        metric_name = item["metric_name"]
        metric_file_name = item["metricDataFile"]
        metric_id = item["metric_id"]
        # Start from an empty bundle and fill it from the file on disk.
        newbundle = metricBundles.create_empty_metric_bundle()
        newbundle.read(os.path.join(metric_data_path, metric_file_name))
        newbundle.set_run_name(run_name)
        bundle_dict[metric_id, metric_name] = newbundle
    return bundle_dict