Operator Architecture

Operators are the high-level workflow entities used for implementing the actual ML pipeline expressions. They can be seen as dynamic macro-instructions expanding the particular task graph based on their composition logic. This is a very powerful concept as it completely abstracts away the internal wiring complexity of the low-level task graph assembly providing a simple interface for the pipeline expressions.

If parameterized (rather than hard-coded) with the particular actor implementations, operators can be independent of the actual data types and formats as they deal purely with the topology. Therefore, many operators can be shared as library components turning advanced techniques into reusable commodity-like modules (see the ensembler).

Built upon the pipeline mode duality principle, operators always deliver the related task graphs for both the train and apply modes together. That’s how ForML enforces the train-predict integrity at every step of the workflow.

The operator layer also happens to be the ideal stage for carrying out unit testing. For this purpose, ForML provides a complete operator unit testing framework.

Generic Implementation

Operators can implement arbitrarily complex functionality based on any number of actors. They are using the logical topology structures to define the internal task graph and its composition with the preceding operators.

The base abstraction for implementing operators is the flow.Composable interface and the main flow.Operator base class:

class forml.flow.Composable[source]

Common interface for operators and expressions.

abstract compose(scope: flow.Composable) flow.Trunk[source]

Implementation of the internal task graph and its composition with the preceding part of the expression.

scope: flow.Composable

Preceding part of the expression that this operator is supposed to compose with.


Trunk instance representing the composed task graph.

abstract expand() flow.Trunk[source]

Compose this instance and the entire preceding part of the expression and return the resulting trunk.

This is typically called by a downstream operator (right side of the expression) within its compose() method where this is passed as part of the left side of the expression.


Trunk instance representing the composed task graph.

class forml.flow.Operator[source]

Bases: Composable

Base class for operator implementations.

Let’s explain the operator development process by implementing a typical Stateful Mapper operator. Conceptually, this operator works as follows:

  1. in the train-mode:

    1. it first gets trained (Task 1 - .train()) using the train features (via Train port) and labels (via Label port)

    2. then, using the state acquired during the training task, it maps (Task 2 - .apply()) the train features (via Apply input port) producing the transformed output (via Apply output port)

  2. in the apply-mode:

    1. again, using the state acquired during the training task, it maps (Task 3 - .apply()) this time the apply features (via Apply input port) producing the transformed output (via Apply output port)

The following diagram outlines the flows:

flowchart LR
    subgraph Mapper Worker Group
        tt -. state .-> ta & aa
    subgraph Trunk Heads
        ti((T)) --> tt & ta
        li((L)) -- L --> tt
        ai((A)) --> aa
    subgraph Trunk Tails
        ta --> to((T))
        li -- L --> lo((L))
        aa --> ao((A))

The segment between the A head/tail nodes represents the apply-mode task graph, while the segment between the T (+ L) nodes represents the train-mode task graph.

Proceeding to the actual implementation, we simply extend the flow.Operator class and provide the .compose() method:

from forml import flow

class StatefulMapper(flow.Operator):
    """Generic stateful mapper operator."""

    def __init__(self, actor_builder: flow.Builder):
        assert actor_builder.actor.is_stateful(), 'Stateful expected'
        self._actor_builder = actor_builder

    def compose(self, scope: flow.Composable) -> flow.Trunk:
        preceding: flow.Trunk = scope.expand()
        mapper_trainmode_train = flow.Worker(self._actor_builder, 1, 1)
        mapper_trainmode_apply = mapper_trainmode_train.fork()
        mapper_applymode_apply = mapper_trainmode_train.fork()
        mapper_trainmode_train.train(preceding.train.publisher, preceding.label.publisher)
        return preceding.extend(mapper_applymode_apply, mapper_trainmode_apply)

We can see the three workers (forked from the common instance to make them part of the same worker group) attached to the relevant segments of the preceding trunk. Note the operator is truly generic as the actual actor implementing the particular mapping function is provided as a parameter.

Operator Composition

Given the mean_impute example actor implemented earlier, we can now create two imputation operators and use them to compose a simple workflow using the >> syntax:

impute_foo = StatefulMapper(MeanImpute.builder(column='foo'))
impute_bar = StatefulMapper(MeanImpute.builder(column='bar'))

pipeline = impute_foo >> impute_bar

That would render the following task graphs:

flowchart TD
    subgraph Foo Worker Group
        tft -. state .-> tfa & afa
    subgraph Bar Worker Group
        tbt -. state .-> tba & aba
    subgraph Trunk Heads
        ti((T)) --> tft & tfa
        li((L)) -- L --> tft
        ai((A)) --> afa
    tfa --> tbt & tba
    li --> tbt
    afa --> aba
    subgraph Trunk Tails
        tba --> to((T))
        li -- L --> lo((L))
        aba --> ao((A))

Composition is the operation described using the ML workflow expressions based on the individual operators, which allows for shaping the entire task graph in a fully flexible manner.

As shown, the pipeline composition expressions are using the >> syntax to compose two operators together. This can be chained further down by engaging multiple operators.

The .compose() method of each operator is receiving the composition scope - the upstream (left) side of the expression - in an unexpanded form allowing the .compose() implementation to expand it (by calling the scope.expand()) itself as many times as needed.

The expansion process triggers the chained .compose() calls of the upstream operators all the way up to the origin of the given composition scope. Explicit scoping can be defined using intuitive parenthetical notation. That makes this operation non-associative - e.g. the expansion scope of operator C composition in expression A >> B >> C is the whole A >> B, while in expression A >> (B >> C) it is just the B operator.

Further practical details of the composition concept are demonstrated in the workflow case study.

Wrapped Operators

Instead of implementing the entire flow.Operator base class, operators can in special cases be defined using the wrappers provided within the pipeline library.

This approach is applicable to basic ML entities based on individual actors like transformers or estimators.

Simple Decorated Operators

Custom actors can be turned into operators easily by wrapping particular actors within the provided wrap.Operator.* decorators from the pipeline library:

Stateless mapper operator example
 def DropColumn(
     features: pandas.DataFrame, *, column: str
 ) -> pandas.DataFrame:
     return df.drop(columns=column)

 PIPELINE = AnotherOperator() >> DropColumn(column='foo')

For a complete reference of the decorated operators including further examples see the wrap.Operator class documentation.

Auto-Wrapped Operators

Another option for defining particular operators is reusing third-party implementations that are providing the desired functionality. We have already shown how these entities can be easily mapped into ForML actors. It can, however, be even easier to transparently auto-wrap them directly into ForML operators right upon importing. This can be achieved using the wrap.importer context manager:

Auto-wrapping imported 3rd party entities as ForML operators
 with wrap.importer():
     from sklearn.ensemble import GradientBoostingClassifier

 # This is now actually ForML operator wrapping the Sklearn classifier
 GBC = GradientBoostingClassifier(n_estimators=30, max_depth=10)

What and how gets actually wrapped upon importing is controlled by the set of special auto-wrappers instances passed to the wrap.importer context manager, which defaults to a content of the wrap.AUTO list. Additional custom auto-wrappers can be implemented by extending the wrap.Auto base class.