Pipeline Usage

This page details the usage of RAIL in pipeline mode. A RAIL pipeline is an ordered arrangement of RAIL stages, with defined inputs, outputs, and configurations. As with stages, running RAIL via pipelines is configurable and reproducible. It is the recommended mode of usage for:

  • well-defined workflows

  • operating on large data sets

  • workflows that make use of parallelization

To learn more about RAIL stages, visit the What Are RAIL Stages? page.

What does a pipeline do? Why would you want to make a pipeline?

A RAIL pipeline is a directed acyclic graph of RAIL stages, defined by the guarantee that the inputs to each stage either exist already or are produced as output of earlier stages in the pipeline. A pipeline is defined by two .yml files, containing:

  1. All the configuration parameters for every stage in the pipeline, including each stage’s name and its inputs and outputs

  2. The order in which the stages are to be run

With pipelines, a user can reproduce workflows and outputs. Pipelines allow a user to specify the order of the stages to be run, as well as the inputs, outputs, and settings of each stage, prior to execution. Pipelines can also be parallelized. For example, pipelines are well suited to running RAIL on HPC systems, which benefit from both the file-based configuration and the parallelization.
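The dependency guarantee described above can be illustrated with a small sketch. It uses only the Python standard library, not RAIL or ceci, and the stage names and data tags are hypothetical. It derives a valid run order from each stage's declared inputs and outputs, and fails if an input has no source.

```python
from graphlib import TopologicalSorter

# Hypothetical stages: each lists the data tags it reads and writes.
stages = {
    "inform": {"inputs": ["train_data"], "outputs": ["model"]},
    "estimate": {"inputs": ["test_data", "model"], "outputs": ["pz"]},
    "evaluate": {"inputs": ["pz", "test_data"], "outputs": ["metrics"]},
}
# Tags that already exist before the pipeline starts.
existing = {"train_data", "test_data"}


def run_order(stages, existing):
    """Return stage names in an order that satisfies every input."""
    # Map each output tag to the stage that produces it.
    producer = {tag: name for name, s in stages.items() for tag in s["outputs"]}
    graph = {}
    for name, s in stages.items():
        deps = set()
        for tag in s["inputs"]:
            if tag in existing:
                continue
            if tag not in producer:
                raise ValueError(f"{name}: no source for input '{tag}'")
            deps.add(producer[tag])
        graph[name] = deps
    # static_order() raises graphlib.CycleError if the graph is not acyclic.
    return list(TopologicalSorter(graph).static_order())


order = run_order(stages, existing)
print(order)
```

In this toy example the order is fully determined, because each stage depends on the one before it.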

This is an example of how RAIL stages are arranged in a RAIL pipeline.

[Figure: example arrangement of RAIL stages in a pipeline (pipeline_example.png)]

To learn how to run RAIL in an exploratory environment, visit the Interactive Usage page.

Pipeline API

RAIL Pipelines are run via the ceci framework. Execution of a pipeline entails an initialize() step, in which ceci checks that each stage’s inputs either exist or will be produced by an earlier stage in the pipeline, followed by a run() step to actually perform the specified calculations.
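The two-step execution model can be sketched with toy classes. This is a minimal illustration under stated assumptions, not ceci's implementation: ToyStage and ToyPipeline are hypothetical names, each stage has a single output, and data are passed through an in-memory dict rather than files.

```python
class ToyStage:
    """A stage with named inputs, one named output, and a function."""

    def __init__(self, name, inputs, output, func):
        self.name = name
        self.inputs = inputs
        self.output = output
        self.func = func


class ToyPipeline:
    def __init__(self, stages, existing):
        self.stages = stages
        self.store = dict(existing)  # tag -> data
        self.initialized = False

    def initialize(self):
        """Check that every input exists or is produced by an earlier stage."""
        available = set(self.store)
        for stage in self.stages:
            missing = [tag for tag in stage.inputs if tag not in available]
            if missing:
                raise ValueError(f"{stage.name} is missing inputs: {missing}")
            available.add(stage.output)
        self.initialized = True

    def run(self):
        """Execute the stages in order, storing each output under its tag."""
        if not self.initialized:
            raise RuntimeError("call initialize() before run()")
        for stage in self.stages:
            args = [self.store[tag] for tag in stage.inputs]
            self.store[stage.output] = stage.func(*args)


stages = [
    ToyStage("double", ["x"], "doubled", lambda x: [2 * v for v in x]),
    ToyStage("total", ["doubled"], "total", sum),
]
pipe = ToyPipeline(stages, existing={"x": [1, 2, 3]})
pipe.initialize()
pipe.run()
print(pipe.store["total"])
```

Separating the check from the execution means a misconfigured pipeline fails before any expensive computation starts.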

To learn more about ceci, visit the ceci documentation.

rail.core.RailPipeline is the base class for all RAIL pipelines. Subclasses can build particular types of analysis pipelines subject to some configuration choices, such as which algorithms to use.

class rail.core.RailPipeline

A pipeline intended for interactive use

Mainly this allows for more concise pipeline specification, along the lines of:

self.stage_1 = Stage1Class.build(…)
self.stage_2 = Stage2Class.build(connections=dict(input=self.stage_1.io.output), …)

This yields a fully specified pipeline.

__init__()

Create a MiniRunner Pipeline

In addition to parent initialization parameters (see the Pipeline base class), this subclass can take these optional keywords.

Parameters:
  • callback (function(event_type: str, event_info: dict)) – A function called when jobs launch, complete, or fail, and when the pipeline aborts. Can be used for tracing execution. Default=None.

  • sleep (function(t: float)) – A function that replaces time.sleep, which the pipeline calls to wait before next checking for process completion. Most normal usage will not need this. Default=None.

Return type:

None

classmethod __new__(*args, **kwargs)

Making a pipeline configuration file

static RailPipeline.build_and_write(class_name, output_yaml, input_dict=None, stages_config=None, output_dir='.', log_dir='.', **kwargs)

Build a RailPipeline and write the config yaml for it

Parameters:
  • class_name (str) – Full name of the class, e.g., rail.core.stage.RailPipeline

  • output_yaml (str) – Path to the output yaml file

  • input_dict (dict | None) – Dict of all the inputs needed to run the pipeline

  • stages_config (dict | None) – Stage configuration overrides

  • output_dir (str) – Directory to write pipeline outputs to

  • log_dir (str) – Directory to write pipeline log files to

  • **kwargs (Any) – Passed as arguments to the pipeline constructor

Return type:

None

Introspecting various types of Pipelines

classmethod RailPipeline.print_classes()
Return type:

None

classmethod RailPipeline.get_pipeline_class(name)
Parameters:

name (str)

Return type:

type[RailPipeline]

static RailPipeline.load_pipeline_class(class_name)

Import a particular RailPipeline subclass by name

Parameters:

class_name (str) – Full name of the class, e.g., rail.core.stage.RailPipeline

Returns:

Requested Pipeline sub-class

Return type:

type[RailPipeline]
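Loading a class from its full dotted name is commonly done with importlib. The sketch below shows that general technique using only the standard library. It is not RAIL's implementation of load_pipeline_class, and load_class is a hypothetical helper name.

```python
import importlib


def load_class(full_name):
    """Import and return the class named by 'package.module.ClassName'."""
    module_name, _, class_name = full_name.rpartition(".")
    if not module_name:
        raise ValueError(f"'{full_name}' is not a full dotted class name")
    module = importlib.import_module(module_name)
    return getattr(module, class_name)


# A standard-library class stands in for a pipeline class here.
cls = load_class("collections.OrderedDict")
print(cls.__name__)
```

With RAIL installed, the same pattern would apply to a name such as rail.core.stage.RailPipeline.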

Making Pipelines Interactively

Here is an example of the first part of the goldenspike pipeline definition.

import os

from rail.core.stage import RailPipeline
from rail.utils.path_utils import RAILDIR

# The stage classes used below (FlowCreator, LSSTErrorModel,
# InvRedshiftIncompleteness, LineConfusion) must also be imported
# from their respective RAIL modules.

flow_file = os.path.join(RAILDIR, 'rail/examples_data/goldenspike_data/data/pretrained_flow.pkl')


class GoldenspikePipeline(RailPipeline):

    default_input_dict = dict(
        model=flow_file,
    )

    def __init__(self):
        RailPipeline.__init__(self)

        # Common parameters shared by the stages
        bands = ['u', 'g', 'r', 'i', 'z', 'y']
        band_dict = {band: f'mag_{band}_lsst' for band in bands}
        rename_dict = {f'mag_{band}_lsst_err': f'mag_err_{band}_lsst' for band in bands}

        # Assigning a stage to an attribute names it and adds it to the pipeline
        self.flow_engine_train = FlowCreator.build(
            model=flow_file,
            n_samples=50,
            seed=1235,
        )

        self.lsst_error_model_train = LSSTErrorModel.build(
            connections=dict(input=self.flow_engine_train.io.output),
            renameDict=band_dict, seed=29,
        )

        self.inv_redshift = InvRedshiftIncompleteness.build(
            connections=dict(input=self.lsst_error_model_train.io.output),
            pivot_redshift=1.0,
        )

        self.line_confusion = LineConfusion.build(
            connections=dict(input=self.inv_redshift.io.output),
            true_wavelen=5007., wrong_wavelen=3727., frac_wrong=0.05,
        )

What this is doing is:

  1. Finding the pretrained model flow_file to use to generate data.

  2. Defining a class GoldenspikePipeline to encapsulate the pipeline and setting up that pipeline.

  3. Defining some common parameters, e.g., bands, band_dict for the pipeline.

  4. Defining four stages and adding them to the pipeline. The syntax is more or less the same for each stage. We have to define:

  • The name of the stage, i.e., self.flow_engine_train will make a stage called flow_engine_train through some Python cleverness.

  • The class of the stage, which is specified by which type of stage we ask to build, e.g., FlowCreator.build will make a FlowCreator stage.

  • Any configuration parameters, which are specified as keyword arguments, e.g., n_samples=50.

  • Any input connections from other stages, e.g., connections=dict(input=self.flow_engine_train.io.output), in the self.lsst_error_model_train block will connect the output of self.flow_engine_train to the input of self.lsst_error_model_train. Later in that example we can see how to connect multiple inputs, e.g., one named input and another named model, as required for an estimator stage.
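One way the attribute-based naming described above could work is by overriding __setattr__ on the pipeline class. The sketch below is a minimal illustration of that idea with hypothetical ToyStage and ToyPipeline classes. It is an assumption about the general mechanism, not a copy of the RAIL or ceci code.

```python
class ToyStage:
    """Stand-in for a stage: holds a name and its configuration."""

    def __init__(self, **config):
        self.name = None
        self.config = config

    @classmethod
    def build(cls, **config):
        return cls(**config)


class ToyPipeline:
    def __init__(self):
        # Bypass our own __setattr__ hook while creating the registry.
        object.__setattr__(self, "stages", {})

    def __setattr__(self, attr, value):
        # When a stage is assigned, name it after the attribute and register it.
        if isinstance(value, ToyStage):
            value.name = attr
            self.stages[attr] = value
        object.__setattr__(self, attr, value)


pipe = ToyPipeline()
pipe.flow_engine_train = ToyStage.build(n_samples=50)
print(pipe.flow_engine_train.name)
```

Because the name comes from the attribute being assigned, this pattern suits stages that are written out one by one rather than generated in a loop.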

Making a configurable Pipeline

Here is an example of a configurable pipeline, where we select which algorithms to include in the pipeline.

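import ceci

from rail.core.stage import RailPipeline
from rail.utils.algo_library import PZ_ALGORITHMS

# SingleEvaluator must also be imported from its RAIL module.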

eval_shared_stage_opts = dict(
    metrics=['all'],
    exclude_metrics=['rmse', 'ks', 'kld', 'cvm', 'ad', 'rbpe', 'outlier'],
    hdf5_groupname="",
    limits=[0, 3.5],
    truth_point_estimates=['redshift'],
    point_estimates=['zmode'],
)


class PzPipeline(RailPipeline):

    default_input_dict={
        'input_train':'dummy.in',
        'input_test':'dummy.in',
    }

    def __init__(self, algorithms: dict|None=None):
        RailPipeline.__init__(self)

        if algorithms is None:
            algorithms = PZ_ALGORITHMS

        for key, val in algorithms.items():
            inform_class = ceci.PipelineStage.get_stage(val['Inform'], val['Module'])
            the_informer = inform_class.make_and_connect(
                name=f'inform_{key}',
                aliases=dict(input='input_train'),
                hdf5_groupname='',
            )
            self.add_stage(the_informer)

            estimate_class = ceci.PipelineStage.get_stage(val['Estimate'], val['Module'])
            the_estimator = estimate_class.make_and_connect(
                name=f'estimate_{key}',
                aliases=dict(input='input_test'),
                connections=dict(
                    model=the_informer.io.model,
                ),
                calculated_point_estimates=['zmode'],
                hdf5_groupname='',
            )
            self.add_stage(the_estimator)

            the_evaluator = SingleEvaluator.make_and_connect(
                name=f'evaluate_{key}',
                aliases=dict(truth='input_test'),
                connections=dict(
                    input=the_estimator.io.output,
                ),
                **eval_shared_stage_opts,
            )
            self.add_stage(the_evaluator)

The main differences with the previous example are that:

  • We pass in a dict that gives the names of all the algorithms to include, as well as information on how to load the stages in question.

  • Instead of using build we use make_and_connect followed by add_stage. This is because we are making several stages of the same type, but with different names, inside a loop, so the cleverness behind the build mechanism would not work here.
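To make the loop's effect concrete, the sketch below lists the stage names and connections it would generate for a small algorithm table. The table entries (algo_a, algo_b, and their class and module names) are hypothetical. Only the keys Inform, Estimate, and Module and the naming scheme come from the example above. No RAIL code is run.

```python
# Hypothetical algorithm table in the shape the loop above expects.
algorithms = {
    "algo_a": dict(Inform="AlgoAInformer", Estimate="AlgoAEstimator", Module="my_package.algo_a"),
    "algo_b": dict(Inform="AlgoBInformer", Estimate="AlgoBEstimator", Module="my_package.algo_b"),
}


def plan_stages(algorithms):
    """List the stages the loop would create, with their input sources."""
    plan = []
    for key, val in algorithms.items():
        plan.append(dict(
            name=f"inform_{key}",
            stage_class=val["Inform"],
            inputs={"input": "input_train"},
        ))
        plan.append(dict(
            name=f"estimate_{key}",
            stage_class=val["Estimate"],
            inputs={"input": "input_test", "model": f"inform_{key}.model"},
        ))
        plan.append(dict(
            name=f"evaluate_{key}",
            stage_class="SingleEvaluator",
            inputs={"input": f"estimate_{key}.output", "truth": "input_test"},
        ))
    return plan


plan = plan_stages(algorithms)
for entry in plan:
    print(entry["name"], entry["inputs"])
```

Each algorithm contributes three uniquely named stages, which is why the names are passed explicitly instead of being taken from an attribute.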

Making Pipelines with YAML

Coming soon

Data Handles

One particularity of RAIL is that we wrap data in rail.core.DataHandle objects rather than passing the data directly to functions. There are a few reasons for this.

Potentially large data volume

One of the challenges that RAIL must address is the potentially very large datasets that we use. At times we will be dealing with billions of objects, and will not be able to load the object tables into the memory of a single processor.
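The usual way to handle a table that does not fit in memory is to process it in chunks of rows. The sketch below illustrates this with plain Python. The load_rows function is a hypothetical stand-in for reading a slice of a file, and nothing here is RAIL API.

```python
def chunk_ranges(n_rows, chunk_size):
    """Yield (start, end) row ranges that cover n_rows in order."""
    for start in range(0, n_rows, chunk_size):
        yield start, min(start + chunk_size, n_rows)


def load_rows(start, end):
    """Hypothetical stand-in for reading rows [start, end) from a file."""
    return list(range(start, end))


def chunked_mean(n_rows, chunk_size):
    """Compute a mean while holding only one chunk in memory at a time."""
    total, count = 0.0, 0
    for start, end in chunk_ranges(n_rows, chunk_size):
        chunk = load_rows(start, end)
        total += sum(chunk)
        count += len(chunk)
    return total / count


ranges = list(chunk_ranges(10, 4))
mean = chunked_mean(10, 4)
print(ranges, mean)
```

Memory use is bounded by the chunk size instead of the table size, and independent chunks can be handed to different processes.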

Parallel processing

DataHandle API

rail.core.DataHandle is the class that lets users connect data to RAIL.

class rail.core.DataHandle

Class to act as a handle for a bit of data, associating it with a file and providing tools to read and write it to that file.

__init__(tag, data=None, path=None, creator=None)

Constructor

Parameters:
  • tag (str) – The tag under which this data handle can be found in the store

  • data (DataLike | None) – The associated data

  • path (str | None) – The path to the associated file

  • creator (str | None) – The name of the stage that created this data handle

Return type:

None

classmethod __new__(*args, **kwargs)

Basic file-like operations

DataHandle.open(**kwargs)

Open and return the associated file

Parameters:

**kwargs (Any) – Passed to the call to open the file in question

Returns:

Newly opened file

Return type:

FileLike

Notes

This will simply open the file and return a FileLike object to the caller. It will not read or cache the data.

DataHandle.close(**kwargs)

Close the associated file

Parameters:

kwargs (Any)

Return type:

None

DataHandle.read(force=False, **kwargs)

Read and return the data from the associated file

Parameters:
  • force (bool) – If true, force re-reading the data

  • **kwargs (Any) – Passed to the call to read the data

Returns:

Data that were read

Return type:

DataLike

Notes

This will read the entire file, and while useful for testing on small files, will not work on very large files.
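The force parameter suggests that a handle may keep data it has already read and return them on later calls. The sketch below illustrates that caching pattern with a hypothetical ToyHandle class backed by a JSON file. The caching behavior is an assumption made for illustration, and the class is not rail.core.DataHandle.

```python
import json
import os
import tempfile


class ToyHandle:
    """Toy handle that caches file contents after the first read."""

    def __init__(self, tag, path):
        self.tag = tag
        self.path = path
        self.data = None
        self.n_reads = 0  # counts actual file reads

    def read(self, force=False):
        # Re-read only if nothing is cached yet or the caller insists.
        if self.data is None or force:
            with open(self.path) as f:
                self.data = json.load(f)
            self.n_reads += 1
        return self.data


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "example.json")
    with open(path, "w") as f:
        json.dump({"redshift": [0.1, 0.5]}, f)

    handle = ToyHandle("example", path)
    first = handle.read()             # reads the file
    second = handle.read()            # returns the cached object
    third = handle.read(force=True)   # reads the file again
    n_reads = handle.n_reads

print(n_reads)
```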

DataHandle.write(**kwargs)

Write the data to the associated file

Parameters:

kwargs (Any)

Return type:

None

Operations for parallelized access to data

DataHandle.iterator(**kwargs)

Iterator over the data

Parameters:

kwargs (Any)

Return type:

Iterable

DataHandle.size(**kwargs)

Return the size of the data associated to this handle

Parameters:

kwargs (Any)

Return type:

int

DataHandle.data_size(**kwargs)

Return the size of the in memory data

Parameters:

kwargs (Any)

Return type:

int

DataHandle.initialize_write(data_length, **kwargs)

Initialize file to be written by chunks

Parameters:
  • data_length (int) – Number of rows of data that we will write, used to reserve space

  • **kwargs (Any) – Information about the columns we will write

Return type:

None

DataHandle.write_chunk(start, end, **kwargs)

Write a chunk of the data to the associated file

Parameters:
  • start (int) – Index of starting row for this chunk of data

  • end (int) – Index of ending row for this chunk of data

  • **kwargs (Any) – Passed to call to write this chunk of data

Return type:

None

DataHandle.finalize_write(**kwargs)

Finalize and close file written by chunks

Parameters:

**kwargs (Any) – Passed to call to write this chunk of data

Return type:

None
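The three chunked-write methods follow a reserve, fill, close pattern. The sketch below mimics that pattern with a hypothetical in-memory ToyChunkWriter. Unlike DataHandle.write_chunk, the toy method takes the chunk data as an explicit argument, and a list stands in for the output file.

```python
class ToyChunkWriter:
    """In-memory stand-in for a file that is written chunk by chunk."""

    def __init__(self):
        self.rows = None
        self.closed = False

    def initialize_write(self, data_length):
        # Reserve space for every row up front.
        self.rows = [None] * data_length

    def write_chunk(self, start, end, data):
        # Fill rows [start, end) with this chunk's data.
        if len(data) != end - start:
            raise ValueError("chunk length does not match its row range")
        self.rows[start:end] = data

    def finalize_write(self):
        # Refuse to close while reserved rows are still unwritten.
        if any(row is None for row in self.rows):
            raise RuntimeError("some rows were never written")
        self.closed = True


writer = ToyChunkWriter()
writer.initialize_write(10)
# Chunks can arrive in any order because each one knows its row range.
writer.write_chunk(5, 10, list(range(5, 10)))
writer.write_chunk(0, 5, list(range(0, 5)))
writer.finalize_write()
print(writer.rows)
```

Reserving the full length first is what allows chunks to be written out of order, for example by separate parallel processes.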

Functions for working with DataHandles

DataHandle.set_data(data, partial=False)

Set the data for a chunk, and set the partial flag to true

Parameters:
  • data (rail.core.data.DataLike)

  • partial (bool)

Return type:

None

classmethod DataHandle.make_name(tag)

Construct and return file name for a particular data tag

Parameters:

tag (str)

Return type:

str