"""Rail-specific data management"""
import os
import tables_io
import pickle
import qp
class DataHandle:
    """Class to act as a handle for a bit of data, associating it with a file
    and providing tools to read & write it to that file

    This base class implements the bookkeeping (tag, path, cached data, open
    file object) and delegates all actual I/O to the ``_open``, ``_read``,
    ``_write``, ... class methods, which sub-classes must override.

    Parameters
    ----------
    tag : str
        The tag under which this data handle can be found in the store
    data : any or None
        The associated data
    path : str or None
        The path to the associated file
    creator : str or None
        The name of the stage that created this data handle
    """
    # File-name suffix used by `make_name`; sub-classes override it
    suffix = ''

    def __init__(self, tag, data=None, path=None, creator=None):
        """Constructor """
        self.tag = tag
        self.data = data
        self.path = path
        self.creator = creator
        self.fileObj = None   # set by `open` / `initialize_write`
        self.groups = None    # set by `initialize_write`
        self.partial = False  # True when `data` only holds a chunk

    def _resolved_path(self):
        """Return the path with environment variables expanded, or None if no path is set"""
        if self.path is None:
            return None
        return os.path.expandvars(self.path)

    def open(self, **kwargs):
        """Open and return the associated file

        Notes
        -----
        This will simply open the file and return a file-like object to the caller.
        It will not read or cache the data

        Raises
        ------
        ValueError
            If no path has been specified
        """
        if self.path is None:
            raise ValueError("DataHandle.open() called but path has not been specified")
        self.fileObj = self._open(os.path.expandvars(self.path), **kwargs)
        return self.fileObj

    @classmethod
    def _open(cls, path, **kwargs):
        raise NotImplementedError("DataHandle._open")  #pragma: no cover

    def close(self, **kwargs):  #pylint: disable=unused-argument
        """Forget the cached file object (the object itself is not closed here)"""
        self.fileObj = None

    def read(self, force=False, **kwargs):
        """Read and return the data from the associated file

        The cached data are returned as-is unless `force` is True or no data
        are cached yet.
        """
        if self.data is not None and not force:
            return self.data
        self.set_data(self._read(os.path.expandvars(self.path), **kwargs))
        return self.data

    def __call__(self, **kwargs):
        """Return the data, re-reading the file if needed

        A re-read is forced when no data are cached or when the cached data
        are only a partial chunk.
        """
        if self.has_data and not self.partial:
            return self.data
        return self.read(force=True, **kwargs)

    @classmethod
    def _read(cls, path, **kwargs):
        raise NotImplementedError("DataHandle._read")  #pragma: no cover

    def write(self, **kwargs):
        """Write the data to the associated file, creating the output directory if needed

        Raises
        ------
        ValueError
            If no path has been specified, or if there is no data to write
        """
        if self.path is None:
            raise ValueError(f"{type(self).__name__}.write() called but path has not been specified")
        if self.data is None:
            raise ValueError(f"{type(self).__name__}.write() called for path {self.path} with no data")
        outpath = os.path.expandvars(self.path)
        outdir = os.path.dirname(os.path.abspath(outpath))
        if not os.path.exists(outdir):  #pragma: no cover
            os.makedirs(outdir, exist_ok=True)
        return self._write(self.data, outpath, **kwargs)

    @classmethod
    def _write(cls, data, path, **kwargs):
        raise NotImplementedError("DataHandle._write")  #pragma: no cover

    def initialize_write(self, data_lenght, **kwargs):
        """Initialize file to be written by chunks

        Stores the values returned by `_initialize_write` in `self.groups`
        and `self.fileObj`.

        Raises
        ------
        ValueError
            If no path has been specified
        """
        if self.path is None:  #pragma: no cover
            raise ValueError(f"{type(self).__name__}.initialize_write() called but path has not been specified")
        self.groups, self.fileObj = self._initialize_write(
            self.data, os.path.expandvars(self.path), data_lenght, **kwargs)

    @classmethod
    def _initialize_write(cls, data, path, data_lenght, **kwargs):
        raise NotImplementedError("DataHandle._initialize_write")  #pragma: no cover

    def write_chunk(self, start, end, **kwargs):
        """Write one chunk of data to the file prepared by `initialize_write`

        Raises
        ------
        ValueError
            If there is no data, or if no file object is currently held
        """
        if self.data is None:
            raise ValueError(f"{type(self).__name__}.write_chunk() called for path {self.path} with no data")
        if self.fileObj is None:
            raise ValueError(f"{type(self).__name__}.write_chunk() called before open for {self.tag} : {self.path}")
        return self._write_chunk(self.data, self.fileObj, self.groups, start, end, **kwargs)

    @classmethod
    def _write_chunk(cls, data, fileObj, groups, start, end, **kwargs):
        raise NotImplementedError("DataHandle._write_chunk")  #pragma: no cover

    def finalize_write(self, **kwargs):
        """Finalize and close file written by chunks

        Raises
        ------
        ValueError
            If no file object is currently held
        """
        if self.fileObj is None:  #pragma: no cover
            raise ValueError(f"{type(self).__name__}.finalize_write() called before open for {self.tag} : {self.path}")
        self._finalize_write(self.data, self.fileObj, **kwargs)

    @classmethod
    def _finalize_write(cls, data, fileObj, **kwargs):
        raise NotImplementedError("DataHandle._finalize_write")  #pragma: no cover

    def iterator(self, **kwargs):
        """Iterator over the data in the associated file"""
        # Expand environment variables so that this agrees with open/read/write
        return self._iterator(self._resolved_path(), **kwargs)

    def set_data(self, data, partial=False):
        """Set the data for this handle, and record whether it is only a partial chunk"""
        self.data = data
        self.partial = partial

    def size(self, **kwargs):
        """Return the size of the data associated to this handle"""
        # Expand environment variables so that this agrees with open/read/write
        return self._size(self._resolved_path(), **kwargs)

    @classmethod
    def _size(cls, path, **kwargs):
        raise NotImplementedError("DataHandle._size")  #pragma: no cover

    @classmethod
    def _iterator(cls, path, **kwargs):
        raise NotImplementedError("DataHandle._iterator")  #pragma: no cover

    @property
    def has_data(self):
        """Return true if the data for this handle are loaded """
        return self.data is not None

    @property
    def has_path(self):
        """Return true if the path for the associated file is defined """
        return self.path is not None

    @property
    def is_written(self):
        """Return true if the associated file has been written """
        if self.path is None:
            return False
        return os.path.exists(os.path.expandvars(self.path))

    def __str__(self):
        """Summarize the handle as '<type> <path>, (<flags>)' with 'w' = written, 'd' = data loaded"""
        s = f"{type(self)} "
        if self.has_path:
            s += f"{self.path}, ("
        else:
            s += "None, ("
        if self.is_written:
            s += "w"
        if self.has_data:
            s += "d"
        s += ")"
        return s

    @classmethod
    def make_name(cls, tag):
        """Construct and return file name for a particular data tag """
        if cls.suffix:
            return f"{tag}.{cls.suffix}"
        return tag  #pragma: no cover
