Flow Topology¶
ForML has custom primitives for the logical representation of the task graph, which also provide the API for its assembly during the construction phase.
Note
Thanks to this runtime-agnostic internal representation of the task graph, ForML can support a number of different third-party runners simply by converting the DAG on demand from its internal structure to the particular representation of the target runtime.
Task Graph Primitives¶
While the actual unit of work (the vertex in the runtime DAG) is a task provided as the Actor implementation, its logical representation used internally by ForML is the abstract flow.Node structure and, in particular, its subtype flow.Worker:
- class forml.flow.Worker(builder: flow.Builder, /, szin: int, szout: int)
- class forml.flow.Worker(group: flow.Worker.Group, /, szin: int, szout: int)
Main primitive node type.
- Parameters:
- builder: flow.Builder
Actor builder instance.
- group: flow.Worker.Group
Worker group container.
- szin: int
Number of input Apply ports.
- szout: int
Number of output Apply ports.
Creating a Worker¶
A Worker node is created simply by providing a flow.Builder and the required number of input and output (Apply) ports.
Creating one such worker for each of the tasks in this example gives us the following disconnected workers:
flowchart LR
sfbi((i)) --> sfb([select_foobar]) --> sfbo((o))
szi((i)) --> sz([select_baz]) --> szo((o))
ci1((i1)) & ci2((i2)) --> c([concat]) --> co((o))
dbi((i)) --> db([drop_bar]) --> dbo((o))
sri((i)) --> sb([select_bar]) --> sro((o))
ibai((i)) --> iba(["impute_baz.apply()"]) --> ibao((o))
Note
All the actors chosen in this example work with Pandas payloads; this is by no means an official format required by ForML. ForML does not care about the particular payload, and choosing mutually compatible actors is the exclusive responsibility of the implementer.
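To make the notion of a worker's shape more concrete, here is a minimal self-contained toy model (hypothetical; it is not how ForML implements flow.Worker): each worker simply pairs an actor builder with its number of Apply input and output ports. ToyBuilder, ToyWorker, the actor names and their keyword arguments are illustrative assumptions standing in for real flow.Builder instances.

```python
from dataclasses import dataclass, field


@dataclass
class ToyBuilder:
    """Hypothetical stand-in for flow.Builder: an actor name plus its init kwargs."""
    actor: str
    kwargs: dict = field(default_factory=dict)


class ToyWorker:
    """Hypothetical stand-in for flow.Worker: a builder plus a fixed port shape."""

    def __init__(self, builder: ToyBuilder, szin: int, szout: int):
        if szin < 0 or szout < 0:
            raise ValueError("port counts must be non-negative")
        self.builder = builder
        self.szin = szin    # number of Apply input ports
        self.szout = szout  # number of Apply output ports

    def __repr__(self) -> str:
        return f"{self.builder.actor}[{self.szin}->{self.szout}]"


# One toy worker per task from the diagram; only concat has two inputs.
select_foobar = ToyWorker(ToyBuilder("select", {"columns": ["foo", "bar"]}), 1, 1)
select_baz = ToyWorker(ToyBuilder("select", {"columns": ["baz"]}), 1, 1)
concat = ToyWorker(ToyBuilder("concat", {"axis": "columns"}), 2, 1)
drop_bar = ToyWorker(ToyBuilder("drop", {"columns": ["bar"]}), 1, 1)
select_bar = ToyWorker(ToyBuilder("select", {"columns": ["bar"]}), 1, 1)
impute_baz_apply = ToyWorker(ToyBuilder("impute", {"strategy": "mean"}), 1, 1)
```

Only concat declares two inputs, matching its i1/i2 ports in the diagram; everything else is a one-in, one-out worker.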
Connecting Nodes¶
Let’s now create the actual dependencies between the individual tasks by connecting the worker (Apply) ports:
concat[0].subscribe(select_foobar[0])
concat[1].subscribe(select_baz[0])
drop_bar[0].subscribe(concat[0])
select_bar[0].subscribe(concat[0])
impute_baz_apply[0].subscribe(drop_bar[0])
The node[port_index] getitem syntax on a flow.Node instance returns a flow.PubSub object for the particular Apply port on the input or output side (determined by context) of that node. This object can be used to publish to, or subscribe to, another such object.
Caution
An input port can subscribe to at most one upstream output port, while an output port can publish to multiple subscribed input ports. No actor can subscribe to itself.
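The subscription rules above can be illustrated with a small self-contained toy model (hypothetical, not ForML's code): indexing a node returns a port reference, each input port accepts at most one publisher, an output port may fan out to many subscribers, and a node may not subscribe to itself. Unlike ForML, where the port side is determined by context, this toy always treats the receiver of subscribe() as the input port and its argument as the output port. The Node and Port names are illustrative assumptions.

```python
class Node:
    """Toy node with a fixed number of Apply input and output ports."""

    def __init__(self, name: str, szin: int, szout: int):
        self.name = name
        self.szin, self.szout = szin, szout
        self.inputs = {}   # in-port index -> (publisher node, out-port index)
        self.outputs = {}  # out-port index -> list of (subscriber node, in-port index)

    def __getitem__(self, index: int) -> "Port":
        return Port(self, index)


class Port:
    """Toy port reference; the receiver of subscribe() acts as the input side."""

    def __init__(self, node: Node, index: int):
        self.node, self.index = node, index

    def subscribe(self, publisher: "Port") -> None:
        if publisher.node is self.node:
            raise ValueError(f"{self.node.name} cannot subscribe to itself")
        if not 0 <= self.index < self.node.szin:
            raise IndexError(f"{self.node.name} has no input port {self.index}")
        if not 0 <= publisher.index < publisher.node.szout:
            raise IndexError(f"{publisher.node.name} has no output port {publisher.index}")
        if self.index in self.node.inputs:
            # An input port accepts a single upstream publisher only.
            raise ValueError(f"{self.node.name}[{self.index}] is already subscribed")
        self.node.inputs[self.index] = (publisher.node, publisher.index)
        publisher.node.outputs.setdefault(publisher.index, []).append((self.node, self.index))


select_foobar, select_baz = Node("select_foobar", 1, 1), Node("select_baz", 1, 1)
concat, drop_bar = Node("concat", 2, 1), Node("drop_bar", 1, 1)
select_bar = Node("select_bar", 1, 1)

concat[0].subscribe(select_foobar[0])
concat[1].subscribe(select_baz[0])
drop_bar[0].subscribe(concat[0])    # concat output 0 fans out ...
select_bar[0].subscribe(concat[0])  # ... to a second subscriber
```

Attempting drop_bar[0].subscribe(select_bar[0]) afterwards raises ValueError, because drop_bar's only input is already taken.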
- class forml.flow.PubSub(node: flow.Node, index: int)
Input or output Apply port reference that can be used for both subscribing and publishing.
- subscribe(publisher: flow.Publishable) → None
Subscribe to the given publisher.
- Parameters:
- publisher: flow.Publishable
Publishable to subscribe to.
- class forml.flow.Publishable(node: flow.Node, index: int)
Output Apply port reference that can be used just for publishing.
Now, with the connections between our nodes in place, the topology looks as follows:
flowchart LR
sfbi((i)) --> sfb([select_foobar]) -- "0-0" --> c([concat])
sbi((i)) --> sz([select_baz]) -- "0-1" --> c
c -- "0-0" --> db([drop_bar]) -- "0-0" --> iba(["impute_baz.apply()"]) --> ibao((o))
c -- "0-0" --> sb([select_bar]) --> sro((o))
Dealing with Worker State¶
So far we have discussed only the apply-mode connections. For stateful nodes (i.e. nodes representing stateful actors), we also need to take care of the train-mode connections to their Train and Label ports. This is achieved simply by using the .train() method on the worker object:
- Worker.train(train: flow.Publishable, label: flow.Publishable) → None
Subscribe this node Train and Label ports to the given publishers.
- Parameters:
- train: flow.Publishable
Train port publisher.
- label: flow.Publishable
Label port publisher.
Training and applying the same worker are two distinct tasks, so they need to be represented using two related but separate worker nodes. ForML transparently manages these related workers using a flow.Worker.Group instance. All workers in the same group have the same shape and share the same actor builder instance.
- class forml.flow.Worker.Group(builder: flow.Builder)
A container for holding all forked workers.
- Parameters:
- builder: flow.Builder
Actor builder instance.
Based on the group membership (and the general context), ForML automatically handles the runtime state management between the different modes of the same actor (the State ports are system-level ports and cannot be connected from the user-level API).
Workers of the same group can be created using one of the two methods:
- Worker.fork() → flow.Worker
Create a new node belonging to the same group (having the same shape and actor as self) but without any subscriptions.
- Returns:
Forked node.
- classmethod Worker.fgen(builder: flow.Builder, szin: int, szout: int) → Iterator[flow.Worker]
Generator producing forks of the same node belonging to the same group.
impute_baz_train = impute_baz_apply.fork()
impute_baz_train.train(drop_bar[0], select_bar[0])
Now we have one more worker node, impute_baz_train, logically grouped as a companion of the original impute_baz_apply. The task graph now looks like this:
flowchart LR
subgraph Group
iba(["impute_baz.apply()"])
ibt["impute_baz.train()"]
end
sfbi((i)) --> sfb([select_foobar]) -- "0-0" --> c([concat])
sbi((i)) --> sz([select_baz]) -- "0-1" --> c
c -- "0-0" --> db([drop_bar]) -- "0-0" --> iba --> ibao((o))
c -- "0-0" --> sb([select_bar]) -- "0-L" --> ibt
db -- "0-T" --> ibt
Caution
Worker groups and the trained workers impose a couple of additional constraints:
- At most one worker in the same group can be trained.
- Either both Train and Label or all Apply input and output ports of each worker must be connected.
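The grouping rules can be sketched with another self-contained toy model (hypothetical; the real flow.Worker.Group also coordinates the system-level State ports, which this toy ignores, and the Train/Label versus Apply connection rule is not modelled). Forks share the same builder and shape, train() records the Train and Label publishers, and a group refuses a second trained member. ToyGroup, ToyWorker and the string publishers are assumptions made for illustration.

```python
from typing import Iterator, Optional


class ToyGroup:
    """Hypothetical container of workers sharing one builder (cf. flow.Worker.Group)."""

    def __init__(self, builder: str):
        self.builder = builder
        self.members: list = []

    @property
    def trained(self) -> Optional["ToyWorker"]:
        # The (at most one) member that has Train/Label publishers assigned.
        return next((w for w in self.members if w.trainset is not None), None)


class ToyWorker:
    """Hypothetical worker whose builder and shape are shared with its forks."""

    def __init__(self, group: ToyGroup, szin: int, szout: int):
        self.group, self.szin, self.szout = group, szin, szout
        self.trainset: Optional[tuple] = None  # (train publisher, label publisher)
        group.members.append(self)

    @property
    def builder(self) -> str:
        return self.group.builder

    def fork(self) -> "ToyWorker":
        # Same group and shape, but no subscriptions of its own.
        return ToyWorker(self.group, self.szin, self.szout)

    @classmethod
    def fgen(cls, builder: str, szin: int, szout: int) -> Iterator["ToyWorker"]:
        # Endless supply of workers all belonging to one freshly created group.
        group = ToyGroup(builder)
        while True:
            yield cls(group, szin, szout)

    def train(self, train: str, label: str) -> None:
        trained = self.group.trained
        if trained is not None and trained is not self:
            raise RuntimeError("at most one worker in a group can be trained")
        self.trainset = (train, label)


# Mirrors the impute_baz example; publishers are plain strings in this toy.
impute_baz_apply = ToyWorker(ToyGroup("impute"), 1, 1)
impute_baz_train = impute_baz_apply.fork()
impute_baz_train.train("drop_bar[0]", "select_bar[0]")
```

Keeping the builder on the group rather than on each worker is what makes "same actor" hold by construction for every fork in this sketch.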
Future Nodes¶
In addition to the Worker nodes, there is a special node implementation called flow.Future representing a future worker placeholder. A Future can be used during topology construction when the real worker to be connected is not known yet (e.g. when implementing an operator that does not know which upstream or downstream workers it will eventually be composed with). When connected to a real worker, the Future node automatically collapses and disappears from the topology.
- class forml.flow.Future(szin: int = 1, szout: int = 1)
Fake transparent Apply port node that can be used as a lazy publisher/subscriber that disappears from the chain once it gets connected to another apply node(s).
The following example demonstrates the effect when using the Future nodes:
flowchart LR
w1([worker1]) --> f1((future1))
f2((future2)) --> w2([worker2])
As the diagram shows, the worker1 node has its first Apply output port connected to the future1 Future node, while another Future node, future2, is connected to the input port of worker2. Now, after subscribing the future2 node to the future1 output, both Future nodes disappear from the topology and the workers become connected directly:
future2[0].subscribe(future1[0])
flowchart LR
w1([worker1]) --> w2([worker2])
Warning
Flow containing Future nodes is considered incomplete and cannot be passed for execution until all Future nodes are collapsed.
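The collapsing behaviour can be illustrated with a toy model that is far simpler than flow.Future and purely hypothetical: a placeholder stays in place until it has both an upstream publisher and a downstream subscriber, at which point it links them directly and drops out. Keeping the topology as a plain set of (publisher, subscriber) name pairs, and the ToyFlow class itself, are assumptions of the sketch.

```python
class ToyFlow:
    """Hypothetical edge set in which the names listed in `futures` are placeholders."""

    def __init__(self, *futures: str):
        self.futures = set(futures)
        self.edges: set = set()  # (publisher, subscriber) pairs

    def connect(self, publisher: str, subscriber: str) -> None:
        self.edges.add((publisher, subscriber))
        self._collapse()

    def _collapse(self) -> None:
        changed = True
        while changed:
            changed = False
            for future in sorted(self.futures):
                upstream = {p for p, s in self.edges if s == future}
                downstream = {s for p, s in self.edges if p == future}
                if upstream and downstream:
                    # Bypass the placeholder: link each publisher straight to each subscriber.
                    self.edges -= {(p, future) for p in upstream}
                    self.edges -= {(future, s) for s in downstream}
                    self.edges |= {(p, s) for p in upstream for s in downstream}
                    self.futures.discard(future)
                    changed = True
                    break  # the set of futures changed, so restart the scan

    @property
    def complete(self) -> bool:
        # Any remaining placeholder makes the flow unfit for execution.
        return not self.futures


flow = ToyFlow("future1", "future2")
flow.connect("worker1", "future1")
flow.connect("future2", "worker2")
flow.connect("future1", "future2")  # mirrors future2[0].subscribe(future1[0])
```

The last connection first collapses future1 (leaving worker1 linked to future2) and then future2, ending with the single direct edge from worker1 to worker2.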
Logical Structures¶
When implementing more complex topologies (typically in the scope of operator development), the significant parts of the task graph become its entry and exit nodes (as that’s where new connections are being added), while the inner nodes (already fully connected) fade away from the perspective of the ongoing construction.
For this purpose, ForML uses the flow.Segment abstraction representing a subgraph between one entry (.head) and one exit (.tail) node and providing a useful API to work with this part of the task graph:
- class forml.flow.Segment(head: flow.Node, tail: flow.Node | None = None)
Representing an acyclic (sub)graph between two apply-mode nodes.
Each of the two boundary nodes must be externally facing with just a single port (the .head node having a single input port and the .tail node having a single output port).
The tail node (if provided) must be reachable from the head node via the existing connections.
- property publisher: flow.Publishable
Publishable tail node representation.
- Returns:
Publishable tail Apply port reference.
- subscribe(publisher: flow.Publishable | flow.Segment) → None
Subscribe our head node to the given publisher.
- Parameters:
- publisher: flow.Publishable | flow.Segment
Another segment or a general publisher to subscribe to.
- extend(right: flow.Segment | flow.Node | None = None, tail: flow.Node | None = None) → flow.Segment
Create a new segment by appending the right head to our tail or retracing this segment up to its physical or explicit tail.
- copy() → flow.Segment
Make a copy of the apply-mode topology within this segment (all trained nodes are ignored).
Copied nodes remain members of the same worker groups.
- Returns:
Copy of the Apply segment topology.
Caution
Note that the .head node must have exactly one input port and the .tail node must have exactly one output port.
Segment Coherence¶
To uphold one of the core ForML traits, the inseparability of the train and apply-mode implementations, ForML uses the flow.Trunk structure as the integrated representation of the related segments. Three segments actually need to be bound together to cover all the matching tasks across the different modes: while the apply mode is represented by its single segment, the train mode needs, in addition to its train segment, a dedicated segment for its label task graph.
- class forml.flow.Trunk(apply: flow.Segment | flow.Node | None = None, train: flow.Segment | flow.Node | None = None, label: flow.Segment | flow.Node | None = None)
Structure for integrating the three related segments representing both the runtime modes.
- extend(apply: flow.Segment | flow.Node | None = None, train: flow.Segment | flow.Node | None = None, label: flow.Segment | flow.Node | None = None) → flow.Trunk
Helper for creating new Trunk with the specified segments extended by the provided instances.
- Parameters:
- apply: flow.Segment | flow.Node | None = None
Optional segment to extend our existing apply segment with.
- train: flow.Segment | flow.Node | None = None
Optional segment to extend our existing train segment with.
- label: flow.Segment | flow.Node | None = None
Optional segment to extend our existing label segment with.
- Returns:
New Trunk instance.
- use(apply: flow.Segment | flow.Node | None = None, train: flow.Segment | flow.Node | None = None, label: flow.Segment | flow.Node | None = None) → flow.Trunk
Helper for creating new Trunk with the specified segments replaced by the provided values.
- Parameters:
- apply: flow.Segment | flow.Node | None = None
Optional segment to replace our existing apply segment with.
- train: flow.Segment | flow.Node | None = None
Optional segment to replace our existing train segment with.
- label: flow.Segment | flow.Node | None = None
Optional segment to replace our existing label segment with.
- Returns:
New Trunk instance.
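The difference between extend() and use() can be sketched with a toy immutable trunk. This is an assumption-laden simplification: each "segment" is just a tuple of node names, so extending means concatenation, while use() swaps in whole replacements and leaves the other segments untouched. ToyTrunk and the node names are hypothetical.

```python
from typing import NamedTuple, Optional

Seg = tuple[str, ...]  # toy "segment": an ordered chain of node names


class ToyTrunk(NamedTuple):
    """Hypothetical analogue of flow.Trunk: three related segments kept together."""
    apply: Seg = ()
    train: Seg = ()
    label: Seg = ()

    def extend(self, apply: Optional[Seg] = None, train: Optional[Seg] = None,
               label: Optional[Seg] = None) -> "ToyTrunk":
        # New trunk whose segments are ours with the given chains appended.
        return ToyTrunk(self.apply + (apply or ()), self.train + (train or ()),
                        self.label + (label or ()))

    def use(self, apply: Optional[Seg] = None, train: Optional[Seg] = None,
            label: Optional[Seg] = None) -> "ToyTrunk":
        # New trunk with the given segments replaced and the others kept as they are.
        return ToyTrunk(self.apply if apply is None else apply,
                        self.train if train is None else train,
                        self.label if label is None else label)


trunk = ToyTrunk(apply=("impute.apply",), train=("impute.train",))
longer = trunk.extend(apply=("predict",), train=("fit",))
relabelled = longer.use(label=("extract_label",))
```

Returning a fresh instance from both helpers (instead of mutating) keeps the three segments consistent with each other, since every change produces a complete new triple.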
Flow Compiler¶
While representing the task graph using linked structures is practical for implementing the user-level API, a more efficient structure for its actual runtime execution is the (actor) adjacency matrix produced by the internal flow compiler.
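As a rough illustration of the two representations, the snippet below converts linked publisher-to-subscribers connections into a 0/1 adjacency matrix. It is a toy: port indices are ignored, and the sorted row/column layout is an assumption rather than ForML's internal format. to_adjacency is a hypothetical helper.

```python
def to_adjacency(links: dict[str, list[str]]) -> tuple[list[str], list[list[int]]]:
    """Turn publisher -> subscribers links into (node order, 0/1 adjacency matrix)."""
    nodes = sorted(set(links) | {s for subs in links.values() for s in subs})
    index = {name: i for i, name in enumerate(nodes)}
    matrix = [[0] * len(nodes) for _ in nodes]
    for publisher, subscribers in links.items():
        for subscriber in subscribers:
            matrix[index[publisher]][index[subscriber]] = 1  # row publishes to column
    return nodes, matrix


# The apply-mode topology from the earlier diagram.
links = {
    "select_foobar": ["concat"],
    "select_baz": ["concat"],
    "concat": ["drop_bar", "select_bar"],
    "drop_bar": ["impute_baz.apply"],
}
nodes, matrix = to_adjacency(links)
```

A flat matrix like this makes questions such as "which tasks feed this one" a column lookup instead of a pointer walk, which is the kind of efficiency the compiler output aims for.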
ForML uses its compiler to:
- Augment the task graph by adding any necessary system-level nodes (e.g. to automatically manage the persistence of any stateful actors), which might be any of the following:
System Node | Purpose
----------- | -------
Loader | Provides a previously persisted state.
Dumper | Persists a new state of the given actor.
Committer | Collates all the related states as a new generation.
Getter | Physically extracts a specific output port out of a Python function return value.
- Optimize the task graph by removing any irrelevant or redundant parts.
- Generate a portable set of instructions suitable for runtime execution.
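The optimization and instruction-generation steps can be approximated with the following sketch. It is a toy under explicit assumptions: the topology is a plain mapping from each node name to the names it consumes, "optimization" is reduced to dropping everything the requested output does not depend on, and the instruction order comes from Python's standard graphlib.TopologicalSorter rather than from ForML's own compiler. ToySymbol and toy_compile are hypothetical names, and system-node augmentation is not modelled.

```python
from graphlib import TopologicalSorter
from typing import NamedTuple


class ToySymbol(NamedTuple):
    """Hypothetical analogue of flow.Symbol: an instruction plus its argument instructions."""
    instruction: str
    arguments: tuple = ()


def toy_compile(deps: dict[str, tuple[str, ...]], output: str) -> list[ToySymbol]:
    """Prune everything the output does not depend on, then order the rest topologically."""
    needed, stack = set(), [output]
    while stack:  # backward walk from the requested output
        node = stack.pop()
        if node not in needed:
            needed.add(node)
            stack.extend(deps.get(node, ()))
    graph = {n: deps.get(n, ()) for n in needed}
    order = TopologicalSorter(graph).static_order()  # dependencies come first
    return [ToySymbol(n, tuple(graph[n])) for n in order]


deps = {
    "select_foobar": (),
    "select_baz": (),
    "concat": ("select_foobar", "select_baz"),
    "drop_bar": ("concat",),
    "select_bar": ("concat",),
    "impute_baz.apply": ("drop_bar",),
}
symbols = toy_compile(deps, "impute_baz.apply")
```

Because only impute_baz.apply is requested here, the select_bar branch is irrelevant to it and gets pruned, while every remaining symbol appears after all of its arguments.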
Here is our existing topology as enhanced by the compiler, which adds the state Loader and Dumper system nodes and connects the relevant State ports (dotted lines):
flowchart TD
subgraph Group
iba(["impute_baz.apply()"])
ibt["impute_baz.train()"]
end
sfbi((i)) --> sfb([select_foobar]) -- "0-0" --> c([concat])
sbi((i)) --> sz([select_baz]) -- "0-1" --> c
c -- "0-0" --> db([drop_bar]) -- "0-0" --> iba --> ibao((o))
c -- "0-0" --> sb([select_bar]) -- "0-L" --> ibt
db -- "0-T" --> ibt
ibt -. state .-> iba
l[(loader)] -. state .-> ibt -. state .-> d[(dumper)]
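The augmentation pictured above can be mimicked by a toy pass that wires hypothetical Loader and Dumper nodes around each trained worker and forwards its state to the apply-mode companions, following the dotted edges of the diagram. The Edge type, the add_state_nodes function and the loader:/dumper: naming scheme are illustrative assumptions; Committer and Getter nodes are left out.

```python
from typing import NamedTuple


class Edge(NamedTuple):
    src: str
    dst: str
    kind: str  # "data" for user-level links, "state" for system-level State links


def add_state_nodes(edges: list, groups: dict) -> list:
    """Toy augmentation: wire Loader/Dumper system nodes around each trained group member.

    `groups` maps the trained worker of each group to its apply-mode companions
    (a hypothetical simplification of flow.Worker.Group).
    """
    augmented = list(edges)
    for trained, companions in groups.items():
        loader, dumper = f"loader:{trained}", f"dumper:{trained}"
        augmented.append(Edge(loader, trained, "state"))  # previously persisted state in
        augmented.append(Edge(trained, dumper, "state"))  # new state out for persistence
        for worker in companions:
            augmented.append(Edge(trained, worker, "state"))  # trained state to apply node
    return augmented


edges = [
    Edge("drop_bar", "impute_baz.train", "data"),
    Edge("select_bar", "impute_baz.train", "data"),
    Edge("drop_bar", "impute_baz.apply", "data"),
]
result = add_state_nodes(edges, {"impute_baz.train": ["impute_baz.apply"]})
```

The user-level edges pass through unchanged; only "state" edges are added, mirroring the point that State ports cannot be connected from the user-level API.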
The following is a brief description of the compiler API:
- forml.flow.compile(segment: flow.Segment, assets: asset.State | None = None) → Collection[flow.Symbol]
Generate the portable low-level runtime symbol table representing the given flow topology segment augmented with all the necessary system instructions.
- Parameters:
- segment: flow.Segment
Flow topology segment to generate the symbol table for.
- assets: asset.State | None = None
Runtime state asset accessors for all the involved persistent workers.
- Returns:
The portable runtime symbol table.
- class forml.flow.Symbol(instruction: flow.Instruction, arguments: Sequence[flow.Instruction] | None = None)
The main unit of the compiled low-level runtime code.
It represents the executable instruction and its dependency on other instructions in the task graph.
- Parameters:
- instruction: flow.Instruction
The executable instruction.
- arguments: Sequence[flow.Instruction] | None = None
The sequence of instructions whose output constitutes parameters to this symbol’s instruction.
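To illustrate how instructions and their arguments fit together, the following toy interpreter (hypothetical, not ForML's runtime) executes a dependency-ordered table of symbols, passing the outputs of each symbol's argument instructions as positional parameters to its instruction. ToySymbol, run and the three sample functions are assumptions made for the example.

```python
from typing import Any, Callable, NamedTuple, Sequence


class ToySymbol(NamedTuple):
    """Hypothetical analogue of flow.Symbol: instruction plus the instructions feeding it."""
    instruction: Callable[..., Any]
    arguments: Sequence[Callable[..., Any]] = ()


def run(symbols: Sequence[ToySymbol]) -> dict:
    """Execute a dependency-ordered symbol table, passing argument outputs as parameters."""
    results: dict = {}
    for symbol in symbols:
        params = [results[arg] for arg in symbol.arguments]  # KeyError if out of order
        results[symbol.instruction] = symbol.instruction(*params)
    return results


def load() -> list:
    return [3, None, 5]


def drop_none(values: list) -> list:
    return [v for v in values if v is not None]


def total(values: list) -> int:
    return sum(values)


table = [ToySymbol(load), ToySymbol(drop_none, (load,)), ToySymbol(total, (drop_none,))]
outputs = run(table)
```

Here the symbol table alone determines the data flow: total receives drop_none's output simply because drop_none is listed as its argument.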