Source code for rubin_sim.skybrightness.interp_components

__all__ = (
    "id2intid",
    "intid2id",
    "load_spec_files",
    "BaseSingleInterp",
    "ScatteredStar",
    "LowerAtm",
    "UpperAtm",
    "MergedSpec",
    "Airglow",
    "TwilightInterp",
    "MoonInterp",
    "ZodiacalInterp",
)

import glob
import os
import warnings

import healpy as hp
import numpy as np
from rubin_scheduler.data import get_data_dir
from scipy.interpolate import InterpolatedUnivariateSpline, interp1d

from rubin_sim.phot_utils import Bandpass, Sed

from .twilight_func import twilight_func

# Make backwards compatible with healpy
if hasattr(hp, "get_interp_weights"):
    get_neighbours = hp.get_interp_weights
elif hasattr(hp, "get_neighbours"):
    get_neighbours = hp.get_neighbours
else:
    print("Could not find appropriate healpy function for get_interp_weight or get_neighbours")


[docs] def id2intid(ids): """Take an array of ids, and convert them to an integer id. Handy if you want to put things into a sparse array. """ uids = np.unique(ids) order = np.argsort(ids) oids = ids[order] uintids = np.arange(np.size(uids), dtype=int) left = np.searchsorted(oids, uids) right = np.searchsorted(oids, uids, side="right") intids = np.empty(ids.size, dtype=int) for i in range(np.size(left)): intids[left[i] : right[i]] = uintids[i] result = intids * 0 result[order] = intids return result, uids, uintids
[docs] def intid2id(intids, uintids, uids, dtype=int): """convert an int back to an id""" ids = np.zeros(np.size(intids)) order = np.argsort(intids) ointids = intids[order] left = np.searchsorted(ointids, uintids, side="left") right = np.searchsorted(ointids, uintids, side="right") for i, (le, ri) in enumerate(zip(left, right)): ids[le:ri] = uids[i] result = np.zeros(np.size(intids), dtype=dtype) result[order] = ids return result
[docs] def load_spec_files(filenames, mags=False): """Load up the ESO spectra. The ESO npz files contain the following arrays: * filter_wave: The central wavelengths of the pre-computed magnitudes * wave: wavelengths for the spectra * spec: array of spectra and magnitudes along with the relevant variable inputs. For example, airglow has dtype = [('airmass', '<f8'), ('solar_flux', '<f8'), ('spectra','<f8', (17001,)), ('mags', '<f8', (6,)] For each unique airmass and solar_flux value, there is a 17001 elements spectra and 6 magnitudes. """ if len(filenames) == 1: temp = np.load(filenames[0]) wave = temp["wave"].copy() # Note old camelCase here. Might need to update if sav # files regenerated filter_wave = temp["filterWave"].copy() if mags: # don't copy the spectra to save memory space dt = np.dtype( [ (key, temp["spec"].dtype[i]) for i, key in enumerate(temp["spec"].dtype.names) if key != "spectra" ] ) spec = np.zeros(temp["spec"].size, dtype=dt) for key in temp["spec"].dtype.names: if key != "spectra": spec[key] = temp["spec"][key].copy() else: spec = temp["spec"].copy() else: temp = np.load(filenames[0]) wave = temp["wave"].copy() filter_wave = temp["filterWave"].copy() if mags: # don't copy the spectra to save memory space dt = np.dtype( [ (key, temp["spec"].dtype[i]) for i, key in enumerate(temp["spec"].dtype.names) if key != "spectra" ] ) spec = np.zeros(temp["spec"].size, dtype=dt) for key in temp["spec"].dtype.names: if key != "spectra": spec[key] = temp["spec"][key].copy() else: spec = temp["spec"].copy() for filename in filenames[1:]: temp = np.load(filename) if mags: # don't copy the spectra to save memory space dt = np.dtype( [ (key, temp["spec"].dtype[i]) for i, key in enumerate(temp["spec"].dtype.names) if key != "spectra" ] ) tempspec = np.zeros(temp["spec"].size, dtype=dt) for key in temp["spec"].dtype.names: if key != "spectra": tempspec[key] = temp["spec"][key].copy() else: tempspec = temp["spec"] spec = np.append(spec, tempspec) return spec, wave, filter_wave
[docs] class BaseSingleInterp: """Base class for interpolating sky components which only depend on airmass. Parameters ---------- comp_name : `str`, optional Component name. sorted_order : `list` [`str`], optional Order of the dimensions in the input .npz files. mags : `bool`, optional Return magnitudes (only) rather than the full spectrum. """ def __init__(self, comp_name=None, sorted_order=["airmass", "nightTimes"], mags=False): self.mags = mags data_dir = os.path.join(get_data_dir(), "skybrightness", "ESO_Spectra/" + comp_name) filenames = sorted(glob.glob(data_dir + "/*.npz")) self.spec, self.wave, self.filter_wave = load_spec_files(filenames, mags=self.mags) # Take the log of the spectra in case we want to interp in log space. if not mags: self.log_spec = np.zeros(self.spec["spectra"].shape, dtype=float) good = np.where(self.spec["spectra"] != 0) self.log_spec[good] = np.log10(self.spec["spectra"][good]) self.spec_size = self.spec["spectra"][0].size else: self.spec_size = 0 # What order are the dimensions sorted by (from how # the .npz was packaged) self.sorted_order = sorted_order self.dim_dict = {} self.dim_sizes = {} for dt in self.sorted_order: self.dim_dict[dt] = np.unique(self.spec[dt]) self.dim_sizes[dt] = np.size(np.unique(self.spec[dt])) # Set up and save the dict to order the filters once. self.filter_name_dict = {"u": 0, "g": 1, "r": 2, "i": 3, "z": 4, "y": 5}
[docs] def __call__(self, interp_points, filter_names=["u", "g", "r", "i", "z", "y"]): """At `interp_points (e.g. airmass), return values.""" if self.mags: return self.interp_mag(interp_points, filter_names=filter_names) else: return self.interp_spec(interp_points)
def _indx_and_weights(self, points, grid): """For given 1-D points, find the grid points on either side and return the weights assume grid is sorted. Parameters ---------- points : `np.ndarray`, (N,) The points on the grid to query. grid : `np.ndarray`, (N,) The grid on which to locate `points`. Returns ------- indx_r, indx_l : `np.ndarray`, `np.ndarray` The grid indexes for each of the 1-d points w_r, w_l : `np.ndarray`, `np.ndarray` The weights for each of these grid points. """ order = np.argsort(points) indx_r = np.empty(points.size, dtype=int) indx_r[order] = np.searchsorted(grid, points[order]) indx_l = indx_r - 1 # If points off the grid were requested, just use the edge grid point off_grid = np.where(indx_r == grid.size) indx_r[off_grid] = grid.size - 1 full_range = grid[indx_r] - grid[indx_l] w_l = np.zeros(full_range.size, dtype=float) w_r = np.ones(full_range.size, dtype=float) good = np.where(full_range != 0) w_l[good] = (grid[indx_r][good] - points[good]) / full_range[good] w_r[good] = (points[good] - grid[indx_l[good]]) / full_range[good] return indx_r, indx_l, w_r, w_l def _weighting(self, interp_points, values): """ given a list/array of airmass values, return a dict with the interpolated spectrum at each airmass and the wavelength array. Input interp_points should be sorted """ results = np.zeros((interp_points.size, np.size(values[0])), dtype=float) in_range = np.where( (interp_points["airmass"] <= np.max(self.dim_dict["airmass"])) & (interp_points["airmass"] >= np.min(self.dim_dict["airmass"])) ) indx_r, indx_l, w_r, w_l = self._indx_and_weights( interp_points["airmass"][in_range], self.dim_dict["airmass"] ) nextra = 1 # XXX--should I use the log spectra? # Make a check and switch back and forth? results[in_range] = ( w_r[:, np.newaxis] * values[indx_r * nextra] + w_l[:, np.newaxis] * values[indx_l * nextra] ) return results def interp_spec(self, interp_points): result = self._weighting(interp_points, self.log_spec) mask = np.where(result == 0.0) result = 10.0**result result[mask] = 0.0 return {"spec": result, "wave": self.wave} def interp_mag(self, interp_points, filter_names=["u", "g", "r", "i", "z", "y"]): filterindx = [self.filter_name_dict[key] for key in filter_names] result = self._weighting(interp_points, self.spec["mags"][:, filterindx]) mask = np.where(result == 0.0) result = 10.0 ** (-0.4 * (result - np.log10(3631.0))) result[mask] = 0.0 return {"spec": result, "wave": self.filter_wave}
[docs] class ScatteredStar(BaseSingleInterp): """Interpolate the spectra caused by scattered starlight.""" def __init__(self, comp_name="ScatteredStarLight", mags=False): super(ScatteredStar, self).__init__(comp_name=comp_name, mags=mags)
[docs] class LowerAtm(BaseSingleInterp): """Interpolate the spectra caused by the lower atmosphere.""" def __init__(self, comp_name="LowerAtm", mags=False): super(LowerAtm, self).__init__(comp_name=comp_name, mags=mags)
[docs] class UpperAtm(BaseSingleInterp): """Interpolate the spectra caused by the upper atmosphere.""" def __init__(self, comp_name="UpperAtm", mags=False): super(UpperAtm, self).__init__(comp_name=comp_name, mags=mags)
[docs] class MergedSpec(BaseSingleInterp): """Interpolate the combined spectra caused by the sum of the scattered starlight, air glow, upper and lower atmosphere. """ def __init__(self, comp_name="MergedSpec", mags=False): super(MergedSpec, self).__init__(comp_name=comp_name, mags=mags)
[docs] class Airglow(BaseSingleInterp): """Interpolate the spectra caused by airglow.""" def __init__(self, comp_name="Airglow", sorted_order=["airmass", "solarFlux"], mags=False): super(Airglow, self).__init__(comp_name=comp_name, mags=mags, sorted_order=sorted_order) self.n_solar_flux = np.size(self.dim_dict["solarFlux"]) def _weighting(self, interp_points, values): results = np.zeros((interp_points.size, np.size(values[0])), dtype=float) # Only interpolate point that lie in the model grid in_range = np.where( (interp_points["airmass"] <= np.max(self.dim_dict["airmass"])) & (interp_points["airmass"] >= np.min(self.dim_dict["airmass"])) & (interp_points["solar_flux"] >= np.min(self.dim_dict["solarFlux"])) & (interp_points["solar_flux"] <= np.max(self.dim_dict["solarFlux"])) ) use_points = interp_points[in_range] am_right_index, am_left_index, am_right_w, am_left_w = self._indx_and_weights( use_points["airmass"], self.dim_dict["airmass"] ) sf_right_index, sf_left_index, sf_right_w, sf_left_w = self._indx_and_weights( use_points["solar_flux"], self.dim_dict["solarFlux"] ) for am_index, amW in zip([am_right_index, am_left_index], [am_right_w, am_left_w]): for sf_index, sfW in zip([sf_right_index, sf_left_index], [sf_right_w, sf_left_w]): results[in_range] += ( amW[:, np.newaxis] * sfW[:, np.newaxis] * values[am_index * self.n_solar_flux + sf_index] ) return results
[docs] class TwilightInterp: """Use the Solar Spectrum to provide an interpolated spectra or magnitudes for the twilight sky. Parameters ---------- mags : `bool` If True, only return the LSST filter magnitudes, otherwise return the full spectrum dark_sky_mags : `dict` Dict of the zenith dark sky values to be assumed. The twilight fits are done relative to the dark sky level. fit_results : `dict` Dict of twilight parameters based on twilight_func. Keys should be filter names. """ def __init__(self, mags=False, dark_sky_mags=None, fit_results=None): if dark_sky_mags is None: dark_sky_mags = { "u": 22.8, "g": 22.3, "r": 21.2, "i": 20.3, "z": 19.3, "y": 18.0, "B": 22.35, "G": 21.71, "R": 21.3, } self.mags = mags data_dir = os.path.join(get_data_dir(), "skybrightness") solar_saved = np.load(os.path.join(data_dir, "solarSpec/solarSpec.npz")) self.solar_spec = Sed(wavelen=solar_saved["wave"], flambda=solar_saved["spec"]) solar_saved.close() canon_filters = {} fnames = ["blue_canon.csv", "green_canon.csv", "red_canon.csv"] # Filter names, from bluest to reddest. self.filter_names = ["B", "G", "R"] # Supress warning that Canon filters are low sampling warnings.filterwarnings("ignore", message="There is an area of") warnings.filterwarnings("ignore", message="Wavelength sampling of") for fname, filter_name in zip(fnames, self.filter_names): bpdata = np.genfromtxt( os.path.join(data_dir, "Canon/", fname), delimiter=", ", dtype=list(zip(["wave", "through"], [float] * 2)), ) bp_temp = Bandpass() bp_temp.set_bandpass(bpdata["wave"], bpdata["through"]) bp_temp.resample_bandpass( wavelen_min=self.solar_spec.wavelen.min(), wavelen_max=self.solar_spec.wavelen.max(), wavelen_step=self.solar_spec.wavelen[1] - self.solar_spec.wavelen[0], ) # Force wavelengths to be identical so # it doesn't try to resample again later bp_temp.wavelen = self.solar_spec.wavelen canon_filters[filter_name] = bp_temp # Tack on the LSST filters through_path = os.path.join(get_data_dir(), "throughputs", "baseline") lsst_keys = ["u", "g", "r", "i", "z", "y"] for key in lsst_keys: bp = np.loadtxt( os.path.join(through_path, "total_" + key + ".dat"), dtype=list(zip(["wave", "trans"], [float] * 2)), ) temp_b = Bandpass() temp_b.set_bandpass(bp["wave"], bp["trans"]) canon_filters[key] = temp_b self.filter_names.append(key) # MAGIC NUMBERS from fitting the all-sky camera: # Code to generate values in # sims_skybrightness/examples/fitTwiSlopesSimul.py # Which in turn uses twilight maps from # sims_skybrightness/examples/buildTwilMaps.py # values are of the form: # 0: ratio of f^z_12 to f_dark^z # 1: slope of curve wrt sun alt # 2: airmass term (10^(arg[2]*(X-1))) # 3: azimuth term. # 4: zenith dark sky flux (erg/s/cm^2) # For z and y, just assuming the shape parameter # fits are similar to the other bands. # Looks like the diode is not sensitive enough to detect faint sky. # Using the Patat et al 2006 I-band values for z and # modeified a little for y as a temp fix. if fit_results is None: self.fit_results = { "B": [ 7.56765633e00, 2.29798055e01, 2.86879956e-01, 3.01162143e-01, 2.58462036e-04, ], "G": [ 2.38561156e00, 2.29310648e01, 2.97733083e-01, 3.16403197e-01, 7.29660095e-04, ], "R": [ 1.75498017e00, 2.22011802e01, 2.98619033e-01, 3.28880254e-01, 3.24411056e-04, ], "z": [2.29, 24.08, 0.3, 0.3, -666], "y": [2.0, 24.08, 0.3, 0.3, -666], } # XXX-completely arbitrary fudge factor to make things # brighter in the blue # Just copy the blue and say it's brighter. self.fit_results["u"] = [ 16.0, 2.29622121e01, 2.85862729e-01, 2.99902574e-01, 2.32325117e-04, ] else: self.fit_results = fit_results # Take out any filters that don't have fit results self.filter_names = [key for key in self.filter_names if key in self.fit_results] self.eff_wave = [] self.solar_mag = [] for filter_name in self.filter_names: self.eff_wave.append(canon_filters[filter_name].calc_eff_wavelen()[0]) self.solar_mag.append(self.solar_spec.calc_mag(canon_filters[filter_name])) order = np.argsort(self.eff_wave) self.filter_names = np.array(self.filter_names)[order] self.eff_wave = np.array(self.eff_wave)[order] self.solar_mag = np.array(self.solar_mag)[order] # update the fit results to be zeropointed properly for key in self.fit_results: f0 = 10.0 ** (-0.4 * (dark_sky_mags[key] - np.log10(3631.0))) self.fit_results[key][-1] = f0 self.solar_wave = self.solar_spec.wavelen self.solar_flux = self.solar_spec.flambda # This one isn't as bad as the model grids, maybe we could get # away with computing the magnitudes in the __call__ each time. if mags: # Load up the LSST filters and convert the # solarSpec.flambda and solarSpec.wavelen to fluxes self.lsst_filter_names = ["u", "g", "r", "i", "z", "y"] self.lsst_equations = np.zeros( (np.size(self.lsst_filter_names), np.size(self.fit_results["B"])), dtype=float, ) self.lsst_eff_wave = [] fits = np.empty((np.size(self.eff_wave), np.size(self.fit_results["B"])), dtype=float) for i, fn in enumerate(self.filter_names): fits[i, :] = self.fit_results[fn] through_path = os.path.join(get_data_dir(), "throughputs", "baseline") for filtername in self.lsst_filter_names: bp = np.loadtxt( os.path.join(through_path, "total_" + filtername + ".dat"), dtype=list(zip(["wave", "trans"], [float] * 2)), ) temp_b = Bandpass() temp_b.set_bandpass(bp["wave"], bp["trans"]) self.lsst_eff_wave.append(temp_b.calc_eff_wavelen()[0]) # Loop through the parameters and interpolate to new # eff wavelengths for i in np.arange(self.lsst_equations[0, :].size): interp = InterpolatedUnivariateSpline(self.eff_wave, fits[:, i]) self.lsst_equations[:, i] = interp(self.lsst_eff_wave) # Set the dark sky flux for i, filter_name in enumerate(self.lsst_filter_names): self.lsst_equations[i, -1] = 10.0 ** (-0.4 * (dark_sky_mags[filter_name] - np.log10(3631.0))) self.filter_name_dict = {"u": 0, "g": 1, "r": 2, "i": 3, "z": 4, "y": 5}
[docs] def print_fits_used(self): """Print out the fit parameters being used""" print( r"\\tablehead{\colhead{Filter} & \colhead{$r_{12/z}$} & " r"\colhead{$a$ (1/radians)} & \colhead{$b$ (1/airmass)} & " r"\colhead{$c$ (az term/airmass)} & " r"\colhead{$f_z_dark$ (erg/s/cm$^2$)$\\times 10^8$} & " r"\colhead{m$_z_dark$}}" ) for key in self.fit_results: numbers = "" for num in self.fit_results[key]: if num > 0.001: numbers += " & %.2f" % num else: numbers += " & %.2f" % (num * 1e8) print( key, numbers, " & ", "%.2f" % (-2.5 * np.log10(self.fit_results[key][-1]) + np.log10(3631.0)), )
def __call__(self, intep_points, filter_names=["u", "g", "r", "i", "z", "y"]): if self.mags: return self.interp_mag(intep_points, filter_names=filter_names) else: return self.interp_spec(intep_points)
[docs] def interp_mag( self, interp_points, max_am=3.0, limits=(np.radians(15.0), np.radians(-20.0)), filter_names=["u", "g", "r", "i", "z", "y"], ): """ Parameters ---------- interp_points : `np.ndarray`, (N, 3) Interpolation points. Should contain sunAlt, airmass and azRelSun. max_am : `float`, optional Maximum airmass to calculate twilight sky to. limits : `np.ndarray`, (N,), optional Sun altitude limits Returns ------- spectra, wavelength : `np.ndarray`, (N, 3), `np.ndarray`, (M,) Note ---- Originally fit the twilight with a cutoff of sun altitude of -11 degrees. I think it can be safely extrapolated farther, but be warned you may be entering a regime where it breaks down. """ npts = len(filter_names) result = np.zeros((np.size(interp_points), npts), dtype=float) out_of_range = np.where(interp_points["sunAlt"] > np.radians(-11))[0] if np.size(out_of_range) > 0: warnings.warn("Extrapolating twilight beyond a sun altitude of -11 degrees") good = np.where( (interp_points["sunAlt"] >= np.min(limits)) & (interp_points["sunAlt"] <= np.max(limits)) & (interp_points["airmass"] <= max_am) & (interp_points["airmass"] >= 1.0) )[0] for i, filterName in enumerate(filter_names): out_of_range = np.where(interp_points["sunAlt"] > np.max(limits))[0] if np.size(out_of_range) > 0: result[:, i] = np.nan else: result[good, i] = twilight_func( interp_points[good], *self.lsst_equations[self.filter_name_dict[filterName], :].tolist() ) return {"spec": result, "wave": self.lsst_eff_wave}
[docs] def interp_spec(self, interp_points, max_am=3.0, limits=(np.radians(15.0), np.radians(-20.0))): """ Parameters ---------- interp_points : `np.ndarray`, (N, 3) Interpolation points. Should contain sunAlt, airmass and azRelSun. max_am : `float`, optional Maximum airmass to calculate twilight sky to. limits : `np.ndarray`, (N,), optional Sun altitude limits Returns ------- spectra, wavelength : `np.ndarray`, (N, 3), `np.ndarray`, (M,) Note ---- Originally fit the twilight with a cutoff of sun altitude of -11 degrees. I think it can be safely extrapolated farther, but be warned you may be entering a regime where it breaks down. """ npts = np.size(self.solar_wave) result = np.zeros((np.size(interp_points), npts), dtype=float) out_of_range = np.where(interp_points["sunAlt"] > np.radians(-11))[0] if np.size(out_of_range) > 0: warnings.warn("Extrapolating twilight beyond a sun altitude of -11 degrees") good = np.where( (interp_points["sunAlt"] >= np.min(limits)) & (interp_points["sunAlt"] <= np.max(limits)) & (interp_points["airmass"] <= max_am) & (interp_points["airmass"] >= 1.0) )[0] # Compute the expected flux in each of the filters that # we have fits for fluxes = [] for filter_name in self.filter_names: out_of_range = np.where(interp_points["sunAlt"] > np.max(limits))[0] if np.size(out_of_range) > 0: fluxes.append(np.nan) else: fluxes.append(twilight_func(interp_points[good], *self.fit_results[filter_name])) fluxes = np.array(fluxes) # ratio of model flux to raw solar flux: yvals = fluxes.T / (10.0 ** (-0.4 * (self.solar_mag - np.log10(3631.0)))) # Find wavelengths bluer than cutoff blue_region = np.where(self.solar_wave < np.min(self.eff_wave)) for i, yval in enumerate(yvals): interp_f = interp1d(self.eff_wave, yval, bounds_error=False, fill_value=yval[-1]) ratio = interp_f(self.solar_wave) interp_blue = InterpolatedUnivariateSpline(self.eff_wave, yval, k=1) ratio[blue_region] = interp_blue(self.solar_wave[blue_region]) result[good[i]] = self.solar_flux * ratio return {"spec": result, "wave": self.solar_wave}
[docs] class MoonInterp(BaseSingleInterp): """ Read in the saved Lunar spectra and interpolate. """ def __init__( self, comp_name="Moon", sorted_order=["moonSunSep", "moonAltitude", "hpid"], mags=False, ): super(MoonInterp, self).__init__(comp_name=comp_name, sorted_order=sorted_order, mags=mags) # Magic number from when the templates were generated self.nside = 4 def _weighting(self, interp_points, values): """ Weighting for the scattered moonlight. """ result = np.zeros((interp_points.size, np.size(values[0])), dtype=float) # Check that moonAltitude is in range, otherwise return zero array if np.max(interp_points["moonAltitude"]) < np.min(self.dim_dict["moonAltitude"]): return result # Find the neighboring healpixels hpids, hweights = get_neighbours( self.nside, np.pi / 2.0 - interp_points["alt"], interp_points["azRelMoon"] ) badhp = np.isin(hpids.ravel(), self.dim_dict["hpid"], invert=True).reshape(hpids.shape) hweights[badhp] = 0.0 norm = np.sum(hweights, axis=0) good = np.where(norm != 0.0)[0] hweights[:, good] = hweights[:, good] / norm[good] # Find the neighboring moonAltitude points in the grid right_m_as, left_m_as, ma_right_w, ma_left_w = self._indx_and_weights( interp_points["moonAltitude"], self.dim_dict["moonAltitude"] ) # Find the neighboring moonSunSep points in the grid right_mss, left_mss, mss_right_w, mss_left_w = self._indx_and_weights( interp_points["moonSunSep"], self.dim_dict["moonSunSep"] ) nhpid = self.dim_dict["hpid"].size n_ma = self.dim_dict["moonAltitude"].size # Convert the hpid to an index. tmp = intid2id(hpids.ravel(), self.dim_dict["hpid"], np.arange(self.dim_dict["hpid"].size)) hpindx = tmp.reshape(hpids.shape) # loop though the hweights and the moonAltitude weights for hpid, hweight in zip(hpindx, hweights): for maid, maW in zip([right_m_as, left_m_as], [ma_right_w, ma_left_w]): for mssid, mssW in zip([right_mss, left_mss], [mss_right_w, mss_left_w]): weight = hweight * maW * mssW result += weight[:, np.newaxis] * values[mssid * nhpid * n_ma + maid * nhpid + hpid] return result
[docs] class ZodiacalInterp(BaseSingleInterp): """ Interpolate the zodiacal light based on the airmass and the healpix ID where the healpixels are in ecliptic coordinates, with the sun at ecliptic longitude zero """ def __init__(self, comp_name="Zodiacal", sorted_order=["airmass", "hpid"], mags=False): super(ZodiacalInterp, self).__init__(comp_name=comp_name, sorted_order=sorted_order, mags=mags) self.nside = hp.npix2nside( np.size(np.where(self.spec["airmass"] == np.unique(self.spec["airmass"])[0])[0]) ) def _weighting(self, interp_points, values): """ interp_points is a numpy array where interpolation is desired values are the model values. """ result = np.zeros((interp_points.size, np.size(values[0])), dtype=float) in_range = np.where( (interp_points["airmass"] <= np.max(self.dim_dict["airmass"])) & (interp_points["airmass"] >= np.min(self.dim_dict["airmass"])) ) use_points = interp_points[in_range] # Find the neighboring healpixels hpids, hweights = get_neighbours( self.nside, np.pi / 2.0 - use_points["altEclip"], use_points["azEclipRelSun"], ) badhp = np.isin(hpids.ravel(), self.dim_dict["hpid"], invert=True).reshape(hpids.shape) hweights[badhp] = 0.0 norm = np.sum(hweights, axis=0) good = np.where(norm != 0.0)[0] hweights[:, good] = hweights[:, good] / norm[good] am_right_index, am_left_index, am_right_w, am_left_w = self._indx_and_weights( use_points["airmass"], self.dim_dict["airmass"] ) nhpid = self.dim_dict["hpid"].size # loop though the hweights and the airmass weights for hpid, hweight in zip(hpids, hweights): for am_index, amW in zip([am_right_index, am_left_index], [am_right_w, am_left_w]): weight = hweight * amW result[in_range] += weight[:, np.newaxis] * values[am_index * nhpid + hpid] return result