class TableHandle(DataHandle):
    """DataHandle for single tables of data

    All I/O is delegated to the `tables_io` package.
    """
    # No suffix here; the format-specific sub-classes provide one
    suffix = None

    @classmethod
    def _open(cls, path, **kwargs):
        """Open and return the associated file

        Notes
        -----
        This will simply open the file and return a file-like object to the caller.
        It will not read or cache the data
        """
        fileobj = tables_io.io.io_open(path, **kwargs)  #pylint: disable=no-member
        return fileobj

    @classmethod
    def _read(cls, path, **kwargs):
        """Read and return the data from the associated file via `tables_io.read`"""
        table = tables_io.read(path, **kwargs)
        return table

    @classmethod
    def _write(cls, data, path, **kwargs):
        """Write the data to the associated file via `tables_io.write`"""
        result = tables_io.write(data, path, **kwargs)
        return result

    @classmethod
    def _size(cls, path, **kwargs):
        """Return the input data length as reported by `tables_io.io.getInputDataLengthHdf5`"""
        n_rows = tables_io.io.getInputDataLengthHdf5(path, **kwargs)
        return n_rows

    @classmethod
    def _iterator(cls, path, **kwargs):
        """Iterate over the data using `tables_io.iteratorNative`"""
        chunks = tables_io.iteratorNative(path, **kwargs)
        return chunks
