Source code for rail.estimation.algos.knnpz

"""
quick implementation of k nearest neighbor estimator
First pass will ignore photometric errors and just do
things in terms of magnitudes, we will expand in a
future update
"""

import numpy as np
import copy

from ceci.config import StageParameter as Param
from rail.estimation.estimator import CatEstimator, CatInformer

from rail.evaluation.metrics.cdeloss import CDELoss
from rail.core.common_params import SHARED_PARAMS

import pandas as pd
import qp



def _computecolordata(df, ref_column_name, column_names):
    newdict = {}
    newdict['x'] = df[ref_column_name]
    nbands = len(column_names) - 1
    for k in range(nbands):
        newdict[f'x{k}'] = df[column_names[k]] - df[column_names[k + 1]]
    newdf = pd.DataFrame(newdict)
    coldata = newdf.to_numpy()
    return coldata


def _makepdf(dists, ids, szs, sigma):
    sigmas = np.full_like(dists, sigma)
    weights = 1. / dists
    weights /= weights.sum(axis=1, keepdims=True)
    means = szs[ids]
    pdfs = qp.Ensemble(qp.mixmod, data=dict(means=means, stds=sigmas, weights=weights))
    return pdfs


[docs]class Inform_KNearNeighPDF(CatInformer): """Train a KNN-based estimator """ name = 'Inform_KNearNeighPDF' config_options = CatInformer.config_options.copy() config_options.update(zmin=SHARED_PARAMS, zmax=SHARED_PARAMS, nzbins=SHARED_PARAMS, nondetect_val=SHARED_PARAMS, mag_limits=SHARED_PARAMS, bands=SHARED_PARAMS, ref_band=SHARED_PARAMS, redshift_col=SHARED_PARAMS, hdf5_groupname=SHARED_PARAMS, trainfrac=Param(float, 0.75, msg="fraction of training data used to make tree, rest used to set best sigma"), seed=Param(int, 0, msg="Random number seed for NN training"), sigma_grid_min=Param(float, 0.01, msg="minimum value of sigma for grid check"), sigma_grid_max=Param(float, 0.075, msg="maximum value of sigma for grid check"), ngrid_sigma=Param(int, 10, msg="number of grid points in sigma check"), leaf_size=Param(int, 15, msg="min leaf size for KDTree"), nneigh_min=Param(int, 3, msg="int, min number of near neighbors to use for PDF fit"), nneigh_max=Param(int, 7, msg="int, max number of near neighbors to use ofr PDF fit")) def __init__(self, args, comm=None): """ Constructor Do CatInformer specific initialization, then check on bands """ CatInformer.__init__(self, args, comm=comm) usecols = self.config.bands.copy() usecols.append(self.config.redshift_col) self.usecols = usecols self.zgrid = None
[docs] def run(self): """ train a KDTree on a fraction of the training data """ from sklearn.neighbors import KDTree if self.config.hdf5_groupname: training_data = self.get_data('input')[self.config.hdf5_groupname] else: # pragma: no cover training_data = self.get_data('input') knndf = pd.DataFrame(training_data, columns=self.config.bands) self.zgrid = np.linspace(self.config.zmin, self.config.zmax, self.config.nzbins) # replace nondetects # will fancy this up later with a flow to sample from truth for col in self.config.bands: if np.isnan(self.config.nondetect_val): # pragma: no cover knndf.loc[np.isnan(knndf[col]), col] = self.config.mag_limits[col] else: knndf.loc[np.isclose(knndf[col], self.config.nondetect_val), col] = self.config.mag_limits[col] trainszs = np.array(training_data[self.config.redshift_col]) colordata = _computecolordata(knndf, self.config.ref_band, self.config.bands) nobs = colordata.shape[0] rng = np.random.default_rng(seed=self.config.seed) perm = rng.permutation(nobs) ntrain = round(nobs * self.config.trainfrac) xtrain_data = colordata[perm[:ntrain]] train_data = copy.deepcopy(xtrain_data) val_data = colordata[perm[ntrain:]] xtrain_sz = trainszs[perm[:ntrain]].copy() train_sz = np.array(copy.deepcopy(xtrain_sz)) val_sz = np.array(trainszs[perm[ntrain:]]) print(f"split into {len(train_sz)} training and {len(val_sz)} validation samples") tmpmodel = KDTree(train_data, leaf_size=self.config.leaf_size) # Find best sigma and n_neigh by minimizing CDE Loss bestloss = 1e20 bestsig = self.config.sigma_grid_min bestnn = self.config.nneigh_min siggrid = np.linspace(self.config.sigma_grid_min, self.config.sigma_grid_max, self.config.ngrid_sigma) print("finding best fit sigma and NNeigh...") for sig in siggrid: for nn in range(self.config.nneigh_min, self.config.nneigh_max + 1): dists, idxs = tmpmodel.query(val_data, k=nn) ens = _makepdf(dists, idxs, train_sz, sig) cdelossobj = CDELoss(ens, self.zgrid, val_sz) cdeloss = cdelossobj.evaluate().statistic if cdeloss < bestloss: bestsig = sig bestnn = nn bestloss = cdeloss numneigh = bestnn sigma = bestsig print(f"\n\n\nbest fit values are sigma={sigma} and numneigh={numneigh}\n\n\n") # remake tree with full dataset! kdtree = KDTree(colordata, leaf_size=self.config.leaf_size) self.model = dict(kdtree=kdtree, bestsig=sigma, nneigh=numneigh, truezs=trainszs) self.add_data('model', self.model)
[docs]class KNearNeighPDF(CatEstimator): """KNN-based estimator """ name = 'KNearNeighPDF' config_options = CatEstimator.config_options.copy() config_options.update(zmin=SHARED_PARAMS, zmax=SHARED_PARAMS, nzbins=SHARED_PARAMS, bands=SHARED_PARAMS, ref_band=SHARED_PARAMS, nondetect_val=SHARED_PARAMS, mag_limits=SHARED_PARAMS, redshift_col=SHARED_PARAMS) def __init__(self, args, comm=None): """ Constructor: Do Estimator specific initialization """ self.sigma = None self.numneigh = None self.model = None self.trainszs = None self.zgrid = None CatEstimator.__init__(self, args, comm=comm) usecols = self.config.bands.copy() usecols.append(self.config.redshift_col) self.usecols = usecols
[docs] def open_model(self, **kwargs): CatEstimator.open_model(self, **kwargs) if self.model is None: #pragma: no cover return self.sigma = self.model['bestsig'] self.numneigh = self.model['nneigh'] self.kdtree = self.model['kdtree'] self.trainszs = self.model['truezs']
def _process_chunk(self, start, end, data, first): """ calculate and return PDFs for each galaxy using the trained flow """ print(f"Process {self.rank} estimating PZ PDF for rows {start:,} - {end:,}") knn_df = pd.DataFrame(data, columns=self.config.bands) self.zgrid = np.linspace(self.config.zmin, self.config.zmax, self.config.nzbins) # replace nondetects # will fancy this up later with a flow to sample from truth for col in self.config.bands: if np.isnan(self.config.nondetect_val): # pragma: no cover knn_df.loc[np.isnan(knn_df[col]), col] = self.config.mag_limits[col] else: knn_df.loc[np.isclose(knn_df[col], self.config.nondetect_val), col] = self.config.mag_limits[col] testcolordata = _computecolordata(knn_df, self.config.ref_band, self.config.bands) dists, idxs = self.kdtree.query(testcolordata, k=self.numneigh) test_ens = _makepdf(dists, idxs, self.trainszs, self.sigma) zmode = test_ens.mode(grid=self.zgrid) test_ens.set_ancil(dict(zmode=zmode)) self._do_chunk_output(test_ens, start, end, first)