Add a new algorithm / RAIL stage

New Algorithm

To add new functionality that adds a new dependency, you should create a new package that users will access through RAIL’s common API.

Create a new github repository using the RAIL-project-template. This template makes use of copier to create a new repository that will use the rail namespace. The README for that project contains a few more steps you should take on your repository to include the same best practices across all rail packages.

Wrap your algorithm in rail stages, using the documentation below in New RAIL Stage as a guide.

Once you have created a new package that is released through PyPI (don’t worry - this packaging is included in the template), you should create a PR against the rail package to add your package as a dependency. Include your new package name in the rail packages config. You should also run the create_interactive_structure.py script in rail_base and do a PR. This will add in the necessary *.pyi files so that the interactive mode will see your new algorithm.

New RAIL Stage

To make it easier to eventually run RAIL algorithms at scale, all of the various algorithms are implemented as subclasses of the rail.core.stage.RailStage class. A RailStage is intended to take a particular set of inputs and configuration parameters, run a single bit of analysis, and produce one or more output files. The inputs, outputs and configuration parameters are all defined in particular ways to allow RailStage objects to be integrated into larger data analysis pipelines.

Simple Example

The following example has all of the required pieces of a RailStage and almost nothing else.

class ColumnMapper (RailStage):
    """Utility stage that remaps the names of columns.

    Notes
    -----
    1. This operates on pandas dataframes in parquet files.

    2. In short, this does:
    `output_data = input_data.rename(columns=self.config.columns, inplace=self.config.inplace)`

    """
    name = 'ColumnMapper'
    entrypoint_function = "__call__"  # the "work" function of this class
    interactive_function = "column_mapper" # what the interative function should be called that wraps this class

    config_options = RailStage.config_options.copy()
    config_options.update(
        columns=Param(dict, required=True, msg="Map of columns to rename"),
        inplace=Param(bool, default=False, msg="Update file in place"),
    )

    inputs = [('input', PqHandle)]
    outputs = [('output', PqHandle)]

    extra_interactive_documentation = """
    Examples
    --------
    Provide examples of interactive usage here
    """

    def run(self):
        data = self.get_data('input', allow_missing=True)
        out_data = data.rename(columns=self.config.columns, inplace=self.config.inplace)
        if self.config.inplace:  #pragma: no cover
            out_data = data
        self.add_data('output', out_data)

    def __call__(self, data: pd.DataFrame) -> PqHandle:
        """Return a table with the columns names changed

        Parameters
        ----------
        sample : pd.DataFrame
            The data to be renamed

        Returns
        -------
        PqHandle
            The degraded sample
        """
        self.set_data('input', data)
        self.run()
        return self.get_handle('output')

The required pieces, in the order that they appear are:

  1. The ColumnMapper (RailStage): defines a class called ColumnMapper and specifies that it inherits from RailStage.

  2. The name attribute is used for Pipeline Mode, to interface with ceci, and must match exactly the defined name of the class (for rail.interactive)

  3. The entrypoint_function line provides the name of the main function that

the user should call to use this class as a string, for use in the interactive module. This should be the only function required in order to perform this stage, after make_stage (inherited from RailStage).

  1. The interactive_function line provides the name that will be given to this stage in the interactive module. Typically this should be a snake case version of the class name.

  2. The config_options lines define the configuration parameters for this class, as well as their default values. Note that here we are copying the configuration parameters from the RailStage as well as defining some new ones.

  3. The run() method does the actual work.

Additionally, the docstring of the entrypoint_function in this case __call__ must follow numpydoc guidelines. While these are good practice everywhere, they are required in these functions, so that the contents can be parsed and re-formatted for the interactive module. Not all of the numpydoc requirements must be met, so a custom test is provided to check the necessary items. This test can be run as part of the general test suite, or with the wumpydoc.py script in rail_base which can be called against a specific function: python wumpydoc.py rail.estimation.algos.random_gauss.RandomGaussEstimator.estimate to validate a single entrypoint function.

The entrypoint function also takes an unused **kwargs. This is another necessary modification in order to adapt to the rail.interactive interface, in which parameters cannot be distinguished between belonging to make_stage (i.e., being used for config_option) and belonging to the entrypoint function itself. Again, this is tested for in the interactive module test suite.