class Hdf5Handle(TableHandle):
    """DataHandle for a table written to HDF5"""
    suffix = 'hdf5'

    @classmethod
    def _initialize_write(cls, data, path, data_lenght, **kwargs):
        """Prepare the HDF5 file for chunked writing and return (group, file object)

        The optional `communicator` keyword is forwarded to `tables_io` as `comm`.
        """
        allocation = cls._get_allocation_kwds(data, data_lenght)
        communicator = kwargs.get('communicator')
        group, fout = tables_io.io.initializeHdf5WriteSingle(
            path, groupname=None, comm=communicator, **allocation)
        return group, fout

    @classmethod
    def _get_allocation_kwds(cls, data, data_lenght):
        """Map each key of `data` to a (shape, dtype) pair

        The shape is the shape of the corresponding array with its first
        dimension replaced by `data_lenght`.
        """
        allocation = {}
        for name, column in data.items():
            dims = list(column.shape)
            dims[0] = data_lenght
            allocation[name] = (dims, column.dtype)
        return allocation

    @classmethod
    def _write_chunk(cls, data, fileObj, groups, start, end, **kwargs):
        """Write rows [start, end) of `data` to the open file; `groups` is not used here"""
        tables_io.io.writeDictToHdf5ChunkSingle(fileObj, data, start, end, **kwargs)

    @classmethod
    def _finalize_write(cls, data, fileObj, **kwargs):
        """Finalize the chunked write through `tables_io.io.finalizeHdf5Write`"""
        outcome = tables_io.io.finalizeHdf5Write(fileObj, **kwargs)
        return outcome
class FitsHandle(TableHandle):
    """TableHandle specialization whose files use the 'fits' suffix"""
    suffix = 'fits'
class PqHandle(TableHandle):
    """TableHandle specialization for parquet tables, using the 'pq' suffix"""
    suffix = 'pq'
class QPHandle(DataHandle):
    """DataHandle for qp ensembles

    Reading goes through `qp.read`; writing is delegated to methods of the
    data object itself.
    """
    suffix = 'hdf5'

    @classmethod
    def _open(cls, path, **kwargs):
        """Open and return the associated file

        Notes
        -----
        This will simply open the file and return a file-like object to the caller.
        It will not read or cache the data
        """
        fileobj = tables_io.io.io_open(path, **kwargs)  #pylint: disable=no-member
        return fileobj

    @classmethod
    def _read(cls, path, **kwargs):
        """Read and return the data from the associated file (keywords are ignored)"""
        ensemble = qp.read(path)
        return ensemble

    @classmethod
    def _write(cls, data, path, **kwargs):
        """Write the data to the associated file using `data.write_to`"""
        outcome = data.write_to(path)
        return outcome

    @classmethod
    def _initialize_write(cls, data, path, data_lenght, **kwargs):
        """Prepare a chunked write; the optional `communicator` keyword is forwarded"""
        communicator = kwargs.get('communicator')
        return data.initializeHdf5Write(path, data_lenght, communicator)

    @classmethod
    def _write_chunk(cls, data, fileObj, groups, start, end, **kwargs):
        """Write the chunk delimited by `start` and `end`; `groups` is not used here"""
        outcome = data.writeHdf5Chunk(fileObj, start, end)
        return outcome

    @classmethod
    def _finalize_write(cls, data, fileObj, **kwargs):
        """Finalize the chunked write using `data.finalizeHdf5Write`"""
        outcome = data.finalizeHdf5Write(fileObj)
        return outcome
def default_model_read(modelfile):
    """Default function to read model files, simply uses pickle.load

    Parameters
    ----------
    modelfile : str
        Path of the pickle file to read

    Returns
    -------
    model : any
        The un-pickled object
    """
    # NOTE: pickle can execute arbitrary code while loading; only use this
    # on model files that come from a trusted source.
    # The context manager makes sure the file is closed even if loading fails.
    with open(modelfile, 'rb') as fin:
        return pickle.load(fin)
def default_model_write(model, path):
    """Pickle `model` into the file at `path`, using the highest available protocol"""
    protocol = pickle.HIGHEST_PROTOCOL
    with open(path, 'wb') as stream:
        pickle.dump(model, stream, protocol)
