Add a new algorithm / RAIL stage
New Algorithm
To add new functionality that adds a new dependency, you should create a new package that users will access through RAIL’s common API.
Create a new github repository using the RAIL-project-template. This template makes use of
copier to create a new repository that will use the rail namespace. The README
for that project contains a few more steps you should take on your repository to include
the same best practices across all rail packages.
Wrap your algorithm in rail stages, using the documentation below in New RAIL Stage as a guide.
Once you have created a new package that is released through PyPI (don’t worry - this
packaging is included in the template), you should create a PR against the rail
package to add your package as a dependency. Include your new package name in the rail
packages config. You should also
run the create_interactive_structure.py script in rail_base and do a PR. This will add in the necessary
*.pyi files so that the interactive mode will see your new algorithm.
New RAIL Stage
To make it easier to eventually run RAIL algorithms at scale, all of the various
algorithms are implemented as subclasses of the
rail.core.stage.RailStage class. A RailStage is intended to
take a particular set of inputs and configuration parameters, run a single bit
of analysis, and produce one or more output files. The inputs, outputs and
configuration parameters are all defined in particular ways to allow
RailStage objects to be integrated into larger data analysis pipelines.
Simple Example
The following example has all of the required pieces of a RailStage and
almost nothing else.
class ColumnMapper (RailStage):
"""Utility stage that remaps the names of columns.
Notes
-----
1. This operates on pandas dataframes in parquet files.
2. In short, this does:
`output_data = input_data.rename(columns=self.config.columns, inplace=self.config.inplace)`
"""
name = 'ColumnMapper'
entrypoint_function = "__call__" # the "work" function of this class
interactive_function = "column_mapper" # what the interative function should be called that wraps this class
config_options = RailStage.config_options.copy()
config_options.update(
columns=Param(dict, required=True, msg="Map of columns to rename"),
inplace=Param(bool, default=False, msg="Update file in place"),
)
inputs = [('input', PqHandle)]
outputs = [('output', PqHandle)]
extra_interactive_documentation = """
Examples
--------
Provide examples of interactive usage here
"""
def run(self):
data = self.get_data('input', allow_missing=True)
out_data = data.rename(columns=self.config.columns, inplace=self.config.inplace)
if self.config.inplace: #pragma: no cover
out_data = data
self.add_data('output', out_data)
def __call__(self, data: pd.DataFrame) -> PqHandle:
"""Return a table with the columns names changed
Parameters
----------
sample : pd.DataFrame
The data to be renamed
Returns
-------
PqHandle
The degraded sample
"""
self.set_data('input', data)
self.run()
return self.get_handle('output')
The required pieces, in the order that they appear are:
The
ColumnMapper (RailStage):defines a class calledColumnMapperand specifies that it inherits fromRailStage.The
nameattribute is used for Pipeline Mode, to interface withceci, and must match exactly the defined name of the class (forrail.interactive)The
entrypoint_functionline provides the name of the main function that
the user should call to use this class as a string, for use in the interactive module. This should be the only function required in order to perform this stage, after
make_stage(inherited fromRailStage).
The
interactive_functionline provides the name that will be given to this stage in the interactive module. Typically this should be a snake case version of the class name.The
config_optionslines define the configuration parameters for this class, as well as their default values. Note that here we are copying the configuration parameters from theRailStageas well as defining some new ones.The
run()method does the actual work.
Additionally, the docstring of the entrypoint_function in this case __call__
must follow numpydoc guidelines. While these are good practice
everywhere, they are required in these functions, so that the contents can be parsed and
re-formatted for the interactive module. Not all of the numpydoc requirements must be
met, so a custom test is provided to check the necessary items. This test can be run as
part of the general test suite, or with the wumpydoc.py script in rail_base
which can be called against a specific function: python wumpydoc.py
rail.estimation.algos.random_gauss.RandomGaussEstimator.estimate to validate a single
entrypoint function.
The entrypoint function also takes an unused **kwargs. This is another necessary
modification in order to adapt to the rail.interactive interface, in which
parameters cannot be distinguished between belonging to make_stage (i.e., being used
for config_option) and belonging to the entrypoint function itself. Again, this is
tested for in the interactive module test suite.
The extra_interactive_documentation attribute in the example class shows above is an
optional property for RAIL stages. The contents of this attribute (if provided) will be
appended to the docstring generated for the interactive module. It is recommended to use
this as a location to showcase examples.
Note
Note that contents of both the entrypoint function and the stage class docstrings will be used in the docstring of the generated interactive function wrapping each class. These docstrings should thus refer as little as possible to the class-specific elements of RAIL.
For example, description for the Returns section of the docstring for the entrypoint
function should not detail the format of the data (typically a
rail.core.DataHandle), but instead the contents of that DataHandle.
Advanced Example
Here is an example of a slightly more complicated RailStage.
class NaiveStackSummarizer(PZSummarizer):
"""Summarizer which simply histograms a point estimate
"""
name = 'NaiveStack'
entrypoint_function = "summarize" # the user-facing science function for this class
interactive_function = "naive_stack_summarizer"
config_options = PZSummarizer.config_options.copy()
config_options.update(zmin=SharedParams.copy_param("zmin"),
zmax=SharedParams.copy_param("zmax"),
nzbins=SharedParams.copy_param("nzbins"),
seed=Param(int, 87, msg="random seed"),
nsamples=Param(int, 1000, msg="Number of sample distributions to create"))
outputs = [('output', QPHandle), ('single_NZ', QPHandle)]
def __init__(self, args, **kwargs):
super().__init__(self, args, **kwargs)
self.zgrid = None
def summarize(
self, input_data: qp.Ensemble, **kwargs
) -> QPHandle | dict[str, QPHandle]:
"""Summarizer for NaiveStack which returns multiple items
Parameters
----------
input_data : qp.Ensemble
Per-galaxy p(z), and any ancillary data associated with it
Returns
-------
QPHandle | dict[str, QPHandle]
Ensemble with n(z), and any ancillary data
Return type depends on `output_mode`
"""
self.set_data("input", input_data)
self.run()
self.finalize()
if len(self.outputs) == 1 or self.config.output_mode != "return":
results = self.get_handle("output")
# if there is more than one output and output_mode = return, return them all as a dictionary
elif len(self.outputs) > 1 and self.config.output_mode == "return":
results = {}
for output in self.outputs:
results[output[0]] = self.get_handle(output[0])
return results
def run(self):
rng = np.random.default_rng(seed=self.config.seed)
test_data = self.get_data('input')
self.zgrid = np.linspace(self.config.zmin, self.config.zmax, self.config.nzbins + 1)
pdf_vals = test_data.pdf(self.zgrid)
yvals = np.expand_dims(np.sum(np.where(np.isfinite(pdf_vals), pdf_vals, 0.), axis=0), 0)
qp_d = qp.Ensemble(qp.interp, data=dict(xvals=self.zgrid, yvals=yvals))
bvals = np.empty((self.config.nsamples, len(self.zgrid)))
for i in range(self.config.nsamples):
bootstrap_draws = rng.integers(low=0, high=test_data.npdf, size=test_data.npdf)
bvals[i] = np.sum(pdf_vals[bootstrap_draws], axis=0)
sample_ens = qp.Ensemble(qp.interp, data=dict(xvals=self.zgrid, yvals=bvals))
self.add_data('output', sample_ens)
self.add_data('single_NZ', qp_d)
The main difference with this new class is that it inherits from the
PZSummarizer RailStage subclass. A
rail.estimation.summarizer.PZSummarizer will take an ensemble of p(z)
distributions for many objects, and summarize them into a single n(z) distribution
for that ensemble.
A few things to note:
We copy the configuration parameters for
PZSummarizerand then add additional ones. In particular, some of the configuration parameters are copied from theSHARED_PARAMSdictionary using theSharedParamsclass. This copies over the existing Parameter including its default value and message. See Shared Parameters for a list of the existing Shared Parameters and more information on them.While we define the
outputshere, we just use the inputs as defined inPZSummarizer.Because the number of outputs does not match that of the parent class
PZSummarizer, we must re-define the entrypoint functionsummarizein order for the docstring to match the actual return, and for the case ofoutput_mode = "return"(used for interactive mode), wherein all stage outputs are returned as a dictionary, instead of written to files. Were this not the case, thesummarizemethod could simply be inherited fromPZSummarizer