Source code for rail.estimation.algos.pzflow
"""
first pass implementation of pzflow estimator
First pass will ignore photometric errors and just do
things in terms of magnitudes, we will expand in a
future update
"""
import numpy as np
from ceci.config import StageParameter as Param
from rail.estimation.estimator import CatEstimator, CatInformer
from rail.core.data import FlowHandle, TableHandle
import pandas as pd
import qp
[docs]def computemeanstd(df):
"""Compute colors from the magnitudes and compute their
means and stddevs for data whitening
Parameters
----------
df: pandas dataframe
ordered dict of raw input data
Returns
-------
means, stds: numpy arrays
means and stddevs for the mags and colors
"""
tmpdict = {}
tmpdict['redshift'] = df['redshift']
tmpdict['i'] = df['mag_i_lsst']
bands = ['u', 'g', 'r', 'i', 'z', 'y']
usecols = ['redshift', 'i']
for k in range(5):
newcol = f'{bands[k]}{bands[k+1]}'
usecols.append(newcol)
tmpdict[f'{bands[k]}{bands[k+1]}'] = \
df[f'mag_{bands[k]}_lsst'] - \
df[f'mag_{bands[k+1]}_lsst']
tmpdf = pd.DataFrame(tmpdict)
tmpdata = np.array(tmpdf[usecols])
means = tmpdata.mean(axis=0)
stds = tmpdata.std(axis=0)
return means, stds
def_bands = ['u', 'g', 'r', 'i', 'z', 'y']
refcols = [f"mag_{band}_lsst" for band in def_bands]
all_columns = refcols.copy()
for band in def_bands:
all_columns.append(f"mag_{band}_lsst_err")
def_maglims = dict(mag_u_lsst=27.79,
mag_g_lsst=29.04,
mag_r_lsst=29.06,
mag_i_lsst=28.62,
mag_z_lsst=27.98,
mag_y_lsst=27.05)
def_errornames=dict(mag_err_u_lsst="mag_u_lsst_err",
mag_err_g_lsst="mag_g_lsst_err",
mag_err_r_lsst="mag_r_lsst_err",
mag_err_i_lsst="mag_i_lsst_err",
mag_err_z_lsst="mag_z_lsst_err",
mag_err_y_lsst="mag_y_lsst_err")
[docs]class Inform_PZFlowPDF(CatInformer):
""" Subclass to train a pzflow-based estimator
"""
name = 'Inform_PZFlowPdf'
outputs = [('model', FlowHandle)]
config_options = CatInformer.config_options.copy()
config_options.update(zmin=Param(float, 0.0, msg="min z"),
zmax=Param(float, 3.0, msg="max_z"),
nzbins=Param(int, 301, msg="num z bins"),
flow_seed=Param(int, 0, msg="seed for flow"),
ref_column_name=Param(str, 'mag_i_lsst',
msg="name for reference column"),
column_names=Param(list, refcols,
msg="column names to be used in flow"),
mag_limits=Param(dict, def_maglims,
msg="1 sigma mag limits"),
include_mag_errors=Param(bool, False,
msg="Boolean flag on whether to marginalize"
"over mag errors (NOTE: much slower on CPU!)"),
error_names_dict=Param(dict, def_errornames,
msg="dictionary to rename error columns"),
n_error_samples=Param(int, 1000,
msg="umber of error samples in marginalization"),
soft_sharpness=Param(int, 10,
msg="sharpening paremeter for SoftPlus"),
soft_idx_col=Param(int, 0,
msg="index column for SoftPlus"),
redshift_column_name=Param(str, 'redshift',
msg="name of redshift column"),
num_training_epochs=Param(int, 50,
msg="number flow training epochs"))
def __init__(self, args, comm=None):
"""Constructor, build the CatInformer, then do PZFlow specific setup
"""
CatInformer.__init__(self, args, comm=comm)
usecols = self.config.column_names.copy()
allcols = usecols.copy()
if self.config.include_mag_errors: # only include errors if option set
for item in self.config.error_names_dict:
allcols.append(self.config.error_names_dict[item])
usecols.append(self.config.redshift_column_name)
allcols.append(self.config.redshift_column_name)
self.usecols = usecols
self.allcols = allcols
[docs] def run(self):
"""
train a flow based on the training data
This is mostly based off of the pzflow example notebook
"""
from pzflow import Flow
from pzflow.bijectors import Chain, ColorTransform, InvSoftplus
from pzflow.bijectors import StandardScaler, RollingSplineCoupling
if self.config.hdf5_groupname:
training_data = self.get_data('input')[self.config.hdf5_groupname]
else: #pragma: no cover
training_data = self.get_data('input')
input_df = pd.DataFrame(training_data)
flowdf = input_df[self.usecols]
# replace nondetects
# will fancy this up later with a flow to sample from truth
for col in self.config.column_names:
flowdf.loc[np.isclose(flowdf[col], 99.), col] = self.config.mag_limits[col]
# compute means and stddevs for StandardScalar transform
col_means, col_stds = computemeanstd(flowdf)
ref_idx = flowdf.columns.get_loc(self.config.ref_column_name)
mag_idx = [flowdf.columns.get_loc(col) for col in self.config.column_names]
nlayers = flowdf.shape[1]
bijector = Chain(
ColorTransform(ref_idx, mag_idx),
InvSoftplus(self.config.soft_idx_col, self.config.soft_sharpness),
StandardScaler(col_means, col_stds),
RollingSplineCoupling(nlayers),
)
self.model = Flow(flowdf.columns, bijector, seed=self.config.flow_seed)
_ = self.model.train(flowdf[self.usecols], epochs=self.config.num_training_epochs,
verbose=True)
self.model.save(self.get_output('model'))
[docs]class PZFlowPDF(CatEstimator):
"""CatEstimator which uses PZFlow
"""
name = 'PZFlowPDF'
inputs = [('model', FlowHandle),
('input', TableHandle)]
config_options = CatEstimator.config_options.copy()
config_options.update(zmin=Param(float, 0.0, msg="The minimum redshift of the z grid"),
zmax=Param(float, 3.0, msg="The maximum redshift of the z grid"),
nzbins=Param(int, 301, msg="The number of gridpoints in the z grid"),
flow_seed=Param(int, 0, msg="seed for flow"),
ref_column_name=Param(str, 'mag_i_lsst',
msg="name for reference column"),
column_names=Param(list, refcols,
msg="column names to be used in flow"),
mag_limits=Param(dict, def_maglims,
msg="1 sigma mag limits"),
include_mag_errors=Param(bool, False,
msg="Boolean flag on whether to marginalize"
"over mag errors (NOTE: much slower on CPU!)"),
error_names_dict=Param(dict, def_errornames,
msg="dictionary to rename error columns"),
n_error_samples=Param(int, 1000,
msg="umber of error samples in marginalization"),
redshift_column_name=Param(str, 'redshift',
msg="name of redshift column"))
def __init__(self, args, comm=None):
CatEstimator.__init__(self, args, comm=comm)
usecols = self.config.column_names.copy()
allcols = usecols.copy()
if self.config.include_mag_errors: #pragma: no cover
for item in self.config.error_names_dict:
allcols.append(self.config.error_names_dict[item])
usecols.append(self.config.redshift_column_name)
allcols.append(self.config.redshift_column_name)
self.usecols = usecols
self.allcols = allcols
self.zgrid = None
def _process_chunk(self, start, end, data, first):
"""
calculate and return PDFs for each galaxy using the trained flow
"""
# flow expects dataframe
test_df = pd.DataFrame(data)
if self.config.include_mag_errors: #pragma: no cover
# rename the error columns to end in _err!
test_df.rename(columns=self.config.error_names_dict, inplace=True)
flow_df = test_df[self.allcols]
# replace nondetects
if self.config.include_mag_errors: #pragma: no cover
err_names = list(self.config.error_names_dict.values())
for col, err_col in zip(self.config.column_names, err_names):
# set error to 0.7525 for 1 sigma detection 2.5log10(2) = .75257
flow_df.loc[np.isclose(flow_df[col], 99.), err_col] = 0.75257
flow_df.loc[np.isclose(flow_df[col], 99.), col] = self.config.mag_limits[col]
else:
for col in self.config.column_names:
flow_df.loc[np.isclose(flow_df[col], 99.), col] = self.config.mag_limits[col]
self.zgrid = np.linspace(self.config.zmin, self.config.zmax, self.config.nzbins)
if self.config.include_mag_errors: #pragma: no cover
pdfs = self.model.posterior(flow_df,
column=self.config.redshift_column_name,
seed=self.config.flow_seed,
grid=self.zgrid,
err_samples=self.config.n_error_samples)
else:
pdfs = self.model.posterior(flow_df,
column=self.config.redshift_column_name,
grid=self.zgrid)
zmode = np.array([self.zgrid[np.argmax(pdf)] for pdf in pdfs]).flatten()
qp_distn = qp.Ensemble(qp.interp, data=dict(xvals=self.zgrid, yvals=pdfs))
qp_distn.set_ancil(dict(zmode=zmode))
self._do_chunk_output(qp_distn, start, end, first)