class ModelDict(dict):
    """
    A specialized dict to keep track of individual estimation model objects:
    this is just a dict with these additional features

    1. Keys are paths
    2. There is a read(path, force=False) method that reads a model object and inserts it into the dictionary
    3. There is a write(model, path, force=False) method that caches a model and writes it to disk
    """

    def open(self, path, mode, **kwargs):  #pylint: disable=no-self-use
        """Open the file and return the file handle"""
        return open(path, mode, **kwargs)

    def read(self, path, force=False, reader=None, **kwargs):  #pylint: disable=unused-argument
        """Read a model into this dict

        Parameters
        ----------
        path : str
            Path of the model file, also used as the dictionary key
        force : bool
            If True, re-read the file even if the path is already cached
        reader : callable or None
            Function called as `reader(path)`; defaults to `default_model_read`

        Returns
        -------
        model : any
            The cached or newly read model
        """
        if reader is None:
            reader = default_model_read
        if force or path not in self:
            model = reader(path)
            self[path] = model
            return model
        return self[path]

    def write(self, model, path, force=False, writer=None, **kwargs):  #pylint: disable=unused-argument
        """Cache the model and write it to disk

        The model is written when `force` is True, when the path is not yet
        cached, or when the file does not exist on disk.  Otherwise nothing
        is done and the cached entry is left untouched.

        Parameters
        ----------
        model : any
            The model to write
        path : str
            Path of the output file, also used as the dictionary key
        force : bool
            If True, write even if the path is cached and the file exists
        writer : callable or None
            Function called as `writer(model, path)`; defaults to `default_model_write`
        """
        if writer is None:
            writer = default_model_write
        # A cached path whose file is missing (e.g. never written or removed)
        # must still be written, otherwise the request is silently dropped.
        if force or path not in self or not os.path.exists(path):
            self[path] = model
            writer(model, path)
class ModelHandle(DataHandle):
    """DataHandle for machine learning models

    All handles share the class-level `model_factory`, a `ModelDict` keyed by path.
    """
    suffix = 'pkl'
    model_factory = ModelDict()

    @classmethod
    def _open(cls, path, **kwargs):
        """Open and return the associated file

        With mode 'w' a binary file handle opened for writing is returned;
        for any other mode the model is read through the factory and returned.
        """
        options = dict(kwargs)
        mode = options.pop('mode', 'r')
        if mode != 'w':
            return cls.model_factory.read(path, **kwargs)
        return cls.model_factory.open(path, mode='wb', **options)

    @classmethod
    def _read(cls, path, **kwargs):
        """Read and return the model from the associated file via the factory"""
        model = cls.model_factory.read(path, **kwargs)
        return model

    @classmethod
    def _write(cls, data, path, **kwargs):
        """Write the model to the associated file via the factory"""
        outcome = cls.model_factory.write(data, path, **kwargs)
        return outcome
class FlowDict(dict):
    """
    A specialized dict to keep track of individual flow objects: this is just
    a dict with these additional features

    1. Keys are paths
    2. Values are flow objects, this is checked at runtime.
    3. There is a read(path, force=False) method that reads a flow object and inserts it into the dictionary
    """

    def __setitem__(self, key, value):
        """ Add a key-value pair, rejecting any value that is not a `Flow` object """
        # pzflow is imported lazily, only when an item is actually inserted
        from pzflow import Flow
        if isinstance(value, Flow):
            return dict.__setitem__(self, key, value)
        raise TypeError(f"Only values of type Flow can be added to a FlowFactory, not {type(value)}")  #pragma: no cover

    def read(self, path, force=False):
        """ Read a `Flow` object from disk and add it to this dictionary

        The cached object is returned unless `force` is True or the path is
        not in the dictionary yet.
        """
        from pzflow import Flow
        if not force and path in self:
            return self[path]  #pragma: no cover
        loaded = Flow(file=path)
        self.__setitem__(path, loaded)
        return loaded
class FlowHandle(ModelHandle):
    """
    A wrapper around a file that describes a PZFlow object

    All handles share the class-level `flow_factory`, a `FlowDict` keyed by path.
    """
    flow_factory = FlowDict()
    suffix = 'pkl'

    @classmethod
    def _open(cls, path, **kwargs):  #pylint: disable=unused-argument
        """Return the flow read from `path`; opening for writing is not supported"""
        mode = kwargs.get('mode', 'r')
        if mode == 'w':  #pragma: no cover
            raise NotImplementedError("Use FlowHandle.write(), not FlowHandle.open(mode='w')")
        return cls.flow_factory.read(path)

    @classmethod
    def _read(cls, path, **kwargs):
        """Read and return the flow from the associated file via the factory"""
        flow = cls.flow_factory.read(path, **kwargs)
        return flow

    @classmethod
    def _write(cls, data, path, **kwargs):
        """Write the flow to `path` by calling its `save` method"""
        outcome = data.save(path)
        return outcome