The extra_interactive_documentation attribute in the example class shows above is an optional property for RAIL stages. The contents of this attribute (if provided) will be appended to the docstring generated for the interactive module. It is recommended to use this as a location to showcase examples.

Note

Note that contents of both the entrypoint function and the stage class docstrings will be used in the docstring of the generated interactive function wrapping each class. These docstrings should thus refer as little as possible to the class-specific elements of RAIL.

For example, description for the Returns section of the docstring for the entrypoint function should not detail the format of the data (typically a rail.core.DataHandle), but instead the contents of that DataHandle.

Advanced Example

Here is an example of a slightly more complicated RailStage.

class NaiveStackSummarizer(PZSummarizer):
    """Summarizer which simply histograms a point estimate
    """

    name = 'NaiveStack'
    entrypoint_function = "summarize"  # the user-facing science function for this class
    interactive_function = "naive_stack_summarizer"

    config_options = PZSummarizer.config_options.copy()
    config_options.update(zmin=SharedParams.copy_param("zmin"),
                          zmax=SharedParams.copy_param("zmax"),
                          nzbins=SharedParams.copy_param("nzbins"),
                          seed=Param(int, 87, msg="random seed"),
                          nsamples=Param(int, 1000, msg="Number of sample distributions to create"))
    outputs = [('output', QPHandle), ('single_NZ', QPHandle)]

    def __init__(self, args, **kwargs):
        super().__init__(self, args, **kwargs)
        self.zgrid = None

    def summarize(
        self, input_data: qp.Ensemble, **kwargs
    ) -> QPHandle | dict[str, QPHandle]:
        """Summarizer for NaiveStack which returns multiple items

        Parameters
        ----------
        input_data : qp.Ensemble
            Per-galaxy p(z), and any ancillary data associated with it

        Returns
        -------
        QPHandle | dict[str, QPHandle]
            Ensemble with n(z), and any ancillary data
            Return type depends on `output_mode`
        """
        self.set_data("input", input_data)
        self.run()
        self.finalize()
        if len(self.outputs) == 1 or self.config.output_mode != "return":
            results = self.get_handle("output")
        # if there is more than one output and output_mode = return, return them all as a dictionary
        elif len(self.outputs) > 1 and self.config.output_mode == "return":
            results = {}
            for output in self.outputs:
                results[output[0]] = self.get_handle(output[0])
        return results

    def run(self):
        rng = np.random.default_rng(seed=self.config.seed)
        test_data = self.get_data('input')
        self.zgrid = np.linspace(self.config.zmin, self.config.zmax, self.config.nzbins + 1)
        pdf_vals = test_data.pdf(self.zgrid)
        yvals = np.expand_dims(np.sum(np.where(np.isfinite(pdf_vals), pdf_vals, 0.), axis=0), 0)
        qp_d = qp.Ensemble(qp.interp, data=dict(xvals=self.zgrid, yvals=yvals))

        bvals = np.empty((self.config.nsamples, len(self.zgrid)))
        for i in range(self.config.nsamples):
            bootstrap_draws = rng.integers(low=0, high=test_data.npdf, size=test_data.npdf)
            bvals[i] = np.sum(pdf_vals[bootstrap_draws], axis=0)
        sample_ens = qp.Ensemble(qp.interp, data=dict(xvals=self.zgrid, yvals=bvals))

        self.add_data('output', sample_ens)
        self.add_data('single_NZ', qp_d)

The main difference with this new class is that it inherits from the PZSummarizer RailStage subclass. A rail.estimation.summarizer.PZSummarizer will take an ensemble of p(z) distributions for many objects, and summarize them into a single n(z) distribution for that ensemble.

A few things to note:

  1. We copy the configuration parameters for PZSummarizer and then add additional ones. In particular, some of the configuration parameters are copied from the SHARED_PARAMS dictionary using the SharedParams class. This copies over the existing Parameter including its default value and message. See Shared Parameters for a list of the existing Shared Parameters and more information on them.

  2. While we define the outputs here, we just use the inputs as defined in PZSummarizer.

  3. Because the number of outputs does not match that of the parent class PZSummarizer, we must re-define the entrypoint function summarize in order for the docstring to match the actual return, and for the case of output_mode = "return" (used for interactive mode), wherein all stage outputs are returned as a dictionary, instead of written to files. Were this not the case, the summarize method could simply be inherited from PZSummarizer