forml.pipeline.payload
General payload manipulation utilities.
ForML is by design fairly payload-format agnostic, leaving the choice of compatible operators/actors to the implementer.
This module provides a number of generic payload-related operators to be parameterized with particular actor implementations targeting different payload formats.
Note
For convenience, there are also a couple of payload-specific actors designed to be engaged only with one particular payload format (typically pandas.DataFrame). This does not make that format any more preferable from the general ForML perspective, which maintains its payload-format neutrality.
Functions
- forml.pipeline.payload.pandas_params(method: Callable[[Actor, NDFrame], Any]) → Callable[[Actor, Any], Any]
  Decorator for converting the decorated actor method's input parameters to Pandas format.
  The parameters will be converted to pandas.DataFrame or pandas.Series depending on their dimensionality.
  - Parameters:
    - method – Actor method to be decorated.
  - Returns:
    Decorated method converting its input payload to Pandas.
Warning
The conversion is attempted using internal logic; if unsuccessful, the payload is passed through unchanged, emitting a warning.
Examples:
class Concat(flow.Actor[pdtype.NDFrame, None, pandas.DataFrame]):

    @payload.pandas_params
    def apply(self, *features: pdtype.NDFrame) -> pandas.DataFrame:
        return pandas.concat(features)
Todo
Make the internal conversion logic customizable.
Classes
- class forml.pipeline.payload.Apply(*, function: collections.abc.Callable[..., flow.Features])
  Bases: Actor
Generic stateless function-based transformer actor.
It can work with an arbitrary number of M:N input/output ports depending on the topology implemented by the owning operator.
  - Parameters:
    - function: collections.abc.Callable[..., flow.Features] – Callable transformer to be applied to the data.
  - Returns:
    Result of the function applied to the input data.
Examples
>>> APPLY = payload.Apply(function=lambda df: df.dropna())
- class forml.pipeline.payload.CrossValidable(*args, **kwargs)
  Bases: Protocol[Features, Labels, Column]
  Protocol for implementing the cross-validation splitters.
  This matches, for example, all the particular SKLearn Splitter Classes.
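  For illustration, any scikit-learn splitter can be used directly wherever a CrossValidable is expected (a usage sketch assuming scikit-learn is available):
  >>> from sklearn import model_selection
  >>> CROSSVALIDATOR: payload.CrossValidable = model_selection.KFold(
  ...     n_splits=3, shuffle=True, random_state=42
  ... )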
- class forml.pipeline.payload.CVFoldable(crossvalidator: payload.CrossValidable[flow.Features, flow.Labels, Column], groups_extractor: Callable[[flow.Features], Column] | None = None)
  Bases: Generic[Features, Labels, Column], Actor[Features, Labels, Sequence[Features]]
  Abstract actor splitting the flow into N-fold train-test branches based on the provided cross-validator.
  The actor keeps all the generated indices in its internal state so that it can be used repeatedly, for example to split data and labels separately while keeping them in sync.
  It represents a topology of 1:2N input/output ports. The splits are provided as a range of output ports where a given fold with index i is delivered via ports:
  - [2 * i] - trainset
  - [2 * i + 1] - testset
  - Parameters:
    - crossvalidator: payload.CrossValidable[flow.Features, flow.Labels, Column] – Particular generator of the cross-validation indexes to be used for the splitting.
    - groups_extractor: Callable[[flow.Features], Column] | None = None – Optional callable to be applied to the data to extract the group-membership vector.
  Implementing classes need to define the abstract fold-splitting method; see the sketch below for orientation.
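  Here is a minimal subclass sketch. The abstract hook is assumed to be a classmethod named split() receiving the features and the per-fold train/test index pairs (mirroring what the concrete PandasCVFolds provides); treat the exact name and signature as an assumption and verify against the source:

  import typing

  import pandas

  from forml.pipeline import payload


  class MyCVFolds(payload.CVFoldable[pandas.DataFrame, pandas.Series, pandas.Series]):
      """Hypothetical splitter emitting the train/test slices of each fold."""

      @classmethod
      def split(
          cls,
          features: pandas.DataFrame,
          indices: typing.Sequence[tuple[typing.Sequence[int], typing.Sequence[int]]],
      ) -> typing.Sequence[pandas.DataFrame]:
          # for each fold, emit the trainset followed by the testset
          # (matching the documented [2 * i]/[2 * i + 1] output port layout)
          return tuple(
              subset
              for train, test in indices
              for subset in (features.iloc[train], features.iloc[test])
          )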
- class forml.pipeline.payload.Dump(apply: flow.Builder[payload.Dumpable] = PandasCSVDumper.builder(), train: flow.Builder[payload.Dumpable] | None = None, *, path: str | pathlib.Path | None = None)
  Bases: Operator
  A transparent operator that dumps the input dataset externally (typically to a file) before passing it downstream.
  If supplied as a template, the operator supports interpolation of potential placeholders in the dump path (e.g. the file name). The supported placeholders are:
  - $seq - a sequence ID that gets incremented for each particular Actor instance
  - $mode - a label of the mode in which the dumping occurs (train or apply)
  The path (or path template) must be provided either within the raw builder parameters or as the standalone path parameter, which is used as a fallback option.
  - Parameters:
    - apply: flow.Builder[payload.Dumpable] = PandasCSVDumper.builder() – Dumpable builder for instantiating a dumper to be used in apply-mode.
    - train: flow.Builder[payload.Dumpable] | None = None – Optional Dumpable builder for instantiating a dumper to be used in train-mode (otherwise using the same one as for the apply-mode).
    - path: str | pathlib.Path | None = None – Optional path template.
  - Raises:
    TypeError – If the path is provided neither in the apply/train builders nor as the explicit parameter.
Examples
>>> PIPELINE = (
...     preprocessing.Action1()
...     >> payload.Dump(path='/tmp/foobar/post_action1-$mode-$seq.csv')
...     >> preprocessing.Action2()
...     >> payload.Dump(path='/tmp/foobar/post_action2-$mode-$seq.csv')
...     >> model.SomeModel()
... )
- class forml.pipeline.payload.Dumpable(path: str | Path, **kwargs)
  Bases: Actor[Features, Labels, Result]
  Transparent actor interface that dumps the input dataset (typically to a file).
  - Parameters:
    - path: str | Path – Target path to be used for dumping the content.
    - **kwargs – Optional keyword arguments to be passed to the apply_dump() and/or train_dump() methods.
  Following are the methods that can be overloaded with the actual dump action (no-op otherwise); see the sketch below for orientation.
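  As an illustration, here is a minimal sketch of a custom dumper writing Parquet files instead of CSV. The apply_dump()/train_dump() hooks are assumed to be classmethods receiving the payload plus the configured path and keyword arguments; verify against the actual interface:

  import pathlib

  import pandas

  from forml.pipeline import payload


  class PandasParquetDumper(payload.Dumpable[pandas.DataFrame, pandas.Series, pandas.DataFrame]):
      """Hypothetical pass-through dumper persisting the payload to a Parquet file."""

      @classmethod
      def apply_dump(cls, features: pandas.DataFrame, path: pathlib.Path, **kwargs) -> None:
          # persist the apply-mode features before they continue downstream
          features.to_parquet(path, **kwargs)

      @classmethod
      def train_dump(
          cls, features: pandas.DataFrame, labels: pandas.Series, path: pathlib.Path, **kwargs
      ) -> None:
          # persist the train-mode features together with their labels
          features.assign(label=labels).to_parquet(path, **kwargs)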
- class forml.pipeline.payload.MapReduce(*mappers: flow.Builder, reducer: flow.Builder = PandasConcat.builder())
  Bases: Operator
  Operator for applying parallel (possibly stateful) mapper actors and combining their outputs using a final (stateless) reducer.
  - Parameters:
    - *mappers: flow.Builder – Builders for the individual mapper actors to be applied in parallel.
    - reducer: flow.Builder = PandasConcat.builder() – Builder for the final reducer actor. The actor needs to accept as many inputs as there are mappers. Defaults to payload.PandasConcat.
Examples
>>> MAPRED = payload.MapReduce(
...     payload.PandasSelect.builder(columns=['foo']),
...     payload.PandasDrop.builder(columns=['foo']),
... )
- class forml.pipeline.payload.PandasConcat(**kwargs)
  Bases: Actor
  Stateless concatenation actor based on pandas.concat().
  It plugs into a topology of N:1 input/output ports.
  - Parameters:
    - **kwargs – Any keyword arguments accepted by pandas.concat().
Examples
>>> CONCAT = payload.PandasConcat(axis='columns', ignore_index=False)
- class forml.pipeline.payload.PandasCSVDumper(path: str | pathlib.Path, label_header: str = 'Label', converter: collections.abc.Callable[[Any, collections.abc.Sequence[str] | None], pandas.core.generic.NDFrame] = pandas_read, **kwargs)
  Bases: Dumpable[Any, Any, Any]
  A pass-through transformer that dumps the input datasets to CSV files.
  The write operation, including the CSV encoding, is implemented using the pandas.DataFrame.to_csv() method.
  The input payload is automatically converted to Pandas using the provided converter implementation (defaulting to internal logic).
  - Parameters:
    - path: str | pathlib.Path – Target path to be used for dumping the content.
    - label_header: str = 'Label' – Column name to be used for the train labels.
    - converter: collections.abc.Callable[[Any, collections.abc.Sequence[str] | None], pandas.core.generic.NDFrame] = pandas_read – Optional callback to be used for converting the payload to Pandas.
    - **kwargs – Optional keyword arguments to be passed to the pandas.DataFrame.to_csv() method.
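  For instance, the dumper would typically be engaged through the Dump operator (a usage sketch; the path template and the custom label header are illustrative):
  >>> DUMP = payload.Dump(
  ...     apply=payload.PandasCSVDumper.builder(label_header='Target'),
  ...     path='/tmp/foobar/data-$mode-$seq.csv',
  ... )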
- class forml.pipeline.payload.PandasCVFolds(crossvalidator: payload.CrossValidable[flow.Features, flow.Labels, Column], groups_extractor: Callable[[flow.Features], Column] | None = None)
  Bases: CVFoldable[DataFrame, NDFrame, Series]
  Cross-validation splitter of train-test folds working with Pandas payloads.
  See the payload.CVFoldable base class for more details and the synopsis.
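  For example (a usage sketch assuming a scikit-learn splitter as the cross-validator):
  >>> from sklearn import model_selection
  >>> FOLDS = payload.PandasCVFolds(
  ...     crossvalidator=model_selection.StratifiedKFold(n_splits=3, shuffle=True)
  ... )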
- class forml.pipeline.payload.PandasDrop(*, columns: collections.abc.Sequence[str])
  Bases: Actor
  Stateless mapper actor for pandas.DataFrame column dropping.
  - Parameters:
    - columns: collections.abc.Sequence[str] – Sequence of columns to be dropped.
  - Returns:
    New DataFrame without the dropped columns.
Examples
>>> DROP = payload.PandasDrop(columns=['foo', 'bar'])
- class forml.pipeline.payload.PandasSelect(*, columns: collections.abc.Sequence[str])
  Bases: Actor
  Stateless mapper actor for pandas.DataFrame column selection.
  - Parameters:
    - columns: collections.abc.Sequence[str] – Sequence of columns to be selected.
  - Returns:
    New DataFrame with only the selected columns.
Examples
>>> SELECT = payload.PandasSelect(columns=['foo', 'bar'])
- class forml.pipeline.payload.Sniff
  Bases: Operator
  Debugging operator for capturing the passing payload and exposing it via the Sniff.Value.Future instance provided when used as a context manager.
  Without the context, the operator acts as a transparent identity pass-through.
  The typical use case is in combination with the runtime.virtual launcher and the interactive mode.
  Examples
>>> SNIFFER = payload.Sniff()
>>> with SNIFFER as future:
...     SOURCE.bind(PIPELINE >> SNIFFER >> ANOTHER).launcher.train()
>>> future.result()[0]
- class forml.pipeline.payload.ToPandas(data: Any, *, columns: collections.abc.Sequence[str] | None = None, converter: collections.abc.Callable[[Any, collections.abc.Sequence[str] | None], pandas.core.generic.NDFrame] = pandas_read)
  Bases: Operator
  Simple 1:1 operator that attempts to convert the data on each of the apply/train/label segments to a Pandas DataFrame/Series.
  - Parameters:
    - data: Any – Input data.
    - columns: collections.abc.Sequence[str] | None = None – Optional column names.
    - converter: collections.abc.Callable[[Any, collections.abc.Sequence[str] | None], pandas.core.generic.NDFrame] = pandas_read – Optional function to be used for attempting the conversion. It will receive two parameters: the data and the column names (if provided).
  Warning
  The default converter uses internal logic; if unsuccessful, the payload is passed through unchanged, emitting a warning.
  - Returns:
    Payload converted to Pandas.
Examples
>>> SOURCE >>= payload.ToPandas(
...     columns=[f.name for f in SOURCE.extract.train.schema]
... )