class DataStore(dict):
    """Class to provide a transient data store

    This class:
    1) associates data products with keys
    2) provides functions to read and write the various data products to associated files
    """
    # When False, assigning to an existing key raises a ValueError.
    # Attribute assignment on an instance is routed to `__setitem__`, so this
    # flag has to be changed on the class itself.
    allow_overwrite = False

    def __init__(self, **kwargs):
        """ Build from keywords

        Note
        ----
        All of the values must be data handles or this will raise a TypeError
        """
        dict.__init__(self)
        for key, val in kwargs.items():
            self[key] = val

    def __str__(self):
        """ Override __str__ casting to deal with `TableHandle` objects in the map """
        entries = "".join(f" {key}:{val}\n" for key, val in self.items())
        return "{" + entries + "}"

    def __repr__(self):
        """ A custom representation """
        return "DataStore\n" + self.__str__()

    def __setitem__(self, key, value):
        """ Override the __setitem__ to work with `TableHandle`

        Raises
        ------
        TypeError
            If `value` is not a `DataHandle`
        ValueError
            If `key` is already present and `allow_overwrite` is False
        """
        if not isinstance(value, DataHandle):
            raise TypeError(f"Can only add objects of type DataHandle to DataStore, not {type(value)}")
        check = self.get(key)
        if check is not None and not self.allow_overwrite:
            raise ValueError(f"DataStore already has an item with key {key}, of type {type(check)}, created by {check.creator}")
        dict.__setitem__(self, key, value)
        return value

    def __getattr__(self, key):
        """ Allow attribute-like parameter access

        Raises
        ------
        KeyError
            If `key` is not in the store
        """
        try:
            return self.__getitem__(key)
        except KeyError as msg:
            # Kludge to get docstrings to work
            if key in ['__objclass__']:  #pragma: no cover
                return None
            # Keep raising KeyError (callers rely on it), but name the
            # missing key instead of raising an empty exception.
            raise KeyError(key) from msg

    def __setattr__(self, key, value):
        """ Allow attribute-like parameter setting """
        return self.__setitem__(key, value)

    def add_data(self, key, data, handle_class, path=None, creator='DataStore'):
        """ Create a handle for some data, and insert it into the DataStore """
        handle = handle_class(key, path=path, data=data, creator=creator)
        self[key] = handle
        return handle

    def read_file(self, key, handle_class, path, creator='DataStore', **kwargs):
        """ Create a handle, use it to read a file, and insert it into the DataStore """
        handle = handle_class(key, path=path, data=None, creator=creator)
        handle.read(**kwargs)
        self[key] = handle
        return handle

    def read(self, key, force=False, **kwargs):
        """ Read the data associated to a particular key

        Raises
        ------
        KeyError
            If `key` is not in the store
        """
        try:
            handle = self[key]
        except KeyError as msg:
            raise KeyError(f"Failed to read data {key} because {msg}") from msg
        return handle.read(force, **kwargs)

    def open(self, key, mode='r', **kwargs):
        """ Open and return the file associated to a particular key

        Raises
        ------
        KeyError
            If `key` is not in the store
        """
        try:
            handle = self[key]
        except KeyError as msg:
            raise KeyError(f"Failed to open data {key} because {msg}") from msg
        return handle.open(mode=mode, **kwargs)

    def write(self, key, **kwargs):
        """ Write the data associated to a particular key

        Raises
        ------
        KeyError
            If `key` is not in the store
        """
        try:
            handle = self[key]
        except KeyError as msg:
            raise KeyError(f"Failed to write data {key} because {msg}") from msg
        return handle.write(**kwargs)

    def write_all(self, force=False, **kwargs):
        """ Write all the data in this DataStore

        Handles whose file already exists are skipped unless `force` is True.
        Per-handle keyword arguments are looked up in `kwargs` under the
        handle's key.
        """
        for key, handle in self.items():
            if handle.is_written and not force:
                continue
            handle.write(**kwargs.get(key, {}))
# Single module-level store, created once at import time
_DATA_STORE = DataStore()


def DATA_STORE():
    """Return the module-level `DataStore` instance"""
    shared_store = _DATA_STORE
    return shared_store