Source code for rail.creation.degradation.observing_condition_degrader

"""Degrader applied to the magnitude error based on a set of input observing condition maps"""

import os
from dataclasses import fields

import healpy as hp
import numpy as np
import pandas as pd
from ceci.config import StageParameter as Param
from photerr import LsstErrorModel, LsstErrorParams

from rail.creation.degrader import Degrader


[docs]class ObsCondition(Degrader): """Photometric errors based on observation conditions This degrader calculates spatially-varying photometric errors using input survey condition maps. The error is based on the LSSTErrorModel from the PhotErr python package. Parameters ---------- nside: int, optional nside used for the HEALPIX maps. mask: str, optional Path to the mask covering the survey footprint in HEALPIX format. Notice that all negative values will be set to zero. weight: str, optional Path to the weights HEALPIX format, used to assign sample galaxies to pixels. Default is weight="", which uses uniform weighting. tot_nVis_flag: bool, optional If any map for nVisYr are provided, this flag indicates whether the map shows the total number of visits in nYrObs (tot_nVis_flag=True), or the average number of visits per year (tot_nVis_flag=False). The default is set to True. random_seed: int, optional A random seed for reproducibility. map_dict: dict, optional A dictionary that contains the paths to the survey condition maps in HEALPIX format. This dictionary uses the same arguments as LSSTErrorModel (from PhotErr). The following arguments, if supplied, may contain either a single number (as in the case of LSSTErrorModel), or a path: [m5, nVisYr, airmass, gamma, msky, theta, km, tvis] For the following keys: [m5, nVisYr, gamma, msky, theta, km] numbers/paths for specific bands must be passed. Example: {"m5": {"u": path, ...}, "theta": {"u": path, ...},} Other LSSTErrorModel parameters can also be passed in this dictionary (e.g. a necessary one may be [nYrObs] for the survey condition maps). If any argument is not passed, the default value in PhotErr's LsstErrorModel is adopted. """ name = "ObsCondition" config_options = Degrader.config_options.copy() config_options.update( nside=Param( int, 128, msg="nside for the input maps in HEALPIX format.", ), mask=Param( str, os.path.join( os.path.dirname(__file__), "../../examples_data/creation_data/data/survey_conditions/DC2-mask-neg-nside-128.fits", ), msg="mask for the input maps in HEALPIX format.", ), weight=Param( str, os.path.join( os.path.dirname(__file__), "../../examples_data/creation_data/data/survey_conditions/DC2-dr6-galcounts-i20-i25.3-nside-128.fits", ), msg="weight for assigning pixels to galaxies in HEALPIX format.", ), tot_nVis_flag=Param( bool, True, msg="flag indicating whether nVisYr is the total or average per year if supplied.", ), random_seed=Param(int, 42, msg="random seed for reproducibility"), map_dict=Param( dict, { "m5": { "i": os.path.join( os.path.dirname(__file__), "../../examples_data/creation_data/data/survey_conditions/minion_1016_dc2_Median_fiveSigmaDepth_i_and_nightlt1825_HEAL.fits", ), }, "nYrObs": 5.0, }, msg="dictionary containing the paths to the survey condition maps and/or additional LSSTErrorModel parameters.", ), ) def __init__(self, args, comm=None): Degrader.__init__(self, args, comm=comm) # store a list of keys relevant for # survey conditions; # a path to the survey condition # map or a float number should be # provided if these keys are provided self.obs_cond_keys = [ "m5", "nVisYr", "airmass", "gamma", "msky", "theta", "km", "tvis", ] # validate input parameters self._validate_obs_config() # initiate self.maps self.maps = {} # load the maps self._get_maps() def _validate_obs_config(self): """ Validate the input """ ### Check nside type: # check if nside < 0 if self.config["nside"] < 0: raise ValueError("nside must be positive.") # check if nside is powers of two if not np.log2(self.config["nside"]).is_integer(): raise ValueError("nside must be powers of two.") ### Check mask type: # check if mask is provided if self.config["mask"] == "": raise ValueError("mask needs to be provided for the input maps.") # check if the path exists if not os.path.exists(self.config["mask"]): raise ValueError("The mask file is not found: " + self.config["mask"]) ### Check weight type: if self.config["weight"] != "": # check if the path exists if not os.path.exists(self.config["weight"]): raise ValueError("The weight file is not found: " + self.config["weight"]) ### Check map_dict: # Check if extra keys are passed # get lsst_error_model keys lsst_error_model_keys = [field.name for field in fields(LsstErrorParams)] if len(set(self.config["map_dict"].keys()) - set(lsst_error_model_keys)) != 0: extra_keys = set(self.config["map_dict"].keys()) - set(lsst_error_model_keys) raise ValueError("Extra keywords are passed to the configuration: \n" + str(extra_keys)) # Check data type for the keys: # Note that LSSTErrorModel checks # the data type for its parameters, # so here we only check the additional # parameters and the file paths # nYrObs may be used below, so we # check its type as well if len(self.config["map_dict"]) > 0: for key in self.config["map_dict"]: if key == "nYrObs": if not isinstance(self.config["map_dict"][key], float): raise TypeError("nYrObs must be a float.") elif key in self.obs_cond_keys: # band-independent keys: if key in ["airmass", "tvis"]: # check if the input is a string or number if not ( isinstance(self.config["map_dict"][key], str) or isinstance(self.config["map_dict"][key], float) ): raise TypeError(f"{key} must be a path (string) or a float.") # check if the paths exist if isinstance(self.config["map_dict"][key], str): if not os.path.exists(self.config["map_dict"][key]): raise ValueError( "The following file is not found: " + self.config["map_dict"][key] ) # band-dependent keys else: # they must be dictionaries: if not isinstance(self.config["map_dict"][key], dict): # pragma: no cover raise TypeError(f"{key} must be a dictionary.") # the dictionary cannot be empty if len(self.config["map_dict"][key]) == 0: raise ValueError(f"{key} is empty.") for band in self.config["map_dict"][key].keys(): # check if the input is a string or float: if not ( isinstance(self.config["map_dict"][key][band], str) or isinstance(self.config["map_dict"][key][band], float) ): raise TypeError(f"{key}['{band}'] must be a path (string) or a float.") # check if the paths exist if isinstance(self.config["map_dict"][key][band], str): if not os.path.exists(self.config["map_dict"][key][band]): raise ValueError( "The following file is not found: " + self.config["map_dict"][key][band] ) def _get_maps(self): """ Load in the maps from the paths provided by map_dict, if it is not empty A note on nVisYr: input map usually in terms of total number of exposures, so manually divide the map by nYrObs """ maps = {} # Load mask mask = hp.read_map(self.config["mask"]) if (mask < 0).any(): # set negative values (if any) to zero mask[mask < 0] = 0 pixels = np.arange(int(self.config["nside"] ** 2 * 12))[mask.astype(bool)] maps["pixels"] = pixels # Load weight if given if self.config["weight"] != "": maps["weight"] = hp.read_map(self.config["weight"])[pixels] # Load all other maps in map_dict if len(self.config["map_dict"]) > 0: for key in self.config["map_dict"]: if key in self.obs_cond_keys: # band-independent keys: if key in ["airmass", "tvis"]: if isinstance(self.config["map_dict"][key], str): maps[key] = hp.read_map(self.config["map_dict"][key])[pixels] elif isinstance(self.config["map_dict"][key], float): maps[key] = np.ones(len(pixels)) * self.config["map_dict"][key] # band-dependent keys else: maps[key] = {} for band in self.config["map_dict"][key].keys(): if isinstance(self.config["map_dict"][key][band], str): maps[key][band] = hp.read_map(self.config["map_dict"][key][band])[pixels] elif isinstance(self.config["map_dict"][key][band], float): maps[key][band] = np.ones(len(pixels)) * self.config["map_dict"][key][band] else: # copy all other lsst_error_model parameters supplied maps[key] = self.config["map_dict"][key] if "nVisYr" in list(self.config["map_dict"].keys()): if "nYrObs" not in list(maps.keys()): # Set to default: maps["nYrObs"] = 10.0 if self.config["tot_nVis_flag"] == True: # For each band, compute the average number of visits per year for band in maps["nVisYr"].keys(): maps["nVisYr"][band] /= float(maps["nYrObs"]) self.maps = maps
[docs] def get_pixel_conditions(self, pixel: int) -> dict: """ get the map values at given pixel output is a dictionary that only contains the LSSTErrorModel keys """ allpix = self.maps["pixels"] ind = allpix == pixel obs_conditions = {} for key in (self.maps).keys(): # For keys that may contain the survey condition maps if key in self.obs_cond_keys: # band-independent keys: if key in ["airmass", "tvis"]: obs_conditions[key] = float(self.maps[key][ind]) # band-dependent keys: else: obs_conditions[key] = {} for band in (self.maps[key]).keys(): obs_conditions[key][band] = float(self.maps[key][band][ind]) # For other keys in LSSTErrorModel: elif key not in ["pixels", "weight"]: obs_conditions[key] = self.maps[key] # obs_conditions should now only contain the LSSTErrorModel keys return obs_conditions
[docs] def assign_pixels(self, catalog: pd.DataFrame) -> pd.DataFrame: """ assign the pixels to the input catalog """ pixels = self.maps["pixels"] if "weight" in list((self.maps).keys()): weights = self.maps["weight"] weights = weights / sum(weights) else: weights = None assigned_pix = self.rng.choice(pixels, size=len(catalog), replace=True, p=weights) # make it a DataFrame object assigned_pix = pd.DataFrame(assigned_pix, columns=["pixel"]) catalog = pd.concat([catalog, assigned_pix], axis=1) return catalog
[docs] def run(self): """ Run the degrader. """ self.rng = np.random.default_rng(seed=self.config["random_seed"]) catalog = self.get_data("input", allow_missing=True) # if self.map_dict empty, call LsstErrorModel: if len(self.config["map_dict"]) == 0: print("Empty map_dict, using default parameters from LsstErrorModel.") errorModel = LsstErrorModel() catalog = errorModel(catalog, random_state=self.rng) self.add_data("output", catalog) # if maps are provided, compute mag err for each pixel elif len(self.config["map_dict"]) > 0: # assign each galaxy to a pixel print("Assigning pixels.") catalog = self.assign_pixels(catalog) # loop over each pixel pixel_cat_list = [] for pixel, pixel_cat in catalog.groupby("pixel"): # get the observing conditions for this pixel obs_conditions = self.get_pixel_conditions(pixel) # creating the error model for this pixel errorModel = LsstErrorModel(**obs_conditions) # calculate the error model for this pixel obs_cat = errorModel(pixel_cat, random_state=self.rng) # add this pixel catalog to the list pixel_cat_list.append(obs_cat) # recombine all the pixels into a single catalog catalog = pd.concat(pixel_cat_list) # sort index catalog = catalog.sort_index() self.add_data("output", catalog)
def __repr__(self): """ Define how the model is represented and printed. """ # start message printMsg = "Loaded observing conditions from configuration file: \n" printMsg += f"nside = {self.config['nside']}, \n" printMsg += f"mask file: {self.config['mask']}, \n" printMsg += f"weight file: {self.config['weight']}, \n" printMsg += f"tot_nVis_flag = {self.config['tot_nVis_flag']}, \n" printMsg += f"random_seed = {self.config['random_seed']}, \n" printMsg += "map_dict contains the following items: \n" printMsg += str(self.config["map_dict"]) return printMsg