Flow Topology

ForML has custom primitives for the logical representation of the task graph, which also provide the API for its assembly during the construction phase.

Note

Thanks to this runtime-agnostic internal representation of the task graph, ForML can support a number of different third-party runners simply by converting the DAG on demand from its internal structure to the particular representation of the target runtime.

Task Graph Primitives

While the actual unit of work - the vertex in the runtime DAG - is a task provided as the Actor implementation, its logical representation used by ForML internally is the abstract flow.Node structure and its subtype flow.Worker in particular:

class forml.flow.Node(szin: int, szout: int)

Abstract primitive task graph node.

Parameters:
szin: int

Number of input Apply ports.

szout: int

Number of output Apply ports.

class forml.flow.Worker(builder: flow.Builder, /, szin: int, szout: int)
class forml.flow.Worker(group: flow.Worker.Group, /, szin: int, szout: int)

Main primitive node type.

Parameters:
builder: flow.Builder

Actor builder instance.

group: flow.Worker.Group

Worker group container.

szin: int

Number of input Apply ports.

szout: int

Number of output Apply ports.

Creating a Worker

The Worker node gets created simply by providing a flow.Builder and the required number of the input and output (apply) ports:

from forml import flow
from forml.pipeline import payload  # let's use some existing actors

# one input, one output and the PandasSelect stateless actor builder
select_foobar = flow.Worker(
    payload.PandasSelect.builder(columns=['foo', 'bar']),
    szin=1,
    szout=1,
)
select_bar = flow.Worker(payload.PandasSelect.builder(columns=['bar']), szin=1, szout=1)
select_baz = flow.Worker(payload.PandasSelect.builder(columns=['baz']), szin=1, szout=1)

# one input, one output and the PandasDrop stateless actor builder
drop_bar = flow.Worker(payload.PandasDrop.builder(columns=['bar']), szin=1, szout=1)

# two inputs, one output and the PandasConcat stateless actor builder
concat = flow.Worker(payload.PandasConcat.builder(axis='columns'), szin=2, szout=1)

# one input, one output and the mean_impute stateful actor builder (from the previous chapter)
impute_baz_apply = flow.Worker(mean_impute.builder(column='foo'), szin=1, szout=1)

This gives us the following disconnected workers:

flowchart LR
    sfbi((i)) --> sfb([select_foobar]) --> sfbo((o))
    szi((i)) --> sz([select_baz]) --> szo((o))
    ci1((i1)) & ci2((i2)) --> c([concat]) --> co((o))
    dbi((i)) --> db([drop_bar]) --> dbo((o))
    sri((i)) --> sb([select_bar]) --> sro((o))
    ibai((i)) --> iba(["impute_baz.apply()"]) --> ibao((o))

Note

All the actors we chose in this example work with Pandas payloads, but this is by no means an official format required by ForML. ForML does not care about the particular payload; choosing mutually compatible actors is the sole responsibility of the implementer.

Connecting Nodes

Let’s now create the actual dependency of the individual tasks by connecting the worker (apply) ports:

concat[0].subscribe(select_foobar[0])
concat[1].subscribe(select_baz[0])
drop_bar[0].subscribe(concat[0])
select_bar[0].subscribe(concat[0])
impute_baz_apply[0].subscribe(drop_bar[0])

The node[port_index] getitem syntax on a flow.Node instance returns a flow.PubSub object for the particular Apply port on the input or output side (determined by context) of that node. This can be used to publish or subscribe to another such object.

Caution

Any input port can be subscribed to at most one upstream output port, but any output port can publish to multiple subscribed input ports. No node can subscribe to itself.
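The port constraints above can be illustrated with a small self-contained toy model. This is only a sketch: ToyNode and its subscribe() signature are hypothetical stand-ins written for this illustration and are not the forml.flow API.

```python
class ToyNode:
    """Hypothetical stand-in for a task graph node (not the forml.flow.Node API)."""

    def __init__(self, name, szin, szout):
        self.name = name
        # One slot per input port: the (publisher, output index) pair or None.
        self.inputs = [None] * szin
        # One list per output port: all (subscriber, input index) pairs.
        self.outputs = [[] for _ in range(szout)]

    def subscribe(self, in_index, publisher, out_index=0):
        """Connect our input port to the given output port of the publisher."""
        if publisher is self:
            raise ValueError(f"{self.name}: self-subscription is not allowed")
        if self.inputs[in_index] is not None:
            raise ValueError(f"{self.name}: input port {in_index} is already subscribed")
        self.inputs[in_index] = (publisher, out_index)
        publisher.outputs[out_index].append((self, in_index))


concat = ToyNode("concat", szin=2, szout=1)
drop_bar = ToyNode("drop_bar", szin=1, szout=1)
select_bar = ToyNode("select_bar", szin=1, szout=1)

# Fan-out is fine: one output port feeds two different input ports.
drop_bar.subscribe(0, concat)
select_bar.subscribe(0, concat)
```

In this toy model, subscribing the already connected drop_bar input again, or subscribing concat to itself, raises ValueError.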

class forml.flow.PubSub(node: flow.Node, index: int)

Input or output Apply port reference that can be used for both subscribing and publishing.

subscribe(publisher: flow.Publishable) -> None

Subscribe to the given publisher.

Parameters:
publisher: flow.Publishable

Publishable to subscribe to.

class forml.flow.Publishable(node: flow.Node, index: int)

Output Apply port reference that can be used just for publishing.

Now, with the connections between our nodes in place, the topology looks as follows:

flowchart LR
    sfbi((i)) --> sfb([select_foobar]) -- "0-0" --> c([concat])
    sbi((i)) --> sz([select_baz]) -- "0-1" --> c
    c -- "0-0" --> db([drop_bar]) -- "0-0" --> iba(["impute_baz.apply()"]) --> ibao((o))
    c -- "0-0" --> sb([select_bar]) --> sro((o))

Dealing with Worker State

So far we have discussed only the apply-mode connections. For stateful nodes (i.e. nodes representing stateful actors), we also need to take care of the train-mode connections to their Train and Label ports. This is achieved simply by using the .train() method on the worker object:

Worker.train(train: flow.Publishable, label: flow.Publishable) -> None

Subscribe this node's Train and Label ports to the given publishers.

Parameters:
train: flow.Publishable

Train port publisher.

label: flow.Publishable

Label port publisher.

Training and applying even the same worker are two distinct tasks, hence they need to be represented using two related but separate worker nodes. ForML transparently manages these related workers using a flow.Worker.Group instance. All workers in the same group have the same shape and share the same actor builder instance.

class forml.flow.Worker.Group(builder: flow.Builder)

A container for holding all forked workers.

Parameters:
builder: flow.Builder

Actor builder instance.

Based on the group membership (and the general context), ForML automatically handles the runtime state management between the different modes of the same actor (the State ports are system-level ports and cannot be connected from the user-level API).

Workers of the same group can be created using one of the two methods:

Worker.fork() -> flow.Worker

Create a new node belonging to the same group (having the same shape and actor as self) but without any subscriptions.

Returns:

Forked node.

classmethod Worker.fgen(builder: flow.Builder, szin: int, szout: int) -> Iterator[flow.Worker]

Generator producing forks of the same node belonging to the same group.

Parameters:
builder: flow.Builder

Actor builder.

szin: int

Worker input apply port size.

szout: int

Worker output apply port size.

Returns:

Generator producing same-group worker forks.

impute_baz_train = impute_baz_apply.fork()
impute_baz_train.train(drop_bar[0], select_bar[0])

Now we have one more worker node impute_baz_train logically grouped as a companion of the original impute_baz_apply. The task graph now looks like this:

flowchart LR
    subgraph Group
        iba(["impute_baz.apply()"])
        ibt["impute_baz.train()"]
    end
    sfbi((i)) --> sfb([select_foobar]) -- "0-0" --> c([concat])
    sbi((i)) --> sz([select_baz]) -- "0-1" --> c
    c -- "0-0" --> db([drop_bar]) -- "0-0" --> iba --> ibao((o))
    c -- "0-0" --> sb([select_bar]) -- "0-L" --> ibt
    db -- "0-T" --> ibt

Caution

Worker groups and the trained workers impose a couple of additional constraints:

  • At most one worker in the same group can be trained.

  • Either both Train and Label or all Apply input and output ports of each worker must be connected.
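To make the group semantics more concrete, here is a minimal toy model of forking and of the "at most one trained worker per group" rule. ToyGroup and ToyWorker are hypothetical names invented for this sketch; they only mimic the behavior described above and are not how ForML implements it.

```python
class ToyGroup:
    """Hypothetical container shared by all forks of one worker."""

    def __init__(self, builder):
        self.builder = builder  # the single actor builder shared by the group
        self.members = []


class ToyWorker:
    """Hypothetical worker that always belongs to exactly one group."""

    def __init__(self, group, szin, szout):
        self.group = group
        self.szin, self.szout = szin, szout
        self.trained = False
        group.members.append(self)

    def fork(self):
        """Create a sibling of the same shape in the same group."""
        return ToyWorker(self.group, self.szin, self.szout)

    def train(self):
        """Mark this worker as the trained one, enforcing the group rule."""
        if any(member.trained for member in self.group.members):
            raise ValueError("another worker of this group is already trained")
        self.trained = True


impute_apply = ToyWorker(ToyGroup("mean_impute builder"), szin=1, szout=1)
impute_train = impute_apply.fork()
impute_train.train()
```

Both workers share the same group (and therefore the same builder), while a further fork attempting to train would be rejected.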

Future Nodes

In addition to the Worker nodes, there is a special node implementation called flow.Future representing a future worker placeholder. Future can be used during topology construction when the real worker to be connected is not known yet (e.g. when implementing an operator that does not know which upstream or downstream workers it will eventually be composed with). When connected to a real worker, the Future node will automatically collapse and disappear from the topology.

class forml.flow.Future(szin: int = 1, szout: int = 1)

Fake transparent Apply port node that can be used as a lazy publisher/subscriber that disappears from the chain once it gets connected to another apply node(s).

The following example demonstrates the effect when using the Future nodes:

from forml import flow

worker1 = flow.Worker(SomeActor.builder(), szin=1, szout=1)
worker2 = flow.Worker(AnotherActor.builder(), szin=1, szout=1)
future1 = flow.Future()  # defaults to szin=1, szout=1 (other shapes also possible)
future2 = flow.Future()

future1[0].subscribe(worker1[0])
worker2[0].subscribe(future2[0])

flowchart LR
    w1([worker1]) --> f1((future1))
    f2((future2)) --> w2([worker2])

As the diagram shows, the worker1 node connects its first apply output port to the future1 Future node, and another Future node, future2, is connected to the input port of worker2. After subscribing the future2 node to the future1 output, both Future nodes disappear from the topology and the workers become connected directly:

future2[0].subscribe(future1[0])

flowchart LR
    w1([worker1]) --> w2([worker2])

Warning

A flow containing Future nodes is considered incomplete and cannot be passed for execution until all Future nodes are collapsed.
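The collapsing behavior can be mimicked with a small toy model. This is a sketch under simplifying assumptions (single-port nodes, no port indices); ToyWorker, ToyFuture and connect() are hypothetical names and do not reflect how flow.Future is actually implemented.

```python
class ToyWorker:
    """Hypothetical single-input, single-output real node."""

    def __init__(self, name):
        self.name = name
        self.upstream = None   # the real worker we are subscribed to
        self.downstream = []   # the real workers subscribed to us


class ToyFuture:
    """Hypothetical placeholder that never stays in the final topology."""

    def __init__(self):
        self.upstream = None   # real worker behind this placeholder (if known)
        self.downstream = []   # nodes waiting for that real worker


def connect(subscriber, publisher):
    """Subscribe subscriber to publisher, looking through any placeholders."""
    if isinstance(publisher, ToyFuture):
        if publisher.upstream is None:
            publisher.downstream.append(subscriber)  # wait for a real source
            return
        publisher = publisher.upstream
    if isinstance(subscriber, ToyFuture):
        subscriber.upstream = publisher
        waiting, subscriber.downstream = subscriber.downstream, []
        for target in waiting:  # hand the real source over to everyone waiting
            connect(target, publisher)
        return
    subscriber.upstream = publisher
    publisher.downstream.append(subscriber)


worker1, worker2 = ToyWorker("worker1"), ToyWorker("worker2")
future1, future2 = ToyFuture(), ToyFuture()

connect(future1, worker1)   # worker1 --> future1
connect(worker2, future2)   # future2 --> worker2
connect(future2, future1)   # both placeholders collapse
```

After the last call, worker2 is subscribed directly to worker1 and neither placeholder appears in the links between the real workers.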

Logical Structures

When implementing more complex topologies (typically in the scope of operator development), the significant parts of the task graph become its entry and exit nodes (as that’s where new connections are being added), while the inner nodes (already fully connected) fade away from the perspective of the ongoing construction.

For this purpose, ForML uses the flow.Segment abstraction representing a subgraph between one entry (.head) and exit (.tail) node and providing a useful API to work with this part of the task graph:

class forml.flow.Segment(head: flow.Node, tail: flow.Node | None = None)

Representing an acyclic (sub)graph between two apply-mode nodes.

Each of the two boundary nodes must be externally facing with just a single port (.head node having a single input port and .tail node having a single output port).

The tail node (if provided) must be reachable from the head node via the existing connections.

Parameters:
head: flow.Node

The first (input) node in this segment.

tail: flow.Node | None = None

The last (output) node in this segment (auto-traced if not provided).

property publisher : flow.Publishable

Publishable tail node representation.

Returns:

Publishable tail Apply port reference.

subscribe(publisher: flow.Publishable | flow.Segment) -> None

Subscribe our head node to the given publisher.

Parameters:
publisher: flow.Publishable | flow.Segment

Another segment or a general publisher to subscribe to.

extend(right: flow.Segment | flow.Node | None = None, tail: flow.Node | None = None) -> flow.Segment

Create a new segment by appending the right head to our tail or retracing this segment up to its physical or explicit tail.

Parameters:
right: flow.Segment | flow.Node | None = None

An optional segment to extend with (retracing to the physical or explicit tail if not provided).

tail: flow.Node | None = None

An optional tail as a segment exit node.

Returns:

New extended segment.

copy() -> flow.Segment

Make a copy of the apply-mode topology within this segment (all trained nodes are ignored).

Copied nodes remain members of the same worker groups.

Returns:

Copy of the Apply segment topology.

Caution

Note the .head node must have exactly one input port and the .tail node must have exactly one output port.
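As an illustration of what auto-tracing a tail might involve, the following sketch walks a toy graph from the head and looks for its single exit node. The trace_tail() helper and the dictionary-based graph are assumptions made for this example only; the actual tracing logic inside flow.Segment may differ.

```python
def trace_tail(head, downstream):
    """Return the only node reachable from head that has no subscribers.

    The downstream argument maps each node name to the list of node names
    subscribed to it (a toy representation, not a ForML structure).
    """
    seen, stack, leaves = set(), [head], []
    while stack:
        node = stack.pop()
        if node in seen:
            continue
        seen.add(node)
        successors = downstream.get(node, [])
        if not successors:
            leaves.append(node)
        stack.extend(successors)
    if len(leaves) != 1:
        raise ValueError(f"ambiguous tail, candidates: {sorted(leaves)}")
    return leaves[0]


# A linear chain has an unambiguous tail.
CHAIN = {"concat": ["drop_bar"], "drop_bar": ["impute_baz_apply"]}
TAIL = trace_tail("concat", CHAIN)

# With two exits the tail cannot be determined automatically.
BRANCHED = {"concat": ["drop_bar", "select_bar"]}
```

In this toy version, a branched graph with several exits is rejected, which is one situation where passing an explicit tail would be the natural remedy.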

Segment Coherence

To uphold one of the core ForML traits - the inseparability of the train-mode and apply-mode implementations - ForML uses the flow.Trunk structure as the integrated representation of the related segments. Three segments need to be bound together to cover all the matching tasks across the different modes: while the apply mode is represented by a single segment, the train mode needs, in addition to its train segment, a dedicated segment for its label task graph.

class forml.flow.Trunk(apply: flow.Segment | flow.Node | None = None, train: flow.Segment | flow.Node | None = None, label: flow.Segment | flow.Node | None = None)

Structure for integrating the three related segments representing both the runtime modes.

extend(apply: flow.Segment | flow.Node | None = None, train: flow.Segment | flow.Node | None = None, label: flow.Segment | flow.Node | None = None) -> flow.Trunk

Helper for creating a new Trunk with the specified segments extended by the provided instances.

Parameters:
apply: flow.Segment | flow.Node | None = None

Optional segment to extend our existing apply segment with.

train: flow.Segment | flow.Node | None = None

Optional segment to extend our existing train segment with.

label: flow.Segment | flow.Node | None = None

Optional segment to extend our existing label segment with.

Returns:

New Trunk instance.

use(apply: flow.Segment | flow.Node | None = None, train: flow.Segment | flow.Node | None = None, label: flow.Segment | flow.Node | None = None) -> flow.Trunk

Helper for creating a new Trunk with the specified segments replaced by the provided values.

Parameters:
apply: flow.Segment | flow.Node | None = None

Optional segment to replace our existing apply segment with.

train: flow.Segment | flow.Node | None = None

Optional segment to replace our existing train segment with.

label: flow.Segment | flow.Node | None = None

Optional segment to replace our existing label segment with.

Returns:

New Trunk instance.
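The difference between extending and replacing segments can be shown with a toy model in which each segment is just a tuple of task names. ToyTrunk and the task names "scale" and "extract" are hypothetical and exist only for this sketch; they are not the flow.Trunk implementation.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyTrunk:
    """Hypothetical immutable triple of apply, train and label segments."""

    apply: tuple = ()
    train: tuple = ()
    label: tuple = ()

    def extend(self, apply=(), train=(), label=()):
        """Return a new trunk with the given tasks appended per segment."""
        return ToyTrunk(
            self.apply + tuple(apply),
            self.train + tuple(train),
            self.label + tuple(label),
        )

    def use(self, apply=None, train=None, label=None):
        """Return a new trunk with the given segments replaced entirely."""
        return ToyTrunk(
            self.apply if apply is None else tuple(apply),
            self.train if train is None else tuple(train),
            self.label if label is None else tuple(label),
        )


base = ToyTrunk(apply=("impute",), train=("impute",))
extended = base.extend(apply=["scale"])   # appends to the apply segment only
replaced = base.use(label=["extract"])    # swaps in a new label segment
```

Both helpers leave the original instance untouched and return a new one, mirroring the "New Trunk instance" return value documented above.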

Flow Compiler

While representing the task graph using linked structures is practical for implementing the user-level API, a more efficient structure for its actual runtime execution is the (actor) adjacency matrix produced by the internal flow compiler.
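For readers unfamiliar with the adjacency matrix form, the sketch below derives one from a list of publisher-to-subscriber links. The adjacency_matrix() helper and the plain list-of-lists layout are assumptions for illustration; the structure produced by the ForML compiler is internal and may look different.

```python
def adjacency_matrix(nodes, edges):
    """Build a square 0/1 matrix where row = publisher and column = subscriber."""
    index = {name: position for position, name in enumerate(nodes)}
    matrix = [[0] * len(nodes) for _ in nodes]
    for publisher, subscriber in edges:
        matrix[index[publisher]][index[subscriber]] = 1
    return matrix


NODES = ["select_foobar", "select_baz", "concat", "drop_bar", "select_bar"]
EDGES = [
    ("select_foobar", "concat"),
    ("select_baz", "concat"),
    ("concat", "drop_bar"),
    ("concat", "select_bar"),
]
MATRIX = adjacency_matrix(NODES, EDGES)
```

Each row lists the subscribers of one node, so the concat row marks both drop_bar and select_bar.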

ForML uses its compiler to:

  1. Augment the task graph by adding any necessary system-level nodes (e.g. to automatically manage the persistence of any stateful actors), which might be any of the following:

    System Node    Purpose
    Loader         Provides a previously persisted state.
    Dumper         Persists a new state of the given actor.
    Committer      Collates all the related states as a new generation.
    Getter         Physically extracts a specific output port out of a Python function return value.

  2. Optimize the task graph by removing any irrelevant or redundant parts.

  3. Generate a portable set of instructions suitable for runtime execution.
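Of the system nodes listed above, the Getter is perhaps the least obvious one. The following sketch shows the idea on a plain Python function returning two outputs as a single tuple. ToyGetter and split() are hypothetical names used only here and are not the compiler's actual classes.

```python
class ToyGetter:
    """Hypothetical system task selecting one output port from a result tuple."""

    def __init__(self, index):
        self.index = index

    def __call__(self, result):
        return result[self.index]


def split(rows):
    """Toy two-output task returning (features, labels) as one Python tuple."""
    features = [row[:-1] for row in rows]
    labels = [row[-1] for row in rows]
    return features, labels


RESULT = split([(1, 2, "a"), (3, 4, "b")])
FEATURES = ToyGetter(0)(RESULT)  # what a subscriber of output port 0 receives
LABELS = ToyGetter(1)(RESULT)    # what a subscriber of output port 1 receives
```

Each downstream subscriber then depends on its own getter rather than on the whole return value.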

The following diagram shows our existing topology enhanced by the compiler, which added the state Dumper and Loader system nodes and connected the relevant State ports (dotted lines):

flowchart TD
    subgraph Group
        iba(["impute_baz.apply()"])
        ibt["impute_baz.train()"]
    end
    sfbi((i)) --> sfb([select_foobar]) -- "0-0" --> c([concat])
    sbi((i)) --> sz([select_baz]) -- "0-1" --> c
    c -- "0-0" --> db([drop_bar]) -- "0-0" --> iba --> ibao((o))
    c -- "0-0" --> sb([select_bar]) -- "0-L" --> ibt
    db -- "0-T" --> ibt
    ibt -. state .-> iba
    l[(loader)] -. state .-> ibt -. state .-> d[(dumper)]

Following is a brief description of the compiler API:

forml.flow.compile(segment: flow.Segment, assets: asset.State | None = None) -> Collection[flow.Symbol]

Generate the portable low-level runtime symbol table representing the given flow topology segment augmented with all the necessary system instructions.

Parameters:
segment: flow.Segment

Flow topology segment to generate the symbol table for.

assets: asset.State | None = None

Runtime state asset accessors for all the involved persistent workers.

Returns:

The portable runtime symbol table.

class forml.flow.Symbol(instruction: flow.Instruction, arguments: Sequence[flow.Instruction] | None = None)

The main unit of the compiled low-level runtime code.

It represents the executable instruction and its dependency on other instructions in the task graph.

Parameters:
instruction: flow.Instruction

The executable instruction.

arguments: Sequence[flow.Instruction] | None = None

The sequence of instructions whose output constitutes parameters to this symbol’s instruction.

class forml.flow.Instruction

Executable part of the compiled symbol responsible for performing the processing activity.

abstract execute(*args: Any) -> Any

Actual instruction functionality.

Parameters:
*args: Any

A sequence of input arguments.

Returns:

Instruction result.
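To show how a symbol table of this shape could be evaluated, here is a deliberately naive interpreter over toy instructions. Everything in it (ToyInstruction, ToySymbol, Const, Add and run()) is a hypothetical sketch written for this illustration; real ForML runners translate the symbols for their particular target runtime instead.

```python
import abc
from typing import Any, NamedTuple, Sequence


class ToyInstruction(abc.ABC):
    """Hypothetical counterpart of an executable instruction."""

    @abc.abstractmethod
    def execute(self, *args: Any) -> Any:
        """Perform the actual work using the upstream results."""


class Const(ToyInstruction):
    def __init__(self, value):
        self.value = value

    def execute(self, *args):
        return self.value


class Add(ToyInstruction):
    def execute(self, *args):
        return sum(args)


class ToySymbol(NamedTuple):
    """An instruction plus the instructions whose outputs it consumes."""

    instruction: ToyInstruction
    arguments: Sequence[ToyInstruction] = ()


def run(symbols):
    """Execute every instruction exactly once, resolving its arguments first."""
    table = {symbol.instruction: symbol for symbol in symbols}
    results = {}

    def evaluate(instruction):
        if instruction not in results:
            args = [evaluate(arg) for arg in table[instruction].arguments]
            results[instruction] = instruction.execute(*args)
        return results[instruction]

    return [evaluate(symbol.instruction) for symbol in symbols]


one, two, total = Const(1), Const(2), Add()
SYMBOLS = [ToySymbol(one), ToySymbol(two), ToySymbol(total, (one, two))]
OUTPUTS = run(SYMBOLS)
```

The results are memoized per instruction, so a task consumed by several downstream symbols is still executed only once.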