Contributing
Where to contribute: RAIL packages
Similar to the installation process, depending on how you want to contribute to RAIL you will be contributing to one or more of the RAIL packages. Given the package structure we imagine three main use cases for contributions:
If you are contributing to the core code base, or developing an algorithm that has minimal dependencies, you will probably only be contributing to RAIL, and only need to install the source code for RAIL.
If you are contritubing a new algorithm that does depend on a number of other packages beyond numpy, scipy and sklearn, you will probably be making a new rail_<algorithm> package, and eventually adding to the dependencies in rail_hub.
If you are using existing algorithms to do studies and build analysis pipelines to do those studies, you will probably only be contriubting to rail_pipelines.
Contribution workflow
The RAIL and rail_<xxx> repositories use an issue-branch-review workflow. When you identify something that should be done, make an issue for it. We ask that if applicable and you are comfortable doing so, you add labels to the issue to mark what part of the code base it relates to, its priority level, and if it’s well-suited to newcomers, as opposed to requiring more familiarity with the code or technical expertise. Also, if you intend to work on the issue yourself, please assign the issue to yourself.
To contribute, isolate an issue to work on, assign yourself, and leave a comment on the issue’s discussion page to let others know you’re working on it. Then, make a branch with a name of the form issue/[#]/brief-description and make changes in your branch. While developing in a branch, don’t forget to pull from main regularly to make sure your work is compatible with other recent changes.
- Before you make a pull request we ask that you do two things:
Run pylint and clean up the code accordingly. You may need to install pylint to do this.
Add unit tests and make sure that the new code is fully covered (see below). You make need to install pytest and pytest-cov to do this. You can use the do_cover.sh script in the top level directory to run pytest and generate a coverage report.
As regards full coverage, the automatic tests will require that 100% of the lines are covered by the tests. However, do note that you can use the comment #pragma: no cover to skip bits of code, e.g., a line of code that raises an exception if an input file is missing, rather than test every possible failure mode.
When you’re ready to merge your branch into the main branch, make a pull request, and request that other team members review it if you have any in mind, for example, those who have consulted on some of the work. Once the changes have been approved, you can merge and squash the pull request as well as close its corresponding issue by putting closes #[#] in the comment closing the pull request.
To review a pull request, it’s a good idea to start by pulling the changes and running the unit tests (see above). If there are no problems with that, you can make suggestions for optional improvements (e.g. adding a one-line comment before a clever block of code or including a demonstration of new functionality in the example notebooks) or request necessary changes (e.g. including an exception for an edge case that will break the code or separating out code that’s repeated in multiple places).
Adding a new Rail Stage
To make it easier to eventually run RAIL algorithms at scale, all of the various algorithms are implemented as RailStage python classes. A RailStage is intended to take a particular set of inputs and configuration parameters, run a single bit of analysis, and produce one or more output files. The inputs, outputs and configuration parameters are all defined in particular ways to allow RailStage objects to be integrated into larger data analysis pipelines.
Here is an example of a very simple RailStage.
class ColumnMapper(RailStage):
"""Utility stage that remaps the names of columns.
Notes
-----
1. This operates on pandas dataframs in parquet files.
2. In short, this does:
`output_data = input_data.rename(columns=self.config.columns, inplace=self.config.inplace)`
"""
name = 'ColumnMapper'
config_options = RailStage.config_options.copy()
config_options.update(chunk_size=100_000, columns=dict, inplace=False)
inputs = [('input', PqHandle)]
outputs = [('output', PqHandle)]
def __init__(self, args, comm=None):
RailStage.__init__(self, args, comm=comm)
def run(self):
data = self.get_data('input', allow_missing=True)
out_data = data.rename(columns=self.config.columns, inplace=self.config.inplace)
if self.config.inplace: #pragma: no cover
out_data = data
self.add_data('output', out_data)
def __call__(self, data: pd.DataFrame) -> pd.DataFrame:
"""Return a table with the columns names changed
Parameters
----------
sample : pd.DataFrame
The data to be renamed
Returns
-------
pd.DataFrame
The degraded sample
"""
self.set_data('input', data)
self.run()
return self.get_handle('output')
This particular example has all of the required pieces and almost nothing else. The required pieces, in the order that they appear are:
The ColumnMapper(RailStage): defines a class called ColumnMapper and specifies that it inherits from RailStage.
The name = ColumnMapper is required, and should match the class name.
The config_options lines define the configuration parameters for this class, as well as their default values. Note that here we are copying the configuration parameters from the RailStage as well as defining some new ones.
The inputs = [(‘input’, PqHandle)] and outputs = [(‘output’, PqHandle)] define the inputs and outputs, and the expected data types for those, in this case Parquet files.
The __init__ method does any class-specific initialization. In this case there isn’t any and the method is superflous.
The run() method does the actual work, note that it doesn’t take any arguments, that it uses methods self.get_data() and self.add_data() to access the input data and set the output data, and that it uses self.config to access the configuration parameters.
The __call__() method provides an interface for interactive use. It provide a way to pass in data (and in other cases configuraiton parameters) to the class so that they can be used in the run method.
Here is an example of a slightly more complicated RailStage.
class NaiveStack(PZSummarizer):
"""Summarizer which simply histograms a point estimate
"""
name = 'NaiveStack'
config_options = PZSummarizer.config_options.copy()
config_options.update(zmin=Param(float, 0.0, msg="The minimum redshift of the z grid"),
zmax=Param(float, 3.0, msg="The maximum redshift of the z grid"),
nzbins=Param(int, 301, msg="The number of gridpoints in the z grid"),
seed=Param(int, 87, msg="random seed"),
nsamples=Param(int, 1000, msg="Number of sample distributions to create"))
outputs = [('output', QPHandle),
('single_NZ', QPHandle)]
def __init__(self, args, comm=None):
PZSummarizer.__init__(self, args, comm=comm)
self.zgrid = None
def run(self):
rng = np.random.default_rng(seed=self.config.seed)
test_data = self.get_data('input')
self.zgrid = np.linspace(self.config.zmin, self.config.zmax, self.config.nzbins + 1)
pdf_vals = test_data.pdf(self.zgrid)
yvals = np.expand_dims(np.sum(np.where(np.isfinite(pdf_vals), pdf_vals, 0.), axis=0), 0)
qp_d = qp.Ensemble(qp.interp, data=dict(xvals=self.zgrid, yvals=yvals))
bvals = np.empty((self.config.nsamples, len(self.zgrid)))
for i in range(self.config.nsamples):
bootstrap_draws = rng.integers(low=0, high=test_data.npdf, size=test_data.npdf)
bvals[i] = np.sum(pdf_vals[bootstrap_draws], axis=0)
sample_ens = qp.Ensemble(qp.interp, data=dict(xvals=self.zgrid, yvals=bvals))
self.add_data('output', sample_ens)
self.add_data('single_NZ', qp_d)
The main difference with this new class is that it inherit from the PZSummarizer RailStage sub-class. A PZSummarizer will take an ensemble of p(z) distributions for many objects, and summarize them into a single n(z) distribution for that ensemble.
A few things to note:
We copy the configuration parameters for PZSummarizer and then add addtional ones.
The run() method is implemented here, but the function for interactive use summarize() is actually defined in PZSummarizer.
While we define the outputs here, we just use the inputs as defined in PZSummarizer.
Adding a new Rail Pipeline
Here is an example of the first part of the goldenspike pipeline defintion.
class GoldenspikePipeline(RailPipeline):
def __init__(self):
RailPipeline.__init__(self)
DS = RailStage.data_store
DS.__class__.allow_overwrite = True
bands = ['u','g','r','i','z','y']
band_dict = {band:f'mag_{band}_lsst' for band in bands}
rename_dict = {f'mag_{band}_lsst_err':f'mag_err_{band}_lsst' for band in bands}
self.flow_engine_train = FlowEngine.build(
flow=flow_file,
n_samples=50,
seed=1235,
output=os.path.join(namer.get_data_dir(DataType.catalog, CatalogType.created), "output_flow_engine_train.pq"),
)
self.lsst_error_model_train = LSSTErrorModel.build(
connections=dict(input=self.flow_engine_train.io.output),
bandNames=band_dict, seed=29,
output=os.path.join(namer.get_data_dir(DataType.catalog, CatalogType.degraded), "output_lsst_error_model_train.pq"),
)
self.inv_redshift = InvRedshiftIncompleteness.build(
connections=dict(input=self.lsst_error_model_train.io.output),
pivot_redshift=1.0,
output=os.path.join(namer.get_data_dir(DataType.catalog, CatalogType.degraded), "output_inv_redshift.pq"),
)
self.line_confusion = LineConfusion.build(
connections=dict(input=self.inv_redshift.io.output),
true_wavelen=5007., wrong_wavelen=3727., frac_wrong=0.05,
output=os.path.join(namer.get_data_dir(DataType.catalog, CatalogType.degraded), "output_line_confusion.pq"),
)
What this is doing is:
Defining a class GoldenspikePipeline to encapsulate the pipeline and setting up that pipeline.
Set up the rail DataStore for interactive use, allowing you to overwrite output files, (say if you re-run the pipeline in a notebook cell).
Defining some common parameters, e.g., bands, bands_dict for the pipeline.
Defining four stages, and adding them to the pipeline, note that for each stage the syntax is more or less the same. We have to define,
The name of the stage, i.e., self.flow_engine_train will make a stage called flow_engine_train through some python cleverness.
The class of the stage, which is specified by which type of stage we ask to build, FlowEngine.build will make a FlowEngine stage.
Any configuration parameters, which are specified as keyword argurments, e.g., n_samples=50.
Any input connections from other stages, e.g., connections=dict(input=self.flow_engine_train.io.output), in the self.lsst_error_model_train block will connect the output of self.flow_engine_train to the input of self.lsst_error_model_train. Later in that example we can see how to connect multiple inputs, e.g., one named input and another named model, as required for an estimator stage.
We use the namer class and enumerations to ensure that the data end up following our location convenctions.