forml.pipeline.payload
General payload manipulation utilities.
ForML is, by design, fairly payload-format agnostic, leaving the choice of compatible operators/actors to the implementer.
This module provides a number of generic payload-related operators to be parameterized with particular actor implementations targeting different payload formats.
Note
For convenience, there are also a couple of payload-specific actors designed to be used
only with that particular payload format (typically pandas.DataFrame). This
does not make that format preferable from the general ForML perspective, which
remains payload-format neutral.
Functions
- forml.pipeline.payload.pandas_params(method: Callable[[Actor, NDFrame], Any]) → Callable[[Actor, Any], Any]
Decorator for converting the decorated actor method input parameters to Pandas format.
The parameters will be converted to pandas.DataFrame or pandas.Series depending on the dimensionality.
- Parameters:
- method: Callable[[Actor, NDFrame], Any]
Actor method to be decorated.
- Returns:
Decorated method converting its input payload to Pandas.
Warning
The conversion is attempted using internal logic; if unsuccessful, the payload is passed through unchanged and a warning is emitted.
Examples:
import pandas
from pandas.core import generic as pdtype
from forml import flow
from forml.pipeline import payload
class Concat(flow.Actor[pdtype.NDFrame, None, pandas.DataFrame]):
    @payload.pandas_params
    def apply(self, *features: pdtype.NDFrame) -> pandas.DataFrame:
        return pandas.concat(features)
Todo
Make the internal conversion logic customizable.
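The documented behavior of pandas_params (convert the positional parameters before calling the method, fall back to the original payload with a warning) can be sketched without Pandas. This is a minimal stdlib-only illustration under stated assumptions, not ForML's implementation: `convert_params` and `to_table` are hypothetical names, and `to_table` merely stands in for the internal Pandas conversion.

```python
import functools
import warnings
from typing import Any, Callable


def to_table(data: Any) -> list:
    """Hypothetical converter standing in for the Pandas conversion: accepts rows only."""
    if isinstance(data, (list, tuple)) and all(isinstance(row, (list, tuple)) for row in data):
        return [list(row) for row in data]
    raise TypeError(f'unsupported payload type: {type(data).__name__}')


def convert_params(converter: Callable[[Any], Any]) -> Callable:
    """Decorator factory converting each positional method argument (after self)."""

    def decorator(method: Callable) -> Callable:
        @functools.wraps(method)
        def wrapper(self, *args: Any, **kwargs: Any) -> Any:
            converted = []
            for arg in args:
                try:
                    converted.append(converter(arg))
                except (TypeError, ValueError):
                    # Documented fallback: pass the payload through unchanged and warn.
                    warnings.warn(f'unable to convert {type(arg).__name__}, passing through unchanged')
                    converted.append(arg)
            return method(self, *converted, **kwargs)

        return wrapper

    return decorator


class Concat:
    """Toy actor-like class whose apply() receives already-converted tables."""

    @convert_params(to_table)
    def apply(self, *features: list) -> list:
        # Stack the rows of all inputs, similar to a row-wise concatenation.
        return [row for table in features for row in table]
```

Making the converter a parameter of the decorator factory is one possible way to address the customizability mentioned in the Todo.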
Classes
- class forml.pipeline.payload.Apply(*, function: collections.abc.Callable[..., flow.Features])
Bases: Actor
Generic stateless function-based transformer actor.
It can work with an arbitrary number of input/output ports (M:N), depending on the topology implemented by the owning operator.
- Parameters:
- function: collections.abc.Callable[..., flow.Features]
Callable transformer to be applied to the data.
- Returns:
Result of the function applied to the input data.
Examples
>>> APPLY = payload.Apply(function=lambda df: df.dropna())
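The idea of a stateless, function-based actor can be modeled in a few lines of plain Python. In this hypothetical sketch (`FunctionApply` is not a ForML class), the number of inputs simply follows the arity of the wrapped function, and `None` entries play the role of the missing values dropped by `df.dropna()`.

```python
from typing import Any, Callable


class FunctionApply:
    """Hypothetical stand-in for a stateless, function-based transformer actor."""

    def __init__(self, *, function: Callable[..., Any]):
        self._function = function

    def apply(self, *features: Any) -> Any:
        # No state is kept: the output depends only on the current inputs.
        return self._function(*features)


# 1:1 usage: drop missing values (None) from a list.
DROPNA = FunctionApply(function=lambda rows: [r for r in rows if r is not None])
# 2:1 usage: element-wise sum of two inputs.
ADD = FunctionApply(function=lambda left, right: [a + b for a, b in zip(left, right)])
```

Since nothing is learned during training, such an actor behaves identically in train and apply mode.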
- class forml.pipeline.payload.CrossValidable(*args, **kwargs)
Bases: Protocol[Features, Labels, Column]
Protocol for implementing cross-validation splitters.
This matches, for example, all the particular
SKLearn Splitter Classes.
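Because this is a structural protocol, any object with a compatible `split()` method qualifies. The sketch below assumes the SKLearn convention `split(X, y=None, groups=None)` yielding `(train_indices, test_indices)` pairs; `SplitterLike` and `ToyKFold` are hypothetical names, and the exact ForML protocol signature may differ.

```python
from collections.abc import Iterable, Iterator, Sequence
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class SplitterLike(Protocol):
    """Hypothetical protocol assuming the SKLearn ``split(X, y, groups)`` convention."""

    def split(self, X: Any, y: Any = None, groups: Any = None) -> Iterable[tuple[Sequence[int], Sequence[int]]]:
        ...


class ToyKFold:
    """Minimal contiguous K-fold splitter (no shuffling), for illustration only."""

    def __init__(self, n_splits: int = 2):
        self.n_splits = n_splits

    def split(self, X, y=None, groups=None) -> Iterator[tuple[list[int], list[int]]]:
        n = len(X)
        indices = list(range(n))
        # Like sklearn's KFold, the first n % n_splits folds get one extra sample.
        sizes = [n // self.n_splits + (1 if i < n % self.n_splits else 0) for i in range(self.n_splits)]
        start = 0
        for size in sizes:
            test = indices[start:start + size]
            train = indices[:start] + indices[start + size:]
            yield train, test
            start += size
```

Note that `isinstance()` against a `runtime_checkable` protocol only checks that the method exists, not its signature.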
- class forml.pipeline.payload.CVFoldable(crossvalidator: payload.CrossValidable[flow.Features, flow.Labels, Column], groups_extractor: Callable[[flow.Features], Column] | None = None)
Bases: Generic[Features, Labels, Column], Actor[Features, Labels, Sequence[Features]]
Abstract actor splitting the flow into N-fold train-test branches based on the provided cross-validator.
The actor keeps all the generated indices in its internal state so that it can be used repeatedly, for example, to split data and labels separately while keeping them in sync.
It represents a topology of 1:2N input/output ports. The splits are provided as a range of output ports where a given fold with index i is delivered via ports:
  - [2 * i]: trainset
  - [2 * i + 1]: testset
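The port layout can be illustrated by flattening a list of `(train, test)` index pairs into 2N outputs. In this sketch, `fold_outputs` is a hypothetical helper and `SPLITS` is hard-coded in place of a real cross-validator; reusing the same stored indices for features and labels shows how the two stay in sync.

```python
from collections.abc import Sequence
from typing import Any


def fold_outputs(data: Sequence[Any], splits: Sequence[tuple[Sequence[int], Sequence[int]]]) -> list[list[Any]]:
    """Flatten N (train, test) index pairs into 2N outputs: [2*i] trainset, [2*i + 1] testset."""
    outputs: list[list[Any]] = []
    for train_idx, test_idx in splits:
        outputs.append([data[i] for i in train_idx])  # port 2 * i
        outputs.append([data[i] for i in test_idx])  # port 2 * i + 1
    return outputs


# Indices generated once (the stored "state") and reused for both features and labels.
SPLITS = [([2, 3], [0, 1]), ([0, 1], [2, 3])]
features = ['f0', 'f1', 'f2', 'f3']
labels = [0, 1, 0, 1]
feature_ports = fold_outputs(features, SPLITS)
label_ports = fold_outputs(labels, SPLITS)
```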
- Parameters:
- crossvalidator: payload.CrossValidable[flow.Features, flow.Labels, Column]
Particular generator of the cross-validation indexes to be used for the splitting.
- groups_extractor: Callable[[flow.Features], Column] | None = None
Optional callable to be applied to the data to extract the group membership vector.
Following is the abstract method that needs to be defined in the implementing classes:
- class forml.pipeline.payload.Dump(apply: flow.Builder[payload.Dumpable] = PandasCSVDumper.builder(), train: flow.Builder[payload.Dumpable] | None = None, *, path: str | pathlib.Path | None = None)
Bases: Operator
A transparent operator that dumps the input dataset externally (typically to a file) before passing it downstream.
If supplied as a template, the operator supports interpolation of potential placeholders in the dump path (e.g. file name). The supported placeholders are:
  - $seq: a sequence ID that gets incremented for each particular Actor instance
  - $mode: a label of the mode in which the dumping occurs (train or apply)
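The `$`-prefixed placeholders look like Python `string.Template` syntax. Assuming interpolation works along those lines (ForML's actual mechanism may differ), the expansion could be sketched as follows; `PathTemplate` is a hypothetical helper, and its class-level counter models a sequence ID incremented per instance.

```python
import itertools
import string


class PathTemplate:
    """Hypothetical helper expanding the $seq and $mode placeholders of a dump path."""

    _counter = itertools.count()  # shared across instances: one sequence ID per instance

    def __init__(self, template: str):
        self._template = string.Template(template)
        self.seq = next(PathTemplate._counter)

    def render(self, mode: str) -> str:
        # safe_substitute leaves unknown placeholders untouched instead of raising.
        return self._template.safe_substitute(seq=self.seq, mode=mode)


first = PathTemplate('/tmp/foobar/post_action1-$mode-$seq.csv')
second = PathTemplate('/tmp/foobar/post_action2-$mode-$seq.csv')
```

`safe_substitute()` is preferred over `substitute()` here so that a path containing other `$` tokens does not raise `KeyError`.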
The path (or path template) must be provided either within the raw builder parameters or as the standalone path parameter, which is used as a fallback option.
- Parameters:
- apply: flow.Builder[payload.Dumpable] = PandasCSVDumper.builder()
Dumpable builder for instantiating the dumper actor to be used in apply-mode.
- train: flow.Builder[payload.Dumpable] | None = None
Optional Dumpable builder for instantiating the dumper actor to be used in train-mode (otherwise the same one as for the apply-mode is used).
- path: str | pathlib.Path | None = None
Optional path template.
- Raises:
TypeError: If the path is provided neither in the apply/train builders nor as the explicit parameter.
Examples
>>> PIPELINE = (
...     preprocessing.Action1()
...     >> payload.Dump(path='/tmp/foobar/post_action1-$mode-$seq.csv')
...     >> preprocessing.Action2()
...     >> payload.Dump(path='/tmp/foobar/post_action2-$mode-$seq.csv')
...     >> model.SomeModel()
... )
- class forml.pipeline.payload.Dumpable(path: str | Path, **kwargs)
Bases: Actor[Features, Labels, Result]
Transparent actor interface that dumps the input dataset (typically to a file).
- Parameters:
- path: str | Path
Target path to be used for dumping the content.
- **kwargs
Optional keyword arguments to be passed to the apply_dump() and/or train_dump() methods.
The following methods can be overridden with the actual dump action (they are no-ops otherwise).
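The apply_dump() and train_dump() hooks are named here but their signatures are not shown. The plain-Python sketch below models the transparent-dumper pattern under that caveat: `DumpableSketch`, `MemoryDumper`, and the hook signatures are all hypothetical, chosen only to illustrate no-op hooks wrapped around an identity pass-through.

```python
from typing import Any


class DumpableSketch:
    """Hypothetical stdlib-only model of a transparent dumping actor."""

    def __init__(self, path: str, **kwargs: Any):
        self.path = path
        self.kwargs = kwargs  # forwarded to the dump hooks

    def apply_dump(self, features: Any, path: str, **kwargs: Any) -> None:
        """Hook for dumping apply-mode data (no-op unless overridden)."""

    def train_dump(self, features: Any, labels: Any, path: str, **kwargs: Any) -> None:
        """Hook for dumping train-mode data (no-op unless overridden)."""

    def apply(self, features: Any) -> Any:
        self.apply_dump(features, self.path, **self.kwargs)
        return features  # transparent: the input is passed downstream unchanged

    def train(self, features: Any, labels: Any) -> None:
        self.train_dump(features, labels, self.path, **self.kwargs)


class MemoryDumper(DumpableSketch):
    """Example override 'dumping' into an in-memory dict instead of a file."""

    STORE: dict[str, Any] = {}

    def apply_dump(self, features: Any, path: str, **kwargs: Any) -> None:
        self.STORE[path] = features
```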
- class forml.pipeline.payload.MapReduce(*mappers: flow.Builder, reducer: flow.Builder = PandasConcat.builder())
Bases: Operator
Operator for applying parallel (possibly stateful) mapper actors and combining their outputs using a final (stateless) reducer.
- Parameters:
- *mappers: flow.Builder
Builders for individual mapper actors to be applied in parallel.
- reducer: flow.Builder = PandasConcat.builder()
Builder for a final reducer actor. The actor needs to accept as many inputs as there are mappers. Defaults to payload.PandasConcat.
Examples
>>> MAPRED = payload.MapReduce(
...     payload.PandasSelect.builder(columns=['foo']),
...     payload.PandasDrop.builder(columns=['foo']),
... )
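The map/reduce topology can be illustrated with plain dictionaries of columns standing in for DataFrames. In this hypothetical sketch, `select` and `drop` mimic PandasSelect and PandasDrop, and `merge` plays the role of a column-wise reducer; the mappers run sequentially here, while ForML may schedule the branches differently.

```python
from typing import Any, Callable


def map_reduce(data: Any, mappers: list[Callable[[Any], Any]], reducer: Callable[..., Any]) -> Any:
    """Apply every mapper to the same input, then feed all their outputs to the reducer."""
    outputs = [mapper(data) for mapper in mappers]  # conceptually parallel branches
    return reducer(*outputs)  # the reducer takes as many inputs as there are mappers


def select(columns: list[str]) -> Callable[[dict], dict]:
    return lambda table: {name: table[name] for name in columns}


def drop(columns: list[str]) -> Callable[[dict], dict]:
    return lambda table: {name: col for name, col in table.items() if name not in columns}


def merge(*tables: dict) -> dict:
    # Column-wise merge; a later table wins on duplicate column names.
    merged: dict = {}
    for table in tables:
        merged.update(table)
    return merged


TABLE = {'foo': [1, 2], 'bar': [3, 4], 'baz': [5, 6]}
RESULT = map_reduce(TABLE, [select(['foo']), drop(['foo'])], merge)
```

Because the two mappers partition the columns, merging their outputs column-wise reconstructs the original table.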
- class forml.pipeline.payload.PandasConcat(**kwargs)
Bases: Actor
Stateless concatenation actor based on pandas.concat(). It plugs into a topology of N:1 input/output ports.
- Parameters:
- **kwargs
Any keywords accepted by pandas.concat().
Examples
>>> CONCAT = payload.PandasConcat(axis='columns', ignore_index=False)
- class forml.pipeline.payload.PandasCSVDumper(path: str | pathlib.Path, label_header: str = 'Label', converter: collections.abc.Callable[[Any, collections.abc.Sequence[str] | None], pandas.core.generic.NDFrame] = pandas_read, **kwargs)
Bases: Dumpable[Any, Any, Any]
A pass-through transformer that dumps the input datasets to CSV files.
The write operation, including the CSV encoding, is implemented using the pandas.DataFrame.to_csv() method. The input payload is automatically converted to Pandas using the provided converter implementation (defaults to internal logic).
- Parameters:
- path: str | pathlib.Path
Target path to be used for dumping the content.
- label_header: str = 'Label'
Column name to be used for the train labels.
- converter: collections.abc.Callable[[Any, collections.abc.Sequence[str] | None], pandas.core.generic.NDFrame] = pandas_read
Optional callback to be used for converting the payload to Pandas.
- **kwargs
Optional keyword arguments to be passed to the pandas.DataFrame.to_csv() method.
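PandasCSVDumper delegates the actual writing to pandas.DataFrame.to_csv(). To show what the label_header parameter is for without requiring Pandas, the following stdlib `csv` sketch assumes that in train-mode the labels are appended as an extra column named by `label_header`; that layout is inferred from the parameter description, and `dump_csv` is a hypothetical function.

```python
import csv
import io
from collections.abc import Sequence
from typing import Any, Optional


def dump_csv(features: Sequence[Sequence[Any]], columns: Sequence[str],
             labels: Optional[Sequence[Any]] = None, label_header: str = 'Label') -> str:
    """Serialize rows to CSV text, appending the labels as an extra column when given."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator='\n')
    header = list(columns) + ([label_header] if labels is not None else [])
    writer.writerow(header)
    for index, row in enumerate(features):
        # Train-mode rows carry their label in the last column.
        extra = [labels[index]] if labels is not None else []
        writer.writerow(list(row) + extra)
    return buffer.getvalue()
```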
- class forml.pipeline.payload.PandasCVFolds(crossvalidator: payload.CrossValidable[flow.Features, flow.Labels, Column], groups_extractor: Callable[[flow.Features], Column] | None = None)
Bases: CVFoldable[DataFrame, NDFrame, Series]
Cross-validation splitter of train-test folds working with Pandas payloads.
See the payload.CVFoldable base class for more details and the synopsis.
- class forml.pipeline.payload.PandasDrop(*, columns: collections.abc.Sequence[str])
Bases: Actor
Stateless mapper actor for pandas.DataFrame column dropping.
- Parameters:
- columns: collections.abc.Sequence[str]
Sequence of columns to be dropped.
- Returns:
New DataFrame without the dropped columns.
Examples
>>> DROP = payload.PandasDrop(columns=['foo', 'bar'])
- class forml.pipeline.payload.PandasSelect(*, columns: collections.abc.Sequence[str])
Bases: Actor
Stateless mapper actor for pandas.DataFrame column selection.
- Parameters:
- columns: collections.abc.Sequence[str]
Sequence of columns to be selected.
- Returns:
New DataFrame with only the selected columns.
Examples
>>> SELECT = payload.PandasSelect(columns=['foo', 'bar'])
- class forml.pipeline.payload.Sniff
Bases: Operator
Debugging operator for capturing the passing payload and exposing it using the Sniff.Value.Future instance provided when used as a context manager. Without the context, the operator acts as a transparent identity pass-through operator.
The typical use case is in combination with the runtime.virtual launcher and the interactive mode.
Examples
>>> SNIFFER = payload.Sniff()
>>> with SNIFFER as future:
...     SOURCE.bind(PIPELINE >> SNIFFER >> ANOTHER).launcher.train()
...
>>> future.result()[0]
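The capture-via-future mechanism can be modeled with the standard `concurrent.futures.Future`. The sketch below is a hypothetical stdlib-only stand-in (`SniffSketch` is not a ForML class): it captures the first payload seen inside the context and otherwise acts as an identity function. It does not replicate the structure of ForML's actual result, which the example above indexes with `[0]`.

```python
from concurrent.futures import Future
from typing import Any, Optional


class SniffSketch:
    """Hypothetical pass-through step that captures its input into a Future while in a context."""

    def __init__(self) -> None:
        self._future: Optional[Future] = None

    def __enter__(self) -> Future:
        self._future = Future()
        return self._future

    def __exit__(self, *exc_info: Any) -> None:
        self._future = None  # outside the context, the step is a plain identity

    def __call__(self, payload: Any) -> Any:
        if self._future is not None and not self._future.done():
            self._future.set_result(payload)  # capture the first payload seen
        return payload  # always transparent
```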
- class forml.pipeline.payload.ToPandas(data: Any, *, columns: collections.abc.Sequence[str] | None = None, converter: collections.abc.Callable[[Any, collections.abc.Sequence[str] | None], pandas.core.generic.NDFrame] = pandas_read)
Bases: Operator
Simple 1:1 operator that attempts to convert the data on each of the apply/train/label segments to a Pandas DataFrame/Series.
- Parameters:
- data: Any
Input data.
- columns: collections.abc.Sequence[str] | None = None
Optional column names.
- converter: collections.abc.Callable[[Any, collections.abc.Sequence[str] | None], pandas.core.generic.NDFrame] = pandas_read
Optional function to be used for attempting the conversion. It will receive two parameters: the data and the column names (if provided).
Warning
The default converter uses internal logic; if unsuccessful, the payload is passed through unchanged and a warning is emitted.
- Returns:
Examples
>>> SOURCE >>= payload.ToPandas(
...     columns=[f.name for f in SOURCE.extract.train.schema]
... )