rail.core.stage module

Base class for PipelineStages in Rail

class rail.core.stage.RailPipeline

Bases: MiniPipeline

A pipeline intended for interactive use

Mainly this allows for more concise pipeline specification, along the lines of:

self.stage_1 = Stage1Class.build(…)
self.stage_2 = Stage2Class.build(connections=dict(input=self.stage_1.io.output), …)

The result is a fully specified pipeline.

__init__()

Create a MiniRunner Pipeline

In addition to parent initialization parameters (see the Pipeline base class), this subclass can take these optional keywords.

Parameters:
  • callback (function(event_type: str, event_info: dict)) – A function called when jobs launch, complete, or fail, and when the pipeline aborts. Can be used for tracing execution. Default=None.

  • sleep (function(t: float)) – A function that replaces time.sleep, which the pipeline calls to wait until the next check for process completion. Most normal usage will not need this. Default=None.
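
As an illustration of the callback parameter, the sketch below defines a tracing function with the documented signature (event_type: str, event_info: dict). The event names and the "stage" key used in the simulated calls are assumptions made for this example; the text above does not list the actual values.

```python
from typing import Any

# Collected (event_type, event_info) pairs, in the order they were reported.
trace: list[tuple[str, dict[str, Any]]] = []


def trace_callback(event_type: str, event_info: dict[str, Any]) -> None:
    """Record each event so the run can be inspected afterwards."""
    trace.append((event_type, dict(event_info)))


# Simulated calls standing in for what a pipeline might report.
# The event names and the "stage" key are hypothetical.
trace_callback("launch", {"stage": "stage_1"})
trace_callback("completed", {"stage": "stage_1"})

event_types = [event_type for event_type, _ in trace]
```

A function like this would be passed through the callback keyword when the pipeline is constructed.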

Return type:

None

static build_and_write(class_name, output_yaml, input_dict=None, stages_config=None, output_dir='.', log_dir='.', **kwargs)

Build a RailPipeline and write the config yaml for it

Parameters:
  • class_name (str) – Full name of the class, e.g., rail.core.stage.RailPipeline

  • output_yaml (str) – Path to the output yaml file

  • input_dict (dict | None) – Dict of all the inputs needed to run the pipeline

  • stages_config (dict | None) – Stage configuration overrides

  • output_dir (str) – Directory to write pipeline outputs to

  • log_dir (str) – Directory to write pipeline log files to

  • **kwargs (Any) – Passed as arguments to the pipeline constructor

Return type:

None

classmethod get_pipeline_class(name)
Parameters:

name (str)

Return type:

type[RailPipeline]

static load_pipeline_class(class_name)

Import a particular RailPipeline subclass by name

Parameters:

class_name (str) – Full name of the class, e.g., rail.core.stage.RailPipeline

Returns:

Requested Pipeline sub-class

Return type:

type[RailPipeline]
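
Importing a class from its full dotted name is a common Python pattern. The sketch below shows the general technique with importlib; it is not taken from the rail source, and the helper name load_class is hypothetical. A standard-library class is used so the example runs without rail installed.

```python
import importlib


def load_class(class_name: str) -> type:
    """Import and return the class named by a dotted path (toy helper)."""
    module_name, _, attr_name = class_name.rpartition(".")
    if not module_name:
        raise ValueError(f"Expected a full dotted name, got {class_name!r}")
    module = importlib.import_module(module_name)
    return getattr(module, attr_name)


# "collections.OrderedDict" plays the role of a name such as
# "rail.core.stage.RailPipeline".
ordered_dict_class = load_class("collections.OrderedDict")
```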

pipeline_classes: dict[str, type[RailPipeline]] = {}
classmethod print_classes()
Return type:

None

class rail.core.stage.RailStage

Bases: PipelineStage

Base class for rail stages

This inherits from ceci.PipelineStage and implements rail-specific data handling. In particular, it provides some very useful features:

1. Access to the DataStore, which keeps track of the various data used in a pipeline, and provides access to each by a unique key.

2. Functionality to help manage multiple instances of a particular class of stage. The original ceci design didn’t have a mechanism to handle this; if you tried, you would run into name clashes between the different instances. ceci 1.7 added support for multiple instances of a single class. In particular, it distinguishes between the class name (cls.name) and the name of the particular instance (self.instance_name), and adds aliasing for inputs and outputs, so that different instances of PipelineStage can give different names to their inputs and outputs. However, using that functionality consistently requires a bit of care, so RailStage provides methods to do that in a way that uses the DataStore to keep track of the various data products.

Notes

These methods typically take a tag as input (i.e., something like “input”), but use the “aliased_tag” (i.e., something like “inform_pz_input”) when interacting with the DataStore.

In particular, get_handle(), get_data(), and input_iterator() get the data from the DataStore under the aliased tag. E.g., if you call self.get_data(‘input’) for a Stage that has aliased “input” to “special_pz_input”, it will get the data associated to “special_pz_input” in the DataStore.

Similarly, add_handle() and set_data() add the data to the DataStore under the aliased tag. E.g., if you call self.set_data(‘input’, data) for a Stage that has aliased “input” to “special_pz_input”, it will store the data in the DataStore under the key “special_pz_input”.

And connect_input() will do the alias lookup both on the input and output. I.e., it is the same as calling self.set_data(inputTag, other.get_handle(outputTag, allow_missing=True), do_read=False)
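
The alias lookup described in these notes can be pictured with a small toy model. This is a sketch only: ToyStage, its aliases dict, and the plain dict used as a data store are hypothetical stand-ins, not the RailStage or DataStore implementation.

```python
class ToyStage:
    """Toy stand-in for a stage; not the rail implementation."""

    def __init__(self, data_store: dict, aliases: dict) -> None:
        self.data_store = data_store  # shared mapping: aliased tag -> data
        self.aliases = aliases        # per-instance mapping: tag -> aliased tag

    def get_aliased_tag(self, tag: str) -> str:
        # Fall back to the plain tag when no alias is configured.
        return self.aliases.get(tag, tag)

    def set_data(self, tag: str, data):
        self.data_store[self.get_aliased_tag(tag)] = data
        return data

    def get_data(self, tag: str):
        return self.data_store[self.get_aliased_tag(tag)]


store: dict = {}
stage_a = ToyStage(store, {"input": "special_pz_input"})
stage_b = ToyStage(store, {})  # no alias: "input" stays "input"

# Both stages use the tag "input", but the data land under different keys.
stage_a.set_data("input", [0.1, 0.2])
stage_b.set_data("input", [0.9])
```

Because each instance resolves its own alias first, two instances of the same stage class can share one store without overwriting each other's data.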

__init__(args, **kwargs)

Constructor: Do RailStage specific initialization

Parameters:
  • args (Any)

  • kwargs (Any)

Return type:

None

add_data(tag, data=None)

Adds a handle to the DataStore associated to a particular tag and attaches data to it.

Parameters:
  • tag (str) – The tag (from cls.inputs or cls.outputs) for this data

  • data (rail.core.data.DataLike) – Data being added

Returns:

The data accessed by the handle associated to the tag

Return type:

DataLike

add_handle(tag, data=None, path=None)

Adds a DataHandle associated to a particular tag

Parameters:
  • tag (str) – The tag (from cls.inputs or cls.outputs) for this data

  • data (rail.core.data.DataLike) – If not None these data will be associated to the handle

  • path (str | None) – If not None, this will be the path used to read the data

Returns:

The handle that gives access to the associated data

Return type:

DataHandle

classmethod build(**kwargs)

Return an object that can be used to build a stage

Parameters:

kwargs (Any)

Return type:

RailStageBuild

connect_input(other, inputTag=None, outputTag=None)

Connect another stage to this stage as an input

Parameters:
  • other (PipelineStage) – The stage whose output is being connected

  • inputTag (str | None) – Which input tag of this stage to connect to. None -> self.inputs[0]

  • outputTag (str | None) – Which output tag of the other stage to connect to. None -> other.outputs[0]

Returns:

The input handle for this stage

Return type:

DataHandle

entrypoint_function: str | None = None
extra_interactive_documentation: str | None = None
get_data(tag, allow_missing=True)

Gets the data associated to a particular tag

Notes

1. This gets the data via the DataHandle, and can and will read the data from disk if needed.

Parameters:
  • tag (str) – The tag (from cls.inputs or cls.outputs) for this data

  • allow_missing (bool) – If False this will raise a key error if the tag is not in the DataStore

Returns:

The data accessed by the handle associated to the tag

Return type:

DataLike
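
The note that get_data() reads from disk only when needed can be sketched with a toy handle. Everything here is an assumption made for illustration: ToyHandle, the JSON file format, and returning None for a missing tag when allow_missing is True are not taken from the rail source.

```python
import json
import os
import tempfile
from typing import Optional


class ToyHandle:
    """Toy handle: holds a path and, once read, the data (hypothetical)."""

    def __init__(self, tag: str, path: Optional[str] = None, data=None) -> None:
        self.tag = tag
        self.path = path
        self.data = data

    def read(self):
        # Only touch the disk when nothing is cached yet.
        if self.data is None:
            with open(self.path, encoding="utf-8") as stream:
                self.data = json.load(stream)
        return self.data


def get_data(store: dict, tag: str, allow_missing: bool = True):
    handle = store.get(tag)
    if handle is None:
        if allow_missing:
            return None  # assumption: the real behaviour may differ
        raise KeyError(tag)
    return handle.read()


with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "input.json")
    with open(path, "w", encoding="utf-8") as stream:
        json.dump({"z": [0.5, 1.0]}, stream)
    store = {"input": ToyHandle("input", path=path)}
    loaded = get_data(store, "input")       # read from disk on first access
    missing = get_data(store, "not_there")  # tag absent, but allowed
```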

get_handle(tag, path=None, allow_missing=False)

Gets a DataHandle associated to a particular tag

Parameters:
  • tag (str) – The tag (from cls.inputs or cls.outputs) for this data

  • path (str | None) – The path to the data, only needed if we might need to read the data

  • allow_missing (bool) – If False this will raise a key error if the tag is not in the DataStore

Returns:

The handle that gives access to the associated data

Return type:

DataHandle

input_iterator(tag, **kwargs)

Iterate the input associated to a particular tag

Parameters:
  • tag (str) – The tag (from cls.inputs or cls.outputs) for this data

  • **kwargs (Any) – These will be passed to the Handle’s iterator method

Return type:

Fixme

interactive_function: str | None = None
classmethod make_and_connect(**kwargs)

Make a stage and connect it to other stages

Notes

kwargs are used to set the stage configuration. They should be key, value pairs, where the key is the parameter name and the value is the value we want to assign.

The ‘connections’ keyword is special, it is a dict[str, DataHandle] and should define the Input connections for this stage

Return type:

A stage

Parameters:

kwargs (Any)
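
One way to picture the special handling of the ‘connections’ keyword is to separate it from the ordinary configuration values. The sketch below is an assumption about the general approach, not the rail implementation; split_kwargs and the nzbins parameter are hypothetical, and a string stands in for a DataHandle.

```python
from typing import Any


def split_kwargs(**kwargs: Any) -> tuple[dict[str, Any], dict[str, Any]]:
    """Separate the special 'connections' entry from ordinary config values."""
    config = dict(kwargs)  # work on a copy of the keyword arguments
    connections = config.pop("connections", {})
    return config, connections


config, connections = split_kwargs(
    name="stage_2",
    nzbins=10,  # hypothetical configuration parameter
    connections={"input": "handle_from_stage_1"},  # string stands in for a DataHandle
)
```

After the split, config would be used to configure the stage and each entry of connections would be attached to the matching input tag.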

name = 'RailStage'
open_model(tag='model', **kwargs)

Load the model and/or attach it to this Stage

Parameters:
  • tag (str) – Input tag associated to the model

  • **kwargs (Any) – Should include ‘model’, see notes

Return type:

rail.core.data.ModelLike

Notes

The keyword argument ‘model’ should be either

  1. an object with a trained model,

  2. a path pointing to a file that can be read to obtain the trained model,

  3. or a ModelHandle providing access to the trained model.

Returns:

The object encapsulating the trained model.

Return type:

Any

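
The three accepted forms of the ‘model’ keyword suggest a dispatch on the type of the value. The sketch below illustrates that idea under stated assumptions: ToyModelHandle, resolve_model, and the use of pickle as the file format are hypothetical and not taken from the rail source.

```python
import os
import pickle
import tempfile


class ToyModelHandle:
    """Toy stand-in for a ModelHandle (hypothetical)."""

    def __init__(self, data=None, path=None):
        self.data = data
        self.path = path

    def read(self):
        # Read from disk only if the model is not already in memory.
        if self.data is None:
            with open(self.path, "rb") as stream:
                self.data = pickle.load(stream)
        return self.data


def resolve_model(model):
    """Return the trained model for any of the three accepted forms."""
    if isinstance(model, ToyModelHandle):
        return model.read()  # case 3: a handle
    if isinstance(model, (str, os.PathLike)):
        return ToyModelHandle(path=model).read()  # case 2: a path to a file
    return model  # case 1: the model object itself


trained = {"weights": [1.0, 2.0]}  # stands in for a trained model

with tempfile.TemporaryDirectory() as tmp_dir:
    model_path = os.path.join(tmp_dir, "model.pkl")
    with open(model_path, "wb") as stream:
        pickle.dump(trained, stream)

    from_object = resolve_model(trained)
    from_path = resolve_model(model_path)
    from_handle = resolve_model(ToyModelHandle(path=model_path))
```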

set_data(tag, data, path=None, do_read=True)

Sets the data associated to a particular tag

Notes

1. If data is a DataHandle and tag is one of the input tags, then this will add an alias between the two, i.e., it will set self.config.alias[tag] = data.tag. This allows the user to make connections between stages simply by passing DataHandles between them.

Parameters:
  • tag (str) – The tag (from cls.inputs or cls.outputs) for this data

  • data (rail.core.data.DataLike) – The data being set

  • path (str | None) – Can be used to set the path for the data

  • do_read (bool) – If True, will read the data if it is not set

Returns:

The data accessed by the handle associated to the tag

Return type:

DataLike
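
The note on passing a DataHandle to set_data() can be illustrated with a toy model in which the stage records an alias instead of copying the data. This is a sketch under assumptions: ToyHandle, ToyStage, and the dict-based store are hypothetical, and input tags are plain strings here for simplicity.

```python
class ToyHandle:
    """Toy stand-in for a DataHandle: a tag plus the data it refers to."""

    def __init__(self, tag: str, data=None) -> None:
        self.tag = tag
        self.data = data


class ToyStage:
    inputs = ["input"]  # plain strings; a simplification for the sketch

    def __init__(self, data_store: dict) -> None:
        self.data_store = data_store
        self.alias: dict = {}

    def set_data(self, tag: str, data):
        if isinstance(data, ToyHandle) and tag in self.inputs:
            # Point this stage's tag at the handle's tag instead of copying data.
            self.alias[tag] = data.tag
            self.data_store[data.tag] = data
            return data.data
        aliased = self.alias.get(tag, tag)
        self.data_store[aliased] = ToyHandle(aliased, data)
        return data


store: dict = {}
upstream_output = ToyHandle("output_stage_1", data=[1, 2, 3])

stage_2 = ToyStage(store)
returned = stage_2.set_data("input", upstream_output)
```

After the call, the "input" tag of stage_2 resolves to the upstream handle's tag, which is how passing handles between stages connects them.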

stage_columns: list[str] | None
class rail.core.stage.RailStageBuild

Bases: object

A small utility class for building stages

This provides a mechanism to get the name of the stage from the attribute name in the Pipeline the stage belongs to.

I.e., we can do:

a_pipe.stage_name = StageClass.build(…)

And get a stage named ‘stage_name’, rather than having to do:

a_stage = StageClass.make_stage(..)
a_pipe.add_stage(a_stage)
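
The naming mechanism can be sketched with a deferred builder and a pipeline that intercepts attribute assignment. This is a minimal illustration assuming the pipeline hooks __setattr__; ToyStage, ToyStageBuild, ToyPipeline, and the threshold parameter are hypothetical and do not reproduce the rail or ceci code.

```python
class ToyStage:
    def __init__(self, name: str, **config) -> None:
        self.name = name
        self.config = config

    @classmethod
    def build(cls, **kwargs) -> "ToyStageBuild":
        # Defer construction: the stage name is not known yet.
        return ToyStageBuild(cls, **kwargs)


class ToyStageBuild:
    """Remembers the stage class and kwargs until a name is available."""

    def __init__(self, stage_class, **kwargs) -> None:
        self.stage_class = stage_class
        self.kwargs = kwargs

    def build(self, name: str) -> ToyStage:
        return self.stage_class(name=name, **self.kwargs)


class ToyPipeline:
    def __init__(self) -> None:
        # Bypass the __setattr__ hook while creating the registry itself.
        object.__setattr__(self, "stages", {})

    def __setattr__(self, attr_name: str, value) -> None:
        if isinstance(value, ToyStageBuild):
            # The attribute name becomes the stage name.
            value = value.build(attr_name)
            self.stages[attr_name] = value
        object.__setattr__(self, attr_name, value)


a_pipe = ToyPipeline()
a_pipe.stage_name = ToyStage.build(threshold=0.5)  # hypothetical parameter
```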

__init__(stage_class, **kwargs)
Parameters:
build(name)

Actually build the stage, this is called by the pipeline the stage belongs to

Parameters:

name (str) – The name for this stage we are building

Returns:

The newly built stage

Return type:

RailStage

property io: StageIO | None
class rail.core.stage.StageIO

Bases: object

A small utility class for Stage Input/Output

This makes it possible to get access to stage inputs and outputs as attributes rather than by using the get_handle() method.

In short it maps

a_stage.get_handle(‘input’, allow_missing=True) to a_stage.io.input

This allows users to be more concise when writing pipelines.
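
Attribute-style access of this kind is typically built on __getattr__. The sketch below shows the idea with toy classes, reached through an io attribute as in the self.stage_1.io.output usage shown for RailPipeline. The class names, the string handles, and returning None for an unknown tag are assumptions made for this example.

```python
class ToyStage:
    def __init__(self) -> None:
        # Strings stand in for DataHandle objects.
        self._handles = {"input": "input-handle", "output": "output-handle"}
        self.io = ToyStageIO(self)

    def get_handle(self, tag: str, allow_missing: bool = False):
        if tag not in self._handles:
            if allow_missing:
                return None  # assumption: the real behaviour may differ
            raise KeyError(tag)
        return self._handles[tag]


class ToyStageIO:
    """Forward attribute access to the parent's get_handle()."""

    def __init__(self, parent: ToyStage) -> None:
        self._parent = parent

    def __getattr__(self, item: str):
        # Only called when normal lookup fails, so _parent is unaffected.
        return self._parent.get_handle(item, allow_missing=True)


a_stage = ToyStage()
via_method = a_stage.get_handle("input", allow_missing=True)
via_attribute = a_stage.io.input
```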

__init__(parent)
Parameters:

parent (RailStage)