forml.pipeline.payload

General payload manipulation utilities.

ForML is by design fairly payload-format agnostic, leaving the choice of compatible operators/actors to the implementer.

This module provides a number of generic payload-related operators to be parameterized with particular actor implementations targeting different payload formats.

Note

For convenience, there are also a couple of payload-specific actors designed to work only with one particular payload format (typically pandas.DataFrame). This does not make that format preferred from the general ForML perspective, which remains payload-format neutral.

Functions

forml.pipeline.payload.pandas_params(method: Callable[[Actor, NDFrame], Any]) → Callable[[Actor, Any], Any][source]

Decorator for converting the decorated actor method input parameters to Pandas format.

The parameters will be converted to pandas.DataFrame or pandas.Series depending on the dimensionality.

Parameters:
method: Callable[[Actor, NDFrame], Any]

Actor method to be decorated expecting input to be in Pandas format.

Returns:

Decorated method converting its input payload to Pandas.

Warning

The conversion is attempted using internal logic; if unsuccessful, the payload is passed through unchanged and a warning is emitted.

Examples:

class Concat(flow.Actor[pdtype.NDFrame, None, pandas.DataFrame]):

    @payload.pandas_params
    def apply(self, *features: pdtype.NDFrame) -> pandas.DataFrame:
        return pandas.concat(features)

Todo

Make the internal conversion logic customizable.

Classes

class forml.pipeline.payload.Apply(*, function: collections.abc.Callable[..., flow.Features])[source]

Bases: Actor

Generic stateless function-based transformer actor.

It can work with an arbitrary M:N input/output port topology, as implemented by the owning operator.

Parameters:
function: collections.abc.Callable[..., flow.Features]

Callable transformer to be applied to the data.

Returns:

Result of the function applied to the input data.

Examples

>>> APPLY = payload.Apply(function=lambda df: df.dropna())
class forml.pipeline.payload.CrossValidable(*args, **kwargs)[source]

Bases: Protocol[Features, Labels, Column]

Protocol for implementing the cross-validation splitters.

This matches, for example, all of the SKLearn Splitter Classes.

split(features, labels=None, groups=None, /)[source]

Generate indices to split the data into training and test sets.

Parameters:
features

Train features data.

labels=None

Target data.

groups=None

Group membership vector indicating which group each record belongs to.

Returns:

Iterable of tuples of train/test indexes.

get_n_splits(features, labels=None, groups=None, /)[source]

Get the number of splitting iterations in the cross-validator.

Parameters:
features

Train features data.

labels=None

Target data.

groups=None

Group membership vector indicating which group each record belongs to.

Returns:

Number of folds.
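
For illustration, below is a minimal sketch of a custom splitter satisfying this protocol (the SequentialSplitter class is hypothetical and shown for demonstration only; in practice any existing scikit-learn splitter can typically be passed directly):

class SequentialSplitter:
    """Hypothetical cross-validator producing consecutive train/test index chunks."""

    def __init__(self, nsplits: int):
        self._nsplits = nsplits

    def split(self, features, labels=None, groups=None, /):
        # Yield one (train, test) pair of index lists per fold.
        size = len(features)
        fold = size // self._nsplits
        for i in range(self._nsplits):
            test = list(range(i * fold, (i + 1) * fold))
            train = [j for j in range(size) if j not in test]
            yield train, test

    def get_n_splits(self, features, labels=None, groups=None, /):
        return self._nsplits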

class forml.pipeline.payload.CVFoldable(crossvalidator: payload.CrossValidable[flow.Features, flow.Labels, Column], groups_extractor: Callable[[flow.Features], Column] | None = None)[source]

Bases: Generic[Features, Labels, Column], Actor[Features, Labels, Sequence[Features]]

Abstract actor splitting the flow into N-fold train-test branches based on the provided cross-validator.

The actor keeps all the generated indices in its internal state so that it can be used repeatedly, for example to split data and labels separately while keeping them in sync.

It represents a topology of 1:2N input/output ports. The splits are provided as a range of output ports where a given fold with index i is delivered via ports:

  • [2 * i] - trainset

  • [2 * i + 1] - testset

Parameters:
crossvalidator: payload.CrossValidable[flow.Features, flow.Labels, Column]

Particular generator of the cross-validation indexes to be used for the splitting.

groups_extractor: Callable[[flow.Features], Column] | None = None

Optional callable to be applied to the data to extract the group membership vector.

Following is the abstract method that needs to be defined in the implementing classes:

split(features, indices)[source]

Splitting implementation.

Parameters:
features

Source features to split.

indices

Sequence of fold indices to split by.

Returns:

Sequence of repeated train, test, train, test, … sets of the split data.
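
A possible Pandas-based implementation of this hook might look roughly as follows (an illustrative sketch only, assuming each fold entry carries a pair of positional train/test index arrays produced by the cross-validator; the module already ships payload.PandasCVFolds for actual use):

import pandas
from pandas.core import generic as pdtype

from forml.pipeline import payload


class MyPandasFolds(payload.CVFoldable[pandas.DataFrame, pdtype.NDFrame, pandas.Series]):
    """Hypothetical fold splitter emitting Pandas train/test subsets."""

    def split(self, features, indices):
        # For every fold emit the train subset followed by the test subset.
        return tuple(
            subset
            for train, test in indices
            for subset in (features.iloc[train], features.iloc[test])
        )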

class forml.pipeline.payload.Dump(apply: flow.Builder[payload.Dumpable] = PandasCSVDumper.builder(), train: flow.Builder[payload.Dumpable] | None = None, *, path: str | pathlib.Path | None = None)[source]

Bases: Operator

A transparent operator that dumps the input dataset externally (typically to a file) before passing it downstream.

If the dump path is supplied as a template, the operator interpolates placeholders in it (e.g. within the file name). The supported placeholders are:

  • $seq - a sequence ID that gets incremented for each particular Actor instance

  • $mode - a label of the mode in which the dumping occurs (train or apply)

The path (or path template) must be provided either within the raw builder parameters or as the standalone path parameter, which is used as a fallback option.

Parameters:
apply: flow.Builder[payload.Dumpable] = PandasCSVDumper.builder()

Dumpable builder for instantiating a Dumper operator to be used in apply-mode.

train: flow.Builder[payload.Dumpable] | None = None

Optional Dumpable builder for instantiating a Dumper operator to be used in train-mode (otherwise using the same one as for the apply-mode).

path: str | pathlib.Path | None = None

Optional path template.

Raises:

TypeError – If path is provided neither in the apply/train builders nor as the explicit parameter.

Examples

>>> PIPELINE = (
...     preprocessing.Action1()
...     >> payload.Dump(path='/tmp/foobar/post_action1-$mode-$seq.csv')
...     >> preprocessing.Action2()
...     >> payload.Dump(path='/tmp/foobar/post_action2-$mode-$seq.csv')
...     >> model.SomeModel()
... )
class forml.pipeline.payload.Dumpable(path: str | Path, **kwargs)[source]

Bases: Actor[Features, Labels, Result]

Transparent actor interface that dumps the input dataset (typically to a file).

Parameters:
path: str | Path

Target path to be used for dumping the content.

**kwargs

Optional keyword arguments to be passed to the apply_dump() and/or train_dump() methods.

Following are the methods that can be overridden with the actual dump action (no-op otherwise).

apply_dump(features, path, **kwargs)[source]

Dump the features when in the apply-mode.

Parameters:
features

Input data.

path

Target dump location.

**kwargs

Additional keyword arguments supplied via constructor.

train_dump(features, labels, path, **kwargs)[source]

Dump the features and labels when in the train-mode.

Parameters:
features

Input features.

labels

Input labels.

path

Target dump location.

**kwargs

Additional keyword arguments supplied via constructor.
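
A minimal sketch of a possible custom implementation persisting the payload using pickle (the PickleDumper class below is hypothetical and shown for illustration only):

import pathlib
import pickle
import typing

from forml.pipeline import payload


class PickleDumper(payload.Dumpable[typing.Any, typing.Any, typing.Any]):
    """Hypothetical Dumpable implementation serializing the payload using pickle."""

    def apply_dump(self, features, path, **kwargs):
        # Persist the apply-mode features to the target path.
        with pathlib.Path(path).open('wb') as target:
            pickle.dump(features, target)

    def train_dump(self, features, labels, path, **kwargs):
        # Persist the train-mode features together with the labels.
        with pathlib.Path(path).open('wb') as target:
            pickle.dump((features, labels), target)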

class forml.pipeline.payload.MapReduce(*mappers: flow.Builder, reducer: flow.Builder = PandasConcat.builder())[source]

Bases: Operator

Operator for applying parallel (possibly stateful) mapper actors and combining their outputs using a final (stateless) reducer.

Parameters:
*mappers: flow.Builder

Builders for individual mapper actors to be applied in parallel.

reducer: flow.Builder = PandasConcat.builder()

Builder for the final reducer actor. The actor needs to accept as many inputs as there are mappers provided. Defaults to payload.PandasConcat.

Examples

>>> MAPRED = payload.MapReduce(
...     payload.PandasSelect.builder(columns=['foo']),
...     payload.PandasDrop.builder(columns=['foo']),
... )
class forml.pipeline.payload.PandasConcat(**kwargs)[source]

Bases: Actor

Stateless concatenation actor based on pandas.concat().

It plugs into a topology of N:1 input/output ports.

Parameters:
**kwargs

Any keywords accepted by pandas.concat().

Examples

>>> CONCAT = payload.PandasConcat(axis='columns', ignore_index=False)
class forml.pipeline.payload.PandasCSVDumper(path: str | pathlib.Path, label_header: str = 'Label', converter: collections.abc.Callable[[Any, collections.abc.Sequence[str] | None], pandas.core.generic.NDFrame] = pandas_read, **kwargs)[source]

Bases: Dumpable[Any, Any, Any]

A pass-through transformer that dumps the input datasets to CSV files.

The write operation, including the CSV encoding, is implemented using the pandas.DataFrame.to_csv() method.

The input payload is automatically converted to Pandas using the provided converter implementation (defaults to internal logic).

Parameters:
path: str | pathlib.Path

Target path to be used for dumping the content.

label_header: str = 'Label'

Column name to be used for the train labels.

converter: collections.abc.Callable[[Any, collections.abc.Sequence[str] | None], pandas.core.generic.NDFrame] = pandas_read

Optional callback to be used for converting the payload to Pandas.

**kwargs

Optional keyword arguments to be passed to the pandas.DataFrame.to_csv() method.
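
Examples

Typically engaged via the payload.Dump operator (the dump path and the label_header value below are illustrative placeholders only):

>>> DUMP = payload.Dump(
...     apply=payload.PandasCSVDumper.builder(label_header='Target'),
...     path='/tmp/foobar/data-$mode-$seq.csv',
... )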

class forml.pipeline.payload.PandasCVFolds(crossvalidator: payload.CrossValidable[flow.Features, flow.Labels, Column], groups_extractor: Callable[[flow.Features], Column] | None = None)[source]

Bases: CVFoldable[DataFrame, NDFrame, Series]

Cross-validation splitter of train-test folds working with Pandas payloads.

See the payload.CVFoldable base class for more details and the synopsis.
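
Examples

It is typically parameterized with a scikit-learn splitter (an illustrative sketch; the particular splitter and its parameters are arbitrary):

>>> from sklearn import model_selection
>>> FOLDS = payload.PandasCVFolds(
...     crossvalidator=model_selection.KFold(n_splits=3, shuffle=True, random_state=42)
... )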

class forml.pipeline.payload.PandasDrop(*, columns: collections.abc.Sequence[str])[source]

Bases: Actor

Stateless mapper actor for pandas.DataFrame column dropping.

Parameters:
columns: collections.abc.Sequence[str]

Sequence of columns to be dropped.

Returns:

New DataFrame without the dropped columns.

Examples

>>> DROP = payload.PandasDrop(columns=['foo', 'bar'])
class forml.pipeline.payload.PandasSelect(*, columns: collections.abc.Sequence[str])[source]

Bases: Actor

Stateless mapper actor for pandas.DataFrame column selection.

Parameters:
columns: collections.abc.Sequence[str]

Sequence of columns to be selected.

Returns:

New DataFrame with only the selected columns.

Examples

>>> SELECT = payload.PandasSelect(columns=['foo', 'bar'])
class forml.pipeline.payload.Sniff[source]

Bases: Operator

Debugging operator for capturing the passing payload and exposing it via the Sniff.Value.Future instance provided when the operator is used as a context manager.

Without the context, the operator acts as a transparent identity pass-through operator.

The typical use case is in combination with the runtime.virtual launcher and the interactive mode.

Examples

>>> SNIFFER = payload.Sniff()
>>> with SNIFFER as future:
...     SOURCE.bind(PIPELINE >> SNIFFER >> ANOTHER).launcher.train()
>>> future.result()[0]
class forml.pipeline.payload.ToPandas(data: Any, *, columns: collections.abc.Sequence[str] | None = None, converter: collections.abc.Callable[[Any, collections.abc.Sequence[str] | None], pandas.core.generic.NDFrame] = pandas_read)[source]

Bases: Operator

Simple 1:1 operator that attempts to convert the data on each of the apply/train/label segments to a Pandas DataFrame/Series.

Parameters:
data: Any

Input data.

columns: collections.abc.Sequence[str] | None = None

Optional column names.

converter: collections.abc.Callable[[Any, collections.abc.Sequence[str] | None], pandas.core.generic.NDFrame] = pandas_read

Optional function to be used for attempting the conversion. It will receive two parameters: the data and the column names (if provided).

Warning

The default converter uses internal logic; if unsuccessful, the payload is passed through unchanged and a warning is emitted.

Returns:

Pandas DataFrame/Series.

Examples

>>> SOURCE >>= payload.ToPandas(
...     columns=[f.name for f in SOURCE.extract.train.schema]
... )